From 9f73bf7ece793474918eb637f9420005131b759b Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Tue, 27 Sep 2022 16:00:05 +0200 Subject: [PATCH] Introduce strategy for backpressure --- .../kafka/receiver/RebalanceStrategy.kt | 49 +++++++++++++ .../kafka/receiver/ReceiverSettings.kt | 3 + .../kafka/receiver/internals/PollLoop.kt | 69 ++++++++++++------- 3 files changed, 98 insertions(+), 23 deletions(-) create mode 100644 src/main/kotlin/io/github/nomisRev/kafka/receiver/RebalanceStrategy.kt diff --git a/src/main/kotlin/io/github/nomisRev/kafka/receiver/RebalanceStrategy.kt b/src/main/kotlin/io/github/nomisRev/kafka/receiver/RebalanceStrategy.kt new file mode 100644 index 00000000..67a832fb --- /dev/null +++ b/src/main/kotlin/io/github/nomisRev/kafka/receiver/RebalanceStrategy.kt @@ -0,0 +1,49 @@ +package io.github.nomisRev.kafka.receiver + +import org.apache.kafka.clients.consumer.KafkaConsumer +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.flow.buffer +import org.apache.kafka.common.TopicPartition + +/** + * When a [KafkaReceiver] does not process message fast enough, + * it will start to buffer messages in a [channelFlow] using the default [Channel.BUFFERED]. + * This can be overwritten by using [buffer]. + * + * When the [Channel] is full, + * it will prevent the [KafkaReceiver] from calling [KafkaConsumer.poll] again. + * + * When the [KafkaConsumer] does not call [KafkaConsumer.poll] every `max.poll.interval.ms`, + * then Kafka will consider our consumer dead and will rebalance the partitions. + * + * [RebalanceStrategy] specifies with which strategy the backpressure should be applied. + */ +public sealed interface RebalanceStrategy { + /** + * When we want to apply backpressure to the Kafka we need to _pause_ the partitions. + * This will prevent Kafka from considering our consumer dead. + * + * Whenever we have space in the [Channel] we will _resume_ the partitions. 
+ * + * NOTE: It can happen that with a slow processor you constantly pause and resume the partitions, + * which can cause considerable overhead to Kafka. + * Consider adding _logging_ or _alerting_ to your [RebalanceListener] to see how often this happens. + */ + public object Backpressure : RebalanceStrategy + + /** + * In some cases you **do not** want to apply automatic backpressure to Kafka. + * For example when you need to guarantee _high throughput_, + * and you **never** want a _pause_ or _resume_ causing a rebalance. + * + * In this scenario you can choose [FailFast], which invokes [failFast] with the assigned partitions instead of pausing the consumer. + */ + public class FailFast( + public val failFast: (Set<TopicPartition>) -> Nothing = { throw BackpressureException(it) } + ) : RebalanceStrategy +} + +public class BackpressureException( + public val partitions: Set<TopicPartition> +) : Exception("KafkaConsumer is not processing messages fast enough for partitions: $partitions") diff --git a/src/main/kotlin/io/github/nomisRev/kafka/receiver/ReceiverSettings.kt b/src/main/kotlin/io/github/nomisRev/kafka/receiver/ReceiverSettings.kt index c7a99397..4ae45d36 100644 --- a/src/main/kotlin/io/github/nomisRev/kafka/receiver/ReceiverSettings.kt +++ b/src/main/kotlin/io/github/nomisRev/kafka/receiver/ReceiverSettings.kt @@ -36,6 +36,7 @@ public data class ReceiverSettings<K, V>( val maxCommitAttempts: Int = DEFAULT_MAX_COMMIT_ATTEMPTS, val maxDeferredCommits: Int = 0, val closeTimeout: Duration = Duration.INFINITE, + val rebalanceStrategy: RebalanceStrategy = RebalanceStrategy.Backpressure, val properties: Properties = Properties(), ) { init { @@ -69,6 +70,7 @@ public fun ReceiverSettings( maxCommitAttempts: Int = DEFAULT_MAX_COMMIT_ATTEMPTS, maxDeferredCommits: Int = 0, closeTimeout: Duration = Long.MAX_VALUE.nanoseconds, + rebalanceStrategy: RebalanceStrategy = RebalanceStrategy.Backpressure, properties: Properties = Properties(), ): ReceiverSettings<K, V> = ReceiverSettings( @@ -83,6 +85,7 @@ public fun ReceiverSettings( maxCommitAttempts, maxDeferredCommits, closeTimeout, + rebalanceStrategy,
properties ) diff --git a/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/PollLoop.kt b/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/PollLoop.kt index df74ab01..e01ee075 100644 --- a/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/PollLoop.kt +++ b/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/PollLoop.kt @@ -2,6 +2,7 @@ package io.github.nomisRev.kafka.receiver.internals import io.github.nomisRev.kafka.receiver.CommitStrategy import io.github.nomisRev.kafka.receiver.Offset +import io.github.nomisRev.kafka.receiver.RebalanceStrategy import io.github.nomisRev.kafka.receiver.ReceiverSettings import io.github.nomisRev.kafka.receiver.size import kotlinx.coroutines.CoroutineScope @@ -220,6 +221,14 @@ internal class EventLoop( } } + /** + * Returns `true` if this call paused the consumer, and `false` if it was already paused. + * + * If the consumer meets the following conditions, we should wake up the consumer: + * - paused + * - The downstream can continue processing, requesting.get() == true + * - and we're not in retrying commits + */ private fun checkAndSetPausedByUs(): Boolean { logger.debug("checkAndSetPausedByUs") val pausedNow = !pausedByUs.getAndSet(true) @@ -246,41 +255,55 @@ internal class EventLoop( val pauseForDeferred = (settings.maxDeferredCommits > 0 && commitBatch.deferredCount() >= settings.maxDeferredCommits) - val shouldPoll: Boolean = if (pauseForDeferred || retrying.get()) false else requesting.get() - if (shouldPoll) { - if (!awaitingTransaction.get()) { - if (pausedByUs.getAndSet(false)) { - val toResume: MutableSet<TopicPartition> = HashSet(consumer.assignment()) - toResume.removeAll(pausedByUser) - pausedByUser.clear() - consumer.resume(toResume) - logger.debug("Resumed") - } - } else { - if (checkAndSetPausedByUs()) { - pausedByUser.addAll(consumer.paused()) - consumer.pause(consumer.assignment()) - logger.debug("Paused - awaiting transaction") - } + val shouldPoll: Boolean = + if (pauseForDeferred 
|| retrying.get()) false + else requesting.get() + + when { + shouldPoll && !awaitingTransaction.get() && pausedByUs.getAndSet(false) -> { + val toResume: MutableSet<TopicPartition> = HashSet(consumer.assignment()) + toResume.removeAll(pausedByUser) + pausedByUser.clear() + consumer.resume(toResume) + logger.debug("Resumed") } - } else if (checkAndSetPausedByUs()) { - pausedByUser.addAll(consumer.paused()) - consumer.pause(consumer.assignment()) - when { - pauseForDeferred -> logger.debug("Paused - too many deferred commits") - retrying.get() -> logger.debug("Paused - commits are retrying") - else -> logger.debug("Paused - back pressure") + + shouldPoll && awaitingTransaction.get() && checkAndSetPausedByUs() -> { + pausedByUser.addAll(consumer.paused()) + consumer.pause(consumer.assignment()) + logger.debug("Paused - awaiting transaction") } + + // !shouldPoll so we need to apply backpressure to Kafka according to strategy + !shouldPoll && checkAndSetPausedByUs() -> + when (val strategy = settings.rebalanceStrategy) { + RebalanceStrategy.Backpressure -> { + pausedByUser.addAll(consumer.paused()) + consumer.pause(consumer.assignment()) + when { + pauseForDeferred -> logger.debug("Paused - too many deferred commits") + retrying.get() -> logger.debug("Paused - commits are retrying") + else -> logger.debug("Paused - back pressure") + } + } + + is RebalanceStrategy.FailFast -> strategy.failFast(consumer.assignment()) + } } + // Execute poll val records: ConsumerRecords<K, V> = try { consumer.poll(pollTimeout) } catch (e: WakeupException) { logger.debug("Consumer woken") ConsumerRecords.empty() } + + // Schedule a new poll task on the single threaded dispatcher if (isActive.get()) schedulePoll() + + // Send the records downstream, and check state of buffer. Signal back-pressure in `requesting` if (!records.isEmpty) { if (settings.maxDeferredCommits > 0) { commitBatch.addUncommitted(records)