summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorpolwex <polwex@sortug.com>2025-10-07 01:18:55 +0700
committerpolwex <polwex@sortug.com>2025-10-07 01:18:55 +0700
commit614e18998ea1db5bccadb170b5926288e57b1c01 (patch)
treec470bb5c02c13871251fed6d89b4536ab6ca0709
parent0955e0ef873782df603b828fba5323f1518e46d3 (diff)
added iris ws support on vere
-rw-r--r--app/ted/ws.hoon20
-rw-r--r--arvo/iris.hoon30
-rw-r--r--vere/pkg/vere/io/cttp.c726
3 files changed, 773 insertions, 3 deletions
diff --git a/app/ted/ws.hoon b/app/ted/ws.hoon
new file mode 100644
index 0000000..b8dd583
--- /dev/null
+++ b/app/ted/ws.hoon
@@ -0,0 +1,20 @@
+/- spider
+/+ strandio
+=, strand=strand:spider
+^- thread:spider
+|= arg=vase
+=/ m (strand ,vase)
+^- form:m
+=/ url=@t (need !<((unit @t) arg))
+;< =bowl:spider bind:m get-bowl:strandio
+=/ desk q.byk.bowl
+=/ =task:iris [%websocket-connect desk url]
+=/ =card:agent:gall [%pass /ws-req %arvo %i task]
+;< ~ bind:m (send-raw-card:strandio card)
+;< res=(pair wire sign-arvo) bind:m take-sign-arvo:strandio
+?. ?=([%iris %websocket-handshake id=@ud url=@t] q.res)
+ (strand-fail:strand %bad-sign ~)
+~& ws-handshake=[id.q.res url.q.res]
+:: ?: ?=([%iris %websocket-response id=@ud e=websocket-event:eyre] q.res)
+=/ data=@t 'done'
+(pure:m !>(data))
diff --git a/arvo/iris.hoon b/arvo/iris.hoon
index ea2a59f..2639843 100644
--- a/arvo/iris.hoon
+++ b/arvo/iris.hoon
@@ -287,6 +287,28 @@
connection-by-id (~(del by connection-by-id.state) id)
connection-by-duct (~(del by connection-by-duct.state) duct.u.con)
==
+ ++ ws-connect
+ |= [desk=term url=@t]
+ ~& iris-ws-connect=[desk url]
+ :: TODO ... the wid comes from vere tho...?
+ =^ id next-id.state [next-id.state +(next-id.state)]
+ :: add a new open session
+ =/ wid id
+ ::
+ =. connection-by-id.state
+ %+ ~(put by connection-by-id.state) id
+ [duct [0 3 ~ ~ 0 ~]]
+ :: keep track of the duct for cancellation
+ ::
+ =. connection-by-duct.state
+ (~(put by connection-by-duct.state) duct id)
+ :- [outbound-duct.state %give %websocket-handshake wid url]~
+ state
+ ++ ws-event
+ |= [id=@ud event=websocket-event:eyre]
+ ~& iris-ws-event=[id event]
+ :- [outbound-duct.state %give %websocket-response id event]~
+ state
--
--
:: end the =~
@@ -360,12 +382,14 @@
%receive
=^ moves state.ax (receive:client +.task)
[moves light-gate]
- :: UIP-125
::
+ :: UIP-125
%websocket-connect
- `light-gate
+ =^ moves state.ax (ws-connect:client +.task)
+ [moves light-gate]
%websocket-event
- `light-gate
+ =^ moves state.ax (ws-event:client +.task)
+ [moves light-gate]
==
:: http-client issues no requests to other vanes
::
diff --git a/vere/pkg/vere/io/cttp.c b/vere/pkg/vere/io/cttp.c
index fa6bb5b..b36a335 100644
--- a/vere/pkg/vere/io/cttp.c
+++ b/vere/pkg/vere/io/cttp.c
@@ -3,8 +3,17 @@
#include "vere.h"
#include "h2o.h"
+#include "h2o/websocket.h"
#include "noun.h"
#include "openssl/ssl.h"
+#include "openssl/sha.h"
+#include "wslay/wslay.h"
+
+#include <ctype.h>
+#include <stdint.h>
+#include <string.h>
+
+#define U3_WS_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
/* u3_csat: client connection state.
*/
@@ -25,6 +34,16 @@
u3_hbod* dob_u; // entry of body queue
} u3_cres;
+typedef enum {
+ u3_cws_pending = 0,
+ u3_cws_open = 1,
+ u3_cws_closing = 2,
+ u3_cws_closed = 3
+} u3_cwsat;
+
+typedef struct _u3_cttp u3_cttp;
+typedef struct _u3_cws u3_cws;
+
/* u3_creq: outgoing http request.
*/
typedef struct _u3_creq { // client request
@@ -45,17 +64,56 @@
u3_hbod* bur_u; // entry of send queue
h2o_iovec_t* vec_u; // send-buffer array
u3_cres* res_u; // nascent response
+ u3_cws* wsu_u; // websocket session (optional)
struct _u3_creq* nex_u; // next in list
struct _u3_creq* pre_u; // previous in list
struct _u3_cttp* ctp_u; // cttp backpointer
} u3_creq;
+struct _u3_cws {
+ c3_l wid_l; // websocket id
+ u3_cwsat sat_e; // websocket state
+ c3_o sec; // secure (wss)
+ c3_c* hot_c; // host string (nullable)
+ c3_w ipf_w; // ipv4 (numeric)
+ c3_c* ipf_c; // ipv4 string
+ c3_s por_s; // port (numeric)
+ c3_c* por_c; // port string
+ c3_c* url_c; // request url/path
+ c3_c key_c[29]; // sec-websocket-key
+ u3_cttp* ctp_u; // backpointer
+ u3_creq* ceq_u; // pending handshake request
+ h2o_socket_t* sok_u; // underlying socket
+ wslay_event_context_ptr wsl_w; // wslay context
+ struct wslay_event_callbacks wcb_u; // wslay callbacks
+ c3_y* out_y; // pending write buffer
+ struct _u3_cws* nex_u; // next in list
+ struct _u3_cws* pre_u; // prev in list
+};
+
+static void _cttp_ws_close(u3_cws* cws_u, c3_o send_event);
+static void _cttp_ws_proceed(u3_cws* cws_u);
+static c3_o _cttp_ws_start(u3_cttp* ctp_u, c3_l wid_l, u3_atom url);
+static c3_o _cttp_ws_send_message(u3_cws* cws_u, u3_noun msg);
+static void _cttp_ws_queue_close(u3_cws* cws_u);
+static void _cttp_ws_fail_handshake(u3_creq* ceq_u, const c3_c* err_c);
+static void _cttp_ws_generate_key(u3_cws* cws_u);
+static void _cttp_ws_compute_accept(const c3_c* key_c, c3_c out_c[29]);
+static void _cttp_ws_read_cb(h2o_socket_t* sock_u, const c3_c* err_c);
+static void _cttp_ws_write_cb(h2o_socket_t* sock_u, const c3_c* err_c);
+static ssize_t _cttp_ws_recv_cb(wslay_event_context_ptr ctx, uint8_t* buf_y, size_t len_w, int flags, void* ves_p);
+static ssize_t _cttp_ws_send_cb(wslay_event_context_ptr ctx, const uint8_t* dat_y, size_t len_w, int flags, void* ves_p);
+static void _cttp_ws_msg_cb(wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg* arg, void* ves_p);
+
+static void _cttp_creq_start(u3_creq* ceq_u);
+
/* u3_cttp: http client.
*/
typedef struct _u3_cttp {
u3_auto car_u; // driver
c3_l sev_l; // instance number
u3_creq* ceq_u; // request list
+ u3_cws* cws_u; // websocket sessions
uv_async_t nop_u; // unused handle (async close)
h2o_timeout_t tim_u; // request timeout
h2o_http1client_ctx_t //
@@ -216,6 +274,158 @@ _cttp_bods_to_vec(u3_hbod* bod_u, c3_w* tot_w)
return vec_u;
}
+/* websocket session bookkeeping helpers
+*/
+static void
+_cttp_ws_link(u3_cttp* ctp_u, u3_cws* cws_u)
+{
+ cws_u->ctp_u = ctp_u;
+ cws_u->nex_u = ctp_u->cws_u;
+ cws_u->pre_u = 0;
+
+ if ( 0 != ctp_u->cws_u ) {
+ ctp_u->cws_u->pre_u = cws_u;
+ }
+
+ ctp_u->cws_u = cws_u;
+}
+
+static void
+_cttp_ws_unlink(u3_cws* cws_u)
+{
+ u3_cttp* ctp_u = cws_u->ctp_u;
+
+ if ( cws_u->pre_u ) {
+ cws_u->pre_u->nex_u = cws_u->nex_u;
+ if ( 0 != cws_u->nex_u ) {
+ cws_u->nex_u->pre_u = cws_u->pre_u;
+ }
+ }
+ else if ( ctp_u->cws_u == cws_u ) {
+ ctp_u->cws_u = cws_u->nex_u;
+ if ( 0 != cws_u->nex_u ) {
+ cws_u->nex_u->pre_u = 0;
+ }
+ }
+}
+
+static u3_cws*
+_cttp_ws_find(u3_cttp* ctp_u, c3_l wid_l)
+{
+ u3_cws* cws_u = ctp_u->cws_u;
+
+ while ( cws_u ) {
+ if ( wid_l == cws_u->wid_l ) {
+ return cws_u;
+ }
+ cws_u = cws_u->nex_u;
+ }
+
+ return 0;
+}
+
+static void
+_cttp_ws_plan_event(u3_cws* cws_u, u3_noun event)
+{
+ u3_cttp* ctp_u = cws_u->ctp_u;
+ u3_noun wir = u3nt(u3i_string("http-client"),
+ u3dc("scot", c3__uv, ctp_u->sev_l),
+ u3_nul);
+ u3_noun pay = u3nc(u3i_chub((c3_d)cws_u->wid_l), event);
+ u3_noun cad = u3nc(u3i_string("websocket-event"), pay);
+
+ u3_auto_plan(&ctp_u->car_u, u3_ovum_init(0, c3__i, wir, cad));
+}
+
+static void
+_cttp_base64_encode(c3_c* dst_c, const uint8_t* src_y, size_t len_w)
+{
+ static const char tab_c[] =
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
+ size_t i_w = 0;
+ size_t o_w = 0;
+
+ while ( len_w >= 3 ) {
+ dst_c[o_w++] = tab_c[(src_y[i_w] >> 2) & 0x3F];
+ dst_c[o_w++] = tab_c[((src_y[i_w] & 0x3) << 4) | (src_y[i_w + 1] >> 4)];
+ dst_c[o_w++] = tab_c[((src_y[i_w + 1] & 0xF) << 2) | (src_y[i_w + 2] >> 6)];
+ dst_c[o_w++] = tab_c[src_y[i_w + 2] & 0x3F];
+ i_w += 3;
+ len_w -= 3;
+ }
+
+ if ( len_w > 0 ) {
+ dst_c[o_w++] = tab_c[(src_y[i_w] >> 2) & 0x3F];
+ if ( 1 == len_w ) {
+ dst_c[o_w++] = tab_c[(src_y[i_w] & 0x3) << 4];
+ dst_c[o_w++] = '=';
+ dst_c[o_w++] = '=';
+ }
+ else { // len_w == 2
+ dst_c[o_w++] = tab_c[((src_y[i_w] & 0x3) << 4) | (src_y[i_w + 1] >> 4)];
+ dst_c[o_w++] = tab_c[(src_y[i_w + 1] & 0xF) << 2];
+ dst_c[o_w++] = '=';
+ }
+ }
+
+ dst_c[o_w] = 0;
+}
+
+static void
+_cttp_ws_generate_key(u3_cws* cws_u)
+{
+ c3_w rad_w[16];
+ c3_y raw_y[16];
+
+ c3_rand(rad_w);
+ memcpy(raw_y, rad_w, sizeof(raw_y));
+
+ _cttp_base64_encode(cws_u->key_c, raw_y, sizeof(raw_y));
+}
+
+static void
+_cttp_ws_compute_accept(const c3_c* key_c, c3_c out_c[29])
+{
+ c3_y sha_y[20];
+ c3_c buf_c[24 + sizeof(U3_WS_GUID)];
+
+ memcpy(buf_c, key_c, 24);
+ memcpy(buf_c + 24, U3_WS_GUID, sizeof(U3_WS_GUID) - 1);
+
+ SHA1((const unsigned char*)buf_c, 24 + (sizeof(U3_WS_GUID) - 1), sha_y);
+ _cttp_base64_encode(out_c, sha_y, sizeof(sha_y));
+}
+
+static c3_o
+_cttp_header_is(const h2o_header_t* hed_u, const c3_c* name_c)
+{
+ size_t len = strlen(name_c);
+
+ if ( hed_u->name->len != len ) {
+ return c3n;
+ }
+
+ for ( size_t i = 0; i < len; i++ ) {
+ if ( tolower((unsigned char)hed_u->name->base[i]) != tolower((unsigned char)name_c[i]) ) {
+ return c3n;
+ }
+ }
+
+ return c3y;
+}
+
+static h2o_iovec_t*
+_cttp_find_header(h2o_header_t* hed_u, size_t hed_t, const c3_c* name_c)
+{
+ for ( size_t i = 0; i < hed_t; i++ ) {
+ if ( c3y == _cttp_header_is(&hed_u[i], name_c) ) {
+ return &hed_u[i].value;
+ }
+ }
+
+ return 0;
+}
+
// XX deduplicate with _http_heds_free
/* _cttp_heds_free(): free header linked list
*/
@@ -256,6 +466,14 @@ _cttp_hed_new(u3_atom nam, u3_atom val)
return hed_u;
}
+static void
+_cttp_hed_push(u3_hhed** list_u, const c3_c* nam_c, const c3_c* val_c)
+{
+ u3_hhed* hed_u = _cttp_hed_new(u3i_string(nam_c), u3i_string(val_c));
+ hed_u->nex_u = *list_u;
+ *list_u = hed_u;
+}
+
// XX deduplicate with _http_heds_from_noun
/* _cttp_heds_from_noun(): convert (list (pair @t @t)) to u3_hhed
*/
@@ -516,6 +734,7 @@ _cttp_creq_unlink(u3_creq* ceq_u)
static void
_cttp_creq_free(u3_creq* ceq_u)
{
+ ceq_u->wsu_u = 0;
_cttp_creq_unlink(ceq_u);
_cttp_heds_free(ceq_u->hed_u);
@@ -693,11 +912,375 @@ _cttp_creq_fire(u3_creq* ceq_u)
}
}
+static void
+_cttp_ws_close(u3_cws* cws_u, c3_o send_event)
+{
+ if ( !cws_u ) {
+ return;
+ }
+
+ if ( u3_cws_closed != cws_u->sat_e ) {
+ cws_u->sat_e = u3_cws_closed;
+
+ if ( c3y == send_event ) {
+ _cttp_ws_plan_event(cws_u, u3nc(u3i_string("disconnect"), u3_nul));
+ send_event = c3n;
+ }
+
+ if ( cws_u->wsl_w ) {
+ wslay_event_context_free(cws_u->wsl_w);
+ cws_u->wsl_w = 0;
+ }
+
+ if ( cws_u->sok_u ) {
+ h2o_socket_t* sok_u = cws_u->sok_u;
+ cws_u->sok_u = 0;
+ h2o_socket_close(sok_u);
+ }
+ }
+ else if ( c3y == send_event ) {
+ _cttp_ws_plan_event(cws_u, u3nc(u3i_string("disconnect"), u3_nul));
+ send_event = c3n;
+ }
+
+ if ( cws_u->out_y ) {
+ c3_free(cws_u->out_y);
+ cws_u->out_y = 0;
+ }
+
+ _cttp_ws_unlink(cws_u);
+ cws_u->ctp_u = 0;
+
+ c3_free(cws_u->hot_c);
+ c3_free(cws_u->ipf_c);
+ c3_free(cws_u->por_c);
+ c3_free(cws_u->url_c);
+
+ c3_free(cws_u);
+}
+
+static ssize_t
+_cttp_ws_recv_cb(wslay_event_context_ptr ctx,
+ uint8_t* buf_y,
+ size_t len_w,
+ int flags,
+ void* ves_p)
+{
+ u3_cws* cws_u = ves_p;
+
+ if ( 0 == cws_u->sok_u->input->size ) {
+ wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK);
+ return -1;
+ }
+
+ if ( cws_u->sok_u->input->size < len_w ) {
+ len_w = cws_u->sok_u->input->size;
+ }
+
+ memcpy(buf_y, cws_u->sok_u->input->bytes, len_w);
+ h2o_buffer_consume(&cws_u->sok_u->input, len_w);
+
+ return (ssize_t)len_w;
+}
+
+static ssize_t
+_cttp_ws_send_cb(wslay_event_context_ptr ctx,
+ const uint8_t* dat_y,
+ size_t len_w,
+ int flags,
+ void* ves_p)
+{
+ u3_cws* cws_u = ves_p;
+
+ if ( h2o_socket_is_writing(cws_u->sok_u) ) {
+ wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK);
+ return -1;
+ }
+
+ if ( 0 == len_w ) {
+ return 0;
+ }
+
+ cws_u->out_y = c3_malloc(len_w);
+ memcpy(cws_u->out_y, dat_y, len_w);
+
+ h2o_iovec_t buf_u = h2o_iovec_init((char*)cws_u->out_y, len_w);
+ h2o_socket_write(cws_u->sok_u, &buf_u, 1, _cttp_ws_write_cb);
+
+ return (ssize_t)len_w;
+}
+
+static void
+_cttp_ws_msg_cb(wslay_event_context_ptr ctx,
+ const struct wslay_event_on_msg_recv_arg* arg,
+ void* ves_p)
+{
+ u3_cws* cws_u = ves_p;
+
+ if ( 0 == arg ) {
+ _cttp_ws_close(cws_u, c3y);
+ return;
+ }
+
+ if ( WSLAY_CONNECTION_CLOSE == arg->opcode ) {
+ _cttp_ws_close(cws_u, c3y);
+ return;
+ }
+
+ u3_noun payload;
+ if ( 0 == arg->msg_length ) {
+ payload = u3_nul;
+ }
+ else {
+ u3_noun octs = u3nc(u3i_chub((c3_d)arg->msg_length),
+ u3i_bytes(arg->msg_length, (const c3_y*)arg->msg));
+ payload = u3nc(u3_nul, octs);
+ }
+
+ u3_noun event = u3nc(u3i_string("message"),
+ u3nc(u3i_chub((c3_d)arg->opcode), payload));
+
+ _cttp_ws_plan_event(cws_u, event);
+}
+
+static void
+_cttp_ws_proceed(u3_cws* cws_u)
+{
+ if ( 0 == cws_u->sok_u ) {
+ return;
+ }
+
+ while ( 1 ) {
+ c3_o handled = c3n;
+
+ if ( !h2o_socket_is_writing(cws_u->sok_u) &&
+ wslay_event_want_write(cws_u->wsl_w) )
+ {
+ if ( 0 != wslay_event_send(cws_u->wsl_w) ) {
+ _cttp_ws_close(cws_u, c3y);
+ return;
+ }
+ handled = c3y;
+ }
+
+ if ( cws_u->sok_u->input->size && wslay_event_want_read(cws_u->wsl_w) ) {
+ if ( 0 != wslay_event_recv(cws_u->wsl_w) ) {
+ _cttp_ws_close(cws_u, c3y);
+ return;
+ }
+ handled = c3y;
+ }
+
+ if ( c3n == handled ) {
+ break;
+ }
+ }
+
+ if ( wslay_event_want_read(cws_u->wsl_w) ) {
+ h2o_socket_read_start(cws_u->sok_u, _cttp_ws_read_cb);
+ }
+ else if ( h2o_socket_is_writing(cws_u->sok_u) ||
+ wslay_event_want_write(cws_u->wsl_w) )
+ {
+ h2o_socket_read_stop(cws_u->sok_u);
+ }
+ else {
+ _cttp_ws_close(cws_u, c3y);
+ }
+}
+
+static void
+_cttp_ws_read_cb(h2o_socket_t* sok_u, const c3_c* err_c)
+{
+ u3_cws* cws_u = sok_u->data;
+
+ if ( err_c ) {
+ _cttp_ws_close(cws_u, c3y);
+ return;
+ }
+
+ _cttp_ws_proceed(cws_u);
+}
+
+static void
+_cttp_ws_write_cb(h2o_socket_t* sok_u, const c3_c* err_c)
+{
+ u3_cws* cws_u = sok_u->data;
+
+ if ( cws_u->out_y ) {
+ c3_free(cws_u->out_y);
+ cws_u->out_y = 0;
+ }
+
+ if ( err_c ) {
+ _cttp_ws_close(cws_u, c3y);
+ return;
+ }
+
+ _cttp_ws_proceed(cws_u);
+}
+
+static void
+_cttp_ws_queue_close(u3_cws* cws_u)
+{
+ if ( (u3_cws_open != cws_u->sat_e) || (0 == cws_u->wsl_w) ) {
+ _cttp_ws_close(cws_u, c3y);
+ return;
+ }
+
+ if ( u3_cws_closing == cws_u->sat_e ) {
+ return;
+ }
+
+ cws_u->sat_e = u3_cws_closing;
+ wslay_event_queue_close(cws_u->wsl_w, WSLAY_CODE_NORMAL_CLOSURE, NULL, 0);
+ _cttp_ws_proceed(cws_u);
+}
+
+static c3_o
+_cttp_ws_send_message(u3_cws* cws_u, u3_noun msg)
+{
+ if ( (u3_cws_open != cws_u->sat_e) || (0 == cws_u->wsl_w) ) {
+ u3z(msg);
+ return c3n;
+ }
+
+ u3_noun opcode = u3h(msg);
+ u3_noun body = u3t(msg);
+ c3_w opc_w;
+
+ if ( c3n == u3r_safe_word(opcode, &opc_w) ) {
+ u3l_log("cttp: websocket invalid opcode");
+ u3z(msg);
+ return c3n;
+ }
+
+ c3_y* buf_y = 0;
+ size_t len_w = 0;
+
+ if ( u3_nul != body ) {
+ if ( u3_nul != u3h(body) ) {
+ u3l_log("cttp: websocket body malformed");
+ u3z(msg);
+ return c3n;
+ }
+
+ u3_noun oct = u3t(body);
+ c3_d len_d = u3r_chub(0, u3h(oct));
+ if ( len_d > SIZE_MAX ) {
+ u3l_log("cttp: websocket body too large");
+ u3z(msg);
+ return c3n;
+ }
+
+ len_w = (size_t)len_d;
+ if ( len_w ) {
+ buf_y = c3_malloc(len_w);
+ u3r_bytes(0, len_w, buf_y, u3t(oct));
+ }
+ }
+
+ struct wslay_event_msg out = {
+ .opcode = (uint8_t)(opc_w & 0xFF),
+ .msg = buf_y,
+ .msg_length = len_w
+ };
+
+ if ( 0 != wslay_event_queue_msg(cws_u->wsl_w, &out) ) {
+ if ( buf_y ) {
+ c3_free(buf_y);
+ }
+ u3l_log("cttp: websocket queue failed");
+ u3z(msg);
+ return c3n;
+ }
+
+ if ( buf_y ) {
+ c3_free(buf_y);
+ }
+
+ u3z(msg);
+ _cttp_ws_proceed(cws_u);
+ return c3y;
+}
+
+static void
+_cttp_ws_fail_handshake(u3_creq* ceq_u, const c3_c* err_c)
+{
+ u3_cws* cws_u = ceq_u->wsu_u;
+
+ if ( 0 == cws_u ) {
+ return;
+ }
+
+ ceq_u->wsu_u = 0;
+
+ if ( err_c ) {
+ u3l_log("cttp: websocket handshake failed (%s)", err_c);
+ }
+
+ _cttp_ws_plan_event(cws_u, u3nc(u3i_string("reject"), u3_nul));
+ _cttp_ws_close(cws_u, c3n);
+}
+
+static c3_o
+_cttp_ws_start(u3_cttp* ctp_u, c3_l wid_l, u3_atom url)
+{
+ u3_cws* cws_u = c3_calloc(sizeof(*cws_u));
+
+ cws_u->wid_l = wid_l;
+ cws_u->sat_e = u3_cws_pending;
+
+ _cttp_ws_generate_key(cws_u);
+ _cttp_ws_link(ctp_u, cws_u);
+
+ u3_noun hed = u3_nul;
+ hed = u3nc(u3nc(u3i_string("Sec-WebSocket-Key"),
+ u3i_string(cws_u->key_c)),
+ hed);
+ hed = u3nc(u3nc(u3i_string("Sec-WebSocket-Version"),
+ u3i_string("13")),
+ hed);
+ hed = u3nc(u3nc(u3i_string("Connection"),
+ u3i_string("Upgrade")),
+ hed);
+ hed = u3nc(u3nc(u3i_string("Upgrade"),
+ u3i_string("websocket")),
+ hed);
+
+ u3_noun hes = u3nq(u3i_string("GET"), u3k(url), hed, u3_nul);
+
+ u3_creq* ceq_u = _cttp_creq_new(ctp_u, wid_l, hes);
+
+ if ( 0 == ceq_u ) {
+ _cttp_ws_plan_event(cws_u, u3nc(u3i_string("reject"), u3_nul));
+ _cttp_ws_close(cws_u, c3n);
+ u3z(url);
+ return c3y;
+ }
+
+ ceq_u->wsu_u = cws_u;
+ cws_u->ceq_u = ceq_u;
+ cws_u->sec = ceq_u->sec;
+
+ _cttp_creq_start(ceq_u);
+
+ u3z(url);
+ return c3y;
+}
+
+
/* _cttp_creq_quit(): cancel a u3_creq
*/
static void
_cttp_creq_quit(u3_creq* ceq_u)
{
+ if ( ceq_u->wsu_u ) {
+ _cttp_ws_fail_handshake(ceq_u, "cancel");
+ _cttp_creq_free(ceq_u);
+ return;
+ }
+
if ( u3_csat_addr == ceq_u->sat_e ) {
ceq_u->sat_e = u3_csat_quit;
return; // wait to be called again on address resolution
@@ -732,6 +1315,12 @@ _cttp_http_client_receive(u3_creq* ceq_u, c3_w sas_w, u3_noun mes, u3_noun uct)
static void
_cttp_creq_fail(u3_creq* ceq_u, const c3_c* err_c)
{
+ if ( ceq_u->wsu_u ) {
+ _cttp_ws_fail_handshake(ceq_u, err_c ? err_c : "handshake failed");
+ _cttp_creq_free(ceq_u);
+ return;
+ }
+
// XX anything other than a 504?
c3_w cod_w = 504;
@@ -747,6 +1336,12 @@ _cttp_creq_fail(u3_creq* ceq_u, const c3_c* err_c)
static void
_cttp_creq_respond(u3_creq* ceq_u)
{
+ if ( ceq_u->wsu_u ) {
+ _cttp_ws_fail_handshake(ceq_u, "handshake rejected");
+ _cttp_creq_free(ceq_u);
+ return;
+ }
+
u3_cres* res_u = ceq_u->res_u;
_cttp_http_client_receive(ceq_u, res_u->sas_w, res_u->hed,
@@ -764,6 +1359,12 @@ _cttp_creq_on_body(h2o_http1client_t* cli_u, const c3_c* err_c)
{
u3_creq* ceq_u = (u3_creq *)cli_u->data;
+ if ( ceq_u->wsu_u ) {
+ _cttp_ws_fail_handshake(ceq_u, err_c ? err_c : "unexpected body");
+ _cttp_creq_free(ceq_u);
+ return -1;
+ }
+
if ( 0 != err_c && h2o_http1client_error_is_eos != err_c ) {
_cttp_creq_fail(ceq_u, err_c);
return -1;
@@ -795,6 +1396,69 @@ _cttp_creq_on_head(h2o_http1client_t* cli_u, const c3_c* err_c, c3_i ver_i,
{
u3_creq* ceq_u = (u3_creq *)cli_u->data;
+ if ( ceq_u->wsu_u ) {
+ u3_cws* cws_u = ceq_u->wsu_u;
+
+ if ( 0 != err_c && h2o_http1client_error_is_eos != err_c ) {
+ _cttp_ws_fail_handshake(ceq_u, err_c);
+ _cttp_creq_free(ceq_u);
+ return 0;
+ }
+
+ if ( 101 != sas_i ) {
+ _cttp_ws_fail_handshake(ceq_u, "status");
+ _cttp_creq_free(ceq_u);
+ return 0;
+ }
+
+ h2o_iovec_t* acc_u = _cttp_find_header(hed_u, hed_t, "sec-websocket-accept");
+ c3_c exp_c[29];
+
+ if ( 0 == acc_u ) {
+ _cttp_ws_fail_handshake(ceq_u, "missing accept");
+ _cttp_creq_free(ceq_u);
+ return 0;
+ }
+
+ _cttp_ws_compute_accept(cws_u->key_c, exp_c);
+
+ if ( acc_u->len != 28 || 0 != memcmp(acc_u->base, exp_c, 28) ) {
+ _cttp_ws_fail_handshake(ceq_u, "bad accept");
+ _cttp_creq_free(ceq_u);
+ return 0;
+ }
+
+ h2o_socket_t* sok_u = h2o_http1client_steal_socket(cli_u);
+
+ if ( 0 == sok_u ) {
+ _cttp_ws_fail_handshake(ceq_u, "steal failure");
+ _cttp_creq_free(ceq_u);
+ return 0;
+ }
+
+ ceq_u->wsu_u = 0;
+ ceq_u->cli_u = 0;
+ cws_u->ceq_u = 0;
+
+ _cttp_creq_free(ceq_u);
+
+ memset(&cws_u->wcb_u, 0, sizeof(cws_u->wcb_u));
+ cws_u->wcb_u.recv_callback = _cttp_ws_recv_cb;
+ cws_u->wcb_u.send_callback = _cttp_ws_send_cb;
+ cws_u->wcb_u.on_msg_recv_callback = _cttp_ws_msg_cb;
+
+ wslay_event_context_client_init(&cws_u->wsl_w, &cws_u->wcb_u, cws_u);
+
+ cws_u->sok_u = sok_u;
+ sok_u->data = cws_u;
+
+ cws_u->sat_e = u3_cws_open;
+
+ _cttp_ws_plan_event(cws_u, u3nc(u3i_string("accept"), u3_nul));
+ _cttp_ws_proceed(cws_u);
+ return 0;
+ }
+
if ( 0 != err_c && h2o_http1client_error_is_eos != err_c ) {
_cttp_creq_fail(ceq_u, err_c);
return 0;
@@ -1031,6 +1695,57 @@ _cttp_ef_http_client(u3_cttp* ctp_u, u3_noun tag, u3_noun dat)
ret_o = c3y;
}
}
+ else if ( c3y == u3r_sing_c("websocket-handshake", tag) ) {
+ u3_noun wid, url;
+ c3_l wid_l;
+
+ if ( (c3n == u3r_cell(dat, &wid, &url))
+ || (c3n == u3r_safe_word(wid, &wid_l)) )
+ {
+ u3l_log("cttp: strange websocket-handshake");
+ ret_o = c3n;
+ }
+ else {
+ ret_o = _cttp_ws_start(ctp_u, wid_l, u3k(url));
+ }
+ }
+ else if ( c3y == u3r_sing_c("websocket-response", tag) ) {
+ u3_noun wid, evt;
+ c3_l wid_l;
+
+ if ( (c3n == u3r_cell(dat, &wid, &evt))
+ || (c3n == u3r_safe_word(wid, &wid_l)) )
+ {
+ u3l_log("cttp: strange websocket-response");
+ ret_o = c3n;
+ }
+ else {
+ u3_cws* cws_u = _cttp_ws_find(ctp_u, wid_l);
+
+ if ( 0 == cws_u ) {
+ u3l_log("cttp: unknown websocket id %u", wid_l);
+ ret_o = c3n;
+ }
+ else {
+ u3_noun typ = u3h(evt);
+
+ if ( c3y == u3r_sing_c("message", typ) ) {
+ ret_o = _cttp_ws_send_message(cws_u, u3k(u3t(evt)));
+ }
+ else if ( c3y == u3r_sing_c("disconnect", typ) ) {
+ _cttp_ws_queue_close(cws_u);
+ ret_o = c3y;
+ }
+ else if ( c3y == u3r_sing_c("accept", typ) ) {
+ ret_o = c3y;
+ }
+ else {
+ u3l_log("cttp: unexpected websocket response");
+ ret_o = c3n;
+ }
+ }
+ }
+ }
else {
u3l_log("cttp: strange effect (unknown type)");
ret_o = c3n;
@@ -1103,6 +1818,16 @@ _cttp_io_exit(u3_auto* car_u)
//
uv_close((uv_handle_t*)&ctp_u->nop_u, _cttp_io_exit_cb);
+ {
+ u3_cws* cws_u = ctp_u->cws_u;
+
+ while ( cws_u ) {
+ u3_cws* nex_u = cws_u->nex_u;
+ _cttp_ws_close(cws_u, c3n);
+ cws_u = nex_u;
+ }
+ }
+
// cancel requests
//
{
@@ -1137,6 +1862,7 @@ u3_cttp_io_init(u3_pier* pir_u)
//
h2o_timeout_init(u3L, &ctp_u->tim_u, 300 * 1000);
ctp_u->ctx_u.io_timeout = &ctp_u->tim_u;
+ ctp_u->ctx_u.websocket_timeout = &ctp_u->tim_u;
// link to initialized tls ctx
//