diff options
-rw-r--r-- | evhttp.h | 4 | ||||
-rw-r--r-- | evrpc-internal.h | 2 | ||||
-rw-r--r-- | evrpc.c | 92 | ||||
-rw-r--r-- | evrpc.h | 20 | ||||
-rw-r--r-- | http-internal.h | 2 | ||||
-rw-r--r-- | http.c | 68 | ||||
-rw-r--r-- | test/regress_rpc.c | 81 |
7 files changed, 222 insertions, 47 deletions
@@ -150,6 +150,10 @@ struct evhttp_connection *evhttp_connection_new( /* Frees an http connection */ void evhttp_connection_free(struct evhttp_connection *evcon); +/* Sets the timeout for events related to this connection */ +void evhttp_connection_set_timeout(struct evhttp_connection *evcon, + int timeout_in_secs); + /* The connection gets ownership of the request */ int evhttp_make_request(struct evhttp_connection *evcon, struct evhttp_request *req, diff --git a/evrpc-internal.h b/evrpc-internal.h index de2ab47d..656533b6 100644 --- a/evrpc-internal.h +++ b/evrpc-internal.h @@ -48,6 +48,8 @@ void evrpc_reqstate_free(struct evrpc_req_generic* rpc_state); struct evrpc_pool { struct evconq connections; + int timeout; + TAILQ_HEAD(evrpc_requestq, evrpc_request_wrapper) requests; }; @@ -85,7 +85,8 @@ evrpc_free(struct evrpc_base *base) } -void evrpc_request_cb(struct evhttp_request *, void *); +static void evrpc_pool_schedule(struct evrpc_pool *pool); +static void evrpc_request_cb(struct evhttp_request *, void *); void evrpc_request_done(struct evrpc_req_generic*); /* @@ -132,7 +133,7 @@ evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc, return (0); } -void +static void evrpc_request_cb(struct evhttp_request *req, void *arg) { struct evrpc *rpc = arg; @@ -244,6 +245,8 @@ evrpc_pool_new() TAILQ_INIT(&pool->connections); TAILQ_INIT(&pool->requests); + pool->timeout = -1; + return (pool); } @@ -286,6 +289,13 @@ evrpc_pool_add_connection(struct evrpc_pool *pool, TAILQ_INSERT_TAIL(&pool->connections, connection, next); /* + * unless a timeout was specifically set for a connection, + * the connection inherits the timeout from the pool. + */ + if (connection->timeout == -1) + connection->timeout = pool->timeout; + + /* * if we have any requests pending, schedule them with the new * connections. */ @@ -298,8 +308,19 @@ evrpc_pool_add_connection(struct evrpc_pool *pool, } } +void +evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs) +{ + struct evhttp_connection *evcon; + TAILQ_FOREACH(evcon, &pool->connections, next) { + evcon->timeout = timeout_in_secs; + } + pool->timeout = timeout_in_secs; +} + static void evrpc_reply_done(struct evhttp_request *, void *); +static void evrpc_request_timeout(int, short, void *); /* * Finds a connection object associated with the pool that is currently @@ -325,6 +346,7 @@ evrpc_schedule_request(struct evhttp_connection *connection, struct evrpc_request_wrapper *ctx) { struct evhttp_request *req = NULL; + struct evrpc_pool *pool = ctx->pool; char *uri = NULL; int res = 0; @@ -338,6 +360,19 @@ evrpc_schedule_request(struct evhttp_connection *connection, if (uri == NULL) goto error; + /* we need to know the connection that we might have to abort */ + ctx->evcon = connection; + + if (pool->timeout > 0) { + /* + * a timeout after which the whole rpc is going to be aborted. + */ + struct timeval tv; + timerclear(&tv); + tv.tv_sec = pool->timeout; + evtimer_add(&ctx->ev_timeout, &tv); + } + /* start the request over the connection */ res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri); free(uri); @@ -357,24 +392,21 @@ int evrpc_make_request(struct evrpc_request_wrapper *ctx) { struct evrpc_pool *pool = ctx->pool; - struct evhttp_connection *connection; + + /* initialize the event structure for this rpc */ + evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx); /* we better have some available connections on the pool */ assert(TAILQ_FIRST(&pool->connections) != NULL); - - /* even if a connection might be available, we do FIFO */ - if (TAILQ_FIRST(&pool->requests) == NULL) { - connection = evrpc_pool_find_connection(pool); - if (connection != NULL) - return evrpc_schedule_request(connection, ctx); - } - /* * if no connection is available, we queue the request on the pool, * the next time a connection is empty, the rpc will be send on that. */ TAILQ_INSERT_TAIL(&pool->requests, ctx, next); + + evrpc_pool_schedule(pool); + return (0); } @@ -382,10 +414,15 @@ static void evrpc_reply_done(struct evhttp_request *req, void *arg) { struct evrpc_request_wrapper *ctx = arg; - int res; + struct evrpc_pool *pool = ctx->pool; + int res = -1; + + /* cancel any timeout we might have scheduled */ + event_del(&ctx->ev_timeout); /* we need to get the reply now */ - res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); + if (req != NULL) + res = ctx->reply_unmarshal(ctx->reply, req->input_buffer); if (res == -1) { /* clear everything that we might have written previously */ ctx->reply_clear(ctx->reply); @@ -396,4 +433,33 @@ evrpc_reply_done(struct evhttp_request *req, void *arg) evrpc_request_wrapper_free(ctx); /* the http layer owns the request structure */ + + /* see if we can schedule another request */ + evrpc_pool_schedule(pool); +} + +static void +evrpc_pool_schedule(struct evrpc_pool *pool) +{ + struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests); + struct evhttp_connection *evcon; + + /* if no requests are pending, we have no work */ + if (ctx == NULL) + return; + + if ((evcon = evrpc_pool_find_connection(pool)) != NULL) { + TAILQ_REMOVE(&pool->requests, ctx, next); + evrpc_schedule_request(evcon, ctx); + } +} + +static void +evrpc_request_timeout(int fd, short what, void *arg) +{ + struct evrpc_request_wrapper *ctx = arg; + struct evhttp_connection *evcon = ctx->evcon; + assert(evcon != NULL); + + evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT); } @@ -154,6 +154,7 @@ int evrpc_send_request_##rpcname(struct evrpc_pool *pool, \ return (-1); \ } \ ctx->pool = pool; \ + ctx->evcon = NULL; \ ctx->name = strdup(#rpcname); \ if (ctx->name == NULL) { \ free(ctx); \ @@ -228,6 +229,12 @@ struct evrpc_request_wrapper { /* pool on which this rpc request is being made */ struct evrpc_pool *pool; + /* connection on which the request is being sent */ + struct evhttp_connection *evcon; + + /* event for implementing request timeouts */ + struct event ev_timeout; + /* the name of the rpc */ char *name; @@ -262,4 +269,17 @@ void evrpc_pool_free(struct evrpc_pool *); void evrpc_pool_add_connection(struct evrpc_pool *, struct evhttp_connection *); +/* + * Sets the timeout in secs after which a request has to complete. The + * RPC is completely aborted if it does not complete by then. Setting + * the timeout to 0 means that it never timeouts and can be used to + * implement callback type RPCs. + * + * Any connection already in the pool will be updated with the new + * timeout. Connections added to the pool after set_timeout has be + * called receive the pool timeout only if no timeout has been set + * for the connection itself. + */ +void evrpc_pool_set_timeout(struct evrpc_pool *, int timeout_in_secs); + #endif /* _EVRPC_H_ */ diff --git a/http-internal.h b/http-internal.h index b8ef6639..f95a2090 100644 --- a/http-internal.h +++ b/http-internal.h @@ -50,6 +50,8 @@ struct evhttp_connection { int flags; #define EVHTTP_CON_INCOMING 0x0001 /* only one request on it ever */ #define EVHTTP_CON_OUTGOING 0x0002 /* multiple requests possible */ + + int timeout; /* timeout in seconds for events */ enum evhttp_connection_state state; @@ -218,12 +218,24 @@ evhttp_method(enum evhttp_cmd_type type) return (method); } +static void +evhttp_add_event(struct event *ev, int timeout, int default_timeout) +{ + if (timeout != 0) { + struct timeval tv; + + timerclear(&tv); + tv.tv_sec = timeout != -1 ? timeout : default_timeout; + event_add(ev, &tv); + } else { + event_add(ev, NULL); + } +} + void evhttp_write_buffer(struct evhttp_connection *evcon, void (*cb)(struct evhttp_connection *, void *), void *arg) { - struct timeval tv; - event_debug(("%s: preparing to write buffer\n", __func__)); /* Set call back */ @@ -232,9 +244,7 @@ evhttp_write_buffer(struct evhttp_connection *evcon, /* xxx: maybe check if the event is still pending? */ event_set(&evcon->ev, evcon->fd, EV_WRITE, evhttp_write, evcon); - timerclear(&tv); - tv.tv_sec = HTTP_WRITE_TIMEOUT; - event_add(&evcon->ev, &tv); + evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_WRITE_TIMEOUT); } /* @@ -464,7 +474,6 @@ void evhttp_write(int fd, short what, void *arg) { struct evhttp_connection *evcon = arg; - struct timeval tv; int n; if (what == EV_TIMEOUT) { @@ -486,9 +495,8 @@ evhttp_write(int fd, short what, void *arg) } if (EVBUFFER_LENGTH(evcon->output_buffer) != 0) { - timerclear(&tv); - tv.tv_sec = HTTP_WRITE_TIMEOUT; - event_add(&evcon->ev, &tv); + evhttp_add_event(&evcon->ev, + evcon->timeout, HTTP_WRITE_TIMEOUT); return; } @@ -549,7 +557,6 @@ evhttp_read(int fd, short what, void *arg) { struct evhttp_connection *evcon = arg; struct evhttp_request *req = TAILQ_FIRST(&evcon->requests); - struct timeval tv; int n; if (what == EV_TIMEOUT) { @@ -574,10 +581,8 @@ evhttp_read(int fd, short what, void *arg) evhttp_connection_done(evcon); return; } - - timerclear(&tv); - tv.tv_sec = HTTP_READ_TIMEOUT; - event_add(&evcon->ev, &tv); + + evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT); } void @@ -971,7 +976,6 @@ evhttp_parse_lines(struct evhttp_request *req, struct evbuffer* buffer) void evhttp_get_body(struct evhttp_connection *evcon, struct evhttp_request *req) { - struct timeval tv; const char *content_length; const char *connection; struct evkeyvalq *headers = req->input_headers; @@ -1013,16 +1017,12 @@ evhttp_get_body(struct evhttp_connection *evcon, struct evhttp_request *req) } event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read, evcon); - timerclear(&tv); - tv.tv_sec = HTTP_READ_TIMEOUT; - event_add(&evcon->ev, &tv); - return; + evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT); } void evhttp_read_header(int fd, short what, void *arg) { - struct timeval tv; struct evhttp_connection *evcon = arg; struct evhttp_request *req = TAILQ_FIRST(&evcon->requests); int n, res; @@ -1053,9 +1053,8 @@ evhttp_read_header(int fd, short what, void *arg) return; } else if (res == 0) { /* Need more header lines */ - timerclear(&tv); - tv.tv_sec = HTTP_READ_TIMEOUT; - event_add(&evcon->ev, &tv); + evhttp_add_event(&evcon->ev, + evcon->timeout, HTTP_READ_TIMEOUT); return; } @@ -1105,6 +1104,8 @@ evhttp_connection_new(const char *address, unsigned short port) evcon->fd = -1; evcon->port = port; + evcon->timeout = -1; + if ((evcon->address = strdup(address)) == NULL) { event_warn("%s: strdup failed", __func__); goto error; @@ -1131,11 +1132,16 @@ evhttp_connection_new(const char *address, unsigned short port) return (NULL); } +void +evhttp_connection_set_timeout(struct evhttp_connection *evcon, + int timeout_in_secs) +{ + evcon->timeout = timeout_in_secs; +} + int evhttp_connection_connect(struct evhttp_connection *evcon) { - struct timeval tv; - if (evcon->state == EVCON_CONNECTING) return (0); @@ -1154,9 +1160,7 @@ evhttp_connection_connect(struct evhttp_connection *evcon) /* Set up a callback for successful connection setup */ event_set(&evcon->ev, evcon->fd, EV_WRITE, evhttp_connectioncb, evcon); - timerclear(&tv); - tv.tv_sec = HTTP_CONNECT_TIMEOUT; - event_add(&evcon->ev, &tv); + evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_CONNECT_TIMEOUT); evcon->state = EVCON_CONNECTING; @@ -1217,16 +1221,12 @@ evhttp_make_request(struct evhttp_connection *evcon, void evhttp_start_read(struct evhttp_connection *evcon) { - struct timeval tv; - /* Set up an event to read the headers */ if (event_initialized(&evcon->ev)) event_del(&evcon->ev); event_set(&evcon->ev, evcon->fd, EV_READ, evhttp_read_header, evcon); - - timerclear(&tv); - tv.tv_sec = HTTP_READ_TIMEOUT; - event_add(&evcon->ev, &tv); + + evhttp_add_event(&evcon->ev, evcon->timeout, HTTP_READ_TIMEOUT); } void diff --git a/test/regress_rpc.c b/test/regress_rpc.c index 863b9571..3db21e38 100644 --- a/test/regress_rpc.c +++ b/test/regress_rpc.c @@ -315,11 +315,40 @@ GotKillCb(struct msg *msg, struct kill *kill, void *arg) goto done; test_ok += 1; + done: event_loopexit(NULL); } static void +GotKillCbTwo(struct msg *msg, struct kill *kill, void *arg) +{ + char *weapon; + char *action; + + if (EVTAG_GET(kill, weapon, &weapon) == -1) { + fprintf(stderr, "get weapon\n"); + goto done; + } + if (EVTAG_GET(kill, action, &action) == -1) { + fprintf(stderr, "get action\n"); + goto done; + } + + if (strcmp(weapon, "dagger")) + goto done; + + if (strcmp(action, "wave around like an idiot")) + goto done; + + test_ok += 1; + +done: + if (test_ok == 2) + event_loopexit(NULL); +} + +static void rpc_basic_client(void) { short port; @@ -374,10 +403,62 @@ rpc_basic_client(void) evhttp_free(http); } +/* + * We are testing that the second requests gets send over the same + * connection after the first RPCs completes. + */ +static void +rpc_basic_queued_client(void) +{ + short port; + struct evhttp *http = NULL; + struct evrpc_base *base = NULL; + struct evrpc_pool *pool = NULL; + struct msg *msg; + struct kill *kill_one, *kill_two; + + fprintf(stdout, "Testing RPC (Queued) Client: "); + + rpc_setup(&http, &port, &base); + + pool = rpc_pool_with_connection(port); + + /* set up the basic message */ + msg = msg_new(); + EVTAG_ASSIGN(msg, from_name, "niels"); + EVTAG_ASSIGN(msg, to_name, "tester"); + + kill_one = kill_new(); + kill_two = kill_new(); + + EVRPC_MAKE_REQUEST(Message, msg, kill_one, GotKillCbTwo, NULL); + EVRPC_MAKE_REQUEST(Message, msg, kill_two, GotKillCb, NULL); + + test_ok = 0; + + event_dispatch(); + + if (test_ok != 2) { + fprintf(stdout, "FAILED (1)\n"); + exit(1); + } + + fprintf(stdout, "OK\n"); + + msg_free(msg); + kill_free(kill_one); + kill_free(kill_two); + + evrpc_pool_free(pool); + evhttp_free(http); +} + + void rpc_suite(void) { rpc_basic_test(); rpc_basic_message(); rpc_basic_client(); + rpc_basic_queued_client(); } |