diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-06-03 20:10:17 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-06-03 20:10:17 +0000 |
commit | 427a8c19ec9e14da39fc9fd0c1e10b2865abeec1 (patch) | |
tree | 9c55eb930ec8187490344d9a8d0782d507d7ce3a | |
parent | c578ed1e72954def2a5904990d7a97b9756c4f35 (diff) | |
download | qpid-python-427a8c19ec9e14da39fc9fd0c1e10b2865abeec1.tar.gz |
checkpoint - various fixes
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3079@1131182 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/AsyncCompletion.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 52 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionContext.h | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 66 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 51 |
5 files changed, 87 insertions, 94 deletions
diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h index bfd6718dcd..721e862bce 100644 --- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h +++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h @@ -89,7 +89,7 @@ class AsyncCompletion : private boost::noncopyable * callback object will be used by the last completer thread, and * released when the callback returns. */ - class Callback : virtual public RefCounted + class Callback : public RefCounted { public: virtual void completed(bool) = 0; diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 0f670ded83..3530f3ef6f 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -824,13 +824,14 @@ namespace { pending.erase(i); if (ready && pending.empty()) { - framing::Invoker::Result r; + framing::Invoker::Result r; // message.accept does not return result data Mutex::ScopedUnlock ul(lock); completed( r ); } } - /** allow the Message.Accept to complete */ + /** allow the Message.Accept to complete - do this only after all + * deliveryIds have been added() */ void enable() { Mutex::ScopedLock l(lock); @@ -854,14 +855,14 @@ namespace { boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd; public: DequeueDone( const DeliveryId & _id, - boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd ) + const boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd ) : id(_id), cmd(_cmd) {} void operator()() { cmd->complete( id ); } }; /** factory to create the above callback - passed to queue's dequeue - method, only called if dequeue is async! */ + method, only used if dequeue is asynchronous! */ boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state, const DeliveryId& id, boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd ) @@ -873,10 +874,18 @@ namespace { boost::shared_ptr<DequeueDone> x( new DequeueDone(id, cmd ) ); return x; } + + /** predicate to process unacked delivery records */ + bool acceptDelivery( SemanticState *state, + boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd, + DeliveryRecord dr ) + { + Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), cmd); + return dr.accept((TransactionContext*) 0, &f); + } } void SemanticState::accepted(const SequenceSet& commands) { - QPID_LOG(error, "SemanticState::accepted (" << commands << ")"); assertClusterSafe(); if (txBuffer.get()) { //in transactional mode, don't dequeue or remove, just @@ -900,26 +909,21 @@ void SemanticState::accepted(const SequenceSet& commands) { unacked.erase(removed, unacked.end()); } } else { - /** @todo KAG - the following code removes the command from unacked - even if the dequeue has not completed. note that the command will - still not complete until all dequeues complete. I'm doing this to - avoid having to lock the unacked list, which would be necessary if - we remove when the dequeue completes. Is this ok? */ + /** @todo KAG - the following code removes the message from unacked + list even if the dequeue has not yet completed. Note that the + entire command will still not complete until all dequeues + complete. I'm doing this to avoid having to lock the unacked list, + which would be necessary if we remove when the dequeue + completes. Is this ok? */ boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd; - DeliveryRecords::iterator i; - DeliveryRecords undone; - for (i = unacked.begin(); i < unacked.end(); ++i) { - if (i->coveredBy(&commands)) { - Queue::DequeueDoneCallbackFactory f = boost::bind(factory, this, i->getId(), cmd); - if (i->accept((TransactionContext*) 0, &f) == false) { - undone.push_back(*i); - } - } - } - if (undone.empty()) - unacked.clear(); - else - unacked.swap(undone); + DeliveryRecords::iterator removed = + remove_if(unacked.begin(), unacked.end(), + isInSequenceSetAnd(commands, + bind(acceptDelivery, + this, + cmd, + _1))); + unacked.erase(removed, unacked.end()); if (cmd) { boost::intrusive_ptr<SessionContext::AsyncCommandContext> pcmd(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cmd)); diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h index 6f4a894f22..a774ab6c52 100644 --- a/qpid/cpp/src/qpid/broker/SessionContext.h +++ b/qpid/cpp/src/qpid/broker/SessionContext.h @@ -59,7 +59,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl // class for commands that need to complete asynchronously friend class AsyncCommandContext; - class AsyncCommandContext : virtual public RefCounted + class AsyncCommandContext : public RefCounted { private: framing::SequenceNumber id; @@ -71,11 +71,11 @@ class SessionContext : public OwnershipToken, public sys::OutputControl AsyncCommandContext() : id(0), requiresAccept(false), syncBitSet(false) {} virtual ~AsyncCommandContext() {} - framing::SequenceNumber getId() { return id; } + framing::SequenceNumber getId() const { return id; } void setId(const framing::SequenceNumber seq) { id = seq; } - bool getRequiresAccept() { return requiresAccept; } + bool getRequiresAccept() const { return requiresAccept; } void setRequiresAccept(const bool a) { requiresAccept = a; } - bool getSyncBitSet() { return syncBitSet; } + bool getSyncBitSet() const { return syncBitSet; } void setSyncBitSet(const bool s) { syncBitSet = s; } void setManager(SessionContext::AsyncCommandManager *m) { manager.reset(m); } @@ -95,7 +95,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl class AsyncCommandManager : public RefCounted { public: - virtual void completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>&, + virtual void completePendingCommand(const boost::intrusive_ptr<AsyncCommandContext>&, const framing::Invoker::Result&) = 0; }; }; diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index d84256b61b..4f6e7ff075 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -208,6 +208,7 @@ void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceN { currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks). syncCurrentCommand = method->isSync(); + currentCommandId = id; acceptRequired = false; Invoker::Result invocation = invoke(adapter, *method); if (!invocation.wasHandled()) { @@ -263,6 +264,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) currentCommandComplete = true; // assumed syncCurrentCommand = msg->getFrames().getMethod()->isSync(); acceptRequired = msg->requiresAccept(); + currentCommandId = msg->getCommandId(); semanticState.handle(msg); msgBuilder.end(); IncompleteIngressMsgXfer xfer(this, msg); @@ -428,21 +430,23 @@ framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() { // (called via the invoker() in handleCommand() above) void SessionState::addPendingExecutionSync() { - SequenceNumber syncCommandId = receiverGetCurrent(); - if (receiverGetIncomplete().front() < syncCommandId) { + if (receiverGetIncomplete().front() < currentCommandId) { currentCommandComplete = false; - pendingExecutionSyncs.push(syncCommandId); + pendingExecutionSyncs.push(currentCommandId); asyncCommandManager->flushPendingCommands(); - QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId); + QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << currentCommandId); } } void SessionState::registerAsyncCommand(boost::intrusive_ptr<AsyncCommandContext>& aCmd) { - /** @todo KAG: ensure this is invoked during handleCommand() context! */ + /** @todo KAG: ensure this is invoked during handleCommand()/handleContent() context! */ currentCommandComplete = false; - asyncCommandManager->addPendingCommand( aCmd, receiverGetCurrent(), acceptRequired, syncCurrentCommand ); + asyncCommandManager->addPendingCommand( aCmd, currentCommandId, acceptRequired, syncCurrentCommand ); + if (syncCurrentCommand) { // if client wants ack, force command to complete asap. + aCmd->flush(); + } } @@ -460,14 +464,10 @@ SessionState::IncompleteIngressMsgXfer::clone() boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg)); // this routine is *only* invoked when the message needs to be asynchronously completed. Otherwise, ::completed() - // will be invoked directly. - pending = true; - boost::intrusive_ptr<SessionContext::AsyncCommandContext>ctxt(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cb)); + // will be invoked directly. Thus, let the SessionState know this command is not going to complete immediately: + pendingCmdCtxt = boost::intrusive_ptr<CommandContext>(new CommandContext(msg)); + boost::intrusive_ptr<qpid::broker::SessionContext::AsyncCommandContext> ctxt(pendingCmdCtxt); session->registerAsyncCommand(ctxt); - if (ctxt->getSyncBitSet()) { - // If the client is pending the message.transfer completion, flush now to force immediate write to journal. - msg->flush(); - } return cb; } @@ -478,35 +478,27 @@ SessionState::IncompleteIngressMsgXfer::clone() */ void SessionState::IncompleteIngressMsgXfer::completed(bool sync) { - if (!sync) { - /** note well: this path may execute in any thread. It is safe to access - * the scheduledCompleterContext, since *this has a shared pointer to it. - * but not session! - */ + if (pendingCmdCtxt) { session = 0; - QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << getId()); - completed(framing::Invoker::Result()); + QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << pendingCmdCtxt->getId()); + pendingCmdCtxt->completed(framing::Invoker::Result()); + pendingCmdCtxt.reset(); } else { + // Since "clone()" above was _not_ called, this -better- be sync! + if (!sync) assert(false); // this path runs directly from the ac->end() call in handleContent() above, // so *session is definately valid. if (session->isAttached()) { - QPID_LOG(debug, ": receive completed for msg seq=" << getId()); - session->completeCommand(getId(), framing::Invoker::Result(), getRequiresAccept(), getSyncBitSet()); - } - if (pending) { - boost::intrusive_ptr<AsyncCommandContext> p(this); - session->cancelAsyncCommand(p); + QPID_LOG(debug, ": receive completed for msg seq=" << session->currentCommandId); + session->completeCommand(session->currentCommandId, + framing::Invoker::Result(), // dummy + session->acceptRequired, + session->syncCurrentCommand); } } } -void SessionState::IncompleteIngressMsgXfer::flush() -{ - msg->flush(); -} - - /** Scheduled from an asynchronous command's completed callback to run on * the IO thread. */ @@ -516,9 +508,9 @@ void SessionState::AsyncCommandManager::schedule(boost::intrusive_ptr<AsyncComma } -void SessionState::AsyncCommandManager::addPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd, - framing::SequenceNumber seq, - bool acceptRequired, bool syncBitSet) +void SessionState::AsyncCommandManager::addPendingCommand(const boost::intrusive_ptr<AsyncCommandContext>& cmd, + const framing::SequenceNumber& seq, + const bool acceptRequired, const bool syncBitSet) { cmd->setId(seq); cmd->setRequiresAccept(acceptRequired); @@ -531,7 +523,7 @@ void SessionState::AsyncCommandManager::addPendingCommand(boost::intrusive_ptr<A } -void SessionState::AsyncCommandManager::cancelPendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd) +void SessionState::AsyncCommandManager::cancelPendingCommand(const boost::intrusive_ptr<AsyncCommandContext>& cmd) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); pendingCommands.erase(cmd->getId()); @@ -559,7 +551,7 @@ void SessionState::AsyncCommandManager::flushPendingCommands() /** mark a pending command as completed. * This method must be thread safe - it may run on any thread. */ -void SessionState::AsyncCommandManager::completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>& cmd, +void SessionState::AsyncCommandManager::completePendingCommand(const boost::intrusive_ptr<AsyncCommandContext>& cmd, const framing::Invoker::Result& result) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 6ee8a38f4e..cd6123688b 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -186,10 +186,11 @@ class SessionState : public qpid::SessionState, // sequence numbers of received Execution.Sync commands that are pending completion. std::queue<SequenceNumber> pendingExecutionSyncs; - // true if command completes during call to handleCommand() - bool currentCommandComplete; - bool syncCurrentCommand; - bool acceptRequired; + // flags that reflect the state of the currently executing received command: + bool currentCommandComplete; // true if the current command completed synchronously + SequenceNumber currentCommandId; + bool syncCurrentCommand; // true if sync-bit set in current command headers + bool acceptRequired; // true if current ingress message.transfer requires accept protected: /** This class provides a context for completing asynchronous commands in a thread @@ -231,24 +232,16 @@ class SessionState : public qpid::SessionState, AsyncCommandManager(SessionState *s) : session(s), isAttached(s->isAttached()) {}; ~AsyncCommandManager() {}; - /** track a message pending ingress completion */ - //void addPendingMessage(boost::intrusive_ptr<Message> m); - //void deletePendingMessage(SequenceNumber id); - //void flushPendingMessages(); - /** schedule the processing of a completed ingress message.transfer command */ - //void scheduleMsgCompletion(SequenceNumber cmd, - // bool requiresAccept, - // bool requiresSync); void cancel(); // called by SessionState destructor. void attached(); // called by SessionState on attach() void detached(); // called by SessionState on detach() - /** called by async command handlers */ - void addPendingCommand(boost::intrusive_ptr<AsyncCommandContext>&, - framing::SequenceNumber, bool, bool); - void cancelPendingCommand(boost::intrusive_ptr<AsyncCommandContext>&); + /** for mananging asynchronous commands */ + void addPendingCommand(const boost::intrusive_ptr<AsyncCommandContext>&, + const framing::SequenceNumber&, const bool, const bool); + void cancelPendingCommand(const boost::intrusive_ptr<AsyncCommandContext>&); void flushPendingCommands(); - void completePendingCommand(boost::intrusive_ptr<AsyncCommandContext>&, const framing::Invoker::Result&); + void completePendingCommand(const boost::intrusive_ptr<AsyncCommandContext>&, const framing::Invoker::Result&); }; boost::intrusive_ptr<AsyncCommandManager> asyncCommandManager; @@ -256,28 +249,32 @@ class SessionState : public qpid::SessionState, /** incomplete Message.transfer commands - inbound to broker from client */ - class IncompleteIngressMsgXfer : public AsyncCommandContext, - public AsyncCompletion::Callback + friend class IncompleteIngressMsgXfer; + class IncompleteIngressMsgXfer : public AsyncCompletion::Callback { public: IncompleteIngressMsgXfer( SessionState *ss, boost::intrusive_ptr<Message> m ) - : session(ss), - msg(m), - pending(false) {} + : session(ss), msg(m) {}; virtual ~IncompleteIngressMsgXfer() {}; - // async completion calls virtual void completed(bool); virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone(); - // async cmd calls - virtual void flush(); - private: + /** @todo KAG COMMENT ME */ + class CommandContext : public AsyncCommandContext + { + boost::intrusive_ptr<Message> msg; + public: + CommandContext(boost::intrusive_ptr<Message> _m) + : msg(_m) {} + virtual void flush() { msg->flush(); } + }; + SessionState *session; // only valid if sync flag in callback is true boost::intrusive_ptr<Message> msg; - bool pending; // true if msg saved on pending list... + boost::intrusive_ptr<CommandContext> pendingCmdCtxt; }; friend class SessionManager; |