From 286d116e654fe91087953486058b02c9b0a98cba Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Thu, 23 Oct 2025 13:47:05 +0300 Subject: [PATCH 1/5] introduce global interceptors --- packages/test-utils/lib/redis-proxy-spec.ts | 122 +++++++++++++++++--- packages/test-utils/lib/redis-proxy.ts | 75 +++++++++--- 2 files changed, 167 insertions(+), 30 deletions(-) diff --git a/packages/test-utils/lib/redis-proxy-spec.ts b/packages/test-utils/lib/redis-proxy-spec.ts index d0a4120455..6c43eca478 100644 --- a/packages/test-utils/lib/redis-proxy-spec.ts +++ b/packages/test-utils/lib/redis-proxy-spec.ts @@ -1,7 +1,7 @@ import { strict as assert } from 'node:assert'; import { Buffer } from 'node:buffer'; import { testUtils, GLOBAL } from './test-utils'; -import { InterceptorFunction, RedisProxy } from './redis-proxy'; +import { InterceptorDescription, RedisProxy } from './redis-proxy'; import type { RedisClientType } from '@redis/client/lib/client/index.js'; describe('RedisSocketProxy', function () { @@ -118,31 +118,37 @@ describe('RedisSocketProxy', function () { ) => { // Intercept PING commands and modify the response - const pingInterceptor: InterceptorFunction = async (data, next) => { - if (data.includes('PING')) { - return Buffer.from("+PINGINTERCEPTED\r\n"); + const pingInterceptor: InterceptorDescription = { + name: `ping`, + fn: async (data, next) => { + if (data.includes('PING')) { + return Buffer.from("+PINGINTERCEPTED\r\n"); + } + return next(data); } - return next(data); }; // Only intercept GET responses and double numeric values // Does not modify other commands or non-numeric GET responses - const doubleNumberGetInterceptor: InterceptorFunction = async (data, next) => { - const response = await next(data); + const doubleNumberGetInterceptor: InterceptorDescription = { + name: `double-number-get`, + fn: async (data, next) => { + const response = await next(data); - // Not a GET command, return original response - if (!data.includes("GET")) return response; + // Not a GET command, return original response + if (!data.includes("GET")) return response; - const value = (response.toString().split("\r\n"))[1]; - const number = Number(value); - // Not a number, return original response - if(isNaN(number)) return response; + const value = (response.toString().split("\r\n"))[1]; + const number = Number(value); + // Not a number, return original response + if(isNaN(number)) return response; - const doubled = String(number * 2); - return Buffer.from(`$${doubled.length}\r\n${doubled}\r\n`); + const doubled = String(number * 2); + return Buffer.from(`$${doubled.length}\r\n${doubled}\r\n`); + } }; - proxy.setInterceptors([ pingInterceptor, doubleNumberGetInterceptor ]) + proxy.setGlobalInterceptors([ pingInterceptor, doubleNumberGetInterceptor ]) const pingResponse = await proxiedClient.ping(); assert.equal(pingResponse, 'PINGINTERCEPTED', 'Response should be modified by middleware'); @@ -162,6 +168,90 @@ describe('RedisSocketProxy', function () { }, GLOBAL.SERVERS.OPEN_RESP_3, ); + + testUtils.testWithProxiedClient( + "Stats reflect middleware activity", + async ( + proxiedClient: RedisClientType, + proxy: RedisProxy, + ) => { + const PING = `ping`; + const SKIPPED = `skipped`; + proxy.setGlobalInterceptors([ + { + name: PING, + matchLimit: 3, + fn: async (data, next, state) => { + state.invokeCount++; + if(state.matchCount === state.matchLimit) return next(data); + if (data.includes("PING")) { + state.matchCount++; + return Buffer.from("+PINGINTERCEPTED\r\n"); + } + return next(data); + }, + }, + { + name: SKIPPED, + fn: async (data, next, state) => { + state.invokeCount++; + state.matchCount++; + // This interceptor does not match anything + return next(data); + }, + }, + ]); + + await proxiedClient.ping(); + await proxiedClient.ping(); + await proxiedClient.ping(); + + let stats = proxy.getStats(); + let pingInterceptor = stats.globalInterceptors.find( + (i) => i.name === PING, + ); + assert.ok(pingInterceptor, "PING interceptor stats should be present"); + assert.equal(pingInterceptor.invokeCount, 3); + assert.equal(pingInterceptor.matchCount, 3); + + let skipInterceptor = stats.globalInterceptors.find( + (i) => i.name === SKIPPED, + ); + assert.ok(skipInterceptor, "SKIPPED interceptor stats should be present"); + assert.equal(skipInterceptor.invokeCount, 0); + assert.equal(skipInterceptor.matchCount, 0); + + await proxiedClient.set("foo", "bar"); + await proxiedClient.get("foo"); + + stats = proxy.getStats(); + pingInterceptor = stats.globalInterceptors.find( + (i) => i.name === PING, + ); + assert.ok(pingInterceptor, "PING interceptor stats should be present"); + assert.equal(pingInterceptor.invokeCount, 5); + assert.equal(pingInterceptor.matchCount, 3); + + await proxiedClient.ping(); + + stats = proxy.getStats(); + pingInterceptor = stats.globalInterceptors.find( + (i) => i.name === PING, + ); + assert.ok(pingInterceptor, "PING interceptor stats should be present"); + assert.equal(pingInterceptor.invokeCount, 6); + assert.equal(pingInterceptor.matchCount, 3, 'Should not match more than limit'); + + skipInterceptor = stats.globalInterceptors.find( + (i) => i.name === SKIPPED, + ); + assert.ok(skipInterceptor, "PING interceptor stats should be present"); + assert.equal(skipInterceptor.invokeCount, 3); + assert.equal(skipInterceptor.matchCount, 3); + }, + GLOBAL.SERVERS.OPEN_RESP_3, + ); + }); }); diff --git a/packages/test-utils/lib/redis-proxy.ts b/packages/test-utils/lib/redis-proxy.ts index a4ea605285..29abc76d27 100644 --- a/packages/test-utils/lib/redis-proxy.ts +++ b/packages/test-utils/lib/redis-proxy.ts @@ -10,17 +10,22 @@ interface ProxyConfig { readonly enableLogging?: boolean; } -interface ConnectionInfo { +interface ConnectionInfoCommon { readonly id: string; readonly clientAddress: string; readonly clientPort: number; readonly connectedAt: Date; } -interface ActiveConnection extends ConnectionInfo { +interface ConnectionInfo extends ConnectionInfoCommon { + readonly interceptors: InterceptorState[]; +} + +interface ActiveConnection extends ConnectionInfoCommon { readonly clientSocket: net.Socket; readonly serverSocket: net.Socket; inflightRequestsCount: number + interceptors: Interceptor[]; } type SendResult = @@ -33,6 +38,7 @@ interface ProxyStats { readonly activeConnections: number; readonly totalConnections: number; readonly connections: readonly ConnectionInfo[]; + readonly globalInterceptors: InterceptorState[]; } interface ProxyEvents { @@ -50,16 +56,35 @@ interface ProxyEvents { 'close': () => void; } -export type Interceptor = (data: Buffer) => Promise; -export type InterceptorFunction = (data: Buffer, next: Interceptor) => Promise; -type InterceptorInitializer = (init: Interceptor) => Interceptor; +export type Next = (data: Buffer) => Promise; + +export type InterceptorFunction = (data: Buffer, next: Next, state: InterceptorState) => Promise; + +export interface InterceptorDescription { + name: string; + matchLimit?: number; + fn: InterceptorFunction; +} + +export interface InterceptorState { + name: string; + matchLimit?: number; + invokeCount: number; + matchCount: number; +} + +interface Interceptor { + name: string; + state: InterceptorState; + fn: InterceptorFunction; +} export class RedisProxy extends EventEmitter { private readonly server: net.Server; public readonly config: Required; private readonly connections: Map; private isRunning: boolean; - private interceptorInitializer: InterceptorInitializer = (init) => init; + private globalInterceptors: Interceptor[] = []; constructor(config: ProxyConfig) { super(); @@ -119,11 +144,25 @@ export class RedisProxy extends EventEmitter { }); } - public setInterceptors(interceptors: Array) { - this.interceptorInitializer = (init) => interceptors.reduceRight( - (next, mw) => (data) => mw(data, next), - init - ); + private makeInterceptor(description: InterceptorDescription): Interceptor { + const { name, fn, matchLimit } = description; + return { + name, + fn, + state: { + name, + matchCount: 0, + invokeCount: 0, + matchLimit, + }, + }; + } + + public setGlobalInterceptors( + interceptorDescriptions: Array, + ) { + const interceptors: Interceptor[] = interceptorDescriptions.map(this.makeInterceptor); + this.globalInterceptors = interceptors; } public getStats(): ProxyStats { @@ -132,12 +171,14 @@ export class RedisProxy extends EventEmitter { return { activeConnections: connections.length, totalConnections: connections.length, + globalInterceptors: this.globalInterceptors.map(i => i.state), connections: connections.map((conn) => ({ id: conn.id, clientAddress: conn.clientAddress, clientPort: conn.clientPort, connectedAt: conn.connectedAt, - })) + interceptors: conn.interceptors.map(i => i.state) + })), }; } @@ -246,7 +287,8 @@ export class RedisProxy extends EventEmitter { connectedAt: new Date(), clientSocket, serverSocket, - inflightRequestsCount: 0 + inflightRequestsCount: 0, + interceptors: [], }; this.connections.set(connectionId, connectionInfo); @@ -279,7 +321,12 @@ export class RedisProxy extends EventEmitter { }); }; - const interceptorChain = this.interceptorInitializer(last); + const interceptorChain = connectionInfo.interceptors.concat(this.globalInterceptors).reduceRight( + (next, interceptor) => (data) => + interceptor.fn(data, next, interceptor.state), + last, + ); + const response = await interceptorChain(data); clientSocket.write(response); }); From 0e8f4c73410ba7f3d4eaf3c53c915b6edb33ca80 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 24 Oct 2025 11:33:20 +0300 Subject: [PATCH 2/5] move proxy stuff to new folder --- packages/test-utils/lib/index.ts | 2 +- packages/test-utils/lib/{ => proxy}/redis-proxy-spec.ts | 2 +- packages/test-utils/lib/{ => proxy}/redis-proxy.ts | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename packages/test-utils/lib/{ => proxy}/redis-proxy-spec.ts (99%) rename packages/test-utils/lib/{ => proxy}/redis-proxy.ts (100%) diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 5f339e9a42..1a9d1c9845 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -26,7 +26,7 @@ import { hideBin } from 'yargs/helpers'; import * as fs from 'node:fs'; import * as os from 'node:os'; import * as path from 'node:path'; -import { RedisProxy, getFreePortNumber } from './redis-proxy'; +import { RedisProxy, getFreePortNumber } from './proxy/redis-proxy'; interface TestUtilsConfig { /** diff --git a/packages/test-utils/lib/redis-proxy-spec.ts b/packages/test-utils/lib/proxy/redis-proxy-spec.ts similarity index 99% rename from packages/test-utils/lib/redis-proxy-spec.ts rename to packages/test-utils/lib/proxy/redis-proxy-spec.ts index 6c43eca478..048bfca789 100644 --- a/packages/test-utils/lib/redis-proxy-spec.ts +++ b/packages/test-utils/lib/proxy/redis-proxy-spec.ts @@ -1,6 +1,6 @@ import { strict as assert } from 'node:assert'; import { Buffer } from 'node:buffer'; -import { testUtils, GLOBAL } from './test-utils'; +import { testUtils, GLOBAL } from '../test-utils'; import { InterceptorDescription, RedisProxy } from './redis-proxy'; import type { RedisClientType } from '@redis/client/lib/client/index.js'; diff --git a/packages/test-utils/lib/redis-proxy.ts b/packages/test-utils/lib/proxy/redis-proxy.ts similarity index 100% rename from packages/test-utils/lib/redis-proxy.ts rename to packages/test-utils/lib/proxy/redis-proxy.ts From e9538e3e4a3540e47c71f04133762fd0fb30e2c6 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 24 Oct 2025 12:07:42 +0300 Subject: [PATCH 3/5] implement resp framer --- .../test-utils/lib/proxy/resp-framer-spec.ts | 735 ++++++++++++++++++ packages/test-utils/lib/proxy/resp-framer.ts | 164 ++++ 2 files changed, 899 insertions(+) create mode 100644 packages/test-utils/lib/proxy/resp-framer-spec.ts create mode 100644 packages/test-utils/lib/proxy/resp-framer.ts diff --git a/packages/test-utils/lib/proxy/resp-framer-spec.ts b/packages/test-utils/lib/proxy/resp-framer-spec.ts new file mode 100644 index 0000000000..1fd0c7bc36 --- /dev/null +++ b/packages/test-utils/lib/proxy/resp-framer-spec.ts @@ -0,0 +1,735 @@ +import { strict as assert } from 'node:assert'; +import RespFramer from './resp-framer'; + +describe('RespFramer - RESP2', () => { + it('should emit a simple string message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('+OK\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit an error message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('-ERR unknown command\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit an integer message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from(':1000\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit a bulk string message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('$6\r\nfoobar\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit a null bulk string', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('$-1\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit an array message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit a null array', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('*-1\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit nested arrays', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('*2\r\n*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n*1\r\n$3\r\nbaz\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle multiple complete messages', async () => { + const framer = new RespFramer(); + const messages = [ + Buffer.from('+OK\r\n'), + Buffer.from(':42\r\n'), + Buffer.from('$3\r\nfoo\r\n') + ]; + const combined = Buffer.concat(messages); + const received: Buffer[] = []; + + const messagesPromise = new Promise((resolve) => { + framer.on('message', (message) => { + received.push(message); + if (received.length === 3) { + resolve(received); + } + }); + }); + + framer.write(combined); + const result = await messagesPromise; + assert.equal(result.length, messages.length); + messages.forEach((expected, i) => { + assert.deepEqual(result[i], expected); + }); + }); + + it('should handle partial messages across multiple writes', async () => { + const framer = new RespFramer(); + const fullMessage = Buffer.from('$6\r\nfoobar\r\n'); + const part1 = fullMessage.subarray(0, 5); + const part2 = fullMessage.subarray(5); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(part1); + framer.write(part2); + const message = await messagePromise; + assert.deepEqual(message, fullMessage); + }); + + it('should handle array split across multiple writes', async () => { + const framer = new RespFramer(); + const fullMessage = Buffer.from('*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n'); + const part1 = fullMessage.subarray(0, 10); + const part2 = fullMessage.subarray(10); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(part1); + framer.write(part2); + const message = await messagePromise; + assert.deepEqual(message, fullMessage); + }); + + it('should handle empty array', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('*0\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle empty bulk string', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('$0\r\n\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle mixed message types in sequence', async () => { + const framer = new RespFramer(); + const messages = [ + Buffer.from('+PONG\r\n'), + Buffer.from('$3\r\nGET\r\n'), + Buffer.from('*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n'), + Buffer.from(':123\r\n'), + Buffer.from('-Error\r\n') + ]; + const received: Buffer[] = []; + + const messagesPromise = new Promise((resolve) => { + framer.on('message', (message) => { + received.push(message); + if (received.length === messages.length) { + resolve(received); + } + }); + }); + + messages.forEach(msg => framer.write(msg)); + const result = await messagesPromise; + assert.equal(result.length, messages.length); + messages.forEach((expected, i) => { + assert.deepEqual(result[i], expected); + }); + }); + + it('should handle bulk string containing \\r\\n in the data', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('$12\r\nhello\r\nworld\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle bulk string with binary data including null bytes', async () => { + const framer = new RespFramer(); + const binaryData = Buffer.from([0x00, 0x01, 0x02, 0xff, 0xfe]); + const expected = Buffer.concat([ + Buffer.from('$5\r\n'), + binaryData, + Buffer.from('\r\n') + ]); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle array with bulk strings containing \\r\\n', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('*2\r\n$5\r\nfoo\r\n\r\n$5\r\nbar\r\n\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); +}); + +describe('RespFramer - RESP3', () => { + it('should emit a null message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('_\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit a boolean true message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('#t\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit a boolean false message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('#f\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit a double message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from(',3.14159\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit a double infinity message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from(',inf\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit a double negative infinity message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from(',-inf\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit a big number message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('(3492890328409238509324850943850943825024385\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit a bulk error message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('!21\r\nSYNTAX invalid syntax\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit a verbatim string message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('=15\r\ntxt:Some string\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit a map message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('%2\r\n+first\r\n:1\r\n+second\r\n:2\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit a set message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('~3\r\n+apple\r\n+banana\r\n+cherry\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit a push message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('>3\r\n+pubsub\r\n+message\r\n+channel\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should emit an attribute message', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('|1\r\n+key-popularity\r\n%2\r\n$1\r\na\r\n,0.1923\r\n$1\r\nb\r\n,0.0012\r\n*2\r\n:2039123\r\n:9543892\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle nested RESP3 structures', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('%2\r\n$4\r\nname\r\n$5\r\nAlice\r\n$3\r\nage\r\n:30\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle empty map', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('%0\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle empty set', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('~0\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle map with nested arrays', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('%1\r\n$4\r\ndata\r\n*2\r\n:1\r\n:2\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle set with mixed types', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('~4\r\n+string\r\n:42\r\n#t\r\n_\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle RESP3 split across multiple writes', async () => { + const framer = new RespFramer(); + const fullMessage = Buffer.from('%2\r\n+key1\r\n:100\r\n+key2\r\n:200\r\n'); + const part1 = fullMessage.subarray(0, 10); + const part2 = fullMessage.subarray(10); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(part1); + framer.write(part2); + const message = await messagePromise; + assert.deepEqual(message, fullMessage); + }); + + it('should handle mixed RESP2 and RESP3 messages', async () => { + const framer = new RespFramer(); + const messages = [ + Buffer.from('*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n'), + Buffer.from('%1\r\n+result\r\n$5\r\nvalue\r\n'), + Buffer.from('#t\r\n'), + Buffer.from('_\r\n'), + Buffer.from(',3.14\r\n') + ]; + const received: Buffer[] = []; + + const messagesPromise = new Promise((resolve) => { + framer.on('message', (message) => { + received.push(message); + if (received.length === messages.length) { + resolve(received); + } + }); + }); + + messages.forEach(msg => framer.write(msg)); + const result = await messagesPromise; + assert.equal(result.length, messages.length); + messages.forEach((expected, i) => { + assert.deepEqual(result[i], expected); + }); + }); + + it('should handle array with attribute metadata', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('*3\r\n:1\r\n:2\r\n|1\r\n+ttl\r\n:3600\r\n:3\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle null map', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('%-1\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle null set', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('~-1\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle null push', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('>-1\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle attribute with empty metadata', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('|0\r\n:42\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle blob error with binary data', async () => { + const framer = new RespFramer(); + const binaryData = Buffer.from([0x00, 0x01, 0x02, 0xff, 0xfe]); + const expected = Buffer.concat([ + Buffer.from('!5\r\n'), + binaryData, + Buffer.from('\r\n') + ]); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle verbatim string with different encoding', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('=17\r\nmkd:# Hello World\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle double NaN', async () => { + const framer = new RespFramer(); + const expected = Buffer.from(',nan\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle deeply nested structures', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('*2\r\n%1\r\n+key\r\n*2\r\n:1\r\n:2\r\n~2\r\n+a\r\n+b\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle push with nested map', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('>2\r\n+pubsub\r\n%1\r\n+channel\r\n+news\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle attribute split across multiple writes', async () => { + const framer = new RespFramer(); + const fullMessage = Buffer.from('|1\r\n+ttl\r\n:3600\r\n+value\r\n'); + const part1 = fullMessage.subarray(0, 10); + const part2 = fullMessage.subarray(10); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(part1); + framer.write(part2); + const message = await messagePromise; + assert.deepEqual(message, fullMessage); + }); + + it('should handle map with null values', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('%2\r\n+key1\r\n_\r\n+key2\r\n$-1\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle nested maps', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('%1\r\n+outer\r\n%2\r\n+inner1\r\n:1\r\n+inner2\r\n:2\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); + + it('should handle set containing arrays', async () => { + const framer = new RespFramer(); + const expected = Buffer.from('~2\r\n*2\r\n:1\r\n:2\r\n*2\r\n:3\r\n:4\r\n'); + + const messagePromise = new Promise((resolve) => { + framer.once('message', resolve); + }); + + framer.write(expected); + const message = await messagePromise; + assert.deepEqual(message, expected); + }); +}); diff --git a/packages/test-utils/lib/proxy/resp-framer.ts b/packages/test-utils/lib/proxy/resp-framer.ts new file mode 100644 index 0000000000..5cc3fe656b --- /dev/null +++ b/packages/test-utils/lib/proxy/resp-framer.ts @@ -0,0 +1,164 @@ +// RespFramer: Frames raw Buffer data into complete RESP messages +// Accumulates incoming bytes and emits each complete RESP message as a separate Buffer + +import EventEmitter from "node:events"; + +export interface RespFramerEvents { + message: (data: Buffer) => void; +} + +export default class RespFramer extends EventEmitter { + private buffer: Buffer; + private offset: number; + + constructor() { + super(); + this.buffer = Buffer.alloc(0); + this.offset = 0; + } + + public write(data: Buffer) { + this.buffer = Buffer.concat([this.buffer, data]); + + while (this.offset < this.buffer.length) { + const messageEnd = this.findMessageEnd(this.buffer, this.offset); + if (messageEnd === -1) { + break; // Incomplete message + } + const message = this.buffer.subarray(this.offset, messageEnd); + this.emit('message', message); + this.offset = messageEnd; + } + + // Remove processed data from the buffer + if (this.offset > 0) { + this.buffer = this.buffer.subarray(this.offset); + this.offset = 0; + } + + } + private findMessageEnd(buffer: Buffer, start: number): number { + if (start >= buffer.length) { + return -1; + } + const prefix = String.fromCharCode(buffer[start]); + switch (prefix) { + case '+': // Simple String + case '-': // Error + case ':': // Integer + case '_': // Null + case '#': // Boolean + case ',': // Double + case '(': // Big Number + return this.findLineEnd(buffer, start); + case '$': // Bulk String + case '!': // Bulk Error + case '=': // Verbatim String + return this.findBulkStringEnd(buffer, start); + case '*': // Array + return this.findArrayEnd(buffer, start); + case '%': // Map + return this.findMapEnd(buffer, start); + case '~': // Set + case '>': // Push + return this.findArrayEnd(buffer, start); + case '|': // Attribute + return this.findAttributeEnd(buffer, start); + default: + return -1; // Unknown prefix + } + } + + private findArrayEnd(buffer: Buffer, start: number): number { + const result = this.readLength(buffer, start); + if (!result) { + return -1; + } + const { length, lineEnd } = result; + if (length === -1) { + return lineEnd; + } + let currentOffset = lineEnd; + for (let i = 0; i < length; i++) { + const elementEnd = this.findMessageEnd(buffer, currentOffset); + if (elementEnd === -1) { + return -1; + } + currentOffset = elementEnd; + } + return currentOffset; + } + + private findBulkStringEnd(buffer: Buffer, start: number): number { + const result = this.readLength(buffer, start); + if (!result) { + return -1; + } + const { length, lineEnd } = result; + if (length === -1) { + return lineEnd; + } + const totalLength = lineEnd + length + 2; + return totalLength <= buffer.length ? totalLength : -1; + } + + private findMapEnd(buffer: Buffer, start: number): number { + const result = this.readLength(buffer, start); + if (!result) { + return -1; + } + const { length, lineEnd } = result; + if (length === -1) { + return lineEnd; + } + let currentOffset = lineEnd; + for (let i = 0; i < length * 2; i++) { + const elementEnd = this.findMessageEnd(buffer, currentOffset); + if (elementEnd === -1) { + return -1; + } + currentOffset = elementEnd; + } + return currentOffset; + } + + private findAttributeEnd(buffer: Buffer, start: number): number { + const result = this.readLength(buffer, start); + if (!result) { + return -1; + } + const { length, lineEnd } = result; + let currentOffset = lineEnd; + for (let i = 0; i < length * 2; i++) { + const elementEnd = this.findMessageEnd(buffer, currentOffset); + if (elementEnd === -1) { + return -1; + } + currentOffset = elementEnd; + } + const valueEnd = this.findMessageEnd(buffer, currentOffset); + if (valueEnd === -1) { + return -1; + } + return valueEnd; + } + + private findLineEnd(buffer: Buffer, start: number): number { + const end = buffer.indexOf('\r\n', start); + return end !== -1 ? end + 2 : -1; + } + + private readLength(buffer: Buffer, start: number): { length: number; lineEnd: number } | null { + const lineEnd = this.findLineEnd(buffer, start); + if (lineEnd === -1) { + return null; + } + const lengthLine = buffer.subarray(start + 1, lineEnd - 2).toString(); + const length = parseInt(lengthLine, 10); + if (isNaN(length)) { + return null; + } + return { length, lineEnd }; + } + +} From 6cec975d3be4495e329f54e8d244b10cf96c66f9 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Fri, 24 Oct 2025 11:33:40 +0300 Subject: [PATCH 4/5] properly handle request/response and push --- .../test-utils/lib/proxy/redis-proxy-spec.ts | 58 +++++++++++++++++++ packages/test-utils/lib/proxy/redis-proxy.ts | 39 ++++++------- packages/test-utils/lib/proxy/resp-framer.ts | 43 +++++++------- packages/test-utils/lib/proxy/resp-queue.ts | 43 ++++++++++++++ 4 files changed, 143 insertions(+), 40 deletions(-) create mode 100644 packages/test-utils/lib/proxy/resp-queue.ts diff --git a/packages/test-utils/lib/proxy/redis-proxy-spec.ts b/packages/test-utils/lib/proxy/redis-proxy-spec.ts index 048bfca789..f86d96d014 100644 --- a/packages/test-utils/lib/proxy/redis-proxy-spec.ts +++ b/packages/test-utils/lib/proxy/redis-proxy-spec.ts @@ -252,6 +252,64 @@ describe('RedisSocketProxy', function () { GLOBAL.SERVERS.OPEN_RESP_3, ); + testUtils.testWithProxiedClient( + "Middleware is given exactly one RESP message at a time", + async ( + proxiedClient: RedisClientType, + proxy: RedisProxy, + ) => { + proxy.setGlobalInterceptors([ + { + name: `ping`, + fn: async (data, next, state) => { + state.invokeCount++; + if (data.equals(Buffer.from("*1\r\n$4\r\nPING\r\n"))) { + state.matchCount++; + } + return next(data); + }, + }, + ]); + + await Promise.all([proxiedClient.ping(), proxiedClient.ping()]); + + const stats = proxy.getStats(); + const pingInterceptor = stats.globalInterceptors.find( + (i) => i.name === `ping`, + ); + assert.ok(pingInterceptor, "PING interceptor stats should be present"); + assert.equal(pingInterceptor.invokeCount, 2); + assert.equal(pingInterceptor.matchCount, 2); + }, + GLOBAL.SERVERS.OPEN_RESP_3, + ); + + testUtils.testWithProxiedClient( + "Proxy passes through push messages", + async ( + proxiedClient: RedisClientType, + proxy: RedisProxy, + ) => { + let resolve: (value: string) => void; + const promise = new Promise((rs) => { resolve = rs; }); + await proxiedClient.subscribe("test-push-channel", (message) => { + resolve(message); + }); + + await proxiedClient.publish("test-push-channel", "hello"); + const result = await promise; + assert.equal(result, "hello", "Should receive push message through proxy"); + }, + { + ...GLOBAL.SERVERS.OPEN_RESP_3, + clientOptions: { + maintNotifications: 'disabled', + disableClientInfo: true, + RESP: 3 + } + }, + ); }); + }); diff --git a/packages/test-utils/lib/proxy/redis-proxy.ts b/packages/test-utils/lib/proxy/redis-proxy.ts index 29abc76d27..9743b99d6e 100644 --- a/packages/test-utils/lib/proxy/redis-proxy.ts +++ b/packages/test-utils/lib/proxy/redis-proxy.ts @@ -1,5 +1,7 @@ import * as net from 'net'; import { EventEmitter } from 'events'; +import RespFramer from './resp-framer'; +import RespQueue from './resp-queue'; interface ProxyConfig { readonly listenPort: number; @@ -24,7 +26,6 @@ interface ConnectionInfo extends ConnectionInfoCommon { interface ActiveConnection extends ConnectionInfoCommon { readonly clientSocket: net.Socket; readonly serverSocket: net.Socket; - inflightRequestsCount: number interceptors: Interceptor[]; } @@ -287,7 +288,6 @@ export class RedisProxy extends EventEmitter { connectedAt: new Date(), clientSocket, serverSocket, - inflightRequestsCount: 0, interceptors: [], }; @@ -301,24 +301,23 @@ export class RedisProxy extends EventEmitter { this.emit('connection', connectionInfo); }); - clientSocket.on('data', async (data) => { - this.emit('data', connectionId, 'client->server', data); + /** + * + * client -> clientSocket -> clientRespFramer -> interceptors -> queue -> serverSocket -> server + * client <- clientSocket <- interceptors <- response | queue <- serverRespFramer <- serverSocket <- server + * client <- clientSocket <- push | + */ + const clientRespFramer = new RespFramer(); + const respQueue = new RespQueue(serverSocket); - connectionInfo.inflightRequestsCount++; + clientRespFramer.on('message', async (data) => { // next1 -> next2 -> ... -> last -> server // next1 <- next2 <- ... <- last <- server - const last = (data: Buffer): Promise => { - return new Promise((resolve, reject) => { - serverSocket.write(data); - serverSocket.once('data', (data) => { - connectionInfo.inflightRequestsCount--; - assert(connectionInfo.inflightRequestsCount >= 0, `inflightRequestsCount for connection ${connectionId} went below zero`); - this.emit('data', connectionId, 'server->client', data); - resolve(data); - }); - serverSocket.once('error', reject); - }); + const last = async (data: Buffer): Promise => { + this.emit('data', connectionId, 'client->server', data); + const response = await respQueue.request(data); + return response; }; const interceptorChain = connectionInfo.interceptors.concat(this.globalInterceptors).reduceRight( @@ -328,11 +327,13 @@ export class RedisProxy extends EventEmitter { ); const response = await interceptorChain(data); + this.emit('data', connectionId, 'server->client', response); clientSocket.write(response); }); - serverSocket.on('data', (data) => { - if (connectionInfo.inflightRequestsCount > 0) return; + clientSocket.on('data', data => clientRespFramer.write(data)); + + respQueue.on('push', (data) => { this.emit('data', connectionId, 'server->client', data); clientSocket.write(data); }); @@ -357,7 +358,6 @@ export class RedisProxy extends EventEmitter { }); serverSocket.on('error', (error) => { - if (connectionInfo.inflightRequestsCount > 0) return; this.log(`Server error for connection ${connectionId}: ${error.message}`); this.emit('error', error, connectionId); clientSocket.destroy(); @@ -391,7 +391,6 @@ export class RedisProxy extends EventEmitter { } } import { createServer } from 'net'; -import assert from 'node:assert'; export function getFreePortNumber(): Promise { return new Promise((resolve, reject) => { diff --git a/packages/test-utils/lib/proxy/resp-framer.ts b/packages/test-utils/lib/proxy/resp-framer.ts index 5cc3fe656b..d92dd6fc80 100644 --- a/packages/test-utils/lib/proxy/resp-framer.ts +++ b/packages/test-utils/lib/proxy/resp-framer.ts @@ -5,6 +5,7 @@ import EventEmitter from "node:events"; export interface RespFramerEvents { message: (data: Buffer) => void; + push: (data: Buffer) => void; } export default class RespFramer extends EventEmitter { @@ -26,7 +27,7 @@ export default class RespFramer extends EventEmitter { break; // Incomplete message } const message = this.buffer.subarray(this.offset, messageEnd); - this.emit('message', message); + this.emit("message", message); this.offset = messageEnd; } @@ -35,34 +36,34 @@ export default class RespFramer extends EventEmitter { this.buffer = this.buffer.subarray(this.offset); this.offset = 0; } - } + private findMessageEnd(buffer: Buffer, start: number): number { if (start >= buffer.length) { return -1; } const prefix = String.fromCharCode(buffer[start]); switch (prefix) { - case '+': // Simple String - case '-': // Error - case ':': // Integer - case '_': // Null - case '#': // Boolean - case ',': // Double - case '(': // Big Number + case "+": // Simple String + case "-": // Error + case ":": // Integer + case "_": // Null + case "#": // Boolean + case ",": // Double + case "(": // Big Number return this.findLineEnd(buffer, start); - case '$': // Bulk String - case '!': // Bulk Error - case '=': // Verbatim String + case "$": // Bulk String + case "!": // Bulk Error + case "=": // Verbatim String return this.findBulkStringEnd(buffer, start); - case '*': // Array + case "*": // Array return this.findArrayEnd(buffer, start); - case '%': // Map + case "%": // Map return this.findMapEnd(buffer, start); - case '~': // Set - case '>': // Push + case "~": // Set + case ">": // Push return this.findArrayEnd(buffer, start); - case '|': // Attribute + case "|": // Attribute return this.findAttributeEnd(buffer, start); default: return -1; // Unknown prefix @@ -144,11 +145,14 @@ export default class RespFramer extends EventEmitter { } private findLineEnd(buffer: Buffer, start: number): number { - const end = buffer.indexOf('\r\n', start); + const end = buffer.indexOf("\r\n", start); return end !== -1 ? end + 2 : -1; } - private readLength(buffer: Buffer, start: number): { length: number; lineEnd: number } | null { + private readLength( + buffer: Buffer, + start: number, + ): { length: number; lineEnd: number } | null { const lineEnd = this.findLineEnd(buffer, start); if (lineEnd === -1) { return null; @@ -160,5 +164,4 @@ export default class RespFramer extends EventEmitter { } return { length, lineEnd }; } - } diff --git a/packages/test-utils/lib/proxy/resp-queue.ts b/packages/test-utils/lib/proxy/resp-queue.ts new file mode 100644 index 0000000000..d4c410a541 --- /dev/null +++ b/packages/test-utils/lib/proxy/resp-queue.ts @@ -0,0 +1,43 @@ +import { EventEmitter } from "node:events"; +import RespFramer from "./resp-framer"; +import { Socket } from "node:net"; + +interface Request { + resolve: (data: Buffer) => void; + reject: (reason: any) => void; +} + +export default class RespQueue extends EventEmitter { + queue: Request[] = []; + respFramer: RespFramer = new RespFramer(); + + constructor(private serverSocket: Socket) { + super(); + this.respFramer.on("message", (msg) => this.handleMessage(msg)); + this.serverSocket.on("data", (data) => this.respFramer.write(data)); + } + + handleMessage(data: Buffer) { + const request = this.queue.shift(); + if (request) { + request.resolve(data); + } else { + this.emit("push", data); + } + } + + request(data: Buffer): Promise { + let resolve: (data: Buffer) => void; + let reject: (reason: any) => void; + + const promise = new Promise((rs, rj) => { + resolve = rs; + reject = rj; + }); + + //@ts-ignore + this.queue.push({ resolve, reject }); + this.serverSocket.write(data); + return promise; + } +} From 94dae8744604ffa76ea7812d7a594abc1ae20634 Mon Sep 17 00:00:00 2001 From: Nikolay Karadzhov Date: Tue, 28 Oct 2025 10:03:48 +0200 Subject: [PATCH 5/5] add global interceptor --- packages/test-utils/lib/proxy/redis-proxy.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/test-utils/lib/proxy/redis-proxy.ts b/packages/test-utils/lib/proxy/redis-proxy.ts index 9743b99d6e..40dca2c717 100644 --- a/packages/test-utils/lib/proxy/redis-proxy.ts +++ b/packages/test-utils/lib/proxy/redis-proxy.ts @@ -166,6 +166,13 @@ export class RedisProxy extends EventEmitter { this.globalInterceptors = interceptors; } + public addGlobalInterceptor( + interceptorDescription: InterceptorDescription, + ) { + const interceptor = this.makeInterceptor(interceptorDescription); + this.globalInterceptors = [interceptor, ...this.globalInterceptors.filter(i => i.name !== interceptor.name)]; + } + public getStats(): ProxyStats { const connections = Array.from(this.connections.values());