Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove support for fluentd v0.12 and use new Plugin API #156

Merged
merged 3 commits into from
Aug 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
language: ruby
os: linux
sudo: false

matrix:
include:
- rvm: 2.4.1
os: linux
gemfile: Gemfile
- rvm: 2.1.10
os: linux
gemfile: gemfiles/Gemfile.td-agent-2.3.5
- rvm: 2.4.1
gemfile: gemfiles/Gemfile.fluentd-0.14.10
- rvm: 2.4.4 # https://github.com/treasure-data/omnibus-td-agent/blob/v3.2.0/config/projects/td-agent3.rb#L22
gemfile: gemfiles/Gemfile.td-agent-3.2.0

script: bundle exec rake test

sudo: false
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Or just download specify your Ruby library path. Below is the sample for specify

## Dependencies
* Ruby 2.1.0+
* Fluentd 0.12.35+ (supporting 0.14.x)
* Fluentd 0.14.10+

## Basic Usage
Here are general procedures for using this plugin:
Expand Down
12 changes: 4 additions & 8 deletions benchmark/task.rake
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,22 @@ class KinesisBenchmark
def create_driver(type, conf = default_config)
klass = case type
when :streams
Fluent::KinesisStreamsOutput
Fluent::Plugin::KinesisStreamsOutput
when :streams_aggregated
Fluent::KinesisStreamsAggregatedOutput
Fluent::Plugin::KinesisStreamsAggregatedOutput
when :firehose
Fluent::KinesisFirehoseOutput
Fluent::Plugin::KinesisFirehoseOutput
end
conf += case type
when :streams, :streams_aggregated
"stream_name fluent-plugin-test"
when :firehose
"delivery_stream_name fluent-plugin-test"
end
if fluentd_v0_12?
Fluent::Test::BufferedOutputTestDriver.new(klass) do
end.configure(conf)
else

Fluent::Test::Driver::Output.new(klass) do
end.configure(conf)
end
end

def benchmark(size, count)
record = {"a"=>"a"*size}
Expand Down
17 changes: 7 additions & 10 deletions fluent-plugin-kinesis.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,13 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]
spec.required_ruby_version = '>= 2.1'

spec.add_dependency "fluentd", ">= 0.12.35", "< 2"
spec.add_dependency "aws-sdk", ">= 2.9.9", "< 4"
# TODO: fluent-plugin-s3 depends on v2 only.
# https://github.com/fluent/fluent-plugin-s3/issues/208
#
# This plugin is sometimes used with s3 plugin.
# Unless s3 plugin is updated to be available with v3,
# this plugin should depend on v2 only.
# spec.add_dependency "aws-sdk-kinesis", "~> 1"
# spec.add_dependency "aws-sdk-firehose", "~> 1"
spec.add_dependency "fluentd", ">= 0.14.0", "< 2"

# This plugin is sometimes used with s3 plugin, so watch out for conflicts
# https://rubygems.org/gems/fluent-plugin-s3
spec.add_dependency "aws-sdk-kinesis", "~> 1"
spec.add_dependency "aws-sdk-firehose", "~> 1"

spec.add_dependency "google-protobuf", "~> 3"

spec.add_development_dependency "bundler", "~> 1.10"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,4 @@ source 'https://rubygems.org'
# Specify your gem's dependencies in fluent-plugin-kinesis.gemspec
gemspec path: ".."

# Specify related gems for td-agent v2.3.5
# https://github.com/treasure-data/omnibus-td-agent/blob/release-2.3.5/config/projects/td-agent2.rb#L23
gem "fluentd", "0.12.35"
# https://github.com/treasure-data/omnibus-td-agent/blob/release-2.3.5/plugin_gems.rb#L13-L15
gem "fluent-plugin-s3", "0.8.2"
gem "aws-sdk", "2.9.9"
gem "fluentd", "0.14.10"
31 changes: 31 additions & 0 deletions gemfiles/Gemfile.td-agent-3.2.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
# Copyright 2014-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

source 'https://rubygems.org'

# Specify your gem's dependencies in fluent-plugin-kinesis.gemspec
gemspec path: ".."

