summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-03-09 17:03:40 +0000
committerAlan Conway <aconway@apache.org>2009-03-09 17:03:40 +0000
commit8b47ecc67da479eb0d8b650c8d172bc81b0343f6 (patch)
tree21f73ebef117dcc680c5e325e937b245c1d627d7
parent8a6eafcd203a6211cbee55ee22e5a1928d4aa4b4 (diff)
downloadqpid-python-8b47ecc67da479eb0d8b650c8d172bc81b0343f6.tar.gz
Fix cluster TTL: replicte expiry information to newcomers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@751760 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp10
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h7
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.cpp8
-rw-r--r--qpid/cpp/src/qpid/cluster/Connection.h3
-rw-r--r--qpid/cpp/src/qpid/cluster/EventFrame.cpp4
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp45
-rw-r--r--qpid/cpp/src/qpid/cluster/ExpiryPolicy.h21
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.cpp34
-rw-r--r--qpid/cpp/src/qpid/cluster/UpdateClient.h5
-rw-r--r--qpid/cpp/src/qpid/framing/AMQFrame.cpp1
-rw-r--r--qpid/cpp/src/qpid/framing/AMQFrame.h4
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp19
-rwxr-xr-xqpid/cpp/src/tests/start_cluster1
-rw-r--r--qpid/cpp/xml/cluster.xml6
14 files changed, 98 insertions, 70 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index 126247e458..f8e412f1e6 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -111,7 +111,6 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
state(INIT),
- frameId(0),
lastSize(0),
lastBroker(false)
{
@@ -267,9 +266,6 @@ void Cluster::deliveredFrame(const EventFrame& e) {
}
else if (state >= CATCHUP) {
QPID_LOG(trace, *this << " DLVR: " << e);
- EventFrame ef(e); // Non-const copy
- if (ef.type == DATA) // Add cluster-id to to data frames.
- ef.frame.setClusterId(frameId++);
ConnectionPtr connection = getConnection(e.connectionId, l);
if (connection)
connection->deliveredFrame(e);
@@ -475,18 +471,16 @@ void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock& l) {
cs.password = settings.password;
cs.mechanism = settings.mechanism;
updateThread = Thread(
- new UpdateClient(self, updatee, url, broker, map, frameId, getConnections(l), decoder,
+ new UpdateClient(self, updatee, url, broker, map, *expiryPolicy, getConnections(l), decoder,
boost::bind(&Cluster::updateOutDone, this),
boost::bind(&Cluster::updateOutError, this, _1),
cs));
}
// Called in update thread.
-void Cluster::updateInDone(const ClusterMap& m, uint64_t frameId_) {
+void Cluster::updateInDone(const ClusterMap& m) {
Lock l(lock);
updatedMap = m;
- // Safe to set frameId here because we are stalled: deliveredFrame cannot be called concurrently.
- frameId = frameId_;
checkUpdateIn(l);
}
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 5e66db0097..b716e2d781 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -92,7 +92,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void leave();
// Update completed - called in update thread
- void updateInDone(const ClusterMap&, uint64_t frameId);
+ void updateInDone(const ClusterMap&);
MemberId getId() const;
broker::Broker& getBroker() const;
@@ -108,6 +108,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Called only during update by Connection::shadowReady
Decoder& getDecoder() { return decoder; }
+ ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; }
+
private:
typedef sys::Monitor::ScopedLock Lock;
@@ -115,8 +117,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
typedef PollableQueue<EventFrame> PollableFrameQueue;
typedef std::map<ConnectionId, ConnectionPtr> ConnectionMap;
- // FIXME aconway 2009-03-07: sort functions by thread
-
// NB: A dummy Lock& parameter marks functions that must only be
// called with Cluster::lock locked.
@@ -237,7 +237,6 @@ class Cluster : private Cpg::Handler, public management::Manageable {
} state;
ConnectionMap connections;
- uint64_t frameId;
ClusterMap map;
ClusterMap::Set elders;
size_t lastSize;
diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp
index 1889b37e9f..aa7d082720 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.cpp
+++ b/qpid/cpp/src/qpid/cluster/Connection.cpp
@@ -279,9 +279,9 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const str
cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
}
-void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
+void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
- cluster.updateInDone(ClusterMap(joiners, members), frameId);
+ cluster.updateInDone(ClusterMap(joiners, members));
self.second = 0; // Mark this as completed update connection.
}
@@ -352,6 +352,10 @@ void Connection::queuePosition(const string& qname, const SequenceNumber& positi
q->setPosition(position);
}
+void Connection::expiryId(uint64_t id) {
+ cluster.getExpiryPolicy().setId(id);
+}
+
std::ostream& operator<<(std::ostream& o, const Connection& c) {
const char* type="unknown";
if (c.isLocal()) type = "local";
diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h
index 0659015672..6434f763a8 100644
--- a/qpid/cpp/src/qpid/cluster/Connection.h
+++ b/qpid/cpp/src/qpid/cluster/Connection.h
@@ -120,7 +120,7 @@ class Connection :
void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
- void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameId);
+ void membership(const framing::FieldTable&, const framing::FieldTable&);
void deliveryRecord(const std::string& queue,
const framing::SequenceNumber& position,
@@ -135,6 +135,7 @@ class Connection :
uint32_t credit);
void queuePosition(const std::string&, const framing::SequenceNumber&);
+ void expiryId(uint64_t);
void txStart();
void txAccept(const framing::SequenceSet&);
diff --git a/qpid/cpp/src/qpid/cluster/EventFrame.cpp b/qpid/cpp/src/qpid/cluster/EventFrame.cpp
index 1a57528c3a..8259b6da6e 100644
--- a/qpid/cpp/src/qpid/cluster/EventFrame.cpp
+++ b/qpid/cpp/src/qpid/cluster/EventFrame.cpp
@@ -33,7 +33,9 @@ EventFrame::EventFrame(const EventHeader& e, const framing::AMQFrame& f, int rc)
}
std::ostream& operator<<(std::ostream& o, const EventFrame& e) {
- return o << e.frame << " " << e.type << " " << e.connectionId << " read-credit=" << e.readCredit;
+ return o << e.frame << " " << e.type << " " << e.connectionId;
+ if (e.readCredit) o << " read-credit=" << e.readCredit;
+ return o;
}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
index cc451bf661..409180c499 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -31,46 +31,45 @@ namespace qpid {
namespace cluster {
ExpiryPolicy::ExpiryPolicy(Multicaster& m, const MemberId& id, broker::Timer& t)
- : expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
-
-namespace {
-uint64_t clusterId(const broker::Message& m) {
- assert(m.getFrames().begin() != m.getFrames().end());
- return m.getFrames().begin()->getClusterId();
-}
+ : expiryId(0), expiredPolicy(new Expired), mcast(m), memberId(id), timer(t) {}
struct ExpiryTask : public broker::TimerTask {
ExpiryTask(const boost::intrusive_ptr<ExpiryPolicy>& policy, uint64_t id, sys::AbsTime when)
- : TimerTask(when), expiryPolicy(policy), messageId(id) {}
- void fire() { expiryPolicy->sendExpire(messageId); }
+ : TimerTask(when), expiryPolicy(policy), expiryId(id) {}
+ void fire() { expiryPolicy->sendExpire(expiryId); }
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
- const uint64_t messageId;
+ const uint64_t expiryId;
};
-}
void ExpiryPolicy::willExpire(broker::Message& m) {
- timer.add(new ExpiryTask(this, clusterId(m), m.getExpiration()));
+ uint64_t id = expiryId++;
+ assert(unexpiredById.find(id) == unexpiredById.end());
+ assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+ unexpiredById[id] = &m;
+ unexpiredByMessage[&m] = id;
+ timer.add(new ExpiryTask(this, id, m.getExpiration()));
}
bool ExpiryPolicy::hasExpired(broker::Message& m) {
- sys::Mutex::ScopedLock l(lock);
- IdSet::iterator i = expired.find(clusterId(m));
- if (i != expired.end()) {
- expired.erase(i);
- const_cast<broker::Message&>(m).setExpiryPolicy(expiredPolicy); // hasExpired() == true;
- return true;
- }
- return false;
+ return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
}
void ExpiryPolicy::sendExpire(uint64_t id) {
- sys::Mutex::ScopedLock l(lock);
mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
}
void ExpiryPolicy::deliverExpire(uint64_t id) {
- sys::Mutex::ScopedLock l(lock);
- expired.insert(id);
+ IdMessageMap::iterator i = unexpiredById.find(id);
+ if (i != unexpiredById.end()) {
+ i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true;
+ unexpiredByMessage.erase(i->second);
+ unexpiredById.erase(i);
+ }
+}
+
+boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
+ MessageIdMap::iterator i = unexpiredByMessage.find(&m);
+ return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
}
bool ExpiryPolicy::Expired::hasExpired(broker::Message&) { return true; }
diff --git a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
index b58946b8f7..9f8b1a9236 100644
--- a/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
+++ b/qpid/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -27,11 +27,15 @@
#include "qpid/sys/Mutex.h"
#include <boost/function.hpp>
#include <boost/intrusive_ptr.hpp>
-#include <set>
+#include <boost/optional.hpp>
+#include <map>
namespace qpid {
-namespace broker { class Timer; }
+namespace broker {
+class Timer;
+class Message;
+}
namespace cluster {
class Multicaster;
@@ -54,16 +58,23 @@ class ExpiryPolicy : public broker::ExpiryPolicy
// Cluster delivers expiry notice.
void deliverExpire(uint64_t);
+ void setId(uint64_t id) { expiryId = id; }
+ uint64_t getId() const { return expiryId; }
+
+ boost::optional<uint64_t> getId(broker::Message&);
+
private:
- sys::Mutex lock;
- typedef std::set<uint64_t> IdSet;
+ typedef std::map<broker::Message*, uint64_t> MessageIdMap;
+ typedef std::map<uint64_t, broker::Message*> IdMessageMap;
struct Expired : public broker::ExpiryPolicy {
bool hasExpired(broker::Message&);
void willExpire(broker::Message&);
};
- IdSet expired;
+ MessageIdMap unexpiredByMessage;
+ IdMessageMap unexpiredById;
+ uint64_t expiryId;
boost::intrusive_ptr<Expired> expiredPolicy;
Multicaster& mcast;
MemberId memberId;
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
index cf1633e40b..97eae7efa3 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -23,6 +23,7 @@
#include "ClusterMap.h"
#include "Connection.h"
#include "Decoder.h"
+#include "ExpiryPolicy.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/client/ConnectionAccess.h"
#include "qpid/broker/Broker.h"
@@ -87,14 +88,14 @@ void send(client::AsyncSession& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, uint64_t frameId_,
+ broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
const Cluster::ConnectionVector& cons, Decoder& decoder_,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail,
const client::ConnectionSettings& cs
)
: updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
- frameId(frameId_), connections(cons), decoder(decoder_),
+ expiry(expiry_), connections(cons), decoder(decoder_),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail), connectionSettings(cs)
{
@@ -129,9 +130,9 @@ void UpdateClient::update() {
std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
+ ClusterConnectionProxy(session).expiryId(expiry.getId());
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
- membership.setFrameId(frameId);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
connection.close();
@@ -150,8 +151,7 @@ template <class T> std::string encode(const T& t) {
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
- ClusterConnectionProxy proxy(session);
- proxy.exchange(encode(*ex));
+ ClusterConnectionProxy(session).exchange(encode(*ex));
}
/** Bind a queue to the update exchange and update messges to it
@@ -162,10 +162,11 @@ class MessageUpdater {
bool haveLastPos;
framing::SequenceNumber lastPos;
client::AsyncSession session;
-
+ ExpiryPolicy& expiry;
+
public:
- MessageUpdater(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) {
+ MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) {
session.exchangeBind(queue, UpdateClient::UPDATE);
}
@@ -181,11 +182,20 @@ class MessageUpdater {
void updateQueuedMessage(const broker::QueuedMessage& message) {
+ // Send the queue position if necessary.
if (!haveLastPos || message.position - lastPos != 1) {
ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
haveLastPos = true;
}
lastPos = message.position;
+
+ // Send the expiry ID if necessary.
+ if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
+ boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
+ if (!expiryId) return; // Message already expired, don't replicate.
+ ClusterConnectionProxy(session).expiryId(*expiryId);
+ }
+
SessionBase_0_10Access sb(session);
framing::MessageTransferBody transfer(
framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
@@ -214,7 +224,7 @@ void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) {
QPID_LOG(debug, updaterId << " updating queue " << q->getName());
ClusterConnectionProxy proxy(session);
proxy.queue(encode(*q));
- MessageUpdater updater(q->getName(), session);
+ MessageUpdater updater(q->getName(), session, expiry);
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, q->getName(), _1));
}
@@ -323,7 +333,7 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
// If the message is acquired then it is no longer on the
// updatees queue, put it on the update queue for updatee to pick up.
//
- MessageUpdater(UPDATE, shadowSession).updateQueuedMessage(dr.getMessage());
+ MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage());
}
ClusterConnectionProxy(shadowSession).deliveryRecord(
dr.getQueue()->getName(),
@@ -342,8 +352,8 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
public:
- TxOpUpdater(UpdateClient& dc, client::AsyncSession s)
- : MessageUpdater(UpdateClient::UPDATE, s), parent(dc), session(s), proxy(s) {}
+ TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
+ : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {}
void operator()(const broker::DtxAck& ) {
throw InternalErrorException("DTX transactions not currently supported by cluster.");
@@ -386,7 +396,7 @@ void UpdateClient::updateTxState(broker::SemanticState& s) {
broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
if (txBuffer) {
proxy.txStart();
- TxOpUpdater updater(*this, shadowSession);
+ TxOpUpdater updater(*this, shadowSession, expiry);
txBuffer->accept(updater);
proxy.txEnd();
}
diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.h b/qpid/cpp/src/qpid/cluster/UpdateClient.h
index a0813d0a17..23d061b7e4 100644
--- a/qpid/cpp/src/qpid/cluster/UpdateClient.h
+++ b/qpid/cpp/src/qpid/cluster/UpdateClient.h
@@ -56,6 +56,7 @@ class Cluster;
class Connection;
class ClusterMap;
class Decoder;
+class ExpiryPolicy;
/**
* A client that updates the contents of a local broker to a remote one using AMQP.
@@ -65,7 +66,7 @@ class UpdateClient : public sys::Runnable {
static const std::string UPDATE; // Name for special update queue and exchange.
UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
- broker::Broker& donor, const ClusterMap& map, uint64_t frameId,
+ broker::Broker& donor, const ClusterMap& map, ExpiryPolicy& expiry,
const std::vector<boost::intrusive_ptr<Connection> >&, Decoder&,
const boost::function<void()>& done,
const boost::function<void(const std::exception&)>& fail,
@@ -94,7 +95,7 @@ class UpdateClient : public sys::Runnable {
Url updateeUrl;
broker::Broker& updaterBroker;
ClusterMap map;
- uint64_t frameId;
+ ExpiryPolicy& expiry;
std::vector<boost::intrusive_ptr<Connection> > connections;
Decoder& decoder;
client::Connection connection, shadowConnection;
diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.cpp b/qpid/cpp/src/qpid/framing/AMQFrame.cpp
index e56cf5c546..9473b2a513 100644
--- a/qpid/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/qpid/cpp/src/qpid/framing/AMQFrame.cpp
@@ -35,7 +35,6 @@ void AMQFrame::init() {
subchannel=0;
channel=0;
encodedSizeCache = 0;
- clusterId = 0;
}
AMQFrame::AMQFrame(const boost::intrusive_ptr<AMQBody>& b) : body(b) { init(); }
diff --git a/qpid/cpp/src/qpid/framing/AMQFrame.h b/qpid/cpp/src/qpid/framing/AMQFrame.h
index 028d0c1d8a..02a1ea4622 100644
--- a/qpid/cpp/src/qpid/framing/AMQFrame.h
+++ b/qpid/cpp/src/qpid/framing/AMQFrame.h
@@ -92,9 +92,6 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp
/** Must point to at least DECODE_SIZE_MIN bytes of data */
static uint16_t decodeSize(char* data);
- uint64_t getClusterId() const { return clusterId; }
- void setClusterId(uint64_t id) { clusterId = id; }
-
private:
void init();
@@ -106,7 +103,6 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp
bool bos : 1;
bool eos : 1;
mutable uint32_t encodedSizeCache;
- uint64_t clusterId; // Used to identify frames in a clustered broekr.
};
std::ostream& operator<<(std::ostream&, const AMQFrame&);
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index 1acb2d4bf4..2ff52c1709 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -151,7 +151,10 @@ vector<string> browse(Client& c, const string& q, int n) {
c.subs.subscribe(lq, q, browseSettings);
vector<string> result;
for (int i = 0; i < n; ++i) {
- result.push_back(lq.get(TIMEOUT).getData());
+ Message m;
+ if (!lq.get(m, TIMEOUT))
+ break;
+ result.push_back(m.getData());
}
c.subs.getSubscription(q).cancel();
return result;
@@ -202,13 +205,23 @@ QPID_AUTO_TEST_CASE(testMessageTimeToLive) {
ClusterFixture cluster(2);
Client c0(cluster[0], "c0");
Client c1(cluster[1], "c1");
+ c0.session.queueDeclare("p");
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=ttlMessage("a", "q", 200));
c0.session.messageTransfer(arg::content=Message("b", "q"));
- BOOST_CHECK_EQUAL(browse(c1, "q", 2), list_of<string>("a")("b"));
- sys::usleep(300*1000);
+ c0.session.messageTransfer(arg::content=ttlMessage("x", "p", 10000));
+ c0.session.messageTransfer(arg::content=Message("y", "p"));
+ cluster.add();
+ Client c2(cluster[1], "c2");
+
+ BOOST_CHECK_EQUAL(browse(c0, "p", 2), list_of<string>("x")("y"));
+ BOOST_CHECK_EQUAL(browse(c1, "p", 2), list_of<string>("x")("y"));
+ BOOST_CHECK_EQUAL(browse(c2, "p", 2), list_of<string>("x")("y"));
+
+ sys::usleep(200*1000);
BOOST_CHECK_EQUAL(browse(c0, "q", 1), list_of<string>("b"));
BOOST_CHECK_EQUAL(browse(c1, "q", 1), list_of<string>("b"));
+ BOOST_CHECK_EQUAL(browse(c2, "q", 1), list_of<string>("b"));
}
QPID_AUTO_TEST_CASE(testSequenceOptions) {
diff --git a/qpid/cpp/src/tests/start_cluster b/qpid/cpp/src/tests/start_cluster
index 48e299d942..053b23da33 100755
--- a/qpid/cpp/src/tests/start_cluster
+++ b/qpid/cpp/src/tests/start_cluster
@@ -28,7 +28,6 @@ with_ais_group() {
echo $* | newgrp ais
}
-test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; }
rm -f cluster*.log
SIZE=${1:-1}; shift
CLUSTER=`pwd` # Cluster name=pwd, avoid clashes.
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index d3e4b488fb..8fdde0ada6 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -132,7 +132,6 @@
<control name="membership" code="0x21" label="Cluster membership details.">
<field name="joiners" type="map"/> <!-- member-id -> URL -->
<field name="members" type="map"/> <!-- member-id -> state -->
- <field name="frame-id" type="uint64"/>> <!-- Frame id counter value -->
</control>
<!-- Set the position of a replicated queue. -->
@@ -140,11 +139,12 @@
<field name="queue" type="str8"/>
<field name="position" type="sequence-no"/>
</control>
-
+
<!-- Replicate encoded exchanges/queues. -->
<control name="exchange" code="0x31"><field name="encoded" type="str32"/></control>
<control name="queue" code="0x32"><field name="encoded" type="str32"/></control>
-
+ <!-- Set expiry-id for subsequent messages. -->
+ <control name="expiry-id" code="0x33"><field name="expiry-id" type="uint64"/></control>
</class>
</amqp>