summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
authorKen Giusti <kgiusti@apache.org>2013-01-22 21:35:09 +0000
committerKen Giusti <kgiusti@apache.org>2013-01-22 21:35:09 +0000
commit43c8e1bb6c191297364c8927cf11485dee85d070 (patch)
treef6719719233ac6a029664541672aafbf15db0336 /cpp/src/qpid/broker/Link.cpp
parent407a8eab0758706ea3b974b61b72a3621aae550a (diff)
downloadqpid-python-43c8e1bb6c191297364c8927cf11485dee85d070.tar.gz
QPID-4546: delete links regardless of connection state.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1437187 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/Link.cpp')
-rw-r--r--cpp/src/qpid/broker/Link.cpp120
1 files changed, 76 insertions, 44 deletions
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index dfa7d4c3ab..70d0f68427 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -148,7 +148,6 @@ Link::Link(const string& _name,
persistenceId(0), broker(_broker), state(0),
visitCount(0),
currentInterval(1),
- closing(false),
reconnectNext(0), // Index of next address for reconnecting in url.
nextFreeChannel(1),
freeChannels(1, framing::CHANNEL_MAX),
@@ -213,6 +212,7 @@ void Link::setStateLH (int newState)
case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
case STATE_FAILED : mgmtObject->set_state("Failed"); break;
case STATE_CLOSED : mgmtObject->set_state("Closed"); break;
+ case STATE_CLOSING : mgmtObject->set_state("Closing"); break;
}
}
@@ -242,19 +242,20 @@ void Link::established(Connection* c)
if (agent)
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
- bool isClosing = false;
+ bool isClosing = true;
{
Mutex::ScopedLock mutex(lock);
- setStateLH(STATE_OPERATIONAL);
- currentInterval = 1;
- visitCount = 0;
- connection = c;
- isClosing = closing;
+ if (state != STATE_CLOSING) {
+ isClosing = false;
+ setStateLH(STATE_OPERATIONAL);
+ currentInterval = 1;
+ visitCount = 0;
+ connection = c;
+ c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ }
}
if (isClosing)
destroy();
- else // Process any IO tasks bridges added before established.
- c->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
@@ -279,9 +280,10 @@ class DetachedCallback : public SessionHandler::ErrorListener {
};
}
-void Link::opened() {
+void Link::opened()
+{
Mutex::ScopedLock mutex(lock);
- if (!connection) return;
+ if (!connection || state != STATE_OPERATIONAL) return;
if (connection->GetManagementObject()) {
mgmtObject->set_connectionRef(connection->GetManagementObject()->getObjectId());
@@ -338,34 +340,43 @@ void Link::opened() {
}
}
+
+// called when connection attempt fails (see startConnectionLH)
void Link::closed(int, std::string text)
{
- Mutex::ScopedLock mutex(lock);
QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
- connection = 0;
+ bool isClosing = false;
+ {
+ Mutex::ScopedLock mutex(lock);
+
+ connection = 0;
- mgmtObject->set_connectionRef(qpid::management::ObjectId());
- if (state == STATE_OPERATIONAL && agent) {
- stringstream addr;
- addr << host << ":" << port;
+ mgmtObject->set_connectionRef(qpid::management::ObjectId());
+ if (state == STATE_OPERATIONAL && agent) {
+ stringstream addr;
+ addr << host << ":" << port;
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
- }
+ }
- for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
- (*i)->closed();
- created.push_back(*i);
- }
- active.clear();
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ (*i)->closed();
+ created.push_back(*i);
+ }
+ active.clear();
- if (state != STATE_FAILED)
- {
- setStateLH(STATE_WAITING);
- mgmtObject->set_lastError (text);
+ if (state == STATE_CLOSING) {
+ isClosing = true;
+ } else if (state != STATE_FAILED) {
+ setStateLH(STATE_WAITING);
+ mgmtObject->set_lastError (text);
+ }
}
+ if (isClosing) destroy();
}
-// Called in connection IO thread, cleans up the connection before destroying Link
+// Cleans up the connection before destroying Link. Must be called in connection thread
+// if the connection is active. Caller Note well: may call "delete this"!
void Link::destroy ()
{
Bridges toDelete;
@@ -395,7 +406,9 @@ void Link::destroy ()
for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
(*i)->close();
toDelete.clear();
- listener(this); // notify LinkRegistry that this Link has been destroyed
+ // notify LinkRegistry that this Link has been destroyed. Will result in "delete
+ // this" if LinkRegistry is holding the last shared pointer to *this
+ listener(this);
}
void Link::add(Bridge::shared_ptr bridge)
@@ -437,7 +450,7 @@ void Link::ioThreadProcessing()
{
Mutex::ScopedLock mutex(lock);
- if (state != STATE_OPERATIONAL || closing)
+ if (state != STATE_OPERATIONAL)
return;
// check for bridge session errors and recover
@@ -474,7 +487,6 @@ void Link::ioThreadProcessing()
void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
- if (closing) return;
if (state == STATE_WAITING)
{
visitCount++;
@@ -490,10 +502,11 @@ void Link::maintenanceVisit ()
}
}
}
- else if (state == STATE_OPERATIONAL &&
- (!active.empty() || !created.empty() || !cancellations.empty()) &&
- connection && connection->isOpen())
- connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ else if (state == STATE_OPERATIONAL) {
+ if ((!active.empty() || !created.empty() || !cancellations.empty()) &&
+ connection && connection->isOpen())
+ connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+ }
}
void Link::reconnectLH(const Address& a)
@@ -564,9 +577,17 @@ void Link::returnChannel(framing::ChannelId c)
void Link::notifyConnectionForced(const string text)
{
- Mutex::ScopedLock mutex(lock);
- setStateLH(STATE_FAILED);
- mgmtObject->set_lastError(text);
+ bool isClosing = false;
+ {
+ Mutex::ScopedLock mutex(lock);
+ if (state == STATE_CLOSING) {
+ isClosing = true;
+ } else {
+ setStateLH(STATE_FAILED);
+ mgmtObject->set_lastError(text);
+ }
+ }
+ if (isClosing) destroy();
}
void Link::setPersistenceId(uint64_t id) const
@@ -656,14 +677,25 @@ ManagementObject::shared_ptr Link::GetManagementObject(void) const
void Link::close() {
QPID_LOG(debug, "Link::close(), link=" << name );
- Mutex::ScopedLock mutex(lock);
- if (!closing) {
- closing = true;
- if (state != STATE_CONNECTING && connection) {
- //connection can only be closed on the connections own IO processing thread
- connection->requestIOProcessing(boost::bind(&Link::destroy, this));
+ bool destroy_now = false;
+ {
+ Mutex::ScopedLock mutex(lock);
+ if (state != STATE_CLOSING) {
+ int old_state = state;
+ setStateLH(STATE_CLOSING);
+ if (connection) {
+ //connection can only be closed on the connections own IO processing thread
+ connection->requestIOProcessing(boost::bind(&Link::destroy, this));
+ } else if (old_state == STATE_CONNECTING) {
+ // cannot destroy Link now since a connection request is outstanding.
+ // destroy the link after we get a response (see Link::established,
+ // Link::closed, Link::notifyConnectionForced, etc).
+ } else {
+ destroy_now = true;
+ }
}
}
+ if (destroy_now) destroy();
}