diff options
author | Ted Ross <tross@apache.org> | 2008-05-21 21:40:49 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2008-05-21 21:40:49 +0000 |
commit | 35d9dc572a918015c038245725b0f9894b13132a (patch) | |
tree | d9efecaeab11e12f0b2f2d87ff7f202383eaa6a0 | |
parent | 28404c0026b5bed8ad4ad37d52cd4d3aab5c70bc (diff) | |
download | qpid-python-35d9dc572a918015c038245725b0f9894b13132a.tar.gz |
QPID-1087
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@658886 13f79535-47bb-0310-9956-ffa450edef68
23 files changed, 269 insertions, 263 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index f17d322dab..9321e0d855 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -536,7 +536,6 @@ nobase_include_HEADERS = \ qpid/sys/OutputControl.h \ qpid/sys/OutputTask.h \ qpid/sys/Poller.h \ - qpid/sys/ProtocolAccess.h \ qpid/sys/ProtocolFactory.h \ qpid/sys/Runnable.h \ qpid/sys/ScopedIncrement.h \ diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp index 9e860ab653..c1e2e21e5d 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -19,7 +19,6 @@ * */ #include "Connection.h" -#include "qpid/sys/ProtocolAccess.h" #include "qpid/log/Statement.h" #include "qpid/amqp_0_10/exceptions.h" @@ -28,13 +27,9 @@ namespace amqp_0_10 { using sys::Mutex; -Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient, sys::ProtocolAccess* a) - : frameQueueClosed(false), output(o), connection(new broker::Connection(this, broker, id, _isClient)), - identifier(id), initialized(false), isClient(_isClient) -{ - if (a != 0) - a->callConnCb(connection); -} +Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient) + : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient), + identifier(id), initialized(false), isClient(_isClient) {} size_t Connection::decode(const char* buffer, size_t size) { framing::Buffer in(const_cast<char*>(buffer), size); @@ -50,13 +45,13 @@ size_t Connection::decode(const char* buffer, size_t size) { framing::AMQFrame frame; while(frame.decode(in)) { QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); - connection->received(frame); + connection.received(frame); } return in.getPosition(); } bool Connection::canEncode() { - if (!frameQueueClosed) connection->doOutput(); + if (!frameQueueClosed) connection.doOutput(); Mutex::ScopedLock l(frameQueueLock); return (!isClient && !initialized) || !frameQueue.empty(); } @@ -95,7 +90,7 @@ void Connection::close() { } void Connection::closed() { - connection->closed(); + connection.closed(); } void Connection::send(framing::AMQFrame& f) { diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h index ea8d183e01..4369d401bd 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.h +++ b/cpp/src/qpid/amqp_0_10/Connection.h @@ -29,7 +29,6 @@ #include <queue> namespace qpid { -namespace sys { class ProtocolAccess; } namespace broker { class Broker; } namespace amqp_0_10 { @@ -41,13 +40,13 @@ class Connection : public sys::ConnectionCodec, bool frameQueueClosed; mutable sys::Mutex frameQueueLock; sys::OutputControl& output; - broker::Connection::shared_ptr connection; // FIXME aconway 2008-03-18: + broker::Connection connection; // FIXME aconway 2008-03-18: std::string identifier; bool initialized; bool isClient; public: - Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false, sys::ProtocolAccess* a =0); + Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false); size_t decode(const char* buffer, size_t size); size_t encode(const char* buffer, size_t size); bool isClosed() const; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 4f7686aac4..2992ea45cf 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -361,18 +361,20 @@ void Broker::accept() { // TODO: How to chose the protocolFactory to use for the connection void Broker::connect( const std::string& host, uint16_t port, bool /*useSsl*/, - sys::ConnectionCodec::Factory* f, - sys::ProtocolAccess* access) + boost::function2<void, int, std::string> failed, + sys::ConnectionCodec::Factory* f) { - getProtocolFactory()->connect(poller, host, port, f ? f : &factory, access); + getProtocolFactory()->connect(poller, host, port, f ? f : &factory, failed); } void Broker::connect( - const Url& url, sys::ConnectionCodec::Factory* f) + const Url& url, + boost::function2<void, int, std::string> failed, + sys::ConnectionCodec::Factory* f) { url.throwIfEmpty(); TcpAddress addr=boost::get<TcpAddress>(url[0]); - connect(addr.host, addr.port, false, f, (sys::ProtocolAccess*) 0); + connect(addr.host, addr.port, false, failed, f); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 7092a86181..531817db83 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -44,7 +44,6 @@ #include "qpid/framing/OutputHandler.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Runnable.h" -#include "qpid/sys/ProtocolAccess.h" #include <vector> @@ -135,10 +134,12 @@ class Broker : public sys::Runnable, public Plugin::Target, /** Create a connection to another broker. */ void connect(const std::string& host, uint16_t port, bool useSsl, - sys::ConnectionCodec::Factory* =0, - sys::ProtocolAccess* =0); + boost::function2<void, int, std::string> failed, + sys::ConnectionCodec::Factory* =0); /** Create a connection to another broker. */ - void connect(const Url& url, sys::ConnectionCodec::Factory* =0); + void connect(const Url& url, + boost::function2<void, int, std::string> failed, + sys::ConnectionCodec::Factory* =0); // TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed // For the present just return the first ProtocolFactory registered. diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 463193a346..ea3d3547f5 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -47,44 +47,26 @@ using qpid::management::Args; namespace qpid { namespace broker { -class Connection::MgmtClient : public Connection::MgmtWrapper -{ - management::Client::shared_ptr mgmtClient; - -public: - MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent, - const std::string& mgmtId, bool incoming); - ~MgmtClient(); - void received(framing::AMQFrame& frame); - management::ManagementObject::shared_ptr getManagementObject() const; - void closing(); -}; - -Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) : +Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) : ConnectionState(out_, broker_), - adapter(*this, isLink), + adapter(*this, isLink_), + isLink(isLink_), mgmtClosing(false), - mgmtId(mgmtId_) -{ - initMgmt(); -} - -void Connection::initMgmt(bool asLink) + mgmtId(mgmtId_), + links(broker_.getLinks()) { Manageable* parent = broker.GetVhostObject (); + if (isLink) + links.notifyConnection (mgmtId, this); + if (parent != 0) { ManagementAgent::shared_ptr agent = ManagementAgent::getAgent (); if (agent.get () != 0) - { - if (asLink) { - mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false)); - } else { - mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true)); - } - } + mgmtObject = management::Client::shared_ptr (new management::Client(this, parent, mgmtId, !isLink)); + agent->addObject (mgmtObject); } } @@ -95,19 +77,65 @@ void Connection::requestIOProcessing (boost::function0<void> callback) } -Connection::~Connection () {} +Connection::~Connection () +{ + if (mgmtObject.get() != 0) + mgmtObject->resourceDestroy(); + if (isLink) + links.notifyClosed (mgmtId); +} void Connection::received(framing::AMQFrame& frame){ - if (mgmtClosing) - close (403, "Closed by Management Request", 0, 0); - if (frame.getChannel() == 0 && frame.getMethod()) { adapter.handle(frame); } else { getChannel(frame.getChannel()).in(frame); } - - if (mgmtWrapper.get()) mgmtWrapper->received(frame); + + if (isLink) + recordFromServer(frame); + else + recordFromClient(frame); +} + +void Connection::recordFromServer (framing::AMQFrame& frame) +{ + if (mgmtObject.get () != 0) + { + mgmtObject->inc_framesToClient (); + mgmtObject->inc_bytesToClient (frame.size ()); + } +} + +void Connection::recordFromClient (framing::AMQFrame& frame) +{ + if (mgmtObject.get () != 0) + { + mgmtObject->inc_framesFromClient (); + mgmtObject->inc_bytesFromClient (frame.size ()); + } +} + +string Connection::getAuthMechanism() +{ + if (!isLink) + return string("ANONYMOUS"); + + return links.getAuthMechanism(mgmtId); +} + +string Connection::getAuthCredentials() +{ + if (!isLink) + return string(); + + return links.getAuthCredentials(mgmtId); +} + +void Connection::notifyConnectionForced(const string& text) +{ + if (isLink) + links.notifyConnectionForced(mgmtId, text); } void Connection::close( @@ -125,7 +153,7 @@ void Connection::idleIn(){} void Connection::closed(){ // Physically closed, suspend open sessions. try { while (!channels.empty()) - ptr_map_ptr(channels.begin())->handleDetach(); + ptr_map_ptr(channels.begin())->handleDetach(); while (!exclusiveQueues.empty()) { Queue::shared_ptr q(exclusiveQueues.front()); q->releaseExclusiveOwnership(); @@ -147,10 +175,12 @@ bool Connection::doOutput() if (ioCallback) ioCallback(); // Lend the IO thread for management processing ioCallback = 0; - if (mgmtClosing) close (403, "Closed by Management Request", 0, 0); - //then do other output as needed: - return outputTasks.doOutput(); + if (mgmtClosing) + close (403, "Closed by Management Request", 0, 0); + else + //then do other output as needed: + return outputTasks.doOutput(); }catch(ConnectionException& e){ close(e.code, e.what(), 0, 0); }catch(std::exception& e){ @@ -174,7 +204,7 @@ SessionHandler& Connection::getChannel(ChannelId id) { ManagementObject::shared_ptr Connection::GetManagementObject (void) const { - return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr(); + return dynamic_pointer_cast<ManagementObject>(mgmtObject); } Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&) @@ -187,7 +217,7 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&) { case management::Client::METHOD_CLOSE : mgmtClosing = true; - if (mgmtWrapper.get()) mgmtWrapper->closing(); + if (mgmtObject.get()) mgmtObject->set_closing(1); out->activateOutput(); status = Manageable::STATUS_OK; break; @@ -196,39 +226,5 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&) return status; } -Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent, - ManagementAgent::shared_ptr agent, - const std::string& mgmtId, bool incoming) -{ - mgmtClient = management::Client::shared_ptr - (new management::Client (conn, parent, mgmtId, incoming)); - agent->addObject (mgmtClient); -} - -Connection::MgmtClient::~MgmtClient() -{ - if (mgmtClient.get () != 0) - mgmtClient->resourceDestroy (); -} - -void Connection::MgmtClient::received(framing::AMQFrame& frame) -{ - if (mgmtClient.get () != 0) - { - mgmtClient->inc_framesFromClient (); - mgmtClient->inc_bytesFromClient (frame.size ()); - } -} - -management::ManagementObject::shared_ptr Connection::MgmtClient::getManagementObject() const -{ - return dynamic_pointer_cast<ManagementObject>(mgmtClient); -} - -void Connection::MgmtClient::closing() -{ - if (mgmtClient) mgmtClient->set_closing (1); -} - }} diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index dff1e0653b..e6e3d4d15e 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -43,13 +43,14 @@ #include "SessionHandler.h" #include "qpid/management/Manageable.h" #include "qpid/management/Client.h" -#include "qpid/management/Link.h" #include <boost/ptr_container/ptr_map.hpp> namespace qpid { namespace broker { +class LinkRegistry; + class Connection : public sys::ConnectionInputHandler, public ConnectionState { @@ -62,7 +63,10 @@ class Connection : public sys::ConnectionInputHandler, SessionHandler& getChannel(framing::ChannelId channel); /** Close the connection */ - void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId); + void close(framing::ReplyCode code = 403, + const string& text = string(), + framing::ClassId classId = 0, + framing::MethodId methodId = 0); // ConnectionInputHandler methods void received(framing::AMQFrame& frame); @@ -78,38 +82,26 @@ class Connection : public sys::ConnectionInputHandler, management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args); - void initMgmt(bool asLink = false); void requestIOProcessing (boost::function0<void>); + void recordFromServer (framing::AMQFrame& frame); + void recordFromClient (framing::AMQFrame& frame); + std::string getAuthMechanism(); + std::string getAuthCredentials(); + void notifyConnectionForced(const std::string& text); private: typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - /** - * Connection may appear, for the purposes of management, as a - * normal client initiated connection or as an agent initiated - * inter-broker link. This wrapper abstracts the common interface - * for both. - */ - class MgmtWrapper - { - public: - virtual ~MgmtWrapper(){} - virtual void received(framing::AMQFrame& frame) = 0; - virtual management::ManagementObject::shared_ptr getManagementObject() const = 0; - virtual void closing() = 0; - virtual void processPending(){} - virtual void process(Connection&, const management::Args&){} - }; - class MgmtClient; - ChannelMap channels; framing::AMQP_ClientProxy::Connection* client; ConnectionHandler adapter; - std::auto_ptr<MgmtWrapper> mgmtWrapper; + bool isLink; bool mgmtClosing; const std::string mgmtId; boost::function0<void> ioCallback; + management::Client::shared_ptr mgmtObject; + LinkRegistry& links; }; }} diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp index cd015ce4f5..5de5a0230a 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -39,9 +39,9 @@ ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std: } sys::ConnectionCodec* -ConnectionFactory::create(sys::OutputControl& out, const std::string& id, sys::ProtocolAccess* a) { +ConnectionFactory::create(sys::OutputControl& out, const std::string& id) { // used to create connections from one broker to another - return new amqp_0_10::Connection(out, broker, id, true, a); + return new amqp_0_10::Connection(out, broker, id, true); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ConnectionFactory.h b/cpp/src/qpid/broker/ConnectionFactory.h index bf55ab3b88..5797495054 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.h +++ b/cpp/src/qpid/broker/ConnectionFactory.h @@ -24,7 +24,6 @@ #include "qpid/sys/ConnectionCodec.h" namespace qpid { -namespace sys { class ProtocolAccess; } namespace broker { class Broker; @@ -38,7 +37,7 @@ class ConnectionFactory : public sys::ConnectionCodec::Factory { create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id); sys::ConnectionCodec* - create(sys::OutputControl&, const std::string& id, sys::ProtocolAccess* a =0); + create(sys::OutputControl&, const std::string& id); private: Broker& broker; diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index 162664fb88..77a4d1a3de 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -26,6 +26,7 @@ #include "Connection.h" #include "qpid/framing/ClientInvoker.h" #include "qpid/framing/ServerInvoker.h" +#include "qpid/framing/constants.h" #include "qpid/log/Statement.h" using namespace qpid; @@ -123,6 +124,10 @@ void ConnectionHandler::Handler::close(uint16_t replyCode, const string& replyTe if (replyCode != 200) { QPID_LOG(warning, "Client closed connection with " << replyCode << ": " << replyText); } + + if (replyCode == framing::connection::CONNECTION_FORCED) + connection.notifyConnectionForced(replyText); + client.closeOk(); connection.getOutput().close(); } @@ -136,9 +141,10 @@ void ConnectionHandler::Handler::start(const FieldTable& /*serverProperties*/, const framing::Array& /*mechanisms*/, const framing::Array& /*locales*/) { - string response; - server.startOk(FieldTable(), ANONYMOUS, response, en_US); - connection.initMgmt(true); + string mechanism = connection.getAuthMechanism(); + string response = connection.getAuthCredentials(); + + server.startOk(FieldTable(), mechanism, response, en_US); } void ConnectionHandler::Handler::secure(const string& /*challenge*/) diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index cd032495e2..6bcfcf77a3 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -51,13 +51,11 @@ Link::Link(LinkRegistry* _links, : links(_links), store(_store), host(_host), port(_port), useSsl(_useSsl), durable(_durable), authMechanism(_authMechanism), username(_username), password(_password), persistenceId(0), broker(_broker), state(0), - access(boost::bind(&Link::established, this), - boost::bind(&Link::closed, this, _1, _2), - boost::bind(&Link::setConnection, this, _1)), visitCount(0), currentInterval(1), closing(false), - channelCounter(1) + channelCounter(1), + connection(0) { if (parent != 0) { @@ -75,8 +73,9 @@ Link::Link(LinkRegistry* _links, Link::~Link () { - if (state == STATE_OPERATIONAL) - access.close(); + if (state == STATE_OPERATIONAL && connection != 0) + connection->close(); + if (mgmtObject.get () != 0) mgmtObject->resourceDestroy (); } @@ -95,13 +94,16 @@ void Link::setStateLH (int newState) case STATE_WAITING : mgmtObject->set_state("Waiting"); break; case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break; case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break; + case STATE_FAILED : mgmtObject->set_state("Failed"); break; + case STATE_CLOSED : mgmtObject->set_state("Closed"); break; } } void Link::startConnectionLH () { try { - broker->connect (host, port, useSsl, 0, &access); + broker->connect (host, port, useSsl, + boost::bind (&Link::closed, this, _1, _2)); setStateLH(STATE_CONNECTING); } catch(std::exception& e) { setStateLH(STATE_WAITING); @@ -125,16 +127,21 @@ void Link::closed (int, std::string text) { Mutex::ScopedLock mutex(lock); + connection = 0; + if (state == STATE_OPERATIONAL) QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port); - connection.reset(); for (Bridges::iterator i = active.begin(); i != active.end(); i++) created.push_back(*i); active.clear(); - setStateLH(STATE_WAITING); - mgmtObject->set_lastError (text); + if (state != STATE_FAILED) + { + setStateLH(STATE_WAITING); + mgmtObject->set_lastError (text); + } + if (closing) destroy(); } @@ -145,7 +152,10 @@ void Link::destroy () Bridges toDelete; QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); - connection.reset(); + if (connection) + connection->close(403, "closed by management"); + + setStateLH(STATE_CLOSED); // Move the bridges to be deleted into a local vector so there is no // corruption of the iterator caused by bridge deletion. @@ -168,10 +178,7 @@ void Link::destroy () void Link::add(Bridge::shared_ptr bridge) { Mutex::ScopedLock mutex(lock); - created.push_back (bridge); - if (state == STATE_OPERATIONAL && connection.get() != 0) - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } void Link::cancel(Bridge::shared_ptr bridge) @@ -197,6 +204,9 @@ void Link::ioThreadProcessing() { Mutex::ScopedLock mutex(lock); + if (state != STATE_OPERATIONAL) + return; + //process any pending creates if (!created.empty()) { for (Bridges::iterator i = created.begin(); i != created.end(); ++i) { @@ -207,12 +217,10 @@ void Link::ioThreadProcessing() } } -void Link::setConnection(Connection::shared_ptr c) +void Link::setConnection(Connection* c) { Mutex::ScopedLock mutex(lock); - connection = c; - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } void Link::maintenanceVisit () @@ -231,6 +239,8 @@ void Link::maintenanceVisit () startConnectionLH(); } } + else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0) + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } uint Link::nextChannel() @@ -240,6 +250,14 @@ uint Link::nextChannel() return channelCounter++; } +void Link::notifyConnectionForced(const string text) +{ + Mutex::ScopedLock mutex(lock); + + setStateLH(STATE_FAILED); + mgmtObject->set_lastError(text); +} + void Link::setPersistenceId(uint64_t id) const { if (mgmtObject != 0 && persistenceId == 0) diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h index c4eca86c19..de757d112e 100644 --- a/cpp/src/qpid/broker/Link.h +++ b/cpp/src/qpid/broker/Link.h @@ -27,7 +27,6 @@ #include "PersistableConfig.h" #include "Bridge.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/ProtocolAccess.h" #include "qpid/framing/FieldTable.h" #include "qpid/management/Manageable.h" #include "qpid/management/Link.h" @@ -57,7 +56,6 @@ namespace qpid { management::Link::shared_ptr mgmtObject; Broker* broker; int state; - sys::ProtocolAccess access; uint32_t visitCount; uint32_t currentInterval; bool closing; @@ -66,21 +64,20 @@ namespace qpid { Bridges created; // Bridges pending creation Bridges active; // Bridges active uint channelCounter; - boost::shared_ptr<Connection> connection; + Connection* connection; static const int STATE_WAITING = 1; static const int STATE_CONNECTING = 2; static const int STATE_OPERATIONAL = 3; + static const int STATE_FAILED = 4; + static const int STATE_CLOSED = 5; - static const uint32_t MAX_INTERVAL = 16; + static const uint32_t MAX_INTERVAL = 32; void setStateLH (int newState); void startConnectionLH(); // Start the IO Connection - void established(); // Called when connection is created - void closed(int, std::string); // Called when connection goes away void destroy(); // Called when mgmt deletes this link void ioThreadProcessing(); // Called on connection's IO thread by request - void setConnection(boost::shared_ptr<Connection>); // Set pointer to the AMQP Connection public: typedef boost::shared_ptr<Link> shared_ptr; @@ -106,6 +103,16 @@ namespace qpid { void add(Bridge::shared_ptr); void cancel(Bridge::shared_ptr); + void established(); // Called when connection is created + void closed(int, std::string); // Called when connection goes away + void setConnection(Connection*); // Set pointer to the AMQP Connection + + string getAuthMechanism() { return authMechanism; } + string getUsername() { return username; } + string getPassword() { return password; } + + void notifyConnectionForced(const std::string text); + // PersistableConfig: void setPersistenceId(uint64_t id) const; uint64_t getPersistenceId() const { return persistenceId; } diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp index be3c67077e..455cc8452e 100644 --- a/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/cpp/src/qpid/broker/LinkRegistry.cpp @@ -27,7 +27,7 @@ using std::pair; using std::stringstream; using boost::intrusive_ptr; -#define LINK_MAINT_INTERVAL 5 +#define LINK_MAINT_INTERVAL 2 LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0) { @@ -185,3 +185,56 @@ MessageStore* LinkRegistry::getStore() const { return store; } +void LinkRegistry::notifyConnection(const std::string& key, Connection* c) +{ + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(key); + if (l != links.end()) + { + l->second->established(); + l->second->setConnection(c); + } +} + +void LinkRegistry::notifyClosed(const std::string& key) +{ + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(key); + if (l != links.end()) + l->second->closed(0, "Closed by peer"); +} + +void LinkRegistry::notifyConnectionForced(const std::string& key, const std::string& text) +{ + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(key); + if (l != links.end()) + l->second->notifyConnectionForced(text); +} + +std::string LinkRegistry::getAuthMechanism(const std::string& key) +{ + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(key); + if (l != links.end()) + return l->second->getAuthMechanism(); + return string("ANONYMOUS"); +} + +std::string LinkRegistry::getAuthCredentials(const std::string& key) +{ + Mutex::ScopedLock locker(lock); + LinkMap::iterator l = links.find(key); + if (l == links.end()) + return string(); + + string result; + result += '\0'; + result += l->second->getUsername(); + result += '\0'; + result += l->second->getPassword(); + + return result; +} + + diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h index 3c47954141..f902490ed3 100644 --- a/cpp/src/qpid/broker/LinkRegistry.h +++ b/cpp/src/qpid/broker/LinkRegistry.h @@ -34,6 +34,7 @@ namespace qpid { namespace broker { class Broker; + class Connection; class LinkRegistry { // Declare a timer task to manage the establishment of link connections and the @@ -106,6 +107,12 @@ namespace broker { * Return the message store used. */ MessageStore* getStore() const; + + void notifyConnection (const std::string& key, Connection* c); + void notifyClosed (const std::string& key); + void notifyConnectionForced (const std::string& key, const std::string& text); + std::string getAuthMechanism (const std::string& key); + std::string getAuthCredentials (const std::string& key); }; } } diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index 31974993bb..ca2bd7c93c 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -36,14 +36,13 @@ struct Buff : public AsynchIO::BufferBase { { delete [] bytes;} }; -AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a) : +AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) : identifier(id), aio(0), factory(f), codec(0), readError(false), - isClient(false), - access(a) + isClient(false) {} AsynchIOHandler::~AsynchIOHandler() { @@ -153,7 +152,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { void AsynchIOHandler::idle(AsynchIO&){ if (isClient && codec == 0) { - codec = factory->create(*this, identifier, access); + codec = factory->create(*this, identifier); write(framing::ProtocolInitiation(codec->getVersion())); return; } diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h index ece52f57c4..7448094a94 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/cpp/src/qpid/sys/AsynchIOHandler.h @@ -32,7 +32,6 @@ namespace framing { } namespace sys { -class ProtocolAccess; class AsynchIOHandler : public OutputControl { std::string identifier; AsynchIO* aio; @@ -40,12 +39,11 @@ class AsynchIOHandler : public OutputControl { ConnectionCodec* codec; bool readError; bool isClient; - ProtocolAccess* access; void write(const framing::ProtocolInitiation&); public: - AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a =0); + AsynchIOHandler(std::string id, ConnectionCodec::Factory* f); ~AsynchIOHandler(); void init(AsynchIO* a, int numBuffs); diff --git a/cpp/src/qpid/sys/ConnectionCodec.h b/cpp/src/qpid/sys/ConnectionCodec.h index 4c5a68e576..efc6839b60 100644 --- a/cpp/src/qpid/sys/ConnectionCodec.h +++ b/cpp/src/qpid/sys/ConnectionCodec.h @@ -29,7 +29,6 @@ namespace qpid { namespace sys { -class ProtocolAccess; /** * Interface of coder/decoder for a connection of a specific protocol @@ -69,7 +68,7 @@ class ConnectionCodec { /** Return "preferred" codec for outbound connections. */ virtual ConnectionCodec* create( - OutputControl&, const std::string& id, ProtocolAccess* a = 0 + OutputControl&, const std::string& id ) = 0; }; }; diff --git a/cpp/src/qpid/sys/ProtocolAccess.h b/cpp/src/qpid/sys/ProtocolAccess.h deleted file mode 100644 index 433bf0ef97..0000000000 --- a/cpp/src/qpid/sys/ProtocolAccess.h +++ /dev/null @@ -1,65 +0,0 @@ -#ifndef _sys_ProtocolAccess_h -#define _sys_ProtocolAccess_h - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "AsynchIO.h" -#include "AsynchIOHandler.h" -#include <boost/function.hpp> -#include <boost/shared_ptr.hpp> - -namespace qpid { - -namespace broker -{ -class Connection; -} - -namespace sys { - -class ProtocolAccess -{ -public: - typedef boost::function0<void> Callback; - typedef boost::function2<void, int, std::string> ClosedCallback; - typedef boost::function1<void, boost::shared_ptr<broker::Connection> > SetConnCallback; - - ProtocolAccess (Callback ecb, ClosedCallback ccb, SetConnCallback sccb) - : aio(0), establishedCb(ecb), closedCb(ccb), setConnCb(sccb) {} - ~ProtocolAccess() {} - inline void close() { if (aio) aio->queueWriteClose(); } - - inline void setAio(AsynchIO *_aio) { aio = _aio; establishedCb(); } - inline void closedEof(AsynchIOHandler* async) { async->eof(*aio); closedCb(-1, "Closed by Peer"); } - inline void closed(int err, std::string str) { closedCb(err, str); } - inline void callConnCb(boost::shared_ptr<broker::Connection> c) { setConnCb(c); } - -private: - AsynchIO* aio; - Callback establishedCb; - ClosedCallback closedCb; - SetConnCallback setConnCb; -}; - -}} - -#endif //!_sys_ProtocolAccess_h diff --git a/cpp/src/qpid/sys/ProtocolFactory.h b/cpp/src/qpid/sys/ProtocolFactory.h index e61a94b205..e8eaefe1f6 100644 --- a/cpp/src/qpid/sys/ProtocolFactory.h +++ b/cpp/src/qpid/sys/ProtocolFactory.h @@ -25,7 +25,7 @@ #include <stdint.h> #include "qpid/SharedObject.h" #include "ConnectionCodec.h" -#include "ProtocolAccess.h" +#include <boost/function.hpp> namespace qpid { namespace sys { @@ -43,7 +43,7 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory> boost::shared_ptr<Poller>, const std::string& host, int16_t port, ConnectionCodec::Factory* codec, - ProtocolAccess* access = 0) = 0; + boost::function2<void, int, std::string> failed) = 0; }; inline ProtocolFactory::~ProtocolFactory() {} diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h index 806d6b5164..f95d841b39 100644 --- a/cpp/src/qpid/sys/Socket.h +++ b/cpp/src/qpid/sys/Socket.h @@ -118,6 +118,7 @@ public: private: Socket(IOHandlePrivate*); + mutable std::string connectname; }; }} diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index 5d2cadbe03..e82a6a9102 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -42,14 +42,15 @@ class AsynchIOProtocolFactory : public ProtocolFactory { AsynchIOProtocolFactory(int16_t port, int backlog); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); void connect(Poller::shared_ptr, const std::string& host, int16_t port, - ConnectionCodec::Factory*, ProtocolAccess*); + ConnectionCodec::Factory*, + boost::function2<void, int, std::string> failed); uint16_t getPort() const; std::string getHost() const; private: void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, - bool isClient, ProtocolAccess*); + bool isClient); }; // Static instance to initialise plugin @@ -74,31 +75,18 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) : {} void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, bool isClient, - ProtocolAccess* a) { - AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f, a); - AsynchIO* aio; + ConnectionCodec::Factory* f, bool isClient) { + AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f); if (isClient) async->setClient(); - if (a == 0) - aio = new AsynchIO(s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - else { - aio = new AsynchIO(s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&ProtocolAccess::closedEof, a, async), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); - a->setAio(aio); - } + AsynchIO* aio = new AsynchIO(s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); async->init(aio, 4); aio->start(poller); @@ -116,8 +104,7 @@ void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { acceptor.reset( new AsynchAcceptor(listener, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false, - (ProtocolAccess*) 0))); + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); acceptor->start(poller); } @@ -125,7 +112,7 @@ void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t port, ConnectionCodec::Factory* fact, - ProtocolAccess* access) + boost::function2<void, int, std::string> failed) { // Note that the following logic does not cause a memory leak. // The allocated Socket is freed either by the AsynchConnector @@ -135,8 +122,8 @@ void AsynchIOProtocolFactory::connect( Socket* socket = new Socket(); new AsynchConnector (*socket, poller, host, port, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true, access), - boost::bind(&ProtocolAccess::closed, access, _1, _2)); + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true), + failed); } }} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp index 67f6b6db4c..f4320531a9 100644 --- a/cpp/src/qpid/sys/posix/Socket.cpp +++ b/cpp/src/qpid/sys/posix/Socket.cpp @@ -32,6 +32,7 @@ #include <netdb.h> #include <cstdlib> #include <string.h> +#include <iostream> #include <boost/format.hpp> @@ -138,6 +139,10 @@ const char* h_errstr(int e) { void Socket::connect(const std::string& host, int port) const { + std::stringstream namestream; + namestream << host << ":" << port; + connectname = namestream.str(); + const int& socket = impl->fd; struct sockaddr_in name; name.sin_family = AF_INET; @@ -240,6 +245,8 @@ std::string Socket::getPeername() const std::string Socket::getPeerAddress() const { + if (!connectname.empty()) + return std::string (connectname); return getName(impl->fd, false, true); } diff --git a/python/commands/qpid-route b/python/commands/qpid-route index 0dc9a89f21..5206f5c3cb 100755 --- a/python/commands/qpid-route +++ b/python/commands/qpid-route @@ -112,9 +112,12 @@ class RouteManager: connectArgs["port"] = self.src.port connectArgs["useSsl"] = False connectArgs["durable"] = _durable - connectArgs["authMechanism"] = "PLAIN" - connectArgs["username"] = self.src.username - connectArgs["password"] = self.src.password + if self.src.username == "anonymous": + connectArgs["authMechanism"] = "ANONYMOUS" + else: + connectArgs["authMechanism"] = "PLAIN" + connectArgs["username"] = self.src.username + connectArgs["password"] = self.src.password res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) if _verbose: print "Connect method returned:", res.status, res.statusText @@ -164,9 +167,12 @@ class RouteManager: connectArgs["port"] = self.src.port connectArgs["useSsl"] = False connectArgs["durable"] = _durable - connectArgs["authMechanism"] = "PLAIN" - connectArgs["username"] = self.src.username - connectArgs["password"] = self.src.password + if self.src.username == "anonymous": + connectArgs["authMechanism"] = "ANONYMOUS" + else: + connectArgs["authMechanism"] = "PLAIN" + connectArgs["username"] = self.src.username + connectArgs["password"] = self.src.password res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs) if _verbose: print "Connect method returned:", res.status, res.statusText |