Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions packages/microservices/client/client-rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import {
import { first, map, retryWhen, scan, skip, switchMap } from 'rxjs/operators';
import {
DISCONNECTED_RMQ_MESSAGE,
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
RQM_DEFAULT_NO_ASSERT,
RQM_DEFAULT_NOACK,
RQM_DEFAULT_PERSISTENT,
RQM_DEFAULT_PREFETCH_COUNT,
RQM_DEFAULT_QUEUE,
RQM_DEFAULT_QUEUE_OPTIONS,
RQM_DEFAULT_URL,
RMQ_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
RMQ_DEFAULT_NO_ASSERT,
RMQ_DEFAULT_NOACK,
RMQ_DEFAULT_PERSISTENT,
RMQ_DEFAULT_PREFETCH_COUNT,
RMQ_DEFAULT_QUEUE,
RMQ_DEFAULT_QUEUE_OPTIONS,
RMQ_DEFAULT_URL,
} from '../constants';
import { RmqEvents, RmqEventsMap, RmqStatus } from '../events/rmq.events';
import { ReadPacket, RmqOptions, WritePacket } from '../interfaces';
Expand Down Expand Up @@ -71,11 +71,11 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {

constructor(protected readonly options: Required<RmqOptions>['options']) {
super();
this.queue = this.getOptionsProp(this.options, 'queue', RQM_DEFAULT_QUEUE);
this.queue = this.getOptionsProp(this.options, 'queue', RMQ_DEFAULT_QUEUE);
this.queueOptions = this.getOptionsProp(
this.options,
'queueOptions',
RQM_DEFAULT_QUEUE_OPTIONS,
RMQ_DEFAULT_QUEUE_OPTIONS,
);
this.replyQueue = this.getOptionsProp(
this.options,
Expand All @@ -85,7 +85,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
this.noAssert =
this.getOptionsProp(this.options, 'noAssert') ??
this.queueOptions.noAssert ??
RQM_DEFAULT_NO_ASSERT;
RMQ_DEFAULT_NO_ASSERT;

loadPackage('amqplib', ClientRMQ.name, () => require('amqplib'));
rmqPackage = loadPackage('amqp-connection-manager', ClientRMQ.name, () =>
Expand Down Expand Up @@ -150,7 +150,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {

public createClient(): AmqpConnectionManager {
const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
const urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL];
const urls = this.getOptionsProp(this.options, 'urls') || [RMQ_DEFAULT_URL];
return rmqPackage.connect(urls, socketOptions);
}

Expand Down Expand Up @@ -199,10 +199,10 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
public async setupChannel(channel: Channel, resolve: Function) {
const prefetchCount =
this.getOptionsProp(this.options, 'prefetchCount') ||
RQM_DEFAULT_PREFETCH_COUNT;
RMQ_DEFAULT_PREFETCH_COUNT;
const isGlobalPrefetchCount =
this.getOptionsProp(this.options, 'isGlobalPrefetchCount') ||
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;
RMQ_DEFAULT_IS_GLOBAL_PREFETCH_COUNT;

if (!this.noAssert) {
await channel.assertQueue(this.queue, this.queueOptions);
Expand Down Expand Up @@ -239,7 +239,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
}

public async consumeChannel(channel: Channel) {
const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
const noAck = this.getOptionsProp(this.options, 'noAck', RMQ_DEFAULT_NOACK);
await channel.consume(
this.replyQueue,
(msg: ConsumeMessage | null) =>
Expand Down Expand Up @@ -384,7 +384,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
persistent: this.getOptionsProp(
this.options,
'persistent',
RQM_DEFAULT_PERSISTENT,
RMQ_DEFAULT_PERSISTENT,
),
...options,
headers: this.mergeHeaders(options?.headers),
Expand Down Expand Up @@ -435,7 +435,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
persistent: this.getOptionsProp(
this.options,
'persistent',
RQM_DEFAULT_PERSISTENT,
RMQ_DEFAULT_PERSISTENT,
),
...options,
headers: this.mergeHeaders(options?.headers),
Expand Down
20 changes: 10 additions & 10 deletions packages/microservices/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ export const REDIS_DEFAULT_HOST = 'localhost';
export const NATS_DEFAULT_URL = 'nats://localhost:4222';
export const MQTT_DEFAULT_URL = 'mqtt://localhost:1883';
export const GRPC_DEFAULT_URL = 'localhost:5000';
export const RQM_DEFAULT_URL = 'amqp://localhost';
export const RMQ_DEFAULT_URL = 'amqp://localhost';
export const KAFKA_DEFAULT_BROKER = 'localhost:9092';
export const KAFKA_DEFAULT_CLIENT = 'nestjs-consumer';
export const KAFKA_DEFAULT_GROUP = 'nestjs-group';
export const MQTT_SEPARATOR = '/';
export const MQTT_WILDCARD_SINGLE = '+';
export const MQTT_WILDCARD_ALL = '#';
export const RQM_DEFAULT_QUEUE = '';
export const RQM_DEFAULT_PREFETCH_COUNT = 0;
export const RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT = false;
export const RQM_DEFAULT_QUEUE_OPTIONS = {};
export const RQM_DEFAULT_NOACK = true;
export const RQM_DEFAULT_PERSISTENT = false;
export const RQM_DEFAULT_NO_ASSERT = false;
export const RMQ_DEFAULT_QUEUE = '';
export const RMQ_DEFAULT_PREFETCH_COUNT = 0;
export const RMQ_DEFAULT_IS_GLOBAL_PREFETCH_COUNT = false;
export const RMQ_DEFAULT_QUEUE_OPTIONS = {};
export const RMQ_DEFAULT_NOACK = true;
export const RMQ_DEFAULT_PERSISTENT = false;
export const RMQ_DEFAULT_NO_ASSERT = false;
export const RMQ_SEPARATOR = '.';
export const RMQ_WILDCARD_SINGLE = '*';
export const RMQ_WILDCARD_ALL = '#';
Expand All @@ -40,12 +40,12 @@ export const PARAM_ARGS_METADATA = ROUTE_ARGS_METADATA;
export const REQUEST_PATTERN_METADATA = 'microservices:request_pattern';
export const REPLY_PATTERN_METADATA = 'microservices:reply_pattern';

export const RQM_NO_EVENT_HANDLER = (
export const RMQ_NO_EVENT_HANDLER = (
text: TemplateStringsArray,
pattern: string,
) =>
`An unsupported event was received. It has been negative acknowledged, so it will not be re-delivered. Pattern: ${pattern}`;
export const RQM_NO_MESSAGE_HANDLER = (
export const RMQ_NO_MESSAGE_HANDLER = (
text: TemplateStringsArray,
pattern: string,
) =>
Expand Down
36 changes: 18 additions & 18 deletions packages/microservices/server/server-rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ import {
RMQ_SEPARATOR,
RMQ_WILDCARD_ALL,
RMQ_WILDCARD_SINGLE,
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
RQM_DEFAULT_NOACK,
RQM_DEFAULT_NO_ASSERT,
RQM_DEFAULT_PREFETCH_COUNT,
RQM_DEFAULT_QUEUE,
RQM_DEFAULT_QUEUE_OPTIONS,
RQM_DEFAULT_URL,
RQM_NO_EVENT_HANDLER,
RQM_NO_MESSAGE_HANDLER,
RMQ_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
RMQ_DEFAULT_NOACK,
RMQ_DEFAULT_NO_ASSERT,
RMQ_DEFAULT_PREFETCH_COUNT,
RMQ_DEFAULT_QUEUE,
RMQ_DEFAULT_QUEUE_OPTIONS,
RMQ_DEFAULT_URL,
RMQ_NO_EVENT_HANDLER,
RMQ_NO_MESSAGE_HANDLER,
} from '../constants';
import { RmqContext } from '../ctx-host';
import { Transport } from '../enums';
Expand Down Expand Up @@ -74,13 +74,13 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {

constructor(protected readonly options: Required<RmqOptions>['options']) {
super();
this.urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL];
this.urls = this.getOptionsProp(this.options, 'urls') || [RMQ_DEFAULT_URL];
this.queue =
this.getOptionsProp(this.options, 'queue') || RQM_DEFAULT_QUEUE;
this.noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
this.getOptionsProp(this.options, 'queue') || RMQ_DEFAULT_QUEUE;
this.noAck = this.getOptionsProp(this.options, 'noAck', RMQ_DEFAULT_NOACK);
this.queueOptions =
this.getOptionsProp(this.options, 'queueOptions') ||
RQM_DEFAULT_QUEUE_OPTIONS;
RMQ_DEFAULT_QUEUE_OPTIONS;

this.loadPackage('amqplib', ServerRMQ.name, () => require('amqplib'));
rmqPackage = this.loadPackage(
Expand Down Expand Up @@ -189,7 +189,7 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
const noAssert =
this.getOptionsProp(this.options, 'noAssert') ??
this.queueOptions.noAssert ??
RQM_DEFAULT_NO_ASSERT;
RMQ_DEFAULT_NO_ASSERT;

if (!noAssert) {
await channel.assertQueue(this.queue, this.queueOptions);
Expand All @@ -198,12 +198,12 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
const isGlobalPrefetchCount = this.getOptionsProp(
this.options,
'isGlobalPrefetchCount',
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
RMQ_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
);
const prefetchCount = this.getOptionsProp(
this.options,
'prefetchCount',
RQM_DEFAULT_PREFETCH_COUNT,
RMQ_DEFAULT_PREFETCH_COUNT,
);

if (this.options.exchange || this.options.wildcards) {
Expand Down Expand Up @@ -279,7 +279,7 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {

if (!handler) {
if (!this.noAck) {
this.logger.warn(RQM_NO_MESSAGE_HANDLER`${pattern}`);
this.logger.warn(RMQ_NO_MESSAGE_HANDLER`${pattern}`);
this.channel!.nack(rmqContext.getMessage() as Message, false, false);
}
const status = 'error';
Expand Down Expand Up @@ -324,7 +324,7 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
const handler = this.getHandlerByPattern(pattern);
if (!handler && !this.noAck) {
this.channel!.nack(context.getMessage() as Message, false, false);
return this.logger.warn(RQM_NO_EVENT_HANDLER`${pattern}`);
return this.logger.warn(RMQ_NO_EVENT_HANDLER`${pattern}`);
}
return super.handleEvent(pattern, packet, context);
}
Expand Down