summaryrefslogtreecommitdiff
path: root/shim/ws-shim/src
diff options
context:
space:
mode:
authorpolwex <polwex@sortug.com>2025-09-11 01:50:29 +0700
committerpolwex <polwex@sortug.com>2025-09-11 01:50:29 +0700
commit91b15ad49092c314dd6d3483aec47f0be7a37506 (patch)
tree9a0b040ed6e1c2793e4f9fc269a5d6118b16e453 /shim/ws-shim/src
parentb1d68ac307ed87d63e83820cbdf843fff0fd9f7f (diff)
ihategit
Diffstat (limited to 'shim/ws-shim/src')
-rw-r--r--shim/ws-shim/src/client.ts209
-rw-r--r--shim/ws-shim/src/nostr.ts24
-rw-r--r--shim/ws-shim/src/server.ts157
-rw-r--r--shim/ws-shim/src/test.ts44
-rw-r--r--shim/ws-shim/src/types.ts77
5 files changed, 511 insertions, 0 deletions
diff --git a/shim/ws-shim/src/client.ts b/shim/ws-shim/src/client.ts
new file mode 100644
index 0000000..b7df3e9
--- /dev/null
+++ b/shim/ws-shim/src/client.ts
@@ -0,0 +1,209 @@
+import type {
+ NostrEvent,
+ ClientMessage,
+ RelayMessage,
+ Filter,
+ Subscription,
+} from "./types";
+
+export class Relay {
+ private url: string;
+ private ws: WebSocket | null = null;
+ private subscriptions: Map<string, Subscription> = new Map();
+ private messageQueue: ClientMessage[] = [];
+ private reconnectTimer: Timer | null = null;
+ private reconnectAttempts = 0;
+ private maxReconnectAttempts = 5;
+ private reconnectDelay = 1000;
+
+ public status: "connecting" | "connected" | "disconnected" | "error" =
+ "disconnected";
+
+ public onconnect?: () => void;
+ public ondisconnect?: () => void;
+ public onerror?: (error: Error) => void;
+ public onnotice?: (message: string) => void;
+
+ constructor(url: string) {
+ this.url = url;
+ }
+
+ async connect(): Promise<void> {
+ return new Promise((resolve, reject) => {
+ if (this.ws?.readyState === WebSocket.OPEN) {
+ resolve();
+ return;
+ }
+
+ this.status = "connecting";
+ this.ws = new WebSocket(this.url);
+
+ this.ws.onopen = () => {
+ this.status = "connected";
+ this.reconnectAttempts = 0;
+ this.flushMessageQueue();
+ this.onconnect?.();
+ resolve();
+ };
+
+ this.ws.onclose = () => {
+ this.status = "disconnected";
+ this.ondisconnect?.();
+ this.attemptReconnect();
+ };
+
+ this.ws.onerror = (event) => {
+ this.status = "error";
+ const error = new Error(`WebSocket error: ${event.type}`);
+ this.onerror?.(error);
+ reject(error);
+ };
+
+ this.ws.onmessage = (event) => {
+ this.handleMessage(event.data);
+ };
+ });
+ }
+
+ disconnect(): void {
+ if (this.reconnectTimer) {
+ clearTimeout(this.reconnectTimer);
+ this.reconnectTimer = null;
+ }
+
+ if (this.ws) {
+ this.ws.close();
+ this.ws = null;
+ }
+
+ this.status = "disconnected";
+ this.subscriptions.clear();
+ this.messageQueue = [];
+ }
+
+ private attemptReconnect(): void {
+ if (this.reconnectAttempts >= this.maxReconnectAttempts) {
+ this.status = "error";
+ this.onerror?.(new Error("Max reconnection attempts reached"));
+ return;
+ }
+
+ this.reconnectAttempts++;
+ const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
+
+ this.reconnectTimer = setTimeout(() => {
+ this.connect().catch((error) => {
+ console.error("Reconnection failed:", error);
+ });
+ }, delay);
+ }
+
+ private flushMessageQueue(): void {
+ while (this.messageQueue.length > 0) {
+ const message = this.messageQueue.shift();
+ if (message) {
+ this.send(message);
+ }
+ }
+ }
+
+ private send(message: ClientMessage): void {
+ if (this.ws?.readyState === WebSocket.OPEN) {
+ this.ws.send(JSON.stringify(message));
+ } else {
+ this.messageQueue.push(message);
+ }
+ }
+
+ private handleMessage(data: string): void {
+ try {
+ const message = JSON.parse(data) as RelayMessage;
+
+ switch (message[0]) {
+ case "EVENT": {
+ const [, subscriptionId, event] = message;
+ const subscription = this.subscriptions.get(subscriptionId);
+
+ if (subscription) {
+ subscription.onevent?.(event);
+ }
+ break;
+ }
+
+ case "OK": {
+ const [, eventId, success, messag] = message;
+ if (!success) {
+ console.error(`Event ${eventId} rejected: ${messag}`);
+ }
+ break;
+ }
+
+ case "EOSE": {
+ const [, subscriptionId] = message;
+ const subscription = this.subscriptions.get(subscriptionId);
+ subscription?.oneose?.();
+ break;
+ }
+
+ case "CLOSED": {
+ const [, subscriptionId, messag] = message;
+ this.subscriptions.delete(subscriptionId);
+ console.log(`Subscription ${subscriptionId} closed: ${messag}`);
+ break;
+ }
+
+ case "NOTICE": {
+ const [, messag] = message;
+ this.onnotice?.(messag);
+ break;
+ }
+
+ case "AUTH": {
+ console.warn("AUTH not implemented");
+ break;
+ }
+ }
+ } catch (error) {
+ console.error("Failed to handle message:", error);
+ }
+ }
+
+ publishEvent(event: NostrEvent): void {
+ this.send(["EVENT", event]);
+ }
+
+ subscribe(
+ id: string,
+ filters: Filter[],
+ handlers: {
+ onevent?: (event: NostrEvent) => void;
+ oneose?: () => void;
+ },
+ ): () => void {
+ const subscription: Subscription = {
+ id,
+ filters,
+ ...handlers,
+ };
+
+ this.subscriptions.set(id, subscription);
+ this.send(["REQ", id, ...filters]);
+
+ return () => {
+ this.unsubscribe(id);
+ };
+ }
+
+ unsubscribe(id: string): void {
+ this.subscriptions.delete(id);
+ this.send(["CLOSE", id]);
+ }
+
+ getStatus(): string {
+ return this.status;
+ }
+
+ getUrl(): string {
+ return this.url;
+ }
+}
diff --git a/shim/ws-shim/src/nostr.ts b/shim/ws-shim/src/nostr.ts
new file mode 100644
index 0000000..0b084b6
--- /dev/null
+++ b/shim/ws-shim/src/nostr.ts
@@ -0,0 +1,24 @@
+import { finalizeEvent, validateEvent, verifyEvent } from "nostr-tools";
+import type { NostrEvent } from "./types";
+import { hexToBytes } from "nostr-tools/utils";
+
+export function validate(event: NostrEvent) {
+ console.log("constructing event in js");
+ const priv =
+ "d862c25aacfae2f66380448eafdeefeccb970a382f2ff185f3e0c5a538d60e35";
+ const sk = hexToBytes(priv);
+ const raw = {
+ kind: event.kind,
+ created_at: event.created_at,
+ tags: event.tags,
+ content: event.content,
+ };
+ const ev = finalizeEvent(raw, sk);
+ console.log("js event", ev);
+ console.log("validating my event", event);
+ const ok = validateEvent(event);
+ console.log("is valid?", ok);
+ const ok2 = verifyEvent(event);
+ console.log("is verified?", ok2);
+ return ok && ok2;
+}
diff --git a/shim/ws-shim/src/server.ts b/shim/ws-shim/src/server.ts
new file mode 100644
index 0000000..0b807aa
--- /dev/null
+++ b/shim/ws-shim/src/server.ts
@@ -0,0 +1,157 @@
+import EventEmitter from "events";
+import Urbit from "urbit-http";
+
+const SHIP_URL = "http://localhost:8080";
+const api = new Urbit(SHIP_URL, "");
+let sub: number;
+// const emitter = new EventEmitter();
+
+// //github.com/oven-sh/bun/issues/13811
+// function sse(req: Request, channel: string): Response {
+// const stream = new ReadableStream({
+// type: "direct",
+// pull(controller: ReadableStreamDirectController) {
+// let id = +(req.headers.get("last-event-id") ?? 1);
+// const handler = async (event: string, data: unknown): Promise<void> => {
+// await controller.write(`id:${id}\n`);
+// await controller.write(`event:${event}\n`);
+// if (data) await controller.write(`data:${JSON.stringify(data)}\n\n`);
+// await controller.flush();
+// id++;
+// emitter.on(channel, handler);
+// if (req.signal.aborted) {
+// emitter.off(channel, handler);
+// controller.close();
+// }
+// };
+// return new Promise(() => void 0);
+// },
+// });
+// return new Response(stream, {
+// status: 200,
+// headers: { "Content-Type": "text/event-stream" },
+// });
+// }
+function emit(channel: string, url: string, event?: any): void {
+ // emitter.emit(channel, event, data);
+ const body = JSON.stringify({ event, relay: url });
+ fetch(SHIP_URL + "/nostr-shim", {
+ method: "PUT",
+ headers: { "Content-type": "application/json" },
+ body,
+ });
+}
+const sockets: Map<string, Relay> = new Map();
+
+const server = Bun.serve({
+ //http
+ routes: {
+ "/shim": async (req: Request) => {
+ const data = (await req.json()) as ShimRequest;
+ console.log("request data", data);
+ if ("get" in data) {
+ for (const req of data.get) {
+ startWSClient(req.relay, req.filters);
+ }
+ }
+ if ("post" in data) {
+ const ok = validate(data.post.event);
+ if (!ok) return;
+ for (const relay of data.post.relays) {
+ const socket = sockets.get(relay);
+ if (socket) socket.publishEvent(data.post.event);
+ else {
+ await startWSClient(relay, []);
+
+ const socket = sockets.get(relay);
+ if (socket) socket.publishEvent(data.post.event);
+ else console.error("wtf man");
+ }
+ }
+ }
+ // server.publish("shim", data);
+ // return sse(req, "all");
+ return new Response("OK");
+ },
+ },
+ fetch(req, server) {
+ const upgraded = server.upgrade(req, { data: { createdAt: Date.now() } });
+ if (upgraded) return undefined;
+ return new Response("henlo");
+ },
+ websocket: {
+ // Maximum message size (in bytes)
+ maxPayloadLength: 64 * 1024,
+
+ // Backpressure limit before messages are dropped
+ backpressureLimit: 1024 * 1024,
+
+ // Close connection if backpressure limit is hit
+ closeOnBackpressureLimit: true,
+
+ // Handler called when backpressure is relieved
+ drain(ws) {
+ console.log("Backpressure relieved");
+ },
+
+ // Enable per-message deflate compression
+ perMessageDeflate: {
+ compress: true,
+ decompress: true,
+ },
+
+ // Send ping frames to keep connection alive
+ sendPings: true,
+
+ // Handlers for ping/pong frames
+ ping(ws, data) {
+ console.log("Received ping");
+ },
+ pong(ws, data) {
+ console.log("Received pong");
+ },
+
+ // Whether server receives its own published messages
+ publishToSelf: false,
+ // handlers
+ async open(ws) {
+ //
+ // ws.subscribe("shim");
+ // console.log(ws, "hey someone subscribed here");
+ // if (sub) {
+ // await api.unsubscribe(sub);
+ // sub = await api.subscribe({ app: "nostril", path: "/ws" });
+ // } else sub = await api.subscribe({ app: "nostril", path: "/ws" });
+ },
+ async close(ws) {
+ //
+ // ws.unsubscribe("shim");
+ // if (sub) await api.unsubscribe(sub);
+ },
+ async message(ws) {
+ // api.poke({ app: "nostril", mark: "json", json: { ws: ws.data } });
+ // server.publish("chat", "henlo");
+ },
+ },
+ port: 8888,
+});
+
+import { Relay } from "./client";
+import type { Filter, ShimRequest } from "./types";
+import { validate } from "./nostr";
+async function startWSClient(url: string, filters: Filter[]) {
+ console.log("connecting to relay...");
+ const relay = new Relay(url);
+ await relay.connect();
+ const id = crypto.randomUUID();
+ relay.subscribe(id, filters, {
+ oneose: () => {
+ console.log("oneose");
+ },
+ onevent(event) {
+ console.log("relay event", { url, event });
+ emit("all", url, event);
+ },
+ });
+ sockets.set(url, relay);
+}
diff --git a/shim/ws-shim/src/test.ts b/shim/ws-shim/src/test.ts
new file mode 100644
index 0000000..fb87555
--- /dev/null
+++ b/shim/ws-shim/src/test.ts
@@ -0,0 +1,44 @@
+import { Relay } from "./client";
+const ids = [
+ "1a4f2d987384a33753e777138586b1f9b3b62eb0f6e54ca1cdb42859de5625bc",
+];
+async function wsClient(url: string) {
+ console.log("connecting to relae", url);
+ const relay = new Relay(url);
+ await relay.connect();
+ const id = crypto.randomUUID();
+ relay.subscribe(id, [{ ids, limit: 50 }], {
+ oneose: () => {
+ console.log("oneose");
+ },
+ onevent(event) {
+ console.log("relay event", { url, event });
+ },
+ });
+ // const socket = new WebSocket(url);
+ // socket.addEventListener("open", (event) => {
+ // //
+ // console.log("socket client open", event);
+ // });
+ // socket.addEventListener("close", (event) => {
+ // //
+ // console.log("socket client close", event);
+ // });
+ // socket.addEventListener("error", (event) => {
+ // //
+ // console.log("socket client error", event);
+ // });
+ // socket.addEventListener("message", (event) => {
+ // //
+ // console.log("socket client msg", event);
+ // });
+ // return socket;
+}
+
+const relays = ["wss://nos.lol", "wss://relay.damus.io"];
+
+async function run() {
+ console.log("wth");
+ await wsClient(relays[0]!);
+}
+run();
diff --git a/shim/ws-shim/src/types.ts b/shim/ws-shim/src/types.ts
new file mode 100644
index 0000000..4063772
--- /dev/null
+++ b/shim/ws-shim/src/types.ts
@@ -0,0 +1,77 @@
+// Shim types
+export type ShimRequest = {
+ get: Array<{ relay: string; filters: Filter[] }>;
+ post: { event: NostrEvent; relays: string[] };
+};
+// NOSTR official
+export interface NostrEvent {
+ id: string;
+ pubkey: string;
+ created_at: number;
+ kind: number;
+ tags: string[][];
+ content: string;
+ sig: string;
+}
+
+export interface UnsignedEvent {
+ pubkey: string;
+ created_at: number;
+ kind: number;
+ tags: string[][];
+ content: string;
+}
+
+export enum EventKind {
+ Metadata = 0,
+ TextNote = 1,
+ RecommendRelay = 2,
+ Contacts = 3,
+ EncryptedDirectMessage = 4,
+ EventDeletion = 5,
+ Repost = 6,
+ Reaction = 7,
+ BadgeAward = 8,
+ ChannelCreation = 40,
+ ChannelMetadata = 41,
+ ChannelMessage = 42,
+ ChannelHideMessage = 43,
+ ChannelMuteUser = 44,
+}
+
+export interface Filter {
+ ids?: string[];
+ authors?: string[];
+ kinds?: number[];
+ since?: number;
+ until?: number;
+ limit?: number;
+ search?: string;
+ [key: `#${string}`]: string[];
+}
+
+export type ClientMessage =
+ | ["EVENT", NostrEvent]
+ | ["REQ", string, ...Filter[]]
+ | ["CLOSE", string];
+
+export type RelayMessage =
+ | ["EVENT", string, NostrEvent]
+ | ["OK", string, boolean, string]
+ | ["EOSE", string]
+ | ["CLOSED", string, string]
+ | ["NOTICE", string]
+ | ["AUTH", string];
+
+export interface RelayInfo {
+ url: string;
+ status: "connecting" | "connected" | "disconnected" | "error";
+ ws?: WebSocket;
+}
+
+export interface Subscription {
+ id: string;
+ filters: Filter[];
+ oneose?: () => void;
+ onevent?: (event: NostrEvent) => void;
+}