summaryrefslogtreecommitdiff
path: root/proxy_network.c
diff options
context:
space:
mode:
Diffstat (limited to 'proxy_network.c')
-rw-r--r--proxy_network.c105
1 files changed, 58 insertions, 47 deletions
diff --git a/proxy_network.c b/proxy_network.c
index 2da41aa..1d9db29 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -18,6 +18,7 @@ enum proxy_be_failures {
P_BE_FAIL_OOM,
P_BE_FAIL_ENDSYNC,
P_BE_FAIL_TRAILINGDATA,
+ P_BE_FAIL_INVALIDPROTOCOL,
};
const char *proxy_be_failure_text[] = {
@@ -35,6 +36,7 @@ const char *proxy_be_failure_text[] = {
[P_BE_FAIL_OOM] = "outofmemory",
[P_BE_FAIL_ENDSYNC] = "missingend",
[P_BE_FAIL_TRAILINGDATA] = "trailingdata",
+ [P_BE_FAIL_INVALIDPROTOCOL] = "invalidprotocol",
NULL
};
@@ -956,19 +958,31 @@ static void _stop_timeout_event(mcp_backend_t *be) {
event_del(&be->timeout_event);
}
+static void _drive_machine_next(mcp_backend_t *be, io_pending_proxy_t *p) {
+ struct timeval end;
+ // set the head here. when we break the head will be correct.
+ STAILQ_REMOVE_HEAD(&be->io_head, io_next);
+ be->depth--;
+ be->pending_read--;
+
+ // stamp the elapsed time into the response object.
+ gettimeofday(&end, NULL);
+ p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 +
+ (end.tv_usec - p->client_resp->start.tv_usec);
+
+ // have to do the q->count-- and == 0 and redispatch_conn()
+ // stuff here. The moment we call return_io here we
+ // don't own *p anymore.
+ return_io_pending((io_pending_t *)p);
+ be->state = mcp_backend_read;
+}
+
// NOTES:
// - mcp_backend_read: grab req_stack_head, do things
// read -> next, want_read -> next | read_end, etc.
-// issue: want read back to read_end as necessary. special state?
-// - it's fine: p->client_resp->type.
-// - mcp_backend_next: advance, consume, etc.
-// TODO (v2): second argument with enum for a specific error.
-// - probably just for logging. for app if any of these errors shouldn't
-// result in killing the request stack!
static int proxy_backend_drive_machine(mcp_backend_t *be) {
bool stop = false;
io_pending_proxy_t *p = NULL;
- struct timeval end;
int flags = 0;
p = STAILQ_FIRST(&be->io_head);
@@ -997,30 +1011,38 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
r = p->client_resp;
r->status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r->resp);
- if (r->status == MCMC_ERR) {
- P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status);
- if (r->resp.code == MCMC_WANT_READ) {
- return 0;
- }
- flags = P_BE_FAIL_PARSING;
- stop = true;
- break;
+ // Quick check if we need more data.
+ if (r->resp.code == MCMC_WANT_READ) {
+ return 0;
}
// we actually don't care about anything but the value length
// TODO (v2): if vlen != vlen_read, pull an item and copy the data.
int extra_space = 0;
+ // if all goes well, move to the next request.
+ be->state = mcp_backend_next;
switch (r->resp.type) {
case MCMC_RESP_GET:
// We're in GET mode. we only support one key per
// GET in the proxy backends, so we need to later check
// for an END.
extra_space = ENDLEN;
+ be->state = mcp_backend_read_end;
break;
case MCMC_RESP_END:
// this is a MISS from a GET request
// or final handler from a STAT request.
assert(r->resp.vlen == 0);
+ if (p->ascii_multiget) {
+ // Ascii multiget hack mode; consume END's
+ be->rbufused -= r->resp.reslen;
+ if (be->rbufused > 0) {
+ memmove(be->rbuf, be->rbuf+r->resp.reslen, be->rbufused);
+ }
+
+ be->state = mcp_backend_next;
+ continue;
+ }
break;
case MCMC_RESP_META:
// we can handle meta responses easily since they're self
@@ -1029,6 +1051,18 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
case MCMC_RESP_GENERIC:
case MCMC_RESP_NUMERIC:
break;
+ case MCMC_RESP_ERRMSG: // received an error message
+ if (r->resp.code != MCMC_CODE_SERVER_ERROR) {
+ // Non server errors are protocol errors; can't trust
+ // the connection anymore.
+ be->state = mcp_backend_next_close;
+ }
+ break;
+ case MCMC_RESP_FAIL:
+ P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status);
+ flags = P_BE_FAIL_PARSING;
+ stop = true;
+ break;
// TODO (v2): No-op response?
default:
P_DEBUG("%s: Unhandled response from backend: %d\n", __func__, r->resp.type);
@@ -1038,17 +1072,6 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
break;
}
- if (p->ascii_multiget && r->resp.type == MCMC_RESP_END) {
- // Ascii multiget hack mode; consume END's
- be->rbufused -= r->resp.reslen;
- if (be->rbufused > 0) {
- memmove(be->rbuf, be->rbuf+r->resp.reslen, be->rbufused);
- }
-
- be->state = mcp_backend_next;
- break;
- }
-
// r->resp.reslen + r->resp.vlen is the total length of the response.
// TODO (v2): need to associate a buffer with this response...
// for now we simply malloc, but reusable buffers should be used
@@ -1101,12 +1124,6 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
memmove(be->rbuf, be->rbuf+r->resp.reslen+r->resp.vlen_read, be->rbufused);
}
- if (r->resp.type == MCMC_RESP_GET) {
- be->state = mcp_backend_read_end;
- } else {
- be->state = mcp_backend_next;
- }
-
break;
case mcp_backend_read_end:
r = p->client_resp;
@@ -1178,21 +1195,7 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
break;
case mcp_backend_next:
- // set the head here. when we break the head will be correct.
- STAILQ_REMOVE_HEAD(&be->io_head, io_next);
- be->depth--;
- be->pending_read--;
-
- // stamp the elapsed time into the response object.
- gettimeofday(&end, NULL);
- p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 +
- (end.tv_usec - p->client_resp->start.tv_usec);
-
- // have to do the q->count-- and == 0 and redispatch_conn()
- // stuff here. The moment we call return_io here we
- // don't own *p anymore.
- return_io_pending((io_pending_t *)p);
- be->state = mcp_backend_read;
+ _drive_machine_next(be, p);
if (STAILQ_EMPTY(&be->io_head)) {
stop = true;
@@ -1220,6 +1223,13 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
}
break;
+ case mcp_backend_next_close:
+ // we advance and return the current IO, then kill the conn.
+ _drive_machine_next(be, p);
+ stop = true;
+ flags = P_BE_FAIL_INVALIDPROTOCOL;
+
+ break;
default:
// TODO (v2): at some point (after v1?) this should attempt to recover,
// though we should only get here from memory corruption and
@@ -1298,6 +1308,7 @@ static void _backend_failed(mcp_backend_t *be) {
// _must_ be called from within the event thread.
static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) {
io_pending_proxy_t *io = NULL;
+ P_DEBUG("%s: resetting bad backend: %s\n", __func__, proxy_be_failure_text[err]);
// Can't use STAILQ_FOREACH() since return_io_pending() free's the current
// io. STAILQ_FOREACH_SAFE maybe?
int depth = be->depth;