Rename raise_error_on_batch_request_failure config name to drop_failed_records_after_batch_request_retries with default true - #150
simukappu committed Feb 6, 2021
1 parent 09877eb commit 4ca3d5f
Showing 3 changed files with 23 additions and 24 deletions.
9 changes: 4 additions & 5 deletions README.md
@@ -408,13 +408,12 @@ Default:
- `kinesis_firehose`: 4 MB
- `kinesis_streams_aggregated`: 1 MB

-### raise_error_on_batch_request_failure
-Boolean, default `false`.
+### drop_failed_records_after_batch_request_retries
+Boolean, default `true`.

-If *raise_error_on_batch_request_failure* is disabled (default), the plugin will give up sending records when a batch request fails after retrying the maximum number of times configured as *retries_on_batch_request*. This giving up can be monitored from [monitor_agent](https://docs.fluentd.org/input/monitor_agent) or [fluent-plugin-prometheus](https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus) as *retry_count* or *num_errors* metrics.
+If *drop_failed_records_after_batch_request_retries* is enabled (default), the plugin will drop failed records when a batch request fails after retrying the maximum number of times configured as *retries_on_batch_request*. This dropping can be monitored from [monitor_agent](https://docs.fluentd.org/input/monitor_agent) or [fluent-plugin-prometheus](https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus) as *retry_count* or *num_errors* metrics.

-If *raise_error_on_batch_request_failure* is enabled, the plugin will raise an error and return the chunk to the Fluentd buffer when a batch request fails after retrying the maximum number of times. Fluentd will then retry sending the chunk records according to the retry config in [Buffer Section](https://docs.fluentd.org/configuration/buffer-section). Note that this retrying may create duplicate records since the [PutRecords API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) of Kinesis Data Streams and the [PutRecordBatch API](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html) of Kinesis Data Firehose may return a partially successful response.
+If *drop_failed_records_after_batch_request_retries* is disabled, the plugin will raise an error and return the chunk to the Fluentd buffer when a batch request fails after retrying the maximum number of times. Fluentd will then retry sending the chunk records according to the retry config in [Buffer Section](https://docs.fluentd.org/configuration/buffer-section). Note that this retrying may create duplicate records since the [PutRecords API](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) of Kinesis Data Streams and the [PutRecordBatch API](https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html) of Kinesis Data Firehose may return a partially successful response.
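The duplicate-record risk from partially successful responses can be illustrated with a small sketch. This is not the plugin's code: `Record` and `Result` are simplified stand-ins for a chunk's records and the per-record results returned by PutRecords/PutRecordBatch.

```ruby
# Illustrative sketch only (not the plugin's code): Record and Result are
# simplified stand-ins for a chunk's records and the per-record results
# in a partially successful batch response.
Record = Struct.new(:data)
Result = Struct.new(:error_code) # nil error_code means the record was stored

records = [Record.new('a'), Record.new('b'), Record.new('c')]
results = [Result.new(nil),
           Result.new('ProvisionedThroughputExceededException'),
           Result.new(nil)]

# Retrying only the failed records avoids duplicates:
failed_only = records.zip(results).select { |_, res| res.error_code }.map(&:first)

# Returning the whole chunk to the Fluentd buffer re-sends 'a' and 'c' too,
# which the service may then store a second time:
whole_chunk = records

p failed_only.map(&:data) # => ["b"]
p whole_chunk.map(&:data) # => ["a", "b", "c"]
```

This is why the plugin retries failed records internally (up to *retries_on_batch_request*) and only hands the whole chunk back to Fluentd as a last resort.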

### monitor_num_of_batch_request_retries
Boolean, default `false`. If enabled, the plugin will increment *retry_count* monitoring metrics after each internal retry of a batch request. This configuration enables you to monitor [ProvisionedThroughputExceededException](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) from [monitor_agent](https://docs.fluentd.org/input/monitor_agent) or [fluent-plugin-prometheus](https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus). Note that *retry_count* metrics will be counted by the plugin in addition to the original Fluentd buffering mechanism if *monitor_num_of_batch_request_retries* is enabled.
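A sketch of how these options might appear together in a Fluentd match section; the tag, stream name, and values below are placeholders, not recommendations:

```
<match my.tag>
  @type kinesis_streams
  stream_name my-stream
  retries_on_batch_request 8
  drop_failed_records_after_batch_request_retries true
  monitor_num_of_batch_request_retries false
</match>
```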
20 changes: 10 additions & 10 deletions lib/fluent/plugin/kinesis_helper/api.rb
@@ -41,12 +41,12 @@ def configure(conf)
module BatchRequest
module BatchRequestParams
include Fluent::Configurable
-config_param :retries_on_batch_request, :integer, default: 8
-config_param :reset_backoff_if_success, :bool, default: true
-config_param :batch_request_max_count, :integer, default: nil
-config_param :batch_request_max_size, :integer, default: nil
-config_param :raise_error_on_batch_request_failure, :bool, default: false
-config_param :monitor_num_of_batch_request_retries, :bool, default: false
+config_param :retries_on_batch_request, :integer, default: 8
+config_param :reset_backoff_if_success, :bool, default: true
+config_param :batch_request_max_count, :integer, default: nil
+config_param :batch_request_max_size, :integer, default: nil
+config_param :drop_failed_records_after_batch_request_retries, :bool, default: true
+config_param :monitor_num_of_batch_request_retries, :bool, default: false
end

def self.included(mod)
@@ -178,7 +178,10 @@ def give_up_retries(failed_records)
])
}

-if @raise_error_on_batch_request_failure
+if @drop_failed_records_after_batch_request_retries
+# Increment num_errors to monitor batch request failure from "monitor_agent" or "fluent-plugin-prometheus"
+increment_num_errors
+else
# Raise error and return chunk to Fluentd for retrying
case request_type
# @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
@@ -202,9 +205,6 @@ def give_up_retries(failed_records)
end
log.error("Raise #{target_failed_record[:error_code]} and return chunk to Fluentd buffer for retrying")
raise target_error.new(Seahorse::Client::RequestContext.new, target_failed_record[:error_message])
-else
-# Increment num_errors to monitor batch request failure from "monitor_agent" or "fluent-plugin-prometheus"
-increment_num_errors
end
end

18 changes: 9 additions & 9 deletions test/kinesis_helper/test_api.rb
@@ -24,15 +24,15 @@ class Mock
attr_accessor :retries_on_batch_request, :reset_backoff_if_success
attr_accessor :failed_scenario, :request_series
attr_accessor :batch_request_max_count, :batch_request_max_size
-attr_accessor :num_errors, :raise_error_on_batch_request_failure, :monitor_num_of_batch_request_retries
+attr_accessor :num_errors, :drop_failed_records_after_batch_request_retries, :monitor_num_of_batch_request_retries

def initialize
@retries_on_batch_request = 3
@reset_backoff_if_success = true
@failed_scenario = [].to_enum
@request_series = []
@num_errors = 0
-@raise_error_on_batch_request_failure = false
+@drop_failed_records_after_batch_request_retries = true
@monitor_num_of_batch_request_retries = false
end

@@ -144,21 +144,21 @@ def test_reset_backoff(data)
end

data(
-'disabled_no_failed_completed' => [false, [0,0,0,0], 0],
-'disabled_some_failed_completed' => [false, [3,2,1,0], 0],
-'disabled_some_failed_incompleted' => [false, [4,3,2,1], 1],
-'disabled_all_failed_incompleted' => [false, [5,5,5,5], 1],
'enabled_no_failed_completed' => [true, [0,0,0,0], 0],
'enabled_some_failed_completed' => [true, [3,2,1,0], 0],
'enabled_some_failed_incompleted' => [true, [4,3,2,1], 1],
'enabled_all_failed_incompleted' => [true, [5,5,5,5], 1],
+'disabled_no_failed_completed' => [false, [0,0,0,0], 0],
+'disabled_some_failed_completed' => [false, [3,2,1,0], 0],
+'disabled_some_failed_incompleted' => [false, [4,3,2,1], 1],
+'disabled_all_failed_incompleted' => [false, [5,5,5,5], 1],
)
-def test_raise_error_on_batch_request_failure(data)
+def test_drop_failed_records_after_batch_request_retries(data)
param, failed_scenario, expected = data
batch = Array.new(5, {})
-@object.raise_error_on_batch_request_failure = param
+@object.drop_failed_records_after_batch_request_retries = param
@object.failed_scenario = failed_scenario.to_enum
-if param && expected > 0
+if !param && expected > 0
e = assert_raises Aws::Firehose::Errors::ServiceUnavailableException do
@object.send(:batch_request_with_retry, batch, backoff: @backoff) { |batch| @object.batch_request(batch) }
end
