diff options
author | Alan Conway <aconway@apache.org> | 2010-10-14 19:38:40 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-10-14 19:38:40 +0000 |
commit | 23204010207ad7db58500b6547b92b7f91d2df53 (patch) | |
tree | ffa680168fd8d4c04a3b3bcad0472d1a920985b7 /cpp/src | |
parent | 0be15c353c4cdc2612757fa4c877e5bb42e0228d (diff) | |
download | qpid-python-23204010207ad7db58500b6547b92b7f91d2df53.tar.gz |
Code cleanup in broker directory.
- Removed un-necessary #includes for broker/Queue.h
- Removed "using std::string" in header files.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1022679 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
48 files changed, 490 insertions, 461 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp index 3e632f6659..9381f00268 100644 --- a/cpp/src/qpid/broker/Bridge.cpp +++ b/cpp/src/qpid/broker/Bridge.cpp @@ -34,6 +34,7 @@ using qpid::framing::FieldTable; using qpid::framing::Uuid; using qpid::framing::Buffer; using qpid::management::ManagementAgent; +using std::string; namespace _qmf = qmf::org::apache::qpid::broker; namespace diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h index f25d32e7b2..a846254c57 100644 --- a/cpp/src/qpid/broker/Bridge.h +++ b/cpp/src/qpid/broker/Bridge.h @@ -75,7 +75,7 @@ public: // Exchange::DynamicBridge methods void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin, qpid::framing::FieldTable* extra_args=0); void sendReorigin(); - void ioThreadPropagateBinding(const string& queue, const string& exchange, const string& key, framing::FieldTable args); + void ioThreadPropagateBinding(const std::string& queue, const std::string& exchange, const std::string& key, framing::FieldTable args); bool containsLocalTag(const std::string& tagList) const; const std::string& getLocalTag() const; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 1a8bed1be0..33364e48df 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -70,6 +70,8 @@ using qpid::management::ManagementAgent; using qpid::management::ManagementObject; using qpid::management::Manageable; using qpid::management::Args; +using std::string; + namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 33ed032327..d50f0c946a 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" #include "qpid/sys/SecuritySettings.h" #include "qpid/sys/ClusterSafe.h" @@ -273,7 +274,7 @@ void Connection::closed(){ // Physically closed, suspend open sessions. while (!channels.empty()) ptr_map_ptr(channels.begin())->handleDetach(); while (!exclusiveQueues.empty()) { - Queue::shared_ptr q(exclusiveQueues.front()); + boost::shared_ptr<Queue> q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); if (q->canAutoDelete()) { Queue::tryAutoDelete(broker, q); diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 8ad78f6652..c1b2b5a8fc 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -94,7 +94,7 @@ class Connection : public sys::ConnectionInputHandler, SessionHandler& getChannel(framing::ChannelId channel); /** Close the connection */ - void close(framing::connection::CloseCode code, const string& text); + void close(framing::connection::CloseCode code, const std::string& text); // ConnectionInputHandler methods void received(framing::AMQFrame& frame); @@ -116,7 +116,7 @@ class Connection : public sys::ConnectionInputHandler, std::string getAuthMechanism(); std::string getAuthCredentials(); void notifyConnectionForced(const std::string& text); - void setUserId(const string& uid); + void setUserId(const std::string& uid); void raiseConnectEvent(); const std::string& getUserId() const { return ConnectionState::getUserId(); } const std::string& getMgmtId() const { return mgmtId; } @@ -158,7 +158,7 @@ class Connection : public sys::ConnectionInputHandler, private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; - typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; + typedef std::vector<boost::shared_ptr<Queue> >::iterator queue_iterator; ChannelMap channels; qpid::sys::SecuritySettings securitySettings; diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h index 19caacb595..774c37408d 100644 --- a/cpp/src/qpid/broker/ConnectionState.h +++ b/cpp/src/qpid/broker/ConnectionState.h @@ -61,16 +61,16 @@ class ConnectionState : public ConnectionToken, public management::Manageable void setHeartbeat(uint16_t hb) { heartbeat = hb; } void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; } - virtual void setUserId(const string& uid) { userId = uid; } - const string& getUserId() const { return userId; } + virtual void setUserId(const std::string& uid) { userId = uid; } + const std::string& getUserId() const { return userId; } - void setUrl(const string& _url) { url = _url; } - const string& getUrl() const { return url; } + void setUrl(const std::string& _url) { url = _url; } + const std::string& getUrl() const { return url; } void setFederationLink(bool b) { federationLink = b; } bool isFederationLink() const { return federationLink; } - void setFederationPeerTag(const string& tag) { federationPeerTag = string(tag); } - const string& getFederationPeerTag() const { return federationPeerTag; } + void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); } + const std::string& getFederationPeerTag() const { return federationPeerTag; } std::vector<Url>& getKnownHosts() { return knownHosts; } void setClientThrottling(bool set=true) { clientSupportsThrottling = set; } @@ -79,7 +79,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable Broker& getBroker() { return broker; } Broker& broker; - std::vector<Queue::shared_ptr> exclusiveQueues; + std::vector<boost::shared_ptr<Queue> > exclusiveQueues; //contained output tasks sys::AggregateOutput outputTasks; @@ -104,10 +104,10 @@ class ConnectionState : public ConnectionToken, public management::Manageable uint32_t framemax; uint16_t heartbeat; uint16_t heartbeatmax; - string userId; - string url; + std::string userId; + std::string url; bool federationLink; - string federationPeerTag; + std::string federationPeerTag; std::vector<Url> knownHosts; bool clientSupportsThrottling; framing::FrameHandler* clusterOrderOut; diff --git a/cpp/src/qpid/broker/Deliverable.h b/cpp/src/qpid/broker/Deliverable.h index 433469a212..ffb5a77bca 100644 --- a/cpp/src/qpid/broker/Deliverable.h +++ b/cpp/src/qpid/broker/Deliverable.h @@ -21,7 +21,6 @@ #ifndef _Deliverable_ #define _Deliverable_ -#include "qpid/broker/Queue.h" #include "qpid/broker/Message.h" namespace qpid { diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp index 658e6bf48f..3ebb12461c 100644 --- a/cpp/src/qpid/broker/DeliverableMessage.cpp +++ b/cpp/src/qpid/broker/DeliverableMessage.cpp @@ -18,7 +18,9 @@ * under the License. * */ + #include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/Queue.h" using namespace qpid::broker; diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h index 08abce35ef..ce613e7b6e 100644 --- a/cpp/src/qpid/broker/DeliverableMessage.h +++ b/cpp/src/qpid/broker/DeliverableMessage.h @@ -23,7 +23,6 @@ #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/Deliverable.h" -#include "qpid/broker/Queue.h" #include "qpid/broker/Message.h" #include <boost/intrusive_ptr.hpp> diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index b3a49c485d..9443eb6ea5 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -22,6 +22,7 @@ #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/SemanticState.h" #include "qpid/broker/Exchange.h" +#include "qpid/broker/Queue.h" #include "qpid/log/Statement.h" #include "qpid/framing/FrameHandler.h" #include "qpid/framing/MessageTransferBody.h" diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 5f802766b6..d388ba94be 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -28,13 +28,14 @@ #include <ostream> #include "qpid/framing/SequenceSet.h" #include "qpid/broker/BrokerImportExport.h" -#include "qpid/broker/Queue.h" #include "qpid/broker/QueuedMessage.h" #include "qpid/broker/DeliveryId.h" #include "qpid/broker/Message.h" namespace qpid { namespace broker { + +class TransactionContext; class SemanticState; struct AckRange; @@ -44,7 +45,7 @@ struct AckRange; class DeliveryRecord { QueuedMessage msg; - mutable Queue::shared_ptr queue; + mutable boost::shared_ptr<Queue> queue; std::string tag; DeliveryId id; bool acquired : 1; @@ -65,7 +66,7 @@ class DeliveryRecord public: QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg, - const Queue::shared_ptr& queue, + const boost::shared_ptr<Queue>& queue, const std::string& tag, bool acquired, bool accepted, @@ -105,7 +106,7 @@ class DeliveryRecord static AckRange findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last); const QueuedMessage& getMessage() const { return msg; } framing::SequenceNumber getId() const { return id; } - Queue::shared_ptr getQueue() const { return queue; } + boost::shared_ptr<Queue> getQueue() const { return queue; } friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); }; diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index 1787c01af9..0db941f93b 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -18,7 +18,9 @@ * under the License. * */ + #include "qpid/log/Statement.h" +#include "qpid/broker/Queue.h" #include "qpid/broker/DirectExchange.h" #include <iostream> diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 9a73f3bc41..a6f9cf91af 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -28,7 +28,6 @@ #include "qpid/framing/FieldTable.h" #include "qpid/sys/CopyOnWriteArray.h" #include "qpid/sys/Mutex.h" -#include "qpid/broker/Queue.h" namespace qpid { namespace broker { @@ -38,7 +37,7 @@ class DirectExchange : public virtual Exchange { Queues queues; FedBinding fedBinding; }; - typedef std::map<string, BoundKey> Bindings; + typedef std::map<std::string, BoundKey> Bindings; Bindings bindings; qpid::sys::Mutex lock; @@ -47,22 +46,22 @@ public: QPID_BROKER_EXTERN DirectExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0); - QPID_BROKER_EXTERN DirectExchange(const string& _name, + QPID_BROKER_EXTERN DirectExchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } - QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue, + QPID_BROKER_EXTERN virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args); QPID_BROKER_EXTERN virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); - QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue, - const string* const routingKey, + QPID_BROKER_EXTERN virtual bool isBound(boost::shared_ptr<Queue> queue, + const std::string* const routingKey, const qpid::framing::FieldTable* const args); QPID_BROKER_EXTERN virtual ~DirectExchange(); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 1188cf0e5b..98980e0360 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -23,6 +23,7 @@ #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/Broker.h" #include "qpid/management/ManagementAgent.h" +#include "qpid/broker/Queue.h" #include "qpid/log/Statement.h" #include "qpid/framing/MessageProperties.h" #include "qpid/framing/reply_exceptions.h" diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 23d044ffd3..4b6b90026b 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -25,7 +25,6 @@ #include <boost/shared_ptr.hpp> #include "qpid/broker/BrokerImportExport.h" #include "qpid/broker/Deliverable.h" -#include "qpid/broker/Queue.h" #include "qpid/broker/MessageStore.h" #include "qpid/broker/PersistableExchange.h" #include "qpid/framing/FieldTable.h" @@ -37,6 +36,7 @@ namespace qpid { namespace broker { +class Broker; class ExchangeRegistry; class Exchange : public PersistableExchange, public management::Manageable { @@ -46,13 +46,13 @@ public: typedef std::vector<Binding::shared_ptr> vector; Exchange* parent; - Queue::shared_ptr queue; + boost::shared_ptr<Queue> queue; const std::string key; const framing::FieldTable args; std::string origin; qmf::org::apache::qpid::broker::Binding* mgmtBinding; - Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0, + Binding(const std::string& key, boost::shared_ptr<Queue> queue, Exchange* parent = 0, framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string()); ~Binding(); void startManagement(); @@ -90,8 +90,8 @@ protected: struct MatchQueue { - const Queue::shared_ptr queue; - MatchQueue(Queue::shared_ptr q); + const boost::shared_ptr<Queue> queue; + MatchQueue(boost::shared_ptr<Queue> q); bool operator()(Exchange::Binding::shared_ptr b); }; @@ -145,9 +145,9 @@ public: bool inUseAsAlternate() { return alternateUsers > 0; } virtual std::string getType() const = 0; - virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; - virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; - virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0; + virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; + virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0; QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&); virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0; diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index 8122e5c2d9..99b121cbce 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -31,6 +31,7 @@ using namespace qpid::broker; using namespace qpid::sys; using std::pair; +using std::string; using qpid::framing::FieldTable; pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type){ diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index 7bcf6367cf..1a7d486796 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -41,7 +41,7 @@ class FanOutExchange : public virtual Exchange { QPID_BROKER_EXTERN FanOutExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0); - QPID_BROKER_EXTERN FanOutExchange(const string& _name, + QPID_BROKER_EXTERN FanOutExchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0); @@ -59,7 +59,7 @@ class FanOutExchange : public virtual Exchange { const qpid::framing::FieldTable* args); QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue, - const string* const routingKey, + const std::string* const routingKey, const qpid::framing::FieldTable* const args); QPID_BROKER_EXTERN virtual ~FanOutExchange(); diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 3de26253a6..33c119cbbb 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -60,11 +60,11 @@ class HeadersExchange : public virtual Exchange { struct FedUnbindModifier { - string fedOrigin; + std::string fedOrigin; bool shouldUnbind; bool shouldPropagate; FedUnbindModifier(); - FedUnbindModifier(string & origin); + FedUnbindModifier(std::string & origin); bool operator()(BoundKey & bk); }; @@ -82,9 +82,9 @@ class HeadersExchange : public virtual Exchange { public: static const std::string typeName; - QPID_BROKER_EXTERN HeadersExchange(const string& name, + QPID_BROKER_EXTERN HeadersExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0); - QPID_BROKER_EXTERN HeadersExchange(const string& _name, + QPID_BROKER_EXTERN HeadersExchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0); @@ -92,17 +92,17 @@ class HeadersExchange : public virtual Exchange { virtual std::string getType() const { return typeName; } QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue, - const string& routingKey, + const std::string& routingKey, const qpid::framing::FieldTable* args); - virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); + virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); QPID_BROKER_EXTERN virtual void route(Deliverable& msg, - const string& routingKey, + const std::string& routingKey, const qpid::framing::FieldTable* args); QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue, - const string* const routingKey, + const std::string* const routingKey, const qpid::framing::FieldTable* const args); QPID_BROKER_EXTERN virtual ~HeadersExchange(); diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index 5a1dfd9656..c13a24da95 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -42,6 +42,7 @@ using qpid::management::Manageable; using qpid::management::Args; using qpid::sys::Mutex; using std::stringstream; +using std::string; namespace _qmf = qmf::org::apache::qpid::broker; Link::Link(LinkRegistry* _links, diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h index 9da610076b..bd74fe2a2f 100644 --- a/cpp/src/qpid/broker/Link.h +++ b/cpp/src/qpid/broker/Link.h @@ -37,7 +37,6 @@ namespace qpid { namespace broker { - using std::string; class LinkRegistry; class Broker; class Connection; @@ -47,13 +46,13 @@ namespace qpid { sys::Mutex lock; LinkRegistry* links; MessageStore* store; - string host; + std::string host; uint16_t port; - string transport; + std::string transport; bool durable; - string authMechanism; - string username; - string password; + std::string authMechanism; + std::string username; + std::string password; mutable uint64_t persistenceId; qmf::org::apache::qpid::broker::Link* mgmtObject; Broker* broker; @@ -93,13 +92,13 @@ namespace qpid { Link(LinkRegistry* links, MessageStore* store, - string& host, + std::string& host, uint16_t port, - string& transport, + std::string& transport, bool durable, - string& authMechanism, - string& username, - string& password, + std::string& authMechanism, + std::string& username, + std::string& password, Broker* broker, management::Manageable* parent = 0); virtual ~Link(); @@ -117,9 +116,9 @@ namespace qpid { void setConnection(Connection*); // Set pointer to the AMQP Connection void reconnect(const Address&); //called by LinkRegistry - string getAuthMechanism() { return authMechanism; } - string getUsername() { return username; } - string getPassword() { return password; } + std::string getAuthMechanism() { return authMechanism; } + std::string getUsername() { return username; } + std::string getPassword() { return password; } Broker* getBroker() { return broker; } void notifyConnectionForced(const std::string text); @@ -130,7 +129,7 @@ namespace qpid { uint64_t getPersistenceId() const { return persistenceId; } uint32_t encodedSize() const; void encode(framing::Buffer& buffer) const; - const string& getName() const; + const std::string& getName() const; static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer); diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index 49e0ec33e5..9d429a2dcc 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -27,6 +27,7 @@ using namespace qpid::broker; using namespace qpid::sys; +using std::string; using std::pair; using std::stringstream; using boost::intrusive_ptr; diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index ad67bff34b..147b9e7a6a 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/ExpiryPolicy.h" #include "qpid/StringUtils.h" diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp index 5f7cceebd3..cd9fd4c933 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -28,6 +28,7 @@ using boost::intrusive_ptr; using qpid::framing::FieldTable; +using std::string; namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index 72cf40caab..dc8615d58b 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -85,11 +85,11 @@ void NullMessageStore::stage(const intrusive_ptr<PersistableMessage>&) {} void NullMessageStore::destroy(PersistableMessage&) {} -void NullMessageStore::appendContent(const intrusive_ptr<const PersistableMessage>&, const string&) {} +void NullMessageStore::appendContent(const intrusive_ptr<const PersistableMessage>&, const std::string&) {} void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&, const intrusive_ptr<const PersistableMessage>&, - string&, uint64_t, uint32_t) + std::string&, uint64_t, uint32_t) { throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled"); } @@ -139,7 +139,7 @@ void NullMessageStore::abort(TransactionContext& ctxt) prepared.erase(DummyCtxt::getXid(ctxt)); } -void NullMessageStore::collectPreparedXids(std::set<string>& out) +void NullMessageStore::collectPreparedXids(std::set<std::string>& out) { out.insert(prepared.begin(), prepared.end()); } diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 41c6b46f2b..96c79d1b92 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -39,8 +39,8 @@ #include "qpid/framing/amqp_types.h" #include <boost/shared_ptr.hpp> -#include <boost/enable_shared_from_this.hpp> #include <boost/intrusive_ptr.hpp> +#include <boost/enable_shared_from_this.hpp> #include <list> #include <vector> @@ -49,324 +49,319 @@ #include <algorithm> namespace qpid { - namespace broker { - class Broker; - class MessageStore; - class QueueEvents; - class QueueRegistry; - class TransactionContext; - class Exchange; - - using std::string; - - - - /** - * The brokers representation of an amqp queue. Messages are - * delivered to a queue from where they can be dispatched to - * registered consumers or be stored until dequeued or until one - * or more consumers registers. - */ - class Queue : public boost::enable_shared_from_this<Queue>, - public PersistableQueue, public management::Manageable { - - struct UsageBarrier - { - Queue& parent; - uint count; +namespace broker { +class Broker; +class MessageStore; +class QueueEvents; +class QueueRegistry; +class TransactionContext; +class Exchange; + +/** + * The brokers representation of an amqp queue. Messages are + * delivered to a queue from where they can be dispatched to + * registered consumers or be stored until dequeued or until one + * or more consumers registers. + */ +class Queue : public boost::enable_shared_from_this<Queue>, + public PersistableQueue, public management::Manageable { + + struct UsageBarrier + { + Queue& parent; + uint count; - UsageBarrier(Queue&); - bool acquire(); - void release(); - void destroy(); - }; + UsageBarrier(Queue&); + bool acquire(); + void release(); + void destroy(); + }; - struct ScopedUse - { - UsageBarrier& barrier; - const bool acquired; - ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {} - ~ScopedUse() { if (acquired) barrier.release(); } - }; + struct ScopedUse + { + UsageBarrier& barrier; + const bool acquired; + ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {} + ~ScopedUse() { if (acquired) barrier.release(); } + }; - typedef std::deque<QueuedMessage> Messages; - typedef std::map<string,boost::intrusive_ptr<Message> > LVQ; - enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; - - const string name; - const bool autodelete; - MessageStore* store; - const OwnershipToken* owner; - uint32_t consumerCount; - OwnershipToken* exclusive; - bool noLocal; - bool lastValueQueue; - bool lastValueQueueNoBrowse; - bool persistLastNode; - bool inLastNodeFailure; - std::string traceId; - std::vector<std::string> traceExclude; - QueueListeners listeners; - Messages messages; - Messages pendingDequeues;//used to avoid dequeuing during recovery - LVQ lvq; - mutable qpid::sys::Mutex consumerLock; - mutable qpid::sys::Monitor messageLock; - mutable qpid::sys::Mutex ownershipLock; - mutable uint64_t persistenceId; - framing::FieldTable settings; - std::auto_ptr<QueuePolicy> policy; - bool policyExceeded; - QueueBindings bindings; - std::string alternateExchangeName; - boost::shared_ptr<Exchange> alternateExchange; - framing::SequenceNumber sequence; - qmf::org::apache::qpid::broker::Queue* mgmtObject; - RateTracker dequeueTracker; - int eventMode; - QueueEvents* eventMgr; - bool insertSeqNo; - std::string seqNoKey; - Broker* broker; - bool deleted; - UsageBarrier barrier; - - void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); - void setPolicy(std::auto_ptr<QueuePolicy> policy); - bool seek(QueuedMessage& msg, Consumer::shared_ptr position); - bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); - ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); - bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); - void notifyListener(); - - void removeListener(Consumer::shared_ptr); - - bool isExcluded(boost::intrusive_ptr<Message>& msg); - - void dequeued(const QueuedMessage& msg); - void popAndDequeue(); - QueuedMessage getFront(); - QueuedMessage& checkLvqReplace(QueuedMessage& msg); - void clearLVQIndex(const QueuedMessage& msg); - - inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg) - { - if (mgmtObject != 0) { - mgmtObject->inc_msgTotalEnqueues (); - mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); - if (msg->isPersistent ()) { - mgmtObject->inc_msgPersistEnqueues (); - mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); - } - } + typedef std::deque<QueuedMessage> Messages; + typedef std::map<std::string,boost::intrusive_ptr<Message> > LVQ; + enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; + + const std::string name; + const bool autodelete; + MessageStore* store; + const OwnershipToken* owner; + uint32_t consumerCount; + OwnershipToken* exclusive; + bool noLocal; + bool lastValueQueue; + bool lastValueQueueNoBrowse; + bool persistLastNode; + bool inLastNodeFailure; + std::string traceId; + std::vector<std::string> traceExclude; + QueueListeners listeners; + Messages messages; + Messages pendingDequeues;//used to avoid dequeuing during recovery + LVQ lvq; + mutable qpid::sys::Mutex consumerLock; + mutable qpid::sys::Monitor messageLock; + mutable qpid::sys::Mutex ownershipLock; + mutable uint64_t persistenceId; + framing::FieldTable settings; + std::auto_ptr<QueuePolicy> policy; + bool policyExceeded; + QueueBindings bindings; + std::string alternateExchangeName; + boost::shared_ptr<Exchange> alternateExchange; + framing::SequenceNumber sequence; + qmf::org::apache::qpid::broker::Queue* mgmtObject; + RateTracker dequeueTracker; + int eventMode; + QueueEvents* eventMgr; + bool insertSeqNo; + std::string seqNoKey; + Broker* broker; + bool deleted; + UsageBarrier barrier; + + void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); + void setPolicy(std::auto_ptr<QueuePolicy> policy); + bool seek(QueuedMessage& msg, Consumer::shared_ptr position); + bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c); + void notifyListener(); + + void removeListener(Consumer::shared_ptr); + + bool isExcluded(boost::intrusive_ptr<Message>& msg); + + void dequeued(const QueuedMessage& msg); + void popMsg(QueuedMessage& qmsg); + void popAndDequeue(); + QueuedMessage getFront(); + QueuedMessage& checkLvqReplace(QueuedMessage& msg); + void clearLVQIndex(const QueuedMessage& msg); + + inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg) + { + if (mgmtObject != 0) { + mgmtObject->inc_msgTotalEnqueues (); + mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + if (msg->isPersistent ()) { + mgmtObject->inc_msgPersistEnqueues (); + mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); } - inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg) - { - if (mgmtObject != 0){ - mgmtObject->inc_msgTotalDequeues (); - mgmtObject->inc_byteTotalDequeues (msg->contentSize()); - if (msg->isPersistent ()){ - mgmtObject->inc_msgPersistDequeues (); - mgmtObject->inc_bytePersistDequeues (msg->contentSize()); - } - } + } + } + inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg) + { + if (mgmtObject != 0){ + mgmtObject->inc_msgTotalDequeues (); + mgmtObject->inc_byteTotalDequeues (msg->contentSize()); + if (msg->isPersistent ()){ + mgmtObject->inc_msgPersistDequeues (); + mgmtObject->inc_bytePersistDequeues (msg->contentSize()); } + } + } - Messages::iterator findAt(framing::SequenceNumber pos); - void checkNotDeleted(); - - public: - - typedef boost::shared_ptr<Queue> shared_ptr; - - typedef std::vector<shared_ptr> vector; - - QPID_BROKER_EXTERN Queue(const string& name, - bool autodelete = false, - MessageStore* const store = 0, - const OwnershipToken* const owner = 0, - management::Manageable* parent = 0, - Broker* broker = 0); - QPID_BROKER_EXTERN ~Queue(); - - QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr); - - void create(const qpid::framing::FieldTable& settings); - - // "recovering" means we are doing a MessageStore recovery. - QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings, - bool recovering = false); - void destroy(); - void notifyDeleted(); - QPID_BROKER_EXTERN void bound(const string& exchange, - const string& key, - const qpid::framing::FieldTable& args); - QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges, - Queue::shared_ptr shared_ref); - - QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg); - QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message); - - /** - * Delivers a message to the queue. Will record it as - * enqueued if persistent then process it. - */ - QPID_BROKER_EXTERN void deliver(boost::intrusive_ptr<Message> msg); - /** - * Dispatches the messages immediately to a consumer if - * one is available or stores it for later if not. - */ - QPID_BROKER_EXTERN void process(boost::intrusive_ptr<Message>& msg); - /** - * Returns a message to the in-memory queue (due to lack - * of acknowledegement from a receiver). If a consumer is - * available it will be dispatched immediately, else it - * will be returned to the front of the queue. - */ - QPID_BROKER_EXTERN void requeue(const QueuedMessage& msg); - /** - * Used during recovery to add stored messages back to the queue - */ - QPID_BROKER_EXTERN void recover(boost::intrusive_ptr<Message>& msg); - - QPID_BROKER_EXTERN void consume(Consumer::shared_ptr c, - bool exclusive = false); - QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); - - uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages - QPID_BROKER_EXTERN void purgeExpired(); - - //move qty # of messages to destination Queue destq - uint32_t move(const Queue::shared_ptr destq, uint32_t qty); - - QPID_BROKER_EXTERN uint32_t getMessageCount() const; - QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const; - QPID_BROKER_EXTERN uint32_t getConsumerCount() const; - inline const string& getName() const { return name; } - bool isExclusiveOwner(const OwnershipToken* const o) const; - void releaseExclusiveOwnership(); - bool setExclusiveOwner(const OwnershipToken* const o); - bool hasExclusiveConsumer() const; - bool hasExclusiveOwner() const; - inline bool isDurable() const { return store != 0; } - inline const framing::FieldTable& getSettings() const { return settings; } - inline bool isAutoDelete() const { return autodelete; } - bool canAutoDelete() const; - const QueueBindings& getBindings() const { return bindings; } - - /** - * used to take messages from in memory and flush down to disk. - */ - QPID_BROKER_EXTERN void setLastNodeFailure(); - QPID_BROKER_EXTERN void clearLastNodeFailure(); - - bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false); - void enqueueAborted(boost::intrusive_ptr<Message> msg); - /** - * dequeue from store (only done once messages is acknowledged) - */ - QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg); - /** - * Inform the queue that a previous transactional dequeue - * committed. - */ - void dequeueCommitted(const QueuedMessage& msg); - - /** - * Inform queue of messages that were enqueued, have since - * been acquired but not yet accepted or released (and - * thus are still logically on the queue) - used in - * clustered broker. - */ - void enqueued(const QueuedMessage& msg); - - /** - * Test whether the specified message (identified by its - * sequence/position), is still enqueued (note this - * doesn't mean it is available for delivery as it may - * have been delievered to a subscriber who has not yet - * accepted it). - */ - bool isEnqueued(const QueuedMessage& msg); + Messages::iterator findAt(framing::SequenceNumber pos); + void checkNotDeleted(); + + public: + + typedef boost::shared_ptr<Queue> shared_ptr; + + typedef std::vector<shared_ptr> vector; + + QPID_BROKER_EXTERN Queue(const std::string& name, + bool autodelete = false, + MessageStore* const store = 0, + const OwnershipToken* const owner = 0, + management::Manageable* parent = 0, + Broker* broker = 0); + QPID_BROKER_EXTERN ~Queue(); + + QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr); + + void create(const qpid::framing::FieldTable& settings); + + // "recovering" means we are doing a MessageStore recovery. + QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings, + bool recovering = false); + void destroy(); + void notifyDeleted(); + QPID_BROKER_EXTERN void bound(const std::string& exchange, + const std::string& key, + const qpid::framing::FieldTable& args); + QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges, + Queue::shared_ptr shared_ref); + + QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg); + QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message); + + /** + * Delivers a message to the queue. Will record it as + * enqueued if persistent then process it. + */ + QPID_BROKER_EXTERN void deliver(boost::intrusive_ptr<Message> msg); + /** + * Dispatches the messages immediately to a consumer if + * one is available or stores it for later if not. + */ + QPID_BROKER_EXTERN void process(boost::intrusive_ptr<Message>& msg); + /** + * Returns a message to the in-memory queue (due to lack + * of acknowledegement from a receiver). If a consumer is + * available it will be dispatched immediately, else it + * will be returned to the front of the queue. + */ + QPID_BROKER_EXTERN void requeue(const QueuedMessage& msg); + /** + * Used during recovery to add stored messages back to the queue + */ + QPID_BROKER_EXTERN void recover(boost::intrusive_ptr<Message>& msg); + + QPID_BROKER_EXTERN void consume(Consumer::shared_ptr c, + bool exclusive = false); + QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c); + + uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages + QPID_BROKER_EXTERN void purgeExpired(); + + //move qty # of messages to destination Queue destq + uint32_t move(const Queue::shared_ptr destq, uint32_t qty); + + QPID_BROKER_EXTERN uint32_t getMessageCount() const; + QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const; + QPID_BROKER_EXTERN uint32_t getConsumerCount() const; + inline const std::string& getName() const { return name; } + bool isExclusiveOwner(const OwnershipToken* const o) const; + void releaseExclusiveOwnership(); + bool setExclusiveOwner(const OwnershipToken* const o); + bool hasExclusiveConsumer() const; + bool hasExclusiveOwner() const; + inline bool isDurable() const { return store != 0; } + inline const framing::FieldTable& getSettings() const { return settings; } + inline bool isAutoDelete() const { return autodelete; } + bool canAutoDelete() const; + const QueueBindings& getBindings() const { return bindings; } + + /** + * used to take messages from in memory and flush down to disk. + */ + QPID_BROKER_EXTERN void setLastNodeFailure(); + QPID_BROKER_EXTERN void clearLastNodeFailure(); + + bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false); + void enqueueAborted(boost::intrusive_ptr<Message> msg); + /** + * dequeue from store (only done once messages is acknowledged) + */ + QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg); + /** + * Inform the queue that a previous transactional dequeue + * committed. + */ + void dequeueCommitted(const QueuedMessage& msg); + + /** + * Inform queue of messages that were enqueued, have since + * been acquired but not yet accepted or released (and + * thus are still logically on the queue) - used in + * clustered broker. + */ + void enqueued(const QueuedMessage& msg); + + /** + * Test whether the specified message (identified by its + * sequence/position), is still enqueued (note this + * doesn't mean it is available for delivery as it may + * have been delievered to a subscriber who has not yet + * accepted it). + */ + bool isEnqueued(const QueuedMessage& msg); - /** - * Gets the next available message - */ - QPID_BROKER_EXTERN QueuedMessage get(); - - /** Get the message at position pos */ - QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const; - - const QueuePolicy* getPolicy(); - - void setAlternateExchange(boost::shared_ptr<Exchange> exchange); - boost::shared_ptr<Exchange> getAlternateExchange(); - bool isLocal(boost::intrusive_ptr<Message>& msg); - - //PersistableQueue support: - uint64_t getPersistenceId() const; - void setPersistenceId(uint64_t persistenceId) const; - void encode(framing::Buffer& buffer) const; - uint32_t encodedSize() const; - - // "recovering" means we are doing a MessageStore recovery. - static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer, bool recovering = false ); - static void tryAutoDelete(Broker& broker, Queue::shared_ptr); - - virtual void setExternalQueueStore(ExternalQueueStore* inst); - - // Manageable entry points - management::ManagementObject* GetManagementObject (void) const; - management::Manageable::status_t - ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); - - /** Apply f to each Message on the queue. */ - template <class F> void eachMessage(F f) { - sys::Mutex::ScopedLock l(messageLock); - if (lastValueQueue) { - for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) { - f(checkLvqReplace(*i)); - } - } else { - std::for_each(messages.begin(), messages.end(), f); - } - } - - /** Apply f to each QueueBinding on the queue */ - template <class F> void eachBinding(F f) { - bindings.eachBinding(f); + /** + * Gets the next available message + */ + QPID_BROKER_EXTERN QueuedMessage get(); + + /** Get the message at position pos */ + QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const; + + const QueuePolicy* getPolicy(); + + void setAlternateExchange(boost::shared_ptr<Exchange> exchange); + boost::shared_ptr<Exchange> getAlternateExchange(); + bool isLocal(boost::intrusive_ptr<Message>& msg); + + //PersistableQueue support: + uint64_t getPersistenceId() const; + void setPersistenceId(uint64_t persistenceId) const; + void encode(framing::Buffer& buffer) const; + uint32_t encodedSize() const; + + // "recovering" means we are doing a MessageStore recovery. + static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer, bool recovering = false ); + static void tryAutoDelete(Broker& broker, Queue::shared_ptr); + + virtual void setExternalQueueStore(ExternalQueueStore* inst); + + // Manageable entry points + management::ManagementObject* GetManagementObject (void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); + + /** Apply f to each Message on the queue. */ + template <class F> void eachMessage(F f) { + sys::Mutex::ScopedLock l(messageLock); + if (lastValueQueue) { + for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) { + f(checkLvqReplace(*i)); } + } else { + std::for_each(messages.begin(), messages.end(), f); + } + } - void popMsg(QueuedMessage& qmsg); - - /** Set the position sequence number for the next message on the queue. - * Must be >= the current sequence number. - * Used by cluster to replicate queues. - */ - QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos); - /** return current position sequence number for the next message on the queue. - */ - QPID_BROKER_EXTERN framing::SequenceNumber getPosition(); - int getEventMode(); - void setQueueEventManager(QueueEvents&); - QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key); - /** - * Notify queue that recovery has completed. - */ - void recoveryComplete(ExchangeRegistry& exchanges); - - // For cluster update - QueueListeners& getListeners(); - - /** - * Reserve space in policy for an enqueued message that - * has been recovered in the prepared state (dtx only) - */ - void recoverPrepared(boost::intrusive_ptr<Message>& msg); - - void flush(); - }; + /** Apply f to each QueueBinding on the queue */ + template <class F> void eachBinding(F f) { + bindings.eachBinding(f); } + + /** Set the position sequence number for the next message on the queue. + * Must be >= the current sequence number. + * Used by cluster to replicate queues. + */ + QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos); + /** return current position sequence number for the next message on the queue. + */ + QPID_BROKER_EXTERN framing::SequenceNumber getPosition(); + int getEventMode(); + void setQueueEventManager(QueueEvents&); + QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key); + /** + * Notify queue that recovery has completed. + */ + void recoveryComplete(ExchangeRegistry& exchanges); + + // For cluster update + QueueListeners& getListeners(); + + /** + * Reserve space in policy for an enqueued message that + * has been recovered in the prepared state (dtx only) + */ + void recoverPrepared(boost::intrusive_ptr<Message>& msg); + + void flush(); +}; +} } diff --git a/cpp/src/qpid/broker/QueueBindings.cpp b/cpp/src/qpid/broker/QueueBindings.cpp index 3f43a8ef68..60d315acfe 100644 --- a/cpp/src/qpid/broker/QueueBindings.cpp +++ b/cpp/src/qpid/broker/QueueBindings.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/broker/Queue.h" #include "qpid/broker/QueueBindings.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/framing/reply_exceptions.h" diff --git a/cpp/src/qpid/broker/QueueCleaner.cpp b/cpp/src/qpid/broker/QueueCleaner.cpp index 6cdf506873..3499ea8a4d 100644 --- a/cpp/src/qpid/broker/QueueCleaner.cpp +++ b/cpp/src/qpid/broker/QueueCleaner.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/broker/Queue.h" #include "qpid/broker/QueueCleaner.h" #include "qpid/broker/Broker.h" diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 4b1fa62709..28b2d60cda 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -18,6 +18,7 @@ * under the License. * */ +#include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/QueueEvents.h" #include "qpid/log/Statement.h" @@ -26,6 +27,7 @@ using namespace qpid::broker; using namespace qpid::sys; +using std::string; QueueRegistry::QueueRegistry(Broker* b) : counter(1), store(0), events(0), parent(0), lastNode(false), broker(b) {} diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 72a91dff24..66437f9665 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -22,17 +22,21 @@ #define _QueueRegistry_ #include "qpid/broker/BrokerImportExport.h" -#include "qpid/broker/Queue.h" #include "qpid/sys/Mutex.h" #include "qpid/management/Manageable.h" #include <boost/bind.hpp> +#include <boost/shared_ptr.hpp> #include <algorithm> #include <map> namespace qpid { namespace broker { +class Queue; class QueueEvents; +class OwnershipToken; +class Broker; +class MessageStore; /** * A registry of queues indexed by queue name. @@ -52,11 +56,11 @@ class QueueRegistry { * @return The queue and a boolean flag which is true if the queue * was created by this declare call false if it already existed. */ - QPID_BROKER_EXTERN std::pair<Queue::shared_ptr, bool> declare - (const string& name, - bool durable = false, - bool autodelete = false, - const OwnershipToken* owner = 0); + QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> declare( + const std::string& name, + bool durable = false, + bool autodelete = false, + const OwnershipToken* owner = 0); /** * Destroy the named queue. @@ -70,8 +74,8 @@ class QueueRegistry { * subsequent calls to find or declare with the same name. * */ - QPID_BROKER_EXTERN void destroy(const string& name); - template <class Test> bool destroyIf(const string& name, Test test) + QPID_BROKER_EXTERN void destroy(const std::string& name); + template <class Test> bool destroyIf(const std::string& name, Test test) { qpid::sys::RWlock::ScopedWlock locker(lock); if (test()) { @@ -85,12 +89,12 @@ class QueueRegistry { /** * Find the named queue. Return 0 if not found. */ - QPID_BROKER_EXTERN Queue::shared_ptr find(const string& name); + QPID_BROKER_EXTERN boost::shared_ptr<Queue> find(const std::string& name); /** * Generate unique queue name. */ - string generateName(); + std::string generateName(); void setQueueEvents(QueueEvents*); @@ -123,7 +127,7 @@ class QueueRegistry { void updateQueueClusterState(bool lastNode); private: - typedef std::map<string, Queue::shared_ptr> QueueMap; + typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap; QueueMap queues; mutable qpid::sys::RWlock lock; int counter; @@ -134,7 +138,7 @@ private: Broker* broker; //destroy impl that assumes lock is already held: - void destroyLH (const string& name); + void destroyLH (const std::string& name); }; diff --git a/cpp/src/qpid/broker/RecoveredDequeue.cpp b/cpp/src/qpid/broker/RecoveredDequeue.cpp index 658fd5a89e..38cb8043c9 100644 --- a/cpp/src/qpid/broker/RecoveredDequeue.cpp +++ b/cpp/src/qpid/broker/RecoveredDequeue.cpp @@ -18,6 +18,8 @@ * under the License. * */ + +#include "qpid/broker/Queue.h" #include "qpid/broker/RecoveredDequeue.h" using boost::intrusive_ptr; diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h index 67b37db5f9..66e66f1d5f 100644 --- a/cpp/src/qpid/broker/RecoveredDequeue.h +++ b/cpp/src/qpid/broker/RecoveredDequeue.h @@ -24,7 +24,6 @@ #include "qpid/broker/Deliverable.h" #include "qpid/broker/Message.h" #include "qpid/broker/MessageStore.h" -#include "qpid/broker/Queue.h" #include "qpid/broker/TxOp.h" #include <boost/intrusive_ptr.hpp> @@ -36,18 +35,18 @@ namespace qpid { namespace broker { class RecoveredDequeue : public TxOp{ - Queue::shared_ptr queue; + boost::shared_ptr<Queue> queue; boost::intrusive_ptr<Message> msg; public: - RecoveredDequeue(Queue::shared_ptr queue, boost::intrusive_ptr<Message> msg); + RecoveredDequeue(boost::shared_ptr<Queue> queue, boost::intrusive_ptr<Message> msg); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); virtual ~RecoveredDequeue(){} virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } - Queue::shared_ptr getQueue() const { return queue; } + boost::shared_ptr<Queue> getQueue() const { return queue; } boost::intrusive_ptr<Message> getMessage() const { return msg; } }; } diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.cpp b/cpp/src/qpid/broker/RecoveredEnqueue.cpp index 48faa0942c..6263c63e3d 100644 --- a/cpp/src/qpid/broker/RecoveredEnqueue.cpp +++ b/cpp/src/qpid/broker/RecoveredEnqueue.cpp @@ -18,6 +18,8 @@ * under the License. * */ + +#include "qpid/broker/Queue.h" #include "qpid/broker/RecoveredEnqueue.h" using boost::intrusive_ptr; diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h index 09f928f098..5f718001d5 100644 --- a/cpp/src/qpid/broker/RecoveredEnqueue.h +++ b/cpp/src/qpid/broker/RecoveredEnqueue.h @@ -24,7 +24,6 @@ #include "qpid/broker/Deliverable.h" #include "qpid/broker/Message.h" #include "qpid/broker/MessageStore.h" -#include "qpid/broker/Queue.h" #include "qpid/broker/TxOp.h" #include <boost/intrusive_ptr.hpp> @@ -34,24 +33,24 @@ #include <list> namespace qpid { - namespace broker { - class RecoveredEnqueue : public TxOp{ - Queue::shared_ptr queue; - boost::intrusive_ptr<Message> msg; - - public: - RecoveredEnqueue(Queue::shared_ptr queue, boost::intrusive_ptr<Message> msg); - virtual bool prepare(TransactionContext* ctxt) throw(); - virtual void commit() throw(); - virtual void rollback() throw(); - virtual ~RecoveredEnqueue(){} - virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } - - Queue::shared_ptr getQueue() const { return queue; } - boost::intrusive_ptr<Message> getMessage() const { return msg; } +namespace broker { +class RecoveredEnqueue : public TxOp{ + boost::shared_ptr<Queue> queue; + boost::intrusive_ptr<Message> msg; + + public: + RecoveredEnqueue(boost::shared_ptr<Queue> queue, boost::intrusive_ptr<Message> msg); + virtual bool prepare(TransactionContext* ctxt) throw(); + virtual void commit() throw(); + virtual void rollback() throw(); + virtual ~RecoveredEnqueue(){} + virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); } + + boost::shared_ptr<Queue> getQueue() const { return queue; } + boost::intrusive_ptr<Message> getMessage() const { return msg; } - }; - } +}; +} } diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index dd4b7543af..2f04943581 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -30,6 +30,7 @@ using boost::dynamic_pointer_cast; using boost::intrusive_ptr; +using std::string; namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index bfdc11dc76..b2e648410a 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -44,6 +44,7 @@ #include <map> #include <vector> +#include <boost/enable_shared_from_this.hpp> #include <boost/intrusive_ptr.hpp> #include <boost/cast.hpp> @@ -74,14 +75,14 @@ class SemanticState : private boost::noncopyable { { mutable qpid::sys::Mutex lock; SemanticState* const parent; - const string name; - const Queue::shared_ptr queue; + const std::string name; + const boost::shared_ptr<Queue> queue; const bool ackExpected; const bool acquire; bool blocked; bool windowing; bool exclusive; - string resumeId; + std::string resumeId; uint64_t resumeTtl; framing::FieldTable arguments; uint32_t msgCredit; @@ -99,7 +100,7 @@ class SemanticState : private boost::noncopyable { typedef boost::shared_ptr<ConsumerImpl> shared_ptr; ConsumerImpl(SemanticState* parent, - const string& name, Queue::shared_ptr queue, + const std::string& name, boost::shared_ptr<Queue> queue, bool ack, bool acquire, bool exclusive, const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); ~ConsumerImpl(); @@ -122,7 +123,7 @@ class SemanticState : private boost::noncopyable { void flush(); void stop(); void complete(DeliveryRecord&); - Queue::shared_ptr getQueue() const { return queue; } + boost::shared_ptr<Queue> getQueue() const { return queue; } bool isBlocked() const { return blocked; } bool setBlocked(bool set) { std::swap(set, blocked); return set; } @@ -164,8 +165,8 @@ class SemanticState : private boost::noncopyable { boost::shared_ptr<Exchange> cacheExchange; AclModule* acl; const bool authMsg; - const string userID; - const string userName; + const std::string userID; + const std::string userName; const bool isDefaultRealm; bool closeComplete; @@ -194,17 +195,17 @@ class SemanticState : private boost::noncopyable { * @exception: ChannelException if no queue of that name is found. * @exception: ConnectionException if name="" and session has no default. */ - Queue::shared_ptr getQueue(const std::string& name) const; + boost::shared_ptr<Queue> getQueue(const std::string& name) const; - bool exists(const string& consumerTag); + bool exists(const std::string& consumerTag); - void consume(const string& destination, - Queue::shared_ptr queue, + void consume(const std::string& destination, + boost::shared_ptr<Queue> queue, bool ackRequired, bool acquire, bool exclusive, - const string& resumeId=string(), uint64_t resumeTtl=0, + const std::string& resumeId=std::string(), uint64_t resumeTtl=0, const framing::FieldTable& = framing::FieldTable()); - void cancel(const string& tag); + void cancel(const std::string& tag); void setWindowMode(const std::string& destination); void setCreditMode(const std::string& destination); diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index 3107801740..ca27fb6e1d 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -82,7 +82,7 @@ class Queue; { HandlerHelper(SemanticState& s) : HandlerImpl(s) {} - Queue::shared_ptr getQueue(const string& name) const; + boost::shared_ptr<Queue> getQueue(const std::string& name) const; }; @@ -156,7 +156,7 @@ class Queue; public: MessageHandlerImpl(SemanticState& session); - void transfer(const string& destination, + void transfer(const std::string& destination, uint8_t acceptMode, uint8_t acquireMode); @@ -164,34 +164,34 @@ class Queue; void reject(const framing::SequenceSet& commands, uint16_t code, - const string& text); + const std::string& text); void release(const framing::SequenceSet& commands, bool setRedelivered); framing::MessageAcquireResult acquire(const framing::SequenceSet&); - void subscribe(const string& queue, - const string& destination, + void subscribe(const std::string& queue, + const std::string& destination, uint8_t acceptMode, uint8_t acquireMode, bool exclusive, - const string& resumeId, + const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments); - void cancel(const string& destination); + void cancel(const std::string& destination); - void setFlowMode(const string& destination, + void setFlowMode(const std::string& destination, uint8_t flowMode); - void flow(const string& destination, + void flow(const std::string& destination, uint8_t unit, uint32_t value); - void flush(const string& destination); + void flush(const std::string& destination); - void stop(const string& destination); + void stop(const std::string& destination); framing::MessageResumeResult resume(const std::string& destination, const std::string& resumeId); @@ -204,7 +204,7 @@ class Queue; ExecutionHandlerImpl(SemanticState& session) : HandlerHelper(session) {} void sync(); - void result(const framing::SequenceNumber& commandId, const string& value); + void result(const framing::SequenceNumber& commandId, const std::string& value); void exception(uint16_t errorCode, const framing::SequenceNumber& commandId, uint8_t classCode, diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index 3bbf143889..54c3bb32c8 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -41,7 +41,7 @@ class TopicExchange : public virtual Exchange { BindingMap bindings; qpid::sys::RWlock lock; - bool isBound(Queue::shared_ptr queue, const string& pattern); + bool isBound(Queue::shared_ptr queue, const std::string& pattern); public: static const std::string typeName; @@ -49,9 +49,9 @@ class TopicExchange : public virtual Exchange { static QPID_BROKER_EXTERN bool match(const std::string& pattern, const std::string& topic); static QPID_BROKER_EXTERN std::string normalize(const std::string& pattern); - QPID_BROKER_EXTERN TopicExchange(const string& name, + QPID_BROKER_EXTERN TopicExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0); - QPID_BROKER_EXTERN TopicExchange(const string& _name, + QPID_BROKER_EXTERN TopicExchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0); @@ -59,17 +59,17 @@ class TopicExchange : public virtual Exchange { virtual std::string getType() const { return typeName; } QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue, - const string& routingKey, + const std::string& routingKey, const qpid::framing::FieldTable* args); - virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args); + virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); QPID_BROKER_EXTERN virtual void route(Deliverable& msg, - const string& routingKey, + const std::string& routingKey, const qpid::framing::FieldTable* args); QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue, - const string* const routingKey, + const std::string* const routingKey, const qpid::framing::FieldTable* const args); QPID_BROKER_EXTERN virtual ~TopicExchange(); diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp index 22deb771bd..36a451e62c 100644 --- a/cpp/src/qpid/broker/TxPublish.cpp +++ b/cpp/src/qpid/broker/TxPublish.cpp @@ -20,6 +20,7 @@ */ #include "qpid/log/Statement.h" #include "qpid/broker/TxPublish.h" +#include "qpid/broker/Queue.h" using boost::intrusive_ptr; using namespace qpid::broker; diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h index b6ab9767ab..effa585676 100644 --- a/cpp/src/qpid/broker/TxPublish.h +++ b/cpp/src/qpid/broker/TxPublish.h @@ -22,7 +22,6 @@ #define _TxPublish_ #include "qpid/broker/BrokerImportExport.h" -#include "qpid/broker/Queue.h" #include "qpid/broker/Deliverable.h" #include "qpid/broker/Message.h" #include "qpid/broker/MessageStore.h" @@ -62,8 +61,8 @@ namespace qpid { }; boost::intrusive_ptr<Message> msg; - std::list<Queue::shared_ptr> queues; - std::list<Queue::shared_ptr> prepared; + std::list<boost::shared_ptr<Queue> > queues; + std::list<boost::shared_ptr<Queue> > prepared; void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>); @@ -83,7 +82,7 @@ namespace qpid { QPID_BROKER_EXTERN uint64_t contentSize(); boost::intrusive_ptr<Message> getMessage() const { return msg; } - const std::list<Queue::shared_ptr> getQueues() const { return queues; } + const std::list<boost::shared_ptr<Queue> > getQueues() const { return queues; } }; } } diff --git a/cpp/src/qpid/cluster/FailoverExchange.h b/cpp/src/qpid/cluster/FailoverExchange.h index b4caa70db4..2e1edfc0ae 100644 --- a/cpp/src/qpid/cluster/FailoverExchange.h +++ b/cpp/src/qpid/cluster/FailoverExchange.h @@ -49,17 +49,17 @@ class FailoverExchange : public broker::Exchange // Exchange overrides std::string getType() const; - bool bind(broker::Queue::shared_ptr queue, const std::string& routingKey, const framing::FieldTable* args); - bool unbind(broker::Queue::shared_ptr queue, const std::string& routingKey, const framing::FieldTable* args); - bool isBound(broker::Queue::shared_ptr queue, const std::string* const routingKey, const framing::FieldTable* const args); + bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args); + bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args); + bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args); void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args); private: - void sendUpdate(const broker::Queue::shared_ptr&); + void sendUpdate(const boost::shared_ptr<broker::Queue>&); typedef sys::Mutex::ScopedLock Lock; typedef std::vector<Url> Urls; - typedef std::set<broker::Queue::shared_ptr> Queues; + typedef std::set<boost::shared_ptr<broker::Queue> > Queues; sys::Mutex lock; Urls urls; diff --git a/cpp/src/qpid/management/ManagementDirectExchange.h b/cpp/src/qpid/management/ManagementDirectExchange.h index ab691afa70..7507179c06 100644 --- a/cpp/src/qpid/management/ManagementDirectExchange.h +++ b/cpp/src/qpid/management/ManagementDirectExchange.h @@ -36,15 +36,15 @@ class ManagementDirectExchange : public virtual DirectExchange public: static const std::string typeName; - ManagementDirectExchange(const string& name, Manageable* _parent = 0, Broker* broker = 0); - ManagementDirectExchange(const string& _name, bool _durable, + ManagementDirectExchange(const std::string& name, Manageable* _parent = 0, Broker* broker = 0); + ManagementDirectExchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* _parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } virtual void route(Deliverable& msg, - const string& routingKey, + const std::string& routingKey, const qpid::framing::FieldTable* args); void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); diff --git a/cpp/src/qpid/management/ManagementTopicExchange.h b/cpp/src/qpid/management/ManagementTopicExchange.h index ece1c88ecf..232300265e 100644 --- a/cpp/src/qpid/management/ManagementTopicExchange.h +++ b/cpp/src/qpid/management/ManagementTopicExchange.h @@ -36,19 +36,19 @@ class ManagementTopicExchange : public virtual TopicExchange public: static const std::string typeName; - ManagementTopicExchange(const string& name, Manageable* _parent = 0, Broker* broker = 0); - ManagementTopicExchange(const string& _name, bool _durable, + ManagementTopicExchange(const std::string& name, Manageable* _parent = 0, Broker* broker = 0); + ManagementTopicExchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* _parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } virtual void route(Deliverable& msg, - const string& routingKey, + const std::string& routingKey, const qpid::framing::FieldTable* args); virtual bool bind(Queue::shared_ptr queue, - const string& routingKey, + const std::string& routingKey, const qpid::framing::FieldTable* args); void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp index b5911bb71e..4b6d25ac7d 100644 --- a/cpp/src/qpid/replication/ReplicationExchange.cpp +++ b/cpp/src/qpid/replication/ReplicationExchange.cpp @@ -22,6 +22,8 @@ #include "qpid/replication/constants.h" #include "qpid/Plugin.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueRegistry.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" diff --git a/cpp/src/qpid/replication/ReplicationExchange.h b/cpp/src/qpid/replication/ReplicationExchange.h index f0252448f9..4b34e0df13 100644 --- a/cpp/src/qpid/replication/ReplicationExchange.h +++ b/cpp/src/qpid/replication/ReplicationExchange.h @@ -26,6 +26,11 @@ #include "qpid/framing/SequenceNumber.h" namespace qpid { + +namespace broker { +class QueueRegistry; +} + namespace replication { /** @@ -49,9 +54,9 @@ class ReplicationExchange : public qpid::broker::Exchange void route(qpid::broker::Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); - bool bind(qpid::broker::Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - bool unbind(qpid::broker::Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args); - bool isBound(qpid::broker::Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args); + bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args); + bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args); private: qpid::broker::QueueRegistry& queues; qpid::framing::SequenceNumber sequence; diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp index fbf7566a18..f0afc8d451 100644 --- a/cpp/src/qpid/xml/XmlExchange.cpp +++ b/cpp/src/qpid/xml/XmlExchange.cpp @@ -49,6 +49,7 @@ using namespace qpid::framing; using namespace qpid::sys; using qpid::management::Manageable; +using std::string; namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h index 4394ede5e7..f34c417633 100644 --- a/cpp/src/qpid/xml/XmlExchange.h +++ b/cpp/src/qpid/xml/XmlExchange.h @@ -57,7 +57,7 @@ class XmlExchange : public virtual Exchange { }; - typedef std::map<string, XmlBinding::vector > XmlBindingsMap; + typedef std::map<std::string, XmlBinding::vector > XmlBindingsMap; XmlBindingsMap bindingsMap; XQilla xqilla; @@ -69,7 +69,7 @@ class XmlExchange : public virtual Exchange { static const std::string typeName; XmlExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0); - XmlExchange(const string& _name, bool _durable, + XmlExchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0); virtual std::string getType() const { return typeName; } @@ -80,7 +80,7 @@ class XmlExchange : public virtual Exchange { virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args); - virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args); + virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args); virtual ~XmlExchange(); }; diff --git a/cpp/src/tests/DeliveryRecordTest.cpp b/cpp/src/tests/DeliveryRecordTest.cpp index 17f9a0d148..f7013014ff 100644 --- a/cpp/src/tests/DeliveryRecordTest.cpp +++ b/cpp/src/tests/DeliveryRecordTest.cpp @@ -20,6 +20,7 @@ * */ #include "qpid/broker/DeliveryRecord.h" +#include "qpid/broker/Queue.h" #include "unit_test.h" #include <iostream> #include <memory> diff --git a/cpp/src/tests/QueueRegistryTest.cpp b/cpp/src/tests/QueueRegistryTest.cpp index 712cb568c3..ae555539a4 100644 --- a/cpp/src/tests/QueueRegistryTest.cpp +++ b/cpp/src/tests/QueueRegistryTest.cpp @@ -18,6 +18,7 @@ */ #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/Queue.h" #include "unit_test.h" #include <string> |