#203: add placeholder support to delivery_stream_name for kinesis_firehose. closes #203

Adam Brunner authored and committed Jun 25, 2020
1 parent 41432b2 · commit 5fa7217
Showing 3 changed files with 86 additions and 30 deletions.
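With this change, delivery_stream_name accepts Fluentd buffer placeholders. For context, a minimal configuration exercising the feature might look like the following; the tag pattern, region, and stream name are illustrative, not taken from this commit:

    # Hypothetical Fluentd configuration, written as a Ruby heredoc in the
    # style of this repository's test fixtures. ${tag} resolves per buffer
    # chunk because `tag` is declared as a chunk key in <buffer>.
    FIREHOSE_CONF = <<~CONF
      <match app.**>
        @type kinesis_firehose
        delivery_stream_name stream-${tag}
        region us-east-1
        <buffer tag>
          @type memory
        </buffer>
      </match>
    CONF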
lib/fluent/plugin/out_kinesis_firehose.rb (2 additions & 1 deletion)
@@ -44,12 +44,13 @@ def format(tag, time, record)
  end

  def write(chunk)
+   delivery_stream_name = extract_placeholders(@delivery_stream_name, chunk)
    write_records_batch(chunk) do |batch|
      records = batch.map{|(data)|
        { data: data }
      }
      client.put_record_batch(
-       delivery_stream_name: @delivery_stream_name,
+       delivery_stream_name: delivery_stream_name,
        records: records,
      )
    end
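Fluentd's extract_placeholders helper, part of the output plugin API, resolves ${tag}, strftime patterns such as %Y%m%d, and record-key placeholders like ${$.key.nested} against the chunk's metadata. Resolving once per write(chunk) is safe because every record in a chunk shares the metadata captured by the <buffer> chunk keys. A standalone sketch of the ${tag} case only, not the real implementation:

    # Minimal analogue of the per-chunk resolution above; illustrative only.
    def resolve_stream_name(template, chunk_tag)
      template.gsub("${tag}", chunk_tag)
    end

    resolve_stream_name("stream-${tag}", "app.test")  # => "stream-app.test"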
test/dummy_server.rb (5 additions & 5 deletions)

@@ -245,7 +245,7 @@ def describe_stream_boby(req)
  def put_record_boby(req)
    body = JSON.parse(req.body)
    record = {'Data' => body['Data'], 'PartitionKey' => body['PartitionKey']}
-   @accepted_records << {:stream_name => body['StreamName'], :record => record} if recording?
+   @accepted_records << {:stream_name => body['StreamName'], :delivery_stream_name => body['DeliveryStreamName'], :record => record} if recording?
    {
      "SequenceNumber" => "21269319989653637946712965403778482177",
      "ShardId" => "shardId-000000000001"
@@ -278,7 +278,7 @@ def put_records_boby(req)
        "ErrorMessage" => "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
      }
    else
-     @accepted_records << {:stream_name => body['StreamName'], :record => record} if recording?
+     @accepted_records << {:stream_name => body['StreamName'], :delivery_stream_name => body['DeliveryStreamName'], :record => record} if recording?
      {
        "SequenceNumber" => "49543463076548007577105092703039560359975228518395019266",
        "ShardId" => "shardId-000000000000"
@@ -303,7 +303,7 @@ def put_record_batch_boby(req)
        "ErrorMessage" => "Some message"
      }
    else
-     @accepted_records << {:stream_name => body['StreamName'], :record => record} if recording?
+     @accepted_records << {:stream_name => body['StreamName'], :delivery_stream_name => body['DeliveryStreamName'], :record => record} if recording?
      {
        "RecordId" => "49543463076548007577105092703039560359975228518395019266",
      }
@@ -323,13 +323,13 @@ def flatten_records(records, detailed: false)
      if @aggregator.aggregated?(data)
        agg_data = @aggregator.deaggregate(data)[0]
        if detailed
-         {:stream_name => record[:stream_name], :data => agg_data, :partition_key => partition_key}
+         {:stream_name => record[:stream_name], :delivery_stream_name => record[:delivery_stream_name], :data => agg_data, :partition_key => partition_key}
        else
          agg_data
        end
      else
        if detailed
-         {:stream_name => record[:stream_name], :data => data, :partition_key => partition_key}
+         {:stream_name => record[:stream_name], :delivery_stream_name => record[:delivery_stream_name], :data => data, :partition_key => partition_key}
        else
          data
        end
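The dummy server now records DeliveryStreamName alongside StreamName, so the new tests can assert on the resolved delivery stream per request. Roughly, one accepted Firehose record now looks like this (values illustrative; Kinesis Data Streams requests populate :stream_name instead):

    # Illustrative shape of an entry in @accepted_records after this change.
    {
      stream_name: nil,
      delivery_stream_name: "stream-placeholder-test",
      record: { "Data" => "..." },  # payload as sent in the request body
    }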
test/plugin/test_out_kinesis_firehose.rb (79 additions & 24 deletions)

@@ -71,9 +71,9 @@ def test_region
  end

  data(
    'json' => ['json', "{\"a\":1,\"b\":2}"],
    'ltsv' => ['ltsv', "a:1\tb:2"],
  )
  def test_format(data)
    formatter, expected = data
    d = create_driver(default_config + "<format>\n@type #{formatter}\n</format>")
@@ -82,9 +82,9 @@ def test_format(data)
  end

  data(
    'json' => ['json', '{"a":1,"b":2}'],
    'ltsv' => ['ltsv', "a:1\tb:2"],
  )
  def test_format_without_append_new_line(data)
    formatter, expected = data
    d = create_driver(default_config + "<format>\n@type #{formatter}\n</format>\nappend_new_line false")
@@ -93,9 +93,9 @@ def test_format_without_append_new_line(data)
  end

  data(
    'json' => ['json', '{"a":1,"b":2}'],
    'ltsv' => ['ltsv', "a:1\tb:2"],
  )
  def test_format_with_chomp_record(data)
    formatter, expected = data
    d = create_driver(default_config + "<format>\n@type #{formatter}\n</format>\nchomp_record true")
@@ -104,9 +104,9 @@ def test_format_with_chomp_record(data)
  end

  data(
    'json' => ['json', '{"a":1,"b":2}'],
    'ltsv' => ['ltsv', "a:1\tb:2"],
  )
  def test_format_with_chomp_record_without_append_new_line(data)
    formatter, expected = data
    d = create_driver(default_config + "<format>\n@type #{formatter}\n</format>\nchomp_record true\nappend_new_line false")
@@ -133,8 +133,8 @@ def test_data_key_without_append_new_line
  def test_max_record_size
    d = create_driver(default_config + "data_key a")
    driver_run(d, [
      {"a"=>data_of(1*MB)},
      {"a"=>data_of(1*MB+1)}, # exceeded
    ])
    assert_equal 1, @server.records.size
    assert_equal 1, d.instance.log.out.logs.size
@@ -143,8 +143,8 @@ def test_max_record_size
  def test_max_record_size_multi_bytes
    d = create_driver(default_config + "data_key a")
    driver_run(d, [
      {"a"=>data_of(1*MB, 'あ')},
      {"a"=>data_of(1*MB+6, 'あ')}, # exceeded
    ])
    assert_equal 1, @server.records.size
    assert_equal 1, d.instance.log.out.logs.size
@@ -153,7 +153,7 @@ def test_max_record_size_multi_bytes
  def test_single_max_record_size
    d = create_driver(default_config + "data_key a")
    driver_run(d, [
      {"a"=>data_of(1*MB+1)}, # exceeded
    ])
    assert_equal 0, @server.records.size
    assert_equal 0, @server.error_count
@@ -163,19 +163,19 @@ def test_single_max_record_size
  def test_max_record_size_without_append_new_line
    d = create_driver(default_config + "append_new_line false\ndata_key a")
    driver_run(d, [
      {"a"=>data_of(1*MB+1)},
      {"a"=>data_of(1*MB+2)}, # exceeded
    ])
    assert_equal 1, @server.records.size
    assert_equal 1, d.instance.log.out.logs.size
  end

  data(
    'split_by_count' => [Array.new(501, data_of(1*KB)), [500,1]],
    'split_by_size' => [Array.new(257, data_of(16*KB)), [256,1]],
    'split_by_size_with_space' => [Array.new(255, data_of(16*KB))+[data_of(16*KB+1)], [255,1]],
    'no_split_by_size' => [Array.new(256, data_of(16*KB)), [256]],
  )
  def test_batch_request(data)
    records, expected = data
    d = create_driver(default_config + "data_key a")

@@ -208,6 +208,61 @@ def test_record_count
    assert @server.error_count > 0
  end

+  class PlaceholdersTest < self
+    def test_tag_placeholder
+      d = create_driver(
+        Fluent::Config::Element.new('ROOT', '', {
+          "delivery_stream_name" => "stream-placeholder-${tag}",
+          "@log_level" => "error",
+          "retries_on_batch_request" => 10,
+          "endpoint" => "https://localhost:#{@server.port}",
+          "ssl_verify_peer" => false,
+        }, [Fluent::Config::Element.new('buffer', 'tag', {'@type' => 'memory', }, [])])
+      )
+      record = {"a" => "test"}
+      driver_run(d, [record])
+      assert_equal("stream-placeholder-test", @server.detailed_records.first[:delivery_stream_name])
+      assert_equal 0, d.instance.log.out.logs.size
+      assert_equal (record.to_json + "\n").b, @server.records.first
+    end
+
+    def test_time_placeholder
+      d = create_driver(
+        Fluent::Config::Element.new('ROOT', '', {
+          "delivery_stream_name" => "stream-placeholder-${tag}-%Y%m%d",
+          "@log_level" => "error",
+          "retries_on_batch_request" => 10,
+          "endpoint" => "https://localhost:#{@server.port}",
+          "ssl_verify_peer" => false,
+        }, [Fluent::Config::Element.new('buffer', 'tag, time', {'@type' => 'memory', 'timekey' => 3600 }, [])])
+      )
+      record = {"a" => "test"}
+      time = event_time
+      driver_run(d, [record], time: time)
+      assert_equal("stream-placeholder-test-#{Time.now.strftime("%Y%m%d")}",
+                   @server.detailed_records.first[:delivery_stream_name])
+      assert_equal 0, d.instance.log.out.logs.size
+      assert_equal (record.to_json + "\n").b, @server.records.first
+    end
+
+    def test_custom_placeholder
+      d = create_driver(
+        Fluent::Config::Element.new('ROOT', '', {
+          "delivery_stream_name" => "stream-placeholder-${$.key.nested}",
+          "@log_level" => "error",
+          "retries_on_batch_request" => 10,
+          "endpoint" => "https://localhost:#{@server.port}",
+          "ssl_verify_peer" => false,
+        }, [Fluent::Config::Element.new('buffer', '$.key.nested', {'@type' => 'memory', }, [])])
+      )
+      record = {"key" => {"nested" => "nested-value"}}
+      driver_run(d, [record])
+      assert_equal("stream-placeholder-nested-value", @server.detailed_records.first[:delivery_stream_name])
+      assert_equal 0, d.instance.log.out.logs.size
+      assert_equal (record.to_json + "\n").b, @server.records.first
+    end
+  end

  # Debug test case for the issue that it fails to flush the buffer
  # https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/133
  #def test_chunk_limit_size_for_debug
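For reference, the resolved name feeds straight into the Firehose PutRecordBatch API. A standalone sketch of the equivalent raw AWS SDK for Ruby call, with placeholder values; the plugin itself adds formatting, batching, and retries on top:

    require "aws-sdk-firehose"

    # Not the plugin's code: just the bare API call it ultimately issues.
    client = Aws::Firehose::Client.new(region: "us-east-1")
    client.put_record_batch(
      delivery_stream_name: "stream-placeholder-test",  # resolved per chunk
      records: [{ data: %({"a":1,"b":2}) + "\n" }],
    )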
