Skip to content

Commit

Permalink
Enable to monitor batch request failure and retries - #150
Browse files Browse the repository at this point in the history
  • Loading branch information
simukappu committed Nov 27, 2020
1 parent 53d7154 commit 09877eb
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 4 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,17 @@ Default:
- `kinesis_firehose`: 4 MB
- `kinesis_streams_aggregated`: 1 MB

### raise_error_on_batch_request_failure
Boolean, default `false`.

If *raise_error_on_batch_request_failure* is disabled (default), the plugin will give up sending records when we get batch request failure after retrying max times configured as *retries_on_batch_request*. This giving up
can be monitored from [monitor_agent](https://docs.fluentd.org/input/monitor_agent) or [fluent-plugin-prometheus](https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus) as *retry_count* or *num_errors* metrics.

If *raise_error_on_batch_request_failure* is enabled, the plugin will raise error and return chunk to Fluentd buffer when we get batch request failure after retrying max times. Fluentd will retry to send chunk records according to retry config in [Buffer Section](https://docs.fluentd.org/configuration/buffer-section). Note that this retryng may create duplicate records since [PutRecords API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) of Kinesis Data Streams and [PutRecordBatch API](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html) of Kinesis Data Firehose may return a partially successful response.

### monitor_num_of_batch_request_retries
Boolean, default `false`. If enabled, the plugin will increment *retry_count* monitoring metrics after internal retrying to send batch request. This configuration enables you to monitor [ProvisionedThroughputExceededException](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) from [monitor_agent](https://docs.fluentd.org/input/monitor_agent) or [fluent-plugin-prometheus](https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus). Note that *retry_count* metrics will be counted by the plugin in addition to original Fluentd buffering mechanism if *monitor_num_of_batch_request_retries* is enabled.

## Configuration: kinesis_streams
Here are `kinesis_streams` specific configurations.

Expand Down
55 changes: 51 additions & 4 deletions lib/fluent/plugin/kinesis_helper/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ def configure(conf)
module BatchRequest
module BatchRequestParams
include Fluent::Configurable
config_param :retries_on_batch_request, :integer, default: 8
config_param :reset_backoff_if_success, :bool, default: true
config_param :batch_request_max_count, :integer, default: nil
config_param :batch_request_max_size, :integer, default: nil
config_param :retries_on_batch_request, :integer, default: 8
config_param :reset_backoff_if_success, :bool, default: true
config_param :batch_request_max_count, :integer, default: nil
config_param :batch_request_max_size, :integer, default: nil
config_param :raise_error_on_batch_request_failure, :bool, default: false
config_param :monitor_num_of_batch_request_retries, :bool, default: false
end

def self.included(mod)
Expand Down Expand Up @@ -97,6 +99,8 @@ def batch_request_with_retry(batch, retry_count=0, backoff: nil, &block)
wait_second = backoff.next
msg = 'Retrying to request batch. Retry count: %3d, Retry records: %3d, Wait seconds %3.2f' % [retry_count+1, failed_records.size, wait_second]
log.warn(truncate msg)
# Increment num_errors to monitor batch request retries from "monitor_agent" or "fluent-plugin-prometheus"
increment_num_errors if @monitor_num_of_batch_request_retries
reliable_sleep(wait_second)
batch_request_with_retry(retry_records(failed_records), retry_count+1, backoff: backoff, &block)
else
Expand Down Expand Up @@ -173,6 +177,49 @@ def give_up_retries(failed_records)
record[:original]
])
}

if @raise_error_on_batch_request_failure
# Raise error and return chunk to Fluentd for retrying
case request_type
# @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
# @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Kinesis/Client.html#put_records-instance_method
# @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Kinesis/Errors.html
when :streams, :streams_aggregated
provisioned_throughput_exceeded_records = failed_records.select { |record| record[:error_code] == 'ProvisionedThroughputExceededException' }
target_failed_record = provisioned_throughput_exceeded_records.first || failed_records.first
target_error = provisioned_throughput_exceeded_records.empty? ?
Aws::Kinesis::Errors::ServiceError :
Aws::Kinesis::Errors::ProvisionedThroughputExceededException
# @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
# @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Firehose/Client.html#put_record_batch-instance_method
# @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Firehose/Errors.html
when :firehose
service_unavailable_exception_records = failed_records.select { |record| record[:error_code] == 'ServiceUnavailableException' }
target_failed_record = service_unavailable_exception_records.first || failed_records.first
target_error = service_unavailable_exception_records.empty? ?
Aws::Firehose::Errors::ServiceError :
Aws::Firehose::Errors::ServiceUnavailableException
end
log.error("Raise #{target_failed_record[:error_code]} and return chunk to Fluentd buffer for retrying")
raise target_error.new(Seahorse::Client::RequestContext.new, target_failed_record[:error_message])
else
# Increment num_errors to monitor batch request failure from "monitor_agent" or "fluent-plugin-prometheus"
increment_num_errors
end
end

