summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--evhttp.h4
-rw-r--r--evrpc-internal.h2
-rw-r--r--evrpc.c92
-rw-r--r--evrpc.h20
-rw-r--r--http-internal.h2
-rw-r--r--http.c68
-rw-r--r--test/regress_rpc.c81
7 files changed, 222 insertions, 47 deletions
diff --git a/evhttp.h b/evhttp.h
index f31a013c..cb5330ae 100644
--- a/evhttp.h
+++ b/evhttp.h
@@ -150,6 +150,10 @@ struct evhttp_connection *evhttp_connection_new(
/* Frees an http connection */
void evhttp_connection_free(struct evhttp_connection *evcon);
+/* Sets the timeout for events related to this connection */
+void evhttp_connection_set_timeout(struct evhttp_connection *evcon,
+ int timeout_in_secs);
+
/* The connection gets ownership of the request */
int evhttp_make_request(struct evhttp_connection *evcon,
struct evhttp_request *req,
diff --git a/evrpc-internal.h b/evrpc-internal.h
index de2ab47d..656533b6 100644
--- a/evrpc-internal.h
+++ b/evrpc-internal.h
@@ -48,6 +48,8 @@ void evrpc_reqstate_free(struct evrpc_req_generic* rpc_state);
struct evrpc_pool {
struct evconq connections;
+ int timeout;
+
TAILQ_HEAD(evrpc_requestq, evrpc_request_wrapper) requests;
};
diff --git a/evrpc.c b/evrpc.c
index bf66c181..d575cd14 100644
--- a/evrpc.c
+++ b/evrpc.c
@@ -85,7 +85,8 @@ evrpc_free(struct evrpc_base *base)
}
-void evrpc_request_cb(struct evhttp_request *, void *);
+static void evrpc_pool_schedule(struct evrpc_pool *pool);
+static void evrpc_request_cb(struct evhttp_request *, void *);
void evrpc_request_done(struct evrpc_req_generic*);
/*
@@ -132,7 +133,7 @@ evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
return (0);
}
-void
+static void
evrpc_request_cb(struct evhttp_request *req, void *arg)
{
struct evrpc *rpc = arg;
@@ -244,6 +245,8 @@ evrpc_pool_new()
TAILQ_INIT(&pool->connections);
TAILQ_INIT(&pool->requests);
+ pool->timeout = -1;
+
return (pool);
}
@@ -286,6 +289,13 @@ evrpc_pool_add_connection(struct evrpc_pool *pool,
TAILQ_INSERT_TAIL(&pool->connections, connection, next);
/*
+ * unless a timeout was specifically set for a connection,
+ * the connection inherits the timeout from the pool.
+ */
+ if (connection->timeout == -1)
+ connection->timeout = pool->timeout;
+
+ /*
* if we have any requests pending, schedule them with the new
* connections.
*/
@@ -298,8 +308,19 @@ evrpc_pool_add_connection(struct evrpc_pool *pool,
}
}
+void
+evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
+{
+ struct evhttp_connection *evcon;
+ TAILQ_FOREACH(evcon, &pool->connections, next) {
+ evcon->timeout = timeout_in_secs;
+ }
+ pool->timeout = timeout_in_secs;
+}
+
static void evrpc_reply_done(struct evhttp_request *, void *);
+static void evrpc_request_timeout(int, short, void *);
/*
* Finds a connection object associated with the pool that is currently
@@ -325,6 +346,7 @@ evrpc_schedule_request(struct evhttp_connection *connection,
struct evrpc_request_wrapper *ctx)
{
struct evhttp_request *req = NULL;
+ struct evrpc_pool *pool = ctx->pool;
char *uri = NULL;
int res = 0;
@@ -338,6 +360,19 @@ evrpc_schedule_request(struct evhttp_connection *connection,
if (uri == NULL)
goto error;
+ /* we need to know the connection that we might have to abort */
+ ctx->evcon = connection;
+
+ if (pool->timeout > 0) {
+ /*
+ * a timeout after which the whole rpc is going to be aborted.
+ */
+ struct timeval tv;
+ timerclear(&tv);
+ tv.tv_sec = pool->timeout;
+ evtimer_add(&ctx->ev_timeout, &tv);
+ }
+
/* start the request over the connection */
res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
free(uri);
@@ -357,24 +392,21 @@ int
evrpc_make_request(struct evrpc_request_wrapper *ctx)
{
struct evrpc_pool *pool = ctx->pool;
- struct evhttp_connection *connection;
+
+ /* initialize the event structure for this rpc */
+ evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
/* we better have some available connections on the pool */
assert(TAILQ_FIRST(&pool->connections) != NULL);
-
- /* even if a connection might be available, we do FIFO */
- if (TAILQ_FIRST(&pool->requests) == NULL) {
- connection = evrpc_pool_find_connection(pool);
- if (connection != NULL)
- return evrpc_schedule_request(connection, ctx);
- }
-
/*
* if no connection is available, we queue the request on the pool,
* the next time a connection is empty, the rpc will be send on that.
*/
TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
+
+ evrpc_pool_schedule(pool);
+
return (0);
}
@@ -382,10 +414,15 @@ static void
evrpc_reply_done(struct evhttp_request *req, void *arg)
{
struct evrpc_request_wrapper *ctx = arg;
- int res;
+ struct evrpc_pool *pool = ctx->pool;
+ int res = -1;
+
+ /* cancel any timeout we might have scheduled */
+ event_del(&ctx->ev_timeout);
/* we need to get the reply now */
- res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
+ if (req != NULL)
+ res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
if (res == -1) {
/* clear everything that we might have written previously */
ctx->reply_clear(ctx->reply);
@@ -396,4 +433,33 @@ evrpc_reply_done(struct evhttp_request *req, void *arg)
evrpc_request_wrapper_free(ctx);
/* the http layer owns the request structure */
+
+ /* see if we can schedule another request */
+ evrpc_pool_schedule(pool);
+}
+
+static void
+evrpc_pool_schedule(struct evrpc_pool *pool)
+{
+ struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
+ struct evhttp_connection *evcon;
+
+ /* if no requests are pending, we have no work */
+ if (ctx == NULL)
+ return;
+
+ if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
+ TAILQ_REMOVE(&pool->requests, ctx, next);
+ evrpc_schedule_request(evcon, ctx);
+ }
+}
+
+static void
+evrpc_request_timeout(int fd, short what, void *arg)
+{
+ struct evrpc_request_wrapper *ctx = arg;
+ struct evhttp_connection *evcon = ctx->evcon;
+ assert(evcon != NULL);
+
+ evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
}
diff --git a/evrpc.h b/evrpc.h
index 4640d0ba..5f2dfd67 100644
--- a/evrpc.h
+++ b/evrpc.h
@@ -154,6 +154,7 @@ int evrpc_send_request_##rpcname(struct evrpc_pool *pool, \
return (-1); \
} \
ctx->pool = pool; \
+ ctx->evcon = NULL; \
ctx->name = strdup(#rpcname); \
if (ctx->name == NULL) { \
free(ctx); \
@@ -228,6 +229,12 @@ struct evrpc_request_wrapper {
/* pool on which this rpc request is being made */
struct evrpc_pool *pool;
+ /* connection on which the request is being sent */
+ struct evhttp_connection *evcon;
+
+ /* event for implementing request timeouts */
+ struct event ev_timeout;
+
/* the name of the rpc */
char *name;
@@ -262,4 +269,17 @@ void evrpc_pool_free(struct evrpc_pool *);
void evrpc_pool_add_connection(struct evrpc_pool *,
struct evhttp_connection *);
+/*
+ * Sets the timeout in secs after which a request has to complete. The
+ * RPC is completely aborted if it does not complete by then. Setting
+ * the timeout to 0 means that it never timeouts and can be used to
+ * implement callback type RPCs.
+ *
+ * Any connection already in the pool will be updated with the new
+ * timeout. Connections added to the pool after set_timeout has be
+ * called receive the pool timeout only if no timeout has been set
+ * for the connection itself.
+ */
+void evrpc_pool_set_timeout(struct evrpc_pool *, int timeout_in_secs);
+
#endif /* _EVRPC_H_ */
diff --git a/http-internal.h b/http-internal.h
index b8ef6639..f95a2090 100644
--- a/http-internal.h
+++ b/http-internal.h
@@ -50,6 +50,8 @@ struct evhttp_connection {
int flags;
#define EVHTTP_CON_INCOMING 0x0001 /* only one request on it ever */
#define EVHTTP_CON_OUTGOING 0x0002 /* multiple requests possible */
+
+ int timeout; /* timeout in seconds for events */
enum evhttp_connection_state state;
diff --git a/http.c b/http.c
index a330028b..1711eec8 100644
--- a/http.c
+++ b/http.c
@@ -218,12 +218,24 @@ evhttp_method(enum evhttp_cmd_type type)
return (method);
}
+static void
+evhttp_add_event(struct event *ev, int timeout, int default_timeout)
+{
+ if (timeout != 0) {
+ struct timeval tv;
+
+ timerclear(&tv);
+ tv.tv_sec = timeout != -1 ? timeout : default_timeout;
+ event_add(ev, &tv);
+ } else {
+ event_add(ev, NULL);
+ }
+}
+
void
evhttp_write_buffer(struct evhttp_connection *evcon,
void (*cb)(struct evhttp_connection *, void *), void *arg)
{
- struct timeval tv;
-
event_debug(("%s: preparing to write buffer\n", __func__));
/* Set call back */
@@ -232,9 +244,7 @@ evhttp_write_buffer(struct evhttp_connection *evcon,
/* xxx: maybe check if the event is still pending? */
event_set(&evcon->ev, evcon->fd, EV_WRITE, evhttp_write, evcon);
- timerclear(&tv);
- tv.tv_sec = HTTP_WRITE_TIMEOUT;
- event_add(&evcon->ev, &tv);
+ evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_WRITE_TIMEOUT);
}
/*
@@ -464,7 +474,6 @@ void
evhttp_write(int fd, short what, void *arg)
{
struct evhttp_connection *evcon = arg;
- struct timeval tv;
int n;
if (what == EV_TIMEOUT) {
@@ -486,9 +495,8 @@ evhttp_write(int fd, short what, void *arg)
}
if (EVBUFFER_LENGTH(evcon->output_buffer) != 0) {
- timerclear(&tv);
- tv.tv_sec = HTTP_WRITE_TIMEOUT;
- event_add(&evcon->ev, &tv);
+ evhttp_add_event(&evcon->ev,
+ evcon->timeout, HTTP_WRITE_TIMEOUT);
return;
}
@@ -549,7 +557,6 @@ evhttp_read(int fd, short what, void *arg)
{
struct evhttp_connection *evcon = arg;
struct evhttp_request *req = TAILQ_FIRST(&evcon->requests);
- struct timeval tv;
int n;
if (what == EV_TIMEOUT) {
@@ -574,10 +581,8 @@ evhttp_read(int fd, short what, void *arg)
evhttp_connection_done(evcon);
return;
}
-
- timerclear(&tv);
- tv.tv_sec = HTTP_READ_TIMEOUT;
- event_add(&evcon->ev, &tv);
+
+ evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
}
void
@@ -971,7 +976,6 @@ evhttp_parse_lines(struct evhttp_request *req, struct evbuffer* buffer)
void
evhttp_get_body(struct evhttp_connection *evcon, struct evhttp_request *req)
{
- struct timeval tv;
const char *content_length;
const char *connection;
struct evkeyvalq *headers = req->input_headers;
@@ -1013,16 +1017,12 @@ evhttp_get_body(struct evhttp_connection *evcon, struct evhttp_request *req)
}
event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read, evcon);
- timerclear(&tv);
- tv.tv_sec = HTTP_READ_TIMEOUT;
- event_add(&evcon->ev, &tv);
- return;
+ evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
}
void
evhttp_read_header(int fd, short what, void *arg)
{
- struct timeval tv;
struct evhttp_connection *evcon = arg;
struct evhttp_request *req = TAILQ_FIRST(&evcon->requests);
int n, res;
@@ -1053,9 +1053,8 @@ evhttp_read_header(int fd, short what, void *arg)
return;
} else if (res == 0) {
/* Need more header lines */
- timerclear(&tv);
- tv.tv_sec = HTTP_READ_TIMEOUT;
- event_add(&evcon->ev, &tv);
+ evhttp_add_event(&evcon->ev,
+ evcon->timeout, HTTP_READ_TIMEOUT);
return;
}
@@ -1105,6 +1104,8 @@ evhttp_connection_new(const char *address, unsigned short port)
evcon->fd = -1;
evcon->port = port;
+ evcon->timeout = -1;
+
if ((evcon->address = strdup(address)) == NULL) {
event_warn("%s: strdup failed", __func__);
goto error;
@@ -1131,11 +1132,16 @@ evhttp_connection_new(const char *address, unsigned short port)
return (NULL);
}
+void
+evhttp_connection_set_timeout(struct evhttp_connection *evcon,
+ int timeout_in_secs)
+{
+ evcon->timeout = timeout_in_secs;
+}
+
int
evhttp_connection_connect(struct evhttp_connection *evcon)
{
- struct timeval tv;
-
if (evcon->state == EVCON_CONNECTING)
return (0);
@@ -1154,9 +1160,7 @@ evhttp_connection_connect(struct evhttp_connection *evcon)
/* Set up a callback for successful connection setup */
event_set(&evcon->ev, evcon->fd, EV_WRITE, evhttp_connectioncb, evcon);
- timerclear(&tv);
- tv.tv_sec = HTTP_CONNECT_TIMEOUT;
- event_add(&evcon->ev, &tv);
+ evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_CONNECT_TIMEOUT);
evcon->state = EVCON_CONNECTING;
@@ -1217,16 +1221,12 @@ evhttp_make_request(struct evhttp_connection *evcon,
void
evhttp_start_read(struct evhttp_connection *evcon)
{
- struct timeval tv;
-
/* Set up an event to read the headers */
if (event_initialized(&evcon->ev))
event_del(&evcon->ev);
event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read_header, evcon);
-
- timerclear(&tv);
- tv.tv_sec = HTTP_READ_TIMEOUT;
- event_add(&evcon->ev, &tv);
+
+ evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT);
}
void
diff --git a/test/regress_rpc.c b/test/regress_rpc.c
index 863b9571..3db21e38 100644
--- a/test/regress_rpc.c
+++ b/test/regress_rpc.c
@@ -315,11 +315,40 @@ GotKillCb(struct msg *msg, struct kill *kill, void *arg)
goto done;
test_ok += 1;
+
done:
event_loopexit(NULL);
}
static void
+GotKillCbTwo(struct msg *msg, struct kill *kill, void *arg)
+{
+ char *weapon;
+ char *action;
+
+ if (EVTAG_GET(kill, weapon, &weapon) == -1) {
+ fprintf(stderr, "get weapon\n");
+ goto done;
+ }
+ if (EVTAG_GET(kill, action, &action) == -1) {
+ fprintf(stderr, "get action\n");
+ goto done;
+ }
+
+ if (strcmp(weapon, "dagger"))
+ goto done;
+
+ if (strcmp(action, "wave around like an idiot"))
+ goto done;
+
+ test_ok += 1;
+
+done:
+ if (test_ok == 2)
+ event_loopexit(NULL);
+}
+
+static void
rpc_basic_client(void)
{
short port;
@@ -374,10 +403,62 @@ rpc_basic_client(void)
evhttp_free(http);
}
+/*
+ * We are testing that the second requests gets send over the same
+ * connection after the first RPCs completes.
+ */
+static void
+rpc_basic_queued_client(void)
+{
+ short port;
+ struct evhttp *http = NULL;
+ struct evrpc_base *base = NULL;
+ struct evrpc_pool *pool = NULL;
+ struct msg *msg;
+ struct kill *kill_one, *kill_two;
+
+ fprintf(stdout, "Testing RPC (Queued) Client: ");
+
+ rpc_setup(&http, &port, &base);
+
+ pool = rpc_pool_with_connection(port);
+
+ /* set up the basic message */
+ msg = msg_new();
+ EVTAG_ASSIGN(msg, from_name, "niels");
+ EVTAG_ASSIGN(msg, to_name, "tester");
+
+ kill_one = kill_new();
+ kill_two = kill_new();
+
+ EVRPC_MAKE_REQUEST(Message, msg, kill_one, GotKillCbTwo, NULL);
+ EVRPC_MAKE_REQUEST(Message, msg, kill_two, GotKillCb, NULL);
+
+ test_ok = 0;
+
+ event_dispatch();
+
+ if (test_ok != 2) {
+ fprintf(stdout, "FAILED (1)\n");
+ exit(1);
+ }
+
+ fprintf(stdout, "OK\n");
+
+ msg_free(msg);
+ kill_free(kill_one);
+ kill_free(kill_two);
+
+ evrpc_pool_free(pool);
+ evhttp_free(http);
+}
+
+
void
rpc_suite(void)
{
rpc_basic_test();
rpc_basic_message();
rpc_basic_client();
+ rpc_basic_queued_client();
}