diff options
Diffstat (limited to 'cpp')
24 files changed, 222 insertions, 396 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 0db9455136..5acabce694 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -53,10 +53,6 @@ cluster_la_SOURCES = \ qpid/cluster/ConnectionMap.cpp \ qpid/cluster/Cpg.cpp \ qpid/cluster/Cpg.h \ - qpid/cluster/Decoder.cpp \ - qpid/cluster/Decoder.h \ - qpid/cluster/ConnectionDecoder.cpp \ - qpid/cluster/ConnectionDecoder.h \ qpid/cluster/Dispatchable.h \ qpid/cluster/UpdateClient.cpp \ qpid/cluster/UpdateClient.h \ diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 312d1e90e3..bea336644f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -22,6 +22,7 @@ #include "UpdateClient.h" #include "FailoverExchange.h" +#include "qpid/assert.h" #include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h" #include "qmf/org/apache/qpid/cluster/Package.h" #include "qpid/broker/Broker.h" @@ -91,7 +92,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : cpg(*this), name(settings.name), myUrl(settings.url.empty() ? Url() : Url(settings.url)), - myId(cpg.self()), + self(cpg.self()), readMax(settings.readMax), writeEstimate(settings.writeEstimate), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), @@ -104,8 +105,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : boost::bind(&Cluster::leave, this), "Error delivering frames", poller), - decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1), connections), - expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, myId, broker.getTimer())), + expiryPolicy(new ExpiryPolicy(boost::bind(&Cluster::isLeader, this), mcast, self, broker.getTimer())), frameId(0), initialized(false), state(INIT), @@ -213,7 +213,7 @@ void Cluster::deliver( framing::Buffer buf(static_cast<char*>(msg), msg_len); Event e(Event::decodeCopy(from, buf)); e.setSequence(sequence++); - if (from == myId) // Record self-deliveries for flow control. + if (from == self) // Record self-deliveries for flow control. mcast.selfDeliver(e); deliver(e); } @@ -227,42 +227,33 @@ void Cluster::deliver(const Event& e) { // Handler for deliverEventQueue void Cluster::deliveredEvent(const Event& e) { QPID_LATENCY_RECORD("delivered event queue", e); - Buffer buf(const_cast<char*>(e.getData()), e.getSize()); - if (e.getType() == CONTROL) { - AMQFrame frame; - while (frame.decode(buf)) { - // Check for deliver close here so we can erase the - // connection decoder safely in this thread. - if (frame.getMethod()->isA<ClusterConnectionDeliverCloseBody>()) - decoder.erase(e.getConnectionId()); - deliverFrameQueue.push(EventFrame(e, frame)); - } + Mutex::ScopedLock l(lock); + if (e.isCluster()) { // Cluster control, process in this thread. + AMQFrame frame(e.getFrame()); + ClusterDispatcher dispatch(*this, e.getConnectionId().getMember(), l); + if (!framing::invoke(dispatch, *frame.getBody()).wasHandled()) + throw Exception(QPID_MSG("Invalid cluster control")); } - else if (e.getType() == DATA) - decoder.decode(e, e.getData()); + else if (state >= CATCHUP) { // Connection frame, push onto deliver queue. + if (e.getType() == CONTROL) + connectionFrame(EventFrame(e, e.getFrame())); + else + connections.decode(e, e.getData()); + } + else // connection frame && state < CATCHUP. Drop. + QPID_LOG(trace, *this << " DROP: " << e); } // Handler for deliverFrameQueue void Cluster::deliveredFrame(const EventFrame& e) { - Mutex::ScopedLock l(lock); - const_cast<AMQFrame&>(e.frame).setClusterId(frameId++); + Mutex::ScopedLock l(lock); // TODO aconway 2009-03-02: don't need this lock? + assert(!e.isCluster()); // Only connection frames on this queue. QPID_LOG(trace, *this << " DLVR: " << e); - QPID_LATENCY_RECORD("delivered frame queue", e.frame); - if (e.isCluster()) { // Cluster control frame - ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); - if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) - throw Exception(QPID_MSG("Invalid cluster control")); - } - else { // Connection frame. - if (state <= UPDATEE) { - QPID_LOG(trace, *this << " DROP: " << e); - return; - } - boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId); - if (connection) // Ignore frames to closed local connections. - connection->deliveredFrame(e); - } - QPID_LATENCY_RECORD("processed", e.frame); + if (e.type == DATA) // Sequence number to identify data frames. + const_cast<AMQFrame&>(e.frame).setClusterId(frameId++); + boost::intrusive_ptr<Connection> connection = connections.get(e.connectionId); + if (connection) // Ignore frames to closed local connections. + connection->deliveredFrame(e); } struct AddrList { @@ -310,7 +301,7 @@ void Cluster::configChange ( std::string addresses; for (cpg_address* p = current; p < current+nCurrent; ++p) addresses.append(MemberId(*p).str()); - deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId)); + deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), self)); } void Cluster::setReady(Lock&) { @@ -323,7 +314,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& bool memberChange = map.configChange(addresses); if (state == LEFT) return; - if (!map.isAlive(myId)) { // Final config change. + if (!map.isAlive(self)) { // Final config change. leave(l); return; } @@ -332,16 +323,16 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& if (map.aliveCount() == 1) { setClusterId(true); setReady(l); - map = ClusterMap(myId, myUrl, true); + map = ClusterMap(self, myUrl, true); memberUpdate(l); QPID_LOG(notice, *this << " first in cluster"); } else { // Joining established group. state = JOINER; QPID_LOG(info, *this << " joining cluster: " << map); - mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId); + mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self); elders = map.getAlive(); - elders.erase(myId); + elders.erase(self); broker.getLinks().setPassive(true); } } @@ -361,7 +352,7 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; QPID_LOG(info, *this << " send update-offer to " << id); - mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId); + mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), self); } } @@ -388,17 +379,29 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { if (map.ready(id, Url(url))) memberUpdate(l); - if (state == CATCHUP && id == myId) { + if (state == CATCHUP && id == self) { setReady(l); QPID_LOG(notice, *this << " caught up, active cluster member"); } } +void Cluster::stall(Lock&) { + // Stop processing the deliveredEventQueue in order to send or + // recieve an update. + deliverEventQueue.stop(); +} + +void Cluster::unstall(Lock&) { + // Stop processing the deliveredEventQueue in order to send or + // recieve an update. + deliverEventQueue.start(); +} + void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) { if (state == LEFT) return; MemberId updatee(updateeInt); boost::optional<Url> url = map.updateOffer(updater, updatee); - if (updater == myId) { + if (updater == self) { assert(state == OFFER); if (url) { // My offer was first. updateStart(updatee, *url, l); @@ -409,29 +412,29 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu makeOffer(map.firstJoiner(), l); // Maybe make another offer. } } - else if (updatee == myId && url) { + else if (updatee == self && url) { assert(state == JOINER); setClusterId(uuid); state = UPDATEE; QPID_LOG(info, *this << " receiving update from " << updater); - deliverFrameQueue.stop(); + stall(l); checkUpdateIn(l); } } -void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { +void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { if (state == LEFT) return; assert(state == OFFER); state = UPDATER; QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url); - deliverFrameQueue.stop(); + stall(l); if (updateThread.id()) updateThread.join(); // Join the previous updatethread. client::ConnectionSettings cs; cs.username = settings.username; cs.password = settings.password; cs.mechanism = settings.mechanism; updateThread = Thread( - new UpdateClient(myId, updatee, url, broker, map, frameId, connections.values(), + new UpdateClient(self, updatee, url, broker, map, frameId, connections.values(), boost::bind(&Cluster::updateOutDone, this), boost::bind(&Cluster::updateOutError, this, _1), cs)); @@ -445,13 +448,13 @@ void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) { checkUpdateIn(l); } -void Cluster::checkUpdateIn(Lock& ) { +void Cluster::checkUpdateIn(Lock& l) { if (state == UPDATEE && updatedMap) { map = *updatedMap; - mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); + mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; QPID_LOG(info, *this << " received update, starting catch-up"); - deliverFrameQueue.start(); + unstall(l); } } @@ -465,7 +468,7 @@ void Cluster::updateOutDone(Lock& l) { assert(state == UPDATER); state = READY; mcast.release(); - deliverFrameQueue.start(); + unstall(l); makeOffer(map.firstJoiner(), l); // Try another offer } @@ -490,7 +493,7 @@ Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, s { _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args; stringstream stream; - stream << myId; + stream << self; if (iargs.i_brokerId == stream.str()) stopClusterNode(l); } @@ -511,7 +514,7 @@ void Cluster::stopClusterNode(Lock& l) { void Cluster::stopFullCluster(Lock& ) { QPID_LOG(notice, *this << " shutting down cluster " << name); - mcast.mcastControl(ClusterShutdownBody(), myId); + mcast.mcastControl(ClusterShutdownBody(), self); } void Cluster::memberUpdate(Lock& l) { @@ -522,12 +525,12 @@ void Cluster::memberUpdate(Lock& l) { failoverExchange->setUrls(urls); if (size == 1 && lastSize > 1 && state >= CATCHUP) { - QPID_LOG(info, *this << " last broker standing, update queue policies"); + QPID_LOG(notice, *this << " last broker standing, update queue policies"); lastBroker = true; broker.getQueues().updateQueueClusterState(true); } else if (size > 1 && lastBroker) { - QPID_LOG(info, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size); + QPID_LOG(notice, *this << " last broker standing joined by " << size-1 << " replicas, updating queue policies" << size); lastBroker = false; broker.getQueues().updateQueueClusterState(false); } @@ -549,17 +552,25 @@ void Cluster::memberUpdate(Lock& l) { mgmtObject->set_memberIDs(idstr); } - // Close connections belonging to members that have now been excluded - connections.update(myId, map); + // Generate a deliver-close control frame for connections + // belonging to defunct members, so they will be erased in the + // deliverFrameQueue thread. + ConnectionMap::Vector c = connections.values(); + for (ConnectionMap::Vector::iterator i = c.begin(); i != c.end(); ++i) { + ConnectionId cid = (*i)->getId(); + MemberId mid = cid.getMember(); + if (mid != self && !map.isMember(mid)) + connectionFrame(EventFrame(EventHeader(CONTROL, cid), AMQFrame(ClusterConnectionDeliverCloseBody()))); + } } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; - return o << cluster.myId << "(" << STATE[cluster.state] << ")"; + return o << cluster.self << "(" << STATE[cluster.state] << ")"; } MemberId Cluster::getId() const { - return myId; // Immutable, no need to lock. + return self; // Immutable, no need to lock. } broker::Broker& Cluster::getBroker() const { @@ -578,7 +589,7 @@ void Cluster::setClusterId(const Uuid& uuid) { clusterId = uuid; if (mgmtObject) { stringstream stream; - stream << myId; + stream << self; mgmtObject->set_clusterID(clusterId.str()); mgmtObject->set_memberID(stream.str()); } @@ -589,4 +600,11 @@ void Cluster::messageExpired(const MemberId&, uint64_t id, Lock&) { expiryPolicy->deliverExpire(id); } +void Cluster::connectionFrame(const EventFrame& frame) { + // FIXME aconway 2009-03-02: bypassing deliverFrameQueue to avoid race condition. + // Measure performance impact, restore with better locking. + // deliverFrameQueue.push(frame); + deliveredFrame(frame); +} + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index ea472a9ecf..4d358cf495 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -30,7 +30,6 @@ #include "NoOpConnectionOutputHandler.h" #include "PollerDispatch.h" #include "Quorum.h" -#include "Decoder.h" #include "PollableQueue.h" #include "ExpiryPolicy.h" @@ -102,7 +101,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { size_t getWriteEstimate() { return writeEstimate; } bool isLeader() const; // Called in deliver thread. - + + // Called by Connection in deliver event thread with decoded connection data frames. + void connectionFrame(const EventFrame&); + private: typedef sys::Monitor::ScopedLock Lock; @@ -125,7 +127,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void brokerShutdown(); // Cluster controls implement XML methods from cluster.xml. - // Called in deliver thread. + // Called in deliveredEvent thread. // void updateRequest(const MemberId&, const std::string&, Lock&); void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); @@ -134,6 +136,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { void messageExpired(const MemberId&, uint64_t, Lock& l); void shutdown(const MemberId&, Lock&); + // Used by cluster controls. + void stall(Lock&); + void unstall(Lock&); + // Handlers for pollable queues. void deliveredEvent(const Event&); void deliveredFrame(const EventFrame&); @@ -141,6 +147,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Helper, called in deliver thread. void updateStart(const MemberId& updatee, const Url& url, Lock&); + // Called in event deliver thread to check for update status. + bool isUpdateComplete(const EventFrame&); + bool isUpdateComplete(); + void setReady(Lock&); void deliver( // CPG deliver callback. @@ -186,7 +196,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { Cpg cpg; const std::string name; Url myUrl; - const MemberId myId; + const MemberId self; const size_t readMax; const size_t writeEstimate; framing::Uuid clusterId; @@ -201,9 +211,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { boost::shared_ptr<FailoverExchange> failoverExchange; Quorum quorum; - // Used only in deliverdEvent thread - Decoder decoder; - // Used only in deliveredFrame thread ClusterMap::Set elders; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 9c2b4f1638..0f71a91293 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -40,6 +40,7 @@ #include "qpid/framing/ConnectionCloseOkBody.h" #include "qpid/log/Statement.h" #include "qpid/sys/LatencyMetric.h" +#include "qpid/sys/AtomicValue.h" #include <boost/current_function.hpp> @@ -58,19 +59,22 @@ using namespace framing; NoOpConnectionOutputHandler Connection::discardHandler; -// Shadow connections -Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, ConnectionId myId) - : cluster(c), self(myId), catchUp(false), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false), +namespace { +sys::AtomicValue<uint64_t> idCounter; +} + +// Shadow connection +Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id) + : cluster(c), self(id), catchUp(false), output(*this, out), + connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false), mcastFrameHandler(cluster.getMulticast(), self) { init(); } -// Local connections +// Local connection Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, - const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink) - : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out), - connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0), + const std::string& logId, MemberId member, bool isCatchUp, bool isLink) + : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out), + connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0), expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self) { init(); } @@ -149,12 +153,9 @@ void Connection::deliveredFrame(const EventFrame& f) { if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. && !checkUnsupported(*f.frame.getBody())) // Unsupported operation. { - // FIXME aconway 2009-02-24: Using the DATA/CONTROL - // distinction to distinguish incoming vs. outgoing frames is - // very unclear. if (f.type == DATA) // incoming data frames to broker::Connection connection.received(const_cast<AMQFrame&>(f.frame)); - else { // outgoing data frame, send via SessionState + else { // frame control, send frame via SessionState broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession(); if (ss) ss->out(const_cast<AMQFrame&>(f.frame)); } @@ -200,12 +201,12 @@ void Connection::left() { connection.closed(); } -// Decode data from local clients. +// ConnectoinCodec::decode receives read buffers from directly-connected clients. size_t Connection::decode(const char* buffer, size_t size) { if (catchUp) { // Handle catch-up locally. Buffer buf(const_cast<char*>(buffer), size); while (localDecoder.decode(buf)) - received(localDecoder.frame); + received(localDecoder.getFrame()); } else { // Multicast local connections. assert(isLocal()); @@ -233,6 +234,29 @@ size_t Connection::decode(const char* buffer, size_t size) { return size; } +// Decode a data event, a read buffer that has been delivered by the cluster. +void Connection::decode(const EventHeader& eh, const void* data) { + assert(eh.getType() == DATA); // Only handle connection data events. + const char* cp = static_cast<const char*>(data); + Buffer buf(const_cast<char*>(cp), eh.getSize()); + if (clusterDecoder.decode(buf)) { // Decoded a frame + AMQFrame frame(clusterDecoder.getFrame()); + while (clusterDecoder.decode(buf)) { + cluster.connectionFrame(EventFrame(eh, frame)); + frame = clusterDecoder.getFrame(); + } + // Set read-credit on the last frame ending in this event. + // Credit will be given when this frame is processed. + cluster.connectionFrame(EventFrame(eh, frame, 1)); + } + else { + // We must give 1 unit read credit per event. + // This event does not complete any frames so + // we give read credit directly. + giveReadCredit(1); + } +} + broker::SessionState& Connection::sessionState() { return *connection.getChannel(currentChannel).getSession(); } @@ -267,11 +291,12 @@ void Connection::sessionState( QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); } -void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) { - ConnectionId shadow = ConnectionId(memberId, connectionId); - QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow); - self = shadow; +void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) { + ConnectionId shadowId = ConnectionId(memberId, connectionId); + QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId); + self = shadowId; connection.setUserId(username); + clusterDecoder.setFragment(fragment.data(), fragment.size()); } void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { @@ -281,7 +306,7 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members } bool Connection::isLocal() const { - return self.first == cluster.getId() && self.second == this; + return self.first == cluster.getId() && self.second; } bool Connection::isShadow() const { diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index cefea00262..048008f2a5 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -64,10 +64,10 @@ class Connection : public: typedef sys::PollableQueue<EventFrame> PollableFrameQueue; - /** Local connection, use this in ConnectionId */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp, bool isLink); - /** Shadow connection */ - Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId); + /** Local connection. */ + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink); + /** Shadow connection. */ + Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id); ~Connection(); ConnectionId getId() const { return self; } @@ -100,9 +100,12 @@ class Connection : /** Called if the connectors member has left the cluster */ void left(); - // ConnectionCodec methods + // ConnectionCodec methods - called by IO layer with a read buffer. size_t decode(const char* buffer, size_t size); + // Decode a data event, a read buffer that has been delivered by the cluster. + void decode(const EventHeader& eh, const void* data); + // Called for data delivered from the cluster. void deliveredFrame(const EventFrame&); @@ -118,7 +121,7 @@ class Connection : const framing::SequenceNumber& received, const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete); - void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username); + void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment); void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId); @@ -149,7 +152,9 @@ class Connection : void exchange(const std::string& encoded); void giveReadCredit(int credit); - + + framing::FrameDecoder& getDecoder() { return clusterDecoder; } + private: struct NullFrameHandler : public framing::FrameHandler { void handle(framing::AMQFrame&) {} @@ -174,6 +179,7 @@ class Connection : WriteEstimate writeEstimate; OutputInterceptor output; framing::FrameDecoder localDecoder; + framing::FrameDecoder clusterDecoder; broker::Connection connection; framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index 442ac1438f..1ddd64d3d6 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -46,16 +46,13 @@ ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, con // Used for outgoing Link connections, we don't care. sys::ConnectionCodec* -ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) { - return new ConnectionCodec(out, id, cluster, false, true); - //return next->create(out, id); +ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& logId) { + return new ConnectionCodec(out, logId, cluster, false, true); } -ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& cluster, bool catchUp, bool isLink) - : codec(out, id, isLink), - interceptor(new Connection(cluster, codec, id, cluster.getId(), catchUp, isLink)), - id(interceptor->getId()), - localId(id) +ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& cluster, bool catchUp, bool isLink) + : codec(out, logId, isLink), + interceptor(new Connection(cluster, codec, logId, cluster.getId(), catchUp, isLink)) { std::auto_ptr<sys::ConnectionInputHandler> ih(new ProxyInputHandler(interceptor)); codec.setInputHandler(ih); diff --git a/cpp/src/qpid/cluster/ConnectionCodec.h b/cpp/src/qpid/cluster/ConnectionCodec.h index 69c2b0c3c8..ea01b7abb9 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.h +++ b/cpp/src/qpid/cluster/ConnectionCodec.h @@ -56,7 +56,7 @@ class ConnectionCodec : public sys::ConnectionCodec { sys::ConnectionCodec* create(sys::OutputControl&, const std::string& id); }; - ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& c, bool catchUp, bool isLink); + ConnectionCodec(sys::OutputControl& out, const std::string& logId, Cluster& c, bool catchUp, bool isLink); ~ConnectionCodec(); // ConnectionCodec functions. @@ -71,8 +71,6 @@ class ConnectionCodec : public sys::ConnectionCodec { private: amqp_0_10::Connection codec; boost::intrusive_ptr<cluster::Connection> interceptor; - cluster::ConnectionId id; - std::string localId; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionDecoder.cpp b/cpp/src/qpid/cluster/ConnectionDecoder.cpp deleted file mode 100644 index 3c18cf751e..0000000000 --- a/cpp/src/qpid/cluster/ConnectionDecoder.cpp +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ConnectionDecoder.h" -#include "EventFrame.h" -#include "ConnectionMap.h" - -namespace qpid { -namespace cluster { - -using namespace framing; - -ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h) {} - -void ConnectionDecoder::decode(const EventHeader& eh, const void* data, ConnectionMap& map) { - assert(eh.getType() == DATA); // Only handle connection data events. - const char* cp = static_cast<const char*>(data); - Buffer buf(const_cast<char*>(cp), eh.getSize()); - if (decoder.decode(buf)) { // Decoded a frame - AMQFrame frame(decoder.frame); - while (decoder.decode(buf)) { - handler(EventFrame(eh, frame)); - frame = decoder.frame; - } - // Set read-credit on the last frame ending in this event. - // Credit will be given when this frame is processed. - handler(EventFrame(eh, frame, 1)); - } - else { - // We must give 1 unit read credit per event. - // This event does not complete any frames so - // we give read credit directly. - ConnectionPtr connection = map.getLocal(eh.getConnectionId()); - if (connection) - connection->giveReadCredit(1); - } -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionDecoder.h b/cpp/src/qpid/cluster/ConnectionDecoder.h deleted file mode 100644 index 449387c1cc..0000000000 --- a/cpp/src/qpid/cluster/ConnectionDecoder.h +++ /dev/null @@ -1,61 +0,0 @@ -#ifndef QPID_CLUSTER_CONNECTIONDECODER_H -#define QPID_CLUSTER_CONNECTIONDECODER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/framing/FrameDecoder.h" -#include <boost/function.hpp> - -namespace qpid { -namespace cluster { - -class EventHeader; -class EventFrame; -class ConnectionMap; - -/** - * Decodes delivered connection data Event's as EventFrame's for a - * connection replica, local or shadow. Manages state for frame - * fragments and flow control. - * - * THREAD UNSAFE: connection events are decoded in sequence. - */ -class ConnectionDecoder -{ - public: - typedef boost::function<void(const EventFrame&)> Handler; - - ConnectionDecoder(const Handler& h); - - /** Takes EventHeader + data rather than Event so that the caller can - * pass a pointer to connection data or a CPG buffer directly without copy. - */ - void decode(const EventHeader& eh, const void* data, ConnectionMap& connections); - - private: - Handler handler; - framing::FrameDecoder decoder; -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_CONNECTIONDECODER_H*/ diff --git a/cpp/src/qpid/cluster/ConnectionMap.cpp b/cpp/src/qpid/cluster/ConnectionMap.cpp index 2c024b579d..d4b2aa6675 100644 --- a/cpp/src/qpid/cluster/ConnectionMap.cpp +++ b/cpp/src/qpid/cluster/ConnectionMap.cpp @@ -38,9 +38,9 @@ void ConnectionMap::insert(ConnectionPtr p) { void ConnectionMap::erase(const ConnectionId& id) { Lock l(lock); - Map::iterator i = map.find(id); - QPID_ASSERT(i != map.end()); - map.erase(i); + size_t erased = map.erase(id); + assert(erased); + (void)erased; // Avoid unused variable warnings. } ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) { @@ -61,13 +61,6 @@ ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) { return i->second; } -ConnectionMap::ConnectionPtr ConnectionMap::getLocal(const ConnectionId& id) { - Lock l(lock); - if (id.getMember() != cluster.getId()) return 0; - Map::const_iterator i = map.find(id); - return i == map.end() ? 0 : i->second; -} - ConnectionMap::Vector ConnectionMap::values() const { Lock l(lock); Vector result(map.size()); @@ -76,22 +69,16 @@ ConnectionMap::Vector ConnectionMap::values() const { return result; } -void ConnectionMap::update(MemberId myId, const ClusterMap& cluster) { - Lock l(lock); - for (Map::iterator i = map.begin(); i != map.end(); ) { - MemberId member = i->first.getMember(); - if (member != myId && !cluster.isMember(member)) { - i->second->left(); - map.erase(i++); - } else { - i++; - } - } -} - void ConnectionMap::clear() { Lock l(lock); map.clear(); } +void ConnectionMap::decode(const EventHeader& eh, const void* data) { + ConnectionPtr connection = get(eh.getConnectionId()); + if (connection) + connection->decode(eh, data); +} + + }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/ConnectionMap.h b/cpp/src/qpid/cluster/ConnectionMap.h index c5ba18af0c..b449f329b1 100644 --- a/cpp/src/qpid/cluster/ConnectionMap.h +++ b/cpp/src/qpid/cluster/ConnectionMap.h @@ -60,18 +60,13 @@ class ConnectionMap { */ ConnectionPtr get(const ConnectionId& id); - /** If ID is a local connection and in the map return it, else return 0 */ - ConnectionPtr getLocal(const ConnectionId& id); - /** Get connections for sending an update. */ Vector values() const; - /** Remove connections who's members are no longer in the cluster. Deliver thread. */ - void update(MemberId myId, const ClusterMap& cluster); + /** Decode a connection data event. */ + void decode(const EventHeader& eh, const void* data); - void clear(); - size_t size() const; private: diff --git a/cpp/src/qpid/cluster/Decoder.cpp b/cpp/src/qpid/cluster/Decoder.cpp deleted file mode 100644 index 1ba36bb521..0000000000 --- a/cpp/src/qpid/cluster/Decoder.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Decoder.h" -#include "Event.h" -#include "qpid/framing/Buffer.h" -#include "qpid/ptr_map.h" - -namespace qpid { -namespace cluster { - -using namespace framing; - -Decoder::Decoder(const Handler& h, ConnectionMap& cm) : handler(h), connections(cm) {} - -void Decoder::decode(const EventHeader& eh, const void* data) { - ConnectionId id = eh.getConnectionId(); - Map::iterator i = map.find(id); - if (i == map.end()) { - std::pair<Map::iterator, bool> ib = map.insert(id, new ConnectionDecoder(handler)); - i = ib.first; - } - ptr_map_ptr(i)->decode(eh, data, connections); -} - -void Decoder::erase(const ConnectionId& c) { - Map::iterator i = map.find(c); - if (i != map.end()) - map.erase(i); -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Decoder.h b/cpp/src/qpid/cluster/Decoder.h deleted file mode 100644 index 50f6afa491..0000000000 --- a/cpp/src/qpid/cluster/Decoder.h +++ /dev/null @@ -1,65 +0,0 @@ -#ifndef QPID_CLUSTER_DECODER_H -#define QPID_CLUSTER_DECODER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ConnectionDecoder.h" -#include "types.h" -#include <boost/ptr_container/ptr_map.hpp> - -namespace qpid { -namespace cluster { - -class EventHeader; -class ConnectionMap; - -/** - * Holds a map of ConnectionDecoders. Decodes Events into EventFrames - * and forwards EventFrames to a handler. - * - * THREAD UNSAFE: Called sequentially with un-decoded cluster events from CPG. - */ -class Decoder -{ - public: - typedef boost::function<void(const EventFrame&)> Handler; - - Decoder(const Handler& h, ConnectionMap&); - - /** Takes EventHeader + data rather than Event so that the caller can - * pass a pointer to connection data or a CPG buffer directly without copy. - */ - void decode(const EventHeader& eh, const void* data); - - /** Erase the decoder for a connection. */ - void erase(const ConnectionId&); - - private: - typedef boost::ptr_map<ConnectionId, ConnectionDecoder> Map; - Handler handler; - Map map; - ConnectionMap& connections; -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_DECODER_H*/ diff --git a/cpp/src/qpid/cluster/Event.cpp b/cpp/src/qpid/cluster/Event.cpp index 9fe5376bc5..749fbf240f 100644 --- a/cpp/src/qpid/cluster/Event.cpp +++ b/cpp/src/qpid/cluster/Event.cpp @@ -23,6 +23,7 @@ #include "Cpg.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/assert.h" #include <ostream> #include <iterator> #include <algorithm> @@ -31,6 +32,7 @@ namespace qpid { namespace cluster { using framing::Buffer; +using framing::AMQFrame; const size_t EventHeader::HEADER_SIZE = sizeof(uint8_t) + // type @@ -57,7 +59,7 @@ void EventHeader::decode(const MemberId& m, framing::Buffer& buf) { type = (EventType)buf.getOctet(); if(type != DATA && type != CONTROL) throw Exception("Invalid multicast event type"); - connectionId = ConnectionId(m, reinterpret_cast<Connection*>(buf.getLongLong())); + connectionId = ConnectionId(m, buf.getLongLong()); size = buf.getLong(); #ifdef QPID_LATENCY_METRIC latency_metric_timestamp = buf.getLongLong(); @@ -93,7 +95,7 @@ iovec Event::toIovec() { void EventHeader::encode(Buffer& b) const { b.putOctet(type); - b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getPointer())); + b.putLongLong(connectionId.getNumber()); b.putLong(size); #ifdef QPID_LATENCY_METRIC b.putLongLong(latency_metric_timestamp); @@ -111,6 +113,14 @@ Event::operator Buffer() const { return Buffer(const_cast<char*>(getData()), getSize()); } +AMQFrame Event::getFrame() const { + assert(type == CONTROL); + Buffer buf(*this); + AMQFrame frame; + QPID_ASSERT(frame.decode(buf)); + return frame; +} + static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; std::ostream& operator << (std::ostream& o, EventType t) { diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index 1338ea7413..c9f44725df 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -24,6 +24,7 @@ #include "types.h" #include "qpid/RefCountedBuffer.h" +#include "qpid/framing/AMQFrame.h" #include "qpid/sys/LatencyMetric.h" #include <sys/uio.h> // For iovec #include <iosfwd> @@ -59,8 +60,8 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { uint64_t getSequence() const { return sequence; } void setSequence(uint64_t n) { sequence = n; } - bool isCluster() const { return connectionId.getPointer() == 0; } - bool isConnection() const { return connectionId.getPointer() != 0; } + bool isCluster() const { return connectionId.getNumber() == 0; } + bool isConnection() const { return connectionId.getNumber() != 0; } protected: static const size_t HEADER_SIZE; @@ -97,6 +98,8 @@ class Event : public EventHeader { // Store including header char* getStore() { return store; } const char* getStore() const { return store; } + + framing::AMQFrame getFrame() const; operator framing::Buffer() const; diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h index ef3c38658b..abeea3ef16 100644 --- a/cpp/src/qpid/cluster/EventFrame.h +++ b/cpp/src/qpid/cluster/EventFrame.h @@ -42,8 +42,8 @@ struct EventFrame EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc=0); - bool isCluster() const { return !connectionId.getPointer(); } - bool isConnection() const { return connectionId.getPointer(); } + bool isCluster() const { return connectionId.getNumber() == 0; } + bool isConnection() const { return connectionId.getNumber() != 0; } bool isLastInEvent() const { return readCredit; } diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 18746ccb7e..4a4af4adbd 100644 --- a/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -95,7 +95,7 @@ UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, con : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), frameId(frameId_), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), - done(ok), failed(fail) + done(ok), failed(fail), connectionSettings(cs) { connection.open(url, cs); session = connection.newSession("update_shared"); @@ -228,13 +228,15 @@ void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& upda shadowConnection = catchUpConnection(); broker::Connection& bc = updateConnection->getBrokerConnection(); - // FIXME aconway 2008-10-20: What authentication info to use on reconnect? - shadowConnection.open(updateeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax()); + connectionSettings.maxFrameSize = bc.getFrameMax(); + shadowConnection.open(updateeUrl, connectionSettings); bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); + std::pair<const char*, size_t> fragment = updateConnection->getDecoder().getFragment(); ClusterConnectionProxy(shadowConnection).shadowReady( updateConnection->getId().getMember(), - reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()), - updateConnection->getBrokerConnection().getUserId() + updateConnection->getId().getNumber(), + bc.getUserId(), + string(fragment.first, fragment.second) ); shadowConnection.close(); QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); @@ -285,9 +287,6 @@ void UpdateClient::updateSession(broker::SessionHandler& sh) { if (inProgress) { inProgress->getFrames().map(simpl->out); } - - // FIXME aconway 2008-09-23: update session replay list. - QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); } diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 23f647c820..08267392f4 100644 --- a/cpp/src/qpid/cluster/UpdateClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -98,6 +98,7 @@ class UpdateClient : public sys::Runnable { client::AsyncSession session, shadowSession; boost::function<void()> done; boost::function<void(const std::exception& e)> failed; + client::ConnectionSettings connectionSettings; }; }} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/types.h b/cpp/src/qpid/cluster/types.h index 30454d9fbb..c19152e4d8 100644 --- a/cpp/src/qpid/cluster/types.h +++ b/cpp/src/qpid/cluster/types.h @@ -68,17 +68,16 @@ inline bool operator==(const cpg_address& caddr, const MemberId& id) { return id std::ostream& operator<<(std::ostream&, const MemberId&); -struct ConnectionId : public std::pair<MemberId, Connection*> { - ConnectionId(const MemberId& m=MemberId(), Connection* c=0) : std::pair<MemberId, Connection*> (m,c) {} - ConnectionId(uint64_t m, uint64_t c) - : std::pair<MemberId, Connection*>(MemberId(m), reinterpret_cast<Connection*>(c)) {} +struct ConnectionId : public std::pair<MemberId, uint64_t> { + ConnectionId(const MemberId& m=MemberId(), uint64_t c=0) : std::pair<MemberId, uint64_t> (m,c) {} + ConnectionId(uint64_t m, uint64_t c) : std::pair<MemberId, uint64_t>(MemberId(m), c) {} MemberId getMember() const { return first; } - Connection* getPointer() const { return second; } + uint64_t getNumber() const { return second; } }; std::ostream& operator<<(std::ostream&, const ConnectionId&); -std::ostream& operator << (std::ostream&, EventType); +std::ostream& operator<<(std::ostream&, EventType); }} // namespace qpid::cluster diff --git a/cpp/src/qpid/framing/FrameDecoder.cpp b/cpp/src/qpid/framing/FrameDecoder.cpp index cbdac181e9..6f0ae9756f 100644 --- a/cpp/src/qpid/framing/FrameDecoder.cpp +++ b/cpp/src/qpid/framing/FrameDecoder.cpp @@ -21,8 +21,9 @@ #include "FrameDecoder.h" #include "Buffer.h" #include "qpid/log/Statement.h" -#include <algorithm> #include "qpid/framing/reply_exceptions.h" +#include <algorithm> +#include <string.h> namespace qpid { namespace framing { @@ -67,4 +68,13 @@ bool FrameDecoder::decode(Buffer& buffer) { return false; } +void FrameDecoder::setFragment(const char* data, size_t size) { + fragment.resize(size); + ::memcpy(fragment.data(), data, size); +} + +std::pair<const char*, size_t> FrameDecoder::getFragment() const { + return std::pair<const char*, size_t>(fragment.data(), fragment.size()); +} + }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/FrameDecoder.h b/cpp/src/qpid/framing/FrameDecoder.h index 7f974dadc3..961cc666a9 100644 --- a/cpp/src/qpid/framing/FrameDecoder.h +++ b/cpp/src/qpid/framing/FrameDecoder.h @@ -35,9 +35,16 @@ class FrameDecoder { public: bool decode(Buffer& buffer); - AMQFrame frame; + const AMQFrame& getFrame() const { return frame; } + AMQFrame& getFrame() { return frame; } + + void setFragment(const char*, size_t); + std::pair<const char*, size_t> getFragment() const; + private: std::vector<char> fragment; + AMQFrame frame; + }; }} // namespace qpid::framing diff --git a/cpp/src/tests/ClusterFixture.cpp b/cpp/src/tests/ClusterFixture.cpp index 3a0ea74098..5658957b48 100644 --- a/cpp/src/tests/ClusterFixture.cpp +++ b/cpp/src/tests/ClusterFixture.cpp @@ -109,7 +109,7 @@ void ClusterFixture::addLocal() { Args args(makeArgs(prefix)); vector<const char*> argv(args.size()); transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1)); - qpid::log::Logger::instance().setPrefix(os.str()); + qpid::log::Logger::instance().setPrefix(prefix); localBroker.reset(new BrokerFixture(parseOpts(argv.size(), &argv[0]))); push_back(localBroker->getPort()); forkedBrokers.push_back(shared_ptr<ForkedBroker>()); diff --git a/cpp/src/tests/FrameDecoder.cpp b/cpp/src/tests/FrameDecoder.cpp index b7f1ea1b89..f5db66d5fe 100644 --- a/cpp/src/tests/FrameDecoder.cpp +++ b/cpp/src/tests/FrameDecoder.cpp @@ -65,7 +65,7 @@ QPID_AUTO_TEST_CASE(testByteFragments) { } Buffer buf(&encoded[encoded.size()-1], 1); BOOST_CHECK(decoder.decode(buf)); - BOOST_CHECK_EQUAL(data, getData(decoder.frame)); + BOOST_CHECK_EQUAL(data, getData(decoder.getFrame())); } diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index 2cf4e915b6..d3e4b488fb 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -125,6 +125,7 @@ <field name="member-id" type="uint64"/> <field name="connection-id" type="uint64"/> <field name="user-name" type="str8"/> + <field name="fragment" type="str32"/> </control> <!-- Complete a cluster state update. --> |