summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Exchange.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Exchange.h')
-rw-r--r--qpid/cpp/src/qpid/broker/Exchange.h248
1 files changed, 248 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/broker/Exchange.h b/qpid/cpp/src/qpid/broker/Exchange.h
new file mode 100644
index 0000000000..b12af9a1dd
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/Exchange.h
@@ -0,0 +1,248 @@
+#ifndef _broker_Exchange_h
+#define _broker_Exchange_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/broker/Deliverable.h"
+#include "qpid/broker/MessageStore.h"
+#include "qpid/broker/PersistableExchange.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/management/Manageable.h"
+#include "qmf/org/apache/qpid/broker/Exchange.h"
+#include "qmf/org/apache/qpid/broker/Binding.h"
+
+namespace qpid {
+namespace broker {
+
+class Broker;
+class ExchangeRegistry;
+
+class QPID_BROKER_CLASS_EXTERN Exchange : public PersistableExchange, public management::Manageable {
+public:
+ struct Binding : public management::Manageable {
+ typedef boost::shared_ptr<Binding> shared_ptr;
+ typedef std::vector<Binding::shared_ptr> vector;
+
+ Exchange* parent;
+ boost::shared_ptr<Queue> queue;
+ const std::string key;
+ const framing::FieldTable args;
+ std::string origin;
+ qmf::org::apache::qpid::broker::Binding* mgmtBinding;
+
+ Binding(const std::string& key, boost::shared_ptr<Queue> queue, Exchange* parent = 0,
+ framing::FieldTable args = framing::FieldTable(), const std::string& origin = std::string());
+ ~Binding();
+ void startManagement();
+ management::ManagementObject* GetManagementObject() const;
+ };
+
+private:
+ const std::string name;
+ const bool durable;
+ std::string alternateName;
+ boost::shared_ptr<Exchange> alternate;
+ uint32_t alternateUsers;
+ mutable uint64_t persistenceId;
+
+protected:
+ mutable qpid::framing::FieldTable args;
+ bool sequence;
+ mutable qpid::sys::Mutex sequenceLock;
+ int64_t sequenceNo;
+ bool ive;
+ boost::intrusive_ptr<Message> lastMsg;
+
+ class PreRoute{
+ public:
+ PreRoute(Deliverable& msg, Exchange* _p);
+ ~PreRoute();
+ private:
+ Exchange* parent;
+ };
+
+ typedef boost::shared_ptr<const std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > ConstBindingList;
+ typedef boost::shared_ptr< std::vector<boost::shared_ptr<qpid::broker::Exchange::Binding> > > BindingList;
+ void doRoute(Deliverable& msg, ConstBindingList b);
+ void routeIVE();
+
+
+ struct MatchQueue {
+ const boost::shared_ptr<Queue> queue;
+ MatchQueue(boost::shared_ptr<Queue> q);
+ bool operator()(Exchange::Binding::shared_ptr b);
+ };
+
+ /** A FedBinding keeps track of information that Federation needs
+ to know when to propagate changes.
+
+ Dynamic federation needs to know which exchanges have at least
+ one local binding. The bindings on these exchanges need to be
+ propagated.
+
+ Federated binds and unbinds need to know which federation
+ origins are associated with the bindings for each queue. When
+ origins are added or deleted, the corresponding bindings need
+ to be propagated.
+
+ fedBindings[queueName] contains the origins associated with
+ the given queue.
+ */
+
+ class FedBinding {
+ uint32_t localBindings;
+
+ typedef std::set<std::string> originSet;
+ std::map<std::string, originSet> fedBindings;
+
+ public:
+ FedBinding() : localBindings(0) {}
+ bool hasLocal() const { return localBindings != 0; }
+
+ /** Returns true if propagation is needed. */
+ bool addOrigin(const std::string& queueName, const std::string& origin) {
+ if (origin.empty()) {
+ localBindings++;
+ return localBindings == 1;
+ }
+ fedBindings[queueName].insert(origin);
+ return true;
+ }
+
+ /** Returns true if propagation is needed. */
+ bool delOrigin(const std::string& queueName, const std::string& origin){
+ if (origin.empty()) { // no remote == local binding
+ if (localBindings > 0)
+ localBindings--;
+ return localBindings == 0;
+ }
+ size_t match = fedBindings[queueName].erase(origin);
+ if (fedBindings[queueName].empty())
+ fedBindings.erase(queueName);
+ return match != 0;
+ }
+
+ uint32_t count() {
+ return localBindings + fedBindings.size();
+ }
+
+ uint32_t countFedBindings(const std::string& queueName) {
+ // don't use '[]' - it may increase size of fedBindings!
+ std::map<std::string, originSet>::iterator i;
+ if ((i = fedBindings.find(queueName)) != fedBindings.end())
+ return i->second.size();
+ return 0;
+ }
+ };
+
+ qmf::org::apache::qpid::broker::Exchange* mgmtExchange;
+
+public:
+ typedef boost::shared_ptr<Exchange> shared_ptr;
+
+ QPID_BROKER_EXTERN explicit Exchange(const std::string& name, management::Manageable* parent = 0,
+ Broker* broker = 0);
+ QPID_BROKER_EXTERN Exchange(const std::string& _name, bool _durable, const qpid::framing::FieldTable& _args,
+ management::Manageable* parent = 0, Broker* broker = 0);
+ QPID_BROKER_INLINE_EXTERN virtual ~Exchange();
+
+ const std::string& getName() const { return name; }
+ bool isDurable() { return durable; }
+ qpid::framing::FieldTable& getArgs() { return args; }
+
+ Exchange::shared_ptr getAlternate() { return alternate; }
+ void setAlternate(Exchange::shared_ptr _alternate);
+ void incAlternateUsers() { alternateUsers++; }
+ void decAlternateUsers() { alternateUsers--; }
+ bool inUseAsAlternate() { return alternateUsers > 0; }
+
+ virtual std::string getType() const = 0;
+
+ /**
+ * bind() is used for two distinct purposes:
+ *
+ * 1. To create a binding, in the conventional sense
+ *
+ * 2. As a vehicle for any FedOp, currently including federated
+ * binding, federated unbinding, federated reorigin.
+ *
+ */
+
+ virtual bool bind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual bool unbind(boost::shared_ptr<Queue> queue, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
+ virtual bool isBound(boost::shared_ptr<Queue> queue, const std::string* const routingKey, const qpid::framing::FieldTable* const args) = 0;
+ QPID_BROKER_EXTERN virtual void setProperties(const boost::intrusive_ptr<Message>&);
+ virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args) = 0;
+
+ //PersistableExchange:
+ QPID_BROKER_EXTERN void setPersistenceId(uint64_t id) const;
+ uint64_t getPersistenceId() const { return persistenceId; }
+ QPID_BROKER_EXTERN uint32_t encodedSize() const;
+ QPID_BROKER_EXTERN virtual void encode(framing::Buffer& buffer) const;
+
+ static QPID_BROKER_EXTERN Exchange::shared_ptr decode(ExchangeRegistry& exchanges, framing::Buffer& buffer);
+
+ // Manageable entry points
+ QPID_BROKER_EXTERN management::ManagementObject* GetManagementObject(void) const;
+
+ // Federation hooks
+ class DynamicBridge {
+ public:
+ virtual ~DynamicBridge() {}
+ virtual void propagateBinding(const std::string& key, const std::string& tagList, const std::string& op, const std::string& origin, qpid::framing::FieldTable* extra_args=0) = 0;
+ virtual void sendReorigin() = 0;
+ virtual bool containsLocalTag(const std::string& tagList) const = 0;
+ virtual const std::string& getLocalTag() const = 0;
+ };
+
+ void registerDynamicBridge(DynamicBridge* db);
+ void removeDynamicBridge(DynamicBridge* db);
+ virtual bool supportsDynamicBinding() { return false; }
+ Broker* getBroker() const { return broker; }
+ /**
+ * Notify exchange that recovery has completed.
+ */
+ void recoveryComplete(ExchangeRegistry& exchanges);
+
+ bool routeWithAlternate(Deliverable& message);
+
+ void destroy() { destroyed = true; }
+ bool isDestroyed() const { return destroyed; }
+
+protected:
+ qpid::sys::Mutex bridgeLock;
+ std::vector<DynamicBridge*> bridgeVector;
+ Broker* broker;
+ bool destroyed;
+
+ QPID_BROKER_EXTERN virtual void handleHelloRequest();
+ void propagateFedOp(const std::string& routingKey, const std::string& tags,
+ const std::string& op, const std::string& origin,
+ qpid::framing::FieldTable* extra_args=0);
+};
+
+}}
+
+#endif /*!_broker_Exchange.cpp_h*/