summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGreg Farnum <greg@inktank.com>2012-11-27 10:05:47 -0800
committerSage Weil <sage@inktank.com>2012-11-29 16:09:45 -0800
commitb97aaca3870c4f9628d5d310c63d1a816d3e513c (patch)
tree0ab4f11a351623368fc0ebec833f4b9b13177cd7
parent01059e9b430d2f0b248deed57f014d7c0d6c1d37 (diff)
downloadceph-b97aaca3870c4f9628d5d310c63d1a816d3e513c.tar.gz
msgr: add a delay_until queue that is used to delay deliveries.
Its life-cycle matches that of delay_queue, and the delayed_delivery function respects it. For now queue_received is just setting it to delay everything by 1 second. Signed-off-by: Greg Farnum <greg@inktank.com>
-rw-r--r--src/msg/Pipe.cc27
-rw-r--r--src/msg/Pipe.h1
2 files changed, 23 insertions, 5 deletions
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index e49658412c0..609df8e7dd2 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -54,7 +54,7 @@ ostream& Pipe::_pipe_prefix(std::ostream *_dout) {
Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
: reader_thread(this), writer_thread(this),
- dispatch_thread(NULL), delay_queue(NULL),
+ dispatch_thread(NULL), delay_queue(NULL), delay_until(NULL),
delay_lock(NULL), delay_cond(NULL), stop_delayed_delivery(true),
msgr(r),
conn_id(r->dispatch_queue.get_id()),
@@ -100,6 +100,8 @@ Pipe::~Pipe()
delete dispatch_thread;
assert(delay_queue->empty());
delete delay_queue;
+ assert(delay_until->empty());
+ delete delay_until;
assert(!delay_lock->is_locked());
delete delay_lock;
delete delay_cond;
@@ -141,6 +143,7 @@ void Pipe::start_reader()
lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
dispatch_thread = new DelayedDelivery(this);
delay_queue = new std::deque< Message * >();
+ delay_until = new std::deque< utime_t>();
delay_lock = new Mutex("delay_lock");
delay_cond = new Cond();
} else
@@ -184,6 +187,9 @@ void Pipe::queue_received(Message *m, int priority)
lsubdout(msgr->cct, ms, 1) << "queuing message " << m << " for delayed delivery" << dendl;
Mutex::Locker locker(*delay_lock);
delay_queue->push_back(m);
+ utime_t delay = ceph_clock_now(msgr->cct);
+ delay += 1.0;
+ delay_until->push_back(delay);
delay_cond->Signal();
return;
}
@@ -192,13 +198,23 @@ void Pipe::queue_received(Message *m, int priority)
void Pipe::delayed_delivery() {
Mutex::Locker locker(*delay_lock);
- if (delay_queue->empty())
- lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond because delay queue is empty" << dendl;
- delay_cond->Wait(*delay_lock);
while (!stop_delayed_delivery) {
+ if (delay_queue->empty()) {
+ lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond because delay queue is empty" << dendl;
+ delay_cond->Wait(*delay_lock);
+ continue;
+ }
+ if (delay_until->front() > ceph_clock_now(msgr->cct)) {
+ lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond until message " << delay_queue->front()
+ << " delay passes" << dendl;
+ delay_cond->WaitUntil(*delay_lock, delay_until->front());
+ continue;
+ }
Message *m = delay_queue->front();
- lsubdout(msgr->cct, ms, 1) << "dequeuing message " << m << " for delayed delivery" << dendl;
+ lsubdout(msgr->cct, ms, 1) << "dequeuing message " << m << " for delivery because delay until "
+ << delay_until->front() << " has passed" << dendl;
delay_queue->pop_front();
+ delay_until->pop_front();
in_q->enqueue(m, m->get_priority(), conn_id);
if (delay_queue->empty()) {
lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond" << dendl;
@@ -1200,6 +1216,7 @@ void Pipe::stop()
while (!delay_queue->empty()) {
delay_queue->front()->put();
delay_queue->pop_front();
+ delay_until->pop_front();
}
delay_cond->Signal();
}
diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h
index 648e2e87c9f..58df8e76d24 100644
--- a/src/msg/Pipe.h
+++ b/src/msg/Pipe.h
@@ -79,6 +79,7 @@ class DispatchQueue;
DelayedDelivery *dispatch_thread;
// TODO: clean up the delay_queue better on shutdown
std::deque< Message * > *delay_queue;
+ std::deque< utime_t > *delay_until;
Mutex *delay_lock;
Cond *delay_cond;
bool stop_delayed_delivery;