Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions io/blob_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ BlobWriter::BlobWriter(const Path& filename, hwy::ThreadPool& pool)
// we will later write a footer, else we will update the header.
std::vector<uint8_t> bytes_before_blobs = BlobStore::BytesBeforeBlobsV2();
file_->Write(bytes_before_blobs.data(), bytes_before_blobs.size(), 0);
curr_offset_ = bytes_before_blobs.size();
}

void BlobWriter::Add(const std::string& key, const void* data, size_t bytes) {
Expand All @@ -485,32 +486,35 @@ void BlobWriter::Add(const std::string& key, const void* data, size_t bytes) {
blob_sizes_.push_back(bytes);

std::vector<BlobIO> writes;
EnqueueChunks(keys_.size() - 1, file_->FileSize(), bytes,
EnqueueChunks(keys_.size() - 1, curr_offset_, bytes,
static_cast<const uint8_t*>(data), writes);

hwy::ThreadPool null_pool(0);
hwy::ThreadPool& pool_or_serial = file_->IsAppendOnly() ? null_pool : pool_;
pool_or_serial.Run(
0, writes.size(), [this, &writes](uint64_t i, size_t /*thread*/) {
const BlobRange& range = writes[i].range;

if (!file_->Write(writes[i].data, range.bytes, range.offset)) {
const std::string& key = StringFromKey(keys_[range.key_idx]);
HWY_ABORT("Write failed for %s from %zu, %zu bytes to %p.",
key.c_str(), static_cast<size_t>(range.offset), range.bytes,
writes[i].data);
}
});
curr_offset_ = writes.back().range.End();
}

void BlobWriter::Finalize() {
if (!file_->IsAppendOnly() && curr_offset_ != file_->FileSize()) {
HWY_WARN("Computed offset %zu does not match file size %zu.",
curr_offset_, file_->FileSize());
}
const BlobStore bs = BlobStore(keys_, blob_sizes_);

// Write the rest of the bytes, which contains: paddings + directory + header.
const auto bytes_after_blobs = bs.BytesAfterBlobs();
file_->Write(bytes_after_blobs.data(), bytes_after_blobs.size(),
file_->FileSize());

curr_offset_);
file_.reset(); // closes the file
}

Expand Down
2 changes: 2 additions & 0 deletions io/blob_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ class BlobWriter {
std::vector<hwy::uint128_t> keys_;
std::vector<size_t> blob_sizes_;
hwy::ThreadPool& pool_;
// Current offset in the file used for writing.
int64_t curr_offset_ = 0;
};

} // namespace gcpp
Expand Down
Loading