import {
  ensureTopicData,
  fromCustomMessage,
  getEventSubKey,
  toWebProcessMessage,
  type WsServerSendTopicPayload,
  type WsServerSendTopics,
  type WsTopicPayload,
} from "../utils/common";
import { EventSubscribeService } from "./EventSubscribeService";
import type { WsPaylaod } from "../types";

/**
 * Topics intended for every client. Currently unused: the call that
 * subscribed new connections to them is commented out in `#initSubscribe`.
 */
const publicTopics: WsServerSendTopics[] = [
  'MEMBER_CHANGE',
  'MY_CLIENT_ONLINE',
  'DEBUG_MSG',
  'EVENT_MEMBER_CHANGE',
  "MSG",
];

// NOTE(review): the type argument was missing from the file as received.
// `WsPaylaod` is reconstructed from the otherwise-unused type import and the
// `ws.data.user.sub` accesses below; confirm against `../types`.
type BunServerWebSocket = Bun.ServerWebSocket<WsPaylaod>;

/**
 * Static registry of live WebSocket connections, grouped by user id
 * (`ws.data.user.sub`), with helpers for topic subscription and fan-out.
 *
 * NOTE(review): the generic constraints on `userClientsBroadcast`,
 * `userNotificate` and `broadcast` were missing from the file as received
 * and are reconstructed from how `T` indexes the payload maps; confirm.
 */
export class WebSocketService {
  /** Every currently tracked socket. */
  static #connections = new Set<BunServerWebSocket>();
  /** Topics recorded per user id (across all of that user's sockets). */
  static #userSubTopics = new Map<string, Set<string>>();
  /** Sockets per user id; the map size is the number of distinct users online. */
  static #userClients = new Map<string, Set<BunServerWebSocket>>();

