Skip to content

Commit d6cbbe1

Browse files
Expand XADD and XTRIM with exact trimming, limit and deletion policy.
Closes #3232 Signed-off-by: viktoriya.kutsarova <viktoriya.kutsarova@redis.com> # Conflicts: # src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java # src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java # src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java # src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java # src/test/java/org/springframework/data/redis/connection/jedis/StreamConvertersUnitTest.java # Conflicts: # Makefile
1 parent e7afe95 commit d6cbbe1

23 files changed

+1725
-93
lines changed

src/main/java/org/springframework/data/redis/connection/DefaultStringRedisConnection.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3021,6 +3021,11 @@ public Long xTrim(String key, long count, boolean approximateTrimming) {
30213021
return convertAndReturn(delegate.xTrim(serialize(key), count, approximateTrimming), Converters.identityConverter());
30223022
}
30233023

3024+
@Override
3025+
public Long xTrim(String key, XTrimOptions options) {
3026+
return convertAndReturn(delegate.xTrim(serialize(key), options), Converters.identityConverter());
3027+
}
3028+
30243029
@Override
30253030
public Long xAck(byte[] key, String group, RecordId... recordIds) {
30263031
return delegate.xAck(key, group, recordIds);
@@ -3129,6 +3134,11 @@ public Long xTrim(byte[] key, long count, boolean approximateTrimming) {
31293134
return delegate.xTrim(key, count, approximateTrimming);
31303135
}
31313136

3137+
@Override
3138+
public Long xTrim(byte[] key, XTrimOptions options) {
3139+
return delegate.xTrim(key, options);
3140+
}
3141+
31323142
/**
31333143
* Specifies if pipelined and tx results should be deserialized to Strings. If false, results of
31343144
* {@link #closePipeline()} and {@link #exec()} will be of the type returned by the underlying connection

src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,12 @@ default Long xTrim(byte[] key, long count, boolean approximateTrimming) {
692692
return streamCommands().xTrim(key, count, approximateTrimming);
693693
}
694694

695+
@Override
696+
@Deprecated
697+
default Long xTrim(byte[] key, XTrimOptions options) {
698+
return streamCommands().xTrim(key, options);
699+
}
700+
695701
// LIST COMMANDS
696702

697703
/** @deprecated in favor of {@link RedisConnection#listCommands()}}. */

src/main/java/org/springframework/data/redis/connection/ReactiveStreamCommands.java

Lines changed: 85 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.springframework.data.redis.connection.RedisStreamCommands.XAddOptions;
3737
import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions;
3838
import org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions;
39+
import org.springframework.data.redis.connection.RedisStreamCommands.XTrimOptions;
3940
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
4041
import org.springframework.data.redis.connection.stream.Consumer;
4142
import org.springframework.data.redis.connection.stream.PendingMessage;
@@ -200,20 +201,13 @@ default Mono<Long> xAck(ByteBuffer key, String group, RecordId... recordIds) {
200201
class AddStreamRecord extends KeyCommand {
201202

202203
private final ByteBufferRecord record;
203-
private final boolean nomkstream;
204-
private final @Nullable Long maxlen;
205-
private final boolean approximateTrimming;
206-
private final @Nullable RecordId minId;
204+
private final XAddOptions options;
207205

208-
private AddStreamRecord(ByteBufferRecord record, @Nullable Long maxlen, boolean nomkstream,
209-
boolean approximateTrimming, @Nullable RecordId minId) {
206+
private AddStreamRecord(ByteBufferRecord record, XAddOptions options) {
210207

211208
super(record.getStream());
212209
this.record = record;
213-
this.maxlen = maxlen;
214-
this.nomkstream = nomkstream;
215-
this.approximateTrimming = approximateTrimming;
216-
this.minId = minId;
210+
this.options = options;
217211
}
218212

219213
/**
@@ -226,7 +220,7 @@ public static AddStreamRecord of(ByteBufferRecord record) {
226220

227221
Assert.notNull(record, "Record must not be null");
228222

229-
return new AddStreamRecord(record, null, false, false, null);
223+
return new AddStreamRecord(record, XAddOptions.none());
230224
}
231225

232226
/**
@@ -239,7 +233,7 @@ public static AddStreamRecord body(Map<ByteBuffer, ByteBuffer> body) {
239233

240234
Assert.notNull(body, "Body must not be null");
241235

242-
return new AddStreamRecord(StreamRecords.rawBuffer(body), null, false, false, null);
236+
return new AddStreamRecord(StreamRecords.rawBuffer(body), XAddOptions.none());
243237
}
244238

245239
/**
@@ -249,7 +243,7 @@ public static AddStreamRecord body(Map<ByteBuffer, ByteBuffer> body) {
249243
* @return a new {@link ReactiveGeoCommands.GeoAddCommand} with {@literal key} applied.
250244
*/
251245
public AddStreamRecord to(ByteBuffer key) {
252-
return new AddStreamRecord(record.withStreamKey(key), maxlen, nomkstream, approximateTrimming, minId);
246+
return new AddStreamRecord(record.withStreamKey(key), options);
253247
}
254248

255249
/**
@@ -259,7 +253,7 @@ public AddStreamRecord to(ByteBuffer key) {
259253
* @since 2.6
260254
*/
261255
public AddStreamRecord makeNoStream() {
262-
return new AddStreamRecord(record, maxlen, true, approximateTrimming, minId);
256+
return new AddStreamRecord(record, XAddOptions.makeNoStream());
263257
}
264258

265259
/**
@@ -270,7 +264,7 @@ public AddStreamRecord makeNoStream() {
270264
* @since 2.6
271265
*/
272266
public AddStreamRecord makeNoStream(boolean makeNoStream) {
273-
return new AddStreamRecord(record, maxlen, makeNoStream, approximateTrimming, minId);
267+
return new AddStreamRecord(record, XAddOptions.makeNoStream(makeNoStream));
274268
}
275269

276270
/**
@@ -279,7 +273,7 @@ public AddStreamRecord makeNoStream(boolean makeNoStream) {
279273
* @return new instance of {@link AddStreamRecord}.
280274
*/
281275
public AddStreamRecord maxlen(long maxlen) {
282-
return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming, minId);
276+
return new AddStreamRecord(record, XAddOptions.maxlen(maxlen));
283277
}
284278

285279
/**
@@ -290,7 +284,7 @@ public AddStreamRecord maxlen(long maxlen) {
290284
* @since 2.7
291285
*/
292286
public AddStreamRecord minId(RecordId minId) {
293-
return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming, minId);
287+
return new AddStreamRecord(record, options.minId(minId));
294288
}
295289

296290
/**
@@ -299,7 +293,23 @@ public AddStreamRecord minId(RecordId minId) {
299293
* @return new instance of {@link AddStreamRecord}.
300294
*/
301295
public AddStreamRecord approximateTrimming(boolean approximateTrimming) {
302-
return new AddStreamRecord(record, maxlen, nomkstream, approximateTrimming, minId);
296+
return new AddStreamRecord(record, options.approximateTrimming(approximateTrimming));
297+
}
298+
299+
/**
300+
* Apply the given {@link XAddOptions} to configure the {@literal XADD} command.
301+
* <p>
302+
* This method allows setting all XADD options at once, including trimming strategies
303+
* ({@literal MAXLEN}, {@literal MINID}), stream creation behavior ({@literal NOMKSTREAM}),
304+
* and other parameters. Constructs a new command instance with all previously configured
305+
* properties except the options, which are replaced by the provided {@link XAddOptions}.
306+
*
307+
* @param options the {@link XAddOptions} to apply. Must not be {@literal null}.
308+
* @return a new {@link AddStreamRecord} with the specified options applied.
309+
* @since 4.0
310+
*/
311+
public AddStreamRecord withOptions(XAddOptions options) {
312+
return new AddStreamRecord(record, options);
303313
}
304314

305315
/**
@@ -318,7 +328,7 @@ public ByteBufferRecord getRecord() {
318328
* @since 2.6
319329
*/
320330
public boolean isNoMkStream() {
321-
return nomkstream;
331+
return options.isNoMkStream();
322332
}
323333

324334
/**
@@ -328,39 +338,45 @@ public boolean isNoMkStream() {
328338
* @since 2.3
329339
*/
330340
public @Nullable Long getMaxlen() {
331-
return maxlen;
341+
return options.getMaxlen();
332342
}
333343

334344
/**
335345
* @return {@literal true} if {@literal MAXLEN} is set.
336346
* @since 2.3
337347
*/
338-
public boolean hasMaxlen() {
339-
return maxlen != null;
340-
}
348+
public boolean hasMaxlen() { return options.hasMaxlen(); }
341349

342350
/**
343351
* @return {@literal true} if {@literal approximateTrimming} is set.
344352
* @since 2.7
345353
*/
346354
public boolean isApproximateTrimming() {
347-
return approximateTrimming;
355+
return options.isApproximateTrimming();
348356
}
349357

350358
/**
351359
* @return the minimum record Id to retain during trimming.
352360
* @since 2.7
353361
*/
354362
public @Nullable RecordId getMinId() {
355-
return minId;
363+
return options.getMinId();
356364
}
357365

358366
/**
359367
* @return {@literal true} if {@literal MINID} is set.
360368
* @since 2.7
361369
*/
362370
public boolean hasMinId() {
363-
return minId != null;
371+
return options.hasMinId();
372+
}
373+
374+
/**
375+
* @return the XAddOptions options.
376+
* @since 4.0
377+
*/
378+
public XAddOptions getOptions() {
379+
return options;
364380
}
365381
}
366382

@@ -409,18 +425,8 @@ default Mono<RecordId> xAdd(ByteBufferRecord record, XAddOptions xAddOptions) {
409425
Assert.notNull(record, "Record must not be null");
410426
Assert.notNull(xAddOptions, "XAddOptions must not be null");
411427

412-
AddStreamRecord addStreamRecord = AddStreamRecord.of(record)
413-
.approximateTrimming(xAddOptions.isApproximateTrimming()).makeNoStream(xAddOptions.isNoMkStream());
414-
415-
if (xAddOptions.hasMaxlen()) {
416-
addStreamRecord = addStreamRecord.maxlen(xAddOptions.getMaxlen());
417-
}
418-
419-
if (xAddOptions.hasMinId()) {
420-
addStreamRecord = addStreamRecord.minId(xAddOptions.getMinId());
421-
}
422-
423-
return xAdd(Mono.just(addStreamRecord)).next().map(CommandResponse::getOutput);
428+
return xAdd(Mono.just(AddStreamRecord.of(record).withOptions(xAddOptions))).next()
429+
.map(CommandResponse::getOutput);
424430
}
425431

426432
/**
@@ -1565,13 +1571,11 @@ default Flux<ByteBufferRecord> xRevRange(ByteBuffer key, Range<String> range, Li
15651571
*/
15661572
class TrimCommand extends KeyCommand {
15671573

1568-
private @Nullable Long count;
1569-
private boolean approximateTrimming;
1574+
private final XTrimOptions options;
15701575

1571-
private TrimCommand(@Nullable ByteBuffer key, @Nullable Long count, boolean approximateTrimming) {
1576+
private TrimCommand(@Nullable ByteBuffer key, XTrimOptions options) {
15721577
super(key);
1573-
this.count = count;
1574-
this.approximateTrimming = approximateTrimming;
1578+
this.options = options;
15751579
}
15761580

15771581
/**
@@ -1584,18 +1588,18 @@ public static TrimCommand stream(ByteBuffer key) {
15841588

15851589
Assert.notNull(key, "Key must not be null");
15861590

1587-
return new TrimCommand(key, null, false);
1591+
return new TrimCommand(key, XTrimOptions.none());
15881592
}
15891593

15901594
/**
1591-
* Applies the numeric {@literal count}. Constructs a new command instance with all previously configured
1595+
* Applies the numeric {@literal limit}. Constructs a new command instance with all previously configured
15921596
* properties.
15931597
*
1594-
* @param count
1595-
* @return a new {@link TrimCommand} with {@literal count} applied.
1598+
* @param limit
1599+
* @return a new {@link TrimCommand} with {@literal limit} applied.
15961600
*/
1597-
public TrimCommand to(long count) {
1598-
return new TrimCommand(getKey(), count, approximateTrimming);
1601+
public TrimCommand to(long limit) {
1602+
return new TrimCommand(getKey(), options.limit(limit));
15991603
}
16001604

16011605
/**
@@ -1616,18 +1620,39 @@ public TrimCommand approximate() {
16161620
* @since 2.4
16171621
*/
16181622
public TrimCommand approximate(boolean approximateTrimming) {
1619-
return new TrimCommand(getKey(), count, approximateTrimming);
1623+
return new TrimCommand(getKey(), options.approximateTrimming(approximateTrimming));
1624+
}
1625+
1626+
/**
1627+
* Apply the given {@link XTrimOptions} to configure the {@literal XTRIM} command.
1628+
* <p>
1629+
* This method allows setting all XTRIM options at once, including trimming strategies
1630+
* ({@literal MAXLEN}, {@literal MINID}), stream creation behavior ({@literal NOMKSTREAM}),
1631+
* and other parameters. Constructs a new command instance with all previously configured
1632+
* properties except the options, which are replaced by the provided {@link XTrimOptions}.
1633+
*
1634+
* @param options the {@link XTrimOptions} to apply. Must not be {@literal null}.
1635+
* @return a new {@link TrimCommand} with the specified options applied.
1636+
* @since 4.0
1637+
*/
1638+
public TrimCommand withOptions(XTrimOptions options) {
1639+
return new TrimCommand(getKey(), options);
16201640
}
16211641

16221642
/**
16231643
* @return can be {@literal null}.
16241644
*/
16251645
public @Nullable Long getCount() {
1626-
return count;
1646+
return options.getLimit();
16271647
}
16281648

1649+
16291650
public boolean isApproximateTrimming() {
1630-
return approximateTrimming;
1651+
return options.isApproximateTrimming();
1652+
}
1653+
1654+
public XTrimOptions getOptions() {
1655+
return options;
16311656
}
16321657
}
16331658

@@ -1661,6 +1686,14 @@ default Mono<Long> xTrim(ByteBuffer key, long count, boolean approximateTrimming
16611686
.map(NumericResponse::getOutput);
16621687
}
16631688

1689+
default Mono<Long> xTrim(ByteBuffer key, XTrimOptions options) {
1690+
1691+
Assert.notNull(key, "Key must not be null");
1692+
1693+
return xTrim(Mono.just(TrimCommand.stream(key).withOptions(options))).next()
1694+
.map(NumericResponse::getOutput);
1695+
}
1696+
16641697
/**
16651698
* Trims the stream to {@code count} elements.
16661699
*

0 commit comments

Comments
 (0)