: refactor websocket

This commit is contained in:
StarAppeal
2024-02-21 00:27:30 +01:00
parent dd082eb922
commit 5a45da27be
6 changed files with 131 additions and 44 deletions
+15 -2
View File
@@ -1,5 +1,6 @@
import express, { Application, Request, Response } from "express";
import { ExtendedWebSocketServer } from "./websocket";
import { DecodedToken } from "./interfaces/decodedToken";
export class RestAPI {
constructor(
@@ -21,11 +22,23 @@ export class RestAPI {
this.app.post("/send-message", (req, res) => {
const message = req.body.message;
const user = req.body.user;
const users: Array<string> = req.body.users;
this.webSocketServer.sendMessageToUser(user, message);
users.forEach((user) =>
this.webSocketServer.sendMessageToUser(user, message),
);
res.status(200).send("OK");
});
this.app.get("/all-clients", (req, res) => {
const connectedClients = this.webSocketServer.getConnectedClients();
const result: Array<DecodedToken> = [];
connectedClients.forEach((client) => result.push(client.payload));
res.status(200).send(result);
});
}
}
+1
View File
@@ -16,6 +16,7 @@ export function verifyClient(
} else {
jwt.verify(token, process.env.SECRET_KEY as string, (err, decoded) => {
if (err) {
console.log(err);
reject(request, callback);
} else {
(request as ExtendedIncomingMessage).payload = decoded as DecodedToken;
@@ -0,0 +1,26 @@
import { ExtendedWebSocket } from "../../interfaces/extendedWebsocket";
export class WebsocketEventHandler {
constructor(private webSocket: ExtendedWebSocket) {}
public enableErrorEvent() {
this.webSocket.on("error", console.error);
}
//needed for the heartbeat mechanism
public enablePongEvent() {
this.webSocket.on("pong", () => {
this.webSocket.isAlive = true;
console.log("Pong received");
});
}
public enableMessageEvent() {
this.webSocket.on("message", (data) => {
const message = data.toString();
console.log("Received message:", message);
// just echo the message back to the client
this.webSocket.send(message);
});
}
}
@@ -0,0 +1,42 @@
import { ExtendedWebSocket } from "../../interfaces/extendedWebsocket";
import { ExtendedIncomingMessage } from "../../interfaces/extendedIncomingMessage";
import { Server as WebSocketServer } from "ws";
import { heartbeat } from "./websocketServerHeartbeatInterval";
export class WebsocketServerEventHandler {
private readonly heartbeat: () => void;
constructor(private webSocketServer: WebSocketServer) {
this.heartbeat = heartbeat(this.webSocketServer);
}
public enableConnectionEvent(
callback: (ws: ExtendedWebSocket, request: ExtendedIncomingMessage) => void,
) {
this.webSocketServer.on(
"connection",
(ws: ExtendedWebSocket, request: ExtendedIncomingMessage) => {
// first: map the payload from the request to the ws object
ws.payload = request.payload;
// second: set the isAlive flag to true
ws.isAlive = true;
// last: call the callback function
callback(ws, request);
},
);
}
public enableHeartbeat(interval: number) {
return setInterval(() => {
this.heartbeat();
}, interval);
}
public enableCloseEvent(callback: () => void) {
this.webSocketServer.on("close", () => {
console.log("WebSocket server closed");
callback();
});
}
}
@@ -0,0 +1,23 @@
import { WebSocket, WebSocketServer } from "ws";
import { DecodedToken } from "../../interfaces/decodedToken";
export function heartbeat(wss: WebSocketServer) {
return () => {
wss.clients.forEach(
(ws: WebSocket & { isAlive?: boolean; payload?: DecodedToken }) => {
console.log(
new Date().toLocaleString("de-DE") +
":" +
ws.payload?.name +
": isAlive: " +
ws.isAlive,
);
if (!ws.isAlive) return ws.terminate();
ws.send("keepalive");
ws.isAlive = false;
ws.ping();
},
);
};
}
+24 -42
View File
@@ -1,65 +1,39 @@
import { Server } from "http";
import { WebSocket, Server as WebSocketServer } from "ws";
import { verifyClient } from "./utils/verifyClient";
import { ExtendedIncomingMessage } from "./interfaces/extendedIncomingMessage";
import { ExtendedWebSocket } from "./interfaces/extendedWebsocket";
import { DecodedToken } from "./interfaces/decodedToken";
import { WebsocketServerEventHandler } from "./utils/websocket/websocketServerEventHandler";
import { WebsocketEventHandler } from "./utils/websocket/websocketEventHandler";
export class ExtendedWebSocketServer {
private wss: WebSocketServer;
private readonly _wss: WebSocketServer;
constructor(server: Server) {
this.wss = new WebSocketServer({
this._wss = new WebSocketServer({
server,
verifyClient: (info, callback) => verifyClient(info.req, callback),
});
this.setupWebSocket();
const interval = setInterval(() => {
this.wss.clients.forEach(
(ws: WebSocket & { isAlive?: boolean; payload?: DecodedToken }) => {
console.log(ws.payload?.name + ": isAlive: " + ws.isAlive);
if (!ws.isAlive) return ws.terminate();
ws.send("keepalive");
ws.isAlive = false;
ws.ping();
},
);
}, 30000);
this.wss.on("close", function close() {
clearInterval(interval);
});
}
private setupWebSocket() {
this.wss.on(
"connection",
(ws: ExtendedWebSocket, request: ExtendedIncomingMessage) => {
ws.payload = request.payload;
ws.isAlive = true;
const serverEventHandler = new WebsocketServerEventHandler(this.wss);
serverEventHandler.enableConnectionEvent((ws) => {
let socketEventHandler = new WebsocketEventHandler(ws);
console.log("WebSocket client connected");
console.log("WebSocket client connected");
ws.on("error", console.error);
socketEventHandler.enableErrorEvent();
socketEventHandler.enablePongEvent();
socketEventHandler.enableMessageEvent();
});
ws.on("pong", () => {
ws.isAlive = true;
});
ws.on("message", (data) => {
const message = data.toString();
console.log("Received message:", message);
const json: { username: string; message: string } =
JSON.parse(message);
this.sendMessageToUser(json.username, json.message);
});
},
);
const interval = serverEventHandler.enableHeartbeat(30000);
serverEventHandler.enableCloseEvent(() => {
clearInterval(interval);
});
}
public broadcast(message: string) {
@@ -82,4 +56,12 @@ export class ExtendedWebSocketServer {
},
);
}
public getConnectedClients(): Set<ExtendedWebSocket> {
return this.wss.clients as Set<ExtendedWebSocket>;
}
private get wss(): WebSocketServer {
return this._wss;
}
}