summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNiels Provos <provos@gmail.com>2007-11-02 06:34:04 +0000
committerNiels Provos <provos@gmail.com>2007-11-02 06:34:04 +0000
commit65236aa8578aeb17c088b41818da17311672aed1 (patch)
tree14ace7e01f2ea1e59a9d7c11a00a49e6c3e7f819
parent18ac92486fc7b4f2ac7092ce56dc89ebad41c729 (diff)
downloadlibevent-65236aa8578aeb17c088b41818da17311672aed1.tar.gz
simple hooks for processing incoming and outgoing rpcs
svn:r466
-rw-r--r--ChangeLog1
-rw-r--r--evrpc-internal.h12
-rw-r--r--evrpc.c108
-rw-r--r--evrpc.h34
-rw-r--r--test/regress_rpc.c40
5 files changed, 192 insertions, 3 deletions
diff --git a/ChangeLog b/ChangeLog
index 2b811087..755a6568 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -29,3 +29,4 @@ Changes in current version:
o Fix implementation of strsep on platforms that lack it
o Fix implementation of getaddrinfo on platforms that lack it; mainly, this will make Windows http.c work better. Original patch by Lubomir Marinov.
o Fix evport implementation: port_disassociate called on unassociated events resulting in bogus errors; more efficient memory management; from Trond Norbye and Prakash Sangappa
+ o support for hooks on rpc input and output; can be used to implement rpc independent processing such as compression or authentication.
diff --git a/evrpc-internal.h b/evrpc-internal.h
index 656533b6..8b8dd691 100644
--- a/evrpc-internal.h
+++ b/evrpc-internal.h
@@ -33,12 +33,24 @@ struct evrpc;
#define EVRPC_URI_PREFIX "/.rpc."
+struct evrpc_hook {
+ TAILQ_ENTRY(evrpc_hook) (next);
+
+ /* returns -1; if the rpc should be aborted, is allowed to rewrite */
+ int (*process)(struct evhttp_request *, struct evbuffer *, void *);
+ void *process_arg;
+};
+
struct evrpc_base {
/* the HTTP server under which we register our RPC calls */
struct evhttp* http_server;
/* a list of all RPCs registered with us */
TAILQ_HEAD(evrpc_list, evrpc) registered_rpcs;
+
+ /* hooks for processing outbound and inbound rpcs */
+ TAILQ_HEAD(evrpc_hook_list, evrpc_hook) input_hooks;
+ struct evrpc_hook_list output_hooks;
};
struct evrpc_req_generic;
diff --git a/evrpc.c b/evrpc.c
index 0d24d36c..b1fb4765 100644
--- a/evrpc.c
+++ b/evrpc.c
@@ -74,6 +74,8 @@ evrpc_init(struct evhttp *http_server)
evtag_init();
TAILQ_INIT(&base->registered_rpcs);
+ TAILQ_INIT(&base->input_hooks);
+ TAILQ_INIT(&base->output_hooks);
base->http_server = http_server;
return (base);
@@ -83,14 +85,95 @@ void
evrpc_free(struct evrpc_base *base)
{
struct evrpc *rpc;
-
+ struct evrpc_hook *hook;
+
while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
assert(evrpc_unregister_rpc(base, rpc->uri));
}
-
+ while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
+ assert(evrpc_remove_hook(base, INPUT, hook));
+ }
+ while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
+ assert(evrpc_remove_hook(base, OUTPUT, hook));
+ }
free(base);
}
+void *
+evrpc_add_hook(struct evrpc_base *base,
+ enum EVRPC_HOOK_TYPE hook_type,
+ int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
+ void *cb_arg)
+{
+ struct evrpc_hook_list *head = NULL;
+ struct evrpc_hook *hook = NULL;
+ switch (hook_type) {
+ case INPUT:
+ head = &base->input_hooks;
+ break;
+ case OUTPUT:
+ head = &base->output_hooks;
+ break;
+ default:
+ assert(hook_type == INPUT || hook_type == OUTPUT);
+ }
+
+ hook = calloc(1, sizeof(struct evrpc_hook));
+ assert(hook != NULL);
+
+ hook->process = cb;
+ hook->process_arg = cb_arg;
+ TAILQ_INSERT_TAIL(head, hook, next);
+
+ return (hook);
+}
+
+/*
+ * remove the hook specified by the handle
+ */
+
+int
+evrpc_remove_hook(struct evrpc_base *base,
+ enum EVRPC_HOOK_TYPE hook_type,
+ void *handle)
+{
+ struct evrpc_hook_list *head = NULL;
+ struct evrpc_hook *hook = NULL;
+ switch (hook_type) {
+ case INPUT:
+ head = &base->input_hooks;
+ break;
+ case OUTPUT:
+ head = &base->output_hooks;
+ break;
+ default:
+ assert(hook_type == INPUT || hook_type == OUTPUT);
+ }
+
+ TAILQ_FOREACH(hook, head, next) {
+ if (hook == handle) {
+ TAILQ_REMOVE(head, hook, next);
+ free(hook);
+ return (1);
+ }
+ }
+
+ return (0);
+}
+
+static int
+evrpc_process_hooks(struct evrpc_hook_list *head,
+ struct evhttp_request *req, struct evbuffer *evbuf)
+{
+ struct evrpc_hook *hook;
+ TAILQ_FOREACH(hook, head, next) {
+ if (hook->process(req, evbuf, hook->process_arg) == -1)
+ return (-1);
+ }
+
+ return (0);
+}
+
static void evrpc_pool_schedule(struct evrpc_pool *pool);
static void evrpc_request_cb(struct evhttp_request *, void *);
void evrpc_request_done(struct evrpc_req_generic*);
@@ -124,6 +207,7 @@ evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
{
char *constructed_uri = evrpc_construct_uri(rpc->uri);
+ rpc->base = base;
rpc->cb = cb;
rpc->cb_arg = cb_arg;
@@ -179,6 +263,15 @@ evrpc_request_cb(struct evhttp_request *req, void *arg)
EVBUFFER_LENGTH(req->input_buffer) <= 0)
goto error;
+ /*
+ * we might want to allow hooks to suspend the processing,
+ * but at the moment, we assume that they just act as simple
+ * filters.
+ */
+ if (evrpc_process_hooks(&rpc->base->input_hooks,
+ req, req->input_buffer) == -1)
+ goto error;
+
rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
if (rpc_state == NULL)
goto error;
@@ -236,7 +329,7 @@ evrpc_request_done(struct evrpc_req_generic* rpc_state)
{
struct evhttp_request *req = rpc_state->http_req;
struct evrpc *rpc = rpc_state->rpc;
- struct evbuffer* data;
+ struct evbuffer* data = NULL;
if (rpc->reply_complete(rpc_state->reply) == -1) {
/* the reply was not completely filled in. error out */
@@ -251,6 +344,11 @@ evrpc_request_done(struct evrpc_req_generic* rpc_state)
/* serialize the reply */
rpc->reply_marshal(data, rpc_state->reply);
+ /* do hook based tweaks to the request */
+ if (evrpc_process_hooks(&rpc->base->output_hooks,
+ req, data) == -1)
+ goto error;
+
evhttp_send_reply(req, HTTP_OK, "OK", data);
evbuffer_free(data);
@@ -260,6 +358,8 @@ evrpc_request_done(struct evrpc_req_generic* rpc_state)
return;
error:
+ if (data != NULL)
+ evbuffer_free(data);
evrpc_reqstate_free(rpc_state);
evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
return;
@@ -460,6 +560,8 @@ evrpc_reply_done(struct evhttp_request *req, void *arg)
event_del(&ctx->ev_timeout);
memset(&status, 0, sizeof(status));
+ status.http_req = req;
+
/* we need to get the reply now */
if (req != NULL) {
res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
diff --git a/evrpc.h b/evrpc.h
index 5f45e780..45911146 100644
--- a/evrpc.h
+++ b/evrpc.h
@@ -99,6 +99,9 @@ struct evrpc {
/* the callback invoked for each received rpc */
void (*cb)(struct evrpc_req_generic *, void *);
void *cb_arg;
+
+ /* reference for further configuration */
+ struct evrpc_base *base;
};
#define EVRPC_STRUCT(rpcname) struct evrpc_req__##rpcname
@@ -140,6 +143,7 @@ EVRPC_STRUCT(rpcname) { \
struct reqstruct* request; \
struct rplystruct* reply; \
struct evrpc* rpc; \
+ struct evhttp_request* http_req; \
void (*done)(struct evrpc_status *, \
struct evrpc* rpc, void *request, void *reply); \
}; \
@@ -184,6 +188,11 @@ error: \
return (-1); \
}
+/*
+ * Access to the underlying http object; can be used to look at headers or
+ * for getting the remote ip address
+ */
+#define EVRPC_REQUEST_HTTP(rpc_req) (rpc_req)->http_req
/*
* EVRPC_REQUEST_DONE is used to answer a request; the reply is expected
@@ -252,6 +261,9 @@ struct evrpc_status {
#define EVRPC_STATUS_ERR_BADPAYLOAD 2
#define EVRPC_STATUS_ERR_UNSTARTED 3
int error;
+
+ /* for looking at headers or other information */
+ struct evhttp_request *http_req;
};
struct evrpc_request_wrapper {
@@ -313,6 +325,28 @@ void evrpc_pool_add_connection(struct evrpc_pool *,
*/
void evrpc_pool_set_timeout(struct evrpc_pool *, int timeout_in_secs);
+/*
+ * Hooks for changing the input and output of RPCs; this can be used to
+ * implement compression, authentication, encryption, ...
+ *
+ * If a hook returns -1, the processing is aborted.
+ *
+ * The add functions return handles that can be used for removing hooks.
+ */
+
+enum EVRPC_HOOK_TYPE {
+ INPUT, OUTPUT
+};
+
+void *evrpc_add_hook(struct evrpc_base *base,
+ enum EVRPC_HOOK_TYPE hook_type,
+ int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
+ void *cb_arg);
+
+int evrpc_remove_hook(struct evrpc_base *base,
+ enum EVRPC_HOOK_TYPE hook_type,
+ void *handle);
+
#ifdef __cplusplus
}
#endif
diff --git a/test/regress_rpc.c b/test/regress_rpc.c
index 3361f9e9..341cf991 100644
--- a/test/regress_rpc.c
+++ b/test/regress_rpc.c
@@ -91,11 +91,21 @@ EVRPC_HEADER(NeverReply, msg, kill);
EVRPC_GENERATE(Message, msg, kill);
EVRPC_GENERATE(NeverReply, msg, kill);
+static int need_input_hook = 0;
+static int need_output_hook = 0;
+
void
MessageCb(EVRPC_STRUCT(Message)* rpc, void *arg)
{
struct kill* kill_reply = rpc->reply;
+ if (need_input_hook) {
+ struct evhttp_request* req = EVRPC_REQUEST_HTTP(rpc);
+ const char *header = evhttp_find_header(
+ req->input_headers, "X-Hook");
+ assert(strcmp(header, "input") == 0);
+ }
+
/* we just want to fill in some non-sense */
EVTAG_ASSIGN(kill_reply, weapon, "dagger");
EVTAG_ASSIGN(kill_reply, action, "wave around like an idiot");
@@ -129,6 +139,9 @@ rpc_setup(struct evhttp **phttp, short *pport, struct evrpc_base **pbase)
*phttp = http;
*pport = port;
*pbase = base;
+
+ need_input_hook = 0;
+ need_output_hook = 0;
}
static void
@@ -330,6 +343,13 @@ GotKillCb(struct evrpc_status *status,
char *weapon;
char *action;
+ if (need_output_hook) {
+ struct evhttp_request *req = status->http_req;
+ const char *header = evhttp_find_header(
+ req->input_headers, "X-Hook");
+ assert(strcmp(header, "output") == 0);
+ }
+
if (status->error != EVRPC_STATUS_ERR_NONE)
goto done;
@@ -386,6 +406,18 @@ done:
event_loopexit(NULL);
}
+static int
+rpc_hook_add_header(struct evhttp_request *req,
+ struct evbuffer *evbuf, void *arg)
+{
+ const char *hook_type = arg;
+ if (strcmp("input", hook_type) == 0)
+ evhttp_add_header(req->input_headers, "X-Hook", hook_type);
+ else
+ evhttp_add_header(req->output_headers, "X-Hook", hook_type);
+ return (0);
+}
+
static void
rpc_basic_client(void)
{
@@ -400,6 +432,14 @@ rpc_basic_client(void)
rpc_setup(&http, &port, &base);
+ need_input_hook = 1;
+ need_output_hook = 1;
+
+ assert(evrpc_add_hook(base, INPUT, rpc_hook_add_header, "input")
+ != NULL);
+ assert(evrpc_add_hook(base, OUTPUT, rpc_hook_add_header, "output")
+ != NULL);
+
pool = rpc_pool_with_connection(port);
/* set up the basic message */