diff options
-rw-r--r-- | proto_proxy.c | 2 | ||||
-rw-r--r-- | proxy.h | 1 | ||||
-rw-r--r-- | proxy_lua.c | 4 | ||||
-rw-r--r-- | proxy_network.c | 105 | ||||
-rw-r--r-- | t/proxyunits.lua | 16 | ||||
-rw-r--r-- | t/proxyunits.t | 139 |
6 files changed, 211 insertions, 56 deletions
diff --git a/proto_proxy.c b/proto_proxy.c index 3e17441..9a6736b 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -595,7 +595,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con if (type == LUA_TUSERDATA) { mcp_resp_t *r = luaL_checkudata(Lc, 1, "mcp.response"); _set_noreply_mode(resp, r); - if (r->status != MCMC_OK) { + if (r->status != MCMC_OK && r->resp.type != MCMC_RESP_ERRMSG) { proxy_out_errstring(resp, "backend failure"); } else if (r->cresp) { mc_resp *tresp = r->cresp; @@ -251,6 +251,7 @@ enum mcp_backend_states { mcp_backend_read_end, // looking for an "END" marker for GET mcp_backend_want_read, // read more data to complete command mcp_backend_next, // advance to the next IO + mcp_backend_next_close, // complete current request, then close socket }; typedef struct mcp_backend_wrap_s mcp_backend_wrap_t; diff --git a/proxy_lua.c b/proxy_lua.c index 25209d0..3297d9f 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -1176,6 +1176,10 @@ static void proxy_register_defines(lua_State *L) { X(MCMC_CODE_OK); X(MCMC_CODE_NOP); X(MCMC_CODE_END); + X(MCMC_CODE_ERROR); + X(MCMC_CODE_CLIENT_ERROR); + X(MCMC_CODE_SERVER_ERROR); + X(MCMC_ERR); X(P_OK); X(CMD_ANY); X(CMD_ANY_STORAGE); diff --git a/proxy_network.c b/proxy_network.c index 2da41aa..1d9db29 100644 --- a/proxy_network.c +++ b/proxy_network.c @@ -18,6 +18,7 @@ enum proxy_be_failures { P_BE_FAIL_OOM, P_BE_FAIL_ENDSYNC, P_BE_FAIL_TRAILINGDATA, + P_BE_FAIL_INVALIDPROTOCOL, }; const char *proxy_be_failure_text[] = { @@ -35,6 +36,7 @@ const char *proxy_be_failure_text[] = { [P_BE_FAIL_OOM] = "outofmemory", [P_BE_FAIL_ENDSYNC] = "missingend", [P_BE_FAIL_TRAILINGDATA] = "trailingdata", + [P_BE_FAIL_INVALIDPROTOCOL] = "invalidprotocol", NULL }; @@ -956,19 +958,31 @@ static void _stop_timeout_event(mcp_backend_t *be) { event_del(&be->timeout_event); } +static void _drive_machine_next(mcp_backend_t *be, io_pending_proxy_t *p) { + struct timeval end; + // set the head here. when we break the head will be correct. + STAILQ_REMOVE_HEAD(&be->io_head, io_next); + be->depth--; + be->pending_read--; + + // stamp the elapsed time into the response object. + gettimeofday(&end, NULL); + p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 + + (end.tv_usec - p->client_resp->start.tv_usec); + + // have to do the q->count-- and == 0 and redispatch_conn() + // stuff here. The moment we call return_io here we + // don't own *p anymore. + return_io_pending((io_pending_t *)p); + be->state = mcp_backend_read; +} + // NOTES: // - mcp_backend_read: grab req_stack_head, do things // read -> next, want_read -> next | read_end, etc. -// issue: want read back to read_end as necessary. special state? -// - it's fine: p->client_resp->type. -// - mcp_backend_next: advance, consume, etc. -// TODO (v2): second argument with enum for a specific error. -// - probably just for logging. for app if any of these errors shouldn't -// result in killing the request stack! static int proxy_backend_drive_machine(mcp_backend_t *be) { bool stop = false; io_pending_proxy_t *p = NULL; - struct timeval end; int flags = 0; p = STAILQ_FIRST(&be->io_head); @@ -997,30 +1011,38 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { r = p->client_resp; r->status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r->resp); - if (r->status == MCMC_ERR) { - P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status); - if (r->resp.code == MCMC_WANT_READ) { - return 0; - } - flags = P_BE_FAIL_PARSING; - stop = true; - break; + // Quick check if we need more data. + if (r->resp.code == MCMC_WANT_READ) { + return 0; } // we actually don't care about anything but the value length // TODO (v2): if vlen != vlen_read, pull an item and copy the data. int extra_space = 0; + // if all goes well, move to the next request. + be->state = mcp_backend_next; switch (r->resp.type) { case MCMC_RESP_GET: // We're in GET mode. we only support one key per // GET in the proxy backends, so we need to later check // for an END. extra_space = ENDLEN; + be->state = mcp_backend_read_end; break; case MCMC_RESP_END: // this is a MISS from a GET request // or final handler from a STAT request. assert(r->resp.vlen == 0); + if (p->ascii_multiget) { + // Ascii multiget hack mode; consume END's + be->rbufused -= r->resp.reslen; + if (be->rbufused > 0) { + memmove(be->rbuf, be->rbuf+r->resp.reslen, be->rbufused); + } + + be->state = mcp_backend_next; + continue; + } break; case MCMC_RESP_META: // we can handle meta responses easily since they're self @@ -1029,6 +1051,18 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { case MCMC_RESP_GENERIC: case MCMC_RESP_NUMERIC: break; + case MCMC_RESP_ERRMSG: // received an error message + if (r->resp.code != MCMC_CODE_SERVER_ERROR) { + // Non server errors are protocol errors; can't trust + // the connection anymore. + be->state = mcp_backend_next_close; + } + break; + case MCMC_RESP_FAIL: + P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status); + flags = P_BE_FAIL_PARSING; + stop = true; + break; // TODO (v2): No-op response? default: P_DEBUG("%s: Unhandled response from backend: %d\n", __func__, r->resp.type); @@ -1038,17 +1072,6 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { break; } - if (p->ascii_multiget && r->resp.type == MCMC_RESP_END) { - // Ascii multiget hack mode; consume END's - be->rbufused -= r->resp.reslen; - if (be->rbufused > 0) { - memmove(be->rbuf, be->rbuf+r->resp.reslen, be->rbufused); - } - - be->state = mcp_backend_next; - break; - } - // r->resp.reslen + r->resp.vlen is the total length of the response. // TODO (v2): need to associate a buffer with this response... // for now we simply malloc, but reusable buffers should be used @@ -1101,12 +1124,6 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { memmove(be->rbuf, be->rbuf+r->resp.reslen+r->resp.vlen_read, be->rbufused); } - if (r->resp.type == MCMC_RESP_GET) { - be->state = mcp_backend_read_end; - } else { - be->state = mcp_backend_next; - } - break; case mcp_backend_read_end: r = p->client_resp; @@ -1178,21 +1195,7 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { break; case mcp_backend_next: - // set the head here. when we break the head will be correct. - STAILQ_REMOVE_HEAD(&be->io_head, io_next); - be->depth--; - be->pending_read--; - - // stamp the elapsed time into the response object. - gettimeofday(&end, NULL); - p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 + - (end.tv_usec - p->client_resp->start.tv_usec); - - // have to do the q->count-- and == 0 and redispatch_conn() - // stuff here. The moment we call return_io here we - // don't own *p anymore. - return_io_pending((io_pending_t *)p); - be->state = mcp_backend_read; + _drive_machine_next(be, p); if (STAILQ_EMPTY(&be->io_head)) { stop = true; @@ -1220,6 +1223,13 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) { } break; + case mcp_backend_next_close: + // we advance and return the current IO, then kill the conn. + _drive_machine_next(be, p); + stop = true; + flags = P_BE_FAIL_INVALIDPROTOCOL; + + break; default: // TODO (v2): at some point (after v1?) this should attempt to recover, // though we should only get here from memory corruption and @@ -1298,6 +1308,7 @@ static void _backend_failed(mcp_backend_t *be) { // _must_ be called from within the event thread. static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) { io_pending_proxy_t *io = NULL; + P_DEBUG("%s: resetting bad backend: %s\n", __func__, proxy_be_failure_text[err]); // Can't use STAILQ_FOREACH() since return_io_pending() free's the current // io. STAILQ_FOREACH_SAFE maybe? int depth = be->depth; diff --git a/t/proxyunits.lua b/t/proxyunits.lua index e555866..94a0b80 100644 --- a/t/proxyunits.lua +++ b/t/proxyunits.lua @@ -89,6 +89,22 @@ function mcp_config_routes(zones) pfx_md["b"] = basic pfx_ma["b"] = basic + pfx_get["errcheck"] = function(r) + local res = zones.z1(r) + -- expect an error + if res:ok() then + return "FAIL\r\n" + end + if res:code() == mcp.MCMC_CODE_ERROR then + return "ERROR\r\n" + elseif res:code() == mcp.MCMC_CODE_CLIENT_ERROR then + return "CLIENT_ERROR\r\n" + elseif res:code() == mcp.MCMC_CODE_SERVER_ERROR then + return "SERVER_ERROR\r\n" + end + return "FAIL" + end + -- show that we fetched the key by generating our own response string. pfx_get["getkey"] = function(r) return "VALUE |" .. r:key() .. " 0 2\r\nts\r\nEND\r\n" diff --git a/t/proxyunits.t b/t/proxyunits.t index 9d8b302..e76c11d 100644 --- a/t/proxyunits.t +++ b/t/proxyunits.t @@ -488,14 +488,6 @@ check_version($ps); # - fetch all three zones # - hit the same zone multiple times -# Test out of spec commands from client -# - wrong # of tokens -# - bad key size -# - etc - -# Test errors/garbage from server -# - certain errors pass through to the client, most close the backend. - # Test delayed read (timeout) # Test Lua logging (see t/watcher.t) @@ -726,4 +718,135 @@ check_version($ps); } check_version($ps); +# Test out of spec commands from client +# - wrong # of tokens +# - bad key size +# - etc + +# Test errors/garbage from server +# - certain errors pass through to the client, most close the backend. +# - should be able to retrieve the error message +{ + my $be = $mbe[0]; + print $ps "set /b/foo 0 0 2\r\nhi\r\n"; + is(scalar <$be>, "set /b/foo 0 0 2\r\n", "received set cmd"); + is(scalar <$be>, "hi\r\n", "received set data"); + # Send a classic back up the pipe. + my $msg = "SERVER_ERROR object too large for cache\r\n"; + print $be $msg; + is(scalar <$ps>, $msg, "client received error message"); + + print $ps "get /b/foo\r\n"; + is(scalar <$be>, "get /b/foo\r\n", "backend still works"); + print $be "END\r\n"; + is(scalar <$ps>, "END\r\n", "got end back"); + + # ERROR and CLIENT_ERROR should both break the backend. + print $ps "get /b/moo\r\n"; + is(scalar <$be>, "get /b/moo\r\n", "received get command"); + $msg = "CLIENT_ERROR bad command line format\r\n"; + my $data; + print $be $msg; + is(scalar <$ps>, $msg, "client received error message"); + my $read = $be->read($data, 1); + is($read, 0, "backend disconnected"); + + # re-accept the backend. + $be = $mocksrvs[0]->accept(); + $be->autoflush(1); + like(<$be>, qr/version/, "received version command"); + print $be "VERSION 1.0.0-mock\r\n"; + $mbe[0] = $be; + + print $ps "get /b/too\r\n"; + is(scalar <$be>, "get /b/too\r\n", "received get command"); + $msg = "ERROR unhappy\r\n"; + print $be $msg; + is(scalar <$ps>, $msg, "client received error message"); + $read = $be->read($data, 1); + is($read, 0, "backend disconnected"); + + # re-accept the backend. + $be = $mocksrvs[0]->accept(); + $be->autoflush(1); + like(<$be>, qr/version/, "received version command"); + print $be "VERSION 1.0.0-mock\r\n"; + $mbe[0] = $be; + + # Sometimes blank ERRORS can be sent. + print $ps "get /b/zoo\r\n"; + is(scalar <$be>, "get /b/zoo\r\n", "received get command"); + $msg = "ERROR\r\n"; + print $be $msg; + is(scalar <$ps>, $msg, "client received error message"); + $read = $be->read($data, 1); + is($read, 0, "backend disconnected"); + + # re-accept the backend. + $be = $mocksrvs[0]->accept(); + $be->autoflush(1); + like(<$be>, qr/version/, "received version command"); + print $be "VERSION 1.0.0-mock\r\n"; + $mbe[0] = $be; + + # Ensure garbage doesn't surface to client. + print $ps "get /b/doo\r\n"; + is(scalar <$be>, "get /b/doo\r\n", "received get command"); + print $be "garbage\r\n"; # don't need the \r\n but it makes tests easier + is(scalar <$ps>, "SERVER_ERROR backend failure\r\n", "generic backend error"); + + # re-accept the backend. + $be = $mocksrvs[0]->accept(); + $be->autoflush(1); + like(<$be>, qr/version/, "received version command"); + print $be "VERSION 1.0.0-mock\r\n"; + $mbe[0] = $be; + + # Check errors from pipelined commands past a CLIENT_ERROR + print $ps "get /b/quu\r\nget /b/muu\r\n"; + is(scalar <$be>, "get /b/quu\r\n", "received get command"); + is(scalar <$be>, "get /b/muu\r\n", "received next get command"); + print $be "CLIENT_ERROR bad protocol\r\nEND\r\n"; + is(scalar <$ps>, "CLIENT_ERROR bad protocol\r\n", "backend error"); + is(scalar <$ps>, "SERVER_ERROR backend failure\r\n", "backend error"); + + # re-accept the backend. + $be = $mocksrvs[0]->accept(); + $be->autoflush(1); + like(<$be>, qr/version/, "received version command"); + print $be "VERSION 1.0.0-mock\r\n"; + $mbe[0] = $be; + + # Check that lua handles errors properly. + print $ps "get /errcheck/a\r\n"; + is(scalar <$be>, "get /errcheck/a\r\n", "received get command"); + print $be "ERROR test1\r\n"; + is(scalar <$ps>, "ERROR\r\n", "lua saw correct error code"); + + # re-accept the backend. + $be = $mocksrvs[0]->accept(); + $be->autoflush(1); + like(<$be>, qr/version/, "received version command"); + print $be "VERSION 1.0.0-mock\r\n"; + $mbe[0] = $be; + + print $ps "get /errcheck/b\r\n"; + is(scalar <$be>, "get /errcheck/b\r\n", "received get command"); + print $be "CLIENT_ERROR test2\r\n"; + is(scalar <$ps>, "CLIENT_ERROR\r\n", "lua saw correct error code"); + + # re-accept the backend. + $be = $mocksrvs[0]->accept(); + $be->autoflush(1); + like(<$be>, qr/version/, "received version command"); + print $be "VERSION 1.0.0-mock\r\n"; + $mbe[0] = $be; + + print $ps "get /errcheck/c\r\n"; + is(scalar <$be>, "get /errcheck/c\r\n", "received get command"); + print $be "SERVER_ERROR test3\r\n"; + is(scalar <$ps>, "SERVER_ERROR\r\n", "lua saw correct error code"); +} + +check_version($ps); done_testing(); |