summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp52
-rw-r--r--cpp/src/qpid/cluster/Connection.h15
-rw-r--r--cpp/src/qpid/cluster/DumpClient.cpp35
-rw-r--r--cpp/src/qpid/cluster/DumpClient.h7
4 files changed, 78 insertions, 31 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 604df9dde6..ada26ab2fb 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -24,6 +24,7 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/TxPublish.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AllInvoker.h"
@@ -35,6 +36,14 @@
#include <boost/current_function.hpp>
+// FIXME aconway 2008-11-03:
+//
+// Disproportionate amount of code here is dedicated to receiving a
+// brain-dump when joining a cluster and building initial
+// state. Should be separated out into its own classes.
+//
+
+
namespace qpid {
namespace cluster {
@@ -180,10 +189,16 @@ void Connection::deliverBuffer(Buffer& buf) {
delivered(mcastDecoder.frame);
}
+broker::SessionState& Connection::sessionState() {
+ return *connection.getChannel(currentChannel).getSession();
+}
+
+broker::SemanticState& Connection::semanticState() {
+ return sessionState().getSemanticState();
+}
+
void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled) {
- broker::SessionHandler& h = connection.getChannel(currentChannel);
- broker::SessionState* s = h.getSession();
- broker::SemanticState::ConsumerImpl& c = s->getConsumer(name);
+ broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
c.setBlocked(blocked);
if (notifyEnabled) c.enableNotify(); else c.disableNotify();
}
@@ -197,9 +212,7 @@ void Connection::sessionState(
const SequenceSet& unknownCompleted,
const SequenceSet& receivedIncomplete)
{
- broker::SessionHandler& h = connection.getChannel(currentChannel);
- broker::SessionState* s = h.getSession();
- s->setState(
+ sessionState().setState(
replayStart,
sendCommandPoint,
sentIncomplete,
@@ -207,7 +220,7 @@ void Connection::sessionState(
received,
unknownCompleted,
receivedIncomplete);
- QPID_LOG(debug, cluster << " received session state dump for " << s->getId());
+ QPID_LOG(debug, cluster << " received session state dump for " << sessionState().getId());
}
void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
@@ -234,6 +247,15 @@ bool Connection::isDumped() const {
return self.first == cluster.getId() && self.second == 0;
}
+broker::QueuedMessage Connection::getDumpMessage() {
+ // Get a message from the DUMP queue.
+ broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
+ if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump queue"));
+ broker::QueuedMessage m = dumpQueue->get();
+ if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue"));
+ return m;
+}
+
void Connection::deliveryRecord(const string& qname,
const SequenceNumber& position,
const string& tag,
@@ -245,15 +267,14 @@ void Connection::deliveryRecord(const string& qname,
bool ended,
bool windowing)
{
- broker::QueuedMessage m;
broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname);
if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname));
- broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
- if (!dumpQueue) throw Exception(QPID_MSG(cluster << " deliveryRecord missing dump queue"));
-
+ broker::QueuedMessage m;
if (!ended) { // Has a message
- if (acquired) // Message at front of dump queue
+ if (acquired) { // Message at front of dump queue
+ broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
m = dumpQueue->get();
+ }
else // Message at original position in original queue
m = queue->find(position);
if (!m.payload)
@@ -266,10 +287,7 @@ void Connection::deliveryRecord(const string& qname,
if (completed) dr.complete();
if (ended) dr.setEnded(); // Exsitance of message
- broker::SessionHandler& h = connection.getChannel(currentChannel);
- broker::SessionState* s = h.getSession();
- assert(s);
- s->record(dr);
+ semanticState().record(dr);
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -286,7 +304,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) {
return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
}
-
+
}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 9f75d3dae3..331ac33ab0 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -40,6 +40,13 @@ namespace qpid {
namespace framing { class AMQFrame; }
+namespace broker {
+class SemanticState;
+class QueuedMessage;
+class TxBuffer;
+class TxAccept;
+}
+
namespace cluster {
class Cluster;
@@ -117,15 +124,17 @@ class Connection :
bool windowing);
void queuePosition(const std::string&, const framing::SequenceNumber&);
-
- private:
- bool catcUp;
+ private:
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
void deliverDoOutput(uint32_t requested);
void sendDoOutput();
+ broker::SessionState& sessionState();
+ broker::SemanticState& semanticState();
+ broker::QueuedMessage getDumpMessage();
+
static NoOpConnectionOutputHandler discardHandler;
Cluster& cluster;
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp
index 40852a0411..a2860f6f32 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/DumpClient.cpp
@@ -39,10 +39,12 @@
#include "qpid/framing/ClusterConnectionConsumerStateBody.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/framing/TypeCode.h"
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
#include <boost/bind.hpp>
+
namespace qpid {
namespace cluster {
@@ -103,7 +105,7 @@ void DumpClient::dump() {
// Dump exchange is used to route messages to the proper queue without modifying routing key.
session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true);
b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
-// Dump queue is used to transfer acquired messages that are no longer on their original queue.
+ // Dump queue is used to transfer acquired messages that are no longer on their original queue.
session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true);
session.sync();
session.close();
@@ -154,7 +156,7 @@ class MessageDumper {
session.exchangeUnbind(queue, DumpClient::DUMP);
}
- void dump(const broker::QueuedMessage& message) {
+ void dumpQueuedMessage(const broker::QueuedMessage& message) {
if (!haveLastPos || message.position - lastPos != 1) {
ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
haveLastPos = true;
@@ -165,6 +167,10 @@ class MessageDumper {
framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
sb.get()->send(transfer, message.payload->getFrames());
}
+
+ void dumpMessage(const boost::intrusive_ptr<broker::Message>& message) {
+ dumpQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
+ }
};
@@ -178,7 +184,7 @@ void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
arg::autoDelete=q->isAutoDelete(),
arg::arguments=q->getSettings());
MessageDumper dumper(q->getName(), session);
- q->eachMessage(boost::bind(&MessageDumper::dump, &dumper, _1));
+ q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1));
q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1));
}
@@ -217,11 +223,14 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) {
// Re-create session state on remote connection.
// Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
- ss->eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
- ss->eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1));
+ QPID_LOG(debug, dumperId << " dumping consumers.");
+ ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
+
+ QPID_LOG(debug, dumperId << " dumping unacknowledged messages.");
+ ss->getSemanticState().eachUnacked(boost::bind(&DumpClient::dumpUnacked, this, _1));
+ // Adjust for command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
- // Adjust for message in progress, will be sent after state update.
SequenceNumber received = ss->receiverGetReceived().command;
if (inProgress)
--received;
@@ -274,14 +283,22 @@ void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) {
}
void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
- assert(dr.isEnded() || dr.getMessage().payload);
+ dumpDeliveryRecordMessage(dr);
+ dumpDeliveryRecord(dr);
+}
- if (!dr.isEnded() && dr.isAcquired()) {
+void DumpClient::dumpDeliveryRecordMessage(const broker::DeliveryRecord& dr) {
+ // Dump the message associated with a dr if need be.
+ if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
// If the message is acquired then it is no longer on the
// dumpees queue, put it on the dump queue for dumpee to pick up.
//
- MessageDumper(DUMP, shadowSession).dump(dr.getMessage());
+ MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage());
}
+}
+
+void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) {
+ // Assumes the associated message has already been dumped (if needed)
ClusterConnectionProxy(shadowSession).deliveryRecord(
dr.getQueue()->getName(),
dr.getMessage().position,
diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h
index bb349a39ee..716e7dcc3a 100644
--- a/cpp/src/qpid/cluster/DumpClient.h
+++ b/cpp/src/qpid/cluster/DumpClient.h
@@ -44,6 +44,8 @@ class QueueBinding;
class QueuedMessage;
class SessionHandler;
class DeliveryRecord;
+class SessionState;
+class SemanticState;
} // namespace broker
@@ -79,8 +81,9 @@ class DumpClient : public sys::Runnable {
void dumpSession(broker::SessionHandler& s);
void dumpConsumer(const broker::SemanticState::ConsumerImpl*);
void dumpUnacked(const broker::DeliveryRecord&);
-
- private:
+ void dumpDeliveryRecord(const broker::DeliveryRecord&);
+ void dumpDeliveryRecordMessage(const broker::DeliveryRecord&);
+
MemberId dumperId;
MemberId dumpeeId;
Url dumpeeUrl;