Staging instance, all changes can be removed at any time

Skip to content

Commit kafka messages which offset has reach the high limit

this is necessary to ensure these messages are committed in kafka, otherwise, since the (considered) empty partition is unsubscribed from, it never gets committed in JournalClient.handle_messages() (since this later only commit assigned partitions).

Ensure offset are committed only after worker_fn is executed without error.

This requires to overload the JournalClient.handle_messages() method in JournalClientOffsetRanges to make sure "pending" messages are committed after the proper execution of worker_fn.

Doing so, we can both unsubscribe from "eof" partitions on the fly (with "eof" meaning when the partition has been consumed up to the high watermark offset at the beginning of the export), and commit ALL offsets that need to be, but only after proper execution of the worker_fn callback.

This should guarantee proper and consistent behavior (famous last word...).

Depends on !27 (closed)

(used to be !26 (closed) but phab is freaking this later out for some reason...)


Migrated from D6247 (view on Phabricator)

Merge request reports

Loading