import { NodeRuntime } from "@effect/platform-node"
import { Data, Deferred, Effect, Exit, Fiber, Layer, Logger, LogLevel, Option, Ref, Stream } from "effect"
import { DevToolsLive } from "./DevTools"

type ConnectionState = "connected" | "connecting" | "idle"

/** Failure used both when the service reports `onError` and when opening times out. */
export class ConnectionError extends Data.TaggedError("ConnectionError") {}

/** Event emitted by the connection service for the current connection. */
type ConnectionMsg = { type: "onError" | "onDeviceReady" }

/** A single connection attempt, identified by `id`. */
interface Connection {
  id: string
  /** Feeds a service event into this connection, completing `openLatch` when relevant. */
  handleStateChange: (msg: ConnectionMsg) => Effect.Effect<void>
  /** Succeeds on `onDeviceReady`, fails with `ConnectionError` on `onError`. */
  openLatch: Deferred.Deferred<void, ConnectionError>
}

/**
 * Creates a connection whose `openLatch` is completed by the first
 * `onDeviceReady` (success) or `onError` (failure) message it handles.
 *
 * @param id - identifier stored on the returned connection
 */
const makeConnection = (id: string): Effect.Effect<Connection> =>
  Effect.gen(function*() {
    const openLatch = yield* Deferred.make<void, ConnectionError>()

    const handleStateChange = (msg: ConnectionMsg): Effect.Effect<void> =>
      Effect.gen(function*() {
        yield* Effect.logTrace("handle ", msg)
        switch (msg.type) {
          case "onDeviceReady":
            yield* Deferred.succeed(openLatch, undefined)
            break
          case "onError":
            yield* Deferred.failSync(openLatch, () => new ConnectionError())
            break
          default:
            break
        }
      })

    // Annotated declaration instead of `as Connection`, so a shape mismatch is a compile error.
    const connection: Connection = { id, handleStateChange, openLatch }
    return connection
  })

/** Low-level transport: opens/closes a target and publishes its state events. */
interface IConnectionService {
  connectionStateStream: Stream.Stream<ConnectionMsg>
  connect: (target: string) => Effect.Effect<void>
  disconnect: () => Effect.Effect<void>
}

export class ConnectionService extends Effect.Tag("ConnectionService")<ConnectionService, IConnectionService>() {
  /** No-op implementation: emits no events; `connect` and `disconnect` do nothing. */
  static Mock = Layer.succeed(this, {
    connectionStateStream: Stream.empty,
    connect: () => Effect.void,
    disconnect: () => Effect.void
  })

  /** Builds a `ConnectionService` layer from an effect that produces the implementation. */
  static make = <E, R>(effect: Effect.Effect<IConnectionService, E, R>) => Layer.effect(this, effect)
}

/** Tracks the current connection and routes service events to it. */
interface IConnectionManager {
  connection: Ref.Ref<Option.Option<Connection>>
  connect: (id: string) => Effect.Effect<Connection, ConnectionError>
  handleConnectionStateChange: (msg: ConnectionMsg) => Effect.Effect<void>
}

/**
 * Builds the connection manager.
 *
 * Requires `ConnectionService` and a `Scope`: the fiber that pumps
 * `connectionStateStream` into the current connection is forked into that
 * scope, so it lives as long as the layer rather than as long as the fiber
 * that happened to construct it.
 */
const makeManager = Effect.gen(function*() {
  const connection = yield* Ref.make<Option.Option<Connection>>(Option.none())
  const connectionService = yield* ConnectionService

  // Exposed on the manager as well, so tests can inject events directly.
  const handleConnectionStateChange = (msg: ConnectionMsg): Effect.Effect<void> =>
    Effect.gen(function*() {
      const currentConnection = yield* Ref.get(connection)
      if (Option.isSome(currentConnection)) {
        yield* currentConnection.value.handleStateChange(msg)
      }
    })

  // Handle all connection events for the lifetime of the enclosing scope.
  yield* connectionService.connectionStateStream.pipe(
    Stream.runForEach((msg) => handleConnectionStateChange(msg)),
    Effect.forkScoped
  )

  /**
   * Returns the current connection if it already has this `id`; otherwise
   * disconnects any existing one, starts a new connection and waits (up to
   * 5000 ms) for its `openLatch`.
   */
  const connect = (id: string): Effect.Effect<Connection, ConnectionError> =>
    Effect.gen(function*() {
      const currentConnection = yield* Ref.get(connection)
      if (Option.isSome(currentConnection)) {
        if (currentConnection.value.id === id) {
          return currentConnection.value
        }
        yield* connectionService.disconnect()
      }

      const newConnection = yield* makeConnection(id)
      // Publish the connection before connecting so incoming events can reach it.
      yield* Ref.set(connection, Option.some(newConnection))
      yield* connectionService.connect(id)
      yield* Effect.logTrace("manager awaiting connection")

      // Author's note: awaiting the latch directly,
      //   yield* Deferred.await(newConnection.openLatch)
      // works, whereas the timeout-wrapped await below was observed to print
      // "Fiber terminated with an unhandled error".
      // NOTE(review): presumably that message is the runtime's Debug-level
      // report for the internal fiber that `timeoutFail` races, made visible
      // by the Trace minimum log level; the failure itself still reaches the
      // caller through the error channel. Confirm against the Effect docs.
      yield* Deferred.await(newConnection.openLatch).pipe(
        Effect.timeoutFail({
          duration: 5000,
          onTimeout: () => {
            console.log("timeout")
            return new ConnectionError()
          }
        })
      )

      yield* Effect.logTrace("did await connection")
      return newConnection
    })

  const manager: IConnectionManager = { connection, handleConnectionStateChange, connect }
  return manager
})

export class ConnectionManager extends Effect.Tag("ConnectionManager")<ConnectionManager, IConnectionManager>() {
  /** Live manager; scoped so the event-pump fiber is tied to the layer's lifetime. */
  static Live = Layer.scoped(this, makeManager)
}

/**
 * Playground scenario: start connecting on a forked fiber, inject an
 * `onError` event, then join the fiber and log the resulting `Either`.
 */
const program = Effect.gen(function*() {
  const manager = yield* ConnectionManager

  yield* Effect.logTrace("start waiting for connection on a fiber")
  const connectionFiber = yield* manager.connect("test").pipe(
    Effect.either,
    Effect.tap((e) => Effect.logInfo("tapped", e)),
    Effect.fork
  )

  // Give the forked fiber a chance to run before injecting the event.
  // NOTE(review): this assumes the fiber has stored its connection by the
  // time we resume; otherwise the event below is dropped.
  yield* Effect.yieldNow()

  yield* Effect.logTrace("handle test state change")
  yield* manager.handleConnectionStateChange({ type: "onError" })

  // `Effect.either` above means joining never fails; the outcome is an Either.
  const result = yield* Fiber.join(connectionFiber)
  yield* Effect.logTrace("got connection 1", result)
}).pipe(Effect.withSpan("program", { attributes: { source: "Playground" } }))

program.pipe(
  Effect.provide(DevToolsLive),
  Effect.provide(
    Layer.merge(
      Logger.minimumLogLevel(LogLevel.Trace),
      Layer.provideMerge(ConnectionManager.Live, ConnectionService.Mock)
    )
  ),
  NodeRuntime.runMain
)