Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reduce_max_size_error_message configuration #127

Merged
merged 1 commit into from
Sep 20, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ If your record contains a field whose string should be sent to Amazon Kinesis di
### log_truncate_max_size
Integer, default 0. When emitting the log entry, the message will be truncated by this size to avoid infinite loop when the log is also sent to Kinesis. The value 0 (default) means no truncation.

### reduce_max_size_error_message
Boolean, default false. When each record exceeds the maximum size, the original record is put on its error message. If this parameter is turned on, the error message will contain only summarized data to prevent large traffic generated by the error message.

## Configuration: kinesis_streams
Here are `kinesis_streams` specific configurations.

Expand Down
1 change: 1 addition & 0 deletions lib/fluent/plugin/kinesis_helper/class_methods.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def config_param_for_format
config_param :formatter, :string, default: 'json'
config_param :data_key, :string, default: nil
config_param :log_truncate_max_size, :integer, default: 0
config_param :reduce_max_size_error_message, :bool, default: false
end

def config_param_for_batch_request
Expand Down
9 changes: 7 additions & 2 deletions lib/fluent/plugin/kinesis_helper/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ def initialize(key, record)
end

class ExceedMaxRecordSizeError < SkipRecordError
def initialize(record)
super "Record size limit exceeded for #{record}"
def initialize(record, reduce_message)
if reduce_message == true
sampled_record = "#{record.slice(0,1024)}...#{record.slice(-1024,1024)}"
super "Record size limit exceeded for #{record.length}-bytes record: #{sampled_record}"
else
super "Record size limit exceeded for #{record}"
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/kinesis_helper/format.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def convert_record(tag, time, record)
converted = convert_format(tag, time, record)
converted[:data] += "\n" if @append_new_line
if converted[:data].size > MaxRecordSize
raise ExceedMaxRecordSizeError, converted[:data]
raise ExceedMaxRecordSizeError.new(converted[:data], @reduce_max_size_error_message)
else
converted
end
Expand Down
16 changes: 15 additions & 1 deletion test/kinesis_helper/test_format.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ class KinesisHelperFormatTest < Test::Unit::TestCase
class Mock
include Fluent::KinesisHelper::Format

attr_accessor :log_truncate_max_size
attr_accessor :log_truncate_max_size, :reduce_max_size_error_message

def initialize
@log_truncate_max_size = 0
@reduce_max_size_error_message = false
end

def log
Expand Down Expand Up @@ -60,6 +61,19 @@ def test_convert_record_max_record_size(data)
assert_equal result.nil? ? 1 : 0, @object.log.logs.size
end

data(
'default' => [false, :>],
'reduce' => [true, :<],
)
def test_reduce_max_size_error_message(data)
@object.reduce_max_size_error_message, expected = data
record = {"a"=>"a"*(1024*1024-'{"a":""}'.size+1)}
result = @object.send(:convert_record, '', '', record)
assert_equal nil, result
assert_equal 1, @object.log.logs.size
assert_operator @object.log.logs.first.size, expected, record.to_s.size
end

data(
'1' => [1, "1"],
'5' => [5, "12345"],
Expand Down