Compare commits

...

1 Commits

Author SHA1 Message Date
Taylor McKinnon e3e984eaa5 stash 2024-05-31 10:15:22 -07:00
2 changed files with 182 additions and 2 deletions

178
lib/clustering/RemoteRPC.ts Normal file
View File

@ -0,0 +1,178 @@
import assert from 'assert';
import * as http from 'http';
import { Server as IOServer } from 'socket.io';
import { Logger } from 'werelogs';
import { BaseClient, BaseService, Callback } from "../network/rpc/rpc";
import * as sioStream from '../network/rpc/sio-stream';
import { sendWorkerCommand } from './ClusterRPC';
// export async function sendWorkerCommand(
// toWorkers: string,
// toHandler: string,
// uids: string,
// payload: object,
// timeoutMs: number = 60000
// ) {
function relayCommand(
env: Record<string, any>,
toWorkers: string,
toHandler: string,
uids: string,
payload: object,
timeoutMs: number = 60000,
cb: (err: Error | null, res: any) => void
) {
sendWorkerCommand(toWorkers, toHandler, uids, payload, timeoutMs)
.then(res => cb(null, res))
.catch(err => cb(err, null));
}
class RelayService extends BaseService {
constructor(...args: ConstructorParameters<typeof BaseService>) {
super(...args);
const api = {
relayCommand,
};
this.registerAsyncAPI(api);
}
}
type RelayClientParams = ConstructorParameters<typeof BaseClient>[0] & {
namespace: string;
apiVersion?: string;
}
class RelayClient extends BaseClient {
private _relayService: RelayService;
constructor(params: RelayClientParams) {
super(params);
this._relayService = new RelayService(params);
}
connect(cb: Callback) {
super.connect((err?: Error | null, data?: any) => {
if (err) {
return cb(err);
}
this.socket.on("call", (remoteCall: string, args: any, cb: Callback) => {
const decodedArgs = this.socketStreams.decodeStreams(args);
this._relayService._onCall(remoteCall, decodedArgs, (err, res) => {
if (err) {
return cb(err);
}
const encodedRes = this.socketStreams.encodeStreams(res);
return cb(err, encodedRes);
});
});
return cb();
});
}
async sendWorkerCommand(
toWorkers: string,
toHandler: string,
uids: string,
payload: object,
timeoutMs: number = 60000
) {
//@ts-expect-error
return this.relayCommand(toWorkers, toHandler, uids, payload, timeoutMs);
}
}
class PrimaryClient extends BaseClient {
async sendWorkerCommand(
toWorkers: string,
toHandler: string,
uids: string,
payload: object,
timeoutMs: number = 60000
) {
return sendWorkerCommand(toWorkers, toHandler, uids, payload, timeoutMs);
}
}
/**
* @brief create a server object that serves remote requests through
* socket.io events.
*
* Services associated to namespaces (aka. URL base path) must be
* registered thereafter on this server.
*
* Each service may customize the sending and reception of RPC
* messages through subclassing, e.g. LevelDbService looks up a
* particular sub-level before forwarding the RPC, providing it the
* target sub-level handle.
*
* @param params - params object
* @param params.logger - logger object
* @param [params.streamMaxPendingAck] - max number of
* in-flight output stream packets sent to the server without an ack
* received yet
* @param [params.streamAckTimeoutMs] - timeout for receiving
* an ack after an output stream packet is sent to the server
* @return a server object, not yet listening on a TCP port
* (you must call listen(port) on the returned object)
*/
export function RPCRelay(params: {
logger: Logger;
streamMaxPendingAck?: number;
streamAckTimeoutMs?: number;
}) {
assert(params.logger);
const httpServer = http.createServer();
const server = new IOServer(httpServer, { maxHttpBufferSize: 1e8 });
const log = params.logger;
/**
* register a list of service objects on this server
*
* It's not necessary to call this function if you provided a
* "server" parameter to the service constructor.
*
* @param {BaseService} serviceList - list of services to register
*/
(server as any).registerServices = function registerServices(...serviceList: any[]) {
serviceList.forEach(service => {
const sock = this.of(service.namespace);
sock.on('connection', conn => {
const streamsSocket = sioStream.createSocket(
conn,
params.logger,
params.streamMaxPendingAck,
params.streamAckTimeoutMs);
conn.on('error', err => {
log.error('error on socket.io connection',
{ namespace: service.namespace, error: err });
});
conn.on('call', (remoteCall, args, cb) => {
const decodedArgs = streamsSocket.decodeStreams(args);
service._onCall(remoteCall, decodedArgs, (err, res) => {
if (err) {
return cb(err);
}
const encodedRes = streamsSocket.encodeStreams(res);
return cb(err, encodedRes);
});
});
});
});
};
(server as any).listen = function listen(port, bindAddress = undefined) {
httpServer.listen(port, bindAddress);
};
return server;
}

View File

@ -135,6 +135,7 @@ export class BaseClient extends EventEmitter {
getCallTimeout() { getCallTimeout() {
return this.callTimeoutMs; return this.callTimeoutMs;
} }
setCallTimeout(newTimeoutMs: number) { setCallTimeout(newTimeoutMs: number) {
this.callTimeoutMs = newTimeoutMs; this.callTimeoutMs = newTimeoutMs;
} }
@ -290,7 +291,7 @@ export class BaseService {
namespace: string; namespace: string;
logger: Logger; logger: Logger;
apiVersion?: string; apiVersion?: string;
server: any; server?: any;
}) { }) {
const { namespace, logger, apiVersion, server } = params; const { namespace, logger, apiVersion, server } = params;
assert(namespace); assert(namespace);
@ -493,10 +494,11 @@ export function RPCServer(params: {
logger: Logger; logger: Logger;
streamMaxPendingAck?: number; streamMaxPendingAck?: number;
streamAckTimeoutMs?: number; streamAckTimeoutMs?: number;
httpServer?: http.Server;
}) { }) {
assert(params.logger); assert(params.logger);
const httpServer = http.createServer(); const httpServer = params.httpServer ? params.httpServer : http.createServer();
const server = new IOServer(httpServer, { maxHttpBufferSize: 1e8 }); const server = new IOServer(httpServer, { maxHttpBufferSize: 1e8 });
const log = params.logger; const log = params.logger;