summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Broker.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Broker.h')
-rw-r--r--cpp/src/qpid/broker/Broker.h99
1 files changed, 34 insertions, 65 deletions
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 698d446bca..468da8983a 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -25,40 +25,30 @@
#include "qpid/broker/AsyncResultQueueImpl.h"
#include "qpid/broker/AsyncStore.h"
#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/broker/ConnectionFactory.h"
-#include "qpid/broker/ConnectionToken.h"
-#include "qpid/broker/DirectExchange.h"
+
+#include "qpid/DataDir.h"
+#include "qpid/Options.h"
+#include "qpid/Plugin.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/ExchangeRegistry.h"
//#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/Protocol.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/SessionManager.h"
#include "qpid/broker/QueueCleaner.h"
#include "qpid/broker/Vhost.h"
#include "qpid/broker/System.h"
-#include "qpid/broker/ExpiryPolicy.h"
#include "qpid/broker/ConsumerFactory.h"
#include "qpid/broker/ConnectionObservers.h"
#include "qpid/broker/ConfigurationObservers.h"
#include "qpid/management/Manageable.h"
-#include "qpid/management/ManagementAgent.h"
-#include "qmf/org/apache/qpid/broker/Broker.h"
-#include "qmf/org/apache/qpid/broker/ArgsBrokerConnect.h"
-#include "qpid/Options.h"
-#include "qpid/Plugin.h"
-#include "qpid/DataDir.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Timer.h"
-#include "qpid/types/Variant.h"
-#include "qpid/RefCounted.h"
-#include "qpid/broker/AclModule.h"
+#include "qpid/sys/ConnectionCodec.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
#include <boost/intrusive_ptr.hpp>
+
#include <string>
#include <vector>
@@ -67,12 +57,14 @@ namespace qpid {
namespace sys {
class ProtocolFactory;
class Poller;
+class Timer;
}
struct Url;
namespace broker {
+class AclModule;
class ConnectionState;
class ExpiryPolicy;
class Message;
@@ -103,6 +95,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
bool noDataDir;
std::string dataDir;
uint16_t port;
+ std::vector<std::string> listenInterfaces;
int workerThreads;
int connectionBacklog;
bool enableMgmt;
@@ -139,10 +132,15 @@ class Broker : public sys::Runnable, public Plugin::Target,
void declareStandardExchange(const std::string& name, const std::string& type);
void setStore ();
- static void recoverComplete(const AsyncResultHandle* const);
- static void configureComplete(const AsyncResultHandle* const);
+ static void recoverCompleteCb(const AsyncResultHandle* const);
+ static void configureCompleteCb(const AsyncResultHandle* const);
+ static Broker* thisBroker;
+ void recoverComplete(const AsyncResultHandle* const arh);
+ void configureComplete(const AsyncResultHandle* const arh);
void setLogLevel(const std::string& level);
std::string getLogLevel();
+ void setLogHiresTimestamp(bool enabled);
+ bool getLogHiresTimestamp();
void createObject(const std::string& type, const std::string& name,
const qpid::types::Variant::Map& properties, bool strict, const ConnectionState* context);
void deleteObject(const std::string& type, const std::string& name,
@@ -158,8 +156,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
Manageable::status_t setTimestampConfig(const bool receive,
const ConnectionState* context);
boost::shared_ptr<sys::Poller> poller;
- sys::Timer timer;
- std::auto_ptr<sys::Timer> clusterTimer;
+ std::auto_ptr<sys::Timer> timer;
Options config;
std::auto_ptr<management::ManagementAgent> managementAgent;
ProtocolFactoryMap protocolFactories;
@@ -178,7 +175,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
boost::shared_ptr<sys::ConnectionCodec::Factory> factory;
DtxManager dtxManager;
SessionManager sessionManager;
- qmf::org::apache::qpid::broker::Broker* mgmtObject;
+ qmf::org::apache::qpid::broker::Broker::shared_ptr mgmtObject;
Vhost::shared_ptr vhostObject;
System::shared_ptr systemObject;
QueueCleaner queueCleaner;
@@ -187,10 +184,10 @@ class Broker : public sys::Runnable, public Plugin::Target,
bool deferDeliveryImpl(const std::string& queue,
const Message& msg);
std::string federationTag;
- bool recovery;
- bool inCluster, clusterUpdatee;
+ bool recoveryInProgress;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConsumerFactories consumerFactories;
+ ProtocolRegistry protocolRegistry;
mutable sys::Mutex linkClientPropertiesLock;
framing::FieldTable linkClientProperties;
@@ -233,6 +230,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
DataDir& getDataDir() { return dataDir; }
Options& getOptions() { return config; }
AsyncResultQueueImpl& getAsyncResultQueue() { return asyncResultQueue; }
+ ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; }
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
@@ -240,7 +238,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
SessionManager& getSessionManager() { return sessionManager; }
const std::string& getFederationTag() const { return federationTag; }
- QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject() const;
+ QPID_BROKER_EXTERN management::ManagementObject::shared_ptr GetManagementObject() const;
QPID_BROKER_EXTERN management::Manageable* GetVhostObject() const;
QPID_BROKER_EXTERN management::Manageable::status_t ManagementMethod(
uint32_t methodId, management::Args& args, std::string& text);
@@ -253,19 +251,17 @@ class Broker : public sys::Runnable, public Plugin::Target,
QPID_BROKER_EXTERN void accept();
/** Create a connection to another broker. */
- void connect(const std::string& host, const std::string& port,
+ void connect(const std::string& name,
+ const std::string& host, const std::string& port,
const std::string& transport,
- boost::function2<void, int, std::string> failed,
- sys::ConnectionCodec::Factory* =0);
- /** Create a connection to another broker. */
- void connect(const Url& url,
- boost::function2<void, int, std::string> failed,
- sys::ConnectionCodec::Factory* =0);
+ boost::function2<void, int, std::string> failed);
/** Move messages from one queue to another.
A zero quantity means to move all messages
+ Return -1 if one of the queues does not exist, otherwise
+ the number of messages moved.
*/
- QPID_BROKER_EXTERN uint32_t queueMoveMessages(
+ QPID_BROKER_EXTERN int32_t queueMoveMessages(
const std::string& srcQueue,
const std::string& destQueue,
uint32_t qty,
@@ -277,46 +273,17 @@ class Broker : public sys::Runnable, public Plugin::Target,
/** Expose poller so plugins can register their descriptors. */
QPID_BROKER_EXTERN boost::shared_ptr<sys::Poller> getPoller();
- boost::shared_ptr<sys::ConnectionCodec::Factory> getConnectionFactory() { return factory; }
- void setConnectionFactory(boost::shared_ptr<sys::ConnectionCodec::Factory> f) { factory = f; }
-
/** Timer for local tasks affecting only this broker */
- sys::Timer& getTimer() { return timer; }
-
- /** Timer for tasks that must be synchronized if we are in a cluster */
- sys::Timer& getClusterTimer() { return clusterTimer.get() ? *clusterTimer : timer; }
- QPID_BROKER_EXTERN void setClusterTimer(std::auto_ptr<sys::Timer>);
+ sys::Timer& getTimer() { return *timer; }
boost::function<std::vector<Url> ()> getKnownBrokers;
static QPID_BROKER_EXTERN const std::string TCP_TRANSPORT;
- void setRecovery(bool set) { recovery = set; }
- bool getRecovery() const { return recovery; }
-
- /** True of this broker is part of a cluster.
- * Only valid after early initialization of plugins is complete.
- */
- bool isInCluster() const { return inCluster; }
- void setInCluster(bool set) { inCluster = set; }
-
- /** True if this broker is joining a cluster and in the process of
- * receiving a state update.
- */
- bool isClusterUpdatee() const { return clusterUpdatee; }
- void setClusterUpdatee(bool set) { clusterUpdatee = set; }
+ bool inRecovery() const { return recoveryInProgress; }
management::ManagementAgent* getManagementAgent() { return managementAgent.get(); }
- /**
- * Never true in a stand-alone broker. In a cluster, return true
- * to defer delivery of messages deliveredg in a cluster-unsafe
- * context.
- *@return true if delivery of a message should be deferred.
- */
- boost::function<bool (const std::string& queue,
- const Message& msg)> deferDelivery;
-
bool isAuthenticating ( ) { return config.auth; }
bool isTimestamping() { return config.timestampRcvMsgs; }
@@ -371,8 +338,10 @@ class Broker : public sys::Runnable, public Plugin::Target,
QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const;
QPID_BROKER_EXTERN void setLinkClientProperties(const framing::FieldTable&);
+ QPID_BROKER_EXTERN uint16_t getLinkHearbeatInterval() { return config.linkHeartbeatInterval; }
/** Information identifying this system */
boost::shared_ptr<const System> getSystem() const { return systemObject; }
+ friend class StatusCheckThread;
};
}}