diff options
| author | polwex <polwex@sortug.com> | 2025-10-07 03:12:16 +0700 |
|---|---|---|
| committer | polwex <polwex@sortug.com> | 2025-10-07 03:12:16 +0700 |
| commit | b793d12cd25f7f1ae6bb39f9ee5266eb32120ea7 (patch) | |
| tree | 3eb2dfa164a6c5a71bb43ea4c18e21877d9a9d0e | |
| parent | 614e18998ea1db5bccadb170b5926288e57b1c01 (diff) | |
fixes to cttp, iris kinda working
| -rw-r--r-- | vere/pkg/vere/io/cttp.c | 203 |
1 files changed, 189 insertions, 14 deletions
diff --git a/vere/pkg/vere/io/cttp.c b/vere/pkg/vere/io/cttp.c index b36a335..6dc53f1 100644 --- a/vere/pkg/vere/io/cttp.c +++ b/vere/pkg/vere/io/cttp.c @@ -104,8 +104,11 @@ static void _cttp_ws_write_cb(h2o_socket_t* sock_u, const c3_c* err_c); static ssize_t _cttp_ws_recv_cb(wslay_event_context_ptr ctx, uint8_t* buf_y, size_t len_w, int flags, void* ves_p); static ssize_t _cttp_ws_send_cb(wslay_event_context_ptr ctx, const uint8_t* dat_y, size_t len_w, int flags, void* ves_p); static void _cttp_ws_msg_cb(wslay_event_context_ptr ctx, const struct wslay_event_on_msg_recv_arg* arg, void* ves_p); +static int _cttp_ws_genmask_cb(wslay_event_context_ptr ctx, uint8_t* buf_y, size_t len_w, void* ves_p); +static c3_c* _cttp_ws_origin(u3_atom nor); static void _cttp_creq_start(u3_creq* ceq_u); +static u3_atom _cttp_ws_normalize_url(u3_atom url); /* u3_cttp: http client. */ @@ -328,6 +331,16 @@ static void _cttp_ws_plan_event(u3_cws* cws_u, u3_noun event) { u3_cttp* ctp_u = cws_u->ctp_u; + c3_l wid_l = cws_u->wid_l; + u3_noun typ = u3h(event); + if ( c3y == u3a_is_cat(typ) ) { + c3_c* nam_c = u3r_string(u3k(typ)); + u3l_log("cttp: ws plan wid=%u typ=%s", wid_l, nam_c); + c3_free(nam_c); + } + else { + u3l_log("cttp: ws plan wid=%u", wid_l); + } u3_noun wir = u3nt(u3i_string("http-client"), u3dc("scot", c3__uv, ctp_u->sev_l), u3_nul); @@ -426,6 +439,56 @@ _cttp_find_header(h2o_header_t* hed_u, size_t hed_t, const c3_c* name_c) return 0; } +static c3_c* +_cttp_ws_origin(u3_atom nor) +{ + c3_c* url_c = u3r_string(nor); + c3_c* scheme_c = strstr(url_c, "://"); + c3_c* start_c = scheme_c ? scheme_c + 3 : url_c; + c3_c* end_c = strchr(start_c, '/'); + size_t len_w = end_c ? (size_t)(end_c - url_c) : strlen(url_c); + + c3_c* ori_c = c3_malloc(len_w + 1); + memcpy(ori_c, url_c, len_w); + ori_c[len_w] = 0; + + c3_free(url_c); + return ori_c; +} + +static u3_atom +_cttp_ws_normalize_url(u3_atom url) +{ + c3_c* url_c = u3r_string(url); + size_t len_w = strlen(url_c); + u3_atom ret; + + if ( len_w >= 5 && 0 == strncmp(url_c, "ws://", 5) ) { + size_t new_len = len_w + 2; + c3_c* rew_c = c3_malloc(new_len + 1); + memcpy(rew_c, "http://", 7); + memcpy(rew_c + 7, url_c + 5, len_w - 5); + rew_c[new_len] = '\0'; + ret = u3i_string(rew_c); + c3_free(rew_c); + } + else if ( len_w >= 6 && 0 == strncmp(url_c, "wss://", 6) ) { + size_t new_len = len_w + 2; + c3_c* rew_c = c3_malloc(new_len + 1); + memcpy(rew_c, "https://", 8); + memcpy(rew_c + 8, url_c + 6, len_w - 6); + rew_c[new_len] = '\0'; + ret = u3i_string(rew_c); + c3_free(rew_c); + } + else { + ret = u3k(url); + } + + c3_free(url_c); + return ret; +} + // XX deduplicate with _http_heds_free /* _cttp_heds_free(): free header linked list */ @@ -920,6 +983,7 @@ _cttp_ws_close(u3_cws* cws_u, c3_o send_event) } if ( u3_cws_closed != cws_u->sat_e ) { + u3l_log("cttp: ws close wid=%u send=%c", cws_u->wid_l, (c3y==send_event?'y':'n')); cws_u->sat_e = u3_cws_closed; if ( c3y == send_event ) { @@ -968,8 +1032,14 @@ _cttp_ws_recv_cb(wslay_event_context_ptr ctx, { u3_cws* cws_u = ves_p; + u3l_log("cttp: ws recv_cb wid=%u want=%zu have=%zu", + cws_u->wid_l, + len_w, + (size_t)cws_u->sok_u->input->size); + if ( 0 == cws_u->sok_u->input->size ) { wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK); + u3l_log("cttp: ws recv_cb wid=%u empty", cws_u->wid_l); return -1; } @@ -992,8 +1062,14 @@ _cttp_ws_send_cb(wslay_event_context_ptr ctx, { u3_cws* cws_u = ves_p; + u3l_log("cttp: ws send_cb wid=%u len=%zu writing=%c", + cws_u->wid_l, + len_w, + h2o_socket_is_writing(cws_u->sok_u) ? 'y' : 'n'); + if ( h2o_socket_is_writing(cws_u->sok_u) ) { wslay_event_set_error(ctx, WSLAY_ERR_WOULDBLOCK); + u3l_log("cttp: ws send_cb wid=%u busy", cws_u->wid_l); return -1; } @@ -1018,15 +1094,19 @@ _cttp_ws_msg_cb(wslay_event_context_ptr ctx, u3_cws* cws_u = ves_p; if ( 0 == arg ) { + u3l_log("cttp: ws msg close wid=%u", cws_u->wid_l); _cttp_ws_close(cws_u, c3y); return; } if ( WSLAY_CONNECTION_CLOSE == arg->opcode ) { + u3l_log("cttp: ws msg opcode close wid=%u", cws_u->wid_l); _cttp_ws_close(cws_u, c3y); return; } + u3l_log("cttp: ws msg opcode=%u len=%zu", arg->opcode, arg->msg_length); + u3_noun payload; if ( 0 == arg->msg_length ) { payload = u3_nul; @@ -1043,6 +1123,30 @@ _cttp_ws_msg_cb(wslay_event_context_ptr ctx, _cttp_ws_plan_event(cws_u, event); } +static int +_cttp_ws_genmask_cb(wslay_event_context_ptr ctx, + uint8_t* buf_y, + size_t len_w, + void* ves_p) +{ + (void)ctx; + (void)ves_p; + + size_t off_w = 0; + + while ( off_w < len_w ) { + c3_w rad_w[16]; + size_t rem_w = len_w - off_w; + size_t chc_w = c3_min(rem_w, sizeof(rad_w)); + + c3_rand(rad_w); + memcpy(buf_y + off_w, (const uint8_t*)rad_w, chc_w); + off_w += chc_w; + } + + return 0; +} + static void _cttp_ws_proceed(u3_cws* cws_u) { @@ -1056,19 +1160,46 @@ _cttp_ws_proceed(u3_cws* cws_u) if ( !h2o_socket_is_writing(cws_u->sok_u) && wslay_event_want_write(cws_u->wsl_w) ) { - if ( 0 != wslay_event_send(cws_u->wsl_w) ) { - _cttp_ws_close(cws_u, c3y); - return; + int sas_i = wslay_event_send(cws_u->wsl_w); + + if ( 0 == sas_i ) { + handled = c3y; + } + else { + u3l_log("cttp: ws send err wid=%u code=%d want-read=%c want-write=%c", + cws_u->wid_l, + sas_i, + wslay_event_want_read(cws_u->wsl_w) ? 'y' : 'n', + wslay_event_want_write(cws_u->wsl_w) ? 'y' : 'n'); + + if ( WSLAY_ERR_WOULDBLOCK != sas_i ) { + // any other error is fatal to the websocket session + _cttp_ws_close(cws_u, c3y); + return; + } } - handled = c3y; } if ( cws_u->sok_u->input->size && wslay_event_want_read(cws_u->wsl_w) ) { - if ( 0 != wslay_event_recv(cws_u->wsl_w) ) { - _cttp_ws_close(cws_u, c3y); - return; + int ras_i = wslay_event_recv(cws_u->wsl_w); + + if ( 0 == ras_i ) { + handled = c3y; + } + else { + u3l_log("cttp: ws recv err wid=%u code=%d want-read=%c want-write=%c input=%zu", + cws_u->wid_l, + ras_i, + wslay_event_want_read(cws_u->wsl_w) ? 'y' : 'n', + wslay_event_want_write(cws_u->wsl_w) ? 'y' : 'n', + (size_t)cws_u->sok_u->input->size); + + if ( WSLAY_ERR_WOULDBLOCK != ras_i ) { + // wslay reports anything else as a hard protocol failure + _cttp_ws_close(cws_u, c3y); + return; + } } - handled = c3y; } if ( c3n == handled ) { @@ -1077,15 +1208,18 @@ _cttp_ws_proceed(u3_cws* cws_u) } if ( wslay_event_want_read(cws_u->wsl_w) ) { + u3l_log("cttp: ws proceed wid=%u want-read", cws_u->wid_l); h2o_socket_read_start(cws_u->sok_u, _cttp_ws_read_cb); } else if ( h2o_socket_is_writing(cws_u->sok_u) || wslay_event_want_write(cws_u->wsl_w) ) { + u3l_log("cttp: ws proceed wid=%u write-pending", cws_u->wid_l); h2o_socket_read_stop(cws_u->sok_u); } else { - _cttp_ws_close(cws_u, c3y); + u3l_log("cttp: ws proceed wid=%u idle", cws_u->wid_l); + h2o_socket_read_start(cws_u->sok_u, _cttp_ws_read_cb); } } @@ -1095,10 +1229,13 @@ _cttp_ws_read_cb(h2o_socket_t* sok_u, const c3_c* err_c) u3_cws* cws_u = sok_u->data; if ( err_c ) { + u3l_log("cttp: ws read err %s", err_c); _cttp_ws_close(cws_u, c3y); return; } + u3l_log("cttp: ws read wid=%u size=%zu", cws_u->wid_l, sok_u->input->size); + _cttp_ws_proceed(cws_u); } @@ -1113,10 +1250,12 @@ _cttp_ws_write_cb(h2o_socket_t* sok_u, const c3_c* err_c) } if ( err_c ) { + u3l_log("cttp: ws write err %s", err_c); _cttp_ws_close(cws_u, c3y); return; } + u3l_log("cttp: ws write done wid=%u", cws_u->wid_l); _cttp_ws_proceed(cws_u); } @@ -1124,14 +1263,17 @@ static void _cttp_ws_queue_close(u3_cws* cws_u) { if ( (u3_cws_open != cws_u->sat_e) || (0 == cws_u->wsl_w) ) { + u3l_log("cttp: ws queue close wid=%u direct", cws_u->wid_l); _cttp_ws_close(cws_u, c3y); return; } if ( u3_cws_closing == cws_u->sat_e ) { + u3l_log("cttp: ws queue close wid=%u already", cws_u->wid_l); return; } + u3l_log("cttp: ws queue close wid=%u normal", cws_u->wid_l); cws_u->sat_e = u3_cws_closing; wslay_event_queue_close(cws_u->wsl_w, WSLAY_CODE_NORMAL_CLOSURE, NULL, 0); _cttp_ws_proceed(cws_u); @@ -1218,6 +1360,9 @@ _cttp_ws_fail_handshake(u3_creq* ceq_u, const c3_c* err_c) if ( err_c ) { u3l_log("cttp: websocket handshake failed (%s)", err_c); } + else { + u3l_log("cttp: websocket handshake failed (unknown)"); + } _cttp_ws_plan_event(cws_u, u3nc(u3i_string("reject"), u3_nul)); _cttp_ws_close(cws_u, c3n); @@ -1226,6 +1371,7 @@ _cttp_ws_fail_handshake(u3_creq* ceq_u, const c3_c* err_c) static c3_o _cttp_ws_start(u3_cttp* ctp_u, c3_l wid_l, u3_atom url) { + u3l_log("cttp: ws start wid=%u", wid_l); u3_cws* cws_u = c3_calloc(sizeof(*cws_u)); cws_u->wid_l = wid_l; @@ -1234,6 +1380,8 @@ _cttp_ws_start(u3_cttp* ctp_u, c3_l wid_l, u3_atom url) _cttp_ws_generate_key(cws_u); _cttp_ws_link(ctp_u, cws_u); + u3_atom nor = _cttp_ws_normalize_url(url); + u3_noun hed = u3_nul; hed = u3nc(u3nc(u3i_string("Sec-WebSocket-Key"), u3i_string(cws_u->key_c)), @@ -1247,14 +1395,20 @@ _cttp_ws_start(u3_cttp* ctp_u, c3_l wid_l, u3_atom url) hed = u3nc(u3nc(u3i_string("Upgrade"), u3i_string("websocket")), hed); + { + c3_c* ori_c = _cttp_ws_origin(nor); + hed = u3nc(u3nc(u3i_string("Origin"), u3i_string(ori_c)), hed); + c3_free(ori_c); + } - u3_noun hes = u3nq(u3i_string("GET"), u3k(url), hed, u3_nul); + u3_noun hes = u3nq(u3i_string("GET"), u3k(nor), hed, u3_nul); u3_creq* ceq_u = _cttp_creq_new(ctp_u, wid_l, hes); if ( 0 == ceq_u ) { _cttp_ws_plan_event(cws_u, u3nc(u3i_string("reject"), u3_nul)); _cttp_ws_close(cws_u, c3n); + u3z(nor); u3z(url); return c3y; } @@ -1265,6 +1419,7 @@ _cttp_ws_start(u3_cttp* ctp_u, c3_l wid_l, u3_atom url) _cttp_creq_start(ceq_u); + u3z(nor); u3z(url); return c3y; } @@ -1411,6 +1566,13 @@ _cttp_creq_on_head(h2o_http1client_t* cli_u, const c3_c* err_c, c3_i ver_i, return 0; } + for ( size_t i = 0; i < hed_t; i++ ) { + h2o_header_t* hdr = &hed_u[i]; + u3l_log("cttp: ws hdr %.*s: %.*s", + (int)hdr->name->len, hdr->name->base, + (int)hdr->value.len, hdr->value.base); + } + h2o_iovec_t* acc_u = _cttp_find_header(hed_u, hed_t, "sec-websocket-accept"); c3_c exp_c[29]; @@ -1440,11 +1602,19 @@ _cttp_creq_on_head(h2o_http1client_t* cli_u, const c3_c* err_c, c3_i ver_i, ceq_u->cli_u = 0; cws_u->ceq_u = 0; + if ( sok_u->input && sok_u->input->size ) { + u3l_log("cttp: ws draining leftover handshake wid=%u size=%zu", + cws_u->wid_l, + (size_t)sok_u->input->size); + h2o_buffer_consume(&sok_u->input, sok_u->input->size); + } + _cttp_creq_free(ceq_u); memset(&cws_u->wcb_u, 0, sizeof(cws_u->wcb_u)); cws_u->wcb_u.recv_callback = _cttp_ws_recv_cb; cws_u->wcb_u.send_callback = _cttp_ws_send_cb; + cws_u->wcb_u.genmask_callback = _cttp_ws_genmask_cb; cws_u->wcb_u.on_msg_recv_callback = _cttp_ws_msg_cb; wslay_event_context_client_init(&cws_u->wsl_w, &cws_u->wcb_u, cws_u); @@ -1722,13 +1892,18 @@ _cttp_ef_http_client(u3_cttp* ctp_u, u3_noun tag, u3_noun dat) else { u3_cws* cws_u = _cttp_ws_find(ctp_u, wid_l); + u3_noun typ = u3h(evt); + if ( 0 == cws_u ) { - u3l_log("cttp: unknown websocket id %u", wid_l); - ret_o = c3n; + if ( c3y == u3r_sing_c("message", typ) ) { + u3l_log("cttp: unknown websocket id %u", wid_l); + ret_o = c3n; + } + else { + ret_o = c3y; + } } else { - u3_noun typ = u3h(evt); - if ( c3y == u3r_sing_c("message", typ) ) { ret_o = _cttp_ws_send_message(cws_u, u3k(u3t(evt))); } |
