|
| 1 | +# KIP-848 Migration guide |
| 2 | + |
| 3 | +Starting with **confluent-kafka-javascript 1.6.0** (GA release), the next generation consumer group rebalance protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is **production-ready**. |
| 4 | + |
| 5 | +**Note:** The new consumer group protocol defined in [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) is not enabled by default. There are few contract change associated with the new protocol and might cause breaking changes. `group.protocol` configuration property dictates whether to use the new `consumer` protocol or older `classic` protocol. It defaults to `classic` if not provided. |
| 6 | + |
| 7 | +# Overview |
| 8 | + |
| 9 | +- **What changed:** |
| 10 | + |
| 11 | + The **Group Leader role** (consumer member) is removed. Assignments are calculated by the **Group Coordinator (broker)** and distributed via **heartbeats**. |
| 12 | + |
| 13 | +- **Requirements:** |
| 14 | + |
| 15 | + - Broker version **4.0.0+** |
| 16 | + - confluent-kafka-javascript version **1.6.0+**: GA (production-ready) |
| 17 | + |
| 18 | +- **Enablement (client-side):** |
| 19 | + |
| 20 | + - `group.protocol=consumer` |
| 21 | + - `group.remote.assignor=<assignor>` (optional; broker-controlled if unset; default broker assignor is `uniform`) |
| 22 | + |
| 23 | +# Available Features |
| 24 | + |
| 25 | +All [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol) features are supported including: |
| 26 | + |
| 27 | +- Subscription to one or more topics, including **regular expression (regex) subscriptions** |
| 28 | +- Rebalance callbacks (**incremental only**) |
| 29 | +- Static group membership |
| 30 | +- Configurable remote assignor |
| 31 | +- Enforced max poll interval |
| 32 | +- Upgrade from `classic` protocol or downgrade from `consumer` protocol |
| 33 | +- AdminClient changes as per KIP |
| 34 | + |
| 35 | +# Contract Changes |
| 36 | + |
| 37 | +## Client Configuration changes |
| 38 | + |
| 39 | +| Classic Protocol (Deprecated Configs in KIP-848) | KIP-848 / Next-Gen Replacement | |
| 40 | +|--------------------------------------------------|-------------------------------------------------------| |
| 41 | +| `partition.assignment.strategy` | `group.remote.assignor` | |
| 42 | +| `session.timeout.ms` | Broker config: `group.consumer.session.timeout.ms` | |
| 43 | +| `heartbeat.interval.ms` | Broker config: `group.consumer.heartbeat.interval.ms` | |
| 44 | +| `group.protocol.type` | Not used in the new protocol | |
| 45 | + |
| 46 | +**Note:** The properties listed under “Classic Protocol (Deprecated Configs in KIP-848)” are **no longer used** when using the KIP-848 consumer protocol. |
| 47 | + |
| 48 | +## Rebalance Callback Changes |
| 49 | + |
| 50 | +- The **protocol is fully incremental** in KIP-848. |
| 51 | +- In the **rebalance callbacks**, you **can use** (optional - if not used, client will handle it internally): |
| 52 | + - `assign(partitions)` to assign new partitions |
| 53 | + - `unassign(partitions)` to revoke partitions |
| 54 | +- If you don't call assign/unassign inside rebalance callbacks, the client will automatically use assign/unassign internally. |
| 55 | +- ⚠️ The `partitions` list passed to `assign()` and `unassign()` contains only the **incremental changes** — partitions being **added** or **revoked** — **not the full assignment**, as was the case in the classic protocol. |
| 56 | +- All assignors under KIP-848 are now **sticky**, including `range`, which was **not sticky** in the classic protocol. |
| 57 | + |
| 58 | +## Static Group Membership |
| 59 | + |
| 60 | +- Duplicate `group.instance.id` handling: |
| 61 | + - **Newly joining member** is fenced with **`ErrorCodes.ERR_UNRELEASED_INSTANCE_ID` (fatal)**. |
| 62 | + - (Classic protocol fenced the **existing** member instead.) |
| 63 | +- Implications: |
| 64 | + - Ensure only **one active instance per** `group.instance.id`. |
| 65 | + - Consumers must shut down cleanly to avoid blocking replacements until session timeout expires. |
| 66 | + |
| 67 | +## Session Timeout & Fetching |
| 68 | + |
| 69 | +- **Session timeout is broker-controlled**: |
| 70 | + - If the Coordinator is unreachable, a consumer **continues fetching messages** but cannot commit offsets. |
| 71 | + - Consumer is fenced once a heartbeat response is received from the Coordinator. |
| 72 | +- In the classic protocol, the client stopped fetching when session timeout expired. |
| 73 | + |
| 74 | +## Closing / Auto-Commit |
| 75 | + |
| 76 | +- On `disconnect()`: |
| 77 | + - Member retries committing offsets until a timeout expires. |
| 78 | + - Currently uses the **default remote session timeout** (45s). |
| 79 | + - Future **KIP-1092** will allow custom commit timeouts. |
| 80 | + |
| 81 | +## Error Handling Changes |
| 82 | + |
| 83 | +errors are reported in the logger and in the error callback (not available at the moment). |
| 84 | + |
| 85 | +- `ErrorCodes.ERR_UNKNOWN_TOPIC_OR_PART` (**subscription case**): |
| 86 | + - No longer returned if a topic is missing in the **local cache** when subscribing; the subscription proceeds. |
| 87 | +- `ErrorCodes.ERR_TOPIC_AUTHORIZATION_FAILED`: |
| 88 | + - Reported once per heartbeat or subscription change, even if only one topic is unauthorized. |
| 89 | + |
| 90 | +## Summary of Key Differences (Classic vs Next-Gen) |
| 91 | + |
| 92 | +- **Assignment:** Classic protocol calculated by **Group Leader (consumer)**; KIP-848 calculated by **Group Coordinator (broker)** |
| 93 | +- **Assignors:** Classic range assignor was **not sticky**; KIP-848 assignors are **sticky**, including range |
| 94 | +- **Deprecated configs:** Classic client configs are replaced by `group.remote.assignor` and broker-controlled session/heartbeat configs |
| 95 | +- **Static membership fencing:** KIP-848 fences **new member** on duplicate `group.instance.id` |
| 96 | +- **Session timeout:** Classic: enforced on client; KIP-848: enforced on broker |
| 97 | +- **Auto-commit on disconnect:** Classic: stops at client session timeout; KIP-848: retries until a default timeout (45s) is reached. |
| 98 | +- **Unknown topics:** KIP-848 does not return error on subscription if topic missing |
| 99 | +- **Upgrade/Downgrade:** KIP-848 supports upgrade/downgrade from/to `classic` and `consumer` protocols |
| 100 | + |
| 101 | +# Minimal Example Config |
| 102 | + |
| 103 | +## Classic Protocol |
| 104 | + |
| 105 | +``` properties |
| 106 | +# Optional; default is 'classic' |
| 107 | +group.protocol=classic |
| 108 | + |
| 109 | +partition.assignment.strategy=<range,roundrobin,sticky> |
| 110 | +session.timeout.ms=45000 |
| 111 | +heartbeat.interval.ms=15000 |
| 112 | +``` |
| 113 | + |
| 114 | +## Next-Gen Protocol / KIP-848 |
| 115 | + |
| 116 | +``` properties |
| 117 | +group.protocol=consumer |
| 118 | + |
| 119 | +# Optional: select a remote assignor |
| 120 | +# Valid options currently: 'uniform' or 'range' |
| 121 | +# group.remote.assignor=<uniform,range> |
| 122 | +# If unset, broker chooses the assignor (default: 'uniform') |
| 123 | + |
| 124 | +# Session & heartbeat now controlled by broker: |
| 125 | +# group.consumer.session.timeout.ms |
| 126 | +# group.consumer.heartbeat.interval.ms |
| 127 | +``` |
| 128 | + |
| 129 | +# Rebalance Callback Migration |
| 130 | + |
| 131 | +## Range Assignor (Classic) |
| 132 | + |
| 133 | +``` javascript |
| 134 | +// Rebalance Callback for Range Assignor (Classic Protocol) |
| 135 | +rebalance_cb: function (err, assignment, assignmentFns) { |
| 136 | + if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) { |
| 137 | + assignmentFns.assign(assignment); |
| 138 | + } else { |
| 139 | + assignmentFns.unassign(); |
| 140 | + } |
| 141 | +} |
| 142 | +``` |
| 143 | + |
| 144 | +## Incremental Assignor (KIP-848 with any remote assignor type) |
| 145 | + |
| 146 | +``` javascript |
| 147 | +// Rebalance callback for incremental assignor |
| 148 | +rebalance_cb: function (err, assignment, assignmentFns) { |
| 149 | + if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) { |
| 150 | + assignmentFns.assign(assignment); |
| 151 | + } else { |
| 152 | + assignmentFns.unassign(assignment); |
| 153 | + } |
| 154 | +} |
| 155 | +``` |
| 156 | + |
| 157 | +**Note:** The `assignment` list contains **only partitions being added or revoked**, not the full partition list as in the classic case. |
| 158 | + |
| 159 | +# Upgrade and Downgrade |
| 160 | + |
| 161 | +- A group made up entirely of `classic` consumers runs under the classic protocol. |
| 162 | +- The group is **upgraded to the consumer protocol** as soon as at least one `consumer` protocol member joins. |
| 163 | +- The group is **downgraded back to the classic protocol** if the last `consumer` protocol member leaves while `classic` members remain. |
| 164 | +- Both **rolling upgrade** (classic → consumer) and **rolling downgrade** (consumer → classic) are supported. |
| 165 | + |
| 166 | +# Migration Checklist (Next-Gen Protocol / [KIP-848](https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol)) |
| 167 | + |
| 168 | +1. Upgrade to **confluent-kafka-javascript ≥ 1.6.0** (GA release) |
| 169 | +2. Run against **Kafka brokers ≥ 4.0.0** |
| 170 | +3. Set `group.protocol=consumer` |
| 171 | +4. Optionally set `group.remote.assignor`; leave unspecified for broker-controlled (default: `uniform`), valid options: `uniform` or `range` |
| 172 | +5. Replace deprecated configs with new ones |
| 173 | +6. Update rebalance callbacks there are now **incremental** |
| 174 | +7. Review static membership handling (`group.instance.id`) |
| 175 | +8. Ensure proper shutdown to avoid fencing issues |
| 176 | +9. Adjust error handling for unknown topics and authorization failures |
0 commit comments