diff options
Diffstat (limited to 'qpid/cpp/src/qpid/client')
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionHandler.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionHandler.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/ConnectionImpl.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Connector.cpp | 82 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/RdmaConnector.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/Sasl.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SaslFactory.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/SessionImpl.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/client/windows/SaslFactory.cpp | 6 |
13 files changed, 98 insertions, 89 deletions
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index 9b2f662c8e..bb348675c6 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -257,6 +257,7 @@ void ConnectionHandler::openOk ( const Array& knownBrokers ) knownBrokersUrls.push_back(Url((*i)->get<std::string>())); if (sasl.get()) { securityLayer = sasl->getSecurityLayer(maxFrameSize); + operUserId = sasl->getUserId(); } setState(OPEN); QPID_LOG(debug, "Known-brokers for connection: " << log::formatList(knownBrokersUrls)); diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.h b/qpid/cpp/src/qpid/client/ConnectionHandler.h index b1fd5be7c3..e9cc5194ae 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.h @@ -71,6 +71,7 @@ class ConnectionHandler : private StateManager, std::auto_ptr<Sasl> sasl; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; boost::intrusive_ptr<qpid::sys::TimerTask> rcvTimeoutTask; + std::string operUserId; void checkState(STATES s, const std::string& msg); @@ -120,6 +121,7 @@ public: std::vector<Url> knownBrokersUrls; static framing::connection::CloseCode convert(uint16_t replyCode); + const std::string& getUserId() const { return operUserId; } }; }} diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index 45ad819ebd..c56d6a6807 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -151,6 +151,12 @@ void ConnectionImpl::open() handler.waitForOpen(); + // If the SASL layer has provided an "operational" userId for the connection, + // put it in the negotiated settings. + const std::string& userId(handler.getUserId()); + if (!userId.empty()) + handler.username = userId; + //enable security layer if one has been negotiated: std::auto_ptr<SecurityLayer> securityLayer = handler.getSecurityLayer(); if (securityLayer.get()) { diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index f69032b26d..fbb571d40a 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -51,10 +51,10 @@ using boost::str; // Stuff for the registry of protocol connectors (maybe should be moved to its own file) namespace { typedef std::map<std::string, Connector::Factory*> ProtocolRegistry; - + ProtocolRegistry& theProtocolRegistry() { static ProtocolRegistry protocolRegistry; - + return protocolRegistry; } } @@ -93,7 +93,7 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable size_t lastEof; // Position after last EOF in frames uint64_t currentSize; Bounds* bounds; - + framing::ProtocolVersion version; bool initiated; bool closed; @@ -118,16 +118,17 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable void run(); void handleClosed(); bool closeInternal(); - + + void connected(const Socket&); + void connectFailed(const std::string& msg); bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void writebuff(qpid::sys::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); boost::weak_ptr<ConnectionImpl> impl; - + void connect(const std::string& host, int port); - void init(); void close(); void send(framing::AMQFrame& frame); void abort(); @@ -142,7 +143,6 @@ class TCPConnector : public Connector, public sys::Codec, private sys::Runnable size_t decode(const char* buffer, size_t size); size_t encode(const char* buffer, size_t size); bool canEncode(); - public: TCPConnector(framing::ProtocolVersion pVersion, @@ -163,6 +163,11 @@ namespace { } init; } +struct TCPConnector::Buff : public AsynchIO::BufferBase { + Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + TCPConnector::TCPConnector(ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) @@ -189,15 +194,19 @@ TCPConnector::~TCPConnector() { void TCPConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(lock); assert(closed); - try { - socket.connect(host, port); - } catch (const std::exception& /*e*/) { - socket.close(); - throw; - } - - identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); + assert(joined); poller = Poller::shared_ptr(new Poller); + AsynchConnector::create(socket, + poller, + host, port, + boost::bind(&TCPConnector::connected, this, _1), + boost::bind(&TCPConnector::connectFailed, this, _3)); + closed = false; + joined = false; + receiver = Thread(this); +} + +void TCPConnector::connected(const Socket&) { aio = AsynchIO::create(socket, boost::bind(&TCPConnector::readbuff, this, _1, _2), boost::bind(&TCPConnector::eof, this, _1), @@ -205,16 +214,23 @@ void TCPConnector::connect(const std::string& host, int port){ 0, // closed 0, // nobuffs boost::bind(&TCPConnector::writebuff, this, _1)); - closed = false; -} + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff(maxFrameSize)); + } + aio->start(poller); -void TCPConnector::init(){ - Mutex::ScopedLock l(lock); - assert(joined); + identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); ProtocolInitiation init(version); writeDataBlock(init); - joined = false; - receiver = Thread(this); +} + +void TCPConnector::connectFailed(const std::string& msg) { + QPID_LOG(warning, "Connecting failed: " << msg); + closed = true; + poller->shutdown(); + closeInternal(); + if (shutdownHandler) + shutdownHandler->shutdown(); } bool TCPConnector::closeInternal() { @@ -235,7 +251,7 @@ bool TCPConnector::closeInternal() { receiver.join(); return ret; } - + void TCPConnector::close() { closeInternal(); } @@ -243,7 +259,13 @@ void TCPConnector::close() { void TCPConnector::abort() { // Can't abort a closed connection if (!closed) { - aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1)); + if (aio) { + // Established connection + aio->requestCallback(boost::bind(&TCPConnector::eof, this, _1)); + } else { + // We're still connecting + connectFailed("Connection timedout"); + } } } @@ -288,18 +310,13 @@ void TCPConnector::handleClosed() { shutdownHandler->shutdown(); } -struct TCPConnector::Buff : public AsynchIO::BufferBase { - Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - void TCPConnector::writebuff(AsynchIO& /*aio*/) { Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; if (codec->canEncode()) { std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer()); if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize)); - + size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); buffer->dataStart = 0; @@ -395,11 +412,6 @@ void TCPConnector::run() { try { Dispatcher d(poller); - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - - aio->start(poller); d.run(); } catch (const std::exception& e) { QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what())); diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp index 0aefcc04cf..0692c3d85c 100644 --- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp +++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp @@ -167,20 +167,9 @@ void RdmaConnector::connect(const std::string& host, int port){ assert(joined); poller = Poller::shared_ptr(new Poller); - // This stuff needs to abstracted out of here to a platform specific file - ::addrinfo *res; - ::addrinfo hints; - hints.ai_flags = 0; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - hints.ai_protocol = 0; - int n = ::getaddrinfo(host.c_str(), boost::lexical_cast<std::string>(port).c_str(), &hints, &res); - if (n<0) { - throw Exception(QPID_MSG("Cannot resolve " << host << ": " << ::gai_strerror(n))); - } - + SocketAddress sa(host, boost::lexical_cast<std::string>(port)); Rdma::Connector* c = new Rdma::Connector( - *res->ai_addr, + sa, Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaConnector::connected, this, poller, _1, _2), boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2), diff --git a/qpid/cpp/src/qpid/client/Sasl.h b/qpid/cpp/src/qpid/client/Sasl.h index 9dc5817f3d..d773609655 100644 --- a/qpid/cpp/src/qpid/client/Sasl.h +++ b/qpid/cpp/src/qpid/client/Sasl.h @@ -45,6 +45,7 @@ class Sasl virtual std::string start(const std::string& mechanisms) = 0; virtual std::string step(const std::string& challenge) = 0; virtual std::string getMechanism() = 0; + virtual std::string getUserId() = 0; virtual std::auto_ptr<qpid::sys::SecurityLayer> getSecurityLayer(uint16_t maxFrameSize) = 0; virtual ~Sasl() {} }; diff --git a/qpid/cpp/src/qpid/client/SaslFactory.cpp b/qpid/cpp/src/qpid/client/SaslFactory.cpp index 884f527f01..2258163ec8 100644 --- a/qpid/cpp/src/qpid/client/SaslFactory.cpp +++ b/qpid/cpp/src/qpid/client/SaslFactory.cpp @@ -82,6 +82,7 @@ class CyrusSasl : public Sasl std::string start(const std::string& mechanisms); std::string step(const std::string& challenge); std::string getMechanism(); + std::string getUserId(); std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize); private: sasl_conn_t* conn; @@ -266,6 +267,18 @@ std::string CyrusSasl::getMechanism() return mechanism; } +std::string CyrusSasl::getUserId() +{ + int propResult; + const void* operName; + + propResult = sasl_getprop(conn, SASL_USERNAME, &operName); + if (propResult == SASL_OK) + return std::string((const char*) operName); + + return std::string(); +} + void CyrusSasl::interact(sasl_interact_t* client_interact) { diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index 8ead44a172..32541dceac 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -64,7 +64,8 @@ SessionImpl::SessionImpl(const std::string& name, boost::shared_ptr<ConnectionIm proxy(ioHandler), nextIn(0), nextOut(0), - sendMsgCredit(0) + sendMsgCredit(0), + doClearDeliveryPropertiesExchange(true) { channel.next = connectionShared.get(); } @@ -396,11 +397,16 @@ void SessionImpl::sendContent(const MethodContent& content) { AMQFrame header(content.getHeader()); - // Client is not allowed to set the delivery-properties.exchange. - AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody()); - if (headerp && headerp->get<DeliveryProperties>()) - headerp->get<DeliveryProperties>(true)->clearExchangeFlag(); - + // doClearDeliveryPropertiesExchange is set by cluster update client so + // it can send messages with delivery-properties.exchange set. + // + if (doClearDeliveryPropertiesExchange) { + // Normal client is not allowed to set the delivery-properties.exchange + // so clear it here. + AMQHeaderBody* headerp = static_cast<AMQHeaderBody*>(header.getBody()); + if (headerp && headerp->get<DeliveryProperties>()) + headerp->get<DeliveryProperties>(true)->clearExchangeFlag(); + } header.setFirstSegment(false); uint64_t data_length = content.getData().length(); if(data_length > 0){ diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h index 49d268c44d..0624bb8b3c 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/SessionImpl.h @@ -130,6 +130,8 @@ public: */ boost::shared_ptr<ConnectionImpl> getConnection(); + void setDoClearDeliveryPropertiesExchange(bool b=true) { doClearDeliveryPropertiesExchange = b; } + private: enum State { INACTIVE, @@ -243,6 +245,8 @@ private: // Only keep track of message credit sys::Semaphore* sendMsgCredit; + bool doClearDeliveryPropertiesExchange; + friend class client::SessionHandler; }; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 9b9f06ec57..f51a96efd9 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -362,21 +362,12 @@ void QueueSink::send(qpid::client::AsyncSession& session, const std::string&, Ou void QueueSink::cancel(qpid::client::AsyncSession&, const std::string&) {} -template <class T> void encode(qpid::messaging::Message& from) -{ - T codec; - from.encode(codec); - from.setContentType(T::contentType); -} - void translate(const Variant::Map& from, FieldTable& to);//implementation in Codecs.cpp void convert(qpid::messaging::Message& from, qpid::client::Message& to) { //TODO: need to avoid copying as much as possible - if (from.getContent().isList()) encode<ListCodec>(from); - if (from.getContent().isMap()) encode<MapCodec>(from); - to.setData(from.getBytes()); + to.setData(from.getContent()); to.getDeliveryProperties().setRoutingKey(from.getSubject()); //TODO: set other delivery properties to.getMessageProperties().setContentType(from.getContentType()); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index d22208368b..8e060c62d7 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -269,18 +269,9 @@ void populate(qpid::messaging::Message& message, FrameSet& command) //e.g. for rejecting. MessageImplAccess::get(message).setInternalId(command.getId()); - command.getContent(message.getBytes()); + command.getContent(message.getContent()); populateHeaders(message, command.getHeaders()); - - //decode content if necessary - if (message.getContentType() == ListCodec::contentType) { - ListCodec codec; - message.decode(codec); - } else if (message.getContentType() == MapCodec::contentType) { - MapCodec codec; - message.decode(codec); - } } diff --git a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index 716f955f98..cbc95b44fb 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -33,24 +33,11 @@ namespace amqp0_10 { using qpid::messaging::Address; using qpid::messaging::MessageImplAccess; -template <class T> void encode(const qpid::messaging::Message& from, qpid::client::Message& to) -{ - T codec; - MessageImplAccess::get(from).getEncodedContent(codec, to.getData()); - to.getMessageProperties().setContentType(T::contentType); -} - void OutgoingMessage::convert(const qpid::messaging::Message& from) { //TODO: need to avoid copying as much as possible - if (from.getContent().isList()) { - encode<ListCodec>(from, message); - } else if (from.getContent().isMap()) { - encode<MapCodec>(from, message); - } else { - message.setData(from.getBytes()); - message.getMessageProperties().setContentType(from.getContentType()); - } + message.setData(from.getContent()); + message.getMessageProperties().setContentType(from.getContentType()); const Address& address = from.getReplyTo(); if (!address.value.empty()) { message.getMessageProperties().setReplyTo(AddressResolution::convert(address)); diff --git a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp b/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp index 58956609a4..3a662463c1 100644 --- a/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp +++ b/qpid/cpp/src/qpid/client/windows/SaslFactory.cpp @@ -43,6 +43,7 @@ class WindowsSasl : public Sasl std::string start(const std::string& mechanisms); std::string step(const std::string& challenge); std::string getMechanism(); + std::string getUserId(); std::auto_ptr<SecurityLayer> getSecurityLayer(uint16_t maxFrameSize); private: ConnectionSettings settings; @@ -131,6 +132,11 @@ std::string WindowsSasl::getMechanism() return mechanism; } +std::string WindowsSasl::getUserId() +{ + return std::string(); // TODO - when GSSAPI is supported, return userId for connection. +} + std::auto_ptr<SecurityLayer> WindowsSasl::getSecurityLayer(uint16_t maxFrameSize) { return std::auto_ptr<SecurityLayer>(0); |