/**
 * Langfuse exporter for Mastra AI tracing, scoped to the current Noeva workspace.
 *
 * What the code in this file does:
 * - Resolves the Langfuse client on every export from `noevaContext`: an existing
 *   `context.langfuseClient` is reused, otherwise a client is built from the
 *   credentials returned by `noevaService.fetchLangfuseConfig()` (only when the
 *   context carries both `workspaceId` and `jwtToken`) and stored back on the context.
 * - Root spans start a Langfuse trace.
 * - `LLM_GENERATION` spans become Langfuse generations, event spans become Langfuse
 *   events, every other span becomes a Langfuse span.
 * - The Langfuse `environment` is `process.env.SERVICE_NAME`, falling back to
 *   `'noeva-agents'`.
 * - `sessionId` / `userId` on the trace come from `noevaContext` only; the
 *   `userId` / `sessionId` keys of `span.metadata` are stripped from the trace metadata.
 * - `workspaceId` and `threadId` (from `noevaContext`) are added to the metadata of
 *   traces and spans for debugging/filtering.
 */
import type {
  AITracingExporter,
  AITracingEvent,
  AnyExportedAISpan,
  LLMGenerationAttributes,
} from '@mastra/core/ai-tracing';
import { AISpanType, omitKeys } from '@mastra/core/ai-tracing';
import { ConsoleLogger } from '@mastra/core/logger';
import { LangfuseExporterConfig } from '@mastra/langfuse';
import { Langfuse } from 'langfuse';
import type { LangfuseTraceClient, LangfuseSpanClient, LangfuseGenerationClient, LangfuseEventClient } from 'langfuse';
import { noevaContext } from "../context/noeva-context";
import { noevaService } from "../services/noeva.service";

/** Per-trace bookkeeping kept while at least one span of the trace is still open. */
type TraceData = {
  trace: LangfuseTraceClient; // Langfuse trace object
  spans: Map<string, LangfuseSpanClient | LangfuseGenerationClient>; // Maps span.id to Langfuse span/generation
  events: Map<string, LangfuseEventClient>; // Maps span.id to Langfuse event
  activeSpans: Set<string>; // Tracks which spans haven't ended yet
  rootSpanId?: string; // Track the root span ID
};

/** Any Langfuse object that can act as the parent of a new span/generation/event. */
type LangfuseParent = LangfuseTraceClient | LangfuseSpanClient | LangfuseGenerationClient | LangfuseEventClient;

/** Langfuse environment used when `SERVICE_NAME` is unset or empty. */
const DEFAULT_ENVIRONMENT = 'noeva-agents';

export class CustomLangfuseExporter implements AITracingExporter {
  name = 'langfuse';

  private realtime: boolean;
  private traceMap = new Map<string, TraceData>();
  private logger: ConsoleLogger;
  private config: LangfuseExporterConfig;

  /**
   * @param config Exporter configuration. Only `realtime`, `logLevel`, `baseUrl` and
   *   `options` are read here; credentials are fetched per workspace in `getClient`.
   */
  constructor(config: LangfuseExporterConfig) {
    this.config = config;
    this.realtime = config.realtime ?? false;
    this.logger = new ConsoleLogger({ level: config.logLevel ?? 'warn' });
  }

  /**
   * Returns the Langfuse client for the current `noevaContext`, creating it if needed.
   *
   * @returns The client, or `null` when there is no context, the context lacks
   *   `workspaceId`/`jwtToken`, or the workspace configuration could not be fetched.
   */
  private async getClient(): Promise<Langfuse | null> {
    const context = noevaContext.getCurrentContext();

    if (!context) {
      this.logger.warn('⚠️ No noevaContext available - skipping trace');
      return null;
    }

    // 1. A client already stored on the context wins.
    if (context.langfuseClient) {
      return context.langfuseClient;
    }

    // 2. Without both workspaceId and JWT no workspace-specific client can be built.
    if (!context.workspaceId || !context.jwtToken) {
      return null;
    }

    try {
      // NOTE(review): called without arguments, so it presumably reads the current
      // noevaContext itself — confirm against noeva.service.
      const workspaceConfig = await noevaService.fetchLangfuseConfig();

      // Workspace credentials + baseUrl from the constructor config; `options` is
      // spread last, so its keys override the three above.
      const client = new Langfuse({
        publicKey: workspaceConfig.public_key,
        secretKey: workspaceConfig.secret_key,
        baseUrl: this.config.baseUrl,
        ...this.config.options,
      });

      // Store on the context so later exports take the fast path above.
      noevaContext.updateLangfuseClient(client);

      this.logger.info('✅ Created workspace-specific Langfuse client', {
        workspaceId: context.workspaceId
      });

      return client;
    } catch (error) {
      // Best effort: a tracing failure must not break the caller, so log and skip.
      this.logger.error('❌ Failed to initialize workspace-specific client', {
        workspaceId: context.workspaceId,
        error: error instanceof Error ? error.message : 'Unknown error'
      });
      return null;
    }
  }

