/**
 * Upserts a batch of provider messages for a single instance.
 *
 * For every element of `args.payload` that carries a `key.id`:
 *  1. The payload is converted with `transformMessagePayload`; elements that
 *     yield no content are skipped.
 *  2. The stored message with the same `messageId` (if any) is looked up via
 *     the `by_message_id` index.
 *  3. New inbound messages are persisted only when
 *     `api.db.autoResponderRules.shouldSave` returns a truthy value.
 *  4. An existing message is patched (and flagged `isEdited` when its content
 *     changed); otherwise a new message is inserted.
 *  5. The owning chat's `lastMessageTimestamp` is advanced when the message is
 *     newer than the stored value.
 *  6. For new messages only, follow-up work is scheduled: inbound media goes to
 *     `processInboundMedia`, other inbound messages to `onInbound`, and
 *     outgoing media to `cacheMediaForMessage`.
 *
 * If the instance document does not exist, the handler logs and returns
 * without touching any message. A failure while handling one message is logged
 * and the loop moves on to the next message.
 *
 * @throws Error when `args.payload` is not an array (logged, then re-thrown).
 */
export const handleUpsert = internalMutation({
  args: {
    instanceId: v.id("instances"),
    organizationId: v.string(),
    payload: v.any(),
  },
  handler: async (ctx, args) => {
    // Moves the chat's `lastMessageTimestamp` forward, never backwards.
    // Does nothing when no chat document matches this instance + JID.
    const bumpChatActivity = async (chatJid: string, timestamp: number) => {
      const chatDoc = await ctx.db
        .query("chats")
        .withIndex("by_instance_and_jid", (q) =>
          q.eq("instanceId", args.instanceId).eq("jid", chatJid)
        )
        .first();
      if (chatDoc && timestamp > (chatDoc.lastMessageTimestamp ?? 0)) {
        await ctx.db.patch(chatDoc._id, { lastMessageTimestamp: timestamp });
      }
    };

    try {
      if (!Array.isArray(args.payload)) {
        throw new Error("payload is not an array");
      }

      const instance = await ctx.db.get(args.instanceId);
      if (!instance) {
        console.error(`Instance not found: ${args.instanceId}`);
        return; // Nothing can be attributed to a missing instance.
      }

      for (const msg of args.payload) {
        try {
          if (!msg?.key?.id) continue;
          const evolutionMessageId: string = msg.key.id;

          // Transform first so payloads that will be dropped anyway do not
          // cost an index read.
          const content = transformMessagePayload(msg);
          if (!content) continue;

          const existing = await ctx.db
            .query("messages")
            .withIndex("by_message_id", (q) =>
              q.eq("messageId", evolutionMessageId)
            )
            .first();

          const status = mapProviderStatus(msg.status);

          // The provider value is scaled by 1000 (presumably seconds -> ms;
          // confirm against the provider contract). When the payload carries
          // no numeric timestamp, keep the stored timestamp of an existing
          // message instead of rewriting it to "now"; only brand-new messages
          // fall back to the current time.
          const timestamp: number =
            typeof msg.messageTimestamp === "number"
              ? msg.messageTimestamp * 1000
              : (existing?.timestamp ?? Date.now());

          const chatJid: string = msg.key.remoteJid;
          const fromMe = !!msg.key.fromMe;

          // Shape of a `messages` document without system fields; the calls to
          // `ctx.db.insert` / `ctx.db.patch` below type-check it against the
          // table definition.
          const messageData = {
            instanceId: args.instanceId,
            organizationId: args.organizationId,
            messageId: evolutionMessageId,
            chatJid,
            authorJid: msg.key.fromMe
              ? undefined
              : msg.participant || msg.key.remoteJid,
            fromMe,
            role: msg.key.fromMe ? ("assistant" as const) : ("user" as const),
            timestamp,
            status,
            content,
            quotedMessage: mapQuotedMessage(msg.contextInfo),
            isRevoked: msg.messageStubType === "REVOKE",
            isEdited: false,
          };

          const isInbound = !messageData.fromMe;
          const isNewMessage = !existing;

          // Only first-time inbound messages are subject to the save rules;
          // updates and outgoing messages are always written.
          if (isInbound && isNewMessage) {
            const shouldPersist = await ctx.runQuery(
              api.db.autoResponderRules.shouldSave,
              {
                instanceId: args.instanceId,
                chatJid: messageData.chatJid,
                contentType: messageData.content?.type,
                fromMe: messageData.fromMe,
              }
            );
            if (!shouldPersist) {
              console.info(
                "[messages.handleUpsert] Skipping persistence per autoresponder save rules",
                {
                  instanceId: args.instanceId,
                  chatJid: messageData.chatJid,
                  messageId: evolutionMessageId,
                  contentType: messageData.content?.type,
                }
              );
              continue;
            }
          }

          const needsCache = messageData.content.type === "media";

          if (existing) {
            // --- Existing message: update in place ---
            const wasEdited = !contentEquals(
              existing.content,
              messageData.content
            );
            await ctx.db.patch(existing._id, {
              chatJid: messageData.chatJid,
              authorJid: messageData.authorJid,
              fromMe: messageData.fromMe,
              role: messageData.role,
              timestamp: messageData.timestamp,
              status: messageData.status,
              content: messageData.content,
              quotedMessage: messageData.quotedMessage,
              isRevoked: messageData.isRevoked,
              // Once a message has been marked edited it stays edited.
              isEdited: wasEdited || existing.isEdited === true,
            });
            await bumpChatActivity(messageData.chatJid, messageData.timestamp);
          } else {
            // --- New message: insert, then schedule follow-up work ---
            const insertedId = await ctx.db.insert("messages", messageData);
            await bumpChatActivity(messageData.chatJid, messageData.timestamp);

            if (!messageData.fromMe) {
              if (needsCache) {
                // Inbound media: hand off to the orchestrator action.
                await ctx.scheduler.runAfter(
                  0,
                  internal.db.messages.processInboundMedia,
                  {
                    instanceName: instance.instanceName,
                    payload: msg,
                    messageId: insertedId,
                    instanceId: args.instanceId,
                    organizationId: args.organizationId,
                    chatJid: messageData.chatJid,
                    inboundMessageId: evolutionMessageId,
                  }
                );
              } else {
                // Inbound non-media: schedule onInbound directly.
                console.log("scheduling onInbound for message", insertedId);
                await ctx.scheduler.runAfter(0, api.actions.assist.onInbound, {
                  instanceId: args.instanceId,
                  chatJid: messageData.chatJid,
                });
              }
            } else if (needsCache) {
              // Outgoing media: only the caching step is scheduled.
              await ctx.scheduler.runAfter(
                0,
                internal.actions.messages.cacheMediaForMessage,
                {
                  instanceName: instance.instanceName,
                  messageId: insertedId,
                  payload: msg,
                  instanceId: args.instanceId,
                  organizationId: args.organizationId,
                }
              );
            }
          }
        } catch (e) {
          // One bad message must not abort the rest of the batch.
          console.error(
            "Error processing a single message in handleUpsert:",
            e
          );
        }
      }
    } catch (error) {
      console.error("Fatal error in handleUpsert:", error);
      // Re-throwing lets the caller know the entire operation failed.
      throw error;
    }
  },
});