diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 0000000..7fef5ff --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,28 @@ +version: 2.1 + +orbs: + ruby-orbs: sue445/ruby-orbs@volatile + +jobs: + test: + docker: + - image: cimg/ruby:3.0 + environment: + AWS_REGION: ap-northeast-1 + AWS_ACCESS_KEY_ID: dummy + AWS_SECRET_ACCESS_KEY: dummy + - image: amazon/dynamodb-local + environment: + AWS_REGION: ap-northeast-1 + steps: + - checkout + - ruby-orbs/bundle-install: + gemspec_name: fluent-plugin-dynamodb-add + with_gemfile_lock: false + + - run: bundle exec rake test + +workflows: + test: + jobs: + - test diff --git a/Gemfile b/Gemfile index fa75df1..a4fd5e9 100644 --- a/Gemfile +++ b/Gemfile @@ -1,3 +1,5 @@ source 'https://rubygems.org' gemspec + +gem 'rexml' diff --git a/Rakefile b/Rakefile index f5e4c3f..4d9509e 100644 --- a/Rakefile +++ b/Rakefile @@ -1,9 +1,11 @@ require "bundler/gem_tasks" require 'rake/testtask' +Rake::Task[:release].clear + Rake::TestTask.new(:test) do |test| test.libs << 'lib' << 'test' - test.test_files = FileList['test/test_*.rb'] + test.pattern = 'test/**/test_*.rb' test.verbose = true end diff --git a/fluent-plugin-dynamodb-add.gemspec b/fluent-plugin-dynamodb-add.gemspec index 88d2c93..4bffc9f 100644 --- a/fluent-plugin-dynamodb-add.gemspec +++ b/fluent-plugin-dynamodb-add.gemspec @@ -17,8 +17,10 @@ Gem::Specification.new do |spec| spec.test_files = spec.files.grep(%r{^(test|spec|features)/}) spec.require_paths = ["lib"] - spec.add_development_dependency "bundler", "~> 1.7" - spec.add_development_dependency "rake", "~> 10.0" - spec.add_dependency "fluentd", "~> 0.10.0" - spec.add_dependency "aws-sdk", ">= 1.56.0", "< 2.0.0" + spec.add_development_dependency "bundler" + spec.add_development_dependency "rake" + spec.add_development_dependency "test-unit" + + spec.add_dependency "fluentd", ">= 1", "< 2" + spec.add_dependency "aws-sdk-dynamodb" end diff --git a/lib/fluent/plugin/out_dynamodb_add.rb b/lib/fluent/plugin/out_dynamodb_add.rb index 22dfe27..eb0bbbd 100644 --- a/lib/fluent/plugin/out_dynamodb_add.rb +++ b/lib/fluent/plugin/out_dynamodb_add.rb @@ -1,87 +1,100 @@ -module Fluent - class DynamodbAdd < Fluent::Output - Fluent::Plugin.register_output('dynamodb_add', self) +require 'aws-sdk-dynamodb' +require 'fluent/plugin/output' - unless method_defined?(:log) - define_method(:log) { $log } - end +class Fluent::Plugin::DynamodbAdd < Fluent::Plugin::Output + Fluent::Plugin.register_output('dynamodb_add', self) - config_param :count_key, :string - config_param :dynamo_count_key, :string - config_param :table_name, :string - config_param :use_iam_role, :bool, :default => false - config_param :aws_key_id, :string, :default => nil - config_param :aws_sec_key, :string, :default => nil - config_param :endpoint, :string, :default => nil - config_param :hash_key, :string, :default => nil - config_param :hash_key_delimiter, :string, :default => ":" - config_param :add_hash_key_prefix, :string, :default => nil - config_param :range_key, :string, :default => nil - config_param :set_timestamp, :string, :default => nil - - def initialize - super - require 'aws-sdk-v1' - end + helpers :event_emitter + helpers :compat_parameters + + config_param :count_key, :string + config_param :dynamo_count_key, :string + config_param :table_name, :string + config_param :use_iam_role, :bool, :default => false + config_param :aws_key_id, :string, :default => nil + config_param :aws_sec_key, :string, :default => nil + config_param :region, :string, :default => nil + config_param :endpoint, :string, :default => nil + config_param :hash_key, :string, :default => nil + config_param :hash_key_delimiter, :string, :default => ":" + config_param :add_hash_key_prefix, :string, :default => nil + config_param :range_key, :string, :default => nil + config_param :set_timestamp, :string, :default => nil + + def initialize + super + end - def configure(conf) - super + def configure(conf) + compat_parameters_convert(conf) - unless use_iam_role - [:aws_key_id, :aws_sec_key].each do |name| - unless self.instance_variable_get("@#{name}") - raise ConfigError, "'#{name}' is required" - end + super + + unless use_iam_role + [:aws_key_id, :aws_sec_key].each do |name| + unless self.instance_variable_get("@#{name}") + raise ConfigError, "'#{name}' is required" end end - @hash_key = hash_key.split(/\s*,\s*/) end + @hash_key = hash_key.split(/\s*,\s*/) + end - def start - super - if use_iam_role - AWS.config(:credential_provider => AWS::Core::CredentialProviders::EC2Provider.new) - else - AWS.config(:access_key_id => @aws_key_id, :secret_access_key => @aws_sec_key) - end + def start + super - AWS.config(:dynamo_db_endpoint => @endpoint) if @endpoint + options = {} - @dynamo_db = AWS::DynamoDB.new - @table = @dynamo_db.tables[table_name] - @table.load_schema + unless use_iam_role + options[:access_key_id] = @aws_key_id + options[:secret_access_key] = @aws_sec_key end - def emit(tag, es, chain) - chain.next - es.each do |time, record| - hash_key = create_key(record) - next unless hash_key || record[@count_key] - - if @range_key - next unless record[@range_key] - item = @table.items[hash_key, record[@range_key]] - else - item = @table.items[hash_key] - end - item.attributes.update {|u| - u.add @dynamo_count_key => record[@count_key] - if @set_timestamp - u.set @set_timestamp => Time.now.to_i - end - } + options[:region] = region + options[:endpoint] = endpoint + + client = Aws::DynamoDB::Client.new(options) + + resource = Aws::DynamoDB::Resource.new(client: client) + @table = resource.table(table_name) + + @dynamo_hash_key = @table.key_schema.find{|e| e.key_type == "HASH" }.attribute_name + @dynamo_range_key = @table.key_schema.find{|e| e.key_type == "RANGE" }&.attribute_name + end + + def process(tag, es) + es.each do |time, record| + hash_key = create_key(record) + next unless hash_key || record[@count_key] + + key = { @dynamo_hash_key => hash_key } + + if @range_key + next unless record[@range_key] + key[@dynamo_range_key] = record[@range_key] end + + @table.update_item({ + key: key, + attribute_updates: { + @dynamo_count_key => { + value: record[@count_key], + action: "ADD" + }, + }, + }) end + end - private - def create_key(record) - key_array = [] - key_array << @add_hash_key_prefix if @add_hash_key_prefix - @hash_key.each do |h| - return nil unless record[h] - key_array << record[h] - end - key_array.join(@hash_key_delimiter) + private + + def create_key(record) + key_array = [] + key_array << @add_hash_key_prefix if @add_hash_key_prefix + @hash_key.each do |h| + return nil unless record[h] + key_array << record[h] end + key_array.join(@hash_key_delimiter) end end diff --git a/test/helper.rb b/test/helper.rb new file mode 100644 index 0000000..f9e989a --- /dev/null +++ b/test/helper.rb @@ -0,0 +1,8 @@ +$LOAD_PATH.unshift(File.expand_path("../../", __FILE__)) +require "test-unit" +require "fluent/test" +require "fluent/test/driver/output" +require "fluent/test/helpers" + +Test::Unit::TestCase.include(Fluent::Test::Helpers) +Test::Unit::TestCase.extend(Fluent::Test::Helpers) diff --git a/test/plugin/test_out_dynamodb_add.rb b/test/plugin/test_out_dynamodb_add.rb new file mode 100644 index 0000000..0e4901c --- /dev/null +++ b/test/plugin/test_out_dynamodb_add.rb @@ -0,0 +1,269 @@ +require 'helper' + +require 'fluent/plugin/out_dynamodb_add' +require 'aws-sdk-dynamodb' + +class DynamodbAddTest < Test::Unit::TestCase + include Fluent::Test::Helpers + + def setup + Fluent::Test.setup + end + + CONFIG = %[ + count_key test_count_key + dynamo_count_key test_dynamo_count_key + table_name test_table_name + use_iam_role false + aws_key_id test_aws_key_id + aws_sec_key test_aws_sec_key + endpoint https://test_endpoint + hash_key test_hash_key1,test_hash_key2 + hash_key_delimiter : + add_hash_key_prefix 3 + range_key test_range_key + ] + + def create_driver(conf = CONFIG) + Fluent::Test::Driver::Output.new(Fluent::Plugin::DynamodbAdd).configure(conf) + end + + sub_test_case 'confguration' do + def test_configure_not_use_iam_role + d = create_driver + assert_equal 'test_count_key', d.instance.count_key + assert_equal 'test_dynamo_count_key', d.instance.dynamo_count_key + assert_equal 'test_table_name', d.instance.table_name + assert_equal false, d.instance.use_iam_role + assert_equal 'test_aws_key_id', d.instance.aws_key_id + assert_equal 'test_aws_sec_key', d.instance.aws_sec_key + assert_equal 'https://test_endpoint', d.instance.endpoint + assert_equal ['test_hash_key1','test_hash_key2'], d.instance.hash_key + assert_equal ':', d.instance.hash_key_delimiter + assert_equal '3', d.instance.add_hash_key_prefix + assert_equal 'test_range_key', d.instance.range_key + end + + def test_configure_use_iam_role + conf = CONFIG.clone + conf.gsub!(/use_iam_role\sfalse/, "use_iam_role true") + conf.gsub!(/aws_key_id\stest_aws_key_id/, "") + conf.gsub!(/aws_sec_key\stest_aws_sec_key/, "") + + d = create_driver(conf) + assert_equal 'test_count_key', d.instance.count_key + assert_equal 'test_dynamo_count_key', d.instance.dynamo_count_key + assert_equal 'test_table_name', d.instance.table_name + assert_equal true, d.instance.use_iam_role + assert_equal nil, d.instance.aws_key_id + assert_equal nil, d.instance.aws_sec_key + assert_equal 'https://test_endpoint', d.instance.endpoint + assert_equal ['test_hash_key1','test_hash_key2'], d.instance.hash_key + assert_equal ':', d.instance.hash_key_delimiter + assert_equal '3', d.instance.add_hash_key_prefix + assert_equal 'test_range_key', d.instance.range_key + end + + def test_configure_not_use_iam_role_and_not_set_range_key + conf = CONFIG.clone + conf.gsub!(/range_key\stest_range_key/, "") + + d = create_driver(conf) + assert_equal 'test_count_key', d.instance.count_key + assert_equal 'test_dynamo_count_key', d.instance.dynamo_count_key + assert_equal 'test_table_name', d.instance.table_name + assert_equal false, d.instance.use_iam_role + assert_equal 'test_aws_key_id', d.instance.aws_key_id + assert_equal 'test_aws_sec_key', d.instance.aws_sec_key + assert_equal 'https://test_endpoint', d.instance.endpoint + assert_equal ['test_hash_key1','test_hash_key2'], d.instance.hash_key + assert_equal ':', d.instance.hash_key_delimiter + assert_equal '3', d.instance.add_hash_key_prefix + assert_equal nil, d.instance.range_key + end + + def test_configure_set_timestamp + conf = CONFIG.clone + conf << " set_timestamp last_updated_at" + + d = create_driver(conf) + assert_equal 'test_count_key', d.instance.count_key + assert_equal 'test_dynamo_count_key', d.instance.dynamo_count_key + assert_equal 'test_table_name', d.instance.table_name + assert_equal false, d.instance.use_iam_role + assert_equal 'test_aws_key_id', d.instance.aws_key_id + assert_equal 'test_aws_sec_key', d.instance.aws_sec_key + assert_equal 'https://test_endpoint', d.instance.endpoint + assert_equal ['test_hash_key1','test_hash_key2'], d.instance.hash_key + assert_equal ':', d.instance.hash_key_delimiter + assert_equal '3', d.instance.add_hash_key_prefix + assert_equal 'test_range_key', d.instance.range_key + assert_equal 'last_updated_at', d.instance.set_timestamp + end + end + + + def test_count_with_range_key_table + table_name = 'sample_table' + + create_table(dynamodb_client, table_with_range_key(table_name)) + + d = create_driver( + <<~EOS + count_key count + hash_key project_id + + table_name #{table_name} + range_key time + dynamo_count_key count + + endpoint http://localhost:8000 + region ap-northeast-1 + aws_key_id dummy + aws_sec_key dummy + EOS + ) + + resource = Aws::DynamoDB::Resource.new(client: dynamodb_client) + @table = resource.table(table_name) + + time = 1000 + + d.run(default_tag: 'test') do + d.feed(event_time('2022-09-01 10:00:00 UTC'), {'count' => 1, 'project_id' => 1, 'time' => time}) + d.feed(event_time('2022-09-01 10:00:00 UTC'), {'count' => 2, 'project_id' => 1, 'time' => time}) + d.feed(event_time('2022-09-01 10:00:00 UTC'), {'count' => 4, 'project_id' => 1, 'time' => 2000}) + end + + item = @table.get_item(key: { 'Id': '1', 'ViewTimestamp': time.to_i}).item + + assert_equal 3, item['count'] + + d.run(default_tag: 'test') do + d.feed(event_time('2022-09-01 10:00:00 UTC'), {'count' => 8, 'project_id' => 1, 'time' => time}) + d.feed(event_time('2022-09-01 10:00:00 UTC'), {'count' => 16, 'project_id' => 1, 'time' => time}) + d.feed(event_time('2022-09-01 10:00:00 UTC'), {'count' => 32, 'project_id' => 1, 'time' => 2000}) + end + + item = @table.get_item(key: { 'Id': '1', 'ViewTimestamp': time.to_i}).item + + assert_equal 27, item['count'] + + delete_table(dynamodb_client, table_name) + end + + def test_count_with_no_range_key_table + table_name = 'sample_table' + + create_table(dynamodb_client, table_without_range_key(table_name)) + + d = create_driver( + <<~EOS + count_key count + hash_key project_id + + table_name #{table_name} + dynamo_count_key count + + endpoint http://localhost:8000 + region ap-northeast-1 + aws_key_id dummy + aws_sec_key dummy + EOS + ) + + resource = Aws::DynamoDB::Resource.new(client: dynamodb_client) + @table = resource.table(table_name) + + d.run(default_tag: 'test') do + d.feed(event_time('2022-09-01 10:00:00 UTC'), {'count' => 1, 'project_id' => 1}) + d.feed(event_time('2022-09-01 10:00:00 UTC'), {'count' => 2, 'project_id' => 1}) + end + + item = @table.get_item(key: { 'Id': '1'}).item + + assert_equal 3, item['count'] + + d.run(default_tag: 'test') do + d.feed(event_time('2022-09-01 10:00:00 UTC'), {'count' => 4, 'project_id' => 1}) + d.feed(event_time('2022-09-01 10:00:00 UTC'), {'count' => 8, 'project_id' => 1}) + end + + item = @table.get_item(key: { 'Id': '1'}).item + + assert_equal 15, item['count'] + + delete_table(dynamodb_client, table_name) + end + + private + + def create_table(dynamodb_client, table_definition) + response = dynamodb_client.create_table(table_definition) + response.table_description.table_status + rescue StandardError => e + binding.irb + end + + def delete_table(dynamodb_client, table_name) + dynamodb_client.delete_table( + table_name: table_name + ) + end + + def table_without_range_key(table_name) + table_definition = { + table_name: table_name, + key_schema: [ + { + attribute_name: 'Id', + key_type: 'HASH' # Partition key. + }, + ], + attribute_definitions: [ + { + attribute_name: 'Id', + attribute_type: 'S' + }, + ], + billing_mode: "PAY_PER_REQUEST", + } + end + + def table_with_range_key(table_name) + table_definition = { + table_name: table_name, + key_schema: [ + { + attribute_name: 'Id', + key_type: 'HASH' # Partition key. + }, + { + attribute_name: 'ViewTimestamp', + key_type: 'RANGE' # Sort key. + } + ], + attribute_definitions: [ + { + attribute_name: 'Id', + attribute_type: 'S' + }, + { + attribute_name: 'ViewTimestamp', + attribute_type: 'N' + } + ], + billing_mode: "PAY_PER_REQUEST", + } + end + + + def dynamodb_client + @client ||= Aws::DynamoDB::Client.new({ + access_key_id: 'dummy', + secret_access_key: 'dummy', + endpoint: 'http://localhost:8000', + region: 'ap-northeast-1' + }) + end +end diff --git a/test/test_out_dynamodb_add.rb b/test/test_out_dynamodb_add.rb deleted file mode 100644 index 32eaaa2..0000000 --- a/test/test_out_dynamodb_add.rb +++ /dev/null @@ -1,103 +0,0 @@ -require 'fluent/test' -require 'fluent/plugin/out_dynamodb_add' - -class DynamodbAddTest < Test::Unit::TestCase - def setup - Fluent::Test.setup - end - - CONFIG = %[ - count_key test_count_key - dynamo_count_key test_dynamo_count_key - table_name test_table_name - use_iam_role false - aws_key_id test_aws_key_id - aws_sec_key test_aws_sec_key - endpoint https://test_endpoint - hash_key test_hash_key1,test_hash_key2 - hash_key_delimiter : - add_hash_key_prefix 3 - range_key test_range_key - ] - - def create_driver(conf = CONFIG) - Fluent::Test::OutputTestDriver.new(Fluent::DynamodbAdd) do - def write(chunk) - chunk.read - end - end.configure(conf) - end - - def test_configure_not_use_iam_role - d = create_driver - assert_equal 'test_count_key', d.instance.count_key - assert_equal 'test_dynamo_count_key', d.instance.dynamo_count_key - assert_equal 'test_table_name', d.instance.table_name - assert_equal false, d.instance.use_iam_role - assert_equal 'test_aws_key_id', d.instance.aws_key_id - assert_equal 'test_aws_sec_key', d.instance.aws_sec_key - assert_equal 'https://test_endpoint', d.instance.endpoint - assert_equal ['test_hash_key1','test_hash_key2'], d.instance.hash_key - assert_equal ':', d.instance.hash_key_delimiter - assert_equal '3', d.instance.add_hash_key_prefix - assert_equal 'test_range_key', d.instance.range_key - end - - def test_configure_use_iam_role - conf = CONFIG.clone - conf.gsub!(/use_iam_role\sfalse/, "use_iam_role true") - conf.gsub!(/aws_key_id\stest_aws_key_id/, "") - conf.gsub!(/aws_sec_key\stest_aws_sec_key/, "") - - d = create_driver(conf) - assert_equal 'test_count_key', d.instance.count_key - assert_equal 'test_dynamo_count_key', d.instance.dynamo_count_key - assert_equal 'test_table_name', d.instance.table_name - assert_equal true, d.instance.use_iam_role - assert_equal nil, d.instance.aws_key_id - assert_equal nil, d.instance.aws_sec_key - assert_equal 'https://test_endpoint', d.instance.endpoint - assert_equal ['test_hash_key1','test_hash_key2'], d.instance.hash_key - assert_equal ':', d.instance.hash_key_delimiter - assert_equal '3', d.instance.add_hash_key_prefix - assert_equal 'test_range_key', d.instance.range_key - end - - def test_configure_not_use_iam_role_and_not_set_range_key - conf = CONFIG.clone - conf.gsub!(/range_key\stest_range_key/, "") - - d = create_driver(conf) - assert_equal 'test_count_key', d.instance.count_key - assert_equal 'test_dynamo_count_key', d.instance.dynamo_count_key - assert_equal 'test_table_name', d.instance.table_name - assert_equal false, d.instance.use_iam_role - assert_equal 'test_aws_key_id', d.instance.aws_key_id - assert_equal 'test_aws_sec_key', d.instance.aws_sec_key - assert_equal 'https://test_endpoint', d.instance.endpoint - assert_equal ['test_hash_key1','test_hash_key2'], d.instance.hash_key - assert_equal ':', d.instance.hash_key_delimiter - assert_equal '3', d.instance.add_hash_key_prefix - assert_equal nil, d.instance.range_key - end - - def test_configure_set_timestamp - conf = CONFIG.clone - conf << " set_timestamp last_updated_at" - - d = create_driver(conf) - assert_equal 'test_count_key', d.instance.count_key - assert_equal 'test_dynamo_count_key', d.instance.dynamo_count_key - assert_equal 'test_table_name', d.instance.table_name - assert_equal false, d.instance.use_iam_role - assert_equal 'test_aws_key_id', d.instance.aws_key_id - assert_equal 'test_aws_sec_key', d.instance.aws_sec_key - assert_equal 'https://test_endpoint', d.instance.endpoint - assert_equal ['test_hash_key1','test_hash_key2'], d.instance.hash_key - assert_equal ':', d.instance.hash_key_delimiter - assert_equal '3', d.instance.add_hash_key_prefix - assert_equal 'test_range_key', d.instance.range_key - assert_equal 'last_updated_at', d.instance.set_timestamp - end -end -