summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-10-14 19:38:40 +0000
committerAlan Conway <aconway@apache.org>2010-10-14 19:38:40 +0000
commit23204010207ad7db58500b6547b92b7f91d2df53 (patch)
treeffa680168fd8d4c04a3b3bcad0472d1a920985b7 /cpp/src
parent0be15c353c4cdc2612757fa4c877e5bb42e0228d (diff)
downloadqpid-python-23204010207ad7db58500b6547b92b7f91d2df53.tar.gz
Code cleanup in broker directory.
- Removed un-necessary #includes for broker/Queue.h - Removed "using std::string" in header files. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1022679 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp1
-rw-r--r--cpp/src/qpid/broker/Bridge.h2
-rw-r--r--cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--cpp/src/qpid/broker/Connection.cpp3
-rw-r--r--cpp/src/qpid/broker/Connection.h6
-rw-r--r--cpp/src/qpid/broker/ConnectionState.h20
-rw-r--r--cpp/src/qpid/broker/Deliverable.h1
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.cpp2
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.h1
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp1
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h9
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp2
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h13
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp1
-rw-r--r--cpp/src/qpid/broker/Exchange.h16
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp1
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h4
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h16
-rw-r--r--cpp/src/qpid/broker/Link.cpp1
-rw-r--r--cpp/src/qpid/broker/Link.h29
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp1
-rw-r--r--cpp/src/qpid/broker/Message.cpp1
-rw-r--r--cpp/src/qpid/broker/MessageStoreModule.cpp1
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp6
-rw-r--r--cpp/src/qpid/broker/Queue.h613
-rw-r--r--cpp/src/qpid/broker/QueueBindings.cpp1
-rw-r--r--cpp/src/qpid/broker/QueueCleaner.cpp1
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp2
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h28
-rw-r--r--cpp/src/qpid/broker/RecoveredDequeue.cpp2
-rw-r--r--cpp/src/qpid/broker/RecoveredDequeue.h7
-rw-r--r--cpp/src/qpid/broker/RecoveredEnqueue.cpp2
-rw-r--r--cpp/src/qpid/broker/RecoveredEnqueue.h35
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp1
-rw-r--r--cpp/src/qpid/broker/SemanticState.h27
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h24
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h14
-rw-r--r--cpp/src/qpid/broker/TxPublish.cpp1
-rw-r--r--cpp/src/qpid/broker/TxPublish.h7
-rw-r--r--cpp/src/qpid/cluster/FailoverExchange.h10
-rw-r--r--cpp/src/qpid/management/ManagementDirectExchange.h6
-rw-r--r--cpp/src/qpid/management/ManagementTopicExchange.h8
-rw-r--r--cpp/src/qpid/replication/ReplicationExchange.cpp2
-rw-r--r--cpp/src/qpid/replication/ReplicationExchange.h11
-rw-r--r--cpp/src/qpid/xml/XmlExchange.cpp1
-rw-r--r--cpp/src/qpid/xml/XmlExchange.h6
-rw-r--r--cpp/src/tests/DeliveryRecordTest.cpp1
-rw-r--r--cpp/src/tests/QueueRegistryTest.cpp1
48 files changed, 490 insertions, 461 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 3e632f6659..9381f00268 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -34,6 +34,7 @@ using qpid::framing::FieldTable;
using qpid::framing::Uuid;
using qpid::framing::Buffer;
using qpid::management::ManagementAgent;
+using std::string;
namespace _qmf = qmf::org::apache::qpid::broker;
namespace
diff --git a/cpp/src/qpid/broker/Bridge.h b/cpp/src/qpid/broker/Bridge.h
index f25d32e7b2..a846254c57 100644
--- a/cpp/src/qpid/broker/Bridge.h
+++ b/cpp/src/qpid/broker/Bridge.h
@@ -75,7 +75,7 @@ public:
// Exchange::DynamicBridge methods
void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin, qpid::framing::FieldTable* extra_args=0);
void sendReorigin();
- void ioThreadPropagateBinding(const string& queue, const string& exchange, const string& key, framing::FieldTable args);
+ void ioThreadPropagateBinding(const std::string& queue, const std::string& exchange, const std::string& key, framing::FieldTable args);
bool containsLocalTag(const std::string& tagList) const;
const std::string& getLocalTag() const;
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 1a8bed1be0..33364e48df 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -70,6 +70,8 @@ using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
+using std::string;
+
namespace _qmf = qmf::org::apache::qpid::broker;
namespace qpid {
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 33ed032327..d50f0c946a 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -23,6 +23,7 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
#include "qpid/sys/SecuritySettings.h"
#include "qpid/sys/ClusterSafe.h"
@@ -273,7 +274,7 @@ void Connection::closed(){ // Physically closed, suspend open sessions.
while (!channels.empty())
ptr_map_ptr(channels.begin())->handleDetach();
while (!exclusiveQueues.empty()) {
- Queue::shared_ptr q(exclusiveQueues.front());
+ boost::shared_ptr<Queue> q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
if (q->canAutoDelete()) {
Queue::tryAutoDelete(broker, q);
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 8ad78f6652..c1b2b5a8fc 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -94,7 +94,7 @@ class Connection : public sys::ConnectionInputHandler,
SessionHandler& getChannel(framing::ChannelId channel);
/** Close the connection */
- void close(framing::connection::CloseCode code, const string& text);
+ void close(framing::connection::CloseCode code, const std::string& text);
// ConnectionInputHandler methods
void received(framing::AMQFrame& frame);
@@ -116,7 +116,7 @@ class Connection : public sys::ConnectionInputHandler,
std::string getAuthMechanism();
std::string getAuthCredentials();
void notifyConnectionForced(const std::string& text);
- void setUserId(const string& uid);
+ void setUserId(const std::string& uid);
void raiseConnectEvent();
const std::string& getUserId() const { return ConnectionState::getUserId(); }
const std::string& getMgmtId() const { return mgmtId; }
@@ -158,7 +158,7 @@ class Connection : public sys::ConnectionInputHandler,
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
- typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
+ typedef std::vector<boost::shared_ptr<Queue> >::iterator queue_iterator;
ChannelMap channels;
qpid::sys::SecuritySettings securitySettings;
diff --git a/cpp/src/qpid/broker/ConnectionState.h b/cpp/src/qpid/broker/ConnectionState.h
index 19caacb595..774c37408d 100644
--- a/cpp/src/qpid/broker/ConnectionState.h
+++ b/cpp/src/qpid/broker/ConnectionState.h
@@ -61,16 +61,16 @@ class ConnectionState : public ConnectionToken, public management::Manageable
void setHeartbeat(uint16_t hb) { heartbeat = hb; }
void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
- virtual void setUserId(const string& uid) { userId = uid; }
- const string& getUserId() const { return userId; }
+ virtual void setUserId(const std::string& uid) { userId = uid; }
+ const std::string& getUserId() const { return userId; }
- void setUrl(const string& _url) { url = _url; }
- const string& getUrl() const { return url; }
+ void setUrl(const std::string& _url) { url = _url; }
+ const std::string& getUrl() const { return url; }
void setFederationLink(bool b) { federationLink = b; }
bool isFederationLink() const { return federationLink; }
- void setFederationPeerTag(const string& tag) { federationPeerTag = string(tag); }
- const string& getFederationPeerTag() const { return federationPeerTag; }
+ void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); }
+ const std::string& getFederationPeerTag() const { return federationPeerTag; }
std::vector<Url>& getKnownHosts() { return knownHosts; }
void setClientThrottling(bool set=true) { clientSupportsThrottling = set; }
@@ -79,7 +79,7 @@ class ConnectionState : public ConnectionToken, public management::Manageable
Broker& getBroker() { return broker; }
Broker& broker;
- std::vector<Queue::shared_ptr> exclusiveQueues;
+ std::vector<boost::shared_ptr<Queue> > exclusiveQueues;
//contained output tasks
sys::AggregateOutput outputTasks;
@@ -104,10 +104,10 @@ class ConnectionState : public ConnectionToken, public management::Manageable
uint32_t framemax;
uint16_t heartbeat;
uint16_t heartbeatmax;
- string userId;
- string url;
+ std::string userId;
+ std::string url;
bool federationLink;
- string federationPeerTag;
+ std::string federationPeerTag;
std::vector<Url> knownHosts;
bool clientSupportsThrottling;
framing::FrameHandler* clusterOrderOut;
diff --git a/cpp/src/qpid/broker/Deliverable.h b/cpp/src/qpid/broker/Deliverable.h
index 433469a212..ffb5a77bca 100644
--- a/cpp/src/qpid/broker/Deliverable.h
+++ b/cpp/src/qpid/broker/Deliverable.h
@@ -21,7 +21,6 @@
#ifndef _Deliverable_
#define _Deliverable_
-#include "qpid/broker/Queue.h"
#include "qpid/broker/Message.h"
namespace qpid {
diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp
index 658e6bf48f..3ebb12461c 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.cpp
+++ b/cpp/src/qpid/broker/DeliverableMessage.cpp
@@ -18,7 +18,9 @@
* under the License.
*
*/
+
#include "qpid/broker/DeliverableMessage.h"
+#include "qpid/broker/Queue.h"
using namespace qpid::broker;
diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h
index 08abce35ef..ce613e7b6e 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.h
+++ b/cpp/src/qpid/broker/DeliverableMessage.h
@@ -23,7 +23,6 @@
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Deliverable.h"
-#include "qpid/broker/Queue.h"
#include "qpid/broker/Message.h"
#include <boost/intrusive_ptr.hpp>
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index b3a49c485d..9443eb6ea5 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -22,6 +22,7 @@
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Queue.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/MessageTransferBody.h"
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index 5f802766b6..d388ba94be 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -28,13 +28,14 @@
#include <ostream>
#include "qpid/framing/SequenceSet.h"
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/Queue.h"
#include "qpid/broker/QueuedMessage.h"
#include "qpid/broker/DeliveryId.h"
#include "qpid/broker/Message.h"
namespace qpid {
namespace broker {
+
+class TransactionContext;
class SemanticState;
struct AckRange;
@@ -44,7 +45,7 @@ struct AckRange;
class DeliveryRecord
{
QueuedMessage msg;
- mutable Queue::shared_ptr queue;
+ mutable boost::shared_ptr<Queue> queue;
std::string tag;
DeliveryId id;
bool acquired : 1;
@@ -65,7 +66,7 @@ class DeliveryRecord
public:
QPID_BROKER_EXTERN DeliveryRecord(const QueuedMessage& msg,
- const Queue::shared_ptr& queue,
+ const boost::shared_ptr<Queue>& queue,
const std::string& tag,
bool acquired,
bool accepted,
@@ -105,7 +106,7 @@ class DeliveryRecord
static AckRange findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last);
const QueuedMessage& getMessage() const { return msg; }
framing::SequenceNumber getId() const { return id; }
- Queue::shared_ptr getQueue() const { return queue; }
+ boost::shared_ptr<Queue> getQueue() const { return queue; }
friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index 1787c01af9..0db941f93b 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -18,7 +18,9 @@
* under the License.
*
*/
+
#include "qpid/log/Statement.h"
+#include "qpid/broker/Queue.h"
#include "qpid/broker/DirectExchange.h"
#include <iostream>
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 9a73f3bc41..a6f9cf91af 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -28,7 +28,6 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/CopyOnWriteArray.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/broker/Queue.h"
namespace qpid {
namespace broker {
@@ -38,7 +37,7 @@ class DirectExchange : public virtual Exchange {
Queues queues;
FedBinding fedBinding;
};
- typedef std::map<string, BoundKey> Bindings;
+ typedef std::map<std::string, BoundKey> Bindings;
Bindings bindings;
qpid::sys::Mutex lock;
@@ -47,22 +46,22 @@ public:
QPID_BROKER_EXTERN DirectExchange(const std::string& name,
management::Manageable* parent = 0, Broker* broker = 0);
- QPID_BROKER_EXTERN DirectExchange(const string& _name,
+ QPID_BROKER_EXTERN DirectExchange(const std::string& _name,
bool _durable,
const qpid::framing::FieldTable& _args,
management::Manageable* parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
- QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue,
+ QPID_BROKER_EXTERN virtual bool bind(boost::shared_ptr<Queue> queue,
const std::string& routingKey,
const qpid::framing::FieldTable* args);
- virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
const std::string& routingKey,
const qpid::framing::FieldTable* args);
- QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue,
- const string* const routingKey,
+ QPID_BROKER_EXTERN virtual bool isBound(boost::shared_ptr<Queue> queue,
+ const std::string* const routingKey,
const qpid::framing::FieldTable* const args);
QPID_BROKER_EXTERN virtual ~DirectExchange();
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 1188cf0e5b..98980e0360 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -23,6 +23,7 @@
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/Broker.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/broker/Queue.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/MessageProperties.h"
#include "qpid/framing/reply_exceptions.h"
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 23d044ffd3..4b6b90026b 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -25,7 +25,6 @@
#include <boost/shared_ptr.hpp>
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/broker/Deliverable.h"
-#include "qpid/broker/Queue.h"
#include "qpid/broker/MessageStore.h"
#include "qpid/broker/PersistableExchange.h"
#include "qpid/framing/FieldTable.h"
@@ -37,6 +36,7 @@
namespace qpid {
namespace broker {
+class Broker;
class ExchangeRegistry;
class Exchange : public PersistableExchange, public management::Manageable {
@@ -46,13 +46,13 @@ public:
typedef std::vector<Binding::shared_ptr> vector;
Exchange* parent;
- Queue::shared_ptr queue;
+ boost::shared_ptr<Queue> queue;
const std::string key;
const framing::FieldTable args;
std::string origin;
qmf::org::apache::qpid::broker::Binding* mgmtBinding;
- Binding(const std::string& key, Queue::shared_ptr queue, Exchange* parent = 0,
+ Binding(const std::string& key, boost::shared_ptr<Queue> queue, Exchange* parent = 0,
framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
~Binding();
void startManagement();
@@ -90,8 +90,8 @@ protected:
struct MatchQueue {
- const Queue::shared_ptr queue;
- MatchQueue(Queue::shared_ptr q);
+ const boost::shared_ptr<Queue> queue;
+ MatchQueue(boost::shared_ptr<Queue> q);
bool operator()(Exchange::Binding::shared_ptr b);
};
@@ -145,9 +145,9 @@ public:
bool inUseAsAlternate() { return alternateUsers > 0; }
virtual std::string getType() const = 0;
- virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
- virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
- virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
+ virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&);
virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 8122e5c2d9..99b121cbce 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -31,6 +31,7 @@
using namespace qpid::broker;
using namespace qpid::sys;
using std::pair;
+using std::string;
using qpid::framing::FieldTable;
pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type){
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index 7bcf6367cf..1a7d486796 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -41,7 +41,7 @@ class FanOutExchange : public virtual Exchange {
QPID_BROKER_EXTERN FanOutExchange(const std::string& name,
management::Manageable* parent = 0, Broker* broker = 0);
- QPID_BROKER_EXTERN FanOutExchange(const string& _name,
+ QPID_BROKER_EXTERN FanOutExchange(const std::string& _name,
bool _durable,
const qpid::framing::FieldTable& _args,
management::Manageable* parent = 0, Broker* broker = 0);
@@ -59,7 +59,7 @@ class FanOutExchange : public virtual Exchange {
const qpid::framing::FieldTable* args);
QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue,
- const string* const routingKey,
+ const std::string* const routingKey,
const qpid::framing::FieldTable* const args);
QPID_BROKER_EXTERN virtual ~FanOutExchange();
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index 3de26253a6..33c119cbbb 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -60,11 +60,11 @@ class HeadersExchange : public virtual Exchange {
struct FedUnbindModifier
{
- string fedOrigin;
+ std::string fedOrigin;
bool shouldUnbind;
bool shouldPropagate;
FedUnbindModifier();
- FedUnbindModifier(string & origin);
+ FedUnbindModifier(std::string & origin);
bool operator()(BoundKey & bk);
};
@@ -82,9 +82,9 @@ class HeadersExchange : public virtual Exchange {
public:
static const std::string typeName;
- QPID_BROKER_EXTERN HeadersExchange(const string& name,
+ QPID_BROKER_EXTERN HeadersExchange(const std::string& name,
management::Manageable* parent = 0, Broker* broker = 0);
- QPID_BROKER_EXTERN HeadersExchange(const string& _name,
+ QPID_BROKER_EXTERN HeadersExchange(const std::string& _name,
bool _durable,
const qpid::framing::FieldTable& _args,
management::Manageable* parent = 0, Broker* broker = 0);
@@ -92,17 +92,17 @@ class HeadersExchange : public virtual Exchange {
virtual std::string getType() const { return typeName; }
QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue,
- const string& routingKey,
+ const std::string& routingKey,
const qpid::framing::FieldTable* args);
- virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
- const string& routingKey,
+ const std::string& routingKey,
const qpid::framing::FieldTable* args);
QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue,
- const string* const routingKey,
+ const std::string* const routingKey,
const qpid::framing::FieldTable* const args);
QPID_BROKER_EXTERN virtual ~HeadersExchange();
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 5a1dfd9656..c13a24da95 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -42,6 +42,7 @@ using qpid::management::Manageable;
using qpid::management::Args;
using qpid::sys::Mutex;
using std::stringstream;
+using std::string;
namespace _qmf = qmf::org::apache::qpid::broker;
Link::Link(LinkRegistry* _links,
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
index 9da610076b..bd74fe2a2f 100644
--- a/cpp/src/qpid/broker/Link.h
+++ b/cpp/src/qpid/broker/Link.h
@@ -37,7 +37,6 @@
namespace qpid {
namespace broker {
- using std::string;
class LinkRegistry;
class Broker;
class Connection;
@@ -47,13 +46,13 @@ namespace qpid {
sys::Mutex lock;
LinkRegistry* links;
MessageStore* store;
- string host;
+ std::string host;
uint16_t port;
- string transport;
+ std::string transport;
bool durable;
- string authMechanism;
- string username;
- string password;
+ std::string authMechanism;
+ std::string username;
+ std::string password;
mutable uint64_t persistenceId;
qmf::org::apache::qpid::broker::Link* mgmtObject;
Broker* broker;
@@ -93,13 +92,13 @@ namespace qpid {
Link(LinkRegistry* links,
MessageStore* store,
- string& host,
+ std::string& host,
uint16_t port,
- string& transport,
+ std::string& transport,
bool durable,
- string& authMechanism,
- string& username,
- string& password,
+ std::string& authMechanism,
+ std::string& username,
+ std::string& password,
Broker* broker,
management::Manageable* parent = 0);
virtual ~Link();
@@ -117,9 +116,9 @@ namespace qpid {
void setConnection(Connection*); // Set pointer to the AMQP Connection
void reconnect(const Address&); //called by LinkRegistry
- string getAuthMechanism() { return authMechanism; }
- string getUsername() { return username; }
- string getPassword() { return password; }
+ std::string getAuthMechanism() { return authMechanism; }
+ std::string getUsername() { return username; }
+ std::string getPassword() { return password; }
Broker* getBroker() { return broker; }
void notifyConnectionForced(const std::string text);
@@ -130,7 +129,7 @@ namespace qpid {
uint64_t getPersistenceId() const { return persistenceId; }
uint32_t encodedSize() const;
void encode(framing::Buffer& buffer) const;
- const string& getName() const;
+ const std::string& getName() const;
static Link::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index 49e0ec33e5..9d429a2dcc 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -27,6 +27,7 @@
using namespace qpid::broker;
using namespace qpid::sys;
+using std::string;
using std::pair;
using std::stringstream;
using boost::intrusive_ptr;
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index ad67bff34b..147b9e7a6a 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/StringUtils.h"
diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp
index 5f7cceebd3..cd9fd4c933 100644
--- a/cpp/src/qpid/broker/MessageStoreModule.cpp
+++ b/cpp/src/qpid/broker/MessageStoreModule.cpp
@@ -28,6 +28,7 @@
using boost::intrusive_ptr;
using qpid::framing::FieldTable;
+using std::string;
namespace qpid {
namespace broker {
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index 72cf40caab..dc8615d58b 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -85,11 +85,11 @@ void NullMessageStore::stage(const intrusive_ptr<PersistableMessage>&) {}
void NullMessageStore::destroy(PersistableMessage&) {}
-void NullMessageStore::appendContent(const intrusive_ptr<const PersistableMessage>&, const string&) {}
+void NullMessageStore::appendContent(const intrusive_ptr<const PersistableMessage>&, const std::string&) {}
void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&,
const intrusive_ptr<const PersistableMessage>&,
- string&, uint64_t, uint32_t)
+ std::string&, uint64_t, uint32_t)
{
throw qpid::framing::InternalErrorException("Can't load content; persistence not enabled");
}
@@ -139,7 +139,7 @@ void NullMessageStore::abort(TransactionContext& ctxt)
prepared.erase(DummyCtxt::getXid(ctxt));
}
-void NullMessageStore::collectPreparedXids(std::set<string>& out)
+void NullMessageStore::collectPreparedXids(std::set<std::string>& out)
{
out.insert(prepared.begin(), prepared.end());
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 41c6b46f2b..96c79d1b92 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -39,8 +39,8 @@
#include "qpid/framing/amqp_types.h"
#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
#include <boost/intrusive_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include <list>
#include <vector>
@@ -49,324 +49,319 @@
#include <algorithm>
namespace qpid {
- namespace broker {
- class Broker;
- class MessageStore;
- class QueueEvents;
- class QueueRegistry;
- class TransactionContext;
- class Exchange;
-
- using std::string;
-
-
-
- /**
- * The brokers representation of an amqp queue. Messages are
- * delivered to a queue from where they can be dispatched to
- * registered consumers or be stored until dequeued or until one
- * or more consumers registers.
- */
- class Queue : public boost::enable_shared_from_this<Queue>,
- public PersistableQueue, public management::Manageable {
-
- struct UsageBarrier
- {
- Queue& parent;
- uint count;
+namespace broker {
+class Broker;
+class MessageStore;
+class QueueEvents;
+class QueueRegistry;
+class TransactionContext;
+class Exchange;
+
+/**
+ * The brokers representation of an amqp queue. Messages are
+ * delivered to a queue from where they can be dispatched to
+ * registered consumers or be stored until dequeued or until one
+ * or more consumers registers.
+ */
+class Queue : public boost::enable_shared_from_this<Queue>,
+ public PersistableQueue, public management::Manageable {
+
+ struct UsageBarrier
+ {
+ Queue& parent;
+ uint count;
- UsageBarrier(Queue&);
- bool acquire();
- void release();
- void destroy();
- };
+ UsageBarrier(Queue&);
+ bool acquire();
+ void release();
+ void destroy();
+ };
- struct ScopedUse
- {
- UsageBarrier& barrier;
- const bool acquired;
- ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
- ~ScopedUse() { if (acquired) barrier.release(); }
- };
+ struct ScopedUse
+ {
+ UsageBarrier& barrier;
+ const bool acquired;
+ ScopedUse(UsageBarrier& b) : barrier(b), acquired(barrier.acquire()) {}
+ ~ScopedUse() { if (acquired) barrier.release(); }
+ };
- typedef std::deque<QueuedMessage> Messages;
- typedef std::map<string,boost::intrusive_ptr<Message> > LVQ;
- enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
-
- const string name;
- const bool autodelete;
- MessageStore* store;
- const OwnershipToken* owner;
- uint32_t consumerCount;
- OwnershipToken* exclusive;
- bool noLocal;
- bool lastValueQueue;
- bool lastValueQueueNoBrowse;
- bool persistLastNode;
- bool inLastNodeFailure;
- std::string traceId;
- std::vector<std::string> traceExclude;
- QueueListeners listeners;
- Messages messages;
- Messages pendingDequeues;//used to avoid dequeuing during recovery
- LVQ lvq;
- mutable qpid::sys::Mutex consumerLock;
- mutable qpid::sys::Monitor messageLock;
- mutable qpid::sys::Mutex ownershipLock;
- mutable uint64_t persistenceId;
- framing::FieldTable settings;
- std::auto_ptr<QueuePolicy> policy;
- bool policyExceeded;
- QueueBindings bindings;
- std::string alternateExchangeName;
- boost::shared_ptr<Exchange> alternateExchange;
- framing::SequenceNumber sequence;
- qmf::org::apache::qpid::broker::Queue* mgmtObject;
- RateTracker dequeueTracker;
- int eventMode;
- QueueEvents* eventMgr;
- bool insertSeqNo;
- std::string seqNoKey;
- Broker* broker;
- bool deleted;
- UsageBarrier barrier;
-
- void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
- void setPolicy(std::auto_ptr<QueuePolicy> policy);
- bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
- bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
- ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
- bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
- void notifyListener();
-
- void removeListener(Consumer::shared_ptr);
-
- bool isExcluded(boost::intrusive_ptr<Message>& msg);
-
- void dequeued(const QueuedMessage& msg);
- void popAndDequeue();
- QueuedMessage getFront();
- QueuedMessage& checkLvqReplace(QueuedMessage& msg);
- void clearLVQIndex(const QueuedMessage& msg);
-
- inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
- {
- if (mgmtObject != 0) {
- mgmtObject->inc_msgTotalEnqueues ();
- mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
- if (msg->isPersistent ()) {
- mgmtObject->inc_msgPersistEnqueues ();
- mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
- }
- }
+ typedef std::deque<QueuedMessage> Messages;
+ typedef std::map<std::string,boost::intrusive_ptr<Message> > LVQ;
+ enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
+
+ const std::string name;
+ const bool autodelete;
+ MessageStore* store;
+ const OwnershipToken* owner;
+ uint32_t consumerCount;
+ OwnershipToken* exclusive;
+ bool noLocal;
+ bool lastValueQueue;
+ bool lastValueQueueNoBrowse;
+ bool persistLastNode;
+ bool inLastNodeFailure;
+ std::string traceId;
+ std::vector<std::string> traceExclude;
+ QueueListeners listeners;
+ Messages messages;
+ Messages pendingDequeues;//used to avoid dequeuing during recovery
+ LVQ lvq;
+ mutable qpid::sys::Mutex consumerLock;
+ mutable qpid::sys::Monitor messageLock;
+ mutable qpid::sys::Mutex ownershipLock;
+ mutable uint64_t persistenceId;
+ framing::FieldTable settings;
+ std::auto_ptr<QueuePolicy> policy;
+ bool policyExceeded;
+ QueueBindings bindings;
+ std::string alternateExchangeName;
+ boost::shared_ptr<Exchange> alternateExchange;
+ framing::SequenceNumber sequence;
+ qmf::org::apache::qpid::broker::Queue* mgmtObject;
+ RateTracker dequeueTracker;
+ int eventMode;
+ QueueEvents* eventMgr;
+ bool insertSeqNo;
+ std::string seqNoKey;
+ Broker* broker;
+ bool deleted;
+ UsageBarrier barrier;
+
+ void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
+ void setPolicy(std::auto_ptr<QueuePolicy> policy);
+ bool seek(QueuedMessage& msg, Consumer::shared_ptr position);
+ bool getNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ ConsumeCode consumeNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
+ void notifyListener();
+
+ void removeListener(Consumer::shared_ptr);
+
+ bool isExcluded(boost::intrusive_ptr<Message>& msg);
+
+ void dequeued(const QueuedMessage& msg);
+ void popMsg(QueuedMessage& qmsg);
+ void popAndDequeue();
+ QueuedMessage getFront();
+ QueuedMessage& checkLvqReplace(QueuedMessage& msg);
+ void clearLVQIndex(const QueuedMessage& msg);
+
+ inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
+ {
+ if (mgmtObject != 0) {
+ mgmtObject->inc_msgTotalEnqueues ();
+ mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+ if (msg->isPersistent ()) {
+ mgmtObject->inc_msgPersistEnqueues ();
+ mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
}
- inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg)
- {
- if (mgmtObject != 0){
- mgmtObject->inc_msgTotalDequeues ();
- mgmtObject->inc_byteTotalDequeues (msg->contentSize());
- if (msg->isPersistent ()){
- mgmtObject->inc_msgPersistDequeues ();
- mgmtObject->inc_bytePersistDequeues (msg->contentSize());
- }
- }
+ }
+ }
+ inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg)
+ {
+ if (mgmtObject != 0){
+ mgmtObject->inc_msgTotalDequeues ();
+ mgmtObject->inc_byteTotalDequeues (msg->contentSize());
+ if (msg->isPersistent ()){
+ mgmtObject->inc_msgPersistDequeues ();
+ mgmtObject->inc_bytePersistDequeues (msg->contentSize());
}
+ }
+ }
- Messages::iterator findAt(framing::SequenceNumber pos);
- void checkNotDeleted();
-
- public:
-
- typedef boost::shared_ptr<Queue> shared_ptr;
-
- typedef std::vector<shared_ptr> vector;
-
- QPID_BROKER_EXTERN Queue(const string& name,
- bool autodelete = false,
- MessageStore* const store = 0,
- const OwnershipToken* const owner = 0,
- management::Manageable* parent = 0,
- Broker* broker = 0);
- QPID_BROKER_EXTERN ~Queue();
-
- QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
-
- void create(const qpid::framing::FieldTable& settings);
-
- // "recovering" means we are doing a MessageStore recovery.
- QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings,
- bool recovering = false);
- void destroy();
- void notifyDeleted();
- QPID_BROKER_EXTERN void bound(const string& exchange,
- const string& key,
- const qpid::framing::FieldTable& args);
- QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges,
- Queue::shared_ptr shared_ref);
-
- QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg);
- QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
-
- /**
- * Delivers a message to the queue. Will record it as
- * enqueued if persistent then process it.
- */
- QPID_BROKER_EXTERN void deliver(boost::intrusive_ptr<Message> msg);
- /**
- * Dispatches the messages immediately to a consumer if
- * one is available or stores it for later if not.
- */
- QPID_BROKER_EXTERN void process(boost::intrusive_ptr<Message>& msg);
- /**
- * Returns a message to the in-memory queue (due to lack
- * of acknowledegement from a receiver). If a consumer is
- * available it will be dispatched immediately, else it
- * will be returned to the front of the queue.
- */
- QPID_BROKER_EXTERN void requeue(const QueuedMessage& msg);
- /**
- * Used during recovery to add stored messages back to the queue
- */
- QPID_BROKER_EXTERN void recover(boost::intrusive_ptr<Message>& msg);
-
- QPID_BROKER_EXTERN void consume(Consumer::shared_ptr c,
- bool exclusive = false);
- QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
-
- uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
- QPID_BROKER_EXTERN void purgeExpired();
-
- //move qty # of messages to destination Queue destq
- uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
-
- QPID_BROKER_EXTERN uint32_t getMessageCount() const;
- QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
- QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
- inline const string& getName() const { return name; }
- bool isExclusiveOwner(const OwnershipToken* const o) const;
- void releaseExclusiveOwnership();
- bool setExclusiveOwner(const OwnershipToken* const o);
- bool hasExclusiveConsumer() const;
- bool hasExclusiveOwner() const;
- inline bool isDurable() const { return store != 0; }
- inline const framing::FieldTable& getSettings() const { return settings; }
- inline bool isAutoDelete() const { return autodelete; }
- bool canAutoDelete() const;
- const QueueBindings& getBindings() const { return bindings; }
-
- /**
- * used to take messages from in memory and flush down to disk.
- */
- QPID_BROKER_EXTERN void setLastNodeFailure();
- QPID_BROKER_EXTERN void clearLastNodeFailure();
-
- bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
- void enqueueAborted(boost::intrusive_ptr<Message> msg);
- /**
- * dequeue from store (only done once messages is acknowledged)
- */
- QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg);
- /**
- * Inform the queue that a previous transactional dequeue
- * committed.
- */
- void dequeueCommitted(const QueuedMessage& msg);
-
- /**
- * Inform queue of messages that were enqueued, have since
- * been acquired but not yet accepted or released (and
- * thus are still logically on the queue) - used in
- * clustered broker.
- */
- void enqueued(const QueuedMessage& msg);
-
- /**
- * Test whether the specified message (identified by its
- * sequence/position), is still enqueued (note this
- * doesn't mean it is available for delivery as it may
- * have been delievered to a subscriber who has not yet
- * accepted it).
- */
- bool isEnqueued(const QueuedMessage& msg);
+ Messages::iterator findAt(framing::SequenceNumber pos);
+ void checkNotDeleted();
+
+ public:
+
+ typedef boost::shared_ptr<Queue> shared_ptr;
+
+ typedef std::vector<shared_ptr> vector;
+
+ QPID_BROKER_EXTERN Queue(const std::string& name,
+ bool autodelete = false,
+ MessageStore* const store = 0,
+ const OwnershipToken* const owner = 0,
+ management::Manageable* parent = 0,
+ Broker* broker = 0);
+ QPID_BROKER_EXTERN ~Queue();
+
+ QPID_BROKER_EXTERN bool dispatch(Consumer::shared_ptr);
+
+ void create(const qpid::framing::FieldTable& settings);
+
+ // "recovering" means we are doing a MessageStore recovery.
+ QPID_BROKER_EXTERN void configure(const qpid::framing::FieldTable& settings,
+ bool recovering = false);
+ void destroy();
+ void notifyDeleted();
+ QPID_BROKER_EXTERN void bound(const std::string& exchange,
+ const std::string& key,
+ const qpid::framing::FieldTable& args);
+ QPID_BROKER_EXTERN void unbind(ExchangeRegistry& exchanges,
+ Queue::shared_ptr shared_ref);
+
+ QPID_BROKER_EXTERN bool acquire(const QueuedMessage& msg);
+ QPID_BROKER_EXTERN bool acquireMessageAt(const qpid::framing::SequenceNumber& position, QueuedMessage& message);
+
+ /**
+ * Delivers a message to the queue. Will record it as
+ * enqueued if persistent then process it.
+ */
+ QPID_BROKER_EXTERN void deliver(boost::intrusive_ptr<Message> msg);
+ /**
+ * Dispatches the messages immediately to a consumer if
+ * one is available or stores it for later if not.
+ */
+ QPID_BROKER_EXTERN void process(boost::intrusive_ptr<Message>& msg);
+ /**
+ * Returns a message to the in-memory queue (due to lack
+ * of acknowledegement from a receiver). If a consumer is
+ * available it will be dispatched immediately, else it
+ * will be returned to the front of the queue.
+ */
+ QPID_BROKER_EXTERN void requeue(const QueuedMessage& msg);
+ /**
+ * Used during recovery to add stored messages back to the queue
+ */
+ QPID_BROKER_EXTERN void recover(boost::intrusive_ptr<Message>& msg);
+
+ QPID_BROKER_EXTERN void consume(Consumer::shared_ptr c,
+ bool exclusive = false);
+ QPID_BROKER_EXTERN void cancel(Consumer::shared_ptr c);
+
+ uint32_t purge(const uint32_t purge_request=0, boost::shared_ptr<Exchange> dest=boost::shared_ptr<Exchange>()); //defaults to all messages
+ QPID_BROKER_EXTERN void purgeExpired();
+
+ //move qty # of messages to destination Queue destq
+ uint32_t move(const Queue::shared_ptr destq, uint32_t qty);
+
+ QPID_BROKER_EXTERN uint32_t getMessageCount() const;
+ QPID_BROKER_EXTERN uint32_t getEnqueueCompleteMessageCount() const;
+ QPID_BROKER_EXTERN uint32_t getConsumerCount() const;
+ inline const std::string& getName() const { return name; }
+ bool isExclusiveOwner(const OwnershipToken* const o) const;
+ void releaseExclusiveOwnership();
+ bool setExclusiveOwner(const OwnershipToken* const o);
+ bool hasExclusiveConsumer() const;
+ bool hasExclusiveOwner() const;
+ inline bool isDurable() const { return store != 0; }
+ inline const framing::FieldTable& getSettings() const { return settings; }
+ inline bool isAutoDelete() const { return autodelete; }
+ bool canAutoDelete() const;
+ const QueueBindings& getBindings() const { return bindings; }
+
+ /**
+ * used to take messages from in memory and flush down to disk.
+ */
+ QPID_BROKER_EXTERN void setLastNodeFailure();
+ QPID_BROKER_EXTERN void clearLastNodeFailure();
+
+ bool enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg, bool suppressPolicyCheck = false);
+ void enqueueAborted(boost::intrusive_ptr<Message> msg);
+ /**
+ * dequeue from store (only done once messages is acknowledged)
+ */
+ QPID_BROKER_EXTERN bool dequeue(TransactionContext* ctxt, const QueuedMessage &msg);
+ /**
+ * Inform the queue that a previous transactional dequeue
+ * committed.
+ */
+ void dequeueCommitted(const QueuedMessage& msg);
+
+ /**
+ * Inform queue of messages that were enqueued, have since
+ * been acquired but not yet accepted or released (and
+ * thus are still logically on the queue) - used in
+ * clustered broker.
+ */
+ void enqueued(const QueuedMessage& msg);
+
+ /**
+ * Test whether the specified message (identified by its
+ * sequence/position), is still enqueued (note this
+ * doesn't mean it is available for delivery as it may
+ * have been delievered to a subscriber who has not yet
+ * accepted it).
+ */
+ bool isEnqueued(const QueuedMessage& msg);
- /**
- * Gets the next available message
- */
- QPID_BROKER_EXTERN QueuedMessage get();
-
- /** Get the message at position pos */
- QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const;
-
- const QueuePolicy* getPolicy();
-
- void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
- boost::shared_ptr<Exchange> getAlternateExchange();
- bool isLocal(boost::intrusive_ptr<Message>& msg);
-
- //PersistableQueue support:
- uint64_t getPersistenceId() const;
- void setPersistenceId(uint64_t persistenceId) const;
- void encode(framing::Buffer& buffer) const;
- uint32_t encodedSize() const;
-
- // "recovering" means we are doing a MessageStore recovery.
- static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer, bool recovering = false );
- static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
-
- virtual void setExternalQueueStore(ExternalQueueStore* inst);
-
- // Manageable entry points
- management::ManagementObject* GetManagementObject (void) const;
- management::Manageable::status_t
- ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
-
- /** Apply f to each Message on the queue. */
- template <class F> void eachMessage(F f) {
- sys::Mutex::ScopedLock l(messageLock);
- if (lastValueQueue) {
- for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) {
- f(checkLvqReplace(*i));
- }
- } else {
- std::for_each(messages.begin(), messages.end(), f);
- }
- }
-
- /** Apply f to each QueueBinding on the queue */
- template <class F> void eachBinding(F f) {
- bindings.eachBinding(f);
+ /**
+ * Gets the next available message
+ */
+ QPID_BROKER_EXTERN QueuedMessage get();
+
+ /** Get the message at position pos */
+ QPID_BROKER_EXTERN QueuedMessage find(framing::SequenceNumber pos) const;
+
+ const QueuePolicy* getPolicy();
+
+ void setAlternateExchange(boost::shared_ptr<Exchange> exchange);
+ boost::shared_ptr<Exchange> getAlternateExchange();
+ bool isLocal(boost::intrusive_ptr<Message>& msg);
+
+ //PersistableQueue support:
+ uint64_t getPersistenceId() const;
+ void setPersistenceId(uint64_t persistenceId) const;
+ void encode(framing::Buffer& buffer) const;
+ uint32_t encodedSize() const;
+
+ // "recovering" means we are doing a MessageStore recovery.
+ static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer, bool recovering = false );
+ static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
+
+ virtual void setExternalQueueStore(ExternalQueueStore* inst);
+
+ // Manageable entry points
+ management::ManagementObject* GetManagementObject (void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args, std::string& text);
+
+ /** Apply f to each Message on the queue. */
+ template <class F> void eachMessage(F f) {
+ sys::Mutex::ScopedLock l(messageLock);
+ if (lastValueQueue) {
+ for (Messages::iterator i = messages.begin(); i != messages.end(); ++i) {
+ f(checkLvqReplace(*i));
}
+ } else {
+ std::for_each(messages.begin(), messages.end(), f);
+ }
+ }
- void popMsg(QueuedMessage& qmsg);
-
- /** Set the position sequence number for the next message on the queue.
- * Must be >= the current sequence number.
- * Used by cluster to replicate queues.
- */
- QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos);
- /** return current position sequence number for the next message on the queue.
- */
- QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
- int getEventMode();
- void setQueueEventManager(QueueEvents&);
- QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
- /**
- * Notify queue that recovery has completed.
- */
- void recoveryComplete(ExchangeRegistry& exchanges);
-
- // For cluster update
- QueueListeners& getListeners();
-
- /**
- * Reserve space in policy for an enqueued message that
- * has been recovered in the prepared state (dtx only)
- */
- void recoverPrepared(boost::intrusive_ptr<Message>& msg);
-
- void flush();
- };
+ /** Apply f to each QueueBinding on the queue */
+ template <class F> void eachBinding(F f) {
+ bindings.eachBinding(f);
}
+
+ /** Set the position sequence number for the next message on the queue.
+ * Must be >= the current sequence number.
+ * Used by cluster to replicate queues.
+ */
+ QPID_BROKER_EXTERN void setPosition(framing::SequenceNumber pos);
+ /** return current position sequence number for the next message on the queue.
+ */
+ QPID_BROKER_EXTERN framing::SequenceNumber getPosition();
+ int getEventMode();
+ void setQueueEventManager(QueueEvents&);
+ QPID_BROKER_EXTERN void insertSequenceNumbers(const std::string& key);
+ /**
+ * Notify queue that recovery has completed.
+ */
+ void recoveryComplete(ExchangeRegistry& exchanges);
+
+ // For cluster update
+ QueueListeners& getListeners();
+
+ /**
+ * Reserve space in policy for an enqueued message that
+ * has been recovered in the prepared state (dtx only)
+ */
+ void recoverPrepared(boost::intrusive_ptr<Message>& msg);
+
+ void flush();
+};
+}
}
diff --git a/cpp/src/qpid/broker/QueueBindings.cpp b/cpp/src/qpid/broker/QueueBindings.cpp
index 3f43a8ef68..60d315acfe 100644
--- a/cpp/src/qpid/broker/QueueBindings.cpp
+++ b/cpp/src/qpid/broker/QueueBindings.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueBindings.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/framing/reply_exceptions.h"
diff --git a/cpp/src/qpid/broker/QueueCleaner.cpp b/cpp/src/qpid/broker/QueueCleaner.cpp
index 6cdf506873..3499ea8a4d 100644
--- a/cpp/src/qpid/broker/QueueCleaner.cpp
+++ b/cpp/src/qpid/broker/QueueCleaner.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueCleaner.h"
#include "qpid/broker/Broker.h"
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 4b1fa62709..28b2d60cda 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/log/Statement.h"
@@ -26,6 +27,7 @@
using namespace qpid::broker;
using namespace qpid::sys;
+using std::string;
QueueRegistry::QueueRegistry(Broker* b) :
counter(1), store(0), events(0), parent(0), lastNode(false), broker(b) {}
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index 72a91dff24..66437f9665 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -22,17 +22,21 @@
#define _QueueRegistry_
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/Queue.h"
#include "qpid/sys/Mutex.h"
#include "qpid/management/Manageable.h"
#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
#include <algorithm>
#include <map>
namespace qpid {
namespace broker {
+class Queue;
class QueueEvents;
+class OwnershipToken;
+class Broker;
+class MessageStore;
/**
* A registry of queues indexed by queue name.
@@ -52,11 +56,11 @@ class QueueRegistry {
* @return The queue and a boolean flag which is true if the queue
* was created by this declare call false if it already existed.
*/
- QPID_BROKER_EXTERN std::pair<Queue::shared_ptr, bool> declare
- (const string& name,
- bool durable = false,
- bool autodelete = false,
- const OwnershipToken* owner = 0);
+ QPID_BROKER_EXTERN std::pair<boost::shared_ptr<Queue>, bool> declare(
+ const std::string& name,
+ bool durable = false,
+ bool autodelete = false,
+ const OwnershipToken* owner = 0);
/**
* Destroy the named queue.
@@ -70,8 +74,8 @@ class QueueRegistry {
* subsequent calls to find or declare with the same name.
*
*/
- QPID_BROKER_EXTERN void destroy(const string& name);
- template <class Test> bool destroyIf(const string& name, Test test)
+ QPID_BROKER_EXTERN void destroy(const std::string& name);
+ template <class Test> bool destroyIf(const std::string& name, Test test)
{
qpid::sys::RWlock::ScopedWlock locker(lock);
if (test()) {
@@ -85,12 +89,12 @@ class QueueRegistry {
/**
* Find the named queue. Return 0 if not found.
*/
- QPID_BROKER_EXTERN Queue::shared_ptr find(const string& name);
+ QPID_BROKER_EXTERN boost::shared_ptr<Queue> find(const std::string& name);
/**
* Generate unique queue name.
*/
- string generateName();
+ std::string generateName();
void setQueueEvents(QueueEvents*);
@@ -123,7 +127,7 @@ class QueueRegistry {
void updateQueueClusterState(bool lastNode);
private:
- typedef std::map<string, Queue::shared_ptr> QueueMap;
+ typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap;
QueueMap queues;
mutable qpid::sys::RWlock lock;
int counter;
@@ -134,7 +138,7 @@ private:
Broker* broker;
//destroy impl that assumes lock is already held:
- void destroyLH (const string& name);
+ void destroyLH (const std::string& name);
};
diff --git a/cpp/src/qpid/broker/RecoveredDequeue.cpp b/cpp/src/qpid/broker/RecoveredDequeue.cpp
index 658fd5a89e..38cb8043c9 100644
--- a/cpp/src/qpid/broker/RecoveredDequeue.cpp
+++ b/cpp/src/qpid/broker/RecoveredDequeue.cpp
@@ -18,6 +18,8 @@
* under the License.
*
*/
+
+#include "qpid/broker/Queue.h"
#include "qpid/broker/RecoveredDequeue.h"
using boost::intrusive_ptr;
diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h
index 67b37db5f9..66e66f1d5f 100644
--- a/cpp/src/qpid/broker/RecoveredDequeue.h
+++ b/cpp/src/qpid/broker/RecoveredDequeue.h
@@ -24,7 +24,6 @@
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/MessageStore.h"
-#include "qpid/broker/Queue.h"
#include "qpid/broker/TxOp.h"
#include <boost/intrusive_ptr.hpp>
@@ -36,18 +35,18 @@
namespace qpid {
namespace broker {
class RecoveredDequeue : public TxOp{
- Queue::shared_ptr queue;
+ boost::shared_ptr<Queue> queue;
boost::intrusive_ptr<Message> msg;
public:
- RecoveredDequeue(Queue::shared_ptr queue, boost::intrusive_ptr<Message> msg);
+ RecoveredDequeue(boost::shared_ptr<Queue> queue, boost::intrusive_ptr<Message> msg);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
virtual ~RecoveredDequeue(){}
virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
- Queue::shared_ptr getQueue() const { return queue; }
+ boost::shared_ptr<Queue> getQueue() const { return queue; }
boost::intrusive_ptr<Message> getMessage() const { return msg; }
};
}
diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.cpp b/cpp/src/qpid/broker/RecoveredEnqueue.cpp
index 48faa0942c..6263c63e3d 100644
--- a/cpp/src/qpid/broker/RecoveredEnqueue.cpp
+++ b/cpp/src/qpid/broker/RecoveredEnqueue.cpp
@@ -18,6 +18,8 @@
* under the License.
*
*/
+
+#include "qpid/broker/Queue.h"
#include "qpid/broker/RecoveredEnqueue.h"
using boost::intrusive_ptr;
diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h
index 09f928f098..5f718001d5 100644
--- a/cpp/src/qpid/broker/RecoveredEnqueue.h
+++ b/cpp/src/qpid/broker/RecoveredEnqueue.h
@@ -24,7 +24,6 @@
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/MessageStore.h"
-#include "qpid/broker/Queue.h"
#include "qpid/broker/TxOp.h"
#include <boost/intrusive_ptr.hpp>
@@ -34,24 +33,24 @@
#include <list>
namespace qpid {
- namespace broker {
- class RecoveredEnqueue : public TxOp{
- Queue::shared_ptr queue;
- boost::intrusive_ptr<Message> msg;
-
- public:
- RecoveredEnqueue(Queue::shared_ptr queue, boost::intrusive_ptr<Message> msg);
- virtual bool prepare(TransactionContext* ctxt) throw();
- virtual void commit() throw();
- virtual void rollback() throw();
- virtual ~RecoveredEnqueue(){}
- virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
-
- Queue::shared_ptr getQueue() const { return queue; }
- boost::intrusive_ptr<Message> getMessage() const { return msg; }
+namespace broker {
+class RecoveredEnqueue : public TxOp{
+ boost::shared_ptr<Queue> queue;
+ boost::intrusive_ptr<Message> msg;
+
+ public:
+ RecoveredEnqueue(boost::shared_ptr<Queue> queue, boost::intrusive_ptr<Message> msg);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~RecoveredEnqueue(){}
+ virtual void accept(TxOpConstVisitor& visitor) const { visitor(*this); }
+
+ boost::shared_ptr<Queue> getQueue() const { return queue; }
+ boost::intrusive_ptr<Message> getMessage() const { return msg; }
- };
- }
+};
+}
}
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index dd4b7543af..2f04943581 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -30,6 +30,7 @@
using boost::dynamic_pointer_cast;
using boost::intrusive_ptr;
+using std::string;
namespace qpid {
namespace broker {
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index bfdc11dc76..b2e648410a 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -44,6 +44,7 @@
#include <map>
#include <vector>
+#include <boost/enable_shared_from_this.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/cast.hpp>
@@ -74,14 +75,14 @@ class SemanticState : private boost::noncopyable {
{
mutable qpid::sys::Mutex lock;
SemanticState* const parent;
- const string name;
- const Queue::shared_ptr queue;
+ const std::string name;
+ const boost::shared_ptr<Queue> queue;
const bool ackExpected;
const bool acquire;
bool blocked;
bool windowing;
bool exclusive;
- string resumeId;
+ std::string resumeId;
uint64_t resumeTtl;
framing::FieldTable arguments;
uint32_t msgCredit;
@@ -99,7 +100,7 @@ class SemanticState : private boost::noncopyable {
typedef boost::shared_ptr<ConsumerImpl> shared_ptr;
ConsumerImpl(SemanticState* parent,
- const string& name, Queue::shared_ptr queue,
+ const std::string& name, boost::shared_ptr<Queue> queue,
bool ack, bool acquire, bool exclusive,
const std::string& resumeId, uint64_t resumeTtl, const framing::FieldTable& arguments);
~ConsumerImpl();
@@ -122,7 +123,7 @@ class SemanticState : private boost::noncopyable {
void flush();
void stop();
void complete(DeliveryRecord&);
- Queue::shared_ptr getQueue() const { return queue; }
+ boost::shared_ptr<Queue> getQueue() const { return queue; }
bool isBlocked() const { return blocked; }
bool setBlocked(bool set) { std::swap(set, blocked); return set; }
@@ -164,8 +165,8 @@ class SemanticState : private boost::noncopyable {
boost::shared_ptr<Exchange> cacheExchange;
AclModule* acl;
const bool authMsg;
- const string userID;
- const string userName;
+ const std::string userID;
+ const std::string userName;
const bool isDefaultRealm;
bool closeComplete;
@@ -194,17 +195,17 @@ class SemanticState : private boost::noncopyable {
* @exception: ChannelException if no queue of that name is found.
* @exception: ConnectionException if name="" and session has no default.
*/
- Queue::shared_ptr getQueue(const std::string& name) const;
+ boost::shared_ptr<Queue> getQueue(const std::string& name) const;
- bool exists(const string& consumerTag);
+ bool exists(const std::string& consumerTag);
- void consume(const string& destination,
- Queue::shared_ptr queue,
+ void consume(const std::string& destination,
+ boost::shared_ptr<Queue> queue,
bool ackRequired, bool acquire, bool exclusive,
- const string& resumeId=string(), uint64_t resumeTtl=0,
+ const std::string& resumeId=std::string(), uint64_t resumeTtl=0,
const framing::FieldTable& = framing::FieldTable());
- void cancel(const string& tag);
+ void cancel(const std::string& tag);
void setWindowMode(const std::string& destination);
void setCreditMode(const std::string& destination);
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index 3107801740..ca27fb6e1d 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -82,7 +82,7 @@ class Queue;
{
HandlerHelper(SemanticState& s) : HandlerImpl(s) {}
- Queue::shared_ptr getQueue(const string& name) const;
+ boost::shared_ptr<Queue> getQueue(const std::string& name) const;
};
@@ -156,7 +156,7 @@ class Queue;
public:
MessageHandlerImpl(SemanticState& session);
- void transfer(const string& destination,
+ void transfer(const std::string& destination,
uint8_t acceptMode,
uint8_t acquireMode);
@@ -164,34 +164,34 @@ class Queue;
void reject(const framing::SequenceSet& commands,
uint16_t code,
- const string& text);
+ const std::string& text);
void release(const framing::SequenceSet& commands,
bool setRedelivered);
framing::MessageAcquireResult acquire(const framing::SequenceSet&);
- void subscribe(const string& queue,
- const string& destination,
+ void subscribe(const std::string& queue,
+ const std::string& destination,
uint8_t acceptMode,
uint8_t acquireMode,
bool exclusive,
- const string& resumeId,
+ const std::string& resumeId,
uint64_t resumeTtl,
const framing::FieldTable& arguments);
- void cancel(const string& destination);
+ void cancel(const std::string& destination);
- void setFlowMode(const string& destination,
+ void setFlowMode(const std::string& destination,
uint8_t flowMode);
- void flow(const string& destination,
+ void flow(const std::string& destination,
uint8_t unit,
uint32_t value);
- void flush(const string& destination);
+ void flush(const std::string& destination);
- void stop(const string& destination);
+ void stop(const std::string& destination);
framing::MessageResumeResult resume(const std::string& destination,
const std::string& resumeId);
@@ -204,7 +204,7 @@ class Queue;
ExecutionHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
void sync();
- void result(const framing::SequenceNumber& commandId, const string& value);
+ void result(const framing::SequenceNumber& commandId, const std::string& value);
void exception(uint16_t errorCode,
const framing::SequenceNumber& commandId,
uint8_t classCode,
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index 3bbf143889..54c3bb32c8 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -41,7 +41,7 @@ class TopicExchange : public virtual Exchange {
BindingMap bindings;
qpid::sys::RWlock lock;
- bool isBound(Queue::shared_ptr queue, const string& pattern);
+ bool isBound(Queue::shared_ptr queue, const std::string& pattern);
public:
static const std::string typeName;
@@ -49,9 +49,9 @@ class TopicExchange : public virtual Exchange {
static QPID_BROKER_EXTERN bool match(const std::string& pattern, const std::string& topic);
static QPID_BROKER_EXTERN std::string normalize(const std::string& pattern);
- QPID_BROKER_EXTERN TopicExchange(const string& name,
+ QPID_BROKER_EXTERN TopicExchange(const std::string& name,
management::Manageable* parent = 0, Broker* broker = 0);
- QPID_BROKER_EXTERN TopicExchange(const string& _name,
+ QPID_BROKER_EXTERN TopicExchange(const std::string& _name,
bool _durable,
const qpid::framing::FieldTable& _args,
management::Manageable* parent = 0, Broker* broker = 0);
@@ -59,17 +59,17 @@ class TopicExchange : public virtual Exchange {
virtual std::string getType() const { return typeName; }
QPID_BROKER_EXTERN virtual bool bind(Queue::shared_ptr queue,
- const string& routingKey,
+ const std::string& routingKey,
const qpid::framing::FieldTable* args);
- virtual bool unbind(Queue::shared_ptr queue, const string& routingKey, const qpid::framing::FieldTable* args);
+ virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
QPID_BROKER_EXTERN virtual void route(Deliverable& msg,
- const string& routingKey,
+ const std::string& routingKey,
const qpid::framing::FieldTable* args);
QPID_BROKER_EXTERN virtual bool isBound(Queue::shared_ptr queue,
- const string* const routingKey,
+ const std::string* const routingKey,
const qpid::framing::FieldTable* const args);
QPID_BROKER_EXTERN virtual ~TopicExchange();
diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp
index 22deb771bd..36a451e62c 100644
--- a/cpp/src/qpid/broker/TxPublish.cpp
+++ b/cpp/src/qpid/broker/TxPublish.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/log/Statement.h"
#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/Queue.h"
using boost::intrusive_ptr;
using namespace qpid::broker;
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
index b6ab9767ab..effa585676 100644
--- a/cpp/src/qpid/broker/TxPublish.h
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -22,7 +22,6 @@
#define _TxPublish_
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/Queue.h"
#include "qpid/broker/Deliverable.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/MessageStore.h"
@@ -62,8 +61,8 @@ namespace qpid {
};
boost::intrusive_ptr<Message> msg;
- std::list<Queue::shared_ptr> queues;
- std::list<Queue::shared_ptr> prepared;
+ std::list<boost::shared_ptr<Queue> > queues;
+ std::list<boost::shared_ptr<Queue> > prepared;
void prepare(TransactionContext* ctxt, boost::shared_ptr<Queue>);
@@ -83,7 +82,7 @@ namespace qpid {
QPID_BROKER_EXTERN uint64_t contentSize();
boost::intrusive_ptr<Message> getMessage() const { return msg; }
- const std::list<Queue::shared_ptr> getQueues() const { return queues; }
+ const std::list<boost::shared_ptr<Queue> > getQueues() const { return queues; }
};
}
}
diff --git a/cpp/src/qpid/cluster/FailoverExchange.h b/cpp/src/qpid/cluster/FailoverExchange.h
index b4caa70db4..2e1edfc0ae 100644
--- a/cpp/src/qpid/cluster/FailoverExchange.h
+++ b/cpp/src/qpid/cluster/FailoverExchange.h
@@ -49,17 +49,17 @@ class FailoverExchange : public broker::Exchange
// Exchange overrides
std::string getType() const;
- bool bind(broker::Queue::shared_ptr queue, const std::string& routingKey, const framing::FieldTable* args);
- bool unbind(broker::Queue::shared_ptr queue, const std::string& routingKey, const framing::FieldTable* args);
- bool isBound(broker::Queue::shared_ptr queue, const std::string* const routingKey, const framing::FieldTable* const args);
+ bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
+ bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const framing::FieldTable* args);
+ bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const framing::FieldTable* const args);
void route(broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args);
private:
- void sendUpdate(const broker::Queue::shared_ptr&);
+ void sendUpdate(const boost::shared_ptr<broker::Queue>&);
typedef sys::Mutex::ScopedLock Lock;
typedef std::vector<Url> Urls;
- typedef std::set<broker::Queue::shared_ptr> Queues;
+ typedef std::set<boost::shared_ptr<broker::Queue> > Queues;
sys::Mutex lock;
Urls urls;
diff --git a/cpp/src/qpid/management/ManagementDirectExchange.h b/cpp/src/qpid/management/ManagementDirectExchange.h
index ab691afa70..7507179c06 100644
--- a/cpp/src/qpid/management/ManagementDirectExchange.h
+++ b/cpp/src/qpid/management/ManagementDirectExchange.h
@@ -36,15 +36,15 @@ class ManagementDirectExchange : public virtual DirectExchange
public:
static const std::string typeName;
- ManagementDirectExchange(const string& name, Manageable* _parent = 0, Broker* broker = 0);
- ManagementDirectExchange(const string& _name, bool _durable,
+ ManagementDirectExchange(const std::string& name, Manageable* _parent = 0, Broker* broker = 0);
+ ManagementDirectExchange(const std::string& _name, bool _durable,
const qpid::framing::FieldTable& _args,
Manageable* _parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
virtual void route(Deliverable& msg,
- const string& routingKey,
+ const std::string& routingKey,
const qpid::framing::FieldTable* args);
void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
diff --git a/cpp/src/qpid/management/ManagementTopicExchange.h b/cpp/src/qpid/management/ManagementTopicExchange.h
index ece1c88ecf..232300265e 100644
--- a/cpp/src/qpid/management/ManagementTopicExchange.h
+++ b/cpp/src/qpid/management/ManagementTopicExchange.h
@@ -36,19 +36,19 @@ class ManagementTopicExchange : public virtual TopicExchange
public:
static const std::string typeName;
- ManagementTopicExchange(const string& name, Manageable* _parent = 0, Broker* broker = 0);
- ManagementTopicExchange(const string& _name, bool _durable,
+ ManagementTopicExchange(const std::string& name, Manageable* _parent = 0, Broker* broker = 0);
+ ManagementTopicExchange(const std::string& _name, bool _durable,
const qpid::framing::FieldTable& _args,
Manageable* _parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
virtual void route(Deliverable& msg,
- const string& routingKey,
+ const std::string& routingKey,
const qpid::framing::FieldTable* args);
virtual bool bind(Queue::shared_ptr queue,
- const string& routingKey,
+ const std::string& routingKey,
const qpid::framing::FieldTable* args);
void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
diff --git a/cpp/src/qpid/replication/ReplicationExchange.cpp b/cpp/src/qpid/replication/ReplicationExchange.cpp
index b5911bb71e..4b6d25ac7d 100644
--- a/cpp/src/qpid/replication/ReplicationExchange.cpp
+++ b/cpp/src/qpid/replication/ReplicationExchange.cpp
@@ -22,6 +22,8 @@
#include "qpid/replication/constants.h"
#include "qpid/Plugin.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/log/Statement.h"
diff --git a/cpp/src/qpid/replication/ReplicationExchange.h b/cpp/src/qpid/replication/ReplicationExchange.h
index f0252448f9..4b34e0df13 100644
--- a/cpp/src/qpid/replication/ReplicationExchange.h
+++ b/cpp/src/qpid/replication/ReplicationExchange.h
@@ -26,6 +26,11 @@
#include "qpid/framing/SequenceNumber.h"
namespace qpid {
+
+namespace broker {
+class QueueRegistry;
+}
+
namespace replication {
/**
@@ -49,9 +54,9 @@ class ReplicationExchange : public qpid::broker::Exchange
void route(qpid::broker::Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
- bool bind(qpid::broker::Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- bool unbind(qpid::broker::Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
- bool isBound(qpid::broker::Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args);
+ bool bind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ bool unbind(boost::shared_ptr<broker::Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+ bool isBound(boost::shared_ptr<broker::Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args);
private:
qpid::broker::QueueRegistry& queues;
qpid::framing::SequenceNumber sequence;
diff --git a/cpp/src/qpid/xml/XmlExchange.cpp b/cpp/src/qpid/xml/XmlExchange.cpp
index fbf7566a18..f0afc8d451 100644
--- a/cpp/src/qpid/xml/XmlExchange.cpp
+++ b/cpp/src/qpid/xml/XmlExchange.cpp
@@ -49,6 +49,7 @@
using namespace qpid::framing;
using namespace qpid::sys;
using qpid::management::Manageable;
+using std::string;
namespace _qmf = qmf::org::apache::qpid::broker;
namespace qpid {
diff --git a/cpp/src/qpid/xml/XmlExchange.h b/cpp/src/qpid/xml/XmlExchange.h
index 4394ede5e7..f34c417633 100644
--- a/cpp/src/qpid/xml/XmlExchange.h
+++ b/cpp/src/qpid/xml/XmlExchange.h
@@ -57,7 +57,7 @@ class XmlExchange : public virtual Exchange {
};
- typedef std::map<string, XmlBinding::vector > XmlBindingsMap;
+ typedef std::map<std::string, XmlBinding::vector > XmlBindingsMap;
XmlBindingsMap bindingsMap;
XQilla xqilla;
@@ -69,7 +69,7 @@ class XmlExchange : public virtual Exchange {
static const std::string typeName;
XmlExchange(const std::string& name, management::Manageable* parent = 0, Broker* broker = 0);
- XmlExchange(const string& _name, bool _durable,
+ XmlExchange(const std::string& _name, bool _durable,
const qpid::framing::FieldTable& _args, management::Manageable* parent = 0, Broker* broker = 0);
virtual std::string getType() const { return typeName; }
@@ -80,7 +80,7 @@ class XmlExchange : public virtual Exchange {
virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
- virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
+ virtual bool isBound(Queue::shared_ptr queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args);
virtual ~XmlExchange();
};
diff --git a/cpp/src/tests/DeliveryRecordTest.cpp b/cpp/src/tests/DeliveryRecordTest.cpp
index 17f9a0d148..f7013014ff 100644
--- a/cpp/src/tests/DeliveryRecordTest.cpp
+++ b/cpp/src/tests/DeliveryRecordTest.cpp
@@ -20,6 +20,7 @@
*
*/
#include "qpid/broker/DeliveryRecord.h"
+#include "qpid/broker/Queue.h"
#include "unit_test.h"
#include <iostream>
#include <memory>
diff --git a/cpp/src/tests/QueueRegistryTest.cpp b/cpp/src/tests/QueueRegistryTest.cpp
index 712cb568c3..ae555539a4 100644
--- a/cpp/src/tests/QueueRegistryTest.cpp
+++ b/cpp/src/tests/QueueRegistryTest.cpp
@@ -18,6 +18,7 @@
*/
#include "qpid/broker/QueueRegistry.h"
+#include "qpid/broker/Queue.h"
#include "unit_test.h"
#include <string>