refactoring and better userUpdate mechanism
This commit is contained in:
@@ -0,0 +1,16 @@
|
||||
import { UserModel } from './user';
|
||||
import {appEventBus, USER_UPDATED_EVENT} from "../../utils/eventBus";
|
||||
|
||||
export function watchUserChanges() {
|
||||
const changeStream = UserModel.watch([], { fullDocument: 'updateLookup' });
|
||||
|
||||
changeStream.on('change', (change) => {
|
||||
if (change.operationType === 'update' && change.fullDocument) {
|
||||
const updatedUser = change.fullDocument;
|
||||
|
||||
appEventBus.emit(USER_UPDATED_EVENT, updatedUser);
|
||||
}
|
||||
});
|
||||
|
||||
console.log("Watching for changes in the User collection...");
|
||||
}
|
||||
@@ -17,6 +17,7 @@ import { JwtAuthenticator } from "./utils/jwtAuthenticator";
|
||||
import { authenticateJwt } from "./rest/middleware/authenticateJwt";
|
||||
import { connectToDatabase, disconnectFromDatabase } from "./db/services/db/database.service";
|
||||
import { SpotifyTokenService } from "./db/services/spotifyTokenService";
|
||||
import {watchUserChanges} from "./db/models/userWatch";
|
||||
|
||||
interface ServerConfig {
|
||||
port: number;
|
||||
@@ -42,6 +43,8 @@ export class Server {
|
||||
public async start(): Promise<HttpServer> {
|
||||
await connectToDatabase();
|
||||
|
||||
watchUserChanges();
|
||||
|
||||
this.userService = await UserService.create();
|
||||
const spotifyTokenService = new SpotifyTokenService(this.config.spotifyClientId, this.config.spotifyClientSecret);
|
||||
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
import { EventEmitter } from 'events';
|
||||
export const appEventBus = new EventEmitter();
|
||||
|
||||
export const USER_UPDATED_EVENT = 'user:updated';
|
||||
@@ -1,18 +0,0 @@
|
||||
import {WebsocketEventType} from "./websocketEventType";
|
||||
import {CustomWebsocketEvent} from "./customWebsocketEvent";
|
||||
import {UserAsyncUpdateEvent} from "./updateUserEvent";
|
||||
import {NoData} from "./NoData";
|
||||
|
||||
export class StopUpdateUserEvent extends CustomWebsocketEvent<NoData> {
|
||||
|
||||
event = WebsocketEventType.STOP_UPDATE_USER;
|
||||
|
||||
handler = async () => {
|
||||
if (this.ws.asyncUpdates.has(UserAsyncUpdateEvent)) {
|
||||
clearInterval(this.ws.asyncUpdates.get(UserAsyncUpdateEvent));
|
||||
this.ws.asyncUpdates.delete(UserAsyncUpdateEvent);
|
||||
console.log("User updates stopped");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,28 +1,6 @@
|
||||
import {WebsocketEventType} from "./websocketEventType";
|
||||
import {CustomWebsocketEvent} from "./customWebsocketEvent";
|
||||
import {IUser} from "../../../db/models/user";
|
||||
import {CustomWebsocketEventUserService} from "./customWebsocketEventUserService";
|
||||
import {NoData} from "./NoData";
|
||||
|
||||
export const UserAsyncUpdateEvent = "USER_UPDATE";
|
||||
|
||||
export class UpdateUserEvent extends CustomWebsocketEventUserService<NoData> {
|
||||
event = WebsocketEventType.UPDATE_USER;
|
||||
|
||||
handler = async () => {
|
||||
console.log("Starting user updates")
|
||||
if (this.ws.asyncUpdates.has(UserAsyncUpdateEvent)) {
|
||||
console.log("User updates already running");
|
||||
return;
|
||||
}
|
||||
|
||||
this.ws.asyncUpdates.set(UserAsyncUpdateEvent, setInterval(async () => {
|
||||
const user = await this.userService.getUserByUUID(this.ws.payload.uuid);
|
||||
this.ws.emit(WebsocketEventType.UPDATE_USER_SINGLE, user);
|
||||
}, 1000 * 15));
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
export class UpdateUserSingleEvent extends CustomWebsocketEvent<IUser> {
|
||||
event = WebsocketEventType.UPDATE_USER_SINGLE;
|
||||
|
||||
@@ -6,9 +6,8 @@ import {GetStateEvent} from "./getStateEvent";
|
||||
import {GetSingleWeatherUpdateEvent, GetWeatherUpdatesEvent} from "./getWeatherUpdatesEvent";
|
||||
import {StopSpotifyUpdatesEvent} from "./stopSpotifyUpdatesEvent";
|
||||
import {StopWeatherUpdatesEvent} from "./stopWeatherUpdatesEvent";
|
||||
import {UpdateUserEvent, UpdateUserSingleEvent} from "./updateUserEvent";
|
||||
import { UpdateUserSingleEvent} from "./updateUserEvent";
|
||||
import {CustomWebsocketEvent} from "./customWebsocketEvent";
|
||||
import {StopUpdateUserEvent} from "./stopUpdateUserEvent";
|
||||
import {UserService} from "../../../db/services/db/UserService";
|
||||
import {SpotifyTokenService} from "../../../db/services/spotifyTokenService";
|
||||
|
||||
@@ -22,9 +21,7 @@ export function getEventListeners(ws: ExtendedWebSocket, userService: UserServic
|
||||
new GetSingleWeatherUpdateEvent(ws),
|
||||
new GetWeatherUpdatesEvent(ws),
|
||||
new StopWeatherUpdatesEvent(ws),
|
||||
new UpdateUserEvent(ws, userService),
|
||||
new UpdateUserSingleEvent(ws),
|
||||
new StopUpdateUserEvent(ws),
|
||||
new ErrorEvent(ws)
|
||||
];
|
||||
}
|
||||
|
||||
+76
-67
@@ -1,13 +1,14 @@
|
||||
import {Server} from "http";
|
||||
import {Server as WebSocketServer, WebSocket} from "ws";
|
||||
import {verifyClient} from "./utils/verifyClient";
|
||||
import {ExtendedWebSocket} from "./interfaces/extendedWebsocket";
|
||||
import {DecodedToken} from "./interfaces/decodedToken";
|
||||
import {WebsocketServerEventHandler} from "./utils/websocket/websocketServerEventHandler";
|
||||
import {WebsocketEventHandler} from "./utils/websocket/websocketEventHandler";
|
||||
import {WebsocketEventType} from "./utils/websocket/websocketCustomEvents/websocketEventType";
|
||||
import {UserService} from "./db/services/db/UserService";
|
||||
import {SpotifyTokenService} from "./db/services/spotifyTokenService";
|
||||
import { Server } from "http";
|
||||
import { Server as WebSocketServer, WebSocket } from "ws";
|
||||
import { verifyClient } from "./utils/verifyClient";
|
||||
import { ExtendedWebSocket } from "./interfaces/extendedWebsocket";
|
||||
import { WebsocketServerEventHandler } from "./utils/websocket/websocketServerEventHandler";
|
||||
import { WebsocketEventHandler } from "./utils/websocket/websocketEventHandler";
|
||||
import { WebsocketEventType } from "./utils/websocket/websocketCustomEvents/websocketEventType";
|
||||
import { UserService } from "./db/services/db/UserService";
|
||||
import { SpotifyTokenService } from "./db/services/spotifyTokenService";
|
||||
import { appEventBus, USER_UPDATED_EVENT } from "./utils/eventBus";
|
||||
import { IUser } from "./db/models/user";
|
||||
|
||||
export class ExtendedWebSocketServer {
|
||||
private readonly _wss: WebSocketServer;
|
||||
@@ -15,80 +16,42 @@ export class ExtendedWebSocketServer {
|
||||
private readonly spotifyTokenService: SpotifyTokenService;
|
||||
|
||||
constructor(server: Server, userService: UserService, spotifyTokenService: SpotifyTokenService) {
|
||||
this.userService = userService;
|
||||
this.spotifyTokenService = spotifyTokenService;
|
||||
|
||||
this._wss = new WebSocketServer({
|
||||
server,
|
||||
verifyClient: (info, callback) => verifyClient(info.req, callback),
|
||||
});
|
||||
|
||||
this.userService = userService;
|
||||
this.spotifyTokenService = spotifyTokenService;
|
||||
|
||||
this.setupWebSocket();
|
||||
this._setupConnectionHandling();
|
||||
this._listenForAppEvents();
|
||||
}
|
||||
|
||||
private get wss(): WebSocketServer {
|
||||
return this._wss;
|
||||
}
|
||||
|
||||
public broadcast(message: string) {
|
||||
this.wss.clients.forEach((client) => {
|
||||
public broadcast(message: string): void {
|
||||
this.getConnectedClients().forEach((client) => {
|
||||
if (client.readyState === WebSocket.OPEN) {
|
||||
client.send(message, {binary: false});
|
||||
client.send(message, { binary: false });
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public sendMessageToUser(uuid: string, message: string) {
|
||||
this.wss.clients.forEach(
|
||||
(client: WebSocket & { payload?: DecodedToken }) => {
|
||||
if (
|
||||
client.payload?.uuid === uuid &&
|
||||
client.readyState === WebSocket.OPEN
|
||||
) {
|
||||
client.send(message, {binary: false});
|
||||
}
|
||||
},
|
||||
);
|
||||
public sendMessageToUser(uuid: string, message: string): void {
|
||||
const client = this._findClientByUUID(uuid);
|
||||
if (client && client.readyState === WebSocket.OPEN) {
|
||||
client.send(message, { binary: false });
|
||||
}
|
||||
}
|
||||
|
||||
public getConnectedClients(): Set<ExtendedWebSocket> {
|
||||
return this.wss.clients as Set<ExtendedWebSocket>;
|
||||
return this._wss.clients as Set<ExtendedWebSocket>;
|
||||
}
|
||||
|
||||
private setupWebSocket() {
|
||||
const serverEventHandler = new WebsocketServerEventHandler(this.wss, this.userService);
|
||||
private _setupConnectionHandling(): void {
|
||||
const serverEventHandler = new WebsocketServerEventHandler(this._wss, this.userService);
|
||||
|
||||
serverEventHandler.enableConnectionEvent((ws) => {
|
||||
const socketEventHandler = new WebsocketEventHandler(ws, this.userService, this.spotifyTokenService);
|
||||
|
||||
console.log("WebSocket client connected");
|
||||
|
||||
socketEventHandler.enableErrorEvent();
|
||||
socketEventHandler.enablePongEvent();
|
||||
socketEventHandler.enableMessageEvent();
|
||||
|
||||
// Register custom events
|
||||
socketEventHandler.registerCustomEvents();
|
||||
|
||||
socketEventHandler.enableDisconnectEvent(() => {
|
||||
console.log("User disconnected");
|
||||
});
|
||||
|
||||
// send initial state and settings
|
||||
// think about emitting the data needed directly to the event Handler
|
||||
ws.emit(WebsocketEventType.GET_SETTINGS, {});
|
||||
ws.emit(WebsocketEventType.GET_STATE, {});
|
||||
|
||||
// initiate update user event
|
||||
ws.emit(WebsocketEventType.UPDATE_USER, {});
|
||||
|
||||
const mode = ws.user.lastState?.global.mode;
|
||||
if (mode === "clock") {
|
||||
ws.emit(WebsocketEventType.GET_WEATHER_UPDATES, {})
|
||||
}
|
||||
|
||||
if (mode === "music") {
|
||||
ws.emit(WebsocketEventType.GET_SPOTIFY_UPDATES, {})
|
||||
}
|
||||
this._onNewClientReady(ws);
|
||||
});
|
||||
|
||||
const interval = serverEventHandler.enableHeartbeat(30000);
|
||||
@@ -96,4 +59,50 @@ export class ExtendedWebSocketServer {
|
||||
clearInterval(interval);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private _onNewClientReady(ws: ExtendedWebSocket): void {
|
||||
console.log("WebSocket client connected and authenticated");
|
||||
|
||||
const socketEventHandler = new WebsocketEventHandler(ws, this.userService, this.spotifyTokenService);
|
||||
|
||||
socketEventHandler.enableErrorEvent();
|
||||
socketEventHandler.enablePongEvent();
|
||||
socketEventHandler.enableMessageEvent();
|
||||
socketEventHandler.registerCustomEvents();
|
||||
socketEventHandler.enableDisconnectEvent(() => {
|
||||
console.log("User disconnected");
|
||||
});
|
||||
|
||||
// send initial state and settings
|
||||
ws.emit(WebsocketEventType.GET_SETTINGS, {});
|
||||
ws.emit(WebsocketEventType.GET_STATE, {});
|
||||
|
||||
const mode = ws.user.lastState?.global.mode;
|
||||
if (mode === "clock") {
|
||||
ws.emit(WebsocketEventType.GET_WEATHER_UPDATES, {});
|
||||
}
|
||||
if (mode === "music") {
|
||||
ws.emit(WebsocketEventType.GET_SPOTIFY_UPDATES, {});
|
||||
}
|
||||
}
|
||||
|
||||
private _listenForAppEvents(): void {
|
||||
appEventBus.on(USER_UPDATED_EVENT, (user: IUser) => {
|
||||
console.log(`Received update for user ${user.uuid}`);
|
||||
const client = this._findClientByUUID(user.uuid);
|
||||
if (client) {
|
||||
console.log(`Pushing update to user ${user.uuid}`);
|
||||
client.emit(WebsocketEventType.UPDATE_USER_SINGLE, user);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private _findClientByUUID(uuid: string): ExtendedWebSocket | undefined {
|
||||
for (const client of this.getConnectedClients()) {
|
||||
if (client.payload?.uuid === uuid) {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user