@@ -912,6 +912,68 @@ func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
912912 }
913913}
914914
915+ // FetchMessageBatch fetches a batch of messages from the reader. It is similar to
916+ // FetchMessage, except it blocks until no. of messages read reaches batchSize.
917+ func (r * Reader ) FetchMessageBatch (ctx context.Context , batchSize int ) ([]Message , error ) {
918+ r .activateReadLag ()
919+ msgBatch := make ([]Message , 0 , batchSize )
920+
921+ var i int
922+ for i <= batchSize {
923+ r .mutex .Lock ()
924+
925+ if ! r .closed && r .version == 0 {
926+ r .start (r .getTopicPartitionOffset ())
927+ }
928+
929+ version := r .version
930+ r .mutex .Unlock ()
931+
932+ select {
933+ case <- ctx .Done ():
934+ return []Message {}, ctx .Err ()
935+
936+ case err := <- r .runError :
937+ return []Message {}, err
938+
939+ case m , ok := <- r .msgs :
940+ if ! ok {
941+ return []Message {}, io .EOF
942+ }
943+
944+ if m .version < version {
945+ continue
946+ }
947+
948+ r .mutex .Lock ()
949+
950+ switch {
951+ case m .error != nil :
952+ case version == r .version :
953+ r .offset = m .message .Offset + 1
954+ r .lag = m .watermark - r .offset
955+ }
956+
957+ r .mutex .Unlock ()
958+
959+ if errors .Is (m .error , io .EOF ) {
960+ // io.EOF is used as a marker to indicate that the stream
961+ // has been closed, in case it was received from the inner
962+ // reader we don't want to confuse the program and replace
963+ // the error with io.ErrUnexpectedEOF.
964+ m .error = io .ErrUnexpectedEOF
965+ }
966+ if m .error != nil {
967+ return nil , m .error
968+ }
969+
970+ msgBatch = append (msgBatch , m .message )
971+ }
972+ i ++
973+ }
974+ return msgBatch , nil
975+ }
976+
915977// ReadLag returns the current lag of the reader by fetching the last offset of
916978// the topic and partition and computing the difference between that value and
917979// the offset of the last message returned by ReadMessage.
@@ -1487,7 +1549,7 @@ func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, star
14871549 return
14881550}
14891551
1490- func (r * reader ) read (ctx context.Context , offset int64 , conn * Conn ) (int64 , error ) {
1552+ func (r * reader ) read (ctx context.Context , offset int64 , conn * Conn , batchSize int ) (int64 , error ) {
14911553 r .stats .fetches .observe (1 )
14921554 r .stats .offset .observe (offset )
14931555
0 commit comments