import * as Reactivity from "@effect/experimental/Reactivity";
import { makeCompiler } from "@effect/sql-pg/PgClient";
import * as Client from "@effect/sql/SqlClient";
import type { Connection } from "@effect/sql/SqlConnection";
import { SqlError } from "@effect/sql/SqlError";
import type { Custom, Fragment, Primitive } from "@effect/sql/Statement";
import * as Statement from "@effect/sql/Statement";
import { PGlite, type Results } from "@electric-sql/pglite";
import { Context, Effect, Layer, Stream } from "effect";

/**
 * Unique brand carried by every `PgLiteClient` instance.
 *
 * @category type ids
 * @since 1.0.0
 */
export const TypeId: unique symbol = Symbol.for("@effect/sql-pg/PgLiteClient");

/**
 * @category type ids
 * @since 1.0.0
 */
export type TypeId = typeof TypeId;

/**
 * A `SqlClient` backed by a PGlite instance, extended with the underlying
 * database handle, the configuration it was built from, fragment helpers for
 * JSON / array parameters, and LISTEN / NOTIFY helpers.
 *
 * @category models
 * @since 1.0.0
 */
export interface PgLiteClient extends Client.SqlClient {
  readonly db: PGlite;
  readonly [TypeId]: TypeId;
  readonly config: PgLiteClientConfig;
  readonly json: (_: unknown) => Fragment;
  readonly array: (_: ReadonlyArray<Primitive>) => Fragment;
  readonly listen: (channel: string) => Stream.Stream<string, SqlError>;
  readonly notify: (
    channel: string,
    payload: string
  ) => Effect.Effect<void, SqlError>;
}

/**
 * @category custom types
 * @since 1.0.0
 */
export type PgCustom = PgJson | PgArray;

/**
 * @category custom types
 * @since 1.0.0
 */
interface PgJson extends Custom<"PgJson", unknown> {}

/**
 * @category custom types
 * @since 1.0.0
 */
const PgJson = Statement.custom<PgJson>("PgJson");

/**
 * @category custom types
 * @since 1.0.0
 */
interface PgArray extends Custom<"PgArray", ReadonlyArray<Primitive>> {}

/**
 * @category custom types
 * @since 1.0.0
 */
const PgArray = Statement.custom<PgArray>("PgArray");

/**
 * @category tags
 * @since 1.0.0
 */
export const PgLiteClient = Context.GenericTag<PgLiteClient>(
  "@effect/sql-pg/PgLiteClient"
);

/**
 * Options accepted by `make` and `layer`.
 *
 * - `path` is handed to the `PGlite` constructor unchanged.
 * - `transformQueryNames` / `transformJson` are forwarded to the SQL compiler.
 * - `transformResultNames` (together with `transformJson`) builds the row
 *   transform that is applied to query results.
 *
 * @category constructors
 * @since 1.0.0
 */
export interface PgLiteClientConfig {
  readonly path?: string;
  readonly transformResultNames?: ((str: string) => string) | undefined;
  readonly transformQueryNames?: ((str: string) => string) | undefined;
  readonly transformJson?: boolean | undefined;
}

/** Shape of the optional row transform handed to a connection by the client. */
type RowTransform = <A extends object>(
  rows: ReadonlyArray<A>
) => ReadonlyArray<A>;

/**
 * Builds a `PgLiteClient` inside the current scope.
 *
 * A `PGlite` instance is created from `options.path`, probed with `select 1`,
 * and closed again when the scope is released. Failures of the probe and of
 * every statement are reported as `SqlError`.
 *
 * @category constructors
 * @since 1.0.0
 */
