Skip to content

Commit

Permalink
Add debug test case for failure to flush the buffer - awslabs#133
Browse files Browse the repository at this point in the history
  • Loading branch information
simukappu committed Aug 25, 2018
1 parent a89b0b9 commit 694c8e2
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 0 deletions.
10 changes: 10 additions & 0 deletions test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,13 @@ def driver_run(d, records, time: nil)
require 'webmock/test_unit'
WebMock.disable!
include Fluent::Test::Helpers

def localize_method(klass, method, &block)
unbound = klass.instance_method(method)
klass.send(:define_method, method) do |*args|
block.call(unbound.bind(self), *args)
end
lambda {
klass.send(:define_method, method, unbound)
}
end
41 changes: 41 additions & 0 deletions test/plugin/test_out_kinesis_firehose.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,45 @@ def test_record_count
assert @server.failed_count > 0
assert @server.error_count > 0
end

# Debug test case for the issue that it fails to flush the buffer
# https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/133
def test_chunk_limit_size_for_debug
config = <<~CONF
log_level warn
<buffer>
chunk_limit_size "1m"
</buffer>
CONF
d = create_driver(default_config + config)

begin
# Override Aws::Firehose::Client.put_record_batch to simple dummy method temporarily
# since wait_flush_completion is hard coded as 10s in Fluent::Test::Driver::Output class.
# See https://github.com/fluent/fluentd/blob/master/lib/fluent/test/driver/output.rb
release = localize_method(Aws::Firehose::Client, :put_record_batch) do |original|
OpenStruct.new(
failed_put_count: 0,
request_responses: [
OpenStruct.new(
record_id: "49543463076548007577105092703039560359975228518395019266"
)
]
)
end

d.run(wait_flush_completion: true, force_flush_retry: true) do
10.times do
time = Fluent::EventTime.now
events = Array.new(Kernel.rand(3000..5000)).map { [time, { msg: "x" * 256 }] }
d.feed("test", events)
end
end

puts d.logs
d.logs.each { |log_record| assert_not_match(/NoMethodError/, log_record) }
ensure
release.call
end
end
end
42 changes: 42 additions & 0 deletions test/plugin/test_out_kinesis_streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,46 @@ def test_record_count
assert @server.failed_count > 0
assert @server.error_count > 0
end

# Debug test case for the issue that it fails to flush the buffer
# https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/133
def test_chunk_limit_size_for_debug
config = <<~CONF
log_level warn
<buffer>
chunk_limit_size "1m"
</buffer>
CONF
d = create_driver(default_config + config)

begin
# Override Aws::Kinesis::Client.put_records to simple dummy method temporarily
# since wait_flush_completion is hard coded as 10s in Fluent::Test::Driver::Output class.
# See https://github.com/fluent/fluentd/blob/master/lib/fluent/test/driver/output.rb
release = localize_method(Aws::Kinesis::Client, :put_records) do |original|
OpenStruct.new(
failed_record_count: 0,
records: [
OpenStruct.new(
sequence_number: "21269319989653637946712965403778482177",
shard_id: "shardId-000000000001"
)
]
)
end

d.run(wait_flush_completion: true, force_flush_retry: true) do
10.times do
time = Fluent::EventTime.now
events = Array.new(Kernel.rand(3000..5000)).map { [time, { msg: "x" * 256 }] }
d.feed("test", events)
end
end

puts d.logs
d.logs.each { |log_record| assert_not_match(/NoMethodError/, log_record) }
ensure
release.call
end
end
end
42 changes: 42 additions & 0 deletions test/plugin/test_out_kinesis_streams_aggregated.rb
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,46 @@ def test_record_count
assert @server.error_count > 0
assert @server.raw_records.size < count
end

# Debug test case for the issue that it fails to flush the buffer
# https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/133
def test_chunk_limit_size_for_debug
config = <<~CONF
log_level warn
<buffer>
chunk_limit_size "1m"
</buffer>
CONF
d = create_driver(default_config + config)

begin
# Override Aws::Kinesis::Client.put_records to simple dummy method temporarily
# since wait_flush_completion is hard coded as 10s in Fluent::Test::Driver::Output class.
# See https://github.com/fluent/fluentd/blob/master/lib/fluent/test/driver/output.rb
release = localize_method(Aws::Kinesis::Client, :put_records) do |original|
OpenStruct.new(
failed_record_count: 0,
records: [
OpenStruct.new(
sequence_number: "21269319989653637946712965403778482177",
shard_id: "shardId-000000000001"
)
]
)
end

d.run(wait_flush_completion: true, force_flush_retry: true) do
10.times do
time = Fluent::EventTime.now
events = Array.new(Kernel.rand(3000..5000)).map { [time, { msg: "x" * 256 }] }
d.feed("test", events)
end
end

puts d.logs
d.logs.each { |log_record| assert_not_match(/NoMethodError/, log_record) }
ensure
release.call
end
end
end

0 comments on commit 694c8e2

Please sign in to comment.