summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp206
1 files changed, 89 insertions, 117 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 9a8cab24a6..08e31c184a 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -39,7 +39,6 @@
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAnnounceBody.h"
-#include "qpid/framing/ClusterConnectionSecureUserIdBody.h"
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
@@ -48,15 +47,6 @@
#include <boost/current_function.hpp>
-typedef boost::function<void ( std::string& )> UserIdCallback;
-
-// TODO aconway 2008-11-03:
-//
-// Refactor code for receiving an update into a separate UpdateConnection
-// class.
-//
-
-
namespace qpid {
namespace cluster {
@@ -88,10 +78,8 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self),
updateIn(c.getUpdateReceiver()),
- secureConnection(0),
- mcastSentButNotReceived(false),
- inConnectionNegotiation(true)
-{ }
+ secureConnection(0)
+{}
// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
@@ -107,9 +95,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
expectProtocolHeader(isLink),
mcastFrameHandler(cluster.getMulticast(), self),
updateIn(c.getUpdateReceiver()),
- secureConnection(0),
- mcastSentButNotReceived(false),
- inConnectionNegotiation(true)
+ secureConnection(0)
{
cluster.addLocalConnection(this);
if (isLocalClient()) {
@@ -117,11 +103,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
// and initialized when the announce is received.
QPID_LOG(info, "new client connection " << *this);
giveReadCredit(cluster.getSettings().readMax); // Flow control
- cluster.getMulticast().mcastControl(
- ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId,
- connectionCtor.external.ssf,
- connectionCtor.external.authid,
- connectionCtor.external.nodict), getId());
+ init();
}
else {
// Catch-up shadow connections initialized using nextShadow id.
@@ -135,7 +117,8 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
}
void Connection::setSecureConnection(broker::SecureConnection* sc) {
- secureConnection = sc;
+ secureConnection = sc;
+ if (connection.get()) connection->setSecureConnection(sc);
}
void Connection::init() {
@@ -155,30 +138,33 @@ void Connection::init() {
}
if (!isCatchUp())
connection->setErrorListener(this);
- UserIdCallback fn = boost::bind ( &Connection::mcastUserId, this, _1 );
- connection->setUserIdCallback ( fn );
}
// Called when we have consumed a read buffer to give credit to the
// connection layer to continue reading.
void Connection::giveReadCredit(int credit) {
- {
- sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- if (inConnectionNegotiation) {
- mcastSentButNotReceived = false;
- connectionNegotiationMonitor.notify();
- }
- }
if (cluster.getSettings().readMax && credit)
output.giveReadCredit(credit);
}
-void Connection::announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict) {
+void Connection::announce(
+ const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict,
+ const std::string& username, const std::string& initialFrames)
+{
QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
QPID_ASSERT(ssf == connectionCtor.external.ssf);
QPID_ASSERT(authid == connectionCtor.external.authid);
QPID_ASSERT(nodict == connectionCtor.external.nodict);
- init();
+ // Local connections are already initialized.
+ if (isShadow()) {
+ init();
+ // Play initial frames into the connection.
+ Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size());
+ AMQFrame frame;
+ while (frame.decode(buf))
+ connection->received(frame);
+ connection->setUserId(username);
+ }
}
Connection::~Connection() {
@@ -201,7 +187,6 @@ void Connection::received(framing::AMQFrame& f) {
if (isLocal()) { // Local catch-up connection.
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled())
-
connection->received(f);
}
else { // Shadow or updated catch-up connection.
@@ -235,7 +220,7 @@ struct GiveReadCreditOnExit {
int credit;
GiveReadCreditOnExit(Connection& connection_, int credit_) :
connection(connection_), credit(credit_) {}
- ~GiveReadCreditOnExit() { connection.giveReadCredit(credit); }
+ ~GiveReadCreditOnExit() { if (credit) connection.giveReadCredit(credit); }
};
void Connection::deliverDoOutput(uint32_t limit) {
@@ -307,57 +292,76 @@ void Connection::abort() {
}
// ConnectionCodec::decode receives read buffers from directly-connected clients.
-size_t Connection::decode(const char* buffer, size_t size) {
-
- if (catchUp) { // Handle catch-up locally.
- Buffer buf(const_cast<char*>(buffer), size);
+size_t Connection::decode(const char* data, size_t size) {
+ GiveReadCreditOnExit grc(*this, 1); // Give a read credit by default.
+ const char* ptr = data;
+ const char* end = data + size;
+ if (catchUp) { // Handle catch-up locally.
+ Buffer buf(const_cast<char*>(ptr), size);
+ ptr += size;
while (localDecoder.decode(buf))
received(localDecoder.getFrame());
- return buf.getPosition();
}
else { // Multicast local connections.
- assert(isLocal());
- const char* remainingData = buffer;
- size_t remainingSize = size;
-
- if (expectProtocolHeader) {
- //If this is an outgoing link, we will receive a protocol
- //header which needs to be decoded first
- framing::ProtocolInitiation pi;
- Buffer buf(const_cast<char*>(buffer), size);
- if (pi.decode(buf)) {
- //TODO: check the version is correct
- QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
- expectProtocolHeader = false;
- remainingData = buffer + pi.encodedSize();
- remainingSize = size - pi.encodedSize();
- } else {
- QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link");
- giveReadCredit(1); // We're not going to mcast so give read credit now.
- return 0;
- }
+ assert(isLocalClient());
+ assert(connection.get());
+ if (!checkProtocolHeader(ptr, size)) // Updates ptr
+ return 0; // Incomplete header
+
+ if (!connection->isOpen())
+ processInitialFrames(ptr, end-ptr); // Updates ptr
+
+ if (connection->isOpen() && end - ptr > 0) {
+ // We're multi-casting, we will give read credit on delivery.
+ grc.credit = 0;
+ cluster.getMulticast().mcastBuffer(ptr, end - ptr, self);
+ ptr = end;
}
-
- // During connection negotiation wait for each multicast to be
- // processed before sending the next, to ensure that the
- // security layer is activated before we attempt to decode
- // encrypted frames.
- {
- sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- if ( inConnectionNegotiation ) {
- assert(!mcastSentButNotReceived);
- mcastSentButNotReceived = true;
- }
- }
- cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
- {
- sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- if (inConnectionNegotiation)
- while (mcastSentButNotReceived)
- connectionNegotiationMonitor.wait();
- assert(!mcastSentButNotReceived);
+ }
+ return ptr - data;
+}
+
+// Decode the protocol header if needed. Updates data and size
+// returns true if the header is complete or already read.
+bool Connection::checkProtocolHeader(const char*& data, size_t size) {
+ if (expectProtocolHeader) {
+ //If this is an outgoing link, we will receive a protocol
+ //header which needs to be decoded first
+ framing::ProtocolInitiation pi;
+ Buffer buf(const_cast<char*&>(data), size);
+ if (pi.decode(buf)) {
+ //TODO: check the version is correct
+ QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
+ expectProtocolHeader = false;
+ data += pi.encodedSize();
+ } else {
+ return false;
}
- return size;
+ }
+ return true;
+}
+
+void Connection::processInitialFrames(const char*& ptr, size_t size) {
+ // Process the initial negotiation locally and store it so
+ // it can be replayed on other brokers in announce()
+ Buffer buf(const_cast<char*>(ptr), size);
+ framing::AMQFrame frame;
+ while (!connection->isOpen() && frame.decode(buf))
+ received(frame);
+ initialFrames.append(ptr, buf.getPosition());
+ ptr += buf.getPosition();
+ if (connection->isOpen()) { // initial negotiation complete
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionAnnounceBody(
+ ProtocolVersion(),
+ connectionCtor.mgmtId,
+ connectionCtor.external.ssf,
+ connectionCtor.external.authid,
+ connectionCtor.external.nodict,
+ connection->getUserId(),
+ initialFrames),
+ getId());
+ initialFrames.clear();
}
}
@@ -574,21 +578,14 @@ void Connection::queue(const std::string& encoded) {
}
void Connection::sessionError(uint16_t , const std::string& msg) {
- // If we are negotiating the connection when it fails just close the connectoin.
- // If it fails after that then we have to flag the error to the cluster.
- if (inConnectionNegotiation)
- cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
- else
+ // Ignore errors before isOpen(), we're not multicasting yet.
+ if (connection->isOpen())
cluster.flagError(*this, ERROR_TYPE_SESSION, msg);
-
}
void Connection::connectionError(const std::string& msg) {
- // If we are negotiating the connection when it fails just close the connectoin.
- // If it fails after that then we have to flag the error to the cluster.
- if (inConnectionNegotiation)
- cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
- else
+ // Ignore errors before isOpen(), we're not multicasting yet.
+ if (connection->isOpen())
cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg);
}
@@ -630,30 +627,5 @@ void Connection::managementAgents(const std::string& data) {
QPID_LOG(debug, cluster << " updated management agents");
}
-
-void Connection::mcastUserId ( std::string & id ) {
- // Only the directly connected broker will mcast the secure user id, and only
- // for client connections (not update connections)
- if (isLocalClient())
- cluster.getMulticast().mcastControl(
- ClusterConnectionSecureUserIdBody(ProtocolVersion(), string(id)), getId() );
- {
- // This call signals the end of the connection negotiation phase.
- sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- inConnectionNegotiation = false;
- mcastSentButNotReceived = false;
- connectionNegotiationMonitor.notify();
- }
-}
-
-// All connections, shadow or not, get this call.
-void Connection::secureUserId(const std::string& id) {
- // Only set the user ID on shadow connections, and only if id is not the empty string.
- if ( isShadow() && !id.empty() )
- connection->setUserId ( id );
-}
-
-
-
}} // Namespace qpid::cluster