diff options
author | Sage Weil <sage@inktank.com> | 2012-11-27 15:36:11 -0800 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2012-11-29 16:09:45 -0800 |
commit | f454bb591d64de6d7986ba099a4f9d1ecc599a55 (patch) | |
tree | 41a50424baf4b53f57c9cdb2d4b4b7d872d20e09 | |
parent | b97aaca3870c4f9628d5d310c63d1a816d3e513c (diff) | |
download | ceph-f454bb591d64de6d7986ba099a4f9d1ecc599a55.tar.gz |
msg/Pipe: refactor msgr delays
- move all delay state into a single class
- create thread once and only once per Pipe
- adjust debug levels
- discard messages at the appropriate times
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/msg/Pipe.cc | 104 | ||||
-rw-r--r-- | src/msg/Pipe.h | 52 |
2 files changed, 73 insertions, 83 deletions
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index 609df8e7dd2..34ded27d77f 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -54,8 +54,7 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) { Pipe::Pipe(SimpleMessenger *r, int st, Connection *con) : reader_thread(this), writer_thread(this), - dispatch_thread(NULL), delay_queue(NULL), delay_until(NULL), - delay_lock(NULL), delay_cond(NULL), stop_delayed_delivery(true), + delay_thread(NULL), msgr(r), conn_id(r->dispatch_queue.get_id()), sd(-1), port(0), @@ -96,16 +95,7 @@ Pipe::~Pipe() if (connection_state) connection_state->put(); delete session_security; - if (dispatch_thread) { - delete dispatch_thread; - assert(delay_queue->empty()); - delete delay_queue; - assert(delay_until->empty()); - delete delay_until; - assert(!delay_lock->is_locked()); - delete delay_lock; - delete delay_cond; - } + delete delay_thread; } void Pipe::handle_ack(uint64_t seq) @@ -137,27 +127,15 @@ void Pipe::start_reader() } reader_running = true; reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes); - if (!dispatch_thread && - msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type)) - != string::npos) { + if (!delay_thread && + msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type)) != string::npos) { lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl; - dispatch_thread = new DelayedDelivery(this); - delay_queue = new std::deque< Message * >(); - delay_until = new std::deque< utime_t>(); - delay_lock = new Mutex("delay_lock"); - delay_cond = new Cond(); + delay_thread = new DelayedDelivery(this); + delay_thread->create(); } else lsubdout(msgr->cct, ms, 1) << "Pipe " << this << " peer is " << ceph_entity_type_name(connection_state->peer_type) << "; NOT injecting delays because it does not match " << msgr->cct->_conf->ms_inject_delay_type << dendl; - - if (dispatch_thread && stop_delayed_delivery) { - lsubdout(msgr->cct, ms, 1) << "running delayed dispatch thread on Pipe " << this << dendl; - delay_lock->Lock(); - stop_delayed_delivery = false; - dispatch_thread->create(); - delay_lock->Unlock(); - } } void Pipe::start_writer() @@ -183,44 +161,45 @@ void Pipe::join_reader() void Pipe::queue_received(Message *m, int priority) { assert(pipe_lock.is_locked()); - if (delay_queue) { - lsubdout(msgr->cct, ms, 1) << "queuing message " << m << " for delayed delivery" << dendl; - Mutex::Locker locker(*delay_lock); - delay_queue->push_back(m); - utime_t delay = ceph_clock_now(msgr->cct); - delay += 1.0; - delay_until->push_back(delay); - delay_cond->Signal(); + + if (delay_thread) { + utime_t release; + if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) { + release = m->get_recv_stamp(); + release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0; + lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl; + } + delay_thread->queue(release, m); return; } + in_q->enqueue(m, priority, conn_id); } -void Pipe::delayed_delivery() { - Mutex::Locker locker(*delay_lock); +void *Pipe::DelayedDelivery::entry() +{ + Mutex::Locker locker(delay_lock); + lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry start" << dendl; + while (!stop_delayed_delivery) { - if (delay_queue->empty()) { - lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond because delay queue is empty" << dendl; - delay_cond->Wait(*delay_lock); + if (delay_queue.empty()) { + lgeneric_subdout(pipe->msgr->cct, ms, 30) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl; + delay_cond.Wait(delay_lock); continue; } - if (delay_until->front() > ceph_clock_now(msgr->cct)) { - lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond until message " << delay_queue->front() - << " delay passes" << dendl; - delay_cond->WaitUntil(*delay_lock, delay_until->front()); + utime_t release = delay_queue.front().first; + if (release > ceph_clock_now(pipe->msgr->cct)) { + lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl; + delay_cond.WaitUntil(delay_lock, release); continue; } - Message *m = delay_queue->front(); - lsubdout(msgr->cct, ms, 1) << "dequeuing message " << m << " for delivery because delay until " - << delay_until->front() << " has passed" << dendl; - delay_queue->pop_front(); - delay_until->pop_front(); - in_q->enqueue(m, m->get_priority(), conn_id); - if (delay_queue->empty()) { - lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond" << dendl; - delay_cond->Wait(*delay_lock); - } + Message *m = delay_queue.front().second; + lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl; + delay_queue.pop_front(); + pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); } + lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl; + return NULL; } int Pipe::accept() @@ -1125,6 +1104,8 @@ void Pipe::fault(bool onread) msgr->lock.Unlock(); in_q->discard_queue(conn_id); + if (delay_thread) + delay_thread->discard(); discard_out_queue(); // disconnect from Connection, and mark it failed. future messages @@ -1190,6 +1171,8 @@ void Pipe::was_session_reset() ldout(msgr->cct,10) << "was_session_reset" << dendl; in_q->discard_queue(conn_id); + if (delay_thread) + delay_thread->discard(); discard_out_queue(); msgr->dispatch_queue.queue_remote_reset(connection_state); @@ -1209,17 +1192,6 @@ void Pipe::stop() state = STATE_CLOSED; cond.Signal(); shutdown_socket(); - if (dispatch_thread) { - lsubdout(msgr->cct, ms, 1) << "signalling to stop delayed dispatch thread and clear out messages" << dendl; - Mutex::Locker locker(*delay_lock); - stop_delayed_delivery = true; - while (!delay_queue->empty()) { - delay_queue->front()->put(); - delay_queue->pop_front(); - delay_until->pop_front(); - } - delay_cond->Signal(); - } } diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index 58df8e76d24..8b09ec0f27d 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -70,20 +70,41 @@ class DispatchQueue; */ class DelayedDelivery: public Thread { Pipe *pipe; + std::deque< pair<utime_t,Message*> > delay_queue; + Mutex delay_lock; + Cond delay_cond; + bool stop_delayed_delivery; + public: - DelayedDelivery(Pipe *p) : pipe(p) {} - void *entry() { pipe->delayed_delivery(); return 0; } - }; + DelayedDelivery(Pipe *p) + : pipe(p), + delay_lock("Pipe::DelayedDelivery::delay_lock"), + stop_delayed_delivery(false) { } + ~DelayedDelivery() { + discard(); + } + void *entry(); + void queue(utime_t release, Message *m) { + Mutex::Locker l(delay_lock); + delay_queue.push_back(make_pair(release, m)); + delay_cond.Signal(); + } + void discard() { + Mutex::Locker l(delay_lock); + while (!delay_queue.empty()) { + delay_queue.front().second->put(); + delay_queue.pop_front(); + } + } + void stop() { + delay_lock.Lock(); + stop_delayed_delivery = true; + delay_cond.Signal(); + delay_lock.Unlock(); + } + } *delay_thread; friend class DelayedDelivery; - DelayedDelivery *dispatch_thread; - // TODO: clean up the delay_queue better on shutdown - std::deque< Message * > *delay_queue; - std::deque< utime_t > *delay_until; - Mutex *delay_lock; - Cond *delay_cond; - bool stop_delayed_delivery; - public: Pipe(SimpleMessenger *r, int st, Connection *con); ~Pipe(); @@ -234,12 +255,9 @@ class DispatchQueue; writer_thread.join(); if (reader_thread.is_started()) reader_thread.join(); - if (dispatch_thread && dispatch_thread->is_started()) { - delay_lock->Lock(); - stop_delayed_delivery = true; - delay_cond->Signal(); - delay_lock->Unlock(); - dispatch_thread->join(); + if (delay_thread) { + delay_thread->stop(); + delay_thread->join(); } } void stop(); |