183 lines
4.6 KiB
TypeScript
183 lines
4.6 KiB
TypeScript
import {
|
|
connect,
|
|
wsconnect,
|
|
deferred,
|
|
type Msg,
|
|
type NatsConnection,
|
|
type PublishOptions,
|
|
type RequestOptions,
|
|
type Subscription,
|
|
type SubscriptionOptions,
|
|
} from "@nats-io/transport-node";
|
|
import process from "node:process";
|
|
|
|
/**
 * Outcome of one step of an (async) iterator.
 *
 * - `done: false` carries the next `value` of the sequence.
 * - `done: true` marks the end of the sequence; `value` is optional.
 */
export type IteratorResult<T> =
  | { done: true; value?: T }
  | { done: false; value: T };
|
|
|
|
/**
 * NatsClass provides utility functions to interact with NATS.
 *
 * It wraps a single NATS connection and offers helpers for publishing,
 * request/reply and subscribing, including JSON-encoded variants.
 * All messaging helpers throw `Error("Not connected")` until {@link connect}
 * has completed (and again after {@link close}).
 */
export default class NatsClass {
  /** Server URL, or list of server URLs, passed to the NATS client. */
  server: string | string[];

  /** Active connection; `null` while disconnected. */
  private nc: NatsConnection | null = null;

  /** Resolved as soon as the process receives SIGINT or SIGTERM. */
  public interrupted = deferred();

  /**
   * Stores the server address(es) and installs SIGINT/SIGTERM handlers.
   *
   * On a signal the handler logs it, resolves {@link interrupted} and arms a
   * 10 second watchdog that force-exits the process with code 2.
   * Note: handlers are registered on every construction.
   *
   * @param server - a single server URL or a list of server URLs
   */
  constructor(server: string | string[]) {
    // Handle termination signals
    for (const signal of ["SIGINT", "SIGTERM"] as const) {
      process.on(signal, () => {
        console.log("Received", signal);
        this.interrupted.resolve();
        // unref() so that the watchdog alone never keeps the process alive:
        // a shutdown that finishes cleanly exits right away instead of waiting
        // 10 s and then being reported as a forced exit with code 2.
        setTimeout(() => {
          console.log("forced process exit");
          process.exit(2);
        }, 10000).unref();
      });
    }

    this.server = server;
  }

  /**
   * Connects to the NATS server.
   *
   * The WebSocket transport is used when the server (or the first entry of
   * the server list) has a `ws://` or `wss://` scheme; otherwise the regular
   * transport is used. Only an explicit scheme counts, so host names that
   * merely begin with "ws" are not mistaken for WebSocket endpoints.
   */
  connect = async () => {
    const firstServer =
      typeof this.server === "string" ? this.server : this.server[0];
    const isWebsocket =
      firstServer !== undefined && /^wss?:\/\//i.test(firstServer);

    this.nc = await (isWebsocket ? wsconnect : connect)({
      servers: this.server,
      debug: false,
      maxReconnectAttempts: -1,
      reconnectTimeWait: 1000,
      ignoreClusterUpdates: false,
      noRandomize: false,
    });
    console.log(`connected ${this.nc.getServer()}`);
  };

  /**
   * Closes the connection to the NATS server: flushes, then drains it.
   * Does nothing when not connected. The connection reference is cleared even
   * if flushing or draining fails, so a broken connection is never reused.
   */
  close = async () => {
    if (!this.nc) {
      return;
    }
    try {
      await this.nc.flush();
      await this.nc.drain();
    } finally {
      this.nc = null;
    }
  };

  /**
   * Gets the server version.
   *
   * @returns the version reported in the connection's server info, or
   *   `"Not connected"` when there is no connection or no info yet
   */
  serverVersion = (): string => {
    return this.nc?.info?.version ?? "Not connected";
  };

  /**
   * Publishes a message to the NATS server.
   *
   * @param subject - subject to publish to
   * @param data - payload, as text or raw bytes
   * @param options - optional publish options forwarded to the client
   * @throws Error("Not connected") when there is no connection
   */
  publish = (
    subject: string,
    data: string | Uint8Array,
    options?: PublishOptions
  ) => {
    if (!this.nc) {
      throw new Error("Not connected");
    }
    return this.nc.publish(subject, data, options);
  };

