summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp38
1 files changed, 27 insertions, 11 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 8faad9d6d5..506e982ffd 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -46,16 +46,23 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
Connection::~Connection() {}
-// Forward all received frames to the cluster, continue handling on delivery.
-void Connection::received(framing::AMQFrame& f) {
- cluster.send(f, self);
+void Connection::received(framing::AMQFrame& ) {
+ // FIXME aconway 2008-09-02: not called, codec sends straight to deliver
+ assert(0);
}
-// Don't doOutput in the
-bool Connection::doOutput() { return output.doOutput(); }
+bool Connection::doOutput() { return output.doOutput(); }
+
+// Delivery of doOutput allows us to run the real connection doOutput()
+// which stocks up the write buffers with data.
+//
+void Connection::deliverDoOutput(uint32_t requested) {
+ output.deliverDoOutput(requested);
+}
// Handle frames delivered from cluster.
void Connection::deliver(framing::AMQFrame& f) {
+ QPID_LOG(trace, "DLVR [" << self << "]: " << f);
// Handle connection controls, deliver other frames to connection.
if (!framing::invoke(*this, *f.getBody()).wasHandled())
connection.received(f);
@@ -71,7 +78,8 @@ void Connection::closed() {
// handler will be deleted.
//
connection.setOutputHandler(&discardHandler);
- cluster.send(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self);
+ cluster.mcastFrame(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self);
+ ++mcastSeq;
}
catch (const std::exception& e) {
QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
@@ -83,11 +91,19 @@ void Connection::deliverClose () {
cluster.erase(self);
}
-// Delivery of doOutput allows us to run the real connection doOutput()
-// which stocks up the write buffers with data.
-//
-void Connection::deliverDoOutput(uint32_t requested) {
- output.deliverDoOutput(requested);
+size_t Connection::decode(const char* buffer, size_t size) {
+ QPID_LOG(trace, "mcastBuffer " << self << " " << mcastSeq << " " << size);
+ ++mcastSeq;
+ cluster.mcastBuffer(buffer, size, self);
+ // FIXME aconway 2008-09-01: deserialize?
+ return size;
+}
+
+void Connection::deliverBuffer(Buffer& buf) {
+ QPID_LOG(trace, "deliverBuffer " << self << " " << deliverSeq << " " << buf.available());
+ ++deliverSeq;
+ while (decoder.decode(buf))
+ deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread.
}
}} // namespace qpid::cluster