diff options
Diffstat (limited to 'memcached.c')
-rw-r--r-- | memcached.c | 73 |
1 files changed, 38 insertions, 35 deletions
diff --git a/memcached.c b/memcached.c index 058c13a..16effd3 100644 --- a/memcached.c +++ b/memcached.c @@ -58,7 +58,7 @@ */ static void drive_machine(conn *c); static int new_socket(struct addrinfo *ai); -static int server_socket(const int port, enum protocol prot); +static int server_socket(const int port, enum network_transport transport); static int try_read_command(conn *c); enum try_read_result { @@ -230,7 +230,7 @@ static int add_msghdr(conn *c) c->msgbytes = 0; c->msgused++; - if (IS_UDP(c->protocol)) { + if (IS_UDP(c->transport)) { /* Leave room for the UDP header, which we'll fill in later. */ return add_iov(c, NULL, UDP_HEADER_SIZE); } @@ -309,9 +309,6 @@ static const char *prot_text(enum protocol prot) { case binary_prot: rv = "binary"; break; - case ascii_udp_prot: - rv = "ascii-udp"; - break; case negotiating_prot: rv = "auto-negotiate"; break; @@ -321,7 +318,7 @@ static const char *prot_text(enum protocol prot) { conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags, - const int read_buffer_size, enum protocol prot, + const int read_buffer_size, enum network_transport transport, struct event_base *base) { conn *c = conn_from_freelist(); @@ -366,6 +363,9 @@ conn *conn_new(const int sfd, enum conn_states init_state, STATS_UNLOCK(); } + c->transport = transport; + c->protocol = negotiating_prot; + /* unix socket mode doesn't need this, so zeroed out. but why * is this done for every command? presumably for UDP * mode. */ @@ -378,21 +378,20 @@ conn *conn_new(const int sfd, enum conn_states init_state, if (settings.verbose > 1) { if (init_state == conn_listening) { fprintf(stderr, "<%d server listening (%s)\n", sfd, - prot_text(prot)); - } else if (IS_UDP(prot)) { + prot_text(c->protocol)); + } else if (IS_UDP(transport)) { fprintf(stderr, "<%d server listening (udp)\n", sfd); - } else if (prot == negotiating_prot) { + } else if (c->protocol == negotiating_prot) { fprintf(stderr, "<%d new auto-negotiating client connection\n", sfd); } else { fprintf(stderr, "<%d new unknown (%d) client connection\n", - sfd, prot); + sfd, c->protocol); abort(); } } c->sfd = sfd; - c->protocol = prot; c->state = init_state; c->rlbytes = 0; c->cmd = -1; @@ -525,7 +524,7 @@ static void conn_close(conn *c) { static void conn_shrink(conn *c) { assert(c != NULL); - if (IS_UDP(c->protocol)) + if (IS_UDP(c->transport)) return; if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) { @@ -729,7 +728,7 @@ static int add_iov(conn *c, const void *buf, int len) { * Limit UDP packets, and the first payloads of TCP replies, to * UDP_MAX_PAYLOAD_SIZE bytes. */ - limit_to_mtu = IS_UDP(c->protocol) || (1 == c->msgused); + limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused); /* We may need to start a new msghdr if this one is full. */ if (m->msg_iovlen == IOV_MAX || @@ -1857,11 +1856,10 @@ static void reset_cmd_handler(conn *c) { static void complete_nread(conn *c) { assert(c != NULL); - assert(c->protocol == ascii_udp_prot - || c->protocol == ascii_prot + assert(c->protocol == ascii_prot || c->protocol == binary_prot); - if (c->protocol == ascii_prot || c->protocol == ascii_udp_prot) { + if (c->protocol == ascii_prot) { complete_nread_ascii(c); } else if (c->protocol == binary_prot) { complete_nread_binary(c); @@ -2433,7 +2431,7 @@ static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, in \r\n. So we send SERVER_ERROR instead. */ if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0 - || (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) { + || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) { out_string(c, "SERVER_ERROR out of memory writing get response"); } else { @@ -2848,7 +2846,7 @@ static int try_read_command(conn *c) { assert(c->rcurr <= (c->rbuf + c->rsize)); assert(c->rbytes > 0); - if (c->protocol == negotiating_prot) { + if (c->protocol == negotiating_prot || c->transport == udp_transport) { if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) { c->protocol = binary_prot; } else { @@ -3158,7 +3156,7 @@ static enum transmit_result transmit(conn *c) { if (settings.verbose > 0) perror("Failed to write, and not due to blocking"); - if (IS_UDP(c->protocol)) + if (IS_UDP(c->transport)) conn_set_state(c, conn_read); else conn_set_state(c, conn_closing); @@ -3206,7 +3204,7 @@ static void drive_machine(conn *c) { } dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST, - DATA_BUFFER_SIZE, c->protocol); + DATA_BUFFER_SIZE, tcp_transport); stop = true; break; @@ -3223,7 +3221,7 @@ static void drive_machine(conn *c) { break; case conn_read: - res = IS_UDP(c->protocol) ? try_read_udp(c) : try_read_network(c); + res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c); switch (res) { case READ_NO_DATA_RECEIVED: @@ -3360,9 +3358,8 @@ static void drive_machine(conn *c) { * assemble it into a msgbuf list (this will be a single-entry * list for TCP or a two-entry list for UDP). */ - if (c->iovused == 0 || (IS_UDP(c->protocol) && c->iovused == 1)) { - if (add_iov(c, c->wcurr, c->wbytes) != 0 || - (IS_UDP(c->protocol) && build_udp_headers(c) != 0)) { + if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) { + if (add_iov(c, c->wcurr, c->wbytes) != 0) { if (settings.verbose > 0) fprintf(stderr, "Couldn't build response\n"); conn_set_state(c, conn_closing); @@ -3373,6 +3370,12 @@ static void drive_machine(conn *c) { /* fall through... */ case conn_mwrite: + if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) { + if (settings.verbose > 0) + fprintf(stderr, "Failed to build UDP headers\n"); + conn_set_state(c, conn_closing); + break; + } switch (transmit(c)) { case TRANSMIT_COMPLETE: if (c->state == conn_mwrite) { @@ -3422,7 +3425,7 @@ static void drive_machine(conn *c) { break; case conn_closing: - if (IS_UDP(c->protocol)) + if (IS_UDP(c->transport)) conn_cleanup(c); else conn_close(c); @@ -3512,7 +3515,7 @@ static void maximize_sndbuf(const int sfd) { fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good); } -static int server_socket(const int port, enum protocol prot) { +static int server_socket(const int port, enum network_transport transport) { int sfd; struct linger ling = {0, 0}; struct addrinfo *ai; @@ -3531,7 +3534,7 @@ static int server_socket(const int port, enum protocol prot) { memset(&hints, 0, sizeof (hints)); hints.ai_flags = AI_PASSIVE; hints.ai_family = AF_UNSPEC; - hints.ai_socktype = IS_UDP(prot) ? SOCK_DGRAM : SOCK_STREAM; + hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM; snprintf(port_buf, NI_MAXSERV, "%d", port); error= getaddrinfo(settings.inter, port_buf, &hints, &ai); @@ -3565,7 +3568,7 @@ static int server_socket(const int port, enum protocol prot) { #endif setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); - if (IS_UDP(prot)) { + if (IS_UDP(transport)) { maximize_sndbuf(sfd); } else { error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); @@ -3592,7 +3595,7 @@ static int server_socket(const int port, enum protocol prot) { continue; } else { success++; - if (!IS_UDP(prot) && listen(sfd, settings.backlog) == -1) { + if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) { perror("listen()"); close(sfd); freeaddrinfo(ai); @@ -3600,18 +3603,18 @@ static int server_socket(const int port, enum protocol prot) { } } - if (IS_UDP(prot)) { + if (IS_UDP(transport)) { int c; for (c = 1; c < settings.num_threads; c++) { /* this is guaranteed to hit all threads because we round-robin */ dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, - UDP_READ_BUFFER_SIZE, ascii_udp_prot); + UDP_READ_BUFFER_SIZE, transport); } } else { if (!(listen_conn_add = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, - prot, main_base))) { + transport, main_base))) { fprintf(stderr, "failed to create listening connection\n"); exit(EXIT_FAILURE); } @@ -3695,7 +3698,7 @@ static int server_socket_unix(const char *path, int access_mask) { } if (!(listen_conn = conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, - negotiating_prot, main_base))) { + local_transport, main_base))) { fprintf(stderr, "failed to create listening connection\n"); exit(EXIT_FAILURE); } @@ -4233,7 +4236,7 @@ int main (int argc, char **argv) { if (settings.socketpath == NULL) { int udp_port; errno = 0; - if (settings.port && server_socket(settings.port, negotiating_prot)) { + if (settings.port && server_socket(settings.port, tcp_transport)) { fprintf(stderr, "failed to listen on TCP port %d\n", settings.port); if (errno != 0) perror("tcp listen"); @@ -4250,7 +4253,7 @@ int main (int argc, char **argv) { /* create the UDP listening socket and bind it */ errno = 0; - if (settings.udpport && server_socket(settings.udpport, ascii_udp_prot)) { + if (settings.udpport && server_socket(settings.udpport, udp_transport)) { fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport); if (errno != 0) perror("udp listen"); |