Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove support for fluentd v0.12 and use new Plugin API #156

Merged
merged 3 commits into from
Aug 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
language: ruby
os: linux
sudo: false

matrix:
include:
- rvm: 2.4.1
os: linux
gemfile: Gemfile
- rvm: 2.1.10
os: linux
gemfile: gemfiles/Gemfile.td-agent-2.3.5
- rvm: 2.4.1
gemfile: gemfiles/Gemfile.fluentd-0.14.10
- rvm: 2.4.4 # https://github.com/treasure-data/omnibus-td-agent/blob/v3.2.0/config/projects/td-agent3.rb#L22
gemfile: gemfiles/Gemfile.td-agent-3.2.0

script: bundle exec rake test

sudo: false
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Or just download specify your Ruby library path. Below is the sample for specify

## Dependencies
* Ruby 2.1.0+
* Fluentd 0.12.35+ (supporting 0.14.x)
* Fluentd 0.14.10+

## Basic Usage
Here are general procedures for using this plugin:
Expand Down
16 changes: 6 additions & 10 deletions benchmark/task.rake
Original file line number Diff line number Diff line change
Expand Up @@ -78,25 +78,21 @@ class KinesisBenchmark
def create_driver(type, conf = default_config)
klass = case type
when :streams
Fluent::KinesisStreamsOutput
Fluent::Plugin::KinesisStreamsOutput
when :streams_aggregated
Fluent::KinesisStreamsAggregatedOutput
Fluent::Plugin::KinesisStreamsAggregatedOutput
when :firehose
Fluent::KinesisFirehoseOutput
Fluent::Plugin::KinesisFirehoseOutput
end
conf += case type
when :streams, :streams_aggregated
"stream_name fluent-plugin-test"
when :firehose
"delivery_stream_name fluent-plugin-test"
end
if fluentd_v0_12?
Fluent::Test::BufferedOutputTestDriver.new(klass) do
end.configure(conf)
else
Fluent::Test::Driver::Output.new(klass) do
end.configure(conf)
end

Fluent::Test::Driver::Output.new(klass) do
end.configure(conf)
end

def benchmark(size, count)
Expand Down
17 changes: 7 additions & 10 deletions fluent-plugin-kinesis.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,13 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]
spec.required_ruby_version = '>= 2.1'

spec.add_dependency "fluentd", ">= 0.12.35", "< 2"
spec.add_dependency "aws-sdk", ">= 2.9.9", "< 4"
# TODO: fluent-plugin-s3 depends on v2 only.
# https://github.com/fluent/fluent-plugin-s3/issues/208
#
# This plugin is sometimes used with s3 plugin.
# Unless s3 plugin is updated to be available with v3,
# this plugin should depend on v2 only.
# spec.add_dependency "aws-sdk-kinesis", "~> 1"
# spec.add_dependency "aws-sdk-firehose", "~> 1"
spec.add_dependency "fluentd", ">= 0.14.0", "< 2"

# This plugin is sometimes used with s3 plugin, so watch out for conflicts
# https://rubygems.org/gems/fluent-plugin-s3
spec.add_dependency "aws-sdk-kinesis", "~> 1"
spec.add_dependency "aws-sdk-firehose", "~> 1"

spec.add_dependency "google-protobuf", "~> 3"

spec.add_development_dependency "bundler", "~> 1.10"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,4 @@ source 'https://rubygems.org'
# Specify your gem's dependencies in fluent-plugin-kinesis.gemspec
gemspec path: ".."

# Specify related gems for td-agent v2.3.5
# https://github.com/treasure-data/omnibus-td-agent/blob/release-2.3.5/config/projects/td-agent2.rb#L23
gem "fluentd", "0.12.35"
# https://github.com/treasure-data/omnibus-td-agent/blob/release-2.3.5/plugin_gems.rb#L13-L15
gem "fluent-plugin-s3", "0.8.2"
gem "aws-sdk", "2.9.9"
gem "fluentd", "0.14.10"
31 changes: 31 additions & 0 deletions gemfiles/Gemfile.td-agent-3.2.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
# Copyright 2014-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

source 'https://rubygems.org'

# Specify your gem's dependencies in fluent-plugin-kinesis.gemspec
gemspec path: ".."

# Specify related gems for td-agent v3.2.0
# https://github.com/treasure-data/omnibus-td-agent/blob/v3.2.0/config/projects/td-agent3.rb#L27
gem "fluentd", "1.2.2"
# https://github.com/treasure-data/omnibus-td-agent/blob/v3.2.0/plugin_gems.rb#L16-L23
gem "jmespath", "1.4.0"
gem "aws-partitions", "1.87.0"
gem "aws-sigv4", "1.0.2"
gem "aws-sdk-core", "3.21.2"
gem "aws-sdk-kms", "1.5.0"
gem "aws-sdk-sqs", "1.3.0"
gem "aws-sdk-s3", "1.13.0"
gem "fluent-plugin-s3", "1.1.3"
214 changes: 97 additions & 117 deletions lib/fluent/plugin/kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,154 +12,134 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

require 'fluent/plugin/output'
require 'fluent/plugin/kinesis_helper/client'
require 'fluent/plugin/kinesis_helper/api'
require 'zlib'

module Fluent
class KinesisOutput < BufferedOutput
def self.fluentd_v0_12?
@fluentd_v0_12 ||= Gem.loaded_specs['fluentd'].version < Gem::Version.create('0.14')
end

include Fluent::SetTimeKeyMixin
include Fluent::SetTagKeyMixin

include KinesisHelper::Client
include KinesisHelper::API

