summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Bridge.cpp7
-rw-r--r--cpp/src/qpid/broker/Connection.h2
-rw-r--r--cpp/src/qpid/broker/Link.cpp17
-rw-r--r--cpp/src/qpid/broker/Link.h2
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.cpp21
-rw-r--r--cpp/src/qpid/broker/LinkRegistry.h12
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp14
-rw-r--r--cpp/src/qpid/cluster/Cluster.h1
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.cpp13
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.h5
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp27
-rw-r--r--cpp/src/qpid/cluster/Connection.h3
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.cpp13
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.h2
14 files changed, 123 insertions, 16 deletions
diff --git a/cpp/src/qpid/broker/Bridge.cpp b/cpp/src/qpid/broker/Bridge.cpp
index 6129f13ede..38a9b5d64c 100644
--- a/cpp/src/qpid/broker/Bridge.cpp
+++ b/cpp/src/qpid/broker/Bridge.cpp
@@ -72,6 +72,7 @@ Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
if (!args.i_durable)
agent->addObject(mgmtObject);
}
+ QPID_LOG(debug, "Bridge created from " << args.i_src << " to " << args.i_dest);
}
Bridge::~Bridge()
@@ -104,10 +105,11 @@ void Bridge::create(ConnectionState& c)
session->attach(name, false);
session->commandPoint(0,0);
- if (args.i_srcIsQueue) {
+ if (args.i_srcIsQueue) {
peer->getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 0, false, "", 0, options);
peer->getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
peer->getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << args.i_dest);
} else {
FieldTable queueSettings;
@@ -141,6 +143,9 @@ void Bridge::create(ConnectionState& c)
if (exchange.get() == 0)
throw Exception("Exchange not found for dynamic route");
exchange->registerDynamicBridge(this);
+ QPID_LOG(debug, "Activated dynamic route for exchange " << args.i_src);
+ } else {
+ QPID_LOG(debug, "Activated static route from exchange " << args.i_src << " to " << args.i_dest);
}
}
}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 5cbff57788..559cd4cfe3 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -117,7 +117,7 @@ class Connection : public sys::ConnectionInputHandler,
ChannelMap channels;
//framing::AMQP_ClientProxy::Connection* client;
ConnectionHandler adapter;
- bool isLink;
+ const bool isLink;
bool mgmtClosing;
const std::string mgmtId;
boost::function0<void> ioCallback;
diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp
index 2bd15759ef..835b37e6eb 100644
--- a/cpp/src/qpid/broker/Link.cpp
+++ b/cpp/src/qpid/broker/Link.cpp
@@ -106,6 +106,7 @@ void Link::setStateLH (int newState)
case STATE_OPERATIONAL : mgmtObject->set_state("Operational"); break;
case STATE_FAILED : mgmtObject->set_state("Failed"); break;
case STATE_CLOSED : mgmtObject->set_state("Closed"); break;
+ case STATE_PASSIVE : mgmtObject->set_state("Passive"); break;
}
}
@@ -239,6 +240,7 @@ void Link::ioThreadProcessing()
if (state != STATE_OPERATIONAL)
return;
+ QPID_LOG(debug, "Link::ioThreadProcessing()");
//process any pending creates
if (!created.empty()) {
@@ -404,6 +406,7 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te
case _qmf::Link::METHOD_BRIDGE :
_qmf::ArgsLinkBridge& iargs = (_qmf::ArgsLinkBridge&) args;
+ QPID_LOG(debug, "Link::bridge() request received");
// Durable bridges are only valid on durable links
if (iargs.i_durable && !durable) {
@@ -437,3 +440,17 @@ Manageable::status_t Link::ManagementMethod (uint32_t op, Args& args, string& te
return Manageable::STATUS_UNKNOWN_METHOD;
}
+
+void Link::setPassive(bool passive)
+{
+ Mutex::ScopedLock mutex(lock);
+ if (passive) {
+ setStateLH(STATE_PASSIVE);
+ } else {
+ if (state == STATE_PASSIVE) {
+ setStateLH(STATE_WAITING);
+ } else {
+ QPID_LOG(warning, "Ignoring attempt to activate non-passive link");
+ }
+ }
+}
diff --git a/cpp/src/qpid/broker/Link.h b/cpp/src/qpid/broker/Link.h
index 6fef694663..8e741c6eb7 100644
--- a/cpp/src/qpid/broker/Link.h
+++ b/cpp/src/qpid/broker/Link.h
@@ -76,6 +76,7 @@ namespace qpid {
static const int STATE_OPERATIONAL = 3;
static const int STATE_FAILED = 4;
static const int STATE_CLOSED = 5;
+ static const int STATE_PASSIVE = 6;
static const uint32_t MAX_INTERVAL = 32;
@@ -120,6 +121,7 @@ namespace qpid {
Broker* getBroker() { return broker; }
void notifyConnectionForced(const std::string text);
+ void setPassive(bool p);
// PersistableConfig:
void setPersistenceId(uint64_t id) const;
diff --git a/cpp/src/qpid/broker/LinkRegistry.cpp b/cpp/src/qpid/broker/LinkRegistry.cpp
index f400f2066a..956a9ea5ae 100644
--- a/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -31,7 +31,7 @@ namespace _qmf = qmf::org::apache::qpid::broker;
#define LINK_MAINT_INTERVAL 2
-LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0)
+LinkRegistry::LinkRegistry (Broker* _broker) : broker(_broker), parent(0), store(0), passive(false), passiveChanged(false)
{
timer.add (intrusive_ptr<TimerTask> (new Periodic(*this)));
}
@@ -51,6 +51,14 @@ void LinkRegistry::periodicMaintenance ()
linksToDestroy.clear();
bridgesToDestroy.clear();
+ if (passiveChanged) {
+ if (passive) { QPID_LOG(info, "Passivating links"); }
+ else { QPID_LOG(info, "Activating links"); }
+ for (LinkMap::iterator i = links.begin(); i != links.end(); i++) {
+ i->second->setPassive(passive);
+ }
+ passiveChanged = false;
+ }
for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
i->second->maintenanceVisit();
//now process any requests for re-addressing
@@ -109,6 +117,7 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(string& host,
link = Link::shared_ptr (new Link (this, store, host, port, transport, durable,
authMechanism, username, password,
broker, parent));
+ if (passive) link->setPassive(true);
links[key] = link;
return std::pair<Link::shared_ptr, bool>(link, true);
}
@@ -129,6 +138,8 @@ pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
uint16_t sync)
{
Mutex::ScopedLock locker(lock);
+ QPID_LOG(debug, "Bridge declared " << host << ": " << port << " from " << src << " to " << dest << " (" << key << ")");
+
stringstream keystream;
keystream << host << ":" << port;
string linkKey = string(keystream.str());
@@ -291,3 +302,11 @@ std::string LinkRegistry::createKey(const TcpAddress& a)
keystream << a.host << ":" << a.port;
return string(keystream.str());
}
+
+void LinkRegistry::setPassive(bool p)
+{
+ Mutex::ScopedLock locker(lock);
+ passiveChanged = p != passive;
+ passive = p;
+ //will activate or passivate links on maintenance visit
+}
diff --git a/cpp/src/qpid/broker/LinkRegistry.h b/cpp/src/qpid/broker/LinkRegistry.h
index 6e228c0e2c..884228bd63 100644
--- a/cpp/src/qpid/broker/LinkRegistry.h
+++ b/cpp/src/qpid/broker/LinkRegistry.h
@@ -64,6 +64,8 @@ namespace broker {
Timer timer;
management::Manageable* parent;
MessageStore* store;
+ bool passive;
+ bool passiveChanged;
void periodicMaintenance ();
bool updateAddress(const std::string& oldKey, const TcpAddress& newAddress);
@@ -122,7 +124,17 @@ namespace broker {
std::string getAuthCredentials (const std::string& key);
std::string getAuthIdentity (const std::string& key);
+ /**
+ * Called by links failing over to new address
+ */
void changeAddress(const TcpAddress& oldAddress, const TcpAddress& newAddress);
+ /**
+ * Called to alter passive state. In passive state the links
+ * and bridges managed by a link registry will be recorded and
+ * updated but links won't actually establish connections and
+ * bridges won't therefore pull or push any messages.
+ */
+ void setPassive(bool);
};
}
}
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 0a40493350..7b40328f1c 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -323,10 +323,20 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
state = NEWBIE;
QPID_LOG(info, *this << " joining cluster: " << map);
mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId);
+ ClusterMap::Set members = map.getAlive();
+ members.erase(myId);
+ myElders = members;
+ broker.getLinks().setPassive(true);
}
}
- else if (state >= READY && memberChange)
+ else if (state >= READY && memberChange) {
memberUpdate(l);
+ myElders = ClusterMap::intersection(myElders, map.getAlive());
+ if (myElders.empty()) {
+ //assume we are oldest, reactive links if necessary
+ broker.getLinks().setPassive(false);
+ }
+ }
}
@@ -496,6 +506,8 @@ void Cluster::memberUpdate(Lock& l) {
}
lastSize = size;
+ //
+
if (mgmtObject) {
mgmtObject->set_clusterSize(size);
string urlstr;
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index c93cd21876..8d235c7caf 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -184,6 +184,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
const size_t writeEstimate;
framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
+ ClusterMap::Set myElders;
// Thread safe members
Multicaster mcast;
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index 873f0be928..cc9ea29093 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -114,6 +114,10 @@ std::vector<Url> ClusterMap::memberUrls() const {
return urls;
}
+ClusterMap::Set ClusterMap::getAlive() const {
+ return alive;
+}
+
std::ostream& operator<<(std::ostream& o, const ClusterMap::Map& m) {
std::ostream_iterator<MemberId> oi(o);
std::transform(m.begin(), m.end(), oi, boost::bind(&ClusterMap::Map::value_type::first, _1));
@@ -170,4 +174,13 @@ boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId&
return boost::optional<Url>();
}
+ClusterMap::Set ClusterMap::intersection(const ClusterMap::Set& a, const ClusterMap::Set& b)
+{
+ Set intersection;
+ std::set_intersection(a.begin(), a.end(),
+ b.begin(), b.end(),
+ std::inserter(intersection, intersection.begin()));
+ return intersection;
+
+}
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h
index 5c1981269f..507fee9a72 100644
--- a/cpp/src/qpid/cluster/ClusterMap.h
+++ b/cpp/src/qpid/cluster/ClusterMap.h
@@ -76,6 +76,7 @@ class ClusterMap {
size_t aliveCount() const { return alive.size(); }
size_t memberCount() const { return members.size(); }
std::vector<Url> memberUrls() const;
+ Set getAlive() const;
bool dumpRequest(const MemberId& id, const std::string& url);
/** Return non-empty Url if accepted */
@@ -84,6 +85,10 @@ class ClusterMap {
/**@return true If this is a new member */
bool ready(const MemberId& id, const Url&);
+ /**
+ * Utility method to return intersection of two member sets
+ */
+ static Set intersection(const Set& a, const Set& b);
private:
Url getUrl(const Map& map, const MemberId& id);
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 3988abd491..ac4b9dcdf2 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -62,14 +62,15 @@ NoOpConnectionOutputHandler Connection::discardHandler;
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), readCredit(0)
+ connection(&output, cluster.getBroker(), wrappedId), readCredit(0), expectProtocolHeader(false)
{ init(); }
// Local connections
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& wrappedId, MemberId myId, bool isCatchUp)
+ const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), readCredit(0)
+ connection(&output, cluster.getBroker(), wrappedId, isLink), readCredit(0),
+ expectProtocolHeader(isLink)
{ init(); }
void Connection::init() {
@@ -213,7 +214,25 @@ size_t Connection::decode(const char* buffer, size_t size) {
}
else { // Multicast local connections.
assert(isLocal());
- cluster.getMulticast().mcastBuffer(buffer, size, self);
+ const char* remainingData = buffer;
+ size_t remainingSize = size;
+ if (expectProtocolHeader) {
+ //If this is an outgoing link, we will receive a protocol
+ //header which needs to be decoded first
+ framing::ProtocolInitiation pi;
+ Buffer buf(const_cast<char*>(buffer), size);
+ if (pi.decode(buf)) {
+ //TODO: check the version is correct
+ QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
+ expectProtocolHeader = false;
+ remainingData = buffer + pi.encodedSize();
+ remainingSize = size - pi.encodedSize();
+ } else {
+ QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link");
+ return 0;
+ }
+ }
+ cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
}
return size;
}
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index ec46d62cc2..5d46b7e81d 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -64,7 +64,7 @@ class Connection :
typedef sys::PollableQueue<EventFrame> EventFrameQueue;
/** Local connection, use this in ConnectionId */
- Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp);
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink);
/** Shadow connection */
Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId);
~Connection();
@@ -172,6 +172,7 @@ class Connection :
framing::ChannelId currentChannel;
boost::shared_ptr<broker::TxBuffer> txBuffer;
int readCredit;
+ bool expectProtocolHeader;
friend std::ostream& operator<<(std::ostream&, const Connection&);
};
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp
index 44e40f0591..28d2750ff9 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -38,21 +38,22 @@ using namespace framing;
sys::ConnectionCodec*
ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
if (v == ProtocolVersion(0, 10))
- return new ConnectionCodec(out, id, cluster, false);
+ return new ConnectionCodec(out, id, cluster, false, false);
else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10))
- return new ConnectionCodec(out, id, cluster, true); // Catch-up connection
+ return new ConnectionCodec(out, id, cluster, true, false); // Catch-up connection
return 0;
}
// Used for outgoing Link connections, we don't care.
sys::ConnectionCodec*
ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
- return next->create(out, id);
+ return new ConnectionCodec(out, id, cluster, false, true);
+ //return next->create(out, id);
}
-ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp)
- : codec(out, id, false),
- interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp)),
+ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp, bool isLink)
+ : codec(out, id, isLink),
+ interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp, isLink)),
id(interceptor->getId()),
localId(id)
{
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h
index 86fac270fa..69c2b0c3c8 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.h
+++ b/cpp/src/qpid/cluster/ConnectionCodec.h
@@ -56,7 +56,7 @@ class ConnectionCodec : public sys::ConnectionCodec {
sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
};
- ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp);
+ ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp, bool isLink);
~ConnectionCodec();
// ConnectionCodec functions.