# Specify related gems for td-agent v3.2.0
# https://github.com/treasure-data/omnibus-td-agent/blob/v3.2.0/config/projects/td-agent3.rb#L27
gem "fluentd", "1.2.2"
# https://github.com/treasure-data/omnibus-td-agent/blob/v3.2.0/plugin_gems.rb#L16-L23
gem "jmespath", "1.4.0"
gem "aws-partitions", "1.87.0"
gem "aws-sigv4", "1.0.2"
gem "aws-sdk-core", "3.21.2"
gem "aws-sdk-kms", "1.5.0"
gem "aws-sdk-sqs", "1.3.0"
gem "aws-sdk-s3", "1.13.0"
gem "fluent-plugin-s3", "1.1.3"
36 changes: 8 additions & 28 deletions lib/fluent/plugin/kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,15 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

require 'fluent/plugin/output'
require 'fluent/plugin/kinesis_helper/client'
require 'fluent/plugin/kinesis_helper/api'
require 'zlib'

module Fluent
class KinesisOutput < BufferedOutput
def self.fluentd_v0_12?
@fluentd_v0_12 ||= Gem.loaded_specs['fluentd'].version < Gem::Version.create('0.14')
end

include Fluent::SetTimeKeyMixin
include Fluent::SetTagKeyMixin

module Plugin
class KinesisOutput < Fluent::Plugin::Output
include Fluent::MessagePackFactory::Mixin
include KinesisHelper::Client
include KinesisHelper::API

Expand Down Expand Up @@ -71,11 +67,7 @@ def initialize(record)

config_param :debug, :bool, default: false

if fluentd_v0_12?
config_param :format, :string, default: 'json'
else
helpers :formatter, :inject
end

def configure(conf)
super
Expand All @@ -88,23 +80,12 @@ def multi_workers_ready?

private

def fluentd_v0_12?
self.class.fluentd_v0_12?
end

def data_formatter_create(conf)
if fluentd_v0_12?
formatter = Fluent::Plugin.new_formatter(@format)
formatter.configure(conf)
else
formatter = formatter_create
end
compressor = compressor_create
if @data_key.nil?
->(tag, time, record) {
unless fluentd_v0_12?
record = inject_values_to_record(tag, time, record)
end
compressor.call(formatter.format(tag, time, record).chomp.b)
}
else
Expand Down Expand Up @@ -138,18 +119,16 @@ def format_for_api(&block)
end

def write_records_batch(chunk, &block)
if fluentd_v0_12?
unique_id = chunk.unique_id.unpack('H*').first
else
unique_id = chunk.dump_unique_id_hex(chunk.unique_id)
end
records = chunk.to_enum(:msgpack_each)
chunk.open do |io|
records = msgpack_unpacker(io).to_enum
split_to_batches(records) do |batch, size|
log.debug(sprintf "Write chunk %s / %3d records / %4d KB", unique_id, batch.size, size/1024)
batch_request_with_retry(batch, &block)
log.debug("Finish writing chunk")
end
end
end

def request_type
self.class::RequestType
Expand All @@ -164,3 +143,4 @@ def truncate(msg)
end
end
end
end
2 changes: 2 additions & 0 deletions lib/fluent/plugin/kinesis_helper/aggregator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
end

module Fluent
module Plugin
module KinesisHelper
class Aggregator
AggregatedRecord = Google::Protobuf::DescriptorPool.generated_pool.lookup("AggregatedRecord").msgclass
Expand Down Expand Up @@ -97,3 +98,4 @@ def aggregator
end
end
end
end
2 changes: 2 additions & 0 deletions lib/fluent/plugin/kinesis_helper/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
require 'fluent/configurable'

module Fluent
module Plugin
module KinesisHelper
module API
MaxRecordSize = 1024 * 1024 # 1 MB
Expand Down Expand Up @@ -194,3 +195,4 @@ def scaling_factor
end
end
end
end
2 changes: 2 additions & 0 deletions lib/fluent/plugin/kinesis_helper/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
require 'aws-sdk-core'

module Fluent
module Plugin
module KinesisHelper
module Client
module ClientParams
Expand Down Expand Up @@ -166,3 +167,4 @@ def setup_credentials
end
end
end
end
2 changes: 2 additions & 0 deletions lib/fluent/plugin/out_kinesis_firehose.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
require 'fluent/plugin/kinesis'

