summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buffer.c18
-rw-r--r--bufferevent-internal.h5
-rw-r--r--bufferevent.c54
-rw-r--r--bufferevent_async.c6
-rw-r--r--bufferevent_filter.c15
-rw-r--r--bufferevent_openssl.c53
-rw-r--r--bufferevent_pair.c7
-rw-r--r--bufferevent_ratelim.c10
-rw-r--r--bufferevent_sock.c14
-rw-r--r--evbuffer-internal.h5
10 files changed, 128 insertions, 59 deletions
diff --git a/buffer.c b/buffer.c
index 7c35a69b..860ba0dc 100644
--- a/buffer.c
+++ b/buffer.c
@@ -3345,3 +3345,21 @@ evbuffer_cb_unsuspend(struct evbuffer *buffer, struct evbuffer_cb_entry *cb)
}
#endif
+int
+evbuffer_get_callbacks_(struct evbuffer *buffer, struct event_callback **cbs,
+ int max_cbs)
+{
+ int r = 0;
+ EVBUFFER_LOCK(buffer);
+ if (buffer->deferred_cbs) {
+ if (max_cbs < 1) {
+ r = -1;
+ goto done;
+ }
+ cbs[0] = &buffer->deferred;
+ r = 1;
+ }
+done:
+ EVBUFFER_UNLOCK(buffer);
+ return r;
+}
diff --git a/bufferevent-internal.h b/bufferevent-internal.h
index 63bf4708..ccfc7045 100644
--- a/bufferevent-internal.h
+++ b/bufferevent-internal.h
@@ -252,8 +252,11 @@ struct bufferevent_ops {
*/
int (*disable)(struct bufferevent *, short);
+ /** DOCUMENT */
+ void (*unlink)(struct bufferevent *);
+
/** Free any storage and deallocate any extra data or structures used
- in this implementation.
+ in this implementation. DOCUMENT
*/
void (*destruct)(struct bufferevent *);
diff --git a/bufferevent.c b/bufferevent.c
index 7c03ce90..b2bb0ac3 100644
--- a/bufferevent.c
+++ b/bufferevent.c
@@ -54,6 +54,7 @@
#include "event2/bufferevent_struct.h"
#include "event2/bufferevent_compat.h"
#include "event2/event.h"
+#include "event-internal.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
@@ -61,7 +62,7 @@
#include "util-internal.h"
static void bufferevent_cancel_all_(struct bufferevent *bev);
-
+static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_);
void
bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
@@ -640,7 +641,9 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev)
{
struct bufferevent_private *bufev_private =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
- struct bufferevent *underlying;
+ int n_cbs = 0;
+#define MAX_CBS 16
+ struct event_callback *cbs[MAX_CBS];
EVUTIL_ASSERT(bufev_private->refcnt > 0);
@@ -649,6 +652,41 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev)
return 0;
}
+ if (bufev->be_ops->unlink)
+ bufev->be_ops->unlink(bufev);
+
+ /* Okay, we're out of references. Let's finalize this once all the
+ * callbacks are done running. */
+ cbs[0] = &bufev->ev_read.ev_evcallback;
+ cbs[1] = &bufev->ev_write.ev_evcallback;
+ cbs[2] = &bufev_private->deferred;
+ n_cbs = 3;
+ if (bufev_private->rate_limiting) {
+ struct event *e = &bufev_private->rate_limiting->refill_bucket_event;
+ if (event_initialized(e))
+ cbs[n_cbs++] = &e->ev_evcallback;
+ }
+ n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs);
+ n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs);
+
+ event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs,
+ bufferevent_finalize_cb_);
+
+#undef MAX_CBS
+ BEV_UNLOCK(bufev);
+
+ return 1;
+}
+
+static void
+bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_)
+{
+ struct bufferevent *bufev = arg_;
+ struct bufferevent *underlying;
+ struct bufferevent_private *bufev_private =
+ EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
+
+ BEV_LOCK(bufev);
underlying = bufferevent_get_underlying(bufev);
/* Clean up the shared info */
@@ -665,17 +703,13 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev)
if (bufev_private->rate_limiting) {
if (bufev_private->rate_limiting->group)
bufferevent_remove_from_rate_limit_group_internal_(bufev,0);
- if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event))
- event_del(&bufev_private->rate_limiting->refill_bucket_event);
- event_debug_unassign(&bufev_private->rate_limiting->refill_bucket_event);
mm_free(bufev_private->rate_limiting);
bufev_private->rate_limiting = NULL;
}
- event_debug_unassign(&bufev->ev_read);
- event_debug_unassign(&bufev->ev_write);
BEV_UNLOCK(bufev);
+
if (bufev_private->own_lock)
EVTHREAD_FREE_LOCK(bufev_private->lock,
EVTHREAD_LOCKTYPE_RECURSIVE);
@@ -695,8 +729,6 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev)
*/
if (underlying)
bufferevent_decref_(underlying);
-
- return 1;
}
int
@@ -844,9 +876,9 @@ bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
void
bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev)
{
- evtimer_assign(&bev->ev_read, bev->ev_base,
+ event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE,
bufferevent_generic_read_timeout_cb, bev);
- evtimer_assign(&bev->ev_write, bev->ev_base,
+ event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE,
bufferevent_generic_write_timeout_cb, bev);
}
diff --git a/bufferevent_async.c b/bufferevent_async.c
index 83b5c141..0152fd16 100644
--- a/bufferevent_async.c
+++ b/bufferevent_async.c
@@ -93,6 +93,7 @@ const struct bufferevent_ops bufferevent_ops_async = {
evutil_offsetof(struct bufferevent_async, bev.bev),
be_async_enable,
be_async_disable,
+ NULL, /* Unlink */
be_async_destruct,
bufferevent_generic_adj_timeouts_,
be_async_flush,
@@ -384,11 +385,6 @@ be_async_destruct(struct bufferevent *bev)
/* XXXX possible double-close */
evutil_closesocket(fd);
}
- /* delete this in case non-blocking connect was used */
- if (event_initialized(&bev->ev_write)) {
- event_del(&bev->ev_write);
- bufferevent_del_generic_timeout_cbs_(bev);
- }
}
/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
diff --git a/bufferevent_filter.c b/bufferevent_filter.c
index 8a74f808..cc02230c 100644
--- a/bufferevent_filter.c
+++ b/bufferevent_filter.c
@@ -61,6 +61,7 @@
/* prototypes */
static int be_filter_enable(struct bufferevent *, short);
static int be_filter_disable(struct bufferevent *, short);
+static void be_filter_unlink(struct bufferevent *);
static void be_filter_destruct(struct bufferevent *);
static void be_filter_readcb(struct bufferevent *, void *);
@@ -99,6 +100,7 @@ const struct bufferevent_ops bufferevent_ops_filter = {
evutil_offsetof(struct bufferevent_filtered, bev.bev),
be_filter_enable,
be_filter_disable,
+ be_filter_unlink,
be_filter_destruct,
bufferevent_generic_adj_timeouts_,
be_filter_flush,
@@ -214,12 +216,10 @@ bufferevent_filter_new(struct bufferevent *underlying,
}
static void
-be_filter_destruct(struct bufferevent *bev)
+be_filter_unlink(struct bufferevent *bev)
{
struct bufferevent_filtered *bevf = upcast(bev);
EVUTIL_ASSERT(bevf);
- if (bevf->free_context)
- bevf->free_context(bevf->context);
if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
/* Yes, there is also a decref in bufferevent_decref_.
@@ -242,8 +242,15 @@ be_filter_destruct(struct bufferevent *bev)
BEV_SUSPEND_FILT_READ);
}
}
+}
- bufferevent_del_generic_timeout_cbs_(bev);
+static void
+be_filter_destruct(struct bufferevent *bev)
+{
+ struct bufferevent_filtered *bevf = upcast(bev);
+ EVUTIL_ASSERT(bevf);
+ if (bevf->free_context)
+ bevf->free_context(bevf->context);
}
static int
diff --git a/bufferevent_openssl.c b/bufferevent_openssl.c
index 99ed5f8d..48c61c08 100644
--- a/bufferevent_openssl.c
+++ b/bufferevent_openssl.c
@@ -326,6 +326,7 @@ struct bufferevent_openssl {
static int be_openssl_enable(struct bufferevent *, short);
static int be_openssl_disable(struct bufferevent *, short);
+static void be_openssl_unlink(struct bufferevent *);
static void be_openssl_destruct(struct bufferevent *);
static int be_openssl_adj_timeouts(struct bufferevent *);
static int be_openssl_flush(struct bufferevent *bufev,
@@ -337,6 +338,7 @@ const struct bufferevent_ops bufferevent_ops_openssl = {
evutil_offsetof(struct bufferevent_openssl, bev.bev),
be_openssl_enable,
be_openssl_disable,
+ be_openssl_unlink,
be_openssl_destruct,
be_openssl_adj_timeouts,
be_openssl_flush,
@@ -977,9 +979,11 @@ set_open_callbacks(struct bufferevent_openssl *bev_ssl, evutil_socket_t fd)
event_del(&bev->ev_write);
}
event_assign(&bev->ev_read, bev->ev_base, fd,
- EV_READ|EV_PERSIST, be_openssl_readeventcb, bev_ssl);
+ EV_READ|EV_PERSIST|EV_FINALIZE,
+ be_openssl_readeventcb, bev_ssl);
event_assign(&bev->ev_write, bev->ev_base, fd,
- EV_WRITE|EV_PERSIST, be_openssl_writeeventcb, bev_ssl);
+ EV_WRITE|EV_PERSIST|EV_FINALIZE,
+ be_openssl_writeeventcb, bev_ssl);
if (rpending)
r1 = bufferevent_add_event_(&bev->ev_read, &bev->timeout_read);
if (wpending)
@@ -1079,9 +1083,11 @@ set_handshake_callbacks(struct bufferevent_openssl *bev_ssl, evutil_socket_t fd)
event_del(&bev->ev_write);
}
event_assign(&bev->ev_read, bev->ev_base, fd,
- EV_READ|EV_PERSIST, be_openssl_handshakeeventcb, bev_ssl);
+ EV_READ|EV_PERSIST|EV_FINALIZE,
+ be_openssl_handshakeeventcb, bev_ssl);
event_assign(&bev->ev_write, bev->ev_base, fd,
- EV_WRITE|EV_PERSIST, be_openssl_handshakeeventcb, bev_ssl);
+ EV_WRITE|EV_PERSIST|EV_FINALIZE,
+ be_openssl_handshakeeventcb, bev_ssl);
if (fd >= 0) {
r1 = bufferevent_add_event_(&bev->ev_read, &bev->timeout_read);
r2 = bufferevent_add_event_(&bev->ev_write, &bev->timeout_write);
@@ -1176,17 +1182,10 @@ be_openssl_disable(struct bufferevent *bev, short events)
}
static void
-be_openssl_destruct(struct bufferevent *bev)
+be_openssl_unlink(struct bufferevent *bev)
{
struct bufferevent_openssl *bev_ssl = upcast(bev);
- if (bev_ssl->underlying) {
- bufferevent_del_generic_timeout_cbs_(bev);
- } else {
- event_del(&bev->ev_read);
- event_del(&bev->ev_write);
- }
-
if (bev_ssl->bev.options & BEV_OPT_CLOSE_ON_FREE) {
if (bev_ssl->underlying) {
if (BEV_UPCAST(bev_ssl->underlying)->refcnt < 2) {
@@ -1194,17 +1193,11 @@ be_openssl_destruct(struct bufferevent *bev)
"bufferevent with too few references");
} else {
bufferevent_free(bev_ssl->underlying);
- bev_ssl->underlying = NULL;
+ /* We still have a reference to it, since DOCUMENT. So we don't
+ * drop this. */
+ // bev_ssl->underlying = NULL;
}
- } else {
- evutil_socket_t fd = -1;
- BIO *bio = SSL_get_wbio(bev_ssl->ssl);
- if (bio)
- fd = BIO_get_fd(bio, NULL);
- if (fd >= 0)
- evutil_closesocket(fd);
}
- SSL_free(bev_ssl->ssl);
} else {
if (bev_ssl->underlying) {
if (bev_ssl->underlying->errorcb == be_openssl_eventcb)
@@ -1216,6 +1209,24 @@ be_openssl_destruct(struct bufferevent *bev)
}
}
+static void
+be_openssl_destruct(struct bufferevent *bev)
+{
+ struct bufferevent_openssl *bev_ssl = upcast(bev);
+
+ if (bev_ssl->bev.options & BEV_OPT_CLOSE_ON_FREE) {
+ if (! bev_ssl->underlying) {
+ evutil_socket_t fd = -1;
+ BIO *bio = SSL_get_wbio(bev_ssl->ssl);
+ if (bio)
+ fd = BIO_get_fd(bio, NULL);
+ if (fd >= 0)
+ evutil_closesocket(fd);
+ }
+ SSL_free(bev_ssl->ssl);
+ }
+}
+
static int
be_openssl_adj_timeouts(struct bufferevent *bev)
{
diff --git a/bufferevent_pair.c b/bufferevent_pair.c
index 16edad3d..4d467260 100644
--- a/bufferevent_pair.c
+++ b/bufferevent_pair.c
@@ -267,7 +267,7 @@ be_pair_disable(struct bufferevent *bev, short events)
}
static void
-be_pair_destruct(struct bufferevent *bev)
+be_pair_unlink(struct bufferevent *bev)
{
struct bufferevent_pair *bev_p = upcast(bev);
@@ -275,8 +275,6 @@ be_pair_destruct(struct bufferevent *bev)
bev_p->partner->partner = NULL;
bev_p->partner = NULL;
}
-
- bufferevent_del_generic_timeout_cbs_(bev);
}
static int
@@ -327,7 +325,8 @@ const struct bufferevent_ops bufferevent_ops_pair = {
evutil_offsetof(struct bufferevent_pair, bev.bev),
be_pair_enable,
be_pair_disable,
- be_pair_destruct,
+ be_pair_unlink,
+ NULL, /* be_pair_destruct, */
bufferevent_generic_adj_timeouts_,
be_pair_flush,
NULL, /* ctrl */
diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c
index f7de86a9..28fc0356 100644
--- a/bufferevent_ratelim.c
+++ b/bufferevent_ratelim.c
@@ -609,8 +609,8 @@ bufferevent_set_rate_limit(struct bufferevent *bev,
EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
event_del(&rlim->refill_bucket_event);
}
- evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
- bev_refill_callback_, bevp);
+ event_assign(&rlim->refill_bucket_event, bev->ev_base,
+ -1, EV_FINALIZE, bev_refill_callback_, bevp);
if (rlim->limit.read_limit > 0) {
bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
@@ -654,7 +654,7 @@ bufferevent_rate_limit_group_new(struct event_base *base,
ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
- event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
+ event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
bev_group_refill_callback_, g);
/*XXXX handle event_add failure */
event_add(&g->master_refill_event, &cfg->tick_timeout);
@@ -748,8 +748,8 @@ bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
BEV_UNLOCK(bev);
return -1;
}
- evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
- bev_refill_callback_, bevp);
+ event_assign(&rlim->refill_bucket_event, bev->ev_base,
+ -1, EV_FINALIZE, bev_refill_callback_, bevp);
bevp->rate_limiting = rlim;
}
diff --git a/bufferevent_sock.c b/bufferevent_sock.c
index 592be3a8..5ce4953b 100644
--- a/bufferevent_sock.c
+++ b/bufferevent_sock.c
@@ -90,6 +90,7 @@ const struct bufferevent_ops bufferevent_ops_socket = {
evutil_offsetof(struct bufferevent_private, bev),
be_socket_enable,
be_socket_disable,
+ NULL, /* unlink */
be_socket_destruct,
be_socket_adj_timeouts,
be_socket_flush,
@@ -338,9 +339,9 @@ bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,
evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD);
event_assign(&bufev->ev_read, bufev->ev_base, fd,
- EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
+ EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
- EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
+ EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev);
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
@@ -399,7 +400,7 @@ bufferevent_socket_connect(struct bufferevent *bev,
* on a non-blocking connect() when ConnectEx() is unavailable. */
if (BEV_IS_ASYNC(bev)) {
event_assign(&bev->ev_write, bev->ev_base, fd,
- EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);
+ EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bev);
}
#endif
bufferevent_setfd(bev, fd);
@@ -589,9 +590,6 @@ be_socket_destruct(struct bufferevent *bufev)
fd = event_get_fd(&bufev->ev_read);
- event_del(&bufev->ev_read);
- event_del(&bufev->ev_write);
-
if ((bufev_p->options & BEV_OPT_CLOSE_ON_FREE) && fd >= 0)
EVUTIL_CLOSESOCKET(fd);
}
@@ -637,9 +635,9 @@ be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd)
event_del(&bufev->ev_write);
event_assign(&bufev->ev_read, bufev->ev_base, fd,
- EV_READ|EV_PERSIST, bufferevent_readcb, bufev);
+ EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
- EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev);
+ EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev);
if (fd >= 0)
bufferevent_enable(bufev, bufev->enabled);
diff --git a/evbuffer-internal.h b/evbuffer-internal.h
index 91124338..fb67ec09 100644
--- a/evbuffer-internal.h
+++ b/evbuffer-internal.h
@@ -327,6 +327,11 @@ void evbuffer_set_parent_(struct evbuffer *buf, struct bufferevent *bev);
void evbuffer_invoke_callbacks_(struct evbuffer *buf);
+
+int evbuffer_get_callbacks_(struct evbuffer *buffer,
+ struct event_callback **cbs,
+ int max_cbs);
+
#ifdef __cplusplus
}
#endif