diff options
author | Alan Conway <aconway@apache.org> | 2009-03-09 17:03:40 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-03-09 17:03:40 +0000 |
commit | 8b47ecc67da479eb0d8b650c8d172bc81b0343f6 (patch) | |
tree | 21f73ebef117dcc680c5e325e937b245c1d627d7 | |
parent | 8a6eafcd203a6211cbee55ee22e5a1928d4aa4b4 (diff) | |
download | qpid-python-8b47ecc67da479eb0d8b650c8d172bc81b0343f6.tar.gz |
Fix cluster TTL: replicte expiry information to newcomers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@751760 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/EventFrame.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp | 45 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ExpiryPolicy.h | 21 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 34 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/UpdateClient.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/AMQFrame.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/framing/AMQFrame.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 19 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/start_cluster | 1 | ||||
-rw-r--r-- | qpid/cpp/xml/cluster.xml | 6 |
14 files changed, 98 insertions, 70 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 126247e458..f8e412f1e6 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -111,7 +111,6 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), state(INIT), - frameId(0), lastSize(0), lastBroker(false) { @@ -267,9 +266,6 @@ void Cluster::deliveredFrame(const EventFrame& e) { } else if (state >= CATCHUP) { QPID_LOG(trace, *this << " DLVR: " << e); - EventFrame ef(e); // Non-const copy - if (ef.type == DATA) // Add cluster-id to to data frames. - ef.frame.setClusterId(frameId++); ConnectionPtr connection = getConnection(e.connectionId, l); if (connection) connection->deliveredFrame(e); @@ -475,18 +471,16 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) { cs.password = settings.password; cs.mechanism = settings.mechanism; updateThread = Thread( - new UpdateClient(self, updatee, url, broker, map, frameId, getConnections(l), decoder, + new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, getConnections(l), decoder, boost::bind(&Cluster::updateOutDone, this), boost::bind(&Cluster::updateOutError, this, _1), cs)); } // Called in update thread. -void Cluster::updateInDone(const ClusterMap& m, uint64_t frameId_) { +void Cluster::updateInDone(const ClusterMap& m) { Lock l(lock); updatedMap = m; - // Safe to set frameId here because we are stalled: deliveredFrame cannot be called concurrently. - frameId = frameId_; checkUpdateIn(l); } diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 5e66db0097..b716e2d781 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -92,7 +92,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void leave(); // Update completed - called in update thread - void updateInDone(const ClusterMap&, uint64_t frameId); + void updateInDone(const ClusterMap&); MemberId getId() const; broker::Broker& getBroker() const; @@ -108,6 +108,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Called only during update by Connection::shadowReady Decoder& getDecoder() { return decoder; } + ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; } + private: typedef sys::Monitor::ScopedLock Lock; @@ -115,8 +117,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { typedef PollableQueue<EventFrame> PollableFrameQueue; typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap; - // FIXME aconway 2009-03-07: sort functions by thread - // NB: A dummy Lock& parameter marks functions that must only be // called with Cluster::lock locked. @@ -237,7 +237,6 @@ class Cluster : private Cpg::Handler, public management::Manageable { } state; ConnectionMap connections; - uint64_t frameId; ClusterMap map; ClusterMap::Set elders; size_t lastSize; diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 1889b37e9f..aa7d082720 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -279,9 +279,9 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size()); } -void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) { +void Connection::membership(const FieldTable& joiners, const FieldTable& members) { QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); - cluster.updateInDone(ClusterMap(joiners, members), frameId); + cluster.updateInDone(ClusterMap(joiners, members)); self.second = 0; // Mark this as completed update connection. } @@ -352,6 +352,10 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi q->setPosition(position); } +void Connection::expiryId(uint64_t id) { + cluster.getExpiryPolicy().setId(id); +} + std::ostream& operator<<(std::ostream& o, const Connection& c) { const char* type="unknown"; if (c.isLocal()) type = "local"; diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 0659015672..6434f763a8 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -120,7 +120,7 @@ class Connection : void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment); - void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId); + void membership(const framing::FieldTable&, const framing::FieldTable&); void deliveryRecord(const std::string& queue, const framing::SequenceNumber& position, @@ -135,6 +135,7 @@ class Connection : uint32_t credit); void queuePosition(const std::string&, const framing::SequenceNumber&); + void expiryId(uint64_t); void txStart(); void txAccept(const framing::SequenceSet&); diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.cpp b/qpid/cpp/src/qpid/cluster/EventFrame.cpp index 1a57528c3a..8259b6da6e 100644 --- a/qpid/cpp/src/qpid/cluster/EventFrame.cpp +++ b/qpid/cpp/src/qpid/cluster/EventFrame.cpp @@ -33,7 +33,9 @@ EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc) } std::ostream& operator<<(std::ostream& o, const EventFrame& e) { - return o << e.frame << " " << e.type << " " << e.connectionId << " read-credit=" << e.readCredit; + return o << e.frame << " " << e.type << " " << e.connectionId; + if (e.readCredit) o << " read-credit=" << e.readCredit; + return o; } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp index cc451bf661..409180c499 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp @@ -31,46 +31,45 @@ namespace qpid { namespace cluster { ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t) - : expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {} - -namespace { -uint64_t clusterId(const broker::Message& m) { - assert(m.getFrames().begin() != m.getFrames().end()); - return m.getFrames().begin()->getClusterId(); -} + : expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {} struct ExpiryTask : public broker::TimerTask { ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when) - : TimerTask(when), expiryPolicy(policy), messageId(id) {} - void fire() { expiryPolicy->sendExpire(messageId); } + : TimerTask(when), expiryPolicy(policy), expiryId(id) {} + void fire() { expiryPolicy->sendExpire(expiryId); } boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; - const uint64_t messageId; + const uint64_t expiryId; }; -} void ExpiryPolicy::willExpire(broker::Message& m) { - timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration())); + uint64_t id = expiryId++; + assert(unexpiredById.find(id) == unexpiredById.end()); + assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end()); + unexpiredById[id] = &m; + unexpiredByMessage[&m] = id; + timer.add(new ExpiryTask(this, id, m.getExpiration())); } bool ExpiryPolicy::hasExpired(broker::Message& m) { - sys::Mutex::ScopedLock l(lock); - IdSet::iterator i = expired.find(clusterId(m)); - if (i != expired.end()) { - expired.erase(i); - const_cast<broker::Message&>(m).setExpiryPolicy(expiredPolicy); // hasExpired() == true; - return true; - } - return false; + return unexpiredByMessage.find(&m) == unexpiredByMessage.end(); } void ExpiryPolicy::sendExpire(uint64_t id) { - sys::Mutex::ScopedLock l(lock); mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId); } void ExpiryPolicy::deliverExpire(uint64_t id) { - sys::Mutex::ScopedLock l(lock); - expired.insert(id); + IdMessageMap::iterator i = unexpiredById.find(id); + if (i != unexpiredById.end()) { + i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true; + unexpiredByMessage.erase(i->second); + unexpiredById.erase(i); + } +} + +boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) { + MessageIdMap::iterator i = unexpiredByMessage.find(&m); + return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second; } bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; } diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h index b58946b8f7..9f8b1a9236 100644 --- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h +++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h @@ -27,11 +27,15 @@ #include "qpid/sys/Mutex.h" #include <boost/function.hpp> #include <boost/intrusive_ptr.hpp> -#include <set> +#include <boost/optional.hpp> +#include <map> namespace qpid { -namespace broker { class Timer; } +namespace broker { +class Timer; +class Message; +} namespace cluster { class Multicaster; @@ -54,16 +58,23 @@ class ExpiryPolicy : public broker::ExpiryPolicy // Cluster delivers expiry notice. void deliverExpire(uint64_t); + void setId(uint64_t id) { expiryId = id; } + uint64_t getId() const { return expiryId; } + + boost::optional<uint64_t> getId(broker::Message&); + private: - sys::Mutex lock; - typedef std::set<uint64_t> IdSet; + typedef std::map<broker::Message*, uint64_t> MessageIdMap; + typedef std::map<uint64_t, broker::Message*> IdMessageMap; struct Expired : public broker::ExpiryPolicy { bool hasExpired(broker::Message&); void willExpire(broker::Message&); }; - IdSet expired; + MessageIdMap unexpiredByMessage; + IdMessageMap unexpiredById; + uint64_t expiryId; boost::intrusive_ptr<Expired> expiredPolicy; Multicaster& mcast; MemberId memberId; diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index cf1633e40b..97eae7efa3 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -23,6 +23,7 @@ #include "ClusterMap.h" #include "Connection.h" #include "Decoder.h" +#include "ExpiryPolicy.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/client/ConnectionAccess.h" #include "qpid/broker/Broker.h" @@ -87,14 +88,14 @@ void send(client::AsyncSession& s, const AMQBody& body) { // TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, - broker::Broker& broker, const ClusterMap& m, uint64_t frameId_, + broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_, const Cluster::ConnectionVector& cons, Decoder& decoder_, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail, const client::ConnectionSettings& cs ) : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), - frameId(frameId_), connections(cons), decoder(decoder_), + expiry(expiry_), connections(cons), decoder(decoder_), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail), connectionSettings(cs) { @@ -129,9 +130,9 @@ void UpdateClient::update() { std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); + ClusterConnectionProxy(session).expiryId(expiry.getId()); ClusterConnectionMembershipBody membership; map.toMethodBody(membership); - membership.setFrameId(frameId); AMQFrame frame(membership); client::ConnectionAccess::getImpl(connection)->handle(frame); connection.close(); @@ -150,8 +151,7 @@ template <class T> std::string encode(const T& t) { void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { QPID_LOG(debug, updaterId << " updating exchange " << ex->getName()); - ClusterConnectionProxy proxy(session); - proxy.exchange(encode(*ex)); + ClusterConnectionProxy(session).exchange(encode(*ex)); } /** Bind a queue to the update exchange and update messges to it @@ -162,10 +162,11 @@ class MessageUpdater { bool haveLastPos; framing::SequenceNumber lastPos; client::AsyncSession session; - + ExpiryPolicy& expiry; + public: - MessageUpdater(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) { + MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) { session.exchangeBind(queue, UpdateClient::UPDATE); } @@ -181,11 +182,20 @@ class MessageUpdater { void updateQueuedMessage(const broker::QueuedMessage& message) { + // Send the queue position if necessary. if (!haveLastPos || message.position - lastPos != 1) { ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1); haveLastPos = true; } lastPos = message.position; + + // Send the expiry ID if necessary. + if (message.payload->getProperties<DeliveryProperties>()->getTtl()) { + boost::optional<uint64_t> expiryId = expiry.getId(*message.payload); + if (!expiryId) return; // Message already expired, don't replicate. + ClusterConnectionProxy(session).expiryId(*expiryId); + } + SessionBase_0_10Access sb(session); framing::MessageTransferBody transfer( framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); @@ -214,7 +224,7 @@ void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) { QPID_LOG(debug, updaterId << " updating queue " << q->getName()); ClusterConnectionProxy proxy(session); proxy.queue(encode(*q)); - MessageUpdater updater(q->getName(), session); + MessageUpdater updater(q->getName(), session, expiry); q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1)); q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, q->getName(), _1)); } @@ -323,7 +333,7 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { // If the message is acquired then it is no longer on the // updatees queue, put it on the update queue for updatee to pick up. // - MessageUpdater(UPDATE, shadowSession).updateQueuedMessage(dr.getMessage()); + MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage()); } ClusterConnectionProxy(shadowSession).deliveryRecord( dr.getQueue()->getName(), @@ -342,8 +352,8 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { public: - TxOpUpdater(UpdateClient& dc, client::AsyncSession s) - : MessageUpdater(UpdateClient::UPDATE, s), parent(dc), session(s), proxy(s) {} + TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry) + : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {} void operator()(const broker::DtxAck& ) { throw InternalErrorException("DTX transactions not currently supported by cluster."); @@ -386,7 +396,7 @@ void UpdateClient::updateTxState(broker::SemanticState& s) { broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); if (txBuffer) { proxy.txStart(); - TxOpUpdater updater(*this, shadowSession); + TxOpUpdater updater(*this, shadowSession, expiry); txBuffer->accept(updater); proxy.txEnd(); } diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h index a0813d0a17..23d061b7e4 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.h +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h @@ -56,6 +56,7 @@ class Cluster; class Connection; class ClusterMap; class Decoder; +class ExpiryPolicy; /** * A client that updates the contents of a local broker to a remote one using AMQP. @@ -65,7 +66,7 @@ class UpdateClient : public sys::Runnable { static const std::string UPDATE; // Name for special update queue and exchange. UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, - broker::Broker& donor, const ClusterMap& map, uint64_t frameId, + broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry, const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&, const boost::function<void()>& done, const boost::function<void(const std::exception&)>& fail, @@ -94,7 +95,7 @@ class UpdateClient : public sys::Runnable { Url updateeUrl; broker::Broker& updaterBroker; ClusterMap map; - uint64_t frameId; + ExpiryPolicy& expiry; std::vector<boost::intrusive_ptr<Connection> > connections; Decoder& decoder; client::Connection connection, shadowConnection; diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp index e56cf5c546..9473b2a513 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.cpp +++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp @@ -35,7 +35,6 @@ void AMQFrame::init() { subchannel=0; channel=0; encodedSizeCache = 0; - clusterId = 0; } AMQFrame::AMQFrame(const boost::intrusive_ptr<AMQBody>& b) : body(b) { init(); } diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h index 028d0c1d8a..02a1ea4622 100644 --- a/qpid/cpp/src/qpid/framing/AMQFrame.h +++ b/qpid/cpp/src/qpid/framing/AMQFrame.h @@ -92,9 +92,6 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp /** Must point to at least DECODE_SIZE_MIN bytes of data */ static uint16_t decodeSize(char* data); - uint64_t getClusterId() const { return clusterId; } - void setClusterId(uint64_t id) { clusterId = id; } - private: void init(); @@ -106,7 +103,6 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp bool bos : 1; bool eos : 1; mutable uint32_t encodedSizeCache; - uint64_t clusterId; // Used to identify frames in a clustered broekr. }; std::ostream& operator<<(std::ostream&, const AMQFrame&); diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 1acb2d4bf4..2ff52c1709 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -151,7 +151,10 @@ vector<string> browse(Client& c, const string& q, int n) { c.subs.subscribe(lq, q, browseSettings); vector<string> result; for (int i = 0; i < n; ++i) { - result.push_back(lq.get(TIMEOUT).getData()); + Message m; + if (!lq.get(m, TIMEOUT)) + break; + result.push_back(m.getData()); } c.subs.getSubscription(q).cancel(); return result; @@ -202,13 +205,23 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) { ClusterFixture cluster(2); Client c0(cluster[0], "c0"); Client c1(cluster[1], "c1"); + c0.session.queueDeclare("p"); c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200)); c0.session.messageTransfer(arg::content=Message("b", "q")); - BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<string>("a")("b")); - sys::usleep(300*1000); + c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000)); + c0.session.messageTransfer(arg::content=Message("y", "p")); + cluster.add(); + Client c2(cluster[1], "c2"); + + BOOST_CHECK_EQUAL(browse(c0, "p", 2), list_of<string>("x")("y")); + BOOST_CHECK_EQUAL(browse(c1, "p", 2), list_of<string>("x")("y")); + BOOST_CHECK_EQUAL(browse(c2, "p", 2), list_of<string>("x")("y")); + + sys::usleep(200*1000); BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<string>("b")); BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<string>("b")); + BOOST_CHECK_EQUAL(browse(c2, "q", 1), list_of<string>("b")); } QPID_AUTO_TEST_CASE(testSequenceOptions) { diff --git a/qpid/cpp/src/tests/start_cluster b/qpid/cpp/src/tests/start_cluster index 48e299d942..053b23da33 100755 --- a/qpid/cpp/src/tests/start_cluster +++ b/qpid/cpp/src/tests/start_cluster @@ -28,7 +28,6 @@ with_ais_group() { echo $* | newgrp ais } -test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; } rm -f cluster*.log SIZE=${1:-1}; shift CLUSTER=`pwd` # Cluster name=pwd, avoid clashes. diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index d3e4b488fb..8fdde0ada6 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -132,7 +132,6 @@ <control name="membership" code="0x21" label="Cluster membership details."> <field name="joiners" type="map"/> <!-- member-id -> URL --> <field name="members" type="map"/> <!-- member-id -> state --> - <field name="frame-id" type="uint64"/>> <!-- Frame id counter value --> </control> <!-- Set the position of a replicated queue. --> @@ -140,11 +139,12 @@ <field name="queue" type="str8"/> <field name="position" type="sequence-no"/> </control> - + <!-- Replicate encoded exchanges/queues. --> <control name="exchange" code="0x31"><field name="encoded" type="str32"/></control> <control name="queue" code="0x32"><field name="encoded" type="str32"/></control> - + <!-- Set expiry-id for subsequent messages. --> + <control name="expiry-id" code="0x33"><field name="expiry-id" type="uint64"/></control> </class> </amqp> |