diff options
-rw-r--r-- | cpp/lib/broker/SessionHandlerImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/lib/client/Connection.cpp | 9 | ||||
-rw-r--r-- | cpp/lib/client/Connection.h | 3 | ||||
-rw-r--r-- | cpp/lib/client/Connector.cpp | 5 | ||||
-rw-r--r-- | cpp/lib/client/Connector.h | 2 | ||||
-rw-r--r-- | cpp/lib/common/sys/Socket.h | 1 | ||||
-rw-r--r-- | cpp/lib/common/sys/apr/Socket.cpp | 4 | ||||
-rw-r--r-- | cpp/lib/common/sys/posix/Socket.cpp | 2 | ||||
-rw-r--r-- | cpp/tests/client_test.cpp | 1 |
9 files changed, 24 insertions, 5 deletions
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index b91bee5a4b..0bea721175 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -298,7 +298,6 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue - parent->getChannel(channel)->setDefaultQueue(queue); //apply settings & create persistent record if required queue_created.first->create(arguments); @@ -315,6 +314,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t if (exclusive && !queue->isExclusiveOwner(parent)) { throw ChannelException(405, "Cannot grant exclusive access to queue"); } + parent->getChannel(channel)->setDefaultQueue(queue); if (!nowait) { string queueName = queue->getName(); parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount()); diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp index f7897aa4df..c00b58a4a9 100644 --- a/cpp/lib/client/Connection.cpp +++ b/cpp/lib/client/Connection.cpp @@ -35,7 +35,8 @@ Connection::Connection( bool _debug, u_int32_t _max_frame_size, qpid::framing:: channelIdCounter(0), max_frame_size(_max_frame_size), closed(true), - version(_version->getMajor(),_version->getMinor()) + version(_version->getMajor(),_version->getMinor()), + tcpNoDelay(false) { connector = new Connector(version, debug, _max_frame_size); } @@ -44,6 +45,10 @@ Connection::~Connection(){ delete connector; } +void Connection::setTcpNoDelay(bool on) { + tcpNoDelay = on; +} + void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){ host = _host; port = _port; @@ -51,7 +56,7 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui connector->setTimeoutHandler(this); connector->setShutdownHandler(this); out = connector->getOutputHandler(); - connector->connect(host, port); + connector->connect(host, port, tcpNoDelay); ProtocolInitiation* header = new ProtocolInitiation(version); responses.expect(); diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h index bbf8c03b0b..2222250188 100644 --- a/cpp/lib/client/Connection.h +++ b/cpp/lib/client/Connection.h @@ -80,6 +80,7 @@ namespace client { ResponseHandler responses; volatile bool closed; qpid::framing::ProtocolVersion version; + bool tcpNoDelay; void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e); void error(int code, const std::string& msg, int classid = 0, int methodid = 0); @@ -109,6 +110,8 @@ namespace client { qpid::framing::ProtocolVersion* _version = &(qpid::framing::highestProtocolVersion)); ~Connection(); + void setTcpNoDelay(bool on); + /** * Opens a connection to a broker. * diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp index c57b3d6dc4..a99360b840 100644 --- a/cpp/lib/client/Connector.cpp +++ b/cpp/lib/client/Connector.cpp @@ -44,8 +44,11 @@ Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug Connector::~Connector(){ } -void Connector::connect(const std::string& host, int port){ +void Connector::connect(const std::string& host, int port, bool tcpNoDelay){ socket = Socket::createTcp(); + if (tcpNoDelay) { + socket.setTcpNoDelay(true); + } socket.connect(host, port); closed = false; receiver = Thread(this); diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h index eccb931e6c..44112369dc 100644 --- a/cpp/lib/client/Connector.h +++ b/cpp/lib/client/Connector.h @@ -78,7 +78,7 @@ namespace client { public: Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024); virtual ~Connector(); - virtual void connect(const std::string& host, int port); + virtual void connect(const std::string& host, int port, bool tcpNoDelay=false); virtual void init(qpid::framing::ProtocolInitiation* header); virtual void close(); virtual void setInputHandler(qpid::framing::InputHandler* handler); diff --git a/cpp/lib/common/sys/Socket.h b/cpp/lib/common/sys/Socket.h index d793a240c6..b5f74847c2 100644 --- a/cpp/lib/common/sys/Socket.h +++ b/cpp/lib/common/sys/Socket.h @@ -47,6 +47,7 @@ class Socket /** Set timeout for read and write */ void setTimeout(Time interval); + void setTcpNoDelay(bool on); void connect(const std::string& host, int port); diff --git a/cpp/lib/common/sys/apr/Socket.cpp b/cpp/lib/common/sys/apr/Socket.cpp index ab98c07479..c2abf50c5f 100644 --- a/cpp/lib/common/sys/apr/Socket.cpp +++ b/cpp/lib/common/sys/apr/Socket.cpp @@ -87,4 +87,8 @@ ssize_t Socket::recv(void* data, size_t size) return received; } +void Socket::setTcpNoDelay(bool on) +{ + CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_TCP_NODELAY, on ? 1 : 0)); +} diff --git a/cpp/lib/common/sys/posix/Socket.cpp b/cpp/lib/common/sys/posix/Socket.cpp index 5bd13742f6..e27ced9161 100644 --- a/cpp/lib/common/sys/posix/Socket.cpp +++ b/cpp/lib/common/sys/posix/Socket.cpp @@ -116,3 +116,5 @@ int Socket::fd() { return socket; } + +void Socket::setTcpNoDelay(bool) {} //not yet implemented diff --git a/cpp/tests/client_test.cpp b/cpp/tests/client_test.cpp index f869cf4860..a5cc64d1e4 100644 --- a/cpp/tests/client_test.cpp +++ b/cpp/tests/client_test.cpp @@ -67,6 +67,7 @@ int main(int argc, char**) Connection con(argc > 1); + con.setTcpNoDelay(true); string host("localhost"); con.open(host, 5672, "guest", "guest", "/test"); std::cout << "Opened connection." << std::endl; |