import { v } from "convex/values";
import { Id } from "./_generated/dataModel";
import { internalMutation, internalQuery } from "./_generated/server";
import type { DatabaseReader } from "./_generated/server";

/** Current time in epoch milliseconds; used for `createdAt` / `updatedAt` stamps. */
const now = () => Date.now();

/** Validator for the Stripe Connect onboarding states accepted by this module. */
const stripeConnectStatusValidator = v.union(
  v.literal("not_started"),
  v.literal("requirements_due"),
  v.literal("pending_verification"),
  v.literal("active"),
  v.literal("restricted")
);

/**
 * Ordering used by `updateTipStatus` to decide whether a status change moves
 * forward. A higher number wins; a lower incoming rank is rejected.
 */
const TIP_STATUS_RANK: Record<string, number> = {
  processing: 0,
  failed: 1,
  succeeded: 2,
  refunded: 3,
  disputed: 4,
};

/**
 * Find the `creator_payments` row whose `stripeConnectId` matches, or `null`.
 *
 * Single shared implementation for every lookup-by-connect-id in this module.
 * It reads all rows reachable through the `by_payouts` index from `false`
 * upward and matches `stripeConnectId` in memory, so the cost grows with the
 * number of rows.
 * NOTE(review): a dedicated index on `stripeConnectId` would turn this into a
 * point lookup — confirm whether the schema can add one.
 */
const findCreatorPaymentsByConnectId = async (
  db: DatabaseReader,
  stripeConnectId: string
) => {
  const rows = await db
    .query("creator_payments")
    .withIndex("by_payouts", (q) => q.gte("payoutsEnabled", false))
    .collect();
  return rows.find((row) => row.stripeConnectId === stripeConnectId) ?? null;
};

/** ---------------------------
 * Users – small helpers
 * --------------------------- */

/** Fetch a user document by id; resolves to `null` when it does not exist. */
export const getUserById = internalQuery({
  args: { userId: v.id("users") },
  handler: async ({ db }, { userId }) => {
    return await db.get(userId);
  },
});

/**
 * Attach a Stripe Connect account id to a user by upserting the user's
 * `creator_payments` row.
 *
 * - Throws `Error("User not found")` when `userId` does not resolve.
 * - When a row already exists, `stripeConnectStatus` is only overwritten if a
 *   new value is supplied.
 * - A new row starts with `payoutsEnabled: false` and status `"not_started"`
 *   unless a status is supplied.
 *
 * @returns `{ creatorPaymentsId }` — the id of the patched or inserted row.
 */
export const setUserStripeConnect = internalMutation({
  args: {
    userId: v.id("users"),
    stripeConnectId: v.string(),
    stripeConnectStatus: v.optional(stripeConnectStatusValidator),
  },
  handler: async ({ db }, { userId, stripeConnectId, stripeConnectStatus }) => {
    const user = await db.get(userId);
    if (!user) throw new Error("User not found");

    // Only the timestamp is touched on the user row here; the cached
    // `tippingEnabled` flag is written later by the sync mutation.
    await db.patch(userId, { updatedAt: now() });

    // Upsert the creator_payments row for this user.
    const existing = await db
      .query("creator_payments")
      .withIndex("by_user", (q) => q.eq("userId", userId))
      .first();

    if (existing) {
      await db.patch(existing._id, {
        stripeConnectId,
        stripeConnectStatus: stripeConnectStatus ?? existing.stripeConnectStatus,
        updatedAt: now(),
      });
      return { creatorPaymentsId: existing._id };
    }

    const timestamp = now();
    const id = await db.insert("creator_payments", {
      userId,
      stripeConnectId,
      payoutsEnabled: false,
      stripeConnectStatus: stripeConnectStatus ?? "not_started",
      createdAt: timestamp,
      updatedAt: timestamp,
    });
    return { creatorPaymentsId: id };
  },
});

/** ---------------------------
 * Creator payments – source of truth
 * --------------------------- */

/** Return the first `creator_payments` row for `userId` (via `by_user`), or `null`. */
export const getCreatorPaymentsByUserId = internalQuery({
  args: { userId: v.id("users") },
  handler: async ({ db }, { userId }) => {
    return await db
      .query("creator_payments")
      .withIndex("by_user", (q) => q.eq("userId", userId))
      .first();
  },
});

/** Return the `creator_payments` row for a Stripe Connect account id, or `null`. */
export const getCreatorPaymentsByConnectId = internalQuery({
  args: { stripeConnectId: v.string() },
  handler: async ({ db }, { stripeConnectId }) => {
    return await findCreatorPaymentsByConnectId(db, stripeConnectId);
  },
});

/**
 * Sync from Stripe `account.updated` webhooks.
 *
 * If no row is found by `stripeConnectId`, one is created when `userId` is
 * provided (e.g., you set account.metadata.userId during creation). With
 * neither a row nor a `userId`, nothing is written.
 *
 * After the row is written, the owning user's cached `tippingEnabled` flag is
 * set to `payoutsEnabled && stripeOnboardingComplete`.
 *
 * @returns
 * - `created` — `true` only when a new `creator_payments` row was inserted.
 * - `updated` — `true` whenever a row was written (patched or inserted).
 * - `userFlagUpdated` — `true` only when the user document exists and was patched.
 */
