summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-02-05 23:02:45 +0000
committerAlan Conway <aconway@apache.org>2010-02-05 23:02:45 +0000
commitda00c6a28b4df6e2618cff17f952012f7d76c10c (patch)
treefb96aa96f454ee64a828d09da017dc55c1eb5cef /qpid
parent2b86496e7e980834464c35f49cbf7337815aeb4c (diff)
downloadqpid-python-da00c6a28b4df6e2618cff17f952012f7d76c10c.tar.gz
Consistent connection names across a cluster.
- use the same host:port for connections and their shadows. - add shadow property to managment connection to identify shadows. - updated qpid-stat and qpid-cluster to filter on shadow property. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@907123 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h4
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp52
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h27
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp12
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateReceiver.h3
-rw-r--r--qpid/cpp/xml/cluster.xml11
-rwxr-xr-xqpid/python/commands/qpid-cluster3
-rwxr-xr-xqpid/python/commands/qpid-stat3
-rw-r--r--qpid/specs/management-schema.xml1
11 files changed, 83 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/Connection.cpp
index c3388a4ab1..532666ad76 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/Connection.cpp
@@ -72,7 +72,7 @@ struct ConnectionTimeoutTask : public sys::TimerTask {
}
};
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_, uint64_t objectId) :
+Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_, unsigned int ssf, bool isLink_, uint64_t objectId, bool shadow_) :
ConnectionState(out_, broker_),
ssf(ssf),
adapter(*this, isLink_),
@@ -84,7 +84,7 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
agent(0),
timer(broker_.getTimer()),
errorListener(0),
- shadow(false)
+ shadow(shadow_)
{
Manageable* parent = broker.GetVhostObject();
@@ -95,10 +95,10 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
{
agent = broker_.getManagementAgent();
-
// TODO set last bool true if system connection
if (agent != 0) {
mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
+ mgmtObject->set_shadow(shadow);
agent->addObject(mgmtObject, objectId, true);
}
ConnectionState::setUrl(mgmtId);
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 00778aea00..d49d9f4d75 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -79,7 +79,7 @@ class Connection : public sys::ConnectionInputHandler,
};
Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId, unsigned int ssf,
- bool isLink = false, uint64_t objectId = 0);
+ bool isLink = false, uint64_t objectId = 0, bool shadow=false);
~Connection ();
/** Get the SessionHandler for channel. Create if it does not already exist */
@@ -132,8 +132,6 @@ class Connection : public sys::ConnectionInputHandler,
/** True if this is a shadow connection in a cluster. */
bool isShadow() { return shadow; }
- /** Called by cluster to mark shadow connections */
- void setShadow() { shadow = true; }
// Used by cluster to update connection status
sys::AggregateOutput& getOutputTasks() { return outputTasks; }
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index eb6428d394..e718819f48 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -509,10 +509,8 @@ ConnectionPtr Cluster::getConnection(const EventFrame& e, Lock&) {
assert(cp);
}
else { // New remote connection, create a shadow.
- std::ostringstream mgmtId;
unsigned int ssf = (announce && announce->hasSsf()) ? announce->getSsf() : 0;
- mgmtId << id;
- cp = new Connection(*this, shadowOut, mgmtId.str(), id, ssf);
+ cp = new Connection(*this, shadowOut, announce->getManagementId(), id, ssf);
}
connections.insert(ConnectionMap::value_type(id, cp));
}
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 3ce2b3f376..a375a65851 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -23,6 +23,7 @@
#include "Cluster.h"
#include "UpdateReceiver.h"
+#include "qpid/assert.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/TxBuffer.h"
@@ -74,28 +75,30 @@ const std::string shadowPrefix("[shadow]");
// Shadow connection
-Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId,
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
+ const std::string& mgmtId,
const ConnectionId& id, unsigned int ssf)
: cluster(c), self(id), catchUp(false), output(*this, out),
- connectionCtor(&output, cluster.getBroker(), shadowPrefix+logId, ssf),
+ connectionCtor(&output, cluster.getBroker(), mgmtId, ssf, false, 0, true),
expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self),
- consumerNumbering(c.getUpdateReceiver().consumerNumbering)
+ updateIn(c.getUpdateReceiver())
{}
// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& logId, MemberId member,
+ const std::string& mgmtId, MemberId member,
bool isCatchUp, bool isLink, unsigned int ssf
) : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
connectionCtor(&output, cluster.getBroker(),
- isCatchUp ? shadowPrefix+logId : logId,
+ mgmtId,
ssf,
isLink,
- isCatchUp ? ++catchUpId : 0),
+ isCatchUp ? ++catchUpId : 0,
+ isCatchUp), // isCatchUp => shadow
expectProtocolHeader(isLink),
mcastFrameHandler(cluster.getMulticast(), self),
- consumerNumbering(c.getUpdateReceiver().consumerNumbering)
+ updateIn(c.getUpdateReceiver())
{
cluster.addLocalConnection(this);
if (isLocalClient()) {
@@ -104,12 +107,14 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
QPID_LOG(info, "new client connection " << *this);
giveReadCredit(cluster.getSettings().readMax);
cluster.getMulticast().mcastControl(
- ClusterConnectionAnnounceBody(ProtocolVersion(), getSsf()), getId());
+ ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId, getSsf()), getId());
}
else {
- // Catch-up connections initialized immediately.
+ // Catch-up shadow connections initialized using nextShadow id.
assert(catchUp);
QPID_LOG(info, "new catch-up connection " << *this);
+ connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
+ updateIn.nextShadowMgmtId.clear();
init();
}
}
@@ -127,7 +132,6 @@ void Connection::init() {
connection->setClusterOrderOutput(nullFrameHandler);
// Disable client throttling, done by active node.
connection->setClientThrottling(false);
- connection->setShadow(); // Mark the connection as a shadow.
}
if (!isCatchUp())
connection->setErrorListener(this);
@@ -138,8 +142,9 @@ void Connection::giveReadCredit(int credit) {
output.giveReadCredit(credit);
}
-void Connection::announce(uint32_t ssf) {
- assert(ssf == connectionCtor.ssf);
+void Connection::announce(const std::string& mgmtId, uint32_t ssf) {
+ QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
+ QPID_ASSERT(ssf == connectionCtor.ssf);
init();
}
@@ -296,13 +301,17 @@ broker::SemanticState& Connection::semanticState() {
return sessionState().getSemanticState();
}
+void Connection::shadowPrepare(const std::string& mgmtId) {
+ updateIn.nextShadowMgmtId = mgmtId;
+}
+
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position)
{
broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
c.position = position;
c.setBlocked(blocked);
if (notifyEnabled) c.enableNotify(); else c.disableNotify();
- consumerNumbering.add(c.shared_from_this());
+ updateIn.consumerNumbering.add(c.shared_from_this());
}
@@ -337,10 +346,15 @@ void Connection::outputTask(uint16_t channel, const std::string& name) {
OutputTask* task = &session->getSemanticState().find(name);
connection->getOutputTasks().addOutputTask(task);
}
-
-void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) {
+
+void Connection::shadowReady(
+ uint64_t memberId, uint64_t connectionId, const string& mgmtId,
+ const string& username, const string& fragment, uint32_t sendMax)
+{
+ QPID_ASSERT(mgmtId == getBrokerConnection().getMgmtId());
ConnectionId shadowId = ConnectionId(memberId, connectionId);
- QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
+ QPID_LOG(debug, cluster << " catch-up connection " << *this
+ << " becomes shadow " << shadowId);
self = shadowId;
connection->setUserId(username);
// OK to use decoder here because cluster is stalled for update.
@@ -355,7 +369,7 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members
{
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
cluster.updateInDone(ClusterMap(joiners, members, frameSeq, configSeq));
- consumerNumbering.clear();
+ updateIn.consumerNumbering.clear();
self.second = 0; // Mark this as completed update connection.
}
@@ -503,9 +517,9 @@ void Connection::connectionError(const std::string& msg) {
}
void Connection::addQueueListener(const std::string& q, uint32_t listener) {
- if (listener >= consumerNumbering.size())
+ if (listener >= updateIn.consumerNumbering.size())
throw Exception(QPID_MSG("Invalid listener ID: " << listener));
- findQueue(q)->getListeners().addListener(consumerNumbering[listener]);
+ findQueue(q)->getListeners().addListener(updateIn.consumerNumbering[listener]);
}
void Connection::managementSchema(const std::string& data) {
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index a2f96782f7..85fad72948 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -65,10 +65,10 @@ class Connection :
public:
/** Local connection. */
- Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink,
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, MemberId, bool catchUp, bool isLink,
unsigned int ssf);
/** Shadow connection. */
- Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id,
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& mgmtId, const ConnectionId& id,
unsigned int ssf);
~Connection();
@@ -109,6 +109,8 @@ class Connection :
// ==== Used in catch-up mode to build initial state.
//
// State update methods.
+ void shadowPrepare(const std::string&);
+
void sessionState(const framing::SequenceNumber& replayStart,
const framing::SequenceNumber& sendCommandPoint,
const framing::SequenceSet& sentIncomplete,
@@ -119,7 +121,12 @@ class Connection :
void outputTask(uint16_t channel, const std::string& name);
- void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax);
+ void shadowReady(uint64_t memberId,
+ uint64_t connectionId,
+ const std::string& managementId,
+ const std::string& username,
+ const std::string& fragment,
+ uint32_t sendMax);
void membership(const framing::FieldTable&, const framing::FieldTable&,
const framing::SequenceNumber& frameSeq,
@@ -156,7 +163,7 @@ class Connection :
void exchange(const std::string& encoded);
void giveReadCredit(int credit);
- void announce(uint32_t ssf);
+ void announce(const std::string& mgmtId, uint32_t ssf);
void abort();
void deliverClose();
@@ -182,6 +189,7 @@ class Connection :
unsigned int ssf;
bool isLink;
uint64_t objectId;
+ bool shadow;
ConnectionCtor(
sys::ConnectionOutputHandler* out_,
@@ -189,12 +197,15 @@ class Connection :
const std::string& mgmtId_,
unsigned int ssf_,
bool isLink_=false,
- uint64_t objectId_=0
- ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_), isLink(isLink_), objectId(objectId_) {}
+ uint64_t objectId_=0,
+ bool shadow_=false
+ ) : out(out_), broker(broker_), mgmtId(mgmtId_), ssf(ssf_),
+ isLink(isLink_), objectId(objectId_), shadow(shadow_)
+ {}
std::auto_ptr<broker::Connection> construct() {
return std::auto_ptr<broker::Connection>(
- new broker::Connection(out, broker, mgmtId, ssf, isLink, objectId));
+ new broker::Connection(out, broker, mgmtId, ssf, isLink, objectId, shadow));
}
};
@@ -225,7 +236,7 @@ class Connection :
boost::shared_ptr<broker::TxBuffer> txBuffer;
bool expectProtocolHeader;
McastFrameHandler mcastFrameHandler;
- UpdateReceiver::ConsumerNumbering& consumerNumbering;
+ UpdateReceiver& updateIn;
static qpid::sys::AtomicValue<uint64_t> catchUpId;
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index 36efdfba65..17d856b79c 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -57,6 +57,7 @@
#include <boost/bind.hpp>
#include <boost/cast.hpp>
#include <algorithm>
+#include <sstream>
namespace qpid {
namespace cluster {
@@ -148,7 +149,7 @@ void UpdateClient::update() {
ClusterConnectionProxy(session).expiryId(expiry.getId());
updateManagementAgent();
-
+
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
AMQFrame frame(membership);
@@ -328,6 +329,14 @@ void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
+
+ // Send the management ID first on the main connection.
+ std::string mgmtId = updateConnection->getBrokerConnection().getMgmtId();
+ ClusterConnectionProxy(session).shadowPrepare(mgmtId);
+ // Make sure its received before opening shadow connection
+ session.sync();
+
+ // Open shadow connection and update it.
shadowConnection = catchUpConnection();
broker::Connection& bc = updateConnection->getBrokerConnection();
@@ -341,6 +350,7 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
ClusterConnectionProxy(shadowConnection).shadowReady(
updateConnection->getId().getMember(),
updateConnection->getId().getNumber(),
+ bc.getMgmtId(),
bc.getUserId(),
string(fragment.first, fragment.second),
updateConnection->getOutput().getSendMax()
diff --git a/qpid/cpp/src/qpid/cluster/UpdateReceiver.h b/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
index cc1ce0da8d..7e8ce47662 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
@@ -36,6 +36,9 @@ class UpdateReceiver {
/** Numbering used to identify Queue listeners as consumers */
typedef Numbering<boost::shared_ptr<broker::SemanticState::ConsumerImpl> > ConsumerNumbering;
ConsumerNumbering consumerNumbering;
+
+ /** Management-id for the next shadow connection */
+ std::string nextShadowMgmtId;
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index a879d5137b..44f055ea32 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -117,6 +117,7 @@
<!-- Announce a new connection -->
<control name="announce" code="0x1">
+ <field name="management-id" type="str16"/>
<!-- Security Strength Factor (ssf): if the transport provides
encryption (e.g. ssl), ssf is the bit length of the key. Zero if no
encryption provided. -->
@@ -135,13 +136,18 @@
<control name="abort" code="0x4"/>
<!-- Update controls. Sent to a new broker in joining mode.
- A connection is updateed as followed:
- - open as a normal connection.
+ A connection is updated as followed:
+ - send the shadow's management ID in shadow-perpare on the update connection
+ - open the shadow as a normal connection.
- attach sessions, create consumers, set flow with normal AMQP cokmmands.
- send /reset additional session state with controls below.
- send shadow-ready to mark end of shadow update.
- send membership when entire update is complete.
-->
+ <!-- Prepare to send a shadow connection with the given ID. -->
+ <control name="shadow-prepare" code="0x0F">
+ <field name="management-id" type="str16"/>
+ </control>
<!-- Consumer state that cannot be set by standard AMQP controls. -->
<control name="consumer-state" code="0x10">
@@ -202,6 +208,7 @@
<control name="shadow-ready" code="0x20" label="End of shadow connection update.">
<field name="member-id" type="uint64"/>
<field name="connection-id" type="uint64"/>
+ <field name="management-id" type="str16"/>
<field name="user-name" type="str8"/>
<field name="fragment" type="str32"/>
<field name="send-max" type="uint32"/>
diff --git a/qpid/python/commands/qpid-cluster b/qpid/python/commands/qpid-cluster
index 7afb7671b8..6d64765184 100755
--- a/qpid/python/commands/qpid-cluster
+++ b/qpid/python/commands/qpid-cluster
@@ -193,7 +193,6 @@ class BrokerManager:
self.qmf.delBroker(self.broker)
self.broker = None
self.brokers = []
- pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
idx = 0
for host in hostList:
@@ -209,7 +208,7 @@ class BrokerManager:
print "Clients on Member: ID=%s:" % displayList[idx]
connList = self.qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker)
for conn in connList:
- if pattern.match(conn.address):
+ if not conn.shadow:
if self.config._numeric or self.config._delConn:
a = conn.address
else:
diff --git a/qpid/python/commands/qpid-stat b/qpid/python/commands/qpid-stat
index 29deeb2342..c6fc5ef0da 100755
--- a/qpid/python/commands/qpid-stat
+++ b/qpid/python/commands/qpid-stat
@@ -34,7 +34,6 @@ _types = ""
_limit = 50
_increasing = False
_sortcol = None
-pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
def Usage ():
print "Usage: qpid-stat [OPTIONS] [broker-addr]"
@@ -108,7 +107,7 @@ class Broker(object):
list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent)
for conn in list:
- if pattern.match(conn.address):
+ if not conn.shadow:
self.connections[conn.getObjectId()] = conn
list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent)
diff --git a/qpid/specs/management-schema.xml b/qpid/specs/management-schema.xml
index f8be051c62..b9e895c4b4 100644
--- a/qpid/specs/management-schema.xml
+++ b/qpid/specs/management-schema.xml
@@ -236,6 +236,7 @@
<property name="remoteProcessName" type="sstr" access="RO" optional="y" desc="Name of executable running as remote client"/>
<property name="remotePid" type="uint32" access="RO" optional="y" desc="Process ID of remote client"/>
<property name="remoteParentPid" type="uint32" access="RO" optional="y" desc="Parent Process ID of remote client"/>
+ <property name="shadow" type="bool" access="RO" desc="True for shadow connections"/>
<statistic name="closing" type="bool" desc="This client is closing by management request"/>
<statistic name="framesFromClient" type="count64"/>
<statistic name="framesToClient" type="count64"/>