diff options
Diffstat (limited to 'proxy_network.c')
-rw-r--r-- | proxy_network.c | 105 |
1 files changed, 58 insertions, 47 deletions
diff --git a/proxy_network.c b/proxy_network.c index 2da41aa..1d9db29 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -18,6 +18,7 @@ enum proxy_be_failures { P_BE_FAIL_OOM, P_BE_FAIL_ENDSYNC, P_BE_FAIL_TRAILINGDATA, + P_BE_FAIL_INVALIDPROTOCOL, }; const char *proxy_be_failure_text[] = { @@ -35,6 +36,7 @@ const char *proxy_be_failure_text[] = { [P_BE_FAIL_OOM] = "outofmemory", [P_BE_FAIL_ENDSYNC] = "missingend", [P_BE_FAIL_TRAILINGDATA] = "trailingdata", + [P_BE_FAIL_INVALIDPROTOCOL] = "invalidprotocol", NULL }; @@ -956,19 +958,31 @@ static void _stop_timeout_event(mcp_backend_t *be) { event_del(&be->timeout_event); } +static void _drive_machine_next(mcp_backend_t *be, io_pending_proxy_t *p) { + struct timeval end; + // set the head here. when we break the head will be correct. + STAILQ_REMOVE_HEAD(&be->io_head, io_next); + be->depth--; + be->pending_read--; + + // stamp the elapsed time into the response object. + gettimeofday(&end, NULL); + p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 + + (end.tv_usec - p->client_resp->start.tv_usec); + + // have to do the q->count-- and == 0 and redispatch_conn() + // stuff here. The moment we call return_io here we + // don't own *p anymore. + return_io_pending((io_pending_t *)p); + be->state = mcp_backend_read; +} + // NOTES: // - mcp_backend_read: grab req_stack_head, do things // read -> next, want_read -> next | read_end, etc. -// issue: want read back to read_end as necessary. special state? -// - it's fine: p->client_resp->type. -// - mcp_backend_next: advance, consume, etc. -// TODO (v2): second argument with enum for a specific error. -// - probably just for logging. for app if any of these errors shouldn't -// result in killing the request stack! static int proxy_backend_drive_machine(mcp_backend_t *be) { bool stop = false; io_pending_proxy_t *p = NULL; - struct timeval end; int flags = 0; p = STAILQ_FIRST(&be->io_head); @@ -997,30 +1011,38 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { r = p->client_resp; r->status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r->resp); - if (r->status == MCMC_ERR) { - P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status); - if (r->resp.code == MCMC_WANT_READ) { - return 0; - } - flags = P_BE_FAIL_PARSING; - stop = true; - break; + // Quick check if we need more data. + if (r->resp.code == MCMC_WANT_READ) { + return 0; } // we actually don't care about anything but the value length // TODO (v2): if vlen != vlen_read, pull an item and copy the data. int extra_space = 0; + // if all goes well, move to the next request. + be->state = mcp_backend_next; switch (r->resp.type) { case MCMC_RESP_GET: // We're in GET mode. we only support one key per // GET in the proxy backends, so we need to later check // for an END. extra_space = ENDLEN; + be->state = mcp_backend_read_end; break; case MCMC_RESP_END: // this is a MISS from a GET request // or final handler from a STAT request. assert(r->resp.vlen == 0); + if (p->ascii_multiget) { + // Ascii multiget hack mode; consume END's + be->rbufused -= r->resp.reslen; + if (be->rbufused > 0) { + memmove(be->rbuf, be->rbuf+r->resp.reslen, be->rbufused); + } + + be->state = mcp_backend_next; + continue; + } break; case MCMC_RESP_META: // we can handle meta responses easily since they're self @@ -1029,6 +1051,18 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { case MCMC_RESP_GENERIC: case MCMC_RESP_NUMERIC: break; + case MCMC_RESP_ERRMSG: // received an error message + if (r->resp.code != MCMC_CODE_SERVER_ERROR) { + // Non server errors are protocol errors; can't trust + // the connection anymore. + be->state = mcp_backend_next_close; + } + break; + case MCMC_RESP_FAIL: + P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status); + flags = P_BE_FAIL_PARSING; + stop = true; + break; // TODO (v2): No-op response? default: P_DEBUG("%s: Unhandled response from backend: %d\n", __func__, r->resp.type); @@ -1038,17 +1072,6 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { break; } - if (p->ascii_multiget && r->resp.type == MCMC_RESP_END) { - // Ascii multiget hack mode; consume END's - be->rbufused -= r->resp.reslen; - if (be->rbufused > 0) { - memmove(be->rbuf, be->rbuf+r->resp.reslen, be->rbufused); - } - - be->state = mcp_backend_next; - break; - } - // r->resp.reslen + r->resp.vlen is the total length of the response. // TODO (v2): need to associate a buffer with this response... // for now we simply malloc, but reusable buffers should be used @@ -1101,12 +1124,6 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { memmove(be->rbuf, be->rbuf+r->resp.reslen+r->resp.vlen_read, be->rbufused); } - if (r->resp.type == MCMC_RESP_GET) { - be->state = mcp_backend_read_end; - } else { - be->state = mcp_backend_next; - } - break; case mcp_backend_read_end: r = p->client_resp; @@ -1178,21 +1195,7 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { break; case mcp_backend_next: - // set the head here. when we break the head will be correct. - STAILQ_REMOVE_HEAD(&be->io_head, io_next); - be->depth--; - be->pending_read--; - - // stamp the elapsed time into the response object. - gettimeofday(&end, NULL); - p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 + - (end.tv_usec - p->client_resp->start.tv_usec); - - // have to do the q->count-- and == 0 and redispatch_conn() - // stuff here. The moment we call return_io here we - // don't own *p anymore. - return_io_pending((io_pending_t *)p); - be->state = mcp_backend_read; + _drive_machine_next(be, p); if (STAILQ_EMPTY(&be->io_head)) { stop = true; @@ -1220,6 +1223,13 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { } break; + case mcp_backend_next_close: + // we advance and return the current IO, then kill the conn. + _drive_machine_next(be, p); + stop = true; + flags = P_BE_FAIL_INVALIDPROTOCOL; + + break; default: // TODO (v2): at some point (after v1?) this should attempt to recover, // though we should only get here from memory corruption and @@ -1298,6 +1308,7 @@ static void _backend_failed(mcp_backend_t *be) { // _must_ be called from within the event thread. static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) { io_pending_proxy_t *io = NULL; + P_DEBUG("%s: resetting bad backend: %s\n", __func__, proxy_be_failure_text[err]); // Can't use STAILQ_FOREACH() since return_io_pending() free's the current // io. STAILQ_FOREACH_SAFE maybe? int depth = be->depth; |