Enable to monitor batch request failure and retries #211

10 changes: 10 additions & 0 deletions README.md
@@ -408,6 +408,16 @@ Default:
- `kinesis_firehose`: 4 MB
- `kinesis_streams_aggregated`: 1 MB

### drop_failed_records_after_batch_request_retries
Boolean, default `true`.

If *drop_failed_records_after_batch_request_retries* is enabled (the default), the plugin drops failed records when a batch request still fails after the maximum number of retries configured by *retries_on_batch_request*. These drops can be monitored from [monitor_agent](https://docs.fluentd.org/input/monitor_agent) or [fluent-plugin-prometheus](https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus) via the *retry_count* or *num_errors* metrics.

If *drop_failed_records_after_batch_request_retries* is disabled, the plugin raises an error and returns the chunk to the Fluentd buffer when a batch request still fails after the maximum number of retries. Fluentd then retries sending the chunk's records according to the retry settings in the [Buffer Section](https://docs.fluentd.org/configuration/buffer-section). Note that this retrying may create duplicate records, since the [PutRecords API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) of Kinesis Data Streams and the [PutRecordBatch API](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html) of Kinesis Data Firehose may return a partially successful response.
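
For illustration only (not part of this change), here is a minimal sketch of disabling the option so that failed chunks are handed back to the Fluentd buffer; the match tag, stream name, region, and retry values are placeholders:

```
<match my.logs.**>
  @type kinesis_streams
  stream_name my-stream
  region us-east-1

  # After the internal retries are exhausted, raise and return the chunk to the buffer
  retries_on_batch_request 8
  drop_failed_records_after_batch_request_retries false

  # Fluentd-level retry settings applied once the chunk is returned
  <buffer>
    retry_type exponential_backoff
    retry_wait 1s
    retry_max_times 10
  </buffer>
</match>
```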

### monitor_num_of_batch_request_retries
Boolean, default `false`. If enabled, the plugin increments the *retry_count* monitoring metric after each internal retry of a batch request. This lets you monitor [ProvisionedThroughputExceededException](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) from [monitor_agent](https://docs.fluentd.org/input/monitor_agent) or [fluent-plugin-prometheus](https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus). Note that when *monitor_num_of_batch_request_retries* is enabled, the plugin counts *retry_count* in addition to the counting done by Fluentd's original buffering mechanism.
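
As an assumed companion setup (not part of this change), the following sketch enables the counter and exposes it through monitor_agent; the delivery stream name, bind address, and port are placeholders:

```
<match my.logs.**>
  @type kinesis_firehose
  delivery_stream_name my-delivery-stream
  region us-east-1

  # Each internal batch-request retry increments the retry_count metric
  monitor_num_of_batch_request_retries true
</match>

# Expose plugin counters such as retry_count and num_errors over HTTP
<source>
  @type monitor_agent
  bind 0.0.0.0
  port 24220
</source>
```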

## Configuration: kinesis_streams
Here are `kinesis_streams` specific configurations.

74 changes: 65 additions & 9 deletions lib/fluent/plugin/kinesis_helper/api.rb
@@ -14,6 +14,7 @@

require 'fluent_plugin_kinesis/version'
require 'fluent/configurable'
require 'benchmark'

module Fluent
module Plugin
@@ -40,10 +41,12 @@ def configure(conf)
module BatchRequest
module BatchRequestParams
include Fluent::Configurable
config_param :retries_on_batch_request, :integer, default: 8
config_param :reset_backoff_if_success, :bool, default: true
config_param :batch_request_max_count, :integer, default: nil
config_param :batch_request_max_size, :integer, default: nil
config_param :retries_on_batch_request, :integer, default: 8
config_param :reset_backoff_if_success, :bool, default: true
config_param :batch_request_max_count, :integer, default: nil
config_param :batch_request_max_size, :integer, default: nil
config_param :drop_failed_records_after_batch_request_retries, :bool, default: true
config_param :monitor_num_of_batch_request_retries, :bool, default: false
end

def self.included(mod)
@@ -96,18 +99,28 @@ def batch_request_with_retry(batch, retry_count=0, backoff: nil, &block)
wait_second = backoff.next
msg = 'Retrying to request batch. Retry count: %3d, Retry records: %3d, Wait seconds %3.2f' % [retry_count+1, failed_records.size, wait_second]
log.warn(truncate msg)
# TODO: sleep() sometimes doesn't wait the given number of seconds.
# The root cause is unknown so far, so only debug prints are added here. This should be fixed in the future.
log.debug("#{Thread.current.object_id} sleep start")
sleep(wait_second)
log.debug("#{Thread.current.object_id} sleep finish")
# Increment num_errors to monitor batch request retries from "monitor_agent" or "fluent-plugin-prometheus"
increment_num_errors if @monitor_num_of_batch_request_retries
reliable_sleep(wait_second)
batch_request_with_retry(retry_records(failed_records), retry_count+1, backoff: backoff, &block)
else
give_up_retries(failed_records)
end
end
end

# sleep() sometimes returns before the requested duration has elapsed; our guess is that something
# wakes up the thread, so we go back to sleep until the full wait time has passed.
# TODO: find out what is cutting the sleep short and stop it at the source instead
def reliable_sleep(wait_second)
loop do
actual = Benchmark.realtime { sleep(wait_second) }
break if actual >= wait_second
log.error("#{Thread.current.object_id} sleep failed expected #{wait_second} but slept #{actual}")
wait_second -= actual
end
end

def any_records_shipped?(res)
results(res).size > failed_count(res)
end
@@ -164,6 +177,49 @@ def give_up_retries(failed_records)
record[:original]
])
}

if @drop_failed_records_after_batch_request_retries
# Increment num_errors to monitor batch request failure from "monitor_agent" or "fluent-plugin-prometheus"
increment_num_errors
else
# Raise an error and return the chunk to Fluentd for retrying
case request_type
# @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
# @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Kinesis/Client.html#put_records-instance_method
# @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Kinesis/Errors.html
when :streams, :streams_aggregated
provisioned_throughput_exceeded_records = failed_records.select { |record| record[:error_code] == 'ProvisionedThroughputExceededException' }
target_failed_record = provisioned_throughput_exceeded_records.first || failed_records.first
target_error = provisioned_throughput_exceeded_records.empty? ?
Aws::Kinesis::Errors::ServiceError :
Aws::Kinesis::Errors::ProvisionedThroughputExceededException
# @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
# @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Firehose/Client.html#put_record_batch-instance_method
# @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Firehose/Errors.html
when :firehose
service_unavailable_exception_records = failed_records.select { |record| record[:error_code] == 'ServiceUnavailableException' }
target_failed_record = service_unavailable_exception_records.first || failed_records.first
target_error = service_unavailable_exception_records.empty? ?
Aws::Firehose::Errors::ServiceError :
Aws::Firehose::Errors::ServiceUnavailableException
end
log.error("Raise #{target_failed_record[:error_code]} and return chunk to Fluentd buffer for retrying")
raise target_error.new(Seahorse::Client::RequestContext.new, target_failed_record[:error_message])
end
end

def increment_num_errors
# Prepare the Fluent::Plugin::Output instance variables used to count errors in this method.
# They are initialized here to guard against possible future breaking changes in Fluentd.
@num_errors ||= 0
# @see https://github.com/fluent/fluentd/commit/d245454658d16170431d276fcd5849fb0d88ab2b
if Gem::Version.new(Fluent::VERSION) >= Gem::Version.new('1.7.0')
@counter_mutex ||= Mutex.new
@counter_mutex.synchronize{ @num_errors += 1 }
else
@counters_monitor ||= Monitor.new
@counters_monitor.synchronize{ @num_errors += 1 }
end
end

class Backoff
60 changes: 60 additions & 0 deletions test/kinesis_helper/test_api.rb
@@ -14,6 +14,7 @@

require_relative '../helper'
require 'fluent/plugin/kinesis_helper/api'
require 'benchmark'

class KinesisHelperAPITest < Test::Unit::TestCase
class Mock
@@ -23,12 +24,16 @@ class Mock
attr_accessor :retries_on_batch_request, :reset_backoff_if_success
attr_accessor :failed_scenario, :request_series
attr_accessor :batch_request_max_count, :batch_request_max_size
attr_accessor :num_errors, :drop_failed_records_after_batch_request_retries, :monitor_num_of_batch_request_retries

def initialize
@retries_on_batch_request = 3
@reset_backoff_if_success = true
@failed_scenario = [].to_enum
@request_series = []
@num_errors = 0
@drop_failed_records_after_batch_request_retries = true
@monitor_num_of_batch_request_retries = false
end

def request_type
@@ -114,6 +119,16 @@ def test_batch_request_with_retry(data)
assert_equal expected, @object.request_series
end

def test_reliable_sleep
time = Benchmark.realtime do
t = Thread.new { @object.send(:reliable_sleep, 0.2) }
sleep 0.1
t.run
t.join
end
assert_operator time, :>, 0.15
end

data(
'reset_everytime' => [true, [4,3,2,1], 3],
'disable_reset' => [false, [4,3,2,1], 0],
@@ -127,4 +142,49 @@ def test_reset_backoff(data)
@backoff.expects(:reset).times(expected)
@object.send(:batch_request_with_retry, batch, backoff: @backoff) { |batch| @object.batch_request(batch) }
end

data(
'enabled_no_failed_completed' => [true, [0,0,0,0], 0],
'enabled_some_failed_completed' => [true, [3,2,1,0], 0],
'enabled_some_failed_incompleted' => [true, [4,3,2,1], 1],
'enabled_all_failed_incompleted' => [true, [5,5,5,5], 1],
'disabled_no_failed_completed' => [false, [0,0,0,0], 0],
'disabled_some_failed_completed' => [false, [3,2,1,0], 0],
'disabled_some_failed_incompleted' => [false, [4,3,2,1], 1],
'disabled_all_failed_incompleted' => [false, [5,5,5,5], 1],
)
def test_drop_failed_records_after_batch_request_retries(data)
param, failed_scenario, expected = data
batch = Array.new(5, {})
@object.drop_failed_records_after_batch_request_retries = param
@object.failed_scenario = failed_scenario.to_enum
if !param && expected > 0
e = assert_raises Aws::Firehose::Errors::ServiceUnavailableException do
@object.send(:batch_request_with_retry, batch, backoff: @backoff) { |batch| @object.batch_request(batch) }
end
assert_equal @object.failed_response.error_message, e.message
else
@object.send(:batch_request_with_retry, batch, backoff: @backoff) { |batch| @object.batch_request(batch) }
assert_equal expected, @object.num_errors
end
end

data(
'disabled_no_failed_completed' => [false, [0,0,0,0], 0],
'disabled_some_failed_completed' => [false, [3,2,1,0], 0],
'disabled_some_failed_incompleted' => [false, [4,3,2,1], 1],
'disabled_all_failed_incompleted' => [false, [5,5,5,5], 1],
'enabled_no_failed_completed' => [true, [0,0,0,0], 0],
'enabled_some_failed_completed' => [true, [3,2,1,0], 3],
'enabled_some_failed_incompleted' => [true, [4,3,2,1], 4],
'enabled_all_failed_incompleted' => [true, [5,5,5,5], 4],
)
def test_monitor_num_of_batch_request_retries(data)
param, failed_scenario, expected = data
batch = Array.new(5, {})
@object.monitor_num_of_batch_request_retries = param
@object.failed_scenario = failed_scenario.to_enum
@object.send(:batch_request_with_retry, batch, backoff: @backoff) { |batch| @object.batch_request(batch) }
assert_equal expected, @object.num_errors
end
end