From d71457233eb57af17dea2e5d1dc56fdc4497da6a Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 3 Feb 2009 21:28:14 +0000 Subject: Fix for race conditions in cluster join. - ConnectionDecoder: separated from Connection. - cluster/PollableQueue: stop processing frames if PollableQueue is stopped. - move state checks in event-queue handler to frame-queue handler. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@740459 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/cluster.mk | 8 +- qpid/cpp/src/qpid/cluster/Cluster.cpp | 124 ++++++++++++------------ qpid/cpp/src/qpid/cluster/Cluster.h | 23 +++-- qpid/cpp/src/qpid/cluster/ClusterMap.cpp | 4 +- qpid/cpp/src/qpid/cluster/ClusterMap.h | 2 +- qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h | 56 ----------- qpid/cpp/src/qpid/cluster/Connection.cpp | 33 ++----- qpid/cpp/src/qpid/cluster/Connection.h | 6 +- qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp | 2 - qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp | 49 ++++++++++ qpid/cpp/src/qpid/cluster/ConnectionDecoder.h | 60 ++++++++++++ qpid/cpp/src/qpid/cluster/ConnectionMap.cpp | 86 ++++++++++++++++ qpid/cpp/src/qpid/cluster/ConnectionMap.h | 70 ++++++------- qpid/cpp/src/qpid/cluster/Decoder.cpp | 46 +++++++++ qpid/cpp/src/qpid/cluster/Decoder.h | 62 ++++++++++++ qpid/cpp/src/qpid/cluster/Event.h | 6 +- qpid/cpp/src/qpid/cluster/EventFrame.cpp | 13 +-- qpid/cpp/src/qpid/cluster/EventFrame.h | 14 ++- qpid/cpp/src/qpid/cluster/PollableQueue.h | 68 +++++++++++++ qpid/cpp/src/qpid/cluster/Quorum_cman.cpp | 2 +- qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 5 +- qpid/cpp/src/qpid/cluster/types.h | 4 +- qpid/cpp/src/tests/ForkedBroker.h | 1 + qpid/cpp/src/tests/cluster_test.cpp | 4 +- qpid/cpp/xml/cluster.xml | 6 +- 25 files changed, 521 insertions(+), 233 deletions(-) delete mode 100644 qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h create mode 100644 qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp create mode 100644 qpid/cpp/src/qpid/cluster/ConnectionDecoder.h create mode 100644 qpid/cpp/src/qpid/cluster/ConnectionMap.cpp create mode 100644 qpid/cpp/src/qpid/cluster/Decoder.cpp create mode 100644 qpid/cpp/src/qpid/cluster/Decoder.h create mode 100644 qpid/cpp/src/qpid/cluster/PollableQueue.h (limited to 'qpid/cpp') diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 9c76bb2239..f222b1e148 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -40,7 +40,7 @@ cluster_la_SOURCES = \ $(CMAN_SOURCES) \ qpid/cluster/Cluster.cpp \ qpid/cluster/Cluster.h \ - qpid/cluster/ClusterQueueHandler.h \ + qpid/cluster/PollableQueue.h \ qpid/cluster/ClusterMap.cpp \ qpid/cluster/ClusterMap.h \ qpid/cluster/ClusterPlugin.cpp \ @@ -49,8 +49,13 @@ cluster_la_SOURCES = \ qpid/cluster/ConnectionCodec.cpp \ qpid/cluster/ConnectionCodec.h \ qpid/cluster/ConnectionMap.h \ + qpid/cluster/ConnectionMap.cpp \ qpid/cluster/Cpg.cpp \ qpid/cluster/Cpg.h \ + qpid/cluster/Decoder.cpp \ + qpid/cluster/Decoder.h \ + qpid/cluster/ConnectionDecoder.cpp \ + qpid/cluster/ConnectionDecoder.h \ qpid/cluster/Dispatchable.h \ qpid/cluster/UpdateClient.cpp \ qpid/cluster/UpdateClient.h \ @@ -71,6 +76,7 @@ cluster_la_SOURCES = \ qpid/cluster/ThreadDispatch.h \ qpid/cluster/ProxyInputHandler.h \ qpid/cluster/Quorum.h \ + qpid/cluster/Updatee.h \ qpid/cluster/WriteEstimate.cpp \ qpid/cluster/WriteEstimate.h \ qpid/cluster/types.h diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 6a19b8e4ea..eaa4a720b1 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -20,7 +20,6 @@ #include "Connection.h" #include "UpdateClient.h" #include "FailoverExchange.h" -#include "ClusterQueueHandler.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SessionState.h" @@ -92,8 +91,16 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b writeEstimate(writeEstimate_), mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), - deliverEventQueue(ClusterQueueHandler(this, boost::bind(&Cluster::deliveredEvent, this, _1), "event queue"), poller), - deliverFrameQueue(ClusterQueueHandler(this, boost::bind(&Cluster::deliveredFrame, this, _1), "frame queue"), poller), + deliverEventQueue(boost::bind(&Cluster::deliveredEvent, this, _1), + boost::bind(&Cluster::leave, this), + "Error decoding events", + poller), + deliverFrameQueue(boost::bind(&Cluster::deliveredFrame, this, _1), + boost::bind(&Cluster::leave, this), + "Error delivering frames", + poller), + connections(*this), + decoder(boost::bind(&PollableFrameQueue::push, &deliverFrameQueue, _1)), state(INIT), lastSize(0), lastBroker(false), @@ -121,12 +128,23 @@ Cluster::~Cluster() { if (updateThread.id()) updateThread.join(); // Join the previous updatethread. } -void Cluster::insert(const boost::intrusive_ptr& c) { - connections.insert(c->getId(), c); +// Called in connection thread to insert a client connection. +void Cluster::addLocalConnection(const boost::intrusive_ptr& c) { + Lock l(lock); + connections.insert(c); } -void Cluster::erase(ConnectionId id) { +// Called in connection thread to insert an updated shadow connection. +void Cluster::addShadowConnection(const boost::intrusive_ptr& c) { + Lock l(lock); + assert(state <= UPDATEE); // Only during update. + connections.insert(c); +} + +void Cluster::erase(const ConnectionId& id) { + // Called only by Connection::deliverClose in deliver thread, no need to lock. connections.erase(id); + decoder.erase(id); } std::vector Cluster::getIds() const { @@ -168,17 +186,7 @@ void Cluster::leave(Lock&) { } } -boost::intrusive_ptr Cluster::getConnection(const ConnectionId& connectionId) { - boost::intrusive_ptr cp = connections.find(connectionId); - if (!cp && connectionId.getMember() != myId) { // New shadow connection - std::ostringstream mgmtId; - mgmtId << name << ":" << connectionId; - cp = new Connection(*this, shadowOut, mgmtId.str(), connectionId); - connections.insert(connectionId, cp); - } - return cp; -} - +// Deliver CPG message. void Cluster::deliver( cpg_handle_t /*handle*/, cpg_name* /*group*/, @@ -187,58 +195,52 @@ void Cluster::deliver( void* msg, int msg_len) { - Mutex::ScopedLock l(lock); MemberId from(nodeid, pid); framing::Buffer buf(static_cast(msg), msg_len); Event e(Event::decodeCopy(from, buf)); e.setSequence(sequence++); if (from == myId) // Record self-deliveries for flow control. mcast.selfDeliver(e); - deliver(e, l); + deliver(e); } -void Cluster::deliver(const Event& e, Lock&) { +void Cluster::deliver(const Event& e) { if (state == LEFT) return; QPID_LATENCY_INIT(e); deliverEventQueue.push(e); } -// Entry point: called when deliverEventQueue has events to process. +// Handler for deliverEventQueue void Cluster::deliveredEvent(const Event& e) { QPID_LATENCY_RECORD("delivered event queue", e); Buffer buf(const_cast(e.getData()), e.getSize()); - boost::intrusive_ptr connection; - if (e.isConnection()) { - if (state <= UPDATEE) { - QPID_LOG(trace, *this << " DROP: " << e); - return; - } - connection = getConnection(e.getConnectionId()); - if (!connection) return; - } if (e.getType() == CONTROL) { AMQFrame frame; - while (frame.decode(buf)) { - deliverFrameQueue.push(EventFrame(connection, e, frame)); - } - } - else if (e.getType() == DATA) { - connection->deliveredEvent(e, deliverFrameQueue); + while (frame.decode(buf)) + deliverFrameQueue.push(EventFrame(e, frame)); } + else if (e.getType() == DATA) + decoder.decode(e, e.getData()); } +// Handler for deliverFrameQueue void Cluster::deliveredFrame(const EventFrame& e) { + Mutex::ScopedLock l(lock); QPID_LOG(trace, *this << " DLVR: " << e); QPID_LATENCY_RECORD("delivered frame queue", e.frame); - if (e.connection) { - e.connection->deliveredFrame(e); - } - else { - Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big? - ClusterDispatcher dispatch(*this, e.member, l); + if (e.isCluster()) { // Cluster control frame + ClusterDispatcher dispatch(*this, e.connectionId.getMember(), l); if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) throw Exception(QPID_MSG("Invalid cluster control")); } + else { // Connection frame. + if (state <= UPDATEE) { + QPID_LOG(trace, *this << " DROP: " << e); + return; + } + boost::intrusive_ptr connection = connections.get(e.connectionId); + connection->deliveredFrame(e); + } QPID_LATENCY_RECORD("processed", e.frame); } @@ -282,7 +284,13 @@ void Cluster::configChange ( std::string addresses; for (cpg_address* p = current; p < current+nCurrent; ++p) addresses.append(MemberId(*p).str()); - deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId), l); + deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), addresses), myId)); +} + +void Cluster::setReady(Lock&) { + state = READY; + if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); + mcast.release(); } void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& l) { @@ -296,12 +304,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& if (state == INIT) { // First configChange if (map.aliveCount() == 1) { - setClusterId(true); - // FIXME aconway 2008-12-11: Centralize transition to READY and associated actions eg mcast.release() - state = READY; - mcast.release(); QPID_LOG(notice, *this << " first in cluster"); - if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); + setClusterId(true); + setReady(l); map = ClusterMap(myId, myUrl, true); memberUpdate(l); } @@ -325,9 +330,6 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& } } - - - void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { if (state == READY && map.isJoiner(id)) { state = OFFER; @@ -361,11 +363,8 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { if (map.ready(id, Url(url))) memberUpdate(l); if (state == CATCHUP && id == myId) { - state = READY; - mcast.release(); QPID_LOG(notice, *this << " caught up, active cluster member"); - if (mgmtObject!=0) mgmtObject->set_status("ACTIVE"); - mcast.release(); + setReady(l); } } @@ -379,8 +378,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu updateStart(updatee, *url, l); } else { // Another offer was first. - state = READY; - mcast.release(); + setReady(l); QPID_LOG(info, *this << " cancelled update offer to " << updatee); tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer. } @@ -390,7 +388,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu setClusterId(uuid); state = UPDATEE; QPID_LOG(info, *this << " receiving update from " << updater); - deliverEventQueue.stop(); + deliverFrameQueue.stop(); checkUpdateIn(l); } } @@ -400,7 +398,7 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { assert(state == OFFER); state = UPDATER; QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url); - deliverEventQueue.stop(); + deliverFrameQueue.stop(); if (updateThread.id()) updateThread.join(); // Join the previous updatethread. updateThread = Thread( new UpdateClient(myId, updatee, url, broker, map, connections.values(), @@ -422,7 +420,7 @@ void Cluster::checkUpdateIn(Lock& ) { mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); state = CATCHUP; QPID_LOG(info, *this << " received update, starting catch-up"); - deliverEventQueue.start(); + deliverFrameQueue.start(); } } @@ -432,11 +430,11 @@ void Cluster::updateOutDone() { } void Cluster::updateOutDone(Lock& l) { + QPID_LOG(info, *this << " sent update"); assert(state == UPDATER); state = READY; mcast.release(); - QPID_LOG(info, *this << " sent update"); - deliverEventQueue.start(); + deliverFrameQueue.start(); tryMakeOffer(map.firstJoiner(), l); // Try another offer } @@ -504,8 +502,6 @@ void Cluster::memberUpdate(Lock& l) { } lastSize = size; - // - if (mgmtObject) { mgmtObject->set_clusterSize(size); string urlstr; diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 6e91ca8f64..1cfcd04c6f 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -29,9 +29,10 @@ #include "NoOpConnectionOutputHandler.h" #include "PollerDispatch.h" #include "Quorum.h" +#include "Decoder.h" +#include "PollableQueue.h" #include "qpid/broker/Broker.h" -#include "qpid/sys/PollableQueue.h" #include "qpid/sys/Monitor.h" #include "qpid/management/Manageable.h" #include "qpid/Url.h" @@ -73,8 +74,9 @@ class Cluster : private Cpg::Handler, public management::Manageable { virtual ~Cluster(); // Connection map - called in connection threads. - void insert(const ConnectionPtr&); - void erase(ConnectionId); + void addLocalConnection(const ConnectionPtr&); + void addShadowConnection(const ConnectionPtr&); + void erase(const ConnectionId&); // URLs of current cluster members - called in connection threads. std::vector getIds() const; @@ -100,8 +102,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { private: typedef sys::Monitor::ScopedLock Lock; - typedef sys::PollableQueue PollableEventQueue; - typedef sys::PollableQueue PollableFrameQueue; + typedef PollableQueue PollableEventQueue; + typedef PollableQueue PollableFrameQueue; // NB: The final Lock& parameter on functions below is used to mark functions // that should only be called by a function that already holds the lock. @@ -132,6 +134,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Helper, called in deliver thread. void updateStart(const MemberId& updatee, const Url& url, Lock&); + void setReady(Lock&); + void deliver( // CPG deliver callback. cpg_handle_t /*handle*/, struct cpg_name *group, @@ -140,7 +144,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void* /*msg*/, int /*msg_len*/); - void deliver(const Event& e, Lock&); + void deliver(const Event&); void configChange( // CPG config change callback. cpg_handle_t /*handle*/, @@ -150,8 +154,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { struct cpg_address */*joined*/, int /*nJoined*/ ); - boost::intrusive_ptr getConnection(const ConnectionId&); - virtual qpid::management::ManagementObject* GetManagementObject() const; virtual management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args& args, std::string& text); @@ -193,7 +195,10 @@ class Cluster : private Cpg::Handler, public management::Manageable { boost::shared_ptr failoverExchange; Quorum quorum; - // Remaining members are protected by lock. + // Called only from event delivery thread + Decoder decoder; + + // Remaining members are protected by lock mutable sys::Monitor lock; // Local cluster state, cluster map diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp index bcfade2b8c..9e7232180d 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -69,8 +69,7 @@ ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive))); } -ClusterConnectionMembershipBody ClusterMap::asMethodBody() const { - framing::ClusterConnectionMembershipBody b; +void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const { b.getJoiners().clear(); std::for_each(joiners.begin(), joiners.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getJoiners()), _1)); for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) { @@ -79,7 +78,6 @@ ClusterConnectionMembershipBody ClusterMap::asMethodBody() const { } b.getMembers().clear(); std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1)); - return b; } bool ClusterMap::configChange( diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h index 9756daf977..4548441442 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.h +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h @@ -71,7 +71,7 @@ class ClusterMap { MemberId firstJoiner() const; /** Convert map contents to a cluster control body. */ - framing::ClusterConnectionMembershipBody asMethodBody() const; + void toMethodBody(framing::ClusterConnectionMembershipBody&) const; size_t aliveCount() const { return alive.size(); } size_t memberCount() const { return members.size(); } diff --git a/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h b/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h deleted file mode 100644 index e843526962..0000000000 --- a/qpid/cpp/src/qpid/cluster/ClusterQueueHandler.h +++ /dev/null @@ -1,56 +0,0 @@ -#ifndef QPID_CLUSTER_CLUSTERQUEUEHANDLER_H -#define QPID_CLUSTER_CLUSTERQUEUEHANDLER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "Cluster.h" -#include "qpid/sys/PollableQueue.h" -#include - -namespace qpid { -namespace cluster { - -/** Convenience functor for PollableQueue callbacks. */ -template struct ClusterQueueHandler { - ClusterQueueHandler(Cluster& c, boost::function f, const std::string& n) : cluster(c), callback(f), name(n) {} - ClusterQueueHandler(const Cluster* c, boost::function f, const std::string& n) : cluster(*const_cast(c)), callback(f), name(n) {} - - void operator()(typename sys::PollableQueue::Queue& values) { - try { - std::for_each(values.begin(), values.end(), callback); - values.clear(); - } - catch (const std::exception& e) { - QPID_LOG(error, "Error on " << name << ": " << e.what()); - cluster.leave(); - } - } - - Cluster& cluster; - boost::function callback; - std::string name; -}; - - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_CLUSTERQUEUEHANDLER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 9016e812be..a71950ef1d 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -75,7 +75,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, void Connection::init() { QPID_LOG(debug, cluster << " new connection: " << *this); - if (isLocal() && !isCatchUp() && cluster.getReadMax()) { + if (isLocalClient()) { + cluster.addLocalConnection(this); + if (cluster.getReadMax()) output.giveReadCredit(cluster.getReadMax()); } } @@ -99,17 +101,15 @@ void Connection::deliverDoOutput(uint32_t requested) { // Received from a directly connected client. void Connection::received(framing::AMQFrame& f) { QPID_LOG(trace, cluster << " RECV " << *this << ": " << f); - if (isLocal()) { + if (isLocal()) { // Local catch-up connection. currentChannel = f.getChannel(); if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); } - else { // Shadow or updated ex catch-up connection. + else { // Shadow or updated catch-up connection. if (f.getMethod() && f.getMethod()->isA()) { - if (isShadow()) { - QPID_LOG(debug, cluster << " inserting connection " << *this); - cluster.insert(boost::intrusive_ptr(this)); - } + if (isShadow()) + cluster.addShadowConnection(this); AMQFrame ok((ConnectionCloseOkBody())); connection.getOutput().send(ok); output.setOutputHandler(discardHandler); @@ -136,24 +136,7 @@ bool Connection::checkUnsupported(const AMQBody& body) { return !message.empty(); } -// Decode buffer and put frames on frameq. -void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) { - assert(!catchUp); - Buffer buf(e); - // Set read credit on the last frame. - ++readCredit; // One credit per buffer. - if (!mcastDecoder.decode(buf)) return; - AMQFrame frame(mcastDecoder.frame); - while (mcastDecoder.decode(buf)) { - frameq.push(EventFrame(this, e, frame)); - frame = mcastDecoder.frame; - } - frameq.push(EventFrame(this, e, frame, readCredit)); - readCredit = 0; -} - - -// Delivered from cluster. +// Called in delivery thread, in cluster order. void Connection::deliveredFrame(const EventFrame& f) { assert(!catchUp); currentChannel = f.frame.getChannel(); diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index e22ff05c08..160855dc2d 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -72,9 +72,11 @@ class Connection : ConnectionId getId() const { return self; } broker::Connection& getBrokerConnection() { return connection; } - /** True for connections from direct clients of local broker */ + /** Local connections may be clients or catch-up connections */ bool isLocal() const; + bool isLocalClient() const { return isLocal() && !isCatchUp(); } + /** True for connections that are shadowing remote broker connections */ bool isShadow() const; @@ -101,7 +103,6 @@ class Connection : size_t decode(const char* buffer, size_t size); // Called for data delivered from the cluster. - void deliveredEvent(const Event&, PollableFrameQueue&); void deliveredFrame(const EventFrame&); void consumerState(const std::string& name, bool blocked, bool notifyEnabled); @@ -166,7 +167,6 @@ class Connection : WriteEstimate writeEstimate; OutputInterceptor output; framing::FrameDecoder localDecoder; - framing::FrameDecoder mcastDecoder; broker::Connection connection; framing::SequenceNumber deliverSeq; framing::ChannelId currentChannel; diff --git a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp index 1334f97eec..442ac1438f 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -59,8 +59,6 @@ ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, { std::auto_ptr ih(new ProxyInputHandler(interceptor)); codec.setInputHandler(ih); - if (!catchUp) // Don't put catchUp connections in the cluster map. - cluster.insert(interceptor); } ConnectionCodec::~ConnectionCodec() {} diff --git a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp new file mode 100644 index 0000000000..1500b6a743 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.cpp @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ConnectionDecoder.h" +#include "EventFrame.h" + +namespace qpid { +namespace cluster { + +using namespace framing; + +ConnectionDecoder::ConnectionDecoder(const Handler& h) : handler(h), readCredit(0) {} + +void ConnectionDecoder::decode(const EventHeader& eh, const void* data) { + assert(eh.getType() == DATA); // Only handle connection data events. + const char* cp = static_cast(data); + Buffer buf(const_cast(cp), eh.getSize()); + // Set read credit on the last frame in the event. + ++readCredit; // One credit per event = connection read buffer. + if (decoder.decode(buf)) { // Decoded a frame + AMQFrame frame(decoder.frame); + while (decoder.decode(buf)) { + handler(EventFrame(eh, frame)); + frame = decoder.frame; + } + handler(EventFrame(eh, frame, readCredit)); + readCredit = 0; // Reset credit for next event. + } +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h new file mode 100644 index 0000000000..5f139b23e9 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ConnectionDecoder.h @@ -0,0 +1,60 @@ +#ifndef QPID_CLUSTER_CONNECTIONDECODER_H +#define QPID_CLUSTER_CONNECTIONDECODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/framing/FrameDecoder.h" +#include + +namespace qpid { +namespace cluster { + +class EventHeader; +class EventFrame; +/** + * Decodes delivered connection data Event's as EventFrame's for a + * connection replica, local or shadow. Manages state for frame + * fragments and flow control. + * + * THREAD UNSAFE: connection events are decoded in sequence. + */ +class ConnectionDecoder +{ + public: + typedef boost::function Handler; + + ConnectionDecoder(const Handler& h); + + /** Takes EventHeader + data rather than Event so that the caller can + * pass a pointer to connection data or a CPG buffer directly without copy. + */ + void decode(const EventHeader& eh, const void* data); + + private: + Handler handler; + framing::FrameDecoder decoder; + int readCredit; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_CONNECTIONDECODER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp b/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp new file mode 100644 index 0000000000..9dc6210666 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ConnectionMap.cpp @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionMap.h" +#include "Cluster.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace cluster { + +using framing::InternalErrorException; + +void ConnectionMap::insert(ConnectionPtr p) { + std::pair ib = map.insert(Map::value_type(p->getId(), p)); + if (!ib.second) { + assert(0); + throw InternalErrorException(QPID_MSG("Duplicate connection replica: " << p->getId())); + } +} + +void ConnectionMap::erase(const ConnectionId& id) { + Map::iterator i = map.find(id); + if (i == map.end()) { + assert(0); + QPID_LOG(warning, "Erase non-existent connection replica: " << id); + } + map.erase(i); +} + +ConnectionMap::ConnectionPtr ConnectionMap::get(const ConnectionId& id) { + Map::const_iterator i = map.find(id); + if (i == map.end()) { + assert(id.getMember() != cluster.getId()); + // New remote connection, create a shadow. + std::ostringstream mgmtId; + mgmtId << id; + ConnectionPtr cp = new Connection(cluster, shadowOut, mgmtId.str(), id); + std::pair ib = map.insert(Map::value_type(id, cp)); + assert(ib.second); // FIXME aconway 2009-02-03: exception. + i = ib.first; + } + return i->second; +} + +ConnectionMap::Vector ConnectionMap::values() const { + Vector result(map.size()); + std::transform(map.begin(), map.end(), result.begin(), + boost::bind(&Map::value_type::second, _1)); + return result; +} + +void ConnectionMap::update(MemberId myId, const ClusterMap& cluster) { + for (Map::iterator i = map.begin(); i != map.end(); ) { + MemberId member = i->first.getMember(); + if (member != myId && !cluster.isMember(member)) { + i->second->left(); + map.erase(i++); + } else { + i++; + } + } +} + +void ConnectionMap::clear() { + map.clear(); +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ConnectionMap.h b/qpid/cpp/src/qpid/cluster/ConnectionMap.h index c355074e75..23084796cf 100644 --- a/qpid/cpp/src/qpid/cluster/ConnectionMap.h +++ b/qpid/cpp/src/qpid/cluster/ConnectionMap.h @@ -24,6 +24,7 @@ #include "types.h" #include "Connection.h" #include "ClusterMap.h" +#include "NoOpConnectionOutputHandler.h" #include "qpid/sys/Mutex.h" #include #include @@ -31,61 +32,48 @@ namespace qpid { namespace cluster { +class Cluster; + /** - * Thread safe map of connections. + * Thread safe map of connections. The map is used in: + * - deliver thread to look connections and create new shadow connections. + * - local catch-up connection threads to add a caught-up shadow connections. + * - local client connection threads when local connections are created. */ -class ConnectionMap -{ +class ConnectionMap { public: typedef boost::intrusive_ptr ConnectionPtr; typedef std::vector Vector; - void insert(ConnectionId id, ConnectionPtr p) { - ScopedLock l(lock); - map.insert(Map::value_type(id,p)); - } + ConnectionMap(Cluster& c) : cluster(c) {} + + /** Insert a local connection or a caught up shadow connection. + * Called in local connection thread. + */ + void insert(ConnectionPtr p); + + /** Erase a closed connection. Called in deliver thread. */ + void erase(const ConnectionId& id); - void erase(ConnectionId id) { - ScopedLock l(lock); - map.erase(id); - } + /** Get an existing connection. */ + ConnectionPtr get(const ConnectionId& id); - ConnectionPtr find(ConnectionId id) const { - ScopedLock l(lock); - Map::const_iterator i = map.find(id); - return i == map.end() ? ConnectionPtr() : i->second; - } + /** Get connections for sending an update. */ + Vector values() const; - Vector values() const { - Vector result(map.size()); - std::transform(map.begin(), map.end(), result.begin(), - boost::bind(&Map::value_type::second, _1)); - return result; - } + /** Remove connections who's members are no longer in the cluster. Deliver thread. */ + void update(MemberId myId, const ClusterMap& cluster); - void update(MemberId myId, const ClusterMap& cluster) { - for (Map::iterator i = map.begin(); i != map.end(); ) { - MemberId member = i->first.getMember(); - if (member != myId && !cluster.isMember(member)) { - i->second->left(); - map.erase(i++); - } else { - i++; - } - } - } + + void clear(); - void clear() { - ScopedLock l(lock); - map.clear(); - } + size_t size() const; - size_t size() const { return map.size(); } private: typedef std::map Map; - typedef sys::Mutex::ScopedLock ScopedLock; - - mutable sys::Mutex lock; + + Cluster& cluster; + NoOpConnectionOutputHandler shadowOut; Map map; }; diff --git a/qpid/cpp/src/qpid/cluster/Decoder.cpp b/qpid/cpp/src/qpid/cluster/Decoder.cpp new file mode 100644 index 0000000000..b2ab7c8d0f --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/Decoder.cpp @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Decoder.h" +#include "Event.h" +#include "qpid/framing/Buffer.h" +#include "qpid/ptr_map.h" + +namespace qpid { +namespace cluster { + +using namespace framing; + +Decoder::Decoder(const Handler& h) : handler(h) {} + +void Decoder::decode(const EventHeader& eh, const void* data) { + ConnectionId id = eh.getConnectionId(); + std::pair ib = map.insert(id, new ConnectionDecoder(handler)); + ptr_map_ptr(ib.first)->decode(eh, data); +} + +void Decoder::erase(const ConnectionId& c) { + Map::iterator i = map.find(c); + if (i != map.end()) // FIXME aconway 2009-02-03: + map.erase(i); +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Decoder.h b/qpid/cpp/src/qpid/cluster/Decoder.h new file mode 100644 index 0000000000..dffd6c8f75 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/Decoder.h @@ -0,0 +1,62 @@ +#ifndef QPID_CLUSTER_DECODER_H +#define QPID_CLUSTER_DECODER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ConnectionDecoder.h" +#include "types.h" +#include + +namespace qpid { +namespace cluster { + +class EventHeader; + +/** + * Holds a map of ConnectionDecoders. Decodes Events into EventFrames + * and forwards EventFrames to a handler. + * + * THREAD UNSAFE: Called sequentially with un-decoded cluster events from CPG. + */ +class Decoder +{ + public: + typedef boost::function Handler; + + Decoder(const Handler& h); + + /** Takes EventHeader + data rather than Event so that the caller can + * pass a pointer to connection data or a CPG buffer directly without copy. + */ + void decode(const EventHeader& eh, const void* data); + + /** Erase the decoder for a connection. */ + void erase(const ConnectionId&); + + private: + typedef boost::ptr_map Map; + Handler handler; + Map map; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_DECODER_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Event.h b/qpid/cpp/src/qpid/cluster/Event.h index 5df0c96f77..f1de248f89 100644 --- a/qpid/cpp/src/qpid/cluster/Event.h +++ b/qpid/cpp/src/qpid/cluster/Event.h @@ -49,7 +49,12 @@ class EventHeader : public ::qpid::sys::LatencyMetricTimestamp { EventType getType() const { return type; } ConnectionId getConnectionId() const { return connectionId; } MemberId getMemberId() const { return connectionId.getMember(); } + + /** Size of payload data, excluding header. */ size_t getSize() const { return size; } + /** Size of header + payload. */ + size_t getStoreSize() { return size + HEADER_SIZE; } + uint64_t getSequence() const { return sequence; } void setSequence(uint64_t n) { sequence = n; } @@ -88,7 +93,6 @@ class Event : public EventHeader { // Store including header char* getStore() { return store; } const char* getStore() const { return store; } - size_t getStoreSize() { return size + HEADER_SIZE; } operator framing::Buffer() const; diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.cpp b/qpid/cpp/src/qpid/cluster/EventFrame.cpp index c1f96ad1b2..ba01c170dd 100644 --- a/qpid/cpp/src/qpid/cluster/EventFrame.cpp +++ b/qpid/cpp/src/qpid/cluster/EventFrame.cpp @@ -26,21 +26,14 @@ namespace cluster { EventFrame::EventFrame() : sequence(0) {} -EventFrame::EventFrame( - const boost::intrusive_ptr& c, const Event& e, - const framing::AMQFrame& f, int rc -) : connection(c), member(e.getMemberId()), frame(f), - sequence(e.getSequence()), readCredit(rc) +EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc) + : connectionId(e.getConnectionId()), frame(f), sequence(e.getSequence()), readCredit(rc) { QPID_LATENCY_INIT(frame); } std::ostream& operator<<(std::ostream& o, const EventFrame& e) { - if (e.connection) - o << e.connection->getId(); - else - o << e.member; - return o << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit; + return o << e.connectionId << "/" << e.sequence << " " << e.frame << " rc=" << e.readCredit; } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.h b/qpid/cpp/src/qpid/cluster/EventFrame.h index 2ef33b9695..7f33cedb5b 100644 --- a/qpid/cpp/src/qpid/cluster/EventFrame.h +++ b/qpid/cpp/src/qpid/cluster/EventFrame.h @@ -32,22 +32,21 @@ namespace qpid { namespace cluster { -class Connection; - /** * A frame decoded from an Event. */ struct EventFrame { + public: EventFrame(); - EventFrame(const boost::intrusive_ptr& c, const Event& e, - const framing::AMQFrame& f, int rc=0); + EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc=0); - bool isCluster() const { return !connection; } - bool isConnection() const { return connection; } + bool isCluster() const { return !connectionId.getPointer(); } + bool isConnection() const { return connectionId.getPointer(); } bool isLastInEvent() const { return readCredit; } + // True if this frame follows immediately after frame e. bool follows(const EventFrame& e) const { return sequence == e.sequence || (sequence == e.sequence+1 && e.readCredit); @@ -55,8 +54,7 @@ struct EventFrame bool operator<(const EventFrame& e) const { return sequence < e.sequence; } - boost::intrusive_ptr connection; - MemberId member; + ConnectionId connectionId; framing::AMQFrame frame; uint64_t sequence; int readCredit; // last frame in an event, give credit when processed. diff --git a/qpid/cpp/src/qpid/cluster/PollableQueue.h b/qpid/cpp/src/qpid/cluster/PollableQueue.h new file mode 100644 index 0000000000..e0422e2449 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/PollableQueue.h @@ -0,0 +1,68 @@ +#ifndef QPID_CLUSTER_POLLABLEQUEUE_H +#define QPID_CLUSTER_POLLABLEQUEUE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/PollableQueue.h" +#include + +namespace qpid { +namespace cluster { + +/** + * More convenient version of PollableQueue that handles iterating + * over the batch and error handling. + */ +template class PollableQueue : public sys::PollableQueue { + public: + typedef boost::function Callback; + typedef boost::function ErrorCallback; + + PollableQueue(Callback f, ErrorCallback err, const std::string& msg, const boost::shared_ptr& poller) + : sys::PollableQueue(boost::bind(&PollableQueue::handleBatch, this, _1), poller), + callback(f), error(err), message(msg) {} + + void handleBatch(typename sys::PollableQueue::Queue& values) { + try { + typename sys::PollableQueue::Queue::iterator i = values.begin(); + while (i != values.end() && !this->isStopped()) { + callback(*i); + ++i; + } + values.erase(values.begin(), i); + } + catch (const std::exception& e) { + QPID_LOG(error, message << ": " << e.what()); + error(); + } + } + + private: + Callback callback; + ErrorCallback error; + std::string message; +}; + + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_POLLABLEQUEUE_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp b/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp index edce1698ee..62c014fcc4 100644 --- a/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp +++ b/qpid/cpp/src/qpid/cluster/Quorum_cman.cpp @@ -35,7 +35,7 @@ void Quorum::init() { enable = true; cman = cman_init(0); if (cman == 0) throw ErrnoException("Can't connect to cman service"); - // FIXME aconway 2008-11-13: configurable max wait. + // TODO aconway 2008-11-13: configurable max wait. for (int retry = 0; !cman_is_quorate(cman) && retry < 30; retry++) { QPID_LOG(info, "Waiting for cluster quorum: " << sys::strError(errno)); sys::sleep(1); diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index 08f09573a4..91d4c6d3ce 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -117,7 +117,10 @@ void UpdateClient::update() { session.close(); std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); - AMQFrame frame(map.asMethodBody()); + + ClusterConnectionMembershipBody membership; + map.toMethodBody(membership); + AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); connection.close(); QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl); diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h index 0797d472b6..5e0d3d20e3 100644 --- a/qpid/cpp/src/qpid/cluster/types.h +++ b/qpid/cpp/src/qpid/cluster/types.h @@ -24,11 +24,10 @@ #include "config.h" #include "qpid/Url.h" - +#include #include #include #include - #include extern "C" { @@ -45,6 +44,7 @@ namespace qpid { namespace cluster { class Connection; +typedef boost::intrusive_ptr ConnectionPtr; /** Types of cluster event. */ enum EventType { DATA, CONTROL }; diff --git a/qpid/cpp/src/tests/ForkedBroker.h b/qpid/cpp/src/tests/ForkedBroker.h index bf9e9265c4..925f8011f2 100644 --- a/qpid/cpp/src/tests/ForkedBroker.h +++ b/qpid/cpp/src/tests/ForkedBroker.h @@ -108,6 +108,7 @@ class ForkedBroker { std::vector args2(args); args2.push_back("--port=0"); args2.push_back("--mgmt-enable=no"); // TODO aconway 2008-07-16: why does mgmt cause problems? + if (!::getenv("QPID_TRACE") && !::getenv("QPID_LOG_ENABLE")) args2.push_back("--log-enable=error+"); // Keep quiet except for errors. args2.push_back(0); execv(prog, const_cast(&args2[0])); diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index b7d28bf914..585c981afc 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -219,7 +219,7 @@ QPID_AUTO_TEST_CASE(testSequenceOptions) { ClusterFixture cluster(1); Client c0(cluster[0], "c0"); FieldTable args; - args.setInt("qpid.msg_sequence", 1); // FIXME aconway 2008-11-11: works with "qpid.sequence_counter"?? + args.setInt("qpid.msg_sequence", 1); c0.session.queueDeclare(arg::queue="q"); c0.session.exchangeDeclare(arg::exchange="ex", arg::type="direct", arg::arguments=args); c0.session.exchangeBind(arg::exchange="ex", arg::queue="q", arg::bindingKey="k"); @@ -452,7 +452,7 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { BOOST_CHECK_EQUAL(kb0, kb2); } -QPID_AUTO_TEST_CASE(UpdateConsumers) { +QPID_AUTO_TEST_CASE(testUpdateConsumers) { ClusterFixture cluster(1, 1); Client c0(cluster[0], "c0"); diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index e6cacb0223..c114ef0151 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -36,7 +36,7 @@ -Min + @@ -45,6 +45,7 @@ Min + @@ -53,8 +54,7 @@ Min - - + -- cgit v1.2.1