Skip to content

Commit

Permalink
awslabs#203: add placeholder support to delivery_stream_name for kine…
Browse files Browse the repository at this point in the history
…sis_firehose. closes awslabs#203
  • Loading branch information
Adam Brunner authored and Adam Brunner committed Jun 25, 2020
1 parent 41432b2 commit 70abde4
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 6 deletions.
3 changes: 2 additions & 1 deletion lib/fluent/plugin/out_kinesis_firehose.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ def format(tag, time, record)
end

def write(chunk)
delivery_stream_name = extract_placeholders(@delivery_stream_name, chunk)
write_records_batch(chunk) do |batch|
records = batch.map{|(data)|
{ data: data }
}
client.put_record_batch(
delivery_stream_name: @delivery_stream_name,
delivery_stream_name: delivery_stream_name,
records: records,
)
end
Expand Down
10 changes: 5 additions & 5 deletions test/dummy_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def describe_stream_boby(req)
def put_record_boby(req)
body = JSON.parse(req.body)
record = {'Data' => body['Data'], 'PartitionKey' => body['PartitionKey']}
@accepted_records << {:stream_name => body['StreamName'], :record => record} if recording?
@accepted_records << {:stream_name => body['StreamName'], :delivery_stream_name => body['DeliveryStreamName'], :record => record} if recording?
{
"SequenceNumber" => "21269319989653637946712965403778482177",
"ShardId" => "shardId-000000000001"
Expand Down Expand Up @@ -278,7 +278,7 @@ def put_records_boby(req)
"ErrorMessage" => "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
}
else
@accepted_records << {:stream_name => body['StreamName'], :record => record} if recording?
@accepted_records << {:stream_name => body['StreamName'], :delivery_stream_name => body['DeliveryStreamName'], :record => record} if recording?
{
"SequenceNumber" => "49543463076548007577105092703039560359975228518395019266",
"ShardId" => "shardId-000000000000"
Expand All @@ -303,7 +303,7 @@ def put_record_batch_boby(req)
"ErrorMessage" => "Some message"
}
else
@accepted_records << {:stream_name => body['StreamName'], :record => record} if recording?
@accepted_records << {:stream_name => body['StreamName'], :delivery_stream_name => body['DeliveryStreamName'], :record => record} if recording?
{
"RecordId" => "49543463076548007577105092703039560359975228518395019266",
}
Expand All @@ -323,13 +323,13 @@ def flatten_records(records, detailed: false)
if @aggregator.aggregated?(data)
agg_data = @aggregator.deaggregate(data)[0]
if detailed
{:stream_name => record[:stream_name], :data => agg_data, :partition_key => partition_key}
{:stream_name => record[:stream_name], :delivery_stream_name => record[:delivery_stream_name], :data => agg_data, :partition_key => partition_key}
else
agg_data
end
else
if detailed
{:stream_name => record[:stream_name], :data => data, :partition_key => partition_key}
{:stream_name => record[:stream_name], :delivery_stream_name => record[:delivery_stream_name], :data => data, :partition_key => partition_key}
else
data
end
Expand Down
55 changes: 55 additions & 0 deletions test/plugin/test_out_kinesis_firehose.rb
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,61 @@ def test_record_count
assert @server.error_count > 0
end

class PlaceholdersTest < self
def test_tag_placeholder
d = create_driver(
Fluent::Config::Element.new('ROOT', '', {
"delivery_stream_name" => "stream-placeholder-${tag}",
"@log_level" => "error",
"retries_on_batch_request" => 10,
"endpoint" => "https://localhost:#{@server.port}",
"ssl_verify_peer" => false,
}, [Fluent::Config::Element.new('buffer', 'tag', {'@type' => 'memory', }, [])])
)
record = {"a" => "test"}
driver_run(d, [record])
assert_equal("stream-placeholder-test", @server.detailed_records.first[:delivery_stream_name])
assert_equal 0, d.instance.log.out.logs.size
assert_equal (record.to_json + "\n").b, @server.records.first
end

def test_time_placeholder
d = create_driver(
Fluent::Config::Element.new('ROOT', '', {
"delivery_stream_name" => "stream-placeholder-${tag}-%Y%m%d",
"@log_level" => "error",
"retries_on_batch_request" => 10,
"endpoint" => "https://localhost:#{@server.port}",
"ssl_verify_peer" => false,
}, [Fluent::Config::Element.new('buffer', 'tag, time', {'@type' => 'memory', 'timekey' => 3600 }, [])])
)
record = {"a" => "test"}
time = event_time
driver_run(d, [record], time: time)
assert_equal("stream-placeholder-test-#{Time.now.strftime("%Y%m%d")}",
@server.detailed_records.first[:delivery_stream_name])
assert_equal 0, d.instance.log.out.logs.size
assert_equal (record.to_json + "\n").b, @server.records.first
end

def test_custom_placeholder
d = create_driver(
Fluent::Config::Element.new('ROOT', '', {
"delivery_stream_name" => "stream-placeholder-${$.key.nested}",
"@log_level" => "error",
"retries_on_batch_request" => 10,
"endpoint" => "https://localhost:#{@server.port}",
"ssl_verify_peer" => false,
}, [Fluent::Config::Element.new('buffer', '$.key.nested', {'@type' => 'memory', }, [])])
)
record = {"key" => {"nested" => "nested-value"}}
driver_run(d, [record])
assert_equal("stream-placeholder-nested-value", @server.detailed_records.first[:delivery_stream_name])
assert_equal 0, d.instance.log.out.logs.size
assert_equal (record.to_json + "\n").b, @server.records.first
end
end

# Debug test case for the issue that it fails to flush the buffer
# https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/133
#def test_chunk_limit_size_for_debug
Expand Down

0 comments on commit 70abde4

Please sign in to comment.