summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Connection.cpp')
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp77
1 files changed, 72 insertions, 5 deletions
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 30828d7bd9..118be27bb5 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -39,6 +39,7 @@
#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionAnnounceBody.h"
+#include "qpid/framing/ClusterConnectionSecureUserIdBody.h"
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
@@ -46,6 +47,9 @@
#include <boost/current_function.hpp>
+
+typedef boost::function<void ( std::string& )> UserIdCallback;
+
// TODO aconway 2008-11-03:
//
// Refactor code for receiving an update into a separate UpdateConnection
@@ -59,6 +63,7 @@ namespace cluster {
using namespace framing;
using namespace framing::cluster;
+
qpid::sys::AtomicValue<uint64_t> Connection::catchUpId(0x5000000000000000LL);
Connection::NullFrameHandler Connection::nullFrameHandler;
@@ -82,8 +87,11 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
connectionCtor(&output, cluster.getBroker(), mgmtId, external, false, 0, true),
expectProtocolHeader(false),
mcastFrameHandler(cluster.getMulticast(), self),
- updateIn(c.getUpdateReceiver())
-{}
+ updateIn(c.getUpdateReceiver()),
+ secureConnection(0),
+ mcastSentButNotReceived(false),
+ inConnectionNegotiation(true)
+{ }
// Local connection
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
@@ -98,7 +106,9 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
isCatchUp), // isCatchUp => shadow
expectProtocolHeader(isLink),
mcastFrameHandler(cluster.getMulticast(), self),
- updateIn(c.getUpdateReceiver())
+ updateIn(c.getUpdateReceiver()),
+ secureConnection(0),
+ mcastSentButNotReceived(false)
{
cluster.addLocalConnection(this);
if (isLocalClient()) {
@@ -120,13 +130,19 @@ Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
updateIn.nextShadowMgmtId.clear();
init();
}
+
+}
+
+void Connection::setSecureConnection(broker::SecureConnection* sc) {
+ secureConnection = sc;
}
void Connection::init() {
connection = connectionCtor.construct();
QPID_LOG(debug, cluster << " initialized connection: " << *this
<< " ssf=" << connection->getExternalSecuritySettings().ssf);
- if (isLocalClient()) {
+ if (isLocalClient()) {
+ if (secureConnection) connection->setSecureConnection(secureConnection);
// Actively send cluster-order frames from local node
connection->setClusterOrderOutput(mcastFrameHandler);
}
@@ -138,9 +154,19 @@ void Connection::init() {
}
if (!isCatchUp())
connection->setErrorListener(this);
+ UserIdCallback fn = boost::bind ( &Connection::mcastUserId, this, _1 );
+ connection->setUserIdCallback ( fn );
}
void Connection::giveReadCredit(int credit) {
+ {
+ sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
+ if (inConnectionNegotiation) {
+ mcastSentButNotReceived = false;
+ connectionNegotiationMonitor.notify();
+ }
+ }
+
if (cluster.getSettings().readMax && credit)
output.giveReadCredit(credit);
}
@@ -278,8 +304,9 @@ void Connection::abort() {
cluster.erase(self);
}
-// ConnectoinCodec::decode receives read buffers from directly-connected clients.
+// ConnectionCodec::decode receives read buffers from directly-connected clients.
size_t Connection::decode(const char* buffer, size_t size) {
+
if (catchUp) { // Handle catch-up locally.
Buffer buf(const_cast<char*>(buffer), size);
while (localDecoder.decode(buf))
@@ -289,6 +316,15 @@ size_t Connection::decode(const char* buffer, size_t size) {
assert(isLocal());
const char* remainingData = buffer;
size_t remainingSize = size;
+
+ { // scope for scoped lock.
+ sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
+ if ( inConnectionNegotiation ) {
+ assert(!mcastSentButNotReceived);
+ mcastSentButNotReceived = true;
+ }
+ }
+
if (expectProtocolHeader) {
//If this is an outgoing link, we will receive a protocol
//header which needs to be decoded first
@@ -307,6 +343,13 @@ size_t Connection::decode(const char* buffer, size_t size) {
}
}
cluster.getMulticast().mcastBuffer(remainingData, remainingSize, self);
+
+ { // scope for scoped lock.
+ sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
+ if ( inConnectionNegotiation )
+ while (inConnectionNegotiation && mcastSentButNotReceived)
+ connectionNegotiationMonitor.wait();
+ }
}
return size;
}
@@ -570,5 +613,29 @@ void Connection::managementAgents(const std::string& data) {
QPID_LOG(debug, cluster << " updated management agents");
}
+
+// Only the direct, non-shadow gets this call.
+void Connection::mcastUserId ( std::string & id ) {
+ cluster.getMulticast().mcastControl( ClusterConnectionSecureUserIdBody(ProtocolVersion(), string(id)), getId() );
+
+ {
+ sys::Mutex::ScopedLock l(connectionNegotiationMonitor);
+ inConnectionNegotiation = false;
+ connectionNegotiationMonitor.notify();
+ }
+}
+
+// All connections, shadow or not, get this call.
+void Connection::secureUserId(const std::string& id) {
+ if ( isShadow() ) {
+ // If the user ID is "none", it is not legitimate. Take no action.
+ if ( strcmp ( id.c_str(), "none" ) ) {
+ connection->setUserId ( id );
+ }
+ }
+}
+
+
+
}} // Namespace qpid::cluster