summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2011-11-10 15:12:06 +0000
committerGordon Sim <gsim@apache.org>2011-11-10 15:12:06 +0000
commitc24a572078bede3f82300a48eddd12be82576886 (patch)
tree73863b7052e6f830e86f93fb40c7480412fca50f
parentdfa6ce8c81d487c0b3dfe64845051b883c383339 (diff)
downloadqpid-python-c24a572078bede3f82300a48eddd12be82576886.tar.gz
QPID-3603: Initial (very rough) cut of queue and exchange propagation from one node to another
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603@1200368 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/managementgen/qmfgen/templates/Event.cpp5
-rw-r--r--qpid/cpp/managementgen/qmfgen/templates/Event.h2
-rw-r--r--qpid/cpp/src/CMakeLists.txt1
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp57
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp4
-rw-r--r--qpid/specs/management-schema.xml2
-rwxr-xr-xqpid/tools/src/py/qpid-config6
10 files changed, 79 insertions, 4 deletions
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Event.cpp b/qpid/cpp/managementgen/qmfgen/templates/Event.cpp
index a8fdd0bd92..a6043ffa15 100644
--- a/qpid/cpp/managementgen/qmfgen/templates/Event.cpp
+++ b/qpid/cpp/managementgen/qmfgen/templates/Event.cpp
@@ -99,3 +99,8 @@ void Event/*MGEN:Event.NameCap*/::mapEncode(::qpid::types::Variant::Map& map) co
using namespace ::qpid::types;
/*MGEN:Event.ArgMap*/
}
+
+bool Event/*MGEN:Event.NameCap*/::match(const std::string& evt, const std::string& pkg)
+{
+ return eventName == evt && packageName == pkg;
+}
diff --git a/qpid/cpp/managementgen/qmfgen/templates/Event.h b/qpid/cpp/managementgen/qmfgen/templates/Event.h
index 4f912cf220..5fa5f8e576 100644
--- a/qpid/cpp/managementgen/qmfgen/templates/Event.h
+++ b/qpid/cpp/managementgen/qmfgen/templates/Event.h
@@ -51,6 +51,8 @@ class Event/*MGEN:Event.NameCap*/ : public ::qpid::management::ManagementEvent
uint8_t getSeverity() const { return /*MGEN:Event.Severity*/; }
void encode(std::string& buffer) const;
void mapEncode(::qpid::types::Variant::Map& map) const;
+
+ static bool match(const std::string& evt, const std::string& pkg);
};
}/*MGEN:Event.CloseNamespaces*/
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index d4631cb1c3..8d900265bd 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -999,6 +999,7 @@ set (qpidbroker_SOURCES
qpid/broker/LegacyLVQ.cpp
qpid/broker/MessageDeque.cpp
qpid/broker/MessageMap.cpp
+ qpid/broker/NodeClone.cpp
qpid/broker/PriorityQueue.cpp
qpid/broker/Queue.cpp
qpid/broker/QueueCleaner.cpp
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 999f3a4b21..8fde735380 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -593,6 +593,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/PriorityQueue.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
+ qpid/broker/NodeClone.h \
+ qpid/broker/NodeClone.cpp \
qpid/broker/NullMessageStore.cpp \
qpid/broker/NullMessageStore.h \
qpid/broker/OwnershipToken.h \
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 5762d2d31c..b00cdd00dc 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -24,17 +24,28 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
+#include "qpid/broker/NodeClone.h"
#include "qpid/broker/QueueReplicator.h"
#include "qpid/broker/SessionState.h"
#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
#include <iostream>
using qpid::framing::FieldTable;
using qpid::framing::Uuid;
using qpid::framing::Buffer;
+using qpid::framing::AMQFrame;
+using qpid::framing::AMQContentBody;
+using qpid::framing::AMQHeaderBody;
+using qpid::framing::MessageProperties;
+using qpid::framing::MessageTransferBody;
+using qpid::types::Variant;
using qpid::management::ManagementAgent;
using std::string;
namespace _qmf = qmf::org::apache::qpid::broker;
@@ -105,6 +116,52 @@ void Bridge::create(Connection& c)
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
+ } else if (NodeClone::isNodeCloneDestination(args.i_dest)) {
+ //declare and bind an event queue
+ peer->getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+ peer->getExchange().bind(queueName, "qmf.default.topic", "agent.ind.event.org_apache_qpid_broker.#", FieldTable());
+ //subscribe to the queue
+ peer->getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+ //issue a query request for queues and another for exchanges using event queue as the reply-to address
+ for (int i = 0; i < 2; ++i) {//TODO: cleanup this code into reusable utility functions
+ Variant::Map request;
+ request["_what"] = "OBJECT";
+ Variant::Map schema;
+ schema["_class_name"] = (i == 0 ? "queue" : "exchange");
+ schema["_package_name"] = "org.apache.qpid.broker";
+ request["_schema_id"] = schema;
+
+ AMQFrame method((MessageTransferBody(qpid::framing::ProtocolVersion(), "qmf.default.direct", 0, 0)));
+ method.setBof(true);
+ method.setEof(false);
+ method.setBos(true);
+ method.setEos(true);
+ AMQHeaderBody headerBody;
+ MessageProperties* props = headerBody.get<MessageProperties>(true);
+ props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+ props->setAppId("qmf2");
+ props->getApplicationHeaders().setString("qmf.opcode", "_query_request");
+ headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey("broker");
+ AMQFrame header(headerBody);
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ AMQContentBody data;
+ qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+ AMQFrame content(data);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ sessionHandler.out->handle(method);
+ sessionHandler.out->handle(header);
+ sessionHandler.out->handle(content);
+ }
+
} else {
FieldTable queueSettings;
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index ec3cf9d340..3eba98d265 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -904,7 +904,7 @@ std::pair<boost::shared_ptr<Queue>, bool> Broker::createQueue(
//event instead?
managementAgent->raiseEvent(
_qmf::EventQueueDeclare(connectionId, userId, name,
- durable, owner, autodelete,
+ durable, owner, autodelete, alternateExchange,
ManagementAgent::toMap(arguments),
"created"));
}
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index e4d374567f..609c4ecb87 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -25,6 +25,7 @@
#include "qpid/broker/DtxAck.h"
#include "qpid/broker/DtxTimeout.h"
#include "qpid/broker/Message.h"
+#include "qpid/broker/NodeClone.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueReplicator.h"
#include "qpid/broker/SessionContext.h"
@@ -694,6 +695,7 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) {
cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues());
+ if (!cacheExchange) cacheExchange = NodeClone::create(exchangeName, getSession().getBroker());
if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName);
}
cacheExchange->setProperties(msg);
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index 63c4b660b2..54a5231758 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -321,8 +321,8 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
agent->raiseEvent(_qmf::EventQueueDeclare(getConnection().getUrl(), getConnection().getUserId(),
- name, durable, exclusive, autoDelete, ManagementAgent::toMap(arguments),
- "existing"));
+ name, durable, exclusive, autoDelete, alternateExchange, ManagementAgent::toMap(arguments),
+ "existing"));
}
}
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index 9e2a644c2a..5a18168ae1 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -425,7 +425,7 @@
<event name="clientDisconnect" sev="inform" args="rhost, user"/>
<event name="brokerLinkUp" sev="inform" args="rhost"/>
<event name="brokerLinkDown" sev="warn" args="rhost"/>
- <event name="queueDeclare" sev="inform" args="rhost, user, qName, durable, excl, autoDel, args, disp"/>
+ <event name="queueDeclare" sev="inform" args="rhost, user, qName, durable, excl, autoDel, altEx, args, disp"/>
<event name="queueDelete" sev="inform" args="rhost, user, qName"/>
<event name="exchangeDeclare" sev="inform" args="rhost, user, exName, exType, altEx, durable, autoDel, args, disp"/>
<event name="exchangeDelete" sev="inform" args="rhost, user, exName"/>
diff --git a/qpid/tools/src/py/qpid-config b/qpid/tools/src/py/qpid-config
index bb49b9d7c9..d0057d66f5 100755
--- a/qpid/tools/src/py/qpid-config
+++ b/qpid/tools/src/py/qpid-config
@@ -492,6 +492,12 @@ class BrokerManager:
etype = args[0]
ename = args[1]
declArgs = {}
+ for a in config._extra_arguments:
+ r = a.split("=", 1)
+ if len(r) == 2: value = r[1]
+ else: value = None
+ declArgs[r[0]] = value
+
if config._msgSequence:
declArgs[MSG_SEQUENCE] = 1
if config._ive: