Skip to content

Commit

Permalink
Adding Stream name to debug logs (#214)
Browse files Browse the repository at this point in the history
* adding stream_name to logs in write_records_batch

* Adding stream_name in write_records_batch call

* Adding stream_name in write_records_batch call

* Adding stream_name in write_records_batch call
  • Loading branch information
samsplunks authored Oct 9, 2021
1 parent 2b23a91 commit 87f1ee7
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 6 deletions.
6 changes: 3 additions & 3 deletions lib/fluent/plugin/kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,13 @@ def msgpack_unpacker(*args)
include Fluent::MessagePackFactory::Mixin
end

def write_records_batch(chunk, &block)
def write_records_batch(chunk, stream_name, &block)
unique_id = chunk.dump_unique_id_hex(chunk.unique_id)
records = chunk.to_enum(:msgpack_each)
split_to_batches(records) do |batch, size|
log.debug(sprintf "Write chunk %s / %3d records / %4d KB", unique_id, batch.size, size/1024)
log.debug(sprintf "%s: Write chunk %s / %3d records / %4d KB", stream_name, unique_id, batch.size, size/1024)
batch_request_with_retry(batch, &block)
log.debug("Finish writing chunk")
log.debug(sprintf "%s: Finish writing chunk", stream_name)
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_kinesis_firehose.rb
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def format(tag, time, record)

def write(chunk)
delivery_stream_name = extract_placeholders(@delivery_stream_name, chunk)
write_records_batch(chunk) do |batch|
write_records_batch(chunk, delivery_stream_name) do |batch|
records = batch.map{|(data)|
{ data: data }
}
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_kinesis_streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def format(tag, time, record)

def write(chunk)
stream_name = extract_placeholders(@stream_name, chunk)
write_records_batch(chunk) do |batch|
write_records_batch(chunk, stream_name) do |batch|
records = batch.map{|(data, partition_key)|
{ data: data, partition_key: partition_key }
}
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_kinesis_streams_aggregated.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def format(tag, time, record)

def write(chunk)
stream_name = extract_placeholders(@stream_name, chunk)
write_records_batch(chunk) do |batch|
write_records_batch(chunk, stream_name) do |batch|
key = @partition_key_generator.call
records = batch.map{|(data)|data}
client.put_records(
Expand Down

0 comments on commit 87f1ee7

Please sign in to comment.