diff options
32 files changed, 574 insertions, 270 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 1e31ac60fd..1a66b6051b 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -289,21 +289,23 @@ libqpidbroker_la_SOURCES = \ libqpidclient_la_LIBADD = libqpidcommon.la libqpidclient_la_SOURCES = \ $(rgen_client_srcs) \ - qpid/client/Connection.cpp \ + qpid/client/Bounds.cpp \ qpid/client/Channel.cpp \ - qpid/client/Exchange.cpp \ - qpid/client/Queue.cpp \ qpid/client/ConnectionImpl.cpp \ qpid/client/Connector.cpp \ + qpid/client/Connection.cpp \ + qpid/client/ConnectionHandler.cpp \ + qpid/client/ConnectionSettings.cpp \ qpid/client/Demux.cpp \ qpid/client/Dispatcher.cpp \ - qpid/client/LocalQueue.cpp \ - qpid/client/Message.cpp \ - qpid/client/MessageListener.cpp \ - qpid/client/ConnectionHandler.cpp \ + qpid/client/Exchange.cpp \ qpid/client/Future.cpp \ qpid/client/FutureCompletion.cpp \ qpid/client/FutureResult.cpp \ + qpid/client/LocalQueue.cpp \ + qpid/client/Message.cpp \ + qpid/client/MessageListener.cpp \ + qpid/client/Queue.cpp \ qpid/client/Results.cpp \ qpid/client/SessionBase.cpp \ qpid/client/SessionImpl.cpp \ @@ -400,26 +402,28 @@ nobase_include_HEADERS = \ qpid/broker/TxPublish.h \ qpid/broker/Vhost.h \ qpid/client/AckMode.h \ - qpid/client/ChainableFrameHandler.h \ - qpid/client/Channel.h \ - qpid/client/Exchange.h \ - qpid/client/Message.h \ - qpid/client/Queue.h \ qpid/client/AckPolicy.h \ + qpid/client/Bounds.h \ qpid/client/Completion.h \ qpid/client/Connection.h \ qpid/client/ConnectionHandler.h \ qpid/client/ConnectionImpl.h \ + qpid/client/ConnectionSettings.h \ qpid/client/Connector.h \ + qpid/client/ChainableFrameHandler.h \ + qpid/client/Channel.h \ qpid/client/Demux.h \ qpid/client/Dispatcher.h \ - qpid/client/LocalQueue.h \ + qpid/client/Exchange.h \ qpid/client/Execution.h \ qpid/client/Future.h \ qpid/client/FutureCompletion.h \ qpid/client/FutureResult.h \ + qpid/client/LocalQueue.h \ + qpid/client/Message.h \ qpid/client/MessageListener.h \ qpid/client/MessageQueue.h \ + qpid/client/Queue.h \ qpid/client/Results.h \ qpid/client/SessionBase.h \ qpid/client/Session.h \ diff --git a/qpid/cpp/src/qpid/client/Bounds.cpp b/qpid/cpp/src/qpid/client/Bounds.cpp new file mode 100644 index 0000000000..1df21db941 --- /dev/null +++ b/qpid/cpp/src/qpid/client/Bounds.cpp @@ -0,0 +1,55 @@ +#include "Bounds.h" + +#include "qpid/log/Statement.h" + +namespace qpid { +namespace client { + +using sys::Monitor; + +Bounds::Bounds(size_t maxSize) : max(maxSize), current(0) {} + +bool Bounds::expand(size_t sizeRequired, bool block) +{ + if (max) { + Monitor::ScopedLock l(lock); + current += sizeRequired; + if (block) { + while (current > max) { + QPID_LOG(debug, "Waiting for bounds: " << *this); + lock.wait(); + } + QPID_LOG(debug, "Bounds ok: " << *this); + } + return current <= max; + } else { + return true; + } +} + +void Bounds::reduce(size_t size) +{ + if (!max || size == 0) return; + Monitor::ScopedLock l(lock); + if (current == 0) return; + bool needNotify = current > max; + current -= std::min(size, current); + if (needNotify && current < max) { + //todo: notify one at a time, but ensure that all threads are + //eventually notified + lock.notifyAll(); + } +} + +size_t Bounds::getCurrentSize() +{ + Monitor::ScopedLock l(lock); + return current; +} + +std::ostream& operator<<(std::ostream& out, const Bounds& bounds) { + out << "current=" << bounds.current << ", max=" << bounds.max << " [" << &bounds << "]"; + return out; +} + +}} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/Bounds.h b/qpid/cpp/src/qpid/client/Bounds.h new file mode 100644 index 0000000000..db18becce3 --- /dev/null +++ b/qpid/cpp/src/qpid/client/Bounds.h @@ -0,0 +1,48 @@ +#ifndef QPID_CLIENT_BOUNDSCHECKING_H +#define QPID_CLIENT_BOUNDSCHECKING_H +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/Monitor.h" + +namespace qpid{ +namespace client{ + +class Bounds +{ + public: + Bounds(size_t maxSize); + bool expand(size_t, bool block); + void reduce(size_t); + size_t getCurrentSize(); + + private: + friend std::ostream& operator<<(std::ostream&, const Bounds&); + sys::Monitor lock; + const size_t max; + size_t current; +}; + +std::ostream& operator<<(std::ostream&, const Bounds&); + + +}} + +#endif diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp index 25d1c510c8..c11f155afb 100644 --- a/qpid/cpp/src/qpid/client/Connection.cpp +++ b/qpid/cpp/src/qpid/client/Connection.cpp @@ -18,11 +18,8 @@ * under the License. * */ -#include <algorithm> -#include <boost/format.hpp> -#include <boost/bind.hpp> - #include "Connection.h" +#include "ConnectionSettings.h" #include "Channel.h" #include "Message.h" #include "SessionImpl.h" @@ -30,9 +27,13 @@ #include "qpid/log/Options.h" #include "qpid/log/Statement.h" #include "qpid/shared_ptr.h" + +#include <algorithm> #include <iostream> #include <sstream> #include <functional> +#include <boost/format.hpp> +#include <boost/bind.hpp> using namespace qpid::framing; using namespace qpid::sys; @@ -41,41 +42,49 @@ using namespace qpid::sys; namespace qpid { namespace client { -Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) : - channelIdCounter(0), version(_version), - max_frame_size(_max_frame_size), - isOpen(false), - impl(new ConnectionImpl( - shared_ptr<Connector>(new Connector(_version, _debug)))) -{} - -Connection::Connection(shared_ptr<Connector> c) : - channelIdCounter(0), version(framing::highestProtocolVersion), - max_frame_size(65535), - isOpen(false), - impl(new ConnectionImpl(c)) -{} +Connection::Connection(framing::ProtocolVersion _version) : + channelIdCounter(0), version(_version) {} Connection::~Connection(){ } void Connection::open( const std::string& host, int port, - const std::string& uid, const std::string& pwd, const std::string& vhost) + const std::string& uid, const std::string& pwd, + const std::string& vhost, + uint16_t maxFrameSize) +{ + ConnectionSettings settings; + settings.host = host; + settings.port = port; + settings.username = uid; + settings.password = pwd; + settings.virtualhost = vhost; + settings.maxFrameSize = maxFrameSize; + open(settings); +} + +void Connection::open(const ConnectionSettings& settings) { - if (isOpen) - throw Exception(QPID_MSG("Channel object is already open")); + if (impl) + throw Exception(QPID_MSG("Connection::open() was already called")); - impl->open(host, port, uid, pwd, vhost); - isOpen = true; + impl = boost::shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings)); + max_frame_size = impl->getNegotiatedSettings().maxFrameSize; } -void Connection::openChannel(Channel& channel) { +void Connection::openChannel(Channel& channel) +{ + if (!impl) + throw Exception(QPID_MSG("Connection has not yet been opened")); channel.open(newSession(ASYNC)); } Session Connection::newSession(SynchronousMode sync, - uint32_t detachedLifetime) + uint32_t detachedLifetime) { + if (!impl) + throw Exception(QPID_MSG("Connection has not yet been opened")); + shared_ptr<SessionImpl> core( new SessionImpl(impl, ++channelIdCounter, max_frame_size)); core->setSync(sync); @@ -85,13 +94,19 @@ Session Connection::newSession(SynchronousMode sync, } void Connection::resume(Session& session) { + if (!impl) + throw Exception(QPID_MSG("Connection has not yet been opened")); + session.impl->setChannel(++channelIdCounter); impl->addSession(session.impl); session.impl->resume(impl); } void Connection::close() { - impl->close(); + if (impl) { + impl->close(); + impl.reset(); + } } }} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h index 0ddd383381..417739fd1d 100644 --- a/qpid/cpp/src/qpid/client/Connection.h +++ b/qpid/cpp/src/qpid/client/Connection.h @@ -39,6 +39,7 @@ namespace qpid { */ namespace client { +class ConnectionSettings; /** * \defgroup clientapi Application API for an AMQP client. */ @@ -54,9 +55,7 @@ class Connection { framing::ChannelId channelIdCounter; framing::ProtocolVersion version; - const uint32_t max_frame_size; - bool isOpen; - bool debug; + uint16_t max_frame_size; protected: boost::shared_ptr<ConnectionImpl> impl; @@ -67,17 +66,8 @@ class Connection * connection. * * @param _version the version of the protocol to connect with. - * - * @param debug turns on tracing for the connection - * (i.e. prints details of the frames sent and received to std - * out). Optional. Defaults to false. - * - * @param max_frame_size the maximum frame size that the - * client will accept. Optional. Defaults to 65535. */ - Connection(bool debug = false, uint32_t max_frame_size = 65535, - framing::ProtocolVersion=framing::highestProtocolVersion); - Connection(boost::shared_ptr<Connector>); + Connection(framing::ProtocolVersion=framing::highestProtocolVersion); ~Connection(); /** @@ -100,7 +90,14 @@ class Connection void open(const std::string& host, int port = 5672, const std::string& uid = "guest", const std::string& pwd = "guest", - const std::string& virtualhost = "/"); + const std::string& virtualhost = "/", uint16_t maxFrameSize=65535); + + /** + * Opens a connection to a broker. + * + * @param the settings to use (host, port etc) @see ConnectionSettings + */ + void open(const ConnectionSettings& settings); /** * Close the connection. diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp index df27942008..81d966d53f 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp @@ -20,9 +20,9 @@ */ #include "ConnectionHandler.h" + #include "qpid/log/Statement.h" #include "qpid/framing/amqp_framing.h" -#include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/all_method_bodies.h" #include "qpid/framing/ClientInvoker.h" #include "qpid/framing/reply_exceptions.h" @@ -42,17 +42,10 @@ const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state"); const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state"); } -ConnectionHandler::ConnectionHandler() - : StateManager(NOT_STARTED), outHandler(*this), proxy(outHandler), errorCode(200) -{ - - mechanism = PLAIN; - locale = en_US; - heartbeat = 0; - maxChannels = 32767; - maxFrameSize = 65535; +ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, framing::ProtocolVersion& v) + : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), errorCode(200), version(v) +{ insist = true; - version = framing::highestProtocolVersion; ESTABLISHED.insert(FAILED); ESTABLISHED.insert(CLOSED); @@ -141,7 +134,7 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& /* checkState(NOT_STARTED, INVALID_STATE_START); setState(NEGOTIATING); //TODO: verify that desired mechanism and locale are supported - string response = ((char)0) + uid + ((char)0) + pwd; + string response = ((char)0) + username + ((char)0) + password; proxy.startOk(properties, mechanism, response, locale); } @@ -150,14 +143,16 @@ void ConnectionHandler::secure(const std::string& /*challenge*/) throw NotImplementedException("Challenge-response cycle not yet implemented in client"); } -void ConnectionHandler::tune(uint16_t channelMax, uint16_t /*frameMax*/, uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/) +void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed, + uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/) { checkState(NEGOTIATING, INVALID_STATE_TUNE); - //TODO: verify that desired heartbeat and max frame size are valid - maxChannels = channelMax; + maxChannels = std::min(maxChannels, maxChannelsProposed); + maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed); + //TODO: implement heartbeats and check desired value is in valid range proxy.tuneOk(maxChannels, maxFrameSize, heartbeat); setState(OPENING); - proxy.open(vhost, capabilities, insist); + proxy.open(virtualhost, capabilities, insist); } void ConnectionHandler::openOk(const framing::Array& /*knownHosts*/) diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.h b/qpid/cpp/src/qpid/client/ConnectionHandler.h index d7ab97ce31..1cf0c905ed 100644 --- a/qpid/cpp/src/qpid/client/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/client/ConnectionHandler.h @@ -22,9 +22,10 @@ #define _ConnectionHandler_ #include "ChainableFrameHandler.h" -#include "Connector.h" +#include "ConnectionSettings.h" #include "StateManager.h" #include "qpid/framing/AMQMethodBody.h" +#include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/AMQP_ClientOperations.h" #include "qpid/framing/AMQP_ServerProxy.h" #include "qpid/framing/Array.h" @@ -35,27 +36,11 @@ namespace qpid { namespace client { -struct ConnectionProperties -{ - std::string uid; - std::string pwd; - std::string vhost; - framing::FieldTable properties; - std::string mechanism; - std::string locale; - framing::Array capabilities; - uint16_t heartbeat; - uint16_t maxChannels; - uint64_t maxFrameSize; - bool insist; - framing::ProtocolVersion version; -}; - -class ConnectionHandler : private StateManager, - public ConnectionProperties, - public ChainableFrameHandler, - public framing::InputHandler, - private framing::AMQP_ClientOperations::ConnectionHandler +class ConnectionHandler : private StateManager, + public ConnectionSettings, + public ChainableFrameHandler, + public framing::InputHandler, + private framing::AMQP_ClientOperations::ConnectionHandler { typedef framing::AMQP_ClientOperations::ConnectionHandler ConnectionOperations; enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED}; @@ -73,6 +58,10 @@ class ConnectionHandler : private StateManager, framing::AMQP_ServerProxy::Connection proxy; uint16_t errorCode; std::string errorText; + bool insist; + framing::ProtocolVersion version; + framing::Array capabilities; + framing::FieldTable properties; void checkState(STATES s, const std::string& msg); @@ -96,7 +85,7 @@ public: typedef boost::function<void()> CloseListener; typedef boost::function<void(uint16_t, const std::string&)> ErrorListener; - ConnectionHandler(); + ConnectionHandler(const ConnectionSettings&, framing::ProtocolVersion&); void received(framing::AMQFrame& f) { incoming(f); } diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp index ce95e43f58..643d42403d 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp @@ -18,13 +18,14 @@ * under the License. * */ +#include "ConnectionImpl.h" +#include "ConnectionSettings.h" +#include "SessionImpl.h" + #include "qpid/log/Statement.h" #include "qpid/framing/constants.h" #include "qpid/framing/reply_exceptions.h" -#include "ConnectionImpl.h" -#include "SessionImpl.h" - #include <boost/bind.hpp> #include <boost/format.hpp> @@ -34,24 +35,32 @@ using namespace qpid::sys; using namespace qpid::framing::connection;//for connection error codes -ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) - : connector(c), isClosed(false), isClosing(false) +ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) + : Bounds(settings.maxFrameSize * settings.bounds), + handler(settings, v), + connector(v, settings, this), + version(v), + isClosed(false), + isClosing(false) { + QPID_LOG(debug, "ConnectionImpl created for " << version); handler.in = boost::bind(&ConnectionImpl::incoming, this, _1); - handler.out = boost::bind(&Connector::send, connector, _1); + handler.out = boost::bind(&Connector::send, boost::ref(connector), _1); handler.onClose = boost::bind(&ConnectionImpl::closed, this, NORMAL, std::string()); handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2); - connector->setInputHandler(&handler); - connector->setTimeoutHandler(this); - connector->setShutdownHandler(this); + connector.setInputHandler(&handler); + connector.setTimeoutHandler(this); + connector.setShutdownHandler(this); + + open(settings.host, settings.port); } ConnectionImpl::~ConnectionImpl() { // Important to close the connector first, to ensure the // connector thread does not call on us while the destructor // is running. - connector->close(); + connector.close(); } void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session) @@ -79,18 +88,11 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame) s->in(frame); } -void ConnectionImpl::open(const std::string& host, int port, - const std::string& uid, const std::string& pwd, - const std::string& vhost) +void ConnectionImpl::open(const std::string& host, int port) { - //TODO: better management of connection properties - handler.uid = uid; - handler.pwd = pwd; - handler.vhost = vhost; - QPID_LOG(info, "Connecting to " << host << ":" << port); - connector->connect(host, port); - connector->init(); + connector.connect(host, port); + connector.init(); handler.waitForOpen(); } @@ -102,7 +104,7 @@ void ConnectionImpl::idleIn() void ConnectionImpl::idleOut() { AMQFrame frame(in_place<AMQHeartbeatBody>()); - connector->send(frame); + connector.send(frame); } void ConnectionImpl::close() @@ -121,7 +123,7 @@ void ConnectionImpl::close() // so sessions can be updated outside the lock. ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) { isClosed = true; - connector->close(); + connector.close(); SessionVector save; for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { boost::shared_ptr<SessionImpl> s = i->second.lock(); @@ -157,4 +159,9 @@ void ConnectionImpl::erase(uint16_t ch) { Mutex::ScopedLock l(lock); sessions.erase(ch); } + +const ConnectionSettings& ConnectionImpl::getNegotiatedSettings() +{ + return handler; +} diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.h b/qpid/cpp/src/qpid/client/ConnectionImpl.h index d0df9238f2..986b044b49 100644 --- a/qpid/cpp/src/qpid/client/ConnectionImpl.h +++ b/qpid/cpp/src/qpid/client/ConnectionImpl.h @@ -22,22 +22,26 @@ #ifndef _ConnectionImpl_ #define _ConnectionImpl_ -#include <map> -#include <boost/shared_ptr.hpp> -#include <boost/weak_ptr.hpp> +#include "Bounds.h" +#include "ConnectionHandler.h" +#include "Connector.h" #include "qpid/framing/FrameHandler.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" -#include "ConnectionHandler.h" -#include "Connector.h" + +#include <map> +#include <boost/shared_ptr.hpp> +#include <boost/weak_ptr.hpp> namespace qpid { namespace client { +class ConnectionSettings; class SessionImpl; -class ConnectionImpl : public framing::FrameHandler, +class ConnectionImpl : public Bounds, + public framing::FrameHandler, public sys::TimeoutHandler, public sys::ShutdownHandler @@ -47,7 +51,7 @@ class ConnectionImpl : public framing::FrameHandler, SessionMap sessions; ConnectionHandler handler; - boost::shared_ptr<Connector> connector; + Connector connector; framing::ProtocolVersion version; sys::Mutex lock; bool isClosed; @@ -55,6 +59,8 @@ class ConnectionImpl : public framing::FrameHandler, template <class F> void detachAll(const F&); + void open(const std::string& host, int port); + SessionVector closeInternal(const sys::Mutex::ScopedLock&); void incoming(framing::AMQFrame& frame); void closed(uint16_t, const std::string&); @@ -64,21 +70,16 @@ class ConnectionImpl : public framing::FrameHandler, bool setClosing(); public: - typedef boost::shared_ptr<ConnectionImpl> shared_ptr; - - ConnectionImpl(boost::shared_ptr<Connector> c); + ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings); ~ConnectionImpl(); void addSession(const boost::shared_ptr<SessionImpl>&); - void open(const std::string& host, int port = 5672, - const std::string& uid = "guest", - const std::string& pwd = "guest", - const std::string& virtualhost = "/"); void close(); void handle(framing::AMQFrame& frame); void erase(uint16_t channel); - boost::shared_ptr<Connector> getConnector() { return connector; } + + const ConnectionSettings& getNegotiatedSettings(); }; }} diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp new file mode 100644 index 0000000000..ea2729c2dd --- /dev/null +++ b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ConnectionSettings.h" + +#include "qpid/sys/posix/check.h" +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> + +namespace qpid { +namespace client { + +ConnectionSettings::ConnectionSettings() : + Options("Connection Settings"), + host("localhost"), + port(TcpAddress::DEFAULT_PORT), + clientid("cpp"), + username("guest"), + password("guest"), + mechanism("PLAIN"), + locale("en_US"), + heartbeat(0), + maxChannels(32767), + maxFrameSize(65535), + bounds(2), + tcpNoDelay(false) +{ + addOptions() + ("broker,b", optValue(host, "HOST"), "Broker host to connect to") + ("port,p", optValue(port, "PORT"), "Broker port to connect to") + ("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host") + ("clientname,n", optValue(clientid, "ID"), "unique client identifier") + ("username", optValue(username, "USER"), "user name for broker log in.") + ("password", optValue(password, "PASSWORD"), "password for broker log in.") + ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.") + ("locale", optValue(locale, "LOCALE"), "locale to use.") + ("max-channels", optValue(maxChannels, "N"), "the maximum number of channels the client requires.") + ("max-frame-size", optValue(maxFrameSize, "N"), "the maximum frame size to request.") + ("bounds-multiplier", optValue(bounds, "N"), + "restricts the total size of outgoing frames queued up for writing (as a multiple of the max frame size)."); + add(log); +} + +ConnectionSettings::~ConnectionSettings() {} + +void ConnectionSettings::parse(int argc, char** argv) +{ + qpid::Options::parse(argc, argv); + qpid::log::Logger::instance().configure(log, argv[0]); +} + + +void ConnectionSettings::configurePosixTcpSocket(int fd) const +{ + if (tcpNoDelay) { + int flag = 1; + int result = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)); + QPID_POSIX_CHECK(result); + QPID_LOG(debug, "Set TCP_NODELAY"); + } +} + +}} // namespace qpid::client diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.h b/qpid/cpp/src/qpid/client/ConnectionSettings.h new file mode 100644 index 0000000000..b430c87099 --- /dev/null +++ b/qpid/cpp/src/qpid/client/ConnectionSettings.h @@ -0,0 +1,76 @@ +#ifndef QPID_CLIENT_CONNECTIONSETTINGS_H +#define QPID_CLIENT_CONNECTIONSETTINGS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Options.h" +#include "qpid/log/Options.h" +#include "qpid/Url.h" +#include "qpid/log/Logger.h" +#include "qpid/sys/Socket.h" + +#include <iostream> +#include <exception> + +namespace qpid { +namespace client { + +/** + * Used to hold seetings for a connection (and parse these from + * command line oprions etc as a convenience). + */ +struct ConnectionSettings : qpid::Options, qpid::sys::Socket::Configuration +{ + ConnectionSettings(); + virtual ~ConnectionSettings(); + + /** + * Applies any tcp specific options to the sockets file descriptor + */ + virtual void configurePosixTcpSocket(int fd) const; + + /** + * Parse options from command line arguments (will throw exception + * if arguments cannot be parsed). + */ + void parse(int argc, char** argv); + + std::string host; + uint16_t port; + std::string virtualhost; + std::string clientid; + std::string username; + std::string password; + std::string mechanism; + std::string locale; + uint16_t heartbeat; + uint16_t maxChannels; + uint16_t maxFrameSize; + uint bounds; + bool tcpNoDelay; + + log::Options log; +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_CONNECTIONSETTINGS_H*/ diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp index 7fb4997f5a..c9c55c50e8 100644 --- a/qpid/cpp/src/qpid/client/Connector.cpp +++ b/qpid/cpp/src/qpid/client/Connector.cpp @@ -20,6 +20,8 @@ */ #include "Connector.h" +#include "Bounds.h" +#include "ConnectionSettings.h" #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" @@ -40,21 +42,22 @@ using namespace qpid::framing; using boost::format; using boost::str; -Connector::Connector( - ProtocolVersion ver, bool _debug, uint32_t buffer_size -) : debug(_debug), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - version(ver), - initiated(false), - closed(true), - joined(true), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), - shutdownHandler(0), - aio(0) -{} +Connector::Connector(ProtocolVersion ver, const ConnectionSettings& settings, Bounds* bounds) + : maxFrameSize(settings.maxFrameSize), + version(ver), + initiated(false), + closed(true), + joined(true), + timeout(0), + idleIn(0), idleOut(0), + timeoutHandler(0), + shutdownHandler(0), + writer(maxFrameSize, bounds), + aio(0) +{ + QPID_LOG(debug, "Connector created for " << version); + socket.configure(settings); +} Connector::~Connector() { close(); @@ -176,11 +179,11 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){ } struct Connector::Buff : public AsynchIO::BufferBase { - Buff() : AsynchIO::BufferBase(new char[65536], 65536) {} + Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} ~Buff() { delete [] bytes;} }; -Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0) +Connector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) { } @@ -192,12 +195,12 @@ void Connector::Writer::init(std::string id, sys::AsynchIO* a) { aio = a; newBuffer(l); } - void Connector::Writer::handle(framing::AMQFrame& frame) { Mutex::ScopedLock l(lock); frames.push_back(frame); - if (frame.getEof()) { + if (frame.getEof()) {//or if we already have a buffers worth lastEof = frames.size(); + QPID_LOG(debug, "Requesting write: lastEof=" << lastEof); aio->notifyPendingWrite(); } QPID_LOG(trace, "SENT " << identifier << ": " << frame); @@ -217,7 +220,7 @@ void Connector::Writer::writeOne(const Mutex::ScopedLock& l) { void Connector::Writer::newBuffer(const Mutex::ScopedLock&) { buffer = aio->getQueuedBuffer(); - if (!buffer) buffer = new Buff(); + if (!buffer) buffer = new Buff(maxFrameSize); encode = framing::Buffer(buffer->bytes, buffer->byteCount); framesEncoded = 0; } @@ -226,15 +229,20 @@ void Connector::Writer::newBuffer(const Mutex::ScopedLock&) { void Connector::Writer::write(sys::AsynchIO&) { Mutex::ScopedLock l(lock); assert(buffer); + size_t bytesWritten(0); for (size_t i = 0; i < lastEof; ++i) { AMQFrame& frame = frames[i]; - if (frame.size() > encode.available()) writeOne(l); - assert(frame.size() <= encode.available()); + uint32_t size = frame.size(); + if (size > encode.available()) writeOne(l); + assert(size <= encode.available()); frame.encode(encode); ++framesEncoded; + bytesWritten += size; + QPID_LOG(debug, "Wrote frame: lastEof=" << lastEof << ", i=" << i); } frames.erase(frames.begin(), frames.begin()+lastEof); lastEof = 0; + if (bounds) bounds->reduce(bytesWritten); if (encode.getPosition() > 0) writeOne(l); } @@ -272,7 +280,7 @@ void Connector::writebuff(AsynchIO& aio_) { } void Connector::writeDataBlock(const AMQDataBlock& data) { - AsynchIO::BufferBase* buff = new Buff; + AsynchIO::BufferBase* buff = new Buff(maxFrameSize); framing::Buffer out(buff->bytes, buff->byteCount); data.encode(out); buff->dataCount = data.size(); @@ -290,7 +298,7 @@ void Connector::run(){ Dispatcher d(poller); for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff); + aio->queueReadBuffer(new Buff(maxFrameSize)); } aio->start(poller); diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h index 366f82acbd..9b13e1d519 100644 --- a/qpid/cpp/src/qpid/client/Connector.h +++ b/qpid/cpp/src/qpid/client/Connector.h @@ -31,17 +31,21 @@ #include "qpid/sys/TimeoutHandler.h" #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" -#include "qpid/sys/Monitor.h" +#include "qpid/sys/Mutex.h" #include "qpid/sys/Socket.h" #include "qpid/sys/Time.h" #include "qpid/sys/AsynchIO.h" #include <queue> +#include <boost/shared_ptr.hpp> namespace qpid { namespace client { +class Bounds; +class ConnectionSettings; + class Connector : public framing::OutputHandler, private sys::Runnable { @@ -52,6 +56,7 @@ class Connector : public framing::OutputHandler, typedef sys::AsynchIO::BufferBase BufferBase; typedef std::vector<framing::AMQFrame> Frames; + const uint16_t maxFrameSize; sys::Mutex lock; sys::AsynchIO* aio; BufferBase* buffer; @@ -60,22 +65,21 @@ class Connector : public framing::OutputHandler, framing::Buffer encode; size_t framesEncoded; std::string identifier; + Bounds* bounds; void writeOne(const sys::Mutex::ScopedLock&); void newBuffer(const sys::Mutex::ScopedLock&); public: - Writer(); + Writer(uint16_t maxFrameSize, Bounds*); ~Writer(); void init(std::string id, sys::AsynchIO*); void handle(framing::AMQFrame&); void write(sys::AsynchIO&); }; - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; + const uint16_t maxFrameSize; framing::ProtocolVersion version; bool initiated; @@ -122,7 +126,8 @@ class Connector : public framing::OutputHandler, public: Connector(framing::ProtocolVersion pVersion, - bool debug = false, uint32_t buffer_size = 1024); + const ConnectionSettings&, + Bounds* bounds = 0); virtual ~Connector(); virtual void connect(const std::string& host, int port); virtual void init(); diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp index 571d54df0c..e998d040c8 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp @@ -45,6 +45,7 @@ using namespace qpid::framing::session;//for detach codes typedef sys::Monitor::ScopedLock Lock; typedef sys::Monitor::ScopedUnlock UnLock; +typedef sys::ScopedLock<sys::Semaphore> Acquire; SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn, @@ -60,8 +61,9 @@ SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn, name(id.str()), //TODO: may want to allow application defined names instead connection(conn), + ioHandler(*this), channel(ch), - proxy(channel), + proxy(ioHandler), nextIn(0), nextOut(0) { @@ -281,14 +283,20 @@ Future SessionImpl::send(const AMQBody& command, const MethodContent& content) Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content) { - Lock l(state); - checkOpen(); + Acquire a(sendLock); SequenceNumber id = nextOut++; - incompleteOut.add(id); + bool sync; + { + Lock l(state); + checkOpen(); + incompleteOut.add(id); + sync = syncMode; + } - if (syncMode) command.getMethod()->setSync(syncMode); + if (sync) command.getMethod()->setSync(true); Future f(id); if (command.getMethod()->resultExpected()) { + Lock l(state); //result listener must be set before the command is sent f.setFutureResult(results.listenForResult(id)); } @@ -300,26 +308,25 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con if (content) { sendContent(*content); } - if (syncMode) { - waitForCompletionImpl(id); + if (sync) { + waitForCompletion(id); } return f; } - void SessionImpl::sendContent(const MethodContent& content) { AMQFrame header(content.getHeader()); - header.setBof(false); + header.setFirstSegment(false); u_int64_t data_length = content.getData().length(); if(data_length > 0){ - header.setEof(false); + header.setLastSegment(false); handleOut(header); /*Note: end of frame marker included in overhead but not in size*/ const u_int32_t frag_size = maxFrameSize - (AMQFrame::frameOverhead() - 1); if(data_length < frag_size){ AMQFrame frame(in_place<AMQContentBody>(content.getData())); - frame.setBof(false); + frame.setFirstSegment(false); handleOut(frame); }else{ u_int32_t offset = 0; @@ -328,15 +335,15 @@ void SessionImpl::sendContent(const MethodContent& content) u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(content.getData().substr(offset, length)); AMQFrame frame(in_place<AMQContentBody>(frag)); - frame.setBof(false); + frame.setFirstSegment(false); + frame.setLastSegment(true); if (offset > 0) { - frame.setBos(false); + frame.setFirstFrame(false); } offset += length; remaining = data_length - offset; if (remaining) { - frame.setEos(false); - frame.setEof(false); + frame.setLastFrame(false); } handleOut(frame); } @@ -391,6 +398,13 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread void SessionImpl::handleOut(AMQFrame& frame) // user thread { + connection->expand(frame.size(), true); + channel.handle(frame); +} + +void SessionImpl::proxyOut(AMQFrame& frame) // network thread +{ + connection->expand(frame.size(), false); channel.handle(frame); } @@ -620,10 +634,8 @@ void SessionImpl::assertOpen() const void SessionImpl::handleClosed() { - QPID_LOG(info, "SessionImpl::handleClosed(): entering"); demux.close(); results.close(); - QPID_LOG(info, "SessionImpl::handleClosed(): returning"); } }} diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h index 3b2e80fefd..0bcec4dd0c 100644 --- a/qpid/cpp/src/qpid/client/SessionImpl.h +++ b/qpid/cpp/src/qpid/client/SessionImpl.h @@ -33,6 +33,7 @@ #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/AMQP_ClientOperations.h" #include "qpid/framing/AMQP_ServerProxy.h" +#include "qpid/sys/Semaphore.h" #include "qpid/sys/StateMonitor.h" #include <boost/optional.hpp> @@ -124,6 +125,7 @@ private: void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); + void proxyOut(framing::AMQFrame& frame); void deliver(framing::AMQFrame& frame); Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0); @@ -164,14 +166,15 @@ private: int code; // Error code std::string text; // Error text mutable StateMonitor state; + mutable sys::Semaphore sendLock; volatile bool syncMode; uint32_t detachedLifetime; const uint64_t maxFrameSize; const framing::Uuid id; const std::string name; - shared_ptr<ConnectionImpl> connection; + framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler; framing::ChannelHandler channel; framing::AMQP_ServerProxy::Session proxy; diff --git a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h index 0aa1bf7e66..42139c7937 100644 --- a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h +++ b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h @@ -32,7 +32,6 @@ namespace qpid { namespace framing { -//static ProtocolVersion highestProtocolVersion(99, 0); static ProtocolVersion highestProtocolVersion(0, 10); } /* namespace framing */ diff --git a/qpid/cpp/src/qpid/framing/ProtocolVersion.h b/qpid/cpp/src/qpid/framing/ProtocolVersion.h index a2a755397b..9a7ebec491 100644 --- a/qpid/cpp/src/qpid/framing/ProtocolVersion.h +++ b/qpid/cpp/src/qpid/framing/ProtocolVersion.h @@ -35,7 +35,7 @@ private: uint8_t minor_; public: - ProtocolVersion(uint8_t _major=0, uint8_t _minor=0) + explicit ProtocolVersion(uint8_t _major=0, uint8_t _minor=0) : major_(_major), minor_(_minor) {} uint8_t getMajor() const { return major_; } diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h index cab95654ad..20dc0f1ce3 100644 --- a/qpid/cpp/src/qpid/sys/Socket.h +++ b/qpid/cpp/src/qpid/sys/Socket.h @@ -103,8 +103,16 @@ public: int read(void *buf, size_t count) const; int write(const void *buf, size_t count) const; + struct Configuration + { + virtual void configurePosixTcpSocket(int fd) const = 0; + virtual ~Configuration() {} + }; + + void configure(const Configuration&); + private: - Socket(IOHandlePrivate*); + Socket(IOHandlePrivate*); }; }} diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp index 99cf7210b6..5f10cd84c2 100644 --- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp @@ -104,8 +104,8 @@ Socket::Socket(IOHandlePrivate* h) : void Socket::createTcp() const { - int& socket = impl->fd; - if (socket != -1) Socket::close(); + int& socket = impl->fd; + if (socket != -1) Socket::close(); int s = ::socket (PF_INET, SOCK_STREAM, 0); if (s < 0) throw QPID_POSIX_ERROR(errno); socket = s; @@ -113,7 +113,7 @@ void Socket::createTcp() const void Socket::setTimeout(const Duration& interval) const { - const int& socket = impl->fd; + const int& socket = impl->fd; struct timeval tv; toTimeval(tv, interval); setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); @@ -138,7 +138,7 @@ const char* h_errstr(int e) { void Socket::connect(const std::string& host, int port) const { - const int& socket = impl->fd; + const int& socket = impl->fd; struct sockaddr_in name; name.sin_family = AF_INET; name.sin_port = htons(port); @@ -155,7 +155,7 @@ void Socket::connect(const std::string& host, int port) const void Socket::close() const { - int& socket = impl->fd; + int& socket = impl->fd; if (socket == -1) return; if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno); socket = -1; @@ -164,7 +164,7 @@ Socket::close() const ssize_t Socket::send(const void* data, size_t size) const { - const int& socket = impl->fd; + const int& socket = impl->fd; ssize_t sent = ::send(socket, data, size, 0); if (sent < 0) { if (errno == ECONNRESET) return SOCKET_EOF; @@ -177,7 +177,7 @@ Socket::send(const void* data, size_t size) const ssize_t Socket::recv(void* data, size_t size) const { - const int& socket = impl->fd; + const int& socket = impl->fd; ssize_t received = ::recv(socket, data, size, 0); if (received < 0) { if (errno == ETIMEDOUT) return SOCKET_TIMEOUT; @@ -209,22 +209,22 @@ int Socket::listen(int port, int backlog) const Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const { - int afd = ::accept(impl->fd, addr, addrlen); - if ( afd >= 0) - return new Socket(new IOHandlePrivate(afd)); - else if (errno == EAGAIN) - return 0; + int afd = ::accept(impl->fd, addr, addrlen); + if ( afd >= 0) + return new Socket(new IOHandlePrivate(afd)); + else if (errno == EAGAIN) + return 0; else throw QPID_POSIX_ERROR(errno); } int Socket::read(void *buf, size_t count) const { - return ::read(impl->fd, buf, count); + return ::read(impl->fd, buf, count); } int Socket::write(const void *buf, size_t count) const { - return ::write(impl->fd, buf, count); + return ::write(impl->fd, buf, count); } std::string Socket::getSockname() const @@ -257,4 +257,9 @@ uint16_t Socket::getRemotePort() const return atoi(getService(impl->fd, true).c_str()); } +void Socket::configure(const Configuration& c) +{ + c.configurePosixTcpSocket(impl->fd); +} + }} // namespace qpid::sys diff --git a/qpid/cpp/src/tests/BasicP2PTest.cpp b/qpid/cpp/src/tests/BasicP2PTest.cpp index b202f88ca6..9f5f1acccb 100644 --- a/qpid/cpp/src/tests/BasicP2PTest.cpp +++ b/qpid/cpp/src/tests/BasicP2PTest.cpp @@ -29,7 +29,7 @@ class BasicP2PTest::Receiver : public Worker, public MessageListener const std::string queue; std::string tag; public: - Receiver(TestOptions& options, const std::string& _queue, const int _messages) + Receiver(ConnectionSettings& options, const std::string& _queue, const int _messages) : Worker(options, _messages), queue(_queue){} void init() { @@ -51,7 +51,7 @@ public: } }; -void BasicP2PTest::assign(const std::string& role, framing::FieldTable& params, TestOptions& options) +void BasicP2PTest::assign(const std::string& role, framing::FieldTable& params, ConnectionSettings& options) { std::string queue = params.getString("P2P_QUEUE_AND_KEY_NAME"); int messages = params.getInt("P2P_NUM_MESSAGES"); diff --git a/qpid/cpp/src/tests/BasicP2PTest.h b/qpid/cpp/src/tests/BasicP2PTest.h index 3f0a3704f5..8f957187b0 100644 --- a/qpid/cpp/src/tests/BasicP2PTest.h +++ b/qpid/cpp/src/tests/BasicP2PTest.h @@ -38,7 +38,7 @@ class BasicP2PTest : public SimpleTestCaseBase { class Receiver; public: - void assign(const std::string& role, framing::FieldTable& params, TestOptions& options); + void assign(const std::string& role, framing::FieldTable& params, ConnectionSettings& options); }; } diff --git a/qpid/cpp/src/tests/BasicPubSubTest.cpp b/qpid/cpp/src/tests/BasicPubSubTest.cpp index 623194d331..aed47b419c 100644 --- a/qpid/cpp/src/tests/BasicPubSubTest.cpp +++ b/qpid/cpp/src/tests/BasicPubSubTest.cpp @@ -30,7 +30,7 @@ class BasicPubSubTest::Receiver : public Worker, public MessageListener const std::string key; std::string tag; public: - Receiver(TestOptions& options, const Exchange& _exchange, const std::string& _queue, const std::string& _key, const int _messages) + Receiver(ConnectionSettings& options, const Exchange& _exchange, const std::string& _queue, const std::string& _key, const int _messages) : Worker(options, _messages), exchange(_exchange), queue(_queue), key(_key){} void init() @@ -58,7 +58,7 @@ class BasicPubSubTest::MultiReceiver : public Worker, public MessageListener ReceiverList receivers; public: - MultiReceiver(TestOptions& options, const Exchange& exchange, const std::string& key, const int _messages, int receiverCount) + MultiReceiver(ConnectionSettings& options, const Exchange& exchange, const std::string& key, const int _messages, int receiverCount) : Worker(options, _messages) { for (int i = 0; i != receiverCount; i++) { @@ -104,7 +104,7 @@ public: } }; -void BasicPubSubTest::assign(const std::string& role, framing::FieldTable& params, TestOptions& options) +void BasicPubSubTest::assign(const std::string& role, framing::FieldTable& params, ConnectionSettings& options) { std::string key = params.getString("PUBSUB_KEY"); int messages = params.getInt("PUBSUB_NUM_MESSAGES"); diff --git a/qpid/cpp/src/tests/BasicPubSubTest.h b/qpid/cpp/src/tests/BasicPubSubTest.h index c3f8020b3a..a6d794c5b6 100644 --- a/qpid/cpp/src/tests/BasicPubSubTest.h +++ b/qpid/cpp/src/tests/BasicPubSubTest.h @@ -43,7 +43,7 @@ class BasicPubSubTest : public SimpleTestCaseBase class Receiver; class MultiReceiver; public: - void assign(const std::string& role, framing::FieldTable& params, TestOptions& options); + void assign(const std::string& role, framing::FieldTable& params, ConnectionSettings& options); }; } diff --git a/qpid/cpp/src/tests/SimpleTestCaseBase.cpp b/qpid/cpp/src/tests/SimpleTestCaseBase.cpp index 4f071cd02b..0111e030fb 100644 --- a/qpid/cpp/src/tests/SimpleTestCaseBase.cpp +++ b/qpid/cpp/src/tests/SimpleTestCaseBase.cpp @@ -47,7 +47,7 @@ void SimpleTestCaseBase::report(client::Message& report) } } -SimpleTestCaseBase::Sender::Sender(TestOptions& options, +SimpleTestCaseBase::Sender::Sender(ConnectionSettings& options, const Exchange& _exchange, const std::string& _key, const int _messages) @@ -67,8 +67,8 @@ void SimpleTestCaseBase::Sender::start(){ stop(); } -SimpleTestCaseBase::Worker::Worker(TestOptions& options, const int _messages) : - connection(options.trace), messages(_messages), count(0) +SimpleTestCaseBase::Worker::Worker(ConnectionSettings& options, const int _messages) : + messages(_messages), count(0) { connection.open(options.host, options.port); connection.openChannel(channel); diff --git a/qpid/cpp/src/tests/SimpleTestCaseBase.h b/qpid/cpp/src/tests/SimpleTestCaseBase.h index 7f94fa7e1c..e2c328437e 100644 --- a/qpid/cpp/src/tests/SimpleTestCaseBase.h +++ b/qpid/cpp/src/tests/SimpleTestCaseBase.h @@ -28,6 +28,7 @@ #include "qpid/client/Channel.h" #include "qpid/client/Message.h" #include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" #include "qpid/client/MessageListener.h" #include "TestCase.h" @@ -49,7 +50,7 @@ protected: public: - Worker(TestOptions& options, const int messages); + Worker(ConnectionSettings& options, const int messages); virtual ~Worker(){} virtual void stop(); @@ -63,7 +64,7 @@ protected: const Exchange& exchange; const std::string key; public: - Sender(TestOptions& options, + Sender(ConnectionSettings& options, const Exchange& exchange, const std::string& key, const int messages); @@ -74,7 +75,7 @@ protected: std::auto_ptr<Worker> worker; public: - virtual void assign(const std::string& role, framing::FieldTable& params, TestOptions& options) = 0; + virtual void assign(const std::string& role, framing::FieldTable& params, ConnectionSettings& options) = 0; virtual ~SimpleTestCaseBase() {} diff --git a/qpid/cpp/src/tests/TestCase.h b/qpid/cpp/src/tests/TestCase.h index 07bdd68933..7926c4fe1d 100644 --- a/qpid/cpp/src/tests/TestCase.h +++ b/qpid/cpp/src/tests/TestCase.h @@ -21,8 +21,8 @@ * */ +#include "qpid/client/ConnectionSettings.h" #include "qpid/client/Message.h" -#include "TestOptions.h" namespace qpid { @@ -39,7 +39,7 @@ public: * may be 'activated' at this stage others may require an explicit * start request. */ - virtual void assign(const std::string& role, framing::FieldTable& params, TestOptions& options) = 0; + virtual void assign(const std::string& role, framing::FieldTable& params, client::ConnectionSettings& options) = 0; /** * Each test will be started on its own thread, which should block * until the test completes (this may or may not require an diff --git a/qpid/cpp/src/tests/TestOptions.h b/qpid/cpp/src/tests/TestOptions.h index 87710964d6..b34acaec4e 100644 --- a/qpid/cpp/src/tests/TestOptions.h +++ b/qpid/cpp/src/tests/TestOptions.h @@ -26,6 +26,7 @@ #include "qpid/Url.h" #include "qpid/log/Logger.h" #include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" #include <iostream> #include <exception> @@ -35,22 +36,11 @@ namespace qpid { struct TestOptions : public qpid::Options { TestOptions(const std::string& helpText_=std::string()) : - Options("Test Options"), - host("localhost"), port(TcpAddress::DEFAULT_PORT), - clientid("cpp"), username("guest"), password("guest"), - help(false), helpText(helpText_) + Options("Test Options"), help(false), helpText(helpText_) { addOptions() - ("host,h", optValue(host, "HOST"), "Broker host to connect to") - // TODO aconway 2007-06-26: broker is synonym for host. Drop broker? - ("broker,b", optValue(host, "HOST"), "Broker host to connect to") - ("port,p", optValue(port, "PORT"), "Broker port to connect to") - ("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host") - ("clientname,n", optValue(clientid, "ID"), "unique client identifier") - ("username", optValue(username, "USER"), "user name for broker log in.") - ("password", optValue(password, "USER"), "password for broker log in.") ("help", optValue(help), "print this usage statement"); - add(log); + add(con); } /** As well as parsing, throw help message if requested. */ @@ -62,8 +52,7 @@ struct TestOptions : public qpid::Options msg << *this << std::endl << std::endl << e.what() << std::endl; throw qpid::Options::Exception(msg.str()); } - trace = log.trace; - qpid::log::Logger::instance().configure(log, argv[0]); + qpid::log::Logger::instance().configure(con.log, argv[0]); if (help) { std::ostringstream msg; msg << *this << std::endl << std::endl << helpText << std::endl; @@ -73,19 +62,12 @@ struct TestOptions : public qpid::Options /** Open a connection using option values */ void open(qpid::client::Connection& connection) { - connection.open(host, port, username, password, virtualhost); + connection.open(con); } - std::string host; - uint16_t port; - std::string virtualhost; - std::string clientid; - std::string username; - std::string password; - bool trace; bool help; - log::Options log; + client::ConnectionSettings con; std::string helpText; }; diff --git a/qpid/cpp/src/tests/client_test.cpp b/qpid/cpp/src/tests/client_test.cpp index d0da2ec8ca..20e8b21a3a 100644 --- a/qpid/cpp/src/tests/client_test.cpp +++ b/qpid/cpp/src/tests/client_test.cpp @@ -30,6 +30,7 @@ #include "TestOptions.h" #include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" #include "qpid/client/Message.h" #include "qpid/client/Session.h" #include "qpid/framing/FrameSet.h" @@ -40,15 +41,15 @@ using namespace qpid::client; using namespace qpid::framing; using std::string; -struct Args : public qpid::TestOptions { +struct Args : public TestOptions { uint msgSize; - uint maxFrameSize; + bool verbose; - Args() : msgSize(26), maxFrameSize(65535) + Args() : TestOptions("Simple test of Qpid c++ client; sends and receives a single message."), msgSize(26) { addOptions() ("size", optValue(msgSize, "N"), "message size") - ("max-frame-size", optValue(maxFrameSize, "N"), "max frame size"); + ("verbose", optValue(verbose), "print out some status messages"); } }; @@ -85,33 +86,33 @@ int main(int argc, char** argv) opts.parse(argc, argv); //Connect to the broker: - Connection connection(opts.trace, opts.maxFrameSize); + Connection connection; opts.open(connection); - if (opts.trace) std::cout << "Opened connection." << std::endl; + if (opts.verbose) std::cout << "Opened connection." << std::endl; //Create and open a session on the connection through which //most functionality is exposed: Session session = connection.newSession(ASYNC); - if (opts.trace) std::cout << "Opened session." << std::endl; + if (opts.verbose) std::cout << "Opened session." << std::endl; //'declare' the exchange and the queue, which will create them //as they don't exist session.exchangeDeclare(arg::exchange="MyExchange", arg::type="direct"); - if (opts.trace) std::cout << "Declared exchange." << std::endl; + if (opts.verbose) std::cout << "Declared exchange." << std::endl; session.queueDeclare(arg::queue="MyQueue", arg::autoDelete=true, arg::exclusive=true); - if (opts.trace) std::cout << "Declared queue." << std::endl; + if (opts.verbose) std::cout << "Declared queue." << std::endl; //now bind the queue to the exchange session.exchangeBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::bindingKey="MyKey"); - if (opts.trace) std::cout << "Bound queue to exchange." << std::endl; + if (opts.verbose) std::cout << "Bound queue to exchange." << std::endl; //create and send a message to the exchange using the routing //key we bound our queue with: Message msgOut(generateData(opts.msgSize)); msgOut.getDeliveryProperties().setRoutingKey("MyKey"); session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut, arg::acceptMode=1); - if (opts.trace) print("Published message: ", msgOut); + if (opts.verbose) print("Published message: ", msgOut); //subscribe to the queue, add sufficient credit and then get //incoming 'frameset', check that its a message transfer and @@ -121,12 +122,12 @@ int main(int argc, char** argv) session.messageSubscribe(arg::queue="MyQueue", arg::destination="MyId"); session.messageFlow(arg::destination="MyId", arg::unit=0, arg::value=1); //credit for one message session.messageFlow(arg::destination="MyId", arg::unit=1, arg::value=0xFFFFFFFF); //credit for infinite bytes - if (opts.trace) std::cout << "Subscribed to queue." << std::endl; + if (opts.verbose) std::cout << "Subscribed to queue." << std::endl; FrameSet::shared_ptr incoming = session.get(); if (incoming->isA<MessageTransferBody>()) { Message msgIn(*incoming); if (msgIn.getData() == msgOut.getData()) { - if (opts.trace) std::cout << "Received the exepected message." << std::endl; + if (opts.verbose) std::cout << "Received the exepected message." << std::endl; session.messageAccept(SequenceSet(msgIn.getId())); session.markCompleted(msgIn.getId(), true, true); } else { @@ -138,9 +139,9 @@ int main(int argc, char** argv) //close the session & connection session.close(); - if (opts.trace) std::cout << "Closed session." << std::endl; + if (opts.verbose) std::cout << "Closed session." << std::endl; connection.close(); - if (opts.trace) std::cout << "Closed connection." << std::endl; + if (opts.verbose) std::cout << "Closed connection." << std::endl; return 0; } catch(const std::exception& e) { std::cout << e.what() << std::endl; diff --git a/qpid/cpp/src/tests/interop_runner.cpp b/qpid/cpp/src/tests/interop_runner.cpp index 51dd2cb924..9435b8169f 100644 --- a/qpid/cpp/src/tests/interop_runner.cpp +++ b/qpid/cpp/src/tests/interop_runner.cpp @@ -24,6 +24,7 @@ #include "qpid/Exception.h" #include "qpid/client/Channel.h" #include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" #include "qpid/client/Exchange.h" #include "qpid/client/MessageListener.h" #include "qpid/client/Queue.h" @@ -45,7 +46,6 @@ using namespace qpid::client; using namespace qpid::sys; using qpid::TestCase; -using qpid::TestOptions; using qpid::framing::FieldTable; using qpid::framing::ReplyTo; using namespace std; @@ -54,7 +54,7 @@ class DummyRun : public TestCase { public: DummyRun() {} - void assign(const string&, FieldTable&, TestOptions&) {} + void assign(const string&, FieldTable&, ConnectionSettings&) {} void start() {} void stop() {} void report(qpid::client::Message&) {} @@ -68,7 +68,7 @@ class Listener : public MessageListener, private Runnable{ typedef boost::ptr_map<string, TestCase> TestMap; Channel& channel; - TestOptions& options; + ConnectionSettings& options; TestMap tests; const string name; const string topic; @@ -86,41 +86,52 @@ class Listener : public MessageListener, private Runnable{ void sendSimpleResponse(const string& type, Message& request); void sendReport(); public: - Listener(Channel& channel, TestOptions& options); + Listener(Channel& channel, ConnectionSettings& options); void received(Message& msg); void bindAndConsume(); void registerTest(string name, TestCase* test); }; +struct TestSettings : ConnectionSettings +{ + bool help; + + TestSettings() : help(false) + { + addOptions() + ("help", qpid::optValue(help), "print this usage statement"); + } +}; + int main(int argc, char** argv) { try { - TestOptions options; + TestSettings options; options.parse(argc, argv); - if (options.help) + if (options.help) { cout << options; - else { - Connection connection(options.trace); + } else { + Connection connection; connection.open(options.host, options.port, "guest", "guest", options.virtualhost); - Channel channel; - connection.openChannel(channel); + Channel channel; + connection.openChannel(channel); - Listener listener(channel, options); - listener.registerTest("TC1_DummyRun", new DummyRun()); - listener.registerTest("TC2_BasicP2P", new qpid::BasicP2PTest()); - listener.registerTest("TC3_BasicPubSub", new qpid::BasicPubSubTest()); - - listener.bindAndConsume(); + Listener listener(channel, options); + listener.registerTest("TC1_DummyRun", new DummyRun()); + listener.registerTest("TC2_BasicP2P", new qpid::BasicP2PTest()); + listener.registerTest("TC3_BasicPubSub", new qpid::BasicPubSubTest()); + + listener.bindAndConsume(); - channel.run(); - connection.close(); + channel.run(); + connection.close(); } } catch(const exception& error) { cout << error.what() << endl << "Type " << argv[0] << " --help for help" << endl; } } -Listener::Listener(Channel& _channel, TestOptions& _options) : channel(_channel), options(_options), name(options.clientid), topic("iop.control." + name) +Listener::Listener(Channel& _channel, ConnectionSettings& _options) : channel(_channel), options(_options), name(options.clientid), topic("iop.control." + name) {} void Listener::registerTest(string name, TestCase* test) diff --git a/qpid/cpp/src/tests/latencytest.cpp b/qpid/cpp/src/tests/latencytest.cpp index a61a4b2e42..0b343d0243 100644 --- a/qpid/cpp/src/tests/latencytest.cpp +++ b/qpid/cpp/src/tests/latencytest.cpp @@ -66,7 +66,8 @@ struct Args : public qpid::TestOptions { ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)") ("ack", optValue(ack, "N"), "Ack frequency in messages (defaults to half the prefetch value)") ("durable", optValue(durable, "yes|no"), "use durable messages") - ("queue-base-name", optValue(base, "<name>"), "base name for queues"); + ("queue-base-name", optValue(base, "<name>"), "base name for queues") + ("tcp-nodelay", optValue(con.tcpNoDelay), "Turn on tcp-nodelay"); } }; diff --git a/qpid/cpp/src/tests/topic_listener.cpp b/qpid/cpp/src/tests/topic_listener.cpp index f1007ee4c2..ba6cd5f267 100644 --- a/qpid/cpp/src/tests/topic_listener.cpp +++ b/qpid/cpp/src/tests/topic_listener.cpp @@ -99,7 +99,7 @@ int main(int argc, char** argv){ if(args.help) cout << args << endl; else { - Connection connection(args.trace); + Connection connection; args.open(connection); Session session = connection.newSession(ASYNC); if (args.transactional) { diff --git a/qpid/cpp/src/tests/topic_publisher.cpp b/qpid/cpp/src/tests/topic_publisher.cpp index 8242530db1..a4d0dd9382 100644 --- a/qpid/cpp/src/tests/topic_publisher.cpp +++ b/qpid/cpp/src/tests/topic_publisher.cpp @@ -105,7 +105,7 @@ int main(int argc, char** argv) { if(args.help) cout << args << endl; else { - Connection connection(args.trace); + Connection connection; args.open(connection); Session session = connection.newSession(ASYNC); if (args.transactional) { |