/**
 * Subscribes to every Postgres change on `public.users` through the Supabase
 * Realtime channel `users` and forwards each change to the queue obtained from
 * `getQueue(QUEUE_NAMES.SUPABASE_QUEUE)`.
 *
 * Each forwarded job is the original event payload plus:
 *  - `eventId`: a fresh id from `v4()`
 *  - `eventHash`: SHA-256 hex digest of `JSON.stringify(event)`
 *
 * When the channel reports `CHANNEL_ERROR` or `TIMED_OUT`, the channel is
 * unsubscribed and a new subscription attempt is made after `RETRY_DELAY_MS`.
 * Nothing is returned; the listener keeps running for the life of the process.
 */
export const listenUsers = () => {
  const queue = getQueue(QUEUE_NAMES.SUPABASE_QUEUE)
  const RETRY_DELAY_MS = 2000
  // True while a channel is subscribed or being subscribed; prevents double starts.
  let isListening = false

  /** Renders an unknown thrown/rejected value as a loggable string. */
  const describeError = (err: unknown): string => (err instanceof Error ? err.stack ?? err.message : String(err))

  /** Adds a job to the queue and logs (rather than leaks) any failure. */
  const enqueue = (job: Record<string, unknown>) => {
    try {
      // NOTE(review): assumes queue.add may return a promise (e.g. a Bull queue) — confirm.
      // Promise.resolve makes this safe whether or not it does.
      void Promise.resolve(queue.add(job)).catch((err: unknown) => {
        logger.error(`[USERS][LISTEN USERS][ERROR] Failed to enqueue Supabase event (eventId=${job.eventId}) : ${describeError(err)}`)
      })
    } catch (err) {
      logger.error(`[USERS][LISTEN USERS][ERROR] Failed to enqueue Supabase event (eventId=${job.eventId}) : ${describeError(err)}`)
    }
  }

  const startSupabaseListener = () => {
    if (isListening) {
      return
    }
    isListening = true
    logger.info(`[USERS] Listening to users events...`)

    // Flipped to false once this channel has been torn down, so that late status
    // notifications from it cannot reset state or schedule additional retries.
    let isCurrentChannel = true

    // The channel is bound BEFORE subscribing so the status callback can reference it
    // even if it is ever invoked synchronously (no temporal-dead-zone access).
    const channel = getSupabaseClient().channel('users')

    /** Tears down this channel and schedules exactly one new subscription attempt. */
    const teardownAndRetry = () => {
      isCurrentChannel = false
      try {
        void Promise.resolve(channel.unsubscribe()).catch((err: unknown) => {
          logger.warn(`[USERS][LISTEN USERS] Failed to unsubscribe Supabase channel : ${describeError(err)}`)
        })
      } catch (err) {
        logger.warn(`[USERS][LISTEN USERS] Failed to unsubscribe Supabase channel : ${describeError(err)}`)
      }
      isListening = false
      logger.warn(`[USERS][LISTEN USERS] Retrying Supabase listener in ${RETRY_DELAY_MS / 1000}s...`)
      setTimeout(() => {
        startSupabaseListener()
      }, RETRY_DELAY_MS)
    }

    channel
      .on('postgres_changes', { event: '*', schema: 'public', table: 'users' }, (event: RealtimePostgresChangesPayload) => {
        const eventId = v4()
        const eventHash = createHash('sha256').update(JSON.stringify(event)).digest('hex')
        logger.info(`[USERS][LISTEN USERS] Received Supabase event, sent to queue (eventId=${eventId}, eventHash=${eventHash})`)
        enqueue({ ...event, eventId, eventHash })
      })
      .subscribe((status, error) => {
        if (!isCurrentChannel) {
          // Notification from a channel we already replaced (e.g. CLOSED after our own unsubscribe).
          return
        }

        const shouldRetry = status === 'CHANNEL_ERROR' || status === 'TIMED_OUT'

        if (error) {
          logger.error(`[USERS][LISTEN USERS][ERROR] Supabase listener error (status=${status}) : ${describeError(error)}`)
        } else if (shouldRetry) {
          // Failure status reported without an error object: still a failure, not a connection.
          logger.error(`[USERS][LISTEN USERS][ERROR] Supabase listener error (status=${status})`)
        } else if (status === 'SUBSCRIBED') {
          logger.info(`[USERS][LISTEN USERS] Supabase listener connected (status=${status})`)
        } else {
          logger.info(`[USERS][LISTEN USERS] Supabase listener status changed (status=${status})`)
        }

        if (shouldRetry) {
          teardownAndRetry()
        }
      })
  }

  startSupabaseListener()
}