diff options
author | polwex <polwex@sortug.com> | 2025-09-11 01:50:29 +0700 |
---|---|---|
committer | polwex <polwex@sortug.com> | 2025-09-11 01:50:29 +0700 |
commit | 91b15ad49092c314dd6d3483aec47f0be7a37506 (patch) | |
tree | 9a0b040ed6e1c2793e4f9fc269a5d6118b16e453 /shim/ws-shim/src | |
parent | b1d68ac307ed87d63e83820cbdf843fff0fd9f7f (diff) |
ihategit
Diffstat (limited to 'shim/ws-shim/src')
-rw-r--r-- | shim/ws-shim/src/client.ts | 209 | ||||
-rw-r--r-- | shim/ws-shim/src/nostr.ts | 24 | ||||
-rw-r--r-- | shim/ws-shim/src/server.ts | 157 | ||||
-rw-r--r-- | shim/ws-shim/src/test.ts | 44 | ||||
-rw-r--r-- | shim/ws-shim/src/types.ts | 77 |
5 files changed, 511 insertions, 0 deletions
diff --git a/shim/ws-shim/src/client.ts b/shim/ws-shim/src/client.ts new file mode 100644 index 0000000..b7df3e9 --- /dev/null +++ b/shim/ws-shim/src/client.ts @@ -0,0 +1,209 @@ +import type { + NostrEvent, + ClientMessage, + RelayMessage, + Filter, + Subscription, +} from "./types"; + +export class Relay { + private url: string; + private ws: WebSocket | null = null; + private subscriptions: Map<string, Subscription> = new Map(); + private messageQueue: ClientMessage[] = []; + private reconnectTimer: Timer | null = null; + private reconnectAttempts = 0; + private maxReconnectAttempts = 5; + private reconnectDelay = 1000; + + public status: "connecting" | "connected" | "disconnected" | "error" = + "disconnected"; + + public onconnect?: () => void; + public ondisconnect?: () => void; + public onerror?: (error: Error) => void; + public onnotice?: (message: string) => void; + + constructor(url: string) { + this.url = url; + } + + async connect(): Promise<void> { + return new Promise((resolve, reject) => { + if (this.ws?.readyState === WebSocket.OPEN) { + resolve(); + return; + } + + this.status = "connecting"; + this.ws = new WebSocket(this.url); + + this.ws.onopen = () => { + this.status = "connected"; + this.reconnectAttempts = 0; + this.flushMessageQueue(); + this.onconnect?.(); + resolve(); + }; + + this.ws.onclose = () => { + this.status = "disconnected"; + this.ondisconnect?.(); + this.attemptReconnect(); + }; + + this.ws.onerror = (event) => { + this.status = "error"; + const error = new Error(`WebSocket error: ${event.type}`); + this.onerror?.(error); + reject(error); + }; + + this.ws.onmessage = (event) => { + this.handleMessage(event.data); + }; + }); + } + + disconnect(): void { + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + + if (this.ws) { + this.ws.close(); + this.ws = null; + } + + this.status = "disconnected"; + this.subscriptions.clear(); + this.messageQueue = []; + } + + private attemptReconnect(): void { + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + this.status = "error"; + this.onerror?.(new Error("Max reconnection attempts reached")); + return; + } + + this.reconnectAttempts++; + const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1); + + this.reconnectTimer = setTimeout(() => { + this.connect().catch((error) => { + console.error("Reconnection failed:", error); + }); + }, delay); + } + + private flushMessageQueue(): void { + while (this.messageQueue.length > 0) { + const message = this.messageQueue.shift(); + if (message) { + this.send(message); + } + } + } + + private send(message: ClientMessage): void { + if (this.ws?.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify(message)); + } else { + this.messageQueue.push(message); + } + } + + private handleMessage(data: string): void { + try { + const message = JSON.parse(data) as RelayMessage; + + switch (message[0]) { + case "EVENT": { + const [, subscriptionId, event] = message; + const subscription = this.subscriptions.get(subscriptionId); + + if (subscription) { + subscription.onevent?.(event); + } + break; + } + + case "OK": { + const [, eventId, success, messag] = message; + if (!success) { + console.error(`Event ${eventId} rejected: ${messag}`); + } + break; + } + + case "EOSE": { + const [, subscriptionId] = message; + const subscription = this.subscriptions.get(subscriptionId); + subscription?.oneose?.(); + break; + } + + case "CLOSED": { + const [, subscriptionId, messag] = message; + this.subscriptions.delete(subscriptionId); + console.log(`Subscription ${subscriptionId} closed: ${messag}`); + break; + } + + case "NOTICE": { + const [, messag] = message; + this.onnotice?.(messag); + break; + } + + case "AUTH": { + console.warn("AUTH not implemented"); + break; + } + } + } catch (error) { + console.error("Failed to handle message:", error); + } + } + + publishEvent(event: NostrEvent): void { + this.send(["EVENT", event]); + } + + subscribe( + id: string, + filters: Filter[], + handlers: { + onevent?: (event: NostrEvent) => void; + oneose?: () => void; + }, + ): () => void { + const subscription: Subscription = { + id, + filters, + ...handlers, + }; + + this.subscriptions.set(id, subscription); + this.send(["REQ", id, ...filters]); + + return () => { + this.unsubscribe(id); + }; + } + + unsubscribe(id: string): void { + this.subscriptions.delete(id); + this.send(["CLOSE", id]); + } + + getStatus(): string { + return this.status; + } + + getUrl(): string { + return this.url; + } +} diff --git a/shim/ws-shim/src/nostr.ts b/shim/ws-shim/src/nostr.ts new file mode 100644 index 0000000..0b084b6 --- /dev/null +++ b/shim/ws-shim/src/nostr.ts @@ -0,0 +1,24 @@ +import { finalizeEvent, validateEvent, verifyEvent } from "nostr-tools"; +import type { NostrEvent } from "./types"; +import { hexToBytes } from "nostr-tools/utils"; + +export function validate(event: NostrEvent) { + console.log("constructing event in js"); + const priv = + "d862c25aacfae2f66380448eafdeefeccb970a382f2ff185f3e0c5a538d60e35"; + const sk = hexToBytes(priv); + const raw = { + kind: event.kind, + created_at: event.created_at, + tags: event.tags, + content: event.content, + }; + const ev = finalizeEvent(raw, sk); + console.log("js event", ev); + console.log("validating my event", event); + const ok = validateEvent(event); + console.log("is valid?", ok); + const ok2 = verifyEvent(event); + console.log("is verified?", ok2); + return ok && ok2; +} diff --git a/shim/ws-shim/src/server.ts b/shim/ws-shim/src/server.ts new file mode 100644 index 0000000..0b807aa --- /dev/null +++ b/shim/ws-shim/src/server.ts @@ -0,0 +1,157 @@ +import EventEmitter from "events"; +import Urbit from "urbit-http"; + +const SHIP_URL = "http://localhost:8080"; +const api = new Urbit(SHIP_URL, ""); +let sub: number; +// const emitter = new EventEmitter(); + +// //github.com/oven-sh/bun/issues/13811 +// function sse(req: Request, channel: string): Response { +// const stream = new ReadableStream({ +// type: "direct", +// pull(controller: ReadableStreamDirectController) { +// let id = +(req.headers.get("last-event-id") ?? 1); +// const handler = async (event: string, data: unknown): Promise<void> => { +// await controller.write(`id:${id}\n`); +// await controller.write(`event:${event}\n`); +// if (data) await controller.write(`data:${JSON.stringify(data)}\n\n`); +// await controller.flush(); +// id++; +// emitter.on(channel, handler); +// if (req.signal.aborted) { +// emitter.off(channel, handler); +// controller.close(); +// } +// }; +// return new Promise(() => void 0); +// }, +// }); +// return new Response(stream, { +// status: 200, +// headers: { "Content-Type": "text/event-stream" }, +// }); +// } +function emit(channel: string, url: string, event?: any): void { + // emitter.emit(channel, event, data); + const body = JSON.stringify({ event, relay: url }); + fetch(SHIP_URL + "/nostr-shim", { + method: "PUT", + headers: { "Content-type": "application/json" }, + body, + }); +} +const sockets: Map<string, Relay> = new Map(); + +const server = Bun.serve({ + //http + routes: { + "/shim": async (req: Request) => { + const data = (await req.json()) as ShimRequest; + console.log("request data", data); + if ("get" in data) { + for (const req of data.get) { + startWSClient(req.relay, req.filters); + } + } + if ("post" in data) { + const ok = validate(data.post.event); + if (!ok) return; + for (const relay of data.post.relays) { + const socket = sockets.get(relay); + if (socket) socket.publishEvent(data.post.event); + else { + await startWSClient(relay, []); + + const socket = sockets.get(relay); + if (socket) socket.publishEvent(data.post.event); + else console.error("wtf man"); + } + } + } + // server.publish("shim", data); + // return sse(req, "all"); + return new Response("OK"); + }, + }, + fetch(req, server) { + const upgraded = server.upgrade(req, { data: { createdAt: Date.now() } }); + if (upgraded) return undefined; + return new Response("henlo"); + }, + websocket: { + // Maximum message size (in bytes) + maxPayloadLength: 64 * 1024, + + // Backpressure limit before messages are dropped + backpressureLimit: 1024 * 1024, + + // Close connection if backpressure limit is hit + closeOnBackpressureLimit: true, + + // Handler called when backpressure is relieved + drain(ws) { + console.log("Backpressure relieved"); + }, + + // Enable per-message deflate compression + perMessageDeflate: { + compress: true, + decompress: true, + }, + + // Send ping frames to keep connection alive + sendPings: true, + + // Handlers for ping/pong frames + ping(ws, data) { + console.log("Received ping"); + }, + pong(ws, data) { + console.log("Received pong"); + }, + + // Whether server receives its own published messages + publishToSelf: false, + // handlers + async open(ws) { + // + // ws.subscribe("shim"); + // console.log(ws, "hey someone subscribed here"); + // if (sub) { + // await api.unsubscribe(sub); + // sub = await api.subscribe({ app: "nostril", path: "/ws" }); + // } else sub = await api.subscribe({ app: "nostril", path: "/ws" }); + }, + async close(ws) { + // + // ws.unsubscribe("shim"); + // if (sub) await api.unsubscribe(sub); + }, + async message(ws) { + // api.poke({ app: "nostril", mark: "json", json: { ws: ws.data } }); + // server.publish("chat", "henlo"); + }, + }, + port: 8888, +}); + +import { Relay } from "./client"; +import type { Filter, ShimRequest } from "./types"; +import { validate } from "./nostr"; +async function startWSClient(url: string, filters: Filter[]) { + console.log("connecting to relay..."); + const relay = new Relay(url); + await relay.connect(); + const id = crypto.randomUUID(); + relay.subscribe(id, filters, { + oneose: () => { + console.log("oneose"); + }, + onevent(event) { + console.log("relay event", { url, event }); + emit("all", url, event); + }, + }); + sockets.set(url, relay); +} diff --git a/shim/ws-shim/src/test.ts b/shim/ws-shim/src/test.ts new file mode 100644 index 0000000..fb87555 --- /dev/null +++ b/shim/ws-shim/src/test.ts @@ -0,0 +1,44 @@ +import { Relay } from "./client"; +const ids = [ + "1a4f2d987384a33753e777138586b1f9b3b62eb0f6e54ca1cdb42859de5625bc", +]; +async function wsClient(url: string) { + console.log("connecting to relae", url); + const relay = new Relay(url); + await relay.connect(); + const id = crypto.randomUUID(); + relay.subscribe(id, [{ ids, limit: 50 }], { + oneose: () => { + console.log("oneose"); + }, + onevent(event) { + console.log("relay event", { url, event }); + }, + }); + // const socket = new WebSocket(url); + // socket.addEventListener("open", (event) => { + // // + // console.log("socket client open", event); + // }); + // socket.addEventListener("close", (event) => { + // // + // console.log("socket client close", event); + // }); + // socket.addEventListener("error", (event) => { + // // + // console.log("socket client error", event); + // }); + // socket.addEventListener("message", (event) => { + // // + // console.log("socket client msg", event); + // }); + // return socket; +} + +const relays = ["wss://nos.lol", "wss://relay.damus.io"]; + +async function run() { + console.log("wth"); + await wsClient(relays[0]!); +} +run(); diff --git a/shim/ws-shim/src/types.ts b/shim/ws-shim/src/types.ts new file mode 100644 index 0000000..4063772 --- /dev/null +++ b/shim/ws-shim/src/types.ts @@ -0,0 +1,77 @@ +// Shim types +export type ShimRequest = { + get: Array<{ relay: string; filters: Filter[] }>; + post: { event: NostrEvent; relays: string[] }; +}; +// NOSTR official +export interface NostrEvent { + id: string; + pubkey: string; + created_at: number; + kind: number; + tags: string[][]; + content: string; + sig: string; +} + +export interface UnsignedEvent { + pubkey: string; + created_at: number; + kind: number; + tags: string[][]; + content: string; +} + +export enum EventKind { + Metadata = 0, + TextNote = 1, + RecommendRelay = 2, + Contacts = 3, + EncryptedDirectMessage = 4, + EventDeletion = 5, + Repost = 6, + Reaction = 7, + BadgeAward = 8, + ChannelCreation = 40, + ChannelMetadata = 41, + ChannelMessage = 42, + ChannelHideMessage = 43, + ChannelMuteUser = 44, +} + +export interface Filter { + ids?: string[]; + authors?: string[]; + kinds?: number[]; + since?: number; + until?: number; + limit?: number; + search?: string; + [key: `#${string}`]: string[]; +} + +export type ClientMessage = + | ["EVENT", NostrEvent] + | ["REQ", string, ...Filter[]] + | ["CLOSE", string]; + +export type RelayMessage = + | ["EVENT", string, NostrEvent] + | ["OK", string, boolean, string] + | ["EOSE", string] + | ["CLOSED", string, string] + | ["NOTICE", string] + | ["AUTH", string]; + +export interface RelayInfo { + url: string; + status: "connecting" | "connected" | "disconnected" | "error"; + ws?: WebSocket; +} + +export interface Subscription { + id: string; + filters: Filter[]; + oneose?: () => void; + onevent?: (event: NostrEvent) => void; +} |