  /** Returns the recorded topic set for `user`, creating an empty one if absent. */
  static #topicsFor(user: string): Set<string> {
    let topics = this.#userSubTopics.get(user);
    if (!topics) {
      topics = new Set();
      this.#userSubTopics.set(user, topics);
    }
    return topics;
  }

  /**
   * Registers a newly opened socket, subscribes it to the user's stored
   * events, and announces the change.
   *
   * - Broadcasts `MEMBER_CHANGE` only when this is the user's first socket.
   * - Always tells the user's own sockets how many of them are online and
   *   which topics are recorded for the user.
   *
   * Rejects if loading the user's event subscriptions rejects.
   */
  static async addConnection(ws: BunServerWebSocket): Promise<void> {
    this.#connections.add(ws);
    const userId = ws.data.user.sub;

    let clients = this.#userClients.get(userId);
    const isNewMember = clients === undefined;
    if (!clients) {
      clients = new Set();
      this.#userClients.set(userId, clients);
    }
    clients.add(ws);

    await this.#initSubscribe(ws);

    if (isNewMember) {
      this.broadcast('MEMBER_CHANGE', this.#userClients.size);
    }

    // Fire-and-forget: userClientsBroadcast is async only by signature.
    void this.userClientsBroadcast(
      userId,
      'MY_CLIENT_ONLINE',
      this.#userClients.get(userId)?.size ?? 0,
    );
    void this.userClientsBroadcast(
      userId,
      'DEBUG_MSG',
      `SubscribeKeys:\n${[...(this.#userSubTopics.get(userId) ?? [])].join('|')}`,
    );
  }

  /** Subscribes `ws` to the topic key of every event returned for its user. */
  static async #initSubscribe(ws: BunServerWebSocket): Promise<void> {
    const user = ws.data.user;
    const events = await EventSubscribeService.getEvents(user.sub);
    const subEvents = events.map(v => getEventSubKey(v));
    // this.userSub(ws, user.sub, publicTopics);
    this.userSub(ws, user.sub, subEvents);
    // this.userSub(ws, user.sub, [`MSG:${user.sub}`]);
  }

  /**
   * Publishes `message` to `topic` through `ws`.
   *
   * @param withSelf when truthy, the message is also sent directly to `ws`.
   */
  static publish(ws: BunServerWebSocket, topic: string, message: string, withSelf?: boolean): void {
    ws.publish(topic, message);
    if (withSelf) {
      ws.send(message);
    }
  }

  /**
   * Sends `{ topic, data }` as JSON to each of `user`'s sockets that is
   * subscribed to `topic`. Does nothing when the user has no sockets.
   *
   * NOTE(review): this serializes with `JSON.stringify` while the other send
   * paths use `toWebProcessMessage`; confirm the two formats are meant to differ.
   */
  static async userClientsBroadcast<T extends keyof WsServerSendTopicPayload & string>(
    user: string,
    topic: T,
    data: WsServerSendTopicPayload[T],
  ): Promise<void> {
    const clients = this.#userClients.get(user);
    if (!clients) return;
    for (const client of clients) {
      if (client.isSubscribed(topic)) {
        client.send(JSON.stringify({ topic, data }));
      }
    }
  }

  /**
   * Unregisters a closed socket.
   *
   * When it was the user's last socket, the user's bookkeeping is dropped and
   * `MEMBER_CHANGE` is broadcast; otherwise the remaining sockets of the user
   * are told the new client count.
   */
  static removeConnection(ws: BunServerWebSocket): void {
    this.#connections.delete(ws);
    const userId = ws.data.user.sub;
    console.debug('Someone disconnected. User: ', userId);

    const clients = this.#userClients.get(userId);
    clients?.delete(ws);

    if (clients?.size === 0) {
      this.#userClients.delete(userId);
      // Drop the recorded topics too, otherwise the map keeps one entry for
      // every user that has ever connected.
      this.#userSubTopics.delete(userId);
      // Use the same fan-out as addConnection instead of publishing through
      // the socket that is going away.
      this.broadcast('MEMBER_CHANGE', this.#userClients.size);
    }

    // No-op when the user has no sockets left.
    void this.userClientsBroadcast(
      userId,
      'MY_CLIENT_ONLINE',
      this.#userClients.get(userId)?.size ?? 0,
    );
  }

  /**
   * Subscribes `ws` to each of `topics` and records them for `user`.
   * An (empty) record for the user is created even when `topics` is empty.
   */
  static userSub(ws: BunServerWebSocket, user: string, topics: string[]): void {
    const recorded = this.#topicsFor(user);
    for (const topic of topics) {
      recorded.add(topic);
      ws.subscribe(topic);
    }
  }

  /** Sends `data` under `topic` to each of `user`'s sockets subscribed to `topic`. */
  static userNotificate<T extends keyof WsTopicPayload & string>(
    user: string,
    topic: T,
    data: WsTopicPayload[T],
  ): void {
    const connections = this.#userClients.get(user);
    if (!connections?.size) return;
    for (const con of connections) {
      if (con.isSubscribed(topic)) {
        con.sendText(toWebProcessMessage(topic, data));
      }
    }
  }

  /**
   * Sends `data` under `topic` to every tracked socket that is subscribed to
   * `topic` or, when given, to `checkTopic`.
   *
   * @param checkTopic optional additional topic whose subscribers should also
   *   receive the message (e.g. a channel-specific key such as `MSG:<channel>`).
   */
  static broadcast<T extends keyof WsTopicPayload & string>(
    topic: T,
    data: WsTopicPayload[T],
    checkTopic?: T | string,
  ): void {
    console.debug('broadcast', { topic, data });
    let index = 0;
    for (const con of this.#connections) {
      const onTopic = con.isSubscribed(topic);
      const onCheckTopic = checkTopic ? con.isSubscribed(checkTopic) : false;
      console.debug(`CheckTopicSubscribed`, {
        [topic]: onTopic,
        ...(checkTopic ? { [checkTopic]: onCheckTopic } : {}),
      });
      // The previous `isSubscribed(topic || checkTopic)` always evaluated to
      // `isSubscribed(topic)`, so `checkTopic` was never consulted.
      if (onTopic || onCheckTopic) {
        console.debug('Send broadcast to [%s]: %s', index + 1, con.data.user.sub, { topic, data });
        con.send(toWebProcessMessage(topic, data));
      }
      index += 1;
    }
  }

  /**
   * Handles one raw client message. Recognized actions:
   * - `SUB`: subscribe this socket to the given topic and record it for the user.
   * - `UNSUB`: unsubscribe this socket and remove the topic from the user's record.
   * - `MSG`: broadcast the payload to `MSG` / `MSG:<channel>` subscribers.
   *
   * Messages without an action, and unknown actions, are ignored.
   */
  static processMessage(ws: BunServerWebSocket, message: string | Buffer): void {
    const { clientTopic: action, data } = fromCustomMessage(message.toString());
    if (!action) return;
    console.debug('Recieve action: %s', action, message);

    const userId = ws.data.user.sub;
    switch (action) {
      case 'SUB': {
        const topic = ensureTopicData<'SUB'>(data)?.topic ?? '';
        // A missing topic would otherwise subscribe the socket to the empty topic ''.
        if (!topic) break;
        ws.subscribe(topic);
        const recorded = this.#topicsFor(userId);
        recorded.add(topic);
        console.debug('Subs', recorded);
        void this.userClientsBroadcast(userId, 'DEBUG_MSG', `Some client subscribed to: ${topic}`);
        break;
      }
      case 'UNSUB': {
        const topic = ensureTopicData<'UNSUB'>(data)?.topic ?? '';
        ws.unsubscribe(topic);
        const recorded = this.#topicsFor(userId);
        recorded.delete(topic);
        console.debug('Subs', recorded);
        break;
      }
      case 'MSG': {
        const msgData = ensureTopicData<'MSG'>(data);
        if (!msgData) return;
        this.broadcast('MSG', msgData, `MSG:${msgData.channel}`);
        break;
      }
      default:
        break;
    }
  }
}