class SkipRecordError < ::StandardError
def initialize(message, record)
super message
@record_message = if record.is_a? Array
record.reverse.map(&:to_s).join(', ')
else
record.to_s
module Plugin
class KinesisOutput < Fluent::Plugin::Output
include Fluent::MessagePackFactory::Mixin
include KinesisHelper::Client
include KinesisHelper::API

class SkipRecordError < ::StandardError
def initialize(message, record)
super message
@record_message = if record.is_a? Array
record.reverse.map(&:to_s).join(', ')
else
record.to_s
end
end
end

def to_s
super + ": " + @record_message
def to_s
super + ": " + @record_message
end
end
end
class KeyNotFoundError < SkipRecordError
def initialize(key, record)
super "Key '#{key}' doesn't exist", record
class KeyNotFoundError < SkipRecordError
def initialize(key, record)
super "Key '#{key}' doesn't exist", record
end
end
end
class ExceedMaxRecordSizeError < SkipRecordError
def initialize(size, record)
super "Record size limit exceeded in #{size/1024} KB", record
class ExceedMaxRecordSizeError < SkipRecordError
def initialize(size, record)
super "Record size limit exceeded in #{size/1024} KB", record
end
end
end
class InvalidRecordError < SkipRecordError
def initialize(record)
super "Invalid type of record", record
class InvalidRecordError < SkipRecordError
def initialize(record)
super "Invalid type of record", record
end
end
end

config_param :data_key, :string, default: nil
config_param :log_truncate_max_size, :integer, default: 1024
config_param :compression, :string, default: nil
config_section :format do
config_set_default :@type, 'json'
end
config_section :inject do
config_set_default :time_type, 'string'
config_set_default :time_format, '%Y-%m-%dT%H:%M:%S.%N%z'
end
config_param :data_key, :string, default: nil
config_param :log_truncate_max_size, :integer, default: 1024
config_param :compression, :string, default: nil
config_section :format do
config_set_default :@type, 'json'
end
config_section :inject do
config_set_default :time_type, 'string'
config_set_default :time_format, '%Y-%m-%dT%H:%M:%S.%N%z'
end

config_param :debug, :bool, default: false
config_param :debug, :bool, default: false

if fluentd_v0_12?
config_param :format, :string, default: 'json'
else
helpers :formatter, :inject
end

def configure(conf)
super
@data_formatter = data_formatter_create(conf)
end

def multi_workers_ready?
true
end
def configure(conf)
super
@data_formatter = data_formatter_create(conf)
end

private
def multi_workers_ready?
true
end

def fluentd_v0_12?
self.class.fluentd_v0_12?
end
private

def data_formatter_create(conf)
if fluentd_v0_12?
formatter = Fluent::Plugin.new_formatter(@format)
formatter.configure(conf)
else
def data_formatter_create(conf)
formatter = formatter_create
end
compressor = compressor_create
if @data_key.nil?
->(tag, time, record) {
unless fluentd_v0_12?
compressor = compressor_create
if @data_key.nil?
->(tag, time, record) {
record = inject_values_to_record(tag, time, record)
end
compressor.call(formatter.format(tag, time, record).chomp.b)
}
else
->(tag, time, record) {
raise InvalidRecordError, record unless record.is_a? Hash
raise KeyNotFoundError.new(@data_key, record) if record[@data_key].nil?
compressor.call(record[@data_key].to_s.b)
}
compressor.call(formatter.format(tag, time, record).chomp.b)
}
else
->(tag, time, record) {
raise InvalidRecordError, record unless record.is_a? Hash
raise KeyNotFoundError.new(@data_key, record) if record[@data_key].nil?
compressor.call(record[@data_key].to_s.b)
}
end
end
end

def compressor_create
case @compression
when "zlib"
->(data) { Zlib::Deflate.deflate(data) }
else
->(data) { data }
def compressor_create
case @compression
when "zlib"
->(data) { Zlib::Deflate.deflate(data) }
else
->(data) { data }
end
end
end

def format_for_api(&block)
converted = block.call
size = size_of_values(converted)
if size > @max_record_size
raise ExceedMaxRecordSizeError.new(size, converted)
def format_for_api(&block)
converted = block.call
size = size_of_values(converted)
if size > @max_record_size
raise ExceedMaxRecordSizeError.new(size, converted)
end
converted.to_msgpack
rescue SkipRecordError => e
log.error(truncate e)
''
end
converted.to_msgpack
rescue SkipRecordError => e
log.error(truncate e)
''
end

def write_records_batch(chunk, &block)
if fluentd_v0_12?
unique_id = chunk.unique_id.unpack('H*').first
else
def write_records_batch(chunk, &block)
unique_id = chunk.dump_unique_id_hex(chunk.unique_id)
chunk.open do |io|
records = msgpack_unpacker(io).to_enum
split_to_batches(records) do |batch, size|
log.debug(sprintf "Write chunk %s / %3d records / %4d KB", unique_id, batch.size, size/1024)
batch_request_with_retry(batch, &block)
log.debug("Finish writing chunk")
end
end
end
records = chunk.to_enum(:msgpack_each)
split_to_batches(records) do |batch, size|
log.debug(sprintf "Write chunk %s / %3d records / %4d KB", unique_id, batch.size, size/1024)
batch_request_with_retry(batch, &block)
log.debug("Finish writing chunk")
end
end

def request_type
self.class::RequestType
end
def request_type
self.class::RequestType
end

def truncate(msg)
if @log_truncate_max_size == 0 or (msg.to_s.size <= @log_truncate_max_size)
msg.to_s
else
msg.to_s[0...@log_truncate_max_size]
def truncate(msg)
if @log_truncate_max_size == 0 or (msg.to_s.size <= @log_truncate_max_size)
msg.to_s
else
msg.to_s[0...@log_truncate_max_size]
end
end
end
end
Expand Down
Loading