summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp65
1 files changed, 49 insertions, 16 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index ada26ab2fb..513816735d 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -24,7 +24,11 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/broker/RecoveredEnqueue.h"
+#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AllInvoker.h"
@@ -36,7 +40,7 @@
#include <boost/current_function.hpp>
-// FIXME aconway 2008-11-03:
+// TODO aconway 2008-11-03:
//
// Disproportionate amount of code here is dedicated to receiving a
// brain-dump when joining a cluster and building initial
@@ -113,7 +117,6 @@ bool Connection::checkUnsupported(const AMQBody& body) {
std::string message;
if (body.getMethod()) {
switch (body.getMethod()->amqpClassId()) {
- case TX_CLASS_ID: message = "TX transactions are not currently supported by cluster."; break;
case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
}
}
@@ -122,13 +125,13 @@ bool Connection::checkUnsupported(const AMQBody& body) {
if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster.";
}
if (!message.empty())
- connection.close(execution::ERROR_CODE_INTERNAL_ERROR, message, 0, 0);
+ connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message, 0, 0);
return !message.empty();
}
// Delivered from cluster.
void Connection::delivered(framing::AMQFrame& f) {
- QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f);
+ QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f);
assert(!catchUp);
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection contol.
@@ -247,11 +250,15 @@ bool Connection::isDumped() const {
return self.first == cluster.getId() && self.second == 0;
}
+
+shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) {
+ shared_ptr<broker::Queue> queue = cluster.getBroker().getQueues().find(qname);
+ if (!queue) throw Exception(QPID_MSG(cluster << " can't find queue " << qname));
+ return queue;
+}
+
broker::QueuedMessage Connection::getDumpMessage() {
- // Get a message from the DUMP queue.
- broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
- if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump queue"));
- broker::QueuedMessage m = dumpQueue->get();
+ broker::QueuedMessage m = findQueue(DumpClient::DUMP)->get();
if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue"));
return m;
}
@@ -267,14 +274,11 @@ void Connection::deliveryRecord(const string& qname,
bool ended,
bool windowing)
{
- broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname);
- if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname));
broker::QueuedMessage m;
+ broker::Queue::shared_ptr queue = findQueue(qname);
if (!ended) { // Has a message
- if (acquired) { // Message at front of dump queue
- broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
- m = dumpQueue->get();
- }
+ if (acquired) // Message is on the dump queue
+ m = getDumpMessage();
else // Message at original position in original queue
m = queue->find(position);
if (!m.payload)
@@ -286,8 +290,7 @@ void Connection::deliveryRecord(const string& qname,
if (cancelled) dr.cancel(dr.getTag());
if (completed) dr.complete();
if (ended) dr.setEnded(); // Exsitance of message
-
- semanticState().record(dr);
+ semanticState().record(dr); // Part of the session's unacked list.
}
void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
@@ -304,6 +307,36 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) {
return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
}
+void Connection::txStart() {
+ txBuffer = make_shared_ptr(new broker::TxBuffer());
+}
+void Connection::txAccept(const framing::SequenceSet& acked) {
+ txBuffer->enlist(make_shared_ptr(new broker::TxAccept(acked, semanticState().getUnacked())));
+}
+
+void Connection::txDequeue(const std::string& queue) {
+ txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getDumpMessage().payload)));
+}
+
+void Connection::txEnqueue(const std::string& queue) {
+ txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getDumpMessage().payload)));
+}
+
+void Connection::txPublish(const framing::Array& queues, bool delivered) {
+ boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getDumpMessage().payload));
+ for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
+ txPub->deliverTo(findQueue((*i)->get<std::string>()));
+ txPub->delivered = delivered;
+ txBuffer->enlist(txPub);
+}
+
+void Connection::txEnd() {
+ semanticState().setTxBuffer(txBuffer);
+}
+
+void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
+ semanticState().setAccumulatedAck(s);
+}
}} // namespace qpid::cluster