diff options
| author | polwex <polwex@sortug.com> | 2025-10-07 01:18:55 +0700 |
|---|---|---|
| committer | polwex <polwex@sortug.com> | 2025-10-07 01:18:55 +0700 |
| commit | 614e18998ea1db5bccadb170b5926288e57b1c01 (patch) | |
| tree | c470bb5c02c13871251fed6d89b4536ab6ca0709 | |
| parent | 0955e0ef873782df603b828fba5323f1518e46d3 (diff) | |
added iris ws support on vere
| -rw-r--r-- | app/ted/ws.hoon | 20 | ||||
| -rw-r--r-- | arvo/iris.hoon | 30 | ||||
| -rw-r--r-- | vere/pkg/vere/io/cttp.c | 726 |
3 files changed, 773 insertions, 3 deletions
diff --git a/app/ted/ws.hoon b/app/ted/ws.hoon new file mode 100644 index 0000000..b8dd583 --- /dev/null +++ b/app/ted/ws.hoon @@ -0,0 +1,20 @@ +/- spider +/+ strandio +=, strand=strand:spider +^- thread:spider +|= arg=vase +=/ m (strand ,vase) +^- form:m +=/ url=@t (need !<((unit @t) arg)) +;< =bowl:spider bind:m get-bowl:strandio +=/ desk q.byk.bowl +=/ =task:iris [%websocket-connect desk url] +=/ =card:agent:gall [%pass /ws-req %arvo %i task] +;< ~ bind:m (send-raw-card:strandio card) +;< res=(pair wire sign-arvo) bind:m take-sign-arvo:strandio +?. ?=([%iris %websocket-handshake id=@ud url=@t] q.res) + (strand-fail:strand %bad-sign ~) +~& ws-handshake=[id.q.res url.q.res] +:: ?: ?=([%iris %websocket-response id=@ud e=websocket-event:eyre] q.res) +=/ data=@t 'done' +(pure:m !>(data)) diff --git a/arvo/iris.hoon b/arvo/iris.hoon index ea2a59f..2639843 100644 --- a/arvo/iris.hoon +++ b/arvo/iris.hoon @@ -287,6 +287,28 @@ connection-by-id (~(del by connection-by-id.state) id) connection-by-duct (~(del by connection-by-duct.state) duct.u.con) == + ++ ws-connect + |= [desk=term url=@t] + ~& iris-ws-connect=[desk url] + :: TODO ... the wid comes from vere tho...? + =^ id next-id.state [next-id.state +(next-id.state)] + :: add a new open session + =/ wid id + :: + =. connection-by-id.state + %+ ~(put by connection-by-id.state) id + [duct [0 3 ~ ~ 0 ~]] + :: keep track of the duct for cancellation + :: + =. connection-by-duct.state + (~(put by connection-by-duct.state) duct id) + :- [outbound-duct.state %give %websocket-handshake wid url]~ + state + ++ ws-event + |= [id=@ud event=websocket-event:eyre] + ~& iris-ws-event=[id event] + :- [outbound-duct.state %give %websocket-response id event]~ + state -- -- :: end the =~ @@ -360,12 +382,14 @@ %receive =^ moves state.ax (receive:client +.task) [moves light-gate] - :: UIP-125 :: + :: UIP-125 %websocket-connect - `light-gate + =^ moves state.ax (ws-connect:client +.task) + [moves light-gate] %websocket-event - `light-gate + =^ moves state.ax (ws-event:client +.task) + [moves light-gate] == :: http-client issues no requests to other vanes :: diff --git a/vere/pkg/vere/io/cttp.c b/vere/pkg/vere/io/cttp.c index fa6bb5b..b36a335 100644 --- a/vere/pkg/vere/io/cttp.c +++ b/vere/pkg/vere/io/cttp.c @@ -3,8 +3,17 @@ #include "vere.h" #include "h2o.h" +#include "h2o/websocket.h" #include "noun.h" #include "openssl/ssl.h" +#include "openssl/sha.h" +#include "wslay/wslay.h" + +#include <ctype.h> +#include <stdint.h> +#include <string.h> + +#define U3_WS_GUID "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" /* u3_csat: client connection state. */ @@ -25,6 +34,16 @@ u3_hbod* dob_u; // entry of body queue } u3_cres; +typedef enum { + u3_cws_pending = 0, + u3_cws_open = 1, + u3_cws_closing = 2, + u3_cws_closed = 3 +} u3_cwsat; + +typedef struct _u3_cttp u3_cttp; +typedef struct _u3_cws u3_cws; + /* u3_creq: outgoing http request. */ typedef struct _u3_creq { // client request @@ -45,17 +64,56 @@ u3_hbod* bur_u; // entry of send queue h2o_iovec_t* vec_u; // send-buffer array u3_cres* res_u; // nascent response + u3_cws* wsu_u; // websocket session (optional) struct _u3_creq* nex_u; // next in list struct _u3_creq* pre_u; // previous in list struct _u3_cttp* ctp_u; // cttp backpointer } u3_creq; +struct _u3_cws { + c3_l wid_l; // websocket id + u3_cwsat sat_e; // websocket state + c3_o sec; // secure (wss) + c3_c* hot_c; // host string (nullable) + c3_w ipf_w; // ipv4 (numeric) + c3_c* ipf_c; // ipv4 string + c3_s por_s; // port (numeric) + c3_c* por_c; // port string + c3_c* url_c; // request url/path + c3_c key_c[29]; // sec-websocket-key + u3_cttp* ctp_u; // backpointer + u3_creq* ceq_u; // pending handshake request + h2o_socket_t* sok_u; // underlying socket + wslay_event_context_ptr wsl_w; // wslay context + struct wslay_event_callbacks wcb_u; // wslay callbacks + c3_y* out_y; // pending write buffer + struct _u3_cws* nex_u; // next in list + struct _u3_cws* pre_u; // prev in list +}; + +static void _cttp_ws_close(u3_cws* cws_u, c3_o send_event); +static void _cttp_ws_proceed(u3_cws* cws_u); +static c3_o _cttp_ws_start(u3_cttp* ctp_u, c3_l wid_l, u3_atom url); +static c3_o _cttp_ws_send_message(u3_cws* cws_u, u3_noun msg); +static void _cttp_ws_queue_close(u3_cws* cws_u); +static void _cttp_ws_fail_handshake(u3_creq* ceq_u, const c3_c* err_c); +static void _cttp_ws_generate_key(u3_cws* cws_u); +static void _cttp_ws_compute_accept(const c3_c* key_c, c3_c out_c[29]); +static void _cttp_ws_read_cb(h2o_socket_t* sock_u, const c3_c* err_c); +static void _cttp_ws_write_cb(h2o_socket_t* sock_u, const c3_c* err_c); +static ssize_t _cttp_ws_recv_cb(wslay_event_context_ptr ctx, uint8_t* buf_y, size_t len_w, int flags, void* ves_p); +static ssize_t _cttp_ws_send_cb(wslay_event_context_ptr ctx, const uint8_t* dat_y, size_t len_w, int flags, void* ves_p); +static void _cttp_ws_msg_cb(wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg* arg, void* ves_p); + +static void _cttp_creq_start(u3_creq* ceq_u); + /* u3_cttp: http client. */ typedef struct _u3_cttp { u3_auto car_u; // driver c3_l sev_l; // instance number u3_creq* ceq_u; // request list + u3_cws* cws_u; // websocket sessions uv_async_t nop_u; // unused handle (async close) h2o_timeout_t tim_u; // request timeout h2o_http1client_ctx_t // @@ -216,6 +274,158 @@ _cttp_bods_to_vec(u3_hbod* bod_u, c3_w* tot_w) return vec_u; } +/* websocket session bookkeeping helpers +*/ +static void +_cttp_ws_link(u3_cttp* ctp_u, u3_cws* cws_u) +{ + cws_u->ctp_u = ctp_u; + cws_u->nex_u = ctp_u->cws_u; + cws_u->pre_u = 0; + + if ( 0 != ctp_u->cws_u ) { + ctp_u->cws_u->pre_u = cws_u; + } + + ctp_u->cws_u = cws_u; +} + +static void +_cttp_ws_unlink(u3_cws* cws_u) +{ + u3_cttp* ctp_u = cws_u->ctp_u; + + if ( cws_u->pre_u ) { + cws_u->pre_u->nex_u = cws_u->nex_u; + if ( 0 != cws_u->nex_u ) { + cws_u->nex_u->pre_u = cws_u->pre_u; + } + } + else if ( ctp_u->cws_u == cws_u ) { + ctp_u->cws_u = cws_u->nex_u; + if ( 0 != cws_u->nex_u ) { + cws_u->nex_u->pre_u = 0; + } + } +} + +static u3_cws* +_cttp_ws_find(u3_cttp* ctp_u, c3_l wid_l) +{ + u3_cws* cws_u = ctp_u->cws_u; + + while ( cws_u ) { + if ( wid_l == cws_u->wid_l ) { + return cws_u; + } + cws_u = cws_u->nex_u; + } + + return 0; +} + +static void +_cttp_ws_plan_event(u3_cws* cws_u, u3_noun event) +{ + u3_cttp* ctp_u = cws_u->ctp_u; + u3_noun wir = u3nt(u3i_string("http-client"), + u3dc("scot", c3__uv, ctp_u->sev_l), + u3_nul); + u3_noun pay = u3nc(u3i_chub((c3_d)cws_u->wid_l), event); + u3_noun cad = u3nc(u3i_string("websocket-event"), pay); + + u3_auto_plan(&ctp_u->car_u, u3_ovum_init(0, c3__i, wir, cad)); +} + +static void +_cttp_base64_encode(c3_c* dst_c, const uint8_t* src_y, size_t len_w) +{ + static const char tab_c[] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + size_t i_w = 0; + size_t o_w = 0; + + while ( len_w >= 3 ) { + dst_c[o_w++] = tab_c[(src_y[i_w] >> 2) & 0x3F]; + dst_c[o_w++] = tab_c[((src_y[i_w] & 0x3) << 4) | (src_y[i_w + 1] >> 4)]; + dst_c[o_w++] = tab_c[((src_y[i_w + 1] & 0xF) << 2) | (src_y[i_w + 2] >> 6)]; + dst_c[o_w++] = tab_c[src_y[i_w + 2] & 0x3F]; + i_w += 3; + len_w -= 3; + } + + if ( len_w > 0 ) { + dst_c[o_w++] = tab_c[(src_y[i_w] >> 2) & 0x3F]; + if ( 1 == len_w ) { + dst_c[o_w++] = tab_c[(src_y[i_w] & 0x3) << 4]; + dst_c[o_w++] = '='; + dst_c[o_w++] = '='; + } + else { // len_w == 2 + dst_c[o_w++] = tab_c[((src_y[i_w] & 0x3) << 4) | (src_y[i_w + 1] >> 4)]; + dst_c[o_w++] = tab_c[(src_y[i_w + 1] & 0xF) << 2]; + dst_c[o_w++] = '='; + } + } + + dst_c[o_w] = 0; +} + +static void +_cttp_ws_generate_key(u3_cws* cws_u) +{ + c3_w rad_w[16]; + c3_y raw_y[16]; + + c3_rand(rad_w); + memcpy(raw_y, rad_w, sizeof(raw_y)); + + _cttp_base64_encode(cws_u->key_c, raw_y, sizeof(raw_y)); +} + +static void +_cttp_ws_compute_accept(const c3_c* key_c, c3_c out_c[29]) +{ + c3_y sha_y[20]; + c3_c buf_c[24 + sizeof(U3_WS_GUID)]; + + memcpy(buf_c, key_c, 24); + memcpy(buf_c + 24, U3_WS_GUID, sizeof(U3_WS_GUID) - 1); + + SHA1((const unsigned char*)buf_c, 24 + (sizeof(U3_WS_GUID) - 1), sha_y); + _cttp_base64_encode(out_c, sha_y, sizeof(sha_y)); +} + +static c3_o +_cttp_header_is(const h2o_header_t* hed_u, const c3_c* name_c) +{ + size_t len = strlen(name_c); + + if ( hed_u->name->len != len ) { + return c3n; + } + + for ( size_t i = 0; i < len; i++ ) { + if ( tolower((unsigned char)hed_u->name->base[i]) != tolower((unsigned char)name_c[i]) ) { + return c3n; + } + } + + return c3y; +} + +static h2o_iovec_t* +_cttp_find_header(h2o_header_t* hed_u, size_t hed_t, const c3_c* name_c) +{ + for ( size_t i = 0; i < hed_t; i++ ) { + if ( c3y == _cttp_header_is(&hed_u[i], name_c) ) { + return &hed_u[i].value; + } + } + + return 0; +} + // XX deduplicate with _http_heds_free /* _cttp_heds_free(): free header linked list */ @@ -256,6 +466,14 @@ _cttp_hed_new(u3_atom nam, u3_atom val) return hed_u; } +static void +_cttp_hed_push(u3_hhed** list_u, const c3_c* nam_c, const c3_c* val_c) +{ + u3_hhed* hed_u = _cttp_hed_new(u3i_string(nam_c), u3i_string(val_c)); + hed_u->nex_u = *list_u; + *list_u = hed_u; +} + // XX deduplicate with _http_heds_from_noun /* _cttp_heds_from_noun(): convert (list (pair @t @t)) to u3_hhed */ @@ -516,6 +734,7 @@ _cttp_creq_unlink(u3_creq* ceq_u) static void _cttp_creq_free(u3_creq* ceq_u) { + ceq_u->wsu_u = 0; _cttp_creq_unlink(ceq_u); _cttp_heds_free(ceq_u->hed_u); @@ -693,11 +912,375 @@ _cttp_creq_fire(u3_creq* ceq_u) } } +static void +_cttp_ws_close(u3_cws* cws_u, c3_o send_event) +{ + if ( !cws_u ) { + return; + } + + if ( u3_cws_closed != cws_u->sat_e ) { + cws_u->sat_e = u3_cws_closed; + + if ( c3y == send_event ) { + _cttp_ws_plan_event(cws_u, u3nc(u3i_string("disconnect"), u3_nul)); + send_event = c3n; + } + + if ( cws_u->wsl_w ) { + wslay_event_context_free(cws_u->wsl_w); + cws_u->wsl_w = 0; + } + + if ( cws_u->sok_u ) { + h2o_socket_t* sok_u = cws_u->sok_u; + cws_u->sok_u = 0; + h2o_socket_close(sok_u); + } + } + else if ( c3y == send_event ) { + _cttp_ws_plan_event(cws_u, u3nc(u3i_string("disconnect"), u3_nul)); + send_event = c3n; + } + + if ( cws_u->out_y ) { + c3_free(cws_u->out_y); + cws_u->out_y = 0; + } + + _cttp_ws_unlink(cws_u); + cws_u->ctp_u = 0; + + c3_free(cws_u->hot_c); + c3_free(cws_u->ipf_c); + c3_free(cws_u->por_c); + c3_free(cws_u->url_c); + + c3_free(cws_u); +} + +static ssize_t +_cttp_ws_recv_cb(wslay_event_context_ptr ctx, + uint8_t* buf_y, + size_t len_w, + int flags, + void* ves_p) +{ + u3_cws* cws_u = ves_p; + + if ( 0 == cws_u->sok_u->input->size ) { + wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK); + return -1; + } + + if ( cws_u->sok_u->input->size < len_w ) { + len_w = cws_u->sok_u->input->size; + } + + memcpy(buf_y, cws_u->sok_u->input->bytes, len_w); + h2o_buffer_consume(&cws_u->sok_u->input, len_w); + + return (ssize_t)len_w; +} + +static ssize_t +_cttp_ws_send_cb(wslay_event_context_ptr ctx, + const uint8_t* dat_y, + size_t len_w, + int flags, + void* ves_p) +{ + u3_cws* cws_u = ves_p; + + if ( h2o_socket_is_writing(cws_u->sok_u) ) { + wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK); + return -1; + } + + if ( 0 == len_w ) { + return 0; + } + + cws_u->out_y = c3_malloc(len_w); + memcpy(cws_u->out_y, dat_y, len_w); + + h2o_iovec_t buf_u = h2o_iovec_init((char*)cws_u->out_y, len_w); + h2o_socket_write(cws_u->sok_u, &buf_u, 1, _cttp_ws_write_cb); + + return (ssize_t)len_w; +} + +static void +_cttp_ws_msg_cb(wslay_event_context_ptr ctx, + const struct wslay_event_on_msg_recv_arg* arg, + void* ves_p) +{ + u3_cws* cws_u = ves_p; + + if ( 0 == arg ) { + _cttp_ws_close(cws_u, c3y); + return; + } + + if ( WSLAY_CONNECTION_CLOSE == arg->opcode ) { + _cttp_ws_close(cws_u, c3y); + return; + } + + u3_noun payload; + if ( 0 == arg->msg_length ) { + payload = u3_nul; + } + else { + u3_noun octs = u3nc(u3i_chub((c3_d)arg->msg_length), + u3i_bytes(arg->msg_length, (const c3_y*)arg->msg)); + payload = u3nc(u3_nul, octs); + } + + u3_noun event = u3nc(u3i_string("message"), + u3nc(u3i_chub((c3_d)arg->opcode), payload)); + + _cttp_ws_plan_event(cws_u, event); +} + +static void +_cttp_ws_proceed(u3_cws* cws_u) +{ + if ( 0 == cws_u->sok_u ) { + return; + } + + while ( 1 ) { + c3_o handled = c3n; + + if ( !h2o_socket_is_writing(cws_u->sok_u) && + wslay_event_want_write(cws_u->wsl_w) ) + { + if ( 0 != wslay_event_send(cws_u->wsl_w) ) { + _cttp_ws_close(cws_u, c3y); + return; + } + handled = c3y; + } + + if ( cws_u->sok_u->input->size && wslay_event_want_read(cws_u->wsl_w) ) { + if ( 0 != wslay_event_recv(cws_u->wsl_w) ) { + _cttp_ws_close(cws_u, c3y); + return; + } + handled = c3y; + } + + if ( c3n == handled ) { + break; + } + } + + if ( wslay_event_want_read(cws_u->wsl_w) ) { + h2o_socket_read_start(cws_u->sok_u, _cttp_ws_read_cb); + } + else if ( h2o_socket_is_writing(cws_u->sok_u) || + wslay_event_want_write(cws_u->wsl_w) ) + { + h2o_socket_read_stop(cws_u->sok_u); + } + else { + _cttp_ws_close(cws_u, c3y); + } +} + +static void +_cttp_ws_read_cb(h2o_socket_t* sok_u, const c3_c* err_c) +{ + u3_cws* cws_u = sok_u->data; + + if ( err_c ) { + _cttp_ws_close(cws_u, c3y); + return; + } + + _cttp_ws_proceed(cws_u); +} + +static void +_cttp_ws_write_cb(h2o_socket_t* sok_u, const c3_c* err_c) +{ + u3_cws* cws_u = sok_u->data; + + if ( cws_u->out_y ) { + c3_free(cws_u->out_y); + cws_u->out_y = 0; + } + + if ( err_c ) { + _cttp_ws_close(cws_u, c3y); + return; + } + + _cttp_ws_proceed(cws_u); +} + +static void +_cttp_ws_queue_close(u3_cws* cws_u) +{ + if ( (u3_cws_open != cws_u->sat_e) || (0 == cws_u->wsl_w) ) { + _cttp_ws_close(cws_u, c3y); + return; + } + + if ( u3_cws_closing == cws_u->sat_e ) { + return; + } + + cws_u->sat_e = u3_cws_closing; + wslay_event_queue_close(cws_u->wsl_w, WSLAY_CODE_NORMAL_CLOSURE, NULL, 0); + _cttp_ws_proceed(cws_u); +} + +static c3_o +_cttp_ws_send_message(u3_cws* cws_u, u3_noun msg) +{ + if ( (u3_cws_open != cws_u->sat_e) || (0 == cws_u->wsl_w) ) { + u3z(msg); + return c3n; + } + + u3_noun opcode = u3h(msg); + u3_noun body = u3t(msg); + c3_w opc_w; + + if ( c3n == u3r_safe_word(opcode, &opc_w) ) { + u3l_log("cttp: websocket invalid opcode"); + u3z(msg); + return c3n; + } + + c3_y* buf_y = 0; + size_t len_w = 0; + + if ( u3_nul != body ) { + if ( u3_nul != u3h(body) ) { + u3l_log("cttp: websocket body malformed"); + u3z(msg); + return c3n; + } + + u3_noun oct = u3t(body); + c3_d len_d = u3r_chub(0, u3h(oct)); + if ( len_d > SIZE_MAX ) { + u3l_log("cttp: websocket body too large"); + u3z(msg); + return c3n; + } + + len_w = (size_t)len_d; + if ( len_w ) { + buf_y = c3_malloc(len_w); + u3r_bytes(0, len_w, buf_y, u3t(oct)); + } + } + + struct wslay_event_msg out = { + .opcode = (uint8_t)(opc_w & 0xFF), + .msg = buf_y, + .msg_length = len_w + }; + + if ( 0 != wslay_event_queue_msg(cws_u->wsl_w, &out) ) { + if ( buf_y ) { + c3_free(buf_y); + } + u3l_log("cttp: websocket queue failed"); + u3z(msg); + return c3n; + } + + if ( buf_y ) { + c3_free(buf_y); + } + + u3z(msg); + _cttp_ws_proceed(cws_u); + return c3y; +} + +static void +_cttp_ws_fail_handshake(u3_creq* ceq_u, const c3_c* err_c) +{ + u3_cws* cws_u = ceq_u->wsu_u; + + if ( 0 == cws_u ) { + return; + } + + ceq_u->wsu_u = 0; + + if ( err_c ) { + u3l_log("cttp: websocket handshake failed (%s)", err_c); + } + + _cttp_ws_plan_event(cws_u, u3nc(u3i_string("reject"), u3_nul)); + _cttp_ws_close(cws_u, c3n); +} + +static c3_o +_cttp_ws_start(u3_cttp* ctp_u, c3_l wid_l, u3_atom url) +{ + u3_cws* cws_u = c3_calloc(sizeof(*cws_u)); + + cws_u->wid_l = wid_l; + cws_u->sat_e = u3_cws_pending; + + _cttp_ws_generate_key(cws_u); + _cttp_ws_link(ctp_u, cws_u); + + u3_noun hed = u3_nul; + hed = u3nc(u3nc(u3i_string("Sec-WebSocket-Key"), + u3i_string(cws_u->key_c)), + hed); + hed = u3nc(u3nc(u3i_string("Sec-WebSocket-Version"), + u3i_string("13")), + hed); + hed = u3nc(u3nc(u3i_string("Connection"), + u3i_string("Upgrade")), + hed); + hed = u3nc(u3nc(u3i_string("Upgrade"), + u3i_string("websocket")), + hed); + + u3_noun hes = u3nq(u3i_string("GET"), u3k(url), hed, u3_nul); + + u3_creq* ceq_u = _cttp_creq_new(ctp_u, wid_l, hes); + + if ( 0 == ceq_u ) { + _cttp_ws_plan_event(cws_u, u3nc(u3i_string("reject"), u3_nul)); + _cttp_ws_close(cws_u, c3n); + u3z(url); + return c3y; + } + + ceq_u->wsu_u = cws_u; + cws_u->ceq_u = ceq_u; + cws_u->sec = ceq_u->sec; + + _cttp_creq_start(ceq_u); + + u3z(url); + return c3y; +} + + /* _cttp_creq_quit(): cancel a u3_creq */ static void _cttp_creq_quit(u3_creq* ceq_u) { + if ( ceq_u->wsu_u ) { + _cttp_ws_fail_handshake(ceq_u, "cancel"); + _cttp_creq_free(ceq_u); + return; + } + if ( u3_csat_addr == ceq_u->sat_e ) { ceq_u->sat_e = u3_csat_quit; return; // wait to be called again on address resolution @@ -732,6 +1315,12 @@ _cttp_http_client_receive(u3_creq* ceq_u, c3_w sas_w, u3_noun mes, u3_noun uct) static void _cttp_creq_fail(u3_creq* ceq_u, const c3_c* err_c) { + if ( ceq_u->wsu_u ) { + _cttp_ws_fail_handshake(ceq_u, err_c ? err_c : "handshake failed"); + _cttp_creq_free(ceq_u); + return; + } + // XX anything other than a 504? c3_w cod_w = 504; @@ -747,6 +1336,12 @@ _cttp_creq_fail(u3_creq* ceq_u, const c3_c* err_c) static void _cttp_creq_respond(u3_creq* ceq_u) { + if ( ceq_u->wsu_u ) { + _cttp_ws_fail_handshake(ceq_u, "handshake rejected"); + _cttp_creq_free(ceq_u); + return; + } + u3_cres* res_u = ceq_u->res_u; _cttp_http_client_receive(ceq_u, res_u->sas_w, res_u->hed, @@ -764,6 +1359,12 @@ _cttp_creq_on_body(h2o_http1client_t* cli_u, const c3_c* err_c) { u3_creq* ceq_u = (u3_creq *)cli_u->data; + if ( ceq_u->wsu_u ) { + _cttp_ws_fail_handshake(ceq_u, err_c ? err_c : "unexpected body"); + _cttp_creq_free(ceq_u); + return -1; + } + if ( 0 != err_c && h2o_http1client_error_is_eos != err_c ) { _cttp_creq_fail(ceq_u, err_c); return -1; @@ -795,6 +1396,69 @@ _cttp_creq_on_head(h2o_http1client_t* cli_u, const c3_c* err_c, c3_i ver_i, { u3_creq* ceq_u = (u3_creq *)cli_u->data; + if ( ceq_u->wsu_u ) { + u3_cws* cws_u = ceq_u->wsu_u; + + if ( 0 != err_c && h2o_http1client_error_is_eos != err_c ) { + _cttp_ws_fail_handshake(ceq_u, err_c); + _cttp_creq_free(ceq_u); + return 0; + } + + if ( 101 != sas_i ) { + _cttp_ws_fail_handshake(ceq_u, "status"); + _cttp_creq_free(ceq_u); + return 0; + } + + h2o_iovec_t* acc_u = _cttp_find_header(hed_u, hed_t, "sec-websocket-accept"); + c3_c exp_c[29]; + + if ( 0 == acc_u ) { + _cttp_ws_fail_handshake(ceq_u, "missing accept"); + _cttp_creq_free(ceq_u); + return 0; + } + + _cttp_ws_compute_accept(cws_u->key_c, exp_c); + + if ( acc_u->len != 28 || 0 != memcmp(acc_u->base, exp_c, 28) ) { + _cttp_ws_fail_handshake(ceq_u, "bad accept"); + _cttp_creq_free(ceq_u); + return 0; + } + + h2o_socket_t* sok_u = h2o_http1client_steal_socket(cli_u); + + if ( 0 == sok_u ) { + _cttp_ws_fail_handshake(ceq_u, "steal failure"); + _cttp_creq_free(ceq_u); + return 0; + } + + ceq_u->wsu_u = 0; + ceq_u->cli_u = 0; + cws_u->ceq_u = 0; + + _cttp_creq_free(ceq_u); + + memset(&cws_u->wcb_u, 0, sizeof(cws_u->wcb_u)); + cws_u->wcb_u.recv_callback = _cttp_ws_recv_cb; + cws_u->wcb_u.send_callback = _cttp_ws_send_cb; + cws_u->wcb_u.on_msg_recv_callback = _cttp_ws_msg_cb; + + wslay_event_context_client_init(&cws_u->wsl_w, &cws_u->wcb_u, cws_u); + + cws_u->sok_u = sok_u; + sok_u->data = cws_u; + + cws_u->sat_e = u3_cws_open; + + _cttp_ws_plan_event(cws_u, u3nc(u3i_string("accept"), u3_nul)); + _cttp_ws_proceed(cws_u); + return 0; + } + if ( 0 != err_c && h2o_http1client_error_is_eos != err_c ) { _cttp_creq_fail(ceq_u, err_c); return 0; @@ -1031,6 +1695,57 @@ _cttp_ef_http_client(u3_cttp* ctp_u, u3_noun tag, u3_noun dat) ret_o = c3y; } } + else if ( c3y == u3r_sing_c("websocket-handshake", tag) ) { + u3_noun wid, url; + c3_l wid_l; + + if ( (c3n == u3r_cell(dat, &wid, &url)) + || (c3n == u3r_safe_word(wid, &wid_l)) ) + { + u3l_log("cttp: strange websocket-handshake"); + ret_o = c3n; + } + else { + ret_o = _cttp_ws_start(ctp_u, wid_l, u3k(url)); + } + } + else if ( c3y == u3r_sing_c("websocket-response", tag) ) { + u3_noun wid, evt; + c3_l wid_l; + + if ( (c3n == u3r_cell(dat, &wid, &evt)) + || (c3n == u3r_safe_word(wid, &wid_l)) ) + { + u3l_log("cttp: strange websocket-response"); + ret_o = c3n; + } + else { + u3_cws* cws_u = _cttp_ws_find(ctp_u, wid_l); + + if ( 0 == cws_u ) { + u3l_log("cttp: unknown websocket id %u", wid_l); + ret_o = c3n; + } + else { + u3_noun typ = u3h(evt); + + if ( c3y == u3r_sing_c("message", typ) ) { + ret_o = _cttp_ws_send_message(cws_u, u3k(u3t(evt))); + } + else if ( c3y == u3r_sing_c("disconnect", typ) ) { + _cttp_ws_queue_close(cws_u); + ret_o = c3y; + } + else if ( c3y == u3r_sing_c("accept", typ) ) { + ret_o = c3y; + } + else { + u3l_log("cttp: unexpected websocket response"); + ret_o = c3n; + } + } + } + } else { u3l_log("cttp: strange effect (unknown type)"); ret_o = c3n; @@ -1103,6 +1818,16 @@ _cttp_io_exit(u3_auto* car_u) // uv_close((uv_handle_t*)&ctp_u->nop_u, _cttp_io_exit_cb); + { + u3_cws* cws_u = ctp_u->cws_u; + + while ( cws_u ) { + u3_cws* nex_u = cws_u->nex_u; + _cttp_ws_close(cws_u, c3n); + cws_u = nex_u; + } + } + // cancel requests // { @@ -1137,6 +1862,7 @@ u3_cttp_io_init(u3_pier* pir_u) // h2o_timeout_init(u3L, &ctp_u->tim_u, 300 * 1000); ctp_u->ctx_u.io_timeout = &ctp_u->tim_u; + ctp_u->ctx_u.websocket_timeout = &ctp_u->tim_u; // link to initialized tls ctx // |