def increment_num_errors
# Prepare Fluent::Plugin::Output instance variables to count errors in this method.
# These instance variables are initialized here for possible future breaking changes of Fluentd.
@num_errors ||= 0
# @see https://github.com/fluent/fluentd/commit/d245454658d16170431d276fcd5849fb0d88ab2b
if Gem::Version.new(Fluent::VERSION) >= Gem::Version.new('1.7.0')
@counter_mutex ||= Mutex.new
@counter_mutex.synchronize{ @num_errors += 1 }
else
@counters_monitor ||= Monitor.new
@counters_monitor.synchronize{ @num_errors += 1 }
end
end

class Backoff
Expand Down
49 changes: 49 additions & 0 deletions test/kinesis_helper/test_api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ class Mock
attr_accessor :retries_on_batch_request, :reset_backoff_if_success
attr_accessor :failed_scenario, :request_series
attr_accessor :batch_request_max_count, :batch_request_max_size
attr_accessor :num_errors, :raise_error_on_batch_request_failure, :monitor_num_of_batch_request_retries

def initialize
@retries_on_batch_request = 3
@reset_backoff_if_success = true
@failed_scenario = [].to_enum
@request_series = []
@num_errors = 0
@raise_error_on_batch_request_failure = false
@monitor_num_of_batch_request_retries = false
end

def request_type
Expand Down Expand Up @@ -138,4 +142,49 @@ def test_reset_backoff(data)
@backoff.expects(:reset).times(expected)
@object.send(:batch_request_with_retry, batch, backoff: @backoff) { |batch| @object.batch_request(batch) }
end

data(
'disabled_no_failed_completed' => [false, [0,0,0,0], 0],
'disabled_some_failed_completed' => [false, [3,2,1,0], 0],
'disabled_some_failed_incompleted' => [false, [4,3,2,1], 1],
'disabled_all_failed_incompleted' => [false, [5,5,5,5], 1],
'enabled_no_failed_completed' => [true, [0,0,0,0], 0],
'enabled_some_failed_completed' => [true, [3,2,1,0], 0],
'enabled_some_failed_incompleted' => [true, [4,3,2,1], 1],
'enabled_all_failed_incompleted' => [true, [5,5,5,5], 1],
)
def test_raise_error_on_batch_request_failure(data)
param, failed_scenario, expected = data
batch = Array.new(5, {})
@object.raise_error_on_batch_request_failure = param
@object.failed_scenario = failed_scenario.to_enum
if param && expected > 0
e = assert_raises Aws::Firehose::Errors::ServiceUnavailableException do
@object.send(:batch_request_with_retry, batch, backoff: @backoff) { |batch| @object.batch_request(batch) }
end
assert_equal @object.failed_response.error_message, e.message
else
@object.send(:batch_request_with_retry, batch, backoff: @backoff) { |batch| @object.batch_request(batch) }
assert_equal expected, @object.num_errors
end
end

data(
'disabled_no_failed_completed' => [false, [0,0,0,0], 0],
'disabled_some_failed_completed' => [false, [3,2,1,0], 0],
'disabled_some_failed_incompleted' => [false, [4,3,2,1], 1],
'disabled_all_failed_incompleted' => [false, [5,5,5,5], 1],
'enabled_no_failed_completed' => [true, [0,0,0,0], 0],
'enabled_some_failed_completed' => [true, [3,2,1,0], 3],
'enabled_some_failed_incompleted' => [true, [4,3,2,1], 4],
'enabled_all_failed_incompleted' => [true, [5,5,5,5], 4],
)
def test_monitor_num_of_batch_request_retries(data)
param, failed_scenario, expected = data
batch = Array.new(5, {})
@object.monitor_num_of_batch_request_retries = param
@object.failed_scenario = failed_scenario.to_enum
@object.send(:batch_request_with_retry, batch, backoff: @backoff) { |batch| @object.batch_request(batch) }
assert_equal expected, @object.num_errors
end
end

0 comments on commit 09877eb

Please sign in to comment.