summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/DumpClient.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-10-30 21:07:28 +0000
committerAlan Conway <aconway@apache.org>2008-10-30 21:07:28 +0000
commit364b62744a35f7e48332af00217a1848345cd39a (patch)
treed8cab9a5c824133e4429919a9c129beaec0ce456 /cpp/src/qpid/cluster/DumpClient.cpp
parent5e79599484a675baabf45e6b2c50635dbd6b1119 (diff)
downloadqpid-python-364b62744a35f7e48332af00217a1848345cd39a.tar.gz
Replicate session state for un-acknowledged messages to new cluster members.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@709242 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/DumpClient.cpp')
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp98
1 files changed, 73 insertions, 25 deletions
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 802019feb1..40852a0411 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -56,14 +56,11 @@ namespace arg=client::arg;
using client::SessionBase_0_10Access;
struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection {
- ClusterConnectionProxy(client::Connection& c) :
+ ClusterConnectionProxy(client::Connection c) :
AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {}
+ ClusterConnectionProxy(client::AsyncSession s) :
+ AMQP_AllProxy::ClusterConnection(SessionBase_0_10Access(s).get()->out) {}
};
-struct ClusterProxy : public AMQP_AllProxy::Cluster {
- ClusterProxy(client::Connection& c) :
- AMQP_AllProxy::Cluster(*client::ConnectionAccess::getImpl(c)) {}
-};
-
// Create a connection with special version that marks it as a catch-up connection.
client::Connection catchUpConnection() {
@@ -73,7 +70,7 @@ client::Connection catchUpConnection() {
}
// Send a control body directly to the session.
-void send(client::Session& s, const AMQBody& body) {
+void send(client::AsyncSession& s, const AMQBody& body) {
client::SessionBase_0_10Access sb(s);
sb.get()->send(body);
}
@@ -94,19 +91,23 @@ DumpClient::DumpClient(const MemberId& from, const MemberId& to, const Url& url,
DumpClient::~DumpClient() {}
-// Catch-up exchange name: an illegal AMQP exchange name to avoid clashes.
-static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
-static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS));
+// Illegal exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
+static const char DUMP_CHARS[] = "\000qpid-dump";
+const std::string DumpClient::DUMP(DUMP_CHARS, sizeof(DUMP_CHARS));
void DumpClient::dump() {
QPID_LOG(debug, dumperId << " dumping state to " << dumpeeId << " at " << dumpeeUrl);
Broker& b = dumperBroker;
b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
- // Catch-up exchange is used to route messages to the proper queue without modifying routing key.
- session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true);
+
+ // Dump exchange is used to route messages to the proper queue without modifying routing key.
+ session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true);
b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
+// Dump queue is used to transfer acquired messages that are no longer on their original queue.
+ session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true);
session.sync();
session.close();
+
std::for_each(connections.begin(), connections.end(), boost::bind(&DumpClient::dumpConnection, this, _1));
AMQFrame frame(map.asMethodBody());
client::ConnectionAccess::getImpl(connection)->handle(frame);
@@ -134,6 +135,39 @@ void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) {
arg::arguments=ex->getArgs());
}
+/** Bind a queue to the dump exchange and dump messges to it
+ * setting the message possition as needed.
+ */
+class MessageDumper {
+ std::string queue;
+ bool haveLastPos;
+ framing::SequenceNumber lastPos;
+ client::AsyncSession session;
+
+ public:
+
+ MessageDumper(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) {
+ session.exchangeBind(queue, DumpClient::DUMP);
+ }
+
+ ~MessageDumper() {
+ session.exchangeUnbind(queue, DumpClient::DUMP);
+ }
+
+ void dump(const broker::QueuedMessage& message) {
+ if (!haveLastPos || message.position - lastPos != 1) {
+ ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
+ haveLastPos = true;
+ }
+ lastPos = message.position;
+ SessionBase_0_10Access sb(session);
+ framing::MessageTransferBody transfer(
+ framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
+ sb.get()->send(transfer, message.payload->getFrames());
+ }
+};
+
+
void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
session.queueDeclare(
q->getName(),
@@ -143,19 +177,11 @@ void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
arg::exclusive=q->hasExclusiveConsumer(),
arg::autoDelete=q->isAutoDelete(),
arg::arguments=q->getSettings());
-
- session.exchangeBind(q->getName(), CATCH_UP, std::string());
- q->eachMessage(boost::bind(&DumpClient::dumpMessage, this, _1));
- session.exchangeUnbind(q->getName(), CATCH_UP, std::string());
+ MessageDumper dumper(q->getName(), session);
+ q->eachMessage(boost::bind(&MessageDumper::dump, &dumper, _1));
q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1));
}
-void DumpClient::dumpMessage(const broker::QueuedMessage& message) {
- SessionBase_0_10Access sb(session);
- framing::MessageTransferBody transfer(
- framing::ProtocolVersion(), CATCH_UP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
- sb.get()->send(transfer, message.payload->getFrames());
-}
void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& binding) {
session.exchangeBind(queue, binding.exchange, binding.key, binding.args);
@@ -190,11 +216,11 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) {
// Re-create session state on remote connection.
- // For reasons unknown, boost::bind does not work here with boost 1.33.
+ // Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
+ ss->eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1));
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
-
// Adjust for message in progress, will be sent after state update.
SequenceNumber received = ss->receiverGetReceived().command;
if (inProgress)
@@ -221,7 +247,7 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) {
QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId());
}
-void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
+void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) {
QPID_LOG(debug, dumperId << " dumping consumer " << ci->getName() << " on " << shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
@@ -246,5 +272,27 @@ void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
client::SessionBase_0_10Access(shadowSession).get()->send(state);
QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
}
+
+void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
+ assert(dr.isEnded() || dr.getMessage().payload);
+
+ if (!dr.isEnded() && dr.isAcquired()) {
+ // If the message is acquired then it is no longer on the
+ // dumpees queue, put it on the dump queue for dumpee to pick up.
+ //
+ MessageDumper(DUMP, shadowSession).dump(dr.getMessage());
+ }
+ ClusterConnectionProxy(shadowSession).deliveryRecord(
+ dr.getQueue()->getName(),
+ dr.getMessage().position,
+ dr.getTag(),
+ dr.getId(),
+ dr.isAcquired(),
+ dr.isAccepted(),
+ dr.isCancelled(),
+ dr.isComplete(),
+ dr.isEnded(),
+ dr.isWindowing());
+}
}} // namespace qpid::cluster