summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-08-05 16:45:23 +0000
committerGordon Sim <gsim@apache.org>2008-08-05 16:45:23 +0000
commit15f620be271fa92bcd09282a0f62ed9b28dc7d07 (patch)
treeaf133dc8a13b048b298a52bf4f376250881de899
parent29041dcd9c04b8bb01e50b114d3e5b168d339682 (diff)
downloadqpid-python-15f620be271fa92bcd09282a0f62ed9b28dc7d07.tar.gz
* revised approach for setting tcp-nodelay on client to avoid breaking platform abstractions
* added ability to set tcp-nodelay on server side of the socket also git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-10@682785 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/broker/Broker.cpp6
-rw-r--r--cpp/src/qpid/broker/Broker.h1
-rw-r--r--cpp/src/qpid/client/ConnectionSettings.cpp12
-rw-r--r--cpp/src/qpid/client/ConnectionSettings.h13
-rw-r--r--cpp/src/qpid/client/Connector.cpp2
-rw-r--r--cpp/src/qpid/sys/Socket.h8
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp14
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp9
8 files changed, 37 insertions, 28 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 28c7518600..962da9aa6c 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -88,7 +88,8 @@ Broker::Options::Options(const std::string& name) :
realm("QPID"),
replayFlushLimit(0),
replayHardLimit(0),
- queueLimit(100*1048576/*100M default limit*/)
+ queueLimit(100*1048576/*100M default limit*/),
+ tcpNoDelay(false)
{
int c = sys::SystemInfo::concurrency();
workerThreads=c+1;
@@ -112,7 +113,8 @@ Broker::Options::Options(const std::string& name) :
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval")
("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled all incoming connections will be trusted")
("realm", optValue(realm, "REALM"), "Use the given realm when performing authentication")
- ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)");
+ ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default maximum size for queues (in bytes)")
+ ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP connections");
}
const std::string empty;
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 9bbc6aec94..125c3df9d4 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -85,6 +85,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
size_t replayFlushLimit;
size_t replayHardLimit;
uint queueLimit;
+ bool tcpNoDelay;
};
virtual ~Broker();
diff --git a/cpp/src/qpid/client/ConnectionSettings.cpp b/cpp/src/qpid/client/ConnectionSettings.cpp
index ea4e20b529..6bc220cd41 100644
--- a/cpp/src/qpid/client/ConnectionSettings.cpp
+++ b/cpp/src/qpid/client/ConnectionSettings.cpp
@@ -21,10 +21,8 @@
#include "ConnectionSettings.h"
#include "qpid/log/Logger.h"
-#include "qpid/sys/posix/check.h"
+#include "qpid/sys/Socket.h"
#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
namespace qpid {
namespace client {
@@ -45,13 +43,11 @@ ConnectionSettings::ConnectionSettings() :
ConnectionSettings::~ConnectionSettings() {}
-void ConnectionSettings::configurePosixTcpSocket(int fd) const
+void ConnectionSettings::configureSocket(qpid::sys::Socket& socket) const
{
if (tcpNoDelay) {
- int flag = 1;
- int result = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
- QPID_POSIX_CHECK(result);
- QPID_LOG(debug, "Set TCP_NODELAY");
+ socket.setTcpNoDelay(tcpNoDelay);
+ QPID_LOG(info, "Set TCP_NODELAY");
}
}
diff --git a/cpp/src/qpid/client/ConnectionSettings.h b/cpp/src/qpid/client/ConnectionSettings.h
index a2b85c5134..5e93b3103e 100644
--- a/cpp/src/qpid/client/ConnectionSettings.h
+++ b/cpp/src/qpid/client/ConnectionSettings.h
@@ -25,26 +25,31 @@
#include "qpid/Options.h"
#include "qpid/log/Options.h"
#include "qpid/Url.h"
-#include "qpid/sys/Socket.h"
#include <iostream>
#include <exception>
namespace qpid {
+
+namespace sys {
+class Socket;
+}
+
namespace client {
/**
* Settings for a Connection.
*/
-struct ConnectionSettings : public sys::Socket::Configuration {
+struct ConnectionSettings {
ConnectionSettings();
virtual ~ConnectionSettings();
/**
- * Applies any tcp specific options to the sockets file descriptor
+ * Allows socket to be configured; default only sets tcp-nodelay
+ * based on the flag set. Can be overridden.
*/
- virtual void configurePosixTcpSocket(int fd) const;
+ virtual void configureSocket(qpid::sys::Socket&) const;
/**
* The host (or ip address) to connect to (defaults to 'localhost').
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 793809fc7c..ede1c5ab06 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -60,7 +60,7 @@ Connector::Connector(ProtocolVersion ver,
impl(cimpl)
{
QPID_LOG(debug, "Connector created for " << version);
- socket.configure(settings);
+ settings.configureSocket(socket);
}
Connector::~Connector() {
diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h
index f95d841b39..c327bce285 100644
--- a/cpp/src/qpid/sys/Socket.h
+++ b/cpp/src/qpid/sys/Socket.h
@@ -108,13 +108,7 @@ public:
int read(void *buf, size_t count) const;
int write(const void *buf, size_t count) const;
- struct Configuration
- {
- virtual void configurePosixTcpSocket(int fd) const = 0;
- virtual ~Configuration() {}
- };
-
- void configure(const Configuration&);
+ void setTcpNoDelay(bool nodelay) const;
private:
Socket(IOHandlePrivate*);
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index e82a6a9102..feb7a5e3e8 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -34,12 +34,13 @@ namespace qpid {
namespace sys {
class AsynchIOProtocolFactory : public ProtocolFactory {
+ const bool tcpNoDelay;
Socket listener;
const uint16_t listeningPort;
std::auto_ptr<AsynchAcceptor> acceptor;
public:
- AsynchIOProtocolFactory(int16_t port, int backlog);
+ AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, int16_t port,
ConnectionCodec::Factory*,
@@ -63,21 +64,26 @@ static class TCPIOPlugin : public Plugin {
// Only provide to a Broker
if (broker) {
const broker::Broker::Options& opts = broker->getOptions();
- ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog));
+ ProtocolFactory::shared_ptr protocol(new AsynchIOProtocolFactory(opts.port, opts.connectionBacklog, opts.tcpNoDelay));
QPID_LOG(info, "Listening on TCP port " << protocol->getPort());
broker->registerProtocolFactory(protocol);
}
}
} tcpPlugin;
-AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) :
- listeningPort(listener.listen(port, backlog))
+AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog, bool nodelay) :
+ tcpNoDelay(nodelay), listeningPort(listener.listen(port, backlog))
{}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
ConnectionCodec::Factory* f, bool isClient) {
AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
+ if (tcpNoDelay) {
+ s.setTcpNoDelay(tcpNoDelay);
+ QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+ }
+
if (isClient)
async->setClient();
AsynchIO* aio = new AsynchIO(s,
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index f4320531a9..eb98d0dfbf 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -29,6 +29,7 @@
#include <sys/socket.h>
#include <sys/errno.h>
#include <netinet/in.h>
+#include <netinet/tcp.h>
#include <netdb.h>
#include <cstdlib>
#include <string.h>
@@ -276,9 +277,13 @@ int Socket::getError() const
return result;
}
-void Socket::configure(const Configuration& c)
+void Socket::setTcpNoDelay(bool nodelay) const
{
- c.configurePosixTcpSocket(impl->fd);
+ if (nodelay) {
+ int flag = 1;
+ int result = setsockopt(impl->fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag));
+ QPID_POSIX_CHECK(result);
+ }
}
}} // namespace qpid::sys