Description
Goal:
Remove the current NodeRangeIndexCache while keeping the same performance when reading a stream from StreamSetObjects (sso).
General Design
- Lazily load each streamSetObject and fill an index that helps find the sso containing a stream.
- Use a BloomFilter to check whether an sso really contains data for a stream.
- Partially load the node's streamSetObject list to reduce loadStreamSetObject time.
- Try StreamSetObject compaction by object count to control the total number of streamSetObjects in the cluster.
1. Lazy init StreamSetObjectRangeIndex by loading StreamSetObject
- StreamSetObjectRangeIndex is an index: streamId -> TreeMap(startOffset -> sso-id)
- The content of the StreamSetObjectRangeIndex is filled by lazily loading streamSetObjects.
- After a StreamSetObject is loaded, its List<StreamRangeOffset> updates the StreamSetObjectRangeIndex and the BloomFilter.
- All the streamIds in the streamSetObject are inserted into the index, which helps cold reads of a large partition topic on the same node.
- Cache size and eviction:
  - The cache has a ttl of 10 minutes and a maxItemNumber of 20 * 10000.
  - On eviction, the streamId index is removed.
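The index shape described above can be sketched as follows. This is only an illustration under assumptions: the class name `RangeIndexSketch`, the `StreamRange` holder and the method names are hypothetical, and the ttl / maxItemNumber bounds are left out. A miss returns -1, matching the `= -1` result used in the cold-read example of section 3.2.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RangeIndexSketch {
    /** Hypothetical stand-in for one stream's offset range inside a StreamSetObject. */
    static final class StreamRange {
        final long streamId;
        final long startOffset;
        final long endOffset;

        StreamRange(long streamId, long startOffset, long endOffset) {
            this.streamId = streamId;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    // streamId -> (startOffset -> sso-id)
    private final Map<Long, TreeMap<Long, Long>> index = new HashMap<>();

    /** Called after an sso has been loaded: index every stream range it contains. */
    void onLoaded(long ssoId, List<StreamRange> ranges) {
        for (StreamRange r : ranges) {
            index.computeIfAbsent(r.streamId, k -> new TreeMap<>()).put(r.startOffset, ssoId);
        }
    }

    /** Id of the indexed sso with the greatest startOffset <= offset, or -1 on an index miss. */
    long search(long streamId, long offset) {
        TreeMap<Long, Long> byStart = index.get(streamId);
        if (byStart == null) {
            return -1L;
        }
        Map.Entry<Long, Long> e = byStart.floorEntry(offset);
        return e == null ? -1L : e.getValue();
    }

    /** Eviction drops the whole per-stream index. */
    void evict(long streamId) {
        index.remove(streamId);
    }
}
```

Because only loaded sso are indexed, the returned sso is a lower bound for where to start searching, not necessarily the object that holds the offset.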
2. StreamIdBloomFilter
- An index for checking whether a stream's range exists in a streamSetObject: (streamSetObjectId -> exist(streamId)?)
- The false positive probability is set to 0.01.
- Each StreamSetObject has one bloomFilter, which is static: nothing is inserted into the bloomFilter after it is built.
- The estimated size of the bloomFilter:
  9000 long -> 10784 bytes
  900 long -> 1080 bytes
  90 long -> 112 bytes
- For 5000 sso that each contain at most 9000 streams, the bloomFilters consume about 52MB. Not every sso contains 9000 streams, so the size can be optimized.
- When an sso is removed by compaction and the metadataImage is updated, its StreamIdBloomFilter is removed as well.
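The size estimates above can be reproduced with the standard Bloom filter sizing formula, bits = ceil(-n * ln(p) / (ln 2)^2), rounded up to whole 64-bit words. The sketch below assumes that formula and a `long[]` backing store; the issue does not say which BloomFilter implementation is used, and the class and method names are hypothetical.

```java
final class BloomFilterSizing {
    /** Optimal number of bits for n entries at false positive probability p. */
    static long optimalBits(long n, double p) {
        double ln2 = Math.log(2);
        return (long) Math.ceil(-n * Math.log(p) / (ln2 * ln2));
    }

    /** Size in bytes when the bits are stored in a long[] (rounded up to 64-bit words). */
    static long sizeInBytes(long n, double p) {
        long words = (optimalBits(n, p) + 63) / 64;
        return words * Long.BYTES;
    }

    public static void main(String[] args) {
        for (long n : new long[] {9000, 900, 90}) {
            System.out.println(n + " streamIds -> " + sizeInBytes(n, 0.01) + " bytes");
        }
        // Upper bound for 5000 sso that each hold 9000 streamIds.
        System.out.println("5000 sso -> " + 5000 * sizeInBytes(9000, 0.01) + " bytes");
    }
}
```

With p = 0.01 this gives 10784, 1080 and 112 bytes for 9000, 900 and 90 streamIds. The upper bound for 5000 sso is 5000 * 10784 = 53,920,000 bytes, roughly 51 MiB, in line with the 52MB estimate.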
3. Change of the loadStreamSetObjectInfo logic and index search logic
Before:
getStartSearchIndex ->
loadStreamSetObjectInfo (load all the sso on the node, [startIndex:-1]) ->
search sso
After (the logic partially loads the streamSetObjects on the node):
getStartSearchIndex ->
loadStreamSetObjectInfo(maxWaitLoad=5, startIndex, list) ->
search sso [startIndex, maxLoadedIndex) ->
getStartSearchIndex (check whether objects can be skipped, since the index may have been updated) ->
loadStreamSetObjectInfo(maxWaitLoad=5, newSearchIndexPos, list) ->
search sso [newSearchIndexPos, maxLoadedIndex) ->
........
Example metadataImage
RangeMetadata{epoch=0, rangeIndex=0, startOffset=0, endOffset=2000, nodeId=4}
NodeId: 4
S3StreamSetObject{objectId=0, orderId=0, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=2, orderId=2, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=3, orderId=3, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=5, orderId=5, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=7, orderId=7, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=9, orderId=9, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=10, orderId=10, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=12, orderId=12, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=14, orderId=14, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=16, orderId=16, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=18, orderId=18, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=20, orderId=20, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=22, orderId=22, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=26, orderId=26, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=28, orderId=28, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=30, orderId=30, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=32, orderId=32, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=34, orderId=34, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=36, orderId=36, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=38, orderId=38, nodeId=4, dataTimeInMs=-1},
S3StreamSetObject{objectId=42, orderId=42, nodeId=4, dataTimeInMs=-1}
ObjectId: 0 [StreamOffsetRange(startOffset=0, endOffset=105)]
ObjectId: 2 [StreamOffsetRange(startOffset=105, endOffset=128)]
ObjectId: 3 [StreamOffsetRange(startOffset=128, endOffset=259)]
ObjectId: 5 [StreamOffsetRange(startOffset=259, endOffset=394)]
ObjectId: 7 [StreamOffsetRange(startOffset=394, endOffset=456)]
ObjectId: 9 [StreamOffsetRange(startOffset=456, endOffset=528)]
ObjectId: 10 [StreamOffsetRange(startOffset=528, endOffset=636)]
ObjectId: 12 [StreamOffsetRange(startOffset=636, endOffset=686)]
ObjectId: 14 [StreamOffsetRange(startOffset=686, endOffset=833)]
ObjectId: 16 [StreamOffsetRange(startOffset=833, endOffset=974)]
ObjectId: 18 [StreamOffsetRange(startOffset=974, endOffset=1017)]
ObjectId: 20 [StreamOffsetRange(startOffset=1017, endOffset=1056)]
ObjectId: 22 [StreamOffsetRange(startOffset=1056, endOffset=1085)]
ObjectId: 26 [StreamOffsetRange(startOffset=1128, endOffset=1196)]
ObjectId: 28 [StreamOffsetRange(startOffset=1196, endOffset=1241)]
ObjectId: 30 [StreamOffsetRange(startOffset=1241, endOffset=1350)]
ObjectId: 32 [StreamOffsetRange(startOffset=1350, endOffset=1491)]
ObjectId: 34 [StreamOffsetRange(startOffset=1491, endOffset=1624)]
ObjectId: 36 [StreamOffsetRange(startOffset=1624, endOffset=1678)]
ObjectId: 38 [StreamOffsetRange(startOffset=1678, endOffset=1732)]
ObjectId: 42 [StreamOffsetRange(startOffset=1850, endOffset=2000)]
3.1 Partial loadStreamSetObjectInfo
When reading startOffset=100, endOffset=200, limit=4,
the current implementation finds the range on nodeId=4
and runs the object search only after all 21 streamSetObjects are loaded.
But from the result we know that objectIds [0,2,3] alone provide the result.
We can load the streamSetObjects partially, 5 objects at a time.
After the search over those 5 objects is done, we trigger the load and search of the next 5 objects,
which helps reduce getObjects time.
e.g.
Search startOffset=100, endOffset=200, limit=4
We load the first 5 streamSetObjects [0,2,3,5,7].
When the check is done, the result is found in [0,2,3] and no more sso need to be loaded.
e.g.
Search startOffset=400, endOffset=600, limit=4
We load the first 5 streamSetObjects [0,2,3,5,7],
so the current loadedStreamSetIndex is 5 (exclusive).
The next rounds load sso-list[5:10) and then sso-list[10:15), as far as needed.
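The batch-by-batch search can be sketched as below, using the example metadataImage. This is a simplified model under assumptions: the names (`PartialLoadSketch`, `Sso`, `Result`, `search`) are hypothetical, "loading" is simulated by advancing an index, loads are synchronous, and the index lookup between rounds is omitted.

```java
import java.util.ArrayList;
import java.util.List;

final class PartialLoadSketch {
    /** Hypothetical view of one sso: its id and the offset range it holds for the stream. */
    static final class Sso {
        final long objectId, startOffset, endOffset;

        Sso(long objectId, long startOffset, long endOffset) {
            this.objectId = objectId;
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }
    }

    static final class Result {
        final List<Long> objectIds = new ArrayList<>();
        int loadedEndIndex; // sso-list[0, loadedEndIndex) had to be loaded
    }

    /** The 21 sso of nodeId=4 from the example, as {objectId, startOffset, endOffset}. */
    static List<Sso> example() {
        long[][] d = {
            {0, 0, 105}, {2, 105, 128}, {3, 128, 259}, {5, 259, 394}, {7, 394, 456},
            {9, 456, 528}, {10, 528, 636}, {12, 636, 686}, {14, 686, 833}, {16, 833, 974},
            {18, 974, 1017}, {20, 1017, 1056}, {22, 1056, 1085}, {26, 1128, 1196}, {28, 1196, 1241},
            {30, 1241, 1350}, {32, 1350, 1491}, {34, 1491, 1624}, {36, 1624, 1678}, {38, 1678, 1732},
            {42, 1850, 2000}};
        List<Sso> list = new ArrayList<>();
        for (long[] r : d) {
            list.add(new Sso(r[0], r[1], r[2]));
        }
        return list;
    }

    /** Loads batchSize sso per round and stops as soon as the read is satisfied. */
    static Result search(List<Sso> ssoList, long startOffset, long endOffset, int limit, int batchSize) {
        Result result = new Result();
        long nextOffset = startOffset;
        int pos = 0;
        while (pos < ssoList.size()) {
            int loadedEnd = Math.min(pos + batchSize, ssoList.size()); // "load" sso-list[pos, loadedEnd)
            result.loadedEndIndex = loadedEnd;
            for (; pos < loadedEnd; pos++) {
                Sso sso = ssoList.get(pos);
                if (sso.startOffset <= nextOffset && nextOffset < sso.endOffset) {
                    result.objectIds.add(sso.objectId);
                    nextOffset = sso.endOffset;
                    boolean covered = endOffset >= 0 && nextOffset >= endOffset; // -1 means read to the end
                    if (covered || result.objectIds.size() >= limit) {
                        return result;
                    }
                }
            }
        }
        return result;
    }
}
```

In this model, `search(example(), 100, 200, 4, 5)` finds [0, 2, 3] after loading only the first 5 sso, and `search(example(), 400, 600, 4, 5)` finds [7, 9, 10] after loading sso-list[0:10).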
3.2 An index miss loads all the streamSetObjects:
Cold read at (startOffset=1000, endOffset=1600, limit=4)
No index helps: search (nodeId=4, streamId=420000, startOffset=1000) = -1
So we search from the first to the last sso in the node's sso list.
We know that objectId 18 holds (startOffset=974, endOffset=1017),
but we have to load the sso sequentially up to objectId 18 to find the target object.
3.3 Preloading more streamSetObjects helps speed up the search:
In the first round of loadssoInfo we load only 5 sso, [0,2,3,5,7];
nothing is found in them, but the sso are now loaded.
We can preload more sso while searching the sso sequentially.
The startPos may be in the tail or the middle of the sso list. After objects are preloaded,
the index may be updated, which can speed up the search from that position.
e.g. preload sso [14, 22, 32, 42].
Before the next round of loadssoInfo
we search the index again with startOffset=1000.
The index returns objectId=14, which points to (startOffset=686, endOffset=833),
so we can skip loading objects [9, 10, 12] and speed up the search.
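The skip step can be sketched with a `TreeMap.floorEntry` lookup over the sso loaded so far. The class and method names here are hypothetical, and the index is simplified to a single stream, mapping startOffset to the position in the node's sso list.

```java
import java.util.Map;
import java.util.TreeMap;

final class PreloadSkipSketch {
    /**
     * index maps startOffset -> position in the node's sso list, for every sso loaded so far.
     * Returns the position the sequential search should resume from.
     */
    static int nextSearchPos(TreeMap<Long, Integer> index, long startOffset, int currentPos) {
        // Greatest indexed startOffset <= the requested offset: the target sso is at or after it.
        Map.Entry<Long, Integer> e = index.floorEntry(startOffset);
        return e == null ? currentPos : Math.max(currentPos, e.getValue());
    }

    public static void main(String[] args) {
        TreeMap<Long, Integer> index = new TreeMap<>();
        // First round: objects [0,2,3,5,7] at list positions 0..4.
        long[] firstRound = {0, 105, 128, 259, 394};
        for (int pos = 0; pos < firstRound.length; pos++) {
            index.put(firstRound[pos], pos);
        }
        // Preloaded objects [14, 22, 32, 42] at list positions 8, 12, 16, 20.
        index.put(686L, 8);
        index.put(1056L, 12);
        index.put(1350L, 16);
        index.put(1850L, 20);
        System.out.println(nextSearchPos(index, 1000L, 5));
    }
}
```

For startOffset=1000 the floor entry is startOffset=686 (objectId=14, position 8), so the search resumes at position 8 instead of 5 and skips objects [9, 10, 12].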
3.4 Preload streamSetObject when reading from the end:
Cold read at (startOffset=1980, endOffset=-1, limit=4),
which is a readEndOffset, i.e. a read of the tail of the metadata.
- loadssoInfo: the first 5 objects [0,2,3,5,7] are checked, and nothing helps.
- If no more sso are preloaded to build the index, we load all 21 sso to check whether the result exists.
Preload from end to middle
- Preload from sso-list(end-5:end]
- The next time we search the index we can jump from object=7 to object=42.
3.5 BloomFilter for checking streamId in StreamSetObject
From the example we know that many StreamSetObjects may not contain the stream.
After reading a streamSetObject we can build a bloomFilter for this sso,
so that later searches can skip loading this sso and save a lot of time.
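A minimal sketch of such a per-sso filter and the skip check follows. It is not the implementation proposed in this issue: the class, the `shouldLoad` helper, the `BitSet` backing store and the splitmix64-style hash mixing are all assumptions chosen to keep the example self-contained.

```java
import java.util.BitSet;

final class StreamIdBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    StreamIdBloomFilter(int expectedStreams, double fpp) {
        double ln2 = Math.log(2);
        this.numBits = Math.max(64, (int) Math.ceil(-expectedStreams * Math.log(fpp) / (ln2 * ln2)));
        this.numHashes = Math.max(1, (int) Math.round((double) numBits / expectedStreams * ln2));
        this.bits = new BitSet(numBits);
    }

    /** 64-bit mixer (splitmix64 finalizer), used to spread streamIds over the bit array. */
    private static long mix(long x) {
        x += 0x9e3779b97f4a7c15L;
        x ^= x >>> 30;
        x *= 0xbf58476d1ce4e5b9L;
        x ^= x >>> 27;
        x *= 0x94d049bb133111ebL;
        return x ^ (x >>> 31);
    }

    /** Double hashing: the i-th probe is h1 + i * h2, reduced modulo the bit count. */
    private int bitIndex(long streamId, int i) {
        long h1 = mix(streamId);
        long h2 = mix(h1) | 1L;
        return (int) Long.remainderUnsigned(h1 + i * h2, numBits);
    }

    /** Used only while building the filter from a freshly read sso. */
    void put(long streamId) {
        for (int i = 0; i < numHashes; i++) {
            bits.set(bitIndex(streamId, i));
        }
    }

    /** false means the sso definitely has no data for streamId; true means it may have. */
    boolean mightContain(long streamId) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(bitIndex(streamId, i))) {
                return false;
            }
        }
        return true;
    }

    /** An sso without a filter has never been read, so it must be loaded. */
    static boolean shouldLoad(StreamIdBloomFilter filterOrNull, long streamId) {
        return filterOrNull == null || filterOrNull.mightContain(streamId);
    }
}
```

A Bloom filter has no false negatives, so skipping an sso when `shouldLoad` returns false never loses data; a false positive only costs one unnecessary load.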