summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2013-01-04 05:14:33 +0000
committerAndrew Stitcher <astitcher@apache.org>2013-01-04 05:14:33 +0000
commitd49689fb2761ba7bc5bfae03ea35811a55f2cadc (patch)
treec288f2cbe80e06387f963e593415eb9f7bff6ff2 /qpid/cpp/src
parent0245fea288acf0f0dac4a6174be1d9422f34168c (diff)
downloadqpid-python-d49689fb2761ba7bc5bfae03ea35811a55f2cadc.tar.gz
NO-JIRA: slim down some header file inclusions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1428722 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h24
-rw-r--r--qpid/cpp/src/qpid/broker/ConsumerFactory.h7
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp58
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.h200
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h2
-rw-r--r--qpid/cpp/src/tests/BrokerMgmtAgent.cpp1
-rw-r--r--qpid/cpp/src/tests/test_store.cpp1
9 files changed, 151 insertions, 147 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index b9d528011f..fd49b1414f 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -20,11 +20,14 @@
*/
#include "qpid/broker/Broker.h"
+
+#include "qpid/broker/AclModule.h"
#include "qpid/broker/ConnectionState.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
#include "qpid/broker/MessageStoreModule.h"
+#include "qpid/broker/NameGenerator.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/RecoveryManagerImpl.h"
#include "qpid/broker/SaslAuthenticator.h"
@@ -37,6 +40,7 @@
#include "qpid/broker/MessageGroupManager.h"
#include "qmf/org/apache/qpid/broker/Package.h"
+#include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h"
#include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h"
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 5c9c9edb12..bae67ee071 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -23,11 +23,11 @@
*/
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/ConnectionToken.h"
-#include "qpid/broker/DirectExchange.h"
+
+#include "qpid/DataDir.h"
+#include "qpid/Plugin.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/ExchangeRegistry.h"
-#include "qpid/broker/MessageStore.h"
#include "qpid/broker/Protocol.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/LinkRegistry.h"
@@ -35,29 +35,16 @@
#include "qpid/broker/QueueCleaner.h"
#include "qpid/broker/Vhost.h"
#include "qpid/broker/System.h"
-#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/ConnectionObservers.h"
#include "qpid/broker/ConfigurationObservers.h"
-#include "qpid/sys/ConnectionCodec.h"
#include "qpid/management/Manageable.h"
-#include "qpid/management/ManagementAgent.h"
-#include "qmf/org/apache/qpid/broker/Broker.h"
-#include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h"
-#include "qpid/Options.h"
-#include "qpid/Plugin.h"
-#include "qpid/DataDir.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/ConnectionCodec.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/types/Variant.h"
-#include "qpid/RefCounted.h"
-#include "qpid/broker/AclModule.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
#include <boost/intrusive_ptr.hpp>
+
#include <string>
#include <vector>
@@ -73,6 +60,7 @@ struct Url;
namespace broker {
+class AclModule;
class ConnectionState;
class ExpiryPolicy;
class Message;
diff --git a/qpid/cpp/src/qpid/broker/ConsumerFactory.h b/qpid/cpp/src/qpid/broker/ConsumerFactory.h
index abd39fb3f8..1c0f2571e2 100644
--- a/qpid/cpp/src/qpid/broker/ConsumerFactory.h
+++ b/qpid/cpp/src/qpid/broker/ConsumerFactory.h
@@ -25,11 +25,14 @@
// TODO aconway 2011-11-25: it's ugly exposing SemanticState::ConsumerImpl in public.
// Refactor to use a more abstract interface.
-#include "qpid/broker/SemanticState.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
+class SemanticState;
+class SemanticStateConsumerImpl;
+
/**
* Base class for consumer factoires. Plugins can register a
* ConsumerFactory via Broker:: getConsumerFactories() Each time a
@@ -41,7 +44,7 @@ class ConsumerFactory
public:
virtual ~ConsumerFactory() {}
- virtual boost::shared_ptr<SemanticState::ConsumerImpl> create(
+ virtual boost::shared_ptr<SemanticStateConsumerImpl> create(
SemanticState* parent,
const std::string& name, boost::shared_ptr<Queue> queue,
bool ack, bool acquire, bool exclusive, const std::string& tag,
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 4bc3c01271..80c3f99176 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -33,6 +33,7 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/broker/AclModule.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/NameGenerator.h"
#include "qpid/UrlArray.h"
namespace qpid {
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 680488d66a..f97d37e893 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -285,7 +285,7 @@ void SemanticState::record(const DeliveryRecord& delivery)
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
-SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent,
+SemanticStateConsumerImpl::SemanticStateConsumerImpl(SemanticState* _parent,
const string& _name,
Queue::shared_ptr _queue,
bool ack,
@@ -328,12 +328,12 @@ Consumer(_name, type),
}
}
-ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObject (void) const
+ManagementObject::shared_ptr SemanticStateConsumerImpl::GetManagementObject (void) const
{
return mgmtObject;
}
-Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&)
+Manageable::status_t SemanticStateConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&)
{
Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
@@ -343,16 +343,16 @@ Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t met
}
-OwnershipToken* SemanticState::ConsumerImpl::getSession()
+OwnershipToken* SemanticStateConsumerImpl::getSession()
{
return &(parent->session);
}
-bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg)
+bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg)
{
return deliver(cursor, msg, shared_from_this());
}
-bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
+bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer)
{
allocateCredit(msg);
boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
@@ -377,12 +377,12 @@ bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Messa
return true;
}
-bool SemanticState::ConsumerImpl::filter(const Message&)
+bool SemanticStateConsumerImpl::filter(const Message&)
{
return true;
}
-bool SemanticState::ConsumerImpl::accept(const Message& msg)
+bool SemanticStateConsumerImpl::accept(const Message& msg)
{
// TODO aconway 2009-06-08: if we have byte & message credit but
// checkCredit fails because the message is to big, we should
@@ -395,8 +395,8 @@ bool SemanticState::ConsumerImpl::accept(const Message& msg)
namespace {
struct ConsumerName {
- const SemanticState::ConsumerImpl& consumer;
- ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {}
+ const SemanticStateConsumerImpl& consumer;
+ ConsumerName(const SemanticStateConsumerImpl& ci) : consumer(ci) {}
};
ostream& operator<<(ostream& o, const ConsumerName& pc) {
@@ -405,7 +405,7 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) {
}
}
-void SemanticState::ConsumerImpl::allocateCredit(const Message& msg)
+void SemanticStateConsumerImpl::allocateCredit(const Message& msg)
{
Credit original = credit;
boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
@@ -415,7 +415,7 @@ void SemanticState::ConsumerImpl::allocateCredit(const Message& msg)
}
-bool SemanticState::ConsumerImpl::checkCredit(const Message& msg)
+bool SemanticStateConsumerImpl::checkCredit(const Message& msg)
{
boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg);
bool enoughCredit = credit.check(1, transfer->getRequiredCredit());
@@ -425,7 +425,7 @@ bool SemanticState::ConsumerImpl::checkCredit(const Message& msg)
return enoughCredit;
}
-SemanticState::ConsumerImpl::~ConsumerImpl()
+SemanticStateConsumerImpl::~SemanticStateConsumerImpl()
{
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
@@ -498,7 +498,7 @@ void SemanticState::requestDispatch()
i->second->requestDispatch();
}
-void SemanticState::ConsumerImpl::requestDispatch()
+void SemanticStateConsumerImpl::requestDispatch()
{
if (blocked) {
parent->session.getConnection().outputTasks.addOutputTask(this);
@@ -516,7 +516,7 @@ bool SemanticState::complete(DeliveryRecord& delivery)
return delivery.isRedundant();
}
-void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery)
+void SemanticStateConsumerImpl::complete(DeliveryRecord& delivery)
{
if (!delivery.isComplete()) {
delivery.complete();
@@ -541,7 +541,7 @@ SessionContext& SemanticState::getSession() { return session; }
const SessionContext& SemanticState::getSession() const { return session; }
-const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
+const SemanticStateConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const
{
ConsumerImpl::shared_ptr consumer;
if (!find(destination, consumer)) {
@@ -598,7 +598,7 @@ void SemanticState::stop(const std::string& destination)
find(destination)->stop();
}
-void SemanticState::ConsumerImpl::setWindowMode()
+void SemanticStateConsumerImpl::setWindowMode()
{
credit.setWindowMode(true);
if (mgmtObject){
@@ -606,7 +606,7 @@ void SemanticState::ConsumerImpl::setWindowMode()
}
}
-void SemanticState::ConsumerImpl::setCreditMode()
+void SemanticStateConsumerImpl::setCreditMode()
{
credit.setWindowMode(false);
if (mgmtObject){
@@ -614,17 +614,17 @@ void SemanticState::ConsumerImpl::setCreditMode()
}
}
-void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
+void SemanticStateConsumerImpl::addByteCredit(uint32_t value)
{
credit.addByteCredit(value);
}
-void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
+void SemanticStateConsumerImpl::addMessageCredit(uint32_t value)
{
credit.addMessageCredit(value);
}
-bool SemanticState::ConsumerImpl::haveCredit()
+bool SemanticStateConsumerImpl::haveCredit()
{
if (credit) {
return true;
@@ -634,19 +634,19 @@ bool SemanticState::ConsumerImpl::haveCredit()
}
}
-bool SemanticState::ConsumerImpl::doDispatch()
+bool SemanticStateConsumerImpl::doDispatch()
{
return queue->dispatch(shared_from_this());
}
-void SemanticState::ConsumerImpl::flush()
+void SemanticStateConsumerImpl::flush()
{
while(haveCredit() && doDispatch())
;
credit.cancel();
}
-void SemanticState::ConsumerImpl::stop()
+void SemanticStateConsumerImpl::stop()
{
credit.cancel();
}
@@ -701,7 +701,7 @@ void SemanticState::reject(DeliveryId first, DeliveryId last)
getSession().setUnackedCount(unacked.size());
}
-bool SemanticState::ConsumerImpl::doOutput()
+bool SemanticStateConsumerImpl::doOutput()
{
try {
return haveCredit() && doDispatch();
@@ -710,24 +710,24 @@ bool SemanticState::ConsumerImpl::doOutput()
}
}
-void SemanticState::ConsumerImpl::enableNotify()
+void SemanticStateConsumerImpl::enableNotify()
{
Mutex::ScopedLock l(lock);
notifyEnabled = true;
}
-void SemanticState::ConsumerImpl::disableNotify()
+void SemanticStateConsumerImpl::disableNotify()
{
Mutex::ScopedLock l(lock);
notifyEnabled = false;
}
-bool SemanticState::ConsumerImpl::isNotifyEnabled() const {
+bool SemanticStateConsumerImpl::isNotifyEnabled() const {
Mutex::ScopedLock l(lock);
return notifyEnabled;
}
-void SemanticState::ConsumerImpl::notify()
+void SemanticStateConsumerImpl::notify()
{
Mutex::ScopedLock l(lock);
if (notifyEnabled) {
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h
index 033d97a7b7..24ab30bf00 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.h
+++ b/qpid/cpp/src/qpid/broker/SemanticState.h
@@ -76,105 +76,16 @@ class SessionState;
* called when a client's socket is ready to write data.
*
*/
+class SemanticStateConsumerImpl;
class SemanticState : private boost::noncopyable {
- public:
- class ConsumerImpl : public Consumer, public sys::OutputTask,
- public boost::enable_shared_from_this<ConsumerImpl>,
- public management::Manageable
- {
- protected:
- mutable qpid::sys::Mutex lock;
- SemanticState* const parent;
- private:
- const boost::shared_ptr<Queue> queue;
- const bool ackExpected;
- const bool acquire;
- bool blocked;
- bool exclusive;
- std::string resumeId;
- const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command
- uint64_t resumeTtl;
- framing::FieldTable arguments;
- Credit credit;
- bool notifyEnabled;
- const int syncFrequency;
- int deliveryCount;
- qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject;
- ProtocolRegistry& protocols;
-
- bool checkCredit(const Message& msg);
- void allocateCredit(const Message& msg);
- bool haveCredit();
-
- protected:
- QPID_BROKER_EXTERN virtual bool doDispatch();
- size_t unacked() { return parent->unacked.size(); }
- QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&, boost::shared_ptr<Consumer>);
-
- public:
- typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
-
- QPID_BROKER_EXTERN ConsumerImpl(SemanticState* parent,
- const std::string& name, boost::shared_ptr<Queue> queue,
- bool ack, SubscriptionType type, bool exclusive,
- const std::string& tag, const std::string& resumeId,
- uint64_t resumeTtl, const framing::FieldTable& arguments);
- QPID_BROKER_EXTERN ~ConsumerImpl();
- QPID_BROKER_EXTERN OwnershipToken* getSession();
- QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&);
- QPID_BROKER_EXTERN bool filter(const Message&);
- QPID_BROKER_EXTERN bool accept(const Message&);
- QPID_BROKER_EXTERN void cancel() {}
-
- QPID_BROKER_EXTERN void disableNotify();
- QPID_BROKER_EXTERN void enableNotify();
- QPID_BROKER_EXTERN void notify();
- QPID_BROKER_EXTERN bool isNotifyEnabled() const;
-
- QPID_BROKER_EXTERN void requestDispatch();
-
- QPID_BROKER_EXTERN void setWindowMode();
- QPID_BROKER_EXTERN void setCreditMode();
- QPID_BROKER_EXTERN void addByteCredit(uint32_t value);
- QPID_BROKER_EXTERN void addMessageCredit(uint32_t value);
- QPID_BROKER_EXTERN void flush();
- QPID_BROKER_EXTERN void stop();
- QPID_BROKER_EXTERN void complete(DeliveryRecord&);
- boost::shared_ptr<Queue> getQueue() const { return queue; }
- bool isBlocked() const { return blocked; }
- bool setBlocked(bool set) { std::swap(set, blocked); return set; }
-
- QPID_BROKER_EXTERN bool doOutput();
-
- Credit& getCredit() { return credit; }
- const Credit& getCredit() const { return credit; }
- bool isAckExpected() const { return ackExpected; }
- bool isAcquire() const { return acquire; }
- bool isExclusive() const { return exclusive; }
- std::string getResumeId() const { return resumeId; };
- const std::string& getTag() const { return tag; }
- uint64_t getResumeTtl() const { return resumeTtl; }
- uint32_t getDeliveryCount() const { return deliveryCount; }
- void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; }
- const framing::FieldTable& getArguments() const { return arguments; }
-
- SemanticState& getParent() { return *parent; }
- const SemanticState& getParent() const { return *parent; }
-
- void acknowledged(const DeliveryRecord&) {}
-
- // manageable entry points
- QPID_BROKER_EXTERN management::ManagementObject::shared_ptr
- GetManagementObject(void) const;
-
- QPID_BROKER_EXTERN management::Manageable::status_t
- ManagementMethod(uint32_t methodId, management::Args& args, std::string& text);
- };
+ friend class SemanticStateConsumerImpl;
+ public:
+ typedef SemanticStateConsumerImpl ConsumerImpl;
typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
private:
- typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
+ typedef std::map<std::string, boost::shared_ptr<ConsumerImpl> > ConsumerImplMap;
typedef boost::tuple<std::string, std::string, std::string, std::string> Binding;
typedef std::set<Binding> Bindings;
@@ -201,8 +112,8 @@ class SemanticState : private boost::noncopyable {
bool complete(DeliveryRecord&);
AckRange findRange(DeliveryId first, DeliveryId last);
void requestDispatch();
- void cancel(ConsumerImpl::shared_ptr);
- void disable(ConsumerImpl::shared_ptr);
+ void cancel(boost::shared_ptr<ConsumerImpl>);
+ void disable(boost::shared_ptr<ConsumerImpl>);
void unbindSessionBindings();
public:
@@ -213,8 +124,8 @@ class SemanticState : private boost::noncopyable {
SessionContext& getSession();
const SessionContext& getSession() const;
- const ConsumerImpl::shared_ptr find(const std::string& destination) const;
- bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const;
+ const boost::shared_ptr<ConsumerImpl> find(const std::string& destination) const;
+ bool find(const std::string& destination, boost::shared_ptr<ConsumerImpl>&) const;
/**
* Get named queue, never returns 0.
@@ -280,6 +191,99 @@ class SemanticState : private boost::noncopyable {
const std::string& routingKey);
};
+class SemanticStateConsumerImpl : public Consumer, public sys::OutputTask,
+ public boost::enable_shared_from_this<SemanticStateConsumerImpl>,
+ public management::Manageable
+{
+ protected:
+ mutable qpid::sys::Mutex lock;
+ SemanticState* const parent;
+ private:
+ const boost::shared_ptr<Queue> queue;
+ const bool ackExpected;
+ const bool acquire;
+ bool blocked;
+ bool exclusive;
+ std::string resumeId;
+ const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command
+ uint64_t resumeTtl;
+ framing::FieldTable arguments;
+ Credit credit;
+ bool notifyEnabled;
+ const int syncFrequency;
+ int deliveryCount;
+ qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject;
+ ProtocolRegistry& protocols;
+
+ bool checkCredit(const Message& msg);
+ void allocateCredit(const Message& msg);
+ bool haveCredit();
+
+ protected:
+ QPID_BROKER_EXTERN virtual bool doDispatch();
+ size_t unacked() { return parent->unacked.size(); }
+ QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&, boost::shared_ptr<Consumer>);
+
+ public:
+ typedef boost::shared_ptr<SemanticStateConsumerImpl> shared_ptr;
+
+ QPID_BROKER_EXTERN SemanticStateConsumerImpl(SemanticState* parent,
+ const std::string& name, boost::shared_ptr<Queue> queue,
+ bool ack, SubscriptionType type, bool exclusive,
+ const std::string& tag, const std::string& resumeId,
+ uint64_t resumeTtl, const framing::FieldTable& arguments);
+ QPID_BROKER_EXTERN ~SemanticStateConsumerImpl();
+ QPID_BROKER_EXTERN OwnershipToken* getSession();
+ QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&);
+ QPID_BROKER_EXTERN bool filter(const Message&);
+ QPID_BROKER_EXTERN bool accept(const Message&);
+ QPID_BROKER_EXTERN void cancel() {}
+
+ QPID_BROKER_EXTERN void disableNotify();
+ QPID_BROKER_EXTERN void enableNotify();
+ QPID_BROKER_EXTERN void notify();
+ QPID_BROKER_EXTERN bool isNotifyEnabled() const;
+
+ QPID_BROKER_EXTERN void requestDispatch();
+
+ QPID_BROKER_EXTERN void setWindowMode();
+ QPID_BROKER_EXTERN void setCreditMode();
+ QPID_BROKER_EXTERN void addByteCredit(uint32_t value);
+ QPID_BROKER_EXTERN void addMessageCredit(uint32_t value);
+ QPID_BROKER_EXTERN void flush();
+ QPID_BROKER_EXTERN void stop();
+ QPID_BROKER_EXTERN void complete(DeliveryRecord&);
+ boost::shared_ptr<Queue> getQueue() const { return queue; }
+ bool isBlocked() const { return blocked; }
+ bool setBlocked(bool set) { std::swap(set, blocked); return set; }
+
+ QPID_BROKER_EXTERN bool doOutput();
+
+ Credit& getCredit() { return credit; }
+ const Credit& getCredit() const { return credit; }
+ bool isAckExpected() const { return ackExpected; }
+ bool isAcquire() const { return acquire; }
+ bool isExclusive() const { return exclusive; }
+ std::string getResumeId() const { return resumeId; };
+ const std::string& getTag() const { return tag; }
+ uint64_t getResumeTtl() const { return resumeTtl; }
+ uint32_t getDeliveryCount() const { return deliveryCount; }
+ void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; }
+ const framing::FieldTable& getArguments() const { return arguments; }
+
+ SemanticState& getParent() { return *parent; }
+ const SemanticState& getParent() const { return *parent; }
+
+ void acknowledged(const DeliveryRecord&) {}
+
+ // manageable entry points
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr
+ GetManagementObject(void) const;
+
+ QPID_BROKER_EXTERN management::Manageable::status_t
+ ManagementMethod(uint32_t methodId, management::Args& args, std::string& text);
+};
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index f714e8e01a..c2d35dd5cf 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -65,6 +65,8 @@ class QueueGuard;
class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
{
public:
+ typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
+
struct Factory : public broker::ConsumerFactory {
boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
broker::SemanticState* parent,
diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
index 9c21e51a18..a562958ea2 100644
--- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
+++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp
@@ -22,6 +22,7 @@
#include "unit_test.h"
#include "MessagingFixture.h"
#include "qpid/management/Buffer.h"
+#include "qpid/management/ManagementAgent.h"
#include "qpid/messaging/Message.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/log/Logger.h"
diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp
index 83f6a5e4b1..eac4deda2d 100644
--- a/qpid/cpp/src/tests/test_store.cpp
+++ b/qpid/cpp/src/tests/test_store.cpp
@@ -37,6 +37,7 @@
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Thread.h"
#include "qpid/Plugin.h"
#include "qpid/Options.h"
#include <boost/cast.hpp>