summaryrefslogtreecommitdiff
path: root/bufferevent_pair.c
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2009-04-10 15:01:31 +0000
committerNick Mathewson <nickm@torproject.org>2009-04-10 15:01:31 +0000
commit23085c92477035507c499530d4a200bceee5d8a1 (patch)
treec24947a8b1af1cf0c7b1beacc158f6f1a276a3f4 /bufferevent_pair.c
parent8161662007e4ffe8c71e3f8267733b2131d8d619 (diff)
downloadlibevent-23085c92477035507c499530d4a200bceee5d8a1.tar.gz
Add a linked-pair abstraction to bufferevents.
The new bufferevent_pair abstraction works like a set of buferevent_sockets connected by a socketpair, except that it doesn't require a socketpair, and therefore doesn't need to get the kernel involved. It's also a good way to make sure that deferred callbacks work. It's a good use case for deferred callbacks: before I implemented them, the recursive relationship between the evbuffer callback and the read callback would make the unit tests overflow the stack. svn:r1152
Diffstat (limited to 'bufferevent_pair.c')
-rw-r--r--bufferevent_pair.c276
1 files changed, 276 insertions, 0 deletions
diff --git a/bufferevent_pair.c b/bufferevent_pair.c
new file mode 100644
index 00000000..5e12ae8a
--- /dev/null
+++ b/bufferevent_pair.c
@@ -0,0 +1,276 @@
+/*
+ * Copyright (c) 2009 Niels Provos, Nick Mathewson
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <sys/types.h>
+#include <assert.h>
+
+#ifdef HAVE_CONFIG_H
+#include "event-config.h"
+#endif
+
+#include "event2/util.h"
+#include "event2/buffer.h"
+#include "event2/bufferevent.h"
+#include "event2/bufferevent_struct.h"
+#include "event2/event.h"
+#include "defer-internal.h"
+#include "bufferevent-internal.h"
+#include "mm-internal.h"
+
+struct bufferevent_pair {
+ struct bufferevent bev;
+ struct bufferevent_pair *partner;
+ struct deferred_cb deferred_write_cb;
+ struct deferred_cb deferred_read_cb;
+};
+
+
+/* Given a bufferevent that's really a bev part of a bufferevent_pair,
+ * return that bufferevent_filtered. Returns NULL otherwise.*/
+static inline struct bufferevent_pair *
+upcast(struct bufferevent *bev)
+{
+ struct bufferevent_pair *bev_p;
+ if (bev->be_ops != &bufferevent_ops_pair)
+ return NULL;
+ bev_p = (void*)( ((char*)bev) -
+ evutil_offsetof(struct bufferevent_pair, bev) );
+ assert(bev_p->bev.be_ops == &bufferevent_ops_pair);
+ return bev_p;
+}
+
+#define downcast(bev_pair) (&(bev_pair)->bev)
+
+/* XXX Handle close */
+
+static void be_pair_outbuf_cb(struct evbuffer *,
+ const struct evbuffer_cb_info *, void *);
+
+static void
+run_callback(struct deferred_cb *cb, void *arg)
+{
+ struct bufferevent_pair *bufev = arg;
+ struct bufferevent *bev = downcast(bufev);
+
+ if (cb == &bufev->deferred_read_cb) {
+ if (bev->readcb) {
+ bev->readcb(bev, bev->cbarg);
+ }
+ } else {
+ if (bev->writecb) {
+ bev->writecb(bev, bev->cbarg);
+ }
+ }
+}
+
+static struct bufferevent_pair *
+bufferevent_pair_elt_new(struct event_base *base,
+ enum bufferevent_options options)
+{
+ struct bufferevent_pair *bufev;
+ if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair))))
+ return NULL;
+ if (bufferevent_init_common(&bufev->bev, base, &bufferevent_ops_pair,
+ options)) {
+ mm_free(bufev);
+ return NULL;
+ }
+ /* XXX set read timeout event */
+ /* XXX set write timeout event */
+ if (!evbuffer_add_cb(bufev->bev.output, be_pair_outbuf_cb, bufev)) {
+ bufferevent_free(downcast(bufev));
+ return NULL;
+ }
+ event_deferred_cb_init(&bufev->deferred_read_cb, run_callback, bufev);
+ event_deferred_cb_init(&bufev->deferred_write_cb, run_callback, bufev);
+
+ return bufev;
+}
+
+int
+bufferevent_pair_new(struct event_base *base, enum bufferevent_options options,
+ struct bufferevent *pair[2])
+{
+ struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
+
+ bufev1 = bufferevent_pair_elt_new(base, options);
+ if (!bufev1)
+ return -1;
+ bufev2 = bufferevent_pair_elt_new(base, options);
+ if (!bufev2) {
+ bufferevent_free(downcast(bufev1));
+ return -1;
+ }
+
+ bufev1->partner = bufev2;
+ bufev2->partner = bufev1;
+
+ pair[0] = downcast(bufev1);
+ pair[1] = downcast(bufev2);
+
+ return 0;
+}
+
+static void
+be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
+ int ignore_wm)
+{
+ size_t src_size, dst_size;
+ size_t n;
+
+ if (dst->wm_read.high) {
+ size_t dst_size = evbuffer_get_length(dst->input);
+ if (dst_size < dst->wm_read.high) {
+ n = dst->wm_read.high - dst_size;
+ evbuffer_remove_buffer(src->output, dst->input, n);
+ } else {
+ if (!ignore_wm)
+ return;
+ evbuffer_add_buffer(dst->input, src->output);
+ }
+ } else {
+ evbuffer_add_buffer(dst->input, src->output);
+ }
+
+ src_size = evbuffer_get_length(src->output);
+ dst_size = evbuffer_get_length(dst->input);
+
+ if (dst_size >= dst->wm_read.low && dst->readcb) {
+ event_deferred_cb_schedule(dst->ev_base,
+ &(upcast(dst)->deferred_read_cb));
+ }
+ if (src_size <= src->wm_write.low && src->writecb) {
+ event_deferred_cb_schedule(src->ev_base,
+ &(upcast(src)->deferred_write_cb));
+ }
+}
+
+static inline int
+be_pair_wants_to_talk(struct bufferevent *src, struct bufferevent *dst)
+{
+ return (src->enabled & EV_WRITE) &&
+ (dst->enabled & EV_READ) && !dst->read_suspended &&
+ evbuffer_get_length(src->output);
+}
+
+static void
+be_pair_outbuf_cb(struct evbuffer *outbuf,
+ const struct evbuffer_cb_info *info, void *arg)
+{
+ struct bufferevent_pair *bev_pair = arg;
+ struct bufferevent_pair *partner = bev_pair->partner;
+
+ if (info->n_added > info->n_deleted && partner) {
+ /* We got more data. If the other side's reading, then
+ hand it over. */
+ if (be_pair_wants_to_talk(downcast(bev_pair),
+ downcast(partner))) {
+ be_pair_transfer(downcast(bev_pair), downcast(partner), 0);
+ }
+ }
+}
+
+static int
+be_pair_enable(struct bufferevent *bufev, short events)
+{
+ struct bufferevent_pair *bev_p = upcast(bufev);
+ struct bufferevent_pair *partner = bev_p->partner;
+
+ /* We're starting to read! Does the other side have anything to write?*/
+ if ((events & EV_READ) && partner &&
+ be_pair_wants_to_talk(downcast(partner), bufev)) {
+ be_pair_transfer(downcast(partner), bufev, 0);
+ }
+ /* We're starting to write! Does the other side want to read? */
+ if ((events & EV_WRITE) && partner &&
+ be_pair_wants_to_talk(bufev, downcast(partner))) {
+ be_pair_transfer(bufev, downcast(partner), 0);
+ }
+ return 0;
+}
+
+static int
+be_pair_disable(struct bufferevent *bev, short events)
+{
+ return 0;
+}
+
+static void
+be_pair_destruct(struct bufferevent *bev)
+{
+ struct bufferevent_pair *bev_p = upcast(bev);
+
+ if (bev_p->partner) {
+ bev_p->partner->partner = NULL;
+ bev_p->partner = NULL;
+ }
+ event_deferred_cb_cancel(bev->ev_base, &bev_p->deferred_write_cb);
+ event_deferred_cb_cancel(bev->ev_base, &bev_p->deferred_read_cb);
+}
+
+static void
+be_pair_adj_timeouts(struct bufferevent *bev)
+{
+ /* TODO: implement. */
+}
+
+static int
+be_pair_flush(struct bufferevent *bev, short iotype,
+ enum bufferevent_flush_mode mode)
+{
+ struct bufferevent_pair *bev_p = upcast(bev);
+ struct bufferevent *partner;
+ if (!bev_p->partner)
+ return -1;
+
+ partner = downcast(bev_p->partner);
+
+ if (mode == BEV_NORMAL)
+ return 0;
+
+ if ((iotype & EV_READ) != 0)
+ be_pair_transfer(partner, bev, 1);
+
+ if ((iotype & EV_WRITE) != 0)
+ be_pair_transfer(bev, partner, 1);
+
+ if (mode == BEV_FINISHED) {
+ if (partner->errorcb)
+ (*partner->errorcb)(partner,
+ iotype|EVBUFFER_EOF, partner->cbarg);
+ }
+ return 0;
+}
+
+const struct bufferevent_ops bufferevent_ops_pair = {
+ "pair_elt",
+ evutil_offsetof(struct bufferevent_pair, bev),
+ be_pair_enable,
+ be_pair_disable,
+ be_pair_destruct,
+ be_pair_adj_timeouts,
+ be_pair_flush,
+};