diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 49 |
1 files changed, 47 insertions, 2 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 28391a5c78..d8d41027cc 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -19,6 +19,7 @@ * */ #include "Connection.h" +#include "DumpClient.h" #include "Cluster.h" #include "qpid/broker/SessionState.h" @@ -73,8 +74,6 @@ void Connection::deliverDoOutput(uint32_t requested) { output.deliverDoOutput(requested); } -// FIXME aconway 2008-10-15: changes here, dubious. - // Received from a directly connected client. void Connection::received(framing::AMQFrame& f) { QPID_LOG(trace, cluster << " RECV " << *this << ": " << f); @@ -214,6 +213,50 @@ bool Connection::isDumped() const { return self.first == cluster.getId() && self.second == 0; } +void Connection::deliveryRecord(const string& qname, + const SequenceNumber& position, + const string& tag, + const SequenceNumber& id, + bool acquired, + bool accepted, + bool cancelled, + bool completed, + bool ended, + bool windowing) +{ + broker::QueuedMessage m; + broker::Queue::shared_ptr queue = cluster.getBroker().getQueues().find(qname); + if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue " << qname)); + broker::Queue::shared_ptr dumpQueue = cluster.getBroker().getQueues().find(DumpClient::DUMP); + if (!dumpQueue) throw Exception(QPID_MSG(cluster << " deliveryRecord missing dump queue")); + + if (!ended) { // Has a message + if (acquired) // Message at front of dump queue + m = dumpQueue->get(); + else // Message at original position in original queue + m = queue->find(position); + if (!m.payload) + throw Exception(QPID_MSG("deliveryRecord no dump message")); + } + + broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing); + dr.setId(id); + if (cancelled) dr.cancel(dr.getTag()); + if (completed) dr.complete(); + if (ended) dr.setEnded(); // Exsitance of message + + broker::SessionHandler& h = connection.getChannel(currentChannel); + broker::SessionState* s = h.getSession(); + assert(s); + s->record(dr); +} + +void Connection::queuePosition(const string& qname, const SequenceNumber& position) { + shared_ptr<broker::Queue> q = cluster.getBroker().getQueues().find(qname); + if (!q) throw InvalidArgumentException(QPID_MSG("Invalid queue name " << qname)); + q->setPosition(position); +} + std::ostream& operator<<(std::ostream& o, const Connection& c) { const char* type="unknown"; if (c.isLocal()) type = "local"; @@ -222,5 +265,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; } + + }} // namespace qpid::cluster |