summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp65
1 files changed, 45 insertions, 20 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 9c2b4f1638..0f71a91293 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -40,6 +40,7 @@
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/LatencyMetric.h"
+#include "qpid/sys/AtomicValue.h"
#include <boost/current_function.hpp>
@@ -58,19 +59,22 @@ using namespace framing;
NoOpConnectionOutputHandler Connection::discardHandler;
-// Shadow connections
-Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& wrappedId, ConnectionId myId)
- : cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId), expectProtocolHeader(false),
+namespace {
+sys::AtomicValue<uint64_t> idCounter;
+}
+
+// Shadow connection
+Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out, const std::string& logId, const ConnectionId& id)
+ : cluster(c), self(id), catchUp(false), output(*this, out),
+ connection(&output, cluster.getBroker(), logId), expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
-// Local connections
+// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
- const std::string& wrappedId, MemberId myId, bool isCatchUp, bool isLink)
- : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId, isLink, catchUp ? ++catchUpId : 0),
+ const std::string& logId, MemberId member, bool isCatchUp, bool isLink)
+ : cluster(c), self(member, ++idCounter), catchUp(isCatchUp), output(*this, out),
+ connection(&output, cluster.getBroker(), logId, isLink, catchUp ? ++catchUpId : 0),
expectProtocolHeader(isLink), mcastFrameHandler(cluster.getMulticast(), self)
{ init(); }
@@ -149,12 +153,9 @@ void Connection::deliveredFrame(const EventFrame& f) {
if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
&& !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
{
- // FIXME aconway 2009-02-24: Using the DATA/CONTROL
- // distinction to distinguish incoming vs. outgoing frames is
- // very unclear.
if (f.type == DATA) // incoming data frames to broker::Connection
connection.received(const_cast<AMQFrame&>(f.frame));
- else { // outgoing data frame, send via SessionState
+ else { // frame control, send frame via SessionState
broker::SessionState* ss = connection.getChannel(f.frame.getChannel()).getSession();
if (ss) ss->out(const_cast<AMQFrame&>(f.frame));
}
@@ -200,12 +201,12 @@ void Connection::left() {
connection.closed();
}
-// Decode data from local clients.
+// ConnectoinCodec::decode receives read buffers from directly-connected clients.
size_t Connection::decode(const char* buffer, size_t size) {
if (catchUp) { // Handle catch-up locally.
Buffer buf(const_cast<char*>(buffer), size);
while (localDecoder.decode(buf))
- received(localDecoder.frame);
+ received(localDecoder.getFrame());
}
else { // Multicast local connections.
assert(isLocal());
@@ -233,6 +234,29 @@ size_t Connection::decode(const char* buffer, size_t size) {
return size;
}
+// Decode a data event, a read buffer that has been delivered by the cluster.
+void Connection::decode(const EventHeader& eh, const void* data) {
+ assert(eh.getType() == DATA); // Only handle connection data events.
+ const char* cp = static_cast<const char*>(data);
+ Buffer buf(const_cast<char*>(cp), eh.getSize());
+ if (clusterDecoder.decode(buf)) { // Decoded a frame
+ AMQFrame frame(clusterDecoder.getFrame());
+ while (clusterDecoder.decode(buf)) {
+ cluster.connectionFrame(EventFrame(eh, frame));
+ frame = clusterDecoder.getFrame();
+ }
+ // Set read-credit on the last frame ending in this event.
+ // Credit will be given when this frame is processed.
+ cluster.connectionFrame(EventFrame(eh, frame, 1));
+ }
+ else {
+ // We must give 1 unit read credit per event.
+ // This event does not complete any frames so
+ // we give read credit directly.
+ giveReadCredit(1);
+ }
+}
+
broker::SessionState& Connection::sessionState() {
return *connection.getChannel(currentChannel).getSession();
}
@@ -267,11 +291,12 @@ void Connection::sessionState(
QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
}
-void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username) {
- ConnectionId shadow = ConnectionId(memberId, connectionId);
- QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadow);
- self = shadow;
+void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) {
+ ConnectionId shadowId = ConnectionId(memberId, connectionId);
+ QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
+ self = shadowId;
connection.setUserId(username);
+ clusterDecoder.setFragment(fragment.data(), fragment.size());
}
void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameId) {
@@ -281,7 +306,7 @@ void Connection::membership(const FieldTable& joiners, const FieldTable& members
}
bool Connection::isLocal() const {
- return self.first == cluster.getId() && self.second == this;
+ return self.first == cluster.getId() && self.second;
}
bool Connection::isShadow() const {