import EventEmitter from "events";
import Urbit from "urbit-http";
import { Relay } from "./client";
import type {
  ClientMessage,
  Filter,
  RelayMessage,
  ShimHttpRequest,
  ShimRequest,
  ShimResponse,
} from "./types";
import { validate } from "./nostr";
import { parseRelayMsg, wait } from "./lib";

/** Base URL of the ship that relay traffic is forwarded to. */
const SHIP_URL = "http://localhost:8080";
const api = new Urbit(SHIP_URL, "");
// Only referenced by the commented-out websocket handlers further down.
let sub: number;

// Earlier server-sent-events draft, kept for reference.
// const emitter = new EventEmitter();
// //github.com/oven-sh/bun/issues/13811
// function sse(req: Request, channel: string): Response {
//   const stream = new ReadableStream({
//     type: "direct",
//     pull(controller: ReadableStreamDirectController) {
//       let id = +(req.headers.get("last-event-id") ?? 1);
//       const handler = async (event: string, data: unknown): Promise<void> => {
//         await controller.write(`id:${id}\n`);
//         await controller.write(`event:${event}\n`);
//         if (data) await controller.write(`data:${JSON.stringify(data)}\n\n`);
//         await controller.flush();
//         id++;
//         emitter.on(channel, handler);
//         if (req.signal.aborted) {
//           emitter.off(channel, handler);
//           controller.close();
//         }
//       };
//       return new Promise(() => void 0);
//     },
//   });
//   return new Response(stream, {
//     status: 200,
//     headers: { "Content-Type": "text/event-stream" },
//   });
// }

/**
 * Forwards a relay response to the ship by PUTting `{ ws: res }` as JSON to
 * `${SHIP_URL}/nostr-shim`.
 *
 * The request is deliberately not awaited; a failed request is logged here so
 * it cannot surface as an unhandled promise rejection.
 *
 * @param res - Relay URL plus the message (or error/notice) to forward.
 */
function emit(res: ShimResponse): void {
  // emitter.emit(channel, event, data);
  const body = JSON.stringify({ ws: res });
  void fetch(SHIP_URL + "/nostr-shim", {
    method: "PUT",
    headers: { "Content-type": "application/json" },
    body,
  }).catch((error: unknown) => {
    console.error("failed to forward relay message to ship", error);
  });
}

/** Relay clients created by `getClient`, keyed by relay URL. */
const sockets: Map<string, Relay> = new Map();
/** Relay URLs whose `onfailure` callback has fired; `startWSSub` skips these. */
const unreliable: Set<string> = new Set();

const server = Bun.serve({
  //http
  routes: {
    /**
     * Entry point for the ship. A body with a `ws` key fans the client message
     * out to every listed relay and answers "OK" right away; any other body is
     * handled as a one-off HTTP-style query via `oneOffClient`.
     */
    "/shim": async (req: Request) => {
      let data: ShimRequest;
      try {
        data = (await req.json()) as ShimRequest;
      } catch (error: unknown) {
        // Malformed body: answer 400 instead of letting the handler throw.
        console.error("invalid /shim request body", error);
        return new Response("invalid JSON body", { status: 400 });
      }
      console.log({ data });
      if ("ws" in data) {
        console.log("request from urbit");
        console.dir(data, { depth: null });
        for (const relay of data.ws.relays) {
          // Not awaited on purpose (the response does not wait for relays),
          // but a rejection must still be handled.
          void handleShimRequest(relay, data.ws.req).catch(
            (error: unknown) => {
              console.error(relay, "shim request failed", error);
            },
          );
        }
        return new Response("OK");
      } else {
        return oneOffClient(data.http);
      }
    },
  },
  fetch(req, server) {
    const upgraded = server.upgrade(req, { data: { createdAt: Date.now() } });
    if (upgraded) return undefined;
    return new Response("henlo");
  },
  websocket: {
    // Maximum message size (in bytes)
    maxPayloadLength: 64 * 1024,
    // Backpressure limit before messages are dropped
    backpressureLimit: 1024 * 1024,
    // Close connection if backpressure limit is hit
    closeOnBackpressureLimit: true,
    // Handler called when backpressure is relieved
    drain(ws) {
      console.log("Backpressure relieved");
    },
    // Enable per-message deflate compression
    perMessageDeflate: {
      compress: true,
      decompress: true,
    },
    // Send ping frames to keep connection alive
    sendPings: true,
    // Handlers for ping/pong frames
    ping(ws, data) {
      console.log("Received ping");
    },
    pong(ws, data) {
      console.log("Received pong");
    },
    // Whether server receives its own published messages
    publishToSelf: false,
    // handlers
    async open(ws) {
      //
      // ws.subscribe("shim");
      // console.log(ws, "hey someone subscribed here");
      // if (sub) {
      //   await api.unsubscribe(sub);
      //   sub = await api.subscribe({ app: "nostril", path: "/ws" });
      // } else sub = await api.subscribe({ app: "nostril", path: "/ws" });
    },
    async close(ws) {
      //
      // ws.unsubscribe("shim");
      // if (sub) await api.unsubscribe(sub);
    },
    async message(ws) {
      // api.poke({ app: "nostril", mark: "json", json: { ws: ws.data } });
      // server.publish("chat", "henlo");
    },
  },
  port: 8888,
});

