summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--evrpc-internal.h10
-rw-r--r--evrpc.c159
-rw-r--r--evrpc.h122
-rw-r--r--http-internal.h5
-rw-r--r--test/regress_rpc.c3
5 files changed, 285 insertions, 14 deletions
diff --git a/evrpc-internal.h b/evrpc-internal.h
index 4a27a364..de2ab47d 100644
--- a/evrpc-internal.h
+++ b/evrpc-internal.h
@@ -27,6 +27,8 @@
#ifndef _EVRPC_INTERNAL_H_
#define _EVRPC_INTERNAL_H_
+#include "http-internal.h"
+
struct evrpc;
#define EVRPC_URI_PREFIX "/.rpc."
@@ -42,4 +44,12 @@ struct evrpc_base {
struct evrpc_req_generic;
void evrpc_reqstate_free(struct evrpc_req_generic* rpc_state);
+/* A pool for holding evhttp_connection objects */
+struct evrpc_pool {
+ struct evconq connections;
+
+ TAILQ_HEAD(evrpc_requestq, evrpc_request_wrapper) requests;
+};
+
+
#endif /* _EVRPC_INTERNAL_H_ */
diff --git a/evrpc.c b/evrpc.c
index 74d6601a..2b94d86a 100644
--- a/evrpc.c
+++ b/evrpc.c
@@ -36,6 +36,7 @@
#endif
#include <sys/types.h>
#include <sys/tree.h>
+#include <sys/socket.h>
#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#else
@@ -89,25 +90,32 @@ void evrpc_request_done(struct evrpc_req_generic*);
* calls this function.
*/
-int
-evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
- void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
+char *
+evrpc_construct_uri(const char *uri)
{
char *constructed_uri;
int constructed_uri_len;
- rpc->cb = cb;
- rpc->cb_arg = cb_arg;
-
- constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(rpc->uri) + 1;
+ constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
event_err(1, "%s: failed to register rpc at %s",
- __func__, rpc->uri);
+ __func__, uri);
memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
- memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX),
- rpc->uri, strlen(rpc->uri));
+ memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
constructed_uri[constructed_uri_len - 1] = '\0';
+ return (constructed_uri);
+}
+
+int
+evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
+ void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
+{
+ char *constructed_uri = evrpc_construct_uri(rpc->uri);
+
+ rpc->cb = cb;
+ rpc->cb_arg = cb_arg;
+
TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
evhttp_set_cb(base->http_server,
@@ -216,3 +224,134 @@ error:
evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
return;
}
+
+/* Client implementation of RPC site */
+
+static int evrpc_schedule_request(struct evhttp_connection *connection,
+ struct evrpc_request_wrapper *ctx);
+
+struct evrpc_pool *
+evrpc_pool_new()
+{
+ struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
+ if (pool == NULL)
+ return (NULL);
+
+ TAILQ_INIT(&pool->connections);
+ TAILQ_INIT(&pool->requests);
+
+ return (pool);
+}
+
+void
+evrpc_pool_free(struct evrpc_pool *pool)
+{
+ struct evhttp_connection *connection;
+ struct evrpc_request_wrapper *request;
+
+ while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
+ TAILQ_REMOVE(&pool->requests, request, next);
+ /* if this gets more complicated we need our own function */
+ free(request->name);
+ free(request);
+ }
+
+ while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
+ TAILQ_REMOVE(&pool->connections, connection, next);
+ evhttp_connection_free(connection);
+ }
+
+ free(pool);
+}
+
+void
+evrpc_pool_add_connection(struct evrpc_pool *pool,
+ struct evhttp_connection *connection) {
+ assert(connection->http_server == NULL);
+ TAILQ_INSERT_TAIL(&pool->connections, connection, next);
+
+ /*
+ * if we have any requests, pending schedule them with the new
+ * connections.
+ */
+
+ if (TAILQ_FIRST(&pool->requests) != NULL) {
+ struct evrpc_request_wrapper *request =
+ TAILQ_FIRST(&pool->requests);
+ TAILQ_REMOVE(&pool->requests, request, next);
+ evrpc_schedule_request(connection, request);
+ }
+}
+
+
+static void evrpc_reply_done(struct evhttp_request *, void *);
+
+/*
+ * Finds a connection object associated with the pool that is currently
+ * idle and can be used to make a request.
+ */
+static struct evhttp_connection *
+evrpc_pool_find_connection(struct evrpc_pool *pool)
+{
+ struct evhttp_connection *connection;
+ TAILQ_FOREACH(connection, &pool->connections, next) {
+ if (TAILQ_FIRST(&connection->requests) == NULL)
+ return (connection);
+ }
+
+ return (NULL);
+}
+
+/*
+ * We assume that the ctx is no longer queued on the pool.
+ */
+static int
+evrpc_schedule_request(struct evhttp_connection *connection,
+ struct evrpc_request_wrapper *ctx)
+{
+ struct evbuffer *output;
+ struct evhttp_request *req;
+ if ((output = evbuffer_new()) == NULL)
+ goto error;
+
+ if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
+ goto error;
+
+ return (0);
+
+error:
+ (*ctx->cb)(ctx->request, ctx->reply, ctx->cb_arg);
+ free(ctx);
+ return (-1);
+}
+
+int
+evrpc_make_request(struct evrpc_request_wrapper *ctx)
+{
+ struct evrpc_pool *pool = ctx->pool;
+ struct evhttp_connection *connection;
+
+ /* we better have some available connections on the pool */
+ assert(TAILQ_FIRST(&pool->connections) != NULL);
+
+
+ /* even if a connection might be available, we do FIFO */
+ if (TAILQ_FIRST(&pool->requests) == NULL) {
+ connection = evrpc_pool_find_connection(pool);
+ if (connection != NULL)
+ return evrpc_schedule_request(connection, ctx);
+ }
+
+ /*
+ * if no connection is available, we queue the request on the pool,
+ * the next time a connection is empty, the rpc will be send on that.
+ */
+ TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
+ return (0);
+}
+
+static void
+evrpc_reply_done(struct evhttp_request *req, void *arg)
+{
+ struct evrpc_request_wrapper *ctx = arg;
+}
diff --git a/evrpc.h b/evrpc.h
index 18db7f1b..5c9d2a81 100644
--- a/evrpc.h
+++ b/evrpc.h
@@ -27,6 +27,40 @@
#ifndef _EVRPC_H_
#define _EVRPC_H_
+/*
+ * This header files provides basic support for an RPC server and client.
+ *
+ * To support RPCs in a server, every supported RPC command needs to be
+ * defined and registered.
+ *
+ * EVRPC_HEADER(SendCommand, Request, Reply);
+ *
+ * SendCommand is the name of the RPC command.
+ * Request is the name of a structure generated by event_rpcgen.py.
+ * It contains all parameters relating to the SendCommand RPC. The
+ * server needs to fill in the Reply structure.
+ * Reply is the name of a structure generated by event_rpcgen.py. It
+ * contains the answer to the RPC.
+ *
+ * To register an RPC with an HTTP server, you need to first create an RPC
+ * base with:
+ *
+ * struct evrpc_base *base = evrpc_init(http);
+ *
+ * A specific RPC can then be registered with
+ *
+ * EVRPC_REGISTER(base, "SendCommand", Request, Reply, FunctionCB, arg);
+ *
+ * when the server receives an appropriately formatted RPC, the user callback
+ * is invokved. The callback needs to fill in the reply structure.
+ *
+ * void FunctionCB(EVRPC_STRUCT(SendCommand)* rpc, void *arg);
+ *
+ * To send the reply, call EVRPC_REQUEST_DONE(rpc);
+ *
+ * See the regression test for an example.
+ */
+
struct evbuffer;
struct evrpc_req_generic;
@@ -40,7 +74,7 @@ struct evrpc {
/* creates a new request structure */
void *(*request_new)(void);
- /* creates a new request structure */
+ /* frees the request structure */
void (*request_free)(void *);
/* unmarshals the buffer into the proper request structure */
@@ -92,14 +126,51 @@ struct evrpc_req_generic {
void (*done)(struct evrpc_req_generic* rpc);
};
-#define EVRPC_DEFINE(rpcname, reqstruct, rplystruct) \
+/*
+ * You need to use EVRPC_HEADER to create structures and function prototypes
+ * needed by the server and client implmentation.
+ */
+#define EVRPC_HEADER(rpcname, reqstruct, rplystruct) \
EVRPC_STRUCT(rpcname) { \
struct reqstruct* request; \
struct rplystruct* reply; \
struct evrpc* rpc; \
void (*done)(struct evrpc* rpc, void *request, void *reply); \
+}; \
+int evrpc_send_request_##rpcname(struct evrpc_pool *, \
+ struct reqstruct *, struct rplystruct *, \
+ void (*)(struct reqstruct *, struct rplystruct *, void *cbarg), \
+ void *);
+
+#define EVRPC_GENERATE(rpcname, reqstruct, rplystruct) \
+int evrpc_send_request_##rpcname(struct evrpc_pool *pool, \
+ struct reqstruct *request, struct rplystruct *reply, \
+ void (*cb)(struct reqstruct *, struct rplystruct *, void *cbarg), \
+ void *cbarg) { \
+ struct evrpc_request_wrapper *ctx; \
+ ctx = malloc(sizeof(struct evrpc_request_wrapper)); \
+ if (ctx == NULL) { \
+ (*(cb))(request, reply, cbarg); \
+ return (-1); \
+ } \
+ ctx->pool = pool; \
+ ctx->name = strdup(#rpcname); \
+ if (ctx->name == NULL) { \
+ free(ctx); \
+ (*(cb))(request, reply, cbarg); \
+ return (-1); \
+ } \
+ ctx->cb = (void (*)(void *, void *, void *))cb; \
+ ctx->cb_arg = cbarg; \
+ ctx->request = (void *)request; \
+ ctx->reply = (void *)reply; \
+ ctx->request_marshal = (void (*)(struct evbuffer *, void *))reqstruct##_marshal; \
+ ctx->reply_clear = (void (*)(void *))rplystruct##_clear; \
+ ctx->reply_unmarshal = (int (*)(void *, struct evbuffer *))rplystruct##_unmarshal; \
+ return (evrpc_make_request(ctx)); \
}
+
/*
* EVRPC_REQUEST_DONE is used to answer a request; the reply is expected
* to have been filled in. The request and reply pointers become invalid
@@ -144,4 +215,51 @@ struct evrpc_base *evrpc_init(struct evhttp *server);
int evrpc_register_rpc(struct evrpc_base *, struct evrpc *,
void (*)(struct evrpc_req_generic*, void *), void *);
+/*
+ * Client-side RPC support
+ */
+
+struct evrpc_pool;
+struct evhttp_connection;
+
+struct evrpc_request_wrapper {
+ TAILQ_ENTRY(evrpc_request_wrapper) next;
+
+ /* pool on which this rpc request is being made */
+ struct evrpc_pool *pool;
+
+ /* the name of the rpc */
+ char *name;
+
+ /* callback */
+ void (*cb)(void *request, void *reply, void *arg);
+ void *cb_arg;
+
+ void *request;
+ void *reply;
+
+ /* unmarshals the buffer into the proper request structure */
+ void (*request_marshal)(struct evbuffer *, void *);
+
+ /* removes all stored state in the reply */
+ void (*reply_clear)(void *);
+
+ /* marshals the reply into a buffer */
+ int (*reply_unmarshal)(void *, struct evbuffer*);
+};
+
+#define EVRPC_MAKE_REQUEST(name, request, reply, cb, cbarg) \
+ evrpc_send_request_##name(pool, request, reply, cb, cbarg)
+
+int evrpc_make_request(struct evrpc_request_wrapper *);
+
+/*
+ * a pool has a number of connections associated with it.
+ * rpc requests are always made via a pool.
+ */
+struct evrpc_pool *evrpc_pool_new();
+void evrpc_pool_free(struct evrpc_pool *);
+void evrpc_pool_add_connection(struct evrpc_pool *,
+ struct evhttp_connection *);
+
#endif /* _EVRPC_H_ */
diff --git a/http-internal.h b/http-internal.h
index 17015e1e..f6467f0e 100644
--- a/http-internal.h
+++ b/http-internal.h
@@ -65,11 +65,14 @@ struct evhttp_cb {
void *cbarg;
};
+/* both the http server as well as the rpc system need to queue connections */
+TAILQ_HEAD(evconq, evhttp_connection);
+
struct evhttp {
struct event bind_ev;
TAILQ_HEAD(httpcbq, evhttp_cb) callbacks;
- TAILQ_HEAD(evconq, evhttp_connection) connections;
+ struct evconq connections;
void (*gencb)(struct evhttp_request *req, void *);
void *gencbarg;
diff --git a/test/regress_rpc.c b/test/regress_rpc.c
index 59113bf5..7e4b3e38 100644
--- a/test/regress_rpc.c
+++ b/test/regress_rpc.c
@@ -84,7 +84,8 @@ http_setup(short *pport)
return (myhttp);
}
-EVRPC_DEFINE(Message, msg, kill);
+EVRPC_HEADER(Message, msg, kill);
+EVRPC_GENERATE(Message, msg, kill);
void
MessageCB(EVRPC_STRUCT(Message)* rpc, void *arg)