diff options
author | Andrew Stitcher <astitcher@apache.org> | 2013-01-04 05:14:33 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2013-01-04 05:14:33 +0000 |
commit | d49689fb2761ba7bc5bfae03ea35811a55f2cadc (patch) | |
tree | c288f2cbe80e06387f963e593415eb9f7bff6ff2 /qpid/cpp/src | |
parent | 0245fea288acf0f0dac4a6174be1d9422f34168c (diff) | |
download | qpid-python-d49689fb2761ba7bc5bfae03ea35811a55f2cadc.tar.gz |
NO-JIRA: slim down some header file inclusions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1428722 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/ConsumerFactory.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 58 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 200 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/BrokerMgmtAgent.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/tests/test_store.cpp | 1 |
9 files changed, 151 insertions, 147 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index b9d528011f..fd49b1414f 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -20,11 +20,14 @@ */ #include "qpid/broker/Broker.h" + +#include "qpid/broker/AclModule.h" #include "qpid/broker/ConnectionState.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" #include "qpid/broker/MessageStoreModule.h" +#include "qpid/broker/NameGenerator.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/RecoveryManagerImpl.h" #include "qpid/broker/SaslAuthenticator.h" @@ -37,6 +40,7 @@ #include "qpid/broker/MessageGroupManager.h" #include "qmf/org/apache/qpid/broker/Package.h" +#include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerDelete.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerQuery.h" diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 5c9c9edb12..bae67ee071 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -23,11 +23,11 @@ */ #include "qpid/broker/BrokerImportExport.h" -#include "qpid/broker/ConnectionToken.h" -#include "qpid/broker/DirectExchange.h" + +#include "qpid/DataDir.h" +#include "qpid/Plugin.h" #include "qpid/broker/DtxManager.h" #include "qpid/broker/ExchangeRegistry.h" -#include "qpid/broker/MessageStore.h" #include "qpid/broker/Protocol.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/LinkRegistry.h" @@ -35,29 +35,16 @@ #include "qpid/broker/QueueCleaner.h" #include "qpid/broker/Vhost.h" #include "qpid/broker/System.h" -#include "qpid/broker/ExpiryPolicy.h" #include "qpid/broker/ConsumerFactory.h" #include "qpid/broker/ConnectionObservers.h" #include "qpid/broker/ConfigurationObservers.h" -#include "qpid/sys/ConnectionCodec.h" #include "qpid/management/Manageable.h" -#include "qpid/management/ManagementAgent.h" -#include "qmf/org/apache/qpid/broker/Broker.h" -#include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h" -#include "qpid/Options.h" -#include "qpid/Plugin.h" -#include "qpid/DataDir.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/ConnectionCodec.h" -#include "qpid/sys/Runnable.h" -#include "qpid/types/Variant.h" -#include "qpid/RefCounted.h" -#include "qpid/broker/AclModule.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/Runnable.h" #include <boost/intrusive_ptr.hpp> + #include <string> #include <vector> @@ -73,6 +60,7 @@ struct Url; namespace broker { +class AclModule; class ConnectionState; class ExpiryPolicy; class Message; diff --git a/qpid/cpp/src/qpid/broker/ConsumerFactory.h b/qpid/cpp/src/qpid/broker/ConsumerFactory.h index abd39fb3f8..1c0f2571e2 100644 --- a/qpid/cpp/src/qpid/broker/ConsumerFactory.h +++ b/qpid/cpp/src/qpid/broker/ConsumerFactory.h @@ -25,11 +25,14 @@ // TODO aconway 2011-11-25: it's ugly exposing SemanticState::ConsumerImpl in public. // Refactor to use a more abstract interface. -#include "qpid/broker/SemanticState.h" +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { +class SemanticState; +class SemanticStateConsumerImpl; + /** * Base class for consumer factoires. Plugins can register a * ConsumerFactory via Broker:: getConsumerFactories() Each time a @@ -41,7 +44,7 @@ class ConsumerFactory public: virtual ~ConsumerFactory() {} - virtual boost::shared_ptr<SemanticState::ConsumerImpl> create( + virtual boost::shared_ptr<SemanticStateConsumerImpl> create( SemanticState* parent, const std::string& name, boost::shared_ptr<Queue> queue, bool ack, bool acquire, bool exclusive, const std::string& tag, diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 4bc3c01271..80c3f99176 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -33,6 +33,7 @@ #include "qpid/framing/amqp_types.h" #include "qpid/broker/AclModule.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/NameGenerator.h" #include "qpid/UrlArray.h" namespace qpid { diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 680488d66a..f97d37e893 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -285,7 +285,7 @@ void SemanticState::record(const DeliveryRecord& delivery) const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); -SemanticState::ConsumerImpl::ConsumerImpl(SemanticState* _parent, +SemanticStateConsumerImpl::SemanticStateConsumerImpl(SemanticState* _parent, const string& _name, Queue::shared_ptr _queue, bool ack, @@ -328,12 +328,12 @@ Consumer(_name, type), } } -ManagementObject::shared_ptr SemanticState::ConsumerImpl::GetManagementObject (void) const +ManagementObject::shared_ptr SemanticStateConsumerImpl::GetManagementObject (void) const { return mgmtObject; } -Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&) +Manageable::status_t SemanticStateConsumerImpl::ManagementMethod (uint32_t methodId, Args&, string&) { Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD; @@ -343,16 +343,16 @@ Manageable::status_t SemanticState::ConsumerImpl::ManagementMethod (uint32_t met } -OwnershipToken* SemanticState::ConsumerImpl::getSession() +OwnershipToken* SemanticStateConsumerImpl::getSession() { return &(parent->session); } -bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg) +bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg) { return deliver(cursor, msg, shared_from_this()); } -bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer) +bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message& msg, boost::shared_ptr<Consumer> consumer) { allocateCredit(msg); boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg); @@ -377,12 +377,12 @@ bool SemanticState::ConsumerImpl::deliver(const QueueCursor& cursor, const Messa return true; } -bool SemanticState::ConsumerImpl::filter(const Message&) +bool SemanticStateConsumerImpl::filter(const Message&) { return true; } -bool SemanticState::ConsumerImpl::accept(const Message& msg) +bool SemanticStateConsumerImpl::accept(const Message& msg) { // TODO aconway 2009-06-08: if we have byte & message credit but // checkCredit fails because the message is to big, we should @@ -395,8 +395,8 @@ bool SemanticState::ConsumerImpl::accept(const Message& msg) namespace { struct ConsumerName { - const SemanticState::ConsumerImpl& consumer; - ConsumerName(const SemanticState::ConsumerImpl& ci) : consumer(ci) {} + const SemanticStateConsumerImpl& consumer; + ConsumerName(const SemanticStateConsumerImpl& ci) : consumer(ci) {} }; ostream& operator<<(ostream& o, const ConsumerName& pc) { @@ -405,7 +405,7 @@ ostream& operator<<(ostream& o, const ConsumerName& pc) { } } -void SemanticState::ConsumerImpl::allocateCredit(const Message& msg) +void SemanticStateConsumerImpl::allocateCredit(const Message& msg) { Credit original = credit; boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg); @@ -415,7 +415,7 @@ void SemanticState::ConsumerImpl::allocateCredit(const Message& msg) } -bool SemanticState::ConsumerImpl::checkCredit(const Message& msg) +bool SemanticStateConsumerImpl::checkCredit(const Message& msg) { boost::intrusive_ptr<const amqp_0_10::MessageTransfer> transfer = protocols.translate(msg); bool enoughCredit = credit.check(1, transfer->getRequiredCredit()); @@ -425,7 +425,7 @@ bool SemanticState::ConsumerImpl::checkCredit(const Message& msg) return enoughCredit; } -SemanticState::ConsumerImpl::~ConsumerImpl() +SemanticStateConsumerImpl::~SemanticStateConsumerImpl() { if (mgmtObject != 0) mgmtObject->resourceDestroy (); @@ -498,7 +498,7 @@ void SemanticState::requestDispatch() i->second->requestDispatch(); } -void SemanticState::ConsumerImpl::requestDispatch() +void SemanticStateConsumerImpl::requestDispatch() { if (blocked) { parent->session.getConnection().outputTasks.addOutputTask(this); @@ -516,7 +516,7 @@ bool SemanticState::complete(DeliveryRecord& delivery) return delivery.isRedundant(); } -void SemanticState::ConsumerImpl::complete(DeliveryRecord& delivery) +void SemanticStateConsumerImpl::complete(DeliveryRecord& delivery) { if (!delivery.isComplete()) { delivery.complete(); @@ -541,7 +541,7 @@ SessionContext& SemanticState::getSession() { return session; } const SessionContext& SemanticState::getSession() const { return session; } -const SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const +const SemanticStateConsumerImpl::shared_ptr SemanticState::find(const std::string& destination) const { ConsumerImpl::shared_ptr consumer; if (!find(destination, consumer)) { @@ -598,7 +598,7 @@ void SemanticState::stop(const std::string& destination) find(destination)->stop(); } -void SemanticState::ConsumerImpl::setWindowMode() +void SemanticStateConsumerImpl::setWindowMode() { credit.setWindowMode(true); if (mgmtObject){ @@ -606,7 +606,7 @@ void SemanticState::ConsumerImpl::setWindowMode() } } -void SemanticState::ConsumerImpl::setCreditMode() +void SemanticStateConsumerImpl::setCreditMode() { credit.setWindowMode(false); if (mgmtObject){ @@ -614,17 +614,17 @@ void SemanticState::ConsumerImpl::setCreditMode() } } -void SemanticState::ConsumerImpl::addByteCredit(uint32_t value) +void SemanticStateConsumerImpl::addByteCredit(uint32_t value) { credit.addByteCredit(value); } -void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value) +void SemanticStateConsumerImpl::addMessageCredit(uint32_t value) { credit.addMessageCredit(value); } -bool SemanticState::ConsumerImpl::haveCredit() +bool SemanticStateConsumerImpl::haveCredit() { if (credit) { return true; @@ -634,19 +634,19 @@ bool SemanticState::ConsumerImpl::haveCredit() } } -bool SemanticState::ConsumerImpl::doDispatch() +bool SemanticStateConsumerImpl::doDispatch() { return queue->dispatch(shared_from_this()); } -void SemanticState::ConsumerImpl::flush() +void SemanticStateConsumerImpl::flush() { while(haveCredit() && doDispatch()) ; credit.cancel(); } -void SemanticState::ConsumerImpl::stop() +void SemanticStateConsumerImpl::stop() { credit.cancel(); } @@ -701,7 +701,7 @@ void SemanticState::reject(DeliveryId first, DeliveryId last) getSession().setUnackedCount(unacked.size()); } -bool SemanticState::ConsumerImpl::doOutput() +bool SemanticStateConsumerImpl::doOutput() { try { return haveCredit() && doDispatch(); @@ -710,24 +710,24 @@ bool SemanticState::ConsumerImpl::doOutput() } } -void SemanticState::ConsumerImpl::enableNotify() +void SemanticStateConsumerImpl::enableNotify() { Mutex::ScopedLock l(lock); notifyEnabled = true; } -void SemanticState::ConsumerImpl::disableNotify() +void SemanticStateConsumerImpl::disableNotify() { Mutex::ScopedLock l(lock); notifyEnabled = false; } -bool SemanticState::ConsumerImpl::isNotifyEnabled() const { +bool SemanticStateConsumerImpl::isNotifyEnabled() const { Mutex::ScopedLock l(lock); return notifyEnabled; } -void SemanticState::ConsumerImpl::notify() +void SemanticStateConsumerImpl::notify() { Mutex::ScopedLock l(lock); if (notifyEnabled) { diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 033d97a7b7..24ab30bf00 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -76,105 +76,16 @@ class SessionState; * called when a client's socket is ready to write data. * */ +class SemanticStateConsumerImpl; class SemanticState : private boost::noncopyable { - public: - class ConsumerImpl : public Consumer, public sys::OutputTask, - public boost::enable_shared_from_this<ConsumerImpl>, - public management::Manageable - { - protected: - mutable qpid::sys::Mutex lock; - SemanticState* const parent; - private: - const boost::shared_ptr<Queue> queue; - const bool ackExpected; - const bool acquire; - bool blocked; - bool exclusive; - std::string resumeId; - const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command - uint64_t resumeTtl; - framing::FieldTable arguments; - Credit credit; - bool notifyEnabled; - const int syncFrequency; - int deliveryCount; - qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject; - ProtocolRegistry& protocols; - - bool checkCredit(const Message& msg); - void allocateCredit(const Message& msg); - bool haveCredit(); - - protected: - QPID_BROKER_EXTERN virtual bool doDispatch(); - size_t unacked() { return parent->unacked.size(); } - QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&, boost::shared_ptr<Consumer>); - - public: - typedef boost::shared_ptr<ConsumerImpl> shared_ptr; - - QPID_BROKER_EXTERN ConsumerImpl(SemanticState* parent, - const std::string& name, boost::shared_ptr<Queue> queue, - bool ack, SubscriptionType type, bool exclusive, - const std::string& tag, const std::string& resumeId, - uint64_t resumeTtl, const framing::FieldTable& arguments); - QPID_BROKER_EXTERN ~ConsumerImpl(); - QPID_BROKER_EXTERN OwnershipToken* getSession(); - QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&); - QPID_BROKER_EXTERN bool filter(const Message&); - QPID_BROKER_EXTERN bool accept(const Message&); - QPID_BROKER_EXTERN void cancel() {} - - QPID_BROKER_EXTERN void disableNotify(); - QPID_BROKER_EXTERN void enableNotify(); - QPID_BROKER_EXTERN void notify(); - QPID_BROKER_EXTERN bool isNotifyEnabled() const; - - QPID_BROKER_EXTERN void requestDispatch(); - - QPID_BROKER_EXTERN void setWindowMode(); - QPID_BROKER_EXTERN void setCreditMode(); - QPID_BROKER_EXTERN void addByteCredit(uint32_t value); - QPID_BROKER_EXTERN void addMessageCredit(uint32_t value); - QPID_BROKER_EXTERN void flush(); - QPID_BROKER_EXTERN void stop(); - QPID_BROKER_EXTERN void complete(DeliveryRecord&); - boost::shared_ptr<Queue> getQueue() const { return queue; } - bool isBlocked() const { return blocked; } - bool setBlocked(bool set) { std::swap(set, blocked); return set; } - - QPID_BROKER_EXTERN bool doOutput(); - - Credit& getCredit() { return credit; } - const Credit& getCredit() const { return credit; } - bool isAckExpected() const { return ackExpected; } - bool isAcquire() const { return acquire; } - bool isExclusive() const { return exclusive; } - std::string getResumeId() const { return resumeId; }; - const std::string& getTag() const { return tag; } - uint64_t getResumeTtl() const { return resumeTtl; } - uint32_t getDeliveryCount() const { return deliveryCount; } - void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; } - const framing::FieldTable& getArguments() const { return arguments; } - - SemanticState& getParent() { return *parent; } - const SemanticState& getParent() const { return *parent; } - - void acknowledged(const DeliveryRecord&) {} - - // manageable entry points - QPID_BROKER_EXTERN management::ManagementObject::shared_ptr - GetManagementObject(void) const; - - QPID_BROKER_EXTERN management::Manageable::status_t - ManagementMethod(uint32_t methodId, management::Args& args, std::string& text); - }; + friend class SemanticStateConsumerImpl; + public: + typedef SemanticStateConsumerImpl ConsumerImpl; typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap; private: - typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap; + typedef std::map<std::string, boost::shared_ptr<ConsumerImpl> > ConsumerImplMap; typedef boost::tuple<std::string, std::string, std::string, std::string> Binding; typedef std::set<Binding> Bindings; @@ -201,8 +112,8 @@ class SemanticState : private boost::noncopyable { bool complete(DeliveryRecord&); AckRange findRange(DeliveryId first, DeliveryId last); void requestDispatch(); - void cancel(ConsumerImpl::shared_ptr); - void disable(ConsumerImpl::shared_ptr); + void cancel(boost::shared_ptr<ConsumerImpl>); + void disable(boost::shared_ptr<ConsumerImpl>); void unbindSessionBindings(); public: @@ -213,8 +124,8 @@ class SemanticState : private boost::noncopyable { SessionContext& getSession(); const SessionContext& getSession() const; - const ConsumerImpl::shared_ptr find(const std::string& destination) const; - bool find(const std::string& destination, ConsumerImpl::shared_ptr&) const; + const boost::shared_ptr<ConsumerImpl> find(const std::string& destination) const; + bool find(const std::string& destination, boost::shared_ptr<ConsumerImpl>&) const; /** * Get named queue, never returns 0. @@ -280,6 +191,99 @@ class SemanticState : private boost::noncopyable { const std::string& routingKey); }; +class SemanticStateConsumerImpl : public Consumer, public sys::OutputTask, + public boost::enable_shared_from_this<SemanticStateConsumerImpl>, + public management::Manageable +{ + protected: + mutable qpid::sys::Mutex lock; + SemanticState* const parent; + private: + const boost::shared_ptr<Queue> queue; + const bool ackExpected; + const bool acquire; + bool blocked; + bool exclusive; + std::string resumeId; + const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command + uint64_t resumeTtl; + framing::FieldTable arguments; + Credit credit; + bool notifyEnabled; + const int syncFrequency; + int deliveryCount; + qmf::org::apache::qpid::broker::Subscription::shared_ptr mgmtObject; + ProtocolRegistry& protocols; + + bool checkCredit(const Message& msg); + void allocateCredit(const Message& msg); + bool haveCredit(); + + protected: + QPID_BROKER_EXTERN virtual bool doDispatch(); + size_t unacked() { return parent->unacked.size(); } + QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&, boost::shared_ptr<Consumer>); + + public: + typedef boost::shared_ptr<SemanticStateConsumerImpl> shared_ptr; + + QPID_BROKER_EXTERN SemanticStateConsumerImpl(SemanticState* parent, + const std::string& name, boost::shared_ptr<Queue> queue, + bool ack, SubscriptionType type, bool exclusive, + const std::string& tag, const std::string& resumeId, + uint64_t resumeTtl, const framing::FieldTable& arguments); + QPID_BROKER_EXTERN ~SemanticStateConsumerImpl(); + QPID_BROKER_EXTERN OwnershipToken* getSession(); + QPID_BROKER_EXTERN bool deliver(const QueueCursor&, const Message&); + QPID_BROKER_EXTERN bool filter(const Message&); + QPID_BROKER_EXTERN bool accept(const Message&); + QPID_BROKER_EXTERN void cancel() {} + + QPID_BROKER_EXTERN void disableNotify(); + QPID_BROKER_EXTERN void enableNotify(); + QPID_BROKER_EXTERN void notify(); + QPID_BROKER_EXTERN bool isNotifyEnabled() const; + + QPID_BROKER_EXTERN void requestDispatch(); + + QPID_BROKER_EXTERN void setWindowMode(); + QPID_BROKER_EXTERN void setCreditMode(); + QPID_BROKER_EXTERN void addByteCredit(uint32_t value); + QPID_BROKER_EXTERN void addMessageCredit(uint32_t value); + QPID_BROKER_EXTERN void flush(); + QPID_BROKER_EXTERN void stop(); + QPID_BROKER_EXTERN void complete(DeliveryRecord&); + boost::shared_ptr<Queue> getQueue() const { return queue; } + bool isBlocked() const { return blocked; } + bool setBlocked(bool set) { std::swap(set, blocked); return set; } + + QPID_BROKER_EXTERN bool doOutput(); + + Credit& getCredit() { return credit; } + const Credit& getCredit() const { return credit; } + bool isAckExpected() const { return ackExpected; } + bool isAcquire() const { return acquire; } + bool isExclusive() const { return exclusive; } + std::string getResumeId() const { return resumeId; }; + const std::string& getTag() const { return tag; } + uint64_t getResumeTtl() const { return resumeTtl; } + uint32_t getDeliveryCount() const { return deliveryCount; } + void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; } + const framing::FieldTable& getArguments() const { return arguments; } + + SemanticState& getParent() { return *parent; } + const SemanticState& getParent() const { return *parent; } + + void acknowledged(const DeliveryRecord&) {} + + // manageable entry points + QPID_BROKER_EXTERN management::ManagementObject::shared_ptr + GetManagementObject(void) const; + + QPID_BROKER_EXTERN management::Manageable::status_t + ManagementMethod(uint32_t methodId, management::Args& args, std::string& text); +}; + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index f714e8e01a..c2d35dd5cf 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -65,6 +65,8 @@ class QueueGuard; class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl { public: + typedef broker::SemanticState::ConsumerImpl ConsumerImpl; + struct Factory : public broker::ConsumerFactory { boost::shared_ptr<broker::SemanticState::ConsumerImpl> create( broker::SemanticState* parent, diff --git a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp index 9c21e51a18..a562958ea2 100644 --- a/qpid/cpp/src/tests/BrokerMgmtAgent.cpp +++ b/qpid/cpp/src/tests/BrokerMgmtAgent.cpp @@ -22,6 +22,7 @@ #include "unit_test.h" #include "MessagingFixture.h" #include "qpid/management/Buffer.h" +#include "qpid/management/ManagementAgent.h" #include "qpid/messaging/Message.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/log/Logger.h" diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp index 83f6a5e4b1..eac4deda2d 100644 --- a/qpid/cpp/src/tests/test_store.cpp +++ b/qpid/cpp/src/tests/test_store.cpp @@ -37,6 +37,7 @@ #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/AMQFrame.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Thread.h" #include "qpid/Plugin.h" #include "qpid/Options.h" #include <boost/cast.hpp> |