Skip to content

Commit

Permalink
Merge pull request #156 from adammw/adammw/remove-0.12-code
Browse files Browse the repository at this point in the history
 Remove support for fluentd v0.12 and use new Plugin API
  • Loading branch information
riywo authored Aug 27, 2018
2 parents 17b6d5d + a89b0b9 commit c947627
Show file tree
Hide file tree
Showing 21 changed files with 614 additions and 648 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ matrix:
include:
- rvm: 2.4.1
gemfile: Gemfile
- rvm: 2.4.1
gemfile: gemfiles/Gemfile.fluentd-0.14.10
- rvm: 2.4.4 # https://github.com/treasure-data/omnibus-td-agent/blob/v3.2.0/config/projects/td-agent3.rb#L22
gemfile: gemfiles/Gemfile.td-agent-3.2.0

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Or just download specify your Ruby library path. Below is the sample for specify

## Dependencies
* Ruby 2.1.0+
* Fluentd 0.12.35+ (supporting 0.14.x)
* Fluentd 0.14.10+

## Basic Usage
Here are general procedures for using this plugin:
Expand Down
16 changes: 6 additions & 10 deletions benchmark/task.rake
Original file line number Diff line number Diff line change
Expand Up @@ -78,25 +78,21 @@ class KinesisBenchmark
def create_driver(type, conf = default_config)
klass = case type
when :streams
Fluent::KinesisStreamsOutput
Fluent::Plugin::KinesisStreamsOutput
when :streams_aggregated
Fluent::KinesisStreamsAggregatedOutput
Fluent::Plugin::KinesisStreamsAggregatedOutput
when :firehose
Fluent::KinesisFirehoseOutput
Fluent::Plugin::KinesisFirehoseOutput
end
conf += case type
when :streams, :streams_aggregated
"stream_name fluent-plugin-test"
when :firehose
"delivery_stream_name fluent-plugin-test"
end
if fluentd_v0_12?
Fluent::Test::BufferedOutputTestDriver.new(klass) do
end.configure(conf)
else
Fluent::Test::Driver::Output.new(klass) do
end.configure(conf)
end

Fluent::Test::Driver::Output.new(klass) do
end.configure(conf)
end

def benchmark(size, count)
Expand Down
2 changes: 1 addition & 1 deletion fluent-plugin-kinesis.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]
spec.required_ruby_version = '>= 2.1'

spec.add_dependency "fluentd", ">= 0.12.35", "< 2"
spec.add_dependency "fluentd", ">= 0.14.0", "< 2"

# This plugin is sometimes used with s3 plugin, so watch out for conflicts
# https://rubygems.org/gems/fluent-plugin-s3
Expand Down
20 changes: 20 additions & 0 deletions gemfiles/Gemfile.fluentd-0.14.10
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Copyright 2014-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

source 'https://rubygems.org'

# Specify your gem's dependencies in fluent-plugin-kinesis.gemspec
gemspec path: ".."

gem "fluentd", "0.14.10"
214 changes: 97 additions & 117 deletions lib/fluent/plugin/kinesis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,154 +12,134 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

require 'fluent/plugin/output'
require 'fluent/plugin/kinesis_helper/client'
require 'fluent/plugin/kinesis_helper/api'
require 'zlib'

module Fluent
class KinesisOutput < BufferedOutput
def self.fluentd_v0_12?
@fluentd_v0_12 ||= Gem.loaded_specs['fluentd'].version < Gem::Version.create('0.14')
end

include Fluent::SetTimeKeyMixin
include Fluent::SetTagKeyMixin

include KinesisHelper::Client
include KinesisHelper::API

class SkipRecordError < ::StandardError
def initialize(message, record)
super message
@record_message = if record.is_a? Array
record.reverse.map(&:to_s).join(', ')
else
record.to_s
module Plugin
class KinesisOutput < Fluent::Plugin::Output
include Fluent::MessagePackFactory::Mixin
include KinesisHelper::Client
include KinesisHelper::API

class SkipRecordError < ::StandardError
def initialize(message, record)
super message
@record_message = if record.is_a? Array
record.reverse.map(&:to_s).join(', ')
else
record.to_s
end
end
end

def to_s
super + ": " + @record_message
def to_s
super + ": " + @record_message
end
end
end
class KeyNotFoundError < SkipRecordError
def initialize(key, record)
super "Key '#{key}' doesn't exist", record
class KeyNotFoundError < SkipRecordError
def initialize(key, record)
super "Key '#{key}' doesn't exist", record
end
end
end
class ExceedMaxRecordSizeError < SkipRecordError
def initialize(size, record)
super "Record size limit exceeded in #{size/1024} KB", record
class ExceedMaxRecordSizeError < SkipRecordError
def initialize(size, record)
super "Record size limit exceeded in #{size/1024} KB", record
end
end
end
class InvalidRecordError < SkipRecordError
def initialize(record)
super "Invalid type of record", record
class InvalidRecordError < SkipRecordError
def initialize(record)
super "Invalid type of record", record
end
end
end

