Commit 509020d

awslabs#203: add placeholder support to delivery_stream_name for kinesis_firehose. closes awslabs#203
Authored and committed by Adam Brunner on Jun 25, 2020
1 parent 41432b2 commit 509020d
Showing 3 changed files with 92 additions and 36 deletions.
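
For context, here is the kind of Fluentd configuration this change enables. A minimal sketch, assuming a match pattern, region, and stream naming of your own; only the placeholder in delivery_stream_name and the matching buffer chunk key reflect this commit:

<match app.**>
  @type kinesis_firehose
  region us-east-1
  delivery_stream_name stream-${tag}
  <buffer tag>
    @type memory
  </buffer>
</match>

Each placeholder used in delivery_stream_name must appear as a buffer chunk key so Fluentd can resolve it per chunk.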
lib/fluent/plugin/out_kinesis_firehose.rb (2 additions, 1 deletion)
@@ -44,12 +44,13 @@ def format(tag, time, record)
end

def write(chunk)
+ delivery_stream_name = extract_placeholders(@delivery_stream_name, chunk)
write_records_batch(chunk) do |batch|
records = batch.map{|(data)|
{ data: data }
}
client.put_record_batch(
- delivery_stream_name: @delivery_stream_name,
+ delivery_stream_name: delivery_stream_name,
records: records,
)
end
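
extract_placeholders is the standard Fluentd v1 output-plugin helper: it resolves ${tag}, strftime-style time formats, and record-key placeholders against the chunk's metadata, which is why the stream name is now computed once per chunk at the top of write. A hypothetical stand-in, shown only to illustrate the substitution:

# Hypothetical illustration of the substitution performed above; Fluentd's
# real extract_placeholders resolves against chunk.metadata (tag, time,
# and custom chunk keys), not a plain hash.
def resolve(template, metadata)
  template.gsub('${tag}') { metadata[:tag] }
end

resolve('stream-placeholder-${tag}', tag: 'test') # => "stream-placeholder-test"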
test/dummy_server.rb (5 additions, 5 deletions)
@@ -245,7 +245,7 @@ def describe_stream_boby(req)
def put_record_boby(req)
body = JSON.parse(req.body)
record = {'Data' => body['Data'], 'PartitionKey' => body['PartitionKey']}
- @accepted_records << {:stream_name => body['StreamName'], :record => record} if recording?
+ @accepted_records << {:stream_name => body['StreamName'], :delivery_stream_name => body['DeliveryStreamName'], :record => record} if recording?
{
"SequenceNumber" => "21269319989653637946712965403778482177",
"ShardId" => "shardId-000000000001"
@@ -278,7 +278,7 @@ def put_records_boby(req)
"ErrorMessage" => "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
}
else
- @accepted_records << {:stream_name => body['StreamName'], :record => record} if recording?
+ @accepted_records << {:stream_name => body['StreamName'], :delivery_stream_name => body['DeliveryStreamName'], :record => record} if recording?
{
"SequenceNumber" => "49543463076548007577105092703039560359975228518395019266",
"ShardId" => "shardId-000000000000"
@@ -303,7 +303,7 @@ def put_record_batch_boby(req)
"ErrorMessage" => "Some message"
}
else
- @accepted_records << {:stream_name => body['StreamName'], :record => record} if recording?
+ @accepted_records << {:stream_name => body['StreamName'], :delivery_stream_name => body['DeliveryStreamName'], :record => record} if recording?
{
"RecordId" => "49543463076548007577105092703039560359975228518395019266",
}
@@ -323,13 +323,13 @@ def flatten_records(records, detailed: false)
if @aggregator.aggregated?(data)
agg_data = @aggregator.deaggregate(data)[0]
if detailed
- {:stream_name => record[:stream_name], :data => agg_data, :partition_key => partition_key}
+ {:stream_name => record[:stream_name], :delivery_stream_name => record[:delivery_stream_name], :data => agg_data, :partition_key => partition_key}
else
agg_data
end
else
if detailed
- {:stream_name => record[:stream_name], :data => data, :partition_key => partition_key}
+ {:stream_name => record[:stream_name], :delivery_stream_name => record[:delivery_stream_name], :data => data, :partition_key => partition_key}
else
data
end
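
The dummy-server changes above capture DeliveryStreamName from each request body alongside StreamName, so the placeholder tests below can assert on the resolved stream name through detailed_records.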
test/plugin/test_out_kinesis_firehose.rb (85 additions, 30 deletions)
@@ -52,7 +52,7 @@ def create_driver(conf = default_config)

def self.data_of(size, char = 'a')
new_line_size = 1
- char.b * ((size - new_line_size)/char.b.size)
+ char.b * ((size - new_line_size) / char.b.size)
end

def data_of(size, char = 'a')
@@ -62,7 +62,7 @@ def data_of(size, char = 'a')
def test_configure
d = create_driver
assert_equal 'test-stream', d.instance.delivery_stream_name
- assert_equal 'ap-northeast-1' , d.instance.region
+ assert_equal 'ap-northeast-1', d.instance.region
end

def test_region
@@ -71,60 +71,60 @@ def test_region
end

data(
- 'json' => ['json', "{\"a\":1,\"b\":2}"],
- 'ltsv' => ['ltsv', "a:1\tb:2"],
+ 'json' => ['json', "{\"a\":1,\"b\":2}"],
+ 'ltsv' => ['ltsv', "a:1\tb:2"],
)
def test_format(data)
formatter, expected = data
d = create_driver(default_config + "<format>\n@type #{formatter}\n</format>")
driver_run(d, [{"a"=>1,"b"=>2}])
driver_run(d, [{"a" => 1, "b" => 2}])
assert_equal (expected + "\n").b, @server.records.first
end

data(
- 'json' => ['json', '{"a":1,"b":2}'],
- 'ltsv' => ['ltsv', "a:1\tb:2"],
+ 'json' => ['json', '{"a":1,"b":2}'],
+ 'ltsv' => ['ltsv', "a:1\tb:2"],
)
def test_format_without_append_new_line(data)
formatter, expected = data
d = create_driver(default_config + "<format>\n@type #{formatter}\n</format>\nappend_new_line false")
driver_run(d, [{"a"=>1,"b"=>2}])
driver_run(d, [{"a" => 1, "b" => 2}])
assert_equal (expected + "\n").b, @server.records.first
end

data(
- 'json' => ['json', '{"a":1,"b":2}'],
- 'ltsv' => ['ltsv', "a:1\tb:2"],
+ 'json' => ['json', '{"a":1,"b":2}'],
+ 'ltsv' => ['ltsv', "a:1\tb:2"],
)
def test_format_with_chomp_record(data)
formatter, expected = data
d = create_driver(default_config + "<format>\n@type #{formatter}\n</format>\nchomp_record true")
driver_run(d, [{"a"=>1,"b"=>2}])
driver_run(d, [{"a" => 1, "b" => 2}])
assert_equal (expected + "\n").b, @server.records.first
end

data(
- 'json' => ['json', '{"a":1,"b":2}'],
- 'ltsv' => ['ltsv', "a:1\tb:2"],
+ 'json' => ['json', '{"a":1,"b":2}'],
+ 'ltsv' => ['ltsv', "a:1\tb:2"],
)
def test_format_with_chomp_record_without_append_new_line(data)
formatter, expected = data
d = create_driver(default_config + "<format>\n@type #{formatter}\n</format>\nchomp_record true\nappend_new_line false")
driver_run(d, [{"a"=>1,"b"=>2}])
driver_run(d, [{"a" => 1, "b" => 2}])
assert_equal expected, @server.records.first
end

def test_data_key
d = create_driver(default_config + "data_key a")
driver_run(d, [{"a"=>1,"b"=>2}, {"b"=>2}])
driver_run(d, [{"a" => 1, "b" => 2}, {"b" => 2}])
assert_equal "1\n", @server.records.first
assert_equal 1, @server.records.size
assert_equal 1, d.instance.log.out.logs.size
end

def test_data_key_without_append_new_line
d = create_driver(default_config + "data_key a\nappend_new_line false")
driver_run(d, [{"a"=>1,"b"=>2}, {"b"=>2}])
driver_run(d, [{"a" => 1, "b" => 2}, {"b" => 2}])
assert_equal "1", @server.records.first
assert_equal 1, @server.records.size
assert_equal 1, d.instance.log.out.logs.size
@@ -133,8 +133,8 @@ def test_data_key_without_append_new_line
def test_max_record_size
d = create_driver(default_config + "data_key a")
driver_run(d, [
{"a"=>data_of(1*MB)},
{"a"=>data_of(1*MB+1)}, # exceeded
{"a" => data_of(1 * MB)},
{"a" => data_of(1 * MB + 1)}, # exceeded
])
assert_equal 1, @server.records.size
assert_equal 1, d.instance.log.out.logs.size
@@ -143,8 +143,8 @@ def test_max_record_size
def test_max_record_size_multi_bytes
d = create_driver(default_config + "data_key a")
driver_run(d, [
{"a"=>data_of(1*MB, 'あ')},
{"a"=>data_of(1*MB+6, 'あ')}, # exceeded
{"a" => data_of(1 * MB, 'あ')},
{"a" => data_of(1 * MB + 6, 'あ')}, # exceeded
])
assert_equal 1, @server.records.size
assert_equal 1, d.instance.log.out.logs.size
@@ -153,7 +153,7 @@ def test_max_record_size_multi_bytes
def test_single_max_record_size
d = create_driver(default_config + "data_key a")
driver_run(d, [
{"a"=>data_of(1*MB+1)}, # exceeded
{"a" => data_of(1 * MB + 1)}, # exceeded
])
assert_equal 0, @server.records.size
assert_equal 0, @server.error_count
@@ -163,27 +163,27 @@ def test_single_max_record_size
def test_max_record_size_without_append_new_line
d = create_driver(default_config + "append_new_line false\ndata_key a")
driver_run(d, [
{"a"=>data_of(1*MB+1)},
{"a"=>data_of(1*MB+2)}, # exceeded
{"a" => data_of(1 * MB + 1)},
{"a" => data_of(1 * MB + 2)}, # exceeded
])
assert_equal 1, @server.records.size
assert_equal 1, d.instance.log.out.logs.size
end

data(
- 'split_by_count' => [Array.new(501, data_of(1*KB)), [500,1]],
- 'split_by_size' => [Array.new(257, data_of(16*KB)), [256,1]],
- 'split_by_size_with_space' => [Array.new(255, data_of(16*KB))+[data_of(16*KB+1)], [255,1]],
- 'no_split_by_size' => [Array.new(256, data_of(16*KB)), [256]],
+ 'split_by_count' => [Array.new(501, data_of(1 * KB)), [500, 1]],
+ 'split_by_size' => [Array.new(257, data_of(16 * KB)), [256, 1]],
+ 'split_by_size_with_space' => [Array.new(255, data_of(16 * KB)) + [data_of(16 * KB + 1)], [255, 1]],
+ 'no_split_by_size' => [Array.new(256, data_of(16 * KB)), [256]],
)
def test_batch_request(data)
records, expected = data
d = create_driver(default_config + "data_key a")
- driver_run(d, records.map{|record| {'a' => record}})
+ driver_run(d, records.map { |record| {'a' => record} })
assert_equal records.size, @server.records.size
assert_equal expected, @server.count_per_requests
@server.size_per_requests.each do |size|
- assert size <= 4*MB
+ assert size <= 4 * MB
end
@server.count_per_requests.each do |count|
assert count <= 500
@@ -202,12 +202,67 @@ def test_record_count
@server.enable_random_error
d = create_driver
count = 10
- driver_run(d, count.times.map{|i|{"a"=>1}})
+ driver_run(d, count.times.map { |i| {"a" => 1} })
assert_equal count, @server.records.size
assert @server.failed_count > 0
assert @server.error_count > 0
end

+ class PlaceholdersTest < self
+ def test_tag_placeholder
+ d = create_driver(
+ Fluent::Config::Element.new('ROOT', '', {
+ "delivery_stream_name" => "stream-placeholder-${tag}",
+ "@log_level" => "error",
+ "retries_on_batch_request" => 10,
+ "endpoint" => "https://localhost:#{@server.port}",
+ "ssl_verify_peer" => false,
+ }, [Fluent::Config::Element.new('buffer', 'tag', {'@type' => 'memory', }, [])])
+ )
+ record = {"a" => "test"}
+ driver_run(d, [record])
+ assert_equal("stream-placeholder-test", @server.detailed_records.first[:delivery_stream_name])
+ assert_equal 0, d.instance.log.out.logs.size
+ assert_equal (record.to_json + "\n").b, @server.records.first
+ end
+
+ def test_time_placeholder
+ d = create_driver(
+ Fluent::Config::Element.new('ROOT', '', {
+ "delivery_stream_name" => "stream-placeholder-${tag}-%Y%m%d",
+ "@log_level" => "error",
+ "retries_on_batch_request" => 10,
+ "endpoint" => "https://localhost:#{@server.port}",
+ "ssl_verify_peer" => false,
+ }, [Fluent::Config::Element.new('buffer', 'tag, time', {'@type' => 'memory', 'timekey' => 3600 }, [])])
+ )
+ record = {"a" => "test"}
+ time = event_time
+ driver_run(d, [record], time: time)
+ assert_equal("stream-placeholder-test-#{Time.now.strftime("%Y%m%d")}",
+ @server.detailed_records.first[:delivery_stream_name])
+ assert_equal 0, d.instance.log.out.logs.size
+ assert_equal (record.to_json + "\n").b, @server.records.first
+ end
+
+ def test_custom_placeholder
+ d = create_driver(
+ Fluent::Config::Element.new('ROOT', '', {
+ "delivery_stream_name" => "stream-placeholder-${$.key.nested}",
+ "@log_level" => "error",
+ "retries_on_batch_request" => 10,
+ "endpoint" => "https://localhost:#{@server.port}",
+ "ssl_verify_peer" => false,
+ }, [Fluent::Config::Element.new('buffer', '$.key.nested', {'@type' => 'memory', }, [])])
+ )
+ record = {"key" => {"nested" => "nested-value"}}
+ driver_run(d, [record])
+ assert_equal("stream-placeholder-nested-value", @server.detailed_records.first[:delivery_stream_name])
+ assert_equal 0, d.instance.log.out.logs.size
+ assert_equal (record.to_json + "\n").b, @server.records.first
+ end
+ end

# Debug test case for the issue that it fails to flush the buffer
# https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/133
#def test_chunk_limit_size_for_debug
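For reference, the Fluent::Config::Element trees in the placeholder tests correspond to plain Fluentd configuration. A sketch of the custom-placeholder case (endpoint and ssl_verify_peer target the dummy server; the match pattern is illustrative):

<match **>
  @type kinesis_firehose
  delivery_stream_name stream-placeholder-${$.key.nested}
  <buffer $.key.nested>
    @type memory
  </buffer>
</match>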
