diff options
-rw-r--r-- | buffer.c | 114 | ||||
-rw-r--r-- | bufferevent.c | 12 | ||||
-rw-r--r-- | bufferevent_filter.c | 6 | ||||
-rw-r--r-- | bufferevent_sock.c | 5 | ||||
-rw-r--r-- | evbuffer-internal.h | 10 | ||||
-rw-r--r-- | include/event2/buffer.h | 18 | ||||
-rw-r--r-- | include/event2/buffer_compat.h | 22 | ||||
-rw-r--r-- | test/regress_buffer.c | 10 |
8 files changed, 144 insertions, 53 deletions
@@ -112,10 +112,16 @@ static int use_mmap = 1; #define EVBUFFER_CB_USER_FLAGS 0xffff /* Mask of all internal-use-only flags. */ #define EVBUFFER_CB_INTERNAL_FLAGS 0xffff0000 + +#if 0 /* Flag set on suspended callbacks. */ #define EVBUFFER_CB_SUSPENDED 0x00010000 /* Flag set if we should invoke the callback on wakeup. */ #define EVBUFFER_CB_CALL_ON_UNSUSPEND 0x00020000 +#endif + +/* Flag set if the callback is using the cb_obsolete function pointer */ +#define EVBUFFER_CB_OBSOLETE 0x00040000 /* evbuffer_chain support */ #define CHAIN_SPACE_PTR(ch) ((ch)->buffer + (ch)->misalign + (ch)->off) @@ -225,13 +231,25 @@ evbuffer_new(void) } static inline void -evbuffer_invoke_callbacks(struct evbuffer *buffer, size_t old_size) +evbuffer_invoke_callbacks(struct evbuffer *buffer) { struct evbuffer_cb_entry *cbent, *next; - size_t new_size = buffer->total_len; - if (TAILQ_EMPTY(&buffer->callbacks) || old_size == new_size) + struct evbuffer_cb_info info; + size_t new_size; + + if (TAILQ_EMPTY(&buffer->callbacks)) { + buffer->n_add_for_cb = buffer->n_del_for_cb = 0; return; + } + if (buffer->n_add_for_cb == 0 && buffer->n_del_for_cb == 0) + return; + new_size = buffer->total_len; + info.orig_size = new_size + buffer->n_del_for_cb - buffer->n_add_for_cb; + info.n_added = buffer->n_add_for_cb; + info.n_deleted = buffer->n_del_for_cb; + buffer->n_add_for_cb = 0; + buffer->n_del_for_cb = 0; for (cbent = TAILQ_FIRST(&buffer->callbacks); cbent != TAILQ_END(&buffer->callbacks); @@ -240,11 +258,16 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer, size_t old_size) * to remove itself or something. */ next = TAILQ_NEXT(cbent, next); if ((cbent->flags & EVBUFFER_CB_ENABLED)) { +#if 0 if ((cbent->flags & EVBUFFER_CB_SUSPENDED)) cbent->flags |= EVBUFFER_CB_CALL_ON_UNSUSPEND; else - cbent->cb(buffer, - old_size, new_size, cbent->cbarg); +#endif + if ((cbent->flags & EVBUFFER_CB_OBSOLETE)) + cbent->cb.cb_obsolete(buffer, + info.orig_size, new_size, cbent->cbarg); + else + cbent->cb.cb_func(buffer, &info, cbent->cbarg); } } } @@ -347,8 +370,8 @@ evbuffer_commit_space(struct evbuffer *buf, size_t size) int evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) { - size_t out_total_len = outbuf->total_len; size_t in_total_len = inbuf->total_len; + size_t out_total_len = outbuf->total_len; if (in_total_len == 0 || outbuf == inbuf) return (0); @@ -361,9 +384,11 @@ evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) /* remove everything from inbuf */ ZERO_CHAIN(inbuf); + inbuf->n_del_for_cb += in_total_len; + outbuf->n_add_for_cb += in_total_len; - evbuffer_invoke_callbacks(inbuf, in_total_len); - evbuffer_invoke_callbacks(outbuf, out_total_len); + evbuffer_invoke_callbacks(inbuf); + evbuffer_invoke_callbacks(outbuf); return (0); } @@ -371,8 +396,8 @@ evbuffer_add_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) void evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) { - size_t out_total_len = outbuf->total_len; size_t in_total_len = inbuf->total_len; + size_t out_total_len = outbuf->total_len; if (!in_total_len || inbuf == outbuf) return; @@ -385,21 +410,24 @@ evbuffer_prepend_buffer(struct evbuffer *outbuf, struct evbuffer *inbuf) /* remove everything from inbuf */ ZERO_CHAIN(inbuf); + inbuf->n_del_for_cb += in_total_len; + outbuf->n_add_for_cb += in_total_len; - evbuffer_invoke_callbacks(inbuf, in_total_len); - evbuffer_invoke_callbacks(outbuf, out_total_len); + evbuffer_invoke_callbacks(inbuf); + evbuffer_invoke_callbacks(outbuf); } void evbuffer_drain(struct evbuffer *buf, size_t len) { struct evbuffer_chain *chain, *next; - size_t old_len = buf->total_len; + size_t old_len = buf->total_len; if (old_len == 0) return; if (len >= old_len) { + len = old_len; for (chain = buf->first; chain != NULL; chain = next) { next = chain->next; @@ -424,8 +452,9 @@ evbuffer_drain(struct evbuffer *buf, size_t len) chain->off -= len; } + buf->n_del_for_cb += len; /* Tell someone about changes in this buffer */ - evbuffer_invoke_callbacks(buf, old_len); + evbuffer_invoke_callbacks(buf); } /* Reads data from an event buffer and drains the bytes read */ @@ -470,8 +499,9 @@ evbuffer_remove(struct evbuffer *buf, void *data_out, size_t datlen) buf->total_len -= nread; + buf->n_del_for_cb += nread; if (nread) - evbuffer_invoke_callbacks(buf, buf->total_len + nread); + evbuffer_invoke_callbacks(buf); return (nread); } @@ -523,6 +553,7 @@ evbuffer_remove_buffer(struct evbuffer *src, struct evbuffer *dst, src->previous_to_last = NULL; dst->total_len += nread; + dst->n_add_for_cb += nread; } /* we know that there is more data in the src buffer than @@ -533,10 +564,11 @@ evbuffer_remove_buffer(struct evbuffer *src, struct evbuffer *dst, nread += datlen; src->total_len -= nread; + src->n_del_for_cb += nread; if (nread) { - evbuffer_invoke_callbacks(dst, dst->total_len - nread); - evbuffer_invoke_callbacks(src, src->total_len + nread); + evbuffer_invoke_callbacks(dst); + evbuffer_invoke_callbacks(src); } return (nread); @@ -795,7 +827,7 @@ evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) { struct evbuffer_chain *chain = buf->last, *tmp; const unsigned char *data = data_in; - size_t old_len = buf->total_len, remain, to_alloc; + size_t remain, to_alloc; /* If there are no chains allocated for this buffer, allocate one * big enough to hold all the data. */ @@ -814,6 +846,7 @@ evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) data, datlen); chain->off += datlen; buf->total_len += datlen; + buf->n_add_for_cb += datlen; goto out; } else if (chain->misalign >= datlen) { /* we can fit the data into the misalignment */ @@ -825,6 +858,7 @@ evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) memcpy(chain->buffer + chain->off, data, datlen); chain->off += datlen; buf->total_len += datlen; + buf->n_add_for_cb += datlen; goto out; } } else { @@ -847,6 +881,7 @@ evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) data, remain); chain->off += remain; buf->total_len += remain; + buf->n_add_for_cb += remain; } data += remain; @@ -857,7 +892,7 @@ evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) evbuffer_chain_insert(buf, tmp); out: - evbuffer_invoke_callbacks(buf, old_len); + evbuffer_invoke_callbacks(buf); return (0); } @@ -866,7 +901,6 @@ int evbuffer_prepend(struct evbuffer *buf, const void *data, size_t datlen) { struct evbuffer_chain *chain = buf->first, *tmp; - size_t old_len = buf->total_len; if (chain == NULL) { if (evbuffer_expand(buf, datlen) == -1) @@ -884,6 +918,7 @@ evbuffer_prepend(struct evbuffer *buf, const void *data, size_t datlen) chain->off += datlen; chain->misalign -= datlen; buf->total_len += datlen; + buf->n_add_for_cb += datlen; goto out; } else if (chain->misalign) { memcpy(chain->buffer, @@ -891,6 +926,7 @@ evbuffer_prepend(struct evbuffer *buf, const void *data, size_t datlen) chain->misalign); chain->off += chain->misalign; buf->total_len += chain->misalign; + buf->n_add_for_cb += chain->misalign; datlen -= chain->misalign; chain->misalign = 0; } @@ -909,9 +945,10 @@ evbuffer_prepend(struct evbuffer *buf, const void *data, size_t datlen) memcpy(tmp->buffer + tmp->misalign, data, datlen); buf->total_len += datlen; + buf->n_add_for_cb += chain->misalign; out: - evbuffer_invoke_callbacks(buf, old_len); + evbuffer_invoke_callbacks(buf); return (0); } @@ -1083,7 +1120,6 @@ int evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch) { struct evbuffer_chain *chain = buf->last; - size_t old_len = buf->total_len; int n = EVBUFFER_MAX_READ; #ifdef USE_IOVEC_IMPL int nvecs; @@ -1214,9 +1250,10 @@ evbuffer_read(struct evbuffer *buf, evutil_socket_t fd, int howmuch) chain->off += n; #endif buf->total_len += n; + buf->n_add_for_cb += n; /* Tell someone about changes in this buffer */ - evbuffer_invoke_callbacks(buf, old_len); + evbuffer_invoke_callbacks(buf); return (n); } @@ -1482,7 +1519,6 @@ evbuffer_add_vprintf(struct evbuffer *buf, const char *fmt, va_list ap) { char *buffer; size_t space; - size_t old_len = buf->total_len; int sz; va_list aq; @@ -1511,8 +1547,9 @@ evbuffer_add_vprintf(struct evbuffer *buf, const char *fmt, va_list ap) if (sz < space) { chain->off += sz; buf->total_len += sz; + buf->n_add_for_cb += sz; - evbuffer_invoke_callbacks(buf, old_len); + evbuffer_invoke_callbacks(buf); return (sz); } if (evbuffer_expand(buf, sz + 1) == -1) @@ -1540,7 +1577,6 @@ evbuffer_add_reference(struct evbuffer *outbuf, const void *data, size_t datlen, void (*cleanupfn)(void *extra), void *extra) { - size_t old_len = outbuf->total_len; struct evbuffer_chain *chain = evbuffer_chain_new(sizeof(struct evbuffer_chain_reference)); struct evbuffer_chain_reference *info; @@ -1557,8 +1593,9 @@ evbuffer_add_reference(struct evbuffer *outbuf, info->extra = extra; evbuffer_chain_insert(outbuf, chain); + outbuf->n_add_for_cb += datlen; - evbuffer_invoke_callbacks(outbuf, old_len); + evbuffer_invoke_callbacks(outbuf); return (0); } @@ -1574,8 +1611,6 @@ int evbuffer_add_file(struct evbuffer *outbuf, int fd, off_t offset, size_t length) { - size_t old_len = outbuf->total_len; - #if defined(USE_SENDFILE) || defined(_EVENT_HAVE_MMAP) struct evbuffer_chain *chain; struct evbuffer_chain_fd *info; @@ -1598,6 +1633,7 @@ evbuffer_add_file(struct evbuffer *outbuf, int fd, info = EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd, chain); info->fd = fd; + outbuf->n_add_for_cb += length; evbuffer_chain_insert(outbuf, chain); } else #endif @@ -1635,6 +1671,8 @@ evbuffer_add_file(struct evbuffer *outbuf, int fd, info = EVBUFFER_CHAIN_EXTRA(struct evbuffer_chain_fd, chain); info->fd = fd; + outbuf->n_add_for_cb += length; + evbuffer_chain_insert(outbuf, chain); /* we need to subtract whatever we don't need */ @@ -1673,7 +1711,7 @@ evbuffer_add_file(struct evbuffer *outbuf, int fd, close(fd); } - evbuffer_invoke_callbacks(outbuf, old_len); + evbuffer_invoke_callbacks(outbuf); return (0); } @@ -1685,17 +1723,21 @@ evbuffer_setcb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg) if (!TAILQ_EMPTY(&buffer->callbacks)) evbuffer_remove_all_callbacks(buffer); - if (cb) - evbuffer_add_cb(buffer, cb, cbarg); + if (cb) { + struct evbuffer_cb_entry *ent = + evbuffer_add_cb(buffer, NULL, cbarg); + ent->cb.cb_obsolete = cb; + ent->flags |= EVBUFFER_CB_OBSOLETE; + } } struct evbuffer_cb_entry * -evbuffer_add_cb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg) +evbuffer_add_cb(struct evbuffer *buffer, evbuffer_cb_func cb, void *cbarg) { struct evbuffer_cb_entry *e; if (! (e = mm_calloc(1, sizeof(struct evbuffer_cb_entry)))) return NULL; - e->cb = cb; + e->cb.cb_func = cb; e->cbarg = cbarg; e->flags = EVBUFFER_CB_ENABLED; TAILQ_INSERT_HEAD(&buffer->callbacks, e, next); @@ -1712,11 +1754,11 @@ evbuffer_remove_cb_entry(struct evbuffer *buffer, } int -evbuffer_remove_cb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg) +evbuffer_remove_cb(struct evbuffer *buffer, evbuffer_cb_func cb, void *cbarg) { struct evbuffer_cb_entry *cbent; TAILQ_FOREACH(cbent, &buffer->callbacks, next) { - if (cb == cbent->cb && cbarg == cbent->cbarg) { + if (cb == cbent->cb.cb_func && cbarg == cbent->cbarg) { return evbuffer_remove_cb_entry(buffer, cbent); } } @@ -1732,6 +1774,7 @@ evbuffer_cb_set_flags(struct evbuffer *buffer, return 0; } +#if 0 void evbuffer_cb_suspend(struct evbuffer *buffer, struct evbuffer_cb_entry *cb) { @@ -1755,3 +1798,4 @@ evbuffer_cb_unsuspend(struct evbuffer *buffer, struct evbuffer_cb_entry *cb) } } } +#endif diff --git a/bufferevent.c b/bufferevent.c index a0dd4b83..6fe9f3f5 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -82,20 +82,22 @@ bufferevent_wm_unsuspend_read(struct bufferevent *bufev) /* Callback to implement watermarks on the input buffer. Only enabled * if the watermark is set. */ static void -bufferevent_inbuf_wm_cb(struct evbuffer *buf, size_t old, size_t now, - void *arg) +bufferevent_inbuf_wm_cb(struct evbuffer *buf, + const struct evbuffer_cb_info *cbinfo, + void *arg) { struct bufferevent *bufev = arg; + size_t size = evbuffer_get_length(buf); - if (now > old) { + if (cbinfo->n_added > cbinfo->n_deleted) { /* Data got added. If it put us over the watermark, stop * reading. */ - if (now >= bufev->wm_read.high) + if (size >= bufev->wm_read.high) bufferevent_wm_suspend_read(bufev); } else { /* Data got removed. If it puts us under the watermark, stop reading. */ - if (now < bufev->wm_read.high) + if (size < bufev->wm_read.high) bufferevent_wm_unsuspend_read(bufev); } } diff --git a/bufferevent_filter.c b/bufferevent_filter.c index fd0f236f..ed3e4122 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -73,7 +73,7 @@ static int be_filter_flush(struct bufferevent *bufev, short iotype, enum bufferevent_flush_mode mode); static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf, - size_t old, size_t now, void *arg); + const struct evbuffer_cb_info *info, void *arg); struct bufferevent_filtered { struct bufferevent bev; @@ -354,11 +354,11 @@ be_filter_process_output(struct bufferevent_filtered *bevf, /* Called when the size of our outbuf changes. */ static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf, - size_t old, size_t now, void *arg) + const struct evbuffer_cb_info *cbinfo, void *arg) { struct bufferevent_filtered *bevf = arg; - if (now > old) { + if (cbinfo->n_added) { int processed_any = 0; /* Somebody added more data to the output buffer. Try to * process it, if we should. */ diff --git a/bufferevent_sock.c b/bufferevent_sock.c index a233f646..679204d0 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -91,11 +91,12 @@ be_socket_add(struct event *ev, const struct timeval *tv) static void bufferevent_socket_outbuf_cb(struct evbuffer *buf, - size_t old, size_t now, void *arg) + const struct evbuffer_cb_info *cbinfo, + void *arg) { struct bufferevent *bufev = arg; - if (now > old && + if (cbinfo->n_added && (bufev->enabled & EV_WRITE) && !event_pending(&bufev->ev_write, EV_WRITE, NULL)) { /* Somebody added data to the buffer, and we would like to diff --git a/evbuffer-internal.h b/evbuffer-internal.h index 7f87da62..2751a472 100644 --- a/evbuffer-internal.h +++ b/evbuffer-internal.h @@ -43,14 +43,19 @@ struct evbuffer_cb_entry { /** Structures to implement a doubly-linked queue of callbacks */ TAILQ_ENTRY(evbuffer_cb_entry) next; /** The callback function to invoke when this callback is called */ - evbuffer_cb cb; + union { + evbuffer_cb_func cb_func; + evbuffer_cb cb_obsolete; + } cb; /** Argument to pass to cb. */ void *cbarg; /** Currently set flags on this callback. */ ev_uint32_t flags; +#if 0 /** Size of the evbuffer before this callback was suspended, or 0 if this callback is not suspended. */ size_t size_before_suspend; +#endif }; struct evbuffer_chain; @@ -64,6 +69,9 @@ struct evbuffer { evbuffer_cb cb; void *cbarg; + size_t n_add_for_cb; + size_t n_del_for_cb; + TAILQ_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks; }; diff --git a/include/event2/buffer.h b/include/event2/buffer.h index b9c57668..b10d1535 100644 --- a/include/event2/buffer.h +++ b/include/event2/buffer.h @@ -397,6 +397,14 @@ int evbuffer_ptr_set(struct evbuffer *buffer, struct evbuffer_ptr *pos, size_t position, enum evbuffer_ptr_how how); +/** Structure passed to an evbuffer callback */ +struct evbuffer_cb_info { + /** The size of */ + size_t orig_size; + size_t n_added; + size_t n_deleted; +}; + /** Type definition for a callback that is invoked whenever data is added or removed from an evbuffer. @@ -413,11 +421,10 @@ evbuffer_ptr_set(struct evbuffer *buffer, struct evbuffer_ptr *pos, one: watch out! @param buffer the buffer whose size has changed - @param old_len the previous length of the buffer - @param new_len the current length of the buffer + @param info a structure describing how the buffer changed @param arg a pointer to user data */ -typedef void (*evbuffer_cb)(struct evbuffer *buffer, size_t old_len, size_t new_len, void *arg); +typedef void (*evbuffer_cb_func)(struct evbuffer *buffer, const struct evbuffer_cb_info *info, void *arg); struct evbuffer_cb_entry; /** Add a new callback to an evbuffer. @@ -431,7 +438,7 @@ struct evbuffer_cb_entry; @param cbarg an argument to be provided to the callback function @return a handle to the callback on success, or NULL on failure. */ -struct evbuffer_cb_entry *evbuffer_add_cb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg); +struct evbuffer_cb_entry *evbuffer_add_cb(struct evbuffer *buffer, evbuffer_cb_func cb, void *cbarg); /** Remove a callback from an evbuffer, given a handle returned from evbuffer_add_cb. @@ -450,10 +457,11 @@ int evbuffer_remove_cb_entry(struct evbuffer *buffer, @return 0 if a callback was removed, or -1 if no matching callback was found. */ -int evbuffer_remove_cb(struct evbuffer *buffer, evbuffer_cb cb, void *cbarg); +int evbuffer_remove_cb(struct evbuffer *buffer, evbuffer_cb_func cb, void *cbarg); #define EVBUFFER_CB_DISABLED 0 #define EVBUFFER_CB_ENABLED 1 + /** Change whether a given callback is enabled on a buffer or not. A disabled callback is not invoked even when the buffer size changes. diff --git a/include/event2/buffer_compat.h b/include/event2/buffer_compat.h index e77b81ed..88a4599c 100644 --- a/include/event2/buffer_compat.h +++ b/include/event2/buffer_compat.h @@ -22,6 +22,28 @@ */ char *evbuffer_readline(struct evbuffer *buffer); +/** Type definition for a callback that is invoked whenever data is added or + removed from an evbuffer. + + An evbuffer may have one or more callbacks set at a time. The order + in which they are exectuded is undefined. + + A callback function may add more callbacks, or remove itself from the + list of callbacks, or add or remove data from the buffer. It may not + remove another callback from the list. + + If a callback adds or removes data from the buffer or from another + buffer, this can cause a recursive invocation of your callback or + other callbacks. If you ask for an infinite loop, you might just get + one: watch out! + + @param buffer the buffer whose size has changed + @param old_len the previous length of the buffer + @param new_len the current length of the buffer + @param arg a pointer to user data +*/ +typedef void (*evbuffer_cb)(struct evbuffer *buffer, size_t old_len, size_t new_len, void *arg); + /** Replace all callbacks on an evbuffer with a single new callback, or remove them. diff --git a/test/regress_buffer.c b/test/regress_buffer.c index 1054d27e..135b933c 100644 --- a/test/regress_buffer.c +++ b/test/regress_buffer.c @@ -622,9 +622,13 @@ end: } static void -log_change_callback(struct evbuffer *buffer, size_t old_len, size_t new_len, - void *arg) +log_change_callback(struct evbuffer *buffer, + const struct evbuffer_cb_info *cbinfo, + void *arg) { + + size_t old_len = cbinfo->orig_size; + size_t new_len = old_len + cbinfo->n_added - cbinfo->n_deleted; struct evbuffer *out = arg; evbuffer_add_printf(out, "%lu->%lu; ", (unsigned long)old_len, (unsigned long)new_len); @@ -685,6 +689,7 @@ test_evbuffer_callbacks(void *ptr) evbuffer_drain(buf, EVBUFFER_LENGTH(buf)); +#if 0 /* Now let's try a suspended callback. */ cb1 = evbuffer_add_cb(buf, log_change_callback, buf_out1); cb2 = evbuffer_add_cb(buf, log_change_callback, buf_out2); @@ -702,6 +707,7 @@ test_evbuffer_callbacks(void *ptr) "0->11; 11->11; 11->0; "); tt_str_op(evbuffer_pullup(buf_out2, -1), ==, "0->15; 15->11; 11->0; "); +#endif end: if (buf) |