feat(WebSocket & Event System): Add real-time updates and event subscription

Introduces WebSocket connectivity for live data updates and a new subscription model for events.

Key changes:
- Integrated WebSocketService to handle open, message, and close events via the /ws endpoint.
- Added EventSubscribeService APIs to manage user subscriptions to specific events (matchId) via REST endpoints (`/api/subscribe-event` and `/api/subscribe-event/:matchId`).
- Implemented custom message protocol (JSON format) for WebSocket communication with defined topics (ONLINE_MEMBER_CHANGE, etc.).
- Updated database schema to include the `EventSubs` model for storing subscriptions.
- Refactored Dockerfile to use `bun prisma db push` for database migrations on startup.
- UI Updates:
  - Replaced the Rate component in FavButton with Star icons for better UX.
  - Adjusted layout of FavButton to be absolute positioned.
  - Added debounce to ClubSummary data fetching.
  - Removed unused `isMobile` import from ClubSummary.
- Utilities: Added helper functions `toCustomMessage` and `fromCustomMessage` for parsing WebSocket messages.
This commit is contained in:
kyuuseiryuu 2026-03-17 17:26:52 +09:00
parent c7faeb1b65
commit 09f3ecaca6
10 changed files with 205 additions and 16 deletions

View File

@ -1,6 +1,6 @@
FROM oven/bun:latest
COPY . /app
WORKDIR /app
RUN bun install && bunx --bun prisma generate
RUN bun install && bun prisma db push && bun prisma generate
ENTRYPOINT [ "bun", "start"]
EXPOSE 3000

View File

@ -0,0 +1,7 @@
-- CreateTable
CREATE TABLE `EventSubs` (
`logto_uid` VARCHAR(191) NOT NULL,
`event_id` VARCHAR(191) NOT NULL,
PRIMARY KEY (`logto_uid`, `event_id`)
) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

View File

@ -17,4 +17,10 @@ model LogtoUserFav {
logto_uid String
kaiqiu_uid String
@@id([logto_uid, kaiqiu_uid])
}
model EventSubs {
logto_uid String
event_id String
@@id([logto_uid, event_id])
}

View File