  /**
   * Entry point called by Mastra for every tracing event.
   *
   * Event spans are routed to `handleEventSpan` regardless of `event.type`; all
   * other spans are dispatched on `event.type`. In realtime mode the client is
   * flushed after every handled event, including event spans.
   *
   * @param event The tracing event to export.
   */
  async exportEvent(event: AITracingEvent): Promise<void> {
    const client = await this.getClient();

    if (!client) {
      this.logger.warn('⚠️ No Langfuse client available - skipping export');
      return;
    }

    const span = event.exportedSpan;

    if (span.isEvent) {
      await this.handleEventSpan(span, client);
    } else {
      switch (event.type) {
        case 'span_started':
          await this.handleSpanStarted(span, client);
          break;
        case 'span_updated':
          await this.handleSpanUpdateOrEnd(span, false, client);
          break;
        case 'span_ended':
          await this.handleSpanUpdateOrEnd(span, true, client);
          break;
      }
    }

    // Flush immediately in realtime mode for instant visibility. This runs for
    // event spans too; previously they returned before reaching the flush.
    if (this.realtime) {
      await client.flushAsync();
    }
  }

  /**
   * Creates the Langfuse span/generation for a started span (and the trace itself
   * when the span is the root), then marks the span as active.
   */
  private async handleSpanStarted(span: AnyExportedAISpan, client: Langfuse): Promise<void> {
    if (span.isRootSpan) {
      this.initTrace(span, client);
    }

    const method = 'handleSpanStarted';

    const traceData = this.getTraceData({ span, method });
    if (!traceData) {
      return;
    }

    const langfuseParent = this.getLangfuseParent({ traceData, span, method });
    if (!langfuseParent) {
      return;
    }

    const payload = this.buildSpanPayload(span, true);

    const langfuseSpan =
      span.type === AISpanType.LLM_GENERATION ? langfuseParent.generation(payload) : langfuseParent.span(payload);

    traceData.spans.set(span.id, langfuseSpan);
    traceData.activeSpans.add(span.id); // Track as active
  }

  /**
   * Pushes an update to an existing Langfuse span and, when `isEnd` is true,
   * removes it from the active set and drops the trace bookkeeping once no span
   * of the trace is active any more.
   *
   * @param span  The exported span carrying the new state.
   * @param isEnd `true` for `span_ended`, `false` for `span_updated`.
   * @param client Unused here; updates go through the stored Langfuse span object.
   */
  private async handleSpanUpdateOrEnd(span: AnyExportedAISpan, isEnd: boolean, client: Langfuse): Promise<void> {
    const method = isEnd ? 'handleSpanEnd' : 'handleSpanUpdate';

    const traceData = this.getTraceData({ span, method });
    if (!traceData) {
      return;
    }

    const langfuseSpan = traceData.spans.get(span.id);
    if (!langfuseSpan) {
      // For event spans that only send SPAN_ENDED, we might not have the span yet
      if (isEnd && span.isEvent) {
        // Just make sure it's not in active spans
        traceData.activeSpans.delete(span.id);
        if (traceData.activeSpans.size === 0) {
          this.traceMap.delete(span.traceId);
        }
        return;
      }

      this.logger.warn('Langfuse exporter: No Langfuse span found for span update/end', {
        traceId: span.traceId,
        spanId: span.id,
        spanName: span.name,
        spanType: span.type,
        isRootSpan: span.isRootSpan,
        parentSpanId: span.parentSpanId,
        method,
      });
      return;
    }

    // use update for both update & end, so that we can use the
    // end time we set when ending the span.
    langfuseSpan.update(this.buildSpanPayload(span, false));

    if (!isEnd) {
      return;
    }

    // Remove from active spans
    traceData.activeSpans.delete(span.id);

    if (span.isRootSpan) {
      traceData.trace.update({ output: span.output });
    }

    // Only clean up the trace when ALL spans have ended
    if (traceData.activeSpans.size === 0) {
      this.traceMap.delete(span.traceId);
    }
  }

