From a90d745b7997637eeb87c9a3a26c49003dcd8b18 Mon Sep 17 00:00:00 2001 From: Eric Lambert Date: Wed, 8 Apr 2009 21:25:52 -0700 Subject: fix and test for issue 38 (server does not respond to binary requests) --- memcached.c | 73 ++++++++++--------- memcached.h | 14 ++-- t/udp.t | 234 +++++++++++++++++++++++++++++++++++++++++++++--------------- thread.c | 10 +-- 4 files changed, 228 insertions(+), 103 deletions(-) diff --git a/memcached.c b/memcached.c index 058c13a..16effd3 100644 --- a/memcached.c +++ b/memcached.c @@ -58,7 +58,7 @@ */ static void drive_machine(conn *c); static int new_socket(struct addrinfo *ai); -static int server_socket(const int port, enum protocol prot); +static int server_socket(const int port, enum network_transport transport); static int try_read_command(conn *c); enum try_read_result { @@ -230,7 +230,7 @@ static int add_msghdr(conn *c) c->msgbytes = 0; c->msgused++; - if (IS_UDP(c->protocol)) { + if (IS_UDP(c->transport)) { /* Leave room for the UDP header, which we'll fill in later. */ return add_iov(c, NULL, UDP_HEADER_SIZE); } @@ -309,9 +309,6 @@ static const char *prot_text(enum protocol prot) { case binary_prot: rv = "binary"; break; - case ascii_udp_prot: - rv = "ascii-udp"; - break; case negotiating_prot: rv = "auto-negotiate"; break; @@ -321,7 +318,7 @@ static const char *prot_text(enum protocol prot) { conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags, - const int read_buffer_size, enum protocol prot, + const int read_buffer_size, enum network_transport transport, struct event_base *base) { conn *c = conn_from_freelist(); @@ -366,6 +363,9 @@ conn *conn_new(const int sfd, enum conn_states init_state, STATS_UNLOCK(); } + c->transport = transport; + c->protocol = negotiating_prot; + /* unix socket mode doesn't need this, so zeroed out. but why * is this done for every command? presumably for UDP * mode. */ @@ -378,21 +378,20 @@ conn *conn_new(const int sfd, enum conn_states init_state, if (settings.verbose > 1) { if (init_state == conn_listening) { fprintf(stderr, "<%d server listening (%s)\n", sfd, - prot_text(prot)); - } else if (IS_UDP(prot)) { + prot_text(c->protocol)); + } else if (IS_UDP(transport)) { fprintf(stderr, "<%d server listening (udp)\n", sfd); - } else if (prot == negotiating_prot) { + } else if (c->protocol == negotiating_prot) { fprintf(stderr, "<%d new auto-negotiating client connection\n", sfd); } else { fprintf(stderr, "<%d new unknown (%d) client connection\n", - sfd, prot); + sfd, c->protocol); abort(); } } c->sfd = sfd; - c->protocol = prot; c->state = init_state; c->rlbytes = 0; c->cmd = -1; @@ -525,7 +524,7 @@ static void conn_close(conn *c) { static void conn_shrink(conn *c) { assert(c != NULL); - if (IS_UDP(c->protocol)) + if (IS_UDP(c->transport)) return; if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) { @@ -729,7 +728,7 @@ static int add_iov(conn *c, const void *buf, int len) { * Limit UDP packets, and the first payloads of TCP replies, to * UDP_MAX_PAYLOAD_SIZE bytes. */ - limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused); + limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused); /* We may need to start a new msghdr if this one is full. */ if (m->msg_iovlen == IOV_MAX || @@ -1857,11 +1856,10 @@ static void reset_cmd_handler(conn *c) { static void complete_nread(conn *c) { assert(c != NULL); - assert(c->protocol == ascii_udp_prot - || c->protocol == ascii_prot + assert(c->protocol == ascii_prot || c->protocol == binary_prot); - if (c->protocol == ascii_prot || c->protocol == ascii_udp_prot) { + if (c->protocol == ascii_prot) { complete_nread_ascii(c); } else if (c->protocol == binary_prot) { complete_nread_binary(c); @@ -2433,7 +2431,7 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, in \r\n. So we send SERVER_ERROR instead. */ if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0 - || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) { + || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) { out_string(c, "SERVER_ERROR out of memory writing get response"); } else { @@ -2848,7 +2846,7 @@ static int try_read_command(conn *c) { assert(c->rcurr <= (c->rbuf + c->rsize)); assert(c->rbytes > 0); - if (c->protocol == negotiating_prot) { + if (c->protocol == negotiating_prot || c->transport == udp_transport) { if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) { c->protocol = binary_prot; } else { @@ -3158,7 +3156,7 @@ static enum transmit_result transmit(conn *c) { if (settings.verbose > 0) perror("Failed to write, and not due to blocking"); - if (IS_UDP(c->protocol)) + if (IS_UDP(c->transport)) conn_set_state(c, conn_read); else conn_set_state(c, conn_closing); @@ -3206,7 +3204,7 @@ static void drive_machine(conn *c) { } dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, - DATA_BUFFER_SIZE, c->protocol); + DATA_BUFFER_SIZE, tcp_transport); stop = true; break; @@ -3223,7 +3221,7 @@ static void drive_machine(conn *c) { break; case conn_read: - res = IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c); + res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c); switch (res) { case READ_NO_DATA_RECEIVED: @@ -3360,9 +3358,8 @@ static void drive_machine(conn *c) { * assemble it into a msgbuf list (this will be a single-entry * list for TCP or a two-entry list for UDP). */ - if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) { - if (add_iov(c, c->wcurr, c->wbytes) != 0 || - (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) { + if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) { + if (add_iov(c, c->wcurr, c->wbytes) != 0) { if (settings.verbose > 0) fprintf(stderr, "Couldn't build response\n"); conn_set_state(c, conn_closing); @@ -3373,6 +3370,12 @@ static void drive_machine(conn *c) { /* fall through... */ case conn_mwrite: + if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) { + if (settings.verbose > 0) + fprintf(stderr, "Failed to build UDP headers\n"); + conn_set_state(c, conn_closing); + break; + } switch (transmit(c)) { case TRANSMIT_COMPLETE: if (c->state == conn_mwrite) { @@ -3422,7 +3425,7 @@ static void drive_machine(conn *c) { break; case conn_closing: - if (IS_UDP(c->protocol)) + if (IS_UDP(c->transport)) conn_cleanup(c); else conn_close(c); @@ -3512,7 +3515,7 @@ static void maximize_sndbuf(const int sfd) { fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good); } -static int server_socket(const int port, enum protocol prot) { +static int server_socket(const int port, enum network_transport transport) { int sfd; struct linger ling = {0, 0}; struct addrinfo *ai; @@ -3531,7 +3534,7 @@ static int server_socket(const int port, enum protocol prot) { memset(&hints, 0, sizeof (hints)); hints.ai_flags = AI_PASSIVE; hints.ai_family = AF_UNSPEC; - hints.ai_socktype = IS_UDP(prot) ? SOCK_DGRAM : SOCK_STREAM; + hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM; snprintf(port_buf, NI_MAXSERV, "%d", port); error= getaddrinfo(settings.inter, port_buf, &hints, &ai); @@ -3565,7 +3568,7 @@ static int server_socket(const int port, enum protocol prot) { #endif setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); - if (IS_UDP(prot)) { + if (IS_UDP(transport)) { maximize_sndbuf(sfd); } else { error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); @@ -3592,7 +3595,7 @@ static int server_socket(const int port, enum protocol prot) { continue; } else { success++; - if (!IS_UDP(prot) && listen(sfd, settings.backlog) == -1) { + if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) { perror("listen()"); close(sfd); freeaddrinfo(ai); @@ -3600,18 +3603,18 @@ static int server_socket(const int port, enum protocol prot) { } } - if (IS_UDP(prot)) { + if (IS_UDP(transport)) { int c; for (c = 1; c < settings.num_threads; c++) { /* this is guaranteed to hit all threads because we round-robin */ dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, - UDP_READ_BUFFER_SIZE, ascii_udp_prot); + UDP_READ_BUFFER_SIZE, transport); } } else { if (!(listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, - prot, main_base))) { + transport, main_base))) { fprintf(stderr, "failed to create listening connection\n"); exit(EXIT_FAILURE); } @@ -3695,7 +3698,7 @@ static int server_socket_unix(const char *path, int access_mask) { } if (!(listen_conn = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, - negotiating_prot, main_base))) { + local_transport, main_base))) { fprintf(stderr, "failed to create listening connection\n"); exit(EXIT_FAILURE); } @@ -4233,7 +4236,7 @@ int main (int argc, char **argv) { if (settings.socketpath == NULL) { int udp_port; errno = 0; - if (settings.port && server_socket(settings.port, negotiating_prot)) { + if (settings.port && server_socket(settings.port, tcp_transport)) { fprintf(stderr, "failed to listen on TCP port %d\n", settings.port); if (errno != 0) perror("tcp listen"); @@ -4250,7 +4253,7 @@ int main (int argc, char **argv) { /* create the UDP listening socket and bind it */ errno = 0; - if (settings.udpport && server_socket(settings.udpport, ascii_udp_prot)) { + if (settings.udpport && server_socket(settings.udpport, udp_transport)) { fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport); if (errno != 0) perror("udp listen"); diff --git a/memcached.h b/memcached.h index 1d6aabe..dc41389 100644 --- a/memcached.h +++ b/memcached.h @@ -237,12 +237,17 @@ enum bin_substates { enum protocol { ascii_prot = 3, /* arbitrary value. */ - ascii_udp_prot, binary_prot, negotiating_prot /* Discovering the protocol */ }; -#define IS_UDP(x) (x == ascii_udp_prot) +enum network_transport { + local_transport, /* Unix sockets*/ + tcp_transport, + udp_transport +}; + +#define IS_UDP(x) (x == udp_transport) #define NREAD_ADD 1 #define NREAD_SET 2 @@ -325,6 +330,7 @@ struct conn { int suffixleft; enum protocol protocol; /* which protocol this connection speaks */ + enum network_transport transport; /* what transport is used by this connection */ /* data for UDP clients */ int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */ @@ -362,7 +368,7 @@ extern volatile rel_time_t current_time; char *do_add_delta(conn *c, item *item, const bool incr, const int64_t delta, char *buf); enum store_item_type do_store_item(item *item, int comm, conn* c); -conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum protocol prot, struct event_base *base); +conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base); extern int daemonize(int nochdir, int noclose); @@ -383,7 +389,7 @@ extern int daemonize(int nochdir, int noclose); void thread_init(int nthreads, struct event_base *main_base); int dispatch_event_add(int thread, conn *c); -void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum protocol prot); +void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport); /* Lock wrappers for cache functions that are called from main loop. */ char *add_delta(conn *c, item *item, const int incr, const int64_t delta, diff --git a/t/udp.t b/t/udp.t index 7f0448d..7648d36 100755 --- a/t/udp.t +++ b/t/udp.t @@ -6,6 +6,27 @@ use FindBin qw($Bin); use lib "$Bin/lib"; use MemcachedTest; +use constant IS_ASCII => 0; +use constant IS_BINARY => 1; +use constant ENTRY_EXISTS => 0; +use constant ENTRY_MISSING => 1; +use constant BIN_REQ_MAGIC => 0x80; +use constant BIN_RES_MAGIC => 0x81; +use constant CMD_GET => 0x00; +use constant CMD_SET => 0x01; +use constant CMD_ADD => 0x02; +use constant CMD_REPLACE => 0x03; +use constant CMD_DELETE => 0x04; +use constant CMD_INCR => 0x05; +use constant CMD_DECR => 0x06; +use constant CMD_APPEND => 0x0E; +use constant CMD_PREPEND => 0x0F; +use constant REQ_PKT_FMT => "CCnCCnNNNN"; +use constant RES_PKT_FMT => "CCnCCnNNNN"; +use constant INCRDECR_PKT_FMT => "NNNNN"; +use constant MIN_RECV_BYTES => length(pack(RES_PKT_FMT)); + + my $server = new_memcached(); my $sock = $server->sock; @@ -17,74 +38,166 @@ mem_get_is($sock, "foo", "fooval"); my $usock = $server->new_udp_sock or die "Can't bind : $@\n"; -# test all the get steps, one by one: -test_single_op($usock,"get foo\r\n","VALUE foo 0 6\r\nfooval\r\nEND\r\n"); - -# test all the set steps, one by one: -test_single_op($usock,"set aval 0 0 1\r\n1\r\n","STORED\r\n"); - -# test all the incr steps, one by one: -test_single_op($usock,"incr aval 1\r\n","2\r\n"); - -# test all the delete steps, one by one: -test_single_op($usock,"delete aval\r\n","DELETED\r\n"); - - -# testing sequence numbers +# testing sequence of request ids for my $offt (1, 1, 2) { - my $seq = 160 + $offt; - my $res = send_udp_request($usock, $seq, "get foo\r\n"); + my $req = 160 + $offt; + my $res = send_udp_request($usock, $req, "get foo\r\n"); ok($res, "got result"); is(keys %$res, 1, "one key (one packet)"); ok($res->{0}, "only got seq number 0"); is(substr($res->{0}, 8), "VALUE foo 0 6\r\nfooval\r\nEND\r\n"); - is(hexify(substr($res->{0}, 0, 2)), hexify(pack("n", $seq)), "sequence number in response ($seq) is correct"); + is(hexify(substr($res->{0}, 0, 2)), hexify(pack("n", $req)), "udp request number in response ($req) is correct"); +} + +# op tests +for my $prot (::IS_ASCII,::IS_BINARY) { + udp_set_test($prot,45,"aval$prot","1",0,0); + udp_set_test($prot,45,"bval$prot","abcd" x 1024,0,0); + udp_get_test($prot,45,"aval$prot","1",::ENTRY_EXISTS); + udp_get_test($prot,45,"404$prot","1",::ENTRY_MISSING); + udp_incr_decr_test($prot,45,"aval$prot","1","incr",1); + udp_incr_decr_test($prot,45,"aval$prot","1","decr",2); + udp_delete_test($prot,45,"aval$prot","0"); +} + +sub udp_set_test { + my ($protocol, $req_id, $key, $value, $flags, $exp) = @_; + my $req = ""; + my $val_len = length($value); + + if ($protocol == ::IS_ASCII) { + $req = "set $key $flags $exp $val_len\r\n$value\r\n"; + } elsif ($protocol == ::IS_BINARY) { + my $key_len = length($key); + my $extra = pack "NN",$flags,$exp; + my $extra_len = length($extra); + my $total_len = $val_len + $extra_len + $key_len; + $req = pack(::REQ_PKT_FMT, ::BIN_REQ_MAGIC, ::CMD_SET, $key_len, $extra_len, 0, 0, $total_len, 0, 0, 0); + $req .= $extra . $key . $value; + } + + my $datagrams = send_udp_request($usock, $req_id, $req); + my $resp = construct_udp_message($datagrams); + + if ($protocol == ::IS_ASCII) { + is($resp,"STORED\r\n","Store key $key using ASCII protocol"); + } elsif ($protocol == ::IS_BINARY) { + my ($resp_magic, $resp_op_code, $resp_key_len, $resp_extra_len, $resp_data_type, $resp_status, $resp_total_len, + $resp_opaque, $resp_ident_hi, $resp_ident_lo) = unpack(::RES_PKT_FMT, $resp); + is($resp_status,"0","Store key $key using binary protocol"); + } +} + +sub udp_get_test { + my ($protocol, $req_id, $key, $value, $exists) = @_; + my $key_len = length($key); + my $value_len = length($value); + my $req = ""; + + if ($protocol == ::IS_ASCII) { + $req = "get $key\r\n"; + } elsif ($protocol == ::IS_BINARY) { + $req = pack(::REQ_PKT_FMT, ::BIN_REQ_MAGIC, ::CMD_GET, $key_len, 0, 0, 0, $key_len, 0, 0, 0); + $req .= $key; + } + + my $datagrams = send_udp_request($usock, $req_id, $req); + my $resp = construct_udp_message($datagrams); + + if ($protocol == ::IS_ASCII) { + if ($exists == ::ENTRY_EXISTS) { + is($resp,"VALUE $key 0 $value_len\r\n$value\r\nEND\r\n","Retrieve entry with key $key using ASCII protocol"); + } else { + is($resp,"END\r\n","Retrieve non existing entry with key $key using ASCII protocol"); + } + } elsif ($protocol == ::IS_BINARY) { + my ($resp_magic, $resp_op_code, $resp_key_len, $resp_extra_len, $resp_data_type, $resp_status, $resp_total_len, + $resp_opaque, $resp_ident_hi, $resp_ident_lo) = unpack(::RES_PKT_FMT, $resp); + if ($exists == ::ENTRY_EXISTS) { + is($resp_status,"0","Retrieve entry with key $key using binary protocol"); + is(substr($resp,::MIN_RECV_BYTES + $resp_extra_len + $resp_key_len, $value_len),$value,"Value for key $key retrieved with binary protocol matches"); + } else { + is($resp_status,"1","Retrieve non existing entry with key $key using binary protocol"); + } + } } -# testing non-existent stuff -my $res = send_udp_request($usock, 404, "get notexist\r\n"); -ok($res, "got result"); -is(keys %$res, 1, "one key (one packet)"); -ok($res->{0}, "only got seq number 0"); -is(hexify(substr($res->{0}, 0, 2)), hexify(pack("n", 404)), "sequence number 404 correct"); -is(substr($res->{0}, 8), "END\r\n"); - -# test multi-packet response -{ - my $big = "abcd" x 1024; - my $len = length $big; - print $sock "set big 0 0 $len\r\n$big\r\n"; - is(scalar <$sock>, "STORED\r\n", "stored big"); - mem_get_is($sock, "big", $big, "big value matches"); - my $res = send_udp_request($usock, 999, "get big\r\n"); - is(scalar keys %$res, 3, "three packet response"); - like($res->{0}, qr/VALUE big 0 4096/, "first packet has value line"); - like($res->{2}, qr/\r\nEND\r\n/, "last packet has end"); - is(hexify(substr($res->{1}, 0, 2)), hexify(pack("n", 999)), "sequence number of middle packet is correct"); +sub udp_delete_test { + my ($protocol, $req_id, $key, $time) = @_; + my $req = ""; + my $key_len = length($key); + + if ($protocol == ::IS_ASCII) { + $req = "delete $key $time\r\n"; + } elsif ($protocol == ::IS_BINARY) { + $req = pack(::REQ_PKT_FMT, ::BIN_REQ_MAGIC, ::CMD_DELETE, $key_len, 0, 0, 0, $key_len, 0, 0, 0); + $req .= $key; + } + + my $datagrams = send_udp_request($usock, $req_id, $req); + my $resp = construct_udp_message($datagrams); + + if ($protocol == ::IS_ASCII) { + is($resp,"DELETED\r\n","Delete key $key using ASCII protocol"); + } elsif ($protocol == ::IS_BINARY) { + my ($resp_magic, $resp_op_code, $resp_key_len, $resp_extra_len, $resp_data_type, $resp_status, $resp_total_len, + $resp_opaque, $resp_ident_hi, $resp_ident_lo) = unpack(::RES_PKT_FMT, $resp); + is($resp_status,"0","Delete key $key using binary protocol"); + } } -sub test_single_op { - my $usock = shift; - my $op = shift; - my $resp = shift; - my $req = pack("nnnn", 45, 0, 1, 0); # request id (opaque), seq num, #packets, reserved (must be 0) - $req .= $op; - ok(defined send($usock, $req, 0), "sent request"); - - my $rin = ''; - vec($rin, fileno($usock), 1) = 1; - my $rout; - ok(select($rout = $rin, undef, undef, 2.0), "got readability"); - - my $sender; - my $res; - $sender = $usock->recv($res, 1500, 0); - - my $id = pack("n", 45); - my $expctdlen = length($resp) + 8; - is(hexify(substr($res, 0, 8)), hexify($id) . '0000' . '0001' . '0000', "header is correct"); - is(length $res,$expctdlen,''); - is(substr($res, 8), $resp, "response is correct"); +sub udp_incr_decr_test { + my ($protocol, $req_id, $key, $val, $optype, $init_val) = @_; + my $req = ""; + my $key_len = length($key); + my $expected_value = 0; + my $acmd = "incr"; + my $bcmd = ::CMD_INCR; + if ($optype eq "incr") { + $expected_value = $init_val + $val; + } else { + $acmd = "decr"; + $bcmd = ::CMD_DECR; + $expected_value = $init_val - $val; + } + + if ($protocol == ::IS_ASCII) { + $req = "$acmd $key $val\r\n"; + } elsif ($protocol == ::IS_BINARY) { + my $extra = pack(::INCRDECR_PKT_FMT, ($val / 2 ** 32),($val % 2 ** 32), 0, 0, 0); + my $extra_len = length($extra); + $req = pack(::REQ_PKT_FMT, ::BIN_REQ_MAGIC, $bcmd, $key_len, $extra_len, 0, 0, $key_len + $extra_len, 0, 0, 0); + $req .= $extra . $key; + } + + my $datagrams = send_udp_request($usock, $req_id, $req); + my $resp = construct_udp_message($datagrams); + + if ($protocol == ::IS_ASCII) { + is($resp,"$expected_value\r\n","perform $acmd math operation on key $key with ASCII protocol"); + } elsif ($protocol == ::IS_BINARY) { + my ($resp_magic, $resp_op_code, $resp_key_len, $resp_extra_len, $resp_data_type, $resp_status, $resp_total_len, + $resp_opaque, $resp_ident_hi, $resp_ident_lo) = unpack(::RES_PKT_FMT, $resp); + is($resp_status,"0","perform $acmd math operation on key $key with binary protocol"); + my ($resp_hi,$resp_lo) = unpack("NN",substr($resp,::MIN_RECV_BYTES + $resp_extra_len + $resp_key_len, + $resp_total_len - $resp_extra_len - $resp_key_len)); + is(($resp_hi * 2 ** 32) + $resp_lo,$expected_value,"validate result of binary protocol math operation $acmd . Expected value $expected_value") + } +} + +sub construct_udp_message { + my $datagrams = shift; + my $num_datagram = keys (%$datagrams); + my $msg = ""; + my $cur_dg =""; + my $cur_udp_header =""; + for (my $cur_dg_index = 0; $cur_dg_index < $num_datagram; $cur_dg_index++) { + $cur_dg = %$datagrams->{$cur_dg_index}; + isnt($cur_dg,"","missing datagram for segment $cur_dg_index"); + $cur_udp_header=substr($cur_dg, 0, 8); + $msg .= substr($cur_dg,8); + } + return $msg; } sub hexify { @@ -94,6 +207,8 @@ sub hexify { } # returns undef on select timeout, or hashref of "seqnum" -> payload (including headers) +# verifies that resp_id is equal to id sent in request +# ensures consistency in num packets that make up response sub send_udp_request { my ($sock, $reqid, $req) = @_; @@ -131,6 +246,7 @@ sub send_udp_request { return $ret; } + __END__ $sender = recv($usock, $ans, 1050, 0); diff --git a/thread.c b/thread.c index f46a4b9..d7ed2b8 100644 --- a/thread.c +++ b/thread.c @@ -20,7 +20,7 @@ struct conn_queue_item { enum conn_states init_state; int event_flags; int read_buffer_size; - enum protocol protocol; + enum network_transport transport; CQ_ITEM *next; }; @@ -246,9 +246,9 @@ static void thread_libevent_process(int fd, short which, void *arg) { if (NULL != item) { conn *c = conn_new(item->sfd, item->init_state, item->event_flags, - item->read_buffer_size, item->protocol, me->base); + item->read_buffer_size, item->transport, me->base); if (c == NULL) { - if (IS_UDP(item->protocol)) { + if (IS_UDP(item->transport)) { fprintf(stderr, "Can't listen for events on UDP socket\n"); exit(1); } else { @@ -274,7 +274,7 @@ static int last_thread = 0; * of an incoming connection. */ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, - int read_buffer_size, enum protocol prot) { + int read_buffer_size, enum network_transport transport) { CQ_ITEM *item = cqi_new(); int tid = last_thread % (settings.num_threads - 1); @@ -288,7 +288,7 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, item->init_state = init_state; item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; - item->protocol = prot; + item->transport = transport; cq_push(thread->new_conn_queue, item); -- cgit v1.2.1