/**
 * Applies one client message to the relay at `url`.
 *
 * - `REQ`: opens a subscription through `startWSSub`.
 * - `EVENT`: publishes the event, but only if `validate` accepts it; an
 *   invalid event is dropped silently.
 * - `AUTH`: publishes the event without running `validate`.
 * - `CLOSE`: unsubscribes the given subscription id.
 *
 * @param url - Relay URL the message is addressed to.
 * @param req - Client message tuple; the first element selects the action.
 */
export async function handleShimRequest(
  url: string,
  req: ClientMessage,
): Promise<void> {
  switch (req[0]) {
    case "REQ": {
      const [, subscription_id, ...filters] = req;
      await startWSSub(url, filters, subscription_id);
      break;
    }
    case "EVENT": {
      const [, event] = req;
      const ok = validate(event);
      if (!ok) break;
      const socket = await getClient(url);
      socket.publishEvent(event);
      // socket.publishEvent(data.post.event);
      // socket.disconnect();
      // sockets.delete(relay);
      break;
    }
    case "AUTH": {
      const [, event] = req;
      const socket = await getClient(url);
      socket.publishEvent(event);
      break;
    }
    case "CLOSE": {
      const [, subscriptionId] = req;
      const socket = await getClient(url);
      socket.unsubscribe(subscriptionId);
      break;
    }
  }
}

/**
 * Runs a single short-lived query against one relay and returns everything
 * the relay sent back.
 *
 * Connects, subscribes with the request's filters, collects every parsed
 * relay message for `delay` (passed to `wait`), then disconnects.
 *
 * @param req - Relay URL, collection delay, subscription id and filters.
 * @returns A JSON response of the form `{ http: msgs }`.
 */
async function oneOffClient(req: ShimHttpRequest): Promise<Response> {
  const { relay: url, delay, subscription_id: subid, filters } = req;
  const msgs: ShimResponse["msg"][] = [];
  const relay = new Relay(url, (raw) => {
    const msg = parseRelayMsg(raw);
    msgs.push(msg);
  });
  relay.onerror = (error) => {
    console.log(url);
    console.error("ws error", error);
  };
  relay.ondisconnect = () => {
    console.error(url, "relay disconnected");
  };
  relay.onfailure = () => {
    console.error(url, "relay failed");
    unreliable.add(url);
  };
  relay.onnotice = (notice) => {
    console.error("on notice", notice);
    // emit({ relay: url, msg: { notice } });
  };
  relay.onconnect = () => {
    console.log("relay connected", url);
  };
  await relay.connect();
  try {
    for (const f of filters) {
      console.dir(f, { depth: null });
    }
    relay.subscribe(subid, filters, {
      oneose: () => {
        console.log("oneose");
      },
      // onevent(event) {
      //   console.log("relay event", { url, event });
      // },
    });
    await wait(delay);
  } finally {
    // Release the connection even when subscribing or waiting throws.
    relay.disconnect();
  }
  return Response.json({ http: msgs });
}

/**
 * Returns the relay client for `url`, creating and connecting one on first
 * use.
 *
 * A cached client whose status is not "connected" is reconnected before it is
 * returned. A new client forwards every parsed relay message, error, failure
 * and notice to the ship through `emit`, and a failure also adds the URL to
 * `unreliable`.
 *
 * NOTE(review): the new client is stored in `sockets` only after
 * `relay.connect()` resolves, so overlapping calls for the same uncached URL
 * each open their own connection. Confirm whether that needs de-duplicating.
 *
 * @param url - Relay URL.
 * @returns The client stored in `sockets` for that URL.
 */
async function getClient(url: string): Promise<Relay> {
  const socket = sockets.get(url);
  if (socket) {
    console.log("socket present", socket.status);
    if (socket.status !== "connected") await socket.connect();
    return socket;
  }

  const relay = new Relay(url, (raw) => {
    const msg = parseRelayMsg(raw);
    // console.log("emitting msg", msg);
    emit({ relay: url, msg });
  });
  relay.onerror = (error) => {
    console.log(url);
    console.error("ws error", error);
    emit({ relay: url, msg: { error: `${error}` } });
  };
  relay.ondisconnect = () => {
    console.error(url, "relay disconnected");
  };
  relay.onfailure = () => {
    console.error(url, "relay failed");
    unreliable.add(url);
    emit({ relay: url, msg: { error: "failed to connect" } });
  };
  relay.onnotice = (notice) => {
    console.error("on notice", notice);
    emit({ relay: url, msg: { notice } });
  };
  relay.onconnect = () => {
    console.log("relay connected", url);
  };
  await relay.connect();
  sockets.set(url, relay);
  return relay;
}

/**
 * Opens a subscription on the relay at `url`.
 *
 * Does nothing for relays listed in `unreliable`. When `subId` is missing or
 * empty, a random UUID is used as the subscription id.
 *
 * @param url - Relay URL.
 * @param filters - Filters passed through to `relay.subscribe`.
 * @param subId - Optional subscription id supplied by the caller.
 */
async function startWSSub(
  url: string,
  filters: Filter[],
  subId?: string,
): Promise<void> {
  if (unreliable.has(url)) return; // just ignore it
  const relay = await getClient(url);
  console.log("subscribing", filters);
  // `||` rather than `??`: an empty id must also fall back to a fresh UUID.
  const subid = subId || crypto.randomUUID();
  relay.subscribe(subid, filters, {
    oneose: () => {
      console.log("oneose");
    },
    // onevent(event) {
    //   console.log("relay event", { url, event });
    // },
  });
}