From e9993fb8ea2c0c7b8536486cee72c20c127c4e7e Mon Sep 17 00:00:00 2001 From: Michael Grosser Date: Tue, 11 Sep 2018 13:55:24 -0700 Subject: [PATCH 1/3] make sleep reliable by measuring how long we actually slept --- lib/fluent/plugin/kinesis_helper/api.rb | 21 +++++++++++++++------ test/kinesis_helper/test_api.rb | 11 +++++++++++ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/lib/fluent/plugin/kinesis_helper/api.rb b/lib/fluent/plugin/kinesis_helper/api.rb index 572f77c..da7eead 100644 --- a/lib/fluent/plugin/kinesis_helper/api.rb +++ b/lib/fluent/plugin/kinesis_helper/api.rb @@ -14,6 +14,7 @@ require 'fluent_plugin_kinesis/version' require 'fluent/configurable' +require 'benchmark' module Fluent module Plugin @@ -79,7 +80,7 @@ def split_to_batches(records, &block) yield(batch, size) batch = [] size = 0 - end + end batch << record size += record_size end @@ -96,11 +97,7 @@ def batch_request_with_retry(batch, retry_count=0, backoff: nil, &block) wait_second = backoff.next msg = 'Retrying to request batch. Retry count: %3d, Retry records: %3d, Wait seconds %3.2f' % [retry_count+1, failed_records.size, wait_second] log.warn(truncate msg) - # TODO: sleep() doesn't wait the given seconds sometime. - # The root cause is unknown so far, so I'd like to add debug print only. It should be fixed in the future. - log.debug("#{Thread.current.object_id} sleep start") - sleep(wait_second) - log.debug("#{Thread.current.object_id} sleep finish") + reliable_sleep(wait_second) batch_request_with_retry(retry_records(failed_records), retry_count+1, backoff: backoff, &block) else give_up_retries(failed_records) @@ -108,6 +105,18 @@ def batch_request_with_retry(batch, retry_count=0, backoff: nil, &block) end end + # Sleep seems to not sleep as long as we ask it, our guess is that something wakes up the thread, + # so we keep on going to sleep if that happens. + # TODO: find out who is causing the sleep to be too short and try to make them stop it instead + def reliable_sleep(wait_second) + loop do + actual = Benchmark.realtime { sleep(wait_second) } + break if actual >= wait_second + log.error("#{Thread.current.object_id} sleep failed expected #{wait_second} but slept #{actual}") + wait_second -= actual + end + end + def any_records_shipped?(res) results(res).size > failed_count(res) end diff --git a/test/kinesis_helper/test_api.rb b/test/kinesis_helper/test_api.rb index 4c7954b..359320e 100644 --- a/test/kinesis_helper/test_api.rb +++ b/test/kinesis_helper/test_api.rb @@ -14,6 +14,7 @@ require_relative '../helper' require 'fluent/plugin/kinesis_helper/api' +require 'benchmark' class KinesisHelperAPITest < Test::Unit::TestCase class Mock @@ -114,6 +115,16 @@ def test_batch_request_with_retry(data) assert_equal expected, @object.request_series end + def test_reliable_sleep + time = Benchmark.realtime do + t = Thread.new { @object.send(:reliable_sleep, 0.2) } + sleep 0.1 + t.run + t.join + end + assert_operator time, :>, 0.15 + end + data( 'reset_everytime' => [true, [4,3,2,1], 3], 'disable_reset' => [false, [4,3,2,1], 0], From 09877ebaf68b89cbadc8ce3ee7c05835533fca48 Mon Sep 17 00:00:00 2001 From: simukappu Date: Sat, 28 Nov 2020 00:57:18 +0900 Subject: [PATCH 2/3] Enable to monitor batch request failure and retries - #150 --- README.md | 11 +++++ lib/fluent/plugin/kinesis_helper/api.rb | 55 +++++++++++++++++++++++-- test/kinesis_helper/test_api.rb | 49 ++++++++++++++++++++++ 3 files changed, 111 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index ee36d46..c74d9eb 100644 --- a/README.md +++ b/README.md @@ -408,6 +408,17 @@ Default: - `kinesis_firehose`: 4 MB - `kinesis_streams_aggregated`: 1 MB +### raise_error_on_batch_request_failure +Boolean, default `false`. + +If *raise_error_on_batch_request_failure* is disabled (default), the plugin will give up sending records when we get batch request failure after retrying max times configured as *retries_on_batch_request*. This giving up +can be monitored from [monitor_agent](https://docs.fluentd.org/input/monitor_agent) or [fluent-plugin-prometheus](https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus) as *retry_count* or *num_errors* metrics. + +If *raise_error_on_batch_request_failure* is enabled, the plugin will raise error and return chunk to Fluentd buffer when we get batch request failure after retrying max times. Fluentd will retry to send chunk records according to retry config in [Buffer Section](https://docs.fluentd.org/configuration/buffer-section). Note that this retryng may create duplicate records since [PutRecords API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) of Kinesis Data Streams and [PutRecordBatch API](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html) of Kinesis Data Firehose may return a partially successful response. + +### monitor_num_of_batch_request_retries +Boolean, default `false`. If enabled, the plugin will increment *retry_count* monitoring metrics after internal retrying to send batch request. This configuration enables you to monitor [ProvisionedThroughputExceededException](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) from [monitor_agent](https://docs.fluentd.org/input/monitor_agent) or [fluent-plugin-prometheus](https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus). Note that *retry_count* metrics will be counted by the plugin in addition to original Fluentd buffering mechanism if *monitor_num_of_batch_request_retries* is enabled. + ## Configuration: kinesis_streams Here are `kinesis_streams` specific configurations. diff --git a/lib/fluent/plugin/kinesis_helper/api.rb b/lib/fluent/plugin/kinesis_helper/api.rb index da7eead..3a66fce 100644 --- a/lib/fluent/plugin/kinesis_helper/api.rb +++ b/lib/fluent/plugin/kinesis_helper/api.rb @@ -41,10 +41,12 @@ def configure(conf) module BatchRequest module BatchRequestParams include Fluent::Configurable - config_param :retries_on_batch_request, :integer, default: 8 - config_param :reset_backoff_if_success, :bool, default: true - config_param :batch_request_max_count, :integer, default: nil - config_param :batch_request_max_size, :integer, default: nil + config_param :retries_on_batch_request, :integer, default: 8 + config_param :reset_backoff_if_success, :bool, default: true + config_param :batch_request_max_count, :integer, default: nil + config_param :batch_request_max_size, :integer, default: nil + config_param :raise_error_on_batch_request_failure, :bool, default: false + config_param :monitor_num_of_batch_request_retries, :bool, default: false end def self.included(mod) @@ -97,6 +99,8 @@ def batch_request_with_retry(batch, retry_count=0, backoff: nil, &block) wait_second = backoff.next msg = 'Retrying to request batch. Retry count: %3d, Retry records: %3d, Wait seconds %3.2f' % [retry_count+1, failed_records.size, wait_second] log.warn(truncate msg) + # Increment num_errors to monitor batch request retries from "monitor_agent" or "fluent-plugin-prometheus" + increment_num_errors if @monitor_num_of_batch_request_retries reliable_sleep(wait_second) batch_request_with_retry(retry_records(failed_records), retry_count+1, backoff: backoff, &block) else @@ -173,6 +177,49 @@ def give_up_retries(failed_records) record[:original] ]) } + + if @raise_error_on_batch_request_failure + # Raise error and return chunk to Fluentd for retrying + case request_type + # @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html + # @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Kinesis/Client.html#put_records-instance_method + # @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Kinesis/Errors.html + when :streams, :streams_aggregated + provisioned_throughput_exceeded_records = failed_records.select { |record| record[:error_code] == 'ProvisionedThroughputExceededException' } + target_failed_record = provisioned_throughput_exceeded_records.first || failed_records.first + target_error = provisioned_throughput_exceeded_records.empty? ? + Aws::Kinesis::Errors::ServiceError : + Aws::Kinesis::Errors::ProvisionedThroughputExceededException + # @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html + # @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Firehose/Client.html#put_record_batch-instance_method + # @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Firehose/Errors.html + when :firehose + service_unavailable_exception_records = failed_records.select { |record| record[:error_code] == 'ServiceUnavailableException' } + target_failed_record = service_unavailable_exception_records.first || failed_records.first + target_error = service_unavailable_exception_records.empty? ? + Aws::Firehose::Errors::ServiceError : + Aws::Firehose::Errors::ServiceUnavailableException + end + log.error("Raise #{target_failed_record[:error_code]} and return chunk to Fluentd buffer for retrying") + raise target_error.new(Seahorse::Client::RequestContext.new, target_failed_record[:error_message]) + else + # Increment num_errors to monitor batch request failure from "monitor_agent" or "fluent-plugin-prometheus" + increment_num_errors + end + end + + def increment_num_errors + # Prepare Fluent::Plugin::Output instance variables to count errors in this method. + # These instance variables are initialized here for possible future breaking changes of Fluentd. + @num_errors ||= 0 + # @see https://github.com/fluent/fluentd/commit/d245454658d16170431d276fcd5849fb0d88ab2b + if Gem::Version.new(Fluent::VERSION) >= Gem::Version.new('1.7.0') + @counter_mutex ||= Mutex.new + @counter_mutex.synchronize{ @num_errors += 1 } + else + @counters_monitor ||= Monitor.new + @counters_monitor.synchronize{ @num_errors += 1 } + end end class Backoff diff --git a/test/kinesis_helper/test_api.rb b/test/kinesis_helper/test_api.rb index 359320e..3323be2 100644 --- a/test/kinesis_helper/test_api.rb +++ b/test/kinesis_helper/test_api.rb @@ -24,12 +24,16 @@ class Mock attr_accessor :retries_on_batch_request, :reset_backoff_if_success attr_accessor :failed_scenario, :request_series attr_accessor :batch_request_max_count, :batch_request_max_size + attr_accessor :num_errors, :raise_error_on_batch_request_failure, :monitor_num_of_batch_request_retries def initialize @retries_on_batch_request = 3 @reset_backoff_if_success = true @failed_scenario = [].to_enum @request_series = [] + @num_errors = 0 + @raise_error_on_batch_request_failure = false + @monitor_num_of_batch_request_retries = false end def request_type @@ -138,4 +142,49 @@ def test_reset_backoff(data) @backoff.expects(:reset).times(expected) @object.send(:batch_request_with_retry, batch, backoff: @backoff) { |batch| @object.batch_request(batch) } end + + data( + 'disabled_no_failed_completed' => [false, [0,0,0,0], 0], + 'disabled_some_failed_completed' => [false, [3,2,1,0], 0], + 'disabled_some_failed_incompleted' => [false, [4,3,2,1], 1], + 'disabled_all_failed_incompleted' => [false, [5,5,5,5], 1], + 'enabled_no_failed_completed' => [true, [0,0,0,0], 0], + 'enabled_some_failed_completed' => [true, [3,2,1,0], 0], + 'enabled_some_failed_incompleted' => [true, [4,3,2,1], 1], + 'enabled_all_failed_incompleted' => [true, [5,5,5,5], 1], + ) + def test_raise_error_on_batch_request_failure(data) + param, failed_scenario, expected = data + batch = Array.new(5, {}) + @object.raise_error_on_batch_request_failure = param + @object.failed_scenario = failed_scenario.to_enum + if param && expected > 0 + e = assert_raises Aws::Firehose::Errors::ServiceUnavailableException do + @object.send(:batch_request_with_retry, batch, backoff: @backoff) { |batch| @object.batch_request(batch) } + end + assert_equal @object.failed_response.error_message, e.message + else + @object.send(:batch_request_with_retry, batch, backoff: @backoff) { |batch| @object.batch_request(batch) } + assert_equal expected, @object.num_errors + end + end + + data( + 'disabled_no_failed_completed' => [false, [0,0,0,0], 0], + 'disabled_some_failed_completed' => [false, [3,2,1,0], 0], + 'disabled_some_failed_incompleted' => [false, [4,3,2,1], 1], + 'disabled_all_failed_incompleted' => [false, [5,5,5,5], 1], + 'enabled_no_failed_completed' => [true, [0,0,0,0], 0], + 'enabled_some_failed_completed' => [true, [3,2,1,0], 3], + 'enabled_some_failed_incompleted' => [true, [4,3,2,1], 4], + 'enabled_all_failed_incompleted' => [true, [5,5,5,5], 4], + ) + def test_monitor_num_of_batch_request_retries(data) + param, failed_scenario, expected = data + batch = Array.new(5, {}) + @object.monitor_num_of_batch_request_retries = param + @object.failed_scenario = failed_scenario.to_enum + @object.send(:batch_request_with_retry, batch, backoff: @backoff) { |batch| @object.batch_request(batch) } + assert_equal expected, @object.num_errors + end end From 4ca3d5f798e575a4216a478ce48df8500cad8674 Mon Sep 17 00:00:00 2001 From: simukappu Date: Sat, 6 Feb 2021 12:30:18 +0900 Subject: [PATCH 3/3] Rename raise_error_on_batch_request_failure config name to drop_failed_records_after_batch_request_retries with default true - #150 --- README.md | 9 ++++----- lib/fluent/plugin/kinesis_helper/api.rb | 20 ++++++++++---------- test/kinesis_helper/test_api.rb | 18 +++++++++--------- 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index c74d9eb..d421c12 100644 --- a/README.md +++ b/README.md @@ -408,13 +408,12 @@ Default: - `kinesis_firehose`: 4 MB - `kinesis_streams_aggregated`: 1 MB -### raise_error_on_batch_request_failure -Boolean, default `false`. +### drop_failed_records_after_batch_request_retries +Boolean, default `true`. -If *raise_error_on_batch_request_failure* is disabled (default), the plugin will give up sending records when we get batch request failure after retrying max times configured as *retries_on_batch_request*. This giving up -can be monitored from [monitor_agent](https://docs.fluentd.org/input/monitor_agent) or [fluent-plugin-prometheus](https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus) as *retry_count* or *num_errors* metrics. +If *drop_failed_records_after_batch_request_retries* is enabled (default), the plugin will drop failed records when batch request fails after retrying max times configured as *retries_on_batch_request*. This dropping can be monitored from [monitor_agent](https://docs.fluentd.org/input/monitor_agent) or [fluent-plugin-prometheus](https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus) as *retry_count* or *num_errors* metrics. -If *raise_error_on_batch_request_failure* is enabled, the plugin will raise error and return chunk to Fluentd buffer when we get batch request failure after retrying max times. Fluentd will retry to send chunk records according to retry config in [Buffer Section](https://docs.fluentd.org/configuration/buffer-section). Note that this retryng may create duplicate records since [PutRecords API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) of Kinesis Data Streams and [PutRecordBatch API](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html) of Kinesis Data Firehose may return a partially successful response. +If *drop_failed_records_after_batch_request_retries* is disabled, the plugin will raise error and return chunk to Fluentd buffer when batch request fails after retrying max times. Fluentd will retry to send chunk records according to retry config in [Buffer Section](https://docs.fluentd.org/configuration/buffer-section). Note that this retryng may create duplicate records since [PutRecords API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) of Kinesis Data Streams and [PutRecordBatch API](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html) of Kinesis Data Firehose may return a partially successful response. ### monitor_num_of_batch_request_retries Boolean, default `false`. If enabled, the plugin will increment *retry_count* monitoring metrics after internal retrying to send batch request. This configuration enables you to monitor [ProvisionedThroughputExceededException](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) from [monitor_agent](https://docs.fluentd.org/input/monitor_agent) or [fluent-plugin-prometheus](https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus). Note that *retry_count* metrics will be counted by the plugin in addition to original Fluentd buffering mechanism if *monitor_num_of_batch_request_retries* is enabled. diff --git a/lib/fluent/plugin/kinesis_helper/api.rb b/lib/fluent/plugin/kinesis_helper/api.rb index 3a66fce..7a8302a 100644 --- a/lib/fluent/plugin/kinesis_helper/api.rb +++ b/lib/fluent/plugin/kinesis_helper/api.rb @@ -41,12 +41,12 @@ def configure(conf) module BatchRequest module BatchRequestParams include Fluent::Configurable - config_param :retries_on_batch_request, :integer, default: 8 - config_param :reset_backoff_if_success, :bool, default: true - config_param :batch_request_max_count, :integer, default: nil - config_param :batch_request_max_size, :integer, default: nil - config_param :raise_error_on_batch_request_failure, :bool, default: false - config_param :monitor_num_of_batch_request_retries, :bool, default: false + config_param :retries_on_batch_request, :integer, default: 8 + config_param :reset_backoff_if_success, :bool, default: true + config_param :batch_request_max_count, :integer, default: nil + config_param :batch_request_max_size, :integer, default: nil + config_param :drop_failed_records_after_batch_request_retries, :bool, default: true + config_param :monitor_num_of_batch_request_retries, :bool, default: false end def self.included(mod) @@ -178,7 +178,10 @@ def give_up_retries(failed_records) ]) } - if @raise_error_on_batch_request_failure + if @drop_failed_records_after_batch_request_retries + # Increment num_errors to monitor batch request failure from "monitor_agent" or "fluent-plugin-prometheus" + increment_num_errors + else # Raise error and return chunk to Fluentd for retrying case request_type # @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html @@ -202,9 +205,6 @@ def give_up_retries(failed_records) end log.error("Raise #{target_failed_record[:error_code]} and return chunk to Fluentd buffer for retrying") raise target_error.new(Seahorse::Client::RequestContext.new, target_failed_record[:error_message]) - else - # Increment num_errors to monitor batch request failure from "monitor_agent" or "fluent-plugin-prometheus" - increment_num_errors end end diff --git a/test/kinesis_helper/test_api.rb b/test/kinesis_helper/test_api.rb index 3323be2..ed0e72a 100644 --- a/test/kinesis_helper/test_api.rb +++ b/test/kinesis_helper/test_api.rb @@ -24,7 +24,7 @@ class Mock attr_accessor :retries_on_batch_request, :reset_backoff_if_success attr_accessor :failed_scenario, :request_series attr_accessor :batch_request_max_count, :batch_request_max_size - attr_accessor :num_errors, :raise_error_on_batch_request_failure, :monitor_num_of_batch_request_retries + attr_accessor :num_errors, :drop_failed_records_after_batch_request_retries, :monitor_num_of_batch_request_retries def initialize @retries_on_batch_request = 3 @@ -32,7 +32,7 @@ def initialize @failed_scenario = [].to_enum @request_series = [] @num_errors = 0 - @raise_error_on_batch_request_failure = false + @drop_failed_records_after_batch_request_retries = true @monitor_num_of_batch_request_retries = false end @@ -144,21 +144,21 @@ def test_reset_backoff(data) end data( - 'disabled_no_failed_completed' => [false, [0,0,0,0], 0], - 'disabled_some_failed_completed' => [false, [3,2,1,0], 0], - 'disabled_some_failed_incompleted' => [false, [4,3,2,1], 1], - 'disabled_all_failed_incompleted' => [false, [5,5,5,5], 1], 'enabled_no_failed_completed' => [true, [0,0,0,0], 0], 'enabled_some_failed_completed' => [true, [3,2,1,0], 0], 'enabled_some_failed_incompleted' => [true, [4,3,2,1], 1], 'enabled_all_failed_incompleted' => [true, [5,5,5,5], 1], + 'disabled_no_failed_completed' => [false, [0,0,0,0], 0], + 'disabled_some_failed_completed' => [false, [3,2,1,0], 0], + 'disabled_some_failed_incompleted' => [false, [4,3,2,1], 1], + 'disabled_all_failed_incompleted' => [false, [5,5,5,5], 1], ) - def test_raise_error_on_batch_request_failure(data) + def test_drop_failed_records_after_batch_request_retries(data) param, failed_scenario, expected = data batch = Array.new(5, {}) - @object.raise_error_on_batch_request_failure = param + @object.drop_failed_records_after_batch_request_retries = param @object.failed_scenario = failed_scenario.to_enum - if param && expected > 0 + if !param && expected > 0 e = assert_raises Aws::Firehose::Errors::ServiceUnavailableException do @object.send(:batch_request_with_retry, batch, backoff: @backoff) { |batch| @object.batch_request(batch) } end