diff options
-rw-r--r-- | buffer.c | 18 | ||||
-rw-r--r-- | bufferevent-internal.h | 5 | ||||
-rw-r--r-- | bufferevent.c | 54 | ||||
-rw-r--r-- | bufferevent_async.c | 6 | ||||
-rw-r--r-- | bufferevent_filter.c | 15 | ||||
-rw-r--r-- | bufferevent_openssl.c | 53 | ||||
-rw-r--r-- | bufferevent_pair.c | 7 | ||||
-rw-r--r-- | bufferevent_ratelim.c | 10 | ||||
-rw-r--r-- | bufferevent_sock.c | 14 | ||||
-rw-r--r-- | evbuffer-internal.h | 5 |
10 files changed, 128 insertions, 59 deletions
@@ -3345,3 +3345,21 @@ evbuffer_cb_unsuspend(struct evbuffer *buffer, struct evbuffer_cb_entry *cb) } #endif +int +evbuffer_get_callbacks_(struct evbuffer *buffer, struct event_callback **cbs, + int max_cbs) +{ + int r = 0; + EVBUFFER_LOCK(buffer); + if (buffer->deferred_cbs) { + if (max_cbs < 1) { + r = -1; + goto done; + } + cbs[0] = &buffer->deferred; + r = 1; + } +done: + EVBUFFER_UNLOCK(buffer); + return r; +} diff --git a/bufferevent-internal.h b/bufferevent-internal.h index 63bf4708..ccfc7045 100644 --- a/bufferevent-internal.h +++ b/bufferevent-internal.h @@ -252,8 +252,11 @@ struct bufferevent_ops { */ int (*disable)(struct bufferevent *, short); + /** DOCUMENT */ + void (*unlink)(struct bufferevent *); + /** Free any storage and deallocate any extra data or structures used - in this implementation. + in this implementation. DOCUMENT */ void (*destruct)(struct bufferevent *); diff --git a/bufferevent.c b/bufferevent.c index 7c03ce90..b2bb0ac3 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -54,6 +54,7 @@ #include "event2/bufferevent_struct.h" #include "event2/bufferevent_compat.h" #include "event2/event.h" +#include "event-internal.h" #include "log-internal.h" #include "mm-internal.h" #include "bufferevent-internal.h" @@ -61,7 +62,7 @@ #include "util-internal.h" static void bufferevent_cancel_all_(struct bufferevent *bev); - +static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_); void bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what) @@ -640,7 +641,9 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev) { struct bufferevent_private *bufev_private = EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); - struct bufferevent *underlying; + int n_cbs = 0; +#define MAX_CBS 16 + struct event_callback *cbs[MAX_CBS]; EVUTIL_ASSERT(bufev_private->refcnt > 0); @@ -649,6 +652,41 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev) return 0; } + if (bufev->be_ops->unlink) + bufev->be_ops->unlink(bufev); + + /* Okay, we're out of references. Let's finalize this once all the + * callbacks are done running. */ + cbs[0] = &bufev->ev_read.ev_evcallback; + cbs[1] = &bufev->ev_write.ev_evcallback; + cbs[2] = &bufev_private->deferred; + n_cbs = 3; + if (bufev_private->rate_limiting) { + struct event *e = &bufev_private->rate_limiting->refill_bucket_event; + if (event_initialized(e)) + cbs[n_cbs++] = &e->ev_evcallback; + } + n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs); + n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs); + + event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs, + bufferevent_finalize_cb_); + +#undef MAX_CBS + BEV_UNLOCK(bufev); + + return 1; +} + +static void +bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_) +{ + struct bufferevent *bufev = arg_; + struct bufferevent *underlying; + struct bufferevent_private *bufev_private = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); + + BEV_LOCK(bufev); underlying = bufferevent_get_underlying(bufev); /* Clean up the shared info */ @@ -665,17 +703,13 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev) if (bufev_private->rate_limiting) { if (bufev_private->rate_limiting->group) bufferevent_remove_from_rate_limit_group_internal_(bufev,0); - if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event)) - event_del(&bufev_private->rate_limiting->refill_bucket_event); - event_debug_unassign(&bufev_private->rate_limiting->refill_bucket_event); mm_free(bufev_private->rate_limiting); bufev_private->rate_limiting = NULL; } - event_debug_unassign(&bufev->ev_read); - event_debug_unassign(&bufev->ev_write); BEV_UNLOCK(bufev); + if (bufev_private->own_lock) EVTHREAD_FREE_LOCK(bufev_private->lock, EVTHREAD_LOCKTYPE_RECURSIVE); @@ -695,8 +729,6 @@ bufferevent_decref_and_unlock_(struct bufferevent *bufev) */ if (underlying) bufferevent_decref_(underlying); - - return 1; } int @@ -844,9 +876,9 @@ bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx) void bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev) { - evtimer_assign(&bev->ev_read, bev->ev_base, + event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE, bufferevent_generic_read_timeout_cb, bev); - evtimer_assign(&bev->ev_write, bev->ev_base, + event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE, bufferevent_generic_write_timeout_cb, bev); } diff --git a/bufferevent_async.c b/bufferevent_async.c index 83b5c141..0152fd16 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -93,6 +93,7 @@ const struct bufferevent_ops bufferevent_ops_async = { evutil_offsetof(struct bufferevent_async, bev.bev), be_async_enable, be_async_disable, + NULL, /* Unlink */ be_async_destruct, bufferevent_generic_adj_timeouts_, be_async_flush, @@ -384,11 +385,6 @@ be_async_destruct(struct bufferevent *bev) /* XXXX possible double-close */ evutil_closesocket(fd); } - /* delete this in case non-blocking connect was used */ - if (event_initialized(&bev->ev_write)) { - event_del(&bev->ev_write); - bufferevent_del_generic_timeout_cbs_(bev); - } } /* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so diff --git a/bufferevent_filter.c b/bufferevent_filter.c index 8a74f808..cc02230c 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -61,6 +61,7 @@ /* prototypes */ static int be_filter_enable(struct bufferevent *, short); static int be_filter_disable(struct bufferevent *, short); +static void be_filter_unlink(struct bufferevent *); static void be_filter_destruct(struct bufferevent *); static void be_filter_readcb(struct bufferevent *, void *); @@ -99,6 +100,7 @@ const struct bufferevent_ops bufferevent_ops_filter = { evutil_offsetof(struct bufferevent_filtered, bev.bev), be_filter_enable, be_filter_disable, + be_filter_unlink, be_filter_destruct, bufferevent_generic_adj_timeouts_, be_filter_flush, @@ -214,12 +216,10 @@ bufferevent_filter_new(struct bufferevent *underlying, } static void -be_filter_destruct(struct bufferevent *bev) +be_filter_unlink(struct bufferevent *bev) { struct bufferevent_filtered *bevf = upcast(bev); EVUTIL_ASSERT(bevf); - if (bevf->free_context) - bevf->free_context(bevf->context); if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) { /* Yes, there is also a decref in bufferevent_decref_. @@ -242,8 +242,15 @@ be_filter_destruct(struct bufferevent *bev) BEV_SUSPEND_FILT_READ); } } +} - bufferevent_del_generic_timeout_cbs_(bev); +static void +be_filter_destruct(struct bufferevent *bev) +{ + struct bufferevent_filtered *bevf = upcast(bev); + EVUTIL_ASSERT(bevf); + if (bevf->free_context) + bevf->free_context(bevf->context); } static int diff --git a/bufferevent_openssl.c b/bufferevent_openssl.c index 99ed5f8d..48c61c08 100644 --- a/bufferevent_openssl.c +++ b/bufferevent_openssl.c @@ -326,6 +326,7 @@ struct bufferevent_openssl { static int be_openssl_enable(struct bufferevent *, short); static int be_openssl_disable(struct bufferevent *, short); +static void be_openssl_unlink(struct bufferevent *); static void be_openssl_destruct(struct bufferevent *); static int be_openssl_adj_timeouts(struct bufferevent *); static int be_openssl_flush(struct bufferevent *bufev, @@ -337,6 +338,7 @@ const struct bufferevent_ops bufferevent_ops_openssl = { evutil_offsetof(struct bufferevent_openssl, bev.bev), be_openssl_enable, be_openssl_disable, + be_openssl_unlink, be_openssl_destruct, be_openssl_adj_timeouts, be_openssl_flush, @@ -977,9 +979,11 @@ set_open_callbacks(struct bufferevent_openssl *bev_ssl, evutil_socket_t fd) event_del(&bev->ev_write); } event_assign(&bev->ev_read, bev->ev_base, fd, - EV_READ|EV_PERSIST, be_openssl_readeventcb, bev_ssl); + EV_READ|EV_PERSIST|EV_FINALIZE, + be_openssl_readeventcb, bev_ssl); event_assign(&bev->ev_write, bev->ev_base, fd, - EV_WRITE|EV_PERSIST, be_openssl_writeeventcb, bev_ssl); + EV_WRITE|EV_PERSIST|EV_FINALIZE, + be_openssl_writeeventcb, bev_ssl); if (rpending) r1 = bufferevent_add_event_(&bev->ev_read, &bev->timeout_read); if (wpending) @@ -1079,9 +1083,11 @@ set_handshake_callbacks(struct bufferevent_openssl *bev_ssl, evutil_socket_t fd) event_del(&bev->ev_write); } event_assign(&bev->ev_read, bev->ev_base, fd, - EV_READ|EV_PERSIST, be_openssl_handshakeeventcb, bev_ssl); + EV_READ|EV_PERSIST|EV_FINALIZE, + be_openssl_handshakeeventcb, bev_ssl); event_assign(&bev->ev_write, bev->ev_base, fd, - EV_WRITE|EV_PERSIST, be_openssl_handshakeeventcb, bev_ssl); + EV_WRITE|EV_PERSIST|EV_FINALIZE, + be_openssl_handshakeeventcb, bev_ssl); if (fd >= 0) { r1 = bufferevent_add_event_(&bev->ev_read, &bev->timeout_read); r2 = bufferevent_add_event_(&bev->ev_write, &bev->timeout_write); @@ -1176,17 +1182,10 @@ be_openssl_disable(struct bufferevent *bev, short events) } static void -be_openssl_destruct(struct bufferevent *bev) +be_openssl_unlink(struct bufferevent *bev) { struct bufferevent_openssl *bev_ssl = upcast(bev); - if (bev_ssl->underlying) { - bufferevent_del_generic_timeout_cbs_(bev); - } else { - event_del(&bev->ev_read); - event_del(&bev->ev_write); - } - if (bev_ssl->bev.options & BEV_OPT_CLOSE_ON_FREE) { if (bev_ssl->underlying) { if (BEV_UPCAST(bev_ssl->underlying)->refcnt < 2) { @@ -1194,17 +1193,11 @@ be_openssl_destruct(struct bufferevent *bev) "bufferevent with too few references"); } else { bufferevent_free(bev_ssl->underlying); - bev_ssl->underlying = NULL; + /* We still have a reference to it, since DOCUMENT. So we don't + * drop this. */ + // bev_ssl->underlying = NULL; } - } else { - evutil_socket_t fd = -1; - BIO *bio = SSL_get_wbio(bev_ssl->ssl); - if (bio) - fd = BIO_get_fd(bio, NULL); - if (fd >= 0) - evutil_closesocket(fd); } - SSL_free(bev_ssl->ssl); } else { if (bev_ssl->underlying) { if (bev_ssl->underlying->errorcb == be_openssl_eventcb) @@ -1216,6 +1209,24 @@ be_openssl_destruct(struct bufferevent *bev) } } +static void +be_openssl_destruct(struct bufferevent *bev) +{ + struct bufferevent_openssl *bev_ssl = upcast(bev); + + if (bev_ssl->bev.options & BEV_OPT_CLOSE_ON_FREE) { + if (! bev_ssl->underlying) { + evutil_socket_t fd = -1; + BIO *bio = SSL_get_wbio(bev_ssl->ssl); + if (bio) + fd = BIO_get_fd(bio, NULL); + if (fd >= 0) + evutil_closesocket(fd); + } + SSL_free(bev_ssl->ssl); + } +} + static int be_openssl_adj_timeouts(struct bufferevent *bev) { diff --git a/bufferevent_pair.c b/bufferevent_pair.c index 16edad3d..4d467260 100644 --- a/bufferevent_pair.c +++ b/bufferevent_pair.c @@ -267,7 +267,7 @@ be_pair_disable(struct bufferevent *bev, short events) } static void -be_pair_destruct(struct bufferevent *bev) +be_pair_unlink(struct bufferevent *bev) { struct bufferevent_pair *bev_p = upcast(bev); @@ -275,8 +275,6 @@ be_pair_destruct(struct bufferevent *bev) bev_p->partner->partner = NULL; bev_p->partner = NULL; } - - bufferevent_del_generic_timeout_cbs_(bev); } static int @@ -327,7 +325,8 @@ const struct bufferevent_ops bufferevent_ops_pair = { evutil_offsetof(struct bufferevent_pair, bev.bev), be_pair_enable, be_pair_disable, - be_pair_destruct, + be_pair_unlink, + NULL, /* be_pair_destruct, */ bufferevent_generic_adj_timeouts_, be_pair_flush, NULL, /* ctrl */ diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c index f7de86a9..28fc0356 100644 --- a/bufferevent_ratelim.c +++ b/bufferevent_ratelim.c @@ -609,8 +609,8 @@ bufferevent_set_rate_limit(struct bufferevent *bev, EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event)); event_del(&rlim->refill_bucket_event); } - evtimer_assign(&rlim->refill_bucket_event, bev->ev_base, - bev_refill_callback_, bevp); + event_assign(&rlim->refill_bucket_event, bev->ev_base, + -1, EV_FINALIZE, bev_refill_callback_, bevp); if (rlim->limit.read_limit > 0) { bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); @@ -654,7 +654,7 @@ bufferevent_rate_limit_group_new(struct event_base *base, ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0); - event_assign(&g->master_refill_event, base, -1, EV_PERSIST, + event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE, bev_group_refill_callback_, g); /*XXXX handle event_add failure */ event_add(&g->master_refill_event, &cfg->tick_timeout); @@ -748,8 +748,8 @@ bufferevent_add_to_rate_limit_group(struct bufferevent *bev, BEV_UNLOCK(bev); return -1; } - evtimer_assign(&rlim->refill_bucket_event, bev->ev_base, - bev_refill_callback_, bevp); + event_assign(&rlim->refill_bucket_event, bev->ev_base, + -1, EV_FINALIZE, bev_refill_callback_, bevp); bevp->rate_limiting = rlim; } diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 592be3a8..5ce4953b 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -90,6 +90,7 @@ const struct bufferevent_ops bufferevent_ops_socket = { evutil_offsetof(struct bufferevent_private, bev), be_socket_enable, be_socket_disable, + NULL, /* unlink */ be_socket_destruct, be_socket_adj_timeouts, be_socket_flush, @@ -338,9 +339,9 @@ bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD); event_assign(&bufev->ev_read, bufev->ev_base, fd, - EV_READ|EV_PERSIST, bufferevent_readcb, bufev); + EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev); event_assign(&bufev->ev_write, bufev->ev_base, fd, - EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); + EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev); evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev); @@ -399,7 +400,7 @@ bufferevent_socket_connect(struct bufferevent *bev, * on a non-blocking connect() when ConnectEx() is unavailable. */ if (BEV_IS_ASYNC(bev)) { event_assign(&bev->ev_write, bev->ev_base, fd, - EV_WRITE|EV_PERSIST, bufferevent_writecb, bev); + EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bev); } #endif bufferevent_setfd(bev, fd); @@ -589,9 +590,6 @@ be_socket_destruct(struct bufferevent *bufev) fd = event_get_fd(&bufev->ev_read); - event_del(&bufev->ev_read); - event_del(&bufev->ev_write); - if ((bufev_p->options & BEV_OPT_CLOSE_ON_FREE) && fd >= 0) EVUTIL_CLOSESOCKET(fd); } @@ -637,9 +635,9 @@ be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd) event_del(&bufev->ev_write); event_assign(&bufev->ev_read, bufev->ev_base, fd, - EV_READ|EV_PERSIST, bufferevent_readcb, bufev); + EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev); event_assign(&bufev->ev_write, bufev->ev_base, fd, - EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); + EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev); if (fd >= 0) bufferevent_enable(bufev, bufev->enabled); diff --git a/evbuffer-internal.h b/evbuffer-internal.h index 91124338..fb67ec09 100644 --- a/evbuffer-internal.h +++ b/evbuffer-internal.h @@ -327,6 +327,11 @@ void evbuffer_set_parent_(struct evbuffer *buf, struct bufferevent *bev); void evbuffer_invoke_callbacks_(struct evbuffer *buf); + +int evbuffer_get_callbacks_(struct evbuffer *buffer, + struct event_callback **cbs, + int max_cbs); + #ifdef __cplusplus } #endif |