import { fromCustomMessage, toCustomMessage, WSTopic } from "../utils/common"; import { EventSubscribeService } from "./EventSubscribeService"; import type { WsPaylaod } from "../types"; const publicTopics = [ WSTopic.MEMBER_CHANGE, ]; type BunServerWebSocket = Bun.ServerWebSocket; export class WebSocketService { static #connections = new Set(); static #userSubTopics = new Map>(); static #userClients = new Map>(); static async addConnection(ws: BunServerWebSocket) { this.#connections.add(ws); const user = ws.data.user; let isNewMember = false; if (!this.#userClients.has(user.sub)) { this.#userClients.set(user.sub, new Set()); isNewMember = true; } this.#userClients.get(user.sub)?.add(ws); await this.#initSubscribe(ws); const message = toCustomMessage(WSTopic.MEMBER_CHANGE, this.#userClients.size); if (isNewMember) { this.broadcast(WSTopic.MEMBER_CHANGE, message); } else { ws.send(message); } this.userClientsBroadcast( ws.data.user.sub, WSTopic.MY_CLIENT_ONLINE, toCustomMessage(WSTopic.MY_CLIENT_ONLINE, this.#userClients.get(user.sub)?.size), ); } static async #initSubscribe(ws: BunServerWebSocket) { const user = ws.data.user const subEvets = await EventSubscribeService.getEvents(user.sub).then(e => e.map(v => `event:${v}`)); this.userSub(ws, user.sub, publicTopics); this.userSub(ws, user.sub, subEvets); } static publish(ws: BunServerWebSocket, topic: string, message: string, withSelf?: boolean) { ws.publish(topic, message); if (withSelf) { ws.send(message); } } static async userClientsBroadcast(user: string, topic: string, message: string) { this.#userClients.get(user)?.forEach(ws => ws.send(message)); } static removeConnection(ws: BunServerWebSocket) { this.#connections.delete(ws); console.debug('Someone disconnected. User: ', ws.data.user.sub); this.#userClients.get(ws.data.user.sub)?.delete(ws); if (this.#userClients.get(ws.data.user.sub)?.size === 0) { this.#userClients.delete(ws.data.user.sub); this.publish(ws, WSTopic.MEMBER_CHANGE, toCustomMessage(WSTopic.MEMBER_CHANGE, this.#userClients.size)); } this.userClientsBroadcast( ws.data.user.sub, WSTopic.MY_CLIENT_ONLINE, toCustomMessage(WSTopic.MY_CLIENT_ONLINE, this.#userClients.get(ws.data.user.sub)?.size), ); } static userSub(ws: BunServerWebSocket, user: string, topics: string[]) { if (!this.#userSubTopics.has(user)) { this.#userSubTopics.set(user, new Set()); } if (!topics.length) return; console.debug('User %s subscribe keys: %s', user, topics.join(',')); topics.forEach(topic => { this.#userSubTopics.get(user)?.add(topic); ws.subscribe(topic); }); } static broadcast(topic: string, message: string) { this.#connections.values().forEach(con => { if (con.isSubscribed(topic)) { con.send(message); } }); } static processMessage(ws: BunServerWebSocket, message: string | Buffer) { const { clientTopic: action, data } = fromCustomMessage(message.toString()); if (!action) return; console.debug('Recieve action: %s', action, message); switch (action) { case WSTopic.SUB: ws.subscribe(data.topic); break; case WSTopic.UNSUB: ws.unsubscribe(data.topic); break; case WSTopic.SEND: ws.publish(data.topic, data.message); break; default: this.broadcast("Test", 'This is a broadcast message. Everyone should recieved.'); break; } } }