diff options
-rw-r--r-- | ChangeLog | 1 | ||||
-rw-r--r-- | evrpc-internal.h | 14 | ||||
-rw-r--r-- | evrpc.c | 239 | ||||
-rw-r--r-- | evrpc.h | 14 | ||||
-rw-r--r-- | test/regress_rpc.c | 6 |
5 files changed, 178 insertions, 96 deletions
@@ -33,6 +33,7 @@ Changes in current version: o change evrpc hooking to allow pausing of RPCs; this will make it possible for the hook to do some meaning ful work; this is not backwards compatible. o allow an http request callback to take ownership of a request structure o allow association of meta data with RPC requests for hook processing + o associate more context for hooks to query such as the connection object Changes in 1.4.0: o allow \r or \n individually to separate HTTP headers instead of the standard "\r\n"; from Charles Kerr. diff --git a/evrpc-internal.h b/evrpc-internal.h index bffe8a2c..44ebee18 100644 --- a/evrpc-internal.h +++ b/evrpc-internal.h @@ -110,7 +110,19 @@ struct evrpc_meta { TAILQ_HEAD(evrpc_meta_list, evrpc_meta); +struct evrpc_hook_meta { + struct evrpc_meta_list meta_data; + struct evhttp_connection *evcon; +}; + +/* allows association of meta data with a request */ +static void evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx, + struct evhttp_connection *evcon); + +/* creates a new meta data store */ +static struct evrpc_hook_meta *evrpc_hook_meta_new(void); + /* frees the meta data associated with a request */ -static void evrpc_meta_data_free(struct evrpc_meta_list *meta_data); +static void evrpc_hook_context_free(struct evrpc_hook_meta *ctx); #endif /* _EVRPC_INTERNAL_H_ */ @@ -270,7 +270,6 @@ evrpc_request_cb(struct evhttp_request *req, void *arg) { struct evrpc *rpc = arg; struct evrpc_req_generic *rpc_state = NULL; - int hook_res; /* let's verify the outside parameters */ if (req->type != EVHTTP_REQ_POST || @@ -285,26 +284,30 @@ evrpc_request_cb(struct evhttp_request *req, void *arg) rpc_state->rpc_data = NULL; rpc_state->done = evrpc_request_done; + if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) { + int hook_res; - /* - * we might want to allow hooks to suspend the processing, - * but at the moment, we assume that they just act as simple - * filters. - */ - hook_res = evrpc_process_hooks(&rpc->base->input_hooks, - rpc_state, req, req->input_buffer); - switch (hook_res) { - case EVRPC_TERMINATE: - goto error; - case EVRPC_PAUSE: - evrpc_pause_request(rpc->base, rpc_state, - evrpc_request_cb_closure); - return; - case EVRPC_CONTINUE: - break; - default: - assert(hook_res == EVRPC_TERMINATE || - hook_res == EVRPC_CONTINUE || hook_res == EVRPC_PAUSE); + evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon); + + /* + * allow hooks to modify the outgoing request + */ + hook_res = evrpc_process_hooks(&rpc->base->input_hooks, + rpc_state, req, req->input_buffer); + switch (hook_res) { + case EVRPC_TERMINATE: + goto error; + case EVRPC_PAUSE: + evrpc_pause_request(rpc->base, rpc_state, + evrpc_request_cb_closure); + return; + case EVRPC_CONTINUE: + break; + default: + assert(hook_res == EVRPC_TERMINATE || + hook_res == EVRPC_CONTINUE || + hook_res == EVRPC_PAUSE); + } } evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE); @@ -365,8 +368,8 @@ evrpc_reqstate_free(struct evrpc_req_generic* rpc_state) rpc = rpc_state->rpc; /* clean up all memory */ - if (rpc_state->meta_data != NULL) - evrpc_meta_data_free(rpc_state->meta_data); + if (rpc_state->hook_meta != NULL) + evrpc_hook_context_free(rpc_state->hook_meta); if (rpc_state->request != NULL) rpc->request_free(rpc_state->request); if (rpc_state->reply != NULL) @@ -384,7 +387,6 @@ evrpc_request_done(struct evrpc_req_generic *rpc_state) { struct evhttp_request *req = rpc_state->http_req; struct evrpc *rpc = rpc_state->rpc; - int hook_res; if (rpc->reply_complete(rpc_state->reply) == -1) { /* the reply was not completely filled in. error out */ @@ -399,22 +401,29 @@ evrpc_request_done(struct evrpc_req_generic *rpc_state) /* serialize the reply */ rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply); - /* do hook based tweaks to the request */ - hook_res = evrpc_process_hooks(&rpc->base->output_hooks, - rpc_state, req, rpc_state->rpc_data); - switch (hook_res) { - case EVRPC_TERMINATE: - goto error; - case EVRPC_PAUSE: - if (evrpc_pause_request(rpc->base, rpc_state, - evrpc_request_done_closure) == -1) + if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) { + int hook_res; + + evrpc_hook_associate_meta(&rpc_state->hook_meta, req->evcon); + + /* do hook based tweaks to the request */ + hook_res = evrpc_process_hooks(&rpc->base->output_hooks, + rpc_state, req, rpc_state->rpc_data); + switch (hook_res) { + case EVRPC_TERMINATE: goto error; - return; - case EVRPC_CONTINUE: - break; - default: - assert(hook_res == EVRPC_TERMINATE || - hook_res == EVRPC_CONTINUE || hook_res == EVRPC_PAUSE); + case EVRPC_PAUSE: + if (evrpc_pause_request(rpc->base, rpc_state, + evrpc_request_done_closure) == -1) + goto error; + return; + case EVRPC_CONTINUE: + break; + default: + assert(hook_res == EVRPC_TERMINATE || + hook_res == EVRPC_CONTINUE || + hook_res == EVRPC_PAUSE); + } } evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE); @@ -479,8 +488,8 @@ evrpc_pool_new(struct event_base *base) static void evrpc_request_wrapper_free(struct evrpc_request_wrapper *request) { - if (request->meta_data != NULL) - evrpc_meta_data_free(request->meta_data); + if (request->hook_meta != NULL) + evrpc_hook_context_free(request->hook_meta); event_free(request->name); event_free(request); } @@ -602,7 +611,6 @@ evrpc_schedule_request(struct evhttp_connection *connection, struct evhttp_request *req = NULL; struct evrpc_pool *pool = ctx->pool; struct evrpc_status status; - int hook_res = 0; if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL) goto error; @@ -616,25 +624,32 @@ evrpc_schedule_request(struct evhttp_connection *connection, /* if we get paused we also need to know the request */ ctx->req = req; - /* apply hooks to the outgoing request */ - hook_res = evrpc_process_hooks(&pool->output_hooks, - ctx, req, req->output_buffer); + if (TAILQ_FIRST(&pool->output_hooks) != NULL) { + int hook_res; - switch (hook_res) { - case EVRPC_TERMINATE: - goto error; - case EVRPC_PAUSE: - /* we need to be explicitly resumed */ - if (evrpc_pause_request(pool, ctx, - evrpc_schedule_request_closure) == -1) + evrpc_hook_associate_meta(&ctx->hook_meta, connection); + + /* apply hooks to the outgoing request */ + hook_res = evrpc_process_hooks(&pool->output_hooks, + ctx, req, req->output_buffer); + + switch (hook_res) { + case EVRPC_TERMINATE: goto error; - return (0); - case EVRPC_CONTINUE: - /* we can just continue */ - break; - default: - assert(hook_res == EVRPC_TERMINATE || - hook_res == EVRPC_CONTINUE || hook_res == EVRPC_PAUSE); + case EVRPC_PAUSE: + /* we need to be explicitly resumed */ + if (evrpc_pause_request(pool, ctx, + evrpc_schedule_request_closure) == -1) + goto error; + return (0); + case EVRPC_CONTINUE: + /* we can just continue */ + break; + default: + assert(hook_res == EVRPC_TERMINATE || + hook_res == EVRPC_CONTINUE || + hook_res == EVRPC_PAUSE); + } } evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE); @@ -770,7 +785,7 @@ evrpc_send_request_generic( return (NULL); ctx->pool = pool; - ctx->meta_data = NULL; + ctx->hook_meta = NULL; ctx->evcon = NULL; ctx->name = event_strdup(rpcname); if (ctx->name == NULL) { @@ -796,7 +811,7 @@ evrpc_reply_done(struct evhttp_request *req, void *arg) { struct evrpc_request_wrapper *ctx = arg; struct evrpc_pool *pool = ctx->pool; - int hook_res; + int hook_res = EVRPC_CONTINUE; /* cancel any timeout we might have scheduled */ event_del(&ctx->ev_timeout); @@ -809,31 +824,39 @@ evrpc_reply_done(struct evhttp_request *req, void *arg) return; } - /* apply hooks to the incoming request */ - hook_res = evrpc_process_hooks(&pool->input_hooks, - ctx, req, req->input_buffer); + if (TAILQ_FIRST(&pool->input_hooks) != NULL) { + evrpc_hook_associate_meta(&ctx->hook_meta, req->evcon); - switch (hook_res) { - case EVRPC_TERMINATE: - case EVRPC_CONTINUE: - evrpc_reply_done_closure(ctx, hook_res); - return; - case EVRPC_PAUSE: - /* - * if we get paused we also need to know the request. - * unfortunately, the underlying layer is going to free it. - * we need to request ownership explicitly - */ - if (req != NULL) - evhttp_request_own(req); + /* apply hooks to the incoming request */ + hook_res = evrpc_process_hooks(&pool->input_hooks, + ctx, req, req->input_buffer); - evrpc_pause_request(pool, ctx, evrpc_reply_done_closure); - return; - default: - assert(hook_res == EVRPC_TERMINATE || - hook_res == EVRPC_CONTINUE || hook_res == EVRPC_PAUSE); + switch (hook_res) { + case EVRPC_TERMINATE: + case EVRPC_CONTINUE: + break; + case EVRPC_PAUSE: + /* + * if we get paused we also need to know the + * request. unfortunately, the underlying + * layer is going to free it. we need to + * request ownership explicitly + */ + if (req != NULL) + evhttp_request_own(req); + + evrpc_pause_request(pool, ctx, + evrpc_reply_done_closure); + return; + default: + assert(hook_res == EVRPC_TERMINATE || + hook_res == EVRPC_CONTINUE || + hook_res == EVRPC_PAUSE); + } } + evrpc_reply_done_closure(ctx, hook_res); + /* http request is being freed by underlying layer */ } @@ -920,23 +943,49 @@ evrpc_meta_data_free(struct evrpc_meta_list *meta_data) event_free(entry->data); event_free(entry); } +} + +static struct evrpc_hook_meta * +evrpc_hook_meta_new(void) +{ + struct evrpc_hook_meta *ctx; + ctx = event_malloc(sizeof(struct evrpc_hook_meta)); + assert(ctx != NULL); + + TAILQ_INIT(&ctx->meta_data); + ctx->evcon = NULL; + + return (ctx); +} - event_free(meta_data); +static void +evrpc_hook_associate_meta(struct evrpc_hook_meta **pctx, + struct evhttp_connection *evcon) +{ + struct evrpc_hook_meta *ctx = *pctx; + if (ctx == NULL) + *pctx = ctx = evrpc_hook_meta_new(); + ctx->evcon = evcon; +} + +static void +evrpc_hook_context_free(struct evrpc_hook_meta *ctx) +{ + evrpc_meta_data_free(&ctx->meta_data); + event_free(ctx); } -/* adds meta data */ +/* Adds meta data */ void evrpc_hook_add_meta(void *ctx, const char *key, const void *data, size_t data_size) { struct evrpc_request_wrapper *req = ctx; + struct evrpc_hook_meta *store = NULL; struct evrpc_meta *meta = NULL; - if (req->meta_data == NULL) { - req->meta_data = event_malloc(sizeof(struct evrpc_meta_list)); - assert(req->meta_data != NULL); - TAILQ_INIT(req->meta_data); - } + if ((store = req->hook_meta) == NULL) + store = req->hook_meta = evrpc_hook_meta_new(); assert((meta = event_malloc(sizeof(struct evrpc_meta))) != NULL); assert((meta->key = event_strdup(key)) != NULL); @@ -944,7 +993,7 @@ evrpc_hook_add_meta(void *ctx, const char *key, assert((meta->data = event_malloc(data_size)) != NULL); memcpy(meta->data, data, data_size); - TAILQ_INSERT_TAIL(req->meta_data, meta, next); + TAILQ_INSERT_TAIL(&store->meta_data, meta, next); } int @@ -953,10 +1002,10 @@ evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) struct evrpc_request_wrapper *req = ctx; struct evrpc_meta *meta = NULL; - if (req->meta_data == NULL) + if (req->hook_meta == NULL) return (-1); - TAILQ_FOREACH(meta, req->meta_data, next) { + TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) { if (strcmp(meta->key, key) == 0) { *data = meta->data; *data_size = meta->data_size; @@ -966,3 +1015,11 @@ evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size) return (-1); } + +struct evhttp_connection * +evrpc_hook_get_connection(void *ctx) +{ + struct evrpc_request_wrapper *req = ctx; + + return (req->evcon); +} @@ -114,7 +114,7 @@ struct evrpc { struct evhttp_request; struct evrpc_status; -struct evrpc_meta_list; +struct evrpc_hook_meta; /* We alias the RPC specific structs to this voided one */ struct evrpc_req_generic { @@ -122,7 +122,7 @@ struct evrpc_req_generic { * allows association of meta data via hooks - needs to be * synchronized with evrpc_request_wrapper */ - struct evrpc_meta_list *meta_data; + struct evrpc_hook_meta *hook_meta; /* the unmarshaled request object */ void *request; @@ -165,7 +165,7 @@ struct evrpc_req_generic { */ #define EVRPC_HEADER(rpcname, reqstruct, rplystruct) \ EVRPC_STRUCT(rpcname) { \ - struct evrpc_meta_list *meta_data; \ + struct evrpc_hook_meta *hook_meta; \ struct reqstruct* request; \ struct rplystruct* reply; \ struct evrpc* rpc; \ @@ -357,7 +357,7 @@ struct evrpc_request_wrapper { * allows association of meta data via hooks - needs to be * synchronized with evrpc_req_generic. */ - struct evrpc_meta_list *meta_data; + struct evrpc_hook_meta *hook_meta; TAILQ_ENTRY(evrpc_request_wrapper) next; @@ -542,6 +542,12 @@ void evrpc_hook_add_meta(void *ctx, const char *key, int evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size); +/** returns the connection object associated with the request + * + * @param ctx the context provided to the hook call + * @return a pointer to the evhttp_connection object + */ +struct evhttp_connection *evrpc_hook_get_connection(void *ctx); #ifdef __cplusplus } #endif diff --git a/test/regress_rpc.c b/test/regress_rpc.c index 68cf0c3b..3b40c372 100644 --- a/test/regress_rpc.c +++ b/test/regress_rpc.c @@ -418,6 +418,8 @@ rpc_hook_add_header(void *ctx, struct evhttp_request *req, else evhttp_add_header(req->output_headers, "X-Hook", hook_type); + assert(evrpc_hook_get_connection(ctx) != NULL); + return (EVRPC_CONTINUE); } @@ -427,6 +429,8 @@ rpc_hook_add_meta(void *ctx, struct evhttp_request *req, { evrpc_hook_add_meta(ctx, "meta", "test", 5); + assert(evrpc_hook_get_connection(ctx) != NULL); + return (EVRPC_CONTINUE); } @@ -448,6 +452,8 @@ rpc_hook_remove_header(void *ctx, struct evhttp_request *req, assert(data != NULL); assert(data_len == 5); + assert(evrpc_hook_get_connection(ctx) != NULL); + return (EVRPC_CONTINUE); } |