Source

socket/investigation-collaboration.js

import { env } from "@config/env";
import { asyncLocalStorage } from "@utils/asyncLocalStorage";
import { ObjectId } from "mongodb";
import { redisUtils } from "../database/redis.client";
import { InvestigationService } from "../services/investigation.service";
/**
 * Register real-time collaboration (presence + action tracking) handlers
 * for a specific investigation on a given socket connection.
 *
 * On registration the user is added to the Redis presence hash, the socket
 * joins the investigation room, the investigation is locked for the user when
 * nobody holds the lock, and the initial state is sent to the socket.
 * Handlers are then attached for `investigation:action`, `chat:typing` and
 * `disconnect`. Nothing is registered when `context.investigationId` is
 * missing or is not a valid ObjectId.
 *
 * @param {SocketIOServer} io - Socket.IO server instance.
 * @param {Socket} socket - Connected client socket.
 * @param {ICollaborationContext} context - Connection and user metadata.
 * @returns {void}
 */
export function registerInvestigationCollaborationHandlers(io, socket, context) {
    const { connectionId, logger, investigationId, userId, userName } = context;
    if (!investigationId || !ObjectId.isValid(investigationId)) {
        // No valid investigation context – nothing to register.
        return;
    }
    const presenceKey = `investigation:${investigationId}:users`;
    const userCountKey = `investigation:${investigationId}:userCounts`;
    const lastActionsKey = `investigation:${investigationId}:lastActions`;
    const PRESENCE_TTL_SECONDS = 60 * 60; // 1 hour
    const LAST_ACTION_TTL_SECONDS = 24 * 60 * 60; // 24 hours
    const INACTIVITY_TIMEOUT_MS = env.INVESTIGATION_INACTIVITY_TIMEOUT_MS;
    let inactivityTimer = null;
    /**
     * Run a task inside the async-local-storage context of this connection.
     */
    const runInContext = (task) => asyncLocalStorage.run({ logger, requestId: connectionId }, task);
    /**
     * Parse one raw presence entry; returns null for malformed or non-object JSON.
     */
    const parsePresenceEntry = (raw) => {
        try {
            const parsed = JSON.parse(raw);
            return parsed !== null && typeof parsed === "object" ? parsed : null;
        }
        catch {
            return null;
        }
    };
    /**
     * Read the presence hash and map it to the `{ id, name }` list sent to clients.
     */
    const readPresenceUsers = async () => {
        const usersHash = (await redisUtils.hgetall(presenceKey)) ?? {};
        return Object.entries(usersHash).map(([id, raw]) => ({
            id,
            name: parsePresenceEntry(raw)?.name ?? null,
        }));
    };
    /**
     * Emit current presence list to all participants of the investigation room.
     */
    const broadcastPresence = async () => {
        const users = await readPresenceUsers();
        io.to(investigationId).emit("investigation:presence", {
            investigationId,
            users,
        });
    };
    /**
     * Cancel the pending inactivity timer, if any.
     */
    const clearInactivityTimer = () => {
        if (inactivityTimer) {
            clearTimeout(inactivityTimer);
            inactivityTimer = null;
        }
    };
    /**
     * Reset inactivity timer when user is active.
     */
    const resetInactivityTimer = () => {
        clearInactivityTimer();
        inactivityTimer = setTimeout(() => {
            // The timer has fired; drop the stale handle.
            inactivityTimer = null;
            logger.info({ userId, investigationId }, "Inactivity timeout reached");
            void runInContext(async () => {
                try {
                    await handleInvestigationUnlockOrTransfer();
                }
                catch (error) {
                    logger.error({ error }, "Error handling inactivity timeout");
                }
            });
        }, INACTIVITY_TIMEOUT_MS);
    };
    // Exposed on the socket so another connection can restart this socket's
    // timer when it hands the lock over to this user.
    socket.data.resetInactivityTimer =
        resetInactivityTimer;
    /**
     * If this user currently holds the investigation lock, hand it to the
     * other present user with the earliest `connectedAt`, or unlock the
     * investigation when nobody else is present. Does nothing when this user
     * is not the lock holder. Errors are logged and rethrown.
     */
    const handleInvestigationUnlockOrTransfer = async () => {
        try {
            const investigationService = new InvestigationService();
            const investigation = await investigationService.getInvestigationById(investigationId);
            // Check if this user is the current editor
            if (!investigation.isLocked || investigation.lockedBy?.toString() !== userId) {
                return; // Not the editor, nothing to do
            }
            // Collect every other present user that has a connection timestamp
            const usersHash = (await redisUtils.hgetall(presenceKey)) ?? {};
            const otherUsers = [];
            for (const [id, raw] of Object.entries(usersHash)) {
                if (id === userId) {
                    continue; // Skip current user
                }
                const entry = parsePresenceEntry(raw); // malformed entries are ignored
                if (entry?.connectedAt) {
                    otherUsers.push({ id, name: entry.name ?? null, connectedAt: entry.connectedAt });
                }
            }
            if (otherUsers.length === 0) {
                // No other users, unlock the investigation
                await investigationService.unlockInvestigation(investigationId, userId);
                io.to(investigationId).emit("investigation:unlocked", {
                    investigationId,
                });
                logger.info({ investigationId, userId }, "Investigation unlocked (no other users)");
                return;
            }
            // Find user with earliest connection time (first one wins on ties)
            const earliestUser = otherUsers.reduce((earliest, current) => current.connectedAt < earliest.connectedAt ? current : earliest);
            // Transfer lock to earliest user
            await investigationService.lockInvestigation(investigationId, earliestUser.id);
            io.to(investigationId).emit("investigation:locked", {
                investigationId,
                lockedBy: earliestUser.id,
                lockedByUserName: earliestUser.name,
                lockedAt: new Date().toISOString(),
            });
            // Give the new owner a fresh inactivity window on each of their
            // sockets; otherwise an older timer on a second connection could
            // take the lock away again almost immediately.
            const socketsInRoom = await io.in(investigationId).fetchSockets();
            for (const roomSocket of socketsInRoom) {
                if (roomSocket.data.userId !== earliestUser.id) {
                    continue;
                }
                const resetTimer = roomSocket.data.resetInactivityTimer;
                // Only call it when it really is a function on this socket's data.
                if (typeof resetTimer === "function") {
                    resetTimer();
                }
            }
            logger.info({ investigationId, fromUserId: userId, toUserId: earliestUser.id }, "Investigation lock transferred to earliest connected user");
        }
        catch (error) {
            logger.error({ error }, "Error in handleInvestigationUnlockOrTransfer");
            throw error;
        }
    };
    /**
     * Send initial collaboration state (presence + last action) to the newly connected socket.
     */
    const sendInitialCollaborationState = async () => {
        const users = await readPresenceUsers();
        // Fetch last action from Redis; a failure here only costs the last action
        let lastAction = null;
        try {
            const lastActionJson = await redisUtils.hget(lastActionsKey, "last");
            if (lastActionJson) {
                lastAction = JSON.parse(lastActionJson);
            }
        }
        catch (error) {
            logger.warn({ error }, "Failed to fetch last action from Redis");
        }
        socket.emit("investigation:state", {
            investigationId,
            users,
            lastAction,
        });
    };
    // Register presence on connect
    void runInContext(async () => {
        try {
            // Increase user connection count for this investigation
            await redisUtils.hincrby(userCountKey, userId, 1);
            const connectedAt = Date.now();
            // Store basic user info for presence list (including connection timestamp)
            await redisUtils.hset(presenceKey, userId, JSON.stringify({
                name: userName,
                lastSeenAt: connectedAt,
                connectedAt,
            }));
            // Optional TTLs to keep keys tidy over time
            await redisUtils.expire(presenceKey, PRESENCE_TTL_SECONDS);
            await redisUtils.expire(userCountKey, PRESENCE_TTL_SECONDS);
            // Join the room before any room-wide emit so this socket also
            // receives the broadcasts below, even if join is asynchronous.
            await socket.join(investigationId);
            // Handle investigation-level locking
            const investigationService = new InvestigationService();
            const investigation = await investigationService.getInvestigationById(investigationId);
            if (!investigation.isLocked) {
                // Investigation is not locked, lock it for this user
                await investigationService.lockInvestigation(investigationId, userId);
                io.to(investigationId).emit("investigation:locked", {
                    investigationId,
                    lockedBy: userId,
                    lockedByUserName: userName,
                    lockedAt: new Date().toISOString(),
                });
                logger.info({ investigationId, userId }, "Investigation locked for user");
            }
            else {
                // Investigation is already locked, tell this client who holds the lock
                const lockedByUserId = investigation.lockedBy?.toString() ?? null;
                const lockedByUserData = lockedByUserId
                    ? await redisUtils.hget(presenceKey, lockedByUserId)
                    : null;
                const lockedByUserName = lockedByUserData
                    ? parsePresenceEntry(lockedByUserData)?.name ?? null
                    : null;
                socket.emit("investigation:locked", {
                    investigationId,
                    lockedBy: lockedByUserId,
                    lockedByUserName,
                    lockedAt: investigation.lockedAt?.toISOString() ?? new Date().toISOString(),
                });
            }
            // Send initial collaboration state only to this client. Awaited so
            // that a failure is caught below instead of being left unhandled.
            await sendInitialCollaborationState();
            // Broadcast updated presence to everyone in the room
            await broadcastPresence();
            // Set up inactivity timer
            resetInactivityTimer();
        }
        catch (error) {
            logger.error({ error }, "Error while registering investigation presence for socket connection");
        }
    });
    /**
     * Action tracking: track user actions (clicks, input changes, saves, etc.)
     */
    socket.on("investigation:action", (payload) => {
        const action = payload?.action ?? null;
        logger.info({ action }, "investigation:action");
        if (!action || typeof action !== "string") {
            return;
        }
        void runInContext(async () => {
            try {
                // Reset inactivity timer on user action
                resetInactivityTimer();
                const actionPayload = {
                    investigationId,
                    action,
                    field: payload?.field ?? null,
                    userId,
                    userName,
                    timestamp: Date.now(),
                    metadata: payload?.metadata ?? null,
                };
                // Only the most recent action is kept, under the "last" field
                await redisUtils.hset(lastActionsKey, "last", JSON.stringify(actionPayload));
                await redisUtils.expire(lastActionsKey, LAST_ACTION_TTL_SECONDS);
                io.to(investigationId).emit("investigation:action", actionPayload);
                logger.debug({ actionPayload }, "User action tracked and stored");
            }
            catch (error) {
                logger.error({ error: error instanceof Error ? error : String(error) }, "Error while processing investigation:action event");
            }
        });
    });
    /**
     * Chat typing indicator: track when users are typing in chat
     */
    socket.on("chat:typing", (payload) => {
        if (!payload || typeof payload !== "object") {
            return;
        }
        const isTyping = payload.isTyping ?? false;
        resetInactivityTimer();
        void runInContext(() => {
            try {
                // Broadcast typing status to all users in the room (except sender):
                // socket.to() excludes the emitting socket, io.to() would not.
                socket.to(investigationId).emit("chat:typing", {
                    investigationId,
                    userId,
                    userName,
                    isTyping,
                });
            }
            catch (error) {
                logger.error({ error: error instanceof Error ? error : String(error) }, "Error while processing chat:typing event");
            }
        });
    });
    /**
     * Cleanup on disconnect: update presence counters and, once the user's last
     * connection is gone, handle investigation unlock/transfer.
     */
    socket.on("disconnect", () => {
        logger.info({ socketId: socket.id }, "Client disconnected");
        clearInactivityTimer();
        void runInContext(async () => {
            try {
                // Decrement connection count; if zero or below, remove user from presence set
                const remaining = await redisUtils.hincrby(userCountKey, userId, -1);
                const isLastConnection = remaining <= 0;
                if (isLastConnection) {
                    await redisUtils.hdel(userCountKey, userId);
                    await redisUtils.hdel(presenceKey, userId);
                }
                await broadcastPresence();
                // The user may still be connected through another socket (e.g. a
                // second tab); only give up the lock when the last one closes.
                if (isLastConnection) {
                    await handleInvestigationUnlockOrTransfer();
                }
            }
            catch (error) {
                logger.error({ error }, "Error while cleaning up investigation presence/locks on disconnect");
            }
        });
    });
}