summaryrefslogtreecommitdiff
path: root/vere/pkg/vere/io/mesa.c
diff options
context:
space:
mode:
Diffstat (limited to 'vere/pkg/vere/io/mesa.c')
-rw-r--r--vere/pkg/vere/io/mesa.c3065
1 files changed, 3065 insertions, 0 deletions
diff --git a/vere/pkg/vere/io/mesa.c b/vere/pkg/vere/io/mesa.c
new file mode 100644
index 0000000..73bb51f
--- /dev/null
+++ b/vere/pkg/vere/io/mesa.c
@@ -0,0 +1,3065 @@
+/// @file
+
+#include "vere.h"
+#include "ivory.h"
+
+#include "noun.h"
+#include "ur/ur.h"
+#include "ship.h"
+#include "io/ames/stun.h"
+#include "mesa/mesa.h"
+#include "mesa/bitset.h"
+#include <allocate.h>
+#include <error.h>
+#include <imprison.h>
+#include <inttypes.h>
+#include <jets/q.h>
+#include <manage.h>
+#include <c3/motes.h>
+#include <retrieve.h>
+#include <stdio.h>
+#include <string.h>
+#include <types.h>
+#include <stdlib.h>
+#include "lss.h"
+#include "arena.h"
+
+static c3_o dop_o = c3n;
+
+static c3_y are_y[524288];
+
+/* FILE* packs; */
+/* static c3_d tim_y[200000] = {0}; */
+/* static c3_o done = c3y; */
+
+/* #define PACKET_TEST c3y */
+
+// #define MESA_DEBUG c3y
+//#define MESA_TEST
+#define RED_TEXT "\033[0;31m"
+#define DEF_TEXT "\033[0m"
+#define REORDER_THRESH 5
+
+#define DIRECT_ROUTE_TIMEOUT_MICROS 5000000
+#define DIRECT_ROUTE_RETRY_MICROS 1000000
+#define PIT_EXPIRE_MICROS 20000000
+
+#define JUMBO_CACHE_MAX_SIZE 200000000 // 200 mb
+
+// logging and debug symbols
+#define MESA_SYM_DESC(SYM) MESA_DESC_ ## SYM
+#define MESA_SYM_FIELD(SYM) MESA_FIELD_ ## SYM
+#ifdef MESA_DEBUG
+ #define MESA_LOG(SAM_U, SYM, ...) { SAM_U->sat_u.MESA_SYM_FIELD(SYM)++; u3l_log("mesa: (%u) %s", __LINE__, MESA_SYM_DESC(SYM)); }
+#else
+ #define MESA_LOG(SAM_U, SYM, ...) { SAM_U->sat_u.MESA_SYM_FIELD(SYM)++; }
+#endif
+
+typedef struct _u3_mesa_stat {
+ c3_w dop_w; // dropped for other reasons
+ c3_w ser_w; // dropped for serialisation
+ c3_w aut_w; // droppped for auth
+ c3_w apa_w; // dropped bc no interest
+ c3_w inv_w; // non-fatal invariant violation
+ c3_w dup_w; // duplicates
+} u3_mesa_stat;
+
+#define MESA_DESC_STRANGE "dropped strange packet"
+#define MESA_FIELD_STRANGE dop_w
+
+#define MESA_DESC_SERIAL "dropped packet (serialisation)"
+#define MESA_FIELD_SERIAL ser_w
+
+#define MESA_DESC_AUTH "dropped packet (authentication)"
+#define MESA_FIELD_AUTH aut_w
+
+#define MESA_DESC_INVARIANT "invariant violation"
+#define MESA_FIELD_INVARIANT inv_w
+
+#define MESA_DESC_APATHY "dropped packet (no interest)"
+#define MESA_FIELD_APATHY apa_w
+
+#define MESA_DESC_DUPE "dropped packet (duplicate)"
+#define MESA_FIELD_DUPE dup_w
+
+// routing table sentinels
+#define MESA_CZAR 1 // pending dns lookup
+#define MESA_ROUT 2 // have route
+//
+// hop enum
+
+#define _mesa_met3_w(a_w) ((c3_bits_word(a_w) + 0x7) >> 3)
+
+struct _u3_mesa_pact;
+
+typedef struct _u3_pact_stat {
+ c3_y tie_y; // tries
+ c3_d sen_d; // last sent
+ c3_y sip_y; // skips
+} u3_pact_stat;
+
+struct _u3_mesa;
+
+typedef struct _u3_gage {
+ c3_w rtt_w; // rtt
+ c3_w rto_w; // rto
+ c3_w rtv_w; // rttvar
+ c3_w wnd_w; // cwnd
+ c3_w wnf_w; // cwnd fraction
+ c3_w sst_w; // ssthresh
+ c3_w con_w; // counter
+ //
+} u3_gage;
+
+struct _u3_mesa;
+
+typedef struct _u3_mesa_pict {
+ uv_udp_send_t snd_u;
+ struct _u3_mesa* sam_u;
+ u3_mesa_pact pac_u;
+} u3_mesa_pict;
+
+typedef struct _u3_lane_state {
+ c3_d sen_d; // last sent date
+ c3_d her_d; // last heard date
+ c3_w rtt_w; // round-trip time
+ c3_w rtv_w; // round-trip time variance
+} u3_lane_state;
+
+/* _u3_mesa: next generation networking
+ */
+
+typedef struct _u3_pit_addr u3_pit_addr;
+
+typedef struct sockaddr_in sockaddr_in;
+
+typedef struct _u3_pit_addr {
+ sockaddr_in sdr_u;
+ u3_pit_addr* nex_p;
+} u3_pit_addr;
+
+typedef struct _u3_pit_entry {
+ u3_pit_addr* adr_u;
+ c3_d tim_d;
+ arena are_u;
+} u3_pit_entry;
+
+typedef struct _u3_shap {
+ c3_d hed_d;
+ c3_d tel_d;
+} u3_shap;
+
+
+static u3_shap u3_ship_to_shap( u3_ship ship )
+{
+ return (u3_shap){ship[0], ship[1]};
+}
+
+static void u3_shap_to_ship( u3_ship ship, u3_shap shap )
+{
+ ship[0] = shap.hed_d;
+ ship[1] = shap.tel_d;
+}
+
+static void u3_free_pit( u3_pit_entry* pit_u )
+{
+ arena_free(&pit_u->are_u);
+}
+
+static void u3_free_str( u3_str key )
+{
+ c3_free(key.str_c);
+}
+
+static uint64_t u3_hash_str( u3_str key )
+{
+ c3_d hash = 0xcbf29ce484222325ull;
+ for (c3_w i = 0; i < key.len_w; i++) {
+ hash = ( (unsigned char)*(key.str_c)++ ^ hash ) * 0x100000001b3ull;
+ }
+ return hash;
+}
+
+static uint64_t u3_cmpr_str( u3_str key1, u3_str key2 )
+{
+ return key1.len_w == key2.len_w && memcmp( key1.str_c, key2.str_c, key1.len_w ) == 0;
+}
+
+static uint64_t u3_cmpr_shap( u3_shap ship1, u3_shap ship2 )
+{
+ return ship1.hed_d == ship2.hed_d && ship1.tel_d == ship2.tel_d;
+}
+
+static uint64_t u3_hash_shap(u3_shap ship)
+{
+ uint64_t combined = ship.hed_d ^ (ship.tel_d * 0x9e3779b97f4a7c15ull);
+ combined ^= combined >> 23;
+ combined *= 0x2127599bf4325c37ull;
+ combined ^= combined >> 47;
+ return combined;
+}
+
+typedef struct _u3_pend_req u3_pend_req;
+
+#define NAME req_map
+#define KEY_TY u3_str
+#define HASH_FN u3_hash_str
+#define CMPR_FN u3_cmpr_str
+#define VAL_TY u3_pend_req*
+#include "verstable.h"
+
+#define NAME pit_map
+#define KEY_TY u3_str
+#define HASH_FN u3_hash_str
+#define CMPR_FN u3_cmpr_str
+#define VAL_TY u3_pit_entry*
+#define VAL_DTOR_FN u3_free_pit
+#include "verstable.h"
+
+#define NAME gag_map
+#define KEY_TY u3_shap
+#define HASH_FN u3_hash_shap
+#define CMPR_FN u3_cmpr_shap
+#define VAL_TY u3_gage*
+#include "verstable.h"
+
+typedef enum _u3_mesa_ctag {
+ CTAG_WAIT = 1,
+ CTAG_BLOCK = 2,
+} u3_mesa_ctag;
+
+
+typedef struct _u3_scry_handle {
+ void* vod_p;
+ arena are_u;
+} u3_scry_handle;
+
+// jumbo frame cache value
+//
+typedef struct _u3_mesa_line {
+ u3_mesa_name nam_u; // full name for data, ready to serialize
+ u3_auth_data aut_u; // message authenticator
+ c3_w tob_d; // number of bytes in whole message
+ c3_w dat_w; // size in bytes of dat_y
+ c3_w len_w; // total allocated size, in bytes
+ c3_y* tip_y; // initial Merkle spine, nullable
+ c3_y* dat_y; // fragment data (1024 bytes per fragment)
+ c3_y* haz_y; // hash pairs (64 bytes per fragment)
+ arena are_u;
+} u3_mesa_line;
+
+
+static void u3_free_line( u3_mesa_line* lin_u )
+{
+ // CTAG_WAIT, CTAG_BLOCK
+ if ( lin_u > (u3_mesa_line*)2 ) {
+ arena_free(&lin_u->are_u);
+ }
+}
+
+#define NAME jum_map
+#define KEY_TY u3_str
+#define KEY_DTOR_FN u3_free_str
+#define VAL_DTOR_FN u3_free_line
+#define HASH_FN u3_hash_str
+#define CMPR_FN u3_cmpr_str
+#define VAL_TY u3_mesa_line*
+#include "verstable.h"
+
+typedef struct _u3_peer u3_peer;
+
+#define NAME per_map
+#define KEY_TY u3_shap
+#define HASH_FN u3_hash_shap
+#define CMPR_FN u3_cmpr_shap
+#define VAL_TY u3_peer*
+#include "verstable.h"
+
+typedef struct _u3_mesa {
+ u3_auto car_u;
+ u3_pier* pir_u;
+ union {
+ uv_udp_t wax_u;
+ uv_handle_t had_u;
+ };
+ u3_mesa_stat sat_u; // statistics
+ c3_l sev_l; // XX: ??
+ c3_o for_o; // is forwarding
+ per_map per_u; // (map ship u3_peer)
+ c3_d jum_d; // bytes in jumbo cache
+ jum_map jum_u; // jumbo cache
+ gag_map gag_u; // lane cache
+ pit_map pit_u; // (map path [our=? las=(set lane)])
+ req_map req_u; // (map [rift path] u3_pend_req)
+ c3_c* dns_c; // turf (urb.otrg)
+ c3_d tim_d; // XX: remove
+ arena are_u; // per packet arena
+ arena par_u; // permanent arena
+ uv_timer_t tim_u; // pit clear timer
+} u3_mesa;
+
+typedef struct _u3_peer {
+ u3_mesa* sam_u; // backpointer
+ u3_ship her_u; // who is this peer
+ c3_o ful_o; // has this been initialized?
+ sockaddr_in dan_u; // direct lane (nullable)
+ u3_lane_state dir_u; // direct lane state
+ c3_y imp_y; // galaxy @p
+ u3_lane_state ind_u; // indirect lane state
+} u3_peer;
+
+typedef struct _u3_pend_req {
+ u3_peer* per_u; // backpointer
+ c3_d nex_d; // number of the next fragment to be sent
+ c3_d tof_d; // total number of expected fragments
+ c3_d tob_d; // total number of expected bytes
+ u3_auth_data aut_u; // message authenticator
+ uv_timer_t tim_u; // timehandler
+ c3_y* dat_y; // ((mop @ud *) lte)
+ c3_d hav_d; // how many fragments we've received
+ c3_d lef_d; // lowest fragment number currently in flight/pending
+ c3_d out_d; // outstanding fragments in flight
+ c3_d old_d; // frag num of oldest packet sent
+ c3_d ack_d; // highest acked fragment number
+ u3_gage* gag_u; // congestion control
+ lss_pair* mis_u; // misordered packets
+ lss_verifier* los_u; // Lockstep verifier
+ u3_mesa_pict* pic_u; // preallocated request packet
+ u3_pact_stat* wat_u; // ((mop @ud packet-state) lte)
+ u3_bitset was_u; // ((mop @ud ?) lte)
+ c3_c* pek_c;
+ c3_w pek_w;
+ c3_d pek_d;
+ // stats TODO: use
+ c3_d beg_d; // date when request began
+ arena are_u;
+} u3_pend_req;
+
+/*
+ * typedef u3_mesa_req u3_noun
+ * [tot=@ waiting=(set @ud) missing=(set @ud) nex=@ud dat=(map @ud @)]
+ */
+
+typedef struct _u3_cace_enty {
+ u3_mesa_ctag typ_y;
+ union {
+ // u03_mesa_cace_wait wat_u;
+ // u3_mesa_cace_item res_u;
+ };
+} u3_cace_enty;
+
+// Sent request
+// because of lifecycles a u3_mesa_pact may have several libuv
+// callbacks associated with it, so we can't those as callback
+// instead just alloc new buffer and stick here
+typedef struct _u3_seal {
+ uv_udp_send_t snd_u; // udp send request
+ u3_mesa* sam_u;
+ c3_w len_w;
+ c3_y* buf_y;
+ arena are_u;
+} u3_seal;
+
+typedef struct _u3_mesa_cb_data {
+ u3_mesa* sam_u;
+ u3_mesa_name nam_u;
+ sockaddr_in lan_u;
+} u3_mesa_cb_data;
+
+static c3_d
+get_millis() {
+ struct timeval tp;
+
+ gettimeofday(&tp, NULL);
+ return ((c3_d)tp.tv_sec * 1000ull) + ((c3_d)tp.tv_usec / 1000ull);
+ // Convert the seconds to milliseconds by multiplying by 1000
+ // Convert the microseconds to milliseconds by dividing by 1000
+}
+
+static void
+_log_buf(c3_y* buf_y, c3_w len_w)
+{
+ for( c3_w i_w = 0; i_w < len_w; i_w++ ) {
+ fprintf(stderr, "%02x", buf_y[i_w]);
+ }
+ fprintf(stderr, "\r\n");
+}
+
+static void
+_log_gage(u3_gage* gag_u)
+{
+ u3l_log("gauge at %p", gag_u);
+ u3l_log("rtt: %f", ((double)gag_u->rtt_w / 1000));
+ u3l_log("rto: %f", ((double)gag_u->rto_w / 1000));
+ u3l_log("rttvar: %f", ((double)gag_u->rtv_w / 1000));
+ u3l_log("cwnd: %u", gag_u->wnd_w);
+ u3l_log("cwnd fraction: %f", gag_u->wnf_w / (float)gag_u->wnd_w );
+ u3l_log("ssthresh: %u", gag_u->sst_w);
+ u3l_log("counter: %u", gag_u->con_w);
+ //u3l_log("algorithm: %s", gag_u->alg_c);
+}
+
+static void
+_log_lane(u3_lane* lan_u)
+{
+ u3l_log("mesa: lane (%s,%u)", u3r_string(u3dc("scot", c3__if, u3i_word(lan_u->pip_w))), lan_u->por_s);
+}
+
+static void _log_peer(u3_peer* per_u)
+{
+ if ( per_u == NULL ) {
+ u3l_log("NULL peer");
+ return;
+ }
+ u3l_log("dir");
+ u3l_log("galaxy: %s", u3r_string(u3dc("scot", 'p', per_u->imp_y)));
+}
+
+static void
+_log_pend_req(u3_pend_req* req_u)
+{
+ if( req_u == NULL ) {
+ u3l_log("pending request was NULL");
+ return;
+ }
+ u3l_log("have: %"PRIu64, req_u->hav_d);
+ u3l_log("next: %"PRIu64, req_u->nex_d);
+ u3l_log("total: %" PRIu64, req_u->tof_d);
+ u3l_log("gage: %c", req_u->gag_u == NULL ? 'n' : 'y');
+ //u3l_log("timer in: %" PRIu64 " ms", uv_timer_get_due_in(&req_u->tim_u));
+}
+
+static void
+_log_mesa_data(u3_mesa_data dat_u)
+{
+ u3l_log("total bytes: %" PRIu64, dat_u.tob_d);
+ u3l_log("frag len: %u", dat_u.len_w);
+ // u3l_log("frag: %xxx", dat_u.fra_y);
+}
+
+/* _mesa_lop(): find beginning of page containing fra_d
+*/
+static inline c3_d
+_mesa_lop(c3_d fra_d)
+{
+ return fra_d & ~((1 << u3_Host.ops_u.jum_y) - 1);
+}
+
+static c3_d
+_get_now_micros()
+{
+ struct timeval tim_u;
+ gettimeofday(&tim_u, NULL);
+ return ((c3_d)tim_u.tv_sec * 1000ull) + ((c3_d)tim_u.tv_usec / 1000ull);
+}
+
+static c3_d
+_abs_dif(c3_d ayy_d, c3_d bee_d)
+{
+ return ayy_d > bee_d ? ayy_d - bee_d : bee_d - ayy_d;
+}
+
+static c3_d
+_clamp_rto(c3_d rto_d) {
+ return c3_min(c3_max(rto_d, 200 * 1000), 25000 * 1000); // ~s25 max backoff
+}
+
+static inline c3_o
+_mesa_is_lane_zero(sockaddr_in lan_u)
+{
+ return __((lan_u.sin_addr.s_addr == 0) && (lan_u.sin_port == 0));
+}
+
+static c3_o
+_mesa_is_direct_mode(u3_peer* per_u)
+{
+ c3_d now_d = _get_now_micros();
+ return __(per_u->dir_u.her_d + DIRECT_ROUTE_TIMEOUT_MICROS > now_d);
+}
+
+/* _mesa_encode_path(): produce buf_y as a parsed path
+*/
+static u3_noun
+_mesa_encode_path(c3_w len_w, c3_y* buf_y)
+{
+ u3_noun pro;
+ u3_noun* lit = &pro;
+
+ {
+ u3_noun* hed;
+ u3_noun* tel;
+ c3_y* fub_y = buf_y;
+ c3_y car_y;
+ c3_w tem_w;
+ u3i_slab sab_u;
+
+ while ( len_w-- ) {
+ car_y = *buf_y++;
+ if ( len_w == 0 ) {
+ buf_y++;
+ car_y = 47;
+ }
+
+ if ( 47 == car_y ) {
+ tem_w = buf_y - fub_y - 1;
+ u3i_slab_bare(&sab_u, 3, tem_w);
+ sab_u.buf_w[sab_u.len_w - 1] = 0;
+ memcpy(sab_u.buf_y, fub_y, tem_w);
+
+ *lit = u3i_defcons(&hed, &tel);
+ *hed = u3i_slab_moot(&sab_u);
+ lit = tel;
+ fub_y = buf_y;
+ }
+ }
+ }
+
+ *lit = u3_nul;
+
+ return pro;
+}
+
+static void
+_mesa_copy_auth_data(u3_auth_data* des_u, u3_auth_data* src_u)
+{
+ des_u->typ_e = src_u->typ_e;
+ switch ( des_u->typ_e ) {
+ case AUTH_HMAC: {
+ memcpy(des_u->mac_y, src_u->mac_y, 16);
+ } break;
+ case AUTH_SIGN: {
+ memcpy(des_u->sig_y, src_u->sig_y, 64);
+ } break;
+ case AUTH_PAIR: {
+ memcpy(des_u->has_y, src_u->has_y, 64);
+ } break;
+ case AUTH_NONE: {
+ } break;
+ default: u3_assert(!"unreachable");
+ }
+}
+
+static void
+_mesa_copy_name(u3_mesa_name* des_u, u3_mesa_name* src_u, arena* are_u)
+{
+ memcpy(des_u, src_u, sizeof(u3_mesa_name));
+ u3_ship_copy(des_u->her_u, src_u->her_u);
+ des_u->str_u.str_c = new(are_u, c3_c, src_u->str_u.len_w);
+ memcpy(des_u->str_u.str_c, src_u->str_u.str_c, src_u->str_u.len_w);
+ des_u->pat_c = des_u->str_u.str_c + (src_u->pat_c - src_u->str_u.str_c);
+}
+
+static u3_mesa_name*
+_mesa_copy_name_alloc(u3_mesa_name* src_u, arena* are_u)
+{
+ u3_mesa_name* des_u = new(are_u, u3_mesa_name, 1);
+ _mesa_copy_name(des_u, src_u, are_u);
+ return des_u;
+}
+
+static u3_atom
+_dire_etch_ud(c3_d num_d)
+{
+ c3_y hun_y[26];
+ c3_y* buf_y = u3s_etch_ud_smol(num_d, hun_y);
+ c3_w dif_w = (c3_p)buf_y - (c3_p)hun_y;
+ return u3i_bytes(26 - dif_w, buf_y); // XX known-non-null
+}
+
+/* _mesa_request_key(): produce key for request hashtable sam_u->req_p from nam_u
+*/
+u3_noun
+_mesa_request_key(u3_mesa_name* nam_u)
+{
+ u3_noun pax = _mesa_encode_path(nam_u->pat_s, (c3_y*)nam_u->pat_c);
+ u3_noun res = u3nc(u3i_word(nam_u->rif_w), pax);
+ return res;
+}
+
+static void
+_init_gage(u3_gage* gag_u) // microseconds
+{
+ gag_u->rto_w = 200 * 1000; // ~s1
+ gag_u->rtt_w = 1000 * 1000; // ~s1
+ gag_u->rtv_w = 1000 * 1000; // ~s1
+ /* gag_u->rto_w = 200 * 1000; // ~s1 */
+ /* gag_u->rtt_w = 82 * 1000; // ~s1 */
+ /* gag_u->rtv_w = 100 * 1000; // ~s1 */
+ gag_u->con_w = 0;
+ gag_u->wnd_w = 1;
+ gag_u->sst_w = 10000;
+}
+
+/* u3_mesa_encode_lane(): serialize lane to noun
+*/
+static u3_noun
+u3_mesa_encode_lane(sockaddr_in lan_u) {
+ // [%if ip=@ port=@]
+ c3_w pip_w = ntohl(lan_u.sin_addr.s_addr);
+ c3_s por_s = ntohs(lan_u.sin_port);
+ return u3nt(c3__if, u3i_word(pip_w), por_s);
+}
+
+static u3_peer*
+_mesa_get_peer(u3_mesa* sam_u, u3_ship her)
+{
+ per_map_itr itr_u = vt_get(&sam_u->per_u, u3_ship_to_shap(her));
+ if ( vt_is_end(itr_u) ) {
+ return NULL;
+ }
+ return itr_u.data->val;
+}
+
+static void
+_mesa_put_peer(u3_mesa* sam_u, u3_ship her_u, u3_peer* per_u)
+{
+ u3_shap him_u = u3_ship_to_shap(her_u);
+ per_map_itr itr_u = vt_get(&sam_u->per_u, him_u);
+
+ if ( vt_is_end(itr_u) ) {
+ itr_u = vt_insert(&sam_u->per_u, him_u, per_u);
+
+ if ( vt_is_end(itr_u) ) {
+ fprintf(stderr, "mesa: cannot allocate memory for peer, dying");
+ u3_king_bail();
+ }
+ } else {
+ memcpy(itr_u.data->val, per_u, sizeof(u3_peer));
+ }
+}
+
+/* _mesa_get_request(): produce pending request state for nam_u
+ *
+ * produces a NULL pointer if no pending request exists
+*/
+static u3_pend_req*
+_mesa_get_request(u3_mesa* sam_u, u3_mesa_name* nam_u) {
+ u3_str key_u = {nam_u->pat_c, nam_u->pat_s};
+ req_map_itr itr_u = vt_get(&sam_u->req_u, key_u);
+ if ( vt_is_end(itr_u) ) {
+ return NULL;
+ }
+ return itr_u.data->val;
+}
+
+static void
+_mesa_del_request_cb(uv_handle_t* han_u) {
+ u3_pend_req* req_u = han_u->data;
+ arena_free(&req_u->are_u);
+}
+
+static void
+_mesa_del_request(u3_mesa* sam_u, u3_mesa_name* nam_u) {
+ u3_str key_u = {nam_u->pat_c, nam_u->pat_s};
+ req_map_itr itr_u = vt_get(&sam_u->req_u, key_u);
+
+ if ( vt_is_end(itr_u) ) {
+ return;
+ }
+
+ u3_pend_req* req_u = itr_u.data->val;
+ vt_erase(&sam_u->req_u, key_u);
+ if ( (u3_pend_req*)CTAG_WAIT != req_u ) {
+ uv_timer_stop(&req_u->tim_u);
+ req_u->tim_u.data = req_u;
+ uv_close((uv_handle_t*)&req_u->tim_u, _mesa_del_request_cb);
+ }
+}
+
+/* _mesa_put_request(): save new pending request state for nam_u
+*/
+static void
+_mesa_put_request(u3_mesa* sam_u, u3_mesa_name* nam_u, u3_pend_req* req_u) {
+ u3_str key_u = {nam_u->pat_c, nam_u->pat_s};
+ req_map_itr itr_u = vt_insert(&sam_u->req_u, key_u, req_u);
+
+ if ( vt_is_end(itr_u) ) {
+ fprintf(stderr, "mesa: cannot allocate memory for request, dying");
+ u3_king_bail();
+ }
+}
+
+/* _ames_czar_port(): udp port for galaxy.
+ XX copied from io/ames.c
+*/
+static c3_s
+_ames_czar_port(c3_y imp_y)
+{
+ if ( c3n == u3_Host.ops_u.net ) {
+ return 31337 + imp_y;
+ }
+ else {
+ return 13337 + imp_y;
+ }
+}
+
+static sockaddr_in
+_mesa_get_direct_lane(u3_mesa* sam_u, u3_ship her_u)
+{
+ sockaddr_in adr_u = {0};
+ adr_u.sin_family = AF_INET;
+
+ if ( c3__czar == u3_ship_rank(her_u) ) {
+ c3_s por_s = _ames_czar_port(her_u[0]);
+ adr_u.sin_addr.s_addr = htonl(u3_Host.imp_u[her_u[0]]);
+ adr_u.sin_port = htons(por_s);
+ return adr_u;
+ }
+
+ per_map_itr itr_u = vt_get(&sam_u->per_u, u3_ship_to_shap(her_u));
+ if ( vt_is_end(itr_u) ) {
+ return adr_u;
+ }
+
+ adr_u = itr_u.data->val->dan_u;
+
+
+ return adr_u;
+}
+
+static c3_o
+_mesa_lanes_equal(sockaddr_in lan_u, sockaddr_in lon_u)
+{
+ return __((lan_u.sin_addr.s_addr == lon_u.sin_addr.s_addr) && (lan_u.sin_port == lon_u.sin_port));
+}
+
+static sockaddr_in
+_mesa_get_czar_lane(u3_mesa* sam_u, c3_y imp_y)
+{
+ c3_s por_s = _ames_czar_port(imp_y);
+ sockaddr_in adr_u = {0};
+ adr_u.sin_family = AF_INET;
+ adr_u.sin_addr.s_addr = htonl(u3_Host.imp_u[imp_y]);
+ adr_u.sin_port = htons(por_s);
+ return adr_u;
+}
+
+/* _mesa_get_lane(): get lane
+*/
+static u3_gage*
+_mesa_get_gage(u3_mesa* sam_u, u3_ship her_u) {
+ gag_map_itr itr_u = vt_get(&sam_u->gag_u, u3_ship_to_shap(her_u));
+ if ( vt_is_end(itr_u) ) {
+ return NULL;
+ }
+ return itr_u.data->val;
+}
+
+/* _mesa_put_gage(): put gage state in state
+ *
+*/
+static void
+_mesa_put_gage(u3_mesa* sam_u, u3_ship her_u, u3_gage* gag_u)
+{
+ gag_map_itr itr_u = vt_insert(&sam_u->gag_u, u3_ship_to_shap(her_u), gag_u);
+ if ( vt_is_end(itr_u) ) {
+ fprintf(stderr, "mesa: cannot allocate memory for gage, dying");
+ u3_king_bail();
+ }
+}
+// congestion control update
+static void _mesa_handle_ack(u3_gage* gag_u, u3_pact_stat* pat_u)
+{
+ /* _log_gage(gag_u); */
+ gag_u->con_w++;
+
+ c3_d now_d = _get_now_micros();
+ c3_d rtt_d = now_d < pat_u->sen_d ? 0 : now_d - pat_u->sen_d;
+
+ c3_d err_d = _abs_dif(rtt_d, gag_u->rtt_w);
+
+ gag_u->rtt_w = (rtt_d + (gag_u->rtt_w * 7)) >> 3;
+ gag_u->rtv_w = (err_d + (gag_u->rtv_w * 7)) >> 3;
+ gag_u->rto_w = _clamp_rto(gag_u->rtt_w + (4*gag_u->rtv_w));
+
+ if ( gag_u->wnd_w < gag_u->sst_w ) {
+ gag_u->wnd_w++;
+ } else if ( gag_u->wnd_w <= ++gag_u->wnf_w ) {
+ gag_u->wnd_w++;
+ gag_u->wnf_w = 0;
+ }
+}
+
+static inline c3_d
+_mesa_req_get_remaining(u3_pend_req* req_u)
+{
+ return req_u->tof_d - req_u->hav_d;
+}
+
+static c3_d
+_safe_sub(c3_d a, c3_d b) {
+ return a < b ? 0 : a - b;
+}
+
+/*
+ * _mesa_req_get_cwnd(): produce packets to send
+ *
+ * saves next fragment number and preallocated pact into the passed pointers.
+ * Will not do so if returning 0
+*/
+static c3_w
+_mesa_req_get_cwnd(u3_pend_req* req_u)
+{
+ /* c3_w liv_w = bitset_wyt(&req_u->was_u); */
+ c3_w rem_w = _mesa_req_get_remaining(req_u);
+ /* u3l_log("rem_w %u wnd_w %u", rem_w, req_u->gag_u->wnd_w); */
+
+ /* u3l_log("rem_w %u", rem_w); */
+ /* u3l_log("wnd_w %u", req_u->gag_u->wnd_w); */
+ /* u3l_log("out_d %"PRIu64, req_u->out_d); */
+ /* return c3_min(rem_w, _safe_sub(3500, req_u->out_d)); */
+ c3_d ava_d = _safe_sub((c3_d)req_u->gag_u->wnd_w, req_u->out_d);
+ return c3_min(rem_w, ava_d);
+ /* return c3_min(rem_w, 5000 - req_u->out_d); */
+}
+
+/* _mesa_req_pact_resent(): mark packet as resent
+**
+*/
+static void
+_mesa_req_pact_resent(u3_pend_req* req_u, u3_mesa_name* nam_u, c3_d now_d)
+{
+ req_u->wat_u[nam_u->fra_d].sen_d = now_d;
+ req_u->wat_u[nam_u->fra_d].tie_y++;
+}
+
+/* _mesa_req_pact_sent(): mark packet as sent
+** after 1-RT first packet is handled in _mesa_req_pact_init()
+*/
+static void
+_mesa_req_pact_sent(u3_pend_req* req_u, c3_d fra_d, c3_d now_d)
+{
+ if( req_u->nex_d == fra_d ) {
+ req_u->nex_d++;
+ req_u->out_d++;
+ }
+ // TODO: optional assertions?
+ req_u->wat_u[fra_d].sen_d = now_d;
+ req_u->wat_u[fra_d].sip_y = 0;
+ req_u->wat_u[fra_d].tie_y++;
+}
+
+/* _ames_alloc(): libuv buffer allocator.
+*/
+static void
+_mesa_alloc(uv_handle_t* had_u, size_t len_i, uv_buf_t* buf)
+{
+ u3_mesa* sam_u = (u3_mesa*)had_u->data;
+ sam_u->are_u.beg = (char*)are_y;
+ c3_c* ptr_v = new(&sam_u->are_u, c3_c, 400000);
+ buf->base = ptr_v;
+ buf->len = 400000;
+}
+
+/* u3_mesa_decode_lane(): deserialize noun to lane; 0.0.0.0:0 if invalid
+*/
+static sockaddr_in
+u3_mesa_decode_lane(u3_atom lan) {
+ sockaddr_in adr_u = {0};
+ c3_d lan_d;
+
+ if ( c3n == u3r_safe_chub(lan, &lan_d) || (lan_d >> 48) != 0 ) {
+ return adr_u;
+ }
+
+ u3z(lan);
+ // convert incoming localhost to outgoing localhost
+ //
+
+ adr_u.sin_addr.s_addr = ( u3_Host.ops_u.net == c3y ) ?
+ htonl((c3_w)lan_d) : htonl(0x7f000001);
+ adr_u.sin_port = htons((c3_s)(lan_d >> 32));
+
+ return adr_u;
+}
+
+// END plagariasm zone
+//
+//
+//
+//
+//
+//
+static void _mesa_free_seal(u3_seal* sel_u)
+{
+ c3_free(sel_u->buf_y);
+ c3_free(sel_u);
+}
+
+static void
+_mesa_send_cb(uv_udp_send_t* req_u, c3_i sas_i)
+{
+ u3_seal* sel_u = (u3_seal*)req_u;
+ u3_mesa* sam_u = sel_u->sam_u;
+
+ if ( sas_i ) {
+ u3l_log("mesa: send fail_async: %s", uv_strerror(sas_i));
+ //sam_u->fig_u.net_o = c3n;
+ }
+ else {
+ //sam_u->fig_u.net_o = c3y;
+ }
+
+ _mesa_free_seal(sel_u);
+}
+
+static void
+_mesa_send_cb2(uv_udp_send_t* req_u, c3_i sas_i)
+{
+ if ( sas_i ) {
+ u3l_log("mesa: send fail_async: %s", uv_strerror(sas_i));
+ //sam_u->fig_u.net_o = c3n;
+ }
+ c3_free(req_u);
+}
+
+typedef struct _send_helper {
+ uv_udp_send_t snd_u;
+ u3_pend_req* req_u;
+ c3_d fra_d;
+} send_helper;
+
+static void
+_mesa_send_cb3(uv_udp_send_t* snt_u, c3_i sas_i)
+{
+ send_helper* snd_u = (send_helper*)snt_u;
+ if ( sas_i ) {
+ u3l_log("mesa: send fail_async: %s", uv_strerror(sas_i));
+ //sam_u->fig_u.net_o = c3n;
+ } else {
+ /* _mesa_req_pact_sent(snd_u->req_u, snd_u->fra_d); */
+ }
+ c3_free(snd_u);
+}
+
+static c3_i _mesa_send_buf2(struct sockaddr** ads_u, uv_buf_t** bfs_u, c3_w* int_u, c3_w num_w)
+{
+
+ /* add_u.sin_family = AF_INET; */
+
+ /* if ( u3_Host.ops_u.net && (add_u.sin_addr.s_addr == 0 || add_u.sin_addr.s_addr == 0xffffffff) ) { */
+ /* return; */
+ /* } */
+
+#ifdef MESA_DEBUG
+ /* c3_c* sip_c = inet_ntoa(add_u.sin_addr); */
+ // u3l_log("mesa: sending packet to %s:%u", sip_c, por_s);
+#endif
+ c3_i sas_i = uv_udp_try_send2(&u3_Host.wax_u, num_w, bfs_u, int_u, ads_u, 0);
+ if ( sas_i < 0 ) {
+ u3l_log("ames: send fail_sync: %s", uv_strerror(sas_i));
+ return 0;
+ }
+ return sas_i;
+// else if ( sas_i < (c3_i)num_w) {
+// for ( c3_i i = sas_i; i < (c3_i)num_w; i++) {
+// uv_udp_send_t* req_u = c3_malloc(sizeof(*req_u));
+// uv_udp_send(req_u, &u3_Host.wax_u, bfs_u[i], 1, ads_u[0], _mesa_send_cb2);
+// }
+// }
+}
+static void _mesa_send_buf3(sockaddr_in add_u, uv_buf_t buf_u)
+{
+
+ add_u.sin_addr.s_addr = ( u3_Host.ops_u.net == c3y ) ? add_u.sin_addr.s_addr : htonl(0x7f000001);
+ /* add_u.sin_family = AF_INET; */
+
+ if ( u3_Host.ops_u.net && (add_u.sin_addr.s_addr == 0 || add_u.sin_addr.s_addr == 0xffffffff) ) {
+ return;
+ }
+
+#ifdef MESA_DEBUG
+ // c3_c* sip_c = inet_ntoa(add_u.sin_addr);
+ // u3l_log("mesa: sending packet to %s:%u", sip_c, por_s);
+#endif
+
+
+ uv_udp_send_t* snd_u = c3_malloc(sizeof(*snd_u));
+
+ c3_i sas_i = uv_udp_send((uv_udp_send_t*)snd_u,
+ &u3_Host.wax_u,
+ &buf_u, 1,
+ (const struct sockaddr*)&add_u,
+ _mesa_send_cb3);
+
+ if ( sas_i ) {
+ u3l_log("ames: send fail_sync: %s", uv_strerror(sas_i));
+ /*if ( c3y == sam_u->fig_u.net_o ) {
+ //sam_u->fig_u.net_o = c3n;
+ }*/
+ }
+}
+
+static void _mesa_send_buf(u3_mesa* sam_u, sockaddr_in add_u, c3_y* buf_y, c3_w len_w)
+{
+
+ add_u.sin_addr.s_addr = ( u3_Host.ops_u.net == c3y ) ? add_u.sin_addr.s_addr : htonl(0x7f000001);
+ add_u.sin_family = AF_INET;
+
+ if ( u3_Host.ops_u.net && (add_u.sin_addr.s_addr == 0 || add_u.sin_addr.s_addr == 0xffffffff) ) {
+ c3_free(buf_y);
+ return;
+ }
+
+ u3_seal* sel_u = c3_calloc(sizeof(*sel_u));
+ sel_u->buf_y = buf_y;
+ sel_u->len_w = len_w;
+ sel_u->sam_u = sam_u;
+
+
+
+#ifdef MESA_DEBUG
+ c3_c* sip_c = inet_ntoa(add_u.sin_addr);
+ u3l_log("mesa: sending packet to %s:%u", sip_c, ntohs(add_u.sin_port));
+#endif
+
+ uv_buf_t buf_u = uv_buf_init((c3_c*)buf_y, len_w);
+
+ c3_i sas_i = uv_udp_send(&sel_u->snd_u,
+ &u3_Host.wax_u,
+ &buf_u, 1,
+ (const struct sockaddr*)&add_u,
+ _mesa_send_cb);
+
+ if ( sas_i ) {
+ u3l_log("ames: send fail_sync: %s", uv_strerror(sas_i));
+ /*if ( c3y == sam_u->fig_u.net_o ) {
+ //sam_u->fig_u.net_o = c3n;
+ }*/
+ _mesa_free_seal(sel_u);
+ }
+}
+
+static void _mesa_send(u3_mesa_pict* pic_u, sockaddr_in lan_u)
+{
+ u3_mesa* sam_u = pic_u->sam_u;
+ c3_y *buf_y = c3_calloc(PACT_SIZE);
+ c3_w len_w = mesa_etch_pact_to_buf(buf_y, PACT_SIZE, &pic_u->pac_u);
+ _mesa_send_buf(sam_u, lan_u, buf_y, len_w);
+}
+
+typedef struct _u3_mesa_request_data {
+ u3_mesa* sam_u;
+ u3_ship her_u;
+ u3_mesa_name* nam_u;
+ c3_y* buf_y;
+ c3_w len_w;
+ u3_pit_addr* las_u;
+ arena are_u;
+} u3_mesa_request_data;
+
+typedef struct _u3_mesa_resend_data {
+ arena are_u;
+ uv_timer_t tim_u;
+ u3_mesa_request_data dat_u;
+ c3_y ret_y; // number of remaining retries
+} u3_mesa_resend_data;
+
+
+static void
+_mesa_free_cb(uv_handle_t *han_u)
+{
+ u3_mesa_resend_data* res_u = han_u->data;
+ arena_free(&res_u->are_u);
+}
+
+static void
+_mesa_free_resend_data(u3_mesa_resend_data* res_u)
+{
+ uv_timer_stop(&res_u->tim_u);
+ res_u->tim_u.data = res_u;
+ uv_close((uv_handle_t*)&res_u->tim_u, _mesa_free_cb);
+}
+
+static void
+_mesa_send_bufs(u3_mesa* sam_u,
+ u3_peer* per_u,
+ c3_y* buf_y,
+ c3_w len_w,
+ u3_pit_addr* las_u);
+
+static void
+_mesa_send_modal(u3_peer* per_u, uv_buf_t buf_u, u3_pit_addr* las_u)
+{
+ u3_mesa* sam_u = per_u->sam_u;
+ c3_d now_d = _get_now_micros();
+
+ c3_w len_w = buf_u.len;
+ c3_y* sen_y = c3_calloc(len_w);
+ memcpy(sen_y, buf_u.base, len_w);
+
+ u3_ship gal_u = {0};
+ gal_u[0] = per_u->imp_y;
+ c3_o our_o = u3_ships_equal(gal_u, sam_u->pir_u->who_d);
+
+ if ( ( c3y == _mesa_is_direct_mode(per_u) ) ||
+ // if we are the sponsor of the ship, don't send to ourselves
+ (our_o == c3y) ) {
+ // u3l_log("mesa: direct");
+ _mesa_send_buf(sam_u, per_u->dan_u, sen_y, len_w);
+ per_u->dir_u.sen_d = now_d;
+ }
+ else if ( las_u != NULL ) {
+ _mesa_send_bufs(sam_u, per_u, sen_y, len_w, las_u);
+ } else {
+ #ifdef MESA_DEBUG
+ /* c3_c* gal_c = u3_ship_to_string(gal_u); */
+ // u3l_log("mesa: sending to %s", gal_c);
+ /* c3_free(gal_c); */
+ #endif
+ //
+ sockaddr_in imp_u = _mesa_get_czar_lane(sam_u, per_u->imp_y);
+ _mesa_send_buf(sam_u, imp_u, sen_y, len_w);
+ per_u->ind_u.sen_d = now_d;
+
+ if ( c3n == _mesa_is_lane_zero(per_u->dan_u) ) {
+ c3_y* san_y = c3_calloc(len_w);
+ memcpy(san_y, buf_u.base, len_w);
+ _mesa_send_buf(sam_u, per_u->dan_u, san_y, len_w);
+ per_u->dir_u.sen_d = now_d;
+ }
+ }
+}
+
+
+static uv_buf_t
+_mesa_peek_buf(c3_c* pek_c, c3_d fra_d, c3_w pek_w)
+// 43
+{
+ if (fra_d <= 0xff) {
+ return uv_buf_init(pek_c+(fra_d*pek_w), pek_w);
+ }
+ if (fra_d <= 0xffff) {
+ return uv_buf_init(
+ pek_c + (0x100 * pek_w) + ((fra_d - 0x100) * (pek_w + 1)),
+ pek_w + 1
+ );
+ }
+ if (fra_d <= 0xffffff) {
+ return uv_buf_init(
+ pek_c + (0x100 * pek_w) +
+ ((fra_d - 0x100) * (pek_w + 1)) +
+ ((fra_d - 0x10000) * (pek_w + 3)),
+ pek_w + 3
+ );
+ }
+ return uv_buf_init(
+ pek_c + (0xff * pek_w) +
+ ((fra_d - 0x100) * (pek_w + 1)) +
+ ((fra_d - 0x10000) * (pek_w + 3)) +
+ ((fra_d - 0x1000000ULL) * (pek_w + 7)),
+ pek_w + 7
+ );
+}
+
+static void
+_try_resend(u3_pend_req* req_u, c3_d nex_d)
+{
+ c3_o los_o = c3n;
+ c3_d now_d = _get_now_micros();
+ u3_mesa_pact *pac_u = &req_u->pic_u->pac_u;
+
+ /* c3_y buf_y[PACT_SIZE]; */
+ /* arena scr_u = req_u->are_u; */
+ /* uv_buf_t* bfs_u = new(&scr_u, uv_buf_t, 1); */
+ // c3_w i_w = 0;
+ for ( c3_d i_d = req_u->lef_d; i_d < nex_d; i_d++ ) {
+ // TODO: make fast recovery different from slow
+ // TODO: track skip count but not dupes, since dupes are meaningless
+ if ( (c3n == bitset_has(&req_u->was_u, i_d)) &&
+ (now_d - req_u->wat_u[i_d].sen_d > req_u->gag_u->rto_w) ) {
+ // u3l_log("now_d %"PRIu64, now_d);
+ // u3l_log("sen_d %"PRIu64, req_u->wat_u[i_d].sen_d);
+ // u3l_log("rto_w %u", req_u->gag_u->rto_w);
+ los_o = c3y;
+ /* u3l_log("resend fra_w: %llu", i_d); */
+
+ uv_buf_t buf_u = _mesa_peek_buf(req_u->pek_c, i_d, req_u->pek_w);
+ /* if (buf_u.base < req_u->pek_c) { */
+ /* u3l_log("peek overflow, dying, fragment %"PRIu64, i_d); */
+ /* abort(); */
+ /* } */
+ /* new(&scr_u, uv_buf_t, 1); */
+ /* bfs_u[i_w] = buf_u; */
+ /* c3_w len_w = mesa_etch_pact_to_buf(buf_y, PACT_SIZE, pac_u); */
+ /* _mesa_send_buf3(req_u->per_u->dan_u, buf_u, req_u, i_d); */
+ _mesa_send_modal(req_u->per_u, buf_u, NULL);
+ _mesa_req_pact_resent(req_u, &pac_u->pek_u.nam_u, now_d);
+ // i_w++;
+ }
+ }
+ /* c3_w* int_u = new(&scr_u, c3_w, i_w); */
+ /* struct sockaddr** ads_u = new(&scr_u, struct sockaddr*, i_w); */
+ /* uv_buf_t** bus_u = new(&scr_u, uv_buf_t*, i_w); */
+ /* for (c3_w j_w = 0; j_w < i_w; j_w++) { */
+ /* ads_u[j_w] = (struct sockaddr*)&req_u->per_u->dan_u; */
+ /* bus_u[j_w] = &bfs_u[j_w]; */
+ /* int_u[j_w] = 1; */
+ /* } */
+
+ if ( c3y == los_o ) {
+ /* _mesa_send_buf2(req_u->per_u->sam_u, ads_u, bus_u, int_u, i_w); */
+ /* _mesa_send_buf2(req_u->per_u->sam_u, req_u->per_u->dan_u, bfs_u, i_w); */
+ req_u->gag_u->sst_w = c3_max(1, req_u->gag_u->wnd_w / 2);
+ req_u->gag_u->wnd_w = req_u->gag_u->sst_w;
+ req_u->gag_u->rto_w = _clamp_rto(req_u->gag_u->rto_w * 2);
+ // u3l_log("loss");
+ // u3l_log("resent %u", i_w);
+ // u3l_log("counter %u hav_d %"PRIu64 " nex_d %"PRIu64 " ack_d %"PRIu64 " lef_d %"PRIu64 " old_d %"PRIu64, req_u->los_u->counter, req_u->hav_d, req_u->nex_d, req_u->ack_d, req_u->lef_d, req_u->old_d);
+ // _log_gage(req_u->gag_u);
+ }
+}
+
+static void
+_mesa_packet_timeout(uv_timer_t* tim_u);
+
+// TODO rename to indicate it sets a timer
+static void
+_update_resend_timer(u3_pend_req *req_u)
+{
+ // scan in flight packets, find oldest
+ c3_w idx_d = req_u->lef_d;
+ /* c3_d now_d = _get_now_micros(); */
+ /* c3_d wen_d = now_d; */
+ /* for ( c3_d i = req_u->lef_d; i < req_u->nex_d; i++ ) { */
+ /* // u3l_log("fra %u (%u)", i, __LINE__); */
+ /* if ( c3n == bitset_has(&req_u->was_u, i) && */
+ /* wen_d > req_u->wat_u[i].sen_d */
+ /* ) { */
+ /* wen_d = req_u->wat_u[i].sen_d; */
+ /* idx_d = i; */
+ /* } */
+ /* } */
+/* if ( now_d == wen_d ) { */
+/* #ifdef MESA_DEBUG */
+/* /\* u3l_log("failed to find new oldest"); *\/ */
+/* #endif */
+/* } */
+ /* req_u->old_d = idx_d; */
+ req_u->tim_u.data = req_u;
+ // c3_d gap_d = req_u->wat_u[idx_d].sen_d == 0 ?
+ // 0 :
+ // now_d - req_u->wat_u[idx_d].sen_d;
+ c3_d next_expiry = req_u->gag_u->rto_w;
+ // u3l_log("next_expiry %llu", next_expiry / 1000);
+ // u3l_log("DUE %llu", uv_timer_get_due_in(&req_u->tim_u));
+ uv_timer_start(&req_u->tim_u, _mesa_packet_timeout, next_expiry / 1000, 0);
+}
+
+/* _mesa_packet_timeout(): callback for packet timeout
+*/
+static void
+_mesa_packet_timeout(uv_timer_t* tim_u) {
+ u3_pend_req* req_u = (u3_pend_req*)tim_u->data;
+ // u3l_log("old %llu nex %llu packet timed out", req_u->old_d, req_u->nex_d);
+ _try_resend(req_u, req_u->nex_d);
+ _update_resend_timer(req_u);
+}
+
+static c3_o
+_mesa_burn_misorder_queue(u3_pend_req* req_u, c3_y boq_y, c3_w ack_w)
+{
+ c3_d num_d;
+ c3_d max_d = req_u->tof_d;
+ c3_o res_o = c3y;
+ for ( num_d = 0; (num_d + ack_w) < max_d; num_d++ ) {
+ c3_w siz_w = (1 << (boq_y - 3)); // XX
+ c3_y* fra_y = req_u->dat_y + (siz_w * (ack_w + num_d));
+ c3_w len_w = (num_d + ack_w) == (max_d - 1) ? req_u->tob_d % 1024 : 1024;
+ lss_pair* pur_u = &req_u->mis_u[ack_w + num_d];
+ lss_pair* par_u = (0 == memcmp(pur_u, &(lss_pair){0}, sizeof(lss_pair))) ? NULL : &req_u->mis_u[ack_w + num_d];
+ if ( c3n == bitset_has(&req_u->was_u, (num_d + ack_w)) ) {
+ break;
+ }
+ if ( c3y != lss_verifier_ingest(req_u->los_u, fra_y, len_w, par_u) ) {
+ res_o = c3n;
+ u3l_log("fail to burn %" PRIu64 " %" PRIu64, num_d + ack_w, req_u->tof_d);
+ break;
+ }
+ // u3l_log("size %u counter %u num %u fra %u inx %u lef_d %u", siz_w, req_u->los_u->counter , num_w, fra_d, (req_u->los_u->counter + num_w + 1), lef_d);
+ }
+
+ // ratchet forward
+ /* u3l_log("burned %llu", num_d); */
+ num_d++; // account for the in-ordered packet processed in _mesa_req_pact_done
+ req_u->lef_d += num_d;
+ req_u->hav_d += num_d;
+
+ return res_o;
+}
+
+static void
+_init_lane_state(u3_lane_state* sat_u);
+
+/* _mesa_req_pact_done(): mark packet as done
+*/
+static void
+_mesa_req_pact_done(u3_pend_req* req_u,
+ u3_mesa_name* nam_u,
+ u3_mesa_data* dat_u,
+ c3_y hop_y,
+ sockaddr_in lan_u)
+{
+ u3_mesa* sam_u = req_u->per_u->sam_u; // needed for the MESA_LOG macro
+
+ // received past the end of the message
+ if ( mesa_num_leaves(dat_u->tob_d) <= nam_u->fra_d ) {
+ u3l_log("strange tob_d %"PRIu64" fra_d %"PRIu64" req_u %"PRIu64,
+ dat_u->tob_d, nam_u->fra_d, req_u->hav_d);
+ MESA_LOG(sam_u, STRANGE);
+ // XX: is this sufficient to drop whole request
+ return;
+ }
+
+ // received duplicate
+ if ( c3y == bitset_has(&req_u->was_u, nam_u->fra_d) ) {
+ // MESA_LOG(sam_u, DUPE);
+ /* _update_resend_timer(req_u); */
+ return;
+ }
+
+ lss_pair* par_u = NULL;
+
+ c3_w siz_w = (1 << (nam_u->boq_y - 3));
+ memcpy(req_u->dat_y + (siz_w * nam_u->fra_d), dat_u->fra_y, dat_u->len_w);
+
+ if ( dat_u->aut_u.typ_e == AUTH_PAIR ) {
+ par_u = &req_u->mis_u[nam_u->fra_d];
+ memcpy((*par_u)[0], dat_u->aut_u.has_y[0], sizeof(lss_hash));
+ memcpy((*par_u)[1], dat_u->aut_u.has_y[1], sizeof(lss_hash));
+ }
+
+ if ( req_u->los_u->counter != nam_u->fra_d ) {
+ // insert into misordered queue
+ /* u3l_log("insert into misordered queue fra: %llu [counter %u]", */
+ /* nam_u->fra_d, */
+ /* req_u->los_u->counter); */
+ req_u->out_d--;
+ bitset_put(&req_u->was_u, nam_u->fra_d);
+
+ _mesa_handle_ack(req_u->gag_u, &req_u->wat_u[nam_u->fra_d]);
+ /* _try_resend(req_u, nam_u->fra_d); */
+ /* _update_resend_timer(req_u); */
+ return;
+ }
+ else if ( c3y != lss_verifier_ingest(req_u->los_u, dat_u->fra_y, dat_u->len_w, par_u) ) {
+ u3l_log("auth fail frag %"PRIu64, nam_u->fra_d);
+ u3l_log("nit_o %u", nam_u->nit_o);
+ _mesa_del_request(sam_u, nam_u);
+ MESA_LOG(sam_u, AUTH);
+ return;
+ }
+ else if ( c3y != _mesa_burn_misorder_queue(req_u, nam_u->boq_y, req_u->los_u->counter) ) {
+ MESA_LOG(sam_u, AUTH)
+ _mesa_del_request(sam_u, nam_u);
+ return;
+ }
+ else {
+ // u3l_log("about to other free");
+ }
+
+ if ( nam_u->fra_d > req_u->ack_d ) {
+ req_u->ack_d = nam_u->fra_d;
+ }
+
+ req_u->out_d--;
+ bitset_put(&req_u->was_u, nam_u->fra_d);
+
+ #ifdef MESA_DEBUG
+ // u3l_log("fragment %llu counter %u hav_d %llu nex_d %llu ack_d %llu lef_d %llu old_d %llu", nam_u->fra_d, req_u->los_u->counter, req_u->hav_d, req_u->nex_d, req_u->ack_d, req_u->lef_d, req_u->old_d);
+ #endif
+
+ u3_lane_state* sat_u;
+ if ( 0 == hop_y && (c3n == _mesa_lanes_equal(lan_u, req_u->per_u->dan_u)) ) {
+ req_u->per_u->dan_u = lan_u;
+ sat_u = &req_u->per_u->dir_u;
+ _init_lane_state(sat_u);
+ }
+ else {
+ sat_u = &req_u->per_u->ind_u;
+ }
+ sat_u->her_d = _get_now_micros();
+
+ // handle gauge update
+ _mesa_handle_ack(req_u->gag_u, &req_u->wat_u[nam_u->fra_d]);
+
+ // XX FIXME?
+ /* _try_resend(req_u, nam_u->fra_d); */
+ _update_resend_timer(req_u);
+}
+
+static sockaddr_in
+_realise_lane(u3_noun lan) {
+ sockaddr_in lan_u = {0};
+ lan_u.sin_family = AF_INET;
+
+ if ( c3y == u3a_is_cat(lan) ) {
+ // u3_assert( lan < 256 );
+ if ( (c3n == u3_Host.ops_u.net) ) {
+ lan_u.sin_addr.s_addr = htonl(0x7f000001);
+ lan_u.sin_port = htons(_ames_czar_port(lan));
+ } else {
+ lan_u.sin_addr.s_addr = htonl(u3_Host.imp_u[lan]);
+ lan_u.sin_port = htons(_ames_czar_port(lan));
+ }
+ } else {
+ u3_noun tag, pip, por;
+ u3x_trel(lan, &tag, &pip, &por);
+ if ( tag == c3__if ) {
+ lan_u.sin_addr.s_addr = htonl(u3r_word(0, pip));
+ u3_assert( c3y == u3a_is_cat(por) && por <= 0xFFFF);
+ lan_u.sin_port = htons(por);
+ } else {
+ u3l_log("mesa: inscrutable lane");
+ }
+
+ }
+ u3z(lan);
+ return lan_u;
+}
+
+static void
+_mesa_send_bufs(u3_mesa* sam_u,
+ u3_peer* per_u, // null for response packets
+ c3_y* buf_y,
+ c3_w len_w,
+ u3_pit_addr* las_u)
+{
+
+ u3_pit_addr* t = las_u;
+ while ( NULL != t ) {
+ sockaddr_in lan_u = t->sdr_u;
+
+ if ( !lan_u.sin_port ) {
+ u3l_log("mesa: failed to realise lane");
+ } else {
+ c3_y* sen_y = c3_calloc(len_w);
+ memcpy(sen_y, buf_y, len_w);
+ _mesa_send_buf(sam_u, lan_u, sen_y, len_w);
+ if ( per_u && (c3y == _mesa_lanes_equal(lan_u, per_u->dan_u)) ) {
+ per_u->dir_u.sen_d = _get_now_micros();
+ }
+ }
+ t = t->nex_p;
+ }
+}
+
+static u3_pit_entry*
+_mesa_get_pit(u3_mesa* sam_u, u3_mesa_name* nam_u)
+{
+ pit_map_itr itr_u = vt_get(&sam_u->pit_u, nam_u->str_u);
+ if ( vt_is_end(itr_u) ) {
+ return NULL;
+ }
+ return itr_u.data->val;
+}
+
+static void
+_mesa_del_pit(u3_mesa* sam_u, u3_mesa_name* nam_u)
+{
+ /* u3l_log("deleting %llu", nam_u->fra_d); */
+ vt_erase(&sam_u->pit_u, nam_u->str_u);
+}
+
+static void
+_mesa_add_lane_to_pit(u3_mesa* sam_u, u3_mesa_name* nam_u, sockaddr_in lan_u)
+{
+ pit_map_itr itr_u = vt_get(&sam_u->pit_u, nam_u->str_u);
+
+ c3_d now_d = _get_now_micros();
+
+ if ( vt_is_end(itr_u) ) {
+ arena are_u = arena_create(16384);
+ u3_pit_entry* ent_u = new(&are_u, u3_pit_entry, 1);
+ ent_u->are_u = are_u;
+ ent_u->tim_d = now_d;
+ u3_pit_addr* adr_u = new(&ent_u->are_u, u3_pit_addr, 1);
+ adr_u->nex_p = 0;
+ adr_u->sdr_u = lan_u;
+ ent_u->adr_u = adr_u;
+
+ c3_c* str_c = new(&ent_u->are_u, c3_c, nam_u->str_u.len_w);
+ memcpy(str_c, nam_u->str_u.str_c, nam_u->str_u.len_w);
+ u3_str str_u = {str_c, nam_u->str_u.len_w};
+
+ itr_u = vt_insert(&sam_u->pit_u, str_u, ent_u);
+
+ if ( vt_is_end(itr_u) ) {
+ fprintf(stderr, "mesa: cannot allocate memory for pit, dying");
+ u3_king_bail();
+ }
+
+ }
+ else {
+ u3_pit_entry* ent_u = itr_u.data->val;
+ ent_u->tim_d = now_d;
+ u3_pit_addr* old_u = ent_u->adr_u;
+ while (old_u) {
+ if ( c3y == _mesa_lanes_equal(lan_u, old_u->sdr_u) ) {
+ return;
+ }
+ old_u = old_u->nex_p;
+ }
+
+ u3_pit_addr* adr_u = new(&ent_u->are_u, u3_pit_addr, 1);
+ adr_u->sdr_u = lan_u;
+ u3_pit_addr* tmp_u = ent_u->adr_u;
+ adr_u->nex_p = tmp_u;
+ ent_u->adr_u = adr_u;
+ }
+ return;
+}
+
+static void
+_mesa_resend_timer_cb(uv_timer_t* tim_u)
+{
+ u3_mesa_resend_data* res_u = tim_u->data;
+ u3_mesa_request_data* dat_u = &res_u->dat_u;
+ res_u->ret_y--;
+
+ u3_pend_req* pit_u = _mesa_get_request(dat_u->sam_u, dat_u->nam_u);
+ if ( (u3_pend_req*)CTAG_WAIT != pit_u ) {
+ #ifdef MESA_DEBUG
+ // u3l_log("mesa: resend PIT entry gone %u", res_u->ret_y);
+ #endif
+ _mesa_free_resend_data(res_u);
+ return;
+ }
+ else {
+ #ifdef MESA_DEBUG
+ u3l_log("mesa: resend %u", res_u->ret_y);
+ #endif
+ }
+
+ _mesa_send_bufs(dat_u->sam_u,
+ NULL,
+ dat_u->buf_y,
+ dat_u->len_w,
+ dat_u->las_u);
+
+ if ( res_u->ret_y ) {
+ uv_timer_start(&res_u->tim_u, _mesa_resend_timer_cb, 1000, 0);
+ }
+ else {
+ _mesa_del_request(res_u->dat_u.sam_u, res_u->dat_u.nam_u);
+ _mesa_free_resend_data(res_u);
+ }
+}
+
+static u3_pit_addr*
+_mesa_lanes_to_addrs(u3_noun las, arena* are_u) {
+ u3_pit_addr* adr_u = NULL;
+ u3_noun lan, t = las;
+ while ( t != u3_nul ) {
+ u3x_cell(t, &lan, &t);
+ u3_pit_addr* new_u = new(are_u, u3_pit_addr, 1);
+ new_u->sdr_u = _realise_lane(u3k(lan));
+ new_u->nex_p = adr_u;
+ adr_u = new_u;
+ }
+ return adr_u;
+}
+
+static void
+_mesa_hear(u3_mesa* sam_u,
+ const struct sockaddr* adr_u,
+ c3_w len_w,
+ c3_y* hun_y);
+
+static void time_elapsed(c3_d fra_d, c3_d total, c3_d begin, c3_d end)
+{
+ c3_d elapsed = end - begin;
+ double percent = 100.0 * ((double)elapsed / (double)total);
+ u3l_log(" %"PRIu64": %"PRIu64"(%.2f%%)", fra_d, elapsed, percent);
+}
+
+static void
+packet_test(u3_mesa* sam_u, c3_c* fil_c) {
+ FILE* fd = fopen(fil_c, "rb");
+ fseek(fd, 0L, SEEK_END);
+ c3_d sz = ftell(fd);
+ c3_y* packets = c3_malloc(sz);
+ fseek(fd, 0L, SEEK_SET);
+
+ const struct sockaddr adr_u = {
+ .sa_family = AF_INET,
+ .sa_data = {
+ 0x00, 0x50, // Port 80 in network byte order
+ 0xa7, 0xac, 0xa8, 0x17, // IP 167.172.168.23
+ 0x00, 0x00, 0x00, 0x00, // Padding
+ 0x00, 0x00, 0x00, 0x00 // Padding
+ }
+ };
+
+ fread(packets, 1, sz, fd);
+ fclose(fd);
+
+ c3_d now_d = _get_now_micros();
+ c3_d tidx = 0;
+
+ for (c3_d i = 0; i < sz;) {
+ c3_w len_w = *(c3_w*)(packets+i);
+ /* u3l_log("len_w %u i %"PRIu64, len_w, i); */
+ i += 4;
+ _mesa_hear(sam_u, &adr_u, len_w, packets + i);
+ /* tim_y[tidx] = _get_now_micros(); */
+ sam_u->are_u.beg = (char*)are_y;
+ i += len_w;
+ /* tidx++; */
+ }
+ c3_d end_d = _get_now_micros();
+ c3_d total_d = end_d - now_d;
+ /* for (c3_d i = 1; i < tidx; i++) { */
+ /* time_elapsed(i, total_d, tim_y[i-1], tim_y[i]); */
+ /* } */
+ u3l_log("packet_test took %"PRIu64, total_d);
+ c3_free(packets);
+}
+
+static void
+_mesa_ef_send(u3_mesa* sam_u, u3_noun las, u3_noun pac)
+{
+ c3_w len_w = u3r_met(3, pac);
+ arena are_u = arena_create(len_w + 16384);
+ c3_y* buf_y = new(&are_u, c3_y, len_w);
+ u3r_bytes(0, len_w, buf_y, pac);
+
+ u3_mesa_pact pac_u;
+ memset(&pac_u, 0x11, sizeof(pac_u));
+ c3_c* err_c = mesa_sift_pact_from_buf(&pac_u, buf_y, len_w);
+ if ( err_c ) {
+ u3l_log("mesa: ef_send: sift failed: %u %s", len_w, err_c);
+ u3z(pac);
+ u3z(las);
+ arena_free(&are_u);
+ return;
+ }
+
+ u3_pend_req* old_u = _mesa_get_request(sam_u, &pac_u.pek_u.nam_u);
+
+ if (old_u) {
+ u3z(pac);
+ u3z(las);
+ arena_free(&are_u);
+ return;
+ }
+
+ if ( PACT_PAGE == pac_u.hed_u.typ_y ) {
+ u3_pit_entry* pin_u = _mesa_get_pit(sam_u, &pac_u.pek_u.nam_u);
+
+ if ( NULL == pin_u ) {
+ arena_free(&are_u);
+ u3z(pac);
+ u3z(las);
+ return;
+ }
+
+ _mesa_send_bufs(sam_u, NULL, buf_y, len_w, pin_u->adr_u);
+ _mesa_del_pit(sam_u, &pac_u.pek_u.nam_u);
+ arena_free(&are_u);
+ }
+ else {
+ u3_mesa_resend_data* res_u = new(&are_u, u3_mesa_resend_data, 1);
+ res_u->are_u = are_u;
+ u3_mesa_name* nam_u = _mesa_copy_name_alloc(&pac_u.pek_u.nam_u, &res_u->are_u);
+ u3_mesa_request_data* dat_u = &res_u->dat_u;
+ {
+ dat_u->sam_u = sam_u;
+ u3_ship_copy(dat_u->her_u, nam_u->her_u);
+ dat_u->nam_u = nam_u;
+ dat_u->las_u = _mesa_lanes_to_addrs(las, &res_u->are_u);
+ dat_u->buf_y = buf_y;
+ dat_u->len_w = len_w;
+ }
+ {
+ res_u->ret_y = 9;
+ uv_timer_init(u3L, &res_u->tim_u);
+ }
+ _mesa_put_request(sam_u, nam_u, (u3_pend_req*)CTAG_WAIT);
+ res_u->tim_u.data = res_u;
+ #ifdef PACKET_TEST
+ packet_test(sam_u, "pages.packs");
+ #else
+
+ _mesa_send_bufs(sam_u, NULL, buf_y, len_w, dat_u->las_u);
+ uv_timer_start(&res_u->tim_u, _mesa_resend_timer_cb, 1000, 0);
+ #endif
+ }
+
+ u3z(pac);
+ u3z(las);
+}
+
+c3_o
+_ames_kick_newt(void* sam_u, u3_noun tag, u3_noun dat);
+
+u3_atom
+u3_ames_encode_lane(u3_lane lan);
+
+static void _meet_peer(u3_mesa* sam_u, u3_peer* per_u, u3_ship her_u);
+static void _init_peer(u3_mesa* sam_u, u3_peer* per_u);
+
+static c3_o _mesa_kick(u3_mesa* sam_u, u3_noun tag, u3_noun dat)
+{
+ c3_o ret_o;
+ switch ( tag ) {
+ default: {
+ ret_o = c3n;
+ } break;
+ case c3__push: {
+ u3_noun las, pac;
+ if ( c3n == u3r_cell(dat, &las, &pac) ) {
+ // u3l_log(" mesa: send old");
+ ret_o = c3n;
+ } else {
+ // u3l_log(" mesa: send new");
+ _mesa_ef_send(sam_u, u3k(las), u3k(pac));
+ ret_o = c3y;
+ }
+ } break;
+ case c3__send:
+ case c3__turf:
+ case c3__saxo: {
+ #ifdef MESA_DEBUG
+ c3_c* tag_c = u3r_string(tag);
+ /* u3l_log("mesa: send old %s", tag_c); */
+ c3_free(tag_c);
+ #endif
+ ret_o = _ames_kick_newt(u3_Host.sam_u, u3k(tag), u3k(dat));
+ } break;
+ case c3__nail: {
+ u3_ship who_u;
+ u3_ship_of_noun(who_u, u3h(dat));
+ u3_peer* per_u = _mesa_get_peer(sam_u, who_u);
+
+ if ( NULL == per_u ) {
+ per_u = new(&sam_u->par_u, u3_peer, 1);
+ _init_peer(sam_u, per_u);
+ }
+
+ // XX the format of the lane %nail gives is (list (each @p address))
+ //
+ u3_noun las = u3do("tail", u3k(dat));
+
+ if ( las == u3_nul ) {
+ per_u->dan_u = (sockaddr_in){0};
+ }
+ else {
+ u3_noun lan = u3h(las);
+ // we either have a direct route, and a galaxy, or just one lane
+ if ( c3n == u3h(lan) ) {
+ sockaddr_in lan_u = u3_mesa_decode_lane(u3k(u3t(lan)));
+ per_u->dan_u = lan_u;
+ } else {
+ // delete direct lane if galaxy
+ per_u->dan_u = (sockaddr_in){0};
+ }
+ }
+
+ _meet_peer(sam_u, per_u, who_u);
+
+ ret_o = _ames_kick_newt(u3_Host.sam_u, u3k(tag), u3k(dat));
+ } break;
+ }
+
+ // technically losing tag is unnecessary as it always should
+ // be a direct atom, but better to be strict
+ u3z(dat); u3z(tag);
+ return ret_o;
+}
+
+static c3_o _mesa_io_kick(u3_auto* car_u, u3_noun wir, u3_noun cad)
+{
+ u3_mesa* sam_u = (u3_mesa*)car_u;
+
+ u3_noun tag, dat, i_wir;
+ c3_o ret_o;
+ if ( (c3n == u3r_cell(wir, &i_wir, 0))
+ || (c3__ames != i_wir)
+ || (c3n == u3r_cell(cad, &tag, &dat)) )
+ {
+ ret_o = c3n;
+ }
+ else {
+ ret_o = _mesa_kick(sam_u, u3k(tag), u3k(dat));
+ }
+
+ u3z(wir); u3z(cad);
+ return ret_o;
+}
+
+static u3_noun
+_mesa_io_info(u3_auto* car_u)
+{
+
+ return u3_nul;
+}
+
+static void
+_mesa_io_slog(u3_auto* car_u) {
+ u3l_log("mesa is online");
+}
+
+static void
+_mesa_exit_cb(uv_handle_t* han_u) {
+ u3_mesa* sam_u = (u3_mesa*)han_u->data;
+ arena_free(&sam_u->par_u);
+}
+
+static void
+_mesa_io_exit(u3_auto* car_u)
+{
+ u3_mesa* sam_u = (u3_mesa*)car_u;
+ uv_timer_stop(&sam_u->tim_u);
+ sam_u->tim_u.data = sam_u;
+ uv_udp_recv_stop(&u3_Host.wax_u);
+ uv_close((uv_handle_t*)&sam_u->tim_u, _mesa_exit_cb);
+ uv_close((uv_handle_t*)&u3_Host.wax_u, 0);
+}
+
+static void
+_init_lane_state(u3_lane_state* sat_u)
+{
+ sat_u->sen_d = 0;
+ sat_u->her_d = 0;
+ sat_u->rtt_w = 1000000;
+ sat_u->rtv_w = 1000000;
+}
+
+static void
+_init_peer(u3_mesa* sam_u, u3_peer* per_u)
+{
+ per_u->sam_u = sam_u;
+ per_u->ful_o = c3n;
+ per_u->dan_u = (sockaddr_in){0};
+ _init_lane_state(&per_u->dir_u);
+ per_u->imp_y = 0;
+ _init_lane_state(&per_u->ind_u);
+}
+
+static u3_noun
+_name_to_jumbo_scry(u3_mesa_name* nam_u)
+{
+ u3_noun rif = _dire_etch_ud(nam_u->rif_w);
+ u3_noun boq = _dire_etch_ud(31); // XX make configurable
+ u3_noun fag = _dire_etch_ud(0); // XX 1
+ u3_noun pax = _mesa_encode_path(nam_u->pat_s, (c3_y*)nam_u->pat_c);
+ // u3_noun wer = nam_u->nit_o == c3y
+ // ? u3nc(c3__init, pax)
+ // : u3nt(nam_u->aut_o == c3y ? c3__auth : c3__data, fag, pax);
+
+ // XX only boq_y of MAX JUMBO allowed
+ u3_noun wer = u3nt(c3__data, fag, pax);
+ u3_noun res = u3nc(c3__mess, u3nq(rif, c3__pact, boq, u3nc(c3__etch, wer)));
+
+ return res;
+}
+
+
+static c3_w
+_name_to_jumbo_str(u3_mesa_name* nam_u, c3_y* buf_y)
+{
+ u3_mesa_name tmp_u = *nam_u;
+ tmp_u.boq_y = 31;
+ tmp_u.fra_d = 0;
+ tmp_u.nit_o = c3n;
+
+ u3_etcher ech_u;
+ etcher_init(&ech_u, buf_y, PACT_SIZE);
+ _mesa_etch_name(&ech_u, &tmp_u);
+ return ech_u.len_w;
+}
+
+static u3_mesa_line*
+_mesa_get_jumbo_cache(u3_mesa* sam_u, u3_mesa_name* nam_u)
+{
+ c3_y buf_y[PACT_SIZE];
+ c3_w len_w = _name_to_jumbo_str(nam_u, buf_y);
+ u3_str str_u = {(c3_c*)buf_y, len_w};
+
+ jum_map_itr itr_u = vt_get(&sam_u->jum_u, str_u);
+ if ( vt_is_end(itr_u) ) {
+ return NULL;
+ }
+ return itr_u.data->val;
+}
+
+static void
+_mesa_put_jumbo_cache(u3_mesa* sam_u, u3_mesa_name* nam_u, u3_mesa_line* lin_u)
+{
+ c3_y* buf_y = c3_malloc(PACT_SIZE);
+
+ c3_w len_w = _name_to_jumbo_str(nam_u, buf_y);
+ u3_str str_u = {(c3_c*)buf_y, len_w};
+
+ // CTAG_BLOCK, CTAG_WAIT
+ if ( lin_u > (u3_mesa_line*)2 ) {
+ if (sam_u->jum_d > JUMBO_CACHE_MAX_SIZE) {
+ vt_cleanup(&sam_u->jum_u);
+ sam_u->jum_d = 0;
+ }
+
+ sam_u->jum_d += lin_u->tob_d;
+ }
+
+ jum_map_itr itr_u = vt_insert(&sam_u->jum_u, str_u, lin_u);
+
+ if ( vt_is_end(itr_u) ) {
+ fprintf(stderr, "mesa: cannot allocate memory for jumbo cache, dying");
+ u3_king_bail();
+ }
+}
+
+static void
+_mesa_send_pact_single(u3_mesa* sam_u,
+ sockaddr_in adr_u,
+ u3_mesa_pact* pac_u)
+{
+ c3_y* buf_y = c3_calloc(PACT_SIZE);
+ c3_w len_w = mesa_etch_pact_to_buf(buf_y, PACT_SIZE, pac_u);
+ _mesa_send_buf(sam_u, adr_u, buf_y, len_w);
+}
+
+static void
+_mesa_send_pact(u3_mesa* sam_u,
+ u3_pit_addr* las_u,
+ u3_peer* per_u, // null for response packets
+ u3_mesa_pact* pac_u)
+{
+ c3_y* buf_y = c3_calloc(PACT_SIZE);
+ c3_w len_w = mesa_etch_pact_to_buf(buf_y, PACT_SIZE, pac_u);
+ _mesa_send_bufs(sam_u, per_u, buf_y, len_w, las_u);
+}
+
+static void
+_mesa_send_leaf(u3_mesa* sam_u,
+ u3_mesa_line* lin_u,
+ u3_mesa_pact* pac_u, // scratchpad
+ c3_d fra_d,
+ sockaddr_in adr_u
+ )
+{
+ u3_mesa_name* nam_u = &pac_u->pag_u.nam_u;
+ u3_mesa_data* dat_u = &pac_u->pag_u.dat_u;
+
+ nam_u->fra_d = fra_d;
+ c3_d i_d = fra_d - (lin_u->nam_u.fra_d * (1 << u3_Host.ops_u.jum_y));
+ c3_w cur_w = i_d * 1024;
+ dat_u->fra_y = lin_u->dat_y + cur_w;
+ dat_u->len_w = c3_min(lin_u->dat_w - cur_w, 1024);
+
+ lss_pair* pair = ((lss_pair*)lin_u->haz_y) + i_d;
+
+ if ( 0 == nam_u->fra_d ) { // XX
+ _mesa_copy_auth_data(&dat_u->aut_u, &lin_u->aut_u);
+ } else if ( 0 == memcmp(pair, &(lss_pair){0}, sizeof(lss_pair)) ) {
+ dat_u->aut_u.typ_e = AUTH_NONE;
+ } else {
+ dat_u->aut_u.typ_e = AUTH_PAIR;
+ memcpy(dat_u->aut_u.has_y, pair, sizeof(lss_pair));
+ }
+#ifdef MESA_DEBUG
+ // u3l_log(" sending leaf packet, fra_d: %"PRIu64, nam_u->fra_d);
+#endif
+ // log_pact(pac_u);
+ _mesa_send_pact_single(sam_u, adr_u, pac_u);
+}
+
+static void
+_mesa_send_piece(u3_mesa* sam_u, u3_mesa_line* lin_u, u3_mesa_name* nam_u, c3_d fra_d, sockaddr_in adr_u) {
+ u3_mesa_pact pac_u = {0};
+ u3_mesa_head* hed_u = &pac_u.hed_u;
+ {
+ hed_u->nex_y = HOP_NONE;
+ hed_u->pro_y = 1;
+ hed_u->typ_y = PACT_PAGE;
+ hed_u->hop_y = 0;
+ // mug_w varies by fragment
+ }
+ pac_u.pag_u.nam_u = *nam_u;
+
+ u3_mesa_data* dat_u = &pac_u.pag_u.dat_u;
+ {
+ dat_u->tob_d = lin_u->tob_d;
+ dat_u->aut_u = lin_u->aut_u;
+ // aut_u, len_w, and fra_y vary by fragment
+ }
+
+ c3_d mev_d = mesa_num_leaves(dat_u->tob_d);
+ c3_w pro_w = lss_proof_size(mev_d);
+ if ( 0 == nam_u->fra_d && c3y == nam_u->nit_o ) {
+ if ( pro_w > 1 ) {
+ dat_u->len_w = pro_w * sizeof(lss_hash);
+ c3_y* pro_y = c3_malloc(dat_u->len_w);
+ memcpy(pro_y, lin_u->tip_y, dat_u->len_w);
+ dat_u->fra_y = pro_y;
+ _mesa_send_pact_single(sam_u, adr_u, &pac_u);
+ c3_free(pro_y);
+ }
+
+ // single-fragment message; just send the one data fragment
+ else if ( 0 == fra_d ) {
+ _mesa_send_leaf(sam_u, lin_u, &pac_u, 0, adr_u);
+ }
+ else {
+ u3l_log("mesa: weird fragment number %"PRIu64, fra_d);
+ }
+
+ } else {
+ _mesa_send_leaf(sam_u, lin_u, &pac_u, fra_d, adr_u);
+ }
+}
+
+static void
+_mesa_page_scry_jumbo_cb(void* vod_p, u3_noun res)
+{
+ u3_scry_handle* han_u = (u3_scry_handle*)vod_p;
+ u3_mesa_cb_data* dat_u = (u3_mesa_cb_data*)han_u->vod_p;
+ u3_mesa* sam_u = dat_u->sam_u;
+ u3_mesa_name* nam_u = &dat_u->nam_u;
+
+ u3_weak pag = u3r_at(7, res);
+
+ if ( u3_none == pag ) {
+ // TODO: mark as dead
+ u3_mesa_line* lin_u = _mesa_get_jumbo_cache(sam_u, nam_u);
+ u3z(res);
+ if ( NULL == lin_u ) {
+ arena_free(&han_u->are_u);
+ return;
+ }
+ lin_u = (u3_mesa_line*)CTAG_BLOCK;
+ _mesa_put_jumbo_cache(sam_u, nam_u, lin_u);
+ arena_free(&han_u->are_u);
+ return;
+ }
+ u3_noun pac, pas, pof;
+ if ( c3n == u3r_trel(pag, &pac, &pas, &pof) ||
+ c3n == u3a_is_pug(pac) ) {
+ u3l_log("mesa: jumbo frame misshapen");
+ u3z(res);
+ arena_free(&han_u->are_u);
+ return;
+ }
+
+ u3_mesa_line* lin_u;
+ {
+ c3_w jumbo_w = u3r_met(3, pac);
+ c3_y* jumbo_y = c3_calloc(jumbo_w);
+ u3r_bytes(0, jumbo_w, jumbo_y, pac);
+
+ u3_mesa_pact jum_u;
+ c3_c* err_c = mesa_sift_pact_from_buf(&jum_u, jumbo_y, jumbo_w);
+ if ( err_c ) {
+ u3l_log("mesa: jumbo frame parse failure: %s", err_c);
+ arena_free(&han_u->are_u);
+ u3z(res);
+ return;
+ }
+ u3_mesa_data* dat_u = &jum_u.pag_u.dat_u;
+
+ c3_d mev_d = mesa_num_leaves(dat_u->tob_d); // leaves in message
+ c3_w tip_w = // bytes in Merkle spine
+ (mev_d > 1 && jum_u.pag_u.nam_u.fra_d == 0)?
+ lss_proof_size(mev_d) * sizeof(lss_hash):
+ 0;
+ c3_w dat_w = dat_u->len_w; // bytes in fragment data in this jumbo frame
+ c3_w lev_w = mesa_num_leaves(dat_w); // number of leaves in this frame
+ c3_w haz_w = lev_w * sizeof(lss_pair); // bytes in hash pairs
+ c3_w len_w = tip_w + dat_w + haz_w;
+
+ arena are_u = arena_create(sizeof(u3_mesa_line) + len_w + 2048);
+
+ lin_u = new(&are_u, u3_mesa_line, 1);
+ lin_u->are_u = are_u;
+ _mesa_copy_name(&lin_u->nam_u, &jum_u.pek_u.nam_u, &lin_u->are_u);
+ lin_u->aut_u = dat_u->aut_u;
+ lin_u->tob_d = dat_u->tob_d;
+ lin_u->dat_w = dat_w;
+ lin_u->len_w = len_w;
+ lin_u->tip_y = new(&lin_u->are_u, c3_y, len_w);
+ lin_u->dat_y = lin_u->tip_y + tip_w;
+ lin_u->haz_y = lin_u->dat_y + dat_w;
+ memcpy(lin_u->dat_y, dat_u->fra_y, dat_u->len_w);
+
+ c3_y* haz_y = lin_u->haz_y;
+ while ( pas != u3_nul ) {
+ u3r_bytes(0, 64, haz_y, u3h(pas));
+ haz_y += 64;
+ pas = u3t(pas);
+ }
+
+ u3r_bytes(0, tip_w, lin_u->tip_y, pof);
+ c3_free(jumbo_y);
+
+ }
+
+ _mesa_put_jumbo_cache(sam_u, nam_u, lin_u);
+
+ u3_pit_entry* ent_u = _mesa_get_pit(sam_u, nam_u);
+ if ( NULL != ent_u ) {
+ u3_pit_addr* adr_u = ent_u->adr_u;
+ while (adr_u) {
+ _mesa_send_piece(sam_u, lin_u, nam_u, 0, adr_u->sdr_u);
+ adr_u = adr_u->nex_p;
+ }
+ _mesa_del_pit(sam_u, nam_u);
+ }
+
+
+ /* _mesa_send_jumbo_pieces(sam_u, lin_u, NULL); */
+
+ u3z(res);
+ arena_free(&han_u->are_u);
+ return;
+}
+
+static void
+_mesa_hear_bail(u3_ovum* egg_u, u3_noun lud)
+{
+ u3l_log("mesa: hear bail");
+ c3_w len_w = u3qb_lent(lud);
+ u3l_log("len_w: %i", len_w);
+ if( len_w == 2 ) {
+ u3_pier_punt_goof("hear", u3k(u3h(lud)));
+ u3_pier_punt_goof("crud", u3k(u3h(u3t(lud))));
+ }
+ u3_ovum_free(egg_u);
+}
+
+static void
+_saxo_cb(void* vod_p, u3_noun nun)
+{
+ u3_peer* per_u = vod_p;
+ u3_weak sax = u3r_at(7, nun);
+
+ if ( sax != u3_none ) {
+ u3_noun her = u3h(sax);
+ u3_ship her_u;
+ u3_ship_of_noun(her_u, her);
+ u3_peer* new_u = _mesa_get_peer(per_u->sam_u, her_u);
+ if ( new_u != NULL ) {
+ per_u = new_u;
+ } else {
+ _mesa_put_peer(per_u->sam_u, her_u, per_u);
+ }
+ u3_mesa* sam_u = per_u->sam_u;
+ u3_noun gal = u3do("rear", u3k(sax));
+ u3_assert( c3y == u3a_is_cat(gal) && gal < 256 );
+ // both atoms guaranteed to be cats, bc we don't call unless forwarding
+ per_u->ful_o = c3y;
+ per_u->imp_y = gal;
+ }
+
+ u3z(nun);
+}
+
+static void
+_forward_lanes_cb(void* vod_p, u3_noun nun)
+{
+ u3_peer* per_u = vod_p;
+ u3_mesa* sam_u = per_u->sam_u;
+
+ u3_weak las = u3r_at(7, nun);
+ // u3m_p("_forward_lanes_cb", las);
+
+ if ( las != u3_none ) {
+ u3_peer* new_u = _mesa_get_peer(per_u->sam_u, per_u->her_u);
+ if ( new_u != NULL ) {
+ per_u = new_u;
+ }
+ u3_noun gal = u3h(las);
+ u3_assert( c3y == u3a_is_cat(gal) && gal < 256 );
+ // both atoms guaranteed to be cats, bc we don't call unless forwarding
+ per_u->ful_o = c3y;
+ per_u->imp_y = gal;
+ u3_noun sal = u3k(u3t(las));
+ u3_noun lan;
+ while ( sal != u3_nul ) {
+ u3x_cell(sal, &lan, &sal);
+
+ if ( c3n == u3a_is_cat(lan) ) {
+ // there should be only one lane that is not a direct atom
+ //
+ sockaddr_in lan_u = _realise_lane(u3k(lan));
+ per_u->dan_u = lan_u;
+ }
+ }
+ u3z(sal);
+ _mesa_put_peer(per_u->sam_u, per_u->her_u, per_u);
+ }
+
+ u3z(nun);
+
+}
+
+static void
+_meet_peer(u3_mesa* sam_u, u3_peer* per_u, u3_ship her_u)
+{
+ u3_noun her = u3_ship_to_noun(her_u);
+ u3_noun gan = u3nc(u3_nul, u3_nul);
+
+ per_u->her_u[0] = her_u[0];
+ per_u->her_u[1] = her_u[1];
+
+ u3_noun pax = u3nc(u3dc("scot", c3__p, her), u3_nul);
+ u3_pier_peek_last(sam_u->pir_u, gan, c3__j, c3__saxo, pax, per_u, _saxo_cb);
+
+}
+
+static void
+_get_peer_lanes(u3_mesa* sam_u, u3_peer* per_u)
+{
+ u3_noun her = u3_ship_to_noun(per_u->her_u);
+ u3_noun gan = u3nc(u3_nul, u3_nul);
+ u3_noun pax = u3nq(u3i_string("chums"),
+ u3dc("scot", 'p', her),
+ u3i_string("lanes"),
+ u3_nul);
+ u3m_p("pax", pax);
+ u3_pier_peek_last(sam_u->pir_u, gan, c3__ax, u3_nul, pax, per_u, _forward_lanes_cb);
+}
+
+static void
+_hear_peer(u3_mesa* sam_u, u3_peer* per_u, sockaddr_in lan_u, c3_o dir_o)
+{
+ if ( c3y == dir_o ) {
+ per_u->dan_u = lan_u;
+ per_u->dir_u.her_d = _get_now_micros();
+ } else {
+ per_u->ind_u.her_d = _get_now_micros();
+ }
+}
+
+static void
+_mesa_request_next_fragments(u3_mesa* sam_u,
+ u3_pend_req* req_u,
+ sockaddr_in lan_u)
+{
+
+ lan_u.sin_addr.s_addr = ( u3_Host.ops_u.net == c3y ) ? lan_u.sin_addr.s_addr : htonl(0x7f000001);
+
+ c3_w win_w = _mesa_req_get_cwnd(req_u);
+ u3_mesa_pict* nex_u = req_u->pic_u;
+ c3_w nex_d = req_u->nex_d;
+ /* arena scr_u = req_u->are_u; */
+ /* uv_buf_t* bfs_u = new(&scr_u, uv_buf_t, win_w); */
+ /* uv_buf_t** bus_u = new(&scr_u, uv_buf_t*, win_w); */
+ /* struct sockaddr** ads_u = new(&scr_u, struct sockaddr*, win_w); */
+ /* c3_w* int_u = new(&scr_u, c3_w, win_w); */
+ c3_d now_d = _get_now_micros();
+ for ( c3_w i = 0; i < win_w; i++ ) {
+ c3_w fra_w = nex_d + i;
+ if ( fra_w >= req_u->tof_d ) {
+ break;
+ }
+ // u3l_log("next fra_w: %u", fra_w);
+ nex_u->pac_u.pek_u.nam_u.fra_d = fra_w;
+ uv_buf_t buf_u = _mesa_peek_buf(req_u->pek_c, nex_d+i, req_u->pek_w);
+ if (buf_u.base < req_u->pek_c) {
+ u3l_log("peek overflow, dying, fragment %u", nex_d+i);
+ abort();
+ }
+ mesa_etch_pact_to_buf((c3_y*)buf_u.base, buf_u.len, &nex_u->pac_u);
+ /* bfs_u[i] = buf_u; */
+ /* bus_u[i] = &bfs_u[i]; */
+ /* ads_u[i] = (struct sockaddr*)&lan_u; */
+ /* int_u[i] = 1; */
+ _mesa_req_pact_sent(req_u, fra_w, now_d);
+
+ /* _mesa_send_buf3(req_u->per_u->dan_u, buf_u, req_u, fra_w); */
+ _mesa_send_modal(req_u->per_u, buf_u, NULL);
+ /* _mesa_send(nex_u, lan_u); */
+ }
+ /* if ( i > 0 ) { */
+ /* c3_i sen_i = _mesa_send_buf2(ads_u, bus_u, int_u, i); */
+ /* for (c3_w i = 0; i < sen_i; i++) { */
+ /* c3_w fra_w = nex_d + i; */
+ /* _mesa_req_pact_sent(req_u, fra_w, now_d); */
+ /* } */
+ /* } */
+}
+
+static void
+_mesa_veri_scry_cb(void* vod_p, u3_noun nun)
+{
+ u3_mesa_cb_data* ver_u = vod_p;
+ #ifndef PACKET_TEST
+ u3_pend_req* req_u = _mesa_get_request(ver_u->sam_u, &ver_u->nam_u);
+ if ( !req_u ) {
+ return;
+ }
+ else if ( c3y == nun ) { // XX
+ _mesa_request_next_fragments(ver_u->sam_u, req_u, ver_u->lan_u);
+ }
+ else if ( c3n == nun ) {
+ u3l_log("mesa: packet auth failed verification");
+ // TODO: wipe request state? (If this was an imposter,
+ // we don't want to punish the real peer.)
+ }
+ else {
+ u3l_log("mesa: %%veri returned strange value");
+ }
+ #endif
+ c3_free(ver_u);
+}
+
+static void
+_mesa_req_pact_init(u3_mesa* sam_u, u3_mesa_pict* pic_u, sockaddr_in lan_u, u3_peer* per_u)
+{
+ sam_u->tim_d = _get_now_micros();
+ u3_mesa_pact* pac_u = &pic_u->pac_u;
+ u3_mesa_name* nam_u = &pac_u->pag_u.nam_u;
+ u3_mesa_data* dat_u = &pac_u->pag_u.dat_u;
+
+ u3_gage* gag_u = _mesa_get_gage(sam_u, nam_u->her_u);
+ if ( gag_u == NULL ) {
+ gag_u = new(&sam_u->par_u, u3_gage, 1);
+ _init_gage(gag_u);
+ _mesa_put_gage(sam_u, nam_u->her_u, gag_u);
+ u3_assert( gag_u != NULL );
+ }
+
+ // _log_gage(gag_u);
+ u3_mesa_pact exa_u;
+ exa_u.hed_u.typ_y = PACT_PEEK;
+ exa_u.pek_u.nam_u = *nam_u;
+ exa_u.pek_u.nam_u.fra_d = 0;
+ exa_u.pek_u.nam_u.nit_o = c3n;
+ exa_u.pek_u.nam_u.aut_o = c3n;
+ c3_w pek_w = mesa_size_pact(&exa_u);
+ c3_d tof_d = mesa_num_leaves(dat_u->tob_d);
+ c3_w pof_w = lss_proof_size(tof_d);
+ c3_w pairs_w = c3_bits_word(pof_w);
+ c3_d pek_d = dat_u->tob_d;
+ arena are_u = arena_create(5*dat_u->tob_d);
+ u3_pend_req* req_u = new(&are_u, u3_pend_req, 1);
+ req_u->are_u = are_u;
+ req_u->pic_u = new(&req_u->are_u, u3_mesa_pict, 1);
+ req_u->pic_u->sam_u = sam_u;
+ req_u->pic_u->pac_u.hed_u.typ_y = PACT_PEEK;
+ req_u->pic_u->pac_u.hed_u.pro_y = MESA_VER;
+ req_u->pic_u->pac_u.pek_u.nam_u = *(_mesa_copy_name_alloc(nam_u, &req_u->are_u));
+ req_u->pic_u->pac_u.pek_u.nam_u.aut_o = c3n;
+ req_u->pic_u->pac_u.pek_u.nam_u.nit_o = c3n;
+ req_u->aut_u = dat_u->aut_u;
+
+ uv_timer_init(u3L, &req_u->tim_u);
+
+ u3_assert( pac_u->pag_u.nam_u.boq_y == 13 );
+ req_u->gag_u = gag_u;
+ req_u->tob_d = dat_u->tob_d;
+ /* req_u->out_d = 4000; */
+ req_u->out_d = 0;
+ req_u->tof_d = mesa_num_leaves(dat_u->tob_d); // NOTE: only correct for bloq 13!
+ assert( req_u->tof_d != 1 ); // these should be injected directly by _mesa_hear_page
+ req_u->dat_y = new(&req_u->are_u, c3_y, dat_u->tob_d);
+ req_u->mis_u = new(&req_u->are_u, lss_pair, tof_d);
+ memset(req_u->mis_u, 0, sizeof(lss_pair)*tof_d);
+ req_u->wat_u = new(&req_u->are_u, u3_pact_stat, req_u->tof_d + 2);
+ bitset_init(&req_u->was_u, req_u->tof_d, &req_u->are_u);
+
+ // TODO: handle restart
+ // u3_assert( nam_u->fra_w == 0 );
+
+ req_u->nex_d = 0;
+ req_u->hav_d = 0;
+ req_u->lef_d = 0;
+ req_u->old_d = 0;
+ req_u->ack_d = 0;
+
+ lss_hash* pof_u = new(&req_u->are_u, lss_hash, pof_w);
+ if ( dat_u->len_w != pof_w*sizeof(lss_hash) ) {
+ return; // TODO: handle like other auth failures
+ }
+ for ( int i = 0; i < pof_w; i++ ) {
+ memcpy(pof_u[i], dat_u->fra_y + (i * sizeof(lss_hash)), sizeof(lss_hash));
+ }
+ lss_hash root;
+ lss_root(root, pof_u, pof_w);
+ req_u->los_u = new(&req_u->are_u, lss_verifier, 1);
+ lss_verifier_init(req_u->los_u, 0, req_u->tof_d, pof_u, &req_u->are_u);
+
+ req_u->pek_w = pek_w;
+ req_u->pek_d = pek_d;
+ req_u->pek_c = new(&req_u->are_u, c3_c, pek_d);
+
+ req_u->per_u = per_u;
+
+ _mesa_put_request(sam_u, &req_u->pic_u->pac_u.pek_u.nam_u, req_u);
+ _update_resend_timer(req_u);
+
+ // scry to verify auth
+ u3_noun typ, aut;
+ switch ( dat_u->aut_u.typ_e ) {
+ case AUTH_SIGN:
+ typ = c3__sign;
+ aut = u3dc("scot", c3__uv, u3i_bytes(64, dat_u->aut_u.sig_y));
+ break;
+ case AUTH_HMAC:
+ typ = c3__hmac;
+ aut = u3dc("scot", c3__uv, u3i_bytes(16, dat_u->aut_u.mac_y));
+ break;
+ default:
+ return; // TODO: handle like other auth failures
+ }
+ u3_noun her = u3dc("scot", c3__p, u3_ship_to_noun(nam_u->her_u));
+ u3_noun rut = u3dc("scot", c3__uv, u3i_bytes(32, root));
+ u3_noun pax = _mesa_encode_path(nam_u->pat_s, (c3_y*)nam_u->pat_c);
+ u3_noun sky = u3i_list(typ, her, aut, rut, pax, u3_none);
+ u3_mesa_cb_data* ver_u = c3_malloc(sizeof(u3_mesa_cb_data));
+ ver_u->sam_u = sam_u;
+ ver_u->nam_u = req_u->pic_u->pac_u.pek_u.nam_u;
+ ver_u->lan_u = lan_u;
+ u3_pier_peek_last(sam_u->pir_u, u3_nul, c3__a, c3__veri, sky, ver_u, _mesa_veri_scry_cb);
+}
+
+typedef struct _u3_mesa_lane_cb_data {
+ u3_lane lan_u;
+ u3_peer* per_u;
+} u3_mesa_lane_cb_data;
+
+static void
+_mesa_page_bail_cb(u3_ovum* egg_u, u3_ovum_news new_e)
+{
+ u3l_log("mesa: arvo page event failed");
+}
+
+static void
+_mesa_add_hop(c3_y hop_y, u3_mesa_head* hed_u, u3_mesa_page_pact* pag_u, sockaddr_in lan_u)
+{
+ c3_w pip_w = ntohl(lan_u.sin_addr.s_addr);
+ c3_s por_s = ntohs(lan_u.sin_port);
+ c3_etch_word(pag_u->sot_u, pip_w);
+ c3_etch_short(pag_u->sot_u + 4, por_s);
+ hed_u->nex_y = HOP_SHORT;
+}
+
+/* static c3_d avg_time() { */
+/* c3_d sum = 0; */
+/* c3_w i; */
+/* for (i = 0; tim_y[i] != 0; i++) { */
+/* if (tim_y[i] > 1000) { */
+/* u3l_log("dingding fra %u time %"PRIu64, i, tim_y[i]); */
+/* } */
+/* sum += tim_y[i]; */
+/* } */
+/* return sum / i; */
+/* } */
+
+static void
+_mesa_forward_request(u3_mesa* sam_u, u3_mesa_pict* pic_u, sockaddr_in lan_u)
+{
+ u3_mesa_pact* pac_u = &pic_u->pac_u;
+ u3_peer* per_u = _mesa_get_peer(sam_u, pac_u->pek_u.nam_u.her_u);
+ if ( !per_u ) {
+ #ifdef MESA_DEBUG
+ c3_c* mes = u3_ship_to_string(pac_u->pek_u.nam_u.her_u);
+ u3l_log("mesa: alien forward for %s; meeting ship", mes);
+ c3_free(mes);
+ #endif
+ per_u = new(&sam_u->par_u, u3_peer, 1);
+ _init_peer(sam_u, per_u);
+ per_u->her_u[0] = pac_u->pek_u.nam_u.her_u[0];
+ per_u->her_u[1] = pac_u->pek_u.nam_u.her_u[1];
+
+ _get_peer_lanes(sam_u, per_u); // forward-lanes
+ return;
+ }
+ if ( c3y == sam_u->for_o && sam_u->pir_u->who_d[0] == per_u->imp_y ) {
+ // if ( c3y == sam_u->for_o ) {
+ sockaddr_in lin_u = _mesa_get_direct_lane(sam_u, pac_u->pek_u.nam_u.her_u);
+ if ( _mesa_is_lane_zero(lin_u) == c3y) {
+ c3_c* shp_c = u3_ship_to_string(pac_u->pek_u.nam_u.her_u);
+ u3l_log("zero lane for %s", shp_c);
+ c3_free(shp_c);
+ return;
+ }
+ inc_hopcount(&pac_u->hed_u);
+ #ifdef MESA_DEBUG
+ u3l_log("mesa: forward_request()");
+ // _log_lane(&lan_u);
+ #endif
+
+ #ifdef MESA_DEBUG
+ c3_c* sip_c = inet_ntoa(lin_u.sin_addr);
+ u3l_log("mesa: sending packet to %s:%u", sip_c, ntohs(lin_u.sin_port));
+ #endif
+
+ _mesa_add_lane_to_pit(sam_u, &pac_u->pek_u.nam_u, lan_u);
+ _mesa_send(pic_u, lin_u);
+ }
+}
+
+static void
+_mesa_hear_page(u3_mesa_pict* pic_u, sockaddr_in lan_u)
+{
+ #ifdef MESA_DEBUG
+ u3l_log("mesa: hear_page()");
+ // log_pact(&pic_u->pac_u);
+ u3_assert( PACT_PAGE == pic_u->pac_u.hed_u.typ_y );
+ #endif
+
+ u3_mesa* sam_u = pic_u->sam_u;
+ u3_mesa_pact* pac_u = &pic_u->pac_u;
+ u3_mesa_name* nam_u = &pac_u->pag_u.nam_u;
+
+ c3_o our_o = u3_ships_equal(nam_u->her_u, sam_u->pir_u->who_d);
+
+ u3_peer* per_u = _mesa_get_peer(sam_u, nam_u->her_u);
+ c3_o new_o = c3n;
+ if ( NULL == per_u ) {
+ new_o = c3y;
+ per_u = new(&sam_u->par_u, u3_peer, 1);
+ _init_peer(sam_u, per_u);
+ _meet_peer(sam_u, per_u, nam_u->her_u);
+ }
+
+ c3_o dir_o = __(pac_u->hed_u.hop_y == 0);
+ // if ( pac_u->hed_u.hop_y == 0 ) {
+ // u3l_log(" received direct page");
+ // } else {
+ // u3l_log(" received forwarded page");
+ // }
+ _hear_peer(sam_u, per_u, lan_u, dir_o);
+
+ if ( new_o == c3y ) {
+ //u3l_log("new lane is direct %c", c3y == dir_o ? 'y' : 'n');
+ //_log_lane(&lan_u);
+ }
+
+ _mesa_put_peer(sam_u, nam_u->her_u, per_u);
+
+ u3_pit_entry* pin_u = _mesa_get_pit(sam_u, nam_u);
+
+ if ( NULL != pin_u ) {
+ #ifdef MESA_DEBUG
+ u3l_log(" forwarding page");
+ #endif
+
+ inc_hopcount(&pac_u->hed_u);
+ c3_etch_word(pac_u->pag_u.sot_u, ntohl(lan_u.sin_addr.s_addr));
+ c3_etch_short(pac_u->pag_u.sot_u + 4, ntohs(lan_u.sin_port));
+
+ // stick next hop in packet
+
+ _mesa_add_hop(pac_u->hed_u.hop_y, &pac_u->hed_u, &pac_u->pag_u, lan_u);
+
+ _mesa_send_pact(sam_u, pin_u->adr_u, NULL, pac_u);
+ _mesa_del_pit(sam_u, nam_u);
+ }
+
+ c3_d lev_d = mesa_num_leaves(pac_u->pag_u.dat_u.tob_d);
+ u3_pend_req* req_u = _mesa_get_request(sam_u, nam_u);
+ if ( !req_u ) {
+ return;
+ }
+ if ( (u3_pend_req*)CTAG_WAIT == req_u ) {
+ if (0 != nam_u->fra_d) {
+ return;
+ }
+ // process incoming response to ourselves
+ // if single-leaf message, inject directly into Arvo
+ c3_d lev_d = mesa_num_leaves(pac_u->pag_u.dat_u.tob_d);
+ if ( 1 == lev_d ) {
+ u3_noun cad;
+ {
+ u3_noun lan = u3_mesa_encode_lane(lan_u);
+
+ // XX should just preserve input buffer
+ u3i_slab sab_u;
+ u3i_slab_init(&sab_u, 3, PACT_SIZE);
+ mesa_etch_pact_to_buf(sab_u.buf_y, PACT_SIZE, pac_u);
+ cad = u3nt(c3__heer, lan, u3i_slab_mint(&sab_u));
+ }
+
+ u3_noun wir = u3nc(c3__ames, u3_nul);
+
+ u3_ovum* ovo = u3_ovum_init(0, c3__ames, wir, cad);
+ ovo = u3_auto_plan(&sam_u->car_u, ovo);
+
+ // early deletion, to avoid injecting retries,
+ // (in case of failure, the retry timer will add it again to the PIT)
+ //
+ _mesa_del_request(sam_u, nam_u);
+ u3_auto_peer(ovo, 0, 0, _mesa_page_bail_cb);
+ return;
+ }
+
+
+ c3_d now_d = _get_now_micros();
+ _mesa_req_pact_init(sam_u, pic_u, lan_u, per_u);
+ return;
+ }
+
+ if ( c3y == nam_u->nit_o ) {
+ /* u3l_log("dupe init"); */
+ return;
+ }
+
+ sockaddr_in lon_u = {0};
+ if ( HOP_SHORT == pac_u->hed_u.nex_y ) {
+ lon_u.sin_family = AF_INET;
+ lon_u.sin_addr.s_addr = htonl(c3_sift_word(pac_u->pag_u.sot_u));
+ lon_u.sin_port = htons(c3_sift_short(pac_u->pag_u.sot_u + 4));
+ }
+ else {
+ lon_u = lan_u;
+ }
+ _mesa_req_pact_done(req_u,
+ nam_u,
+ &pac_u->pag_u.dat_u,
+ pac_u->hed_u.hop_y,
+ lon_u);
+ // TODO: check return value before continuing?
+
+ c3_y boq_y = 31;
+ // c3_o done_with_jumbo_frame = __(0 == req_u->hav_d % boq_y);
+ c3_o done_with_jumbo_frame = __(req_u->hav_d == req_u->tof_d); // TODO: fix for non-message-sized jumbo frames
+ if ( c3y == done_with_jumbo_frame ) {
+ u3_noun cad;
+
+ // u3l_log(" received last packet, tof_d: %" PRIu64 " tob_d: %" PRIu64,
+ // req_u->tof_d,
+ // req_u->tob_d);
+
+ /* c3_d now_d = _get_now_micros(); */
+ /* u3l_log("%" PRIu64 " kilobytes took %f ms", */
+ /* req_u->tof_d, */
+ /* (now_d - sam_u->tim_d)/1000.0); */
+ // u3l_log("page handling took %"PRIu64, avg_time());
+ //done = c3y;
+
+ #ifndef PACKET_TEST
+ {
+ // construct jumbo frame
+ u3_noun lan = u3_mesa_encode_lane(lan_u);
+ u3_noun pac;
+ {
+ pac_u->pag_u.nam_u.boq_y = boq_y;
+ pac_u->pag_u.dat_u.tob_d = req_u->tob_d;
+ pac_u->pag_u.nam_u.fra_d = (req_u->hav_d >> boq_y);
+ pac_u->pag_u.dat_u.len_w = req_u->tob_d;
+ pac_u->pag_u.dat_u.fra_y = req_u->dat_y;
+ pac_u->pag_u.dat_u.aut_u = req_u->aut_u;
+
+ c3_y* buf_y = c3_calloc(mesa_size_pact(pac_u));
+ c3_w res_w = mesa_etch_pact_to_buf(buf_y, mesa_size_pact(pac_u), pac_u);
+ pac = u3i_bytes(res_w, buf_y);
+ c3_free(buf_y);
+ }
+ cad = u3nt(c3__heer, lan, pac);
+ }
+
+
+ _mesa_del_request(sam_u, &pac_u->pag_u.nam_u);
+
+ u3_auto_plan(&sam_u->car_u,
+ u3_ovum_init(0, c3__ames, u3nc(c3__ames, u3_nul), cad));
+ #else
+ _mesa_del_request(sam_u, &pac_u->pag_u.nam_u);
+ #endif
+ } else if ( req_u->hav_d < lev_d ) {
+ _mesa_request_next_fragments(sam_u, req_u, lan_u);
+ }
+}
+
+static void
+_mesa_hear_peek(u3_mesa_pict* pic_u, sockaddr_in lan_u)
+{
+ #ifdef MESA_DEBUG
+ u3l_log("mesa: hear_peek()");
+ u3_assert( PACT_PEEK == pic_u->pac_u.hed_u.typ_y );
+ #endif
+
+ u3_mesa_pact* pac_u = &pic_u->pac_u;
+ u3_mesa* sam_u = pic_u->sam_u;
+ c3_o our_o = u3_ships_equal(pac_u->pek_u.nam_u.her_u, sam_u->pir_u->who_d);
+
+ if ( c3n == our_o ) {
+ _mesa_forward_request(sam_u, pic_u, lan_u);
+ return;
+ }
+
+ c3_d fra_d = pac_u->pek_u.nam_u.fra_d;
+ c3_d bat_d = _mesa_lop(fra_d);
+
+ pac_u->pek_u.nam_u.fra_d = bat_d;
+
+ /* u3l_log("hear peek fra %llu", fra_d); */
+
+ // if we have the page, send it
+ u3_mesa_line* lin_u = _mesa_get_jumbo_cache(sam_u, &pac_u->pek_u.nam_u);
+
+ if ( ( (u3_mesa_line*)CTAG_WAIT == lin_u )) {
+ return;
+ }
+
+ if ( ( NULL != lin_u && (u3_mesa_line*)CTAG_BLOCK != lin_u)) {
+ _mesa_send_piece(sam_u, lin_u, &pac_u->pek_u.nam_u, fra_d, lan_u);
+ return;
+ }
+
+
+ // record interest
+ _mesa_add_lane_to_pit(sam_u, &pac_u->pek_u.nam_u, lan_u);
+
+ // otherwise, if blocked or NULL scry
+ lin_u = (u3_mesa_line*)CTAG_WAIT;
+ /* _mesa_copy_name(&lin_u->nam_u, &pac_u->pek_u.nam_u); // XX */
+
+ _mesa_put_jumbo_cache(sam_u, &pac_u->pek_u.nam_u, lin_u);
+ u3_noun sky = _name_to_jumbo_scry(&pac_u->pek_u.nam_u);
+ u3_noun our = u3i_chubs(2, sam_u->car_u.pir_u->who_d);
+ u3_noun bem = u3nc(u3nt(our, u3_nul, u3nc(c3__ud, 1)), sky);
+
+ arena are_u = arena_create(sizeof(u3_mesa_cb_data) + 1024);
+ u3_scry_handle* han_u = new(&are_u, u3_scry_handle, 1);
+ han_u->are_u = are_u;
+ u3_mesa_cb_data* dat_u = new(&han_u->are_u, u3_mesa_cb_data, 1);
+ han_u->vod_p = dat_u;
+ dat_u->sam_u = sam_u;
+ _mesa_copy_name(&dat_u->nam_u, &pac_u->pek_u.nam_u, &han_u->are_u);
+
+ u3_pier_peek(sam_u->car_u.pir_u, u3_nul, u3k(u3nq(1, c3__beam, c3__ax, bem)), han_u, _mesa_page_scry_jumbo_cb);
+}
+
+static void
+_mesa_poke_bail_cb(u3_ovum* egg_u, u3_noun lud)
+{
+ // XX failure stuff here
+ u3l_log("mesa: poke failure");
+}
+
+static void
+_mesa_hear_poke(u3_mesa_pict* pic_u, sockaddr_in lan_u)
+{
+ u3_mesa_pact* pac_u = &pic_u->pac_u;
+ u3_mesa* sam_u = pic_u->sam_u;
+
+ #ifdef MESA_DEBUG
+ u3l_log("mesa: hear_poke()");
+ u3_assert( PACT_POKE == pac_u->hed_u.typ_y );
+ #endif
+
+ c3_o our_o = u3_ships_equal(pac_u->pek_u.nam_u.her_u, sam_u->pir_u->who_d);
+
+ if ( c3n == our_o ) {
+ _mesa_forward_request(sam_u, pic_u, lan_u);
+ return;
+ }
+ // TODO check if lane already in pit, drop dupes
+ _mesa_add_lane_to_pit(sam_u, &pac_u->pek_u.nam_u, lan_u);
+
+ // XX if this lane management stuff is necessary
+ // it should be deferred to after successful event processing
+ u3_peer* per_u = _mesa_get_peer(sam_u, pac_u->pok_u.pay_u.her_u);
+ c3_o new_o = c3n;
+ if ( NULL == per_u ) {
+ new_o = c3y;
+ per_u = new(&sam_u->par_u, u3_peer, 1);
+ _init_peer(sam_u, per_u);
+ _meet_peer(sam_u, per_u, pac_u->pok_u.pay_u.her_u);
+ }
+
+ c3_o dir_o = __(pac_u->hed_u.hop_y == 0);
+ if ( pac_u->hed_u.hop_y == 0 ) {
+ new_o = c3y;
+ _hear_peer(sam_u, per_u, lan_u, dir_o);
+ // u3l_log("learnt lane");
+ } else {
+ // u3l_log("received forwarded poke");
+ }
+ if ( new_o == c3y ) {
+ // u3l_log("new lane is direct %c", c3y == dir_o ? 'y' : 'n');
+ // _log_lane(lan_u);
+ }
+ // XX _meet_peer, in the _saxo_cb, is already putting the peer in her_p
+ //
+ _mesa_put_peer(sam_u, pac_u->pok_u.pay_u.her_u, per_u);
+
+ u3_pend_req* req_u = _mesa_get_request(sam_u, &pac_u->pok_u.pay_u);
+ if ( req_u != NULL) {
+ // u3l_log("req pending");
+ return;
+ }
+
+ u3_ovum_peer nes_f;
+ u3_ovum_bail bal_f;
+ void* ptr_v;
+
+ u3_noun wir = u3nc(c3__ames, u3_nul);
+ u3_noun cad;
+ {
+ u3_noun lan = u3_mesa_encode_lane(lan_u);
+ u3i_slab sab_u;
+ u3i_slab_init(&sab_u, 3, PACT_SIZE);
+
+ // XX should just preserve input buffer
+ mesa_etch_pact_to_buf(sab_u.buf_y, PACT_SIZE, pac_u);
+ cad = u3nt(c3__heer, lan, u3i_slab_mint(&sab_u));
+ }
+
+ u3_ovum* ovo = u3_ovum_init(0, c3__ames, wir, cad);
+ ovo = u3_auto_plan(&sam_u->car_u, ovo);
+
+ // XX check request state for *payload* (in-progress duplicate)
+ assert(pac_u->pok_u.dat_u.tob_d);
+
+ u3_auto_peer(ovo, 0, 0, _mesa_poke_bail_cb);
+}
+
+void
+_ames_hear(void* sam_u,
+ const struct sockaddr* adr_u,
+ c3_w len_w,
+ c3_y* hun_y);
+
+static void
+_mesa_hear(u3_mesa* sam_u,
+ const struct sockaddr* adr_u,
+ c3_w len_w,
+ c3_y* hun_y)
+{
+ /* fwrite(&len_w, 4, 1, packs); */
+ /* fwrite(hun_y, 1, len_w, packs); */
+ // c3_d now_d = _get_now_micros();
+ if ( c3n == mesa_is_new_pact(hun_y, len_w) ) {
+ c3_y* han_y = c3_malloc(len_w);
+ memcpy(han_y, hun_y, len_w);
+ _ames_hear(u3_Host.sam_u, adr_u, len_w, han_y);
+ return;
+ }
+
+ u3_mesa_pict* pic_u = new(&sam_u->are_u, u3_mesa_pict, 1);
+ pic_u->sam_u = sam_u;
+ c3_c* err_c = mesa_sift_pact_from_buf(&pic_u->pac_u, hun_y, len_w);
+ if ( err_c ) {
+ u3l_log("mesa: hear: sift failed: %s", err_c);
+ return;
+ }
+
+ sockaddr_in sdr_u = *((sockaddr_in*)adr_u);
+
+ switch ( pic_u->pac_u.hed_u.typ_y ) {
+ case PACT_PEEK: {
+ _mesa_hear_peek(pic_u, sdr_u);
+ } break;
+ case PACT_PAGE: {
+ _mesa_hear_page(pic_u, sdr_u);
+ } break;
+ default: {
+ _mesa_hear_poke(pic_u, sdr_u);
+ } break;
+ }
+ // if (done == c3n) {
+ // if (tidx < 200000) {
+ // tim_y[tidx] = _get_now_micros() - now_d;
+ // tidx++;
+ // } else {
+ // u3l_log("peek handling took %"PRIu64, avg_time());
+ // tidx = 0;
+ // }
+ // } else {
+ // tidx = 0;
+ // done = c3n;
+ // }
+}
+
+static void _mesa_recv_cb(uv_udp_t* wax_u,
+ ssize_t nrd_i,
+ const uv_buf_t * buf_u,
+ const struct sockaddr* adr_u,
+ unsigned flg_i)
+{
+ u3_mesa* sam_u = (u3_mesa*)wax_u->data;
+ if ( 0 > nrd_i ) {
+ if ( u3C.wag_w & u3o_verbose ) {
+ u3l_log("mesa: recv: fail: %s", uv_strerror(nrd_i));
+ }
+ }
+ else if ( 0 == nrd_i ) {
+ }
+ else if ( flg_i & UV_UDP_PARTIAL ) {
+ if ( u3C.wag_w & u3o_verbose ) {
+ u3l_log("mesa: recv: fail: message truncated");
+ }
+ }
+ else {
+ _mesa_hear(wax_u->data, adr_u, (c3_w)nrd_i, (c3_y*)buf_u->base);
+ }
+}
+
+static void
+_mesa_io_talk(u3_auto* car_u)
+{
+ u3_mesa* sam_u = (u3_mesa*)car_u;
+ sam_u->dns_c = "urbit.org"; // TODO: receive turf
+ {
+ // XX remove [sev_l]
+ //
+ u3_noun wir = u3nc(c3__ames, u3_nul);
+ u3_noun cad = u3nc(c3__born, u3_nul);
+
+ u3_auto_plan(car_u, u3_ovum_init(0, c3__a, wir, cad));
+ }
+ u3_noun who = u3i_chubs(2, sam_u->pir_u->who_d);
+ u3_noun rac = u3do("clan:title", u3k(who));
+ c3_s por_s = sam_u->pir_u->por_s;
+ c3_i ret_i;
+ if ( c3__czar == rac ) {
+ c3_y num_y = (c3_y)sam_u->pir_u->who_d[0];
+ c3_s zar_s = _ames_czar_port(num_y);
+
+ if ( 0 == por_s ) {
+ por_s = zar_s;
+ }
+ else if ( por_s != zar_s ) {
+ u3l_log("ames: czar: overriding port %d with -p %d", zar_s, por_s);
+ u3l_log("ames: czar: WARNING: %d required for discoverability", zar_s);
+ }
+ }
+
+
+ // Bind and stuff.
+ {
+ struct sockaddr_in add_u;
+ c3_i add_i = sizeof(add_u);
+
+ memset(&add_u, 0, sizeof(add_u));
+ add_u.sin_family = AF_INET;
+ add_u.sin_addr.s_addr = _(u3_Host.ops_u.net) ?
+ htonl(INADDR_ANY) :
+ htonl(INADDR_LOOPBACK);
+ add_u.sin_port = htons(por_s);
+
+ if ( (ret_i = uv_udp_bind(&u3_Host.wax_u,
+ (const struct sockaddr*)&add_u, 0)) != 0 )
+ {
+ u3l_log("mesa: bind: %s", uv_strerror(ret_i));
+
+ /*if ( (c3__czar == rac) &&
+ (UV_EADDRINUSE == ret_i) )
+ {
+ u3l_log(" ...perhaps you've got two copies of vere running?");
+ }*/
+
+ // XX revise
+ //
+ u3_pier_bail(u3_king_stub());
+ }
+
+ uv_udp_getsockname(&u3_Host.wax_u, (struct sockaddr *)&add_u, &add_i);
+ u3_assert(add_u.sin_port);
+
+ sam_u->pir_u->por_s = ntohs(add_u.sin_port);
+ }
+ if ( c3y == u3_Host.ops_u.net ) {
+ u3l_log("mesa: live on %d", sam_u->pir_u->por_s);
+ }
+ else {
+ u3l_log("mesa: live on %d (localhost only)", sam_u->pir_u->por_s);
+ }
+
+ u3_Host.wax_u.data = sam_u;
+ uv_udp_recv_start(&u3_Host.wax_u, _mesa_alloc, _mesa_recv_cb);
+
+ c3_i rec_i = 2 * 1024 * 1024;
+ uv_recv_buffer_size((uv_handle_t*)&u3_Host.wax_u, &rec_i);
+
+ uv_send_buffer_size((uv_handle_t*)&u3_Host.wax_u, &rec_i);
+
+ sam_u->car_u.liv_o = c3y;
+ //u3z(rac); u3z(who);
+}
+
+static void _mesa_clear_pit(uv_timer_t *tim_u)
+{
+ u3_mesa* sam_u = (u3_mesa*)tim_u->data;
+ pit_map_itr itr_u = vt_first( &sam_u->pit_u );
+
+ c3_d now_d = _get_now_micros();
+
+ while (!vt_is_end(itr_u)) {
+ u3_pit_entry* pit_u = itr_u.data->val;
+ if ( (now_d - pit_u->tim_d) > PIT_EXPIRE_MICROS) {
+ itr_u = vt_erase_itr(&sam_u->pit_u, itr_u);
+ } else {
+ itr_u = vt_next(itr_u);
+ }
+ }
+}
+
+/* _mesa_io_init(): initialize ames I/O.
+*/
+u3_auto*
+u3_mesa_io_init(u3_pier* pir_u)
+{
+ u3l_log("mesa: INIT");
+ /* packs = fopen("/home/ec2-user/pages.packs", "rb"); */
+ arena par_u = arena_create(67108864);
+ u3_mesa* sam_u = new(&par_u, u3_mesa, 1);
+ sam_u->par_u = par_u;
+ sam_u->pir_u = pir_u;
+
+ arena are_u;
+ are_u.dat = (char*)are_y;
+ are_u.beg = (char*)are_y;
+ are_u.end = (char*)(are_y + 524288);
+ sam_u->are_u = are_u;
+
+ sam_u->jum_d = 0;
+
+ uv_timer_init(u3L, &sam_u->tim_u);
+
+ // clear pit every 30 seconds
+ sam_u->tim_u.data = sam_u;
+ uv_timer_start(&sam_u->tim_u, _mesa_clear_pit, 30000, 30000);
+
+ vt_init(&sam_u->pit_u);
+ vt_init(&sam_u->per_u);
+ vt_init(&sam_u->gag_u);
+ vt_init(&sam_u->jum_u);
+ vt_init(&sam_u->req_u);
+
+ // Disable networking for fake ships
+ //
+ if ( c3y == sam_u->pir_u->fak_o ) {
+ u3_Host.ops_u.net = c3n;
+ }
+
+ sam_u->for_o = c3n;
+ {
+ u3_noun her = u3i_chubs(2, pir_u->who_d);
+ if ( c3y == u3a_is_cat(her) && her < 256 ) {
+ u3l_log("mesa: forwarding enabled");
+ sam_u->for_o = c3y;
+ }
+ u3z(her);
+ }
+
+
+ u3_auto* car_u = &sam_u->car_u;
+ memset(car_u, 0, sizeof(*car_u));
+ car_u->nam_m = c3__ames;
+ car_u->liv_o = c3y;
+ car_u->io.talk_f = _mesa_io_talk;
+ car_u->io.info_f = _mesa_io_info;
+ car_u->io.slog_f = _mesa_io_slog;
+ car_u->io.kick_f = _mesa_io_kick;
+ car_u->io.exit_f = _mesa_io_exit;
+
+
+
+ /*{
+ u3_noun now;
+ struct timeval tim_u;
+ gettimeofday(&tim_u, 0);
+
+ now = u3_time_in_tv(&tim_u);
+ //sam_u->sev_l = u3r_mug(now);
+ u3z(now);
+ }*/
+
+ return car_u;
+}