import { env } from "@config/env";
import { asyncLocalStorage } from "@utils/asyncLocalStorage";
import { ObjectId } from "mongodb";
import { redisUtils } from "../database/redis.client";
import { InvestigationService } from "../services/investigation.service";
/**
* Register real-time collaboration (presence + action tracking) handlers
* for a specific investigation on a given socket connection.
* @param {SocketIOServer} io - Socket.IO server instance.
* @param {Socket} socket - Connected client socket.
* @param {ICollaborationContext} context - Connection and user metadata.
*/
export function registerInvestigationCollaborationHandlers(io, socket, context) {
const { connectionId, logger, investigationId, userId, userName } = context;
if (!investigationId || !ObjectId.isValid(investigationId)) {
// No valid investigation context – nothing to register.
return;
}
const presenceKey = `investigation:${investigationId}:users`;
const userCountKey = `investigation:${investigationId}:userCounts`;
const lastActionsKey = `investigation:${investigationId}:lastActions`;
const INACTIVITY_TIMEOUT_MS = env.INVESTIGATION_INACTIVITY_TIMEOUT_MS;
let inactivityTimer = null;
/**
* Emit current presence list to all participants of the investigation room.
*/
const broadcastPresence = async () => {
const usersHash = await redisUtils.hgetall(presenceKey);
const users = Object.entries(usersHash).map(([id, raw]) => {
try {
const parsed = JSON.parse(raw);
return { id, name: parsed.name ?? null };
}
catch {
return { id, name: null };
}
});
io.to(investigationId).emit("investigation:presence", {
investigationId,
users,
});
};
/**
* Reset inactivity timer when user is active.
*/
const resetInactivityTimer = () => {
if (inactivityTimer) {
clearTimeout(inactivityTimer);
}
inactivityTimer = setTimeout(() => {
logger.info({ userId, investigationId }, "Inactivity timeout reached");
void asyncLocalStorage.run({ logger, requestId: connectionId }, async () => {
try {
await handleInvestigationUnlockOrTransfer();
}
catch (error) {
logger.error({ error }, "Error handling inactivity timeout");
}
});
}, INACTIVITY_TIMEOUT_MS);
};
socket.data.resetInactivityTimer =
resetInactivityTimer;
const handleInvestigationUnlockOrTransfer = async () => {
try {
const investigationService = new InvestigationService();
const investigation = await investigationService.getInvestigationById(investigationId);
// Check if this user is the current editor
if (!investigation.isLocked || investigation.lockedBy?.toString() !== userId) {
return; // Not the editor, nothing to do
}
// Get all users from presence
const usersHash = await redisUtils.hgetall(presenceKey);
const otherUsers = [];
for (const [id, raw] of Object.entries(usersHash)) {
if (id === userId)
continue; // Skip current user
try {
const parsed = JSON.parse(raw);
if (parsed.connectedAt) {
otherUsers.push({ id, connectedAt: parsed.connectedAt });
}
}
catch {
// ignore malformed entries
}
}
if (otherUsers.length > 0) {
// Find user with earliest connection time
const earliestUser = otherUsers.reduce((earliest, current) => current.connectedAt < earliest.connectedAt ? current : earliest);
// Transfer lock to earliest user
const earliestUserRaw = usersHash[earliestUser.id];
if (!earliestUserRaw) {
logger.warn({ earliestUserId: earliestUser.id }, "Earliest user data not found");
return;
}
const earliestUserData = JSON.parse(earliestUserRaw);
await investigationService.lockInvestigation(investigationId, earliestUser.id);
io.to(investigationId).emit("investigation:locked", {
investigationId,
lockedBy: earliestUser.id,
lockedByUserName: earliestUserData.name ?? null,
lockedAt: new Date().toISOString(),
});
const socketsInRoom = await io.in(investigationId).fetchSockets();
const newOwnerSocket = socketsInRoom.find((s) => s.data.userId === earliestUser.id);
if (newOwnerSocket) {
const resetTimer = newOwnerSocket.data
.resetInactivityTimer;
if (resetTimer) {
resetTimer();
}
}
logger.info({ investigationId, fromUserId: userId, toUserId: earliestUser.id }, "Investigation lock transferred to earliest connected user");
}
else {
// No other users, unlock the investigation
await investigationService.unlockInvestigation(investigationId, userId);
io.to(investigationId).emit("investigation:unlocked", {
investigationId,
});
logger.info({ investigationId, userId }, "Investigation unlocked (no other users)");
}
}
catch (error) {
logger.error({ error }, "Error in handleInvestigationUnlockOrTransfer");
throw error;
}
};
/**
* Send initial collaboration state (presence) to the newly connected socket.
*/
const sendInitialCollaborationState = async () => {
const usersHash = await redisUtils.hgetall(presenceKey);
const users = Object.entries(usersHash).map(([id, raw]) => {
try {
const parsed = JSON.parse(raw);
return { id, name: parsed.name ?? null };
}
catch {
return { id, name: null };
}
});
// Fetch last action from Redis
let lastAction = null;
try {
const lastActionJson = await redisUtils.hget(lastActionsKey, "last");
if (lastActionJson) {
lastAction = JSON.parse(lastActionJson);
}
}
catch (error) {
logger.warn({ error }, "Failed to fetch last action from Redis");
}
socket.emit("investigation:state", {
investigationId,
users,
lastAction,
});
};
// Register presence on connect
void asyncLocalStorage.run({ logger, requestId: connectionId }, async () => {
try {
// Increase user connection count for this investigation
await redisUtils.hincrby(userCountKey, userId, 1);
const connectedAt = Date.now();
// Store basic user info for presence list (including connection timestamp)
await redisUtils.hset(presenceKey, userId, JSON.stringify({
name: userName,
lastSeenAt: connectedAt,
connectedAt,
}));
// Optional TTLs to keep keys tidy over time
await redisUtils.expire(presenceKey, 60 * 60); // 1 hour
await redisUtils.expire(userCountKey, 60 * 60); // 1 hour
// Join investigation room for broadcasting
void socket.join(investigationId);
// Handle investigation-level locking
const investigationService = new InvestigationService();
const investigation = await investigationService.getInvestigationById(investigationId);
if (!investigation.isLocked) {
// Investigation is not locked, lock it for this user
await investigationService.lockInvestigation(investigationId, userId);
io.to(investigationId).emit("investigation:locked", {
investigationId,
lockedBy: userId,
lockedByUserName: userName,
lockedAt: new Date().toISOString(),
});
logger.info({ investigationId, userId }, "Investigation locked for user");
}
else {
// Investigation is already locked, broadcast current lock status
const lockedByUserId = investigation.lockedBy?.toString() ?? null;
const lockedByUserData = lockedByUserId
? await redisUtils.hget(presenceKey, lockedByUserId)
: null;
let lockedByUserName = null;
if (lockedByUserData) {
try {
const parsed = JSON.parse(lockedByUserData);
lockedByUserName = parsed.name ?? null;
}
catch {
// ignore
}
}
socket.emit("investigation:locked", {
investigationId,
lockedBy: lockedByUserId,
lockedByUserName,
lockedAt: investigation.lockedAt?.toISOString() ?? new Date().toISOString(),
});
}
// Send initial collaboration state only to this client
void sendInitialCollaborationState();
// Broadcast updated presence to everyone in the room
await broadcastPresence();
// Set up inactivity timer
resetInactivityTimer();
}
catch (error) {
logger.error({ error }, "Error while registering investigation presence for socket connection");
}
});
/**
* Action tracking: track user actions (clicks, input changes, saves, etc.)
*/
socket.on("investigation:action", (payload) => {
const action = payload?.action ?? null;
logger.info({ action }, "investigation:action");
if (!action || typeof action !== "string")
return;
void asyncLocalStorage.run({ logger, requestId: connectionId }, async () => {
try {
// Reset inactivity timer on user action
resetInactivityTimer();
const timestamp = Date.now();
const actionPayload = {
investigationId,
action,
field: payload?.field ?? null,
userId,
userName,
timestamp,
metadata: payload?.metadata ?? null,
};
const actionJson = JSON.stringify(actionPayload);
await redisUtils.hset(lastActionsKey, "last", actionJson);
await redisUtils.expire(lastActionsKey, 24 * 60 * 60);
io.to(investigationId).emit("investigation:action", actionPayload);
logger.debug({ actionPayload }, "User action tracked and stored");
}
catch (error) {
logger.error({ error: error instanceof Error ? error : String(error) }, "Error while processing investigation:action event");
}
});
});
/**
* Chat typing indicator: track when users are typing in chat
*/
socket.on("chat:typing", (payload) => {
if (!payload || typeof payload !== "object")
return;
const isTyping = payload.isTyping ?? false;
resetInactivityTimer();
void asyncLocalStorage.run({ logger, requestId: connectionId }, () => {
try {
// Broadcast typing status to all users in the room (except sender)
io.to(investigationId).emit("chat:typing", {
investigationId,
userId,
userName,
isTyping,
});
}
catch (error) {
logger.error({ error: error instanceof Error ? error : String(error) }, "Error while processing chat:typing event");
}
});
});
/**
* Cleanup on disconnect: update presence counters, release field locks, and handle investigation unlock/transfer.
*/
socket.on("disconnect", () => {
logger.info({ socketId: socket.id }, "Client disconnected");
if (inactivityTimer) {
clearTimeout(inactivityTimer);
inactivityTimer = null;
}
void asyncLocalStorage.run({ logger, requestId: connectionId }, async () => {
try {
// Decrement connection count; if zero or below, remove user from presence set
const remaining = await redisUtils.hincrby(userCountKey, userId, -1);
if (remaining <= 0) {
await redisUtils.hdel(userCountKey, userId);
await redisUtils.hdel(presenceKey, userId);
}
await broadcastPresence();
await handleInvestigationUnlockOrTransfer();
}
catch (error) {
logger.error({ error }, "Error while cleaning up investigation presence/locks on disconnect");
}
});
});
}
Source