From 575c96add35c30d2a1fe90e2a711a30e09a1a683 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nh=E1=BA=ADt=20Ho=C3=A0ng?= Date: Sat, 16 Aug 2025 14:38:59 +0700 Subject: [PATCH] feat: Add role property to SignalRMessage and StreamingMessage for enhanced message handling --- packages/realtime/src/chat-service.ts | 317 ++++++++++++++--------- packages/realtime/src/signalr-service.ts | 1 + packages/realtime/src/types.ts | 2 + 3 files changed, 194 insertions(+), 126 deletions(-) diff --git a/packages/realtime/src/chat-service.ts b/packages/realtime/src/chat-service.ts index b76f979..d117a48 100644 --- a/packages/realtime/src/chat-service.ts +++ b/packages/realtime/src/chat-service.ts @@ -1,7 +1,15 @@ -import { SignalRService } from './signalr-service'; -import { SignalRConfig, SignalRMessage, ChatMessage, StreamingMessage, StreamingChatState, StreamingMessageHandler, StreamingStateHandler } from './types'; - -export interface ChatServiceConfig extends Omit { +import { SignalRService } from "./signalr-service"; +import { + SignalRConfig, + SignalRMessage, + ChatMessage, + StreamingMessage, + StreamingChatState, + StreamingMessageHandler, + StreamingStateHandler, +} from "./types"; + +export interface ChatServiceConfig extends Omit { baseUrl?: string; hubPath?: string; } @@ -12,16 +20,20 @@ export class ChatService { private isProcessing = false; private streamingState: StreamingChatState = { isStreaming: false, - currentMessage: '' + currentMessage: "", }; private streamingHandlers: StreamingMessageHandler[] = []; private streamingStateHandlers: StreamingStateHandler[] = []; private regularMessageHandlers: ((message: any) => void)[] = []; constructor(config: ChatServiceConfig) { - const baseUrl = config.baseUrl || (typeof window !== 'undefined' ? window.location.origin : 'https://mixcore.net'); - const hubPath = config.hubPath || '/hub/llm_chat'; - + const baseUrl = + config.baseUrl || + (typeof window !== "undefined" + ? window.location.origin + : "https://mixcore.net"); + const hubPath = config.hubPath || "/hub/llm_chat"; + const signalRConfig: SignalRConfig = { ...config, hubUrl: `${baseUrl}${hubPath}`, @@ -32,28 +44,29 @@ export class ChatService { } private setupMessageHandlers(): void { - this.signalRService.onMessage('receive_message', (message: SignalRMessage) => { - // Process the message and convert to ChatMessage if needed - this.handleIncomingMessage(message); - }); + this.signalRService.onMessage( + "receive_message", + (message: SignalRMessage) => { + // Process the message and convert to ChatMessage if needed + this.handleIncomingMessage(message); + } + ); } private handleIncomingMessage(message: SignalRMessage): void { - console.log('🔍 Raw incoming message:', message); - console.log('🔍 Message keys:', Object.keys(message || {})); - console.log('🔍 Message action:', message?.action); - console.log('🔍 Message data keys:', Object.keys(message?.data || {})); - + console.log("🔍 Raw incoming message:", message); + console.log("🔍 Message keys:", Object.keys(message || {})); + console.log("🔍 Message action:", message?.action); + console.log("🔍 Message data keys:", Object.keys(message?.data || {})); // Check if this is a streaming format message if (this.isStreamingMessage(message)) { - console.log('✅ Detected as streaming format message'); - this.handleStreamingMessage(message); + console.log("✅ Detected as streaming format message"); + this.appendStreamingChunk(message.data.response, message.data.role); return; } - // Handle based on action like AngularJS code if (message?.action) { - console.log('🎯 Processing action-based message:', message.action); + console.log("🎯 Processing action-based message:", message.action); switch (message.action) { case "NewMessage": this.handleNewMessage(message); @@ -62,68 +75,77 @@ export class ChatService { this.handleNewStreamingMessage(message); break; default: - console.log('⚠️ Unknown action:', message.action); + console.log("⚠️ Unknown action:", message.action); } } else { - console.log('📄 Processing as regular message (no action)'); + console.log("📄 Processing as regular message (no action)"); // Fallback for regular SignalR messages this.handleRegularMessage(message); } } - + private handleNewMessage(message: SignalRMessage): void { - console.log('📤 Handling NewMessage:', message); - console.log('🔍 Current streaming state:', this.streamingState.isStreaming); - + console.log("📤 Handling NewMessage:", message); + console.log("🔍 Current streaming state:", this.streamingState.isStreaming); + if (this.streamingState.isStreaming) { - console.log('🏁 NewMessage received during streaming - treating as completion signal'); + console.log( + "🏁 NewMessage received during streaming - treating as completion signal" + ); // Use NewMessage as the completion trigger for streaming this.completeStreaming(); return; } - + if (message?.data?.response) { - console.log('✅ Processing NewMessage as regular message (no streaming was active)'); + console.log( + "✅ Processing NewMessage as regular message (no streaming was active)" + ); // Reset streaming data and add final message (like AngularJS) this.streamingState.isStreaming = false; - this.streamingState.currentMessage = ''; + this.streamingState.currentMessage = ""; this.notifyStreamingStateChange(); - + // Emit as regular message this.emitRegularMessage({ id: Date.now().toString(), content: message.data.response, - role: "assistant" as const, + role: message.data.role, timestamp: new Date().toISOString(), }); } } - + private handleNewStreamingMessage(message: SignalRMessage): void { - console.log('🌊 Handling NewStreamingMessage:', message); - + console.log("🌊 Handling NewStreamingMessage:", message); if (message?.data?.response) { - console.log('✅ Found response data:', message.data.response); + console.log("✅ Found response data:", message.data.response); // Accumulate streaming content (like AngularJS) - this.appendStreamingChunk(message.data.response); + this.appendStreamingChunk(message.data.response, message.data.role); } else { - console.log('❌ No response data found in streaming message'); + console.log("❌ No response data found in streaming message"); } } - + private handleRegularMessage(message: SignalRMessage): void { - console.log('📄 Handling regular message - this bypasses streaming'); - console.log('📄 Streaming state:', this.streamingState.isStreaming); - + console.log("📄 Handling regular message - this bypasses streaming"); + console.log("📄 Streaming state:", this.streamingState.isStreaming); + // Only process if not currently streaming if (!this.streamingState.isStreaming && message?.data?.response) { - console.log('✅ Processing regular message response:', message.data.response); - - let content = ''; - if (typeof message.data.response === 'string') { + console.log( + "✅ Processing regular message response:", + message.data.response + ); + + let content = ""; + if (typeof message.data.response === "string") { content = message.data.response; - console.log('📝 String content:', content); - } else if (typeof message.data.response === 'object' && message.data.response !== null) { + console.log("📝 String content:", content); + } else if ( + typeof message.data.response === "object" && + message.data.response !== null + ) { const responseObj = message.data.response as any; if (responseObj.content) { content = responseObj.content; @@ -134,14 +156,18 @@ export class ChatService { } else { content = JSON.stringify(message.data.response, null, 2); } - console.log('📝 Object content extracted:', content); + console.log("📝 Object content extracted:", content); } else { content = String(message.data.response); - console.log('📝 Converted content:', content); + console.log("📝 Converted content:", content); } - - if (content.trim() && !content.includes('"type":1') && !content.includes('"type":3')) { - console.log('🚀 Emitting regular message with content:', content); + + if ( + content.trim() && + !content.includes('"type":1') && + !content.includes('"type":3') + ) { + console.log("🚀 Emitting regular message with content:", content); this.emitRegularMessage({ id: Date.now().toString(), content: content, @@ -149,173 +175,206 @@ export class ChatService { timestamp: new Date().toISOString(), }); } else { - console.log('❌ Content filtered out or contains streaming markers'); + console.log("❌ Content filtered out or contains streaming markers"); } } else { - console.log('❌ Regular message skipped - streaming active or no response'); + console.log( + "❌ Regular message skipped - streaming active or no response" + ); } } private isStreamingMessage(message: any): boolean { // Check for action-based pattern (like AngularJS) - if (message?.action === 'NewStreamingMessage') { + if (message?.action === "NewStreamingMessage") { return true; } - + // Check direct streaming format (type 1/3 pattern) - if (typeof message === 'object' && - typeof message.type === 'number' && - typeof message.target === 'string' && - Array.isArray(message.arguments)) { + if ( + typeof message === "object" && + typeof message.type === "number" && + typeof message.target === "string" && + Array.isArray(message.arguments) + ) { return true; } - + // Check if it's a SignalR message containing streaming data - if (message?.data?.response && typeof message.data.response === 'string') { + if (message?.data?.response && typeof message.data.response === "string") { // Check if response contains streaming JSON pattern const response = message.data.response; - return response.includes('"type":1') && response.includes('"target":"receive_message"') || - response.includes('"type":3') || - response.includes('"action":"NewStreamingMessage"'); + return ( + (response.includes('"type":1') && + response.includes('"target":"receive_message"')) || + response.includes('"type":3') || + response.includes('"action":"NewStreamingMessage"') + ); } - + return false; } private handleStreamingMessage(rawMessage: any): void { try { let streamingMessage: StreamingMessage; - + // If it's wrapped in SignalR format, extract the streaming data - if (rawMessage?.data?.response && typeof rawMessage.data.response === 'string') { + if ( + rawMessage?.data?.response && + typeof rawMessage.data.response === "string" + ) { const response = rawMessage.data.response; - console.log('Parsing streaming response:', response); - + console.log("Parsing streaming response:", response); + // Split by }{ pattern to handle concatenated JSON objects - const jsonChunks = response.split('}{').map((chunk, index, array) => { + const jsonChunks = response.split("}{").map((chunk, index, array) => { if (index === 0 && array.length > 1) { - return chunk + '}'; + return chunk + "}"; } else if (index === array.length - 1 && array.length > 1) { - return '{' + chunk; + return "{" + chunk; } else if (array.length > 1) { - return '{' + chunk + '}'; + return "{" + chunk + "}"; } return chunk; }); - + for (const jsonChunk of jsonChunks) { try { const parsed = JSON.parse(jsonChunk); if (parsed.type === 1 || parsed.type === 3) { - console.log('Processing streaming chunk:', parsed); + console.log("Processing streaming chunk:", parsed); this.processStreamingMessage(parsed); } } catch (e) { - console.warn('Failed to parse streaming chunk:', jsonChunk, e); + console.warn("Failed to parse streaming chunk:", jsonChunk, e); } } return; } - + // Direct streaming message format streamingMessage = rawMessage as StreamingMessage; this.processStreamingMessage(streamingMessage); - } catch (error) { - console.error('Error handling streaming message:', error); + console.error("Error handling streaming message:", error); } } - + private processStreamingMessage(streamingMessage: StreamingMessage): void { - console.log('Processing streaming message:', streamingMessage); - + console.log("Processing streaming message:", streamingMessage); + // Handle streaming data messages (type 1) - if (streamingMessage.type === 1 && streamingMessage.target === 'receive_message') { + if ( + streamingMessage.type === 1 && + streamingMessage.target === "receive_message" + ) { for (const arg of streamingMessage.arguments) { - if (arg.action === 'NewStreamingMessage' && arg.data.isSuccess) { - console.log('Appending chunk:', arg.data.response); + if (arg.action === "NewStreamingMessage" && arg.data.isSuccess) { + console.log("Appending chunk:", arg.data.response); this.appendStreamingChunk(arg.data.response); } } } - + // Handle completion messages (type 3) if (streamingMessage.type === 3) { - console.log('Completing streaming'); + console.log("Completing streaming"); this.completeStreaming(); } } - private appendStreamingChunk(chunk: string): void { - console.log('📝 Appending streaming chunk:', chunk); - + private appendStreamingChunk(chunk: string, role?: string): void { + if (chunk == "[DONE]") { + console.log("📝 completing streaming:"); + this.completeStreaming(); + return; + } + if (role === "system") { + return; + } + console.log("📝 Appending streaming chunk:", chunk); + if (!this.streamingState.isStreaming) { - console.log('🚀 Starting new stream'); + console.log("🚀 Starting new stream"); this.streamingState.isStreaming = true; - this.streamingState.currentMessage = ''; + this.streamingState.currentMessage = ""; this.notifyStreamingStateChange(); } this.streamingState.currentMessage += chunk; - console.log('📄 Current streaming message:', this.streamingState.currentMessage); - + console.log( + "📄 Current streaming message:", + this.streamingState.currentMessage + ); + // Notify streaming handlers - this.streamingHandlers.forEach(handler => { + this.streamingHandlers.forEach((handler) => { try { - console.log('🔔 Calling streaming handler with chunk:', chunk); + console.log("🔔 Calling streaming handler with chunk:", chunk); handler(chunk, false); } catch (error) { - console.error('Error in streaming handler:', error); + console.error("Error in streaming handler:", error); } }); } private completeStreaming(): void { - console.log('🏁 Completing streaming...'); - console.log('📄 Final streaming content:', this.streamingState.currentMessage); - + console.log("🏁 Completing streaming..."); + console.log( + "📄 Final streaming content:", + this.streamingState.currentMessage + ); + if (this.streamingState.isStreaming) { // Notify completion to UI handlers first - console.log('📢 Notifying streaming handlers of completion'); - this.streamingHandlers.forEach(handler => { + console.log("📢 Notifying streaming handlers of completion"); + this.streamingHandlers.forEach((handler) => { try { - handler('', true); // Empty chunk with isComplete = true + handler("", true); // Empty chunk with isComplete = true } catch (error) { - console.error('Error in streaming completion handler:', error); + console.error("Error in streaming completion handler:", error); } }); // Reset streaming state - console.log('🔄 Resetting streaming state'); + console.log("🔄 Resetting streaming state"); this.streamingState.isStreaming = false; const finalMessage = this.streamingState.currentMessage; - this.streamingState.currentMessage = ''; - + this.streamingState.currentMessage = ""; + this.notifyStreamingStateChange(); - - console.log('✅ Streaming completed. Final message length:', finalMessage.length); + + console.log( + "✅ Streaming completed. Final message length:", + finalMessage.length + ); } else { - console.log('⚠️ completeStreaming called but streaming was not active'); + console.log("⚠️ completeStreaming called but streaming was not active"); } } private notifyStreamingStateChange(): void { - this.streamingStateHandlers.forEach(handler => { + this.streamingStateHandlers.forEach((handler) => { try { handler({ ...this.streamingState }); } catch (error) { - console.error('Error in streaming state handler:', error); + console.error("Error in streaming state handler:", error); } }); } private emitRegularMessage(message: any): void { - console.log('Emitting regular message:', message); - this.regularMessageHandlers.forEach(handler => { + console.log("Emitting regular message:", message); + if (message.role === "system") { + console.log(`✅ Not emitting ${message.role} message`); + return; // Skip system messages + } + this.regularMessageHandlers.forEach((handler) => { try { handler(message); } catch (error) { - console.error('Error in regular message handler:', error); + console.error("Error in regular message handler:", error); } }); } @@ -341,13 +400,13 @@ export class ChatService { public async sendMessage(content: string): Promise { if (!content.trim()) { - throw new Error('Message content cannot be empty'); + throw new Error("Message content cannot be empty"); } try { - await this.signalRService.invoke('AskAI', content); + await this.signalRService.invoke("AskAI", content); } catch (error) { - console.error('Failed to send message:', error); + console.error("Failed to send message:", error); throw error; } } @@ -357,17 +416,23 @@ export class ChatService { } public onMessageReceived(handler: (message: SignalRMessage) => void): void { - this.signalRService.onMessage('receive_message', handler); + this.signalRService.onMessage("receive_message", handler); } public offMessageReceived(handler: (message: SignalRMessage) => void): void { - this.signalRService.offMessage('receive_message', handler); + this.signalRService.offMessage("receive_message", handler); } public onStreaming(handler: StreamingMessageHandler): void { - console.log('📋 Registering streaming handler. Current handlers:', this.streamingHandlers.length); + console.log( + "📋 Registering streaming handler. Current handlers:", + this.streamingHandlers.length + ); this.streamingHandlers.push(handler); - console.log('📋 After registration, total handlers:', this.streamingHandlers.length); + console.log( + "📋 After registration, total handlers:", + this.streamingHandlers.length + ); } public offStreaming(handler: StreamingMessageHandler): void { @@ -422,4 +487,4 @@ export class ChatService { this.regularMessageHandlers.length = 0; this.signalRService.dispose(); } -} \ No newline at end of file +} diff --git a/packages/realtime/src/signalr-service.ts b/packages/realtime/src/signalr-service.ts index 492c8ec..6eba8b1 100644 --- a/packages/realtime/src/signalr-service.ts +++ b/packages/realtime/src/signalr-service.ts @@ -90,6 +90,7 @@ export class SignalRService { isSuccess: parsedMsg.data?.isSuccess ?? false, response: parsedMsg.data?.response ?? '', result: parsedMsg.data?.result ?? '', + role: parsedMsg.data?.role ?? '', }, createdDateTime: parsedMsg.createdDateTime ?? new Date().toISOString(), }; diff --git a/packages/realtime/src/types.ts b/packages/realtime/src/types.ts index 4de53e6..857bc49 100644 --- a/packages/realtime/src/types.ts +++ b/packages/realtime/src/types.ts @@ -7,6 +7,7 @@ export interface SignalRMessage { type: string; data: { isSuccess: boolean; + role: string; response: string; result: string; }; @@ -23,6 +24,7 @@ export interface StreamingMessage { export interface StreamingMessageArgument { action: string; + role: string; data: { isSuccess: boolean; response: string;