Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ type BinlogSyncerConfig struct {
// Only works with MariaDB flavor.
FillZeroLogPos bool

// PayloadDecoderConcurrency is used to control concurrency for decoding TransactionPayloadEvent.
// Default 0, this will be set to GOMAXPROCS.
PayloadDecoderConcurrency int

// SynchronousEventHandler is used for synchronous event handling.
// This should not be used together with StartBackupWithHandler.
// If this is not nil, GetEvent does not need to be called.
Expand Down
7 changes: 7 additions & 0 deletions replication/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type BinlogParser struct {
ignoreJSONDecodeErr bool
verifyChecksum bool

payloadDecoderConcurrency int

rowsEventDecodeFunc func(*RowsEvent, []byte) error

tableMapOptionalMetaDecodeFunc func([]byte) error
Expand Down Expand Up @@ -215,6 +217,10 @@ func (p *BinlogParser) SetFlavor(flavor string) {
p.flavor = flavor
}

func (p *BinlogParser) SetPayloadDecoderConcurrency(concurrency int) {
p.payloadDecoderConcurrency = concurrency
}

func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEvent, []byte) error) {
p.rowsEventDecodeFunc = rowsEventDecodeFunc
}
Expand Down Expand Up @@ -456,6 +462,7 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
func (p *BinlogParser) newTransactionPayloadEvent() *TransactionPayloadEvent {
e := &TransactionPayloadEvent{}
e.format = *p.format
e.concurrency = p.payloadDecoderConcurrency

return e
}
3 changes: 2 additions & 1 deletion replication/transaction_payload_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (

type TransactionPayloadEvent struct {
format FormatDescriptionEvent
concurrency int
Size uint64
UncompressedSize uint64
CompressionType uint64
Expand Down Expand Up @@ -103,7 +104,7 @@ func (e *TransactionPayloadEvent) decodePayload() error {
e.CompressionType, e.compressionType())
}

decoder, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(0))
decoder, err := zstd.NewReader(nil, zstd.WithDecoderConcurrency(e.concurrency))
if err != nil {
return err
}
Expand Down
Loading