diff options
-rw-r--r-- | qpid/cpp/src/Makefile.am | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/AsyncCompletion.h | 171 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 27 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.h | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PersistableMessage.cpp | 26 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PersistableMessage.h | 29 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionContext.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 151 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 32 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Makefile.am | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/TxPublishTest.cpp | 4 |
13 files changed, 367 insertions, 103 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 9f97b94b8a..739424783a 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -548,8 +548,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/HandlerImpl.h \ qpid/broker/HeadersExchange.cpp \ qpid/broker/HeadersExchange.h \ - qpid/broker/IncompleteMessageList.cpp \ - qpid/broker/IncompleteMessageList.h \ + qpid/broker/AsyncCompletion.h \ qpid/broker/Link.cpp \ qpid/broker/Link.h \ qpid/broker/LinkRegistry.cpp \ diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h new file mode 100644 index 0000000000..382e2f7565 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h @@ -0,0 +1,171 @@ +#ifndef _Completion_ +#define _Completion_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/BrokerImportExport.h" +#include "qpid/sys/AtomicValue.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Monitor.h" +#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> + +namespace qpid { + namespace broker { + + /** + * Class to implement asynchronous notification of completion. + * + * Use-case: An "initiator" needs to wait for a set of "completers" to + * finish a unit of work before an action can occur. This object + * tracks the progress of the set of completers, and allows the action + * to occur once all completers have signalled that they are done. + * + * The initiator and completers may be running in separate threads. + * + * The initiating thread is the thread that initiates the action, + * i.e. the connection read thread. + * + * A completing thread is any thread that contributes to completion, + * e.g. a store thread that does an async write. + * There may be zero or more completers. + * + * When the work is complete, a callback is invoked. The callback + * may be invoked in the Initiator thread, or one of the Completer + * threads. The callback is passed a flag indicating whether or not + * the callback is running under the context of the Initiator thread. + * + * Use model: + * 1) Initiator thread invokes begin() + * 2) After begin() has been invoked, zero or more Completers invoke + * startCompleter(). Completers may be running in the same or + * different thread as the Initiator, as long as they guarantee that + * startCompleter() is invoked at least once before the Initiator invokes end(). + * 3) Completers may invoke finishCompleter() at any time, even after the + * initiator has invoked end(). finishCompleter() may be called from any + * thread. + * 4) startCompleter()/finishCompleter() calls "nest": for each call to + * startCompleter(), a corresponding call to finishCompleter() must be made. + * Once the last finishCompleter() is called, the Completer must no longer + * reference the completion object. + * 5) The Initiator invokes end() at the point where it has finished + * dispatching work to the Completers, and is prepared for the callback + * handler to be invoked. Note: if there are no outstanding Completers + * pending when the Initiator invokes end(), the callback will be invoked + * directly, and the sync parameter will be set true. This indicates to the + * Initiator that the callback is executing in the context of the end() call, + * and the Initiator is free to optimize the handling of the completion, + * assuming no need for synchronization with Completer threads. + */ + class AsyncCompletion { + public: + // encapsulates the completion callback handler + class CompletionHandler { + public: + virtual void operator() (bool) { /* bool == true if called via end() */} + }; + + private: + mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded; + mutable qpid::sys::Monitor callbackLock; + bool inCallback; + void invokeCallback(bool sync) { + qpid::sys::Mutex::ScopedLock l(callbackLock); + inCallback = true; + if (handler) { + qpid::sys::Mutex::ScopedUnlock ul(callbackLock); + (*handler)(sync); + handler.reset(); + } + inCallback = false; + callbackLock.notifyAll(); + } + + protected: + /** Invoked when all completers have signalled that they have completed + * (via calls to finishCompleter()). + */ + boost::shared_ptr<CompletionHandler> handler; + + public: + AsyncCompletion() : completionsNeeded(0), inCallback(false) {}; + virtual ~AsyncCompletion() { /* @todo KAG - assert(completionsNeeded.get() == 0); */ }; + + /** True when all outstanding operations have compeleted + */ + bool isDone() + { + qpid::sys::Mutex::ScopedLock l(callbackLock); + return !inCallback && completionsNeeded.get() == 0; + } + + /** Called to signal the start of an asynchronous operation. The operation + * is considered pending until finishCompleter() is called. + * E.g. called when initiating an async store operation. + */ + void startCompleter() { ++completionsNeeded; } + + /** Called by completer to signal that it has finished the operation started + * when startCompleter() was invoked. + * e.g. called when async write complete. + */ + void finishCompleter() + { + if (--completionsNeeded == 0) { + invokeCallback(false); + } + } + + /** called by initiator before any calls to startCompleter can be done. + */ + void begin() { startCompleter(); }; + + /** called by initiator after all potential completers have called + * startCompleter(). + */ + //void end(CompletionHandler::shared_ptr& _handler) + void end(boost::shared_ptr<CompletionHandler> _handler) + { + assert(completionsNeeded.get() > 0); // ensure begin() has been called! + handler = _handler; + if (--completionsNeeded == 0) { + invokeCallback(true); + } + } + + /** may be called by Initiator to cancel the callback registered by end() + */ + void cancel() { + qpid::sys::Mutex::ScopedLock l(callbackLock); + while (inCallback) callbackLock.wait(); + handler.reset(); + } + + /** may be called by Initiator after all completers have been added but + * prior to calling end(). Allows initiator to determine if it _really_ + * needs to wait for pending Completers (e.g. count > 1). + */ + uint32_t getPendingCompleters() { return completionsNeeded.get(); } + }; + + }} // qpid::broker:: +#endif /*!_Completion_*/ diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 147b9e7a6a..a16180f3ae 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -50,14 +50,15 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), - inCallback(false), requiredCredit(0) {} + expiration(FAR_FUTURE), dequeueCallback(0), + inCallback(false), requiredCredit(0) +{} Message::Message(const Message& original) : PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(original.expiration), enqueueCallback(0), dequeueCallback(0), - inCallback(false), requiredCredit(0) + expiration(original.expiration), dequeueCallback(0), + inCallback(false), requiredCredit(0) { setExpiryPolicy(original.expiryPolicy); } @@ -431,30 +432,12 @@ struct ScopedSet { }; } -void Message::allEnqueuesComplete() { - ScopedSet ss(callbackLock, inCallback); - MessageCallback* cb = enqueueCallback; - if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); -} - void Message::allDequeuesComplete() { ScopedSet ss(callbackLock, inCallback); MessageCallback* cb = dequeueCallback; if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } -void Message::setEnqueueCompleteCallback(MessageCallback& cb) { - sys::Mutex::ScopedLock l(callbackLock); - while (inCallback) callbackLock.wait(); - enqueueCallback = &cb; -} - -void Message::resetEnqueueCompleteCallback() { - sys::Mutex::ScopedLock l(callbackLock); - while (inCallback) callbackLock.wait(); - enqueueCallback = 0; -} - void Message::setDequeueCompleteCallback(MessageCallback& cb) { sys::Mutex::ScopedLock l(callbackLock); while (inCallback) callbackLock.wait(); diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index ee80657f39..e8a8a19d53 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -156,10 +156,6 @@ public: boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) const; void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* qfor); - /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */ - void setEnqueueCompleteCallback(MessageCallback& cb); - void resetEnqueueCompleteCallback(); - /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */ void setDequeueCompleteCallback(MessageCallback& cb); void resetDequeueCompleteCallback(); @@ -170,7 +166,6 @@ public: typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement; MessageAdapter& getAdapter() const; - void allEnqueuesComplete(); void allDequeuesComplete(); mutable sys::Mutex lock; @@ -192,7 +187,6 @@ public: mutable boost::intrusive_ptr<Message> empty; sys::Monitor callbackLock; - MessageCallback* enqueueCallback; MessageCallback* dequeueCallback; bool inCallback; diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp index e5fbb71cbd..7ba28eb293 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.cpp @@ -34,7 +34,6 @@ class MessageStore; PersistableMessage::~PersistableMessage() {} PersistableMessage::PersistableMessage() : - asyncEnqueueCounter(0), asyncDequeueCounter(0), store(0) {} @@ -68,24 +67,6 @@ bool PersistableMessage::isContentReleased() const return contentReleaseState.released; } -bool PersistableMessage::isEnqueueComplete() { - sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); - return asyncEnqueueCounter == 0; -} - -void PersistableMessage::enqueueComplete() { - bool notify = false; - { - sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); - if (asyncEnqueueCounter > 0) { - if (--asyncEnqueueCounter == 0) { - notify = true; - } - } - } - if (notify) - allEnqueuesComplete(); -} bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ if (store && (queue->getPersistenceId()!=0)) { @@ -109,12 +90,7 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { addToSyncList(queue, _store); - enqueueAsync(); -} - -void PersistableMessage::enqueueAsync() { - sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); - asyncEnqueueCounter++; + enqueueStart(); } bool PersistableMessage::isDequeueComplete() { diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h index 96fb922c1a..e22d0ec4b8 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.h +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h @@ -31,6 +31,7 @@ #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" #include "qpid/broker/PersistableQueue.h" +#include "qpid/broker/AsyncCompletion.h" namespace qpid { namespace broker { @@ -43,18 +44,18 @@ class MessageStore; class PersistableMessage : public Persistable { typedef std::list< boost::weak_ptr<PersistableQueue> > syncList; - sys::Mutex asyncEnqueueLock; sys::Mutex asyncDequeueLock; sys::Mutex storeLock; - + /** - * Tracks the number of outstanding asynchronous enqueue - * operations. When the message is enqueued asynchronously the - * count is incremented; when that enqueue completes it is - * decremented. Thus when it is 0, there are no outstanding - * enqueues. + * Tracks the number of outstanding asynchronous operations that must + * complete before the message can be considered safely received by the + * broker. E.g. all enqueues have completed, the message has been written + * to store, credit has been replenished, etc. Once all outstanding + * operations have completed, the transfer of this message from the client + * may be considered complete. */ - int asyncEnqueueCounter; + AsyncCompletion receiveCompletion; /** * Tracks the number of outstanding asynchronous dequeue @@ -65,7 +66,6 @@ class PersistableMessage : public Persistable */ int asyncDequeueCounter; - void enqueueAsync(); void dequeueAsync(); syncList synclist; @@ -80,8 +80,6 @@ class PersistableMessage : public Persistable ContentReleaseState contentReleaseState; protected: - /** Called when all enqueues are complete for this message. */ - virtual void allEnqueuesComplete() = 0; /** Called when all dequeues are complete for this message. */ virtual void allDequeuesComplete() = 0; @@ -115,9 +113,9 @@ class PersistableMessage : public Persistable virtual QPID_BROKER_EXTERN bool isPersistent() const = 0; - QPID_BROKER_EXTERN bool isEnqueueComplete(); - - QPID_BROKER_EXTERN void enqueueComplete(); + QPID_BROKER_EXTERN bool isReceiveComplete() { return receiveCompletion.isDone(); } + QPID_BROKER_EXTERN void enqueueStart() { receiveCompletion.startCompleter(); } + QPID_BROKER_EXTERN void enqueueComplete() { receiveCompletion.finishCompleter(); } QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store); @@ -133,7 +131,8 @@ class PersistableMessage : public Persistable bool isStoredOnQueue(PersistableQueue::shared_ptr queue); void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store); - + + QPID_BROKER_EXTERN AsyncCompletion& getReceiveCompletion() { return receiveCompletion; } }; }} diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index e59857462c..24469ed0e4 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -157,13 +157,8 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ //drop message QPID_LOG(info, "Dropping excluded message from " << getName()); } else { - // if no store then mark as enqueued - if (!enqueue(0, msg)){ - push(msg); - msg->enqueueComplete(); - }else { - push(msg); - } + enqueue(0, msg); + push(msg); mgntEnqStats(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } @@ -688,7 +683,7 @@ uint32_t Queue::getEnqueueCompleteMessageCount() const //NOTE: don't need to use checkLvqReplace() here as it //is only relevant for LVQ which does not support persistence //so the enqueueComplete check has no effect - if ( i->payload->isEnqueueComplete() ) count ++; + if ( i->payload->isReceiveComplete() ) count ++; } return count; diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index d4f6b28ccb..3a98d52d15 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -24,6 +24,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/SequenceSet.h" #include "qpid/management/ManagementAgent.h" +#include "qpid/broker/SessionState.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" @@ -586,7 +587,12 @@ framing::MessageResumeResult SessionAdapter::MessageHandlerImpl::resume(const st -void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op +void SessionAdapter::ExecutionHandlerImpl::sync() +{ + session.addPendingExecutionSync(); + /** @todo KAG - need a generic mechanism to allow a command to returning "not completed" status back to SessionState */ + +} void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*commandId*/, const string& /*value*/) { diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h index afbbb2cc22..253ce8dcf2 100644 --- a/qpid/cpp/src/qpid/broker/SessionContext.h +++ b/qpid/cpp/src/qpid/broker/SessionContext.h @@ -46,6 +46,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl virtual Broker& getBroker() = 0; virtual uint16_t getChannel() const = 0; virtual const SessionId& getSessionId() const = 0; + virtual void addPendingExecutionSync() = 0; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 6f02399795..ca98ee1437 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -59,7 +59,6 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore()), - enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)), mgmtObject(0), rateFlowcontrol(0) { @@ -94,6 +93,18 @@ SessionState::~SessionState() { if (flowControlTimer) flowControlTimer->cancel(); + + // clean up any outstanding incomplete receive messages + + qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock); + while (!incompleteRcvMsgs.empty()) { + boost::shared_ptr<IncompleteRcvMsg> ref(incompleteRcvMsgs.front()); + incompleteRcvMsgs.pop_front(); + { + qpid::sys::ScopedUnlock<Mutex> ul(incompleteRcvMsgsLock); + ref->cancel(); + } + } } AMQP_ClientProxy& SessionState::getProxy() { @@ -195,15 +206,17 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, } void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) { + currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks). Invoker::Result invocation = invoke(adapter, *method); - receiverCompleted(id); + if (currentCommandComplete) receiverCompleted(id); + if (!invocation.wasHandled()) { throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { getProxy().getExecution().result(id, invocation.getResult()); } - if (method->isSync()) { - incomplete.process(enqueuedOp, true); + + if (method->isSync() && currentCommandComplete) { sendAcceptAndCompletion(); } } @@ -247,21 +260,24 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) msg->getFrames().append(header); } msg->setPublisher(&getConnection()); + + msg->getReceiveCompletion().begin(); semanticState.handle(msg); msgBuilder.end(); - - if (msg->isEnqueueComplete()) { - enqueued(msg); - } else { - incomplete.add(msg); - } - - //hold up execution until async enqueue is complete - if (msg->getFrames().getMethod()->isSync()) { - incomplete.process(enqueuedOp, true); - sendAcceptAndCompletion(); + if (msg->getReceiveCompletion().getPendingCompleters() == 1) { + // There are no other pending receive completers (just this SessionState). + // Mark the message as completed. + completeRcvMsg( msg ); } else { - incomplete.process(enqueuedOp, false); + // There are outstanding receive completers. Save the message until + // they are all done. + QPID_LOG(debug, getId() << ": delaying completion of msg seq=" << msg->getCommandId()); + boost::shared_ptr<IncompleteRcvMsg> pendingMsg(new IncompleteRcvMsg(*this, msg)); + { + qpid::sys::ScopedLock<Mutex> l(incompleteRcvMsgsLock); + incompleteRcvMsgs.push_back(pendingMsg); + } + msg->getReceiveCompletion().end( pendingMsg ); // allows others to complete } } @@ -312,11 +328,36 @@ void SessionState::sendAcceptAndCompletion() sendCompletion(); } -void SessionState::enqueued(boost::intrusive_ptr<Message> msg) +/** Invoked when the given inbound message is finished being processed + * by all interested parties (eg. it is done being enqueued to all queues, + * its credit has been accounted for, etc). At this point, msg is considered + * by this receiver as 'completed' (as defined by AMQP 0_10) + */ +void SessionState::completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg) { + bool callSendCompletion = false; receiverCompleted(msg->getCommandId()); if (msg->requiresAccept()) + // will cause msg's seq to appear in the next message.accept we send. accepted.add(msg->getCommandId()); + + // Are there any outstanding Execution.Sync commands pending the + // completion of this msg? If so, complete them. + while (!pendingExecutionSyncs.empty() && + receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) { + const SequenceNumber& id = pendingExecutionSyncs.front(); + pendingExecutionSyncs.pop(); + QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed."); + receiverCompleted(id); + callSendCompletion = true; // likely peer is pending for this completion. + } + + // if the sender has requested immediate notification of the completion... + if (msg->getFrames().getMethod()->isSync()) { + sendAcceptAndCompletion(); + } else if (callSendCompletion) { + sendCompletion(); + } } void SessionState::handleIn(AMQFrame& frame) { @@ -389,4 +430,80 @@ framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() { return handler->getClusterOrderProxy(); } + +// Current received command is an execution.sync command. +// Complete this command only when all preceding commands have completed. +// (called via the invoker() in handleCommand() above) +void SessionState::addPendingExecutionSync() +{ + SequenceNumber syncCommandId = receiverGetCurrent(); + if (receiverGetIncomplete().front() < syncCommandId) { + currentCommandComplete = false; + pendingExecutionSyncs.push(syncCommandId); + QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId); + } +} + + +/** Invoked by the asynchronous completer associated with + * a received msg that is pending Completion. May be invoked + * by the SessionState directly (sync == true), or some external + * entity (!sync). + */ +void SessionState::IncompleteRcvMsg::operator() (bool sync) +{ + QPID_LOG(debug, ": async completion callback for msg seq=" << msg->getCommandId() << " sync=" << sync); + boost::shared_ptr<IncompleteRcvMsg> tmp; + { + qpid::sys::ScopedLock<Mutex> l(session->incompleteRcvMsgsLock); + for (std::list< boost::shared_ptr<IncompleteRcvMsg> >::iterator i = session->incompleteRcvMsgs.begin(); + i != session->incompleteRcvMsgs.end(); ++i) { + if (i->get() == this) { + tmp.swap(*i); + session->incompleteRcvMsgs.remove(*i); + break; + } + } + } + + if (session->isAttached()) { + if (sync) { + QPID_LOG(debug, ": receive completed for msg seq=" << msg->getCommandId()); + session->completeRcvMsg(msg); + } else { // potentially called from a different thread + QPID_LOG(debug, ": scheduling completion for msg seq=" << msg->getCommandId()); + session->getConnection().requestIOProcessing(boost::bind(&SessionState::IncompleteRcvMsg::scheduledCompleter, tmp)); + } + } +} + + +/** Scheduled from IncompleteRcvMsg callback, completes the message receive + * asynchronously + */ +void SessionState::IncompleteRcvMsg::scheduledCompleter(boost::shared_ptr<SessionState::IncompleteRcvMsg> iMsg) +{ + QPID_LOG(debug, ": scheduled completion for msg seq=" << iMsg->msg->getCommandId()); + if (iMsg->session && iMsg->session->isAttached()) { + QPID_LOG(debug, iMsg->session->getId() << ": receive completed for msg seq=" << iMsg->msg->getCommandId()); + iMsg->session->completeRcvMsg(iMsg->msg); + } +} + + +/** Cancels a pending incomplete receive message completion callback. Note + * well: will wait for the callback to finish if it is currently in progress + * on another thread. + */ +void SessionState::IncompleteRcvMsg::cancel() +{ + QPID_LOG(debug, session->getId() << ": cancelling outstanding completion for msg seq=" << msg->getCommandId()); + // Cancel the message complete callback. On return, we are guaranteed there + // will be no outstanding calls to SessionState::IncompleteRcvMsg::operator() (bool sync) + msg->getReceiveCompletion().cancel(); + // there may be calls to SessionState::IncompleteRcvMsg::scheduledCompleter() pending, + // clear the session so scheduledCompleter() will ignore this IncompleteRcvMsg. + session = 0; +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 3dcb0a62d4..b33181eee4 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -30,10 +30,11 @@ #include "qmf/org/apache/qpid/broker/Session.h" #include "qpid/broker/SessionAdapter.h" #include "qpid/broker/DeliveryAdapter.h" -#include "qpid/broker/IncompleteMessageList.h" +#include "qpid/broker/AsyncCompletion.h" #include "qpid/broker/MessageBuilder.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/SemanticState.h" +#include "qpid/sys/Monitor.h" #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> @@ -122,11 +123,15 @@ class SessionState : public qpid::SessionState, const SessionId& getSessionId() const { return getId(); } + // Used by ExecutionHandler sync command processing. Notifies + // the SessionState of a received Execution.Sync command. + void addPendingExecutionSync(); + private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); - void enqueued(boost::intrusive_ptr<Message> msg); + void completeRcvMsg(boost::intrusive_ptr<qpid::broker::Message> msg); void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); @@ -152,8 +157,6 @@ class SessionState : public qpid::SessionState, SemanticState semanticState; SessionAdapter adapter; MessageBuilder msgBuilder; - IncompleteMessageList incomplete; - IncompleteMessageList::CompletionListener enqueuedOp; qmf::org::apache::qpid::broker::Session* mgmtObject; qpid::framing::SequenceSet accepted; @@ -162,7 +165,28 @@ class SessionState : public qpid::SessionState, boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol; boost::intrusive_ptr<sys::TimerTask> flowControlTimer; + // sequence numbers for pending received Execution.Sync commands + std::queue<SequenceNumber> pendingExecutionSyncs; + bool currentCommandComplete; + + class IncompleteRcvMsg : public AsyncCompletion::CompletionHandler + { + public: + IncompleteRcvMsg(SessionState& _session, boost::intrusive_ptr<Message> _msg) + : session(&_session), msg(_msg) {} + virtual void operator() (bool sync); + void cancel(); // cancel pending incomplete callback [operator() above]. + + private: + SessionState *session; + boost::intrusive_ptr<Message> msg; + static void scheduledCompleter( boost::shared_ptr<IncompleteRcvMsg> incompleteMsg ); + }; + std::list< boost::shared_ptr<IncompleteRcvMsg> > incompleteRcvMsgs; + qpid::sys::Mutex incompleteRcvMsgsLock; + friend class SessionManager; + friend class IncompleteRcvMsg; }; diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 07405bcd8f..fc3f7c0854 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -87,7 +87,6 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ InlineVector.cpp \ SequenceSet.cpp \ StringUtils.cpp \ - IncompleteMessageList.cpp \ RangeSet.cpp \ AtomicValue.cpp \ QueueTest.cpp \ diff --git a/qpid/cpp/src/tests/TxPublishTest.cpp b/qpid/cpp/src/tests/TxPublishTest.cpp index 6b44d95baa..ffb0125302 100644 --- a/qpid/cpp/src/tests/TxPublishTest.cpp +++ b/qpid/cpp/src/tests/TxPublishTest.cpp @@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testPrepare) BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second); BOOST_CHECK_EQUAL(string("queue2"), t.store.enqueued[1].first); BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second); - BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isEnqueueComplete()); + BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isReceiveComplete()); } QPID_AUTO_TEST_CASE(testCommit) @@ -87,7 +87,7 @@ QPID_AUTO_TEST_CASE(testCommit) BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount()); intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload; - BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete()); + BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isReceiveComplete()); BOOST_CHECK_EQUAL(t.msg, msg_dequeue); BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount()); |