diff options
author | polwex <polwex@sortug.com> | 2025-09-17 12:24:41 +0700 |
---|---|---|
committer | polwex <polwex@sortug.com> | 2025-09-17 12:24:41 +0700 |
commit | 387af8fc1603805b02ce03f8adba4fa73a954f7c (patch) | |
tree | 6ac4fe9c33a14d9da418a97955a38efb9338d869 /shim/ws-shim/src/server.ts | |
parent | 31a47ce72255bb56920e417d250541b04be82648 (diff) |
relay much more robust
Diffstat (limited to 'shim/ws-shim/src/server.ts')
-rw-r--r-- | shim/ws-shim/src/server.ts | 176 |
1 files changed, 142 insertions, 34 deletions
diff --git a/shim/ws-shim/src/server.ts b/shim/ws-shim/src/server.ts index 0b807aa..f375fc7 100644 --- a/shim/ws-shim/src/server.ts +++ b/shim/ws-shim/src/server.ts @@ -32,9 +32,9 @@ let sub: number; // headers: { "Content-Type": "text/event-stream" }, // }); // } -function emit(channel: string, url: string, event?: any): void { +function emit(res: ShimResponse): void { // emitter.emit(channel, event, data); - const body = JSON.stringify({ event, relay: url }); + const body = JSON.stringify({ ws: res }); fetch(SHIP_URL + "/nostr-shim", { method: "PUT", headers: { "Content-type": "application/json" }, @@ -42,36 +42,24 @@ function emit(channel: string, url: string, event?: any): void { }); } const sockets: Map<string, Relay> = new Map(); +const unreliable: Set<string> = new Set(); const server = Bun.serve({ //http routes: { "/shim": async (req: Request) => { const data = (await req.json()) as ShimRequest; - console.log("request data", data); - if ("get" in data) { - for (const req of data.get) { - startWSClient(req.relay, req.filters); + console.log({ data }); + if ("ws" in data) { + console.log("request from urbit"); + console.dir(data, { depth: null }); + for (const relay of data.ws.relays) { + handleShimRequest(relay, data.ws.req); } + return new Response("OK"); + } else { + return oneOffClient(data.http); } - if ("post" in data) { - const ok = validate(data.post.event); - if (!ok) return; - for (const relay of data.post.relays) { - const socket = sockets.get(relay); - if (socket) socket.publishEvent(data.post.event); - else { - await startWSClient(relay, []); - - const socket = sockets.get(relay); - if (socket) socket.publishEvent(data.post.event); - else console.error("wtf man"); - } - } - } - // server.publish("shim", data); - // return sse(req, "all"); - return new Response("OK"); }, }, fetch(req, server) { @@ -136,22 +124,142 @@ const server = Bun.serve({ port: 8888, }); +export async function handleShimRequest(url: string, req: ClientMessage) { + switch (req[0]) { + case "REQ": { + const [, subscription_id, ...filters] = req; + await startWSSub(url, filters, subscription_id); + break; + } + case "EVENT": { + const [, event] = req; + const ok = validate(event); + if (!ok) break; + const socket = await getClient(url); + socket.publishEvent(event); + // socket.publishEvent(data.post.event); + // socket.disconnect(); + // sockets.delete(relay); + break; + } + case "AUTH": { + const [, event] = req; + const socket = await getClient(url); + socket.publishEvent(event); + break; + } + case "CLOSE": { + const [, subscriptionId] = req; + const socket = await getClient(url); + socket.unsubscribe(subscriptionId); + break; + } + } +} + import { Relay } from "./client"; -import type { Filter, ShimRequest } from "./types"; +import type { + ClientMessage, + Filter, + RelayMessage, + ShimHttpRequest, + ShimRequest, + ShimResponse, +} from "./types"; import { validate } from "./nostr"; -async function startWSClient(url: string, filters: Filter[]) { - console.log("connecting to relay..."); - const relay = new Relay(url); +import { parseRelayMsg, wait } from "./lib"; + +async function oneOffClient(req: ShimHttpRequest) { + const { relay: url, delay, subscription_id: subid, filters } = req; + const msgs: ShimResponse["msg"][] = []; + const relay = new Relay(url, (raw) => { + const msg = parseRelayMsg(raw); + msgs.push(msg); + }); + relay.onerror = (error) => { + console.log(url); + console.error("ws error", error); + }; + relay.ondisconnect = () => { + console.error(url, "relay disconnected"); + }; + relay.onfailure = () => { + console.error(url, "relay failed"); + unreliable.add(url); + }; + relay.onnotice = (notice) => { + console.error("on notice", notice); + // emit({ relay: url, msg: { notice } }); + }; + relay.onconnect = () => { + console.log("relay connected", url); + }; await relay.connect(); - const id = crypto.randomUUID(); - relay.subscribe(id, filters, { + for (const f of filters) { + console.dir(f, { depth: null }); + } + relay.subscribe(subid, filters, { oneose: () => { console.log("oneose"); }, - onevent(event) { - console.log("relay event", { url, event }); - emit("all", url, event); + // onevent(event) { + // console.log("relay event", { url, event }); + // }, + }); + await wait(delay); + relay.disconnect(); + return Response.json({ http: msgs }); +} + +async function getClient(url: string): Promise<Relay> { + const socket = sockets.get(url); + if (socket) { + console.log("socket present", socket.status); + if (socket.status !== "connected") await socket.connect(); + return socket; + } else { + const relay = new Relay(url, (raw) => { + const msg = parseRelayMsg(raw); + // console.log("emitting msg", msg); + emit({ relay: url, msg }); + }); + relay.onerror = (error) => { + console.log(url); + console.error("ws error", error); + emit({ relay: url, msg: { error: `${error}` } }); + }; + relay.ondisconnect = () => { + console.error(url, "relay disconnected"); + }; + relay.onfailure = () => { + console.error(url, "relay failed"); + unreliable.add(url); + emit({ relay: url, msg: { error: "failed to connect" } }); + }; + relay.onnotice = (notice) => { + console.error("on notice", notice); + emit({ relay: url, msg: { notice } }); + }; + relay.onconnect = () => { + console.log("relay connected", url); + }; + await relay.connect(); + sockets.set(url, relay); + return relay; + } +} +async function startWSSub(url: string, filters: Filter[], subId?: string) { + if (unreliable.has(url)) return; // just ignore it + + const relay = await getClient(url); + console.log("subscribing", filters); + const subid = subId ? subId : crypto.randomUUID(); + relay.subscribe(subid, filters, { + oneose: () => { + console.log("oneose"); }, + // onevent(event) { + // console.log("relay event", { url, event }); + // }, }); - sockets.set(url, relay); } |