Skip to content

Commit

Permalink
Merge pull request #127 from awslabs/feature/add_reduce_max_size_erro…
Browse files Browse the repository at this point in the history
…r_message

Add reduce_max_size_error_message configuration
  • Loading branch information
riywo authored Sep 20, 2017
2 parents b848de1 + bf34fb9 commit a124634
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 4 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ If your record contains a field whose string should be sent to Amazon Kinesis di
### log_truncate_max_size
Integer, default 0. When emitting the log entry, the message will be truncated by this size to avoid infinite loop when the log is also sent to Kinesis. The value 0 (default) means no truncation.

### reduce_max_size_error_message
Boolean, default false. When each record exceeds the maximum size, the original record is put on its error message. If this parameter is turned on, the error message will contain only summarized data to prevent large traffic generated by the error message.

## Configuration: kinesis_streams
Here are `kinesis_streams` specific configurations.

Expand Down
1 change: 1 addition & 0 deletions lib/fluent/plugin/kinesis_helper/class_methods.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def config_param_for_format
config_param :formatter, :string, default: 'json'
config_param :data_key, :string, default: nil
config_param :log_truncate_max_size, :integer, default: 0
config_param :reduce_max_size_error_message, :bool, default: false
end

def config_param_for_batch_request
Expand Down
9 changes: 7 additions & 2 deletions lib/fluent/plugin/kinesis_helper/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ def initialize(key, record)
end

class ExceedMaxRecordSizeError < SkipRecordError
def initialize(record)
super "Record size limit exceeded for #{record}"
def initialize(record, reduce_message)
if reduce_message == true
sampled_record = "#{record.slice(0,1024)}...#{record.slice(-1024,1024)}"
super "Record size limit exceeded for #{record.length}-bytes record: #{sampled_record}"
else
super "Record size limit exceeded for #{record}"
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/kinesis_helper/format.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def convert_record(tag, time, record)
converted = convert_format(tag, time, record)
converted[:data] += "\n" if @append_new_line
if converted[:data].size > MaxRecordSize
raise ExceedMaxRecordSizeError, converted[:data]
raise ExceedMaxRecordSizeError.new(converted[:data], @reduce_max_size_error_message)
else
converted
end
Expand Down
16 changes: 15 additions & 1 deletion test/kinesis_helper/test_format.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ class KinesisHelperFormatTest < Test::Unit::TestCase
class Mock
include Fluent::KinesisHelper::Format

attr_accessor :log_truncate_max_size
attr_accessor :log_truncate_max_size, :reduce_max_size_error_message

def initialize
@log_truncate_max_size = 0
@reduce_max_size_error_message = false
end

def log
Expand Down Expand Up @@ -60,6 +61,19 @@ def test_convert_record_max_record_size(data)
assert_equal result.nil? ? 1 : 0, @object.log.logs.size
end

data(
'default' => [false, :>],
'reduce' => [true, :<],
)
def test_reduce_max_size_error_message(data)
@object.reduce_max_size_error_message, expected = data
record = {"a"=>"a"*(1024*1024-'{"a":""}'.size+1)}
result = @object.send(:convert_record, '', '', record)
assert_equal nil, result
assert_equal 1, @object.log.logs.size
assert_operator @object.log.logs.first.size, expected, record.to_s.size
end

data(
'1' => [1, "1"],
'5' => [5, "12345"],
Expand Down

0 comments on commit a124634

Please sign in to comment.