Skip to content

Commit

Permalink
Merge pull request #125 from AKoetsier/empty-batches
Browse files Browse the repository at this point in the history
Skip empty batches
  • Loading branch information
riywo authored Sep 20, 2017
2 parents ecc4047 + b4797ea commit b848de1
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 13 deletions.
1 change: 1 addition & 0 deletions lib/fluent/plugin/out_kinesis_streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class KinesisStreamsOutput < BufferedOutput
def write(chunk)
records = convert_to_records(chunk)
split_to_batches(records).each do |batch|
next unless batch.size > 0
batch_request_with_retry(batch)
end
log.debug("Written #{records.size} records")
Expand Down
34 changes: 21 additions & 13 deletions test/dummy_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -216,19 +216,27 @@ def exceeded?(req, max_count, max_size)
def put_records_boby(req)
body = JSON.parse(req.body)
failed_record_count = 0
records = body['Records'].map do |record|
if random_fail
failed_record_count += 1
{
"ErrorCode" => "ProvisionedThroughputExceededException",
"ErrorMessage" => "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
}
else
@accepted_records << {:stream_name => body['StreamName'], :record => record}
{
"SequenceNumber" => "49543463076548007577105092703039560359975228518395019266",
"ShardId" => "shardId-000000000000"
}
records = if body['Records'].empty?
@error_count += 1
[{
"ErrorCode" => "ValidationException",
"ErrorMessage" => "1 validation error detected: Value '[]' at 'records' failed to satisfy constraint: Member must have length greater than or equal to 1"
}]
else
body['Records'].map do |record|
if random_fail
failed_record_count += 1
{
"ErrorCode" => "ProvisionedThroughputExceededException",
"ErrorMessage" => "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
}
else
@accepted_records << {:stream_name => body['StreamName'], :record => record}
{
"SequenceNumber" => "49543463076548007577105092703039560359975228518395019266",
"ShardId" => "shardId-000000000000"
}
end
end
end
@failed_count += failed_record_count
Expand Down
9 changes: 9 additions & 0 deletions test/plugin/test_out_kinesis_streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ def test_max_record_size
assert_equal 1, d.instance.log.logs.size
end

def test_single_max_record_size
d = create_driver
d.emit({"a"=>"a"*(1024*1024-'{"a":""}'.size+1)}) # exceeded
d.run
assert_equal 0, @server.records.size
assert_equal 0, @server.error_count
assert_equal 1, d.instance.log.logs.size
end

pad = 32 + '{"data":""}'.size
data(
'split_by_count' => [Array.new(501, {data:'a'}), [500,1]],
Expand Down

0 comments on commit b848de1

Please sign in to comment.