Add placeholder support for stream names #174

Merged
2 changes: 1 addition & 1 deletion .travis.yml
@@ -11,7 +11,7 @@ gemfile:
matrix:
include:
- rvm: 2.4.1
gemfile: gemfiles/Gemfile.fluentd-0.14.10
gemfile: gemfiles/Gemfile.fluentd-0.14.22
- rvm: 2.4.2 # https://github.com/treasure-data/omnibus-td-agent/blob/v3.1.1/config/projects/td-agent3.rb#L17
gemfile: gemfiles/Gemfile.td-agent-3.1.1
- rvm: 2.4.4 # https://github.com/treasure-data/omnibus-td-agent/blob/v3.2.0/config/projects/td-agent3.rb#L22
50 changes: 49 additions & 1 deletion README.md
@@ -51,7 +51,7 @@ Or just download specify your Ruby library path. Below is the sample for specify

## Dependencies
* Ruby 2.1.0+
* Fluentd 0.14.10+
* Fluentd 0.14.22+

## Basic Usage
Here are general procedures for using this plugin:
@@ -346,6 +346,30 @@ Here are `kinesis_streams` specific configurations.

### stream_name
Name of the stream to put data.
As of Fluentd v1, this parameter supports built-in placeholders.

**NOTE:**
Built-in placeholders require the target keys to be listed as chunk keys in the buffer section.

For example, if you specify the following `stream_name` configuration with a built-in placeholder:

```aconf
stream_name "${$.kubernetes.annotations.kinesis_stream}"
```

You must specify the corresponding chunk key in the buffer directive:

```aconf
# $.kubernetes.annotations.kinesis_stream must be listed as a buffer chunk key
<buffer $.kubernetes.annotations.kinesis_stream>
# ...
</buffer>
```

For more detail, see the [Placeholders section in the official Fluentd documentation](https://docs.fluentd.org/configuration/buffer-section#placeholders).
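As a fuller sketch (the match pattern, region, and buffer settings here are illustrative, not prescribed by this plugin), a complete `<match>` section combining the placeholder with its buffer chunk key might look like:

```aconf
<match kubernetes.**>
  @type kinesis_streams
  region us-east-1
  # Hypothetical annotation key; the stream name is resolved per buffer chunk
  stream_name "${$.kubernetes.annotations.kinesis_stream}"
  <buffer $.kubernetes.annotations.kinesis_stream>
    @type memory
  </buffer>
</match>
```

Because the annotation is a chunk key, records with different annotation values are buffered into separate chunks, and each chunk is delivered to the stream its metadata names.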

### partition_key
A key to extract partition key from JSON object. Default `nil`, which means partition key will be generated randomly.
@@ -365,6 +389,30 @@ Here are `kinesis_streams_aggregated` specific configurations.

### stream_name
Name of the stream to put data.
As of Fluentd v1, this parameter supports built-in placeholders.

**NOTE:**
Built-in placeholders require the target keys to be listed as chunk keys in the buffer section.

For example, if you specify the following `stream_name` configuration with a built-in placeholder:

```aconf
stream_name "${$.kubernetes.annotations.kinesis_stream_aggregated}"
```

You must specify the corresponding chunk key in the buffer directive:

```aconf
# $.kubernetes.annotations.kinesis_stream_aggregated must be listed as a buffer chunk key
<buffer $.kubernetes.annotations.kinesis_stream_aggregated>
# ...
</buffer>
```

For more detail, see the [Placeholders section in the official Fluentd documentation](https://docs.fluentd.org/configuration/buffer-section#placeholders).
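Tag and time placeholders work the same way: each placeholder used in `stream_name` needs a matching chunk key, and time placeholders additionally need a `timekey`. A sketch mirroring the test configurations in this PR (the match pattern and stream name prefix are illustrative):

```aconf
<match app.**>
  @type kinesis_streams_aggregated
  region us-east-1
  # ${tag} requires the "tag" chunk key; %Y%m%d requires "time" plus a timekey
  stream_name "stream-placeholder-${tag}-%Y%m%d"
  <buffer tag, time>
    @type memory
    timekey 3600
  </buffer>
</match>
```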


### fixed_partition_key
A value of fixed partition key. Default `nil`, which means partition key will be generated randomly.
2 changes: 1 addition & 1 deletion fluent-plugin-kinesis.gemspec
@@ -30,7 +30,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]
spec.required_ruby_version = '>= 2.1'

spec.add_dependency "fluentd", ">= 0.14.10", "< 2"
spec.add_dependency "fluentd", ">= 0.14.22", "< 2"
> **Contributor:** We have to update the dependencies in README (Fluentd 0.14.10+) too.

> **Contributor (Author):** Fixed in 8d2ac48.


# This plugin is sometimes used with s3 plugin, so watch out for conflicts
# https://rubygems.org/gems/fluent-plugin-s3
@@ -17,4 +17,4 @@ source 'https://rubygems.org'
# Specify your gem's dependencies in fluent-plugin-kinesis.gemspec
gemspec path: ".."

gem "fluentd", "0.14.10"
gem "fluentd", "0.14.22"
3 changes: 2 additions & 1 deletion lib/fluent/plugin/out_kinesis_streams.rb
@@ -41,12 +41,13 @@ def format(tag, time, record)
end

def write(chunk)
stream_name = extract_placeholders(@stream_name, chunk)
write_records_batch(chunk) do |batch|
records = batch.map{|(data, partition_key)|
{ data: data, partition_key: partition_key }
}
client.put_records(
stream_name: @stream_name,
stream_name: stream_name,
records: records,
)
end
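The key change above is that `extract_placeholders` is called once per chunk in `write`, so the resolved stream name comes from that chunk's metadata. A simplified, self-contained illustration of this idea (this is not Fluentd's actual implementation, and `resolve_stream_name` is a hypothetical helper; Fluentd's real `extract_placeholders` also handles `${tag}` and time format strings):

```ruby
# Simplified sketch: substitute ${$.path.to.key} placeholders in a stream
# name template using the variables a buffer chunk was grouped by.
def resolve_stream_name(template, chunk_variables)
  # Replace each ${$.some.key} occurrence with the chunk's value for that key.
  template.gsub(/\$\{(\$\.[^}]+)\}/) do
    key = Regexp.last_match(1)
    chunk_variables.fetch(key) { raise "no buffer chunk key configured for #{key}" }
  end
end

# A chunk buffered with <buffer $.kubernetes.annotations.kinesis_stream>
# carries the extracted value in its metadata:
chunk_variables = { "$.kubernetes.annotations.kinesis_stream" => "orders-stream" }
puts resolve_stream_name("${$.kubernetes.annotations.kinesis_stream}", chunk_variables)
# prints "orders-stream"
```

This also explains why the buffer chunk keys matter: grouping by the placeholder key is what guarantees every record in a chunk resolves to the same stream name.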
3 changes: 2 additions & 1 deletion lib/fluent/plugin/out_kinesis_streams_aggregated.rb
@@ -43,11 +43,12 @@ def format(tag, time, record)
end

def write(chunk)
stream_name = extract_placeholders(@stream_name, chunk)
write_records_batch(chunk) do |batch|
key = @partition_key_generator.call
records = batch.map{|(data)|data}
client.put_records(
stream_name: @stream_name,
stream_name: stream_name,
records: [{
partition_key: key,
data: aggregator.aggregate(records, key),
71 changes: 71 additions & 0 deletions test/plugin/test_out_kinesis_streams.rb
@@ -165,6 +165,77 @@ def test_multibyte_input
assert_equal (record.to_json + "\n").b, @server.records.first
end

class PlaceholdersTest < self
def test_tag_placeholder
d = create_driver(
Fluent::Config::Element.new('ROOT', '', {
"stream_name" => "stream-placeholder-${tag}",
"@log_level" => "error",
"retries_on_batch_request" => 10,
"endpoint" => "https://localhost:#{@server.port}",
"ssl_verify_peer" => false,
},[
Fluent::Config::Element.new('buffer', 'tag', {
'@type' => 'memory',
}, [])
])
)

record = {"a" => "test"}
driver_run(d, [record])
assert_equal("stream-placeholder-test", @server.detailed_records.first[:stream_name])
assert_equal 0, d.instance.log.out.logs.size
assert_equal (record.to_json + "\n").b, @server.records.first
end

def test_time_placeholder
d = create_driver(
Fluent::Config::Element.new('ROOT', '', {
"stream_name" => "stream-placeholder-${tag}-%Y%m%d",
"@log_level" => "error",
"retries_on_batch_request" => 10,
"endpoint" => "https://localhost:#{@server.port}",
"ssl_verify_peer" => false,
},[
Fluent::Config::Element.new('buffer', 'tag, time', {
'@type' => 'memory',
'timekey' => 3600
}, [])
])
)

record = {"a" => "test"}
time = event_time
driver_run(d, [record], time: time)
assert_equal("stream-placeholder-test-#{Time.now.strftime("%Y%m%d")}",
@server.detailed_records.first[:stream_name])
assert_equal 0, d.instance.log.out.logs.size
assert_equal (record.to_json + "\n").b, @server.records.first
end

def test_custom_placeholder
d = create_driver(
Fluent::Config::Element.new('ROOT', '', {
"stream_name" => "stream-placeholder-${$.key.nested}",
"@log_level" => "error",
"retries_on_batch_request" => 10,
"endpoint" => "https://localhost:#{@server.port}",
"ssl_verify_peer" => false,
},[
Fluent::Config::Element.new('buffer', '$.key.nested', {
'@type' => 'memory',
}, [])
])
)

record = {"key" => {"nested" => "nested-value"}}
driver_run(d, [record])
assert_equal("stream-placeholder-nested-value", @server.detailed_records.first[:stream_name])
assert_equal 0, d.instance.log.out.logs.size
assert_equal (record.to_json + "\n").b, @server.records.first
end
end

def test_record_count
@server.enable_random_error
d = create_driver
71 changes: 71 additions & 0 deletions test/plugin/test_out_kinesis_streams_aggregated.rb
@@ -212,6 +212,77 @@ def test_record_count
assert @server.raw_records.size < count
end

class PlaceholdersTest < self
def test_tag_placeholder
d = create_driver(
Fluent::Config::Element.new('ROOT', '', {
"stream_name" => "stream-placeholder-${tag}",
"@log_level" => "error",
"retries_on_batch_request" => 10,
"endpoint" => "https://localhost:#{@server.port}",
"ssl_verify_peer" => false,
},[
Fluent::Config::Element.new('buffer', 'tag', {
'@type' => 'memory',
}, [])
])
)

record = {"a" => "test"}
driver_run(d, [record])
assert_equal("stream-placeholder-test", @server.detailed_records.first[:stream_name])
assert_equal 0, d.instance.log.out.logs.size
assert_equal (record.to_json + "\n").b, @server.records.first
end

def test_time_placeholder
d = create_driver(
Fluent::Config::Element.new('ROOT', '', {
"stream_name" => "stream-placeholder-${tag}-%Y%m%d",
"@log_level" => "error",
"retries_on_batch_request" => 10,
"endpoint" => "https://localhost:#{@server.port}",
"ssl_verify_peer" => false,
},[
Fluent::Config::Element.new('buffer', 'tag, time', {
'@type' => 'memory',
'timekey' => 3600
}, [])
])
)

record = {"a" => "test"}
time = event_time
driver_run(d, [record], time: time)
assert_equal("stream-placeholder-test-#{Time.now.strftime("%Y%m%d")}",
@server.detailed_records.first[:stream_name])
assert_equal 0, d.instance.log.out.logs.size
assert_equal (record.to_json + "\n").b, @server.records.first
end

def test_custom_placeholder
d = create_driver(
Fluent::Config::Element.new('ROOT', '', {
"stream_name" => "stream-placeholder-${$.key.nested}",
"@log_level" => "error",
"retries_on_batch_request" => 10,
"endpoint" => "https://localhost:#{@server.port}",
"ssl_verify_peer" => false,
},[
Fluent::Config::Element.new('buffer', '$.key.nested', {
'@type' => 'memory',
}, [])
])
)

record = {"key" => {"nested" => "nested-value"}}
driver_run(d, [record])
assert_equal("stream-placeholder-nested-value", @server.detailed_records.first[:stream_name])
assert_equal 0, d.instance.log.out.logs.size
assert_equal (record.to_json + "\n").b, @server.records.first
end
end

# Debug test case for the issue that it fails to flush the buffer
# https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/133
#def test_chunk_limit_size_for_debug