summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Link.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp48
1 files changed, 22 insertions, 26 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index cd5b89e1ad..e896b3238b 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -103,10 +103,8 @@ Link::Link(LinkRegistry* _links,
Link::~Link ()
{
- timerTask->cancel();
- if (state == STATE_OPERATIONAL && connection != 0)
- connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
-
+ assert(state == STATE_CLOSED); // Can only get here after destroy()
+ assert(connection == 0);
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
}
@@ -134,6 +132,7 @@ void Link::setStateLH (int newState)
void Link::startConnectionLH ()
{
+ assert(state == STATE_WAITING);
try {
// Set the state before calling connect. It is possible that connect
// will fail synchronously and call Link::closed before returning.
@@ -150,7 +149,7 @@ void Link::startConnectionLH ()
}
}
-void Link::established ()
+void Link::established(Connection* c)
{
stringstream addr;
addr << host << ":" << port;
@@ -159,16 +158,19 @@ void Link::established ()
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
- {
- Mutex::ScopedLock mutex(lock);
- setStateLH(STATE_OPERATIONAL);
- currentInterval = 1;
- visitCount = 0;
- if (closing)
- destroy();
- }
+ Mutex::ScopedLock mutex(lock);
+ assert(state == STATE_CONNECTING);
+ setStateLH(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ connection = c;
+ if (closing)
+ destroy();
+ else // Process any IO tasks bridges added before established.
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
+
void Link::setUrl(const Url& u) {
Mutex::ScopedLock mutex(lock);
url = u;
@@ -222,6 +224,7 @@ void Link::closed(int, std::string text)
destroy();
}
+// Called in connection IO thread.
void Link::destroy ()
{
Bridges toDelete;
@@ -231,7 +234,7 @@ void Link::destroy ()
QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
if (connection)
connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
-
+ connection = 0;
setStateLH(STATE_CLOSED);
// Move the bridges to be deleted into a local vector so there is no
@@ -245,6 +248,8 @@ void Link::destroy ()
for (Bridges::iterator i = created.begin(); i != created.end(); i++)
toDelete.push_back(*i);
created.clear();
+
+ timerTask->cancel();
}
// Now delete all bridges on this link (don't hold the lock for this).
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
@@ -284,7 +289,7 @@ void Link::cancel(Bridge::shared_ptr bridge)
}
needIOProcessing = !cancellations.empty();
}
- if (needIOProcessing)
+ if (needIOProcessing && connection)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
@@ -326,14 +331,6 @@ void Link::ioThreadProcessing()
}
}
-void Link::setConnection(Connection* c)
-{
- Mutex::ScopedLock mutex(lock);
- connection = c;
- // Process any IO tasks bridges added before setConnection.
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
-}
-
void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
@@ -357,9 +354,8 @@ void Link::maintenanceVisit ()
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
-void Link::reconnect(const Address& a)
+void Link::reconnectLH(const Address& a)
{
- Mutex::ScopedLock mutex(lock);
host = a.host;
port = a.port;
transport = a.protocol;
@@ -378,7 +374,7 @@ bool Link::tryFailoverLH() {
if (next.host != host || next.port != port || next.protocol != transport) {
links->changeAddress(Address(transport, host, port), next);
QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port);
- reconnect(next);
+ reconnectLH(next);
return true;
}
return false;