  /**
   * Creates a Langfuse event for an event span (and the trace when the event is
   * the root span).
   *
   * NOTE(review): a root event span that already has an `endTime` leaves its
   * `traceMap` entry behind, because nothing in this class removes it afterwards —
   * confirm whether root event spans occur in practice.
   */
  private async handleEventSpan(span: AnyExportedAISpan, client: Langfuse): Promise<void> {
    if (span.isRootSpan) {
      this.logger.debug('Langfuse exporter: Creating trace', {
        traceId: span.traceId,
        spanId: span.id,
        spanName: span.name,
        method: 'handleEventSpan',
      });
      this.initTrace(span, client);
    }

    const method = 'handleEventSpan';

    const traceData = this.getTraceData({ span, method });
    if (!traceData) {
      return;
    }

    const langfuseParent = this.getLangfuseParent({ traceData, span, method });
    if (!langfuseParent) {
      return;
    }

    const payload = this.buildSpanPayload(span, true);

    const langfuseEvent = langfuseParent.event(payload);

    traceData.events.set(span.id, langfuseEvent);

    // Event spans are typically immediately ended, but let's track them properly
    if (!span.endTime) {
      traceData.activeSpans.add(span.id);
    }
  }

  /** Creates the Langfuse trace for a root span and registers empty bookkeeping for it. */
  private initTrace(span: AnyExportedAISpan, client: Langfuse): void {
    const trace = client.trace(this.buildTracePayload(span));
    this.traceMap.set(span.traceId, {
      trace,
      spans: new Map(),
      events: new Map(),
      activeSpans: new Set(),
      rootSpanId: span.id,
    });
  }

  /**
   * Looks up the bookkeeping for the span's trace.
   *
   * @returns The trace data, or `undefined` (after logging a warning) when the
   *   trace is unknown.
   */
  private getTraceData(options: { span: AnyExportedAISpan; method: string }): TraceData | undefined {
    const { span, method } = options;

    const traceData = this.traceMap.get(span.traceId);
    if (traceData) {
      return traceData;
    }

    this.logger.warn('Langfuse exporter: No trace data found for span', {
      traceId: span.traceId,
      spanId: span.id,
      spanName: span.name,
      spanType: span.type,
      isRootSpan: span.isRootSpan,
      parentSpanId: span.parentSpanId,
      method,
    });
    return undefined;
  }

  /**
   * Resolves the Langfuse object a new span/event should be attached to: the trace
   * itself when the span has no parent, otherwise the parent's span or event.
   *
   * @returns The parent, or `undefined` (after logging a warning) when the parent
   *   id is not known for this trace.
   */
  private getLangfuseParent(options: {
    traceData: TraceData;
    span: AnyExportedAISpan;
    method: string;
  }): LangfuseParent | undefined {
    const { traceData, span, method } = options;

    const parentId = span.parentSpanId;
    if (!parentId) {
      return traceData.trace;
    }

    // Spans are checked before events, as in the original lookup order.
    const parent = traceData.spans.get(parentId) ?? traceData.events.get(parentId);
    if (parent) {
      return parent;
    }

    this.logger.warn('Langfuse exporter: No parent data found for span', {
      traceId: span.traceId,
      spanId: span.id,
      spanName: span.name,
      spanType: span.type,
      isRootSpan: span.isRootSpan,
      parentSpanId: span.parentSpanId,
      method,
    });
    return undefined;
  }

  /** Langfuse environment: `SERVICE_NAME` when set and non-empty, else the default. */
  private resolveEnvironment(): string {
    // `||` (not `??`) on purpose: an empty SERVICE_NAME also falls back.
    return process.env.SERVICE_NAME || DEFAULT_ENVIRONMENT;
  }

