summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--proto_proxy.c2
-rw-r--r--proxy.h1
-rw-r--r--proxy_lua.c4
-rw-r--r--proxy_network.c105
-rw-r--r--t/proxyunits.lua16
-rw-r--r--t/proxyunits.t139
6 files changed, 211 insertions, 56 deletions
diff --git a/proto_proxy.c b/proto_proxy.c
index 3e17441..9a6736b 100644
--- a/proto_proxy.c
+++ b/proto_proxy.c
@@ -595,7 +595,7 @@ int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, con
if (type == LUA_TUSERDATA) {
mcp_resp_t *r = luaL_checkudata(Lc, 1, "mcp.response");
_set_noreply_mode(resp, r);
- if (r->status != MCMC_OK) {
+ if (r->status != MCMC_OK && r->resp.type != MCMC_RESP_ERRMSG) {
proxy_out_errstring(resp, "backend failure");
} else if (r->cresp) {
mc_resp *tresp = r->cresp;
diff --git a/proxy.h b/proxy.h
index 24a5ea0..87f2b7e 100644
--- a/proxy.h
+++ b/proxy.h
@@ -251,6 +251,7 @@ enum mcp_backend_states {
mcp_backend_read_end, // looking for an "END" marker for GET
mcp_backend_want_read, // read more data to complete command
mcp_backend_next, // advance to the next IO
+ mcp_backend_next_close, // complete current request, then close socket
};
typedef struct mcp_backend_wrap_s mcp_backend_wrap_t;
diff --git a/proxy_lua.c b/proxy_lua.c
index 25209d0..3297d9f 100644
--- a/proxy_lua.c
+++ b/proxy_lua.c
@@ -1176,6 +1176,10 @@ static void proxy_register_defines(lua_State *L) {
X(MCMC_CODE_OK);
X(MCMC_CODE_NOP);
X(MCMC_CODE_END);
+ X(MCMC_CODE_ERROR);
+ X(MCMC_CODE_CLIENT_ERROR);
+ X(MCMC_CODE_SERVER_ERROR);
+ X(MCMC_ERR);
X(P_OK);
X(CMD_ANY);
X(CMD_ANY_STORAGE);
diff --git a/proxy_network.c b/proxy_network.c
index 2da41aa..1d9db29 100644
--- a/proxy_network.c
+++ b/proxy_network.c
@@ -18,6 +18,7 @@ enum proxy_be_failures {
P_BE_FAIL_OOM,
P_BE_FAIL_ENDSYNC,
P_BE_FAIL_TRAILINGDATA,
+ P_BE_FAIL_INVALIDPROTOCOL,
};
const char *proxy_be_failure_text[] = {
@@ -35,6 +36,7 @@ const char *proxy_be_failure_text[] = {
[P_BE_FAIL_OOM] = "outofmemory",
[P_BE_FAIL_ENDSYNC] = "missingend",
[P_BE_FAIL_TRAILINGDATA] = "trailingdata",
+ [P_BE_FAIL_INVALIDPROTOCOL] = "invalidprotocol",
NULL
};
@@ -956,19 +958,31 @@ static void _stop_timeout_event(mcp_backend_t *be) {
event_del(&be->timeout_event);
}
+static void _drive_machine_next(mcp_backend_t *be, io_pending_proxy_t *p) {
+ struct timeval end;
+ // set the head here. when we break the head will be correct.
+ STAILQ_REMOVE_HEAD(&be->io_head, io_next);
+ be->depth--;
+ be->pending_read--;
+
+ // stamp the elapsed time into the response object.
+ gettimeofday(&end, NULL);
+ p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 +
+ (end.tv_usec - p->client_resp->start.tv_usec);
+
+ // have to do the q->count-- and == 0 and redispatch_conn()
+ // stuff here. The moment we call return_io here we
+ // don't own *p anymore.
+ return_io_pending((io_pending_t *)p);
+ be->state = mcp_backend_read;
+}
+
// NOTES:
// - mcp_backend_read: grab req_stack_head, do things
// read -> next, want_read -> next | read_end, etc.
-// issue: want read back to read_end as necessary. special state?
-// - it's fine: p->client_resp->type.
-// - mcp_backend_next: advance, consume, etc.
-// TODO (v2): second argument with enum for a specific error.
-// - probably just for logging. for app if any of these errors shouldn't
-// result in killing the request stack!
static int proxy_backend_drive_machine(mcp_backend_t *be) {
bool stop = false;
io_pending_proxy_t *p = NULL;
- struct timeval end;
int flags = 0;
p = STAILQ_FIRST(&be->io_head);
@@ -997,30 +1011,38 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
r = p->client_resp;
r->status = mcmc_parse_buf(be->client, be->rbuf, be->rbufused, &r->resp);
- if (r->status == MCMC_ERR) {
- P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status);
- if (r->resp.code == MCMC_WANT_READ) {
- return 0;
- }
- flags = P_BE_FAIL_PARSING;
- stop = true;
- break;
+ // Quick check if we need more data.
+ if (r->resp.code == MCMC_WANT_READ) {
+ return 0;
}
// we actually don't care about anything but the value length
// TODO (v2): if vlen != vlen_read, pull an item and copy the data.
int extra_space = 0;
+ // if all goes well, move to the next request.
+ be->state = mcp_backend_next;
switch (r->resp.type) {
case MCMC_RESP_GET:
// We're in GET mode. we only support one key per
// GET in the proxy backends, so we need to later check
// for an END.
extra_space = ENDLEN;
+ be->state = mcp_backend_read_end;
break;
case MCMC_RESP_END:
// this is a MISS from a GET request
// or final handler from a STAT request.
assert(r->resp.vlen == 0);
+ if (p->ascii_multiget) {
+ // Ascii multiget hack mode; consume END's
+ be->rbufused -= r->resp.reslen;
+ if (be->rbufused > 0) {
+ memmove(be->rbuf, be->rbuf+r->resp.reslen, be->rbufused);
+ }
+
+ be->state = mcp_backend_next;
+ continue;
+ }
break;
case MCMC_RESP_META:
// we can handle meta responses easily since they're self
@@ -1029,6 +1051,18 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
case MCMC_RESP_GENERIC:
case MCMC_RESP_NUMERIC:
break;
+ case MCMC_RESP_ERRMSG: // received an error message
+ if (r->resp.code != MCMC_CODE_SERVER_ERROR) {
+ // Non server errors are protocol errors; can't trust
+ // the connection anymore.
+ be->state = mcp_backend_next_close;
+ }
+ break;
+ case MCMC_RESP_FAIL:
+ P_DEBUG("%s: mcmc_read failed [%d]\n", __func__, r->status);
+ flags = P_BE_FAIL_PARSING;
+ stop = true;
+ break;
// TODO (v2): No-op response?
default:
P_DEBUG("%s: Unhandled response from backend: %d\n", __func__, r->resp.type);
@@ -1038,17 +1072,6 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
break;
}
- if (p->ascii_multiget && r->resp.type == MCMC_RESP_END) {
- // Ascii multiget hack mode; consume END's
- be->rbufused -= r->resp.reslen;
- if (be->rbufused > 0) {
- memmove(be->rbuf, be->rbuf+r->resp.reslen, be->rbufused);
- }
-
- be->state = mcp_backend_next;
- break;
- }
-
// r->resp.reslen + r->resp.vlen is the total length of the response.
// TODO (v2): need to associate a buffer with this response...
// for now we simply malloc, but reusable buffers should be used
@@ -1101,12 +1124,6 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
memmove(be->rbuf, be->rbuf+r->resp.reslen+r->resp.vlen_read, be->rbufused);
}
- if (r->resp.type == MCMC_RESP_GET) {
- be->state = mcp_backend_read_end;
- } else {
- be->state = mcp_backend_next;
- }
-
break;
case mcp_backend_read_end:
r = p->client_resp;
@@ -1178,21 +1195,7 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
break;
case mcp_backend_next:
- // set the head here. when we break the head will be correct.
- STAILQ_REMOVE_HEAD(&be->io_head, io_next);
- be->depth--;
- be->pending_read--;
-
- // stamp the elapsed time into the response object.
- gettimeofday(&end, NULL);
- p->client_resp->elapsed = (end.tv_sec - p->client_resp->start.tv_sec) * 1000000 +
- (end.tv_usec - p->client_resp->start.tv_usec);
-
- // have to do the q->count-- and == 0 and redispatch_conn()
- // stuff here. The moment we call return_io here we
- // don't own *p anymore.
- return_io_pending((io_pending_t *)p);
- be->state = mcp_backend_read;
+ _drive_machine_next(be, p);
if (STAILQ_EMPTY(&be->io_head)) {
stop = true;
@@ -1220,6 +1223,13 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
}
break;
+ case mcp_backend_next_close:
+ // we advance and return the current IO, then kill the conn.
+ _drive_machine_next(be, p);
+ stop = true;
+ flags = P_BE_FAIL_INVALIDPROTOCOL;
+
+ break;
default:
// TODO (v2): at some point (after v1?) this should attempt to recover,
// though we should only get here from memory corruption and
@@ -1298,6 +1308,7 @@ static void _backend_failed(mcp_backend_t *be) {
// _must_ be called from within the event thread.
static int _reset_bad_backend(mcp_backend_t *be, enum proxy_be_failures err) {
io_pending_proxy_t *io = NULL;
+ P_DEBUG("%s: resetting bad backend: %s\n", __func__, proxy_be_failure_text[err]);
// Can't use STAILQ_FOREACH() since return_io_pending() free's the current
// io. STAILQ_FOREACH_SAFE maybe?
int depth = be->depth;
diff --git a/t/proxyunits.lua b/t/proxyunits.lua
index e555866..94a0b80 100644
--- a/t/proxyunits.lua
+++ b/t/proxyunits.lua
@@ -89,6 +89,22 @@ function mcp_config_routes(zones)
pfx_md["b"] = basic
pfx_ma["b"] = basic
+ pfx_get["errcheck"] = function(r)
+ local res = zones.z1(r)
+ -- expect an error
+ if res:ok() then
+ return "FAIL\r\n"
+ end
+ if res:code() == mcp.MCMC_CODE_ERROR then
+ return "ERROR\r\n"
+ elseif res:code() == mcp.MCMC_CODE_CLIENT_ERROR then
+ return "CLIENT_ERROR\r\n"
+ elseif res:code() == mcp.MCMC_CODE_SERVER_ERROR then
+ return "SERVER_ERROR\r\n"
+ end
+ return "FAIL"
+ end
+
-- show that we fetched the key by generating our own response string.
pfx_get["getkey"] = function(r)
return "VALUE |" .. r:key() .. " 0 2\r\nts\r\nEND\r\n"
diff --git a/t/proxyunits.t b/t/proxyunits.t
index 9d8b302..e76c11d 100644
--- a/t/proxyunits.t
+++ b/t/proxyunits.t
@@ -488,14 +488,6 @@ check_version($ps);
# - fetch all three zones
# - hit the same zone multiple times
-# Test out of spec commands from client
-# - wrong # of tokens
-# - bad key size
-# - etc
-
-# Test errors/garbage from server
-# - certain errors pass through to the client, most close the backend.
-
# Test delayed read (timeout)
# Test Lua logging (see t/watcher.t)
@@ -726,4 +718,135 @@ check_version($ps);
}
check_version($ps);
+# Test out of spec commands from client
+# - wrong # of tokens
+# - bad key size
+# - etc
+
+# Test errors/garbage from server
+# - certain errors pass through to the client, most close the backend.
+# - should be able to retrieve the error message
+{
+ my $be = $mbe[0];
+ print $ps "set /b/foo 0 0 2\r\nhi\r\n";
+ is(scalar <$be>, "set /b/foo 0 0 2\r\n", "received set cmd");
+ is(scalar <$be>, "hi\r\n", "received set data");
+ # Send a classic back up the pipe.
+ my $msg = "SERVER_ERROR object too large for cache\r\n";
+ print $be $msg;
+ is(scalar <$ps>, $msg, "client received error message");
+
+ print $ps "get /b/foo\r\n";
+ is(scalar <$be>, "get /b/foo\r\n", "backend still works");
+ print $be "END\r\n";
+ is(scalar <$ps>, "END\r\n", "got end back");
+
+ # ERROR and CLIENT_ERROR should both break the backend.
+ print $ps "get /b/moo\r\n";
+ is(scalar <$be>, "get /b/moo\r\n", "received get command");
+ $msg = "CLIENT_ERROR bad command line format\r\n";
+ my $data;
+ print $be $msg;
+ is(scalar <$ps>, $msg, "client received error message");
+ my $read = $be->read($data, 1);
+ is($read, 0, "backend disconnected");
+
+ # re-accept the backend.
+ $be = $mocksrvs[0]->accept();
+ $be->autoflush(1);
+ like(<$be>, qr/version/, "received version command");
+ print $be "VERSION 1.0.0-mock\r\n";
+ $mbe[0] = $be;
+
+ print $ps "get /b/too\r\n";
+ is(scalar <$be>, "get /b/too\r\n", "received get command");
+ $msg = "ERROR unhappy\r\n";
+ print $be $msg;
+ is(scalar <$ps>, $msg, "client received error message");
+ $read = $be->read($data, 1);
+ is($read, 0, "backend disconnected");
+
+ # re-accept the backend.
+ $be = $mocksrvs[0]->accept();
+ $be->autoflush(1);
+ like(<$be>, qr/version/, "received version command");
+ print $be "VERSION 1.0.0-mock\r\n";
+ $mbe[0] = $be;
+
+ # Sometimes blank ERRORS can be sent.
+ print $ps "get /b/zoo\r\n";
+ is(scalar <$be>, "get /b/zoo\r\n", "received get command");
+ $msg = "ERROR\r\n";
+ print $be $msg;
+ is(scalar <$ps>, $msg, "client received error message");
+ $read = $be->read($data, 1);
+ is($read, 0, "backend disconnected");
+
+ # re-accept the backend.
+ $be = $mocksrvs[0]->accept();
+ $be->autoflush(1);
+ like(<$be>, qr/version/, "received version command");
+ print $be "VERSION 1.0.0-mock\r\n";
+ $mbe[0] = $be;
+
+ # Ensure garbage doesn't surface to client.
+ print $ps "get /b/doo\r\n";
+ is(scalar <$be>, "get /b/doo\r\n", "received get command");
+ print $be "garbage\r\n"; # don't need the \r\n but it makes tests easier
+ is(scalar <$ps>, "SERVER_ERROR backend failure\r\n", "generic backend error");
+
+ # re-accept the backend.
+ $be = $mocksrvs[0]->accept();
+ $be->autoflush(1);
+ like(<$be>, qr/version/, "received version command");
+ print $be "VERSION 1.0.0-mock\r\n";
+ $mbe[0] = $be;
+
+ # Check errors from pipelined commands past a CLIENT_ERROR
+ print $ps "get /b/quu\r\nget /b/muu\r\n";
+ is(scalar <$be>, "get /b/quu\r\n", "received get command");
+ is(scalar <$be>, "get /b/muu\r\n", "received next get command");
+ print $be "CLIENT_ERROR bad protocol\r\nEND\r\n";
+ is(scalar <$ps>, "CLIENT_ERROR bad protocol\r\n", "backend error");
+ is(scalar <$ps>, "SERVER_ERROR backend failure\r\n", "backend error");
+
+ # re-accept the backend.
+ $be = $mocksrvs[0]->accept();
+ $be->autoflush(1);
+ like(<$be>, qr/version/, "received version command");
+ print $be "VERSION 1.0.0-mock\r\n";
+ $mbe[0] = $be;
+
+ # Check that lua handles errors properly.
+ print $ps "get /errcheck/a\r\n";
+ is(scalar <$be>, "get /errcheck/a\r\n", "received get command");
+ print $be "ERROR test1\r\n";
+ is(scalar <$ps>, "ERROR\r\n", "lua saw correct error code");
+
+ # re-accept the backend.
+ $be = $mocksrvs[0]->accept();
+ $be->autoflush(1);
+ like(<$be>, qr/version/, "received version command");
+ print $be "VERSION 1.0.0-mock\r\n";
+ $mbe[0] = $be;
+
+ print $ps "get /errcheck/b\r\n";
+ is(scalar <$be>, "get /errcheck/b\r\n", "received get command");
+ print $be "CLIENT_ERROR test2\r\n";
+ is(scalar <$ps>, "CLIENT_ERROR\r\n", "lua saw correct error code");
+
+ # re-accept the backend.
+ $be = $mocksrvs[0]->accept();
+ $be->autoflush(1);
+ like(<$be>, qr/version/, "received version command");
+ print $be "VERSION 1.0.0-mock\r\n";
+ $mbe[0] = $be;
+
+ print $ps "get /errcheck/c\r\n";
+ is(scalar <$be>, "get /errcheck/c\r\n", "received get command");
+ print $be "SERVER_ERROR test3\r\n";
+ is(scalar <$ps>, "SERVER_ERROR\r\n", "lua saw correct error code");
+}
+
+check_version($ps);
done_testing();