diff options
author | Alan Conway <aconway@apache.org> | 2012-02-13 16:18:46 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-02-13 16:18:46 +0000 |
commit | fb7678f5a2c4ca854824d4763e78623b59bb5d9b (patch) | |
tree | c307a8e5be883bb09fd9397fcb246c373bf1ed36 | |
parent | e93c50e3cc997d7d83c09711928bf030d62a40e6 (diff) | |
download | qpid-python-fb7678f5a2c4ca854824d4763e78623b59bb5d9b.tar.gz |
QPID-3603: Fix core dump in Link::requestIOProcessing.
Core dump occuring when a link was closed before being completely opened.
- Merge Link::established and setConnection into one atomic function.
- Moved logic that needs to be executed in a connection thread from ~Link to Link::destroy
Link::destroy is always called in connection thread, ~Link can be called later if
another thread is holding a reference.
- Added some asserts to verify functioning as expected.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603-2@1243584 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 48 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Link.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 3 |
3 files changed, 25 insertions, 31 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index cd5b89e1ad..e896b3238b 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -103,10 +103,8 @@ Link::Link(LinkRegistry* _links, Link::~Link () { - timerTask->cancel(); - if (state == STATE_OPERATIONAL && connection != 0) - connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); - + assert(state == STATE_CLOSED); // Can only get here after destroy() + assert(connection == 0); if (mgmtObject != 0) mgmtObject->resourceDestroy (); } @@ -134,6 +132,7 @@ void Link::setStateLH (int newState) void Link::startConnectionLH () { + assert(state == STATE_WAITING); try { // Set the state before calling connect. It is possible that connect // will fail synchronously and call Link::closed before returning. @@ -150,7 +149,7 @@ void Link::startConnectionLH () } } -void Link::established () +void Link::established(Connection* c) { stringstream addr; addr << host << ":" << port; @@ -159,16 +158,19 @@ void Link::established () if (!hideManagement() && agent) agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); - { - Mutex::ScopedLock mutex(lock); - setStateLH(STATE_OPERATIONAL); - currentInterval = 1; - visitCount = 0; - if (closing) - destroy(); - } + Mutex::ScopedLock mutex(lock); + assert(state == STATE_CONNECTING); + setStateLH(STATE_OPERATIONAL); + currentInterval = 1; + visitCount = 0; + connection = c; + if (closing) + destroy(); + else // Process any IO tasks bridges added before established. + connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } + void Link::setUrl(const Url& u) { Mutex::ScopedLock mutex(lock); url = u; @@ -222,6 +224,7 @@ void Link::closed(int, std::string text) destroy(); } +// Called in connection IO thread. void Link::destroy () { Bridges toDelete; @@ -231,7 +234,7 @@ void Link::destroy () QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management"); if (connection) connection->close(CLOSE_CODE_CONNECTION_FORCED, "closed by management"); - + connection = 0; setStateLH(STATE_CLOSED); // Move the bridges to be deleted into a local vector so there is no @@ -245,6 +248,8 @@ void Link::destroy () for (Bridges::iterator i = created.begin(); i != created.end(); i++) toDelete.push_back(*i); created.clear(); + + timerTask->cancel(); } // Now delete all bridges on this link (don't hold the lock for this). for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++) @@ -284,7 +289,7 @@ void Link::cancel(Bridge::shared_ptr bridge) } needIOProcessing = !cancellations.empty(); } - if (needIOProcessing) + if (needIOProcessing && connection) connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } @@ -326,14 +331,6 @@ void Link::ioThreadProcessing() } } -void Link::setConnection(Connection* c) -{ - Mutex::ScopedLock mutex(lock); - connection = c; - // Process any IO tasks bridges added before setConnection. - connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); -} - void Link::maintenanceVisit () { Mutex::ScopedLock mutex(lock); @@ -357,9 +354,8 @@ void Link::maintenanceVisit () connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this)); } -void Link::reconnect(const Address& a) +void Link::reconnectLH(const Address& a) { - Mutex::ScopedLock mutex(lock); host = a.host; port = a.port; transport = a.protocol; @@ -378,7 +374,7 @@ bool Link::tryFailoverLH() { if (next.host != host || next.port != port || next.protocol != transport) { links->changeAddress(Address(transport, host, port), next); QPID_LOG(debug, "Inter-broker link failing over to " << next.host << ":" << next.port); - reconnect(next); + reconnectLH(next); return true; } return false; diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index 9ee4837040..4085c3bfcf 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -121,11 +121,10 @@ class Link : public PersistableConfig, public management::Manageable { void cancel(Bridge::shared_ptr); void setUrl(const Url&); // Set URL for reconnection. - void established(); // Called when connection is create + void established(Connection*); // Called when connection is create void opened(); // Called when connection is open (after create) void closed(int, std::string); // Called when connection goes away - void setConnection(Connection*); // Set pointer to the AMQP Connection - void reconnect(const Address&); //called by LinkRegistry + void reconnectLH(const Address&); //called by LinkRegistry void close(); // Close the link from within the broker. std::string getAuthMechanism() { return authMechanism; } diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index ef6bf95b0b..a4fd90684e 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -240,8 +240,7 @@ void LinkRegistry::notifyConnection(const std::string& key, Connection* c) { Link::shared_ptr link = findLink(key); if (link) { - link->established(); - link->setConnection(c); + link->established(c); c->setUserId(str(format("%1%@%2%") % link->getUsername() % realm)); } } |