From 86d56c039a4db4f34bc78193359b90d6a763cbc4 Mon Sep 17 00:00:00 2001 From: burleyb Date: Wed, 16 Oct 2019 17:57:00 +0000 Subject: [PATCH 1/4] allow for a random partition key for kinesis writes in order to scale with sharding seamlessly --- lib/stream/leo-stream.js | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/stream/leo-stream.js b/lib/stream/leo-stream.js index 8d082dff..d660d619 100644 --- a/lib/stream/leo-stream.js +++ b/lib/stream/leo-stream.js @@ -8,6 +8,8 @@ var https = require("https"); var PassThrough = require('stream').PassThrough; var moment = require("moment"); var async = require("async"); +const uuidv4 = require('uuid/v4'); + var backoff = require("backoff"); var extend = require("extend"); @@ -635,10 +637,14 @@ module.exports = function(configure) { logger.time("kinesis request"); kinesis.putRecords({ Records: records.map((r) => { + let randomhashkey = 0 + if(opts.partitionHashKey && opts.partitionHashKey == "random") { + randomhashkey = uuidv4() + } return { Data: r, PartitionKey: "0", - ExplicitHashKey: (opts.partitionHashKey || 0).toString() + ExplicitHashKey: (opts.partitionHashKey || randomhashkey).toString() }; }), StreamName: configure.stream From 52aa39b6193e5bd787d8289003d979f99e6dc7ee Mon Sep 17 00:00:00 2001 From: burleyb Date: Wed, 16 Oct 2019 18:06:11 +0000 Subject: [PATCH 2/4] change default to use random --- lib/stream/leo-stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/stream/leo-stream.js b/lib/stream/leo-stream.js index d660d619..1d768d54 100644 --- a/lib/stream/leo-stream.js +++ b/lib/stream/leo-stream.js @@ -638,7 +638,7 @@ module.exports = function(configure) { kinesis.putRecords({ Records: records.map((r) => { let randomhashkey = 0 - if(opts.partitionHashKey && opts.partitionHashKey == "random") { + if(!opts.partitionHashKey || opts.partitionHashKey == "random") { randomhashkey = uuidv4() } return { From 3fd85970828abd1b874c06e6b6c663c1f28cead0 Mon Sep 17 00:00:00 2001 From: burleyb Date: Wed, 16 Oct 2019 18:33:39 +0000 Subject: [PATCH 3/4] use moment instead of uuid because of partitionkey constraints --- lib/stream/leo-stream.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/stream/leo-stream.js b/lib/stream/leo-stream.js index 1d768d54..a110eda7 100644 --- a/lib/stream/leo-stream.js +++ b/lib/stream/leo-stream.js @@ -8,7 +8,6 @@ var https = require("https"); var PassThrough = require('stream').PassThrough; var moment = require("moment"); var async = require("async"); -const uuidv4 = require('uuid/v4'); var backoff = require("backoff"); @@ -639,7 +638,7 @@ module.exports = function(configure) { Records: records.map((r) => { let randomhashkey = 0 if(!opts.partitionHashKey || opts.partitionHashKey == "random") { - randomhashkey = uuidv4() + randomhashkey = moment().valueOf() } return { Data: r, From 7509387c06968e27558af061b13beb480a673744 Mon Sep 17 00:00:00 2001 From: burleyb Date: Wed, 16 Oct 2019 20:57:36 +0000 Subject: [PATCH 4/4] use partitionkey rather than explicitkeyhash, duh --- lib/stream/leo-stream.js | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/stream/leo-stream.js b/lib/stream/leo-stream.js index a110eda7..d6c94b4e 100644 --- a/lib/stream/leo-stream.js +++ b/lib/stream/leo-stream.js @@ -637,14 +637,17 @@ module.exports = function(configure) { kinesis.putRecords({ Records: records.map((r) => { let randomhashkey = 0 - if(!opts.partitionHashKey || opts.partitionHashKey == "random") { + if(!opts.partitionHashKey) { randomhashkey = moment().valueOf() } - return { + let ret = { Data: r, - PartitionKey: "0", - ExplicitHashKey: (opts.partitionHashKey || randomhashkey).toString() - }; + PartitionKey: randomhashkey.toString() + } + if(opts.partitionHashKey) { + ret.ExplicitHashKey = opts.partitionHashKey.toString() + } + return ret }), StreamName: configure.stream }, function(err, data) {