export const syncConnectStatus = internalMutation({
  args: {
    stripeConnectId: v.string(),
    payoutsEnabled: v.boolean(),
    stripeOnboardingComplete: v.boolean(),
    stripeConnectStatus: stripeConnectStatusValidator,
    detailsSubmittedAt: v.optional(v.number()),
    userId: v.optional(v.id("users")),
  },
  handler: async (
    { db },
    {
      stripeConnectId,
      payoutsEnabled,
      stripeOnboardingComplete,
      stripeConnectStatus,
      detailsSubmittedAt,
      userId,
    }
  ) => {
    const existing = await findCreatorPaymentsByConnectId(db, stripeConnectId);

    let ownerId: Id<"users">;
    let created = false;

    if (existing) {
      // NOTE(review): `detailsSubmittedAt` is passed through even when it is
      // undefined; per Convex `db.patch` semantics that clears a previously
      // stored value — confirm this is intended.
      await db.patch(existing._id, {
        payoutsEnabled,
        stripeConnectStatus,
        detailsSubmittedAt,
        updatedAt: now(),
      });
      ownerId = existing.userId as Id<"users">;
    } else if (userId) {
      // No mapping yet, but the caller told us which user owns the account.
      const timestamp = now();
      await db.insert("creator_payments", {
        userId,
        stripeConnectId,
        payoutsEnabled,
        stripeConnectStatus,
        detailsSubmittedAt,
        createdAt: timestamp,
        updatedAt: timestamp,
      });
      ownerId = userId;
      created = true;
    } else {
      // No mapping and no userId to create one — nothing more we can do.
      return { created: false, updated: false, userFlagUpdated: false };
    }

    // Update the cached flag on the users table for fast feed checks.
    const user = await db.get(ownerId);
    const userFlagUpdated = user !== null;
    if (userFlagUpdated) {
      await db.patch(ownerId, {
        tippingEnabled: payoutsEnabled && !!stripeOnboardingComplete,
        updatedAt: now(),
      });
    }

    return { created, updated: true, userFlagUpdated };
  },
});

/**
 * Recompute users.tippingEnabled from creator_payments by connect id.
 * Call this after sync if you want an explicit step.
 *
 * The flag is derived from `payoutsEnabled` alone; unlike `syncConnectStatus`
 * it does not factor in onboarding completion, which this mutation does not
 * receive.
 *
 * @returns `{ updated: false }` when no `creator_payments` row matches or the
 * owning user no longer exists; `{ updated: true }` otherwise.
 */
export const updateTippingEnabledFromPayments = internalMutation({
  args: { stripeConnectId: v.string() },
  handler: async ({ db }, { stripeConnectId }) => {
    const cp = await findCreatorPaymentsByConnectId(db, stripeConnectId);
    if (!cp) return { updated: false };

    // Guard against a dangling reference so a missing user is reported
    // instead of failing the whole mutation.
    const ownerId = cp.userId as Id<"users">;
    const user = await db.get(ownerId);
    if (!user) return { updated: false };

    await db.patch(ownerId, {
      tippingEnabled: !!cp.payoutsEnabled,
      updatedAt: now(),
    });
    return { updated: true };
  },
});

/** ---------------------------
 * Tips – idempotent upserts
 * --------------------------- */

/**
 * Create (idempotent) provisional tip row when you create the PI.
 * If a row with the same paymentIntentId exists, we return it unchanged.
 *
 * @returns `{ _id, idempotent }` — `idempotent` is `true` when an existing row
 * was found and nothing was inserted.
 */
export const recordTipProvisional = internalMutation({
  args: {
    paymentIntentId: v.string(), // pi_...
    tipperId: v.optional(v.id("users")),
    creatorId: v.id("users"),
    recipeId: v.optional(v.id("recipes")),
    amount: v.number(),
    currency: v.string(),
    platformFee: v.number(),
    creatorPayout: v.number(),
    status: v.literal("processing"),
    createdAt: v.number(),
  },
  handler: async (
    { db },
    {
      paymentIntentId,
      tipperId,
      creatorId,
      recipeId,
      amount,
      currency,
      platformFee,
      creatorPayout,
      status,
      createdAt,
    }
  ) => {
    // Idempotency by paymentIntentId.
    const existing = await db
      .query("tips")
      .withIndex("by_payment_intent_id", (q) => q.eq("paymentIntentId", paymentIntentId))
      .first();
    if (existing) {
      return { _id: existing._id, idempotent: true };
    }

    // NOTE(review): the `as any` casts below silence the table's field types
    // for absent optional ids — confirm against the `tips` schema and drop
    // them if `tipperId` / `recipeId` are declared optional there.
    const _id = await db.insert("tips", {
      paymentIntentId,
      tipperId: tipperId ?? (undefined as any),
      creatorId,
      recipeId: recipeId ?? (undefined as any),
      amount,
      currency,
      platformFee,
      creatorPayout,
      status,
      createdAt,
      updatedAt: now(),
    });
    return { _id, idempotent: false };
  },
});

