summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-02-13 16:18:46 +0000
committerAlan Conway <aconway@apache.org>2012-02-13 16:18:46 +0000
commitfb7678f5a2c4ca854824d4763e78623b59bb5d9b (patch)
treec307a8e5be883bb09fd9397fcb246c373bf1ed36
parente93c50e3cc997d7d83c09711928bf030d62a40e6 (diff)
downloadqpid-python-fb7678f5a2c4ca854824d4763e78623b59bb5d9b.tar.gz
QPID-3603: Fix core dump in Link::requestIOProcessing.
Core dump occuring when a link was closed before being completely opened. - Merge Link::established and setConnection into one atomic function. - Moved logic that needs to be executed in a connection thread from ~Link to Link::destroy Link::destroy is always called in connection thread, ~Link can be called later if another thread is holding a reference. - Added some asserts to verify functioning as expected. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1243584 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp48
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h5
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp3
3 files changed, 25 insertions, 31 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index cd5b89e1ad..e896b3238b 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -103,10 +103,8 @@ Link::Link(LinkRegistry* _links,
Link::~Link ()
{
- timerTask->cancel();
- if (state == STATE_OPERATIONAL && connection != 0)
- connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
-
+ assert(state == STATE_CLOSED); // Can only get here after destroy()
+ assert(connection == 0);
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
}
@@ -134,6 +132,7 @@ void Link::setStateLH (int newState)
void Link::startConnectionLH ()
{
+ assert(state == STATE_WAITING);
try {
// Set the state before calling connect. It is possible that connect
// will fail synchronously and call Link::closed before returning.
@@ -150,7 +149,7 @@ void Link::startConnectionLH ()
}
}
-void Link::established ()
+void Link::established(Connection* c)
{
stringstream addr;
addr << host << ":" << port;
@@ -159,16 +158,19 @@ void Link::established ()
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
- {
- Mutex::ScopedLock mutex(lock);
- setStateLH(STATE_OPERATIONAL);
- currentInterval = 1;
- visitCount = 0;
- if (closing)
- destroy();
- }
+ Mutex::ScopedLock mutex(lock);
+ assert(state == STATE_CONNECTING);
+ setStateLH(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ connection = c;
+ if (closing)
+ destroy();
+ else // Process any IO tasks bridges added before established.
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
+
void Link::setUrl(const Url& u) {
Mutex::ScopedLock mutex(lock);
url = u;
@@ -222,6 +224,7 @@ void Link::closed(int, std::string text)
destroy();
}
+// Called in connection IO thread.
void Link::destroy ()
{
Bridges toDelete;
@@ -231,7 +234,7 @@ void Link::destroy ()
QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
if (connection)
connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management");
-
+ connection = 0;
setStateLH(STATE_CLOSED);
// Move the bridges to be deleted into a local vector so there is no
@@ -245,6 +248,8 @@ void Link::destroy ()
for (Bridges::iterator i = created.begin(); i != created.end(); i++)
toDelete.push_back(*i);
created.clear();
+
+ timerTask->cancel();
}
// Now delete all bridges on this link (don't hold the lock for this).
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
@@ -284,7 +289,7 @@ void Link::cancel(Bridge::shared_ptr bridge)
}
needIOProcessing = !cancellations.empty();
}
- if (needIOProcessing)
+ if (needIOProcessing && connection)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
@@ -326,14 +331,6 @@ void Link::ioThreadProcessing()
}
}
-void Link::setConnection(Connection* c)
-{
- Mutex::ScopedLock mutex(lock);
- connection = c;
- // Process any IO tasks bridges added before setConnection.
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
-}
-
void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
@@ -357,9 +354,8 @@ void Link::maintenanceVisit ()
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
-void Link::reconnect(const Address& a)
+void Link::reconnectLH(const Address& a)
{
- Mutex::ScopedLock mutex(lock);
host = a.host;
port = a.port;
transport = a.protocol;
@@ -378,7 +374,7 @@ bool Link::tryFailoverLH() {
if (next.host != host || next.port != port || next.protocol != transport) {
links->changeAddress(Address(transport, host, port), next);
QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port);
- reconnect(next);
+ reconnectLH(next);
return true;
}
return false;
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index 9ee4837040..4085c3bfcf 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -121,11 +121,10 @@ class Link : public PersistableConfig, public management::Manageable {
void cancel(Bridge::shared_ptr);
void setUrl(const Url&); // Set URL for reconnection.
- void established(); // Called when connection is create
+ void established(Connection*); // Called when connection is create
void opened(); // Called when connection is open (after create)
void closed(int, std::string); // Called when connection goes away
- void setConnection(Connection*); // Set pointer to the AMQP Connection
- void reconnect(const Address&); //called by LinkRegistry
+ void reconnectLH(const Address&); //called by LinkRegistry
void close(); // Close the link from within the broker.
std::string getAuthMechanism() { return authMechanism; }
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index ef6bf95b0b..a4fd90684e 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -240,8 +240,7 @@ void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
{
Link::shared_ptr link = findLink(key);
if (link) {
- link->established();
- link->setConnection(c);
+ link->established(c);
c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm));
}
}