summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-03-14 19:36:26 +0000
committerGordon Sim <gsim@apache.org>2013-03-14 19:36:26 +0000
commit3c22a7e7a58c883cdb0575afb30c6658344d4e2b (patch)
tree0570d45f59f36063f9c44073d807e9f5ec4bfe1d
parent125e49d4df65142a89b69e7d367b34f01ea354a1 (diff)
downloadqpid-python-3c22a7e7a58c883cdb0575afb30c6658344d4e2b.tar.gz
QPID-4586: add ability to have qpidd establish outgoing connections
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1456621 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/CMakeLists.txt2
-rw-r--r--qpid/cpp/src/Makefile.am16
-rw-r--r--qpid/cpp/src/amqp.cmake14
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h9
-rw-r--r--qpid/cpp/src/qpid/broker/LossyQueue.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/ObjectFactory.cpp60
-rw-r--r--qpid/cpp/src/qpid/broker/ObjectFactory.h65
-rw-r--r--qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp19
-rw-r--r--qpid/cpp/src/qpid/broker/SecureConnectionFactory.h5
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.cpp47
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.h14
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Domain.cpp279
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Domain.h78
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.cpp99
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.h68
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp118
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Interconnect.h63
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp151
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Interconnects.h62
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp56
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h50
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp18
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h9
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp50
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Outgoing.h36
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp16
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Relay.cpp278
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Relay.h128
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Sasl.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Sasl.h3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp178
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/SaslClient.h80
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp312
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.h36
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Translation.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Translation.h4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp20
-rw-r--r--qpid/cpp/src/tests/CMakeLists.txt3
-rw-r--r--qpid/cpp/src/tests/Makefile.am4
-rw-r--r--qpid/cpp/src/tests/brokertest.py2
-rwxr-xr-xqpid/cpp/src/tests/interlink_tests.py157
-rwxr-xr-xqpid/cpp/src/tests/run_interlink_tests26
-rw-r--r--qpid/cpp/src/tests/test_env.sh.in2
-rw-r--r--qpid/specs/management-schema.xml37
-rw-r--r--qpid/tools/src/py/qpidtoollibs/broker.py8
46 files changed, 2455 insertions, 249 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 5dc473d04a..ef68d79fb2 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -1160,6 +1160,8 @@ set (qpidbroker_SOURCES
qpid/broker/Fairshare.cpp
qpid/broker/MessageDeque.cpp
qpid/broker/MessageMap.cpp
+ qpid/broker/ObjectFactory.h
+ qpid/broker/ObjectFactory.cpp
qpid/broker/PriorityQueue.cpp
qpid/broker/Protocol.cpp
qpid/broker/Queue.cpp
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 1b2d0fcbb4..d974aad3a1 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -659,6 +659,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/NameGenerator.h \
qpid/broker/NullMessageStore.cpp \
qpid/broker/NullMessageStore.h \
+ qpid/broker/ObjectFactory.h \
+ qpid/broker/ObjectFactory.cpp \
qpid/broker/Observers.h \
qpid/broker/OwnershipToken.h \
qpid/broker/Persistable.h \
@@ -783,14 +785,24 @@ amqp_la_SOURCES = \
qpid/broker/amqp/Connection.cpp \
qpid/broker/amqp/DataReader.h \
qpid/broker/amqp/DataReader.cpp \
+ qpid/broker/amqp/Domain.h \
+ qpid/broker/amqp/Domain.cpp \
qpid/broker/amqp/Filter.h \
qpid/broker/amqp/Filter.cpp \
qpid/broker/amqp/Header.h \
qpid/broker/amqp/Header.cpp \
+ qpid/broker/amqp/Incoming.h \
+ qpid/broker/amqp/Incoming.cpp \
+ qpid/broker/amqp/Interconnect.h \
+ qpid/broker/amqp/Interconnect.cpp \
+ qpid/broker/amqp/Interconnects.h \
+ qpid/broker/amqp/Interconnects.cpp \
qpid/broker/amqp/ManagedConnection.h \
qpid/broker/amqp/ManagedConnection.cpp \
qpid/broker/amqp/ManagedSession.h \
qpid/broker/amqp/ManagedSession.cpp \
+ qpid/broker/amqp/ManagedIncomingLink.h \
+ qpid/broker/amqp/ManagedIncomingLink.cpp \
qpid/broker/amqp/ManagedOutgoingLink.h \
qpid/broker/amqp/ManagedOutgoingLink.cpp \
qpid/broker/amqp/Message.h \
@@ -800,8 +812,12 @@ amqp_la_SOURCES = \
qpid/broker/amqp/Outgoing.h \
qpid/broker/amqp/Outgoing.cpp \
qpid/broker/amqp/ProtocolPlugin.cpp \
+ qpid/broker/amqp/Relay.h \
+ qpid/broker/amqp/Relay.cpp \
qpid/broker/amqp/Sasl.h \
qpid/broker/amqp/Sasl.cpp \
+ qpid/broker/amqp/SaslClient.h \
+ qpid/broker/amqp/SaslClient.cpp \
qpid/broker/amqp/Session.h \
qpid/broker/amqp/Session.cpp \
qpid/broker/amqp/Translation.h \
diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake
index 718e6fe342..e8bb3b9bca 100644
--- a/qpid/cpp/src/amqp.cmake
+++ b/qpid/cpp/src/amqp.cmake
@@ -52,14 +52,24 @@ if (BUILD_AMQP)
qpid/broker/amqp/Connection.cpp
qpid/broker/amqp/DataReader.h
qpid/broker/amqp/DataReader.cpp
+ qpid/broker/amqp/Domain.h
+ qpid/broker/amqp/Domain.cpp
qpid/broker/amqp/Filter.h
qpid/broker/amqp/Filter.cpp
qpid/broker/amqp/Header.h
qpid/broker/amqp/Header.cpp
+ qpid/broker/amqp/Incoming.h
+ qpid/broker/amqp/Incoming.cpp
+ qpid/broker/amqp/Interconnect.h
+ qpid/broker/amqp/Interconnect.cpp
+ qpid/broker/amqp/Interconnects.h
+ qpid/broker/amqp/Interconnects.cpp
qpid/broker/amqp/ManagedConnection.h
qpid/broker/amqp/ManagedConnection.cpp
qpid/broker/amqp/ManagedSession.h
qpid/broker/amqp/ManagedSession.cpp
+ qpid/broker/amqp/ManagedIncomingLink.h
+ qpid/broker/amqp/ManagedIncomingLink.cpp
qpid/broker/amqp/ManagedOutgoingLink.h
qpid/broker/amqp/ManagedOutgoingLink.cpp
qpid/broker/amqp/Message.h
@@ -69,8 +79,12 @@ if (BUILD_AMQP)
qpid/broker/amqp/Outgoing.h
qpid/broker/amqp/Outgoing.cpp
qpid/broker/amqp/ProtocolPlugin.cpp
+ qpid/broker/amqp/Relay.h
+ qpid/broker/amqp/Relay.cpp
qpid/broker/amqp/Sasl.h
qpid/broker/amqp/Sasl.cpp
+ qpid/broker/amqp/SaslClient.h
+ qpid/broker/amqp/SaslClient.cpp
qpid/broker/amqp/Session.h
qpid/broker/amqp/Session.cpp
qpid/broker/amqp/Translation.h
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index a9887f9c35..1f70cac2cc 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -707,7 +707,9 @@ void Broker::createObject(const std::string& type, const std::string& name,
}
//TODO: implement 'strict' option (check there are no unrecognised properties)
QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")");
- if (type == TYPE_QUEUE) {
+ if (objectFactory.createObject(*this, type, name, properties, userId, connectionId)) {
+ QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ") handled by registered factory");
+ } else if (type == TYPE_QUEUE) {
bool durable(false);
bool autodelete(false);
std::string alternateExchange;
@@ -1064,8 +1066,16 @@ void Broker::connect(
const std::string& host, const std::string& port, const std::string& transport,
boost::function2<void, int, std::string> failed)
{
+ connect(name, host, port, transport, factory.get(), failed);
+}
+
+void Broker::connect(
+ const std::string& name,
+ const std::string& host, const std::string& port, const std::string& transport,
+ sys::ConnectionCodec::Factory* f, boost::function2<void, int, std::string> failed)
+{
boost::shared_ptr<TransportConnector> tcf = getTransportInfo(transport).connectorFactory;
- if (tcf) tcf->connect(poller, name, host, port, factory.get(), failed);
+ if (tcf) tcf->connect(poller, name, host, port, f, failed);
else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport));
}
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index cfd96c9913..6a46095af4 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -28,6 +28,7 @@
#include "qpid/Plugin.h"
#include "qpid/broker/DtxManager.h"
#include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/broker/ObjectFactory.h"
#include "qpid/broker/Protocol.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/LinkRegistry.h"
@@ -193,6 +194,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
ConsumerFactories consumerFactories;
ProtocolRegistry protocolRegistry;
+ ObjectFactoryRegistry objectFactory;
mutable sys::Mutex linkClientPropertiesLock;
framing::FieldTable linkClientProperties;
@@ -232,6 +234,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
DataDir& getDataDir() { return dataDir; }
Options& getOptions() { return config; }
ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; }
+ ObjectFactoryRegistry& getObjectFactoryRegistry() { return objectFactory; }
void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; }
boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; }
@@ -258,6 +261,12 @@ class Broker : public sys::Runnable, public Plugin::Target,
const std::string& host, const std::string& port,
const std::string& transport,
boost::function2<void, int, std::string> failed);
+ QPID_BROKER_EXTERN void connect(const std::string& name,
+ const std::string& host, const std::string& port,
+ const std::string& transport,
+ sys::ConnectionCodec::Factory*,
+ boost::function2<void, int, std::string> failed);
+
/** Move messages from one queue to another.
A zero quantity means to move all messages
diff --git a/qpid/cpp/src/qpid/broker/LossyQueue.cpp b/qpid/cpp/src/qpid/broker/LossyQueue.cpp
index ee2c3ca794..be19185c3a 100644
--- a/qpid/cpp/src/qpid/broker/LossyQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/LossyQueue.cpp
@@ -38,7 +38,7 @@ LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, MessageStor
bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message)
{
- if (increment.getSize() > settings.maxDepth.getSize()) {
+ if (settings.maxDepth.hasSize() && increment.getSize() > settings.maxDepth.getSize()) {
if (mgmtObject) {
mgmtObject->inc_discardsOverflow();
if (brokerMgmtObject)
diff --git a/qpid/cpp/src/qpid/broker/ObjectFactory.cpp b/qpid/cpp/src/qpid/broker/ObjectFactory.cpp
new file mode 100644
index 0000000000..5bd516c351
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ObjectFactory.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ObjectFactory.h"
+#include "Broker.h"
+
+namespace qpid {
+namespace broker {
+
+bool ObjectFactoryRegistry::createObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ const std::string& userId, const std::string& connectionId)
+{
+ for (Factories::iterator i = factories.begin(); i != factories.end(); ++i)
+ {
+ if ((*i)->createObject(broker, type, name, properties, userId, connectionId)) return true;
+ }
+ return false;
+}
+
+bool ObjectFactoryRegistry::deleteObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ const std::string& userId, const std::string& connectionId)
+{
+ for (Factories::iterator i = factories.begin(); i != factories.end(); ++i)
+ {
+ if ((*i)->deleteObject(broker, type, name, properties, userId, connectionId)) return true;
+ }
+ return false;
+}
+
+ObjectFactoryRegistry::~ObjectFactoryRegistry()
+{
+ for (Factories::iterator i = factories.begin(); i != factories.end(); ++i)
+ {
+ delete *i;
+ }
+ factories.clear();
+}
+void ObjectFactoryRegistry::add(ObjectFactory* factory)
+{
+ factories.push_back(factory);
+}
+
+}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/ObjectFactory.h b/qpid/cpp/src/qpid/broker/ObjectFactory.h
new file mode 100644
index 0000000000..7a48be3caa
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/ObjectFactory.h
@@ -0,0 +1,65 @@
+#ifndef QPID_BROKER_OBJECTFACTORY_H
+#define QPID_BROKER_OBJECTFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/types/Variant.h"
+#include <vector>
+
+namespace qpid {
+namespace broker {
+
+class Broker;
+
+/**
+ * An extension point through which plugins can register functionality
+ * for creating (and deleting) particular types of objects via the
+ * Broker::createObject() method (or deleteObject()), invoked via management.
+ */
+class ObjectFactory
+{
+ public:
+ virtual bool createObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ const std::string& userId, const std::string& connectionId) = 0;
+ virtual bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ const std::string& userId, const std::string& connectionId) = 0;
+ virtual ~ObjectFactory() {}
+ private:
+};
+
+class ObjectFactoryRegistry : public ObjectFactory
+{
+ public:
+ bool createObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ const std::string& userId, const std::string& connectionId);
+ bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ const std::string& userId, const std::string& connectionId);
+
+ ~ObjectFactoryRegistry();
+ void add(ObjectFactory*);
+ private:
+ typedef std::vector<ObjectFactory*> Factories;
+ Factories factories;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_OBJECTFACTORY_H*/
diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
index 7bc2c94d1c..6b4f6b3025 100644
--- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
@@ -44,13 +44,7 @@ sys::ConnectionCodec*
SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id,
const SecuritySettings& external) {
if (v == ProtocolVersion(0, 10)) {
- SecureConnectionPtr sc(new SecureConnection());
- CodecPtr c(new qpid::amqp_0_10::Connection(out, id, false));
- ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, false));
- i->setSecureConnection(sc.get());
- c->setInputHandler(InputPtr(i.release()));
- sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));
- return sc.release();
+ return create_0_10(out, id, external, false);
} else {
return broker.getProtocolRegistry().create(v, out, id, external);
}
@@ -61,9 +55,16 @@ sys::ConnectionCodec*
SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id,
const SecuritySettings& external) {
// used to create connections from one broker to another
+ return create_0_10(out, id, external, true);
+}
+
+sys::ConnectionCodec*
+SecureConnectionFactory::create_0_10(sys::OutputControl& out, const std::string& id,
+ const SecuritySettings& external, bool brokerActsAsClient)
+{
SecureConnectionPtr sc(new SecureConnection());
- CodecPtr c(new qpid::amqp_0_10::Connection(out, id, true));
- ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, true ));
+ CodecPtr c(new qpid::amqp_0_10::Connection(out, id, brokerActsAsClient));
+ ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, brokerActsAsClient));
i->setSecureConnection(sc.get());
c->setInputHandler(InputPtr(i.release()));
sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));
diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h
index 8a04dfcb15..e64776d5ec 100644
--- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h
+++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h
@@ -30,7 +30,7 @@ class Broker;
class SecureConnectionFactory : public sys::ConnectionCodec::Factory
{
public:
- SecureConnectionFactory(Broker& b);
+ SecureConnectionFactory(Broker& b);
sys::ConnectionCodec*
create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id,
@@ -41,6 +41,9 @@ class SecureConnectionFactory : public sys::ConnectionCodec::Factory
private:
Broker& broker;
+
+ sys::ConnectionCodec*
+ create_0_10(sys::OutputControl&, const std::string& id, const qpid::sys::SecuritySettings&, bool brokerActsAsClient);
};
}}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
index 1f135cf931..a83034eb6e 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -37,14 +37,15 @@ namespace qpid {
namespace broker {
namespace amqp {
-Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, bool saslInUse)
+Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, Interconnects& interconnects_, bool saslInUse)
: ManagedConnection(b, i),
connection(pn_connection()),
transport(pn_transport()),
- out(o), id(i), broker(b), haveOutput(true)
+ out(o), id(i), broker(b), haveOutput(true), interconnects(interconnects_)
{
if (pn_transport_bind(transport, connection)) {
//error
+ QPID_LOG(error, "Failed to bind transport to connection: " << getError());
}
out.activateOutput();
bool enableTrace(false);
@@ -74,13 +75,18 @@ Connection::~Connection()
pn_connection_free(connection);
}
+Interconnects& Connection::getInterconnects()
+{
+ return interconnects;
+}
pn_transport_t* Connection::getTransport()
{
return transport;
}
size_t Connection::decode(const char* buffer, size_t size)
{
- QPID_LOG(trace, id << " decode(" << size << ")")
+ QPID_LOG(trace, id << " decode(" << size << ")");
+ if (size == 0) return 0;
//TODO: Fix pn_engine_input() to take const buffer
ssize_t n = pn_transport_input(transport, const_cast<char*>(buffer), size);
if (n > 0 || n == PN_EOS) {
@@ -88,8 +94,13 @@ size_t Connection::decode(const char* buffer, size_t size)
//it processed, but can assume none need to be reprocessed so
//consider them all read:
if (n == PN_EOS) n = size;
- QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size)
- process();
+ QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size);
+ try {
+ process();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, id << ": " << e.what());
+ close();
+ }
pn_transport_tick(transport, 0);
if (!haveOutput) {
haveOutput = true;
@@ -123,10 +134,15 @@ size_t Connection::encode(char* buffer, size_t size)
}
bool Connection::canEncode()
{
- for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) {
- if (i->second->dispatch()) haveOutput = true;
+ try {
+ for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) {
+ if (i->second->dispatch()) haveOutput = true;
+ }
+ process();
+ } catch (const std::exception& e) {
+ QPID_LOG(error, id << ": " << e.what());
+ close();
}
- process();
//TODO: proper handling of time in and out of tick
pn_transport_tick(transport, 0);
QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput)
@@ -134,11 +150,16 @@ bool Connection::canEncode()
}
void Connection::closed()
{
- //TODO: tear down sessions and associated links
for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
i->second->close();
}
}
+void Connection::close()
+{
+ closed();
+ QPID_LOG_CAT(debug, model, id << " connection closed");
+ pn_connection_close(connection);
+}
bool Connection::isClosed() const
{
return pn_connection_state(connection) & PN_REMOTE_CLOSED;
@@ -191,7 +212,7 @@ void Connection::process()
if (pn_link_is_receiver(link)) {
Sessions::iterator i = sessions.find(pn_link_session(link));
if (i != sessions.end()) {
- i->second->incoming(link, delivery);
+ i->second->readable(link, delivery);
} else {
pn_delivery_update(delivery, PN_REJECTED);
}
@@ -199,7 +220,7 @@ void Connection::process()
Sessions::iterator i = sessions.find(pn_link_session(link));
if (i != sessions.end()) {
QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link));
- i->second->outgoing(link, delivery);
+ i->second->writable(link, delivery);
} else {
QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link);
}
@@ -238,9 +259,9 @@ std::string Connection::getError()
{
std::stringstream text;
pn_error_t* cerror = pn_connection_error(connection);
- if (cerror) text << "connection error " << pn_error_text(cerror);
+ if (cerror) text << "connection error " << pn_error_text(cerror) << " [" << cerror << "]";
pn_error_t* terror = pn_transport_error(transport);
- if (terror) text << "transport error " << pn_error_text(terror);
+ if (terror) text << "transport error " << pn_error_text(terror) << " [" << terror << "]";
return text.str();
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h
index 8af209af7a..28cf86f123 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h
@@ -37,6 +37,7 @@ class Broker;
namespace amqp {
+class Interconnects;
class Session;
/**
* AMQP 1.0 protocol support for broker
@@ -44,10 +45,10 @@ class Session;
class Connection : public sys::ConnectionCodec, public ManagedConnection
{
public:
- Connection(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse);
- ~Connection();
+ Connection(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, bool saslInUse);
+ virtual ~Connection();
size_t decode(const char* buffer, size_t size);
- size_t encode(char* buffer, size_t size);
+ virtual size_t encode(char* buffer, size_t size);
bool canEncode();
void closed();
@@ -55,7 +56,8 @@ class Connection : public sys::ConnectionCodec, public ManagedConnection
framing::ProtocolVersion getVersion() const;
pn_transport_t* getTransport();
- private:
+ Interconnects& getInterconnects();
+ protected:
typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions;
pn_connection_t* connection;
pn_transport_t* transport;
@@ -64,9 +66,11 @@ class Connection : public sys::ConnectionCodec, public ManagedConnection
qpid::broker::Broker& broker;
bool haveOutput;
Sessions sessions;
+ Interconnects& interconnects;
- void process();
+ virtual void process();
std::string getError();
+ void close();
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
new file mode 100644
index 0000000000..4b13bfc871
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
@@ -0,0 +1,279 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Domain.h"
+#include "Interconnect.h"
+#include "Interconnects.h"
+#include "SaslClient.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/Exception.h"
+#include "qpid/SaslFactory.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/OutputControl.h"
+#include "qpid/log/Statement.h"
+#include "qpid/management/ManagementAgent.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/enable_shared_from_this.hpp>
+
+namespace _qmf = qmf::org::apache::qpid::broker;
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+namespace {
+const std::string NONE("NONE");
+const std::string SOURCE("source");
+const std::string TARGET("target");
+const std::string URL("url");
+const std::string USERNAME("username");
+const std::string PASSWORD("password");
+const std::string SASL_MECHANISMS("sasl_mechanisms");
+const std::string SASL_SERVICE("sasl_service");
+const std::string MIN_SSF("min_ssf");
+const std::string MAX_SSF("max_ssf");
+class Wrapper : public qpid::sys::ConnectionCodec
+{
+ public:
+ Wrapper(boost::shared_ptr<Interconnect> c) : connection(c) {}
+ ~Wrapper()
+ {
+ QPID_LOG(debug, "Wrapper for non-SASL based interconnect has been deleted");
+ connection->transportDeleted();
+ }
+
+ std::size_t decode(const char* buffer, std::size_t size)
+ {
+ return connection->decode(buffer, size);
+ }
+ std::size_t encode(char* buffer, std::size_t size)
+ {
+ return connection->encode(buffer, size);
+ }
+ bool canEncode()
+ {
+ return connection->canEncode();
+ }
+ void closed()
+ {
+ connection->closed();
+ }
+ bool isClosed() const
+ {
+ QPID_LOG(debug, "Wrapper for non_SASL based interconnect " << (connection->isClosed() ? " IS " : " IS NOT ") << " closed");
+ return connection->isClosed();
+ }
+ qpid::framing::ProtocolVersion getVersion() const
+ {
+ return connection->getVersion();
+ }
+ private:
+ boost::shared_ptr<Interconnect> connection;
+};
+
+bool get(std::string& result, const std::string& key, const qpid::types::Variant::Map& map)
+{
+ qpid::types::Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ result = i->second.asString();
+ return true;
+ } else {
+ return false;
+ }
+}
+bool get(int& result, const std::string& key, const qpid::types::Variant::Map& map)
+{
+ qpid::types::Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ result = i->second;
+ return true;
+ } else {
+ return false;
+ }
+}
+bool get(qpid::Url& url, const std::string& key, const qpid::types::Variant::Map& map)
+{
+ qpid::types::Variant::Map::const_iterator i = map.find(key);
+ if (i != map.end()) {
+ url = qpid::Url(i->second.asString());
+ return true;
+ } else {
+ return false;
+ }
+}
+}
+
+class InterconnectFactory : public qpid::sys::ConnectionCodec::Factory, public boost::enable_shared_from_this<InterconnectFactory>
+{
+ public:
+ InterconnectFactory(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Domain&, Broker&, Interconnects&);
+ InterconnectFactory(bool incoming, const std::string& name, const std::string& source, const std::string& target,
+ Domain&, Broker&, Interconnects&, boost::shared_ptr<Relay>);
+ qpid::sys::ConnectionCodec* create(framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
+ qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
+ bool connect();
+ void failed(int, std::string);
+ private:
+ bool incoming;
+ std::string name;
+ std::string source;
+ std::string target;
+ qpid::Url url;
+ qpid::Url::iterator next;
+ std::string hostname;
+ Domain& domain;
+ Broker& broker;
+ Interconnects& registry;
+ qpid::Address address;
+ boost::shared_ptr<Relay> relay;
+};
+
+InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const qpid::types::Variant::Map& properties, Domain& d, Broker& b, Interconnects& r)
+ : incoming(i), name(n), url(d.getUrl()), domain(d), broker(b), registry(r)
+{
+ get(source, SOURCE, properties);
+ get(target, TARGET, properties);
+ next = url.begin();
+}
+
+InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const std::string& source_, const std::string& target_,
+ Domain& d, Broker& b, Interconnects& r, boost::shared_ptr<Relay> relay_)
+ : incoming(i), name(n), source(source_), target(target_), url(d.getUrl()), domain(d), broker(b), registry(r), relay(relay_)
+{
+ next = url.begin();
+}
+
+qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&)
+{
+ throw qpid::Exception("Not implemented!");
+}
+qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::sys::OutputControl& out, const std::string& id, const qpid::sys::SecuritySettings& t)
+{
+ bool useSasl = domain.getMechanisms() != NONE;
+ boost::shared_ptr<Interconnect> connection(new Interconnect(out, id, broker, true, incoming, name, source, target, domain, registry));
+ if (!relay) registry.add(name, connection);
+ else connection->setRelay(relay);
+
+ std::auto_ptr<qpid::sys::ConnectionCodec> codec;
+ if (useSasl) {
+ QPID_LOG(info, "Using AMQP 1.0 (with SASL layer) on connect");
+ codec = std::auto_ptr<qpid::sys::ConnectionCodec>(new qpid::broker::amqp::SaslClient(out, id, connection, domain.sasl(hostname), hostname, domain.getMechanisms(), t));
+ } else {
+ QPID_LOG(info, "Using AMQP 1.0 (no SASL layer) on connect");
+ codec = std::auto_ptr<qpid::sys::ConnectionCodec>(new Wrapper(connection));
+ }
+ domain.removePending(shared_from_this());//(TODO: add support for retry on connection failure)
+ return codec.release();
+}
+
+bool InterconnectFactory::connect()
+{
+ if (next == url.end()) return false;
+ address = *next;
+ next++;
+ hostname = address.host;
+ QPID_LOG (info, "Inter-broker connection initiated (" << address << ")");
+ broker.connect(name, address.host, boost::lexical_cast<std::string>(address.port), address.protocol, this, boost::bind(&InterconnectFactory::failed, this, _1, _2));
+ return true;
+}
+
+void InterconnectFactory::failed(int, std::string text)
+{
+ QPID_LOG (info, "Inter-broker connection failed (" << address << "): " << text);
+ if (!connect()) {
+ domain.removePending(shared_from_this());//give up (TODO: add support for periodic retry)
+ }
+}
+
+Domain::Domain(const std::string& n, const qpid::types::Variant::Map& properties, Broker& b)
+ : name(n), durable(false), broker(b), mechanisms("ANONYMOUS"), service(qpid::saslName), minSsf(0), maxSsf(0), agent(b.getManagementAgent())
+{
+ if (!get(url, URL, properties)) {
+ QPID_LOG(error, "No URL specified for domain " << name << "!");
+ throw qpid::Exception("A url is required for a domain!");
+ } else {
+ QPID_LOG(notice, "Created domain " << name << " with url " << url << " from " << properties);
+ }
+ //TODO: durable
+ get(username, USERNAME, properties);
+ get(password, PASSWORD, properties);
+ get(mechanisms, SASL_MECHANISMS, properties);
+ get(service, SASL_SERVICE, properties);
+ get(minSsf, MIN_SSF, properties);
+ get(maxSsf, MAX_SSF, properties);
+ if (agent != 0) {
+ domain = _qmf::Domain::shared_ptr(new _qmf::Domain(agent, this, name, durable));
+ domain->set_url(url.str());
+ domain->set_username(username);
+ domain->set_password(password);
+ domain->set_mechanisms(mechanisms);
+ agent->addObject(domain);
+ }
+}
+
+boost::shared_ptr<qpid::management::ManagementObject> Domain::GetManagementObject() const
+{
+ return domain;
+}
+
+const std::string& Domain::getMechanisms() const
+{
+ return mechanisms;
+}
+
+qpid::Url Domain::getUrl() const
+{
+ return url;
+}
+
+std::auto_ptr<qpid::Sasl> Domain::sasl(const std::string& hostname)
+{
+ return qpid::SaslFactory::getInstance().create(username, password, service, hostname, minSsf, maxSsf, false);
+}
+
+void Domain::connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Interconnects& registry)
+{
+ boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, properties, *this, broker, registry));
+ factory->connect();
+ addPending(factory);
+}
+
+void Domain::connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, Interconnects& registry, boost::shared_ptr<Relay> relay)
+{
+ boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, source, target, *this, broker, registry, relay));
+ factory->connect();
+ addPending(factory);
+}
+
+void Domain::addPending(boost::shared_ptr<InterconnectFactory> f)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ pending.insert(f);
+}
+
+void Domain::removePending(boost::shared_ptr<InterconnectFactory> f)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ pending.erase(f);
+}
+
+
+}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.h b/qpid/cpp/src/qpid/broker/amqp/Domain.h
new file mode 100644
index 0000000000..ccbee6341e
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/Domain.h
@@ -0,0 +1,78 @@
+#ifndef QPID_BROKER_AMQP_DOMAIN_H
+#define QPID_BROKER_AMQP_DOMAIN_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/types/Variant.h"
+#include "qpid/Url.h"
+#include "qpid/Version.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/sys/Mutex.h"
+#include "qmf/org/apache/qpid/broker/Domain.h"
+#include <boost/shared_ptr.hpp>
+#include <memory>
+#include <set>
+
+namespace qpid {
+class Sasl;
+namespace management {
+class ManagementAgent;
+class ManagementObject;
+}
+namespace broker {
+class Broker;
+namespace amqp {
+class InterconnectFactory;
+class Interconnects;
+class Relay;
+
+class Domain : public qpid::management::Manageable
+{
+ public:
+ Domain(const std::string& name, const qpid::types::Variant::Map& properties, Broker&);
+ void connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Interconnects&);
+ void connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, Interconnects&, boost::shared_ptr<Relay>);
+ std::auto_ptr<qpid::Sasl> sasl(const std::string& hostname);
+ const std::string& getMechanisms() const;
+ qpid::Url getUrl() const;
+ void addPending(boost::shared_ptr<InterconnectFactory>);
+ void removePending(boost::shared_ptr<InterconnectFactory>);
+ boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const;
+ private:
+ std::string name;
+ bool durable;
+ Broker& broker;
+ qpid::Url url;
+ std::string username;
+ std::string password;
+ std::string mechanisms;
+ std::string service;
+ int minSsf;
+ int maxSsf;
+ boost::shared_ptr<qmf::org::apache::qpid::broker::Domain> domain;
+ qpid::management::ManagementAgent* agent;
+ std::set< boost::shared_ptr<InterconnectFactory> > pending;
+ qpid::sys::Mutex lock;
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_DOMAIN_H*/
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
new file mode 100644
index 0000000000..9616b267bd
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Incoming.h"
+#include "Message.h"
+#include "Session.h"
+#include "qpid/broker/AsyncCompletion.h"
+#include "qpid/broker/Message.h"
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+Incoming::Incoming(pn_link_t* l, Broker& broker, Session& parent, const std::string& target, const std::string& name)
+ : ManagedIncomingLink(broker, parent, target, name), credit(100), window(0), link(l), session(parent) {}
+
+
+Incoming::~Incoming() {}
+bool Incoming::doWork()
+{
+ uint32_t c = getCredit();
+ bool issue = window < c;
+ if (issue) {
+ pn_link_flow(link, c - window);
+ window = c;
+ }
+ return issue;
+}
+bool Incoming::haveWork()
+{
+ return window <= (getCredit()/2);
+}
+
+uint32_t Incoming::getCredit()
+{
+ return credit;//TODO: proper flow control
+}
+
+void Incoming::detached()
+{
+}
+
+void Incoming::wakeup()
+{
+ session.wakeup();
+}
+namespace {
+ class Transfer : public qpid::broker::AsyncCompletion::Callback
+ {
+ public:
+ Transfer(pn_delivery_t* d, boost::shared_ptr<Session> s) : delivery(d), session(s) {}
+ void completed(bool sync) { session->accepted(delivery, sync); }
+ boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> clone()
+ {
+ boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new Transfer(delivery, session));
+ return copy;
+ }
+ private:
+ pn_delivery_t* delivery;
+ boost::shared_ptr<Session> session;
+ };
+}
+
+DecodingIncoming::DecodingIncoming(pn_link_t* link, Broker& broker, Session& parent, const std::string& target, const std::string& name)
+ : Incoming(link, broker, parent, target, name), session(parent.shared_from_this()) {}
+DecodingIncoming::~DecodingIncoming() {}
+
+void DecodingIncoming::readable(pn_delivery_t* delivery)
+{
+ boost::intrusive_ptr<Message> received(new Message(pn_delivery_pending(delivery)));
+ /*ssize_t read = */pn_link_recv(link, received->getData(), received->getSize());
+ received->scan();
+ pn_link_advance(link);
+
+ qpid::broker::Message message(received, received);
+
+ handle(message);
+ --window;
+ received->begin();
+ Transfer t(delivery, session);
+ received->end(t);
+}
+}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h
new file mode 100644
index 0000000000..cab023c7c1
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h
@@ -0,0 +1,68 @@
+#ifndef QPID_BROKER_AMQP_INCOMING_H
+#define QPID_BROKER_AMQP_INCOMING_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Message.h"
+#include "ManagedIncomingLink.h"
+extern "C" {
+#include <proton/engine.h>
+}
+#include <boost/intrusive_ptr.hpp>
+namespace qpid {
+namespace broker {
+class Broker;
+class Message;
+namespace amqp {
+class Session;
+
+class Incoming : public ManagedIncomingLink
+{
+ public:
+ Incoming(pn_link_t*, Broker& broker, Session& parent, const std::string& target, const std::string& name);
+ virtual ~Incoming();
+ virtual bool doWork();//do anything that requires output
+ virtual bool haveWork();//called when handling input to see whether any output work is needed
+ virtual void detached();
+ virtual void readable(pn_delivery_t* delivery) = 0;
+ void wakeup();
+ protected:
+ const uint32_t credit;
+ uint32_t window;
+ pn_link_t* link;
+ Session& session;
+ virtual uint32_t getCredit();
+};
+
+class DecodingIncoming : public Incoming
+{
+ public:
+ DecodingIncoming(pn_link_t*, Broker& broker, Session& parent, const std::string& target, const std::string& name);
+ virtual ~DecodingIncoming();
+ void readable(pn_delivery_t* delivery);
+ virtual void handle(qpid::broker::Message&) = 0;
+ private:
+ boost::shared_ptr<Session> session;
+};
+
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_INCOMING_H*/
diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
new file mode 100644
index 0000000000..082715b1b2
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Interconnect.h"
+#include "Interconnects.h"
+#include "Connection.h"
+#include "SaslClient.h"
+#include "Session.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/Exception.h"
+#include "qpid/SaslFactory.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/OutputControl.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+extern "C" {
+#include <proton/engine.h>
+#include <proton/error.h>
+}
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+Interconnect::Interconnect(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse,
+ bool i, const std::string& n, const std::string& s, const std::string& t, Domain& d, Interconnects& r)
+ : Connection(out, id, broker, r, saslInUse), incoming(i), name(n), source(s), target(t), domain(d), registry(r), headerDiscarded(false),
+ closeRequested(false), isTransportDeleted(false)
+{}
+
+Interconnect::~Interconnect()
+{
+ QPID_LOG(notice, "Interconnect deleted");
+}
+
+namespace {
+const pn_state_t UNINIT = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT;
+const size_t PROTOCOL_HEADER_LENGTH(8);
+}
+
+size_t Interconnect::encode(char* buffer, size_t size)
+{
+ if (headerDiscarded) {
+ return Connection::encode(buffer, size);
+ } else {
+ //The IO 'layer' will write in a protocol header when an
+ //'outgoing' connection is established. However the proton
+ //potocol engine will also emit one. One needs to be
+ //discarded, here we discard the one the engine emits for
+ //interconnects.
+ headerDiscarded = true;
+ size_t encoded = Connection::encode(buffer, size);
+ assert(encoded >= PROTOCOL_HEADER_LENGTH);//we never encode part of protocol header
+ //discard first eight bytes
+ ::memmove(buffer, buffer + PROTOCOL_HEADER_LENGTH, encoded - PROTOCOL_HEADER_LENGTH);
+ return encoded - PROTOCOL_HEADER_LENGTH;
+ }
+}
+
+void Interconnect::process()
+{
+ QPID_LOG(trace, id << " processing interconnect");
+ if (closeRequested) {
+ close();
+ } else {
+ if ((pn_connection_state(connection) & UNINIT) == UNINIT) {
+ QPID_LOG_CAT(debug, model, id << " interconnect opened");
+ pn_connection_set_container(connection, broker.getFederationTag().c_str());
+ pn_connection_open(connection);
+
+ pn_session_t* s = pn_session(connection);
+ pn_session_open(s);
+ boost::shared_ptr<Session> ssn(new Session(s, broker, *this, out));
+ sessions[s] = ssn;
+
+ pn_link_t* l = incoming ? pn_receiver(s, name.c_str()) : pn_sender(s, name.c_str());
+ pn_link_open(l);
+ ssn->attach(l, source, target, relay);
+ }
+ Connection::process();
+ }
+}
+
+void Interconnect::setRelay(boost::shared_ptr<Relay> r)
+{
+ relay = r;
+}
+
+void Interconnect::deletedFromRegistry()
+{
+ closeRequested = true;
+ if (!isTransportDeleted) out.activateOutput();
+}
+
+void Interconnect::transportDeleted()
+{
+ isTransportDeleted = true;
+ registry.remove(name);
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.h b/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
new file mode 100644
index 0000000000..230abbc667
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
@@ -0,0 +1,63 @@
+#ifndef QPID_BROKER_AMQP_INTERCONNECT_H
+#define QPID_BROKER_AMQP_INTERCONNECT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Connection.h"
+
+namespace qpid {
+struct Address;
+namespace broker {
+namespace amqp {
+class Domain;
+class Interconnects;
+class Relay;
+
+/**
+ *
+ */
+class Interconnect : public Connection
+{
+ public:
+ Interconnect(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse,
+ bool incoming, const std::string& name, const std::string& source, const std::string& target, Domain&, Interconnects&);
+ void setRelay(boost::shared_ptr<Relay>);
+ ~Interconnect();
+ size_t encode(char* buffer, size_t size);
+ void deletedFromRegistry();
+ void transportDeleted();
+ private:
+ bool incoming;
+ std::string name;
+ std::string source;
+ std::string target;
+ Domain& domain;
+ Interconnects& registry;
+ bool headerDiscarded;
+ boost::shared_ptr<Relay> relay;
+ bool closeRequested;
+ bool isTransportDeleted;
+
+ void process();
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_INTERCONNECT_H*/
diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp b/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp
new file mode 100644
index 0000000000..cbdf0da3ef
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Interconnects.h"
+#include "Interconnect.h"
+#include "Connection.h"
+#include "Domain.h"
+#include "SaslClient.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/Exception.h"
+#include "qpid/SaslFactory.h"
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/OutputControl.h"
+#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+namespace {
+const std::string INCOMING_TYPE("incoming");
+const std::string OUTGOING_TYPE("outgoing");
+const std::string DOMAIN_TYPE("domain");
+}
+
+bool Interconnects::createObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ const std::string& /*userId*/, const std::string& /*connectionId*/)
+{
+ if (type == DOMAIN_TYPE) {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ DomainMap::iterator i = domains.find(name);
+ if (i == domains.end()) {
+ boost::shared_ptr<Domain> domain(new Domain(name, properties, broker));
+ domains[name] = domain;
+ return true;
+ } else {
+ return false;
+ }
+ } else if (type == INCOMING_TYPE || type == OUTGOING_TYPE) {
+ QPID_LOG(notice, "Creating interconnect " << name << ", " << properties);
+ boost::shared_ptr<Domain> domain;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ qpid::types::Variant::Map::const_iterator p = properties.find(DOMAIN_TYPE);
+ if (p != properties.end()) {
+ std::string domainName = p->second;
+ DomainMap::iterator i = domains.find(domainName);
+ if (i != domains.end()) {
+ domain = i->second;
+ } else {
+ throw qpid::Exception(QPID_MSG("No such domain: " << domainName));
+ }
+ } else {
+ throw qpid::Exception(QPID_MSG("Domain must be specified"));
+ }
+ }
+ domain->connect(type == INCOMING_TYPE, name, properties, *this);
+ return true;
+ } else {
+ return false;
+ }
+}
+bool Interconnects::deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& /*properties*/,
+ const std::string& /*userId*/, const std::string& /*connectionId*/)
+{
+ if (type == DOMAIN_TYPE) {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ DomainMap::iterator i = domains.find(name);
+ if (i != domains.end()) {
+ domains.erase(i);
+ return true;
+ } else {
+ return false;
+ }
+ } else if (type == INCOMING_TYPE || type == OUTGOING_TYPE) {
+ boost::shared_ptr<Interconnect> interconnect;
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ InterconnectMap::iterator i = interconnects.find(name);
+ if (i != interconnects.end()) {
+ interconnect = i->second;
+ interconnects.erase(i);
+ } else {
+ throw qpid::Exception(QPID_MSG("No such interconnection: " << name));
+ }
+ }
+ if (interconnect) interconnect->deletedFromRegistry();
+ return true;
+ } else {
+ return false;
+ }
+}
+
+
+bool Interconnects::add(const std::string& name, boost::shared_ptr<Interconnect> connection)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ InterconnectMap::iterator i = interconnects.find(name);
+ if (i == interconnects.end()) {
+ interconnects[name] = connection;
+ return true;
+ } else return false;
+}
+boost::shared_ptr<Interconnect> Interconnects::get(const std::string& name)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ InterconnectMap::const_iterator i = interconnects.find(name);
+ if (i != interconnects.end()) return i->second;
+ else return boost::shared_ptr<Interconnect>();
+}
+bool Interconnects::remove(const std::string& name)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ InterconnectMap::iterator i = interconnects.find(name);
+ if (i != interconnects.end()) {
+ interconnects.erase(i);
+ return true;
+ } else return false;
+}
+
+boost::shared_ptr<Domain> Interconnects::findDomain(const std::string& name)
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ DomainMap::iterator i = domains.find(name);
+ if (i == domains.end()) {
+ return boost::shared_ptr<Domain>();
+ } else {
+ return i->second;
+ }
+
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnects.h b/qpid/cpp/src/qpid/broker/amqp/Interconnects.h
new file mode 100644
index 0000000000..4a7263c8f5
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/Interconnects.h
@@ -0,0 +1,62 @@
+#ifndef QPID_BROKER_AMQP_INTERCONNECTS_H
+#define QPID_BROKER_AMQP_INTERCONNECTS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/ObjectFactory.h"
+#include "qpid/sys/Mutex.h"
+#include <string>
+#include <map>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+class Domain;
+class Interconnect;
+/**
+ *
+ */
+class Interconnects : public ObjectFactory
+{
+ public:
+ bool createObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ const std::string& userId, const std::string& connectionId);
+ bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties,
+ const std::string& userId, const std::string& connectionId);
+
+
+ bool add(const std::string&, boost::shared_ptr<Interconnect>);
+ boost::shared_ptr<Interconnect> get(const std::string&);
+ bool remove(const std::string&);
+
+ boost::shared_ptr<Domain> findDomain(const std::string&);
+ private:
+ typedef std::map<std::string, boost::shared_ptr<Interconnect> > InterconnectMap;
+ typedef std::map<std::string, boost::shared_ptr<Domain> > DomainMap;
+ InterconnectMap interconnects;
+ DomainMap domains;
+ qpid::sys::Mutex lock;
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_INTERCONNECTS_H*/
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
new file mode 100644
index 0000000000..8817e410ce
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ManagedIncomingLink.h"
+#include "qpid/broker/amqp/ManagedSession.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/log/Statement.h"
+
+namespace _qmf = qmf::org::apache::qpid::broker;
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+ManagedIncomingLink::ManagedIncomingLink(Broker& broker, ManagedSession& p, const std::string& target, const std::string& _name)
+ : parent(p), name(_name)
+{
+ qpid::management::ManagementAgent* agent = broker.getManagementAgent();
+ if (agent) {
+ incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, target, _name));
+ agent->addObject(incoming);
+ }
+}
+ManagedIncomingLink::~ManagedIncomingLink()
+{
+ if (incoming != 0) incoming->resourceDestroy();
+}
+
+qpid::management::ManagementObject::shared_ptr ManagedIncomingLink::GetManagementObject() const
+{
+ return incoming;
+}
+
+void ManagedIncomingLink::incomingMessageReceived()
+{
+ if (incoming) { incoming->inc_transfers(); }
+}
+}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h
new file mode 100644
index 0000000000..5b7db7997c
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h
@@ -0,0 +1,50 @@
+#ifndef QPID_BROKER_AMQP_MANAGEDINCOMINGLINK_H
+#define QPID_BROKER_AMQP_MANAGEDINCOMINGLINK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/management/Manageable.h"
+#include "qmf/org/apache/qpid/broker/Incoming.h"
+
+namespace qpid {
+namespace management {
+class ManagementObject;
+}
+namespace broker {
+class Broker;
+namespace amqp {
+class ManagedSession;
+
+class ManagedIncomingLink : public qpid::management::Manageable
+{
+ public:
+ ManagedIncomingLink(Broker& broker, ManagedSession& parent, const std::string& target, const std::string& name);
+ virtual ~ManagedIncomingLink();
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
+ void incomingMessageReceived();
+ private:
+ ManagedSession& parent;
+ const std::string name;
+ qmf::org::apache::qpid::broker::Incoming::shared_ptr incoming;
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_MANAGEDINCOMINGLINK_H*/
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
index f36a1e8da4..53e49d2bca 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
@@ -21,8 +21,6 @@
#include "ManagedOutgoingLink.h"
#include "qpid/broker/amqp/ManagedSession.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/types/Variant.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/log/Statement.h"
@@ -32,30 +30,28 @@ namespace qpid {
namespace broker {
namespace amqp {
-ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, Queue& q, ManagedSession& p, const std::string i, bool topic)
- : parent(p), id(i)
+ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, ManagedSession& p, const std::string& source, const std::string& _name)
+ : parent(p), name(_name)
{
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent) {
- subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObject()->getObjectId(), id,
- false/*FIXME*/, true/*FIXME*/, topic, qpid::types::Variant::Map()));
- agent->addObject(subscription);
- subscription->set_creditMode("n/a");
+ outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, source, _name));
+ agent->addObject(outgoing);
}
}
ManagedOutgoingLink::~ManagedOutgoingLink()
{
- if (subscription != 0) subscription->resourceDestroy();
+ if (outgoing != 0) outgoing->resourceDestroy();
}
qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObject() const
{
- return subscription;
+ return outgoing;
}
void ManagedOutgoingLink::outgoingMessageSent()
{
- if (subscription) { subscription->inc_delivered(); }
+ if (outgoing) { outgoing->inc_transfers(); }
parent.outgoingMessageSent();
}
void ManagedOutgoingLink::outgoingMessageAccepted()
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h
index 20a1095db2..61d0b9c3a0 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h
@@ -22,7 +22,7 @@
*
*/
#include "qpid/management/Manageable.h"
-#include "qmf/org/apache/qpid/broker/Subscription.h"
+#include "qmf/org/apache/qpid/broker/Outgoing.h"
namespace qpid {
namespace management {
@@ -30,14 +30,13 @@ class ManagementObject;
}
namespace broker {
class Broker;
-class Queue;
namespace amqp {
class ManagedSession;
class ManagedOutgoingLink : public qpid::management::Manageable
{
public:
- ManagedOutgoingLink(Broker& broker, Queue&, ManagedSession& parent, const std::string id, bool topic);
+ ManagedOutgoingLink(Broker& broker, ManagedSession& parent, const std::string& source, const std::string& name);
virtual ~ManagedOutgoingLink();
qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
void outgoingMessageSent();
@@ -45,8 +44,8 @@ class ManagedOutgoingLink : public qpid::management::Manageable
void outgoingMessageRejected();
private:
ManagedSession& parent;
- const std::string id;
- qmf::org::apache::qpid::broker::Subscription::shared_ptr subscription;
+ const std::string name;
+ qmf::org::apache::qpid::broker::Outgoing::shared_ptr outgoing;
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
index eb0a6e20aa..e531e8cd20 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/amqp/Outgoing.h"
#include "qpid/broker/amqp/Header.h"
+#include "qpid/broker/amqp/Session.h"
#include "qpid/broker/amqp/Translation.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Selector.h"
@@ -32,9 +33,16 @@ namespace qpid {
namespace broker {
namespace amqp {
-Outgoing::Outgoing(Broker& broker, boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession& session, qpid::sys::OutputControl& o, bool topic)
- : Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
- ManagedOutgoingLink(broker, *q, session, pn_link_name(l), topic),
+Outgoing::Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& name) : ManagedOutgoingLink(broker, parent, source, name), session(parent) {}
+
+void Outgoing::wakeup()
+{
+ session.wakeup();
+}
+
+OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool topic)
+ : Outgoing(broker, session, source, pn_link_name(l)),
+ Consumer(pn_link_name(l), /*FIXME*/CONSUMER),
exclusive(topic),
queue(q), deliveries(5000), link(l), out(o),
current(0), outstanding(0),
@@ -45,12 +53,12 @@ Outgoing::Outgoing(Broker& broker, boost::shared_ptr<Queue> q, pn_link_t* l, Man
}
}
-void Outgoing::init()
+void OutgoingFromQueue::init()
{
queue->consume(shared_from_this(), exclusive);//may throw exception
}
-bool Outgoing::dispatch()
+bool OutgoingFromQueue::doWork()
{
QPID_LOG(trace, "Dispatching to " << getName() << ": " << pn_link_credit(link));
if (canDeliver()) {
@@ -66,12 +74,12 @@ bool Outgoing::dispatch()
return false;
}
-void Outgoing::write(const char* data, size_t size)
+void OutgoingFromQueue::write(const char* data, size_t size)
{
pn_link_send(link, data, size);
}
-void Outgoing::handle(pn_delivery_t* delivery)
+void OutgoingFromQueue::handle(pn_delivery_t* delivery)
{
pn_delivery_tag_t tag = pn_delivery_tag(delivery);
size_t i = *reinterpret_cast<const size_t*>(tag.bytes);
@@ -126,12 +134,12 @@ void Outgoing::handle(pn_delivery_t* delivery)
}
}
-bool Outgoing::canDeliver()
+bool OutgoingFromQueue::canDeliver()
{
return deliveries[current].delivery == 0 && pn_link_credit(link) > outstanding;
}
-void Outgoing::detached()
+void OutgoingFromQueue::detached()
{
QPID_LOG(debug, "Detaching outgoing link from " << queue->getName());
queue->cancel(shared_from_this());
@@ -143,7 +151,7 @@ void Outgoing::detached()
}
//Consumer interface:
-bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg)
+bool OutgoingFromQueue::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg)
{
Record& r = deliveries[current++];
if (current >= deliveries.capacity()) current = 0;
@@ -155,23 +163,23 @@ bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& m
return true;
}
-void Outgoing::notify()
+void OutgoingFromQueue::notify()
{
QPID_LOG(trace, "Notification received for " << queue->getName());
out.activateOutput();
}
-bool Outgoing::accept(const qpid::broker::Message&)
+bool OutgoingFromQueue::accept(const qpid::broker::Message&)
{
return true;
}
-void Outgoing::setSubjectFilter(const std::string& f)
+void OutgoingFromQueue::setSubjectFilter(const std::string& f)
{
subjectFilter = f;
}
-void Outgoing::setSelectorFilter(const std::string& f)
+void OutgoingFromQueue::setSelectorFilter(const std::string& f)
{
selector.reset(new Selector(f));
}
@@ -217,29 +225,29 @@ bool match(const std::string& filter, const std::string& target)
}
}
-bool Outgoing::filter(const qpid::broker::Message& m)
+bool OutgoingFromQueue::filter(const qpid::broker::Message& m)
{
return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey()))
&& (!selector || selector->filter(m));
}
-void Outgoing::cancel() {}
+void OutgoingFromQueue::cancel() {}
-void Outgoing::acknowledged(const qpid::broker::DeliveryRecord&) {}
+void OutgoingFromQueue::acknowledged(const qpid::broker::DeliveryRecord&) {}
-qpid::broker::OwnershipToken* Outgoing::getSession()
+qpid::broker::OwnershipToken* OutgoingFromQueue::getSession()
{
return 0;
}
-Outgoing::Record::Record() : delivery(0), disposition(0), index(0) {}
-void Outgoing::Record::init(size_t i)
+OutgoingFromQueue::Record::Record() : delivery(0), disposition(0), index(0) {}
+void OutgoingFromQueue::Record::init(size_t i)
{
index = i;
tag.bytes = reinterpret_cast<const char*>(&index);
tag.size = sizeof(index);
}
-void Outgoing::Record::reset()
+void OutgoingFromQueue::Record::reset()
{
cursor = QueueCursor();
msg = qpid::broker::Message();
diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
index 7d845a1427..b8a689b8f8 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -41,7 +41,7 @@ class Broker;
class Queue;
class Selector;
namespace amqp {
-class ManagedSession;
+class Session;
template <class T>
class CircularArray
{
@@ -56,17 +56,43 @@ class CircularArray
size_t next;
};
+class Outgoing : public ManagedOutgoingLink
+{
+ public:
+ Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& name);
+ virtual void setSubjectFilter(const std::string&) = 0;
+ virtual void setSelectorFilter(const std::string&) = 0;
+ virtual void init() = 0;
+ /**
+ * Allows the link to initiate any outgoing transfers
+ */
+ virtual bool doWork() = 0;
+ /**
+ * Signals that this link has been detached
+ */
+ virtual void detached() = 0;
+ /**
+ * Called when a delivery is writable
+ */
+ virtual void handle(pn_delivery_t* delivery) = 0;
+ void wakeup();
+ virtual ~Outgoing() {}
+ private:
+ Session& session;
+};
+
/**
- *
+ * Logic for handling an outgoing link from a queue (even if it is a
+ * subscription pseduo-queue created by the broker)
*/
-class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from_this<Outgoing>, public ManagedOutgoingLink
+class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue>
{
public:
- Outgoing(Broker&,boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession&, qpid::sys::OutputControl& o, bool topic);
+ OutgoingFromQueue(Broker&, const std::string& source, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool topic);
void setSubjectFilter(const std::string&);
void setSelectorFilter(const std::string&);
void init();
- bool dispatch();
+ bool doWork();
void write(const char* data, size_t size);
void handle(pn_delivery_t* delivery);
bool canDeliver();
diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
index 711592257c..0e622f8d20 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
@@ -27,6 +27,7 @@
#include "qpid/broker/RecoverableMessage.h"
#include "qpid/broker/RecoverableMessageImpl.h"
#include "qpid/broker/amqp/Connection.h"
+#include "qpid/broker/amqp/Interconnects.h"
#include "qpid/broker/amqp/Message.h"
#include "qpid/broker/amqp/Sasl.h"
#include "qpid/broker/amqp/Translation.h"
@@ -43,11 +44,15 @@ namespace amqp {
class ProtocolImpl : public Protocol
{
public:
- ProtocolImpl(Broker& b) : broker(b) {}
+ ProtocolImpl(Interconnects* i, Broker& b) : interconnects(i), broker(b)
+ {
+ broker.getObjectFactoryRegistry().add(interconnects);//registry deletes on shutdown
+ }
qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const qpid::broker::Message&);
boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&);
private:
+ Interconnects* interconnects;
Broker& broker;
};
@@ -58,7 +63,8 @@ struct ProtocolPlugin : public Plugin
//need to register protocol before recovery from store
broker::Broker* broker = dynamic_cast<qpid::broker::Broker*>(&target);
if (broker) {
- broker->getProtocolRegistry().add("AMQP 1.0", new ProtocolImpl(*broker));
+ ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), *broker);
+ broker->getProtocolRegistry().add("AMQP 1.0", impl);//registry deletes on shutdown
}
}
@@ -73,18 +79,18 @@ qpid::sys::ConnectionCodec* ProtocolImpl::create(const qpid::framing::ProtocolVe
if (v.getProtocol() == qpid::framing::ProtocolVersion::SASL) {
if (broker.getOptions().auth) {
QPID_LOG(info, "Using AMQP 1.0 (with SASL layer)");
- return new qpid::broker::amqp::Sasl(out, id, broker, qpid::SaslFactory::getInstance().createServer(broker.getOptions().realm, broker.getOptions().requireEncrypted, external));
+ return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, qpid::SaslFactory::getInstance().createServer(broker.getOptions().realm, broker.getOptions().requireEncrypted, external));
} else {
std::auto_ptr<SaslServer> authenticator(new qpid::NullSaslServer(broker.getOptions().realm));
QPID_LOG(info, "Using AMQP 1.0 (with dummy SASL layer)");
- return new qpid::broker::amqp::Sasl(out, id, broker, authenticator);
+ return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, authenticator);
}
} else {
if (broker.getOptions().auth) {
throw qpid::Exception("SASL layer required!");
} else {
QPID_LOG(info, "Using AMQP 1.0 (no SASL layer)");
- return new qpid::broker::amqp::Connection(out, id, broker, false);
+ return new qpid::broker::amqp::Connection(out, id, broker, *interconnects, false);
}
}
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
new file mode 100644
index 0000000000..48a629a66e
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp
@@ -0,0 +1,278 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Relay.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
+#include <algorithm>
+#include <string.h>
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+Relay::Relay(size_t max_) : credit(0), max(max_), current(0), isDetached(false), out(0), in(0) {}
+void Relay::check()
+{
+ if (isDetached) throw qpid::Exception("other end of relay has been detached");
+}
+bool Relay::send(pn_link_t* link)
+{
+ BufferedTransfer* c(0);
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ if (current < buffer.size()) {
+ c = &buffer[current++];
+ } else {
+ return false;
+ }
+ }
+ c->initOut(link);
+ return true;
+}
+
+BufferedTransfer& Relay::push()
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ buffer.push_back(BufferedTransfer());
+ return buffer.back();
+}
+
+void Relay::received(pn_link_t* link, pn_delivery_t* delivery)
+{
+ BufferedTransfer& received = push();
+ received.initIn(link, delivery);
+ if (out) out->wakeup();
+}
+size_t Relay::size() const
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ return buffer.size();
+}
+BufferedTransfer& Relay::head()
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ return buffer.front();
+}
+void Relay::pop()
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ buffer.pop_front();
+ if (current) --current;
+}
+void Relay::setCredit(int c)
+{
+ credit = c;
+ if (in) in->wakeup();
+}
+
+int Relay::getCredit() const
+{
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock);
+ return std::min(credit - size(), max);
+}
+void Relay::attached(Outgoing* o)
+{
+ out = o;
+}
+void Relay::attached(Incoming* i)
+{
+ in = i;
+}
+void Relay::detached(Outgoing*)
+{
+ out = 0;
+ isDetached = true;
+ if (in) in->wakeup();
+}
+void Relay::detached(Incoming*)
+{
+ in = 0;
+ isDetached = true;
+ if (out) out->wakeup();
+}
+
+OutgoingFromRelay::OutgoingFromRelay(pn_link_t* l, Broker& broker, Session& parent, const std::string& source,
+ const std::string& name_, boost::shared_ptr<Relay> r)
+ : Outgoing(broker, parent, source, name_), name(name_), link(l), relay(r) {}
+/**
+ * Allows the link to initiate any outgoing transfers
+ */
+bool OutgoingFromRelay::doWork()
+{
+ relay->check();
+ relay->setCredit(pn_link_credit(link));
+ return relay->send(link);
+}
+/**
+ * Called when a delivery is writable
+ */
+void OutgoingFromRelay::handle(pn_delivery_t* delivery)
+{
+ void* context = pn_delivery_get_context(delivery);
+ BufferedTransfer* transfer = reinterpret_cast<BufferedTransfer*>(context);
+ assert(transfer);
+ if (pn_delivery_writable(delivery)) {
+ if (transfer->write(link)) {
+ outgoingMessageSent();
+ QPID_LOG(debug, "Sent relayed message " << name);
+ } else {
+ QPID_LOG(error, "Failed to send relayed message " << name);
+ }
+ }
+ if (pn_delivery_updated(delivery)) {
+ pn_disposition_t d = transfer->updated();
+ switch (d) {
+ case PN_ACCEPTED:
+ outgoingMessageAccepted();
+ break;
+ case PN_REJECTED:
+ case PN_RELEASED://TODO: not quite true...
+ case PN_MODIFIED://TODO: not quite true...
+ outgoingMessageRejected();
+ break;
+ default:
+ QPID_LOG(warning, "Unhandled disposition: " << d);
+ }
+ }
+}
+/**
+ * Signals that this link has been detached
+ */
+void OutgoingFromRelay::detached()
+{
+ relay->detached(this);
+}
+void OutgoingFromRelay::init()
+{
+ relay->attached(this);
+}
+void OutgoingFromRelay::setSubjectFilter(const std::string&)
+{
+ //TODO
+}
+void OutgoingFromRelay::setSelectorFilter(const std::string&)
+{
+ //TODO
+}
+
+IncomingToRelay::IncomingToRelay(pn_link_t* link, Broker& broker, Session& parent, const std::string& target,
+ const std::string& name, boost::shared_ptr<Relay> r)
+ : Incoming(link, broker, parent, target, name), relay(r)
+{
+ relay->attached(this);
+}
+bool IncomingToRelay::settle()
+{
+ bool result(false);
+ while (relay->size() && relay->head().settle()) {
+ result = true;
+ relay->pop();
+ }
+ return result;
+}
+bool IncomingToRelay::doWork()
+{
+ relay->check();
+ bool work(false);
+ if (settle()) work = true;
+ if (Incoming::doWork()) work = true;
+ return work;
+}
+bool IncomingToRelay::haveWork()
+{
+ bool work(false);
+ if (settle()) work = true;
+ if (Incoming::haveWork()) work = true;
+ return work;
+}
+void IncomingToRelay::readable(pn_delivery_t* delivery)
+{
+ relay->received(link, delivery);
+ --window;
+}
+
+uint32_t IncomingToRelay::getCredit()
+{
+ return relay->getCredit();
+}
+
+void IncomingToRelay::detached()
+{
+ relay->detached(this);
+}
+
+void BufferedTransfer::initIn(pn_link_t* link, pn_delivery_t* d)
+{
+ in.handle = d;
+ //read in data
+ data.resize(pn_delivery_pending(d));
+ /*ssize_t read = */pn_link_recv(link, &data[0], data.size());
+ pn_link_advance(link);
+
+ //copy delivery tag
+ pn_delivery_tag_t dt = pn_delivery_tag(d);
+ tag.resize(dt.size);
+ ::memmove(&tag[0], dt.bytes, dt.size);
+
+ //set context
+ pn_delivery_set_context(d, this);
+
+}
+
+bool BufferedTransfer::settle()
+{
+ if (out.settled && !in.settled) {
+ pn_delivery_update(in.handle, disposition);
+ pn_delivery_settle(in.handle);
+ in.settled = true;
+ }
+ return out.settled && in.settled;
+}
+
+void BufferedTransfer::initOut(pn_link_t* link)
+{
+ pn_delivery_tag_t dt;
+ dt.bytes = &tag[0];
+ dt.size = tag.size();
+ out.handle = pn_delivery(link, dt);
+ //set context
+ pn_delivery_set_context(out.handle, this);
+}
+
+pn_disposition_t BufferedTransfer::updated()
+{
+ disposition = pn_delivery_remote_state(out.handle);
+ if (disposition) {
+ pn_delivery_settle(out.handle);
+ out.settled = true;
+ }
+ return disposition;
+}
+
+bool BufferedTransfer::write(pn_link_t* link)
+{
+ pn_link_send(link, &data[0], data.size());
+ return pn_link_advance(link);
+}
+Delivery::Delivery() : settled(false), handle(0) {}
+Delivery::Delivery(pn_delivery_t* d) : settled(false), handle(d) {}
+
+}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.h b/qpid/cpp/src/qpid/broker/amqp/Relay.h
new file mode 100644
index 0000000000..0b20b563d4
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/Relay.h
@@ -0,0 +1,128 @@
+#ifndef QPID_BROKER_AMQP_RELAY_H
+#define QPID_BROKER_AMQP_RELAY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Incoming.h"
+#include "Outgoing.h"
+#include "qpid/sys/Mutex.h"
+extern "C" {
+#include <proton/engine.h>
+}
+#include <deque>
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+struct Delivery
+{
+ bool settled;
+ pn_delivery_t* handle;
+
+ Delivery();
+ Delivery(pn_delivery_t* d);
+};
+
+class BufferedTransfer
+{
+ public:
+ void initIn(pn_link_t* link, pn_delivery_t* d);
+ bool settle();
+ void initOut(pn_link_t* link);
+ pn_disposition_t updated();
+ bool write(pn_link_t*);
+ private:
+ std::vector<char> data;
+ Delivery in;
+ Delivery out;
+ pn_delivery_tag_t dt;
+ std::vector<char> tag;
+ pn_disposition_t disposition;
+};
+
+/**
+ *
+ */
+class Relay
+{
+ public:
+ Relay(size_t max);
+ void check();
+ size_t size() const;
+ BufferedTransfer& head();
+ void pop();
+ bool send(pn_link_t*);
+ void received(pn_link_t* link, pn_delivery_t* delivery);
+ int getCredit() const;
+ void setCredit(int);
+ void attached(Outgoing*);
+ void attached(Incoming*);
+ void detached(Outgoing*);
+ void detached(Incoming*);
+ private:
+ std::deque<BufferedTransfer> buffer;//TODO: optimise by replacing with simple circular array
+ int credit;//issued by outgoing peer, decremented everytime we send a message on outgoing link
+ size_t max;
+ size_t current;
+ bool isDetached;
+ Outgoing* out;
+ Incoming* in;
+ mutable qpid::sys::Mutex lock;
+
+ BufferedTransfer& push();
+};
+
+class OutgoingFromRelay : public Outgoing
+{
+ public:
+ OutgoingFromRelay(pn_link_t*, Broker&, Session&, const std::string& source,
+ const std::string& name, boost::shared_ptr<Relay>);
+ bool doWork();
+ void handle(pn_delivery_t* delivery);
+ void detached();
+ void init();
+ void setSubjectFilter(const std::string&);
+ void setSelectorFilter(const std::string&);
+ private:
+ const std::string name;
+ pn_link_t* link;
+ boost::shared_ptr<Relay> relay;
+};
+
+class IncomingToRelay : public Incoming
+{
+ public:
+ IncomingToRelay(pn_link_t*, Broker&, Session&, const std::string& target,
+ const std::string& name, boost::shared_ptr<Relay> r);
+ bool settle();
+ bool doWork();
+ bool haveWork();
+ void detached();
+ void readable(pn_delivery_t* delivery);
+ uint32_t getCredit();
+ private:
+ boost::shared_ptr<Relay> relay;
+};
+
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_RELAY_H*/
diff --git a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
index 4b89e7b15d..d8e12fcfdd 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
@@ -31,8 +31,8 @@ namespace qpid {
namespace broker {
namespace amqp {
-Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, qpid::broker::Broker& broker, std::auto_ptr<qpid::SaslServer> auth)
- : qpid::amqp::SaslServer(id), out(o), connection(out, id, broker, true),
+Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, qpid::broker::Broker& broker, Interconnects& i, std::auto_ptr<qpid::SaslServer> auth)
+ : qpid::amqp::SaslServer(id), out(o), connection(out, id, broker, i, true),
authenticator(auth),
state(INCOMPLETE), writeHeader(true), haveOutput(true)
{
diff --git a/qpid/cpp/src/qpid/broker/amqp/Sasl.h b/qpid/cpp/src/qpid/broker/amqp/Sasl.h
index 079128be02..7718b4c43a 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Sasl.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Sasl.h
@@ -32,7 +32,6 @@ class SecurityLayer;
}
namespace broker {
namespace amqp {
-
/**
* An AMQP 1.0 SASL Security Layer for authentication and optionally
* encryption.
@@ -40,7 +39,7 @@ namespace amqp {
class Sasl : public sys::ConnectionCodec, qpid::amqp::SaslServer
{
public:
- Sasl(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, std::auto_ptr<qpid::SaslServer> authenticator);
+ Sasl(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, std::auto_ptr<qpid::SaslServer> authenticator);
~Sasl();
size_t decode(const char* buffer, size_t size);
diff --git a/qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp b/qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp
new file mode 100644
index 0000000000..7e03ae9450
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp
@@ -0,0 +1,178 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "SaslClient.h"
+#include "Interconnect.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/sys/OutputControl.h"
+#include "qpid/sys/SecurityLayer.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Sasl.h"
+#include "qpid/SaslFactory.h"
+#include "qpid/StringUtils.h"
+#include <sstream>
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+
+SaslClient::SaslClient(qpid::sys::OutputControl& out_, const std::string& id, boost::shared_ptr<Interconnect> c, std::auto_ptr<qpid::Sasl> s,
+ const std::string& hostname_, const std::string& mechs, const qpid::sys::SecuritySettings& t)
+ : qpid::amqp::SaslClient(id), out(out_), connection(c), sasl(s),
+ hostname(hostname_), allowedMechanisms(mechs), transport(t), readHeader(true), writeHeader(true), haveOutput(false), state(NONE) {}
+
+SaslClient::~SaslClient()
+{
+ connection->transportDeleted();
+}
+
+std::size_t SaslClient::decode(const char* buffer, std::size_t size)
+{
+ size_t decoded = 0;
+ if (readHeader) {
+ decoded += readProtocolHeader(buffer, size);
+ readHeader = !decoded;
+ }
+ if (state == NONE && decoded < size) {
+ decoded += read(buffer + decoded, size - decoded);
+ } else if (state == SUCCEEDED) {
+ if (securityLayer.get()) decoded += securityLayer->decode(buffer + decoded, size - decoded);
+ else decoded += connection->decode(buffer + decoded, size - decoded);
+ }
+ QPID_LOG(trace, id << " SaslClient::decode(" << size << "): " << decoded);
+ return decoded;
+}
+
+std::size_t SaslClient::encode(char* buffer, std::size_t size)
+{
+ size_t encoded = 0;
+ if (writeHeader) {
+ encoded += writeProtocolHeader(buffer, size);
+ writeHeader = !encoded;
+ }
+ if (state == NONE && encoded < size) {
+ encoded += write(buffer + encoded, size - encoded);
+ } else if (state == SUCCEEDED) {
+ if (securityLayer.get()) encoded += securityLayer->encode(buffer + encoded, size - encoded);
+ else encoded += connection->encode(buffer + encoded, size - encoded);
+ }
+ haveOutput = (encoded == size);
+ QPID_LOG(trace, id << " SaslClient::encode(" << size << "): " << encoded);
+ return encoded;
+}
+
+bool SaslClient::canEncode()
+{
+ if (state == NONE) {
+ QPID_LOG(trace, id << " SaslClient::canEncode(): " << writeHeader << " || " << haveOutput);
+ return writeHeader || haveOutput;
+ } else if (state == SUCCEEDED) {
+ if (securityLayer.get()) return securityLayer->canEncode();
+ else return connection->canEncode();
+ } else {
+ return false;
+ }
+}
+
+void SaslClient::mechanisms(const std::string& offered)
+{
+ QPID_LOG_CAT(debug, protocol, id << " Received SASL-MECHANISMS(" << offered << ")");
+ std::string response;
+
+ std::string mechanisms;
+ if (allowedMechanisms.size()) {
+ std::vector<std::string> allowed = split(allowedMechanisms, " ");
+ std::vector<std::string> supported = split(offered, " ");
+ std::stringstream intersection;
+ for (std::vector<std::string>::const_iterator i = allowed.begin(); i != allowed.end(); ++i) {
+ if (std::find(supported.begin(), supported.end(), *i) != supported.end()) {
+ intersection << *i << " ";
+ }
+ }
+ mechanisms = intersection.str();
+ } else {
+ mechanisms = offered;
+ }
+
+ if (sasl->start(mechanisms, response, &transport)) {
+ init(sasl->getMechanism(), &response, hostname.size() ? &hostname : 0);
+ } else {
+ init(sasl->getMechanism(), 0, hostname.size() ? &hostname : 0);
+ }
+ haveOutput = true;
+ out.activateOutput();
+}
+void SaslClient::challenge(const std::string& challenge)
+{
+ QPID_LOG_CAT(debug, protocol, id << " Received SASL-CHALLENGE(" << challenge.size() << " bytes)");
+ std::string r = sasl->step(challenge);
+ response(&r);
+ haveOutput = true;
+ out.activateOutput();
+}
+namespace {
+const std::string EMPTY;
+}
+void SaslClient::challenge()
+{
+ QPID_LOG_CAT(debug, protocol, id << " Received SASL-CHALLENGE(null)");
+ std::string r = sasl->step(EMPTY);
+ response(&r);
+}
+void SaslClient::outcome(uint8_t result, const std::string& extra)
+{
+ QPID_LOG_CAT(debug, protocol, id << " Received SASL-OUTCOME(" << result << ", " << extra << ")");
+ outcome(result);
+}
+void SaslClient::outcome(uint8_t result)
+{
+ QPID_LOG_CAT(debug, protocol, id << " Received SASL-OUTCOME(" << result << ")");
+ if (result) state = FAILED;
+ else state = SUCCEEDED;
+
+ securityLayer = sasl->getSecurityLayer(65535);
+ if (securityLayer.get()) {
+ securityLayer->init(connection.get());
+ }
+ out.activateOutput();
+}
+
+void SaslClient::closed()
+{
+ if (state == SUCCEEDED) {
+ connection->closed();
+ } else {
+ QPID_LOG(info, id << " Connection closed prior to authentication completing");
+ state = FAILED;
+ }
+}
+
+bool SaslClient::isClosed() const
+{
+ if (state == FAILED) return true;
+ else if (state == SUCCEEDED) return connection->isClosed();
+ else return false;
+}
+qpid::framing::ProtocolVersion SaslClient::getVersion() const
+{
+ return connection->getVersion();
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/SaslClient.h b/qpid/cpp/src/qpid/broker/amqp/SaslClient.h
new file mode 100644
index 0000000000..4d802f6f65
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/SaslClient.h
@@ -0,0 +1,80 @@
+#ifndef QPID_BROKER_AMQP_SASLCLIENT_H
+#define QPID_BROKER_AMQP_SASLCLIENT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/SecuritySettings.h"
+#include "qpid/amqp/SaslClient.h"
+#include <memory>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+class Sasl;
+namespace sys {
+class OutputControl;
+class SecurityLayer;
+}
+namespace broker {
+class Broker;
+namespace amqp {
+class Interconnect;
+
+/**
+ * Implementation of SASL client role for when broker connects to
+ * external peers.
+ */
+class SaslClient : public qpid::sys::ConnectionCodec, qpid::amqp::SaslClient
+{
+ public:
+ SaslClient(qpid::sys::OutputControl& out, const std::string& id, boost::shared_ptr<Interconnect>, std::auto_ptr<qpid::Sasl>,
+ const std::string& hostname, const std::string& allowedMechanisms, const qpid::sys::SecuritySettings&);
+ ~SaslClient();
+ std::size_t decode(const char* buffer, std::size_t size);
+ std::size_t encode(char* buffer, std::size_t size);
+ bool canEncode();
+ void closed();
+ bool isClosed() const;
+ qpid::framing::ProtocolVersion getVersion() const;
+ private:
+ qpid::sys::OutputControl& out;
+ boost::shared_ptr<Interconnect> connection;
+ std::auto_ptr<qpid::Sasl> sasl;
+ std::string hostname;
+ std::string allowedMechanisms;
+ qpid::sys::SecuritySettings transport;
+ bool readHeader;
+ bool writeHeader;
+ bool haveOutput;
+ enum {
+ NONE, FAILED, SUCCEEDED
+ } state;
+ std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
+
+ void mechanisms(const std::string&);
+ void challenge(const std::string&);
+ void challenge(); //null != empty string
+ void outcome(uint8_t result, const std::string&);
+ void outcome(uint8_t result);
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_SASLCLIENT_H*/
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 3ec5eb15dd..bb94a37398 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -19,17 +19,19 @@
*
*/
#include "Session.h"
+#include "Incoming.h"
#include "Outgoing.h"
#include "Message.h"
-#include "ManagedConnection.h"
-#include "qpid/broker/AsyncCompletion.h"
+#include "Connection.h"
+#include "Domain.h"
+#include "Interconnects.h"
+#include "Relay.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/Exchange.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/FanOutExchange.h"
-#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/Selector.h"
#include "qpid/broker/TopicExchange.h"
@@ -51,58 +53,62 @@ namespace qpid {
namespace broker {
namespace amqp {
-class Target
+class IncomingToQueue : public DecodingIncoming
{
public:
- Target(pn_link_t* l) : credit(100), window(0), link(l) {}
- virtual ~Target() {}
- bool flow();
- bool needFlow();
- virtual void handle(qpid::broker::Message& m) = 0;//TODO: revise this for proper message
- protected:
- const uint32_t credit;
- uint32_t window;
- pn_link_t* link;
-};
-
-class Queue : public Target
-{
- public:
- Queue(boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : Target(l), queue(q) {}
+ IncomingToQueue(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : DecodingIncoming(l, b, p, q->getName(), pn_link_name(l)), queue(q) {}
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Queue> queue;
};
-class Exchange : public Target
+class IncomingToExchange : public DecodingIncoming
{
public:
- Exchange(boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : Target(l), exchange(e) {}
+ IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : DecodingIncoming(l, b, p, e->getName(), pn_link_name(l)), exchange(e) {}
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Exchange> exchange;
};
-Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o)
+Session::Session(pn_session_t* s, qpid::broker::Broker& b, Connection& c, qpid::sys::OutputControl& o)
: ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false) {}
-Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus)
+Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
{
ResolvedNode node;
node.exchange = broker.getExchanges().find(name);
node.queue = broker.getQueues().find(name);
- if (!node.queue && !node.exchange && pn_terminus_is_dynamic(terminus)) {
- //TODO: handle dynamic creation
- //is it a queue or an exchange?
- NodeProperties properties;
- properties.read(pn_terminus_properties(terminus));
- if (properties.isQueue()) {
- node.queue = broker.createQueue(name, properties.getQueueSettings(), this, properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first;
+ if (!node.queue && !node.exchange) {
+ if (pn_terminus_is_dynamic(terminus)) {
+ //is it a queue or an exchange?
+ NodeProperties properties;
+ properties.read(pn_terminus_properties(terminus));
+ if (properties.isQueue()) {
+ node.queue = broker.createQueue(name, properties.getQueueSettings(), this, properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first;
+ } else {
+ qpid::framing::FieldTable args;
+ node.exchange = broker.createExchange(name, properties.getExchangeType(), properties.isDurable(), properties.getAlternateExchange(),
+ args, connection.getUserid(), connection.getId()).first;
+ }
} else {
- qpid::framing::FieldTable args;
- node.exchange = broker.createExchange(name, properties.getExchangeType(), properties.isDurable(), properties.getAlternateExchange(),
- args, connection.getUserid(), connection.getId()).first;
+ size_t i = name.find('@');
+ if (i != std::string::npos && (i+1) < name.length()) {
+ std::string domain = name.substr(i+1);
+ std::string local = name.substr(0, i);
+ std::string id = (boost::format("%1%-%2%") % name % qpid::types::Uuid(true).str()).str();
+ //does this domain exist?
+ boost::shared_ptr<Domain> d = connection.getInterconnects().findDomain(domain);
+ if (d) {
+ node.relay = boost::shared_ptr<Relay>(new Relay(1000));
+ if (incoming) {
+ d->connect(false, id, name, local, connection.getInterconnects(), node.relay);
+ } else {
+ d->connect(true, id, local, name, connection.getInterconnects(), node.relay);
+ }
+ }
+ }
}
} else if (node.queue && node.exchange) {
QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue");
@@ -123,43 +129,7 @@ void Session::attach(pn_link_t* link)
QPID_LOG(debug, "Received attach request for outgoing link from " << name);
pn_terminus_set_address(pn_link_source(link), name.c_str());
- ResolvedNode node = resolve(name, source);
- Filter filter;
- filter.read(pn_terminus_filter(source));
-
- if (node.queue) {
- boost::shared_ptr<Outgoing> q(new Outgoing(broker, node.queue, link, *this, out, false));
- q->init();
- if (filter.hasSubjectFilter()) {
- q->setSubjectFilter(filter.getSubjectFilter());
- }
- if (filter.hasSelectorFilter()) {
- q->setSelectorFilter(filter.getSelectorFilter());
- }
- senders[link] = q;
- } else if (node.exchange) {
- QueueSettings settings(false, true);
- //TODO: populate settings from source details when available from engine
- boost::shared_ptr<qpid::broker::Queue> queue
- = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
- if (filter.hasSubjectFilter()) {
- filter.bind(node.exchange, queue);
- filter.write(pn_terminus_filter(pn_link_source(link)));
- } else if (node.exchange->getType() == FanOutExchange::typeName) {
- node.exchange->bind(queue, std::string(), 0);
- } else if (node.exchange->getType() == TopicExchange::typeName) {
- node.exchange->bind(queue, "#", 0);
- } else {
- throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/
- }
- boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, true));
- senders[link] = q;
- q->init();
- } else {
- pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED);
- throw qpid::Exception("Node not found: " + name);/*not-found*/
- }
- QPID_LOG(debug, "Outgoing link attached");
+ setupOutgoing(link, source, name);
} else {
pn_terminus_t* target = pn_link_remote_target(link);
if (pn_terminus_get_type(target) == PN_UNSPECIFIED) {
@@ -169,51 +139,120 @@ void Session::attach(pn_link_t* link)
QPID_LOG(debug, "Received attach request for incoming link to " << name);
pn_terminus_set_address(pn_link_target(link), name.c_str());
- ResolvedNode node = resolve(name, target);
+ setupIncoming(link, target, name);
+ }
+}
+
+void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::string& name)
+{
+ ResolvedNode node = resolve(name, target, true);
+
+ if (node.queue) {
+ boost::shared_ptr<Incoming> q(new IncomingToQueue(broker, *this, node.queue, link));
+ incoming[link] = q;
+ } else if (node.exchange) {
+ boost::shared_ptr<Incoming> e(new IncomingToExchange(broker, *this, node.exchange, link));
+ incoming[link] = e;
+ } else if (node.relay) {
+ boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, name, pn_link_name(link), node.relay));
+ incoming[link] = in;
+ } else {
+ pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
+ throw qpid::Exception("Node not found: " + name);/*not-found*/
+ }
+ QPID_LOG(debug, "Incoming link attached");
+}
- if (node.queue) {
- boost::shared_ptr<Target> q(new Queue(node.queue, link));
- targets[link] = q;
- } else if (node.exchange) {
- boost::shared_ptr<Target> e(new Exchange(node.exchange, link));
- targets[link] = e;
+void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name)
+{
+ ResolvedNode node = resolve(name, source, false);
+ Filter filter;
+ filter.read(pn_terminus_filter(source));
+
+ if (node.queue) {
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, node.queue, link, *this, out, false));
+ q->init();
+ if (filter.hasSubjectFilter()) {
+ q->setSubjectFilter(filter.getSubjectFilter());
+ }
+ if (filter.hasSelectorFilter()) {
+ q->setSelectorFilter(filter.getSelectorFilter());
+ }
+ outgoing[link] = q;
+ } else if (node.exchange) {
+ QueueSettings settings(false, true);
+ //TODO: populate settings from source details when available from engine
+ boost::shared_ptr<qpid::broker::Queue> queue
+ = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first;
+ if (filter.hasSubjectFilter()) {
+ filter.bind(node.exchange, queue);
+ filter.write(pn_terminus_filter(pn_link_source(link)));
+ } else if (node.exchange->getType() == FanOutExchange::typeName) {
+ node.exchange->bind(queue, std::string(), 0);
+ } else if (node.exchange->getType() == TopicExchange::typeName) {
+ node.exchange->bind(queue, "#", 0);
} else {
- pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
- throw qpid::Exception("Node not found: " + name);/*not-found*/
+ throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/
+ }
+ boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, queue, link, *this, out, true));
+ outgoing[link] = q;
+ q->init();
+ } else if (node.relay) {
+ boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, name, pn_link_name(link), node.relay));
+ outgoing[link] = out;
+ out->init();
+ } else {
+ pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED);
+ throw qpid::Exception("Node not found: " + name);/*not-found*/
+ }
+ QPID_LOG(debug, "Outgoing link attached");
+}
+
+/**
+ * Called for links initiated by the broker
+ */
+void Session::attach(pn_link_t* link, const std::string& src, const std::string& tgt, boost::shared_ptr<Relay> relay)
+{
+ pn_terminus_t* source = pn_link_source(link);
+ pn_terminus_t* target = pn_link_target(link);
+ pn_terminus_set_address(source, src.c_str());
+ pn_terminus_set_address(target, tgt.c_str());
+
+ if (relay) {
+ if (pn_link_is_sender(link)) {
+ boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, src, pn_link_name(link), relay));
+ outgoing[link] = out;
+ out->init();
+ } else {
+ boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, tgt, pn_link_name(link), relay));
+ incoming[link] = in;
+ }
+ } else {
+ if (pn_link_is_sender(link)) {
+ setupOutgoing(link, source, src);
+ } else {
+ setupIncoming(link, target, tgt);
}
- QPID_LOG(debug, "Incoming link attached");
}
}
void Session::detach(pn_link_t* link)
{
if (pn_link_is_sender(link)) {
- Senders::iterator i = senders.find(link);
- if (i != senders.end()) {
+ OutgoingLinks::iterator i = outgoing.find(link);
+ if (i != outgoing.end()) {
i->second->detached();
- senders.erase(i);
+ outgoing.erase(i);
QPID_LOG(debug, "Outgoing link detached");
}
} else {
- targets.erase(link);
- QPID_LOG(debug, "Incoming link detached");
- }
-}
-namespace {
- class Transfer : public qpid::broker::AsyncCompletion::Callback
- {
- public:
- Transfer(pn_delivery_t* d, boost::shared_ptr<Session> s) : delivery(d), session(s) {}
- void completed(bool sync) { session->accepted(delivery, sync); }
- boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> clone()
- {
- boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new Transfer(delivery, session));
- return copy;
+ IncomingLinks::iterator i = incoming.find(link);
+ if (i != incoming.end()) {
+ i->second->detached();
+ incoming.erase(i);
+ QPID_LOG(debug, "Incoming link detached");
}
- private:
- pn_delivery_t* delivery;
- boost::shared_ptr<Session> session;
- };
+ }
}
void Session::accepted(pn_delivery_t* delivery, bool sync)
@@ -233,36 +272,26 @@ void Session::accepted(pn_delivery_t* delivery, bool sync)
}
}
-void Session::incoming(pn_link_t* link, pn_delivery_t* delivery)
+void Session::readable(pn_link_t* link, pn_delivery_t* delivery)
{
pn_delivery_tag_t tag = pn_delivery_tag(delivery);
QPID_LOG(debug, "received delivery: " << std::string(tag.bytes, tag.size));
- boost::intrusive_ptr<Message> received(new Message(pn_delivery_pending(delivery)));
- /*ssize_t read = */pn_link_recv(link, received->getData(), received->getSize());
- received->scan();
- pn_link_advance(link);
-
- qpid::broker::Message message(received, received);
-
incomingMessageReceived();
- Targets::iterator target = targets.find(link);
- if (target == targets.end()) {
+ IncomingLinks::iterator target = incoming.find(link);
+ if (target == incoming.end()) {
QPID_LOG(error, "Received message on unknown link");
pn_delivery_update(delivery, PN_REJECTED);
pn_delivery_settle(delivery);//do we need to check settlement modes/orders?
incomingMessageRejected();
} else {
- target->second->handle(message);
- received->begin();
- Transfer t(delivery, shared_from_this());
- received->end(t);
- if (target->second->needFlow()) out.activateOutput();
+ target->second->readable(delivery);
+ if (target->second->haveWork()) out.activateOutput();
}
}
-void Session::outgoing(pn_link_t* link, pn_delivery_t* delivery)
+void Session::writable(pn_link_t* link, pn_delivery_t* delivery)
{
- Senders::iterator sender = senders.find(link);
- if (sender == senders.end()) {
+ OutgoingLinks::iterator sender = outgoing.find(link);
+ if (sender == outgoing.end()) {
QPID_LOG(error, "Delivery returned for unknown link");
} else {
sender->second->handle(delivery);
@@ -272,8 +301,8 @@ void Session::outgoing(pn_link_t* link, pn_delivery_t* delivery)
bool Session::dispatch()
{
bool output(false);
- for (Senders::iterator s = senders.begin(); s != senders.end(); ++s) {
- if (s->second->dispatch()) output = true;
+ for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end(); ++s) {
+ if (s->second->doWork()) output = true;
}
if (completed.size()) {
output = true;
@@ -286,8 +315,8 @@ bool Session::dispatch()
accepted(*i, true);
}
}
- for (Targets::iterator t = targets.begin(); t != targets.end(); ++t) {
- if (t->second->flow()) output = true;
+ for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) {
+ if (i->second->doWork()) output = true;
}
return output;
@@ -295,42 +324,33 @@ bool Session::dispatch()
void Session::close()
{
- for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
+ for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
+ i->second->detached();
+ }
+ for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) {
i->second->detached();
}
- senders.clear();
- targets.clear();//at present no explicit cleanup required for targets
- QPID_LOG(debug, "Session closed, all senders cancelled.");
+ outgoing.clear();
+ incoming.clear();
+ QPID_LOG(debug, "Session closed, all links detached.");
qpid::sys::Mutex::ScopedLock l(lock);
deleted = true;
}
-void Queue::handle(qpid::broker::Message& message)
-{
- queue->deliver(message);
- --window;
-}
-
-void Exchange::handle(qpid::broker::Message& message)
+void Session::wakeup()
{
- DeliverableMessage deliverable(message, 0);
- exchange->route(deliverable);
- --window;
+ out.activateOutput();
}
-bool Target::flow()
+void IncomingToQueue::handle(qpid::broker::Message& message)
{
- bool issue = window < credit;
- if (issue) {
- pn_link_flow(link, credit - window);//TODO: proper flow control
- window = credit;
- }
- return issue;
+ queue->deliver(message);
}
-bool Target::needFlow()
+void IncomingToExchange::handle(qpid::broker::Message& message)
{
- return window <= (credit/2);
+ DeliverableMessage deliverable(message, 0);
+ exchange->route(deliverable);
}
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h
index 7dbdaf05fc..74f50a9eda 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.h
@@ -43,34 +43,45 @@ class Queue;
namespace amqp {
-class ManagedConnection;
+class Connection;
+class Incoming;
class Outgoing;
-class Target;
+class Relay;
/**
*
*/
class Session : public ManagedSession, public boost::enable_shared_from_this<Session>
{
public:
- Session(pn_session_t*, qpid::broker::Broker&, ManagedConnection&, qpid::sys::OutputControl&);
+ Session(pn_session_t*, qpid::broker::Broker&, Connection&, qpid::sys::OutputControl&);
+ /**
+ * called for links initiated by the peer
+ */
void attach(pn_link_t*);
void detach(pn_link_t*);
- void incoming(pn_link_t*, pn_delivery_t*);
- void outgoing(pn_link_t*, pn_delivery_t*);
+ void readable(pn_link_t*, pn_delivery_t*);
+ void writable(pn_link_t*, pn_delivery_t*);
bool dispatch();
void close();
+ /**
+ * called for links initiated by the broker
+ */
+ void attach(pn_link_t* link, const std::string& src, const std::string& tgt, boost::shared_ptr<Relay>);
+
//called when a transfer is completly processed (e.g.including stored on disk)
void accepted(pn_delivery_t*, bool sync);
+
+ void wakeup();
private:
- typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > Senders;
- typedef std::map<pn_link_t*, boost::shared_ptr<Target> > Targets;
+ typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > OutgoingLinks;
+ typedef std::map<pn_link_t*, boost::shared_ptr<Incoming> > IncomingLinks;
pn_session_t* session;
qpid::broker::Broker& broker;
- ManagedConnection& connection;
+ Connection& connection;
qpid::sys::OutputControl& out;
- Targets targets;
- Senders senders;
+ IncomingLinks incoming;
+ OutgoingLinks outgoing;
std::deque<pn_delivery_t*> completed;
bool deleted;
qpid::sys::Mutex lock;
@@ -78,9 +89,12 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
{
boost::shared_ptr<qpid::broker::Exchange> exchange;
boost::shared_ptr<qpid::broker::Queue> queue;
+ boost::shared_ptr<Relay> relay;
};
- ResolvedNode resolve(const std::string name, pn_terminus_t* terminus);
+ ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming);
+ void setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name);
+ void setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::string& name);
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
index ca2094b965..9d34e71e04 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
@@ -202,7 +202,7 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation
}
}
-void Translation::write(Outgoing& out)
+void Translation::write(OutgoingFromQueue& out)
{
const Message* message = dynamic_cast<const Message*>(&original.getEncoding());
if (message) {
diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.h b/qpid/cpp/src/qpid/broker/amqp/Translation.h
index 64d96560e3..7591f45b2a 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Translation.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Translation.h
@@ -31,7 +31,7 @@ class MessageTransfer;
}
namespace amqp {
-class Outgoing;
+class OutgoingFromQueue;
/**
*
*/
@@ -49,7 +49,7 @@ class Translation
/**
* Writes the AMQP 1.0 bare message and any annotations, translating from 0-10 if necessary
*/
- void write(Outgoing&);
+ void write(OutgoingFromQueue&);
private:
const qpid::broker::Message& original;
};
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index b2a9b979b6..c74ee01898 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -140,6 +140,12 @@ bool ConnectionContext::isOpen() const
void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ //wait for outstanding sends to settle
+ while (!ssn->settled()) {
+ QPID_LOG(debug, "Waiting for sends to settle before closing");
+ wait();//wait until message has been confirmed
+ }
+
pn_session_close(ssn->session);
//TODO: need to destroy session and remove context from map
wakeupDriver();
@@ -166,13 +172,19 @@ void ConnectionContext::close()
wakeupDriver();
//wait for close to be confirmed by peer?
while (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) {
- wait();
+ if (state == DISCONNECTED) {
+ QPID_LOG(warning, "Disconnected before close received from peer.");
+ break;
+ }
+ lock.wait();
}
sessions.clear();
}
- transport->close();
- while (state != DISCONNECTED) {
- lock.wait();
+ if (state != DISCONNECTED) {
+ transport->close();
+ while (state != DISCONNECTED) {
+ lock.wait();
+ }
}
}
diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt
index 7a418993d5..f77863a146 100644
--- a/qpid/cpp/src/tests/CMakeLists.txt
+++ b/qpid/cpp/src/tests/CMakeLists.txt
@@ -324,6 +324,9 @@ endif (PYTHON_EXECUTABLE)
add_test (stop_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/stop_broker${test_script_suffix})
if (PYTHON_EXECUTABLE)
add_test (ha_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/ha_tests.py)
+ if (BUILD_AMQP)
+ add_test (interlink_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/interlink_tests.py)
+ endif (BUILD_AMQP)
if (BUILD_LEGACYSTORE)
add_test (ha_store_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/ha_store_tests.py)
endif (BUILD_LEGACYSTORE)
diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am
index 69ca01a934..2e55b24c3e 100644
--- a/qpid/cpp/src/tests/Makefile.am
+++ b/qpid/cpp/src/tests/Makefile.am
@@ -297,7 +297,7 @@ TESTS_ENVIRONMENT = \
system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \
run_msg_group_tests
TESTS += start_broker $(system_tests) python_tests stop_broker \
- run_ha_tests run_federation_tests run_federation_sys_tests \
+ run_ha_tests run_interlink_tests run_federation_tests run_federation_sys_tests \
run_acl_tests run_cli_tests dynamic_log_level_test \
dynamic_log_hires_timestamp run_queue_flow_limit_tests ipv6_test
@@ -346,6 +346,8 @@ EXTRA_DIST += \
run_ha_tests \
ha_test.py \
ha_tests.py \
+ run_interlink_tests \
+ interlink_tests.py \
brokertest.py \
ha_store_tests.py \
test_env.ps1.in
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 70c145a51b..af1edfee44 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -427,6 +427,8 @@ class BrokerTest(TestCase):
qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
ha_lib = os.getenv("HA_LIB")
xml_lib = os.getenv("XML_LIB")
+ amqp_lib = os.getenv("AMQP_LIB")
+ amqpc_lib = os.getenv("AMQPC_LIB")
qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
receiver_exec = os.getenv("RECEIVER_EXEC")
diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py
new file mode 100755
index 0000000000..1e7262051a
--- /dev/null
+++ b/qpid/cpp/src/tests/interlink_tests.py
@@ -0,0 +1,157 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
+import traceback
+from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
+from qpid.datatypes import uuid4
+from brokertest import *
+from threading import Thread, Lock, Condition
+from logging import getLogger, WARN, ERROR, DEBUG, INFO
+from qpidtoollibs import BrokerAgent, BrokerObject
+from uuid import UUID
+
+class Domain(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+class Config:
+ def __init__(self, broker, address="q;{create:always}", version="amqp1.0"):
+ self.url = broker.host_port()
+ self.address = address
+ self.version = version
+
+ def __str__(self):
+ return "url: %s, address: %s, version: %s" % (self.url, self.address, self.version)
+
+class AmqpBrokerTest(BrokerTest):
+ """
+ Tests using AMQP 1.0 support
+ """
+ def setUp(self):
+ BrokerTest.setUp(self)
+ os.putenv("QPID_LOAD_MODULE", BrokerTest.amqpc_lib)
+ self.broker = self.amqp_broker()
+ self.default_config = Config(self.broker)
+ self.agent = BrokerAgent(self.broker.connect())
+
+ def sender(self, config):
+ cmd = ["qpid-send",
+ "--broker", config.url,
+ "--address", config.address,
+ "--connection-options", "{protocol:%s}" % config.version,
+ "--content-stdin", "--send-eos=1"
+ ]
+ return self.popen(cmd, stdin=PIPE)
+
+ def receiver(self, config):
+ cmd = ["qpid-receive",
+ "--broker", config.url,
+ "--address", config.address,
+ "--connection-options", "{protocol:%r}" % config.version,
+ "--timeout=10"
+ ]
+ return self.popen(cmd, stdout=PIPE)
+
+ def send_and_receive(self, send_config=None, recv_config=None, count=1000, debug=False):
+ if debug:
+ print "sender config is %s" % (send_config or self.default_config)
+ print "receiver config is %s" % (recv_config or self.default_config)
+ sender = self.sender(send_config or self.default_config)
+ receiver = self.receiver(recv_config or self.default_config)
+
+ messages = ["message-%s" % (i+1) for i in range(count)]
+ for m in messages:
+ sender.stdin.write(m + "\n")
+ sender.stdin.flush()
+ sender.stdin.close()
+ if debug:
+ c = send_config or self.default_config
+ print "sent %s messages to %s sn %s" % (len(messages), c.address, c.url)
+
+ if debug:
+ c = recv_config or self.default_config
+ print "reading messages from %s sn %s" % (c.address, c.url)
+ for m in messages:
+ l = receiver.stdout.readline().rstrip()
+ if debug:
+ print l
+ assert m == l, (m, l)
+
+ sender.wait()
+ receiver.wait()
+
+ def test_simple(self):
+ self.send_and_receive()
+
+ def test_translate1(self):
+ self.send_and_receive(recv_config=Config(self.broker, version="amqp0-10"))
+
+ def test_translate2(self):
+ self.send_and_receive(send_config=Config(self.broker, version="amqp0-10"))
+
+ def test_domain(self):
+ brokerB = self.amqp_broker()
+ self.agent.create("domain", "BrokerB", {"url":brokerB.host_port()})
+ domains = self.agent._getAllBrokerObjects(Domain)
+ assert len(domains) == 1
+ assert domains[0].name == "BrokerB"
+
+ def test_incoming_link(self):
+ brokerB = self.amqp_broker()
+ agentB = BrokerAgent(brokerB.connect())
+ self.agent.create("queue", "q")
+ agentB.create("queue", "q")
+ self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"})
+ self.agent.create("incoming", "Link1", {"domain":"BrokerB","source":"q","target":"q"})
+ #send to brokerB, receive from brokerA
+ self.send_and_receive(send_config=Config(brokerB))
+
+ def test_outgoing_link(self):
+ brokerB = self.amqp_broker()
+ agentB = BrokerAgent(brokerB.connect())
+ self.agent.create("queue", "q")
+ agentB.create("queue", "q")
+ self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"})
+ self.agent.create("outgoing", "Link1", {"domain":"BrokerB","source":"q","target":"q"})
+ #send to brokerA, receive from brokerB
+ self.send_and_receive(recv_config=Config(brokerB))
+
+ def test_relay(self):
+ brokerB = self.amqp_broker()
+ agentB = BrokerAgent(brokerB.connect())
+ agentB.create("queue", "q")
+ self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"})
+ #send to q on broker B through brokerA
+ self.send_and_receive(send_config=Config(self.broker, address="q@BrokerB"), recv_config=Config(brokerB))
+
+ """ Create and return a broker with AMQP 1.0 support """
+ def amqp_broker(self):
+ assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
+ args = ["--load-module", BrokerTest.amqp_lib,
+ "--max-negotiate-time=600000",
+ "--log-enable=trace+:Protocol",
+ "--log-enable=info+"]
+ return BrokerTest.broker(self, args)
+
+if __name__ == "__main__":
+ shutil.rmtree("brokertest.tmp", True)
+ os.execvp("qpid-python-test",
+ ["qpid-python-test", "-m", "interlink_tests"] + sys.argv[1:])
diff --git a/qpid/cpp/src/tests/run_interlink_tests b/qpid/cpp/src/tests/run_interlink_tests
new file mode 100755
index 0000000000..6c61bdd654
--- /dev/null
+++ b/qpid/cpp/src/tests/run_interlink_tests
@@ -0,0 +1,26 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+test -e "$AMQP_LIB" || { echo "Skipping AMQP 1.0 based tests; AMQP 1.0 support not available."; exit 0; }
+
+srcdir=`dirname $0`
+$srcdir/interlink_tests.py
+
diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in
index 0f8f834731..6940943b54 100644
--- a/qpid/cpp/src/tests/test_env.sh.in
+++ b/qpid/cpp/src/tests/test_env.sh.in
@@ -68,6 +68,8 @@ exportmodule SSL_LIB ssl.so
exportmodule WATCHDOG_LIB watchdog.so
exportmodule XML_LIB xml.so
exportmodule STORE_LIB legacystore.so
+exportmodule AMQP_LIB amqp.so
+exportmodule AMQPC_LIB amqpc.so
# Qpid options
export QPID_NO_MODULE_DIR=1 # Don't accidentally load installed modules
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index b3514e1f70..2a8a36f10e 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -379,6 +379,43 @@
<method name="close"/>
</class>
+
+ <!--
+ ===============================================================
+ AMQP 1.0 link for incoming transfers
+ ===============================================================
+ -->
+ <class name="Incoming">
+ <property name="sessionRef" type="objId" references="Session" access="RC" parentRef="y"/>
+ <property name="target" type="sstr" access="RC" index="y"/>
+ <property name="name" type="sstr" access="RC" index="y"/>
+ <statistic name="transfers" type="count64" unit="message" desc="Messages transfered"/>
+ </class>
+ <!--
+ ===============================================================
+ AMQP 1.0 link for outgoing transfers
+ ===============================================================
+ -->
+ <class name="Outgoing">
+ <property name="sessionRef" type="objId" references="Session" access="RC" parentRef="y"/>
+ <property name="source" type="sstr" access="RC" index="y"/>
+ <property name="name" type="sstr" access="RC" index="y"/>
+ <statistic name="transfers" type="count64" unit="message" desc="Messages transfered"/>
+ </class>
+ <!--
+ ===============================================================
+ Domain
+ ===============================================================
+ -->
+ <class name="Domain">
+ <property name="name" type="sstr" access="RC" index="y"/>
+ <property name="durable" type="bool" access="RC"/>
+ <property name="url" type="sstr" access="RO"/>
+ <property name="mechanisms" type="sstr" access="RO"/>
+ <property name="username" type="sstr" access="RO"/>
+ <property name="password" type="sstr" access="RO"/>
+ </class>
+
<!--
===============================================================
Link
diff --git a/qpid/tools/src/py/qpidtoollibs/broker.py b/qpid/tools/src/py/qpidtoollibs/broker.py
index d8b75c3c60..41fea74414 100644
--- a/qpid/tools/src/py/qpidtoollibs/broker.py
+++ b/qpid/tools/src/py/qpidtoollibs/broker.py
@@ -292,9 +292,13 @@ class BrokerAgent(object):
'routingKey': key}
return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
- def create(self, _type, name, properties, strict):
+ def create(self, _type, name, properties={}, strict=False):
"""Create an object of the specified type"""
- pass
+ args = {'type': _type,
+ 'name': name,
+ 'properties': properties,
+ 'strict': strict}
+ return self._method('create', args)
def delete(self, _type, name, options):
"""Delete an object of the specified type"""