summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Lambert <eric.lambert@sun.com>2009-04-08 21:25:52 -0700
committerTrond Norbye <Trond.Norbye@sun.com>2009-05-02 18:41:48 -0700
commita90d745b7997637eeb87c9a3a26c49003dcd8b18 (patch)
tree8b6dd1d94c116c3263dfd8d3c769c12bb5ea708c
parenta4106cf31d24a4c200fbfc50660f24dcf903e08f (diff)
downloadmemcached-stable-1.3.tar.gz
fix and test for issue 38 (server does not respond to binary requests)stable-1.3
-rw-r--r--memcached.c73
-rw-r--r--memcached.h14
-rwxr-xr-xt/udp.t234
-rw-r--r--thread.c10
4 files changed, 228 insertions, 103 deletions
diff --git a/memcached.c b/memcached.c
index 058c13a..16effd3 100644
--- a/memcached.c
+++ b/memcached.c
@@ -58,7 +58,7 @@
*/
static void drive_machine(conn *c);
static int new_socket(struct addrinfo *ai);
-static int server_socket(const int port, enum protocol prot);
+static int server_socket(const int port, enum network_transport transport);
static int try_read_command(conn *c);
enum try_read_result {
@@ -230,7 +230,7 @@ static int add_msghdr(conn *c)
c->msgbytes = 0;
c->msgused++;
- if (IS_UDP(c->protocol)) {
+ if (IS_UDP(c->transport)) {
/* Leave room for the UDP header, which we'll fill in later. */
return add_iov(c, NULL, UDP_HEADER_SIZE);
}
@@ -309,9 +309,6 @@ static const char *prot_text(enum protocol prot) {
case binary_prot:
rv = "binary";
break;
- case ascii_udp_prot:
- rv = "ascii-udp";
- break;
case negotiating_prot:
rv = "auto-negotiate";
break;
@@ -321,7 +318,7 @@ static const char *prot_text(enum protocol prot) {
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
- const int read_buffer_size, enum protocol prot,
+ const int read_buffer_size, enum network_transport transport,
struct event_base *base) {
conn *c = conn_from_freelist();
@@ -366,6 +363,9 @@ conn *conn_new(const int sfd, enum conn_states init_state,
STATS_UNLOCK();
}
+ c->transport = transport;
+ c->protocol = negotiating_prot;
+
/* unix socket mode doesn't need this, so zeroed out. but why
* is this done for every command? presumably for UDP
* mode. */
@@ -378,21 +378,20 @@ conn *conn_new(const int sfd, enum conn_states init_state,
if (settings.verbose > 1) {
if (init_state == conn_listening) {
fprintf(stderr, "<%d server listening (%s)\n", sfd,
- prot_text(prot));
- } else if (IS_UDP(prot)) {
+ prot_text(c->protocol));
+ } else if (IS_UDP(transport)) {
fprintf(stderr, "<%d server listening (udp)\n", sfd);
- } else if (prot == negotiating_prot) {
+ } else if (c->protocol == negotiating_prot) {
fprintf(stderr, "<%d new auto-negotiating client connection\n",
sfd);
} else {
fprintf(stderr, "<%d new unknown (%d) client connection\n",
- sfd, prot);
+ sfd, c->protocol);
abort();
}
}
c->sfd = sfd;
- c->protocol = prot;
c->state = init_state;
c->rlbytes = 0;
c->cmd = -1;
@@ -525,7 +524,7 @@ static void conn_close(conn *c) {
static void conn_shrink(conn *c) {
assert(c != NULL);
- if (IS_UDP(c->protocol))
+ if (IS_UDP(c->transport))
return;
if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
@@ -729,7 +728,7 @@ static int add_iov(conn *c, const void *buf, int len) {
* Limit UDP packets, and the first payloads of TCP replies, to
* UDP_MAX_PAYLOAD_SIZE bytes.
*/
- limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused);
+ limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);
/* We may need to start a new msghdr if this one is full. */
if (m->msg_iovlen == IOV_MAX ||
@@ -1857,11 +1856,10 @@ static void reset_cmd_handler(conn *c) {
static void complete_nread(conn *c) {
assert(c != NULL);
- assert(c->protocol == ascii_udp_prot
- || c->protocol == ascii_prot
+ assert(c->protocol == ascii_prot
|| c->protocol == binary_prot);
- if (c->protocol == ascii_prot || c->protocol == ascii_udp_prot) {
+ if (c->protocol == ascii_prot) {
complete_nread_ascii(c);
} else if (c->protocol == binary_prot) {
complete_nread_binary(c);
@@ -2433,7 +2431,7 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
in \r\n. So we send SERVER_ERROR instead.
*/
if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
- || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
+ || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
out_string(c, "SERVER_ERROR out of memory writing get response");
}
else {
@@ -2848,7 +2846,7 @@ static int try_read_command(conn *c) {
assert(c->rcurr <= (c->rbuf + c->rsize));
assert(c->rbytes > 0);
- if (c->protocol == negotiating_prot) {
+ if (c->protocol == negotiating_prot || c->transport == udp_transport) {
if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
c->protocol = binary_prot;
} else {
@@ -3158,7 +3156,7 @@ static enum transmit_result transmit(conn *c) {
if (settings.verbose > 0)
perror("Failed to write, and not due to blocking");
- if (IS_UDP(c->protocol))
+ if (IS_UDP(c->transport))
conn_set_state(c, conn_read);
else
conn_set_state(c, conn_closing);
@@ -3206,7 +3204,7 @@ static void drive_machine(conn *c) {
}
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
- DATA_BUFFER_SIZE, c->protocol);
+ DATA_BUFFER_SIZE, tcp_transport);
stop = true;
break;
@@ -3223,7 +3221,7 @@ static void drive_machine(conn *c) {
break;
case conn_read:
- res = IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c);
+ res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
switch (res) {
case READ_NO_DATA_RECEIVED:
@@ -3360,9 +3358,8 @@ static void drive_machine(conn *c) {
* assemble it into a msgbuf list (this will be a single-entry
* list for TCP or a two-entry list for UDP).
*/
- if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) {
- if (add_iov(c, c->wcurr, c->wbytes) != 0 ||
- (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) {
+ if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
+ if (add_iov(c, c->wcurr, c->wbytes) != 0) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't build response\n");
conn_set_state(c, conn_closing);
@@ -3373,6 +3370,12 @@ static void drive_machine(conn *c) {
/* fall through... */
case conn_mwrite:
+ if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
+ if (settings.verbose > 0)
+ fprintf(stderr, "Failed to build UDP headers\n");
+ conn_set_state(c, conn_closing);
+ break;
+ }
switch (transmit(c)) {
case TRANSMIT_COMPLETE:
if (c->state == conn_mwrite) {
@@ -3422,7 +3425,7 @@ static void drive_machine(conn *c) {
break;
case conn_closing:
- if (IS_UDP(c->protocol))
+ if (IS_UDP(c->transport))
conn_cleanup(c);
else
conn_close(c);
@@ -3512,7 +3515,7 @@ static void maximize_sndbuf(const int sfd) {
fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
}
-static int server_socket(const int port, enum protocol prot) {
+static int server_socket(const int port, enum network_transport transport) {
int sfd;
struct linger ling = {0, 0};
struct addrinfo *ai;
@@ -3531,7 +3534,7 @@ static int server_socket(const int port, enum protocol prot) {
memset(&hints, 0, sizeof (hints));
hints.ai_flags = AI_PASSIVE;
hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = IS_UDP(prot) ? SOCK_DGRAM : SOCK_STREAM;
+ hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;
snprintf(port_buf, NI_MAXSERV, "%d", port);
error= getaddrinfo(settings.inter, port_buf, &hints, &ai);
@@ -3565,7 +3568,7 @@ static int server_socket(const int port, enum protocol prot) {
#endif
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
- if (IS_UDP(prot)) {
+ if (IS_UDP(transport)) {
maximize_sndbuf(sfd);
} else {
error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
@@ -3592,7 +3595,7 @@ static int server_socket(const int port, enum protocol prot) {
continue;
} else {
success++;
- if (!IS_UDP(prot) && listen(sfd, settings.backlog) == -1) {
+ if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
perror("listen()");
close(sfd);
freeaddrinfo(ai);
@@ -3600,18 +3603,18 @@ static int server_socket(const int port, enum protocol prot) {
}
}
- if (IS_UDP(prot)) {
+ if (IS_UDP(transport)) {
int c;
for (c = 1; c < settings.num_threads; c++) {
/* this is guaranteed to hit all threads because we round-robin */
dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
- UDP_READ_BUFFER_SIZE, ascii_udp_prot);
+ UDP_READ_BUFFER_SIZE, transport);
}
} else {
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
- prot, main_base))) {
+ transport, main_base))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
@@ -3695,7 +3698,7 @@ static int server_socket_unix(const char *path, int access_mask) {
}
if (!(listen_conn = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
- negotiating_prot, main_base))) {
+ local_transport, main_base))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
@@ -4233,7 +4236,7 @@ int main (int argc, char **argv) {
if (settings.socketpath == NULL) {
int udp_port;
errno = 0;
- if (settings.port && server_socket(settings.port, negotiating_prot)) {
+ if (settings.port && server_socket(settings.port, tcp_transport)) {
fprintf(stderr, "failed to listen on TCP port %d\n", settings.port);
if (errno != 0)
perror("tcp listen");
@@ -4250,7 +4253,7 @@ int main (int argc, char **argv) {
/* create the UDP listening socket and bind it */
errno = 0;
- if (settings.udpport && server_socket(settings.udpport, ascii_udp_prot)) {
+ if (settings.udpport && server_socket(settings.udpport, udp_transport)) {
fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport);
if (errno != 0)
perror("udp listen");
diff --git a/memcached.h b/memcached.h
index 1d6aabe..dc41389 100644
--- a/memcached.h
+++ b/memcached.h
@@ -237,12 +237,17 @@ enum bin_substates {
enum protocol {
ascii_prot = 3, /* arbitrary value. */
- ascii_udp_prot,
binary_prot,
negotiating_prot /* Discovering the protocol */
};
-#define IS_UDP(x) (x == ascii_udp_prot)
+enum network_transport {
+ local_transport, /* Unix sockets*/
+ tcp_transport,
+ udp_transport
+};
+
+#define IS_UDP(x) (x == udp_transport)
#define NREAD_ADD 1
#define NREAD_SET 2
@@ -325,6 +330,7 @@ struct conn {
int suffixleft;
enum protocol protocol; /* which protocol this connection speaks */
+ enum network_transport transport; /* what transport is used by this connection */
/* data for UDP clients */
int request_id; /* Incoming UDP request ID, if this is a UDP "connection" */
@@ -362,7 +368,7 @@ extern volatile rel_time_t current_time;
char *do_add_delta(conn *c, item *item, const bool incr, const int64_t delta,
char *buf);
enum store_item_type do_store_item(item *item, int comm, conn* c);
-conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum protocol prot, struct event_base *base);
+conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size, enum network_transport transport, struct event_base *base);
extern int daemonize(int nochdir, int noclose);
@@ -383,7 +389,7 @@ extern int daemonize(int nochdir, int noclose);
void thread_init(int nthreads, struct event_base *main_base);
int dispatch_event_add(int thread, conn *c);
-void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum protocol prot);
+void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size, enum network_transport transport);
/* Lock wrappers for cache functions that are called from main loop. */
char *add_delta(conn *c, item *item, const int incr, const int64_t delta,
diff --git a/t/udp.t b/t/udp.t
index 7f0448d..7648d36 100755
--- a/t/udp.t
+++ b/t/udp.t
@@ -6,6 +6,27 @@ use FindBin qw($Bin);
use lib "$Bin/lib";
use MemcachedTest;
+use constant IS_ASCII => 0;
+use constant IS_BINARY => 1;
+use constant ENTRY_EXISTS => 0;
+use constant ENTRY_MISSING => 1;
+use constant BIN_REQ_MAGIC => 0x80;
+use constant BIN_RES_MAGIC => 0x81;
+use constant CMD_GET => 0x00;
+use constant CMD_SET => 0x01;
+use constant CMD_ADD => 0x02;
+use constant CMD_REPLACE => 0x03;
+use constant CMD_DELETE => 0x04;
+use constant CMD_INCR => 0x05;
+use constant CMD_DECR => 0x06;
+use constant CMD_APPEND => 0x0E;
+use constant CMD_PREPEND => 0x0F;
+use constant REQ_PKT_FMT => "CCnCCnNNNN";
+use constant RES_PKT_FMT => "CCnCCnNNNN";
+use constant INCRDECR_PKT_FMT => "NNNNN";
+use constant MIN_RECV_BYTES => length(pack(RES_PKT_FMT));
+
+
my $server = new_memcached();
my $sock = $server->sock;
@@ -17,74 +38,166 @@ mem_get_is($sock, "foo", "fooval");
my $usock = $server->new_udp_sock
or die "Can't bind : $@\n";
-# test all the get steps, one by one:
-test_single_op($usock,"get foo\r\n","VALUE foo 0 6\r\nfooval\r\nEND\r\n");
-
-# test all the set steps, one by one:
-test_single_op($usock,"set aval 0 0 1\r\n1\r\n","STORED\r\n");
-
-# test all the incr steps, one by one:
-test_single_op($usock,"incr aval 1\r\n","2\r\n");
-
-# test all the delete steps, one by one:
-test_single_op($usock,"delete aval\r\n","DELETED\r\n");
-
-
-# testing sequence numbers
+# testing sequence of request ids
for my $offt (1, 1, 2) {
- my $seq = 160 + $offt;
- my $res = send_udp_request($usock, $seq, "get foo\r\n");
+ my $req = 160 + $offt;
+ my $res = send_udp_request($usock, $req, "get foo\r\n");
ok($res, "got result");
is(keys %$res, 1, "one key (one packet)");
ok($res->{0}, "only got seq number 0");
is(substr($res->{0}, 8), "VALUE foo 0 6\r\nfooval\r\nEND\r\n");
- is(hexify(substr($res->{0}, 0, 2)), hexify(pack("n", $seq)), "sequence number in response ($seq) is correct");
+ is(hexify(substr($res->{0}, 0, 2)), hexify(pack("n", $req)), "udp request number in response ($req) is correct");
+}
+
+# op tests
+for my $prot (::IS_ASCII,::IS_BINARY) {
+ udp_set_test($prot,45,"aval$prot","1",0,0);
+ udp_set_test($prot,45,"bval$prot","abcd" x 1024,0,0);
+ udp_get_test($prot,45,"aval$prot","1",::ENTRY_EXISTS);
+ udp_get_test($prot,45,"404$prot","1",::ENTRY_MISSING);
+ udp_incr_decr_test($prot,45,"aval$prot","1","incr",1);
+ udp_incr_decr_test($prot,45,"aval$prot","1","decr",2);
+ udp_delete_test($prot,45,"aval$prot","0");
+}
+
+sub udp_set_test {
+ my ($protocol, $req_id, $key, $value, $flags, $exp) = @_;
+ my $req = "";
+ my $val_len = length($value);
+
+ if ($protocol == ::IS_ASCII) {
+ $req = "set $key $flags $exp $val_len\r\n$value\r\n";
+ } elsif ($protocol == ::IS_BINARY) {
+ my $key_len = length($key);
+ my $extra = pack "NN",$flags,$exp;
+ my $extra_len = length($extra);
+ my $total_len = $val_len + $extra_len + $key_len;
+ $req = pack(::REQ_PKT_FMT, ::BIN_REQ_MAGIC, ::CMD_SET, $key_len, $extra_len, 0, 0, $total_len, 0, 0, 0);
+ $req .= $extra . $key . $value;
+ }
+
+ my $datagrams = send_udp_request($usock, $req_id, $req);
+ my $resp = construct_udp_message($datagrams);
+
+ if ($protocol == ::IS_ASCII) {
+ is($resp,"STORED\r\n","Store key $key using ASCII protocol");
+ } elsif ($protocol == ::IS_BINARY) {
+ my ($resp_magic, $resp_op_code, $resp_key_len, $resp_extra_len, $resp_data_type, $resp_status, $resp_total_len,
+ $resp_opaque, $resp_ident_hi, $resp_ident_lo) = unpack(::RES_PKT_FMT, $resp);
+ is($resp_status,"0","Store key $key using binary protocol");
+ }
+}
+
+sub udp_get_test {
+ my ($protocol, $req_id, $key, $value, $exists) = @_;
+ my $key_len = length($key);
+ my $value_len = length($value);
+ my $req = "";
+
+ if ($protocol == ::IS_ASCII) {
+ $req = "get $key\r\n";
+ } elsif ($protocol == ::IS_BINARY) {
+ $req = pack(::REQ_PKT_FMT, ::BIN_REQ_MAGIC, ::CMD_GET, $key_len, 0, 0, 0, $key_len, 0, 0, 0);
+ $req .= $key;
+ }
+
+ my $datagrams = send_udp_request($usock, $req_id, $req);
+ my $resp = construct_udp_message($datagrams);
+
+ if ($protocol == ::IS_ASCII) {
+ if ($exists == ::ENTRY_EXISTS) {
+ is($resp,"VALUE $key 0 $value_len\r\n$value\r\nEND\r\n","Retrieve entry with key $key using ASCII protocol");
+ } else {
+ is($resp,"END\r\n","Retrieve non existing entry with key $key using ASCII protocol");
+ }
+ } elsif ($protocol == ::IS_BINARY) {
+ my ($resp_magic, $resp_op_code, $resp_key_len, $resp_extra_len, $resp_data_type, $resp_status, $resp_total_len,
+ $resp_opaque, $resp_ident_hi, $resp_ident_lo) = unpack(::RES_PKT_FMT, $resp);
+ if ($exists == ::ENTRY_EXISTS) {
+ is($resp_status,"0","Retrieve entry with key $key using binary protocol");
+ is(substr($resp,::MIN_RECV_BYTES + $resp_extra_len + $resp_key_len, $value_len),$value,"Value for key $key retrieved with binary protocol matches");
+ } else {
+ is($resp_status,"1","Retrieve non existing entry with key $key using binary protocol");
+ }
+ }
}
-# testing non-existent stuff
-my $res = send_udp_request($usock, 404, "get notexist\r\n");
-ok($res, "got result");
-is(keys %$res, 1, "one key (one packet)");
-ok($res->{0}, "only got seq number 0");
-is(hexify(substr($res->{0}, 0, 2)), hexify(pack("n", 404)), "sequence number 404 correct");
-is(substr($res->{0}, 8), "END\r\n");
-
-# test multi-packet response
-{
- my $big = "abcd" x 1024;
- my $len = length $big;
- print $sock "set big 0 0 $len\r\n$big\r\n";
- is(scalar <$sock>, "STORED\r\n", "stored big");
- mem_get_is($sock, "big", $big, "big value matches");
- my $res = send_udp_request($usock, 999, "get big\r\n");
- is(scalar keys %$res, 3, "three packet response");
- like($res->{0}, qr/VALUE big 0 4096/, "first packet has value line");
- like($res->{2}, qr/\r\nEND\r\n/, "last packet has end");
- is(hexify(substr($res->{1}, 0, 2)), hexify(pack("n", 999)), "sequence number of middle packet is correct");
+sub udp_delete_test {
+ my ($protocol, $req_id, $key, $time) = @_;
+ my $req = "";
+ my $key_len = length($key);
+
+ if ($protocol == ::IS_ASCII) {
+ $req = "delete $key $time\r\n";
+ } elsif ($protocol == ::IS_BINARY) {
+ $req = pack(::REQ_PKT_FMT, ::BIN_REQ_MAGIC, ::CMD_DELETE, $key_len, 0, 0, 0, $key_len, 0, 0, 0);
+ $req .= $key;
+ }
+
+ my $datagrams = send_udp_request($usock, $req_id, $req);
+ my $resp = construct_udp_message($datagrams);
+
+ if ($protocol == ::IS_ASCII) {
+ is($resp,"DELETED\r\n","Delete key $key using ASCII protocol");
+ } elsif ($protocol == ::IS_BINARY) {
+ my ($resp_magic, $resp_op_code, $resp_key_len, $resp_extra_len, $resp_data_type, $resp_status, $resp_total_len,
+ $resp_opaque, $resp_ident_hi, $resp_ident_lo) = unpack(::RES_PKT_FMT, $resp);
+ is($resp_status,"0","Delete key $key using binary protocol");
+ }
}
-sub test_single_op {
- my $usock = shift;
- my $op = shift;
- my $resp = shift;
- my $req = pack("nnnn", 45, 0, 1, 0); # request id (opaque), seq num, #packets, reserved (must be 0)
- $req .= $op;
- ok(defined send($usock, $req, 0), "sent request");
-
- my $rin = '';
- vec($rin, fileno($usock), 1) = 1;
- my $rout;
- ok(select($rout = $rin, undef, undef, 2.0), "got readability");
-
- my $sender;
- my $res;
- $sender = $usock->recv($res, 1500, 0);
-
- my $id = pack("n", 45);
- my $expctdlen = length($resp) + 8;
- is(hexify(substr($res, 0, 8)), hexify($id) . '0000' . '0001' . '0000', "header is correct");
- is(length $res,$expctdlen,'');
- is(substr($res, 8), $resp, "response is correct");
+sub udp_incr_decr_test {
+ my ($protocol, $req_id, $key, $val, $optype, $init_val) = @_;
+ my $req = "";
+ my $key_len = length($key);
+ my $expected_value = 0;
+ my $acmd = "incr";
+ my $bcmd = ::CMD_INCR;
+ if ($optype eq "incr") {
+ $expected_value = $init_val + $val;
+ } else {
+ $acmd = "decr";
+ $bcmd = ::CMD_DECR;
+ $expected_value = $init_val - $val;
+ }
+
+ if ($protocol == ::IS_ASCII) {
+ $req = "$acmd $key $val\r\n";
+ } elsif ($protocol == ::IS_BINARY) {
+ my $extra = pack(::INCRDECR_PKT_FMT, ($val / 2 ** 32),($val % 2 ** 32), 0, 0, 0);
+ my $extra_len = length($extra);
+ $req = pack(::REQ_PKT_FMT, ::BIN_REQ_MAGIC, $bcmd, $key_len, $extra_len, 0, 0, $key_len + $extra_len, 0, 0, 0);
+ $req .= $extra . $key;
+ }
+
+ my $datagrams = send_udp_request($usock, $req_id, $req);
+ my $resp = construct_udp_message($datagrams);
+
+ if ($protocol == ::IS_ASCII) {
+ is($resp,"$expected_value\r\n","perform $acmd math operation on key $key with ASCII protocol");
+ } elsif ($protocol == ::IS_BINARY) {
+ my ($resp_magic, $resp_op_code, $resp_key_len, $resp_extra_len, $resp_data_type, $resp_status, $resp_total_len,
+ $resp_opaque, $resp_ident_hi, $resp_ident_lo) = unpack(::RES_PKT_FMT, $resp);
+ is($resp_status,"0","perform $acmd math operation on key $key with binary protocol");
+ my ($resp_hi,$resp_lo) = unpack("NN",substr($resp,::MIN_RECV_BYTES + $resp_extra_len + $resp_key_len,
+ $resp_total_len - $resp_extra_len - $resp_key_len));
+ is(($resp_hi * 2 ** 32) + $resp_lo,$expected_value,"validate result of binary protocol math operation $acmd . Expected value $expected_value")
+ }
+}
+
+sub construct_udp_message {
+ my $datagrams = shift;
+ my $num_datagram = keys (%$datagrams);
+ my $msg = "";
+ my $cur_dg ="";
+ my $cur_udp_header ="";
+ for (my $cur_dg_index = 0; $cur_dg_index < $num_datagram; $cur_dg_index++) {
+ $cur_dg = %$datagrams->{$cur_dg_index};
+ isnt($cur_dg,"","missing datagram for segment $cur_dg_index");
+ $cur_udp_header=substr($cur_dg, 0, 8);
+ $msg .= substr($cur_dg,8);
+ }
+ return $msg;
}
sub hexify {
@@ -94,6 +207,8 @@ sub hexify {
}
# returns undef on select timeout, or hashref of "seqnum" -> payload (including headers)
+# verifies that resp_id is equal to id sent in request
+# ensures consistency in num packets that make up response
sub send_udp_request {
my ($sock, $reqid, $req) = @_;
@@ -131,6 +246,7 @@ sub send_udp_request {
return $ret;
}
+
__END__
$sender = recv($usock, $ans, 1050, 0);
diff --git a/thread.c b/thread.c
index f46a4b9..d7ed2b8 100644
--- a/thread.c
+++ b/thread.c
@@ -20,7 +20,7 @@ struct conn_queue_item {
enum conn_states init_state;
int event_flags;
int read_buffer_size;
- enum protocol protocol;
+ enum network_transport transport;
CQ_ITEM *next;
};
@@ -246,9 +246,9 @@ static void thread_libevent_process(int fd, short which, void *arg) {
if (NULL != item) {
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
- item->read_buffer_size, item->protocol, me->base);
+ item->read_buffer_size, item->transport, me->base);
if (c == NULL) {
- if (IS_UDP(item->protocol)) {
+ if (IS_UDP(item->transport)) {
fprintf(stderr, "Can't listen for events on UDP socket\n");
exit(1);
} else {
@@ -274,7 +274,7 @@ static int last_thread = 0;
* of an incoming connection.
*/
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
- int read_buffer_size, enum protocol prot) {
+ int read_buffer_size, enum network_transport transport) {
CQ_ITEM *item = cqi_new();
int tid = last_thread % (settings.num_threads - 1);
@@ -288,7 +288,7 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
- item->protocol = prot;
+ item->transport = transport;
cq_push(thread->new_conn_queue, item);