Staging instance, all changes can be removed at any time

Skip to content

Commit kafka messages wich offset has reach the high limit

this is necessary to ensure these messages are committed in kafka, otherwise, since the (considered) empty partition is unsubscribed from, it never gets committed in JournalClient.handle_messages() (since this later only commit assigned partitions).

WARNING: doing this, we DO commit a message BEFORE actually handling it. Since this later operation may fail, said message can be de facto lost...

[the second revision in this diff handles that later aspect; not sure I want to stash these 2 revisions or not...]

Ensure offset are committed ony after worker_fn is executed without tb

this requires to overload the JournalClient.handle_messages() method in JournalClientOffsetRanges to make sure "pending" messages are committed after the proper execution of worker_fn.

Doing so, we can both unsubscribe from "oef" partitions on the fly (with "oef" meaning when the partition has been consumed up to the high watermark offset at the beginning of the export), and commit ALL offsets that needs to be but only after proper execution of the worker_fn callback.

This should guarantee proper and consistent behavior (famous last word...).

Depends on !25 (closed)


Migrated from D6235 (view on Phabricator)

Merge request reports

Loading