module Fluent
module Plugin
class KinesisFirehoseOutput < KinesisOutput
Fluent::Plugin.register_output('kinesis_firehose', self)

Expand Down Expand Up @@ -55,3 +56,4 @@ def write(chunk)
end
end
end
end
2 changes: 2 additions & 0 deletions lib/fluent/plugin/out_kinesis_streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
require 'fluent/plugin/kinesis'

module Fluent
module Plugin
class KinesisStreamsOutput < KinesisOutput
Fluent::Plugin.register_output('kinesis_streams', self)

Expand Down Expand Up @@ -67,3 +68,4 @@ def key_formatter_create
end
end
end
end
2 changes: 2 additions & 0 deletions lib/fluent/plugin/out_kinesis_streams_aggregated.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
require 'fluent/plugin/kinesis_helper/aggregator'

module Fluent
module Plugin
class KinesisStreamsAggregatedOutput < KinesisOutput
Fluent::Plugin.register_output('kinesis_streams_aggregated', self)
include KinesisHelper::Aggregator::Mixin
Expand Down Expand Up @@ -74,3 +75,4 @@ def create_partition_key_generator
end
end
end
end
2 changes: 1 addition & 1 deletion test/dummy_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def initialize(seed: 0, port: nil)
@failed_count = 0
@error_count = 0
@server, @port = init_server(port)
@aggregator = Fluent::KinesisHelper::Aggregator.new
@aggregator = Fluent::Plugin::KinesisHelper::Aggregator.new
@recording = true
end

Expand Down
10 changes: 0 additions & 10 deletions test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,18 @@
require 'aws-sdk-core'
require 'fluent/test'
require 'fluent/test/helpers'
def fluentd_v0_12?
@fluentd_v0_12 ||= Gem.loaded_specs['fluentd'].version < Gem::Version.create('0.14')
end
def aws_sdk_v2?
@aws_sdk_v2 ||= Gem.loaded_specs['aws-sdk-core'].version < Gem::Version.create('3')
end
def driver_run(d, records, time: nil)
time ||= event_time("2011-01-02 13:14:15 UTC")
if fluentd_v0_12?
records.each{|record| d.emit(record, time)}
d.run
else
d.instance.log.out.flush_logs = false
d.run(default_tag: "test") do
records.each{|record| d.feed(time, record)}
end
end
end
if !fluentd_v0_12?
require 'fluent/test/log'
require 'fluent/test/driver/output'
end
if aws_sdk_v2?
require 'aws-sdk'
else
Expand Down
6 changes: 3 additions & 3 deletions test/kinesis_helper/test_aggregator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
require 'fluent/plugin/kinesis_helper/aggregator'

class KinesisHelperAggregatorTest < Test::Unit::TestCase
AggregateOffset = Fluent::KinesisHelper::Aggregator::Mixin::AggregateOffset
RecordOffset = Fluent::KinesisHelper::Aggregator::Mixin::RecordOffset
AggregateOffset = Fluent::Plugin::KinesisHelper::Aggregator::Mixin::AggregateOffset
RecordOffset = Fluent::Plugin::KinesisHelper::Aggregator::Mixin::RecordOffset

def setup
@aggregator = Fluent::KinesisHelper::Aggregator.new
@aggregator = Fluent::Plugin::KinesisHelper::Aggregator.new
end

def teardown
Expand Down
4 changes: 2 additions & 2 deletions test/kinesis_helper/test_api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

class KinesisHelperAPITest < Test::Unit::TestCase
class Mock
include Fluent::KinesisHelper::API
include Fluent::KinesisHelper::API::BatchRequest
include Fluent::Plugin::KinesisHelper::API
include Fluent::Plugin::KinesisHelper::API::BatchRequest

attr_accessor :retries_on_batch_request, :reset_backoff_if_success
attr_accessor :failed_scenario, :request_series
Expand Down
2 changes: 1 addition & 1 deletion test/kinesis_helper/test_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

class KinesisHelperClientTest < Test::Unit::TestCase
class Mock
include Fluent::KinesisHelper::Client
include Fluent::Plugin::KinesisHelper::Client

def initialize
@region = 'us-east-1'
Expand Down
Loading