diff --git a/lib/fluent/plugin/kinesis_helper/api.rb b/lib/fluent/plugin/kinesis_helper/api.rb index 572f77c..c367c56 100644 --- a/lib/fluent/plugin/kinesis_helper/api.rb +++ b/lib/fluent/plugin/kinesis_helper/api.rb @@ -14,6 +14,7 @@ require 'fluent_plugin_kinesis/version' require 'fluent/configurable' +require 'benchmark' module Fluent module Plugin @@ -79,7 +80,7 @@ def split_to_batches(records, &block) yield(batch, size) batch = [] size = 0 - end + end batch << record size += record_size end @@ -96,11 +97,7 @@ def batch_request_with_retry(batch, retry_count=0, backoff: nil, &block) wait_second = backoff.next msg = 'Retrying to request batch. Retry count: %3d, Retry records: %3d, Wait seconds %3.2f' % [retry_count+1, failed_records.size, wait_second] log.warn(truncate msg) - # TODO: sleep() doesn't wait the given seconds sometime. - # The root cause is unknown so far, so I'd like to add debug print only. It should be fixed in the future. - log.debug("#{Thread.current.object_id} sleep start") - sleep(wait_second) - log.debug("#{Thread.current.object_id} sleep finish") + reliable_sleep(wait_second) batch_request_with_retry(retry_records(failed_records), retry_count+1, backoff: backoff, &block) else give_up_retries(failed_records) @@ -108,6 +105,18 @@ def batch_request_with_retry(batch, retry_count=0, backoff: nil, &block) end end + # TODO: find out who is causing the sleep to be too short and try to make them stop it instead + # The root cause is unknown so far, so I'd like to add debug print only. It should be fixed in the future. + # Our guess is that something wakes up the thread, so we keep on going to sleep if that happens. + def reliable_sleep(wait_second) + loop do + actual = Benchmark.realtime { sleep(wait_second) } + break if actual >= wait_second + log.error("#{Thread.current.object_id} sleep failed expected #{wait_second} but slept #{actual}") + wait_second -= actual + end + end + def any_records_shipped?(res) results(res).size > failed_count(res) end diff --git a/test/kinesis_helper/test_api.rb b/test/kinesis_helper/test_api.rb index 4c7954b..359320e 100644 --- a/test/kinesis_helper/test_api.rb +++ b/test/kinesis_helper/test_api.rb @@ -14,6 +14,7 @@ require_relative '../helper' require 'fluent/plugin/kinesis_helper/api' +require 'benchmark' class KinesisHelperAPITest < Test::Unit::TestCase class Mock @@ -114,6 +115,16 @@ def test_batch_request_with_retry(data) assert_equal expected, @object.request_series end + def test_reliable_sleep + time = Benchmark.realtime do + t = Thread.new { @object.send(:reliable_sleep, 0.2) } + sleep 0.1 + t.run + t.join + end + assert_operator time, :>, 0.15 + end + data( 'reset_everytime' => [true, [4,3,2,1], 3], 'disable_reset' => [false, [4,3,2,1], 0],