summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/cluster.mk4
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp138
-rw-r--r--cpp/src/qpid/cluster/Cluster.h21
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp65
-rw-r--r--cpp/src/qpid/cluster/Connection.h20
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.cpp13
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.h4
-rw-r--r--cpp/src/qpid/cluster/ConnectionDecoder.cpp57
-rw-r--r--cpp/src/qpid/cluster/ConnectionDecoder.h61
-rw-r--r--cpp/src/qpid/cluster/ConnectionMap.cpp33
-rw-r--r--cpp/src/qpid/cluster/ConnectionMap.h9
-rw-r--r--cpp/src/qpid/cluster/Decoder.cpp50
-rw-r--r--cpp/src/qpid/cluster/Decoder.h65
-rw-r--r--cpp/src/qpid/cluster/Event.cpp14
-rw-r--r--cpp/src/qpid/cluster/Event.h7
-rw-r--r--cpp/src/qpid/cluster/EventFrame.h4
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp15
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h1
-rw-r--r--cpp/src/qpid/cluster/types.h11
-rw-r--r--cpp/src/qpid/framing/FrameDecoder.cpp12
-rw-r--r--cpp/src/qpid/framing/FrameDecoder.h9
-rw-r--r--cpp/src/tests/ClusterFixture.cpp2
-rw-r--r--cpp/src/tests/FrameDecoder.cpp2
-rw-r--r--cpp/xml/cluster.xml1
24 files changed, 222 insertions, 396 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 0db9455136..5acabce694 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -53,10 +53,6 @@ cluster_la_SOURCES = \
qpid/cluster/ConnectionMap.cpp \
qpid/cluster/Cpg.cpp \
qpid/cluster/Cpg.h \
- qpid/cluster/Decoder.cpp \
- qpid/cluster/Decoder.h \
- qpid/cluster/ConnectionDecoder.cpp \
- qpid/cluster/ConnectionDecoder.h \
qpid/cluster/Dispatchable.h \
qpid/cluster/UpdateClient.cpp \
qpid/cluster/UpdateClient.h \
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 312d1e90e3..bea336644f 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -22,6 +22,7 @@
#include "UpdateClient.h"
#include "FailoverExchange.h"
+#include "qpid/assert.h"
#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
#include "qmf/org/apache/qpid/cluster/Package.h"
#include "qpid/broker/Broker.h"
@@ -91,7 +92,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
cpg(*this),
name(settings.name),
myUrl(settings.url.empty() ? Url() : Url(settings.url)),
- myId(cpg.self()),
+ self(cpg.self()),
readMax(settings.readMax),
writeEstimate(settings.writeEstimate),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
@@ -104,8 +105,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
boost::bind(&Cluster::leave, this),
"Error delivering frames",
poller),
- decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1), connections),
- expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())),
+ expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())),
frameId(0),
initialized(false),
state(INIT),
@@ -213,7 +213,7 @@ void Cluster::deliver(
framing::Buffer buf(static_cast<char*>(msg), msg_len);
Event e(Event::decodeCopy(from, buf));
e.setSequence(sequence++);
- if (from == myId) // Record self-deliveries for flow control.
+ if (from == self) // Record self-deliveries for flow control.
mcast.selfDeliver(e);
deliver(e);
}
@@ -227,42 +227,33 @@ void Cluster::deliver(const Event& e) {
// Handler for deliverEventQueue
void Cluster::deliveredEvent(const Event& e) {
QPID_LATENCY_RECORD("delivered event queue", e);
- Buffer buf(const_cast<char*>(e.getData()), e.getSize());
- if (e.getType() == CONTROL) {
- AMQFrame frame;
- while (frame.decode(buf)) {
- // Check for deliver close here so we can erase the
- // connection decoder safely in this thread.
- if (frame.getMethod()->isA<ClusterConnectionDeliverCloseBody>())
- decoder.erase(e.getConnectionId());
- deliverFrameQueue.push(EventFrame(e, frame));
- }
+ Mutex::ScopedLock l(lock);
+ if (e.isCluster()) { // Cluster control, process in this thread.
+ AMQFrame frame(e.getFrame());
+ ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l);
+ if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
+ throw Exception(QPID_MSG("Invalid cluster control"));
}
- else if (e.getType() == DATA)
- decoder.decode(e, e.getData());
+ else if (state >= CATCHUP) { // Connection frame, push onto deliver queue.
+ if (e.getType() == CONTROL)
+ connectionFrame(EventFrame(e, e.getFrame()));
+ else
+ connections.decode(e, e.getData());
+ }
+ else // connection frame && state < CATCHUP. Drop.
+ QPID_LOG(trace, *this << " DROP: " << e);
}
// Handler for deliverFrameQueue
void Cluster::deliveredFrame(const EventFrame& e) {
- Mutex::ScopedLock l(lock);
- const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
+ Mutex::ScopedLock l(lock); // TODO aconway 2009-03-02: don't need this lock?
+ assert(!e.isCluster()); // Only connection frames on this queue.
QPID_LOG(trace, *this << " DLVR: " << e);
- QPID_LATENCY_RECORD("delivered frame queue", e.frame);
- if (e.isCluster()) { // Cluster control frame
- ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l);
- if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
- throw Exception(QPID_MSG("Invalid cluster control"));
- }
- else { // Connection frame.
- if (state <= UPDATEE) {
- QPID_LOG(trace, *this << " DROP: " << e);
- return;
- }
- boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
- if (connection) // Ignore frames to closed local connections.
- connection->deliveredFrame(e);
- }
- QPID_LATENCY_RECORD("processed", e.frame);
+ if (e.type == DATA) // Sequence number to identify data frames.
+ const_cast<AMQFrame&>(e.frame).setClusterId(frameId++);
+ boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId);
+ if (connection) // Ignore frames to closed local connections.
+ connection->deliveredFrame(e);
}
struct AddrList {
@@ -310,7 +301,7 @@ void Cluster::configChange (
std::string addresses;
for (cpg_address* p = current; p < current+nCurrent; ++p)
addresses.append(MemberId(*p).str());
- deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId));
+ deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self));
}
void Cluster::setReady(Lock&) {
@@ -323,7 +314,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
bool memberChange = map.configChange(addresses);
if (state == LEFT) return;
- if (!map.isAlive(myId)) { // Final config change.
+ if (!map.isAlive(self)) { // Final config change.
leave(l);
return;
}
@@ -332,16 +323,16 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
if (map.aliveCount() == 1) {
setClusterId(true);
setReady(l);
- map = ClusterMap(myId, myUrl, true);
+ map = ClusterMap(self, myUrl, true);
memberUpdate(l);
QPID_LOG(notice, *this << " first in cluster");
}
else { // Joining established group.
state = JOINER;
QPID_LOG(info, *this << " joining cluster: " << map);
- mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId);
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
elders = map.getAlive();
- elders.erase(myId);
+ elders.erase(self);
broker.getLinks().setPassive(true);
}
}
@@ -361,7 +352,7 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
QPID_LOG(info, *this << " send update-offer to " << id);
- mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId);
+ mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), self);
}
}
@@ -388,17 +379,29 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l)
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
if (map.ready(id, Url(url)))
memberUpdate(l);
- if (state == CATCHUP && id == myId) {
+ if (state == CATCHUP && id == self) {
setReady(l);
QPID_LOG(notice, *this << " caught up, active cluster member");
}
}
+void Cluster::stall(Lock&) {
+ // Stop processing the deliveredEventQueue in order to send or
+ // recieve an update.
+ deliverEventQueue.stop();
+}
+
+void Cluster::unstall(Lock&) {
+ // Stop processing the deliveredEventQueue in order to send or
+ // recieve an update.
+ deliverEventQueue.start();
+}
+
void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
if (state == LEFT) return;
MemberId updatee(updateeInt);
boost::optional<Url> url = map.updateOffer(updater, updatee);
- if (updater == myId) {
+ if (updater == self) {
assert(state == OFFER);
if (url) { // My offer was first.
updateStart(updatee, *url, l);
@@ -409,29 +412,29 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
makeOffer(map.firstJoiner(), l); // Maybe make another offer.
}
}
- else if (updatee == myId && url) {
+ else if (updatee == self && url) {
assert(state == JOINER);
setClusterId(uuid);
state = UPDATEE;
QPID_LOG(info, *this << " receiving update from " << updater);
- deliverFrameQueue.stop();
+ stall(l);
checkUpdateIn(l);
}
}
-void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
+void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
if (state == LEFT) return;
assert(state == OFFER);
state = UPDATER;
QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
- deliverFrameQueue.stop();
+ stall(l);
if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
client::ConnectionSettings cs;
cs.username = settings.username;
cs.password = settings.password;
cs.mechanism = settings.mechanism;
updateThread = Thread(
- new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(),
+ new UpdateClient(self, updatee, url, broker, map, frameId, connections.values(),
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
cs));
@@ -445,13 +448,13 @@ void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) {
checkUpdateIn(l);
}
-void Cluster::checkUpdateIn(Lock& ) {
+void Cluster::checkUpdateIn(Lock& l) {
if (state == UPDATEE && updatedMap) {
map = *updatedMap;
- mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
+ mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
QPID_LOG(info, *this << " received update, starting catch-up");
- deliverFrameQueue.start();
+ unstall(l);
}
}
@@ -465,7 +468,7 @@ void Cluster::updateOutDone(Lock& l) {
assert(state == UPDATER);
state = READY;
mcast.release();
- deliverFrameQueue.start();
+ unstall(l);
makeOffer(map.firstJoiner(), l); // Try another offer
}
@@ -490,7 +493,7 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s
{
_qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args;
stringstream stream;
- stream << myId;
+ stream << self;
if (iargs.i_brokerId == stream.str())
stopClusterNode(l);
}
@@ -511,7 +514,7 @@ void Cluster::stopClusterNode(Lock& l) {
void Cluster::stopFullCluster(Lock& ) {
QPID_LOG(notice, *this << " shutting down cluster " << name);
- mcast.mcastControl(ClusterShutdownBody(), myId);
+ mcast.mcastControl(ClusterShutdownBody(), self);
}
void Cluster::memberUpdate(Lock& l) {
@@ -522,12 +525,12 @@ void Cluster::memberUpdate(Lock& l) {
failoverExchange->setUrls(urls);
if (size == 1 && lastSize > 1 && state >= CATCHUP) {
- QPID_LOG(info, *this << " last broker standing, update queue policies");
+ QPID_LOG(notice, *this << " last broker standing, update queue policies");
lastBroker = true;
broker.getQueues().updateQueueClusterState(true);
}
else if (size > 1 && lastBroker) {
- QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
+ QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size);
lastBroker = false;
broker.getQueues().updateQueueClusterState(false);
}
@@ -549,17 +552,25 @@ void Cluster::memberUpdate(Lock& l) {
mgmtObject->set_memberIDs(idstr);
}
- // Close connections belonging to members that have now been excluded
- connections.update(myId, map);
+ // Generate a deliver-close control frame for connections
+ // belonging to defunct members, so they will be erased in the
+ // deliverFrameQueue thread.
+ ConnectionMap::Vector c = connections.values();
+ for (ConnectionMap::Vector::iterator i = c.begin(); i != c.end(); ++i) {
+ ConnectionId cid = (*i)->getId();
+ MemberId mid = cid.getMember();
+ if (mid != self && !map.isMember(mid))
+ connectionFrame(EventFrame(EventHeader(CONTROL, cid), AMQFrame(ClusterConnectionDeliverCloseBody())));
+ }
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
- return o << cluster.myId << "(" << STATE[cluster.state] << ")";
+ return o << cluster.self << "(" << STATE[cluster.state] << ")";
}
MemberId Cluster::getId() const {
- return myId; // Immutable, no need to lock.
+ return self; // Immutable, no need to lock.
}
broker::Broker& Cluster::getBroker() const {
@@ -578,7 +589,7 @@ void Cluster::setClusterId(const Uuid& uuid) {
clusterId = uuid;
if (mgmtObject) {
stringstream stream;
- stream << myId;
+ stream << self;
mgmtObject->set_clusterID(clusterId.str());
mgmtObject->set_memberID(stream.str());
}
@@ -589,4 +600,11 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) {
expiryPolicy->deliverExpire(id);
}
+void Cluster::connectionFrame(const EventFrame& frame) {
+ // FIXME aconway 2009-03-02: bypassing deliverFrameQueue to avoid race condition.
+ // Measure performance impact, restore with better locking.
+ // deliverFrameQueue.push(frame);
+ deliveredFrame(frame);
+}
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index ea472a9ecf..4d358cf495 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -30,7 +30,6 @@
#include "NoOpConnectionOutputHandler.h"
#include "PollerDispatch.h"
#include "Quorum.h"
-#include "Decoder.h"
#include "PollableQueue.h"
#include "ExpiryPolicy.h"
@@ -102,7 +101,10 @@ class Cluster : private Cpg::Handler, public management::Manageable {
size_t getWriteEstimate() { return writeEstimate; }
bool isLeader() const; // Called in deliver thread.
-
+
+ // Called by Connection in deliver event thread with decoded connection data frames.
+ void connectionFrame(const EventFrame&);
+
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -125,7 +127,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void brokerShutdown();
// Cluster controls implement XML methods from cluster.xml.
- // Called in deliver thread.
+ // Called in deliveredEvent thread.
//
void updateRequest(const MemberId&, const std::string&, Lock&);
void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
@@ -134,6 +136,10 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void messageExpired(const MemberId&, uint64_t, Lock& l);
void shutdown(const MemberId&, Lock&);
+ // Used by cluster controls.
+ void stall(Lock&);
+ void unstall(Lock&);
+
// Handlers for pollable queues.
void deliveredEvent(const Event&);
void deliveredFrame(const EventFrame&);
@@ -141,6 +147,10 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Helper, called in deliver thread.
void updateStart(const MemberId& updatee, const Url& url, Lock&);
+ // Called in event deliver thread to check for update status.
+ bool isUpdateComplete(const EventFrame&);
+ bool isUpdateComplete();
+
void setReady(Lock&);
void deliver( // CPG deliver callback.
@@ -186,7 +196,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
Cpg cpg;
const std::string name;
Url myUrl;
- const MemberId myId;
+ const MemberId self;
const size_t readMax;
const size_t writeEstimate;
framing::Uuid clusterId;
@@ -201,9 +211,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
boost::shared_ptr<FailoverExchange> failoverExchange;
Quorum quorum;
- // Used only in deliverdEvent thread
- Decoder decoder;
-
// Used only in deliveredFrame thread
ClusterMap::Set elders;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 9c2b4f1638..0f71a91293 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -40,6 +40,7 @@
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/LatencyMetric.h"
+#include "qpid/sys/AtomicValue.h"
#include <boost/current_function.hpp>
@@ -58,19 +59,22 @@ using namespace framing;
NoOpConnectionOutputHandler Connection::discardHandler;
-// Shadow connections
-Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& wrappedId, ConnectionId myId)
- : cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false),
+namespace {
+sys::AtomicValue<uint64_t> idCounter;
+}
+
+// Shadow connection
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id)
+ : cluster(c), self(id), catchUp(false), output(*this, out),
+ connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
-// Local connections
+// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
- : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0),
+ const std::string& logId, MemberId member, bool isCatchUp, bool isLink)
+ : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
+ connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0),
expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
@@ -149,12 +153,9 @@ void Connection::deliveredFrame(const EventFrame& f) {
if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
&& !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
{
- // FIXME aconway 2009-02-24: Using the DATA/CONTROL
- // distinction to distinguish incoming vs. outgoing frames is
- // very unclear.
if (f.type == DATA) // incoming data frames to broker::Connection
connection.received(const_cast<AMQFrame&>(f.frame));
- else { // outgoing data frame, send via SessionState
+ else { // frame control, send frame via SessionState
broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
}
@@ -200,12 +201,12 @@ void Connection::left() {
connection.closed();
}
-// Decode data from local clients.
+// ConnectoinCodec::decode receives read buffers from directly-connected clients.
size_t Connection::decode(const char* buffer, size_t size) {
if (catchUp) { // Handle catch-up locally.
Buffer buf(const_cast<char*>(buffer), size);
while (localDecoder.decode(buf))
- received(localDecoder.frame);
+ received(localDecoder.getFrame());
}
else { // Multicast local connections.
assert(isLocal());
@@ -233,6 +234,29 @@ size_t Connection::decode(const char* buffer, size_t size) {
return size;
}
+// Decode a data event, a read buffer that has been delivered by the cluster.
+void Connection::decode(const EventHeader& eh, const void* data) {
+ assert(eh.getType() == DATA); // Only handle connection data events.
+ const char* cp = static_cast<const char*>(data);
+ Buffer buf(const_cast<char*>(cp), eh.getSize());
+ if (clusterDecoder.decode(buf)) { // Decoded a frame
+ AMQFrame frame(clusterDecoder.getFrame());
+ while (clusterDecoder.decode(buf)) {
+ cluster.connectionFrame(EventFrame(eh, frame));
+ frame = clusterDecoder.getFrame();
+ }
+ // Set read-credit on the last frame ending in this event.
+ // Credit will be given when this frame is processed.
+ cluster.connectionFrame(EventFrame(eh, frame, 1));
+ }
+ else {
+ // We must give 1 unit read credit per event.
+ // This event does not complete any frames so
+ // we give read credit directly.
+ giveReadCredit(1);
+ }
+}
+
broker::SessionState& Connection::sessionState() {
return *connection.getChannel(currentChannel).getSession();
}
@@ -267,11 +291,12 @@ void Connection::sessionState(
QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
}
-void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) {
- ConnectionId shadow = ConnectionId(memberId, connectionId);
- QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow);
- self = shadow;
+void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) {
+ ConnectionId shadowId = ConnectionId(memberId, connectionId);
+ QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
+ self = shadowId;
connection.setUserId(username);
+ clusterDecoder.setFragment(fragment.data(), fragment.size());
}
void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
@@ -281,7 +306,7 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members
}
bool Connection::isLocal() const {
- return self.first == cluster.getId() && self.second == this;
+ return self.first == cluster.getId() && self.second;
}
bool Connection::isShadow() const {
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index cefea00262..048008f2a5 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -64,10 +64,10 @@ class Connection :
public:
typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
- /** Local connection, use this in ConnectionId */
- Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink);
- /** Shadow connection */
- Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId);
+ /** Local connection. */
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink);
+ /** Shadow connection. */
+ Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id);
~Connection();
ConnectionId getId() const { return self; }
@@ -100,9 +100,12 @@ class Connection :
/** Called if the connectors member has left the cluster */
void left();
- // ConnectionCodec methods
+ // ConnectionCodec methods - called by IO layer with a read buffer.
size_t decode(const char* buffer, size_t size);
+ // Decode a data event, a read buffer that has been delivered by the cluster.
+ void decode(const EventHeader& eh, const void* data);
+
// Called for data delivered from the cluster.
void deliveredFrame(const EventFrame&);
@@ -118,7 +121,7 @@ class Connection :
const framing::SequenceNumber& received,
const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
- void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username);
+ void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId);
@@ -149,7 +152,9 @@ class Connection :
void exchange(const std::string& encoded);
void giveReadCredit(int credit);
-
+
+ framing::FrameDecoder& getDecoder() { return clusterDecoder; }
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
@@ -174,6 +179,7 @@ class Connection :
WriteEstimate writeEstimate;
OutputInterceptor output;
framing::FrameDecoder localDecoder;
+ framing::FrameDecoder clusterDecoder;
broker::Connection connection;
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp
index 442ac1438f..1ddd64d3d6 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -46,16 +46,13 @@ ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, con
// Used for outgoing Link connections, we don't care.
sys::ConnectionCodec*
-ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
- return new ConnectionCodec(out, id, cluster, false, true);
- //return next->create(out, id);
+ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) {
+ return new ConnectionCodec(out, logId, cluster, false, true);
}
-ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp, bool isLink)
- : codec(out, id, isLink),
- interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp, isLink)),
- id(interceptor->getId()),
- localId(id)
+ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& cluster, bool catchUp, bool isLink)
+ : codec(out, logId, isLink),
+ interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink))
{
std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor));
codec.setInputHandler(ih);
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h
index 69c2b0c3c8..ea01b7abb9 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.h
+++ b/cpp/src/qpid/cluster/ConnectionCodec.h
@@ -56,7 +56,7 @@ class ConnectionCodec : public sys::ConnectionCodec {
sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id);
};
- ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp, bool isLink);
+ ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& c, bool catchUp, bool isLink);
~ConnectionCodec();
// ConnectionCodec functions.
@@ -71,8 +71,6 @@ class ConnectionCodec : public sys::ConnectionCodec {
private:
amqp_0_10::Connection codec;
boost::intrusive_ptr<cluster::Connection> interceptor;
- cluster::ConnectionId id;
- std::string localId;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ConnectionDecoder.cpp b/cpp/src/qpid/cluster/ConnectionDecoder.cpp
deleted file mode 100644
index 3c18cf751e..0000000000
--- a/cpp/src/qpid/cluster/ConnectionDecoder.cpp
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ConnectionDecoder.h"
-#include "EventFrame.h"
-#include "ConnectionMap.h"
-
-namespace qpid {
-namespace cluster {
-
-using namespace framing;
-
-ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h) {}
-
-void ConnectionDecoder::decode(const EventHeader& eh, const void* data, ConnectionMap& map) {
- assert(eh.getType() == DATA); // Only handle connection data events.
- const char* cp = static_cast<const char*>(data);
- Buffer buf(const_cast<char*>(cp), eh.getSize());
- if (decoder.decode(buf)) { // Decoded a frame
- AMQFrame frame(decoder.frame);
- while (decoder.decode(buf)) {
- handler(EventFrame(eh, frame));
- frame = decoder.frame;
- }
- // Set read-credit on the last frame ending in this event.
- // Credit will be given when this frame is processed.
- handler(EventFrame(eh, frame, 1));
- }
- else {
- // We must give 1 unit read credit per event.
- // This event does not complete any frames so
- // we give read credit directly.
- ConnectionPtr connection = map.getLocal(eh.getConnectionId());
- if (connection)
- connection->giveReadCredit(1);
- }
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ConnectionDecoder.h b/cpp/src/qpid/cluster/ConnectionDecoder.h
deleted file mode 100644
index 449387c1cc..0000000000
--- a/cpp/src/qpid/cluster/ConnectionDecoder.h
+++ /dev/null
@@ -1,61 +0,0 @@
-#ifndef QPID_CLUSTER_CONNECTIONDECODER_H
-#define QPID_CLUSTER_CONNECTIONDECODER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/framing/FrameDecoder.h"
-#include <boost/function.hpp>
-
-namespace qpid {
-namespace cluster {
-
-class EventHeader;
-class EventFrame;
-class ConnectionMap;
-
-/**
- * Decodes delivered connection data Event's as EventFrame's for a
- * connection replica, local or shadow. Manages state for frame
- * fragments and flow control.
- *
- * THREAD UNSAFE: connection events are decoded in sequence.
- */
-class ConnectionDecoder
-{
- public:
- typedef boost::function<void(const EventFrame&)> Handler;
-
- ConnectionDecoder(const Handler& h);
-
- /** Takes EventHeader + data rather than Event so that the caller can
- * pass a pointer to connection data or a CPG buffer directly without copy.
- */
- void decode(const EventHeader& eh, const void* data, ConnectionMap& connections);
-
- private:
- Handler handler;
- framing::FrameDecoder decoder;
-};
-
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_CONNECTIONDECODER_H*/
diff --git a/cpp/src/qpid/cluster/ConnectionMap.cpp b/cpp/src/qpid/cluster/ConnectionMap.cpp
index 2c024b579d..d4b2aa6675 100644
--- a/cpp/src/qpid/cluster/ConnectionMap.cpp
+++ b/cpp/src/qpid/cluster/ConnectionMap.cpp
@@ -38,9 +38,9 @@ void ConnectionMap::insert(ConnectionPtr p) {
void ConnectionMap::erase(const ConnectionId& id) {
Lock l(lock);
- Map::iterator i = map.find(id);
- QPID_ASSERT(i != map.end());
- map.erase(i);
+ size_t erased = map.erase(id);
+ assert(erased);
+ (void)erased; // Avoid unused variable warnings.
}
ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) {
@@ -61,13 +61,6 @@ ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) {
return i->second;
}
-ConnectionMap::ConnectionPtr ConnectionMap::getLocal(const ConnectionId& id) {
- Lock l(lock);
- if (id.getMember() != cluster.getId()) return 0;
- Map::const_iterator i = map.find(id);
- return i == map.end() ? 0 : i->second;
-}
-
ConnectionMap::Vector ConnectionMap::values() const {
Lock l(lock);
Vector result(map.size());
@@ -76,22 +69,16 @@ ConnectionMap::Vector ConnectionMap::values() const {
return result;
}
-void ConnectionMap::update(MemberId myId, const ClusterMap& cluster) {
- Lock l(lock);
- for (Map::iterator i = map.begin(); i != map.end(); ) {
- MemberId member = i->first.getMember();
- if (member != myId && !cluster.isMember(member)) {
- i->second->left();
- map.erase(i++);
- } else {
- i++;
- }
- }
-}
-
void ConnectionMap::clear() {
Lock l(lock);
map.clear();
}
+void ConnectionMap::decode(const EventHeader& eh, const void* data) {
+ ConnectionPtr connection = get(eh.getConnectionId());
+ if (connection)
+ connection->decode(eh, data);
+}
+
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/ConnectionMap.h b/cpp/src/qpid/cluster/ConnectionMap.h
index c5ba18af0c..b449f329b1 100644
--- a/cpp/src/qpid/cluster/ConnectionMap.h
+++ b/cpp/src/qpid/cluster/ConnectionMap.h
@@ -60,18 +60,13 @@ class ConnectionMap {
*/
ConnectionPtr get(const ConnectionId& id);
- /** If ID is a local connection and in the map return it, else return 0 */
- ConnectionPtr getLocal(const ConnectionId& id);
-
/** Get connections for sending an update. */
Vector values() const;
- /** Remove connections who's members are no longer in the cluster. Deliver thread. */
- void update(MemberId myId, const ClusterMap& cluster);
+ /** Decode a connection data event. */
+ void decode(const EventHeader& eh, const void* data);
-
void clear();
-
size_t size() const;
private:
diff --git a/cpp/src/qpid/cluster/Decoder.cpp b/cpp/src/qpid/cluster/Decoder.cpp
deleted file mode 100644
index 1ba36bb521..0000000000
--- a/cpp/src/qpid/cluster/Decoder.cpp
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "Decoder.h"
-#include "Event.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/ptr_map.h"
-
-namespace qpid {
-namespace cluster {
-
-using namespace framing;
-
-Decoder::Decoder(const Handler& h, ConnectionMap& cm) : handler(h), connections(cm) {}
-
-void Decoder::decode(const EventHeader& eh, const void* data) {
- ConnectionId id = eh.getConnectionId();
- Map::iterator i = map.find(id);
- if (i == map.end()) {
- std::pair<Map::iterator, bool> ib = map.insert(id, new ConnectionDecoder(handler));
- i = ib.first;
- }
- ptr_map_ptr(i)->decode(eh, data, connections);
-}
-
-void Decoder::erase(const ConnectionId& c) {
- Map::iterator i = map.find(c);
- if (i != map.end())
- map.erase(i);
-}
-
-}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Decoder.h b/cpp/src/qpid/cluster/Decoder.h
deleted file mode 100644
index 50f6afa491..0000000000
--- a/cpp/src/qpid/cluster/Decoder.h
+++ /dev/null
@@ -1,65 +0,0 @@
-#ifndef QPID_CLUSTER_DECODER_H
-#define QPID_CLUSTER_DECODER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ConnectionDecoder.h"
-#include "types.h"
-#include <boost/ptr_container/ptr_map.hpp>
-
-namespace qpid {
-namespace cluster {
-
-class EventHeader;
-class ConnectionMap;
-
-/**
- * Holds a map of ConnectionDecoders. Decodes Events into EventFrames
- * and forwards EventFrames to a handler.
- *
- * THREAD UNSAFE: Called sequentially with un-decoded cluster events from CPG.
- */
-class Decoder
-{
- public:
- typedef boost::function<void(const EventFrame&)> Handler;
-
- Decoder(const Handler& h, ConnectionMap&);
-
- /** Takes EventHeader + data rather than Event so that the caller can
- * pass a pointer to connection data or a CPG buffer directly without copy.
- */
- void decode(const EventHeader& eh, const void* data);
-
- /** Erase the decoder for a connection. */
- void erase(const ConnectionId&);
-
- private:
- typedef boost::ptr_map<ConnectionId, ConnectionDecoder> Map;
- Handler handler;
- Map map;
- ConnectionMap& connections;
-};
-
-}} // namespace qpid::cluster
-
-#endif /*!QPID_CLUSTER_DECODER_H*/
diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp
index 9fe5376bc5..749fbf240f 100644
--- a/cpp/src/qpid/cluster/Event.cpp
+++ b/cpp/src/qpid/cluster/Event.cpp
@@ -23,6 +23,7 @@
#include "Cpg.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/assert.h"
#include <ostream>
#include <iterator>
#include <algorithm>
@@ -31,6 +32,7 @@ namespace qpid {
namespace cluster {
using framing::Buffer;
+using framing::AMQFrame;
const size_t EventHeader::HEADER_SIZE =
sizeof(uint8_t) + // type
@@ -57,7 +59,7 @@ void EventHeader::decode(const MemberId& m, framing::Buffer& buf) {
type = (EventType)buf.getOctet();
if(type != DATA && type != CONTROL)
throw Exception("Invalid multicast event type");
- connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong()));
+ connectionId = ConnectionId(m, buf.getLongLong());
size = buf.getLong();
#ifdef QPID_LATENCY_METRIC
latency_metric_timestamp = buf.getLongLong();
@@ -93,7 +95,7 @@ iovec Event::toIovec() {
void EventHeader::encode(Buffer& b) const {
b.putOctet(type);
- b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer()));
+ b.putLongLong(connectionId.getNumber());
b.putLong(size);
#ifdef QPID_LATENCY_METRIC
b.putLongLong(latency_metric_timestamp);
@@ -111,6 +113,14 @@ Event::operator Buffer() const {
return Buffer(const_cast<char*>(getData()), getSize());
}
+AMQFrame Event::getFrame() const {
+ assert(type == CONTROL);
+ Buffer buf(*this);
+ AMQFrame frame;
+ QPID_ASSERT(frame.decode(buf));
+ return frame;
+}
+
static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
std::ostream& operator << (std::ostream& o, EventType t) {
diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h
index 1338ea7413..c9f44725df 100644
--- a/cpp/src/qpid/cluster/Event.h
+++ b/cpp/src/qpid/cluster/Event.h
@@ -24,6 +24,7 @@
#include "types.h"
#include "qpid/RefCountedBuffer.h"
+#include "qpid/framing/AMQFrame.h"
#include "qpid/sys/LatencyMetric.h"
#include <sys/uio.h> // For iovec
#include <iosfwd>
@@ -59,8 +60,8 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp {
uint64_t getSequence() const { return sequence; }
void setSequence(uint64_t n) { sequence = n; }
- bool isCluster() const { return connectionId.getPointer() == 0; }
- bool isConnection() const { return connectionId.getPointer() != 0; }
+ bool isCluster() const { return connectionId.getNumber() == 0; }
+ bool isConnection() const { return connectionId.getNumber() != 0; }
protected:
static const size_t HEADER_SIZE;
@@ -97,6 +98,8 @@ class Event : public EventHeader {
// Store including header
char* getStore() { return store; }
const char* getStore() const { return store; }
+
+ framing::AMQFrame getFrame() const;
operator framing::Buffer() const;
diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h
index ef3c38658b..abeea3ef16 100644
--- a/cpp/src/qpid/cluster/EventFrame.h
+++ b/cpp/src/qpid/cluster/EventFrame.h
@@ -42,8 +42,8 @@ struct EventFrame
EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc=0);
- bool isCluster() const { return !connectionId.getPointer(); }
- bool isConnection() const { return connectionId.getPointer(); }
+ bool isCluster() const { return connectionId.getNumber() == 0; }
+ bool isConnection() const { return connectionId.getNumber() != 0; }
bool isLastInEvent() const { return readCredit; }
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 18746ccb7e..4a4af4adbd 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -95,7 +95,7 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con
: updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
frameId(frameId_), connections(cons),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
- done(ok), failed(fail)
+ done(ok), failed(fail), connectionSettings(cs)
{
connection.open(url, cs);
session = connection.newSession("update_shared");
@@ -228,13 +228,15 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda
shadowConnection = catchUpConnection();
broker::Connection& bc = updateConnection->getBrokerConnection();
- // FIXME aconway 2008-10-20: What authentication info to use on reconnect?
- shadowConnection.open(updateeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
+ connectionSettings.maxFrameSize = bc.getFrameMax();
+ shadowConnection.open(updateeUrl, connectionSettings);
bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
+ std::pair<const char*, size_t> fragment = updateConnection->getDecoder().getFragment();
ClusterConnectionProxy(shadowConnection).shadowReady(
updateConnection->getId().getMember(),
- reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()),
- updateConnection->getBrokerConnection().getUserId()
+ updateConnection->getId().getNumber(),
+ bc.getUserId(),
+ string(fragment.first, fragment.second)
);
shadowConnection.close();
QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
@@ -285,9 +287,6 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) {
if (inProgress) {
inProgress->getFrames().map(simpl->out);
}
-
- // FIXME aconway 2008-09-23: update session replay list.
-
QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
}
diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 23f647c820..08267392f4 100644
--- a/cpp/src/qpid/cluster/UpdateClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -98,6 +98,7 @@ class UpdateClient : public sys::Runnable {
client::AsyncSession session, shadowSession;
boost::function<void()> done;
boost::function<void(const std::exception& e)> failed;
+ client::ConnectionSettings connectionSettings;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h
index 30454d9fbb..c19152e4d8 100644
--- a/cpp/src/qpid/cluster/types.h
+++ b/cpp/src/qpid/cluster/types.h
@@ -68,17 +68,16 @@ inline bool operator==(const cpg_address& caddr, const MemberId& id) { return id
std::ostream& operator<<(std::ostream&, const MemberId&);
-struct ConnectionId : public std::pair<MemberId, Connection*> {
- ConnectionId(const MemberId& m=MemberId(), Connection* c=0) : std::pair<MemberId, Connection*> (m,c) {}
- ConnectionId(uint64_t m, uint64_t c)
- : std::pair<MemberId, Connection*>(MemberId(m), reinterpret_cast<Connection*>(c)) {}
+struct ConnectionId : public std::pair<MemberId, uint64_t> {
+ ConnectionId(const MemberId& m=MemberId(), uint64_t c=0) : std::pair<MemberId, uint64_t> (m,c) {}
+ ConnectionId(uint64_t m, uint64_t c) : std::pair<MemberId, uint64_t>(MemberId(m), c) {}
MemberId getMember() const { return first; }
- Connection* getPointer() const { return second; }
+ uint64_t getNumber() const { return second; }
};
std::ostream& operator<<(std::ostream&, const ConnectionId&);
-std::ostream& operator << (std::ostream&, EventType);
+std::ostream& operator<<(std::ostream&, EventType);
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/framing/FrameDecoder.cpp b/cpp/src/qpid/framing/FrameDecoder.cpp
index cbdac181e9..6f0ae9756f 100644
--- a/cpp/src/qpid/framing/FrameDecoder.cpp
+++ b/cpp/src/qpid/framing/FrameDecoder.cpp
@@ -21,8 +21,9 @@
#include "FrameDecoder.h"
#include "Buffer.h"
#include "qpid/log/Statement.h"
-#include <algorithm>
#include "qpid/framing/reply_exceptions.h"
+#include <algorithm>
+#include <string.h>
namespace qpid {
namespace framing {
@@ -67,4 +68,13 @@ bool FrameDecoder::decode(Buffer& buffer) {
return false;
}
+void FrameDecoder::setFragment(const char* data, size_t size) {
+ fragment.resize(size);
+ ::memcpy(fragment.data(), data, size);
+}
+
+std::pair<const char*, size_t> FrameDecoder::getFragment() const {
+ return std::pair<const char*, size_t>(fragment.data(), fragment.size());
+}
+
}} // namespace qpid::framing
diff --git a/cpp/src/qpid/framing/FrameDecoder.h b/cpp/src/qpid/framing/FrameDecoder.h
index 7f974dadc3..961cc666a9 100644
--- a/cpp/src/qpid/framing/FrameDecoder.h
+++ b/cpp/src/qpid/framing/FrameDecoder.h
@@ -35,9 +35,16 @@ class FrameDecoder
{
public:
bool decode(Buffer& buffer);
- AMQFrame frame;
+ const AMQFrame& getFrame() const { return frame; }
+ AMQFrame& getFrame() { return frame; }
+
+ void setFragment(const char*, size_t);
+ std::pair<const char*, size_t> getFragment() const;
+
private:
std::vector<char> fragment;
+ AMQFrame frame;
+
};
}} // namespace qpid::framing
diff --git a/cpp/src/tests/ClusterFixture.cpp b/cpp/src/tests/ClusterFixture.cpp
index 3a0ea74098..5658957b48 100644
--- a/cpp/src/tests/ClusterFixture.cpp
+++ b/cpp/src/tests/ClusterFixture.cpp
@@ -109,7 +109,7 @@ void ClusterFixture::addLocal() {
Args args(makeArgs(prefix));
vector<const char*> argv(args.size());
transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1));
- qpid::log::Logger::instance().setPrefix(os.str());
+ qpid::log::Logger::instance().setPrefix(prefix);
localBroker.reset(new BrokerFixture(parseOpts(argv.size(), &argv[0])));
push_back(localBroker->getPort());
forkedBrokers.push_back(shared_ptr<ForkedBroker>());
diff --git a/cpp/src/tests/FrameDecoder.cpp b/cpp/src/tests/FrameDecoder.cpp
index b7f1ea1b89..f5db66d5fe 100644
--- a/cpp/src/tests/FrameDecoder.cpp
+++ b/cpp/src/tests/FrameDecoder.cpp
@@ -65,7 +65,7 @@ QPID_AUTO_TEST_CASE(testByteFragments) {
}
Buffer buf(&encoded[encoded.size()-1], 1);
BOOST_CHECK(decoder.decode(buf));
- BOOST_CHECK_EQUAL(data, getData(decoder.frame));
+ BOOST_CHECK_EQUAL(data, getData(decoder.getFrame()));
}
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index 2cf4e915b6..d3e4b488fb 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -125,6 +125,7 @@
<field name="member-id" type="uint64"/>
<field name="connection-id" type="uint64"/>
<field name="user-name" type="str8"/>
+ <field name="fragment" type="str32"/>
</control>
<!-- Complete a cluster state update. -->