Staging instance, all changes can be removed at any time

Skip to content

journalprocessor: Fix deserialize_message raising EOFError on the last message of each assignment

This caused JournalClientOffsetRanges to ignore the last batch of messages in each assignment, because JournalClient.handle_messages deserializes all messages in the batch before calling the worker function; and raising EOFError from deserialize_message makes it exit early (before calling the worker fn).

Additionally, it doesn't make much sense for a deserialize_message function to raise this kind of exception.

Instead, this commit removes the explicit raise EOFError, and tells JournalClient to stop on EOF. deserialize_message calls handle_offset, which updates the assignment of the Kafka consumer to be the empty set, which causes it to be EOF (since there are no more partitions to read from).


Migrated from D6038 (view on Phabricator)

Merge request reports

Loading