  /**
   * Builds the body passed to `client.trace(...)` for a root span.
   *
   * `sessionId` and `userId` are taken from `noevaContext`; the same-named keys in
   * `span.metadata` are removed from the metadata and otherwise ignored.
   */
  private buildTracePayload(span: AnyExportedAISpan): Record<string, any> {
    const payload: Record<string, any> = {
      id: span.traceId,
      name: span.name,
      environment: this.resolveEnvironment(),
    };

    // Destructured only to keep userId/sessionId out of `remainingMetadata`.
    const { userId, sessionId, ...remainingMetadata } = span.metadata ?? {};

    const contextSessionId = this.extractSessionId();
    const contextUserId = this.extractUserId();

    if (contextSessionId) payload.sessionId = contextSessionId;
    if (contextUserId) payload.userId = contextUserId;
    // `!== undefined` keeps falsy inputs (0, '', false), matching buildSpanPayload.
    if (span.input !== undefined) payload.input = span.input;

    // workspaceId and threadId are included for debugging/filtering; later spreads
    // override earlier keys.
    payload.metadata = {
      spanType: span.type,
      workspaceId: this.extractWorkspaceId(),
      threadId: this.extractThreadId(),
      ...span.attributes,
      ...remainingMetadata,
    };

    return payload;
  }

  /** Returns `sessionId` from the current `noevaContext`, if any. */
  private extractSessionId(): string | undefined {
    return noevaContext.getCurrentContext()?.sessionId;
  }

  /** Returns `userId` from the current `noevaContext`, if any. */
  private extractUserId(): string | undefined {
    return noevaContext.getCurrentContext()?.userId;
  }

  /** Returns `threadId` from the current `noevaContext`, if any (used in metadata). */
  private extractThreadId(): string | undefined {
    return noevaContext.getCurrentContext()?.threadId;
  }

  /** Returns `workspaceId` from the current `noevaContext`, or `'unknown'` when absent. */
  private extractWorkspaceId(): string {
    return noevaContext.getCurrentContext()?.workspaceId || 'unknown';
  }

  /**
   * Builds the body for creating (`isCreate = true`) or updating a Langfuse
   * span/generation/event.
   *
   * @param span     The exported span.
   * @param isCreate When true, identity fields (`id`, `name`, `startTime`, `input`,
   *   `environment`) are included; updates only carry mutable fields.
   */
  private buildSpanPayload(span: AnyExportedAISpan, isCreate: boolean): Record<string, any> {
    const payload: Record<string, any> = {};

    if (isCreate) {
      payload.id = span.id;
      payload.name = span.name;
      payload.startTime = span.startTime;
      if (span.input !== undefined) payload.input = span.input;
      payload.environment = this.resolveEnvironment();
    }

    if (span.output !== undefined) payload.output = span.output;
    if (span.endTime !== undefined) payload.endTime = span.endTime;

    const attributes = (span.attributes ?? {}) as Record<string, any>;

    // Strip special fields from metadata if used in top-level keys
    const attributesToOmit: string[] = [];

    if (span.type === AISpanType.LLM_GENERATION) {
      const llmAttr = attributes as LLMGenerationAttributes;

      if (llmAttr.model !== undefined) {
        payload.model = llmAttr.model;
        attributesToOmit.push('model');
      }

      if (llmAttr.usage !== undefined) {
        payload.usage = llmAttr.usage;
        attributesToOmit.push('usage');
      }

      if (llmAttr.parameters !== undefined) {
        payload.modelParameters = llmAttr.parameters;
        attributesToOmit.push('parameters');
      }
    }

    // workspaceId/threadId for debugging; environment is a top-level field and is
    // deliberately not duplicated here.
    payload.metadata = {
      spanType: span.type,
      workspaceId: this.extractWorkspaceId(),
      threadId: this.extractThreadId(),
      ...omitKeys(attributes, attributesToOmit),
      ...span.metadata,
    };

    if (span.errorInfo) {
      payload.level = 'ERROR';
      payload.statusMessage = span.errorInfo.message;
    }

    return payload;
  }

  /**
   * Drops all in-memory trace bookkeeping.
   *
   * NOTE(review): no Langfuse client is flushed or shut down here; clients live on
   * `noevaContext`, so whoever owns them presumably flushes them — confirm.
   */
  async shutdown(): Promise<void> {
    // Clean up trace map
    this.traceMap.clear();
    this.logger.debug('🔌 CustomLangfuseExporter shutdown completed');
  }
}