summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2012-11-30 20:23:52 -0800
committerSage Weil <sage@inktank.com>2012-12-01 12:45:22 -0800
commit8dcc6c399cfe761ea66c9d8fda61c0dc8542b3d3 (patch)
tree7d8d5baa3197d41963f0c28809df4fb76dbc5b2a
parent6d65fa4ef9f17ce7994e19674d93012aa23f6eb0 (diff)
downloadceph-8dcc6c399cfe761ea66c9d8fda61c0dc8542b3d3.tar.gz
msg/Pipe: flush delayed messages when stealing/failing pipes
If we are failing a pipe, flush the incoming messages before we try to reconnect. Similarly, flush queued messages on an existing pipe before we replace it. This ensures that when we get a socket failure and reconnect, the delayed messages are handled in the normal fashion.

Specifically, it fixes a situation like:
- read msg, update in_seq etc.
- delay msg
- pipe faults
- peer reconnects, we replace existing pipe, discard delayed msgs
- peer resends msgs
- we discard, because they are < in_seq

Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r--src/msg/Pipe.cc24
-rw-r--r--src/msg/Pipe.h1
2 files changed, 23 insertions, 2 deletions
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc
index 2bbaf0bc17f..1ebf2854473 100644
--- a/src/msg/Pipe.cc
+++ b/src/msg/Pipe.cc
@@ -160,6 +160,7 @@ void Pipe::join_reader()
void Pipe::DelayedDelivery::discard()
{
+ lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::discard" << dendl;
Mutex::Locker l(delay_lock);
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
@@ -169,6 +170,17 @@ void Pipe::DelayedDelivery::discard()
}
}
+void Pipe::DelayedDelivery::flush()
+{
+ lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::flush" << dendl;
+ Mutex::Locker l(delay_lock);
+ while (!delay_queue.empty()) {
+ Message *m = delay_queue.front().second;
+ delay_queue.pop_front();
+ pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id);
+ }
+}
+
void *Pipe::DelayedDelivery::entry()
{
Mutex::Locker locker(delay_lock);
@@ -543,7 +555,11 @@ int Pipe::accept()
// make existing Connection reference us
existing->connection_state->reset_pipe(this);
-
+
+ // flush/queue any existing delayed messages
+ if (existing->delay_thread)
+ existing->delay_thread->flush();
+
// steal incoming queue
uint64_t replaced_conn_id = conn_id;
conn_id = existing->conn_id;
@@ -1113,6 +1129,10 @@ void Pipe::fault(bool onread)
return;
}
+ // queue delayed items immediately
+ if (delay_thread)
+ delay_thread->flush();
+
// requeue sent items
requeue_sent();
@@ -1120,7 +1140,7 @@ void Pipe::fault(bool onread)
ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl;
state = STATE_STANDBY;
return;
- }
+ }
if (state != STATE_CONNECTING) {
if (policy.server) {
diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h
index 30fb7eede11..1bcc8263f4a 100644
--- a/src/msg/Pipe.h
+++ b/src/msg/Pipe.h
@@ -90,6 +90,7 @@ class DispatchQueue;
delay_cond.Signal();
}
void discard();
+ void flush();
void stop() {
delay_lock.Lock();
stop_delayed_delivery = true;