summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-04-29 20:15:18 +0000
committerGordon Sim <gsim@apache.org>2008-04-29 20:15:18 +0000
commit414f520f8b730c8437ed9c42628f07824a7e9dd1 (patch)
treefca4fab15a6fde4943288cc8efe8c7b06a2cd787
parent6915cc6324acacbbe7707e19573f0e35c5010e85 (diff)
downloadqpid-python-414f520f8b730c8437ed9c42628f07824a7e9dd1.tar.gz
QPID-974: allow the size of the queue of outgoing frames to be restricted
QPID-544: tidy up configuration (ensuring desired settings are used correctly, allowing tcp socket options to be set etc) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@652083 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am30
-rw-r--r--qpid/cpp/src/qpid/client/Bounds.cpp55
-rw-r--r--qpid/cpp/src/qpid/client/Bounds.h48
-rw-r--r--qpid/cpp/src/qpid/client/Connection.cpp67
-rw-r--r--qpid/cpp/src/qpid/client/Connection.h25
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.cpp27
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionHandler.h35
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.cpp51
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionImpl.h31
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionSettings.cpp81
-rw-r--r--qpid/cpp/src/qpid/client/ConnectionSettings.h76
-rw-r--r--qpid/cpp/src/qpid/client/Connector.cpp56
-rw-r--r--qpid/cpp/src/qpid/client/Connector.h17
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.cpp46
-rw-r--r--qpid/cpp/src/qpid/client/SessionImpl.h5
-rw-r--r--qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h1
-rw-r--r--qpid/cpp/src/qpid/framing/ProtocolVersion.h2
-rw-r--r--qpid/cpp/src/qpid/sys/Socket.h10
-rw-r--r--qpid/cpp/src/qpid/sys/posix/Socket.cpp33
-rw-r--r--qpid/cpp/src/tests/BasicP2PTest.cpp4
-rw-r--r--qpid/cpp/src/tests/BasicP2PTest.h2
-rw-r--r--qpid/cpp/src/tests/BasicPubSubTest.cpp6
-rw-r--r--qpid/cpp/src/tests/BasicPubSubTest.h2
-rw-r--r--qpid/cpp/src/tests/SimpleTestCaseBase.cpp6
-rw-r--r--qpid/cpp/src/tests/SimpleTestCaseBase.h7
-rw-r--r--qpid/cpp/src/tests/TestCase.h4
-rw-r--r--qpid/cpp/src/tests/TestOptions.h30
-rw-r--r--qpid/cpp/src/tests/client_test.cpp31
-rw-r--r--qpid/cpp/src/tests/interop_runner.cpp49
-rw-r--r--qpid/cpp/src/tests/latencytest.cpp3
-rw-r--r--qpid/cpp/src/tests/topic_listener.cpp2
-rw-r--r--qpid/cpp/src/tests/topic_publisher.cpp2
32 files changed, 574 insertions, 270 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 1e31ac60fd..1a66b6051b 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -289,21 +289,23 @@ libqpidbroker_la_SOURCES = \
libqpidclient_la_LIBADD = libqpidcommon.la
libqpidclient_la_SOURCES = \
$(rgen_client_srcs) \
- qpid/client/Connection.cpp \
+ qpid/client/Bounds.cpp \
qpid/client/Channel.cpp \
- qpid/client/Exchange.cpp \
- qpid/client/Queue.cpp \
qpid/client/ConnectionImpl.cpp \
qpid/client/Connector.cpp \
+ qpid/client/Connection.cpp \
+ qpid/client/ConnectionHandler.cpp \
+ qpid/client/ConnectionSettings.cpp \
qpid/client/Demux.cpp \
qpid/client/Dispatcher.cpp \
- qpid/client/LocalQueue.cpp \
- qpid/client/Message.cpp \
- qpid/client/MessageListener.cpp \
- qpid/client/ConnectionHandler.cpp \
+ qpid/client/Exchange.cpp \
qpid/client/Future.cpp \
qpid/client/FutureCompletion.cpp \
qpid/client/FutureResult.cpp \
+ qpid/client/LocalQueue.cpp \
+ qpid/client/Message.cpp \
+ qpid/client/MessageListener.cpp \
+ qpid/client/Queue.cpp \
qpid/client/Results.cpp \
qpid/client/SessionBase.cpp \
qpid/client/SessionImpl.cpp \
@@ -400,26 +402,28 @@ nobase_include_HEADERS = \
qpid/broker/TxPublish.h \
qpid/broker/Vhost.h \
qpid/client/AckMode.h \
- qpid/client/ChainableFrameHandler.h \
- qpid/client/Channel.h \
- qpid/client/Exchange.h \
- qpid/client/Message.h \
- qpid/client/Queue.h \
qpid/client/AckPolicy.h \
+ qpid/client/Bounds.h \
qpid/client/Completion.h \
qpid/client/Connection.h \
qpid/client/ConnectionHandler.h \
qpid/client/ConnectionImpl.h \
+ qpid/client/ConnectionSettings.h \
qpid/client/Connector.h \
+ qpid/client/ChainableFrameHandler.h \
+ qpid/client/Channel.h \
qpid/client/Demux.h \
qpid/client/Dispatcher.h \
- qpid/client/LocalQueue.h \
+ qpid/client/Exchange.h \
qpid/client/Execution.h \
qpid/client/Future.h \
qpid/client/FutureCompletion.h \
qpid/client/FutureResult.h \
+ qpid/client/LocalQueue.h \
+ qpid/client/Message.h \
qpid/client/MessageListener.h \
qpid/client/MessageQueue.h \
+ qpid/client/Queue.h \
qpid/client/Results.h \
qpid/client/SessionBase.h \
qpid/client/Session.h \
diff --git a/qpid/cpp/src/qpid/client/Bounds.cpp b/qpid/cpp/src/qpid/client/Bounds.cpp
new file mode 100644
index 0000000000..1df21db941
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/Bounds.cpp
@@ -0,0 +1,55 @@
+#include "Bounds.h"
+
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace client {
+
+using sys::Monitor;
+
+Bounds::Bounds(size_t maxSize) : max(maxSize), current(0) {}
+
+bool Bounds::expand(size_t sizeRequired, bool block)
+{
+ if (max) {
+ Monitor::ScopedLock l(lock);
+ current += sizeRequired;
+ if (block) {
+ while (current > max) {
+ QPID_LOG(debug, "Waiting for bounds: " << *this);
+ lock.wait();
+ }
+ QPID_LOG(debug, "Bounds ok: " << *this);
+ }
+ return current <= max;
+ } else {
+ return true;
+ }
+}
+
+void Bounds::reduce(size_t size)
+{
+ if (!max || size == 0) return;
+ Monitor::ScopedLock l(lock);
+ if (current == 0) return;
+ bool needNotify = current > max;
+ current -= std::min(size, current);
+ if (needNotify && current < max) {
+ //todo: notify one at a time, but ensure that all threads are
+ //eventually notified
+ lock.notifyAll();
+ }
+}
+
+size_t Bounds::getCurrentSize()
+{
+ Monitor::ScopedLock l(lock);
+ return current;
+}
+
+std::ostream& operator<<(std::ostream& out, const Bounds& bounds) {
+ out << "current=" << bounds.current << ", max=" << bounds.max << " [" << &bounds << "]";
+ return out;
+}
+
+}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/Bounds.h b/qpid/cpp/src/qpid/client/Bounds.h
new file mode 100644
index 0000000000..db18becce3
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/Bounds.h
@@ -0,0 +1,48 @@
+#ifndef QPID_CLIENT_BOUNDSCHECKING_H
+#define QPID_CLIENT_BOUNDSCHECKING_H
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/Monitor.h"
+
+namespace qpid{
+namespace client{
+
+class Bounds
+{
+ public:
+ Bounds(size_t maxSize);
+ bool expand(size_t, bool block);
+ void reduce(size_t);
+ size_t getCurrentSize();
+
+ private:
+ friend std::ostream& operator<<(std::ostream&, const Bounds&);
+ sys::Monitor lock;
+ const size_t max;
+ size_t current;
+};
+
+std::ostream& operator<<(std::ostream&, const Bounds&);
+
+
+}}
+
+#endif
diff --git a/qpid/cpp/src/qpid/client/Connection.cpp b/qpid/cpp/src/qpid/client/Connection.cpp
index 25d1c510c8..c11f155afb 100644
--- a/qpid/cpp/src/qpid/client/Connection.cpp
+++ b/qpid/cpp/src/qpid/client/Connection.cpp
@@ -18,11 +18,8 @@
* under the License.
*
*/
-#include <algorithm>
-#include <boost/format.hpp>
-#include <boost/bind.hpp>
-
#include "Connection.h"
+#include "ConnectionSettings.h"
#include "Channel.h"
#include "Message.h"
#include "SessionImpl.h"
@@ -30,9 +27,13 @@
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
#include "qpid/shared_ptr.h"
+
+#include <algorithm>
#include <iostream>
#include <sstream>
#include <functional>
+#include <boost/format.hpp>
+#include <boost/bind.hpp>
using namespace qpid::framing;
using namespace qpid::sys;
@@ -41,41 +42,49 @@ using namespace qpid::sys;
namespace qpid {
namespace client {
-Connection::Connection(bool _debug, uint32_t _max_frame_size, framing::ProtocolVersion _version) :
- channelIdCounter(0), version(_version),
- max_frame_size(_max_frame_size),
- isOpen(false),
- impl(new ConnectionImpl(
- shared_ptr<Connector>(new Connector(_version, _debug))))
-{}
-
-Connection::Connection(shared_ptr<Connector> c) :
- channelIdCounter(0), version(framing::highestProtocolVersion),
- max_frame_size(65535),
- isOpen(false),
- impl(new ConnectionImpl(c))
-{}
+Connection::Connection(framing::ProtocolVersion _version) :
+ channelIdCounter(0), version(_version) {}
Connection::~Connection(){ }
void Connection::open(
const std::string& host, int port,
- const std::string& uid, const std::string& pwd, const std::string& vhost)
+ const std::string& uid, const std::string& pwd,
+ const std::string& vhost,
+ uint16_t maxFrameSize)
+{
+ ConnectionSettings settings;
+ settings.host = host;
+ settings.port = port;
+ settings.username = uid;
+ settings.password = pwd;
+ settings.virtualhost = vhost;
+ settings.maxFrameSize = maxFrameSize;
+ open(settings);
+}
+
+void Connection::open(const ConnectionSettings& settings)
{
- if (isOpen)
- throw Exception(QPID_MSG("Channel object is already open"));
+ if (impl)
+ throw Exception(QPID_MSG("Connection::open() was already called"));
- impl->open(host, port, uid, pwd, vhost);
- isOpen = true;
+ impl = boost::shared_ptr<ConnectionImpl>(new ConnectionImpl(version, settings));
+ max_frame_size = impl->getNegotiatedSettings().maxFrameSize;
}
-void Connection::openChannel(Channel& channel) {
+void Connection::openChannel(Channel& channel)
+{
+ if (!impl)
+ throw Exception(QPID_MSG("Connection has not yet been opened"));
channel.open(newSession(ASYNC));
}
Session Connection::newSession(SynchronousMode sync,
- uint32_t detachedLifetime)
+ uint32_t detachedLifetime)
{
+ if (!impl)
+ throw Exception(QPID_MSG("Connection has not yet been opened"));
+
shared_ptr<SessionImpl> core(
new SessionImpl(impl, ++channelIdCounter, max_frame_size));
core->setSync(sync);
@@ -85,13 +94,19 @@ Session Connection::newSession(SynchronousMode sync,
}
void Connection::resume(Session& session) {
+ if (!impl)
+ throw Exception(QPID_MSG("Connection has not yet been opened"));
+
session.impl->setChannel(++channelIdCounter);
impl->addSession(session.impl);
session.impl->resume(impl);
}
void Connection::close() {
- impl->close();
+ if (impl) {
+ impl->close();
+ impl.reset();
+ }
}
}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/Connection.h b/qpid/cpp/src/qpid/client/Connection.h
index 0ddd383381..417739fd1d 100644
--- a/qpid/cpp/src/qpid/client/Connection.h
+++ b/qpid/cpp/src/qpid/client/Connection.h
@@ -39,6 +39,7 @@ namespace qpid {
*/
namespace client {
+class ConnectionSettings;
/**
* \defgroup clientapi Application API for an AMQP client.
*/
@@ -54,9 +55,7 @@ class Connection
{
framing::ChannelId channelIdCounter;
framing::ProtocolVersion version;
- const uint32_t max_frame_size;
- bool isOpen;
- bool debug;
+ uint16_t max_frame_size;
protected:
boost::shared_ptr<ConnectionImpl> impl;
@@ -67,17 +66,8 @@ class Connection
* connection.
*
* @param _version the version of the protocol to connect with.
- *
- * @param debug turns on tracing for the connection
- * (i.e. prints details of the frames sent and received to std
- * out). Optional. Defaults to false.
- *
- * @param max_frame_size the maximum frame size that the
- * client will accept. Optional. Defaults to 65535.
*/
- Connection(bool debug = false, uint32_t max_frame_size = 65535,
- framing::ProtocolVersion=framing::highestProtocolVersion);
- Connection(boost::shared_ptr<Connector>);
+ Connection(framing::ProtocolVersion=framing::highestProtocolVersion);
~Connection();
/**
@@ -100,7 +90,14 @@ class Connection
void open(const std::string& host, int port = 5672,
const std::string& uid = "guest",
const std::string& pwd = "guest",
- const std::string& virtualhost = "/");
+ const std::string& virtualhost = "/", uint16_t maxFrameSize=65535);
+
+ /**
+ * Opens a connection to a broker.
+ *
+ * @param the settings to use (host, port etc) @see ConnectionSettings
+ */
+ void open(const ConnectionSettings& settings);
/**
* Close the connection.
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
index df27942008..81d966d53f 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -20,9 +20,9 @@
*/
#include "ConnectionHandler.h"
+
#include "qpid/log/Statement.h"
#include "qpid/framing/amqp_framing.h"
-#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/all_method_bodies.h"
#include "qpid/framing/ClientInvoker.h"
#include "qpid/framing/reply_exceptions.h"
@@ -42,17 +42,10 @@ const std::string INVALID_STATE_OPEN_OK("open-ok received in invalid state");
const std::string INVALID_STATE_CLOSE_OK("close-ok received in invalid state");
}
-ConnectionHandler::ConnectionHandler()
- : StateManager(NOT_STARTED), outHandler(*this), proxy(outHandler), errorCode(200)
-{
-
- mechanism = PLAIN;
- locale = en_US;
- heartbeat = 0;
- maxChannels = 32767;
- maxFrameSize = 65535;
+ConnectionHandler::ConnectionHandler(const ConnectionSettings& s, framing::ProtocolVersion& v)
+ : StateManager(NOT_STARTED), ConnectionSettings(s), outHandler(*this), proxy(outHandler), errorCode(200), version(v)
+{
insist = true;
- version = framing::highestProtocolVersion;
ESTABLISHED.insert(FAILED);
ESTABLISHED.insert(CLOSED);
@@ -141,7 +134,7 @@ void ConnectionHandler::start(const FieldTable& /*serverProps*/, const Array& /*
checkState(NOT_STARTED, INVALID_STATE_START);
setState(NEGOTIATING);
//TODO: verify that desired mechanism and locale are supported
- string response = ((char)0) + uid + ((char)0) + pwd;
+ string response = ((char)0) + username + ((char)0) + password;
proxy.startOk(properties, mechanism, response, locale);
}
@@ -150,14 +143,16 @@ void ConnectionHandler::secure(const std::string& /*challenge*/)
throw NotImplementedException("Challenge-response cycle not yet implemented in client");
}
-void ConnectionHandler::tune(uint16_t channelMax, uint16_t /*frameMax*/, uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/)
+void ConnectionHandler::tune(uint16_t maxChannelsProposed, uint16_t maxFrameSizeProposed,
+ uint16_t /*heartbeatMin*/, uint16_t /*heartbeatMax*/)
{
checkState(NEGOTIATING, INVALID_STATE_TUNE);
- //TODO: verify that desired heartbeat and max frame size are valid
- maxChannels = channelMax;
+ maxChannels = std::min(maxChannels, maxChannelsProposed);
+ maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed);
+ //TODO: implement heartbeats and check desired value is in valid range
proxy.tuneOk(maxChannels, maxFrameSize, heartbeat);
setState(OPENING);
- proxy.open(vhost, capabilities, insist);
+ proxy.open(virtualhost, capabilities, insist);
}
void ConnectionHandler::openOk(const framing::Array& /*knownHosts*/)
diff --git a/qpid/cpp/src/qpid/client/ConnectionHandler.h b/qpid/cpp/src/qpid/client/ConnectionHandler.h
index d7ab97ce31..1cf0c905ed 100644
--- a/qpid/cpp/src/qpid/client/ConnectionHandler.h
+++ b/qpid/cpp/src/qpid/client/ConnectionHandler.h
@@ -22,9 +22,10 @@
#define _ConnectionHandler_
#include "ChainableFrameHandler.h"
-#include "Connector.h"
+#include "ConnectionSettings.h"
#include "StateManager.h"
#include "qpid/framing/AMQMethodBody.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/AMQP_ClientOperations.h"
#include "qpid/framing/AMQP_ServerProxy.h"
#include "qpid/framing/Array.h"
@@ -35,27 +36,11 @@
namespace qpid {
namespace client {
-struct ConnectionProperties
-{
- std::string uid;
- std::string pwd;
- std::string vhost;
- framing::FieldTable properties;
- std::string mechanism;
- std::string locale;
- framing::Array capabilities;
- uint16_t heartbeat;
- uint16_t maxChannels;
- uint64_t maxFrameSize;
- bool insist;
- framing::ProtocolVersion version;
-};
-
-class ConnectionHandler : private StateManager,
- public ConnectionProperties,
- public ChainableFrameHandler,
- public framing::InputHandler,
- private framing::AMQP_ClientOperations::ConnectionHandler
+class ConnectionHandler : private StateManager,
+ public ConnectionSettings,
+ public ChainableFrameHandler,
+ public framing::InputHandler,
+ private framing::AMQP_ClientOperations::ConnectionHandler
{
typedef framing::AMQP_ClientOperations::ConnectionHandler ConnectionOperations;
enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
@@ -73,6 +58,10 @@ class ConnectionHandler : private StateManager,
framing::AMQP_ServerProxy::Connection proxy;
uint16_t errorCode;
std::string errorText;
+ bool insist;
+ framing::ProtocolVersion version;
+ framing::Array capabilities;
+ framing::FieldTable properties;
void checkState(STATES s, const std::string& msg);
@@ -96,7 +85,7 @@ public:
typedef boost::function<void()> CloseListener;
typedef boost::function<void(uint16_t, const std::string&)> ErrorListener;
- ConnectionHandler();
+ ConnectionHandler(const ConnectionSettings&, framing::ProtocolVersion&);
void received(framing::AMQFrame& f) { incoming(f); }
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
index ce95e43f58..643d42403d 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -18,13 +18,14 @@
* under the License.
*
*/
+#include "ConnectionImpl.h"
+#include "ConnectionSettings.h"
+#include "SessionImpl.h"
+
#include "qpid/log/Statement.h"
#include "qpid/framing/constants.h"
#include "qpid/framing/reply_exceptions.h"
-#include "ConnectionImpl.h"
-#include "SessionImpl.h"
-
#include <boost/bind.hpp>
#include <boost/format.hpp>
@@ -34,24 +35,32 @@ using namespace qpid::sys;
using namespace qpid::framing::connection;//for connection error codes
-ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
- : connector(c), isClosed(false), isClosing(false)
+ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings)
+ : Bounds(settings.maxFrameSize * settings.bounds),
+ handler(settings, v),
+ connector(v, settings, this),
+ version(v),
+ isClosed(false),
+ isClosing(false)
{
+ QPID_LOG(debug, "ConnectionImpl created for " << version);
handler.in = boost::bind(&ConnectionImpl::incoming, this, _1);
- handler.out = boost::bind(&Connector::send, connector, _1);
+ handler.out = boost::bind(&Connector::send, boost::ref(connector), _1);
handler.onClose = boost::bind(&ConnectionImpl::closed, this,
NORMAL, std::string());
handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
- connector->setInputHandler(&handler);
- connector->setTimeoutHandler(this);
- connector->setShutdownHandler(this);
+ connector.setInputHandler(&handler);
+ connector.setTimeoutHandler(this);
+ connector.setShutdownHandler(this);
+
+ open(settings.host, settings.port);
}
ConnectionImpl::~ConnectionImpl() {
// Important to close the connector first, to ensure the
// connector thread does not call on us while the destructor
// is running.
- connector->close();
+ connector.close();
}
void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session)
@@ -79,18 +88,11 @@ void ConnectionImpl::incoming(framing::AMQFrame& frame)
s->in(frame);
}
-void ConnectionImpl::open(const std::string& host, int port,
- const std::string& uid, const std::string& pwd,
- const std::string& vhost)
+void ConnectionImpl::open(const std::string& host, int port)
{
- //TODO: better management of connection properties
- handler.uid = uid;
- handler.pwd = pwd;
- handler.vhost = vhost;
-
QPID_LOG(info, "Connecting to " << host << ":" << port);
- connector->connect(host, port);
- connector->init();
+ connector.connect(host, port);
+ connector.init();
handler.waitForOpen();
}
@@ -102,7 +104,7 @@ void ConnectionImpl::idleIn()
void ConnectionImpl::idleOut()
{
AMQFrame frame(in_place<AMQHeartbeatBody>());
- connector->send(frame);
+ connector.send(frame);
}
void ConnectionImpl::close()
@@ -121,7 +123,7 @@ void ConnectionImpl::close()
// so sessions can be updated outside the lock.
ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) {
isClosed = true;
- connector->close();
+ connector.close();
SessionVector save;
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
boost::shared_ptr<SessionImpl> s = i->second.lock();
@@ -157,4 +159,9 @@ void ConnectionImpl::erase(uint16_t ch) {
Mutex::ScopedLock l(lock);
sessions.erase(ch);
}
+
+const ConnectionSettings& ConnectionImpl::getNegotiatedSettings()
+{
+ return handler;
+}
diff --git a/qpid/cpp/src/qpid/client/ConnectionImpl.h b/qpid/cpp/src/qpid/client/ConnectionImpl.h
index d0df9238f2..986b044b49 100644
--- a/qpid/cpp/src/qpid/client/ConnectionImpl.h
+++ b/qpid/cpp/src/qpid/client/ConnectionImpl.h
@@ -22,22 +22,26 @@
#ifndef _ConnectionImpl_
#define _ConnectionImpl_
-#include <map>
-#include <boost/shared_ptr.hpp>
-#include <boost/weak_ptr.hpp>
+#include "Bounds.h"
+#include "ConnectionHandler.h"
+#include "Connector.h"
#include "qpid/framing/FrameHandler.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/ShutdownHandler.h"
#include "qpid/sys/TimeoutHandler.h"
-#include "ConnectionHandler.h"
-#include "Connector.h"
+
+#include <map>
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
namespace qpid {
namespace client {
+class ConnectionSettings;
class SessionImpl;
-class ConnectionImpl : public framing::FrameHandler,
+class ConnectionImpl : public Bounds,
+ public framing::FrameHandler,
public sys::TimeoutHandler,
public sys::ShutdownHandler
@@ -47,7 +51,7 @@ class ConnectionImpl : public framing::FrameHandler,
SessionMap sessions;
ConnectionHandler handler;
- boost::shared_ptr<Connector> connector;
+ Connector connector;
framing::ProtocolVersion version;
sys::Mutex lock;
bool isClosed;
@@ -55,6 +59,8 @@ class ConnectionImpl : public framing::FrameHandler,
template <class F> void detachAll(const F&);
+ void open(const std::string& host, int port);
+
SessionVector closeInternal(const sys::Mutex::ScopedLock&);
void incoming(framing::AMQFrame& frame);
void closed(uint16_t, const std::string&);
@@ -64,21 +70,16 @@ class ConnectionImpl : public framing::FrameHandler,
bool setClosing();
public:
- typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
-
- ConnectionImpl(boost::shared_ptr<Connector> c);
+ ConnectionImpl(framing::ProtocolVersion version, const ConnectionSettings& settings);
~ConnectionImpl();
void addSession(const boost::shared_ptr<SessionImpl>&);
- void open(const std::string& host, int port = 5672,
- const std::string& uid = "guest",
- const std::string& pwd = "guest",
- const std::string& virtualhost = "/");
void close();
void handle(framing::AMQFrame& frame);
void erase(uint16_t channel);
- boost::shared_ptr<Connector> getConnector() { return connector; }
+
+ const ConnectionSettings& getNegotiatedSettings();
};
}}
diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.cpp b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
new file mode 100644
index 0000000000..ea2729c2dd
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ConnectionSettings.h"
+
+#include "qpid/sys/posix/check.h"
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
+namespace qpid {
+namespace client {
+
+ConnectionSettings::ConnectionSettings() :
+ Options("Connection Settings"),
+ host("localhost"),
+ port(TcpAddress::DEFAULT_PORT),
+ clientid("cpp"),
+ username("guest"),
+ password("guest"),
+ mechanism("PLAIN"),
+ locale("en_US"),
+ heartbeat(0),
+ maxChannels(32767),
+ maxFrameSize(65535),
+ bounds(2),
+ tcpNoDelay(false)
+{
+ addOptions()
+ ("broker,b", optValue(host, "HOST"), "Broker host to connect to")
+ ("port,p", optValue(port, "PORT"), "Broker port to connect to")
+ ("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host")
+ ("clientname,n", optValue(clientid, "ID"), "unique client identifier")
+ ("username", optValue(username, "USER"), "user name for broker log in.")
+ ("password", optValue(password, "PASSWORD"), "password for broker log in.")
+ ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.")
+ ("locale", optValue(locale, "LOCALE"), "locale to use.")
+ ("max-channels", optValue(maxChannels, "N"), "the maximum number of channels the client requires.")
+ ("max-frame-size", optValue(maxFrameSize, "N"), "the maximum frame size to request.")
+ ("bounds-multiplier", optValue(bounds, "N"),
+ "restricts the total size of outgoing frames queued up for writing (as a multiple of the max frame size).");
+ add(log);
+}
+
+ConnectionSettings::~ConnectionSettings() {}
+
+void ConnectionSettings::parse(int argc, char** argv)
+{
+ qpid::Options::parse(argc, argv);
+ qpid::log::Logger::instance().configure(log, argv[0]);
+}
+
+
+void ConnectionSettings::configurePosixTcpSocket(int fd) const
+{
+ if (tcpNoDelay) {
+ int flag = 1;
+ int result = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
+ QPID_POSIX_CHECK(result);
+ QPID_LOG(debug, "Set TCP_NODELAY");
+ }
+}
+
+}} // namespace qpid::client
diff --git a/qpid/cpp/src/qpid/client/ConnectionSettings.h b/qpid/cpp/src/qpid/client/ConnectionSettings.h
new file mode 100644
index 0000000000..b430c87099
--- /dev/null
+++ b/qpid/cpp/src/qpid/client/ConnectionSettings.h
@@ -0,0 +1,76 @@
+#ifndef QPID_CLIENT_CONNECTIONSETTINGS_H
+#define QPID_CLIENT_CONNECTIONSETTINGS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Options.h"
+#include "qpid/log/Options.h"
+#include "qpid/Url.h"
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Socket.h"
+
+#include <iostream>
+#include <exception>
+
+namespace qpid {
+namespace client {
+
+/**
+ * Used to hold seetings for a connection (and parse these from
+ * command line oprions etc as a convenience).
+ */
+struct ConnectionSettings : qpid::Options, qpid::sys::Socket::Configuration
+{
+ ConnectionSettings();
+ virtual ~ConnectionSettings();
+
+ /**
+ * Applies any tcp specific options to the sockets file descriptor
+ */
+ virtual void configurePosixTcpSocket(int fd) const;
+
+ /**
+ * Parse options from command line arguments (will throw exception
+ * if arguments cannot be parsed).
+ */
+ void parse(int argc, char** argv);
+
+ std::string host;
+ uint16_t port;
+ std::string virtualhost;
+ std::string clientid;
+ std::string username;
+ std::string password;
+ std::string mechanism;
+ std::string locale;
+ uint16_t heartbeat;
+ uint16_t maxChannels;
+ uint16_t maxFrameSize;
+ uint bounds;
+ bool tcpNoDelay;
+
+ log::Options log;
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_CONNECTIONSETTINGS_H*/
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp
index 7fb4997f5a..c9c55c50e8 100644
--- a/qpid/cpp/src/qpid/client/Connector.cpp
+++ b/qpid/cpp/src/qpid/client/Connector.cpp
@@ -20,6 +20,8 @@
*/
#include "Connector.h"
+#include "Bounds.h"
+#include "ConnectionSettings.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Time.h"
#include "qpid/framing/AMQFrame.h"
@@ -40,21 +42,22 @@ using namespace qpid::framing;
using boost::format;
using boost::str;
-Connector::Connector(
- ProtocolVersion ver, bool _debug, uint32_t buffer_size
-) : debug(_debug),
- receive_buffer_size(buffer_size),
- send_buffer_size(buffer_size),
- version(ver),
- initiated(false),
- closed(true),
- joined(true),
- timeout(0),
- idleIn(0), idleOut(0),
- timeoutHandler(0),
- shutdownHandler(0),
- aio(0)
-{}
+Connector::Connector(ProtocolVersion ver, const ConnectionSettings& settings, Bounds* bounds)
+ : maxFrameSize(settings.maxFrameSize),
+ version(ver),
+ initiated(false),
+ closed(true),
+ joined(true),
+ timeout(0),
+ idleIn(0), idleOut(0),
+ timeoutHandler(0),
+ shutdownHandler(0),
+ writer(maxFrameSize, bounds),
+ aio(0)
+{
+ QPID_LOG(debug, "Connector created for " << version);
+ socket.configure(settings);
+}
Connector::~Connector() {
close();
@@ -176,11 +179,11 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){
}
struct Connector::Buff : public AsynchIO::BufferBase {
- Buff() : AsynchIO::BufferBase(new char[65536], 65536) {}
+ Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {}
~Buff() { delete [] bytes;}
};
-Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0)
+Connector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b)
{
}
@@ -192,12 +195,12 @@ void Connector::Writer::init(std::string id, sys::AsynchIO* a) {
aio = a;
newBuffer(l);
}
-
void Connector::Writer::handle(framing::AMQFrame& frame) {
Mutex::ScopedLock l(lock);
frames.push_back(frame);
- if (frame.getEof()) {
+ if (frame.getEof()) {//or if we already have a buffers worth
lastEof = frames.size();
+ QPID_LOG(debug, "Requesting write: lastEof=" << lastEof);
aio->notifyPendingWrite();
}
QPID_LOG(trace, "SENT " << identifier << ": " << frame);
@@ -217,7 +220,7 @@ void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
void Connector::Writer::newBuffer(const Mutex::ScopedLock&) {
buffer = aio->getQueuedBuffer();
- if (!buffer) buffer = new Buff();
+ if (!buffer) buffer = new Buff(maxFrameSize);
encode = framing::Buffer(buffer->bytes, buffer->byteCount);
framesEncoded = 0;
}
@@ -226,15 +229,20 @@ void Connector::Writer::newBuffer(const Mutex::ScopedLock&) {
void Connector::Writer::write(sys::AsynchIO&) {
Mutex::ScopedLock l(lock);
assert(buffer);
+ size_t bytesWritten(0);
for (size_t i = 0; i < lastEof; ++i) {
AMQFrame& frame = frames[i];
- if (frame.size() > encode.available()) writeOne(l);
- assert(frame.size() <= encode.available());
+ uint32_t size = frame.size();
+ if (size > encode.available()) writeOne(l);
+ assert(size <= encode.available());
frame.encode(encode);
++framesEncoded;
+ bytesWritten += size;
+ QPID_LOG(debug, "Wrote frame: lastEof=" << lastEof << ", i=" << i);
}
frames.erase(frames.begin(), frames.begin()+lastEof);
lastEof = 0;
+ if (bounds) bounds->reduce(bytesWritten);
if (encode.getPosition() > 0) writeOne(l);
}
@@ -272,7 +280,7 @@ void Connector::writebuff(AsynchIO& aio_) {
}
void Connector::writeDataBlock(const AMQDataBlock& data) {
- AsynchIO::BufferBase* buff = new Buff;
+ AsynchIO::BufferBase* buff = new Buff(maxFrameSize);
framing::Buffer out(buff->bytes, buff->byteCount);
data.encode(out);
buff->dataCount = data.size();
@@ -290,7 +298,7 @@ void Connector::run(){
Dispatcher d(poller);
for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff);
+ aio->queueReadBuffer(new Buff(maxFrameSize));
}
aio->start(poller);
diff --git a/qpid/cpp/src/qpid/client/Connector.h b/qpid/cpp/src/qpid/client/Connector.h
index 366f82acbd..9b13e1d519 100644
--- a/qpid/cpp/src/qpid/client/Connector.h
+++ b/qpid/cpp/src/qpid/client/Connector.h
@@ -31,17 +31,21 @@
#include "qpid/sys/TimeoutHandler.h"
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/sys/Socket.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/AsynchIO.h"
#include <queue>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace client {
+class Bounds;
+class ConnectionSettings;
+
class Connector : public framing::OutputHandler,
private sys::Runnable
{
@@ -52,6 +56,7 @@ class Connector : public framing::OutputHandler,
typedef sys::AsynchIO::BufferBase BufferBase;
typedef std::vector<framing::AMQFrame> Frames;
+ const uint16_t maxFrameSize;
sys::Mutex lock;
sys::AsynchIO* aio;
BufferBase* buffer;
@@ -60,22 +65,21 @@ class Connector : public framing::OutputHandler,
framing::Buffer encode;
size_t framesEncoded;
std::string identifier;
+ Bounds* bounds;
void writeOne(const sys::Mutex::ScopedLock&);
void newBuffer(const sys::Mutex::ScopedLock&);
public:
- Writer();
+ Writer(uint16_t maxFrameSize, Bounds*);
~Writer();
void init(std::string id, sys::AsynchIO*);
void handle(framing::AMQFrame&);
void write(sys::AsynchIO&);
};
- const bool debug;
- const int receive_buffer_size;
- const int send_buffer_size;
+ const uint16_t maxFrameSize;
framing::ProtocolVersion version;
bool initiated;
@@ -122,7 +126,8 @@ class Connector : public framing::OutputHandler,
public:
Connector(framing::ProtocolVersion pVersion,
- bool debug = false, uint32_t buffer_size = 1024);
+ const ConnectionSettings&,
+ Bounds* bounds = 0);
virtual ~Connector();
virtual void connect(const std::string& host, int port);
virtual void init();
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.cpp b/qpid/cpp/src/qpid/client/SessionImpl.cpp
index 571d54df0c..e998d040c8 100644
--- a/qpid/cpp/src/qpid/client/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/SessionImpl.cpp
@@ -45,6 +45,7 @@ using namespace qpid::framing::session;//for detach codes
typedef sys::Monitor::ScopedLock Lock;
typedef sys::Monitor::ScopedUnlock UnLock;
+typedef sys::ScopedLock<sys::Semaphore> Acquire;
SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn,
@@ -60,8 +61,9 @@ SessionImpl::SessionImpl(shared_ptr<ConnectionImpl> conn,
name(id.str()),
//TODO: may want to allow application defined names instead
connection(conn),
+ ioHandler(*this),
channel(ch),
- proxy(channel),
+ proxy(ioHandler),
nextIn(0),
nextOut(0)
{
@@ -281,14 +283,20 @@ Future SessionImpl::send(const AMQBody& command, const MethodContent& content)
Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* content)
{
- Lock l(state);
- checkOpen();
+ Acquire a(sendLock);
SequenceNumber id = nextOut++;
- incompleteOut.add(id);
+ bool sync;
+ {
+ Lock l(state);
+ checkOpen();
+ incompleteOut.add(id);
+ sync = syncMode;
+ }
- if (syncMode) command.getMethod()->setSync(syncMode);
+ if (sync) command.getMethod()->setSync(true);
Future f(id);
if (command.getMethod()->resultExpected()) {
+ Lock l(state);
//result listener must be set before the command is sent
f.setFutureResult(results.listenForResult(id));
}
@@ -300,26 +308,25 @@ Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent* con
if (content) {
sendContent(*content);
}
- if (syncMode) {
- waitForCompletionImpl(id);
+ if (sync) {
+ waitForCompletion(id);
}
return f;
}
-
void SessionImpl::sendContent(const MethodContent& content)
{
AMQFrame header(content.getHeader());
- header.setBof(false);
+ header.setFirstSegment(false);
u_int64_t data_length = content.getData().length();
if(data_length > 0){
- header.setEof(false);
+ header.setLastSegment(false);
handleOut(header);
/*Note: end of frame marker included in overhead but not in size*/
const u_int32_t frag_size = maxFrameSize - (AMQFrame::frameOverhead() - 1);
if(data_length < frag_size){
AMQFrame frame(in_place<AMQContentBody>(content.getData()));
- frame.setBof(false);
+ frame.setFirstSegment(false);
handleOut(frame);
}else{
u_int32_t offset = 0;
@@ -328,15 +335,15 @@ void SessionImpl::sendContent(const MethodContent& content)
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(content.getData().substr(offset, length));
AMQFrame frame(in_place<AMQContentBody>(frag));
- frame.setBof(false);
+ frame.setFirstSegment(false);
+ frame.setLastSegment(true);
if (offset > 0) {
- frame.setBos(false);
+ frame.setFirstFrame(false);
}
offset += length;
remaining = data_length - offset;
if (remaining) {
- frame.setEos(false);
- frame.setEof(false);
+ frame.setLastFrame(false);
}
handleOut(frame);
}
@@ -391,6 +398,13 @@ void SessionImpl::handleIn(AMQFrame& frame) // network thread
void SessionImpl::handleOut(AMQFrame& frame) // user thread
{
+ connection->expand(frame.size(), true);
+ channel.handle(frame);
+}
+
+void SessionImpl::proxyOut(AMQFrame& frame) // network thread
+{
+ connection->expand(frame.size(), false);
channel.handle(frame);
}
@@ -620,10 +634,8 @@ void SessionImpl::assertOpen() const
void SessionImpl::handleClosed()
{
- QPID_LOG(info, "SessionImpl::handleClosed(): entering");
demux.close();
results.close();
- QPID_LOG(info, "SessionImpl::handleClosed(): returning");
}
}}
diff --git a/qpid/cpp/src/qpid/client/SessionImpl.h b/qpid/cpp/src/qpid/client/SessionImpl.h
index 3b2e80fefd..0bcec4dd0c 100644
--- a/qpid/cpp/src/qpid/client/SessionImpl.h
+++ b/qpid/cpp/src/qpid/client/SessionImpl.h
@@ -33,6 +33,7 @@
#include "qpid/framing/SequenceNumber.h"
#include "qpid/framing/AMQP_ClientOperations.h"
#include "qpid/framing/AMQP_ServerProxy.h"
+#include "qpid/sys/Semaphore.h"
#include "qpid/sys/StateMonitor.h"
#include <boost/optional.hpp>
@@ -124,6 +125,7 @@ private:
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
+ void proxyOut(framing::AMQFrame& frame);
void deliver(framing::AMQFrame& frame);
Future sendCommand(const framing::AMQBody&, const framing::MethodContent* = 0);
@@ -164,14 +166,15 @@ private:
int code; // Error code
std::string text; // Error text
mutable StateMonitor state;
+ mutable sys::Semaphore sendLock;
volatile bool syncMode;
uint32_t detachedLifetime;
const uint64_t maxFrameSize;
const framing::Uuid id;
const std::string name;
-
shared_ptr<ConnectionImpl> connection;
+ framing::FrameHandler::MemFunRef<SessionImpl, &SessionImpl::proxyOut> ioHandler;
framing::ChannelHandler channel;
framing::AMQP_ServerProxy::Session proxy;
diff --git a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
index 0aa1bf7e66..42139c7937 100644
--- a/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
+++ b/qpid/cpp/src/qpid/framing/AMQP_HighestVersion.h
@@ -32,7 +32,6 @@
namespace qpid {
namespace framing {
-//static ProtocolVersion highestProtocolVersion(99, 0);
static ProtocolVersion highestProtocolVersion(0, 10);
} /* namespace framing */
diff --git a/qpid/cpp/src/qpid/framing/ProtocolVersion.h b/qpid/cpp/src/qpid/framing/ProtocolVersion.h
index a2a755397b..9a7ebec491 100644
--- a/qpid/cpp/src/qpid/framing/ProtocolVersion.h
+++ b/qpid/cpp/src/qpid/framing/ProtocolVersion.h
@@ -35,7 +35,7 @@ private:
uint8_t minor_;
public:
- ProtocolVersion(uint8_t _major=0, uint8_t _minor=0)
+ explicit ProtocolVersion(uint8_t _major=0, uint8_t _minor=0)
: major_(_major), minor_(_minor) {}
uint8_t getMajor() const { return major_; }
diff --git a/qpid/cpp/src/qpid/sys/Socket.h b/qpid/cpp/src/qpid/sys/Socket.h
index cab95654ad..20dc0f1ce3 100644
--- a/qpid/cpp/src/qpid/sys/Socket.h
+++ b/qpid/cpp/src/qpid/sys/Socket.h
@@ -103,8 +103,16 @@ public:
int read(void *buf, size_t count) const;
int write(const void *buf, size_t count) const;
+ struct Configuration
+ {
+ virtual void configurePosixTcpSocket(int fd) const = 0;
+ virtual ~Configuration() {}
+ };
+
+ void configure(const Configuration&);
+
private:
- Socket(IOHandlePrivate*);
+ Socket(IOHandlePrivate*);
};
}}
diff --git a/qpid/cpp/src/qpid/sys/posix/Socket.cpp b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
index 99cf7210b6..5f10cd84c2 100644
--- a/qpid/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/qpid/cpp/src/qpid/sys/posix/Socket.cpp
@@ -104,8 +104,8 @@ Socket::Socket(IOHandlePrivate* h) :
void Socket::createTcp() const
{
- int& socket = impl->fd;
- if (socket != -1) Socket::close();
+ int& socket = impl->fd;
+ if (socket != -1) Socket::close();
int s = ::socket (PF_INET, SOCK_STREAM, 0);
if (s < 0) throw QPID_POSIX_ERROR(errno);
socket = s;
@@ -113,7 +113,7 @@ void Socket::createTcp() const
void Socket::setTimeout(const Duration& interval) const
{
- const int& socket = impl->fd;
+ const int& socket = impl->fd;
struct timeval tv;
toTimeval(tv, interval);
setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
@@ -138,7 +138,7 @@ const char* h_errstr(int e) {
void Socket::connect(const std::string& host, int port) const
{
- const int& socket = impl->fd;
+ const int& socket = impl->fd;
struct sockaddr_in name;
name.sin_family = AF_INET;
name.sin_port = htons(port);
@@ -155,7 +155,7 @@ void Socket::connect(const std::string& host, int port) const
void
Socket::close() const
{
- int& socket = impl->fd;
+ int& socket = impl->fd;
if (socket == -1) return;
if (::close(socket) < 0) throw QPID_POSIX_ERROR(errno);
socket = -1;
@@ -164,7 +164,7 @@ Socket::close() const
ssize_t
Socket::send(const void* data, size_t size) const
{
- const int& socket = impl->fd;
+ const int& socket = impl->fd;
ssize_t sent = ::send(socket, data, size, 0);
if (sent < 0) {
if (errno == ECONNRESET) return SOCKET_EOF;
@@ -177,7 +177,7 @@ Socket::send(const void* data, size_t size) const
ssize_t
Socket::recv(void* data, size_t size) const
{
- const int& socket = impl->fd;
+ const int& socket = impl->fd;
ssize_t received = ::recv(socket, data, size, 0);
if (received < 0) {
if (errno == ETIMEDOUT) return SOCKET_TIMEOUT;
@@ -209,22 +209,22 @@ int Socket::listen(int port, int backlog) const
Socket* Socket::accept(struct sockaddr *addr, socklen_t *addrlen) const
{
- int afd = ::accept(impl->fd, addr, addrlen);
- if ( afd >= 0)
- return new Socket(new IOHandlePrivate(afd));
- else if (errno == EAGAIN)
- return 0;
+ int afd = ::accept(impl->fd, addr, addrlen);
+ if ( afd >= 0)
+ return new Socket(new IOHandlePrivate(afd));
+ else if (errno == EAGAIN)
+ return 0;
else throw QPID_POSIX_ERROR(errno);
}
int Socket::read(void *buf, size_t count) const
{
- return ::read(impl->fd, buf, count);
+ return ::read(impl->fd, buf, count);
}
int Socket::write(const void *buf, size_t count) const
{
- return ::write(impl->fd, buf, count);
+ return ::write(impl->fd, buf, count);
}
std::string Socket::getSockname() const
@@ -257,4 +257,9 @@ uint16_t Socket::getRemotePort() const
return atoi(getService(impl->fd, true).c_str());
}
+void Socket::configure(const Configuration& c)
+{
+ c.configurePosixTcpSocket(impl->fd);
+}
+
}} // namespace qpid::sys
diff --git a/qpid/cpp/src/tests/BasicP2PTest.cpp b/qpid/cpp/src/tests/BasicP2PTest.cpp
index b202f88ca6..9f5f1acccb 100644
--- a/qpid/cpp/src/tests/BasicP2PTest.cpp
+++ b/qpid/cpp/src/tests/BasicP2PTest.cpp
@@ -29,7 +29,7 @@ class BasicP2PTest::Receiver : public Worker, public MessageListener
const std::string queue;
std::string tag;
public:
- Receiver(TestOptions& options, const std::string& _queue, const int _messages)
+ Receiver(ConnectionSettings& options, const std::string& _queue, const int _messages)
: Worker(options, _messages), queue(_queue){}
void init()
{
@@ -51,7 +51,7 @@ public:
}
};
-void BasicP2PTest::assign(const std::string& role, framing::FieldTable& params, TestOptions& options)
+void BasicP2PTest::assign(const std::string& role, framing::FieldTable& params, ConnectionSettings& options)
{
std::string queue = params.getString("P2P_QUEUE_AND_KEY_NAME");
int messages = params.getInt("P2P_NUM_MESSAGES");
diff --git a/qpid/cpp/src/tests/BasicP2PTest.h b/qpid/cpp/src/tests/BasicP2PTest.h
index 3f0a3704f5..8f957187b0 100644
--- a/qpid/cpp/src/tests/BasicP2PTest.h
+++ b/qpid/cpp/src/tests/BasicP2PTest.h
@@ -38,7 +38,7 @@ class BasicP2PTest : public SimpleTestCaseBase
{
class Receiver;
public:
- void assign(const std::string& role, framing::FieldTable& params, TestOptions& options);
+ void assign(const std::string& role, framing::FieldTable& params, ConnectionSettings& options);
};
}
diff --git a/qpid/cpp/src/tests/BasicPubSubTest.cpp b/qpid/cpp/src/tests/BasicPubSubTest.cpp
index 623194d331..aed47b419c 100644
--- a/qpid/cpp/src/tests/BasicPubSubTest.cpp
+++ b/qpid/cpp/src/tests/BasicPubSubTest.cpp
@@ -30,7 +30,7 @@ class BasicPubSubTest::Receiver : public Worker, public MessageListener
const std::string key;
std::string tag;
public:
- Receiver(TestOptions& options, const Exchange& _exchange, const std::string& _queue, const std::string& _key, const int _messages)
+ Receiver(ConnectionSettings& options, const Exchange& _exchange, const std::string& _queue, const std::string& _key, const int _messages)
: Worker(options, _messages), exchange(_exchange), queue(_queue), key(_key){}
void init()
@@ -58,7 +58,7 @@ class BasicPubSubTest::MultiReceiver : public Worker, public MessageListener
ReceiverList receivers;
public:
- MultiReceiver(TestOptions& options, const Exchange& exchange, const std::string& key, const int _messages, int receiverCount)
+ MultiReceiver(ConnectionSettings& options, const Exchange& exchange, const std::string& key, const int _messages, int receiverCount)
: Worker(options, _messages)
{
for (int i = 0; i != receiverCount; i++) {
@@ -104,7 +104,7 @@ public:
}
};
-void BasicPubSubTest::assign(const std::string& role, framing::FieldTable& params, TestOptions& options)
+void BasicPubSubTest::assign(const std::string& role, framing::FieldTable& params, ConnectionSettings& options)
{
std::string key = params.getString("PUBSUB_KEY");
int messages = params.getInt("PUBSUB_NUM_MESSAGES");
diff --git a/qpid/cpp/src/tests/BasicPubSubTest.h b/qpid/cpp/src/tests/BasicPubSubTest.h
index c3f8020b3a..a6d794c5b6 100644
--- a/qpid/cpp/src/tests/BasicPubSubTest.h
+++ b/qpid/cpp/src/tests/BasicPubSubTest.h
@@ -43,7 +43,7 @@ class BasicPubSubTest : public SimpleTestCaseBase
class Receiver;
class MultiReceiver;
public:
- void assign(const std::string& role, framing::FieldTable& params, TestOptions& options);
+ void assign(const std::string& role, framing::FieldTable& params, ConnectionSettings& options);
};
}
diff --git a/qpid/cpp/src/tests/SimpleTestCaseBase.cpp b/qpid/cpp/src/tests/SimpleTestCaseBase.cpp
index 4f071cd02b..0111e030fb 100644
--- a/qpid/cpp/src/tests/SimpleTestCaseBase.cpp
+++ b/qpid/cpp/src/tests/SimpleTestCaseBase.cpp
@@ -47,7 +47,7 @@ void SimpleTestCaseBase::report(client::Message& report)
}
}
-SimpleTestCaseBase::Sender::Sender(TestOptions& options,
+SimpleTestCaseBase::Sender::Sender(ConnectionSettings& options,
const Exchange& _exchange,
const std::string& _key,
const int _messages)
@@ -67,8 +67,8 @@ void SimpleTestCaseBase::Sender::start(){
stop();
}
-SimpleTestCaseBase::Worker::Worker(TestOptions& options, const int _messages) :
- connection(options.trace), messages(_messages), count(0)
+SimpleTestCaseBase::Worker::Worker(ConnectionSettings& options, const int _messages) :
+ messages(_messages), count(0)
{
connection.open(options.host, options.port);
connection.openChannel(channel);
diff --git a/qpid/cpp/src/tests/SimpleTestCaseBase.h b/qpid/cpp/src/tests/SimpleTestCaseBase.h
index 7f94fa7e1c..e2c328437e 100644
--- a/qpid/cpp/src/tests/SimpleTestCaseBase.h
+++ b/qpid/cpp/src/tests/SimpleTestCaseBase.h
@@ -28,6 +28,7 @@
#include "qpid/client/Channel.h"
#include "qpid/client/Message.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
#include "qpid/client/MessageListener.h"
#include "TestCase.h"
@@ -49,7 +50,7 @@ protected:
public:
- Worker(TestOptions& options, const int messages);
+ Worker(ConnectionSettings& options, const int messages);
virtual ~Worker(){}
virtual void stop();
@@ -63,7 +64,7 @@ protected:
const Exchange& exchange;
const std::string key;
public:
- Sender(TestOptions& options,
+ Sender(ConnectionSettings& options,
const Exchange& exchange,
const std::string& key,
const int messages);
@@ -74,7 +75,7 @@ protected:
std::auto_ptr<Worker> worker;
public:
- virtual void assign(const std::string& role, framing::FieldTable& params, TestOptions& options) = 0;
+ virtual void assign(const std::string& role, framing::FieldTable& params, ConnectionSettings& options) = 0;
virtual ~SimpleTestCaseBase() {}
diff --git a/qpid/cpp/src/tests/TestCase.h b/qpid/cpp/src/tests/TestCase.h
index 07bdd68933..7926c4fe1d 100644
--- a/qpid/cpp/src/tests/TestCase.h
+++ b/qpid/cpp/src/tests/TestCase.h
@@ -21,8 +21,8 @@
*
*/
+#include "qpid/client/ConnectionSettings.h"
#include "qpid/client/Message.h"
-#include "TestOptions.h"
namespace qpid {
@@ -39,7 +39,7 @@ public:
* may be 'activated' at this stage others may require an explicit
* start request.
*/
- virtual void assign(const std::string& role, framing::FieldTable& params, TestOptions& options) = 0;
+ virtual void assign(const std::string& role, framing::FieldTable& params, client::ConnectionSettings& options) = 0;
/**
* Each test will be started on its own thread, which should block
* until the test completes (this may or may not require an
diff --git a/qpid/cpp/src/tests/TestOptions.h b/qpid/cpp/src/tests/TestOptions.h
index 87710964d6..b34acaec4e 100644
--- a/qpid/cpp/src/tests/TestOptions.h
+++ b/qpid/cpp/src/tests/TestOptions.h
@@ -26,6 +26,7 @@
#include "qpid/Url.h"
#include "qpid/log/Logger.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
#include <iostream>
#include <exception>
@@ -35,22 +36,11 @@ namespace qpid {
struct TestOptions : public qpid::Options
{
TestOptions(const std::string& helpText_=std::string()) :
- Options("Test Options"),
- host("localhost"), port(TcpAddress::DEFAULT_PORT),
- clientid("cpp"), username("guest"), password("guest"),
- help(false), helpText(helpText_)
+ Options("Test Options"), help(false), helpText(helpText_)
{
addOptions()
- ("host,h", optValue(host, "HOST"), "Broker host to connect to")
- // TODO aconway 2007-06-26: broker is synonym for host. Drop broker?
- ("broker,b", optValue(host, "HOST"), "Broker host to connect to")
- ("port,p", optValue(port, "PORT"), "Broker port to connect to")
- ("virtualhost,v", optValue(virtualhost, "VHOST"), "virtual host")
- ("clientname,n", optValue(clientid, "ID"), "unique client identifier")
- ("username", optValue(username, "USER"), "user name for broker log in.")
- ("password", optValue(password, "USER"), "password for broker log in.")
("help", optValue(help), "print this usage statement");
- add(log);
+ add(con);
}
/** As well as parsing, throw help message if requested. */
@@ -62,8 +52,7 @@ struct TestOptions : public qpid::Options
msg << *this << std::endl << std::endl << e.what() << std::endl;
throw qpid::Options::Exception(msg.str());
}
- trace = log.trace;
- qpid::log::Logger::instance().configure(log, argv[0]);
+ qpid::log::Logger::instance().configure(con.log, argv[0]);
if (help) {
std::ostringstream msg;
msg << *this << std::endl << std::endl << helpText << std::endl;
@@ -73,19 +62,12 @@ struct TestOptions : public qpid::Options
/** Open a connection using option values */
void open(qpid::client::Connection& connection) {
- connection.open(host, port, username, password, virtualhost);
+ connection.open(con);
}
- std::string host;
- uint16_t port;
- std::string virtualhost;
- std::string clientid;
- std::string username;
- std::string password;
- bool trace;
bool help;
- log::Options log;
+ client::ConnectionSettings con;
std::string helpText;
};
diff --git a/qpid/cpp/src/tests/client_test.cpp b/qpid/cpp/src/tests/client_test.cpp
index d0da2ec8ca..20e8b21a3a 100644
--- a/qpid/cpp/src/tests/client_test.cpp
+++ b/qpid/cpp/src/tests/client_test.cpp
@@ -30,6 +30,7 @@
#include "TestOptions.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
#include "qpid/client/Message.h"
#include "qpid/client/Session.h"
#include "qpid/framing/FrameSet.h"
@@ -40,15 +41,15 @@ using namespace qpid::client;
using namespace qpid::framing;
using std::string;
-struct Args : public qpid::TestOptions {
+struct Args : public TestOptions {
uint msgSize;
- uint maxFrameSize;
+ bool verbose;
- Args() : msgSize(26), maxFrameSize(65535)
+ Args() : TestOptions("Simple test of Qpid c++ client; sends and receives a single message."), msgSize(26)
{
addOptions()
("size", optValue(msgSize, "N"), "message size")
- ("max-frame-size", optValue(maxFrameSize, "N"), "max frame size");
+ ("verbose", optValue(verbose), "print out some status messages");
}
};
@@ -85,33 +86,33 @@ int main(int argc, char** argv)
opts.parse(argc, argv);
//Connect to the broker:
- Connection connection(opts.trace, opts.maxFrameSize);
+ Connection connection;
opts.open(connection);
- if (opts.trace) std::cout << "Opened connection." << std::endl;
+ if (opts.verbose) std::cout << "Opened connection." << std::endl;
//Create and open a session on the connection through which
//most functionality is exposed:
Session session = connection.newSession(ASYNC);
- if (opts.trace) std::cout << "Opened session." << std::endl;
+ if (opts.verbose) std::cout << "Opened session." << std::endl;
//'declare' the exchange and the queue, which will create them
//as they don't exist
session.exchangeDeclare(arg::exchange="MyExchange", arg::type="direct");
- if (opts.trace) std::cout << "Declared exchange." << std::endl;
+ if (opts.verbose) std::cout << "Declared exchange." << std::endl;
session.queueDeclare(arg::queue="MyQueue", arg::autoDelete=true, arg::exclusive=true);
- if (opts.trace) std::cout << "Declared queue." << std::endl;
+ if (opts.verbose) std::cout << "Declared queue." << std::endl;
//now bind the queue to the exchange
session.exchangeBind(arg::exchange="MyExchange", arg::queue="MyQueue", arg::bindingKey="MyKey");
- if (opts.trace) std::cout << "Bound queue to exchange." << std::endl;
+ if (opts.verbose) std::cout << "Bound queue to exchange." << std::endl;
//create and send a message to the exchange using the routing
//key we bound our queue with:
Message msgOut(generateData(opts.msgSize));
msgOut.getDeliveryProperties().setRoutingKey("MyKey");
session.messageTransfer(arg::destination="MyExchange", arg::content=msgOut, arg::acceptMode=1);
- if (opts.trace) print("Published message: ", msgOut);
+ if (opts.verbose) print("Published message: ", msgOut);
//subscribe to the queue, add sufficient credit and then get
//incoming 'frameset', check that its a message transfer and
@@ -121,12 +122,12 @@ int main(int argc, char** argv)
session.messageSubscribe(arg::queue="MyQueue", arg::destination="MyId");
session.messageFlow(arg::destination="MyId", arg::unit=0, arg::value=1); //credit for one message
session.messageFlow(arg::destination="MyId", arg::unit=1, arg::value=0xFFFFFFFF); //credit for infinite bytes
- if (opts.trace) std::cout << "Subscribed to queue." << std::endl;
+ if (opts.verbose) std::cout << "Subscribed to queue." << std::endl;
FrameSet::shared_ptr incoming = session.get();
if (incoming->isA<MessageTransferBody>()) {
Message msgIn(*incoming);
if (msgIn.getData() == msgOut.getData()) {
- if (opts.trace) std::cout << "Received the exepected message." << std::endl;
+ if (opts.verbose) std::cout << "Received the exepected message." << std::endl;
session.messageAccept(SequenceSet(msgIn.getId()));
session.markCompleted(msgIn.getId(), true, true);
} else {
@@ -138,9 +139,9 @@ int main(int argc, char** argv)
//close the session & connection
session.close();
- if (opts.trace) std::cout << "Closed session." << std::endl;
+ if (opts.verbose) std::cout << "Closed session." << std::endl;
connection.close();
- if (opts.trace) std::cout << "Closed connection." << std::endl;
+ if (opts.verbose) std::cout << "Closed connection." << std::endl;
return 0;
} catch(const std::exception& e) {
std::cout << e.what() << std::endl;
diff --git a/qpid/cpp/src/tests/interop_runner.cpp b/qpid/cpp/src/tests/interop_runner.cpp
index 51dd2cb924..9435b8169f 100644
--- a/qpid/cpp/src/tests/interop_runner.cpp
+++ b/qpid/cpp/src/tests/interop_runner.cpp
@@ -24,6 +24,7 @@
#include "qpid/Exception.h"
#include "qpid/client/Channel.h"
#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
#include "qpid/client/Exchange.h"
#include "qpid/client/MessageListener.h"
#include "qpid/client/Queue.h"
@@ -45,7 +46,6 @@
using namespace qpid::client;
using namespace qpid::sys;
using qpid::TestCase;
-using qpid::TestOptions;
using qpid::framing::FieldTable;
using qpid::framing::ReplyTo;
using namespace std;
@@ -54,7 +54,7 @@ class DummyRun : public TestCase
{
public:
DummyRun() {}
- void assign(const string&, FieldTable&, TestOptions&) {}
+ void assign(const string&, FieldTable&, ConnectionSettings&) {}
void start() {}
void stop() {}
void report(qpid::client::Message&) {}
@@ -68,7 +68,7 @@ class Listener : public MessageListener, private Runnable{
typedef boost::ptr_map<string, TestCase> TestMap;
Channel& channel;
- TestOptions& options;
+ ConnectionSettings& options;
TestMap tests;
const string name;
const string topic;
@@ -86,41 +86,52 @@ class Listener : public MessageListener, private Runnable{
void sendSimpleResponse(const string& type, Message& request);
void sendReport();
public:
- Listener(Channel& channel, TestOptions& options);
+ Listener(Channel& channel, ConnectionSettings& options);
void received(Message& msg);
void bindAndConsume();
void registerTest(string name, TestCase* test);
};
+struct TestSettings : ConnectionSettings
+{
+ bool help;
+
+ TestSettings() : help(false)
+ {
+ addOptions()
+ ("help", qpid::optValue(help), "print this usage statement");
+ }
+};
+
int main(int argc, char** argv) {
try {
- TestOptions options;
+ TestSettings options;
options.parse(argc, argv);
- if (options.help)
+ if (options.help) {
cout << options;
- else {
- Connection connection(options.trace);
+ } else {
+ Connection connection;
connection.open(options.host, options.port, "guest", "guest", options.virtualhost);
- Channel channel;
- connection.openChannel(channel);
+ Channel channel;
+ connection.openChannel(channel);
- Listener listener(channel, options);
- listener.registerTest("TC1_DummyRun", new DummyRun());
- listener.registerTest("TC2_BasicP2P", new qpid::BasicP2PTest());
- listener.registerTest("TC3_BasicPubSub", new qpid::BasicPubSubTest());
-
- listener.bindAndConsume();
+ Listener listener(channel, options);
+ listener.registerTest("TC1_DummyRun", new DummyRun());
+ listener.registerTest("TC2_BasicP2P", new qpid::BasicP2PTest());
+ listener.registerTest("TC3_BasicPubSub", new qpid::BasicPubSubTest());
+
+ listener.bindAndConsume();
- channel.run();
- connection.close();
+ channel.run();
+ connection.close();
}
} catch(const exception& error) {
cout << error.what() << endl << "Type " << argv[0] << " --help for help" << endl;
}
}
-Listener::Listener(Channel& _channel, TestOptions& _options) : channel(_channel), options(_options), name(options.clientid), topic("iop.control." + name)
+Listener::Listener(Channel& _channel, ConnectionSettings& _options) : channel(_channel), options(_options), name(options.clientid), topic("iop.control." + name)
{}
void Listener::registerTest(string name, TestCase* test)
diff --git a/qpid/cpp/src/tests/latencytest.cpp b/qpid/cpp/src/tests/latencytest.cpp
index a61a4b2e42..0b343d0243 100644
--- a/qpid/cpp/src/tests/latencytest.cpp
+++ b/qpid/cpp/src/tests/latencytest.cpp
@@ -66,7 +66,8 @@ struct Args : public qpid::TestOptions {
("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)")
("ack", optValue(ack, "N"), "Ack frequency in messages (defaults to half the prefetch value)")
("durable", optValue(durable, "yes|no"), "use durable messages")
- ("queue-base-name", optValue(base, "<name>"), "base name for queues");
+ ("queue-base-name", optValue(base, "<name>"), "base name for queues")
+ ("tcp-nodelay", optValue(con.tcpNoDelay), "Turn on tcp-nodelay");
}
};
diff --git a/qpid/cpp/src/tests/topic_listener.cpp b/qpid/cpp/src/tests/topic_listener.cpp
index f1007ee4c2..ba6cd5f267 100644
--- a/qpid/cpp/src/tests/topic_listener.cpp
+++ b/qpid/cpp/src/tests/topic_listener.cpp
@@ -99,7 +99,7 @@ int main(int argc, char** argv){
if(args.help)
cout << args << endl;
else {
- Connection connection(args.trace);
+ Connection connection;
args.open(connection);
Session session = connection.newSession(ASYNC);
if (args.transactional) {
diff --git a/qpid/cpp/src/tests/topic_publisher.cpp b/qpid/cpp/src/tests/topic_publisher.cpp
index 8242530db1..a4d0dd9382 100644
--- a/qpid/cpp/src/tests/topic_publisher.cpp
+++ b/qpid/cpp/src/tests/topic_publisher.cpp
@@ -105,7 +105,7 @@ int main(int argc, char** argv) {
if(args.help)
cout << args << endl;
else {
- Connection connection(args.trace);
+ Connection connection;
args.open(connection);
Session session = connection.newSession(ASYNC);
if (args.transactional) {