summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2008-05-21 21:40:49 +0000
committerTed Ross <tross@apache.org>2008-05-21 21:40:49 +0000
commit35d9dc572a918015c038245725b0f9894b13132a (patch)
treed9efecaeab11e12f0b2f2d87ff7f202383eaa6a0
parent28404c0026b5bed8ad4ad37d52cd4d3aab5c70bc (diff)
downloadqpid-python-35d9dc572a918015c038245725b0f9894b13132a.tar.gz
QPID-1087
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@658886 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.cpp17
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.h5
-rw-r--r--cpp/src/qpid/broker/Broker.cpp12
-rw-r--r--cpp/src/qpid/broker/Broker.h9
-rw-r--r--cpp/src/qpid/broker/Connection.cpp144
-rw-r--r--cpp/src/qpid/broker/Connection.h36
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.cpp4
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.h3
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp12
-rw-r--r--cpp/src/qpid/broker/Link.cpp52
-rw-r--r--cpp/src/qpid/broker/Link.h21
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp55
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h7
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp7
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.h4
-rw-r--r--cpp/src/qpid/sys/ConnectionCodec.h3
-rw-r--r--cpp/src/qpid/sys/ProtocolAccess.h65
-rw-r--r--cpp/src/qpid/sys/ProtocolFactory.h4
-rw-r--r--cpp/src/qpid/sys/Socket.h1
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp45
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp7
-rwxr-xr-xpython/commands/qpid-route18
23 files changed, 269 insertions, 263 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index f17d322dab..9321e0d855 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -536,7 +536,6 @@ nobase_include_HEADERS = \
qpid/sys/OutputControl.h \
qpid/sys/OutputTask.h \
qpid/sys/Poller.h \
- qpid/sys/ProtocolAccess.h \
qpid/sys/ProtocolFactory.h \
qpid/sys/Runnable.h \
qpid/sys/ScopedIncrement.h \
diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp
index 9e860ab653..c1e2e21e5d 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -19,7 +19,6 @@
*
*/
#include "Connection.h"
-#include "qpid/sys/ProtocolAccess.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/exceptions.h"
@@ -28,13 +27,9 @@ namespace amqp_0_10 {
using sys::Mutex;
-Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient, sys::ProtocolAccess* a)
- : frameQueueClosed(false), output(o), connection(new broker::Connection(this, broker, id, _isClient)),
- identifier(id), initialized(false), isClient(_isClient)
-{
- if (a != 0)
- a->callConnCb(connection);
-}
+Connection::Connection(sys::OutputControl& o, broker::Broker& broker, const std::string& id, bool _isClient)
+ : frameQueueClosed(false), output(o), connection(this, broker, id, _isClient),
+ identifier(id), initialized(false), isClient(_isClient) {}
size_t Connection::decode(const char* buffer, size_t size) {
framing::Buffer in(const_cast<char*>(buffer), size);
@@ -50,13 +45,13 @@ size_t Connection::decode(const char* buffer, size_t size) {
framing::AMQFrame frame;
while(frame.decode(in)) {
QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- connection->received(frame);
+ connection.received(frame);
}
return in.getPosition();
}
bool Connection::canEncode() {
- if (!frameQueueClosed) connection->doOutput();
+ if (!frameQueueClosed) connection.doOutput();
Mutex::ScopedLock l(frameQueueLock);
return (!isClient && !initialized) || !frameQueue.empty();
}
@@ -95,7 +90,7 @@ void Connection::close() {
}
void Connection::closed() {
- connection->closed();
+ connection.closed();
}
void Connection::send(framing::AMQFrame& f) {
diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h
index ea8d183e01..4369d401bd 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.h
+++ b/cpp/src/qpid/amqp_0_10/Connection.h
@@ -29,7 +29,6 @@
#include <queue>
namespace qpid {
-namespace sys { class ProtocolAccess; }
namespace broker { class Broker; }
namespace amqp_0_10 {
@@ -41,13 +40,13 @@ class Connection : public sys::ConnectionCodec,
bool frameQueueClosed;
mutable sys::Mutex frameQueueLock;
sys::OutputControl& output;
- broker::Connection::shared_ptr connection; // FIXME aconway 2008-03-18:
+ broker::Connection connection; // FIXME aconway 2008-03-18:
std::string identifier;
bool initialized;
bool isClient;
public:
- Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false, sys::ProtocolAccess* a =0);
+ Connection(sys::OutputControl&, broker::Broker&, const std::string& id, bool isClient = false);
size_t decode(const char* buffer, size_t size);
size_t encode(const char* buffer, size_t size);
bool isClosed() const;
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 4f7686aac4..2992ea45cf 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -361,18 +361,20 @@ void Broker::accept() {
// TODO: How to chose the protocolFactory to use for the connection
void Broker::connect(
const std::string& host, uint16_t port, bool /*useSsl*/,
- sys::ConnectionCodec::Factory* f,
- sys::ProtocolAccess* access)
+ boost::function2<void, int, std::string> failed,
+ sys::ConnectionCodec::Factory* f)
{
- getProtocolFactory()->connect(poller, host, port, f ? f : &factory, access);
+ getProtocolFactory()->connect(poller, host, port, f ? f : &factory, failed);
}
void Broker::connect(
- const Url& url, sys::ConnectionCodec::Factory* f)
+ const Url& url,
+ boost::function2<void, int, std::string> failed,
+ sys::ConnectionCodec::Factory* f)
{
url.throwIfEmpty();
TcpAddress addr=boost::get<TcpAddress>(url[0]);
- connect(addr.host, addr.port, false, f, (sys::ProtocolAccess*) 0);
+ connect(addr.host, addr.port, false, failed, f);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 7092a86181..531817db83 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -44,7 +44,6 @@
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Runnable.h"
-#include "qpid/sys/ProtocolAccess.h"
#include <vector>
@@ -135,10 +134,12 @@ class Broker : public sys::Runnable, public Plugin::Target,
/** Create a connection to another broker. */
void connect(const std::string& host, uint16_t port, bool useSsl,
- sys::ConnectionCodec::Factory* =0,
- sys::ProtocolAccess* =0);
+ boost::function2<void, int, std::string> failed,
+ sys::ConnectionCodec::Factory* =0);
/** Create a connection to another broker. */
- void connect(const Url& url, sys::ConnectionCodec::Factory* =0);
+ void connect(const Url& url,
+ boost::function2<void, int, std::string> failed,
+ sys::ConnectionCodec::Factory* =0);
// TODO: There isn't a single ProtocolFactory so the use of the following needs to be fixed
// For the present just return the first ProtocolFactory registered.
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 463193a346..ea3d3547f5 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -47,44 +47,26 @@ using qpid::management::Args;
namespace qpid {
namespace broker {
-class Connection::MgmtClient : public Connection::MgmtWrapper
-{
- management::Client::shared_ptr mgmtClient;
-
-public:
- MgmtClient(Connection* conn, Manageable* parent, ManagementAgent::shared_ptr agent,
- const std::string& mgmtId, bool incoming);
- ~MgmtClient();
- void received(framing::AMQFrame& frame);
- management::ManagementObject::shared_ptr getManagementObject() const;
- void closing();
-};
-
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, bool isLink_) :
ConnectionState(out_, broker_),
- adapter(*this, isLink),
+ adapter(*this, isLink_),
+ isLink(isLink_),
mgmtClosing(false),
- mgmtId(mgmtId_)
-{
- initMgmt();
-}
-
-void Connection::initMgmt(bool asLink)
+ mgmtId(mgmtId_),
+ links(broker_.getLinks())
{
Manageable* parent = broker.GetVhostObject ();
+ if (isLink)
+ links.notifyConnection (mgmtId, this);
+
if (parent != 0)
{
ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
if (agent.get () != 0)
- {
- if (asLink) {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, false));
- } else {
- mgmtWrapper = std::auto_ptr<MgmtWrapper>(new MgmtClient(this, parent, agent, mgmtId, true));
- }
- }
+ mgmtObject = management::Client::shared_ptr (new management::Client(this, parent, mgmtId, !isLink));
+ agent->addObject (mgmtObject);
}
}
@@ -95,19 +77,65 @@ void Connection::requestIOProcessing (boost::function0<void> callback)
}
-Connection::~Connection () {}
+Connection::~Connection ()
+{
+ if (mgmtObject.get() != 0)
+ mgmtObject->resourceDestroy();
+ if (isLink)
+ links.notifyClosed (mgmtId);
+}
void Connection::received(framing::AMQFrame& frame){
- if (mgmtClosing)
- close (403, "Closed by Management Request", 0, 0);
-
if (frame.getChannel() == 0 && frame.getMethod()) {
adapter.handle(frame);
} else {
getChannel(frame.getChannel()).in(frame);
}
-
- if (mgmtWrapper.get()) mgmtWrapper->received(frame);
+
+ if (isLink)
+ recordFromServer(frame);
+ else
+ recordFromClient(frame);
+}
+
+void Connection::recordFromServer (framing::AMQFrame& frame)
+{
+ if (mgmtObject.get () != 0)
+ {
+ mgmtObject->inc_framesToClient ();
+ mgmtObject->inc_bytesToClient (frame.size ());
+ }
+}
+
+void Connection::recordFromClient (framing::AMQFrame& frame)
+{
+ if (mgmtObject.get () != 0)
+ {
+ mgmtObject->inc_framesFromClient ();
+ mgmtObject->inc_bytesFromClient (frame.size ());
+ }
+}
+
+string Connection::getAuthMechanism()
+{
+ if (!isLink)
+ return string("ANONYMOUS");
+
+ return links.getAuthMechanism(mgmtId);
+}
+
+string Connection::getAuthCredentials()
+{
+ if (!isLink)
+ return string();
+
+ return links.getAuthCredentials(mgmtId);
+}
+
+void Connection::notifyConnectionForced(const string& text)
+{
+ if (isLink)
+ links.notifyConnectionForced(mgmtId, text);
}
void Connection::close(
@@ -125,7 +153,7 @@ void Connection::idleIn(){}
void Connection::closed(){ // Physically closed, suspend open sessions.
try {
while (!channels.empty())
- ptr_map_ptr(channels.begin())->handleDetach();
+ ptr_map_ptr(channels.begin())->handleDetach();
while (!exclusiveQueues.empty()) {
Queue::shared_ptr q(exclusiveQueues.front());
q->releaseExclusiveOwnership();
@@ -147,10 +175,12 @@ bool Connection::doOutput()
if (ioCallback)
ioCallback(); // Lend the IO thread for management processing
ioCallback = 0;
- if (mgmtClosing) close (403, "Closed by Management Request", 0, 0);
- //then do other output as needed:
- return outputTasks.doOutput();
+ if (mgmtClosing)
+ close (403, "Closed by Management Request", 0, 0);
+ else
+ //then do other output as needed:
+ return outputTasks.doOutput();
}catch(ConnectionException& e){
close(e.code, e.what(), 0, 0);
}catch(std::exception& e){
@@ -174,7 +204,7 @@ SessionHandler& Connection::getChannel(ChannelId id) {
ManagementObject::shared_ptr Connection::GetManagementObject (void) const
{
- return mgmtWrapper.get() ? mgmtWrapper->getManagementObject() : ManagementObject::shared_ptr();
+ return dynamic_pointer_cast<ManagementObject>(mgmtObject);
}
Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
@@ -187,7 +217,7 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
{
case management::Client::METHOD_CLOSE :
mgmtClosing = true;
- if (mgmtWrapper.get()) mgmtWrapper->closing();
+ if (mgmtObject.get()) mgmtObject->set_closing(1);
out->activateOutput();
status = Manageable::STATUS_OK;
break;
@@ -196,39 +226,5 @@ Manageable::status_t Connection::ManagementMethod (uint32_t methodId, Args&)
return status;
}
-Connection::MgmtClient::MgmtClient(Connection* conn, Manageable* parent,
- ManagementAgent::shared_ptr agent,
- const std::string& mgmtId, bool incoming)
-{
- mgmtClient = management::Client::shared_ptr
- (new management::Client (conn, parent, mgmtId, incoming));
- agent->addObject (mgmtClient);
-}
-
-Connection::MgmtClient::~MgmtClient()
-{
- if (mgmtClient.get () != 0)
- mgmtClient->resourceDestroy ();
-}
-
-void Connection::MgmtClient::received(framing::AMQFrame& frame)
-{
- if (mgmtClient.get () != 0)
- {
- mgmtClient->inc_framesFromClient ();
- mgmtClient->inc_bytesFromClient (frame.size ());
- }
-}
-
-management::ManagementObject::shared_ptr Connection::MgmtClient::getManagementObject() const
-{
- return dynamic_pointer_cast<ManagementObject>(mgmtClient);
-}
-
-void Connection::MgmtClient::closing()
-{
- if (mgmtClient) mgmtClient->set_closing (1);
-}
-
}}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index dff1e0653b..e6e3d4d15e 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -43,13 +43,14 @@
#include "SessionHandler.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Client.h"
-#include "qpid/management/Link.h"
#include <boost/ptr_container/ptr_map.hpp>
namespace qpid {
namespace broker {
+class LinkRegistry;
+
class Connection : public sys::ConnectionInputHandler,
public ConnectionState
{
@@ -62,7 +63,10 @@ class Connection : public sys::ConnectionInputHandler,
SessionHandler& getChannel(framing::ChannelId channel);
/** Close the connection */
- void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId);
+ void close(framing::ReplyCode code = 403,
+ const string& text = string(),
+ framing::ClassId classId = 0,
+ framing::MethodId methodId = 0);
// ConnectionInputHandler methods
void received(framing::AMQFrame& frame);
@@ -78,38 +82,26 @@ class Connection : public sys::ConnectionInputHandler,
management::Manageable::status_t
ManagementMethod (uint32_t methodId, management::Args& args);
- void initMgmt(bool asLink = false);
void requestIOProcessing (boost::function0<void>);
+ void recordFromServer (framing::AMQFrame& frame);
+ void recordFromClient (framing::AMQFrame& frame);
+ std::string getAuthMechanism();
+ std::string getAuthCredentials();
+ void notifyConnectionForced(const std::string& text);
private:
typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
- /**
- * Connection may appear, for the purposes of management, as a
- * normal client initiated connection or as an agent initiated
- * inter-broker link. This wrapper abstracts the common interface
- * for both.
- */
- class MgmtWrapper
- {
- public:
- virtual ~MgmtWrapper(){}
- virtual void received(framing::AMQFrame& frame) = 0;
- virtual management::ManagementObject::shared_ptr getManagementObject() const = 0;
- virtual void closing() = 0;
- virtual void processPending(){}
- virtual void process(Connection&, const management::Args&){}
- };
- class MgmtClient;
-
ChannelMap channels;
framing::AMQP_ClientProxy::Connection* client;
ConnectionHandler adapter;
- std::auto_ptr<MgmtWrapper> mgmtWrapper;
+ bool isLink;
bool mgmtClosing;
const std::string mgmtId;
boost::function0<void> ioCallback;
+ management::Client::shared_ptr mgmtObject;
+ LinkRegistry& links;
};
}}
diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp
index cd015ce4f5..5de5a0230a 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -39,9 +39,9 @@ ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std:
}
sys::ConnectionCodec*
-ConnectionFactory::create(sys::OutputControl& out, const std::string& id, sys::ProtocolAccess* a) {
+ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
// used to create connections from one broker to another
- return new amqp_0_10::Connection(out, broker, id, true, a);
+ return new amqp_0_10::Connection(out, broker, id, true);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ConnectionFactory.h b/cpp/src/qpid/broker/ConnectionFactory.h
index bf55ab3b88..5797495054 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.h
+++ b/cpp/src/qpid/broker/ConnectionFactory.h
@@ -24,7 +24,6 @@
#include "qpid/sys/ConnectionCodec.h"
namespace qpid {
-namespace sys { class ProtocolAccess; }
namespace broker {
class Broker;
@@ -38,7 +37,7 @@ class ConnectionFactory : public sys::ConnectionCodec::Factory {
create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
sys::ConnectionCodec*
- create(sys::OutputControl&, const std::string& id, sys::ProtocolAccess* a =0);
+ create(sys::OutputControl&, const std::string& id);
private:
Broker& broker;
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index 162664fb88..77a4d1a3de 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -26,6 +26,7 @@
#include "Connection.h"
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/ServerInvoker.h"
+#include "qpid/framing/constants.h"
#include "qpid/log/Statement.h"
using namespace qpid;
@@ -123,6 +124,10 @@ void ConnectionHandler::Handler::close(uint16_t replyCode, const string& replyTe
if (replyCode != 200) {
QPID_LOG(warning, "Client closed connection with " << replyCode << ": " << replyText);
}
+
+ if (replyCode == framing::connection::CONNECTION_FORCED)
+ connection.notifyConnectionForced(replyText);
+
client.closeOk();
connection.getOutput().close();
}
@@ -136,9 +141,10 @@ void ConnectionHandler::Handler::start(const FieldTable& /*serverProperties*/,
const framing::Array& /*mechanisms*/,
const framing::Array& /*locales*/)
{
- string response;
- server.startOk(FieldTable(), ANONYMOUS, response, en_US);
- connection.initMgmt(true);
+ string mechanism = connection.getAuthMechanism();
+ string response = connection.getAuthCredentials();
+
+ server.startOk(FieldTable(), mechanism, response, en_US);
}
void ConnectionHandler::Handler::secure(const string& /*challenge*/)
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index cd032495e2..6bcfcf77a3 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -51,13 +51,11 @@ Link::Link(LinkRegistry* _links,
: links(_links), store(_store), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
authMechanism(_authMechanism), username(_username), password(_password),
persistenceId(0), broker(_broker), state(0),
- access(boost::bind(&Link::established, this),
- boost::bind(&Link::closed, this, _1, _2),
- boost::bind(&Link::setConnection, this, _1)),
visitCount(0),
currentInterval(1),
closing(false),
- channelCounter(1)
+ channelCounter(1),
+ connection(0)
{
if (parent != 0)
{
@@ -75,8 +73,9 @@ Link::Link(LinkRegistry* _links,
Link::~Link ()
{
- if (state == STATE_OPERATIONAL)
- access.close();
+ if (state == STATE_OPERATIONAL && connection != 0)
+ connection->close();
+
if (mgmtObject.get () != 0)
mgmtObject->resourceDestroy ();
}
@@ -95,13 +94,16 @@ void Link::setStateLH (int newState)
case STATE_WAITING : mgmtObject->set_state("Waiting"); break;
case STATE_CONNECTING : mgmtObject->set_state("Connecting"); break;
case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
+ case STATE_FAILED : mgmtObject->set_state("Failed"); break;
+ case STATE_CLOSED : mgmtObject->set_state("Closed"); break;
}
}
void Link::startConnectionLH ()
{
try {
- broker->connect (host, port, useSsl, 0, &access);
+ broker->connect (host, port, useSsl,
+ boost::bind (&Link::closed, this, _1, _2));
setStateLH(STATE_CONNECTING);
} catch(std::exception& e) {
setStateLH(STATE_WAITING);
@@ -125,16 +127,21 @@ void Link::closed (int, std::string text)
{
Mutex::ScopedLock mutex(lock);
+ connection = 0;
+
if (state == STATE_OPERATIONAL)
QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port);
- connection.reset();
for (Bridges::iterator i = active.begin(); i != active.end(); i++)
created.push_back(*i);
active.clear();
- setStateLH(STATE_WAITING);
- mgmtObject->set_lastError (text);
+ if (state != STATE_FAILED)
+ {
+ setStateLH(STATE_WAITING);
+ mgmtObject->set_lastError (text);
+ }
+
if (closing)
destroy();
}
@@ -145,7 +152,10 @@ void Link::destroy ()
Bridges toDelete;
QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
- connection.reset();
+ if (connection)
+ connection->close(403, "closed by management");
+
+ setStateLH(STATE_CLOSED);
// Move the bridges to be deleted into a local vector so there is no
// corruption of the iterator caused by bridge deletion.
@@ -168,10 +178,7 @@ void Link::destroy ()
void Link::add(Bridge::shared_ptr bridge)
{
Mutex::ScopedLock mutex(lock);
-
created.push_back (bridge);
- if (state == STATE_OPERATIONAL && connection.get() != 0)
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
void Link::cancel(Bridge::shared_ptr bridge)
@@ -197,6 +204,9 @@ void Link::ioThreadProcessing()
{
Mutex::ScopedLock mutex(lock);
+ if (state != STATE_OPERATIONAL)
+ return;
+
//process any pending creates
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -207,12 +217,10 @@ void Link::ioThreadProcessing()
}
}
-void Link::setConnection(Connection::shared_ptr c)
+void Link::setConnection(Connection* c)
{
Mutex::ScopedLock mutex(lock);
-
connection = c;
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
void Link::maintenanceVisit ()
@@ -231,6 +239,8 @@ void Link::maintenanceVisit ()
startConnectionLH();
}
}
+ else if (state == STATE_OPERATIONAL && !created.empty() && connection != 0)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
uint Link::nextChannel()
@@ -240,6 +250,14 @@ uint Link::nextChannel()
return channelCounter++;
}
+void Link::notifyConnectionForced(const string text)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ setStateLH(STATE_FAILED);
+ mgmtObject->set_lastError(text);
+}
+
void Link::setPersistenceId(uint64_t id) const
{
if (mgmtObject != 0 && persistenceId == 0)
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
index c4eca86c19..de757d112e 100644
--- a/cpp/src/qpid/broker/Link.h
+++ b/cpp/src/qpid/broker/Link.h
@@ -27,7 +27,6 @@
#include "PersistableConfig.h"
#include "Bridge.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/ProtocolAccess.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/management/Manageable.h"
#include "qpid/management/Link.h"
@@ -57,7 +56,6 @@ namespace qpid {
management::Link::shared_ptr mgmtObject;
Broker* broker;
int state;
- sys::ProtocolAccess access;
uint32_t visitCount;
uint32_t currentInterval;
bool closing;
@@ -66,21 +64,20 @@ namespace qpid {
Bridges created; // Bridges pending creation
Bridges active; // Bridges active
uint channelCounter;
- boost::shared_ptr<Connection> connection;
+ Connection* connection;
static const int STATE_WAITING = 1;
static const int STATE_CONNECTING = 2;
static const int STATE_OPERATIONAL = 3;
+ static const int STATE_FAILED = 4;
+ static const int STATE_CLOSED = 5;
- static const uint32_t MAX_INTERVAL = 16;
+ static const uint32_t MAX_INTERVAL = 32;
void setStateLH (int newState);
void startConnectionLH(); // Start the IO Connection
- void established(); // Called when connection is created
- void closed(int, std::string); // Called when connection goes away
void destroy(); // Called when mgmt deletes this link
void ioThreadProcessing(); // Called on connection's IO thread by request
- void setConnection(boost::shared_ptr<Connection>); // Set pointer to the AMQP Connection
public:
typedef boost::shared_ptr<Link> shared_ptr;
@@ -106,6 +103,16 @@ namespace qpid {
void add(Bridge::shared_ptr);
void cancel(Bridge::shared_ptr);
+ void established(); // Called when connection is created
+ void closed(int, std::string); // Called when connection goes away
+ void setConnection(Connection*); // Set pointer to the AMQP Connection
+
+ string getAuthMechanism() { return authMechanism; }
+ string getUsername() { return username; }
+ string getPassword() { return password; }
+
+ void notifyConnectionForced(const std::string text);
+
// PersistableConfig:
void setPersistenceId(uint64_t id) const;
uint64_t getPersistenceId() const { return persistenceId; }
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index be3c67077e..455cc8452e 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -27,7 +27,7 @@ using std::pair;
using std::stringstream;
using boost::intrusive_ptr;
-#define LINK_MAINT_INTERVAL 5
+#define LINK_MAINT_INTERVAL 2
LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0)
{
@@ -185,3 +185,56 @@ MessageStore* LinkRegistry::getStore() const {
return store;
}
+void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
+{
+ Mutex::ScopedLock locker(lock);
+ LinkMap::iterator l = links.find(key);
+ if (l != links.end())
+ {
+ l->second->established();
+ l->second->setConnection(c);
+ }
+}
+
+void LinkRegistry::notifyClosed(const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ LinkMap::iterator l = links.find(key);
+ if (l != links.end())
+ l->second->closed(0, "Closed by peer");
+}
+
+void LinkRegistry::notifyConnectionForced(const std::string& key, const std::string& text)
+{
+ Mutex::ScopedLock locker(lock);
+ LinkMap::iterator l = links.find(key);
+ if (l != links.end())
+ l->second->notifyConnectionForced(text);
+}
+
+std::string LinkRegistry::getAuthMechanism(const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ LinkMap::iterator l = links.find(key);
+ if (l != links.end())
+ return l->second->getAuthMechanism();
+ return string("ANONYMOUS");
+}
+
+std::string LinkRegistry::getAuthCredentials(const std::string& key)
+{
+ Mutex::ScopedLock locker(lock);
+ LinkMap::iterator l = links.find(key);
+ if (l == links.end())
+ return string();
+
+ string result;
+ result += '\0';
+ result += l->second->getUsername();
+ result += '\0';
+ result += l->second->getPassword();
+
+ return result;
+}
+
+
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index 3c47954141..f902490ed3 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -34,6 +34,7 @@ namespace qpid {
namespace broker {
class Broker;
+ class Connection;
class LinkRegistry {
// Declare a timer task to manage the establishment of link connections and the
@@ -106,6 +107,12 @@ namespace broker {
* Return the message store used.
*/
MessageStore* getStore() const;
+
+ void notifyConnection (const std::string& key, Connection* c);
+ void notifyClosed (const std::string& key);
+ void notifyConnectionForced (const std::string& key, const std::string& text);
+ std::string getAuthMechanism (const std::string& key);
+ std::string getAuthCredentials (const std::string& key);
};
}
}
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index 31974993bb..ca2bd7c93c 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -36,14 +36,13 @@ struct Buff : public AsynchIO::BufferBase {
{ delete [] bytes;}
};
-AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a) :
+AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
identifier(id),
aio(0),
factory(f),
codec(0),
readError(false),
- isClient(false),
- access(a)
+ isClient(false)
{}
AsynchIOHandler::~AsynchIOHandler() {
@@ -153,7 +152,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&) {
void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
- codec = factory->create(*this, identifier, access);
+ codec = factory->create(*this, identifier);
write(framing::ProtocolInitiation(codec->getVersion()));
return;
}
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h
index ece52f57c4..7448094a94 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -32,7 +32,6 @@ namespace framing {
}
namespace sys {
-class ProtocolAccess;
class AsynchIOHandler : public OutputControl {
std::string identifier;
AsynchIO* aio;
@@ -40,12 +39,11 @@ class AsynchIOHandler : public OutputControl {
ConnectionCodec* codec;
bool readError;
bool isClient;
- ProtocolAccess* access;
void write(const framing::ProtocolInitiation&);
public:
- AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a =0);
+ AsynchIOHandler(std::string id, ConnectionCodec::Factory* f);
~AsynchIOHandler();
void init(AsynchIO* a, int numBuffs);
diff --git a/cpp/src/qpid/sys/ConnectionCodec.h b/cpp/src/qpid/sys/ConnectionCodec.h
index 4c5a68e576..efc6839b60 100644
--- a/cpp/src/qpid/sys/ConnectionCodec.h
+++ b/cpp/src/qpid/sys/ConnectionCodec.h
@@ -29,7 +29,6 @@
namespace qpid {
namespace sys {
-class ProtocolAccess;
/**
* Interface of coder/decoder for a connection of a specific protocol
@@ -69,7 +68,7 @@ class ConnectionCodec {
/** Return "preferred" codec for outbound connections. */
virtual ConnectionCodec* create(
- OutputControl&, const std::string& id, ProtocolAccess* a = 0
+ OutputControl&, const std::string& id
) = 0;
};
};
diff --git a/cpp/src/qpid/sys/ProtocolAccess.h b/cpp/src/qpid/sys/ProtocolAccess.h
deleted file mode 100644
index 433bf0ef97..0000000000
--- a/cpp/src/qpid/sys/ProtocolAccess.h
+++ /dev/null
@@ -1,65 +0,0 @@
-#ifndef _sys_ProtocolAccess_h
-#define _sys_ProtocolAccess_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "AsynchIO.h"
-#include "AsynchIOHandler.h"
-#include <boost/function.hpp>
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-
-namespace broker
-{
-class Connection;
-}
-
-namespace sys {
-
-class ProtocolAccess
-{
-public:
- typedef boost::function0<void> Callback;
- typedef boost::function2<void, int, std::string> ClosedCallback;
- typedef boost::function1<void, boost::shared_ptr<broker::Connection> > SetConnCallback;
-
- ProtocolAccess (Callback ecb, ClosedCallback ccb, SetConnCallback sccb)
- : aio(0), establishedCb(ecb), closedCb(ccb), setConnCb(sccb) {}
- ~ProtocolAccess() {}
- inline void close() { if (aio) aio->queueWriteClose(); }
-
- inline void setAio(AsynchIO *_aio) { aio = _aio; establishedCb(); }
- inline void closedEof(AsynchIOHandler* async) { async->eof(*aio); closedCb(-1, "Closed by Peer"); }
- inline void closed(int err, std::string str) { closedCb(err, str); }
- inline void callConnCb(boost::shared_ptr<broker::Connection> c) { setConnCb(c); }
-
-private:
- AsynchIO* aio;
- Callback establishedCb;
- ClosedCallback closedCb;
- SetConnCallback setConnCb;
-};
-
-}}
-
-#endif //!_sys_ProtocolAccess_h
diff --git a/cpp/src/qpid/sys/ProtocolFactory.h b/cpp/src/qpid/sys/ProtocolFactory.h
index e61a94b205..e8eaefe1f6 100644
--- a/cpp/src/qpid/sys/ProtocolFactory.h
+++ b/cpp/src/qpid/sys/ProtocolFactory.h
@@ -25,7 +25,7 @@
#include <stdint.h>
#include "qpid/SharedObject.h"
#include "ConnectionCodec.h"
-#include "ProtocolAccess.h"
+#include <boost/function.hpp>
namespace qpid {
namespace sys {
@@ -43,7 +43,7 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
boost::shared_ptr<Poller>,
const std::string& host, int16_t port,
ConnectionCodec::Factory* codec,
- ProtocolAccess* access = 0) = 0;
+ boost::function2<void, int, std::string> failed) = 0;
};
inline ProtocolFactory::~ProtocolFactory() {}
diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h
index 806d6b5164..f95d841b39 100644
--- a/cpp/src/qpid/sys/Socket.h
+++ b/cpp/src/qpid/sys/Socket.h
@@ -118,6 +118,7 @@ public:
private:
Socket(IOHandlePrivate*);
+ mutable std::string connectname;
};
}}
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index 5d2cadbe03..e82a6a9102 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -42,14 +42,15 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
AsynchIOProtocolFactory(int16_t port, int backlog);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, int16_t port,
- ConnectionCodec::Factory*, ProtocolAccess*);
+ ConnectionCodec::Factory*,
+ boost::function2<void, int, std::string> failed);
uint16_t getPort() const;
std::string getHost() const;
private:
void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
- bool isClient, ProtocolAccess*);
+ bool isClient);
};
// Static instance to initialise plugin
@@ -74,31 +75,18 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) :
{}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient,
- ProtocolAccess* a) {
- AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f, a);
- AsynchIO* aio;
+ ConnectionCodec::Factory* f, bool isClient) {
+ AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
if (isClient)
async->setClient();
- if (a == 0)
- aio = new AsynchIO(s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
- else {
- aio = new AsynchIO(s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&ProtocolAccess::closedEof, a, async),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
- a->setAio(aio);
- }
+ AsynchIO* aio = new AsynchIO(s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
async->init(aio, 4);
aio->start(poller);
@@ -116,8 +104,7 @@ void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
acceptor.reset(
new AsynchAcceptor(listener,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false,
- (ProtocolAccess*) 0)));
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
acceptor->start(poller);
}
@@ -125,7 +112,7 @@ void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
const std::string& host, int16_t port,
ConnectionCodec::Factory* fact,
- ProtocolAccess* access)
+ boost::function2<void, int, std::string> failed)
{
// Note that the following logic does not cause a memory leak.
// The allocated Socket is freed either by the AsynchConnector
@@ -135,8 +122,8 @@ void AsynchIOProtocolFactory::connect(
Socket* socket = new Socket();
new AsynchConnector (*socket, poller, host, port,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true, access),
- boost::bind(&ProtocolAccess::closed, access, _1, _2));
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true),
+ failed);
}
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index 67f6b6db4c..f4320531a9 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -32,6 +32,7 @@
#include <netdb.h>
#include <cstdlib>
#include <string.h>
+#include <iostream>
#include <boost/format.hpp>
@@ -138,6 +139,10 @@ const char* h_errstr(int e) {
void Socket::connect(const std::string& host, int port) const
{
+ std::stringstream namestream;
+ namestream << host << ":" << port;
+ connectname = namestream.str();
+
const int& socket = impl->fd;
struct sockaddr_in name;
name.sin_family = AF_INET;
@@ -240,6 +245,8 @@ std::string Socket::getPeername() const
std::string Socket::getPeerAddress() const
{
+ if (!connectname.empty())
+ return std::string (connectname);
return getName(impl->fd, false, true);
}
diff --git a/python/commands/qpid-route b/python/commands/qpid-route
index 0dc9a89f21..5206f5c3cb 100755
--- a/python/commands/qpid-route
+++ b/python/commands/qpid-route
@@ -112,9 +112,12 @@ class RouteManager:
connectArgs["port"] = self.src.port
connectArgs["useSsl"] = False
connectArgs["durable"] = _durable
- connectArgs["authMechanism"] = "PLAIN"
- connectArgs["username"] = self.src.username
- connectArgs["password"] = self.src.password
+ if self.src.username == "anonymous":
+ connectArgs["authMechanism"] = "ANONYMOUS"
+ else:
+ connectArgs["authMechanism"] = "PLAIN"
+ connectArgs["username"] = self.src.username
+ connectArgs["password"] = self.src.password
res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
if _verbose:
print "Connect method returned:", res.status, res.statusText
@@ -164,9 +167,12 @@ class RouteManager:
connectArgs["port"] = self.src.port
connectArgs["useSsl"] = False
connectArgs["durable"] = _durable
- connectArgs["authMechanism"] = "PLAIN"
- connectArgs["username"] = self.src.username
- connectArgs["password"] = self.src.password
+ if self.src.username == "anonymous":
+ connectArgs["authMechanism"] = "ANONYMOUS"
+ else:
+ connectArgs["authMechanism"] = "PLAIN"
+ connectArgs["username"] = self.src.username
+ connectArgs["password"] = self.src.password
res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
if _verbose:
print "Connect method returned:", res.status, res.statusText