WIP: working sample, not a module yet

This commit is contained in:
Alexandre HEIM 2025-03-22 18:51:17 +01:00
parent f965f66ae7
commit 87143a6999
6 changed files with 281 additions and 1 deletions

183
NatsClass.ts Normal file
View file

@ -0,0 +1,183 @@
import {
connect,
wsconnect,
deferred,
type Msg,
type NatsConnection,
type PublishOptions,
type RequestOptions,
type Subscription,
type SubscriptionOptions,
} from "@nats-io/transport-node";
import process from "node:process";
export type IteratorResult<T> = { done: true; value?: T } | { done: false; value: T };
// NatsClass provides utility functions to interact with NATS
export default class NatsClass {
server: string | string[];
private nc: NatsConnection | null = null;
public interrupted = deferred();
constructor(server: string | string[]) {
// Handle termination signals
["SIGINT", "SIGTERM"].forEach((signal) => {
process.on(signal, () => {
console.log("Received", signal);
this.interrupted.resolve();
setTimeout(() => {
console.log("forced process exit");
process.exit(2);
}, 10000);
});
});
this.server = server;
}
// connect to the nats server
connect = async () => {
const isWebsocket =
typeof this.server === "string"
? this.server.startsWith("ws")
: this.server[0]?.startsWith("ws") ?? false;
this.nc = await (isWebsocket ? wsconnect : connect)({
servers: this.server,
debug: false,
maxReconnectAttempts: -1,
reconnectTimeWait: 1000,
ignoreClusterUpdates: false,
noRandomize: false,
});
console.log(`connected ${this.nc.getServer()}`);
};
// close the connection to the nats server
close = async () => {
if (this.nc) {
await this.nc.flush();
await this.nc.drain();
this.nc = null;
}
};
// get the server version
serverVersion = (): string => {
return this.nc?.info?.version ?? "Not connected";
};
// publish a message to the nats server
publish = (
subject: string,
data: string | Uint8Array,
options?: PublishOptions
) => {
if (!this.nc) {
throw new Error("Not connected");
}
return this.nc.publish(subject, data, options);
};
// publish data as json, only if data is not a string
publishJson = (subject: string, data: any, options?: PublishOptions) => {
const payload = typeof data === "string" ? data : JSON.stringify(data);
return this.publish(subject, payload, options);
};
// publish data as json, only if data is not a string
publishWithType = async <T>(
subject: string,
data?: T,
options?: PublishOptions
): Promise<void> => {
await this.publishJson(subject, data, options);
};
// sends a request and returns the response message
request = (
subject: string,
data?: string | Uint8Array,
options?: RequestOptions
): Promise<Msg> => {
if (!this.nc) {
throw new Error("Not connected");
}
return this.nc.request(subject, data, options);
};
requestJson = (
subject: string,
data?: any,
options?: RequestOptions
): Promise<Msg> => {
const payload = typeof data === "string" ? data : JSON.stringify(data);
return this.request(subject, payload, options);
};
// request with request and response type
requestWithType = async <TRequest, TResponse>(
subject: string,
data?: TRequest,
options?: RequestOptions
): Promise<TResponse> => {
const msg = await this.requestJson(subject, data, options);
return msg.json<TResponse>();
};
// flush the connection to the nats server
flush = async () => {
if (!this.nc) {
throw new Error("Not connected");
}
await this.nc.flush();
};
// subscribe to a subject, calls nats subscribe function
subscribe = (subject: string, opts?: SubscriptionOptions): Subscription => {
if (!this.nc) {
throw new Error("Not connected");
}
return this.nc.subscribe(subject, opts);
};
subscribeCb = (
subject: string,
callback: (err: Error | null, msg: Msg) => void,
opts?: SubscriptionOptions
): Subscription => {
if (!this.nc) {
throw new Error("Not connected");
}
const subOpts = {
...opts,
callback,
};
return this.nc.subscribe(subject, subOpts);
};
// subscribe to a subject and decode messages to specified type
subscribeToType = <T>(
subject: string,
opts?: SubscriptionOptions
): AsyncIterableIterator<T> => {
const sub = this.subscribe(subject, opts);
const iterator = sub[Symbol.asyncIterator]();
return {
async next(): Promise<IteratorResult<T>> {
const result = await iterator.next();
if (result.done) {
return result;
}
return {
done: false,
value: result.value.json<T>(),
};
},
[Symbol.asyncIterator]() {
return this;
},
};
};
}

