Skip to content

Commit

Permalink
Support deriving stream name from tag
Browse files Browse the repository at this point in the history
Add support for kinesis_producer to derive the stream name by
concatenating stream_name_prefix (new configuration option) and the
fluentd tag. Fixes awslabs#67.
  • Loading branch information
Chris Broglie committed May 19, 2016
1 parent 75e8a34 commit 34a9ca7
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 13 deletions.
7 changes: 4 additions & 3 deletions lib/fluent/plugin/kinesis_helper/class_methods.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ def config_param_for_firehose

def config_param_for_producer
const_set(:RequestType, :producer)
config_param :stream_name, :string
config_param :region, :string, default: nil
config_param :partition_key, :string, default: nil
config_param :stream_name, :string, default: nil
config_param :stream_name_prefix, :string, default: nil
config_param :region, :string, default: nil
config_param :partition_key, :string, default: nil
config_param_for_credentials
config_param_for_format
config_param_for_debug
Expand Down
12 changes: 11 additions & 1 deletion lib/fluent/plugin/out_kinesis_producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ class KinesisProducerOutput < BufferedOutput
Fluent::Plugin.register_output('kinesis_producer', self)
config_param_for_producer

# Validate plugin configuration after Fluentd has populated config_params.
#
# Exactly one of 'stream_name' / 'stream_name_prefix' must be set: the
# destination stream is either fixed, or derived as prefix + tag.
#
# Raises Fluent::ConfigError when neither or both are given.
def configure(conf)
  super
  # Use && / || (not 'and' / 'or') — the keyword forms bind looser than
  # assignment and are a well-known precedence trap in Ruby.
  unless @stream_name || @stream_name_prefix
    raise Fluent::ConfigError, "'stream_name' or 'stream_name_prefix' is required"
  end
  if @stream_name && @stream_name_prefix
    raise Fluent::ConfigError, "Only one of 'stream_name' or 'stream_name_prefix' is allowed"
  end
end

def write(chunk)
records = convert_to_records(chunk)
wait_futures(write_chunk_to_kpl(records))
Expand All @@ -31,7 +41,7 @@ def convert_format(tag, time, record)
{
data: data_format(tag, time, record),
partition_key: key(record),
stream_name: @stream_name,
stream_name: @stream_name ? @stream_name : @stream_name_prefix + tag,
}
end
end
Expand Down
27 changes: 20 additions & 7 deletions test/dummy_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ def requests
end

# Decoded record payloads only, without stream/partition metadata.
# (Pass detailed=true via detailed_records to get routing info as well.)
def records
  flatten_records(@accepted_records, false)
end

# Decoded record payloads wrapped with the stream name and partition key
# each record was submitted under — used by tests asserting on routing.
def detailed_records
  flatten_records(@accepted_records, true)
end

def failed_count
Expand Down Expand Up @@ -172,7 +176,7 @@ def put_records_boby(req)
"ErrorMessage" => "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
}
else
@accepted_records << record
@accepted_records << {:stream_name => body['StreamName'], :record => record}
{
"SequenceNumber" => "49543463076548007577105092703039560359975228518395019266",
"ShardId" => "shardId-000000000000"
Expand All @@ -197,7 +201,7 @@ def put_record_batch_boby(req)
"ErrorMessage" => "Some message"
}
else
@accepted_records << record
@accepted_records << {:stream_name => body['StreamName'], :record => record}
{
"RecordId" => "49543463076548007577105092703039560359975228518395019266",
}
Expand All @@ -210,15 +214,24 @@ def put_record_batch_boby(req)
}
end

# Decode accepted records into their payloads.
#
# records  - array of {:stream_name => ..., :record => ...} hashes as stored
#            by the request handlers; :record carries Base64 'Data' and
#            'PartitionKey'.
# detailed - when true, wrap each payload with the stream name and partition
#            key it arrived with. Defaults to false so pre-existing callers
#            that pass a single argument keep working.
def flatten_records(records, detailed = false)
  records.flat_map do |record|
    data = Base64.decode64(record[:record]['Data'])
    partition_key = record[:record]['PartitionKey']
    payload =
      if data[0, 4] == ['F3899AC2'].pack('H*')
        # Magic 4-byte prefix marks a KPL aggregated record: strip the
        # header and the 16 trailing checksum bytes, then decode the
        # protobuf envelope into its individual record payloads.
        protobuf = data[4, data.length - 20]
        agg = KinesisProducer::Protobuf::AggregatedRecord.decode(protobuf)
        agg.records.map(&:data)
      else
        data
      end
    if detailed
      { :stream_name => record[:stream_name], :data => payload, :partition_key => partition_key }
    else
      # flat_map splices per-aggregate payload arrays into one flat list.
      payload
    end
  end
end
Expand Down
37 changes: 35 additions & 2 deletions test/plugin/test_out_kinesis_producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ def teardown
@server.clear
end

def default_config
def base_config
%[
stream_name test-stream
log_level error
<kinesis_producer>
Expand All @@ -45,6 +44,10 @@ def default_config
]
end

# Default driver config: a fixed stream name layered on the shared base config.
def default_config
  "stream_name test-stream #{base_config}"
end

def create_driver(conf = default_config)
Fluent::Test::BufferedOutputTestDriver.new(Fluent::KinesisProducerOutput) do
end.configure(conf)
Expand All @@ -59,6 +62,20 @@ def test_configure
assert_equal false, d.instance.kinesis_producer.verify_certificate
end

# Configuring only stream_name_prefix is valid; stream_name stays unset.
def test_configure_with_stream_name_prefix
  driver = create_driver("stream_name_prefix test-stream-")
  assert_nil driver.instance.stream_name
  assert_equal 'test-stream-', driver.instance.stream_name_prefix
end

# Omitting both stream_name and stream_name_prefix must be rejected.
def test_configure_without_stream_name_or_prefix
  # The driver's return value is irrelevant here — only the raise matters,
  # so don't assign it to an unused local.
  assert_raise(Fluent::ConfigError) { create_driver("") }
end

# Setting both stream_name and stream_name_prefix is ambiguous and rejected.
def test_configure_with_stream_name_and_prefix
  conf = "stream_name test-stream\nstream_name_prefix test-stream-"
  assert_raise(Fluent::ConfigError) { create_driver(conf) }
end

def test_configure_without_section
d = create_driver("stream_name test-stream")
assert_not_nil d.instance.kinesis_producer
Expand All @@ -74,6 +91,22 @@ def test_region
assert_equal 'us-west-2', d.instance.region
end

# The destination stream is prefix + tag when stream_name_prefix is set,
# and exactly stream_name when that is set instead.
def test_stream_name
  # One record routed via stream_name_prefix + tag ("test").
  prefix_driver = create_driver(base_config + "stream_name_prefix test-stream-")
  prefix_driver.emit({"a"=>1,"b"=>2})
  prefix_driver.run

  # One record routed via an explicit stream_name.
  named_driver = create_driver
  named_driver.emit({"a"=>1,"b"=>2})
  named_driver.run

  records = @server.detailed_records
  assert_equal 2, records.size
  assert_equal "test-stream-test", records[0][:stream_name]
  assert_equal "test-stream", records[1][:stream_name]
end

data(
'json' => ['json', '{"a":1,"b":2}'],
'ltsv' => ['ltsv', "a:1\tb:2"],
Expand Down

0 comments on commit 34a9ca7

Please sign in to comment.