summaryrefslogtreecommitdiff
path: root/shim/ws-shim/src/server.ts
diff options
context:
space:
mode:
authorpolwex <polwex@sortug.com>2025-09-17 12:24:41 +0700
committerpolwex <polwex@sortug.com>2025-09-17 12:24:41 +0700
commit387af8fc1603805b02ce03f8adba4fa73a954f7c (patch)
tree6ac4fe9c33a14d9da418a97955a38efb9338d869 /shim/ws-shim/src/server.ts
parent31a47ce72255bb56920e417d250541b04be82648 (diff)
relay much more robust
Diffstat (limited to 'shim/ws-shim/src/server.ts')
-rw-r--r--shim/ws-shim/src/server.ts176
1 files changed, 142 insertions, 34 deletions
diff --git a/shim/ws-shim/src/server.ts b/shim/ws-shim/src/server.ts
index 0b807aa..f375fc7 100644
--- a/shim/ws-shim/src/server.ts
+++ b/shim/ws-shim/src/server.ts
@@ -32,9 +32,9 @@ let sub: number;
// headers: { "Content-Type": "text/event-stream" },
// });
// }
-function emit(channel: string, url: string, event?: any): void {
+function emit(res: ShimResponse): void {
// emitter.emit(channel, event, data);
- const body = JSON.stringify({ event, relay: url });
+ const body = JSON.stringify({ ws: res });
fetch(SHIP_URL + "/nostr-shim", {
method: "PUT",
headers: { "Content-type": "application/json" },
@@ -42,36 +42,24 @@ function emit(channel: string, url: string, event?: any): void {
});
}
const sockets: Map<string, Relay> = new Map();
+const unreliable: Set<string> = new Set();
const server = Bun.serve({
//http
routes: {
"/shim": async (req: Request) => {
const data = (await req.json()) as ShimRequest;
- console.log("request data", data);
- if ("get" in data) {
- for (const req of data.get) {
- startWSClient(req.relay, req.filters);
+ console.log({ data });
+ if ("ws" in data) {
+ console.log("request from urbit");
+ console.dir(data, { depth: null });
+ for (const relay of data.ws.relays) {
+ handleShimRequest(relay, data.ws.req);
}
+ return new Response("OK");
+ } else {
+ return oneOffClient(data.http);
}
- if ("post" in data) {
- const ok = validate(data.post.event);
- if (!ok) return;
- for (const relay of data.post.relays) {
- const socket = sockets.get(relay);
- if (socket) socket.publishEvent(data.post.event);
- else {
- await startWSClient(relay, []);
-
- const socket = sockets.get(relay);
- if (socket) socket.publishEvent(data.post.event);
- else console.error("wtf man");
- }
- }
- }
- // server.publish("shim", data);
- // return sse(req, "all");
- return new Response("OK");
},
},
fetch(req, server) {
@@ -136,22 +124,142 @@ const server = Bun.serve({
port: 8888,
});
+export async function handleShimRequest(url: string, req: ClientMessage) {
+ switch (req[0]) {
+ case "REQ": {
+ const [, subscription_id, ...filters] = req;
+ await startWSSub(url, filters, subscription_id);
+ break;
+ }
+ case "EVENT": {
+ const [, event] = req;
+ const ok = validate(event);
+ if (!ok) break;
+ const socket = await getClient(url);
+ socket.publishEvent(event);
+ // socket.publishEvent(data.post.event);
+ // socket.disconnect();
+ // sockets.delete(relay);
+ break;
+ }
+ case "AUTH": {
+ const [, event] = req;
+ const socket = await getClient(url);
+ socket.publishEvent(event);
+ break;
+ }
+ case "CLOSE": {
+ const [, subscriptionId] = req;
+ const socket = await getClient(url);
+ socket.unsubscribe(subscriptionId);
+ break;
+ }
+ }
+}
+
import { Relay } from "./client";
-import type { Filter, ShimRequest } from "./types";
+import type {
+ ClientMessage,
+ Filter,
+ RelayMessage,
+ ShimHttpRequest,
+ ShimRequest,
+ ShimResponse,
+} from "./types";
import { validate } from "./nostr";
-async function startWSClient(url: string, filters: Filter[]) {
- console.log("connecting to relay...");
- const relay = new Relay(url);
+import { parseRelayMsg, wait } from "./lib";
+
+async function oneOffClient(req: ShimHttpRequest) {
+ const { relay: url, delay, subscription_id: subid, filters } = req;
+ const msgs: ShimResponse["msg"][] = [];
+ const relay = new Relay(url, (raw) => {
+ const msg = parseRelayMsg(raw);
+ msgs.push(msg);
+ });
+ relay.onerror = (error) => {
+ console.log(url);
+ console.error("ws error", error);
+ };
+ relay.ondisconnect = () => {
+ console.error(url, "relay disconnected");
+ };
+ relay.onfailure = () => {
+ console.error(url, "relay failed");
+ unreliable.add(url);
+ };
+ relay.onnotice = (notice) => {
+ console.error("on notice", notice);
+ // emit({ relay: url, msg: { notice } });
+ };
+ relay.onconnect = () => {
+ console.log("relay connected", url);
+ };
await relay.connect();
- const id = crypto.randomUUID();
- relay.subscribe(id, filters, {
+ for (const f of filters) {
+ console.dir(f, { depth: null });
+ }
+ relay.subscribe(subid, filters, {
oneose: () => {
console.log("oneose");
},
- onevent(event) {
- console.log("relay event", { url, event });
- emit("all", url, event);
+ // onevent(event) {
+ // console.log("relay event", { url, event });
+ // },
+ });
+ await wait(delay);
+ relay.disconnect();
+ return Response.json({ http: msgs });
+}
+
+async function getClient(url: string): Promise<Relay> {
+ const socket = sockets.get(url);
+ if (socket) {
+ console.log("socket present", socket.status);
+ if (socket.status !== "connected") await socket.connect();
+ return socket;
+ } else {
+ const relay = new Relay(url, (raw) => {
+ const msg = parseRelayMsg(raw);
+ // console.log("emitting msg", msg);
+ emit({ relay: url, msg });
+ });
+ relay.onerror = (error) => {
+ console.log(url);
+ console.error("ws error", error);
+ emit({ relay: url, msg: { error: `${error}` } });
+ };
+ relay.ondisconnect = () => {
+ console.error(url, "relay disconnected");
+ };
+ relay.onfailure = () => {
+ console.error(url, "relay failed");
+ unreliable.add(url);
+ emit({ relay: url, msg: { error: "failed to connect" } });
+ };
+ relay.onnotice = (notice) => {
+ console.error("on notice", notice);
+ emit({ relay: url, msg: { notice } });
+ };
+ relay.onconnect = () => {
+ console.log("relay connected", url);
+ };
+ await relay.connect();
+ sockets.set(url, relay);
+ return relay;
+ }
+}
+async function startWSSub(url: string, filters: Filter[], subId?: string) {
+ if (unreliable.has(url)) return; // just ignore it
+
+ const relay = await getClient(url);
+ console.log("subscribing", filters);
+ const subid = subId ? subId : crypto.randomUUID();
+ relay.subscribe(subid, filters, {
+ oneose: () => {
+ console.log("oneose");
},
+ // onevent(event) {
+ // console.log("relay event", { url, event });
+ // },
});
- sockets.set(url, relay);
}