summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/UpdateClient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/UpdateClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp34
1 files changed, 22 insertions, 12 deletions
diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index cf1633e40b..97eae7efa3 100644
--- a/cpp/src/qpid/cluster/UpdateClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -23,6 +23,7 @@
#include "ClusterMap.h"
#include "Connection.h"
#include "Decoder.h"
+#include "ExpiryPolicy.h"
#include "qpid/client/SessionBase_0_10Access.h"
#include "qpid/client/ConnectionAccess.h"
#include "qpid/broker/Broker.h"
@@ -87,14 +88,14 @@ void send(client::AsyncSession& s, const AMQBody& body) {
// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
- broker::Broker& broker, const ClusterMap& m, uint64_t frameId_,
+ broker::Broker& broker, const ClusterMap& m, ExpiryPolicy& expiry_,
const Cluster::ConnectionVector& cons, Decoder& decoder_,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail,
const client::ConnectionSettings& cs
)
: updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m),
- frameId(frameId_), connections(cons), decoder(decoder_),
+ expiry(expiry_), connections(cons), decoder(decoder_),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail), connectionSettings(cs)
{
@@ -129,9 +130,9 @@ void UpdateClient::update() {
std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
+ ClusterConnectionProxy(session).expiryId(expiry.getId());
ClusterConnectionMembershipBody membership;
map.toMethodBody(membership);
- membership.setFrameId(frameId);
AMQFrame frame(membership);
client::ConnectionAccess::getImpl(connection)->handle(frame);
connection.close();
@@ -150,8 +151,7 @@ template <class T> std::string encode(const T& t) {
void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
QPID_LOG(debug, updaterId << " updating exchange " << ex->getName());
- ClusterConnectionProxy proxy(session);
- proxy.exchange(encode(*ex));
+ ClusterConnectionProxy(session).exchange(encode(*ex));
}
/** Bind a queue to the update exchange and update messges to it
@@ -162,10 +162,11 @@ class MessageUpdater {
bool haveLastPos;
framing::SequenceNumber lastPos;
client::AsyncSession session;
-
+ ExpiryPolicy& expiry;
+
public:
- MessageUpdater(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) {
+ MessageUpdater(const string& q, const client::AsyncSession s, ExpiryPolicy& expiry_) : queue(q), haveLastPos(false), session(s), expiry(expiry_) {
session.exchangeBind(queue, UpdateClient::UPDATE);
}
@@ -181,11 +182,20 @@ class MessageUpdater {
void updateQueuedMessage(const broker::QueuedMessage& message) {
+ // Send the queue position if necessary.
if (!haveLastPos || message.position - lastPos != 1) {
ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
haveLastPos = true;
}
lastPos = message.position;
+
+ // Send the expiry ID if necessary.
+ if (message.payload->getProperties<DeliveryProperties>()->getTtl()) {
+ boost::optional<uint64_t> expiryId = expiry.getId(*message.payload);
+ if (!expiryId) return; // Message already expired, don't replicate.
+ ClusterConnectionProxy(session).expiryId(*expiryId);
+ }
+
SessionBase_0_10Access sb(session);
framing::MessageTransferBody transfer(
framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
@@ -214,7 +224,7 @@ void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) {
QPID_LOG(debug, updaterId << " updating queue " << q->getName());
ClusterConnectionProxy proxy(session);
proxy.queue(encode(*q));
- MessageUpdater updater(q->getName(), session);
+ MessageUpdater updater(q->getName(), session, expiry);
q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, q->getName(), _1));
}
@@ -323,7 +333,7 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
// If the message is acquired then it is no longer on the
// updatees queue, put it on the update queue for updatee to pick up.
//
- MessageUpdater(UPDATE, shadowSession).updateQueuedMessage(dr.getMessage());
+ MessageUpdater(UPDATE, shadowSession, expiry).updateQueuedMessage(dr.getMessage());
}
ClusterConnectionProxy(shadowSession).deliveryRecord(
dr.getQueue()->getName(),
@@ -342,8 +352,8 @@ void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
public:
- TxOpUpdater(UpdateClient& dc, client::AsyncSession s)
- : MessageUpdater(UpdateClient::UPDATE, s), parent(dc), session(s), proxy(s) {}
+ TxOpUpdater(UpdateClient& dc, client::AsyncSession s, ExpiryPolicy& expiry)
+ : MessageUpdater(UpdateClient::UPDATE, s, expiry), parent(dc), session(s), proxy(s) {}
void operator()(const broker::DtxAck& ) {
throw InternalErrorException("DTX transactions not currently supported by cluster.");
@@ -386,7 +396,7 @@ void UpdateClient::updateTxState(broker::SemanticState& s) {
broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
if (txBuffer) {
proxy.txStart();
- TxOpUpdater updater(*this, shadowSession);
+ TxOpUpdater updater(*this, shadowSession, expiry);
txBuffer->accept(updater);
proxy.txEnd();
}