summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r--cpp/src/qpid/broker/Link.cpp135
1 files changed, 95 insertions, 40 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 83c9a2a62e..cd032495e2 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -37,14 +37,19 @@ using qpid::management::Manageable;
using qpid::management::Args;
using qpid::sys::Mutex;
-Link::Link(LinkRegistry* _links,
- string& _host,
- uint16_t _port,
- bool _useSsl,
- bool _durable,
- Broker* _broker,
+Link::Link(LinkRegistry* _links,
+ MessageStore* _store,
+ string& _host,
+ uint16_t _port,
+ bool _useSsl,
+ bool _durable,
+ string& _authMechanism,
+ string& _username,
+ string& _password,
+ Broker* _broker,
management::Manageable* parent)
- : links(_links), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
+ : links(_links), store(_store), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
+ authMechanism(_authMechanism), username(_username), password(_password),
persistenceId(0), broker(_broker), state(0),
access(boost::bind(&Link::established, this),
boost::bind(&Link::closed, this, _1, _2),
@@ -65,7 +70,7 @@ Link::Link(LinkRegistry* _links,
agent->addObject(mgmtObject);
}
}
- setState(STATE_WAITING);
+ setStateLH(STATE_WAITING);
}
Link::~Link ()
@@ -76,7 +81,7 @@ Link::~Link ()
mgmtObject->resourceDestroy ();
}
-void Link::setState (int newState)
+void Link::setStateLH (int newState)
{
if (newState == state)
return;
@@ -93,13 +98,13 @@ void Link::setState (int newState)
}
}
-void Link::startConnection ()
+void Link::startConnectionLH ()
{
try {
broker->connect (host, port, useSsl, 0, &access);
- setState(STATE_CONNECTING);
+ setStateLH(STATE_CONNECTING);
} catch(std::exception& e) {
- setState(STATE_WAITING);
+ setStateLH(STATE_WAITING);
mgmtObject->set_lastError (e.what());
}
}
@@ -109,7 +114,7 @@ void Link::established ()
Mutex::ScopedLock mutex(lock);
QPID_LOG (info, "Inter-broker link established to " << host << ":" << port);
- setState(STATE_OPERATIONAL);
+ setStateLH(STATE_OPERATIONAL);
currentInterval = 1;
visitCount = 0;
if (closing)
@@ -124,8 +129,11 @@ void Link::closed (int, std::string text)
QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port);
connection.reset();
- created.transfer(created.end(), active.begin(), active.end(), active);
- setState(STATE_WAITING);
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++)
+ created.push_back(*i);
+ active.clear();
+
+ setStateLH(STATE_WAITING);
mgmtObject->set_lastError (text);
if (closing)
destroy();
@@ -133,25 +141,56 @@ void Link::closed (int, std::string text)
void Link::destroy ()
{
+ Mutex::ScopedLock mutex(lock);
+ Bridges toDelete;
+
QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
connection.reset();
+
+ // Move the bridges to be deleted into a local vector so there is no
+ // corruption of the iterator caused by bridge deletion.
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++)
+ toDelete.push_back(*i);
+ active.clear();
+
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++)
+ toDelete.push_back(*i);
+ created.clear();
+
+ // Now delete all bridges on this link.
+ for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
+ (*i)->destroy();
+ toDelete.clear();
+
links->destroy (host, port);
}
-void Link::cancel(Bridge* bridge)
+void Link::add(Bridge::shared_ptr bridge)
{
Mutex::ScopedLock mutex(lock);
- //need to take this out of the active map and add it to the cancelled map
+ created.push_back (bridge);
+ if (state == STATE_OPERATIONAL && connection.get() != 0)
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+void Link::cancel(Bridge::shared_ptr bridge)
+{
+ Mutex::ScopedLock mutex(lock);
+
+ for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+ if ((*i).get() == bridge.get()) {
+ created.erase(i);
+ break;
+ }
+ }
for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- if (&(*i) == bridge) {
- cancelled.transfer(cancelled.end(), i, active);
+ if ((*i).get() == bridge.get()) {
+ bridge->cancel();
+ active.erase(i);
break;
}
}
-
- if (connection.get() != 0)
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
void Link::ioThreadProcessing()
@@ -161,21 +200,17 @@ void Link::ioThreadProcessing()
//process any pending creates
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
- i->create(*connection);
- }
- active.transfer(active.end(), created.begin(), created.end(), created);
- }
- if (!cancelled.empty()) {
- //process any pending cancellations
- for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
- i->cancel();
+ active.push_back(*i);
+ (*i)->create(*connection);
}
- cancelled.clear();
+ created.clear();
}
}
void Link::setConnection(Connection::shared_ptr c)
{
+ Mutex::ScopedLock mutex(lock);
+
connection = c;
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
@@ -193,11 +228,18 @@ void Link::maintenanceVisit ()
currentInterval *= 2;
if (currentInterval > MAX_INTERVAL)
currentInterval = MAX_INTERVAL;
- startConnection();
+ startConnectionLH();
}
}
}
+uint Link::nextChannel()
+{
+ Mutex::ScopedLock mutex(lock);
+
+ return channelCounter++;
+}
+
void Link::setPersistenceId(uint64_t id) const
{
if (mgmtObject != 0 && persistenceId == 0)
@@ -217,13 +259,19 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
{
string host;
uint16_t port;
+ string authMechanism;
+ string username;
+ string password;
buffer.getShortString(host);
port = buffer.getShort();
bool useSsl(buffer.getOctet());
bool durable(buffer.getOctet());
+ buffer.getShortString(authMechanism);
+ buffer.getShortString(username);
+ buffer.getShortString(password);
- return links.declare(host, port, useSsl, durable).first;
+ return links.declare(host, port, useSsl, durable, authMechanism, username, password).first;
}
void Link::encode(Buffer& buffer) const
@@ -233,6 +281,9 @@ void Link::encode(Buffer& buffer) const
buffer.putShort(port);
buffer.putOctet(useSsl ? 1 : 0);
buffer.putOctet(durable ? 1 : 0);
+ buffer.putShortString(authMechanism);
+ buffer.putShortString(username);
+ buffer.putShortString(password);
}
uint32_t Link::encodedSize() const
@@ -241,7 +292,10 @@ uint32_t Link::encodedSize() const
+ 5 // short-string ("link")
+ 2 // port
+ 1 // useSsl
- + 1; // durable
+ + 1 // durable
+ + authMechanism.size() + 1
+ + username.size() + 1
+ + password.size() + 1;
}
ManagementObject::shared_ptr Link::GetManagementObject (void) const
@@ -251,8 +305,6 @@ ManagementObject::shared_ptr Link::GetManagementObject (void) const
Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args)
{
- Mutex::ScopedLock mutex(lock);
-
switch (op)
{
case management::Link::METHOD_CLOSE :
@@ -269,11 +321,14 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args
if (iargs.i_durable && !durable)
return Manageable::STATUS_INVALID_PARAMETER;
- created.push_back(new Bridge(this, channelCounter++,
- boost::bind(&Link::cancel, this, _1), iargs));
+ std::pair<Bridge::shared_ptr, bool> result =
+ links->declare (host, port, iargs.i_durable, iargs.i_src,
+ iargs.i_dest, iargs.i_key, iargs.i_src_is_queue,
+ iargs.i_src_is_local, iargs.i_tag, iargs.i_excludes);
+
+ if (result.second && iargs.i_durable)
+ store->create(*result.first);
- if (state == STATE_OPERATIONAL && connection.get() != 0)
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
return Manageable::STATUS_OK;
}