summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Link.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Link.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp65
1 files changed, 24 insertions, 41 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 8010bf43e7..f3acf7c660 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -30,6 +30,7 @@
#include "qpid/framing/enum.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/sys/ClusterSafe.h"
using namespace qpid::broker;
using qpid::framing::Buffer;
@@ -56,8 +57,8 @@ Link::Link(LinkRegistry* _links,
string& _password,
Broker* _broker,
Manageable* parent)
- : links(_links), store(_store), host(_host), port(_port),
- transport(_transport),
+ : links(_links), store(_store), host(_host), port(_port),
+ transport(_transport),
durable(_durable),
authMechanism(_authMechanism), username(_username), password(_password),
persistenceId(0), mgmtObject(0), broker(_broker), state(0),
@@ -96,8 +97,7 @@ void Link::setStateLH (int newState)
return;
state = newState;
-
- if (hideManagement())
+ if (mgmtObject == 0)
return;
switch (state)
@@ -117,12 +117,12 @@ void Link::startConnectionLH ()
// Set the state before calling connect. It is possible that connect
// will fail synchronously and call Link::closed before returning.
setStateLH(STATE_CONNECTING);
- broker->connect (host, boost::lexical_cast<std::string>(port), transport,
+ broker->connect (host, port, transport,
boost::bind (&Link::closed, this, _1, _2));
QPID_LOG (debug, "Inter-broker link connecting to " << host << ":" << port);
} catch(std::exception& e) {
setStateLH(STATE_WAITING);
- if (!hideManagement())
+ if (mgmtObject != 0)
mgmtObject->set_lastError (e.what());
}
}
@@ -133,7 +133,8 @@ void Link::established ()
addr << host << ":" << port;
QPID_LOG (info, "Inter-broker link established to " << addr.str());
- if (!hideManagement() && agent)
+ // Don't raise the management event in a cluster, other members wont't get this call.
+ if (broker && !broker->isInCluster())
agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str()));
{
@@ -153,11 +154,12 @@ void Link::closed (int, std::string text)
connection = 0;
+ // Don't raise the management event in a cluster, other members wont't get this call.
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str());
- if (!hideManagement() && agent)
+ if (broker && !broker->isInCluster())
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
@@ -170,7 +172,7 @@ void Link::closed (int, std::string text)
if (state != STATE_FAILED)
{
setStateLH(STATE_WAITING);
- if (!hideManagement())
+ if (mgmtObject != 0)
mgmtObject->set_lastError (text);
}
@@ -219,7 +221,7 @@ void Link::cancel(Bridge::shared_ptr bridge)
{
{
Mutex::ScopedLock mutex(lock);
-
+
for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
if ((*i).get() == bridge.get()) {
created.erase(i);
@@ -248,19 +250,6 @@ void Link::ioThreadProcessing()
return;
QPID_LOG(debug, "Link::ioThreadProcessing()");
- // check for bridge session errors and recover
- if (!active.empty()) {
- Bridges::iterator removed = std::remove_if(
- active.begin(), active.end(), !boost::bind(&Bridge::isSessionReady, _1));
- for (Bridges::iterator i = removed; i != active.end(); ++i) {
- Bridge::shared_ptr bridge = *i;
- bridge->closed();
- bridge->cancel(*connection);
- created.push_back(bridge);
- }
- active.erase(removed, active.end());
- }
-
//process any pending creates and/or cancellations
if (!created.empty()) {
for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
@@ -288,9 +277,9 @@ void Link::maintenanceVisit ()
{
Mutex::ScopedLock mutex(lock);
- if (connection && updateUrls) {
+ if (connection && updateUrls) {
urls.reset(connection->getKnownHosts());
- QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
+ QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << urls);
updateUrls = false;
}
@@ -309,7 +298,7 @@ void Link::maintenanceVisit ()
}
}
}
- else if (state == STATE_OPERATIONAL && (!active.empty() || !created.empty() || !cancellations.empty()) && connection != 0)
+ else if (state == STATE_OPERATIONAL && (!created.empty() || !cancellations.empty()) && connection != 0)
connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
}
@@ -320,7 +309,7 @@ void Link::reconnect(const qpid::Address& a)
port = a.port;
transport = a.protocol;
startConnectionLH();
- if (!hideManagement()) {
+ if (mgmtObject != 0) {
stringstream errorString;
errorString << "Failed over to " << a;
mgmtObject->set_lastError(errorString.str());
@@ -330,7 +319,7 @@ void Link::reconnect(const qpid::Address& a)
bool Link::tryFailover()
{
Address next;
- if (urls.next(next) &&
+ if (urls.next(next) &&
(next.host != host || next.port != port || next.protocol != transport)) {
links->changeAddress(Address(transport, host, port), next);
QPID_LOG(debug, "Link failing over to " << host << ":" << port);
@@ -340,12 +329,6 @@ bool Link::tryFailover()
}
}
-// Management updates for a linke are inconsistent in a cluster, so they are
-// suppressed.
-bool Link::hideManagement() const {
- return !mgmtObject || ( broker && broker->isInCluster());
-}
-
uint Link::nextChannel()
{
Mutex::ScopedLock mutex(lock);
@@ -358,7 +341,7 @@ void Link::notifyConnectionForced(const string text)
Mutex::ScopedLock mutex(lock);
setStateLH(STATE_FAILED);
- if (!hideManagement())
+ if (mgmtObject != 0)
mgmtObject->set_lastError(text);
}
@@ -380,7 +363,7 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
string authMechanism;
string username;
string password;
-
+
buffer.getShortString(host);
port = buffer.getShort();
buffer.getShortString(transport);
@@ -392,7 +375,7 @@ Link::shared_ptr Link::decode(LinkRegistry& links, Buffer& buffer)
return links.declare(host, port, transport, durable, authMechanism, username, password).first;
}
-void Link::encode(Buffer& buffer) const
+void Link::encode(Buffer& buffer) const
{
buffer.putShortString(string("link"));
buffer.putShortString(host);
@@ -404,8 +387,8 @@ void Link::encode(Buffer& buffer) const
buffer.putShortString(password);
}
-uint32_t Link::encodedSize() const
-{
+uint32_t Link::encodedSize() const
+{
return host.size() + 1 // short-string (host)
+ 5 // short-string ("link")
+ 2 // port