17
PlugisClass.ts Normal file
View file

@ -0,0 +1,17 @@
import { CloudEvent } from "cloudevents";
import NatsClass from "./NatsClass";
export class PlugisClass extends NatsClass {
constructor(server: string | string[]) {
super(server);
}
async variableSet(variable: string, value: string) {
const ce = new CloudEvent({
type: "com.plugis.variable.set",
source: "/plugis/variable",
data: { variable, value },
});
this.publishJson("variable.set.>", ce);
}
}

View file

@ -1,3 +1,6 @@
# plugis-client # plugis-client
plugis client for typescript / bun plugis client for typescript / bun
npm config set @plugis:registry https://git.fibre.plugis.com/api/packages/telemac/npm/
npm config set -- '//git.fibre.plugis.com/api/packages/telemac/npm/:_authToken' "{token}"

7
bunfig.toml Normal file
View file

@ -0,0 +1,7 @@
[install.scopes]
# as an object with username/password
# you can reference environment variables
"@plugis" = {
username = "telemac",
url = "https://git.fibre.plugis.com/"
}

61
heartbeat-subscribe.ts Normal file
View file

@ -0,0 +1,61 @@
import { type HeartbeatSent } from "../../../cloudevents/com.plugis/heartbeat/Sent/types/ts/heartbeatSent.d";
import { CloudEvent } from "cloudevents";
import NatsClass from "./NatsClass";
import process from "node:process";
import { PlugisClass } from "./PlugisClass";
const nats = new PlugisClass("ws://nats.plugis.cloud:8222");
await nats.connect();
console.log("server version", nats.serverVersion());
// publish a heartbeat cloud event every 30 seconds
setInterval(() => {
const heartbeatSent: HeartbeatSent = {
hostname: process.env.HOSTNAME || require('os').hostname(),
timestamp: new Date().toISOString(),
mac: "00:11:22:33:44:55",
ip: "192.168.1.100",
started: new Date().toISOString(),
uptime: 1000,
version: "1.0.0",
"nats-service": "test-service",
os: process.platform,
arch: process.arch
};
const ce = new CloudEvent({
type: "com.plugis.heartbeat.Sent",
source: `${import.meta.url}#${new Error().stack?.split('\n')[1]?.split(':')[1] || '0'}`,
data: heartbeatSent,
});
nats.publishJson("com.plugis.heartbeat.Sent.TEST", ce);
}, 1000*60);
// subscribe using a callback
const _sub = nats.subscribeCb("remote.*.event", (err, msg) => {
if (err) {
console.error("error", err);
return;
}
const ce = msg.json<CloudEvent<Waypoint>>();
const waypoint = ce.data as Waypoint;
console.log("msg", msg.subject, "data", msg.string());
});
// subscribe using an iterator
const cloudEventsIterator = nats.subscribeToType<CloudEvent<HeartbeatSent>>("com.plugis.heartbeat.Sent.>");
(async () => {
for await (const ce of cloudEventsIterator) {
const heartbeatSent = ce.data as HeartbeatSent;
console.log("received cloud event", ce);
}
})();
// wait until the process is interrupted
await nats.interrupted;
console.log("interrupted");
// close nats connection
await nats.close();
console.log("nats.close done");
process.exit(0);

9
package.json Normal file
View file

@ -0,0 +1,9 @@
{
"name": "plugis-client",
"version": "0.1.0",
"main": "index.js",
"type": "module",
"publishConfig": {
"registry": "https://git.fibre.plugis.com"
}
}