diff options
Diffstat (limited to 'bufferevent_ratelim.c')
-rw-r--r-- | bufferevent_ratelim.c | 237 |
1 files changed, 189 insertions, 48 deletions
diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c index 78d0c430..44d18129 100644 --- a/bufferevent_ratelim.c +++ b/bufferevent_ratelim.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2007-2010 Niels Provos and Nick Mathewson + * Copyright (c) 2007-2011 Niels Provos and Nick Mathewson * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu> * All rights reserved. * @@ -25,6 +25,7 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#include "evconfig-private.h" #include <sys/types.h> #include <limits.h> @@ -43,6 +44,7 @@ #include "bufferevent-internal.h" #include "mm-internal.h" #include "util-internal.h" +#include "event-internal.h" int ev_token_bucket_init(struct ev_token_bucket *bucket, @@ -56,9 +58,9 @@ ev_token_bucket_init(struct ev_token_bucket *bucket, leave "last_updated" as it is; the next update will add the appropriate amount of bandwidth to the bucket. */ - if (bucket->read_limit > cfg->read_maximum) + if (bucket->read_limit > (ev_int64_t) cfg->read_maximum) bucket->read_limit = cfg->read_maximum; - if (bucket->write_limit > cfg->write_maximum) + if (bucket->write_limit > (ev_int64_t) cfg->write_maximum) bucket->write_limit = cfg->write_maximum; } else { bucket->read_limit = cfg->read_rate; @@ -138,8 +140,8 @@ ev_token_bucket_get_tick(const struct timeval *tv, } struct ev_token_bucket_cfg * -ev_token_bucket_cfg_new(ev_uint32_t read_rate, ev_uint32_t read_burst, - ev_uint32_t write_rate, ev_uint32_t write_burst, +ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst, + size_t write_rate, size_t write_burst, const struct timeval *tick_len) { struct ev_token_bucket_cfg *r; @@ -152,6 +154,11 @@ ev_token_bucket_cfg_new(ev_uint32_t read_rate, ev_uint32_t read_burst, if (read_rate > read_burst || write_rate > write_burst || read_rate < 1 || write_rate < 1) return NULL; + if (read_rate > EV_RATE_LIMIT_MAX || + write_rate > EV_RATE_LIMIT_MAX || + read_burst > EV_RATE_LIMIT_MAX || + write_burst > EV_RATE_LIMIT_MAX) + return NULL; r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg)); if (!r) return NULL; @@ -160,7 +167,8 @@ ev_token_bucket_cfg_new(ev_uint32_t read_rate, ev_uint32_t read_burst, r->read_maximum = read_burst; r->write_maximum = write_burst; memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval)); - r->msec_per_tick = (tick_len->tv_sec * 1000) + tick_len->tv_usec/1000; + r->msec_per_tick = (tick_len->tv_sec * 1000) + + (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000; return r; } @@ -170,28 +178,27 @@ ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg) mm_free(cfg); } -/* No matter how big our bucket gets, don't try to read more than this - * much in a single read operation. */ -#define MAX_TO_READ_EVER 16384 -/* No matter how big our bucket gets, don't try to write more than this - * much in a single write operation. */ -#define MAX_TO_WRITE_EVER 16384 +/* Default values for max_single_read & max_single_write variables. */ +#define MAX_SINGLE_READ_DEFAULT 16384 +#define MAX_SINGLE_WRITE_DEFAULT 16384 #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0) #define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0) static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g); static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g); +static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g); +static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g); /** Helper: figure out the maximum amount we should write if is_write, or the maximum amount we should read if is_read. Return that maximum, or 0 if our bucket is wholly exhausted. */ -static inline int +static inline ev_ssize_t _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write) { /* needs lock on bev. */ - int max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER; + ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read; #define LIM(x) \ (is_write ? (x).write_limit : (x).read_limit) @@ -221,7 +228,7 @@ _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write) if (bev->rate_limiting->group) { struct bufferevent_rate_limit_group *g = bev->rate_limiting->group; - ev_uint32_t share; + ev_ssize_t share; LOCK_GROUP(g); if (GROUP_SUSPENDED(g)) { /* We can get here if we failed to lock this @@ -245,23 +252,25 @@ _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write) CLAMPTO(share); } + if (max_so_far < 0) + max_so_far = 0; return max_so_far; } -int +ev_ssize_t _bufferevent_get_read_max(struct bufferevent_private *bev) { return _bufferevent_get_rlim_max(bev, 0); } -int +ev_ssize_t _bufferevent_get_write_max(struct bufferevent_private *bev) { return _bufferevent_get_rlim_max(bev, 1); } int -_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, int bytes) +_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes) { /* XXXXX Make sure all users of this function check its return value */ int r = 0; @@ -276,6 +285,10 @@ _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, int bytes) if (event_add(&bev->rate_limiting->refill_bucket_event, &bev->rate_limiting->cfg->tick_timeout) < 0) r = -1; + } else if (bev->read_suspended & BEV_SUSPEND_BW) { + if (!(bev->write_suspended & BEV_SUSPEND_BW)) + event_del(&bev->rate_limiting->refill_bucket_event); + bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW); } } @@ -285,6 +298,8 @@ _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, int bytes) bev->rate_limiting->group->total_read += bytes; if (bev->rate_limiting->group->rate_limit.read_limit <= 0) { _bev_group_suspend_reading(bev->rate_limiting->group); + } else if (bev->rate_limiting->group->read_suspended) { + _bev_group_unsuspend_reading(bev->rate_limiting->group); } UNLOCK_GROUP(bev->rate_limiting->group); } @@ -293,7 +308,7 @@ _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, int bytes) } int -_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, int bytes) +_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes) { /* XXXXX Make sure all users of this function check its return value */ int r = 0; @@ -308,6 +323,10 @@ _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, int bytes) if (event_add(&bev->rate_limiting->refill_bucket_event, &bev->rate_limiting->cfg->tick_timeout) < 0) r = -1; + } else if (bev->write_suspended & BEV_SUSPEND_BW) { + if (!(bev->read_suspended & BEV_SUSPEND_BW)) + event_del(&bev->rate_limiting->refill_bucket_event); + bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW); } } @@ -317,6 +336,8 @@ _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, int bytes) bev->rate_limiting->group->total_written += bytes; if (bev->rate_limiting->group->rate_limit.write_limit <= 0) { _bev_group_suspend_writing(bev->rate_limiting->group); + } else if (bev->rate_limiting->group->write_suspended) { + _bev_group_unsuspend_writing(bev->rate_limiting->group); } UNLOCK_GROUP(bev->rate_limiting->group); } @@ -511,6 +532,7 @@ _bev_group_refill_callback(evutil_socket_t fd, short what, void *arg) event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now); LOCK_GROUP(g); + tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg); ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick); @@ -541,15 +563,19 @@ bufferevent_set_rate_limit(struct bufferevent *bev, struct bufferevent_rate_limit *rlim; struct timeval now; ev_uint32_t tick; + int reinit = 0, suspended = 0; /* XXX reference-count cfg */ BEV_LOCK(bev); if (cfg == NULL) { if (bevp->rate_limiting) { - bevp->rate_limiting->cfg = NULL; + rlim = bevp->rate_limiting; + rlim->cfg = NULL; bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW); bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW); + if (event_initialized(&rlim->refill_bucket_event)) + event_del(&rlim->refill_bucket_event); } r = 0; goto done; @@ -559,29 +585,48 @@ bufferevent_set_rate_limit(struct bufferevent *bev, tick = ev_token_bucket_get_tick(&now, cfg); if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) { - ; - } else if (bevp->rate_limiting) { - bevp->rate_limiting->cfg = cfg; - ev_token_bucket_init(&bevp->rate_limiting->limit, cfg, tick, 1); - if (bevp->rate_limiting->limit.read_limit > 0) - bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW); - else - bufferevent_suspend_read(bev, BEV_SUSPEND_BW); - if (bevp->rate_limiting->limit.write_limit > 0) - bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW); - else - bufferevent_suspend_write(bev, BEV_SUSPEND_BW); - } else { + /* no-op */ + r = 0; + goto done; + } + if (bevp->rate_limiting == NULL) { rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit)); if (!rlim) goto done; - rlim->cfg = cfg; - ev_token_bucket_init(&rlim->limit, cfg, tick, 0); - evtimer_assign(&rlim->refill_bucket_event, bev->ev_base, - _bev_refill_callback, bevp); bevp->rate_limiting = rlim; + } else { + rlim = bevp->rate_limiting; + } + reinit = rlim->cfg != NULL; + + rlim->cfg = cfg; + ev_token_bucket_init(&rlim->limit, cfg, tick, reinit); + + if (reinit) { + EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event)); + event_del(&rlim->refill_bucket_event); + } + evtimer_assign(&rlim->refill_bucket_event, bev->ev_base, + _bev_refill_callback, bevp); + + if (rlim->limit.read_limit > 0) { + bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW); + } else { + bufferevent_suspend_read(bev, BEV_SUSPEND_BW); + suspended=1; + } + if (rlim->limit.write_limit > 0) { + bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW); + } else { + bufferevent_suspend_write(bev, BEV_SUSPEND_BW); + suspended = 1; } + + if (suspended) + event_add(&rlim->refill_bucket_event, &cfg->tick_timeout); + r = 0; + done: BEV_UNLOCK(bev); return r; @@ -606,13 +651,15 @@ bufferevent_rate_limit_group_new(struct event_base *base, ev_token_bucket_init(&g->rate_limit, cfg, tick, 0); - g->min_share = 64; event_assign(&g->master_refill_event, base, -1, EV_PERSIST, _bev_group_refill_callback, g); /*XXXX handle event_add failure */ event_add(&g->master_refill_event, &cfg->tick_timeout); EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE); + + bufferevent_rate_limit_group_set_min_share(g, 64); + return g; } @@ -630,9 +677,9 @@ bufferevent_rate_limit_group_set_cfg( &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==); memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg)); - if (g->rate_limit.read_limit > cfg->read_maximum) + if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum) g->rate_limit.read_limit = cfg->read_maximum; - if (g->rate_limit.write_limit > cfg->write_maximum) + if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum) g->rate_limit.write_limit = cfg->write_maximum; if (!same_tick) { @@ -640,10 +687,33 @@ bufferevent_rate_limit_group_set_cfg( event_add(&g->master_refill_event, &cfg->tick_timeout); } + /* The new limits might force us to adjust min_share differently. */ + bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share); + UNLOCK_GROUP(g); return 0; } +int +bufferevent_rate_limit_group_set_min_share( + struct bufferevent_rate_limit_group *g, + size_t share) +{ + if (share > EV_SSIZE_MAX) + return -1; + + g->configured_min_share = share; + + /* Can't set share to less than the one-tick maximum. IOW, at steady + * state, at least one connection can go per tick. */ + if (share > g->rate_limit_cfg.read_rate) + share = g->rate_limit_cfg.read_rate; + if (share > g->rate_limit_cfg.write_rate) + share = g->rate_limit_cfg.write_rate; + + g->min_share = share; + return 0; +} void bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g) @@ -706,6 +776,13 @@ bufferevent_add_to_rate_limit_group(struct bufferevent *bev, int bufferevent_remove_from_rate_limit_group(struct bufferevent *bev) { + return bufferevent_remove_from_rate_limit_group_internal(bev, 1); +} + +int +bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev, + int unsuspend) +{ struct bufferevent_private *bevp = EVUTIL_UPCAST(bev, struct bufferevent_private, bev); BEV_LOCK(bev); @@ -718,8 +795,10 @@ bufferevent_remove_from_rate_limit_group(struct bufferevent *bev) LIST_REMOVE(bevp, rate_limiting->next_in_group); UNLOCK_GROUP(g); } - bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP); - bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP); + if (unsuspend) { + bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP); + bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP); + } BEV_UNLOCK(bev); return 0; } @@ -769,6 +848,56 @@ bufferevent_get_write_limit(struct bufferevent *bev) return r; } +int +bufferevent_set_max_single_read(struct bufferevent *bev, size_t size) +{ + struct bufferevent_private *bevp; + BEV_LOCK(bev); + bevp = BEV_UPCAST(bev); + if (size == 0 || size > EV_SSIZE_MAX) + bevp->max_single_read = MAX_SINGLE_READ_DEFAULT; + else + bevp->max_single_read = size; + BEV_UNLOCK(bev); + return 0; +} + +int +bufferevent_set_max_single_write(struct bufferevent *bev, size_t size) +{ + struct bufferevent_private *bevp; + BEV_LOCK(bev); + bevp = BEV_UPCAST(bev); + if (size == 0 || size > EV_SSIZE_MAX) + bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT; + else + bevp->max_single_write = size; + BEV_UNLOCK(bev); + return 0; +} + +ev_ssize_t +bufferevent_get_max_single_read(struct bufferevent *bev) +{ + ev_ssize_t r; + + BEV_LOCK(bev); + r = BEV_UPCAST(bev)->max_single_read; + BEV_UNLOCK(bev); + return r; +} + +ev_ssize_t +bufferevent_get_max_single_write(struct bufferevent *bev) +{ + ev_ssize_t r; + + BEV_LOCK(bev); + r = BEV_UPCAST(bev)->max_single_write; + BEV_UNLOCK(bev); + return r; +} + ev_ssize_t bufferevent_get_max_to_read(struct bufferevent *bev) { @@ -820,7 +949,7 @@ int bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr) { int r = 0; - ev_int32_t old_limit, new_limit; + ev_ssize_t old_limit, new_limit; struct bufferevent_private *bevp; BEV_LOCK(bev); bevp = BEV_UPCAST(bev); @@ -834,7 +963,8 @@ bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr) &bevp->rate_limiting->cfg->tick_timeout) < 0) r = -1; } else if (old_limit <= 0 && new_limit > 0) { - event_del(&bevp->rate_limiting->refill_bucket_event); + if (!(bevp->write_suspended & BEV_SUSPEND_BW)) + event_del(&bevp->rate_limiting->refill_bucket_event); bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW); } @@ -848,7 +978,7 @@ bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr) /* XXXX this is mostly copy-and-paste from * bufferevent_decrement_read_limit */ int r = 0; - ev_int32_t old_limit, new_limit; + ev_ssize_t old_limit, new_limit; struct bufferevent_private *bevp; BEV_LOCK(bev); bevp = BEV_UPCAST(bev); @@ -862,7 +992,8 @@ bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr) &bevp->rate_limiting->cfg->tick_timeout) < 0) r = -1; } else if (old_limit <= 0 && new_limit > 0) { - event_del(&bevp->rate_limiting->refill_bucket_event); + if (!(bevp->read_suspended & BEV_SUSPEND_BW)) + event_del(&bevp->rate_limiting->refill_bucket_event); bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW); } @@ -875,7 +1006,7 @@ bufferevent_rate_limit_group_decrement_read( struct bufferevent_rate_limit_group *grp, ev_ssize_t decr) { int r = 0; - ev_int32_t old_limit, new_limit; + ev_ssize_t old_limit, new_limit; LOCK_GROUP(grp); old_limit = grp->rate_limit.read_limit; new_limit = (grp->rate_limit.read_limit -= decr); @@ -895,7 +1026,7 @@ bufferevent_rate_limit_group_decrement_write( struct bufferevent_rate_limit_group *grp, ev_ssize_t decr) { int r = 0; - ev_int32_t old_limit, new_limit; + ev_ssize_t old_limit, new_limit; LOCK_GROUP(grp); old_limit = grp->rate_limit.write_limit; new_limit = (grp->rate_limit.write_limit -= decr); @@ -926,3 +1057,13 @@ bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *g { grp->total_read = grp->total_written = 0; } + +int +_bufferevent_ratelim_init(struct bufferevent_private *bev) +{ + bev->rate_limiting = NULL; + bev->max_single_read = MAX_SINGLE_READ_DEFAULT; + bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT; + + return 0; +} |