stopPollingDatabase: authedProcedure .input(z.object({ clientDatabaseConnectionId: z.string() })) .mutation(async ({ input, ctx }) => { console.log( "stopPollingDatabase: Starting with connectionId:", input.clientDatabaseConnectionId, ); const clientDatabaseConnection = await ctx.db .selectFrom("clientDatabaseConnection") .selectAll() .where("id", "=", input.clientDatabaseConnectionId) .executeTakeFirst(); console.log( "stopPollingDatabase: Retrieved connection:", !!clientDatabaseConnection, ); if (!clientDatabaseConnection) { console.error( "stopPollingDatabase: Client database connection not found", ); throw new Error("Client database connection not found"); } if (!clientDatabaseConnection.processingWorkerId) { return { status: "nothing to cancel" }; } // send CF worker terminate signal try { const instance = await ctx.env.POLL_DATABASE_WORKFLOW.get( clientDatabaseConnection.processingWorkerId, ); if ((await instance.status()).status === "running") { console.log( `Client database connection ${clientDatabaseConnection.id} is being polled by worker ${clientDatabaseConnection.processingWorkerId}. Sending terminate...`, ); try { await instance.terminate(); } catch (error) { // if error status contains "Not implemented yet" then we can if ( error instanceof Error && error.message.includes("Not implemented yet") ) { console.log( `Want to terminate but can't because .terminate() not implemented yet: ${clientDatabaseConnection.processingWorkerId}.`, ); } else { throw error; } } await ctx.db .insertInto("pollingEvent") .values({ clientDatabaseConnectionId: clientDatabaseConnection.id, polledAt: new Date(), pollingEventType: PollingEventType.CANCEL, pollingWorkerId: clientDatabaseConnection.processingWorkerId, }) .execute(); return { status: `cancelled: ${clientDatabaseConnection.processingWorkerId}`, }; } return { status: `not running: ${clientDatabaseConnection.processingWorkerId}`, }; } catch (error) { if ( error instanceof Error && error.message.includes("instance.not_found") ) { console.log("Previous worker instance not found, nothing to stop."); return { status: `not running: ${clientDatabaseConnection.processingWorkerId}`, }; } else { console.error("Unexpected error checking worker status!:", error); throw error; } } }),