summaryrefslogtreecommitdiff
path: root/shim/ws-shim/src/server.ts
diff options
context:
space:
mode:
Diffstat (limited to 'shim/ws-shim/src/server.ts')
-rw-r--r--shim/ws-shim/src/server.ts157
1 files changed, 157 insertions, 0 deletions
diff --git a/shim/ws-shim/src/server.ts b/shim/ws-shim/src/server.ts
new file mode 100644
index 0000000..0b807aa
--- /dev/null
+++ b/shim/ws-shim/src/server.ts
@@ -0,0 +1,157 @@
+import EventEmitter from "events";
+import Urbit from "urbit-http";
+
+const SHIP_URL = "http://localhost:8080";
+const api = new Urbit(SHIP_URL, "");
+let sub: number;
+// const emitter = new EventEmitter();
+
+// //github.com/oven-sh/bun/issues/13811
+// function sse(req: Request, channel: string): Response {
+// const stream = new ReadableStream({
+// type: "direct",
+// pull(controller: ReadableStreamDirectController) {
+// let id = +(req.headers.get("last-event-id") ?? 1);
+// const handler = async (event: string, data: unknown): Promise<void> => {
+// await controller.write(`id:${id}\n`);
+// await controller.write(`event:${event}\n`);
+// if (data) await controller.write(`data:${JSON.stringify(data)}\n\n`);
+// await controller.flush();
+// id++;
+// emitter.on(channel, handler);
+// if (req.signal.aborted) {
+// emitter.off(channel, handler);
+// controller.close();
+// }
+// };
+// return new Promise(() => void 0);
+// },
+// });
+// return new Response(stream, {
+// status: 200,
+// headers: { "Content-Type": "text/event-stream" },
+// });
+// }
+function emit(channel: string, url: string, event?: any): void {
+ // emitter.emit(channel, event, data);
+ const body = JSON.stringify({ event, relay: url });
+ fetch(SHIP_URL + "/nostr-shim", {
+ method: "PUT",
+ headers: { "Content-type": "application/json" },
+ body,
+ });
+}
+const sockets: Map<string, Relay> = new Map();
+
+const server = Bun.serve({
+ //http
+ routes: {
+ "/shim": async (req: Request) => {
+ const data = (await req.json()) as ShimRequest;
+ console.log("request data", data);
+ if ("get" in data) {
+ for (const req of data.get) {
+ startWSClient(req.relay, req.filters);
+ }
+ }
+ if ("post" in data) {
+ const ok = validate(data.post.event);
+ if (!ok) return;
+ for (const relay of data.post.relays) {
+ const socket = sockets.get(relay);
+ if (socket) socket.publishEvent(data.post.event);
+ else {
+ await startWSClient(relay, []);
+
+ const socket = sockets.get(relay);
+ if (socket) socket.publishEvent(data.post.event);
+ else console.error("wtf man");
+ }
+ }
+ }
+ // server.publish("shim", data);
+ // return sse(req, "all");
+ return new Response("OK");
+ },
+ },
+ fetch(req, server) {
+ const upgraded = server.upgrade(req, { data: { createdAt: Date.now() } });
+ if (upgraded) return undefined;
+ return new Response("henlo");
+ },
+ websocket: {
+ // Maximum message size (in bytes)
+ maxPayloadLength: 64 * 1024,
+
+ // Backpressure limit before messages are dropped
+ backpressureLimit: 1024 * 1024,
+
+ // Close connection if backpressure limit is hit
+ closeOnBackpressureLimit: true,
+
+ // Handler called when backpressure is relieved
+ drain(ws) {
+ console.log("Backpressure relieved");
+ },
+
+ // Enable per-message deflate compression
+ perMessageDeflate: {
+ compress: true,
+ decompress: true,
+ },
+
+ // Send ping frames to keep connection alive
+ sendPings: true,
+
+ // Handlers for ping/pong frames
+ ping(ws, data) {
+ console.log("Received ping");
+ },
+ pong(ws, data) {
+ console.log("Received pong");
+ },
+
+ // Whether server receives its own published messages
+ publishToSelf: false,
+ // handlers
+ async open(ws) {
+ //
+ // ws.subscribe("shim");
+ // console.log(ws, "hey someone subscribed here");
+ // if (sub) {
+ // await api.unsubscribe(sub);
+ // sub = await api.subscribe({ app: "nostril", path: "/ws" });
+ // } else sub = await api.subscribe({ app: "nostril", path: "/ws" });
+ },
+ async close(ws) {
+ //
+ // ws.unsubscribe("shim");
+ // if (sub) await api.unsubscribe(sub);
+ },
+ async message(ws) {
+ // api.poke({ app: "nostril", mark: "json", json: { ws: ws.data } });
+ // server.publish("chat", "henlo");
+ },
+ },
+ port: 8888,
+});
+
+import { Relay } from "./client";
+import type { Filter, ShimRequest } from "./types";
+import { validate } from "./nostr";
+async function startWSClient(url: string, filters: Filter[]) {
+ console.log("connecting to relay...");
+ const relay = new Relay(url);
+ await relay.connect();
+ const id = crypto.randomUUID();
+ relay.subscribe(id, filters, {
+ oneose: () => {
+ console.log("oneose");
+ },
+ onevent(event) {
+ console.log("relay event", { url, event });
+ emit("all", url, event);
+ },
+ });
+ sockets.set(url, relay);
+}