export const make = (options: PgLiteClientConfig) =>
  Effect.gen(function* () {
    const compiler = makeCompiler(
      options.transformQueryNames,
      options.transformJson
    );
    const transformRows = options.transformResultNames
      ? Statement.defaultTransforms(
          options.transformResultNames,
          options.transformJson
        ).array
      : undefined;

    const db = new PGlite(options.path);

    class ConnectionImpl implements Connection {
      constructor(private readonly client: PGlite) {}

      /**
       * Runs a query lazily (the promise is only created when the effect is
       * executed) and yields its `rows`, mapping any rejection to `SqlError`.
       */
      private run(
        query: () => Promise<Results<unknown>>
      ): Effect.Effect<ReadonlyArray<any>, SqlError> {
        return Effect.tryPromise({
          try: () => query().then((res) => res.rows),
          catch: (cause) =>
            new SqlError({ cause, message: "Failed to execute statement" }),
        });
      }

      /** Executes a statement and returns its rows, optionally transformed. */
      execute(
        sql: string,
        params: ReadonlyArray<Primitive>,
        transformRows?: RowTransform
      ) {
        const rows = this.run(() =>
          this.client.query(sql, params as Array<Primitive>)
        );
        return transformRows ? Effect.map(rows, transformRows) : rows;
      }

      /**
       * Executes a statement without any row transform.
       *
       * NOTE(review): this yields only `rows`, not the full PGlite result
       * object — confirm that is what callers of the raw API expect.
       */
      executeRaw(sql: string, params: ReadonlyArray<Primitive>) {
        return this.run(() =>
          this.client.query(sql, params as Array<Primitive>)
        );
      }

      /** Executes a statement and returns the untransformed rows. */
      executeWithoutTransform(sql: string, params: ReadonlyArray<Primitive>) {
        return this.run(() =>
          this.client.query(sql, params as Array<Primitive>)
        );
      }

      /**
       * Executes a statement and returns each row as an array of column
       * values (PGlite `rowMode: "array"`) instead of a keyed object.
       */
      executeValues(sql: string, params: ReadonlyArray<Primitive>) {
        return this.run(() =>
          this.client.query(sql, params as Array<Primitive>, {
            rowMode: "array",
          })
        );
      }

      /** Same as `execute`; no separate unprepared path is used here. */
      executeUnprepared(
        sql: string,
        params: ReadonlyArray<Primitive>,
        transformRows?: RowTransform
      ) {
        return this.execute(sql, params, transformRows);
      }

      /** Runs `execute` and emits the resulting rows one by one. */
      executeStream(
        sql: string,
        params: ReadonlyArray<Primitive>,
        transformRows?: RowTransform
      ) {
        return Stream.fromEffect(this.execute(sql, params, transformRows)).pipe(
          Stream.flatMap((rows) => Stream.fromIterable(rows))
        );
      }
    }

    // Test connection; the finalizer closes the database with the scope.
    yield* Effect.acquireRelease(
      Effect.tryPromise({
        try: () => db.sql`select 1`,
        catch: (cause) =>
          new SqlError({ cause, message: "PgClient: Failed to connect" }),
      }),
      () => Effect.promise(() => db.close())
    );

    const client = yield* Client.make({
      acquirer: Effect.succeed(new ConnectionImpl(db)),
      transactionAcquirer: Effect.succeed(new ConnectionImpl(db)),
      compiler,
      spanAttributes: [
        ["db.system", "postgresql"],
        ["db.name", "pglite"],
      ],
      // Without this the `transformResultNames` option has no effect.
      transformRows,
    });

    return Object.assign(client, {
      [TypeId]: TypeId as TypeId,
      db,
      config: options,
      json: (_: unknown) => PgJson(_),
      array: (_: ReadonlyArray<Primitive>) => PgArray(_),
      listen: (channel: string) =>
        Stream.asyncPush<string, SqlError>((emit) =>
          Effect.acquireRelease(
            Effect.tryPromise({
              try: () => db.listen(channel, (payload) => emit.single(payload)),
              catch: (cause) =>
                new SqlError({ cause, message: "Failed to listen" }),
            }),
            // Release only this subscription; `db.unlisten(channel)` would
            // also drop every other listener registered on the channel.
            (unsubscribe) => Effect.promise(() => unsubscribe())
          )
        ),
      notify: (channel: string, payload: string) =>
        Effect.tryPromise({
          // `NOTIFY` cannot take bind parameters, so go through `pg_notify`.
          // The channel name is passed verbatim (no identifier case folding).
          try: () => db.query("select pg_notify($1, $2)", [channel, payload]),
          catch: (cause) =>
            new SqlError({ cause, message: "Failed to notify" }),
        }).pipe(Effect.asVoid),
    });
  });

/**
 * Layer that provides both `PgLiteClient` and the generic `SqlClient` service
 * from a single client built with `make(config)`.
 *
 * @category layers
 * @since 1.0.0
 */
export const layer = (config: PgLiteClientConfig) =>
  Layer.scopedContext(
    Effect.map(make(config), (client) =>
      Context.make(PgLiteClient, client).pipe(
        Context.add(Client.SqlClient, client)
      )
    )
  ).pipe(Layer.provide(Reactivity.layer));