11import COMMANDS from '../commands' ;
2- import RedisSocket , { RedisSocketOptions } from './socket' ;
2+ import RedisSocket , { RedisSocketOptions , RedisTcpSocketOptions } from './socket' ;
33import { BasicAuth , CredentialsError , CredentialsProvider , StreamingCredentialsProvider , UnableToObtainNewCredentialsError , Disposable } from '../authx' ;
44import RedisCommandsQueue , { CommandOptions } from './commands-queue' ;
55import { EventEmitter } from 'node:events' ;
@@ -21,6 +21,7 @@ import { BasicCommandParser, CommandParser } from './parser';
2121import SingleEntryCache from '../single-entry-cache' ;
2222import { version } from '../../package.json'
2323import EnterpriseMaintenanceManager , { MaintenanceUpdate , MovingEndpointType } from './enterprise-maintenance-manager' ;
24+ import { OTelMetrics } from '../opentelemetry/metrics' ;
2425
2526export interface RedisClientOptions <
2627 M extends RedisModules = RedisModules ,
@@ -1064,21 +1065,47 @@ export default class RedisClient<
10641065 args : ReadonlyArray < RedisArgument > ,
10651066 options ?: CommandOptions
10661067 ) : Promise < T > {
1068+ const recordOperation = OTelMetrics . instance . createRecordOperationDuration ( args , {
1069+ host : ( this . _self . #options. socket as RedisTcpSocketOptions ) ?. host || "" ,
1070+ port :
1071+ (
1072+ this . _self . #options. socket as RedisTcpSocketOptions
1073+ ) ?. port ?. toString ( ) || "" ,
1074+ db : this . _self . #selectedDB. toString ( ) ,
1075+ } ) ;
1076+
10671077 if ( ! this . _self . #socket. isOpen ) {
1078+ recordOperation ( new ClientClosedError ( ) ) ;
10681079 return Promise . reject ( new ClientClosedError ( ) ) ;
1069- } else if ( ! this . _self . #socket. isReady && this . _self . #options. disableOfflineQueue ) {
1080+ } else if (
1081+ ! this . _self . #socket. isReady &&
1082+ this . _self . #options. disableOfflineQueue
1083+ ) {
1084+ recordOperation ( new ClientOfflineError ( ) ) ;
10701085 return Promise . reject ( new ClientOfflineError ( ) ) ;
10711086 }
10721087
10731088 // Merge global options with provided options
10741089 const opts = {
10751090 ...this . _self . _commandOptions ,
1076- ...options
1077- }
1091+ ...options ,
1092+ } ;
10781093
10791094 const promise = this . _self . #queue. addCommand < T > ( args , opts ) ;
1095+ OTelMetrics . instance . recordPendingRequests ( 1 ) ;
1096+
1097+ const trackedPromise = promise . then ( ( reply ) => {
1098+ recordOperation ( ) ;
1099+ return reply ;
1100+ } ) . catch ( ( err ) => {
1101+ recordOperation ( err ) ;
1102+ throw err ;
1103+ } ) . finally ( ( ) => {
1104+ OTelMetrics . instance . recordPendingRequests ( - 1 ) ;
1105+ } ) ;
1106+
10801107 this . _self . #scheduleWrite( ) ;
1081- return promise ;
1108+ return trackedPromise ;
10821109 }
10831110
10841111 async SELECT ( db : number ) : Promise < void > {
0 commit comments