diff options
Diffstat (limited to 'shim/ws-shim/src/server.ts')
-rw-r--r-- | shim/ws-shim/src/server.ts | 157 |
1 files changed, 157 insertions, 0 deletions
diff --git a/shim/ws-shim/src/server.ts b/shim/ws-shim/src/server.ts new file mode 100644 index 0000000..0b807aa --- /dev/null +++ b/shim/ws-shim/src/server.ts @@ -0,0 +1,157 @@ +import EventEmitter from "events"; +import Urbit from "urbit-http"; + +const SHIP_URL = "http://localhost:8080"; +const api = new Urbit(SHIP_URL, ""); +let sub: number; +// const emitter = new EventEmitter(); + +// //github.com/oven-sh/bun/issues/13811 +// function sse(req: Request, channel: string): Response { +// const stream = new ReadableStream({ +// type: "direct", +// pull(controller: ReadableStreamDirectController) { +// let id = +(req.headers.get("last-event-id") ?? 1); +// const handler = async (event: string, data: unknown): Promise<void> => { +// await controller.write(`id:${id}\n`); +// await controller.write(`event:${event}\n`); +// if (data) await controller.write(`data:${JSON.stringify(data)}\n\n`); +// await controller.flush(); +// id++; +// emitter.on(channel, handler); +// if (req.signal.aborted) { +// emitter.off(channel, handler); +// controller.close(); +// } +// }; +// return new Promise(() => void 0); +// }, +// }); +// return new Response(stream, { +// status: 200, +// headers: { "Content-Type": "text/event-stream" }, +// }); +// } +function emit(channel: string, url: string, event?: any): void { + // emitter.emit(channel, event, data); + const body = JSON.stringify({ event, relay: url }); + fetch(SHIP_URL + "/nostr-shim", { + method: "PUT", + headers: { "Content-type": "application/json" }, + body, + }); +} +const sockets: Map<string, Relay> = new Map(); + +const server = Bun.serve({ + //http + routes: { + "/shim": async (req: Request) => { + const data = (await req.json()) as ShimRequest; + console.log("request data", data); + if ("get" in data) { + for (const req of data.get) { + startWSClient(req.relay, req.filters); + } + } + if ("post" in data) { + const ok = validate(data.post.event); + if (!ok) return; + for (const relay of data.post.relays) { + const socket = sockets.get(relay); + if (socket) socket.publishEvent(data.post.event); + else { + await startWSClient(relay, []); + + const socket = sockets.get(relay); + if (socket) socket.publishEvent(data.post.event); + else console.error("wtf man"); + } + } + } + // server.publish("shim", data); + // return sse(req, "all"); + return new Response("OK"); + }, + }, + fetch(req, server) { + const upgraded = server.upgrade(req, { data: { createdAt: Date.now() } }); + if (upgraded) return undefined; + return new Response("henlo"); + }, + websocket: { + // Maximum message size (in bytes) + maxPayloadLength: 64 * 1024, + + // Backpressure limit before messages are dropped + backpressureLimit: 1024 * 1024, + + // Close connection if backpressure limit is hit + closeOnBackpressureLimit: true, + + // Handler called when backpressure is relieved + drain(ws) { + console.log("Backpressure relieved"); + }, + + // Enable per-message deflate compression + perMessageDeflate: { + compress: true, + decompress: true, + }, + + // Send ping frames to keep connection alive + sendPings: true, + + // Handlers for ping/pong frames + ping(ws, data) { + console.log("Received ping"); + }, + pong(ws, data) { + console.log("Received pong"); + }, + + // Whether server receives its own published messages + publishToSelf: false, + // handlers + async open(ws) { + // + // ws.subscribe("shim"); + // console.log(ws, "hey someone subscribed here"); + // if (sub) { + // await api.unsubscribe(sub); + // sub = await api.subscribe({ app: "nostril", path: "/ws" }); + // } else sub = await api.subscribe({ app: "nostril", path: "/ws" }); + }, + async close(ws) { + // + // ws.unsubscribe("shim"); + // if (sub) await api.unsubscribe(sub); + }, + async message(ws) { + // api.poke({ app: "nostril", mark: "json", json: { ws: ws.data } }); + // server.publish("chat", "henlo"); + }, + }, + port: 8888, +}); + +import { Relay } from "./client"; +import type { Filter, ShimRequest } from "./types"; +import { validate } from "./nostr"; +async function startWSClient(url: string, filters: Filter[]) { + console.log("connecting to relay..."); + const relay = new Relay(url); + await relay.connect(); + const id = crypto.randomUUID(); + relay.subscribe(id, filters, { + oneose: () => { + console.log("oneose"); + }, + onevent(event) { + console.log("relay event", { url, event }); + emit("all", url, event); + }, + }); + sockets.set(url, relay); +} |