/**
 * Finalize a tip on `payment_intent.succeeded`.
 * Safe to call multiple times; only patches missing fields and status progression.
 *
 * - No row yet: inserts a skeleton row (zero amounts, currency `"gbp"`, no
 *   creator/tipper) and returns `{ created: true, updated: false }`.
 * - Row already `refunded` or `disputed`: left untouched, returns
 *   `{ created: false, updated: false, reason: "terminal" }`.
 * - Otherwise: sets status to `"succeeded"`; an already stored `chargeId` /
 *   `applicationFeeId` is kept in preference to the incoming one.
 */
export const finalizeTip = internalMutation({
  args: {
    paymentIntentId: v.string(),
    chargeId: v.optional(v.string()),
    applicationFeeId: v.optional(v.string()),
    status: v.literal("succeeded"),
    updatedAt: v.number(),
  },
  handler: async ({ db }, { paymentIntentId, chargeId, applicationFeeId, status, updatedAt }) => {
    const tip = await db
      .query("tips")
      .withIndex("by_payment_intent_id", (q) => q.eq("paymentIntentId", paymentIntentId))
      .first();

    if (!tip) {
      // Upsert (rare: webhook before provisional insert); create a skeleton row.
      // NOTE(review): `creatorId` / `tipperId` are written as undefined behind
      // `as any` — verify the `tips` schema tolerates rows without them.
      await db.insert("tips", {
        paymentIntentId,
        chargeId,
        applicationFeeId,
        amount: 0,
        currency: "gbp",
        platformFee: 0,
        creatorPayout: 0,
        creatorId: undefined as any,
        tipperId: undefined as any,
        status,
        createdAt: updatedAt,
        updatedAt,
      });
      return { created: true, updated: false };
    }

    // If already terminal (refunded/disputed), don't downgrade.
    const terminal = new Set<string>(["refunded", "disputed"]);
    if (terminal.has(tip.status)) {
      return { created: false, updated: false, reason: "terminal" };
    }

    await db.patch(tip._id, {
      chargeId: tip.chargeId ?? chargeId,
      applicationFeeId: tip.applicationFeeId ?? applicationFeeId,
      status,
      updatedAt,
    });
    return { created: false, updated: true };
  },
});

/**
 * Generic status updater for failed/refunded/disputed.
 * Refuses to overwrite a more terminal state with a less terminal one.
 *
 * - No row yet: inserts a minimal row carrying the incoming status and returns
 *   `{ created: true, updated: false }`.
 * - Incoming status ranks below the stored one (see `TIP_STATUS_RANK`):
 *   returns `{ created: false, updated: false, reason: "downgrade_blocked" }`.
 * - Otherwise: patches status and timestamp; an incoming `chargeId` /
 *   `applicationFeeId` replaces the stored one when provided.
 */
export const updateTipStatus = internalMutation({
  args: {
    paymentIntentId: v.string(),
    status: v.union(
      v.literal("failed"),
      v.literal("refunded"),
      v.literal("disputed"),
      v.literal("succeeded")
    ),
    updatedAt: v.number(),
    // Optional Stripe refs if you want to attach during refund/dispute
    chargeId: v.optional(v.string()),
    applicationFeeId: v.optional(v.string()),
  },
  handler: async ({ db }, { paymentIntentId, status, updatedAt, chargeId, applicationFeeId }) => {
    const tip = await db
      .query("tips")
      .withIndex("by_payment_intent_id", (q) => q.eq("paymentIntentId", paymentIntentId))
      .first();

    if (!tip) {
      // Create minimal record to avoid webhook replays failing.
      // NOTE(review): `creatorId` / `tipperId` are written as undefined behind
      // `as any` — verify the `tips` schema tolerates rows without them.
      await db.insert("tips", {
        paymentIntentId,
        chargeId,
        applicationFeeId,
        amount: 0,
        currency: "gbp",
        platformFee: 0,
        creatorPayout: 0,
        creatorId: undefined as any,
        tipperId: undefined as any,
        status,
        createdAt: updatedAt,
        updatedAt,
      });
      return { created: true, updated: false };
    }

    // Only move forward in severity; never downgrade. A stored status missing
    // from the rank table is treated as lowest, so it never blocks an update.
    const incomingRank = TIP_STATUS_RANK[status] ?? 0;
    const storedRank = TIP_STATUS_RANK[tip.status] ?? 0;
    if (incomingRank < storedRank) {
      return { created: false, updated: false, reason: "downgrade_blocked" };
    }

    await db.patch(tip._id, {
      status,
      updatedAt,
      chargeId: chargeId ?? tip.chargeId,
      applicationFeeId: applicationFeeId ?? tip.applicationFeeId,
    });
    return { created: false, updated: true };
  },
});