summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOndřej Kuzník <ondra@mistotebe.net>2013-12-03 22:49:57 +0000
committerOndřej Kuzník <ondra@mistotebe.net>2013-12-03 23:39:13 +0000
commit61ee18b8b1d2ac0025955b3f949531c712fb7527 (patch)
tree78b005fea5cec5daebe28d38ad1b74a5f3b3b3af
parent4ce242bd0087ed3f6d36c64d0d15094d8a6fc9fc (diff)
downloadlibevent-61ee18b8b1d2ac0025955b3f949531c712fb7527.tar.gz
Add an option to trigger bufferevent I/O callbacks
-rw-r--r--bufferevent-internal.h10
-rw-r--r--bufferevent.c55
-rw-r--r--bufferevent_async.c7
-rw-r--r--bufferevent_filter.c10
-rw-r--r--bufferevent_openssl.c13
-rw-r--r--bufferevent_pair.c13
-rw-r--r--bufferevent_sock.c8
-rw-r--r--include/event2/bufferevent.h26
-rw-r--r--test/regress_bufferevent.c127
9 files changed, 213 insertions, 56 deletions
diff --git a/bufferevent-internal.h b/bufferevent-internal.h
index 0c4df871..0fa690b9 100644
--- a/bufferevent-internal.h
+++ b/bufferevent-internal.h
@@ -341,14 +341,20 @@ int bufferevent_decref_and_unlock_(struct bufferevent *bufev);
/** Internal: If callbacks are deferred and we have a read callback, schedule
* a readcb. Otherwise just run the readcb. */
-void bufferevent_run_readcb_(struct bufferevent *bufev);
+void bufferevent_run_readcb_(struct bufferevent *bufev, int options);
/** Internal: If callbacks are deferred and we have a write callback, schedule
* a writecb. Otherwise just run the writecb. */
-void bufferevent_run_writecb_(struct bufferevent *bufev);
+void bufferevent_run_writecb_(struct bufferevent *bufev, int options);
/** Internal: If callbacks are deferred and we have an eventcb, schedule
* it to run with events "what". Otherwise just run the eventcb. */
void bufferevent_run_eventcb_(struct bufferevent *bufev, short what);
+/** Internal: Run or schedule (if deferred or options contain
+ * BEV_TRIG_DEFER_CALLBACKS) I/O callbacks specified in iotype.
+ * Must already hold the bufev lock. */
+void bufferevent_trigger_nolock_(struct bufferevent *bufev, short iotype, int options);
+
+
/** Internal: Add the event 'ev' with timeout tv, unless tv is set to 0, in
* which case add ev with no timeout. */
int bufferevent_add_event_(struct event *ev, const struct timeval *tv);
diff --git a/bufferevent.c b/bufferevent.c
index 96ce109f..fd95941b 100644
--- a/bufferevent.c
+++ b/bufferevent.c
@@ -219,14 +219,15 @@ bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg
void
-bufferevent_run_readcb_(struct bufferevent *bufev)
+bufferevent_run_readcb_(struct bufferevent *bufev, int options)
{
/* Requires that we hold the lock and a reference */
struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (bufev->readcb == NULL)
return;
- if (p->options & BEV_OPT_DEFER_CALLBACKS) {
+ if ((p->options & BEV_OPT_DEFER_CALLBACKS) ||
+ (options & BEV_TRIG_DEFER_CALLBACKS)) {
p->readcb_pending = 1;
SCHEDULE_DEFERRED(p);
} else {
@@ -235,14 +236,15 @@ bufferevent_run_readcb_(struct bufferevent *bufev)
}
void
-bufferevent_run_writecb_(struct bufferevent *bufev)
+bufferevent_run_writecb_(struct bufferevent *bufev, int options)
{
/* Requires that we hold the lock and a reference */
struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (bufev->writecb == NULL)
return;
- if (p->options & BEV_OPT_DEFER_CALLBACKS) {
+ if ((p->options & BEV_OPT_DEFER_CALLBACKS) ||
+ (options & BEV_TRIG_DEFER_CALLBACKS)) {
p->writecb_pending = 1;
SCHEDULE_DEFERRED(p);
} else {
@@ -251,6 +253,25 @@ bufferevent_run_writecb_(struct bufferevent *bufev)
}
void
+bufferevent_trigger_nolock_(struct bufferevent *bufev, short iotype, int options)
+{
+ if ((iotype & EV_READ) && ((options & BEV_TRIG_IGNORE_WATERMARKS) ||
+ evbuffer_get_length(bufev->input) >= bufev->wm_read.low))
+ bufferevent_run_readcb_(bufev, options);
+ if ((iotype & EV_WRITE) && ((options & BEV_TRIG_IGNORE_WATERMARKS) ||
+ evbuffer_get_length(bufev->output) <= bufev->wm_write.low))
+ bufferevent_run_writecb_(bufev, options);
+}
+
+void
+bufferevent_trigger(struct bufferevent *bufev, short iotype, int options)
+{
+ bufferevent_incref_and_lock_(bufev);
+ bufferevent_trigger_nolock_(bufev, iotype, options);
+ bufferevent_decref_and_unlock_(bufev);
+}
+
+void
bufferevent_run_eventcb_(struct bufferevent *bufev, short what)
{
/* Requires that we hold the lock and a reference */
@@ -322,20 +343,18 @@ bufferevent_init_common_(struct bufferevent_private *bufev_private,
event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
return -1;
}
- if (options & BEV_OPT_DEFER_CALLBACKS) {
- if (options & BEV_OPT_UNLOCK_CALLBACKS)
- event_deferred_cb_init_(
- &bufev_private->deferred,
- event_base_get_npriorities(base) / 2,
- bufferevent_run_deferred_callbacks_unlocked,
- bufev_private);
- else
- event_deferred_cb_init_(
- &bufev_private->deferred,
- event_base_get_npriorities(base) / 2,
- bufferevent_run_deferred_callbacks_locked,
- bufev_private);
- }
+ if (options & BEV_OPT_UNLOCK_CALLBACKS)
+ event_deferred_cb_init_(
+ &bufev_private->deferred,
+ event_base_get_npriorities(base) / 2,
+ bufferevent_run_deferred_callbacks_unlocked,
+ bufev_private);
+ else
+ event_deferred_cb_init_(
+ &bufev_private->deferred,
+ event_base_get_npriorities(base) / 2,
+ bufferevent_run_deferred_callbacks_locked,
+ bufev_private);
bufev_private->options = options;
diff --git a/bufferevent_async.c b/bufferevent_async.c
index 0152fd16..4e686479 100644
--- a/bufferevent_async.c
+++ b/bufferevent_async.c
@@ -458,8 +458,7 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
- if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
- bufferevent_run_readcb_(bev);
+ bufferevent_trigger_nolock_(bev, EV_READ, 0);
bev_async_consider_reading(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
@@ -502,9 +501,7 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
- if (evbuffer_get_length(bev->output) <=
- bev->wm_write.low)
- bufferevent_run_writecb_(bev);
+ bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
bev_async_consider_writing(bev_a);
} else if (!ok) {
what |= BEV_EVENT_ERROR;
diff --git a/bufferevent_filter.c b/bufferevent_filter.c
index cc02230c..cb1c0097 100644
--- a/bufferevent_filter.c
+++ b/bufferevent_filter.c
@@ -376,10 +376,9 @@ be_filter_process_output(struct bufferevent_filtered *bevf,
/* Or if we have filled the underlying output buffer. */
!be_underlying_writebuf_full(bevf,state));
- if (processed &&
- evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
+ if (processed) {
/* call the write callback.*/
- bufferevent_run_writecb_(bufev);
+ bufferevent_trigger_nolock_(bufev, EV_WRITE, 0);
if (res == BEV_OK &&
(bufev->enabled & EV_WRITE) &&
@@ -442,9 +441,8 @@ be_filter_readcb(struct bufferevent *underlying, void *me_)
/* XXX This should be in process_input, not here. There are
* other places that can call process-input, and they should
* force readcb calls as needed. */
- if (processed_any &&
- evbuffer_get_length(bufev->input) >= bufev->wm_read.low)
- bufferevent_run_readcb_(bufev);
+ if (processed_any)
+ bufferevent_trigger_nolock_(bufev, EV_READ, 0);
bufferevent_decref_and_unlock_(bufev);
}
diff --git a/bufferevent_openssl.c b/bufferevent_openssl.c
index 1ce124f9..ed9e4a3d 100644
--- a/bufferevent_openssl.c
+++ b/bufferevent_openssl.c
@@ -709,8 +709,7 @@ do_write(struct bufferevent_openssl *bev_ssl, int atmost)
if (bev_ssl->underlying)
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
- if (evbuffer_get_length(output) <= bev->wm_write.low)
- bufferevent_run_writecb_(bev);
+ bufferevent_trigger_nolock_(bev, EV_WRITE, 0);
}
return result;
}
@@ -824,11 +823,8 @@ consider_reading(struct bufferevent_openssl *bev_ssl)
if (all_result_flags & OP_MADE_PROGRESS) {
struct bufferevent *bev = &bev_ssl->bev.bev;
- struct evbuffer *input = bev->input;
- if (evbuffer_get_length(input) >= bev->wm_read.low) {
- bufferevent_run_readcb_(bev);
- }
+ bufferevent_trigger_nolock_(bev, EV_READ, 0);
}
if (!bev_ssl->underlying) {
@@ -852,11 +848,8 @@ consider_writing(struct bufferevent_openssl *bev_ssl)
r = do_read(bev_ssl, 1024); /* XXXX 1024 is a hack */
if (r & OP_MADE_PROGRESS) {
struct bufferevent *bev = &bev_ssl->bev.bev;
- struct evbuffer *input = bev->input;
- if (evbuffer_get_length(input) >= bev->wm_read.low) {
- bufferevent_run_readcb_(bev);
- }
+ bufferevent_trigger_nolock_(bev, EV_READ, 0);
}
if (r & (OP_ERR|OP_BLOCKED))
break;
diff --git a/bufferevent_pair.c b/bufferevent_pair.c
index 4d467260..eb3da3e3 100644
--- a/bufferevent_pair.c
+++ b/bufferevent_pair.c
@@ -151,7 +151,7 @@ static void
be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
int ignore_wm)
{
- size_t src_size, dst_size;
+ size_t dst_size;
size_t n;
evbuffer_unfreeze(src->output, 1);
@@ -182,15 +182,8 @@ be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
BEV_DEL_GENERIC_WRITE_TIMEOUT(dst);
}
- src_size = evbuffer_get_length(src->output);
- dst_size = evbuffer_get_length(dst->input);
-
- if (dst_size >= dst->wm_read.low) {
- bufferevent_run_readcb_(dst);
- }
- if (src_size <= src->wm_write.low) {
- bufferevent_run_writecb_(src);
- }
+ bufferevent_trigger_nolock_(dst, EV_READ, 0);
+ bufferevent_trigger_nolock_(src, EV_WRITE, 0);
done:
evbuffer_freeze(src->output, 1);
evbuffer_freeze(dst->input, 0);
diff --git a/bufferevent_sock.c b/bufferevent_sock.c
index 5ce4953b..82983ed7 100644
--- a/bufferevent_sock.c
+++ b/bufferevent_sock.c
@@ -184,8 +184,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
bufferevent_decrement_read_buckets_(bufev_p, res);
/* Invoke the user callback - must always be called last */
- if (evbuffer_get_length(input) >= bufev->wm_read.low)
- bufferevent_run_readcb_(bufev);
+ bufferevent_trigger_nolock_(bufev, EV_READ, 0);
goto done;
@@ -294,9 +293,8 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
* Invoke the user callback if our buffer is drained or below the
* low watermark.
*/
- if ((res || !connected) &&
- evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
- bufferevent_run_writecb_(bufev);
+ if (res || !connected) {
+ bufferevent_trigger_nolock_(bufev, EV_WRITE, 0);
}
goto done;
diff --git a/include/event2/bufferevent.h b/include/event2/bufferevent.h
index efe0617b..af6f7cde 100644
--- a/include/event2/bufferevent.h
+++ b/include/event2/bufferevent.h
@@ -559,6 +559,32 @@ int bufferevent_flush(struct bufferevent *bufev,
enum bufferevent_flush_mode mode);
/**
+ Flags for bufferevent_trigger(_event) that modify when and how to trigger
+ the callback.
+*/
+enum bufferevent_trigger_options {
+ /** trigger the callback regardless of the watermarks */
+ BEV_TRIG_IGNORE_WATERMARKS = (1<<0),
+
+ /** defer even if the callbacks are not */
+ BEV_TRIG_DEFER_CALLBACKS = (1<<1),
+};
+
+/**
+ Triggers bufferevent data callbacks.
+
+ The function will honor watermarks unless options contain
+ BEV_TRIG_IGNORE_WATERMARKS. If the options contain BEV_OPT_DEFER_CALLBACKS,
+ the callbacks are deferred.
+
+ @param bufev the bufferevent object
+ @param iotype either EV_READ or EV_WRITE or both.
+ @param options
+ */
+void bufferevent_trigger(struct bufferevent *bufev, short iotype,
+ int options);
+
+/**
@name Filtering support
@{
diff --git a/test/regress_bufferevent.c b/test/regress_bufferevent.c
index 89c405bf..874d6018 100644
--- a/test/regress_bufferevent.c
+++ b/test/regress_bufferevent.c
@@ -447,6 +447,7 @@ sender_errorcb(struct bufferevent *bev, short what, void *ctx)
}
static int bufferevent_connect_test_flags = 0;
+static int bufferevent_trigger_test_flags = 0;
static int n_strings_read = 0;
static int n_reads_invoked = 0;
@@ -812,6 +813,122 @@ end:
bufferevent_free(bev2);
}
+static void
+trigger_failure_cb(evutil_socket_t fd, short what, void *ctx)
+{
+ TT_FAIL(("The triggered callback did not fire or the machine is really slow (try increasing timeout)."));
+}
+
+static void
+trigger_readcb_triggered(struct bufferevent *bev, void *ctx)
+{
+ struct event_base *base = ctx;
+
+ TT_BLATHER(("Read successfully triggered."));
+ n_reads_invoked++;
+ event_base_loopexit(base, NULL);
+}
+
+static void
+trigger_readcb(struct bufferevent *bev, void *ctx)
+{
+ struct timeval timeout = { 30, 0 };
+ struct event_base *base = ctx;
+ size_t low, high, len;
+ int expected_reads;
+
+ TT_BLATHER(("Read invoked on %d.", (int)bufferevent_getfd(bev)));
+ expected_reads = ++n_reads_invoked;
+
+ bufferevent_setcb(bev, trigger_readcb_triggered, NULL, reader_eventcb, ctx);
+
+ bufferevent_getwatermark(bev, EV_READ, &low, &high);
+ len = evbuffer_get_length(bufferevent_get_input(bev));
+
+ bufferevent_setwatermark(bev, EV_READ, len + 1, 0);
+ bufferevent_trigger(bev, EV_READ, bufferevent_trigger_test_flags);
+ /* no callback expected */
+ tt_int_op(n_reads_invoked, ==, expected_reads);
+
+ if ((bufferevent_trigger_test_flags & BEV_TRIG_DEFER_CALLBACKS) ||
+ (bufferevent_connect_test_flags & BEV_OPT_DEFER_CALLBACKS)) {
+ /* will be deferred */
+ } else {
+ expected_reads++;
+ }
+
+ event_base_once(base, -1, EV_TIMEOUT, trigger_failure_cb, NULL, &timeout);
+
+ bufferevent_trigger(bev, EV_READ,
+ bufferevent_trigger_test_flags | BEV_TRIG_IGNORE_WATERMARKS);
+ tt_int_op(n_reads_invoked, ==, expected_reads);
+
+ bufferevent_setwatermark(bev, EV_READ, low, high);
+end:
+ ;
+}
+
+static void
+test_bufferevent_trigger(void *arg)
+{
+ struct basic_test_data *data = arg;
+ struct evconnlistener *lev=NULL;
+ struct bufferevent *bev=NULL;
+ struct sockaddr_in localhost;
+ struct sockaddr_storage ss;
+ struct sockaddr *sa;
+ ev_socklen_t slen;
+
+ int be_flags=BEV_OPT_CLOSE_ON_FREE;
+ int trig_flags=0;
+
+ if (strstr((char*)data->setup_data, "defer")) {
+ be_flags |= BEV_OPT_DEFER_CALLBACKS;
+ }
+ bufferevent_connect_test_flags = be_flags;
+
+ if (strstr((char*)data->setup_data, "postpone")) {
+ trig_flags |= BEV_TRIG_DEFER_CALLBACKS;
+ }
+ bufferevent_trigger_test_flags = trig_flags;
+
+ memset(&localhost, 0, sizeof(localhost));
+
+ localhost.sin_port = 0; /* pick-a-port */
+ localhost.sin_addr.s_addr = htonl(0x7f000001L);
+ localhost.sin_family = AF_INET;
+ sa = (struct sockaddr *)&localhost;
+ lev = evconnlistener_new_bind(data->base, listen_cb, data->base,
+ LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE,
+ 16, sa, sizeof(localhost));
+ tt_assert(lev);
+
+ sa = (struct sockaddr *)&ss;
+ slen = sizeof(ss);
+ if (regress_get_listener_addr(lev, sa, &slen) < 0) {
+ tt_abort_perror("getsockname");
+ }
+
+ tt_assert(!evconnlistener_enable(lev));
+ bev = bufferevent_socket_new(data->base, -1, be_flags);
+ tt_assert(bev);
+ bufferevent_setcb(bev, trigger_readcb, NULL, reader_eventcb, data->base);
+
+ bufferevent_enable(bev, EV_READ);
+
+ tt_want(!bufferevent_socket_connect(bev, sa, sizeof(localhost)));
+
+ event_base_dispatch(data->base);
+
+ tt_int_op(n_reads_invoked, ==, 2);
+end:
+ if (lev)
+ evconnlistener_free(lev);
+
+ if (bev)
+ bufferevent_free(bev);
+}
+
struct testcase_t bufferevent_testcases[] = {
LEGACY(bufferevent, TT_ISOLATED),
@@ -842,6 +959,16 @@ struct testcase_t bufferevent_testcases[] = {
TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter" },
{ "bufferevent_timeout_filter_pair", test_bufferevent_timeouts,
TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"filter pair" },
+ { "bufferevent_trigger", test_bufferevent_trigger, TT_FORK|TT_NEED_BASE,
+ &basic_setup, (void*)"" },
+ { "bufferevent_trigger_defer", test_bufferevent_trigger,
+ TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
+ { "bufferevent_trigger_postpone", test_bufferevent_trigger,
+ TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
+ (void*)"postpone" },
+ { "bufferevent_trigger_defer_postpone", test_bufferevent_trigger,
+ TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
+ (void*)"defer postpone" },
#ifdef EVENT__HAVE_LIBZ
LEGACY(bufferevent_zlib, TT_ISOLATED),
#else