summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp103
1 files changed, 79 insertions, 24 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index a1ed5f34f5..4aa66cce1f 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -20,10 +20,15 @@
*/
#include "Connection.h"
#include "Cluster.h"
+
+#include "qpid/broker/SessionState.h"
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/framing/ConnectionCloseBody.h"
+#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
-#include "qpid/framing/AllInvoker.h"
+
#include <boost/current_function.hpp>
namespace qpid {
@@ -31,6 +36,8 @@ namespace cluster {
using namespace framing;
+NoOpConnectionOutputHandler Connection::discardHandler;
+
// Shadow connections
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
@@ -49,7 +56,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
QPID_LOG(debug, "New connection: " << *this);
}
-Connection::~Connection() {}
+Connection::~Connection() {
+ QPID_LOG(debug, "Deleted connection: " << *this);
+}
bool Connection::doOutput() {
return output.doOutput();
@@ -63,28 +72,55 @@ void Connection::deliverDoOutput(uint32_t requested) {
output.deliverDoOutput(requested);
}
+// Received from a directly connected client.
void Connection::received(framing::AMQFrame& f) {
- QPID_LOG(trace, "EXEC [" << *this << "]: " << f);
+ QPID_LOG(trace, "RECV " << *this << ": " << f);
+ if (isShadow()) {
+ // Final close that completes catch-up for shadow connection.
+ if (catchUp && f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
+ AMQFrame ok(in_place<ConnectionCloseOkBody>());
+ connection.getOutput().send(ok);
+ }
+ else
+ QPID_LOG(warning, *this << " ignoring unexpected frame: " << f);
+ }
+ else {
+ currentChannel = f.getChannel();
+ if (!framing::invoke(*this, *f.getBody()).wasHandled())
+ connection.received(f);
+ }
+}
+
+// Delivered from cluster.
+void Connection::delivered(framing::AMQFrame& f) {
+ QPID_LOG(trace, "DLVR " << *this << ": " << f);
+ assert(!isCatchUp());
// Handle connection controls, deliver other frames to connection.
+ currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled())
connection.received(f);
}
void Connection::closed() {
try {
+ QPID_LOG(debug, "Connection closed " << *this);
+
+ if (catchUp) {
+ catchUp = false;
+ cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
+ if (!isShadow()) connection.closed();
+ }
+
// Local network connection has closed. We need to keep the
// connection around but replace the output handler with a
// no-op handler as the network output handler will be
// deleted.
output.setOutputHandler(discardHandler);
- if (catchUp) {
- catchUp = false;
- cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
- }
- else {
- // This was a local replicated connection. Multicast a deliver closed
- // and process any outstanding frames from the cluster until
- // self-delivery of deliver-closed.
+
+ if (isLocal()) {
+ // This was a local replicated connection. Multicast a deliver
+ // closed and process any outstanding frames from the cluster
+ // until self-delivery of deliver-close.
cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
++mcastSeq;
}
@@ -100,29 +136,48 @@ void Connection::deliverClose () {
cluster.erase(self);
}
-size_t Connection::decode(const char* buffer, size_t size) {
- assert(!catchUp);
- ++mcastSeq;
- cluster.mcastBuffer(buffer, size, self);
+// Decode data from local clients.
+size_t Connection::decode(const char* buffer, size_t size) {
+ if (catchUp) { // Handle catch-up locally.
+ Buffer buf(const_cast<char*>(buffer), size);
+ while (localDecoder.decode(buf))
+ received(localDecoder.frame);
+ }
+ else { // Multicast local connections.
+ assert(isLocal());
+ cluster.mcastBuffer(buffer, size, self, ++mcastSeq);
+ }
return size;
}
void Connection::deliverBuffer(Buffer& buf) {
assert(!catchUp);
++deliverSeq;
- while (decoder.decode(buf))
- received(decoder.frame);
+ while (mcastDecoder.decode(buf))
+ delivered(mcastDecoder.frame);
}
-void Connection::sessionState(const SequenceNumber& /*replayStart*/,
- const SequenceSet& /*sentIncomplete*/,
- const SequenceNumber& /*expected*/,
- const SequenceNumber& /*received*/,
- const SequenceSet& /*unknownCompleted*/,
- const SequenceSet& /*receivedIncomplete*/)
+void Connection::sessionState(
+ const SequenceNumber& replayStart,
+ const SequenceNumber& sendCommandPoint,
+ const SequenceSet& sentIncomplete,
+ const SequenceNumber& expected,
+ const SequenceNumber& received,
+ const SequenceSet& unknownCompleted,
+ const SequenceSet& receivedIncomplete)
{
- // FIXME aconway 2008-09-10: TODO
+ broker::SessionHandler& h = connection.getChannel(currentChannel);
+ broker::SessionState* s = h.getSession();
+ s->setState(
+ replayStart,
+ sendCommandPoint,
+ sentIncomplete,
+ expected,
+ received,
+ unknownCompleted,
+ receivedIncomplete);
+ QPID_LOG(debug, "Received session state dump for " << s->getId());
}
void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {