diff options
author | Gordon Sim <gsim@apache.org> | 2013-03-14 19:36:26 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2013-03-14 19:36:26 +0000 |
commit | 3c22a7e7a58c883cdb0575afb30c6658344d4e2b (patch) | |
tree | 0570d45f59f36063f9c44073d807e9f5ec4bfe1d | |
parent | 125e49d4df65142a89b69e7d367b34f01ea354a1 (diff) | |
download | qpid-python-3c22a7e7a58c883cdb0575afb30c6658344d4e2b.tar.gz |
QPID-4586: add ability to have qpidd establish outgoing connections
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1456621 13f79535-47bb-0310-9956-ffa450edef68
46 files changed, 2455 insertions, 249 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 5dc473d04a..ef68d79fb2 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1160,6 +1160,8 @@ set (qpidbroker_SOURCES qpid/broker/Fairshare.cpp qpid/broker/MessageDeque.cpp qpid/broker/MessageMap.cpp + qpid/broker/ObjectFactory.h + qpid/broker/ObjectFactory.cpp qpid/broker/PriorityQueue.cpp qpid/broker/Protocol.cpp qpid/broker/Queue.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 1b2d0fcbb4..d974aad3a1 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -659,6 +659,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/NameGenerator.h \ qpid/broker/NullMessageStore.cpp \ qpid/broker/NullMessageStore.h \ + qpid/broker/ObjectFactory.h \ + qpid/broker/ObjectFactory.cpp \ qpid/broker/Observers.h \ qpid/broker/OwnershipToken.h \ qpid/broker/Persistable.h \ @@ -783,14 +785,24 @@ amqp_la_SOURCES = \ qpid/broker/amqp/Connection.cpp \ qpid/broker/amqp/DataReader.h \ qpid/broker/amqp/DataReader.cpp \ + qpid/broker/amqp/Domain.h \ + qpid/broker/amqp/Domain.cpp \ qpid/broker/amqp/Filter.h \ qpid/broker/amqp/Filter.cpp \ qpid/broker/amqp/Header.h \ qpid/broker/amqp/Header.cpp \ + qpid/broker/amqp/Incoming.h \ + qpid/broker/amqp/Incoming.cpp \ + qpid/broker/amqp/Interconnect.h \ + qpid/broker/amqp/Interconnect.cpp \ + qpid/broker/amqp/Interconnects.h \ + qpid/broker/amqp/Interconnects.cpp \ qpid/broker/amqp/ManagedConnection.h \ qpid/broker/amqp/ManagedConnection.cpp \ qpid/broker/amqp/ManagedSession.h \ qpid/broker/amqp/ManagedSession.cpp \ + qpid/broker/amqp/ManagedIncomingLink.h \ + qpid/broker/amqp/ManagedIncomingLink.cpp \ qpid/broker/amqp/ManagedOutgoingLink.h \ qpid/broker/amqp/ManagedOutgoingLink.cpp \ qpid/broker/amqp/Message.h \ @@ -800,8 +812,12 @@ amqp_la_SOURCES = \ qpid/broker/amqp/Outgoing.h \ qpid/broker/amqp/Outgoing.cpp \ qpid/broker/amqp/ProtocolPlugin.cpp \ + qpid/broker/amqp/Relay.h \ + qpid/broker/amqp/Relay.cpp \ qpid/broker/amqp/Sasl.h \ qpid/broker/amqp/Sasl.cpp \ + qpid/broker/amqp/SaslClient.h \ + qpid/broker/amqp/SaslClient.cpp \ qpid/broker/amqp/Session.h \ qpid/broker/amqp/Session.cpp \ qpid/broker/amqp/Translation.h \ diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake index 718e6fe342..e8bb3b9bca 100644 --- a/qpid/cpp/src/amqp.cmake +++ b/qpid/cpp/src/amqp.cmake @@ -52,14 +52,24 @@ if (BUILD_AMQP) qpid/broker/amqp/Connection.cpp qpid/broker/amqp/DataReader.h qpid/broker/amqp/DataReader.cpp + qpid/broker/amqp/Domain.h + qpid/broker/amqp/Domain.cpp qpid/broker/amqp/Filter.h qpid/broker/amqp/Filter.cpp qpid/broker/amqp/Header.h qpid/broker/amqp/Header.cpp + qpid/broker/amqp/Incoming.h + qpid/broker/amqp/Incoming.cpp + qpid/broker/amqp/Interconnect.h + qpid/broker/amqp/Interconnect.cpp + qpid/broker/amqp/Interconnects.h + qpid/broker/amqp/Interconnects.cpp qpid/broker/amqp/ManagedConnection.h qpid/broker/amqp/ManagedConnection.cpp qpid/broker/amqp/ManagedSession.h qpid/broker/amqp/ManagedSession.cpp + qpid/broker/amqp/ManagedIncomingLink.h + qpid/broker/amqp/ManagedIncomingLink.cpp qpid/broker/amqp/ManagedOutgoingLink.h qpid/broker/amqp/ManagedOutgoingLink.cpp qpid/broker/amqp/Message.h @@ -69,8 +79,12 @@ if (BUILD_AMQP) qpid/broker/amqp/Outgoing.h qpid/broker/amqp/Outgoing.cpp qpid/broker/amqp/ProtocolPlugin.cpp + qpid/broker/amqp/Relay.h + qpid/broker/amqp/Relay.cpp qpid/broker/amqp/Sasl.h qpid/broker/amqp/Sasl.cpp + qpid/broker/amqp/SaslClient.h + qpid/broker/amqp/SaslClient.cpp qpid/broker/amqp/Session.h qpid/broker/amqp/Session.cpp qpid/broker/amqp/Translation.h diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index a9887f9c35..1f70cac2cc 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -707,7 +707,9 @@ void Broker::createObject(const std::string& type, const std::string& name, } //TODO: implement 'strict' option (check there are no unrecognised properties) QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")"); - if (type == TYPE_QUEUE) { + if (objectFactory.createObject(*this, type, name, properties, userId, connectionId)) { + QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ") handled by registered factory"); + } else if (type == TYPE_QUEUE) { bool durable(false); bool autodelete(false); std::string alternateExchange; @@ -1064,8 +1066,16 @@ void Broker::connect( const std::string& host, const std::string& port, const std::string& transport, boost::function2<void, int, std::string> failed) { + connect(name, host, port, transport, factory.get(), failed); +} + +void Broker::connect( + const std::string& name, + const std::string& host, const std::string& port, const std::string& transport, + sys::ConnectionCodec::Factory* f, boost::function2<void, int, std::string> failed) +{ boost::shared_ptr<TransportConnector> tcf = getTransportInfo(transport).connectorFactory; - if (tcf) tcf->connect(poller, name, host, port, factory.get(), failed); + if (tcf) tcf->connect(poller, name, host, port, f, failed); else throw NoSuchTransportException(QPID_MSG("Unsupported transport type: " << transport)); } diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index cfd96c9913..6a46095af4 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -28,6 +28,7 @@ #include "qpid/Plugin.h" #include "qpid/broker/DtxManager.h" #include "qpid/broker/ExchangeRegistry.h" +#include "qpid/broker/ObjectFactory.h" #include "qpid/broker/Protocol.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/LinkRegistry.h" @@ -193,6 +194,7 @@ class Broker : public sys::Runnable, public Plugin::Target, boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; ConsumerFactories consumerFactories; ProtocolRegistry protocolRegistry; + ObjectFactoryRegistry objectFactory; mutable sys::Mutex linkClientPropertiesLock; framing::FieldTable linkClientProperties; @@ -232,6 +234,7 @@ class Broker : public sys::Runnable, public Plugin::Target, DataDir& getDataDir() { return dataDir; } Options& getOptions() { return config; } ProtocolRegistry& getProtocolRegistry() { return protocolRegistry; } + ObjectFactoryRegistry& getObjectFactoryRegistry() { return objectFactory; } void setExpiryPolicy(const boost::intrusive_ptr<ExpiryPolicy>& e) { expiryPolicy = e; } boost::intrusive_ptr<ExpiryPolicy> getExpiryPolicy() { return expiryPolicy; } @@ -258,6 +261,12 @@ class Broker : public sys::Runnable, public Plugin::Target, const std::string& host, const std::string& port, const std::string& transport, boost::function2<void, int, std::string> failed); + QPID_BROKER_EXTERN void connect(const std::string& name, + const std::string& host, const std::string& port, + const std::string& transport, + sys::ConnectionCodec::Factory*, + boost::function2<void, int, std::string> failed); + /** Move messages from one queue to another. A zero quantity means to move all messages diff --git a/qpid/cpp/src/qpid/broker/LossyQueue.cpp b/qpid/cpp/src/qpid/broker/LossyQueue.cpp index ee2c3ca794..be19185c3a 100644 --- a/qpid/cpp/src/qpid/broker/LossyQueue.cpp +++ b/qpid/cpp/src/qpid/broker/LossyQueue.cpp @@ -38,7 +38,7 @@ LossyQueue::LossyQueue(const std::string& n, const QueueSettings& s, MessageStor bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message) { - if (increment.getSize() > settings.maxDepth.getSize()) { + if (settings.maxDepth.hasSize() && increment.getSize() > settings.maxDepth.getSize()) { if (mgmtObject) { mgmtObject->inc_discardsOverflow(); if (brokerMgmtObject) diff --git a/qpid/cpp/src/qpid/broker/ObjectFactory.cpp b/qpid/cpp/src/qpid/broker/ObjectFactory.cpp new file mode 100644 index 0000000000..5bd516c351 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ObjectFactory.cpp @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ObjectFactory.h" +#include "Broker.h" + +namespace qpid { +namespace broker { + +bool ObjectFactoryRegistry::createObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& userId, const std::string& connectionId) +{ + for (Factories::iterator i = factories.begin(); i != factories.end(); ++i) + { + if ((*i)->createObject(broker, type, name, properties, userId, connectionId)) return true; + } + return false; +} + +bool ObjectFactoryRegistry::deleteObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& userId, const std::string& connectionId) +{ + for (Factories::iterator i = factories.begin(); i != factories.end(); ++i) + { + if ((*i)->deleteObject(broker, type, name, properties, userId, connectionId)) return true; + } + return false; +} + +ObjectFactoryRegistry::~ObjectFactoryRegistry() +{ + for (Factories::iterator i = factories.begin(); i != factories.end(); ++i) + { + delete *i; + } + factories.clear(); +} +void ObjectFactoryRegistry::add(ObjectFactory* factory) +{ + factories.push_back(factory); +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/ObjectFactory.h b/qpid/cpp/src/qpid/broker/ObjectFactory.h new file mode 100644 index 0000000000..7a48be3caa --- /dev/null +++ b/qpid/cpp/src/qpid/broker/ObjectFactory.h @@ -0,0 +1,65 @@ +#ifndef QPID_BROKER_OBJECTFACTORY_H +#define QPID_BROKER_OBJECTFACTORY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/types/Variant.h" +#include <vector> + +namespace qpid { +namespace broker { + +class Broker; + +/** + * An extension point through which plugins can register functionality + * for creating (and deleting) particular types of objects via the + * Broker::createObject() method (or deleteObject()), invoked via management. + */ +class ObjectFactory +{ + public: + virtual bool createObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& userId, const std::string& connectionId) = 0; + virtual bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& userId, const std::string& connectionId) = 0; + virtual ~ObjectFactory() {} + private: +}; + +class ObjectFactoryRegistry : public ObjectFactory +{ + public: + bool createObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& userId, const std::string& connectionId); + bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& userId, const std::string& connectionId); + + ~ObjectFactoryRegistry(); + void add(ObjectFactory*); + private: + typedef std::vector<ObjectFactory*> Factories; + Factories factories; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_OBJECTFACTORY_H*/ diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp index 7bc2c94d1c..6b4f6b3025 100644 --- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp +++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp @@ -44,13 +44,7 @@ sys::ConnectionCodec* SecureConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id, const SecuritySettings& external) { if (v == ProtocolVersion(0, 10)) { - SecureConnectionPtr sc(new SecureConnection()); - CodecPtr c(new qpid::amqp_0_10::Connection(out, id, false)); - ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, false)); - i->setSecureConnection(sc.get()); - c->setInputHandler(InputPtr(i.release())); - sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c)); - return sc.release(); + return create_0_10(out, id, external, false); } else { return broker.getProtocolRegistry().create(v, out, id, external); } @@ -61,9 +55,16 @@ sys::ConnectionCodec* SecureConnectionFactory::create(sys::OutputControl& out, const std::string& id, const SecuritySettings& external) { // used to create connections from one broker to another + return create_0_10(out, id, external, true); +} + +sys::ConnectionCodec* +SecureConnectionFactory::create_0_10(sys::OutputControl& out, const std::string& id, + const SecuritySettings& external, bool brokerActsAsClient) +{ SecureConnectionPtr sc(new SecureConnection()); - CodecPtr c(new qpid::amqp_0_10::Connection(out, id, true)); - ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, true )); + CodecPtr c(new qpid::amqp_0_10::Connection(out, id, brokerActsAsClient)); + ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, brokerActsAsClient)); i->setSecureConnection(sc.get()); c->setInputHandler(InputPtr(i.release())); sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c)); diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h index 8a04dfcb15..e64776d5ec 100644 --- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h +++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.h @@ -30,7 +30,7 @@ class Broker; class SecureConnectionFactory : public sys::ConnectionCodec::Factory { public: - SecureConnectionFactory(Broker& b); + SecureConnectionFactory(Broker& b); sys::ConnectionCodec* create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id, @@ -41,6 +41,9 @@ class SecureConnectionFactory : public sys::ConnectionCodec::Factory private: Broker& broker; + + sys::ConnectionCodec* + create_0_10(sys::OutputControl&, const std::string& id, const qpid::sys::SecuritySettings&, bool brokerActsAsClient); }; }} diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index 1f135cf931..a83034eb6e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -37,14 +37,15 @@ namespace qpid { namespace broker { namespace amqp { -Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, bool saslInUse) +Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, Interconnects& interconnects_, bool saslInUse) : ManagedConnection(b, i), connection(pn_connection()), transport(pn_transport()), - out(o), id(i), broker(b), haveOutput(true) + out(o), id(i), broker(b), haveOutput(true), interconnects(interconnects_) { if (pn_transport_bind(transport, connection)) { //error + QPID_LOG(error, "Failed to bind transport to connection: " << getError()); } out.activateOutput(); bool enableTrace(false); @@ -74,13 +75,18 @@ Connection::~Connection() pn_connection_free(connection); } +Interconnects& Connection::getInterconnects() +{ + return interconnects; +} pn_transport_t* Connection::getTransport() { return transport; } size_t Connection::decode(const char* buffer, size_t size) { - QPID_LOG(trace, id << " decode(" << size << ")") + QPID_LOG(trace, id << " decode(" << size << ")"); + if (size == 0) return 0; //TODO: Fix pn_engine_input() to take const buffer ssize_t n = pn_transport_input(transport, const_cast<char*>(buffer), size); if (n > 0 || n == PN_EOS) { @@ -88,8 +94,13 @@ size_t Connection::decode(const char* buffer, size_t size) //it processed, but can assume none need to be reprocessed so //consider them all read: if (n == PN_EOS) n = size; - QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size) - process(); + QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size); + try { + process(); + } catch (const std::exception& e) { + QPID_LOG(error, id << ": " << e.what()); + close(); + } pn_transport_tick(transport, 0); if (!haveOutput) { haveOutput = true; @@ -123,10 +134,15 @@ size_t Connection::encode(char* buffer, size_t size) } bool Connection::canEncode() { - for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) { - if (i->second->dispatch()) haveOutput = true; + try { + for (Sessions::iterator i = sessions.begin();i != sessions.end(); ++i) { + if (i->second->dispatch()) haveOutput = true; + } + process(); + } catch (const std::exception& e) { + QPID_LOG(error, id << ": " << e.what()); + close(); } - process(); //TODO: proper handling of time in and out of tick pn_transport_tick(transport, 0); QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput) @@ -134,11 +150,16 @@ bool Connection::canEncode() } void Connection::closed() { - //TODO: tear down sessions and associated links for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { i->second->close(); } } +void Connection::close() +{ + closed(); + QPID_LOG_CAT(debug, model, id << " connection closed"); + pn_connection_close(connection); +} bool Connection::isClosed() const { return pn_connection_state(connection) & PN_REMOTE_CLOSED; @@ -191,7 +212,7 @@ void Connection::process() if (pn_link_is_receiver(link)) { Sessions::iterator i = sessions.find(pn_link_session(link)); if (i != sessions.end()) { - i->second->incoming(link, delivery); + i->second->readable(link, delivery); } else { pn_delivery_update(delivery, PN_REJECTED); } @@ -199,7 +220,7 @@ void Connection::process() Sessions::iterator i = sessions.find(pn_link_session(link)); if (i != sessions.end()) { QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link)); - i->second->outgoing(link, delivery); + i->second->writable(link, delivery); } else { QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link); } @@ -238,9 +259,9 @@ std::string Connection::getError() { std::stringstream text; pn_error_t* cerror = pn_connection_error(connection); - if (cerror) text << "connection error " << pn_error_text(cerror); + if (cerror) text << "connection error " << pn_error_text(cerror) << " [" << cerror << "]"; pn_error_t* terror = pn_transport_error(transport); - if (terror) text << "transport error " << pn_error_text(terror); + if (terror) text << "transport error " << pn_error_text(terror) << " [" << terror << "]"; return text.str(); } diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h index 8af209af7a..28cf86f123 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h @@ -37,6 +37,7 @@ class Broker; namespace amqp { +class Interconnects; class Session; /** * AMQP 1.0 protocol support for broker @@ -44,10 +45,10 @@ class Session; class Connection : public sys::ConnectionCodec, public ManagedConnection { public: - Connection(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse); - ~Connection(); + Connection(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, bool saslInUse); + virtual ~Connection(); size_t decode(const char* buffer, size_t size); - size_t encode(char* buffer, size_t size); + virtual size_t encode(char* buffer, size_t size); bool canEncode(); void closed(); @@ -55,7 +56,8 @@ class Connection : public sys::ConnectionCodec, public ManagedConnection framing::ProtocolVersion getVersion() const; pn_transport_t* getTransport(); - private: + Interconnects& getInterconnects(); + protected: typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions; pn_connection_t* connection; pn_transport_t* transport; @@ -64,9 +66,11 @@ class Connection : public sys::ConnectionCodec, public ManagedConnection qpid::broker::Broker& broker; bool haveOutput; Sessions sessions; + Interconnects& interconnects; - void process(); + virtual void process(); std::string getError(); + void close(); }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp new file mode 100644 index 0000000000..4b13bfc871 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp @@ -0,0 +1,279 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Domain.h" +#include "Interconnect.h" +#include "Interconnects.h" +#include "SaslClient.h" +#include "qpid/broker/Broker.h" +#include "qpid/Exception.h" +#include "qpid/SaslFactory.h" +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/OutputControl.h" +#include "qpid/log/Statement.h" +#include "qpid/management/ManagementAgent.h" +#include <boost/shared_ptr.hpp> +#include <boost/lexical_cast.hpp> +#include <boost/enable_shared_from_this.hpp> + +namespace _qmf = qmf::org::apache::qpid::broker; + +namespace qpid { +namespace broker { +namespace amqp { + +namespace { +const std::string NONE("NONE"); +const std::string SOURCE("source"); +const std::string TARGET("target"); +const std::string URL("url"); +const std::string USERNAME("username"); +const std::string PASSWORD("password"); +const std::string SASL_MECHANISMS("sasl_mechanisms"); +const std::string SASL_SERVICE("sasl_service"); +const std::string MIN_SSF("min_ssf"); +const std::string MAX_SSF("max_ssf"); +class Wrapper : public qpid::sys::ConnectionCodec +{ + public: + Wrapper(boost::shared_ptr<Interconnect> c) : connection(c) {} + ~Wrapper() + { + QPID_LOG(debug, "Wrapper for non-SASL based interconnect has been deleted"); + connection->transportDeleted(); + } + + std::size_t decode(const char* buffer, std::size_t size) + { + return connection->decode(buffer, size); + } + std::size_t encode(char* buffer, std::size_t size) + { + return connection->encode(buffer, size); + } + bool canEncode() + { + return connection->canEncode(); + } + void closed() + { + connection->closed(); + } + bool isClosed() const + { + QPID_LOG(debug, "Wrapper for non_SASL based interconnect " << (connection->isClosed() ? " IS " : " IS NOT ") << " closed"); + return connection->isClosed(); + } + qpid::framing::ProtocolVersion getVersion() const + { + return connection->getVersion(); + } + private: + boost::shared_ptr<Interconnect> connection; +}; + +bool get(std::string& result, const std::string& key, const qpid::types::Variant::Map& map) +{ + qpid::types::Variant::Map::const_iterator i = map.find(key); + if (i != map.end()) { + result = i->second.asString(); + return true; + } else { + return false; + } +} +bool get(int& result, const std::string& key, const qpid::types::Variant::Map& map) +{ + qpid::types::Variant::Map::const_iterator i = map.find(key); + if (i != map.end()) { + result = i->second; + return true; + } else { + return false; + } +} +bool get(qpid::Url& url, const std::string& key, const qpid::types::Variant::Map& map) +{ + qpid::types::Variant::Map::const_iterator i = map.find(key); + if (i != map.end()) { + url = qpid::Url(i->second.asString()); + return true; + } else { + return false; + } +} +} + +class InterconnectFactory : public qpid::sys::ConnectionCodec::Factory, public boost::enable_shared_from_this<InterconnectFactory> +{ + public: + InterconnectFactory(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Domain&, Broker&, Interconnects&); + InterconnectFactory(bool incoming, const std::string& name, const std::string& source, const std::string& target, + Domain&, Broker&, Interconnects&, boost::shared_ptr<Relay>); + qpid::sys::ConnectionCodec* create(framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); + qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); + bool connect(); + void failed(int, std::string); + private: + bool incoming; + std::string name; + std::string source; + std::string target; + qpid::Url url; + qpid::Url::iterator next; + std::string hostname; + Domain& domain; + Broker& broker; + Interconnects& registry; + qpid::Address address; + boost::shared_ptr<Relay> relay; +}; + +InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const qpid::types::Variant::Map& properties, Domain& d, Broker& b, Interconnects& r) + : incoming(i), name(n), url(d.getUrl()), domain(d), broker(b), registry(r) +{ + get(source, SOURCE, properties); + get(target, TARGET, properties); + next = url.begin(); +} + +InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const std::string& source_, const std::string& target_, + Domain& d, Broker& b, Interconnects& r, boost::shared_ptr<Relay> relay_) + : incoming(i), name(n), source(source_), target(target_), url(d.getUrl()), domain(d), broker(b), registry(r), relay(relay_) +{ + next = url.begin(); +} + +qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&) +{ + throw qpid::Exception("Not implemented!"); +} +qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::sys::OutputControl& out, const std::string& id, const qpid::sys::SecuritySettings& t) +{ + bool useSasl = domain.getMechanisms() != NONE; + boost::shared_ptr<Interconnect> connection(new Interconnect(out, id, broker, true, incoming, name, source, target, domain, registry)); + if (!relay) registry.add(name, connection); + else connection->setRelay(relay); + + std::auto_ptr<qpid::sys::ConnectionCodec> codec; + if (useSasl) { + QPID_LOG(info, "Using AMQP 1.0 (with SASL layer) on connect"); + codec = std::auto_ptr<qpid::sys::ConnectionCodec>(new qpid::broker::amqp::SaslClient(out, id, connection, domain.sasl(hostname), hostname, domain.getMechanisms(), t)); + } else { + QPID_LOG(info, "Using AMQP 1.0 (no SASL layer) on connect"); + codec = std::auto_ptr<qpid::sys::ConnectionCodec>(new Wrapper(connection)); + } + domain.removePending(shared_from_this());//(TODO: add support for retry on connection failure) + return codec.release(); +} + +bool InterconnectFactory::connect() +{ + if (next == url.end()) return false; + address = *next; + next++; + hostname = address.host; + QPID_LOG (info, "Inter-broker connection initiated (" << address << ")"); + broker.connect(name, address.host, boost::lexical_cast<std::string>(address.port), address.protocol, this, boost::bind(&InterconnectFactory::failed, this, _1, _2)); + return true; +} + +void InterconnectFactory::failed(int, std::string text) +{ + QPID_LOG (info, "Inter-broker connection failed (" << address << "): " << text); + if (!connect()) { + domain.removePending(shared_from_this());//give up (TODO: add support for periodic retry) + } +} + +Domain::Domain(const std::string& n, const qpid::types::Variant::Map& properties, Broker& b) + : name(n), durable(false), broker(b), mechanisms("ANONYMOUS"), service(qpid::saslName), minSsf(0), maxSsf(0), agent(b.getManagementAgent()) +{ + if (!get(url, URL, properties)) { + QPID_LOG(error, "No URL specified for domain " << name << "!"); + throw qpid::Exception("A url is required for a domain!"); + } else { + QPID_LOG(notice, "Created domain " << name << " with url " << url << " from " << properties); + } + //TODO: durable + get(username, USERNAME, properties); + get(password, PASSWORD, properties); + get(mechanisms, SASL_MECHANISMS, properties); + get(service, SASL_SERVICE, properties); + get(minSsf, MIN_SSF, properties); + get(maxSsf, MAX_SSF, properties); + if (agent != 0) { + domain = _qmf::Domain::shared_ptr(new _qmf::Domain(agent, this, name, durable)); + domain->set_url(url.str()); + domain->set_username(username); + domain->set_password(password); + domain->set_mechanisms(mechanisms); + agent->addObject(domain); + } +} + +boost::shared_ptr<qpid::management::ManagementObject> Domain::GetManagementObject() const +{ + return domain; +} + +const std::string& Domain::getMechanisms() const +{ + return mechanisms; +} + +qpid::Url Domain::getUrl() const +{ + return url; +} + +std::auto_ptr<qpid::Sasl> Domain::sasl(const std::string& hostname) +{ + return qpid::SaslFactory::getInstance().create(username, password, service, hostname, minSsf, maxSsf, false); +} + +void Domain::connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Interconnects& registry) +{ + boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, properties, *this, broker, registry)); + factory->connect(); + addPending(factory); +} + +void Domain::connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, Interconnects& registry, boost::shared_ptr<Relay> relay) +{ + boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, source, target, *this, broker, registry, relay)); + factory->connect(); + addPending(factory); +} + +void Domain::addPending(boost::shared_ptr<InterconnectFactory> f) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + pending.insert(f); +} + +void Domain::removePending(boost::shared_ptr<InterconnectFactory> f) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + pending.erase(f); +} + + +}}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.h b/qpid/cpp/src/qpid/broker/amqp/Domain.h new file mode 100644 index 0000000000..ccbee6341e --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Domain.h @@ -0,0 +1,78 @@ +#ifndef QPID_BROKER_AMQP_DOMAIN_H +#define QPID_BROKER_AMQP_DOMAIN_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/types/Variant.h" +#include "qpid/Url.h" +#include "qpid/Version.h" +#include "qpid/management/Manageable.h" +#include "qpid/sys/Mutex.h" +#include "qmf/org/apache/qpid/broker/Domain.h" +#include <boost/shared_ptr.hpp> +#include <memory> +#include <set> + +namespace qpid { +class Sasl; +namespace management { +class ManagementAgent; +class ManagementObject; +} +namespace broker { +class Broker; +namespace amqp { +class InterconnectFactory; +class Interconnects; +class Relay; + +class Domain : public qpid::management::Manageable +{ + public: + Domain(const std::string& name, const qpid::types::Variant::Map& properties, Broker&); + void connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Interconnects&); + void connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, Interconnects&, boost::shared_ptr<Relay>); + std::auto_ptr<qpid::Sasl> sasl(const std::string& hostname); + const std::string& getMechanisms() const; + qpid::Url getUrl() const; + void addPending(boost::shared_ptr<InterconnectFactory>); + void removePending(boost::shared_ptr<InterconnectFactory>); + boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const; + private: + std::string name; + bool durable; + Broker& broker; + qpid::Url url; + std::string username; + std::string password; + std::string mechanisms; + std::string service; + int minSsf; + int maxSsf; + boost::shared_ptr<qmf::org::apache::qpid::broker::Domain> domain; + qpid::management::ManagementAgent* agent; + std::set< boost::shared_ptr<InterconnectFactory> > pending; + qpid::sys::Mutex lock; +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_DOMAIN_H*/ diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp new file mode 100644 index 0000000000..9616b267bd --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Incoming.h" +#include "Message.h" +#include "Session.h" +#include "qpid/broker/AsyncCompletion.h" +#include "qpid/broker/Message.h" + +namespace qpid { +namespace broker { +namespace amqp { +Incoming::Incoming(pn_link_t* l, Broker& broker, Session& parent, const std::string& target, const std::string& name) + : ManagedIncomingLink(broker, parent, target, name), credit(100), window(0), link(l), session(parent) {} + + +Incoming::~Incoming() {} +bool Incoming::doWork() +{ + uint32_t c = getCredit(); + bool issue = window < c; + if (issue) { + pn_link_flow(link, c - window); + window = c; + } + return issue; +} +bool Incoming::haveWork() +{ + return window <= (getCredit()/2); +} + +uint32_t Incoming::getCredit() +{ + return credit;//TODO: proper flow control +} + +void Incoming::detached() +{ +} + +void Incoming::wakeup() +{ + session.wakeup(); +} +namespace { + class Transfer : public qpid::broker::AsyncCompletion::Callback + { + public: + Transfer(pn_delivery_t* d, boost::shared_ptr<Session> s) : delivery(d), session(s) {} + void completed(bool sync) { session->accepted(delivery, sync); } + boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> clone() + { + boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new Transfer(delivery, session)); + return copy; + } + private: + pn_delivery_t* delivery; + boost::shared_ptr<Session> session; + }; +} + +DecodingIncoming::DecodingIncoming(pn_link_t* link, Broker& broker, Session& parent, const std::string& target, const std::string& name) + : Incoming(link, broker, parent, target, name), session(parent.shared_from_this()) {} +DecodingIncoming::~DecodingIncoming() {} + +void DecodingIncoming::readable(pn_delivery_t* delivery) +{ + boost::intrusive_ptr<Message> received(new Message(pn_delivery_pending(delivery))); + /*ssize_t read = */pn_link_recv(link, received->getData(), received->getSize()); + received->scan(); + pn_link_advance(link); + + qpid::broker::Message message(received, received); + + handle(message); + --window; + received->begin(); + Transfer t(delivery, session); + received->end(t); +} +}}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h new file mode 100644 index 0000000000..cab023c7c1 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h @@ -0,0 +1,68 @@ +#ifndef QPID_BROKER_AMQP_INCOMING_H +#define QPID_BROKER_AMQP_INCOMING_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Message.h" +#include "ManagedIncomingLink.h" +extern "C" { +#include <proton/engine.h> +} +#include <boost/intrusive_ptr.hpp> +namespace qpid { +namespace broker { +class Broker; +class Message; +namespace amqp { +class Session; + +class Incoming : public ManagedIncomingLink +{ + public: + Incoming(pn_link_t*, Broker& broker, Session& parent, const std::string& target, const std::string& name); + virtual ~Incoming(); + virtual bool doWork();//do anything that requires output + virtual bool haveWork();//called when handling input to see whether any output work is needed + virtual void detached(); + virtual void readable(pn_delivery_t* delivery) = 0; + void wakeup(); + protected: + const uint32_t credit; + uint32_t window; + pn_link_t* link; + Session& session; + virtual uint32_t getCredit(); +}; + +class DecodingIncoming : public Incoming +{ + public: + DecodingIncoming(pn_link_t*, Broker& broker, Session& parent, const std::string& target, const std::string& name); + virtual ~DecodingIncoming(); + void readable(pn_delivery_t* delivery); + virtual void handle(qpid::broker::Message&) = 0; + private: + boost::shared_ptr<Session> session; +}; + +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_INCOMING_H*/ diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp new file mode 100644 index 0000000000..082715b1b2 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Interconnect.h" +#include "Interconnects.h" +#include "Connection.h" +#include "SaslClient.h" +#include "Session.h" +#include "qpid/broker/Broker.h" +#include "qpid/Exception.h" +#include "qpid/SaslFactory.h" +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/OutputControl.h" +#include "qpid/log/Statement.h" +#include <boost/shared_ptr.hpp> +extern "C" { +#include <proton/engine.h> +#include <proton/error.h> +} + +namespace qpid { +namespace broker { +namespace amqp { + +Interconnect::Interconnect(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse, + bool i, const std::string& n, const std::string& s, const std::string& t, Domain& d, Interconnects& r) + : Connection(out, id, broker, r, saslInUse), incoming(i), name(n), source(s), target(t), domain(d), registry(r), headerDiscarded(false), + closeRequested(false), isTransportDeleted(false) +{} + +Interconnect::~Interconnect() +{ + QPID_LOG(notice, "Interconnect deleted"); +} + +namespace { +const pn_state_t UNINIT = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT; +const size_t PROTOCOL_HEADER_LENGTH(8); +} + +size_t Interconnect::encode(char* buffer, size_t size) +{ + if (headerDiscarded) { + return Connection::encode(buffer, size); + } else { + //The IO 'layer' will write in a protocol header when an + //'outgoing' connection is established. However the proton + //potocol engine will also emit one. One needs to be + //discarded, here we discard the one the engine emits for + //interconnects. + headerDiscarded = true; + size_t encoded = Connection::encode(buffer, size); + assert(encoded >= PROTOCOL_HEADER_LENGTH);//we never encode part of protocol header + //discard first eight bytes + ::memmove(buffer, buffer + PROTOCOL_HEADER_LENGTH, encoded - PROTOCOL_HEADER_LENGTH); + return encoded - PROTOCOL_HEADER_LENGTH; + } +} + +void Interconnect::process() +{ + QPID_LOG(trace, id << " processing interconnect"); + if (closeRequested) { + close(); + } else { + if ((pn_connection_state(connection) & UNINIT) == UNINIT) { + QPID_LOG_CAT(debug, model, id << " interconnect opened"); + pn_connection_set_container(connection, broker.getFederationTag().c_str()); + pn_connection_open(connection); + + pn_session_t* s = pn_session(connection); + pn_session_open(s); + boost::shared_ptr<Session> ssn(new Session(s, broker, *this, out)); + sessions[s] = ssn; + + pn_link_t* l = incoming ? pn_receiver(s, name.c_str()) : pn_sender(s, name.c_str()); + pn_link_open(l); + ssn->attach(l, source, target, relay); + } + Connection::process(); + } +} + +void Interconnect::setRelay(boost::shared_ptr<Relay> r) +{ + relay = r; +} + +void Interconnect::deletedFromRegistry() +{ + closeRequested = true; + if (!isTransportDeleted) out.activateOutput(); +} + +void Interconnect::transportDeleted() +{ + isTransportDeleted = true; + registry.remove(name); +} + +}}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.h b/qpid/cpp/src/qpid/broker/amqp/Interconnect.h new file mode 100644 index 0000000000..230abbc667 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.h @@ -0,0 +1,63 @@ +#ifndef QPID_BROKER_AMQP_INTERCONNECT_H +#define QPID_BROKER_AMQP_INTERCONNECT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Connection.h" + +namespace qpid { +struct Address; +namespace broker { +namespace amqp { +class Domain; +class Interconnects; +class Relay; + +/** + * + */ +class Interconnect : public Connection +{ + public: + Interconnect(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse, + bool incoming, const std::string& name, const std::string& source, const std::string& target, Domain&, Interconnects&); + void setRelay(boost::shared_ptr<Relay>); + ~Interconnect(); + size_t encode(char* buffer, size_t size); + void deletedFromRegistry(); + void transportDeleted(); + private: + bool incoming; + std::string name; + std::string source; + std::string target; + Domain& domain; + Interconnects& registry; + bool headerDiscarded; + boost::shared_ptr<Relay> relay; + bool closeRequested; + bool isTransportDeleted; + + void process(); +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_INTERCONNECT_H*/ diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp b/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp new file mode 100644 index 0000000000..cbdf0da3ef --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp @@ -0,0 +1,151 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Interconnects.h" +#include "Interconnect.h" +#include "Connection.h" +#include "Domain.h" +#include "SaslClient.h" +#include "qpid/broker/Broker.h" +#include "qpid/Exception.h" +#include "qpid/SaslFactory.h" +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/OutputControl.h" +#include "qpid/log/Statement.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { +namespace amqp { + +namespace { +const std::string INCOMING_TYPE("incoming"); +const std::string OUTGOING_TYPE("outgoing"); +const std::string DOMAIN_TYPE("domain"); +} + +bool Interconnects::createObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& /*userId*/, const std::string& /*connectionId*/) +{ + if (type == DOMAIN_TYPE) { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + DomainMap::iterator i = domains.find(name); + if (i == domains.end()) { + boost::shared_ptr<Domain> domain(new Domain(name, properties, broker)); + domains[name] = domain; + return true; + } else { + return false; + } + } else if (type == INCOMING_TYPE || type == OUTGOING_TYPE) { + QPID_LOG(notice, "Creating interconnect " << name << ", " << properties); + boost::shared_ptr<Domain> domain; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + qpid::types::Variant::Map::const_iterator p = properties.find(DOMAIN_TYPE); + if (p != properties.end()) { + std::string domainName = p->second; + DomainMap::iterator i = domains.find(domainName); + if (i != domains.end()) { + domain = i->second; + } else { + throw qpid::Exception(QPID_MSG("No such domain: " << domainName)); + } + } else { + throw qpid::Exception(QPID_MSG("Domain must be specified")); + } + } + domain->connect(type == INCOMING_TYPE, name, properties, *this); + return true; + } else { + return false; + } +} +bool Interconnects::deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& /*properties*/, + const std::string& /*userId*/, const std::string& /*connectionId*/) +{ + if (type == DOMAIN_TYPE) { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + DomainMap::iterator i = domains.find(name); + if (i != domains.end()) { + domains.erase(i); + return true; + } else { + return false; + } + } else if (type == INCOMING_TYPE || type == OUTGOING_TYPE) { + boost::shared_ptr<Interconnect> interconnect; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + InterconnectMap::iterator i = interconnects.find(name); + if (i != interconnects.end()) { + interconnect = i->second; + interconnects.erase(i); + } else { + throw qpid::Exception(QPID_MSG("No such interconnection: " << name)); + } + } + if (interconnect) interconnect->deletedFromRegistry(); + return true; + } else { + return false; + } +} + + +bool Interconnects::add(const std::string& name, boost::shared_ptr<Interconnect> connection) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + InterconnectMap::iterator i = interconnects.find(name); + if (i == interconnects.end()) { + interconnects[name] = connection; + return true; + } else return false; +} +boost::shared_ptr<Interconnect> Interconnects::get(const std::string& name) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + InterconnectMap::const_iterator i = interconnects.find(name); + if (i != interconnects.end()) return i->second; + else return boost::shared_ptr<Interconnect>(); +} +bool Interconnects::remove(const std::string& name) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + InterconnectMap::iterator i = interconnects.find(name); + if (i != interconnects.end()) { + interconnects.erase(i); + return true; + } else return false; +} + +boost::shared_ptr<Domain> Interconnects::findDomain(const std::string& name) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + DomainMap::iterator i = domains.find(name); + if (i == domains.end()) { + return boost::shared_ptr<Domain>(); + } else { + return i->second; + } + +} + +}}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnects.h b/qpid/cpp/src/qpid/broker/amqp/Interconnects.h new file mode 100644 index 0000000000..4a7263c8f5 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Interconnects.h @@ -0,0 +1,62 @@ +#ifndef QPID_BROKER_AMQP_INTERCONNECTS_H +#define QPID_BROKER_AMQP_INTERCONNECTS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/ObjectFactory.h" +#include "qpid/sys/Mutex.h" +#include <string> +#include <map> +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { +namespace amqp { + +class Domain; +class Interconnect; +/** + * + */ +class Interconnects : public ObjectFactory +{ + public: + bool createObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& userId, const std::string& connectionId); + bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& userId, const std::string& connectionId); + + + bool add(const std::string&, boost::shared_ptr<Interconnect>); + boost::shared_ptr<Interconnect> get(const std::string&); + bool remove(const std::string&); + + boost::shared_ptr<Domain> findDomain(const std::string&); + private: + typedef std::map<std::string, boost::shared_ptr<Interconnect> > InterconnectMap; + typedef std::map<std::string, boost::shared_ptr<Domain> > DomainMap; + InterconnectMap interconnects; + DomainMap domains; + qpid::sys::Mutex lock; +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_INTERCONNECTS_H*/ diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp new file mode 100644 index 0000000000..8817e410ce --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ManagedIncomingLink.h" +#include "qpid/broker/amqp/ManagedSession.h" +#include "qpid/broker/Broker.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/log/Statement.h" + +namespace _qmf = qmf::org::apache::qpid::broker; + +namespace qpid { +namespace broker { +namespace amqp { + +ManagedIncomingLink::ManagedIncomingLink(Broker& broker, ManagedSession& p, const std::string& target, const std::string& _name) + : parent(p), name(_name) +{ + qpid::management::ManagementAgent* agent = broker.getManagementAgent(); + if (agent) { + incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, target, _name)); + agent->addObject(incoming); + } +} +ManagedIncomingLink::~ManagedIncomingLink() +{ + if (incoming != 0) incoming->resourceDestroy(); +} + +qpid::management::ManagementObject::shared_ptr ManagedIncomingLink::GetManagementObject() const +{ + return incoming; +} + +void ManagedIncomingLink::incomingMessageReceived() +{ + if (incoming) { incoming->inc_transfers(); } +} +}}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h new file mode 100644 index 0000000000..5b7db7997c --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.h @@ -0,0 +1,50 @@ +#ifndef QPID_BROKER_AMQP_MANAGEDINCOMINGLINK_H +#define QPID_BROKER_AMQP_MANAGEDINCOMINGLINK_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/management/Manageable.h" +#include "qmf/org/apache/qpid/broker/Incoming.h" + +namespace qpid { +namespace management { +class ManagementObject; +} +namespace broker { +class Broker; +namespace amqp { +class ManagedSession; + +class ManagedIncomingLink : public qpid::management::Manageable +{ + public: + ManagedIncomingLink(Broker& broker, ManagedSession& parent, const std::string& target, const std::string& name); + virtual ~ManagedIncomingLink(); + qpid::management::ManagementObject::shared_ptr GetManagementObject() const; + void incomingMessageReceived(); + private: + ManagedSession& parent; + const std::string name; + qmf::org::apache::qpid::broker::Incoming::shared_ptr incoming; +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_MANAGEDINCOMINGLINK_H*/ diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp index f36a1e8da4..53e49d2bca 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp @@ -21,8 +21,6 @@ #include "ManagedOutgoingLink.h" #include "qpid/broker/amqp/ManagedSession.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/Queue.h" -#include "qpid/types/Variant.h" #include "qpid/management/ManagementAgent.h" #include "qpid/log/Statement.h" @@ -32,30 +30,28 @@ namespace qpid { namespace broker { namespace amqp { -ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, Queue& q, ManagedSession& p, const std::string i, bool topic) - : parent(p), id(i) +ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, ManagedSession& p, const std::string& source, const std::string& _name) + : parent(p), name(_name) { qpid::management::ManagementAgent* agent = broker.getManagementAgent(); if (agent) { - subscription = _qmf::Subscription::shared_ptr(new _qmf::Subscription(agent, this, &p, q.GetManagementObject()->getObjectId(), id, - false/*FIXME*/, true/*FIXME*/, topic, qpid::types::Variant::Map())); - agent->addObject(subscription); - subscription->set_creditMode("n/a"); + outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, source, _name)); + agent->addObject(outgoing); } } ManagedOutgoingLink::~ManagedOutgoingLink() { - if (subscription != 0) subscription->resourceDestroy(); + if (outgoing != 0) outgoing->resourceDestroy(); } qpid::management::ManagementObject::shared_ptr ManagedOutgoingLink::GetManagementObject() const { - return subscription; + return outgoing; } void ManagedOutgoingLink::outgoingMessageSent() { - if (subscription) { subscription->inc_delivered(); } + if (outgoing) { outgoing->inc_transfers(); } parent.outgoingMessageSent(); } void ManagedOutgoingLink::outgoingMessageAccepted() diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h index 20a1095db2..61d0b9c3a0 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.h @@ -22,7 +22,7 @@ * */ #include "qpid/management/Manageable.h" -#include "qmf/org/apache/qpid/broker/Subscription.h" +#include "qmf/org/apache/qpid/broker/Outgoing.h" namespace qpid { namespace management { @@ -30,14 +30,13 @@ class ManagementObject; } namespace broker { class Broker; -class Queue; namespace amqp { class ManagedSession; class ManagedOutgoingLink : public qpid::management::Manageable { public: - ManagedOutgoingLink(Broker& broker, Queue&, ManagedSession& parent, const std::string id, bool topic); + ManagedOutgoingLink(Broker& broker, ManagedSession& parent, const std::string& source, const std::string& name); virtual ~ManagedOutgoingLink(); qpid::management::ManagementObject::shared_ptr GetManagementObject() const; void outgoingMessageSent(); @@ -45,8 +44,8 @@ class ManagedOutgoingLink : public qpid::management::Manageable void outgoingMessageRejected(); private: ManagedSession& parent; - const std::string id; - qmf::org::apache::qpid::broker::Subscription::shared_ptr subscription; + const std::string name; + qmf::org::apache::qpid::broker::Outgoing::shared_ptr outgoing; }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index eb0a6e20aa..e531e8cd20 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/amqp/Outgoing.h" #include "qpid/broker/amqp/Header.h" +#include "qpid/broker/amqp/Session.h" #include "qpid/broker/amqp/Translation.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Selector.h" @@ -32,9 +33,16 @@ namespace qpid { namespace broker { namespace amqp { -Outgoing::Outgoing(Broker& broker, boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession& session, qpid::sys::OutputControl& o, bool topic) - : Consumer(pn_link_name(l), /*FIXME*/CONSUMER), - ManagedOutgoingLink(broker, *q, session, pn_link_name(l), topic), +Outgoing::Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& name) : ManagedOutgoingLink(broker, parent, source, name), session(parent) {} + +void Outgoing::wakeup() +{ + session.wakeup(); +} + +OutgoingFromQueue::OutgoingFromQueue(Broker& broker, const std::string& source, boost::shared_ptr<Queue> q, pn_link_t* l, Session& session, qpid::sys::OutputControl& o, bool topic) + : Outgoing(broker, session, source, pn_link_name(l)), + Consumer(pn_link_name(l), /*FIXME*/CONSUMER), exclusive(topic), queue(q), deliveries(5000), link(l), out(o), current(0), outstanding(0), @@ -45,12 +53,12 @@ Outgoing::Outgoing(Broker& broker, boost::shared_ptr<Queue> q, pn_link_t* l, Man } } -void Outgoing::init() +void OutgoingFromQueue::init() { queue->consume(shared_from_this(), exclusive);//may throw exception } -bool Outgoing::dispatch() +bool OutgoingFromQueue::doWork() { QPID_LOG(trace, "Dispatching to " << getName() << ": " << pn_link_credit(link)); if (canDeliver()) { @@ -66,12 +74,12 @@ bool Outgoing::dispatch() return false; } -void Outgoing::write(const char* data, size_t size) +void OutgoingFromQueue::write(const char* data, size_t size) { pn_link_send(link, data, size); } -void Outgoing::handle(pn_delivery_t* delivery) +void OutgoingFromQueue::handle(pn_delivery_t* delivery) { pn_delivery_tag_t tag = pn_delivery_tag(delivery); size_t i = *reinterpret_cast<const size_t*>(tag.bytes); @@ -126,12 +134,12 @@ void Outgoing::handle(pn_delivery_t* delivery) } } -bool Outgoing::canDeliver() +bool OutgoingFromQueue::canDeliver() { return deliveries[current].delivery == 0 && pn_link_credit(link) > outstanding; } -void Outgoing::detached() +void OutgoingFromQueue::detached() { QPID_LOG(debug, "Detaching outgoing link from " << queue->getName()); queue->cancel(shared_from_this()); @@ -143,7 +151,7 @@ void Outgoing::detached() } //Consumer interface: -bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg) +bool OutgoingFromQueue::deliver(const QueueCursor& cursor, const qpid::broker::Message& msg) { Record& r = deliveries[current++]; if (current >= deliveries.capacity()) current = 0; @@ -155,23 +163,23 @@ bool Outgoing::deliver(const QueueCursor& cursor, const qpid::broker::Message& m return true; } -void Outgoing::notify() +void OutgoingFromQueue::notify() { QPID_LOG(trace, "Notification received for " << queue->getName()); out.activateOutput(); } -bool Outgoing::accept(const qpid::broker::Message&) +bool OutgoingFromQueue::accept(const qpid::broker::Message&) { return true; } -void Outgoing::setSubjectFilter(const std::string& f) +void OutgoingFromQueue::setSubjectFilter(const std::string& f) { subjectFilter = f; } -void Outgoing::setSelectorFilter(const std::string& f) +void OutgoingFromQueue::setSelectorFilter(const std::string& f) { selector.reset(new Selector(f)); } @@ -217,29 +225,29 @@ bool match(const std::string& filter, const std::string& target) } } -bool Outgoing::filter(const qpid::broker::Message& m) +bool OutgoingFromQueue::filter(const qpid::broker::Message& m) { return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey())) && (!selector || selector->filter(m)); } -void Outgoing::cancel() {} +void OutgoingFromQueue::cancel() {} -void Outgoing::acknowledged(const qpid::broker::DeliveryRecord&) {} +void OutgoingFromQueue::acknowledged(const qpid::broker::DeliveryRecord&) {} -qpid::broker::OwnershipToken* Outgoing::getSession() +qpid::broker::OwnershipToken* OutgoingFromQueue::getSession() { return 0; } -Outgoing::Record::Record() : delivery(0), disposition(0), index(0) {} -void Outgoing::Record::init(size_t i) +OutgoingFromQueue::Record::Record() : delivery(0), disposition(0), index(0) {} +void OutgoingFromQueue::Record::init(size_t i) { index = i; tag.bytes = reinterpret_cast<const char*>(&index); tag.size = sizeof(index); } -void Outgoing::Record::reset() +void OutgoingFromQueue::Record::reset() { cursor = QueueCursor(); msg = qpid::broker::Message(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index 7d845a1427..b8a689b8f8 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -41,7 +41,7 @@ class Broker; class Queue; class Selector; namespace amqp { -class ManagedSession; +class Session; template <class T> class CircularArray { @@ -56,17 +56,43 @@ class CircularArray size_t next; }; +class Outgoing : public ManagedOutgoingLink +{ + public: + Outgoing(Broker& broker, Session& parent, const std::string& source, const std::string& name); + virtual void setSubjectFilter(const std::string&) = 0; + virtual void setSelectorFilter(const std::string&) = 0; + virtual void init() = 0; + /** + * Allows the link to initiate any outgoing transfers + */ + virtual bool doWork() = 0; + /** + * Signals that this link has been detached + */ + virtual void detached() = 0; + /** + * Called when a delivery is writable + */ + virtual void handle(pn_delivery_t* delivery) = 0; + void wakeup(); + virtual ~Outgoing() {} + private: + Session& session; +}; + /** - * + * Logic for handling an outgoing link from a queue (even if it is a + * subscription pseduo-queue created by the broker) */ -class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from_this<Outgoing>, public ManagedOutgoingLink +class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public boost::enable_shared_from_this<OutgoingFromQueue> { public: - Outgoing(Broker&,boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession&, qpid::sys::OutputControl& o, bool topic); + OutgoingFromQueue(Broker&, const std::string& source, boost::shared_ptr<Queue> q, pn_link_t* l, Session&, qpid::sys::OutputControl& o, bool topic); void setSubjectFilter(const std::string&); void setSelectorFilter(const std::string&); void init(); - bool dispatch(); + bool doWork(); void write(const char* data, size_t size); void handle(pn_delivery_t* delivery); bool canDeliver(); diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp index 711592257c..0e622f8d20 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp @@ -27,6 +27,7 @@ #include "qpid/broker/RecoverableMessage.h" #include "qpid/broker/RecoverableMessageImpl.h" #include "qpid/broker/amqp/Connection.h" +#include "qpid/broker/amqp/Interconnects.h" #include "qpid/broker/amqp/Message.h" #include "qpid/broker/amqp/Sasl.h" #include "qpid/broker/amqp/Translation.h" @@ -43,11 +44,15 @@ namespace amqp { class ProtocolImpl : public Protocol { public: - ProtocolImpl(Broker& b) : broker(b) {} + ProtocolImpl(Interconnects* i, Broker& b) : interconnects(i), broker(b) + { + broker.getObjectFactoryRegistry().add(interconnects);//registry deletes on shutdown + } qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const qpid::broker::Message&); boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&); private: + Interconnects* interconnects; Broker& broker; }; @@ -58,7 +63,8 @@ struct ProtocolPlugin : public Plugin //need to register protocol before recovery from store broker::Broker* broker = dynamic_cast<qpid::broker::Broker*>(&target); if (broker) { - broker->getProtocolRegistry().add("AMQP 1.0", new ProtocolImpl(*broker)); + ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), *broker); + broker->getProtocolRegistry().add("AMQP 1.0", impl);//registry deletes on shutdown } } @@ -73,18 +79,18 @@ qpid::sys::ConnectionCodec* ProtocolImpl::create(const qpid::framing::ProtocolVe if (v.getProtocol() == qpid::framing::ProtocolVersion::SASL) { if (broker.getOptions().auth) { QPID_LOG(info, "Using AMQP 1.0 (with SASL layer)"); - return new qpid::broker::amqp::Sasl(out, id, broker, qpid::SaslFactory::getInstance().createServer(broker.getOptions().realm, broker.getOptions().requireEncrypted, external)); + return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, qpid::SaslFactory::getInstance().createServer(broker.getOptions().realm, broker.getOptions().requireEncrypted, external)); } else { std::auto_ptr<SaslServer> authenticator(new qpid::NullSaslServer(broker.getOptions().realm)); QPID_LOG(info, "Using AMQP 1.0 (with dummy SASL layer)"); - return new qpid::broker::amqp::Sasl(out, id, broker, authenticator); + return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, authenticator); } } else { if (broker.getOptions().auth) { throw qpid::Exception("SASL layer required!"); } else { QPID_LOG(info, "Using AMQP 1.0 (no SASL layer)"); - return new qpid::broker::amqp::Connection(out, id, broker, false); + return new qpid::broker::amqp::Connection(out, id, broker, *interconnects, false); } } } diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp new file mode 100644 index 0000000000..48a629a66e --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp @@ -0,0 +1,278 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Relay.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" +#include <algorithm> +#include <string.h> + +namespace qpid { +namespace broker { +namespace amqp { + +Relay::Relay(size_t max_) : credit(0), max(max_), current(0), isDetached(false), out(0), in(0) {} +void Relay::check() +{ + if (isDetached) throw qpid::Exception("other end of relay has been detached"); +} +bool Relay::send(pn_link_t* link) +{ + BufferedTransfer* c(0); + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + if (current < buffer.size()) { + c = &buffer[current++]; + } else { + return false; + } + } + c->initOut(link); + return true; +} + +BufferedTransfer& Relay::push() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + buffer.push_back(BufferedTransfer()); + return buffer.back(); +} + +void Relay::received(pn_link_t* link, pn_delivery_t* delivery) +{ + BufferedTransfer& received = push(); + received.initIn(link, delivery); + if (out) out->wakeup(); +} +size_t Relay::size() const +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + return buffer.size(); +} +BufferedTransfer& Relay::head() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + return buffer.front(); +} +void Relay::pop() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + buffer.pop_front(); + if (current) --current; +} +void Relay::setCredit(int c) +{ + credit = c; + if (in) in->wakeup(); +} + +int Relay::getCredit() const +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + return std::min(credit - size(), max); +} +void Relay::attached(Outgoing* o) +{ + out = o; +} +void Relay::attached(Incoming* i) +{ + in = i; +} +void Relay::detached(Outgoing*) +{ + out = 0; + isDetached = true; + if (in) in->wakeup(); +} +void Relay::detached(Incoming*) +{ + in = 0; + isDetached = true; + if (out) out->wakeup(); +} + +OutgoingFromRelay::OutgoingFromRelay(pn_link_t* l, Broker& broker, Session& parent, const std::string& source, + const std::string& name_, boost::shared_ptr<Relay> r) + : Outgoing(broker, parent, source, name_), name(name_), link(l), relay(r) {} +/** + * Allows the link to initiate any outgoing transfers + */ +bool OutgoingFromRelay::doWork() +{ + relay->check(); + relay->setCredit(pn_link_credit(link)); + return relay->send(link); +} +/** + * Called when a delivery is writable + */ +void OutgoingFromRelay::handle(pn_delivery_t* delivery) +{ + void* context = pn_delivery_get_context(delivery); + BufferedTransfer* transfer = reinterpret_cast<BufferedTransfer*>(context); + assert(transfer); + if (pn_delivery_writable(delivery)) { + if (transfer->write(link)) { + outgoingMessageSent(); + QPID_LOG(debug, "Sent relayed message " << name); + } else { + QPID_LOG(error, "Failed to send relayed message " << name); + } + } + if (pn_delivery_updated(delivery)) { + pn_disposition_t d = transfer->updated(); + switch (d) { + case PN_ACCEPTED: + outgoingMessageAccepted(); + break; + case PN_REJECTED: + case PN_RELEASED://TODO: not quite true... + case PN_MODIFIED://TODO: not quite true... + outgoingMessageRejected(); + break; + default: + QPID_LOG(warning, "Unhandled disposition: " << d); + } + } +} +/** + * Signals that this link has been detached + */ +void OutgoingFromRelay::detached() +{ + relay->detached(this); +} +void OutgoingFromRelay::init() +{ + relay->attached(this); +} +void OutgoingFromRelay::setSubjectFilter(const std::string&) +{ + //TODO +} +void OutgoingFromRelay::setSelectorFilter(const std::string&) +{ + //TODO +} + +IncomingToRelay::IncomingToRelay(pn_link_t* link, Broker& broker, Session& parent, const std::string& target, + const std::string& name, boost::shared_ptr<Relay> r) + : Incoming(link, broker, parent, target, name), relay(r) +{ + relay->attached(this); +} +bool IncomingToRelay::settle() +{ + bool result(false); + while (relay->size() && relay->head().settle()) { + result = true; + relay->pop(); + } + return result; +} +bool IncomingToRelay::doWork() +{ + relay->check(); + bool work(false); + if (settle()) work = true; + if (Incoming::doWork()) work = true; + return work; +} +bool IncomingToRelay::haveWork() +{ + bool work(false); + if (settle()) work = true; + if (Incoming::haveWork()) work = true; + return work; +} +void IncomingToRelay::readable(pn_delivery_t* delivery) +{ + relay->received(link, delivery); + --window; +} + +uint32_t IncomingToRelay::getCredit() +{ + return relay->getCredit(); +} + +void IncomingToRelay::detached() +{ + relay->detached(this); +} + +void BufferedTransfer::initIn(pn_link_t* link, pn_delivery_t* d) +{ + in.handle = d; + //read in data + data.resize(pn_delivery_pending(d)); + /*ssize_t read = */pn_link_recv(link, &data[0], data.size()); + pn_link_advance(link); + + //copy delivery tag + pn_delivery_tag_t dt = pn_delivery_tag(d); + tag.resize(dt.size); + ::memmove(&tag[0], dt.bytes, dt.size); + + //set context + pn_delivery_set_context(d, this); + +} + +bool BufferedTransfer::settle() +{ + if (out.settled && !in.settled) { + pn_delivery_update(in.handle, disposition); + pn_delivery_settle(in.handle); + in.settled = true; + } + return out.settled && in.settled; +} + +void BufferedTransfer::initOut(pn_link_t* link) +{ + pn_delivery_tag_t dt; + dt.bytes = &tag[0]; + dt.size = tag.size(); + out.handle = pn_delivery(link, dt); + //set context + pn_delivery_set_context(out.handle, this); +} + +pn_disposition_t BufferedTransfer::updated() +{ + disposition = pn_delivery_remote_state(out.handle); + if (disposition) { + pn_delivery_settle(out.handle); + out.settled = true; + } + return disposition; +} + +bool BufferedTransfer::write(pn_link_t* link) +{ + pn_link_send(link, &data[0], data.size()); + return pn_link_advance(link); +} +Delivery::Delivery() : settled(false), handle(0) {} +Delivery::Delivery(pn_delivery_t* d) : settled(false), handle(d) {} + +}}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.h b/qpid/cpp/src/qpid/broker/amqp/Relay.h new file mode 100644 index 0000000000..0b20b563d4 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Relay.h @@ -0,0 +1,128 @@ +#ifndef QPID_BROKER_AMQP_RELAY_H +#define QPID_BROKER_AMQP_RELAY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Incoming.h" +#include "Outgoing.h" +#include "qpid/sys/Mutex.h" +extern "C" { +#include <proton/engine.h> +} +#include <deque> + +namespace qpid { +namespace broker { +namespace amqp { + +struct Delivery +{ + bool settled; + pn_delivery_t* handle; + + Delivery(); + Delivery(pn_delivery_t* d); +}; + +class BufferedTransfer +{ + public: + void initIn(pn_link_t* link, pn_delivery_t* d); + bool settle(); + void initOut(pn_link_t* link); + pn_disposition_t updated(); + bool write(pn_link_t*); + private: + std::vector<char> data; + Delivery in; + Delivery out; + pn_delivery_tag_t dt; + std::vector<char> tag; + pn_disposition_t disposition; +}; + +/** + * + */ +class Relay +{ + public: + Relay(size_t max); + void check(); + size_t size() const; + BufferedTransfer& head(); + void pop(); + bool send(pn_link_t*); + void received(pn_link_t* link, pn_delivery_t* delivery); + int getCredit() const; + void setCredit(int); + void attached(Outgoing*); + void attached(Incoming*); + void detached(Outgoing*); + void detached(Incoming*); + private: + std::deque<BufferedTransfer> buffer;//TODO: optimise by replacing with simple circular array + int credit;//issued by outgoing peer, decremented everytime we send a message on outgoing link + size_t max; + size_t current; + bool isDetached; + Outgoing* out; + Incoming* in; + mutable qpid::sys::Mutex lock; + + BufferedTransfer& push(); +}; + +class OutgoingFromRelay : public Outgoing +{ + public: + OutgoingFromRelay(pn_link_t*, Broker&, Session&, const std::string& source, + const std::string& name, boost::shared_ptr<Relay>); + bool doWork(); + void handle(pn_delivery_t* delivery); + void detached(); + void init(); + void setSubjectFilter(const std::string&); + void setSelectorFilter(const std::string&); + private: + const std::string name; + pn_link_t* link; + boost::shared_ptr<Relay> relay; +}; + +class IncomingToRelay : public Incoming +{ + public: + IncomingToRelay(pn_link_t*, Broker&, Session&, const std::string& target, + const std::string& name, boost::shared_ptr<Relay> r); + bool settle(); + bool doWork(); + bool haveWork(); + void detached(); + void readable(pn_delivery_t* delivery); + uint32_t getCredit(); + private: + boost::shared_ptr<Relay> relay; +}; + +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_RELAY_H*/ diff --git a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp index 4b89e7b15d..d8e12fcfdd 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp @@ -31,8 +31,8 @@ namespace qpid { namespace broker { namespace amqp { -Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, qpid::broker::Broker& broker, std::auto_ptr<qpid::SaslServer> auth) - : qpid::amqp::SaslServer(id), out(o), connection(out, id, broker, true), +Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, qpid::broker::Broker& broker, Interconnects& i, std::auto_ptr<qpid::SaslServer> auth) + : qpid::amqp::SaslServer(id), out(o), connection(out, id, broker, i, true), authenticator(auth), state(INCOMPLETE), writeHeader(true), haveOutput(true) { diff --git a/qpid/cpp/src/qpid/broker/amqp/Sasl.h b/qpid/cpp/src/qpid/broker/amqp/Sasl.h index 079128be02..7718b4c43a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Sasl.h +++ b/qpid/cpp/src/qpid/broker/amqp/Sasl.h @@ -32,7 +32,6 @@ class SecurityLayer; } namespace broker { namespace amqp { - /** * An AMQP 1.0 SASL Security Layer for authentication and optionally * encryption. @@ -40,7 +39,7 @@ namespace amqp { class Sasl : public sys::ConnectionCodec, qpid::amqp::SaslServer { public: - Sasl(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, std::auto_ptr<qpid::SaslServer> authenticator); + Sasl(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, std::auto_ptr<qpid::SaslServer> authenticator); ~Sasl(); size_t decode(const char* buffer, size_t size); diff --git a/qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp b/qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp new file mode 100644 index 0000000000..7e03ae9450 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/SaslClient.cpp @@ -0,0 +1,178 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "SaslClient.h" +#include "Interconnect.h" +#include "qpid/broker/Broker.h" +#include "qpid/sys/OutputControl.h" +#include "qpid/sys/SecurityLayer.h" +#include "qpid/log/Statement.h" +#include "qpid/Sasl.h" +#include "qpid/SaslFactory.h" +#include "qpid/StringUtils.h" +#include <sstream> + +namespace qpid { +namespace broker { +namespace amqp { + +SaslClient::SaslClient(qpid::sys::OutputControl& out_, const std::string& id, boost::shared_ptr<Interconnect> c, std::auto_ptr<qpid::Sasl> s, + const std::string& hostname_, const std::string& mechs, const qpid::sys::SecuritySettings& t) + : qpid::amqp::SaslClient(id), out(out_), connection(c), sasl(s), + hostname(hostname_), allowedMechanisms(mechs), transport(t), readHeader(true), writeHeader(true), haveOutput(false), state(NONE) {} + +SaslClient::~SaslClient() +{ + connection->transportDeleted(); +} + +std::size_t SaslClient::decode(const char* buffer, std::size_t size) +{ + size_t decoded = 0; + if (readHeader) { + decoded += readProtocolHeader(buffer, size); + readHeader = !decoded; + } + if (state == NONE && decoded < size) { + decoded += read(buffer + decoded, size - decoded); + } else if (state == SUCCEEDED) { + if (securityLayer.get()) decoded += securityLayer->decode(buffer + decoded, size - decoded); + else decoded += connection->decode(buffer + decoded, size - decoded); + } + QPID_LOG(trace, id << " SaslClient::decode(" << size << "): " << decoded); + return decoded; +} + +std::size_t SaslClient::encode(char* buffer, std::size_t size) +{ + size_t encoded = 0; + if (writeHeader) { + encoded += writeProtocolHeader(buffer, size); + writeHeader = !encoded; + } + if (state == NONE && encoded < size) { + encoded += write(buffer + encoded, size - encoded); + } else if (state == SUCCEEDED) { + if (securityLayer.get()) encoded += securityLayer->encode(buffer + encoded, size - encoded); + else encoded += connection->encode(buffer + encoded, size - encoded); + } + haveOutput = (encoded == size); + QPID_LOG(trace, id << " SaslClient::encode(" << size << "): " << encoded); + return encoded; +} + +bool SaslClient::canEncode() +{ + if (state == NONE) { + QPID_LOG(trace, id << " SaslClient::canEncode(): " << writeHeader << " || " << haveOutput); + return writeHeader || haveOutput; + } else if (state == SUCCEEDED) { + if (securityLayer.get()) return securityLayer->canEncode(); + else return connection->canEncode(); + } else { + return false; + } +} + +void SaslClient::mechanisms(const std::string& offered) +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-MECHANISMS(" << offered << ")"); + std::string response; + + std::string mechanisms; + if (allowedMechanisms.size()) { + std::vector<std::string> allowed = split(allowedMechanisms, " "); + std::vector<std::string> supported = split(offered, " "); + std::stringstream intersection; + for (std::vector<std::string>::const_iterator i = allowed.begin(); i != allowed.end(); ++i) { + if (std::find(supported.begin(), supported.end(), *i) != supported.end()) { + intersection << *i << " "; + } + } + mechanisms = intersection.str(); + } else { + mechanisms = offered; + } + + if (sasl->start(mechanisms, response, &transport)) { + init(sasl->getMechanism(), &response, hostname.size() ? &hostname : 0); + } else { + init(sasl->getMechanism(), 0, hostname.size() ? &hostname : 0); + } + haveOutput = true; + out.activateOutput(); +} +void SaslClient::challenge(const std::string& challenge) +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-CHALLENGE(" << challenge.size() << " bytes)"); + std::string r = sasl->step(challenge); + response(&r); + haveOutput = true; + out.activateOutput(); +} +namespace { +const std::string EMPTY; +} +void SaslClient::challenge() +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-CHALLENGE(null)"); + std::string r = sasl->step(EMPTY); + response(&r); +} +void SaslClient::outcome(uint8_t result, const std::string& extra) +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-OUTCOME(" << result << ", " << extra << ")"); + outcome(result); +} +void SaslClient::outcome(uint8_t result) +{ + QPID_LOG_CAT(debug, protocol, id << " Received SASL-OUTCOME(" << result << ")"); + if (result) state = FAILED; + else state = SUCCEEDED; + + securityLayer = sasl->getSecurityLayer(65535); + if (securityLayer.get()) { + securityLayer->init(connection.get()); + } + out.activateOutput(); +} + +void SaslClient::closed() +{ + if (state == SUCCEEDED) { + connection->closed(); + } else { + QPID_LOG(info, id << " Connection closed prior to authentication completing"); + state = FAILED; + } +} + +bool SaslClient::isClosed() const +{ + if (state == FAILED) return true; + else if (state == SUCCEEDED) return connection->isClosed(); + else return false; +} +qpid::framing::ProtocolVersion SaslClient::getVersion() const +{ + return connection->getVersion(); +} + +}}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/SaslClient.h b/qpid/cpp/src/qpid/broker/amqp/SaslClient.h new file mode 100644 index 0000000000..4d802f6f65 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/SaslClient.h @@ -0,0 +1,80 @@ +#ifndef QPID_BROKER_AMQP_SASLCLIENT_H +#define QPID_BROKER_AMQP_SASLCLIENT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/SecuritySettings.h" +#include "qpid/amqp/SaslClient.h" +#include <memory> +#include <boost/shared_ptr.hpp> + +namespace qpid { +class Sasl; +namespace sys { +class OutputControl; +class SecurityLayer; +} +namespace broker { +class Broker; +namespace amqp { +class Interconnect; + +/** + * Implementation of SASL client role for when broker connects to + * external peers. + */ +class SaslClient : public qpid::sys::ConnectionCodec, qpid::amqp::SaslClient +{ + public: + SaslClient(qpid::sys::OutputControl& out, const std::string& id, boost::shared_ptr<Interconnect>, std::auto_ptr<qpid::Sasl>, + const std::string& hostname, const std::string& allowedMechanisms, const qpid::sys::SecuritySettings&); + ~SaslClient(); + std::size_t decode(const char* buffer, std::size_t size); + std::size_t encode(char* buffer, std::size_t size); + bool canEncode(); + void closed(); + bool isClosed() const; + qpid::framing::ProtocolVersion getVersion() const; + private: + qpid::sys::OutputControl& out; + boost::shared_ptr<Interconnect> connection; + std::auto_ptr<qpid::Sasl> sasl; + std::string hostname; + std::string allowedMechanisms; + qpid::sys::SecuritySettings transport; + bool readHeader; + bool writeHeader; + bool haveOutput; + enum { + NONE, FAILED, SUCCEEDED + } state; + std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; + + void mechanisms(const std::string&); + void challenge(const std::string&); + void challenge(); //null != empty string + void outcome(uint8_t result, const std::string&); + void outcome(uint8_t result); +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_SASLCLIENT_H*/ diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 3ec5eb15dd..bb94a37398 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -19,17 +19,19 @@ * */ #include "Session.h" +#include "Incoming.h" #include "Outgoing.h" #include "Message.h" -#include "ManagedConnection.h" -#include "qpid/broker/AsyncCompletion.h" +#include "Connection.h" +#include "Domain.h" +#include "Interconnects.h" +#include "Relay.h" #include "qpid/broker/Broker.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/TopicExchange.h" #include "qpid/broker/FanOutExchange.h" -#include "qpid/broker/Message.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Selector.h" #include "qpid/broker/TopicExchange.h" @@ -51,58 +53,62 @@ namespace qpid { namespace broker { namespace amqp { -class Target +class IncomingToQueue : public DecodingIncoming { public: - Target(pn_link_t* l) : credit(100), window(0), link(l) {} - virtual ~Target() {} - bool flow(); - bool needFlow(); - virtual void handle(qpid::broker::Message& m) = 0;//TODO: revise this for proper message - protected: - const uint32_t credit; - uint32_t window; - pn_link_t* link; -}; - -class Queue : public Target -{ - public: - Queue(boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : Target(l), queue(q) {} + IncomingToQueue(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Queue> q, pn_link_t* l) : DecodingIncoming(l, b, p, q->getName(), pn_link_name(l)), queue(q) {} void handle(qpid::broker::Message& m); private: boost::shared_ptr<qpid::broker::Queue> queue; }; -class Exchange : public Target +class IncomingToExchange : public DecodingIncoming { public: - Exchange(boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : Target(l), exchange(e) {} + IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l) : DecodingIncoming(l, b, p, e->getName(), pn_link_name(l)), exchange(e) {} void handle(qpid::broker::Message& m); private: boost::shared_ptr<qpid::broker::Exchange> exchange; }; -Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o) +Session::Session(pn_session_t* s, qpid::broker::Broker& b, Connection& c, qpid::sys::OutputControl& o) : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false) {} -Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus) +Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming) { ResolvedNode node; node.exchange = broker.getExchanges().find(name); node.queue = broker.getQueues().find(name); - if (!node.queue && !node.exchange && pn_terminus_is_dynamic(terminus)) { - //TODO: handle dynamic creation - //is it a queue or an exchange? - NodeProperties properties; - properties.read(pn_terminus_properties(terminus)); - if (properties.isQueue()) { - node.queue = broker.createQueue(name, properties.getQueueSettings(), this, properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first; + if (!node.queue && !node.exchange) { + if (pn_terminus_is_dynamic(terminus)) { + //is it a queue or an exchange? + NodeProperties properties; + properties.read(pn_terminus_properties(terminus)); + if (properties.isQueue()) { + node.queue = broker.createQueue(name, properties.getQueueSettings(), this, properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first; + } else { + qpid::framing::FieldTable args; + node.exchange = broker.createExchange(name, properties.getExchangeType(), properties.isDurable(), properties.getAlternateExchange(), + args, connection.getUserid(), connection.getId()).first; + } } else { - qpid::framing::FieldTable args; - node.exchange = broker.createExchange(name, properties.getExchangeType(), properties.isDurable(), properties.getAlternateExchange(), - args, connection.getUserid(), connection.getId()).first; + size_t i = name.find('@'); + if (i != std::string::npos && (i+1) < name.length()) { + std::string domain = name.substr(i+1); + std::string local = name.substr(0, i); + std::string id = (boost::format("%1%-%2%") % name % qpid::types::Uuid(true).str()).str(); + //does this domain exist? + boost::shared_ptr<Domain> d = connection.getInterconnects().findDomain(domain); + if (d) { + node.relay = boost::shared_ptr<Relay>(new Relay(1000)); + if (incoming) { + d->connect(false, id, name, local, connection.getInterconnects(), node.relay); + } else { + d->connect(true, id, local, name, connection.getInterconnects(), node.relay); + } + } + } } } else if (node.queue && node.exchange) { QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue"); @@ -123,43 +129,7 @@ void Session::attach(pn_link_t* link) QPID_LOG(debug, "Received attach request for outgoing link from " << name); pn_terminus_set_address(pn_link_source(link), name.c_str()); - ResolvedNode node = resolve(name, source); - Filter filter; - filter.read(pn_terminus_filter(source)); - - if (node.queue) { - boost::shared_ptr<Outgoing> q(new Outgoing(broker, node.queue, link, *this, out, false)); - q->init(); - if (filter.hasSubjectFilter()) { - q->setSubjectFilter(filter.getSubjectFilter()); - } - if (filter.hasSelectorFilter()) { - q->setSelectorFilter(filter.getSelectorFilter()); - } - senders[link] = q; - } else if (node.exchange) { - QueueSettings settings(false, true); - //TODO: populate settings from source details when available from engine - boost::shared_ptr<qpid::broker::Queue> queue - = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first; - if (filter.hasSubjectFilter()) { - filter.bind(node.exchange, queue); - filter.write(pn_terminus_filter(pn_link_source(link))); - } else if (node.exchange->getType() == FanOutExchange::typeName) { - node.exchange->bind(queue, std::string(), 0); - } else if (node.exchange->getType() == TopicExchange::typeName) { - node.exchange->bind(queue, "#", 0); - } else { - throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/ - } - boost::shared_ptr<Outgoing> q(new Outgoing(broker, queue, link, *this, out, true)); - senders[link] = q; - q->init(); - } else { - pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED); - throw qpid::Exception("Node not found: " + name);/*not-found*/ - } - QPID_LOG(debug, "Outgoing link attached"); + setupOutgoing(link, source, name); } else { pn_terminus_t* target = pn_link_remote_target(link); if (pn_terminus_get_type(target) == PN_UNSPECIFIED) { @@ -169,51 +139,120 @@ void Session::attach(pn_link_t* link) QPID_LOG(debug, "Received attach request for incoming link to " << name); pn_terminus_set_address(pn_link_target(link), name.c_str()); - ResolvedNode node = resolve(name, target); + setupIncoming(link, target, name); + } +} + +void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::string& name) +{ + ResolvedNode node = resolve(name, target, true); + + if (node.queue) { + boost::shared_ptr<Incoming> q(new IncomingToQueue(broker, *this, node.queue, link)); + incoming[link] = q; + } else if (node.exchange) { + boost::shared_ptr<Incoming> e(new IncomingToExchange(broker, *this, node.exchange, link)); + incoming[link] = e; + } else if (node.relay) { + boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, name, pn_link_name(link), node.relay)); + incoming[link] = in; + } else { + pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); + throw qpid::Exception("Node not found: " + name);/*not-found*/ + } + QPID_LOG(debug, "Incoming link attached"); +} - if (node.queue) { - boost::shared_ptr<Target> q(new Queue(node.queue, link)); - targets[link] = q; - } else if (node.exchange) { - boost::shared_ptr<Target> e(new Exchange(node.exchange, link)); - targets[link] = e; +void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name) +{ + ResolvedNode node = resolve(name, source, false); + Filter filter; + filter.read(pn_terminus_filter(source)); + + if (node.queue) { + boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, node.queue, link, *this, out, false)); + q->init(); + if (filter.hasSubjectFilter()) { + q->setSubjectFilter(filter.getSubjectFilter()); + } + if (filter.hasSelectorFilter()) { + q->setSelectorFilter(filter.getSelectorFilter()); + } + outgoing[link] = q; + } else if (node.exchange) { + QueueSettings settings(false, true); + //TODO: populate settings from source details when available from engine + boost::shared_ptr<qpid::broker::Queue> queue + = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first; + if (filter.hasSubjectFilter()) { + filter.bind(node.exchange, queue); + filter.write(pn_terminus_filter(pn_link_source(link))); + } else if (node.exchange->getType() == FanOutExchange::typeName) { + node.exchange->bind(queue, std::string(), 0); + } else if (node.exchange->getType() == TopicExchange::typeName) { + node.exchange->bind(queue, "#", 0); } else { - pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); - throw qpid::Exception("Node not found: " + name);/*not-found*/ + throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/ + } + boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, queue, link, *this, out, true)); + outgoing[link] = q; + q->init(); + } else if (node.relay) { + boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, name, pn_link_name(link), node.relay)); + outgoing[link] = out; + out->init(); + } else { + pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED); + throw qpid::Exception("Node not found: " + name);/*not-found*/ + } + QPID_LOG(debug, "Outgoing link attached"); +} + +/** + * Called for links initiated by the broker + */ +void Session::attach(pn_link_t* link, const std::string& src, const std::string& tgt, boost::shared_ptr<Relay> relay) +{ + pn_terminus_t* source = pn_link_source(link); + pn_terminus_t* target = pn_link_target(link); + pn_terminus_set_address(source, src.c_str()); + pn_terminus_set_address(target, tgt.c_str()); + + if (relay) { + if (pn_link_is_sender(link)) { + boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, src, pn_link_name(link), relay)); + outgoing[link] = out; + out->init(); + } else { + boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, tgt, pn_link_name(link), relay)); + incoming[link] = in; + } + } else { + if (pn_link_is_sender(link)) { + setupOutgoing(link, source, src); + } else { + setupIncoming(link, target, tgt); } - QPID_LOG(debug, "Incoming link attached"); } } void Session::detach(pn_link_t* link) { if (pn_link_is_sender(link)) { - Senders::iterator i = senders.find(link); - if (i != senders.end()) { + OutgoingLinks::iterator i = outgoing.find(link); + if (i != outgoing.end()) { i->second->detached(); - senders.erase(i); + outgoing.erase(i); QPID_LOG(debug, "Outgoing link detached"); } } else { - targets.erase(link); - QPID_LOG(debug, "Incoming link detached"); - } -} -namespace { - class Transfer : public qpid::broker::AsyncCompletion::Callback - { - public: - Transfer(pn_delivery_t* d, boost::shared_ptr<Session> s) : delivery(d), session(s) {} - void completed(bool sync) { session->accepted(delivery, sync); } - boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> clone() - { - boost::intrusive_ptr<qpid::broker::AsyncCompletion::Callback> copy(new Transfer(delivery, session)); - return copy; + IncomingLinks::iterator i = incoming.find(link); + if (i != incoming.end()) { + i->second->detached(); + incoming.erase(i); + QPID_LOG(debug, "Incoming link detached"); } - private: - pn_delivery_t* delivery; - boost::shared_ptr<Session> session; - }; + } } void Session::accepted(pn_delivery_t* delivery, bool sync) @@ -233,36 +272,26 @@ void Session::accepted(pn_delivery_t* delivery, bool sync) } } -void Session::incoming(pn_link_t* link, pn_delivery_t* delivery) +void Session::readable(pn_link_t* link, pn_delivery_t* delivery) { pn_delivery_tag_t tag = pn_delivery_tag(delivery); QPID_LOG(debug, "received delivery: " << std::string(tag.bytes, tag.size)); - boost::intrusive_ptr<Message> received(new Message(pn_delivery_pending(delivery))); - /*ssize_t read = */pn_link_recv(link, received->getData(), received->getSize()); - received->scan(); - pn_link_advance(link); - - qpid::broker::Message message(received, received); - incomingMessageReceived(); - Targets::iterator target = targets.find(link); - if (target == targets.end()) { + IncomingLinks::iterator target = incoming.find(link); + if (target == incoming.end()) { QPID_LOG(error, "Received message on unknown link"); pn_delivery_update(delivery, PN_REJECTED); pn_delivery_settle(delivery);//do we need to check settlement modes/orders? incomingMessageRejected(); } else { - target->second->handle(message); - received->begin(); - Transfer t(delivery, shared_from_this()); - received->end(t); - if (target->second->needFlow()) out.activateOutput(); + target->second->readable(delivery); + if (target->second->haveWork()) out.activateOutput(); } } -void Session::outgoing(pn_link_t* link, pn_delivery_t* delivery) +void Session::writable(pn_link_t* link, pn_delivery_t* delivery) { - Senders::iterator sender = senders.find(link); - if (sender == senders.end()) { + OutgoingLinks::iterator sender = outgoing.find(link); + if (sender == outgoing.end()) { QPID_LOG(error, "Delivery returned for unknown link"); } else { sender->second->handle(delivery); @@ -272,8 +301,8 @@ void Session::outgoing(pn_link_t* link, pn_delivery_t* delivery) bool Session::dispatch() { bool output(false); - for (Senders::iterator s = senders.begin(); s != senders.end(); ++s) { - if (s->second->dispatch()) output = true; + for (OutgoingLinks::iterator s = outgoing.begin(); s != outgoing.end(); ++s) { + if (s->second->doWork()) output = true; } if (completed.size()) { output = true; @@ -286,8 +315,8 @@ bool Session::dispatch() accepted(*i, true); } } - for (Targets::iterator t = targets.begin(); t != targets.end(); ++t) { - if (t->second->flow()) output = true; + for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) { + if (i->second->doWork()) output = true; } return output; @@ -295,42 +324,33 @@ bool Session::dispatch() void Session::close() { - for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { + for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { + i->second->detached(); + } + for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) { i->second->detached(); } - senders.clear(); - targets.clear();//at present no explicit cleanup required for targets - QPID_LOG(debug, "Session closed, all senders cancelled."); + outgoing.clear(); + incoming.clear(); + QPID_LOG(debug, "Session closed, all links detached."); qpid::sys::Mutex::ScopedLock l(lock); deleted = true; } -void Queue::handle(qpid::broker::Message& message) -{ - queue->deliver(message); - --window; -} - -void Exchange::handle(qpid::broker::Message& message) +void Session::wakeup() { - DeliverableMessage deliverable(message, 0); - exchange->route(deliverable); - --window; + out.activateOutput(); } -bool Target::flow() +void IncomingToQueue::handle(qpid::broker::Message& message) { - bool issue = window < credit; - if (issue) { - pn_link_flow(link, credit - window);//TODO: proper flow control - window = credit; - } - return issue; + queue->deliver(message); } -bool Target::needFlow() +void IncomingToExchange::handle(qpid::broker::Message& message) { - return window <= (credit/2); + DeliverableMessage deliverable(message, 0); + exchange->route(deliverable); } }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h index 7dbdaf05fc..74f50a9eda 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.h +++ b/qpid/cpp/src/qpid/broker/amqp/Session.h @@ -43,34 +43,45 @@ class Queue; namespace amqp { -class ManagedConnection; +class Connection; +class Incoming; class Outgoing; -class Target; +class Relay; /** * */ class Session : public ManagedSession, public boost::enable_shared_from_this<Session> { public: - Session(pn_session_t*, qpid::broker::Broker&, ManagedConnection&, qpid::sys::OutputControl&); + Session(pn_session_t*, qpid::broker::Broker&, Connection&, qpid::sys::OutputControl&); + /** + * called for links initiated by the peer + */ void attach(pn_link_t*); void detach(pn_link_t*); - void incoming(pn_link_t*, pn_delivery_t*); - void outgoing(pn_link_t*, pn_delivery_t*); + void readable(pn_link_t*, pn_delivery_t*); + void writable(pn_link_t*, pn_delivery_t*); bool dispatch(); void close(); + /** + * called for links initiated by the broker + */ + void attach(pn_link_t* link, const std::string& src, const std::string& tgt, boost::shared_ptr<Relay>); + //called when a transfer is completly processed (e.g.including stored on disk) void accepted(pn_delivery_t*, bool sync); + + void wakeup(); private: - typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > Senders; - typedef std::map<pn_link_t*, boost::shared_ptr<Target> > Targets; + typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > OutgoingLinks; + typedef std::map<pn_link_t*, boost::shared_ptr<Incoming> > IncomingLinks; pn_session_t* session; qpid::broker::Broker& broker; - ManagedConnection& connection; + Connection& connection; qpid::sys::OutputControl& out; - Targets targets; - Senders senders; + IncomingLinks incoming; + OutgoingLinks outgoing; std::deque<pn_delivery_t*> completed; bool deleted; qpid::sys::Mutex lock; @@ -78,9 +89,12 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses { boost::shared_ptr<qpid::broker::Exchange> exchange; boost::shared_ptr<qpid::broker::Queue> queue; + boost::shared_ptr<Relay> relay; }; - ResolvedNode resolve(const std::string name, pn_terminus_t* terminus); + ResolvedNode resolve(const std::string name, pn_terminus_t* terminus, bool incoming); + void setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name); + void setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::string& name); }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp index ca2094b965..9d34e71e04 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp @@ -202,7 +202,7 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation } } -void Translation::write(Outgoing& out) +void Translation::write(OutgoingFromQueue& out) { const Message* message = dynamic_cast<const Message*>(&original.getEncoding()); if (message) { diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.h b/qpid/cpp/src/qpid/broker/amqp/Translation.h index 64d96560e3..7591f45b2a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Translation.h +++ b/qpid/cpp/src/qpid/broker/amqp/Translation.h @@ -31,7 +31,7 @@ class MessageTransfer; } namespace amqp { -class Outgoing; +class OutgoingFromQueue; /** * */ @@ -49,7 +49,7 @@ class Translation /** * Writes the AMQP 1.0 bare message and any annotations, translating from 0-10 if necessary */ - void write(Outgoing&); + void write(OutgoingFromQueue&); private: const qpid::broker::Message& original; }; diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index b2a9b979b6..c74ee01898 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -140,6 +140,12 @@ bool ConnectionContext::isOpen() const void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + //wait for outstanding sends to settle + while (!ssn->settled()) { + QPID_LOG(debug, "Waiting for sends to settle before closing"); + wait();//wait until message has been confirmed + } + pn_session_close(ssn->session); //TODO: need to destroy session and remove context from map wakeupDriver(); @@ -166,13 +172,19 @@ void ConnectionContext::close() wakeupDriver(); //wait for close to be confirmed by peer? while (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) { - wait(); + if (state == DISCONNECTED) { + QPID_LOG(warning, "Disconnected before close received from peer."); + break; + } + lock.wait(); } sessions.clear(); } - transport->close(); - while (state != DISCONNECTED) { - lock.wait(); + if (state != DISCONNECTED) { + transport->close(); + while (state != DISCONNECTED) { + lock.wait(); + } } } diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index 7a418993d5..f77863a146 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -324,6 +324,9 @@ endif (PYTHON_EXECUTABLE) add_test (stop_broker ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/stop_broker${test_script_suffix}) if (PYTHON_EXECUTABLE) add_test (ha_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/ha_tests.py) + if (BUILD_AMQP) + add_test (interlink_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/interlink_tests.py) + endif (BUILD_AMQP) if (BUILD_LEGACYSTORE) add_test (ha_store_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/ha_store_tests.py) endif (BUILD_LEGACYSTORE) diff --git a/qpid/cpp/src/tests/Makefile.am b/qpid/cpp/src/tests/Makefile.am index 69ca01a934..2e55b24c3e 100644 --- a/qpid/cpp/src/tests/Makefile.am +++ b/qpid/cpp/src/tests/Makefile.am @@ -297,7 +297,7 @@ TESTS_ENVIRONMENT = \ system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \ run_msg_group_tests TESTS += start_broker $(system_tests) python_tests stop_broker \ - run_ha_tests run_federation_tests run_federation_sys_tests \ + run_ha_tests run_interlink_tests run_federation_tests run_federation_sys_tests \ run_acl_tests run_cli_tests dynamic_log_level_test \ dynamic_log_hires_timestamp run_queue_flow_limit_tests ipv6_test @@ -346,6 +346,8 @@ EXTRA_DIST += \ run_ha_tests \ ha_test.py \ ha_tests.py \ + run_interlink_tests \ + interlink_tests.py \ brokertest.py \ ha_store_tests.py \ test_env.ps1.in diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 70c145a51b..af1edfee44 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -427,6 +427,8 @@ class BrokerTest(TestCase): qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC")) ha_lib = os.getenv("HA_LIB") xml_lib = os.getenv("XML_LIB") + amqp_lib = os.getenv("AMQP_LIB") + amqpc_lib = os.getenv("AMQPC_LIB") qpid_config_exec = os.getenv("QPID_CONFIG_EXEC") qpid_route_exec = os.getenv("QPID_ROUTE_EXEC") receiver_exec = os.getenv("RECEIVER_EXEC") diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py new file mode 100755 index 0000000000..1e7262051a --- /dev/null +++ b/qpid/cpp/src/tests/interlink_tests.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random +import traceback +from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty +from qpid.datatypes import uuid4 +from brokertest import * +from threading import Thread, Lock, Condition +from logging import getLogger, WARN, ERROR, DEBUG, INFO +from qpidtoollibs import BrokerAgent, BrokerObject +from uuid import UUID + +class Domain(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Config: + def __init__(self, broker, address="q;{create:always}", version="amqp1.0"): + self.url = broker.host_port() + self.address = address + self.version = version + + def __str__(self): + return "url: %s, address: %s, version: %s" % (self.url, self.address, self.version) + +class AmqpBrokerTest(BrokerTest): + """ + Tests using AMQP 1.0 support + """ + def setUp(self): + BrokerTest.setUp(self) + os.putenv("QPID_LOAD_MODULE", BrokerTest.amqpc_lib) + self.broker = self.amqp_broker() + self.default_config = Config(self.broker) + self.agent = BrokerAgent(self.broker.connect()) + + def sender(self, config): + cmd = ["qpid-send", + "--broker", config.url, + "--address", config.address, + "--connection-options", "{protocol:%s}" % config.version, + "--content-stdin", "--send-eos=1" + ] + return self.popen(cmd, stdin=PIPE) + + def receiver(self, config): + cmd = ["qpid-receive", + "--broker", config.url, + "--address", config.address, + "--connection-options", "{protocol:%r}" % config.version, + "--timeout=10" + ] + return self.popen(cmd, stdout=PIPE) + + def send_and_receive(self, send_config=None, recv_config=None, count=1000, debug=False): + if debug: + print "sender config is %s" % (send_config or self.default_config) + print "receiver config is %s" % (recv_config or self.default_config) + sender = self.sender(send_config or self.default_config) + receiver = self.receiver(recv_config or self.default_config) + + messages = ["message-%s" % (i+1) for i in range(count)] + for m in messages: + sender.stdin.write(m + "\n") + sender.stdin.flush() + sender.stdin.close() + if debug: + c = send_config or self.default_config + print "sent %s messages to %s sn %s" % (len(messages), c.address, c.url) + + if debug: + c = recv_config or self.default_config + print "reading messages from %s sn %s" % (c.address, c.url) + for m in messages: + l = receiver.stdout.readline().rstrip() + if debug: + print l + assert m == l, (m, l) + + sender.wait() + receiver.wait() + + def test_simple(self): + self.send_and_receive() + + def test_translate1(self): + self.send_and_receive(recv_config=Config(self.broker, version="amqp0-10")) + + def test_translate2(self): + self.send_and_receive(send_config=Config(self.broker, version="amqp0-10")) + + def test_domain(self): + brokerB = self.amqp_broker() + self.agent.create("domain", "BrokerB", {"url":brokerB.host_port()}) + domains = self.agent._getAllBrokerObjects(Domain) + assert len(domains) == 1 + assert domains[0].name == "BrokerB" + + def test_incoming_link(self): + brokerB = self.amqp_broker() + agentB = BrokerAgent(brokerB.connect()) + self.agent.create("queue", "q") + agentB.create("queue", "q") + self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"}) + self.agent.create("incoming", "Link1", {"domain":"BrokerB","source":"q","target":"q"}) + #send to brokerB, receive from brokerA + self.send_and_receive(send_config=Config(brokerB)) + + def test_outgoing_link(self): + brokerB = self.amqp_broker() + agentB = BrokerAgent(brokerB.connect()) + self.agent.create("queue", "q") + agentB.create("queue", "q") + self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"}) + self.agent.create("outgoing", "Link1", {"domain":"BrokerB","source":"q","target":"q"}) + #send to brokerA, receive from brokerB + self.send_and_receive(recv_config=Config(brokerB)) + + def test_relay(self): + brokerB = self.amqp_broker() + agentB = BrokerAgent(brokerB.connect()) + agentB.create("queue", "q") + self.agent.create("domain", "BrokerB", {"url":brokerB.host_port(), "sasl_mechanisms":"NONE"}) + #send to q on broker B through brokerA + self.send_and_receive(send_config=Config(self.broker, address="q@BrokerB"), recv_config=Config(brokerB)) + + """ Create and return a broker with AMQP 1.0 support """ + def amqp_broker(self): + assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in" + args = ["--load-module", BrokerTest.amqp_lib, + "--max-negotiate-time=600000", + "--log-enable=trace+:Protocol", + "--log-enable=info+"] + return BrokerTest.broker(self, args) + +if __name__ == "__main__": + shutil.rmtree("brokertest.tmp", True) + os.execvp("qpid-python-test", + ["qpid-python-test", "-m", "interlink_tests"] + sys.argv[1:]) diff --git a/qpid/cpp/src/tests/run_interlink_tests b/qpid/cpp/src/tests/run_interlink_tests new file mode 100755 index 0000000000..6c61bdd654 --- /dev/null +++ b/qpid/cpp/src/tests/run_interlink_tests @@ -0,0 +1,26 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +test -e "$AMQP_LIB" || { echo "Skipping AMQP 1.0 based tests; AMQP 1.0 support not available."; exit 0; } + +srcdir=`dirname $0` +$srcdir/interlink_tests.py + diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in index 0f8f834731..6940943b54 100644 --- a/qpid/cpp/src/tests/test_env.sh.in +++ b/qpid/cpp/src/tests/test_env.sh.in @@ -68,6 +68,8 @@ exportmodule SSL_LIB ssl.so exportmodule WATCHDOG_LIB watchdog.so exportmodule XML_LIB xml.so exportmodule STORE_LIB legacystore.so +exportmodule AMQP_LIB amqp.so +exportmodule AMQPC_LIB amqpc.so # Qpid options export QPID_NO_MODULE_DIR=1 # Don't accidentally load installed modules diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml index b3514e1f70..2a8a36f10e 100644 --- a/qpid/specs/management-schema.xml +++ b/qpid/specs/management-schema.xml @@ -379,6 +379,43 @@ <method name="close"/> </class> + + <!-- + =============================================================== + AMQP 1.0 link for incoming transfers + =============================================================== + --> + <class name="Incoming"> + <property name="sessionRef" type="objId" references="Session" access="RC" parentRef="y"/> + <property name="target" type="sstr" access="RC" index="y"/> + <property name="name" type="sstr" access="RC" index="y"/> + <statistic name="transfers" type="count64" unit="message" desc="Messages transfered"/> + </class> + <!-- + =============================================================== + AMQP 1.0 link for outgoing transfers + =============================================================== + --> + <class name="Outgoing"> + <property name="sessionRef" type="objId" references="Session" access="RC" parentRef="y"/> + <property name="source" type="sstr" access="RC" index="y"/> + <property name="name" type="sstr" access="RC" index="y"/> + <statistic name="transfers" type="count64" unit="message" desc="Messages transfered"/> + </class> + <!-- + =============================================================== + Domain + =============================================================== + --> + <class name="Domain"> + <property name="name" type="sstr" access="RC" index="y"/> + <property name="durable" type="bool" access="RC"/> + <property name="url" type="sstr" access="RO"/> + <property name="mechanisms" type="sstr" access="RO"/> + <property name="username" type="sstr" access="RO"/> + <property name="password" type="sstr" access="RO"/> + </class> + <!-- =============================================================== Link diff --git a/qpid/tools/src/py/qpidtoollibs/broker.py b/qpid/tools/src/py/qpidtoollibs/broker.py index d8b75c3c60..41fea74414 100644 --- a/qpid/tools/src/py/qpidtoollibs/broker.py +++ b/qpid/tools/src/py/qpidtoollibs/broker.py @@ -292,9 +292,13 @@ class BrokerAgent(object): 'routingKey': key} return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker") - def create(self, _type, name, properties, strict): + def create(self, _type, name, properties={}, strict=False): """Create an object of the specified type""" - pass + args = {'type': _type, + 'name': name, + 'properties': properties, + 'strict': strict} + return self._method('create', args) def delete(self, _type, name, options): """Delete an object of the specified type""" |