summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2012-11-27 15:36:11 -0800
committerSage Weil <sage@inktank.com>2012-11-29 16:09:45 -0800
commitf454bb591d64de6d7986ba099a4f9d1ecc599a55 (patch)
tree41a50424baf4b53f57c9cdb2d4b4b7d872d20e09
parentb97aaca3870c4f9628d5d310c63d1a816d3e513c (diff)
downloadceph-f454bb591d64de6d7986ba099a4f9d1ecc599a55.tar.gz
msg/Pipe: refactor msgr delays
- move all delay state into a single class - create thread once and only once per Pipe - adjust debug levels - discard messages at the appropriate times Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/msg/Pipe.cc104
-rw-r--r--src/msg/Pipe.h52
2 files changed, 73 insertions, 83 deletions
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index 609df8e7dd2..34ded27d77f 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -54,8 +54,7 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) {
Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
: reader_thread(this), writer_thread(this),
- dispatch_thread(NULL), delay_queue(NULL), delay_until(NULL),
- delay_lock(NULL), delay_cond(NULL), stop_delayed_delivery(true),
+ delay_thread(NULL),
msgr(r),
conn_id(r->dispatch_queue.get_id()),
sd(-1), port(0),
@@ -96,16 +95,7 @@ Pipe::~Pipe()
if (connection_state)
connection_state->put();
delete session_security;
- if (dispatch_thread) {
- delete dispatch_thread;
- assert(delay_queue->empty());
- delete delay_queue;
- assert(delay_until->empty());
- delete delay_until;
- assert(!delay_lock->is_locked());
- delete delay_lock;
- delete delay_cond;
- }
+ delete delay_thread;
}
void Pipe::handle_ack(uint64_t seq)
@@ -137,27 +127,15 @@ void Pipe::start_reader()
}
reader_running = true;
reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
- if (!dispatch_thread &&
- msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type))
- != string::npos) {
+ if (!delay_thread &&
+ msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type)) != string::npos) {
lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
- dispatch_thread = new DelayedDelivery(this);
- delay_queue = new std::deque< Message * >();
- delay_until = new std::deque< utime_t>();
- delay_lock = new Mutex("delay_lock");
- delay_cond = new Cond();
+ delay_thread = new DelayedDelivery(this);
+ delay_thread->create();
} else
lsubdout(msgr->cct, ms, 1) << "Pipe " << this << " peer is " << ceph_entity_type_name(connection_state->peer_type)
<< "; NOT injecting delays because it does not match "
<< msgr->cct->_conf->ms_inject_delay_type << dendl;
-
- if (dispatch_thread && stop_delayed_delivery) {
- lsubdout(msgr->cct, ms, 1) << "running delayed dispatch thread on Pipe " << this << dendl;
- delay_lock->Lock();
- stop_delayed_delivery = false;
- dispatch_thread->create();
- delay_lock->Unlock();
- }
}
void Pipe::start_writer()
@@ -183,44 +161,45 @@ void Pipe::join_reader()
void Pipe::queue_received(Message *m, int priority)
{
assert(pipe_lock.is_locked());
- if (delay_queue) {
- lsubdout(msgr->cct, ms, 1) << "queuing message " << m << " for delayed delivery" << dendl;
- Mutex::Locker locker(*delay_lock);
- delay_queue->push_back(m);
- utime_t delay = ceph_clock_now(msgr->cct);
- delay += 1.0;
- delay_until->push_back(delay);
- delay_cond->Signal();
+
+ if (delay_thread) {
+ utime_t release;
+ if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
+ release = m->get_recv_stamp();
+ release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+ lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
+ }
+ delay_thread->queue(release, m);
return;
}
+
in_q->enqueue(m, priority, conn_id);
}
-void Pipe::delayed_delivery() {
- Mutex::Locker locker(*delay_lock);
+void *Pipe::DelayedDelivery::entry()
+{
+ Mutex::Locker locker(delay_lock);
+ lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry start" << dendl;
+
while (!stop_delayed_delivery) {
- if (delay_queue->empty()) {
- lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond because delay queue is empty" << dendl;
- delay_cond->Wait(*delay_lock);
+ if (delay_queue.empty()) {
+ lgeneric_subdout(pipe->msgr->cct, ms, 30) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond because delay queue is empty" << dendl;
+ delay_cond.Wait(delay_lock);
continue;
}
- if (delay_until->front() > ceph_clock_now(msgr->cct)) {
- lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond until message " << delay_queue->front()
- << " delay passes" << dendl;
- delay_cond->WaitUntil(*delay_lock, delay_until->front());
+ utime_t release = delay_queue.front().first;
+ if (release > ceph_clock_now(pipe->msgr->cct)) {
+ lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry sleeping on delay_cond until " << release << dendl;
+ delay_cond.WaitUntil(delay_lock, release);
continue;
}
- Message *m = delay_queue->front();
- lsubdout(msgr->cct, ms, 1) << "dequeuing message " << m << " for delivery because delay until "
- << delay_until->front() << " has passed" << dendl;
- delay_queue->pop_front();
- delay_until->pop_front();
- in_q->enqueue(m, m->get_priority(), conn_id);
- if (delay_queue->empty()) {
- lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond" << dendl;
- delay_cond->Wait(*delay_lock);
- }
+ Message *m = delay_queue.front().second;
+ lgeneric_subdout(pipe->msgr->cct, ms, 10) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry dequeuing message " << m << " for delivery, past " << release << dendl;
+ delay_queue.pop_front();
+ pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
}
+ lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::entry stop" << dendl;
+ return NULL;
}
int Pipe::accept()
@@ -1125,6 +1104,8 @@ void Pipe::fault(bool onread)
msgr->lock.Unlock();
in_q->discard_queue(conn_id);
+ if (delay_thread)
+ delay_thread->discard();
discard_out_queue();
// disconnect from Connection, and mark it failed. future messages
@@ -1190,6 +1171,8 @@ void Pipe::was_session_reset()
ldout(msgr->cct,10) << "was_session_reset" << dendl;
in_q->discard_queue(conn_id);
+ if (delay_thread)
+ delay_thread->discard();
discard_out_queue();
msgr->dispatch_queue.queue_remote_reset(connection_state);
@@ -1209,17 +1192,6 @@ void Pipe::stop()
state = STATE_CLOSED;
cond.Signal();
shutdown_socket();
- if (dispatch_thread) {
- lsubdout(msgr->cct, ms, 1) << "signalling to stop delayed dispatch thread and clear out messages" << dendl;
- Mutex::Locker locker(*delay_lock);
- stop_delayed_delivery = true;
- while (!delay_queue->empty()) {
- delay_queue->front()->put();
- delay_queue->pop_front();
- delay_until->pop_front();
- }
- delay_cond->Signal();
- }
}
diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h
index 58df8e76d24..8b09ec0f27d 100644
--- a/src/msg/Pipe.h
+++ b/src/msg/Pipe.h
@@ -70,20 +70,41 @@ class DispatchQueue;
*/
class DelayedDelivery: public Thread {
Pipe *pipe;
+ std::deque< pair<utime_t,Message*> > delay_queue;
+ Mutex delay_lock;
+ Cond delay_cond;
+ bool stop_delayed_delivery;
+
public:
- DelayedDelivery(Pipe *p) : pipe(p) {}
- void *entry() { pipe->delayed_delivery(); return 0; }
- };
+ DelayedDelivery(Pipe *p)
+ : pipe(p),
+ delay_lock("Pipe::DelayedDelivery::delay_lock"),
+ stop_delayed_delivery(false) { }
+ ~DelayedDelivery() {
+ discard();
+ }
+ void *entry();
+ void queue(utime_t release, Message *m) {
+ Mutex::Locker l(delay_lock);
+ delay_queue.push_back(make_pair(release, m));
+ delay_cond.Signal();
+ }
+ void discard() {
+ Mutex::Locker l(delay_lock);
+ while (!delay_queue.empty()) {
+ delay_queue.front().second->put();
+ delay_queue.pop_front();
+ }
+ }
+ void stop() {
+ delay_lock.Lock();
+ stop_delayed_delivery = true;
+ delay_cond.Signal();
+ delay_lock.Unlock();
+ }
+ } *delay_thread;
friend class DelayedDelivery;
- DelayedDelivery *dispatch_thread;
- // TODO: clean up the delay_queue better on shutdown
- std::deque< Message * > *delay_queue;
- std::deque< utime_t > *delay_until;
- Mutex *delay_lock;
- Cond *delay_cond;
- bool stop_delayed_delivery;
-
public:
Pipe(SimpleMessenger *r, int st, Connection *con);
~Pipe();
@@ -234,12 +255,9 @@ class DispatchQueue;
writer_thread.join();
if (reader_thread.is_started())
reader_thread.join();
- if (dispatch_thread && dispatch_thread->is_started()) {
- delay_lock->Lock();
- stop_delayed_delivery = true;
- delay_cond->Signal();
- delay_lock->Unlock();
- dispatch_thread->join();
+ if (delay_thread) {
+ delay_thread->stop();
+ delay_thread->join();
}
}
void stop();