config_param :data_key, :string, default: nil
config_param :log_truncate_max_size, :integer, default: 1024
config_param :compression, :string, default: nil
config_section :format do
config_set_default :@type, 'json'
end
config_section :inject do
config_set_default :time_type, 'string'
config_set_default :time_format, '%Y-%m-%dT%H:%M:%S.%N%z'
end
config_param :data_key, :string, default: nil
config_param :log_truncate_max_size, :integer, default: 1024
config_param :compression, :string, default: nil
config_section :format do
config_set_default :@type, 'json'
end
config_section :inject do
config_set_default :time_type, 'string'
config_set_default :time_format, '%Y-%m-%dT%H:%M:%S.%N%z'
end

config_param :debug, :bool, default: false
config_param :debug, :bool, default: false

if fluentd_v0_12?
config_param :format, :string, default: 'json'
else
helpers :formatter, :inject
end

def configure(conf)
super
@data_formatter = data_formatter_create(conf)
end

def multi_workers_ready?
true
end
def configure(conf)
super
@data_formatter = data_formatter_create(conf)
end

private
def multi_workers_ready?
true
end

def fluentd_v0_12?
self.class.fluentd_v0_12?
end
private

def data_formatter_create(conf)
if fluentd_v0_12?
formatter = Fluent::Plugin.new_formatter(@format)
formatter.configure(conf)
else
def data_formatter_create(conf)
formatter = formatter_create
end
compressor = compressor_create
if @data_key.nil?
->(tag, time, record) {
unless fluentd_v0_12?
compressor = compressor_create
if @data_key.nil?
->(tag, time, record) {
record = inject_values_to_record(tag, time, record)
end
compressor.call(formatter.format(tag, time, record).chomp.b)
}
else
->(tag, time, record) {
raise InvalidRecordError, record unless record.is_a? Hash
raise KeyNotFoundError.new(@data_key, record) if record[@data_key].nil?
compressor.call(record[@data_key].to_s.b)
}
compressor.call(formatter.format(tag, time, record).chomp.b)
}
else
->(tag, time, record) {
raise InvalidRecordError, record unless record.is_a? Hash
raise KeyNotFoundError.new(@data_key, record) if record[@data_key].nil?
compressor.call(record[@data_key].to_s.b)
}
end
end
end

def compressor_create
case @compression
when "zlib"
->(data) { Zlib::Deflate.deflate(data) }
else
->(data) { data }
def compressor_create
case @compression
when "zlib"
->(data) { Zlib::Deflate.deflate(data) }
else
->(data) { data }
end
end
end

def format_for_api(&block)
converted = block.call
size = size_of_values(converted)
if size > @max_record_size
raise ExceedMaxRecordSizeError.new(size, converted)
def format_for_api(&block)
converted = block.call
size = size_of_values(converted)
if size > @max_record_size
raise ExceedMaxRecordSizeError.new(size, converted)
end
converted.to_msgpack
rescue SkipRecordError => e
log.error(truncate e)
''
end
converted.to_msgpack
rescue SkipRecordError => e
log.error(truncate e)
''
end

def write_records_batch(chunk, &block)
if fluentd_v0_12?
unique_id = chunk.unique_id.unpack('H*').first
else
def write_records_batch(chunk, &block)
unique_id = chunk.dump_unique_id_hex(chunk.unique_id)
chunk.open do |io|
records = msgpack_unpacker(io).to_enum
split_to_batches(records) do |batch, size|
log.debug(sprintf "Write chunk %s / %3d records / %4d KB", unique_id, batch.size, size/1024)
batch_request_with_retry(batch, &block)
log.debug("Finish writing chunk")
end
end
end
records = chunk.to_enum(:msgpack_each)
split_to_batches(records) do |batch, size|
log.debug(sprintf "Write chunk %s / %3d records / %4d KB", unique_id, batch.size, size/1024)
batch_request_with_retry(batch, &block)
log.debug("Finish writing chunk")
end
end

def request_type
self.class::RequestType
end
def request_type
self.class::RequestType
end

def truncate(msg)
if @log_truncate_max_size == 0 or (msg.to_s.size <= @log_truncate_max_size)
msg.to_s
else
msg.to_s[0...@log_truncate_max_size]
def truncate(msg)
if @log_truncate_max_size == 0 or (msg.to_s.size <= @log_truncate_max_size)
msg.to_s
else
msg.to_s[0...@log_truncate_max_size]
end
end
end
end
Expand Down
Loading

0 comments on commit c947627

Please sign in to comment.