diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-03-08 15:04:07 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-03-08 15:04:07 +0000 |
commit | cf54529d706ef87e01e13f55735b7df7f324838c (patch) | |
tree | b941d3badaed7e97e83557aef0d827838b51c9fc | |
parent | 27dff9d6c468158640a47dca87830930688ff082 (diff) | |
download | qpid-python-cf54529d706ef87e01e13f55735b7df7f324838c.tar.gz |
QPID-3073: refactor to eliminate locks, malloc, and map insert/remove in receive path.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1079385 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/AsyncCompletion.h | 63 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PersistableMessage.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.cpp | 146 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionState.h | 112 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessageUtils.h | 14 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 2 |
7 files changed, 169 insertions, 190 deletions
diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h index 1f3d11e0ee..3190861e14 100644 --- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h +++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h @@ -22,6 +22,8 @@ * */ +#include <boost/intrusive_ptr.hpp> + #include "qpid/broker/BrokerImportExport.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Mutex.h" @@ -77,6 +79,22 @@ namespace broker { class AsyncCompletion { + public: + + /** Supplied by the Initiator to the end() method, allows for a callback + * when all outstanding completers are done. If the callback cannot be + * made during the end() call, the clone() method must supply a copy of + * this callback object that persists after end() returns. The cloned + * callback object will be used by the last completer thread, and + * released when the callback returns. + */ + class Callback : public RefCounted + { + public: + virtual void completed(bool) = 0; + virtual boost::intrusive_ptr<Callback> clone() = 0; + }; + private: mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded; mutable qpid::sys::Monitor callbackLock; @@ -85,14 +103,17 @@ class AsyncCompletion void invokeCallback(bool sync) { qpid::sys::Mutex::ScopedLock l(callbackLock); if (active) { - inCallback = true; - { - qpid::sys::Mutex::ScopedUnlock ul(callbackLock); - completed(sync); + if (callback) { + inCallback = true; + { + qpid::sys::Mutex::ScopedUnlock ul(callbackLock); + callback->completed(sync); + } + inCallback = false; + callback.reset(); + callbackLock.notifyAll(); } - inCallback = false; active = false; - callbackLock.notifyAll(); } } @@ -100,17 +121,17 @@ class AsyncCompletion /** Invoked when all completers have signalled that they have completed * (via calls to finishCompleter()). bool == true if called via end() */ - virtual void completed(bool) = 0; + boost::intrusive_ptr<Callback> callback; public: AsyncCompletion() : completionsNeeded(0), inCallback(false), active(true) {}; virtual ~AsyncCompletion() { cancel(); } + /** True when all outstanding operations have compeleted */ bool isDone() { - qpid::sys::Mutex::ScopedLock l(callbackLock); return !active; } @@ -135,17 +156,32 @@ class AsyncCompletion */ void begin() { - qpid::sys::Mutex::ScopedLock l(callbackLock); ++completionsNeeded; } /** called by initiator after all potential completers have called * startCompleter(). */ - void end() + void end(Callback& cb) { assert(completionsNeeded.get() > 0); // ensure begin() has been called! + // the following only "decrements" the count if it is 1. This means + // there are no more outstanding completers and we are done. + if (completionsNeeded.boolCompareAndSwap(1, 0)) { + // done! Complete immediately + cb.completed(true); + return; + } + + // the compare-and-swap did not succeed. This means there are + // outstanding completers pending (count > 1). Get a persistent + // Callback object to use when the last completer is done. + // Decrement after setting up the callback ensures that pending + // completers cannot touch the callback until it is ready. + callback = cb.clone(); if (--completionsNeeded == 0) { + // note that a completer may have completed during the + // callback setup or decrement: invokeCallback(true); } } @@ -156,14 +192,9 @@ class AsyncCompletion virtual void cancel() { qpid::sys::Mutex::ScopedLock l(callbackLock); while (inCallback) callbackLock.wait(); + callback.reset(); active = false; } - - /** may be called by Initiator after all completers have been added but - * prior to calling end(). Allows initiator to determine if it _really_ - * needs to wait for pending Completers (e.g. count > 1). - */ - //uint32_t getPendingCompleters() { return completionsNeeded.get(); } }; }} // qpid::broker:: diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h index a84aa45d76..e50b52c09a 100644 --- a/qpid/cpp/src/qpid/broker/PersistableMessage.h +++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h @@ -56,7 +56,7 @@ class PersistableMessage : public Persistable * operations have completed, the transfer of this message from the client * may be considered complete. */ - boost::shared_ptr<AsyncCompletion> ingressCompletion; + AsyncCompletion ingressCompletion; /** * Tracks the number of outstanding asynchronous dequeue @@ -115,12 +115,11 @@ class PersistableMessage : public Persistable virtual QPID_BROKER_EXTERN bool isPersistent() const = 0; /** track the progress of a message received by the broker - see ingressCompletion above */ - QPID_BROKER_EXTERN bool isIngressComplete() { return !ingressCompletion || ingressCompletion->isDone(); } - QPID_BROKER_EXTERN boost::shared_ptr<AsyncCompletion>& getIngressCompletion() { return ingressCompletion; } - QPID_BROKER_EXTERN void setIngressCompletion(boost::shared_ptr<AsyncCompletion>& ic) { ingressCompletion = ic; } + QPID_BROKER_EXTERN bool isIngressComplete() { return ingressCompletion.isDone(); } + QPID_BROKER_EXTERN AsyncCompletion& getIngressCompletion() { return ingressCompletion; } - QPID_BROKER_EXTERN void enqueueStart() { if (ingressCompletion) ingressCompletion->startCompleter(); } - QPID_BROKER_EXTERN void enqueueComplete() { if (ingressCompletion) ingressCompletion->finishCompleter(); } + QPID_BROKER_EXTERN void enqueueStart() { ingressCompletion.startCompleter(); } + QPID_BROKER_EXTERN void enqueueComplete() { ingressCompletion.finishCompleter(); } QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store); diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp index 21c7a2a737..3494288f7b 100644 --- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -144,11 +144,6 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) } } - /** @todo KAG: - REMOVE ONCE STABLE */ - if (index.find(msg.payload) != index.end()) { - QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position); - } - if (flowStopped || !index.empty()) { // ignore flow control if we are populating the queue due to cluster replication: if (broker && broker->isClusterUpdatee()) { @@ -156,7 +151,7 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg) return; } QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position); - msg.payload->getIngressCompletion()->startCompleter(); // don't complete until flow resumes + msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes index.insert(msg.payload); } } @@ -196,14 +191,14 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg) // flow enabled - release all pending msgs while (!index.empty()) { std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin(); - (*itr)->getIngressCompletion()->finishCompleter(); + (*itr)->getIngressCompletion().finishCompleter(); index.erase(itr); } } else { // even if flow controlled, we must release this msg as it is being dequeued std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload); if (itr != index.end()) { // this msg is flow controlled, release it: - (*itr)->getIngressCompletion()->finishCompleter(); + (*itr)->getIngressCompletion().finishCompleter(); index.erase(itr); } } diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index 11f3e84b70..1ed3277aae 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -62,7 +62,7 @@ SessionState::SessionState( msgBuilder(&broker.getStore()), mgmtObject(0), rateFlowcontrol(0), - scheduledCompleterContext(new ScheduledCompleterContext(this)) + asyncCommandCompleter(new AsyncCommandCompleter(this)) { uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { @@ -102,25 +102,7 @@ SessionState::~SessionState() { if (flowControlTimer) flowControlTimer->cancel(); - // clean up any outstanding incomplete commands - { - qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); - std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds); - incompleteCmds.clear(); - while (!copy.empty()) { - boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second); - copy.erase(copy.begin()); - { - // note: need to drop lock, as callback may attempt to take it. - qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock); - ref->cancel(); - } - } - } - - // At this point, we are guaranteed no further completion callbacks will be - // made. Cancel any outstanding scheduledCompleter calls... - scheduledCompleterContext->cancel(); + asyncCommandCompleter->cancel(); } AMQP_ClientProxy& SessionState::getProxy() { @@ -276,13 +258,11 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) msg->getFrames().append(header); } msg->setPublisher(&getConnection()); - - boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg))); - msg->setIngressCompletion( ac ); - ac->begin(); + msg->getIngressCompletion().begin(); semanticState.handle(msg); msgBuilder.end(); - ac->end(); // allows msg to complete xfer + IncompleteIngressMsgXfer xfer(this, msg); + msg->getIngressCompletion().end(xfer); // allows msg to complete xfer } // Handle producer session flow control @@ -451,110 +431,94 @@ void SessionState::addPendingExecutionSync() } -/** factory for creating IncompleteIngressMsgXfer objects which - * can be references from Messages as ingress AsyncCompletion objects. +/** factory for creating a reference-counted IncompleteIngressMsgXfer object + * which will be attached to a message that will be completed asynchronously. */ -boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> -SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg) +boost::intrusive_ptr<AsyncCompletion::Callback> +SessionState::IncompleteIngressMsgXfer::clone() { - SequenceNumber id = msg->getCommandId(); - boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg)); - qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); - incompleteCmds[id] = cmd; - return cmd; + boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg)); + return cb; } -/** Invoked by the asynchronous completer associated with - * a received msg that is pending Completion. May be invoked - * by the SessionState directly (sync == true), or some external - * entity (!sync). +/** Invoked by the asynchronous completer associated with a received + * msg that is pending Completion. May be invoked by the IO thread + * (sync == true), or some external thread (!sync). */ void SessionState::IncompleteIngressMsgXfer::completed(bool sync) { if (!sync) { /** note well: this path may execute in any thread. It is safe to access - * the session, as the SessionState destructor will cancel all outstanding - * callbacks before getting destroyed (so we'll never get here). + * the scheduledCompleterContext, since *this has a shared pointer to it. + * but not session or msg! */ + session = 0; msg = 0; QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); - if (session->scheduledCompleterContext->scheduleCompletion(id)) - session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter, - session->scheduledCompleterContext)); - } else { // command is being completed in IO thread. - // this path runs only on the IO thread. - qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock); - std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; - cmd = session->incompleteCmds.find(id); - if (cmd != session->incompleteCmds.end()) { - boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second); - session->incompleteCmds.erase(cmd); - - if (session->isAttached()) { - QPID_LOG(debug, ": receive completed for msg seq=" << id); - qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock); - session->completeRcvMsg(id, requiresAccept, requiresSync); - return; - } + completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync); + } else { + // this path runs directly from the ac->end() call in handleContent() above, + // so *session and *msg are definately valid. + if (session->isAttached()) { + QPID_LOG(debug, ": receive completed for msg seq=" << id); + session->completeRcvMsg(id, requiresAccept, requiresSync); } } + completerContext.reset(); // ??? KAG optional ??? } -/** Scheduled from incomplete command's completed callback, safely completes all - * completed commands in the IO Thread. Guaranteed not to be running at the same - * time as the message receive code. +/** Scheduled from an asynchronous command's completed callback to run on + * the IO thread. */ -void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt) +void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt) { ctxt->completeCommands(); } -/** mark a command (sequence) as completed, return True if caller should - * schedule a call to completeCommands() +/** mark an ingress Message.Transfer command as completed. + * This method must be thread safe - it may run on any thread. */ -bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd) +void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd, + bool requiresAccept, + bool requiresSync) { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); - - completedCmds.push_back(cmd); - return (completedCmds.size() == 1); + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); + + if (session) { + MessageInfo msg(cmd, requiresAccept, requiresSync); + completedMsgs.push_back(msg); + if (completedMsgs.size() == 1) { + session->getConnection().requestIOProcessing(boost::bind(&schedule, + session->asyncCommandCompleter)); + } + } } -/** Cause the session to complete all completed commands */ -void SessionState::ScheduledCompleterContext::completeCommands() +/** Cause the session to complete all completed commands. + * Executes on the IO thread. + */ +void SessionState::AsyncCommandCompleter::completeCommands() { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); // when session is destroyed, it clears the session pointer via cancel(). - if (!session) return; - - while (!completedCmds.empty()) { - SequenceNumber id = completedCmds.front(); - completedCmds.pop_front(); - std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; - { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock); - - cmd = session->incompleteCmds.find(id); - if (cmd !=session->incompleteCmds.end()) { - boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second); - { - qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock); - tmp->do_completion(); // retakes incompleteCmdslock - } - } + if (session && session->isAttached()) { + for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin(); + msg != completedMsgs.end(); ++msg) { + session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync); } } + completedMsgs.clear(); } /** cancel any pending calls to scheduleComplete */ -void SessionState::ScheduledCompleterContext::cancel() +void SessionState::AsyncCommandCompleter::cancel() { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock); session = 0; } diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index 5e162e6475..2250940102 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -38,6 +38,7 @@ #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> +#include <boost/intrusive_ptr.hpp> #include <set> #include <vector> @@ -176,79 +177,84 @@ class SessionState : public qpid::SessionState, std::queue<SequenceNumber> pendingExecutionSyncs; bool currentCommandComplete; - /** Abstract class that represents a command that is pending - * completion. + /** This class provides a context for completing asynchronous commands in a thread + * safe manner. Asynchronous commands save their completion state in this class. + * This class then schedules the completeCommands() method in the IO thread. + * While running in the IO thread, completeCommands() may safely complete all + * saved commands without the risk of colliding with other operations on this + * SessionState. */ - class IncompleteCommandContext : public AsyncCompletion + class AsyncCommandCompleter : public RefCounted { + private: + SessionState *session; + qpid::sys::Mutex completerLock; + + // special-case message.transfer commands for optimization + struct MessageInfo { + SequenceNumber cmd; // message.transfer command id + bool requiresAccept; + bool requiresSync; + MessageInfo(SequenceNumber c, bool a, bool s) + : cmd(c), requiresAccept(a), requiresSync(s) {} + }; + std::vector<MessageInfo> completedMsgs; + + /** complete all pending commands, runs in IO thread */ + void completeCommands(); + + /** for scheduling a run of "completeCommands()" on the IO thread */ + static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>); + + public: + AsyncCommandCompleter(SessionState *s) : session(s) {}; + ~AsyncCommandCompleter() {}; + + /** schedule the completion of an ingress message.transfer command */ + void scheduleMsgCompletion(SequenceNumber cmd, + bool requiresAccept, + bool requiresSync); + void cancel(); // called by SessionState destructor. + }; + boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter; + + /** Abstract class that represents a single asynchronous command that is + * pending completion. + */ + class AsyncCommandContext : public AsyncCompletion::Callback { public: - IncompleteCommandContext( SessionState *ss, SequenceNumber _id ) - : id(_id), session(ss) {} - virtual ~IncompleteCommandContext() {} - - /* allows manual invokation of completion, used by IO thread to - * complete a command that was originally finished on a different - * thread. - */ - void do_completion() { completed(true); } + AsyncCommandContext( SessionState *ss, SequenceNumber _id ) + : id(_id), completerContext(ss->asyncCommandCompleter) {} + virtual ~AsyncCommandContext() {} protected: SequenceNumber id; - SessionState *session; + boost::intrusive_ptr<AsyncCommandCompleter> completerContext; }; /** incomplete Message.transfer commands - inbound to broker from client */ - class IncompleteIngressMsgXfer : public SessionState::IncompleteCommandContext + class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext { public: IncompleteIngressMsgXfer( SessionState *ss, - SequenceNumber _id, - boost::intrusive_ptr<Message> msg ) - : IncompleteCommandContext(ss, _id), - requiresAccept(msg->requiresAccept()), - requiresSync(msg->getFrames().getMethod()->isSync()) {}; + boost::intrusive_ptr<Message> m ) + : AsyncCommandContext(ss, m->getCommandId()), + session(ss), + msg(m.get()), + requiresAccept(msg->requiresAccept()), + requiresSync(msg->getFrames().getMethod()->isSync()) {}; virtual ~IncompleteIngressMsgXfer() {}; - protected: virtual void completed(bool); + virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone(); private: - /** meta-info required to complete the message */ + SessionState *session; // only valid if sync == true + Message *msg; // only valid if sync == true bool requiresAccept; - bool requiresSync; // method's isSync() flag + bool requiresSync; }; - /** creates a command context suitable for use as an AsyncCompletion in a message */ - boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> createIngressMsgXferContext( boost::intrusive_ptr<Message> msg); - - /* A list of commands that are pending completion. These commands are - * awaiting some set of asynchronous operations to finish (eg: store, - * flow-control, etc). before the command can be completed to the client - */ - std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > incompleteCmds; - qpid::sys::Mutex incompleteCmdsLock; // locks above container - - /** This context is shared between the SessionState and scheduledCompleter, - * holds the sequence numbers of all commands that have completed asynchronously. - */ - class ScheduledCompleterContext { - private: - std::list<SequenceNumber> completedCmds; - // ordering: take this lock first, then incompleteCmdsLock - qpid::sys::Mutex completedCmdsLock; - SessionState *session; - public: - ScheduledCompleterContext(SessionState *s) : session(s) {}; - bool scheduleCompletion(SequenceNumber cmd); - void completeCommands(); - void cancel(); - }; - boost::shared_ptr<ScheduledCompleterContext> scheduledCompleterContext; - - /** The following method runs the in IO thread and completes commands that - * where finished asynchronously. - */ - static void scheduledCompleter(boost::shared_ptr<ScheduledCompleterContext>); friend class SessionManager; }; diff --git a/qpid/cpp/src/tests/MessageUtils.h b/qpid/cpp/src/tests/MessageUtils.h index baca14cf4e..a1b140d484 100644 --- a/qpid/cpp/src/tests/MessageUtils.h +++ b/qpid/cpp/src/tests/MessageUtils.h @@ -20,7 +20,6 @@ */ #include "qpid/broker/Message.h" -#include "qpid/broker/AsyncCompletion.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/Uuid.h" @@ -29,17 +28,6 @@ using namespace qpid; using namespace broker; using namespace framing; -namespace { - class DummyCompletion : public AsyncCompletion - { - public: - DummyCompletion() {} - virtual ~DummyCompletion() {} - protected: - void completed(bool) {} - }; -} - namespace qpid { namespace tests { @@ -62,8 +50,6 @@ struct MessageUtils msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); if (durable) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2); - boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion()); - msg->setIngressCompletion(dc); return msg; } diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index fd30a98ac0..2059727e7b 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -88,8 +88,6 @@ intrusive_ptr<Message> create_message(std::string exchange, std::string routingK msg->getFrames().append(method); msg->getFrames().append(header); msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); - boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion()); - msg->setIngressCompletion(dc); return msg; } |