summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-06-08 15:31:31 +0000
committerAlan Conway <aconway@apache.org>2010-06-08 15:31:31 +0000
commit9c8302099de20be264d1bf357b6bceb963ada021 (patch)
treeabd4d1ede4af4dc9dcc6427b2b40c51441b51f5f /cpp/src/qpid/cluster/Connection.cpp
parentc116d46cc03b19304ce51e047ab31519098380fa (diff)
downloadqpid-python-9c8302099de20be264d1bf357b6bceb963ada021.tar.gz
Cluster handle connection-negotiation phase in local broker.
The connection negotiation phase up to the "open" or "open-ok" frame establishes whether/what encryption to use for the rest of the connection. With this patch a cluster broker completes the initial negotiation with its local clients and only then begins multicasting to other brokers. The local broker decrypts if necessary and multicasts in the clear. This replaces a problematic locking scheme that was formerly in place which caused deadlocks. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@952692 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp206
1 files changed, 89 insertions, 117 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 9a8cab24a6..08e31c184a 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -39,7 +39,6 @@
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAnnounceBody.h"
-#include "qpid/framing/ClusterConnectionSecureUserIdBody.h"
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
@@ -48,15 +47,6 @@
#include <boost/current_function.hpp>
-typedef boost::function<void ( std::string& )> UserIdCallback;
-
-// TODO aconway 2008-11-03:
-//
-// Refactor code for receiving an update into a separate UpdateConnection
-// class.
-//
-
-
namespace qpid {
namespace cluster {
@@ -88,10 +78,8 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self),
updateIn(c.getUpdateReceiver()),
- secureConnection(0),
- mcastSentButNotReceived(false),
- inConnectionNegotiation(true)
-{ }
+ secureConnection(0)
+{}
// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
@@ -107,9 +95,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
expectProtocolHeader(isLink),
mcastFrameHandler(cluster.getMulticast(), self),
updateIn(c.getUpdateReceiver()),
- secureConnection(0),
- mcastSentButNotReceived(false),
- inConnectionNegotiation(true)
+ secureConnection(0)
{
cluster.addLocalConnection(this);
if (isLocalClient()) {
@@ -117,11 +103,7 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
// and initialized when the announce is received.
QPID_LOG(info, "new client connection " << *this);
giveReadCredit(cluster.getSettings().readMax); // Flow control
- cluster.getMulticast().mcastControl(
- ClusterConnectionAnnounceBody(ProtocolVersion(), mgmtId,
- connectionCtor.external.ssf,
- connectionCtor.external.authid,
- connectionCtor.external.nodict), getId());
+ init();
}
else {
// Catch-up shadow connections initialized using nextShadow id.
@@ -135,7 +117,8 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
}
void Connection::setSecureConnection(broker::SecureConnection* sc) {
- secureConnection = sc;
+ secureConnection = sc;
+ if (connection.get()) connection->setSecureConnection(sc);
}
void Connection::init() {
@@ -155,30 +138,33 @@ void Connection::init() {
}
if (!isCatchUp())
connection->setErrorListener(this);
- UserIdCallback fn = boost::bind ( &Connection::mcastUserId, this, _1 );
- connection->setUserIdCallback ( fn );
}
// Called when we have consumed a read buffer to give credit to the
// connection layer to continue reading.
void Connection::giveReadCredit(int credit) {
- {
- sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- if (inConnectionNegotiation) {
- mcastSentButNotReceived = false;
- connectionNegotiationMonitor.notify();
- }
- }
if (cluster.getSettings().readMax && credit)
output.giveReadCredit(credit);
}
-void Connection::announce(const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict) {
+void Connection::announce(
+ const std::string& mgmtId, uint32_t ssf, const std::string& authid, bool nodict,
+ const std::string& username, const std::string& initialFrames)
+{
QPID_ASSERT(mgmtId == connectionCtor.mgmtId);
QPID_ASSERT(ssf == connectionCtor.external.ssf);
QPID_ASSERT(authid == connectionCtor.external.authid);
QPID_ASSERT(nodict == connectionCtor.external.nodict);
- init();
+ // Local connections are already initialized.
+ if (isShadow()) {
+ init();
+ // Play initial frames into the connection.
+ Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size());
+ AMQFrame frame;
+ while (frame.decode(buf))
+ connection->received(frame);
+ connection->setUserId(username);
+ }
}
Connection::~Connection() {
@@ -201,7 +187,6 @@ void Connection::received(framing::AMQFrame& f) {
if (isLocal()) { // Local catch-up connection.
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled())
-
connection->received(f);
}
else { // Shadow or updated catch-up connection.
@@ -235,7 +220,7 @@ struct GiveReadCreditOnExit {
int credit;
GiveReadCreditOnExit(Connection& connection_, int credit_) :
connection(connection_), credit(credit_) {}
- ~GiveReadCreditOnExit() { connection.giveReadCredit(credit); }
+ ~GiveReadCreditOnExit() { if (credit) connection.giveReadCredit(credit); }
};
void Connection::deliverDoOutput(uint32_t limit) {
@@ -307,57 +292,76 @@ void Connection::abort() {
}
// ConnectionCodec::decode receives read buffers from directly-connected clients.
-size_t Connection::decode(const char* buffer, size_t size) {
-
- if (catchUp) { // Handle catch-up locally.
- Buffer buf(const_cast<char*>(buffer), size);
+size_t Connection::decode(const char* data, size_t size) {
+ GiveReadCreditOnExit grc(*this, 1); // Give a read credit by default.
+ const char* ptr = data;
+ const char* end = data + size;
+ if (catchUp) { // Handle catch-up locally.
+ Buffer buf(const_cast<char*>(ptr), size);
+ ptr += size;
while (localDecoder.decode(buf))
received(localDecoder.getFrame());
- return buf.getPosition();
}
else { // Multicast local connections.
- assert(isLocal());
- const char* remainingData = buffer;
- size_t remainingSize = size;
-
- if (expectProtocolHeader) {
- //If this is an outgoing link, we will receive a protocol
- //header which needs to be decoded first
- framing::ProtocolInitiation pi;
- Buffer buf(const_cast<char*>(buffer), size);
- if (pi.decode(buf)) {
- //TODO: check the version is correct
- QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
- expectProtocolHeader = false;
- remainingData = buffer + pi.encodedSize();
- remainingSize = size - pi.encodedSize();
- } else {
- QPID_LOG(debug, "Not enough data for protocol header on outgoing clustered link");
- giveReadCredit(1); // We're not going to mcast so give read credit now.
- return 0;
- }
+ assert(isLocalClient());
+ assert(connection.get());
+ if (!checkProtocolHeader(ptr, size)) // Updates ptr
+ return 0; // Incomplete header
+
+ if (!connection->isOpen())
+ processInitialFrames(ptr, end-ptr); // Updates ptr
+
+ if (connection->isOpen() && end - ptr > 0) {
+ // We're multi-casting, we will give read credit on delivery.
+ grc.credit = 0;
+ cluster.getMulticast().mcastBuffer(ptr, end - ptr, self);
+ ptr = end;
}
-
- // During connection negotiation wait for each multicast to be
- // processed before sending the next, to ensure that the
- // security layer is activated before we attempt to decode
- // encrypted frames.
- {
- sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- if ( inConnectionNegotiation ) {
- assert(!mcastSentButNotReceived);
- mcastSentButNotReceived = true;
- }
- }
- cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
- {
- sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- if (inConnectionNegotiation)
- while (mcastSentButNotReceived)
- connectionNegotiationMonitor.wait();
- assert(!mcastSentButNotReceived);
+ }
+ return ptr - data;
+}
+
+// Decode the protocol header if needed. Updates data and size
+// returns true if the header is complete or already read.
+bool Connection::checkProtocolHeader(const char*& data, size_t size) {
+ if (expectProtocolHeader) {
+ //If this is an outgoing link, we will receive a protocol
+ //header which needs to be decoded first
+ framing::ProtocolInitiation pi;
+ Buffer buf(const_cast<char*&>(data), size);
+ if (pi.decode(buf)) {
+ //TODO: check the version is correct
+ QPID_LOG(debug, "Outgoing clustered link connection received INIT(" << pi << ")");
+ expectProtocolHeader = false;
+ data += pi.encodedSize();
+ } else {
+ return false;
}
- return size;
+ }
+ return true;
+}
+
+void Connection::processInitialFrames(const char*& ptr, size_t size) {
+ // Process the initial negotiation locally and store it so
+ // it can be replayed on other brokers in announce()
+ Buffer buf(const_cast<char*>(ptr), size);
+ framing::AMQFrame frame;
+ while (!connection->isOpen() && frame.decode(buf))
+ received(frame);
+ initialFrames.append(ptr, buf.getPosition());
+ ptr += buf.getPosition();
+ if (connection->isOpen()) { // initial negotiation complete
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionAnnounceBody(
+ ProtocolVersion(),
+ connectionCtor.mgmtId,
+ connectionCtor.external.ssf,
+ connectionCtor.external.authid,
+ connectionCtor.external.nodict,
+ connection->getUserId(),
+ initialFrames),
+ getId());
+ initialFrames.clear();
}
}
@@ -574,21 +578,14 @@ void Connection::queue(const std::string& encoded) {
}
void Connection::sessionError(uint16_t , const std::string& msg) {
- // If we are negotiating the connection when it fails just close the connectoin.
- // If it fails after that then we have to flag the error to the cluster.
- if (inConnectionNegotiation)
- cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
- else
+ // Ignore errors before isOpen(), we're not multicasting yet.
+ if (connection->isOpen())
cluster.flagError(*this, ERROR_TYPE_SESSION, msg);
-
}
void Connection::connectionError(const std::string& msg) {
- // If we are negotiating the connection when it fails just close the connectoin.
- // If it fails after that then we have to flag the error to the cluster.
- if (inConnectionNegotiation)
- cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
- else
+ // Ignore errors before isOpen(), we're not multicasting yet.
+ if (connection->isOpen())
cluster.flagError(*this, ERROR_TYPE_CONNECTION, msg);
}
@@ -630,30 +627,5 @@ void Connection::managementAgents(const std::string& data) {
QPID_LOG(debug, cluster << " updated management agents");
}
-
-void Connection::mcastUserId ( std::string & id ) {
- // Only the directly connected broker will mcast the secure user id, and only
- // for client connections (not update connections)
- if (isLocalClient())
- cluster.getMulticast().mcastControl(
- ClusterConnectionSecureUserIdBody(ProtocolVersion(), string(id)), getId() );
- {
- // This call signals the end of the connection negotiation phase.
- sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
- inConnectionNegotiation = false;
- mcastSentButNotReceived = false;
- connectionNegotiationMonitor.notify();
- }
-}
-
-// All connections, shadow or not, get this call.
-void Connection::secureUserId(const std::string& id) {
- // Only set the user ID on shadow connections, and only if id is not the empty string.
- if ( isShadow() && !id.empty() )
- connection->setUserId ( id );
-}
-
-
-
}} // Namespace qpid::cluster