@ -3,7 +3,7 @@ import { ChangeBackground } from "./ChangeBackground";
import { useMemo, useState } from "react";
import { useRequest } from "ahooks";
import type { ClubDetail } from "../types";
import { isMobile, MapType, openWebMapRaw } from "../utils/front";
import { MapType, openWebMapRaw } from "../utils/front";
import type { ItemType } from "antd/es/menu/interface";
import { NotificationOutlined, PushpinOutlined } from "@ant-design/icons";
@ -15,10 +15,9 @@ export const ClubSummary = (props: Props) => {
const [isArticleOpen, setIsArticleOpen] = useState(false);
const requestClubSummary = useRequest<ClubDetail, []>(async () => {
return fetch(`/api/club/${props.clubId}`).then(r => r.json());
}, { manual: false, refreshDeps: [props.clubId] })
}, { manual: false, refreshDeps: [props.clubId], debounceWait: 300 })
const info = useMemo(() => requestClubSummary.data, [requestClubSummary]);
const noArticle = !info?.article || info.article === '还没有公告';
const isMobileDevice = isMobile();
const mapMenu = useMemo<ItemType[]>(() => {
if (!info) return [];
return [

View File

@ -1,24 +1,19 @@
import { Rate } from "antd";
import { useCallback, useEffect, useState } from "react";
import { useFavPlayerStore, type FavPlayer } from "../store/useFavPlayerStore";
import styled from "styled-components";
import { useRequest } from "ahooks";
import { useLogto } from "@logto/react";
import { useAuthHeaders } from "../hooks/useAuthHeaders";
import { StarFilled, StarOutlined } from "@ant-design/icons";
interface Props {
user?: FavPlayer;
}
const StyledContainer = styled.div`
display: flex;
align-items: center;
.ant-rate {
margin: 0;
.ant-rate-star {
margin: 0;
}
}
position: absolute;
top: 20px;
right: 20px;
`;
export function FavButton(props: Props) {
@ -61,7 +56,11 @@ export function FavButton(props: Props) {
}, []);
return (
<StyledContainer>
<Rate allowClear count={1} value={value} onChange={handleFavClick} />
{value ? (
<StarFilled style={{ color: 'yellow' }} onClick={() => handleFavClick(0)} />
) : (
<StarOutlined onClick={() => handleFavClick(1)} />
)}
</StyledContainer>
);
}

View File

@ -10,6 +10,8 @@ import dayjs from "dayjs";
import utc from 'dayjs/plugin/utc';
import timezone from 'dayjs/plugin/timezone';
import type { IEventInfo } from "./types";
import { EventSubscribeService } from "./services/EventSubscribeService";
import { WebSocketService } from "./services/WebsocketService";
dayjs.extend(utc);
dayjs.extend(timezone);
@ -96,6 +98,40 @@ const server = serve({
} });
}
},
"/api/subscribe-event": {
async GET(req) {
const { sub = '' } = await verifyLogtoToken(req.headers);
if (!sub) return Response.json([]);
const events = await EventSubscribeService.getEvents(sub);
return Response.json(events);
}
},
"/api/subscribe-event/:matchId": {
async GET(req) {
const id = req.params.matchId; // 获取比赛ID
const { sub = '' } = await verifyLogtoToken(req.headers);
if (!sub) return Response.json({ ok: false, message: 'Not login.' });
return Response.json({
isSub: await EventSubscribeService.isSub(sub, id),
});
},
async PUT(req) {
const id = req.params.matchId; // 获取比赛ID
const { sub = '' } = await verifyLogtoToken(req.headers);
if (!sub) return Response.json({ ok: false, message: 'Not login.' });
return Response.json({
ok: await EventSubscribeService.sub(sub, id),
});
},
async DELETE(req) {
const id = req.params.matchId; // 获取比赛ID
const { sub = '' } = await verifyLogtoToken(req.headers);
if (!sub) return Response.json({ ok: false, message: 'Not login.' });
return Response.json({
ok: await EventSubscribeService.unSub(sub, id),
});
},
},
"/api/match/:matchId": {
async GET(req) {
const data = await getMatchInfo(req.params.matchId);
@ -189,8 +225,29 @@ const server = serve({
const data = await BattleService.getBattle(eventId, code);
return Response.json(data);
}
},
'/ws': {
async GET(req, server) {
const user = await verifyLogtoToken(req.headers).catch(() => undefined);
if (!user?.sub) return new Response("Unauthorized", {
status: 401,
});
server.upgrade(req, { data: JSON.stringify(user) as any });
return new Response('Upgraded');
}
}
},
websocket: {
open(ws) {
WebSocketService.addConnection(ws);
},
message(ws, message) {
WebSocketService.processMessage(ws, message);
},
close(ws, code, reason) {
WebSocketService.removeConnection(ws);
},
},
development: process.env.NODE_ENV !== "production" && {
// Enable browser hot reloading in development

View File

@ -0,0 +1,46 @@
import { prisma } from "../prisma/db";
import { xcxApi } from "../utils/server";
import { KaiqiuService } from "./KaiqiuService";
export class EventSubscribeService {
public static async sub(user: string, event: string) {
const success = await prisma.eventSubs.create({ data: { logto_uid: user, event_id: event }})
.then(() => true)
.catch(e => {
console.debug('Subscribe user: %s, event: %s, error: %s', user, event, e);
return false;
});
return success;
}
public static async unSub(user: string, event: string) {
const success = await prisma.eventSubs.deleteMany({
where: { logto_uid: user, event_id: event },
})
.then(() => true)
.catch(() => false);
return success;
}
public static isSub(user: string, event: string) {
return prisma.eventSubs.count({
where: { logto_uid: user, event_id: event },
}).then(value => value > 0);
}
public static async getUsers(event: string) {
const users = await prisma.eventSubs
.findMany({
where: { event_id: event },
select: { logto_uid: true },
distinct: 'logto_uid',
})
.then(value => value.map(e => e.logto_uid))
return users;
}
public static async getEvents(user?: string) {
const events = await prisma.eventSubs.findMany({
where: user ? { logto_uid: user } : {},
select: { event_id: true },
distinct: 'event_id',
}).then(value => value.map(e => e.event_id));
return events;
}
}

View File

@ -0,0 +1,52 @@
import type { JWTPayload } from "jose";
import { fromCustomMessage, toCustomMessage, WSTopic } from "../utils/common";
import { EventSubscribeService } from "./EventSubscribeService";
const publicTopics = [
WSTopic.ONLINE_MEMBER_CHANGE,
];
export class WebSocketService {
static #connections = new Set<Bun.ServerWebSocket>();
static #userSubKeys = new Map<string, Set<string>>();
static async addConnection(ws: Bun.ServerWebSocket) {
const user = JSON.parse(ws.data ?? '{}') as Required<JWTPayload>;
const subEvets = await EventSubscribeService.getEvents(user.sub).then(e => e.map(v => `event:${e}`));
this.#connections.add(ws);
this.userSub(ws, user.sub, publicTopics);
this.userSub(ws, user.sub, subEvets);
const message = toCustomMessage(WSTopic.ONLINE_MEMBER_CHANGE, this.#connections.size);
ws.send(message);
ws.publish(WSTopic.ONLINE_MEMBER_CHANGE, message);
}
static removeConnection(ws: Bun.ServerWebSocket) {
const user = JSON.parse(ws.data ?? '{}') as Required<JWTPayload>;
this.userUnSub(ws, user.sub, [...this.#userSubKeys.get(user.sub) ?? []])
this.userUnSub(ws, user.sub, publicTopics);
this.#connections.delete(ws);
ws.publish(WSTopic.ONLINE_MEMBER_CHANGE, toCustomMessage(WSTopic.ONLINE_MEMBER_CHANGE, this.#connections.size));
}
static userSub(ws: Bun.ServerWebSocket, user: string, keys: string[]) {
if (!this.#userSubKeys.has(user)) {
this.#userSubKeys.set(user, new Set<string>());
}
console.debug('User %s subscribe keys: %s', user, keys.join(','));
keys.forEach(key => {
this.#userSubKeys.get(user)?.add(key);
ws.subscribe(key);
});
}
static userUnSub(ws: Bun.ServerWebSocket, user: string, keys: string[]) {
console.debug('User %s subscribe keys: %s', user, keys.join(','));
keys.forEach(key => {
this.#userSubKeys.get(user)?.delete(key);
ws.unsubscribe(key);
});
}
static processMessage(ws: Bun.ServerWebSocket, message: string | Buffer<ArrayBuffer>) {
const { clientTopic: action, data } = fromCustomMessage(message.toString());
if (!action) return;
console.debug('Recieve: %s, %s', action, JSON.stringify(data));
}
}

View File

@ -33,7 +33,7 @@ export class XCXAPI {
return response.data as T;
}
async getAdvProfile(uid: string) {
async getAdvProfile(uid: string): Promise<XCXProfile | null> {
const cacheProfile = await redis.get(`my-kaiqiuwang:profile:${uid}`);
if (!cacheProfile) {
const url = `/api/User/adv_profile?uid=${uid}`;

View File

@ -85,4 +85,27 @@ export function calculate(winerScore: number, loserScore: number) {
console.debug('lowerWin', absScore, { winerScore, loserScore });
return lowerWin(absScore);
}
}
}
export function toCustomMessage(clientTopic: string, data: any) {
return JSON.stringify({ topic: clientTopic, data });
}
export enum WSTopic {
UNKONOW = 'UNKONOW',
ONLINE_MEMBER_CHANGE = 'ONLINE_MEMBER_CHANGE',
}
export function fromCustomMessage(message: string): {
clientTopic: WSTopic;
data?: any;
} {
try {
const { topic: clientTopic, data } = JSON.parse(message);
return { clientTopic, data };
} catch(e) {
return {
clientTopic: WSTopic.UNKONOW,
};
}
}