summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp49
1 files changed, 47 insertions, 2 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 28391a5c78..d8d41027cc 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -19,6 +19,7 @@
*
*/
#include "Connection.h"
+#include "DumpClient.h"
#include "Cluster.h"
#include "qpid/broker/SessionState.h"
@@ -73,8 +74,6 @@ void Connection::deliverDoOutput(uint32_t requested) {
output.deliverDoOutput(requested);
}
-// FIXME aconway 2008-10-15: changes here, dubious.
-
// Received from a directly connected client.
void Connection::received(framing::AMQFrame& f) {
QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
@@ -214,6 +213,50 @@ bool Connection::isDumped() const {
return self.first == cluster.getId() && self.second == 0;
}
+void Connection::deliveryRecord(const string& qname,
+ const SequenceNumber& position,
+ const string& tag,
+ const SequenceNumber& id,
+ bool acquired,
+ bool accepted,
+ bool cancelled,
+ bool completed,
+ bool ended,
+ bool windowing)
+{
+ broker::QueuedMessage m;
+ broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname);
+ if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname));
+ broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP);
+ if (!dumpQueue) throw Exception(QPID_MSG(cluster << " deliveryRecord missing dump queue"));
+
+ if (!ended) { // Has a message
+ if (acquired) // Message at front of dump queue
+ m = dumpQueue->get();
+ else // Message at original position in original queue
+ m = queue->find(position);
+ if (!m.payload)
+ throw Exception(QPID_MSG("deliveryRecord no dump message"));
+ }
+
+ broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing);
+ dr.setId(id);
+ if (cancelled) dr.cancel(dr.getTag());
+ if (completed) dr.complete();
+ if (ended) dr.setEnded(); // Exsitance of message
+
+ broker::SessionHandler& h = connection.getChannel(currentChannel);
+ broker::SessionState* s = h.getSession();
+ assert(s);
+ s->record(dr);
+}
+
+void Connection::queuePosition(const string& qname, const SequenceNumber& position) {
+ shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname);
+ if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname));
+ q->setPosition(position);
+}
+
std::ostream& operator<<(std::ostream& o, const Connection& c) {
const char* type="unknown";
if (c.isLocal()) type = "local";
@@ -222,5 +265,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) {
return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
}
+
+
}} // namespace qpid::cluster