diff options
author | Sage Weil <sage@inktank.com> | 2012-11-30 20:23:52 -0800 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2012-12-01 12:45:22 -0800 |
commit | 8dcc6c399cfe761ea66c9d8fda61c0dc8542b3d3 (patch) | |
tree | 7d8d5baa3197d41963f0c28809df4fb76dbc5b2a | |
parent | 6d65fa4ef9f17ce7994e19674d93012aa23f6eb0 (diff) | |
download | ceph-8dcc6c399cfe761ea66c9d8fda61c0dc8542b3d3.tar.gz |
msg/Pipe: flush delayed messages when stealing/failing pipes
If we are failing a pipe, flush the incoming messages before we try to
reconnect. Similarly, flush queued messages on an existing pipe beore we
replace it. This ensures that when we get a socket failure and reconnect
the delayed messages are handled in the normal fashion.
Specifically, it fixes a situation like:
- read msg, update in_seq etc.
- delay msg
- pipe faults
- peer reconnects, we replace existing pipe, discard delayed msgs
- peer resends msgs
- we discard, because they are < in_seq
Signed-off-by: Sage Weil <sage@inktank.com>
-rw-r--r-- | src/msg/Pipe.cc | 24 | ||||
-rw-r--r-- | src/msg/Pipe.h | 1 |
2 files changed, 23 insertions, 2 deletions
diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index 2bbaf0bc17f..1ebf2854473 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -160,6 +160,7 @@ void Pipe::join_reader() void Pipe::DelayedDelivery::discard() { + lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::discard" << dendl; Mutex::Locker l(delay_lock); while (!delay_queue.empty()) { Message *m = delay_queue.front().second; @@ -169,6 +170,17 @@ void Pipe::DelayedDelivery::discard() } } +void Pipe::DelayedDelivery::flush() +{ + lgeneric_subdout(pipe->msgr->cct, ms, 20) << pipe->_pipe_prefix(_dout) << "DelayedDelivery::flush" << dendl; + Mutex::Locker l(delay_lock); + while (!delay_queue.empty()) { + Message *m = delay_queue.front().second; + delay_queue.pop_front(); + pipe->in_q->enqueue(m, m->get_priority(), pipe->conn_id); + } +} + void *Pipe::DelayedDelivery::entry() { Mutex::Locker locker(delay_lock); @@ -543,7 +555,11 @@ int Pipe::accept() // make existing Connection reference us existing->connection_state->reset_pipe(this); - + + // flush/queue any existing delayed messages + if (existing->delay_thread) + existing->delay_thread->flush(); + // steal incoming queue uint64_t replaced_conn_id = conn_id; conn_id = existing->conn_id; @@ -1113,6 +1129,10 @@ void Pipe::fault(bool onread) return; } + // queue delayed items immediately + if (delay_thread) + delay_thread->flush(); + // requeue sent items requeue_sent(); @@ -1120,7 +1140,7 @@ void Pipe::fault(bool onread) ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl; state = STATE_STANDBY; return; - } + } if (state != STATE_CONNECTING) { if (policy.server) { diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index 30fb7eede11..1bcc8263f4a 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -90,6 +90,7 @@ class DispatchQueue; delay_cond.Signal(); } void discard(); + void flush(); void stop() { delay_lock.Lock(); stop_delayed_delivery = true; |