Skip to content

Commit 8df479a

Browse files
onlynonedmke
authored andcommitted
Ensure workers can send data popped off the queue at shutdown
Previously, any data still in the queue would be handled by the main thread. But, any data that a worker had already popped off the queue, but not yet sent on the wire, would be lost. This change pushes down the logic for stopping into the writer and worker classes. The worker tells the threads to exit and waits for them to do so.
1 parent 8860580 commit 8df479a

File tree

7 files changed

+120
-55
lines changed

7 files changed

+120
-55
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
For the full commit log, [see here](https://github.com/influxdata/influxdb-ruby/commits/master).
44

5+
## unreleased
6+
7+
- Ensure workers can send data popped off the queue at shutdown (#239,
8+
@onlynone)
9+
510
## v0.8.0, released 2020-02-05
611

712
- Allow dropping of specific series from specific DBs (#233, @cantino)

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,10 @@ async_options = {
295295
# checking if there are new jobs in the queue
296296
sleep_interval: 5,
297297
# whether client will block if queue is full
298-
block_on_full_queue: false
298+
block_on_full_queue: false,
299+
# Max time (in seconds) the main thread will wait for worker threads to stop
300+
# on shutdown. Defaults to 2x sleep_interval.
301+
shutdown_timeout: 10
299302
}
300303

301304
influxdb = InfluxDB::Client.new database, async: async_options

lib/influxdb/client.rb

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,19 @@ def initialize(database = nil, **opts)
5959
end
6060

6161
def stop!
62-
if config.async?
63-
# If retry was infinite (-1), set it to zero to give the main thread one
64-
# last chance to flush the queue
65-
config.retry = 0 if config.retry < 0
66-
writer.worker.stop!
62+
if @writer == self
63+
@stopped = true
64+
else
65+
@writer.stop!
6766
end
68-
@stopped = true
6967
end
7068

7169
def stopped?
72-
@stopped
70+
if @writer == self
71+
@stopped
72+
else
73+
@writer.stopped?
74+
end
7375
end
7476

7577
def now

lib/influxdb/writer/async.rb

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,16 @@ class Async
99
def initialize(client, config)
1010
@client = client
1111
@config = config
12+
@stopped = false
13+
end
14+
15+
def stopped?
16+
@stopped
17+
end
18+
19+
def stop!
20+
worker.stop!
21+
@stopped = true
1222
end
1323

1424
def write(data, precision = nil, retention_policy = nil, database = nil)
@@ -29,15 +39,16 @@ def worker
2939
end
3040
end
3141

32-
class Worker
42+
class Worker # rubocop:disable Metrics/ClassLength
3343
attr_reader :client,
3444
:queue,
3545
:threads,
3646
:max_post_points,
3747
:max_queue_size,
3848
:num_worker_threads,
3949
:sleep_interval,
40-
:block_on_full_queue
50+
:block_on_full_queue,
51+
:shutdown_timeout
4152

4253
include InfluxDB::Logging
4354

@@ -47,7 +58,7 @@ class Worker
4758
SLEEP_INTERVAL = 5
4859
BLOCK_ON_FULL_QUEUE = false
4960

50-
def initialize(client, config)
61+
def initialize(client, config) # rubocop:disable Metrics/MethodLength
5162
@client = client
5263
config = config.is_a?(Hash) ? config : {}
5364

@@ -56,10 +67,11 @@ def initialize(client, config)
5667
@num_worker_threads = config.fetch(:num_worker_threads, NUM_WORKER_THREADS)
5768
@sleep_interval = config.fetch(:sleep_interval, SLEEP_INTERVAL)
5869
@block_on_full_queue = config.fetch(:block_on_full_queue, BLOCK_ON_FULL_QUEUE)
70+
@shutdown_timeout = config.fetch(:shutdown_timeout, 2 * @sleep_interval)
5971

6072
queue_class = @block_on_full_queue ? SizedQueue : InfluxDB::MaxQueue
6173
@queue = queue_class.new max_queue_size
62-
74+
@should_stop = false
6375
spawn_threads!
6476
end
6577

@@ -68,11 +80,11 @@ def push(payload, precision = nil, retention_policy = nil, database = nil)
6880
end
6981

7082
def current_threads
71-
Thread.list.select { |t| t[:influxdb] == object_id }
83+
@threads
7284
end
7385

7486
def current_thread_count
75-
Thread.list.count { |t| t[:influxdb] == object_id }
87+
@threads.count
7688
end
7789

7890
# rubocop:disable Metrics/CyclomaticComplexity
@@ -87,7 +99,7 @@ def spawn_threads!
8799
@threads << Thread.new do
88100
Thread.current[:influxdb] = object_id
89101

90-
until client.stopped?
102+
until @should_stop
91103
check_background_queue(thread_num)
92104
sleep rand(sleep_interval)
93105
end
@@ -97,7 +109,7 @@ def spawn_threads!
97109
end
98110
end
99111

100-
def check_background_queue(thread_num = 0)
112+
def check_background_queue(thread_num = -1)
101113
log(:debug) do
102114
"Checking background queue on thread #{thread_num} (#{current_thread_count} active)"
103115
end
@@ -123,10 +135,10 @@ def check_background_queue(thread_num = 0)
123135
return if data.values.flatten.empty?
124136

125137
begin
126-
log(:debug) { "Found data in the queue! (#{sizes(data)})" }
138+
log(:debug) { "Found data in the queue! (#{sizes(data)}) on thread #{thread_num}" }
127139
write(data)
128140
rescue StandardError => e
129-
log :error, "Cannot write data: #{e.inspect}"
141+
log :error, "Cannot write data: #{e.inspect} on thread #{thread_num}"
130142
end
131143

132144
break if queue.length > max_post_points
@@ -138,7 +150,22 @@ def check_background_queue(thread_num = 0)
138150
# rubocop:enable Metrics/AbcSize
139151

140152
def stop!
141-
log(:debug) { "Thread exiting, flushing queue." }
153+
log(:debug) { "Worker is being stopped, flushing queue." }
154+
155+
# If retry was infinite (-1), set it to zero to give the threads one
156+
# last chance to write their data
157+
client.config.retry = 0 if client.config.retry < 0
158+
159+
# Signal the background threads that they should exit.
160+
@should_stop = true
161+
162+
# Wait for the threads to exit and then kill them
163+
@threads.each do |t|
164+
r = t.join(shutdown_timeout)
165+
t.kill if r.nil?
166+
end
167+
168+
# Flush any remaining items in the queue on the main thread
142169
check_background_queue until queue.empty?
143170
end
144171

lib/influxdb/writer/udp.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ def initialize(client, host: "localhost".freeze, port: 4444)
1111
@port = port
1212
end
1313

14+
# No-op for UDP writers
15+
def stop!; end
16+
1417
def write(payload, _precision = nil, _retention_policy = nil, _database = nil)
1518
with_socket { |sock| sock.send(payload, 0) }
1619
end

spec/influxdb/cases/async_client_spec.rb

Lines changed: 57 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
require "timeout"
33

44
describe InfluxDB::Client do
5-
let(:async_options) { true }
5+
let(:async_options) { { sleep_interval: 0.1 } }
66
let(:client) { described_class.new(async: async_options) }
77
let(:subject) { client }
88
let(:stub_url) { "http://localhost:8086/write?db=&p=root&precision=s&u=root" }
@@ -18,14 +18,9 @@
1818
subject.write_point('a', values: { i: i })
1919
end
2020

21-
# The timout code is fragile, and heavily dependent on system load
22-
# (and scheduler decisions). On the CI, the system is less
23-
# responsive and needs a bit more time.
24-
timeout_stretch = ENV["TRAVIS"] == "true" ? 10 : 3
21+
sleep 1 until worker.threads.none? { |t| t[:influxdb].nil? }
2522

26-
Timeout.timeout(timeout_stretch * worker.sleep_interval) do
27-
subject.stop!
28-
end
23+
subject.stop!
2924

3025
worker.threads.each do |t|
3126
expect(t.stop?).to be true
@@ -41,6 +36,7 @@
4136
let(:precision) { 'test_precision' }
4237
let(:retention_policy) { 'test_period' }
4338
let(:database) { 'test_database' }
39+
let(:async_options) { { num_worker_threads: 1, sleep_interval: 0.1 } }
4440

4541
it "writes aggregate payload to the client" do
4642
queue = Queue.new
@@ -51,9 +47,11 @@
5147
subject.write_point(series, { values: { t: 60 } }, precision, retention_policy, database)
5248
subject.write_point(series, { values: { t: 61 } }, precision, retention_policy, database)
5349

54-
Timeout.timeout(worker.sleep_interval) do
55-
expect(queue.pop).to eq ["#{series} t=60i\n#{series} t=61i", precision, retention_policy, database]
56-
end
50+
sleep 1 until worker.threads.none? { |t| t[:influxdb].nil? }
51+
52+
subject.stop!
53+
54+
expect(queue.pop).to eq ["#{series} t=60i\n#{series} t=61i", precision, retention_policy, database]
5755
end
5856

5957
context 'when different precision, retention_policy and database are given' do
@@ -72,36 +70,62 @@
7270
subject.write_point(series, { values: { t: 62 } }, precision, retention_policy2, database)
7371
subject.write_point(series, { values: { t: 63 } }, precision, retention_policy, database2)
7472

75-
Timeout.timeout(worker.sleep_interval) do
76-
expect(queue.pop).to eq ["#{series} t=60i", precision, retention_policy, database]
77-
expect(queue.pop).to eq ["#{series} t=61i", precision2, retention_policy, database]
78-
expect(queue.pop).to eq ["#{series} t=62i", precision, retention_policy2, database]
79-
expect(queue.pop).to eq ["#{series} t=63i", precision, retention_policy, database2]
80-
end
73+
sleep 1 until worker.threads.none? { |t| t[:influxdb].nil? }
74+
75+
subject.stop!
76+
77+
expect(queue.pop).to eq ["#{series} t=60i", precision, retention_policy, database]
78+
expect(queue.pop).to eq ["#{series} t=61i", precision2, retention_policy, database]
79+
expect(queue.pop).to eq ["#{series} t=62i", precision, retention_policy2, database]
80+
expect(queue.pop).to eq ["#{series} t=63i", precision, retention_policy, database2]
8181
end
8282
end
8383
end
8484
end
8585

8686
describe "async options" do
87-
let(:async_options) do
88-
{
89-
max_post_points: 10,
90-
max_queue_size: 100,
91-
num_worker_threads: 1,
92-
sleep_interval: 0.5,
93-
block_on_full_queue: false
94-
}
95-
end
96-
9787
subject { worker }
9888
before { worker.stop! }
9989

100-
specify { expect(subject.max_post_points).to be 10 }
101-
specify { expect(subject.max_queue_size).to be 100 }
102-
specify { expect(subject.num_worker_threads).to be 1 }
103-
specify { expect(subject.sleep_interval).to be_within(0.0001).of(0.5) }
104-
specify { expect(subject.block_on_full_queue).to be false }
105-
specify { expect(subject.queue).to be_kind_of(InfluxDB::MaxQueue) }
90+
context 'when all options are given' do
91+
let(:async_options) do
92+
{
93+
max_post_points: 10,
94+
max_queue_size: 100,
95+
num_worker_threads: 1,
96+
sleep_interval: 0.5,
97+
block_on_full_queue: false,
98+
shutdown_timeout: 0.6,
99+
}
100+
end
101+
102+
it "uses the specified values" do
103+
expect(subject.max_post_points).to be 10
104+
expect(subject.max_queue_size).to be 100
105+
expect(subject.num_worker_threads).to be 1
106+
expect(subject.sleep_interval).to be_within(0.0001).of(0.5)
107+
expect(subject.block_on_full_queue).to be false
108+
expect(subject.queue).to be_kind_of(InfluxDB::MaxQueue)
109+
expect(subject.shutdown_timeout).to be_within(0.0001).of(0.6)
110+
end
111+
end
112+
113+
context 'when only sleep_interval is given' do
114+
let(:async_options) { { sleep_interval: 0.2 } }
115+
116+
it "uses a value for shutdown_timeout that is 2x sleep_interval" do
117+
expect(subject.sleep_interval).to be_within(0.0001).of(0.2)
118+
expect(subject.shutdown_timeout).to be_within(0.0001).of(0.4)
119+
end
120+
end
121+
122+
context 'when only shutdown_timeout is given' do
123+
let(:async_options) { { shutdown_timeout: 0.3 } }
124+
125+
it "uses that value" do
126+
expect(subject.sleep_interval).to be_within(0.0001).of(5)
127+
expect(subject.shutdown_timeout).to be_within(0.0001).of(0.3)
128+
end
129+
end
106130
end
107131
end

spec/influxdb/config_spec.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
require 'spec_helper'
22

33
describe InfluxDB::Config do
4-
let(:conf) do
5-
InfluxDB::Client.new(*args).config
6-
end
4+
after { client.stop! }
5+
6+
let(:client) { InfluxDB::Client.new(*args) }
7+
let(:conf) { client.config }
78

89
let(:args) { {} }
910

0 commit comments

Comments
 (0)