summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2011-03-08 15:04:07 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2011-03-08 15:04:07 +0000
commitcf54529d706ef87e01e13f55735b7df7f324838c (patch)
treeb941d3badaed7e97e83557aef0d827838b51c9fc
parent27dff9d6c468158640a47dca87830930688ff082 (diff)
downloadqpid-python-cf54529d706ef87e01e13f55735b7df7f324838c.tar.gz
QPID-3073: refactor to eliminate locks, malloc, and map insert/remove in receive path.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1079385 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/AsyncCompletion.h63
-rw-r--r--qpid/cpp/src/qpid/broker/PersistableMessage.h11
-rw-r--r--qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp146
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h112
-rw-r--r--qpid/cpp/src/tests/MessageUtils.h14
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp2
7 files changed, 169 insertions, 190 deletions
diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
index 1f3d11e0ee..3190861e14 100644
--- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h
+++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
@@ -22,6 +22,8 @@
*
*/
+#include <boost/intrusive_ptr.hpp>
+
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Mutex.h"
@@ -77,6 +79,22 @@ namespace broker {
class AsyncCompletion
{
+ public:
+
+ /** Supplied by the Initiator to the end() method, allows for a callback
+ * when all outstanding completers are done. If the callback cannot be
+ * made during the end() call, the clone() method must supply a copy of
+ * this callback object that persists after end() returns. The cloned
+ * callback object will be used by the last completer thread, and
+ * released when the callback returns.
+ */
+ class Callback : public RefCounted
+ {
+ public:
+ virtual void completed(bool) = 0;
+ virtual boost::intrusive_ptr<Callback> clone() = 0;
+ };
+
private:
mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded;
mutable qpid::sys::Monitor callbackLock;
@@ -85,14 +103,17 @@ class AsyncCompletion
void invokeCallback(bool sync) {
qpid::sys::Mutex::ScopedLock l(callbackLock);
if (active) {
- inCallback = true;
- {
- qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
- completed(sync);
+ if (callback) {
+ inCallback = true;
+ {
+ qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
+ callback->completed(sync);
+ }
+ inCallback = false;
+ callback.reset();
+ callbackLock.notifyAll();
}
- inCallback = false;
active = false;
- callbackLock.notifyAll();
}
}
@@ -100,17 +121,17 @@ class AsyncCompletion
/** Invoked when all completers have signalled that they have completed
* (via calls to finishCompleter()). bool == true if called via end()
*/
- virtual void completed(bool) = 0;
+ boost::intrusive_ptr<Callback> callback;
public:
AsyncCompletion() : completionsNeeded(0), inCallback(false), active(true) {};
virtual ~AsyncCompletion() { cancel(); }
+
/** True when all outstanding operations have compeleted
*/
bool isDone()
{
- qpid::sys::Mutex::ScopedLock l(callbackLock);
return !active;
}
@@ -135,17 +156,32 @@ class AsyncCompletion
*/
void begin()
{
- qpid::sys::Mutex::ScopedLock l(callbackLock);
++completionsNeeded;
}
/** called by initiator after all potential completers have called
* startCompleter().
*/
- void end()
+ void end(Callback& cb)
{
assert(completionsNeeded.get() > 0); // ensure begin() has been called!
+ // the following only "decrements" the count if it is 1. This means
+ // there are no more outstanding completers and we are done.
+ if (completionsNeeded.boolCompareAndSwap(1, 0)) {
+ // done! Complete immediately
+ cb.completed(true);
+ return;
+ }
+
+ // the compare-and-swap did not succeed. This means there are
+ // outstanding completers pending (count > 1). Get a persistent
+ // Callback object to use when the last completer is done.
+ // Decrement after setting up the callback ensures that pending
+ // completers cannot touch the callback until it is ready.
+ callback = cb.clone();
if (--completionsNeeded == 0) {
+ // note that a completer may have completed during the
+ // callback setup or decrement:
invokeCallback(true);
}
}
@@ -156,14 +192,9 @@ class AsyncCompletion
virtual void cancel() {
qpid::sys::Mutex::ScopedLock l(callbackLock);
while (inCallback) callbackLock.wait();
+ callback.reset();
active = false;
}
-
- /** may be called by Initiator after all completers have been added but
- * prior to calling end(). Allows initiator to determine if it _really_
- * needs to wait for pending Completers (e.g. count > 1).
- */
- //uint32_t getPendingCompleters() { return completionsNeeded.get(); }
};
}} // qpid::broker::
diff --git a/qpid/cpp/src/qpid/broker/PersistableMessage.h b/qpid/cpp/src/qpid/broker/PersistableMessage.h
index a84aa45d76..e50b52c09a 100644
--- a/qpid/cpp/src/qpid/broker/PersistableMessage.h
+++ b/qpid/cpp/src/qpid/broker/PersistableMessage.h
@@ -56,7 +56,7 @@ class PersistableMessage : public Persistable
* operations have completed, the transfer of this message from the client
* may be considered complete.
*/
- boost::shared_ptr<AsyncCompletion> ingressCompletion;
+ AsyncCompletion ingressCompletion;
/**
* Tracks the number of outstanding asynchronous dequeue
@@ -115,12 +115,11 @@ class PersistableMessage : public Persistable
virtual QPID_BROKER_EXTERN bool isPersistent() const = 0;
/** track the progress of a message received by the broker - see ingressCompletion above */
- QPID_BROKER_EXTERN bool isIngressComplete() { return !ingressCompletion || ingressCompletion->isDone(); }
- QPID_BROKER_EXTERN boost::shared_ptr<AsyncCompletion>& getIngressCompletion() { return ingressCompletion; }
- QPID_BROKER_EXTERN void setIngressCompletion(boost::shared_ptr<AsyncCompletion>& ic) { ingressCompletion = ic; }
+ QPID_BROKER_EXTERN bool isIngressComplete() { return ingressCompletion.isDone(); }
+ QPID_BROKER_EXTERN AsyncCompletion& getIngressCompletion() { return ingressCompletion; }
- QPID_BROKER_EXTERN void enqueueStart() { if (ingressCompletion) ingressCompletion->startCompleter(); }
- QPID_BROKER_EXTERN void enqueueComplete() { if (ingressCompletion) ingressCompletion->finishCompleter(); }
+ QPID_BROKER_EXTERN void enqueueStart() { ingressCompletion.startCompleter(); }
+ QPID_BROKER_EXTERN void enqueueComplete() { ingressCompletion.finishCompleter(); }
QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue,
MessageStore* _store);
diff --git a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
index 21c7a2a737..3494288f7b 100644
--- a/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
@@ -144,11 +144,6 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
}
}
- /** @todo KAG: - REMOVE ONCE STABLE */
- if (index.find(msg.payload) != index.end()) {
- QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position);
- }
-
if (flowStopped || !index.empty()) {
// ignore flow control if we are populating the queue due to cluster replication:
if (broker && broker->isClusterUpdatee()) {
@@ -156,7 +151,7 @@ void QueueFlowLimit::enqueued(const QueuedMessage& msg)
return;
}
QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position);
- msg.payload->getIngressCompletion()->startCompleter(); // don't complete until flow resumes
+ msg.payload->getIngressCompletion().startCompleter(); // don't complete until flow resumes
index.insert(msg.payload);
}
}
@@ -196,14 +191,14 @@ void QueueFlowLimit::dequeued(const QueuedMessage& msg)
// flow enabled - release all pending msgs
while (!index.empty()) {
std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin();
- (*itr)->getIngressCompletion()->finishCompleter();
+ (*itr)->getIngressCompletion().finishCompleter();
index.erase(itr);
}
} else {
// even if flow controlled, we must release this msg as it is being dequeued
std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload);
if (itr != index.end()) { // this msg is flow controlled, release it:
- (*itr)->getIngressCompletion()->finishCompleter();
+ (*itr)->getIngressCompletion().finishCompleter();
index.erase(itr);
}
}
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index 11f3e84b70..1ed3277aae 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -62,7 +62,7 @@ SessionState::SessionState(
msgBuilder(&broker.getStore()),
mgmtObject(0),
rateFlowcontrol(0),
- scheduledCompleterContext(new ScheduledCompleterContext(this))
+ asyncCommandCompleter(new AsyncCommandCompleter(this))
{
uint32_t maxRate = broker.getOptions().maxSessionRate;
if (maxRate) {
@@ -102,25 +102,7 @@ SessionState::~SessionState() {
if (flowControlTimer)
flowControlTimer->cancel();
- // clean up any outstanding incomplete commands
- {
- qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
- std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds);
- incompleteCmds.clear();
- while (!copy.empty()) {
- boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second);
- copy.erase(copy.begin());
- {
- // note: need to drop lock, as callback may attempt to take it.
- qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock);
- ref->cancel();
- }
- }
- }
-
- // At this point, we are guaranteed no further completion callbacks will be
- // made. Cancel any outstanding scheduledCompleter calls...
- scheduledCompleterContext->cancel();
+ asyncCommandCompleter->cancel();
}
AMQP_ClientProxy& SessionState::getProxy() {
@@ -276,13 +258,11 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
msg->getFrames().append(header);
}
msg->setPublisher(&getConnection());
-
- boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg)));
- msg->setIngressCompletion( ac );
- ac->begin();
+ msg->getIngressCompletion().begin();
semanticState.handle(msg);
msgBuilder.end();
- ac->end(); // allows msg to complete xfer
+ IncompleteIngressMsgXfer xfer(this, msg);
+ msg->getIngressCompletion().end(xfer); // allows msg to complete xfer
}
// Handle producer session flow control
@@ -451,110 +431,94 @@ void SessionState::addPendingExecutionSync()
}
-/** factory for creating IncompleteIngressMsgXfer objects which
- * can be references from Messages as ingress AsyncCompletion objects.
+/** factory for creating a reference-counted IncompleteIngressMsgXfer object
+ * which will be attached to a message that will be completed asynchronously.
*/
-boost::shared_ptr<SessionState::IncompleteIngressMsgXfer>
-SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg)
+boost::intrusive_ptr<AsyncCompletion::Callback>
+SessionState::IncompleteIngressMsgXfer::clone()
{
- SequenceNumber id = msg->getCommandId();
- boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg));
- qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock);
- incompleteCmds[id] = cmd;
- return cmd;
+ boost::intrusive_ptr<SessionState::IncompleteIngressMsgXfer> cb(new SessionState::IncompleteIngressMsgXfer(session, msg));
+ return cb;
}
-/** Invoked by the asynchronous completer associated with
- * a received msg that is pending Completion. May be invoked
- * by the SessionState directly (sync == true), or some external
- * entity (!sync).
+/** Invoked by the asynchronous completer associated with a received
+ * msg that is pending Completion. May be invoked by the IO thread
+ * (sync == true), or some external thread (!sync).
*/
void SessionState::IncompleteIngressMsgXfer::completed(bool sync)
{
if (!sync) {
/** note well: this path may execute in any thread. It is safe to access
- * the session, as the SessionState destructor will cancel all outstanding
- * callbacks before getting destroyed (so we'll never get here).
+ * the scheduledCompleterContext, since *this has a shared pointer to it.
+ * but not session or msg!
*/
+ session = 0; msg = 0;
QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id);
- if (session->scheduledCompleterContext->scheduleCompletion(id))
- session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter,
- session->scheduledCompleterContext));
- } else { // command is being completed in IO thread.
- // this path runs only on the IO thread.
- qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock);
- std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
- cmd = session->incompleteCmds.find(id);
- if (cmd != session->incompleteCmds.end()) {
- boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
- session->incompleteCmds.erase(cmd);
-
- if (session->isAttached()) {
- QPID_LOG(debug, ": receive completed for msg seq=" << id);
- qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock);
- session->completeRcvMsg(id, requiresAccept, requiresSync);
- return;
- }
+ completerContext->scheduleMsgCompletion(id, requiresAccept, requiresSync);
+ } else {
+ // this path runs directly from the ac->end() call in handleContent() above,
+ // so *session and *msg are definately valid.
+ if (session->isAttached()) {
+ QPID_LOG(debug, ": receive completed for msg seq=" << id);
+ session->completeRcvMsg(id, requiresAccept, requiresSync);
}
}
+ completerContext.reset(); // ??? KAG optional ???
}
-/** Scheduled from incomplete command's completed callback, safely completes all
- * completed commands in the IO Thread. Guaranteed not to be running at the same
- * time as the message receive code.
+/** Scheduled from an asynchronous command's completed callback to run on
+ * the IO thread.
*/
-void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt)
+void SessionState::AsyncCommandCompleter::schedule(boost::intrusive_ptr<AsyncCommandCompleter> ctxt)
{
ctxt->completeCommands();
}
-/** mark a command (sequence) as completed, return True if caller should
- * schedule a call to completeCommands()
+/** mark an ingress Message.Transfer command as completed.
+ * This method must be thread safe - it may run on any thread.
*/
-bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd)
+void SessionState::AsyncCommandCompleter::scheduleMsgCompletion(SequenceNumber cmd,
+ bool requiresAccept,
+ bool requiresSync)
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
-
- completedCmds.push_back(cmd);
- return (completedCmds.size() == 1);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
+
+ if (session) {
+ MessageInfo msg(cmd, requiresAccept, requiresSync);
+ completedMsgs.push_back(msg);
+ if (completedMsgs.size() == 1) {
+ session->getConnection().requestIOProcessing(boost::bind(&schedule,
+ session->asyncCommandCompleter));
+ }
+ }
}
-/** Cause the session to complete all completed commands */
-void SessionState::ScheduledCompleterContext::completeCommands()
+/** Cause the session to complete all completed commands.
+ * Executes on the IO thread.
+ */
+void SessionState::AsyncCommandCompleter::completeCommands()
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
// when session is destroyed, it clears the session pointer via cancel().
- if (!session) return;
-
- while (!completedCmds.empty()) {
- SequenceNumber id = completedCmds.front();
- completedCmds.pop_front();
- std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd;
- {
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock);
-
- cmd = session->incompleteCmds.find(id);
- if (cmd !=session->incompleteCmds.end()) {
- boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second);
- {
- qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock);
- tmp->do_completion(); // retakes incompleteCmdslock
- }
- }
+ if (session && session->isAttached()) {
+ for (std::vector<MessageInfo>::iterator msg = completedMsgs.begin();
+ msg != completedMsgs.end(); ++msg) {
+ session->completeRcvMsg(msg->cmd, msg->requiresAccept, msg->requiresSync);
}
}
+ completedMsgs.clear();
}
/** cancel any pending calls to scheduleComplete */
-void SessionState::ScheduledCompleterContext::cancel()
+void SessionState::AsyncCommandCompleter::cancel()
{
- qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock);
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(completerLock);
session = 0;
}
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index 5e162e6475..2250940102 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -38,6 +38,7 @@
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
#include <set>
#include <vector>
@@ -176,79 +177,84 @@ class SessionState : public qpid::SessionState,
std::queue<SequenceNumber> pendingExecutionSyncs;
bool currentCommandComplete;
- /** Abstract class that represents a command that is pending
- * completion.
+ /** This class provides a context for completing asynchronous commands in a thread
+ * safe manner. Asynchronous commands save their completion state in this class.
+ * This class then schedules the completeCommands() method in the IO thread.
+ * While running in the IO thread, completeCommands() may safely complete all
+ * saved commands without the risk of colliding with other operations on this
+ * SessionState.
*/
- class IncompleteCommandContext : public AsyncCompletion
+ class AsyncCommandCompleter : public RefCounted {
+ private:
+ SessionState *session;
+ qpid::sys::Mutex completerLock;
+
+ // special-case message.transfer commands for optimization
+ struct MessageInfo {
+ SequenceNumber cmd; // message.transfer command id
+ bool requiresAccept;
+ bool requiresSync;
+ MessageInfo(SequenceNumber c, bool a, bool s)
+ : cmd(c), requiresAccept(a), requiresSync(s) {}
+ };
+ std::vector<MessageInfo> completedMsgs;
+
+ /** complete all pending commands, runs in IO thread */
+ void completeCommands();
+
+ /** for scheduling a run of "completeCommands()" on the IO thread */
+ static void schedule(boost::intrusive_ptr<AsyncCommandCompleter>);
+
+ public:
+ AsyncCommandCompleter(SessionState *s) : session(s) {};
+ ~AsyncCommandCompleter() {};
+
+ /** schedule the completion of an ingress message.transfer command */
+ void scheduleMsgCompletion(SequenceNumber cmd,
+ bool requiresAccept,
+ bool requiresSync);
+ void cancel(); // called by SessionState destructor.
+ };
+ boost::intrusive_ptr<AsyncCommandCompleter> asyncCommandCompleter;
+
+ /** Abstract class that represents a single asynchronous command that is
+ * pending completion.
+ */
+ class AsyncCommandContext : public AsyncCompletion::Callback
{
public:
- IncompleteCommandContext( SessionState *ss, SequenceNumber _id )
- : id(_id), session(ss) {}
- virtual ~IncompleteCommandContext() {}
-
- /* allows manual invokation of completion, used by IO thread to
- * complete a command that was originally finished on a different
- * thread.
- */
- void do_completion() { completed(true); }
+ AsyncCommandContext( SessionState *ss, SequenceNumber _id )
+ : id(_id), completerContext(ss->asyncCommandCompleter) {}
+ virtual ~AsyncCommandContext() {}
protected:
SequenceNumber id;
- SessionState *session;
+ boost::intrusive_ptr<AsyncCommandCompleter> completerContext;
};
/** incomplete Message.transfer commands - inbound to broker from client
*/
- class IncompleteIngressMsgXfer : public SessionState::IncompleteCommandContext
+ class IncompleteIngressMsgXfer : public SessionState::AsyncCommandContext
{
public:
IncompleteIngressMsgXfer( SessionState *ss,
- SequenceNumber _id,
- boost::intrusive_ptr<Message> msg )
- : IncompleteCommandContext(ss, _id),
- requiresAccept(msg->requiresAccept()),
- requiresSync(msg->getFrames().getMethod()->isSync()) {};
+ boost::intrusive_ptr<Message> m )
+ : AsyncCommandContext(ss, m->getCommandId()),
+ session(ss),
+ msg(m.get()),
+ requiresAccept(msg->requiresAccept()),
+ requiresSync(msg->getFrames().getMethod()->isSync()) {};
virtual ~IncompleteIngressMsgXfer() {};
- protected:
virtual void completed(bool);
+ virtual boost::intrusive_ptr<AsyncCompletion::Callback> clone();
private:
- /** meta-info required to complete the message */
+ SessionState *session; // only valid if sync == true
+ Message *msg; // only valid if sync == true
bool requiresAccept;
- bool requiresSync; // method's isSync() flag
+ bool requiresSync;
};
- /** creates a command context suitable for use as an AsyncCompletion in a message */
- boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> createIngressMsgXferContext( boost::intrusive_ptr<Message> msg);
-
- /* A list of commands that are pending completion. These commands are
- * awaiting some set of asynchronous operations to finish (eg: store,
- * flow-control, etc). before the command can be completed to the client
- */
- std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > incompleteCmds;
- qpid::sys::Mutex incompleteCmdsLock; // locks above container
-
- /** This context is shared between the SessionState and scheduledCompleter,
- * holds the sequence numbers of all commands that have completed asynchronously.
- */
- class ScheduledCompleterContext {
- private:
- std::list<SequenceNumber> completedCmds;
- // ordering: take this lock first, then incompleteCmdsLock
- qpid::sys::Mutex completedCmdsLock;
- SessionState *session;
- public:
- ScheduledCompleterContext(SessionState *s) : session(s) {};
- bool scheduleCompletion(SequenceNumber cmd);
- void completeCommands();
- void cancel();
- };
- boost::shared_ptr<ScheduledCompleterContext> scheduledCompleterContext;
-
- /** The following method runs the in IO thread and completes commands that
- * where finished asynchronously.
- */
- static void scheduledCompleter(boost::shared_ptr<ScheduledCompleterContext>);
friend class SessionManager;
};
diff --git a/qpid/cpp/src/tests/MessageUtils.h b/qpid/cpp/src/tests/MessageUtils.h
index baca14cf4e..a1b140d484 100644
--- a/qpid/cpp/src/tests/MessageUtils.h
+++ b/qpid/cpp/src/tests/MessageUtils.h
@@ -20,7 +20,6 @@
*/
#include "qpid/broker/Message.h"
-#include "qpid/broker/AsyncCompletion.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/Uuid.h"
@@ -29,17 +28,6 @@ using namespace qpid;
using namespace broker;
using namespace framing;
-namespace {
- class DummyCompletion : public AsyncCompletion
- {
- public:
- DummyCompletion() {}
- virtual ~DummyCompletion() {}
- protected:
- void completed(bool) {}
- };
-}
-
namespace qpid {
namespace tests {
@@ -62,8 +50,6 @@ struct MessageUtils
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
if (durable)
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2);
- boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion());
- msg->setIngressCompletion(dc);
return msg;
}
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index fd30a98ac0..2059727e7b 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -88,8 +88,6 @@ intrusive_ptr<Message> create_message(std::string exchange, std::string routingK
msg->getFrames().append(method);
msg->getFrames().append(header);
msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
- boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion());
- msg->setIngressCompletion(dc);
return msg;
}