  /**
   * Publishes data as JSON. Strings are sent as-is; any other value is
   * serialized with `JSON.stringify` first.
   *
   * @throws Error("Not connected") when there is no connection
   */
  publishJson = (subject: string, data: unknown, options?: PublishOptions) => {
    const payload = typeof data === "string" ? data : JSON.stringify(data);
    return this.publish(subject, payload, options);
  };

  /**
   * Typed variant of {@link publishJson}: publishes `data` of type `T` as JSON.
   * Being async, a missing connection surfaces as a rejected promise.
   */
  publishWithType = async <T>(
    subject: string,
    data?: T,
    options?: PublishOptions
  ): Promise<void> => {
    this.publishJson(subject, data, options);
  };

  /**
   * Sends a request and returns the response message.
   *
   * @throws Error("Not connected") synchronously when there is no connection
   */
  request = (
    subject: string,
    data?: string | Uint8Array,
    options?: RequestOptions
  ): Promise<Msg> => {
    if (!this.nc) {
      throw new Error("Not connected");
    }
    return this.nc.request(subject, data, options);
  };

  /**
   * Sends a request whose payload is JSON. Strings are sent as-is; any other
   * value is serialized with `JSON.stringify` first.
   *
   * @throws Error("Not connected") synchronously when there is no connection
   */
  requestJson = (
    subject: string,
    data?: unknown,
    options?: RequestOptions
  ): Promise<Msg> => {
    const payload = typeof data === "string" ? data : JSON.stringify(data);
    return this.request(subject, payload, options);
  };

  /**
   * Request with request and response type: sends `data` as JSON and decodes
   * the reply with `msg.json()`. The decoded reply is not validated against
   * `TResponse`.
   */
  requestWithType = async <TRequest, TResponse>(
    subject: string,
    data?: TRequest,
    options?: RequestOptions
  ): Promise<TResponse> => {
    const msg = await this.requestJson(subject, data, options);
    return msg.json<TResponse>();
  };

  /**
   * Flushes the connection to the NATS server.
   * Rejects with Error("Not connected") when there is no connection.
   */
  flush = async () => {
    if (!this.nc) {
      throw new Error("Not connected");
    }
    await this.nc.flush();
  };

  /**
   * Subscribes to a subject; calls the NATS subscribe function.
   *
   * @throws Error("Not connected") when there is no connection
   */
  subscribe = (subject: string, opts?: SubscriptionOptions): Subscription => {
    if (!this.nc) {
      throw new Error("Not connected");
    }
    return this.nc.subscribe(subject, opts);
  };

  /**
   * Subscribes to a subject and delivers messages to `callback`.
   * `callback` replaces any callback already present in `opts`.
   *
   * @throws Error("Not connected") when there is no connection
   */
  subscribeCb = (
    subject: string,
    callback: (err: Error | null, msg: Msg) => void,
    opts?: SubscriptionOptions
  ): Subscription => {
    if (!this.nc) {
      throw new Error("Not connected");
    }
    return this.nc.subscribe(subject, { ...opts, callback });
  };

  /**
   * Subscribes to a subject and decodes each message to the specified type
   * with `msg.json()`. The decoded value is not validated against `T`.
   *
   * Leaving a `for await` loop early (break, return or throw) unsubscribes
   * the underlying subscription.
   *
   * @throws Error("Not connected") when there is no connection
   */
  subscribeToType = <T>(
    subject: string,
    opts?: SubscriptionOptions
  ): AsyncIterableIterator<T> => {
    const sub = this.subscribe(subject, opts);
    const iterator = sub[Symbol.asyncIterator]();
    return {
      async next(): Promise<IteratorResult<T>> {
        const result = await iterator.next();
        if (result.done) {
          return result;
        }
        return {
          done: false,
          value: result.value.json<T>(),
        };
      },
      // Invoked by `for await` on early exit. Without it the subscription
      // would stay registered after the consumer stopped reading.
      async return(): Promise<IteratorResult<T>> {
        sub.unsubscribe();
        return { done: true, value: undefined };
      },
      [Symbol.asyncIterator]() {
        return this;
      },
    };
  };
}
|
|
|