Skip to content

Commit 99edd42

Browse files
committed
feat: integrate WebSocket support for real-time log streaming and enhance log watcher initialization
Signed-off-by: kaifcoder <kaifmohd2014@gmail.com>
1 parent 8ccb40b commit 99edd42

File tree

4 files changed

+113
-137
lines changed

4 files changed

+113
-137
lines changed

bin/lib/admin.js

Lines changed: 84 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -2,155 +2,70 @@ import fs from 'fs';
22
import path from 'path';
33
import chalk from 'chalk';
44
import http from 'http';
5-
import crypto from 'crypto';
65
import { spawn } from 'node:child_process';
6+
import { WebSocketServer, WebSocket } from 'ws';
77
import { getLogsForAPI, LogFileWatcher } from './logs.js';
88

9-
// Simple WebSocket implementation for real-time log streaming
10-
function handleWebSocketUpgrade(request, socket, head) {
11-
const key = request.headers['sec-websocket-key'];
12-
if (!key) {
13-
socket.end('HTTP/1.1 400 Bad Request\r\n\r\n');
14-
return;
15-
}
16-
17-
const acceptKey = crypto
18-
.createHash('sha1')
19-
.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11')
20-
.digest('base64');
21-
22-
const responseHeaders = [
23-
'HTTP/1.1 101 Switching Protocols',
24-
'Upgrade: websocket',
25-
'Connection: Upgrade',
26-
`Sec-WebSocket-Accept: ${acceptKey}`,
27-
'\r\n'
28-
].join('\r\n');
29-
30-
socket.write(responseHeaders);
31-
32-
// Handle WebSocket frames (simplified - only handles text frames)
33-
socket.on('data', (buffer) => {
34-
// Simple frame parsing for text messages
35-
if (buffer.length > 2) {
36-
const opcode = buffer[0] & 0x0f;
37-
if (opcode === 0x01) { // Text frame
38-
let payloadLength = buffer[1] & 0x7f;
39-
let maskStart = 2;
40-
41-
if (payloadLength === 126) {
42-
payloadLength = buffer.readUInt16BE(2);
43-
maskStart = 4;
44-
} else if (payloadLength === 127) {
45-
payloadLength = buffer.readBigUInt64BE(2);
46-
maskStart = 10;
47-
}
48-
49-
const mask = buffer.slice(maskStart, maskStart + 4);
50-
const payload = buffer.slice(maskStart + 4, maskStart + 4 + Number(payloadLength));
51-
52-
// Unmask payload
53-
for (let i = 0; i < payload.length; i++) {
54-
payload[i] ^= mask[i % 4];
55-
}
56-
57-
try {
58-
const message = JSON.parse(payload.toString());
59-
handleWebSocketMessage(socket, message);
60-
} catch (e) {
61-
console.error('Invalid WebSocket message:', e.message);
62-
}
63-
}
64-
}
65-
});
66-
67-
socket.on('close', () => {
68-
// Clean up any active log stream listeners for this socket
69-
if (socket.logUnsubscribe) {
70-
socket.logUnsubscribe();
71-
socket.logUnsubscribe = null;
9+
// ws helper
10+
function sendWebSocketMessage(ws, message) {
11+
// Use WebSocket.OPEN constant (instance does not expose OPEN reliably)
12+
if (ws.readyState === WebSocket.OPEN) {
13+
try {
14+
ws.send(JSON.stringify(message));
15+
} catch (e) {
16+
// Silently ignore send failures; connection will be cleaned by heartbeat
7217
}
73-
});
74-
}
75-
76-
function sendWebSocketMessage(socket, message) {
77-
const payload = JSON.stringify(message);
78-
const payloadBuffer = Buffer.from(payload);
79-
const frame = Buffer.alloc(2 + payloadBuffer.length);
80-
81-
frame[0] = 0x81; // FIN + text frame
82-
frame[1] = payloadBuffer.length;
83-
payloadBuffer.copy(frame, 2);
84-
85-
socket.write(frame);
18+
}
8619
}
8720

8821
// Global log watcher instance
8922
let globalLogWatcher = null;
23+
let wsServer = null;
9024

91-
function handleWebSocketMessage(socket, message) {
25+
async function handleWebSocketMessage(ws, message) {
9226
if (message.type === 'start_log_stream') {
93-
// Start streaming logs for specified service
94-
const serviceName = message.service;
95-
96-
// Send initial logs from watcher cache
97-
if (globalLogWatcher) {
98-
const logs = globalLogWatcher.getCurrentLogs(serviceName, { tail: 100 });
99-
sendWebSocketMessage(socket, {
100-
type: 'log_data',
101-
service: serviceName,
102-
logs: logs
103-
});
104-
105-
// Set up listener for real-time updates
106-
setupLogStreamListener(socket, serviceName);
107-
} else {
108-
sendWebSocketMessage(socket, {
109-
type: 'error',
110-
message: 'Log watcher not initialized'
111-
});
27+
ws.serviceFilter = message.service || null;
28+
if (!globalLogWatcher) {
29+
sendWebSocketMessage(ws, { type: 'error', message: 'Log watcher not initialized' });
30+
return;
11231
}
113-
} else if (message.type === 'stop_log_stream') {
114-
// Stop streaming logs
115-
if (socket.logUnsubscribe) {
116-
socket.logUnsubscribe();
117-
socket.logUnsubscribe = null;
32+
let logs = globalLogWatcher.getCurrentLogs(ws.serviceFilter, { tail: 100 });
33+
// Fallback: if no logs found but service specified, attempt direct file read
34+
if (logs.length === 0) {
35+
try {
36+
logs = await getLogsForAPI(ws.serviceFilter, { tail: 100 });
37+
} catch (e) {
38+
// ignore fallback failure
39+
}
11840
}
41+
sendWebSocketMessage(ws, { type: 'log_data', service: ws.serviceFilter, logs });
42+
} else if (message.type === 'stop_log_stream') {
43+
ws.serviceFilter = null;
11944
}
12045
}
12146

12247
// Set up listener for real-time log updates
123-
function setupLogStreamListener(socket, serviceName) {
124-
if (!globalLogWatcher) return;
125-
126-
// Remove existing listener if any
127-
if (socket.logUnsubscribe) {
128-
socket.logUnsubscribe();
129-
}
130-
131-
// Add new listener
132-
socket.logUnsubscribe = globalLogWatcher.addListener((event, data) => {
133-
// Filter by service if specified
134-
if (serviceName && data.service !== serviceName) return;
135-
48+
function broadcastLogEvent(event, payload) {
49+
if (!wsServer) return;
50+
wsServer.clients.forEach(ws => {
51+
if (ws.readyState !== WebSocket.OPEN) return;
52+
// Filter by service if client requested specific service
53+
if (ws.serviceFilter && payload.service !== ws.serviceFilter) return;
13654
if (event === 'logsUpdated') {
137-
sendWebSocketMessage(socket, {
55+
sendWebSocketMessage(ws, {
13856
type: 'log_update',
139-
service: data.service,
140-
logs: data.logs.map(log => ({
57+
service: payload.service,
58+
logs: (payload.logs || []).map(log => ({
14159
timestamp: log.timestamp instanceof Date ? log.timestamp.toISOString() : log.timestamp,
14260
level: log.level,
14361
service: log.service,
14462
message: log.message,
14563
data: log.data
14664
})),
147-
event: data.event
65+
event: payload.event || 'change'
14866
});
14967
} else if (event === 'logsCleared') {
150-
sendWebSocketMessage(socket, {
151-
type: 'logs_cleared',
152-
service: data.service
153-
});
68+
sendWebSocketMessage(ws, { type: 'logs_cleared', service: payload.service });
15469
}
15570
});
15671
}
@@ -751,12 +666,38 @@ export async function startAdminDashboard(options = {}) {
751666
});
752667

753668
// Handle WebSocket upgrades
754-
server.on('upgrade', (request, socket, head) => {
755-
if (request.url === '/ws') {
756-
handleWebSocketUpgrade(request, socket, head);
757-
} else {
758-
socket.end();
759-
}
669+
// Initialize ws server for /ws path
670+
wsServer = new WebSocketServer({ server, path: '/ws' });
671+
// Heartbeat to detect dead connections
672+
const heartbeatInterval = setInterval(() => {
673+
wsServer.clients.forEach(ws => {
674+
if (ws.isAlive === false) {
675+
try { ws.terminate(); } catch {}
676+
return;
677+
}
678+
ws.isAlive = false;
679+
try { ws.ping(); } catch {}
680+
});
681+
}, 30000);
682+
683+
wsServer.on('connection', (ws) => {
684+
ws.serviceFilter = null; // default: all services
685+
ws.isAlive = true;
686+
ws.on('pong', () => { ws.isAlive = true; });
687+
ws.on('message', (raw) => {
688+
try {
689+
const msg = JSON.parse(raw.toString());
690+
handleWebSocketMessage(ws, msg);
691+
} catch (e) {
692+
sendWebSocketMessage(ws, { type: 'error', message: 'Invalid JSON payload' });
693+
}
694+
});
695+
// Auto-start stream for all logs if client hasn't sent a start message within short delay
696+
setTimeout(() => {
697+
if (!ws.serviceFilter && ws.readyState === WebSocket.OPEN) {
698+
handleWebSocketMessage(ws, { type: 'start_log_stream' });
699+
}
700+
}, 250);
760701
});
761702

762703
// Graceful shutdown
@@ -776,6 +717,16 @@ export async function startAdminDashboard(options = {}) {
776717
globalLogWatcher = null;
777718
}
778719

720+
if (wsServer) {
721+
try {
722+
wsServer.clients.forEach(c => c.close());
723+
wsServer.close();
724+
} catch (e) {
725+
console.warn(chalk.yellow('⚠️ Error closing WebSocket server:'), e.message);
726+
}
727+
wsServer = null;
728+
}
729+
779730
// Close HTTP server
780731
try {
781732
server.close(() => {
@@ -806,5 +757,9 @@ export async function startAdminDashboard(options = {}) {
806757
console.log(chalk.gray(`[${timestamp}] Services: ${chalk.green(upCount + ' up')}, ${chalk.red(downCount + ' down')}, ${chalk.yellow(errorCount + ' error')}`));
807758
}, refreshInterval);
808759

760+
// Hook watcher events -> broadcast
761+
if (globalLogWatcher) {
762+
globalLogWatcher.addListener((event, data) => broadcastLogEvent(event, data));
763+
}
809764
return server;
810765
}

bin/lib/logs.js

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -604,17 +604,15 @@ export class LogFileWatcher {
604604

605605
// Determine watch paths for all services
606606
for (const service of cfg.services) {
607-
// Check both services/ (new) and apps/ (legacy) directories
607+
// Determine service directory (supports legacy apps/)
608608
let serviceDir = path.join(this.workspacePath, 'services', service.name);
609609
if (!fs.existsSync(serviceDir)) {
610610
serviceDir = path.join(this.workspacePath, 'apps', service.name);
611611
}
612-
613612
if (fs.existsSync(serviceDir)) {
614-
const logsDir = getLogsDir(serviceDir);
615-
if (fs.existsSync(logsDir)) {
616-
watchPaths.push(path.join(logsDir, '*.log'));
617-
}
613+
// Ensure .logs directory exists so watcher can subscribe immediately
614+
const logsDir = ensureLogsDir(serviceDir);
615+
watchPaths.push(path.join(logsDir, '*.log'));
618616
}
619617
}
620618

package-lock.json

Lines changed: 23 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@
5555
"execa": "^9.6.0",
5656
"fs-extra": "^11.3.2",
5757
"lodash": "^4.17.21",
58-
"prompts": "^2.4.2"
58+
"prompts": "^2.4.2",
59+
"ws": "^8.16.0"
5960
},
6061
"devDependencies": {
6162
"vitepress": "^1.3.3",

0 commit comments

Comments
 (0)