Staging instance, all changes can be removed at any time

Skip to content

Delay the unsubscribe to the end of handle_messages

to prevent some possible race condition leading to kafka errors like:

rd_kafka_assignment_partition_stopped: Assertion `rktp->rktp_started' failed

This could occur when, at the time of unsubscribing from a partition, another partition is also depleted. Since the unsubscription consist in resubscribing to all partitions except unsubscribes ones, we could try to subscribe to such a depleted parition, leading to the error message listed above.

Depends on !45 (closed).


Migrated from D7463 (view on Phabricator)

Merge request reports

Loading