import EventEmitter from "events"; import Urbit from "urbit-http"; const SHIP_URL = "http://localhost:8080"; const api = new Urbit(SHIP_URL, ""); let sub: number; // const emitter = new EventEmitter(); // //github.com/oven-sh/bun/issues/13811 // function sse(req: Request, channel: string): Response { // const stream = new ReadableStream({ // type: "direct", // pull(controller: ReadableStreamDirectController) { // let id = +(req.headers.get("last-event-id") ?? 1); // const handler = async (event: string, data: unknown): Promise => { // await controller.write(`id:${id}\n`); // await controller.write(`event:${event}\n`); // if (data) await controller.write(`data:${JSON.stringify(data)}\n\n`); // await controller.flush(); // id++; // emitter.on(channel, handler); // if (req.signal.aborted) { // emitter.off(channel, handler); // controller.close(); // } // }; // return new Promise(() => void 0); // }, // }); // return new Response(stream, { // status: 200, // headers: { "Content-Type": "text/event-stream" }, // }); // } function emit(channel: string, url: string, event?: any): void { // emitter.emit(channel, event, data); const body = JSON.stringify({ event, relay: url }); fetch(SHIP_URL + "/nostr-shim", { method: "PUT", headers: { "Content-type": "application/json" }, body, }); } const sockets: Map = new Map(); const server = Bun.serve({ //http routes: { "/shim": async (req: Request) => { const data = (await req.json()) as ShimRequest; console.log("request data", data); if ("get" in data) { for (const req of data.get) { startWSClient(req.relay, req.filters); } } if ("post" in data) { const ok = validate(data.post.event); if (!ok) return; for (const relay of data.post.relays) { const socket = sockets.get(relay); if (socket) socket.publishEvent(data.post.event); else { await startWSClient(relay, []); const socket = sockets.get(relay); if (socket) socket.publishEvent(data.post.event); else console.error("wtf man"); } } } // server.publish("shim", data); // return sse(req, "all"); return new Response("OK"); }, }, fetch(req, server) { const upgraded = server.upgrade(req, { data: { createdAt: Date.now() } }); if (upgraded) return undefined; return new Response("henlo"); }, websocket: { // Maximum message size (in bytes) maxPayloadLength: 64 * 1024, // Backpressure limit before messages are dropped backpressureLimit: 1024 * 1024, // Close connection if backpressure limit is hit closeOnBackpressureLimit: true, // Handler called when backpressure is relieved drain(ws) { console.log("Backpressure relieved"); }, // Enable per-message deflate compression perMessageDeflate: { compress: true, decompress: true, }, // Send ping frames to keep connection alive sendPings: true, // Handlers for ping/pong frames ping(ws, data) { console.log("Received ping"); }, pong(ws, data) { console.log("Received pong"); }, // Whether server receives its own published messages publishToSelf: false, // handlers async open(ws) { // // ws.subscribe("shim"); // console.log(ws, "hey someone subscribed here"); // if (sub) { // await api.unsubscribe(sub); // sub = await api.subscribe({ app: "nostril", path: "/ws" }); // } else sub = await api.subscribe({ app: "nostril", path: "/ws" }); }, async close(ws) { // // ws.unsubscribe("shim"); // if (sub) await api.unsubscribe(sub); }, async message(ws) { // api.poke({ app: "nostril", mark: "json", json: { ws: ws.data } }); // server.publish("chat", "henlo"); }, }, port: 8888, }); import { Relay } from "./client"; import type { Filter, ShimRequest } from "./types"; import { validate } from "./nostr"; async function startWSClient(url: string, filters: Filter[]) { console.log("connecting to relay..."); const relay = new Relay(url); await relay.connect(); const id = crypto.randomUUID(); relay.subscribe(id, filters, { oneose: () => { console.log("oneose"); }, onevent(event) { console.log("relay event", { url, event }); emit("all", url, event); }, }); sockets.set(url, relay); }