summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-02-07 22:51:45 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-02-07 22:51:45 +0000
commitf31b86c7d7dc7e76a8008860d28049e609b4a7ab (patch)
treef736c33c134ea28459eac0d648177b4785960b14
parentc7ed111cf813eda770866d1b62fa4eb1f83c1f7c (diff)
downloadqpid-python-f31b86c7d7dc7e76a8008860d28049e609b4a7ab.tar.gz
QPID-2935: clean up SessionState changes, fix races
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2935@1068199 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/AsyncCompletion.h10
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp24
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp87
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h26
5 files changed, 73 insertions, 78 deletions
diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
index 382e2f7565..c66609f8a6 100644
--- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h
+++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
@@ -92,9 +92,12 @@ namespace qpid {
qpid::sys::Mutex::ScopedLock l(callbackLock);
inCallback = true;
if (handler) {
- qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
- (*handler)(sync);
- handler.reset();
+ boost::shared_ptr<CompletionHandler> tmp;
+ tmp.swap(handler);
+ {
+ qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
+ (*tmp)(sync);
+ }
}
inCallback = false;
callbackLock.notifyAll();
@@ -142,7 +145,6 @@ namespace qpid {
/** called by initiator after all potential completers have called
* startCompleter().
*/
- //void end(CompletionHandler::shared_ptr& _handler)
void end(boost::shared_ptr<CompletionHandler> _handler)
{
assert(completionsNeeded.get() > 0); // ensure begin() has been called!
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index ad7e8f9bb3..7cd7b99557 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -116,7 +116,7 @@ bool QueueFlowLimit::consume(const QueuedMessage& msg)
if (!msg.payload) return false;
- sys::Mutex::ScopedLock l(pendingFlowLock);
+ sys::Mutex::ScopedLock l(indexLock);
++count;
size += msg.payload->contentSize();
@@ -153,7 +153,7 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg)
if (!msg.payload) return false;
- sys::Mutex::ScopedLock l(pendingFlowLock);
+ sys::Mutex::ScopedLock l(indexLock);
if (count > 0) {
--count;
@@ -190,15 +190,6 @@ bool QueueFlowLimit::replenish(const QueuedMessage& msg)
if (itr != index.end()) { // this msg is flow controlled, release it:
(*itr)->getReceiveCompletion().finishCompleter();
index.erase(itr);
- //// stupid: (hopefully this is the next pending msg)
- //std::list< boost::intrusive_ptr<Message> >::iterator itr2 = find(pendingFlow.begin(),
- // pendingFlow.end(),
- // msg.payload);
- //if (itr2 == pendingFlow.end()) {
- // QPID_LOG(error, "Queue \"" << queueName << "\": indexed msg missing in list: " << msg.position);
- //} else {
- // pendingFlow.erase(itr2);
- //}
}
}
}
@@ -274,14 +265,3 @@ std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f)
}
}
-/**
- * TBD:
- * - Is there a direct way to determine if QM is on pendingFlow list?
- * - Rate limit the granting of flow.
- * - What about LVQ? A newer msg may replace the older one.
- * - What about queueing during a recovery?
- * - What about queue purge?
- * - What about message move?
- * - How do we treat orphaned messages?
- * -- Xfer a message to an alternate exchange - do we ack?
- */
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
index 57d06c6bdc..2de214801f 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
@@ -86,9 +86,7 @@ class QueueFlowLimit
protected:
// msgs waiting for flow to become available.
std::set< boost::intrusive_ptr<Message> > index;
- // KAG: is this necessary? Not if we release all pending when level < low (?)
- // std::list< boost::intrusive_ptr<Message> > pendingFlow; // ordered, oldest @front
- qpid::sys::Mutex pendingFlowLock;
+ qpid::sys::Mutex indexLock;
QueueFlowLimit(Queue *queue,
uint32_t flowStopCount, uint32_t flowResumeCount,
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index ca98ee1437..d9af4b13c5 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -60,7 +60,8 @@ SessionState::SessionState(
adapter(semanticState),
msgBuilder(&broker.getStore()),
mgmtObject(0),
- rateFlowcontrol(0)
+ rateFlowcontrol(0),
+ scheduledRcvMsgs(new IncompleteRcvMsg::deque)
{
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
@@ -95,16 +96,19 @@ SessionState::~SessionState() {
flowControlTimer->cancel();
// clean up any outstanding incomplete receive messages
-
qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock);
- while (!incompleteRcvMsgs.empty()) {
- boost::shared_ptr<IncompleteRcvMsg> ref(incompleteRcvMsgs.front());
- incompleteRcvMsgs.pop_front();
+ std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr> copy(incompleteRcvMsgs);
+ incompleteRcvMsgs.clear();
+ while (!copy.empty()) {
+ boost::shared_ptr<IncompleteRcvMsg> ref(copy.begin()->second);
+ copy.erase(copy.begin());
{
+ // note: need to drop lock, as callback may attempt to take it.
qpid::sys::ScopedUnlock<Mutex> ul(incompleteRcvMsgsLock);
ref->cancel();
}
}
+ scheduledRcvMsgs->clear(); // no need to lock - shared with IO thread.
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -264,21 +268,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
msg->getReceiveCompletion().begin();
semanticState.handle(msg);
msgBuilder.end();
- if (msg->getReceiveCompletion().getPendingCompleters() == 1) {
- // There are no other pending receive completers (just this SessionState).
- // Mark the message as completed.
- completeRcvMsg( msg );
- } else {
- // There are outstanding receive completers. Save the message until
- // they are all done.
- QPID_LOG(debug, getId() << ": delaying completion of msg seq=" << msg->getCommandId());
- boost::shared_ptr<IncompleteRcvMsg> pendingMsg(new IncompleteRcvMsg(*this, msg));
- {
- qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock);
- incompleteRcvMsgs.push_back(pendingMsg);
- }
- msg->getReceiveCompletion().end( pendingMsg ); // allows others to complete
- }
+ msg->getReceiveCompletion().end( createPendingMsg(msg) ); // allows msg to complete
}
// Handle producer session flow control
@@ -453,40 +443,45 @@ void SessionState::addPendingExecutionSync()
void SessionState::IncompleteRcvMsg::operator() (bool sync)
{
QPID_LOG(debug, ": async completion callback for msg seq=" << msg->getCommandId() << " sync=" << sync);
- boost::shared_ptr<IncompleteRcvMsg> tmp;
- {
- qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock);
- for (std::list< boost::shared_ptr<IncompleteRcvMsg> >::iterator i = session->incompleteRcvMsgs.begin();
- i != session->incompleteRcvMsgs.end(); ++i) {
- if (i->get() == this) {
- tmp.swap(*i);
- session->incompleteRcvMsgs.remove(*i);
- break;
- }
- }
- }
- if (session->isAttached()) {
- if (sync) {
- QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId());
- session->completeRcvMsg(msg);
- } else { // potentially called from a different thread
- QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId());
- session->getConnection().requestIOProcessing(boost::bind(&SessionState::IncompleteRcvMsg::scheduledCompleter, tmp));
+ qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock);
+ std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr>::iterator i = session->incompleteRcvMsgs.find(this);
+ if (i != session->incompleteRcvMsgs.end()) {
+ boost::shared_ptr<IncompleteRcvMsg> tmp(i->second);
+ session->incompleteRcvMsgs.erase(i);
+
+ if (session->isAttached()) {
+ if (sync) {
+ qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteRcvMsgsLock);
+ QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId());
+ session->completeRcvMsg(msg);
+ return;
+ } else { // potentially called from a different thread
+ QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId());
+ session->scheduledRcvMsgs->push_back(tmp);
+ if (session->scheduledRcvMsgs->size() == 1) {
+ session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
+ session->scheduledRcvMsgs));
+ }
+ }
}
}
}
-/** Scheduled from IncompleteRcvMsg callback, completes the message receive
- * asynchronously
+/** Scheduled from IncompleteRcvMsg callback, completes all pending message
+ * receives asynchronously.
*/
-void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<SessionState::IncompleteRcvMsg> iMsg)
+void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<deque> msgs)
{
- QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId());
- if (iMsg->session && iMsg->session->isAttached()) {
- QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId());
- iMsg->session->completeRcvMsg(iMsg->msg);
+ while (!msgs->empty()) {
+ boost::shared_ptr<IncompleteRcvMsg> iMsg = msgs->front();
+ msgs->pop_front();
+ QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId());
+ if (iMsg->session && iMsg->session->isAttached()) {
+ QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId());
+ iMsg->session->completeRcvMsg(iMsg->msg);
+ }
}
}
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index b33181eee4..fc9fec7871 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -131,6 +131,9 @@ class SessionState : public qpid::SessionState,
void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id);
void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id);
+
+ // indicate that the given ingress msg has been completely received by the
+ // broker, and the msg's message.transfer command can be considered completed.
void completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg);
void handleIn(framing::AMQFrame& frame);
@@ -169,21 +172,38 @@ class SessionState : public qpid::SessionState,
std::queue<SequenceNumber> pendingExecutionSyncs;
bool currentCommandComplete;
+ // A list of ingress messages whose message.transfer command is pending
+ // completion. These messages are awaiting some set of asynchronous
+ // operations to complete (eg: store, flow-control, etc). before
+ // the message.transfer can be completed.
class IncompleteRcvMsg : public AsyncCompletion::CompletionHandler
{
public:
IncompleteRcvMsg(SessionState& _session, boost::intrusive_ptr<Message> _msg)
: session(&_session), msg(_msg) {}
- virtual void operator() (bool sync);
+ virtual void operator() (bool sync); // invoked when msg is completed.
void cancel(); // cancel pending incomplete callback [operator() above].
+ typedef boost::shared_ptr<IncompleteRcvMsg> shared_ptr;
+ typedef std::deque<shared_ptr> deque;
+
private:
SessionState *session;
boost::intrusive_ptr<Message> msg;
- static void scheduledCompleter( boost::shared_ptr<IncompleteRcvMsg> incompleteMsg );
+
+ static void scheduledCompleter(boost::shared_ptr<deque>);
};
- std::list< boost::shared_ptr<IncompleteRcvMsg> > incompleteRcvMsgs;
+ std::map<const IncompleteRcvMsg *, IncompleteRcvMsg::shared_ptr> incompleteRcvMsgs; // msgs pending completion
qpid::sys::Mutex incompleteRcvMsgsLock;
+ boost::shared_ptr<IncompleteRcvMsg> createPendingMsg(boost::intrusive_ptr<Message>& msg) {
+ boost::shared_ptr<IncompleteRcvMsg> pending(new IncompleteRcvMsg(*this, msg));
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(incompleteRcvMsgsLock);
+ incompleteRcvMsgs[pending.get()] = pending;
+ return pending;
+ }
+
+ // holds msgs waiting for IO thread to run scheduledCompleter()
+ boost::shared_ptr<IncompleteRcvMsg::deque> scheduledRcvMsgs;
friend class SessionManager;
friend class IncompleteRcvMsg;