diff options
author | Gordon Sim <gsim@apache.org> | 2011-11-10 15:12:06 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2011-11-10 15:12:06 +0000 |
commit | c24a572078bede3f82300a48eddd12be82576886 (patch) | |
tree | 73863b7052e6f830e86f93fb40c7480412fca50f | |
parent | dfa6ce8c81d487c0b3dfe64845051b883c383339 (diff) | |
download | qpid-python-c24a572078bede3f82300a48eddd12be82576886.tar.gz |
QPID-3603: Initial (very rough) cut of queue and exchange propagation from one node to another
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603@1200368 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/managementgen/qmfgen/templates/Event.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/managementgen/qmfgen/templates/Event.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Bridge.cpp | 57 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionAdapter.cpp | 4 | ||||
-rw-r--r-- | qpid/specs/management-schema.xml | 2 | ||||
-rwxr-xr-x | qpid/tools/src/py/qpid-config | 6 |
10 files changed, 79 insertions, 4 deletions
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Event.cpp b/qpid/cpp/managementgen/qmfgen/templates/Event.cpp index a8fdd0bd92..a6043ffa15 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Event.cpp +++ b/qpid/cpp/managementgen/qmfgen/templates/Event.cpp @@ -99,3 +99,8 @@ void Event/*MGEN:Event.NameCap*/::mapEncode(::qpid::types::Variant::Map& map) co using namespace ::qpid::types; /*MGEN:Event.ArgMap*/ } + +bool Event/*MGEN:Event.NameCap*/::match(const std::string& evt, const std::string& pkg) +{ + return eventName == evt && packageName == pkg; +} diff --git a/qpid/cpp/managementgen/qmfgen/templates/Event.h b/qpid/cpp/managementgen/qmfgen/templates/Event.h index 4f912cf220..5fa5f8e576 100644 --- a/qpid/cpp/managementgen/qmfgen/templates/Event.h +++ b/qpid/cpp/managementgen/qmfgen/templates/Event.h @@ -51,6 +51,8 @@ class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent uint8_t getSeverity() const { return /*MGEN:Event.Severity*/; } void encode(std::string& buffer) const; void mapEncode(::qpid::types::Variant::Map& map) const; + + static bool match(const std::string& evt, const std::string& pkg); }; }/*MGEN:Event.CloseNamespaces*/ diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index d4631cb1c3..8d900265bd 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -999,6 +999,7 @@ set (qpidbroker_SOURCES qpid/broker/LegacyLVQ.cpp qpid/broker/MessageDeque.cpp qpid/broker/MessageMap.cpp + qpid/broker/NodeClone.cpp qpid/broker/PriorityQueue.cpp qpid/broker/Queue.cpp qpid/broker/QueueCleaner.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 999f3a4b21..8fde735380 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -593,6 +593,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/PriorityQueue.cpp \ qpid/broker/NameGenerator.cpp \ qpid/broker/NameGenerator.h \ + qpid/broker/NodeClone.h \ + qpid/broker/NodeClone.cpp \ qpid/broker/NullMessageStore.cpp \ qpid/broker/NullMessageStore.h \ qpid/broker/OwnershipToken.h \ diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 5762d2d31c..b00cdd00dc 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -24,17 +24,28 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/Link.h" #include "qpid/broker/LinkRegistry.h" +#include "qpid/broker/NodeClone.h" #include "qpid/broker/QueueReplicator.h" #include "qpid/broker/SessionState.h" #include "qpid/management/ManagementAgent.h" +#include "qpid/types/Variant.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/framing/Uuid.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include <iostream> using qpid::framing::FieldTable; using qpid::framing::Uuid; using qpid::framing::Buffer; +using qpid::framing::AMQFrame; +using qpid::framing::AMQContentBody; +using qpid::framing::AMQHeaderBody; +using qpid::framing::MessageProperties; +using qpid::framing::MessageTransferBody; +using qpid::types::Variant; using qpid::management::ManagementAgent; using std::string; namespace _qmf = qmf::org::apache::qpid::broker; @@ -105,6 +116,52 @@ void Bridge::create(Connection& c) peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest); + } else if (NodeClone::isNodeCloneDestination(args.i_dest)) { + //declare and bind an event queue + peer->getQueue().declare(queueName, "", false, false, true, true, FieldTable()); + peer->getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#", FieldTable()); + //subscribe to the queue + peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + + //issue a query request for queues and another for exchanges using event queue as the reply-to address + for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions + Variant::Map request; + request["_what"] = "OBJECT"; + Variant::Map schema; + schema["_class_name"] = (i == 0 ? "queue" : "exchange"); + schema["_package_name"] = "org.apache.qpid.broker"; + request["_schema_id"] = schema; + + AMQFrame method((MessageTransferBody(qpid::framing::ProtocolVersion(), "qmf.default.direct", 0, 0))); + method.setBof(true); + method.setEof(false); + method.setBos(true); + method.setEos(true); + AMQHeaderBody headerBody; + MessageProperties* props = headerBody.get<MessageProperties>(true); + props->setReplyTo(qpid::framing::ReplyTo("", queueName)); + props->setAppId("qmf2"); + props->getApplicationHeaders().setString("qmf.opcode", "_query_request"); + headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker"); + AMQFrame header(headerBody); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + AMQContentBody data; + qpid::amqp_0_10::MapCodec::encode(request, data.getData()); + AMQFrame content(data); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + sessionHandler.out->handle(method); + sessionHandler.out->handle(header); + sessionHandler.out->handle(content); + } + } else { FieldTable queueSettings; diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index ec3cf9d340..3eba98d265 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -904,7 +904,7 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue( //event instead? managementAgent->raiseEvent( _qmf::EventQueueDeclare(connectionId, userId, name, - durable, owner, autodelete, + durable, owner, autodelete, alternateExchange, ManagementAgent::toMap(arguments), "created")); } diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index e4d374567f..609c4ecb87 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -25,6 +25,7 @@ #include "qpid/broker/DtxAck.h" #include "qpid/broker/DtxTimeout.h" #include "qpid/broker/Message.h" +#include "qpid/broker/NodeClone.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueReplicator.h" #include "qpid/broker/SessionContext.h" @@ -694,6 +695,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) { cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues()); + if (!cacheExchange) cacheExchange = NodeClone::create(exchangeName, getSession().getBroker()); if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName); } cacheExchange->setProperties(msg); diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index 63c4b660b2..54a5231758 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -321,8 +321,8 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(), - name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments), - "existing")); + name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments), + "existing")); } } diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index 9e2a644c2a..5a18168ae1 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -425,7 +425,7 @@ <event name="clientDisconnect" sev="inform" args="rhost, user"/> <event name="brokerLinkUp" sev="inform" args="rhost"/> <event name="brokerLinkDown" sev="warn" args="rhost"/> - <event name="queueDeclare" sev="inform" args="rhost, user, qName, durable, excl, autoDel, args, disp"/> + <event name="queueDeclare" sev="inform" args="rhost, user, qName, durable, excl, autoDel, altEx, args, disp"/> <event name="queueDelete" sev="inform" args="rhost, user, qName"/> <event name="exchangeDeclare" sev="inform" args="rhost, user, exName, exType, altEx, durable, autoDel, args, disp"/> <event name="exchangeDelete" sev="inform" args="rhost, user, exName"/> diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config index bb49b9d7c9..d0057d66f5 100755 --- a/qpid/tools/src/py/qpid-config +++ b/qpid/tools/src/py/qpid-config @@ -492,6 +492,12 @@ class BrokerManager: etype = args[0] ename = args[1] declArgs = {} + for a in config._extra_arguments: + r = a.split("=", 1) + if len(r) == 2: value = r[1] + else: value = None + declArgs[r[0]] = value + if config._msgSequence: declArgs[MSG_SEQUENCE] = 1 if config._ive: |