summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.cpp2
-rw-r--r--cpp/lib/client/Connection.cpp9
-rw-r--r--cpp/lib/client/Connection.h3
-rw-r--r--cpp/lib/client/Connector.cpp5
-rw-r--r--cpp/lib/client/Connector.h2
-rw-r--r--cpp/lib/common/sys/Socket.h1
-rw-r--r--cpp/lib/common/sys/apr/Socket.cpp4
-rw-r--r--cpp/lib/common/sys/posix/Socket.cpp2
-rw-r--r--cpp/tests/client_test.cpp1
9 files changed, 24 insertions, 5 deletions
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp
index b91bee5a4b..0bea721175 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -298,7 +298,6 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
- parent->getChannel(channel)->setDefaultQueue(queue);
//apply settings & create persistent record if required
queue_created.first->create(arguments);
@@ -315,6 +314,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t
if (exclusive && !queue->isExclusiveOwner(parent)) {
throw ChannelException(405, "Cannot grant exclusive access to queue");
}
+ parent->getChannel(channel)->setDefaultQueue(queue);
if (!nowait) {
string queueName = queue->getName();
parent->client->getQueue().declareOk(channel, queueName, queue->getMessageCount(), queue->getConsumerCount());
diff --git a/cpp/lib/client/Connection.cpp b/cpp/lib/client/Connection.cpp
index f7897aa4df..c00b58a4a9 100644
--- a/cpp/lib/client/Connection.cpp
+++ b/cpp/lib/client/Connection.cpp
@@ -35,7 +35,8 @@ Connection::Connection( bool _debug, u_int32_t _max_frame_size, qpid::framing::
channelIdCounter(0),
max_frame_size(_max_frame_size),
closed(true),
- version(_version->getMajor(),_version->getMinor())
+ version(_version->getMajor(),_version->getMinor()),
+ tcpNoDelay(false)
{
connector = new Connector(version, debug, _max_frame_size);
}
@@ -44,6 +45,10 @@ Connection::~Connection(){
delete connector;
}
+void Connection::setTcpNoDelay(bool on) {
+ tcpNoDelay = on;
+}
+
void Connection::open(const std::string& _host, int _port, const std::string& uid, const std::string& pwd, const std::string& virtualhost){
host = _host;
port = _port;
@@ -51,7 +56,7 @@ void Connection::open(const std::string& _host, int _port, const std::string& ui
connector->setTimeoutHandler(this);
connector->setShutdownHandler(this);
out = connector->getOutputHandler();
- connector->connect(host, port);
+ connector->connect(host, port, tcpNoDelay);
ProtocolInitiation* header = new ProtocolInitiation(version);
responses.expect();
diff --git a/cpp/lib/client/Connection.h b/cpp/lib/client/Connection.h
index bbf8c03b0b..2222250188 100644
--- a/cpp/lib/client/Connection.h
+++ b/cpp/lib/client/Connection.h
@@ -80,6 +80,7 @@ namespace client {
ResponseHandler responses;
volatile bool closed;
qpid::framing::ProtocolVersion version;
+ bool tcpNoDelay;
void channelException(Channel* channel, qpid::framing::AMQMethodBody* body, QpidError& e);
void error(int code, const std::string& msg, int classid = 0, int methodid = 0);
@@ -109,6 +110,8 @@ namespace client {
qpid::framing::ProtocolVersion* _version = &(qpid::framing::highestProtocolVersion));
~Connection();
+ void setTcpNoDelay(bool on);
+
/**
* Opens a connection to a broker.
*
diff --git a/cpp/lib/client/Connector.cpp b/cpp/lib/client/Connector.cpp
index c57b3d6dc4..a99360b840 100644
--- a/cpp/lib/client/Connector.cpp
+++ b/cpp/lib/client/Connector.cpp
@@ -44,8 +44,11 @@ Connector::Connector(const qpid::framing::ProtocolVersion& pVersion, bool _debug
Connector::~Connector(){ }
-void Connector::connect(const std::string& host, int port){
+void Connector::connect(const std::string& host, int port, bool tcpNoDelay){
socket = Socket::createTcp();
+ if (tcpNoDelay) {
+ socket.setTcpNoDelay(true);
+ }
socket.connect(host, port);
closed = false;
receiver = Thread(this);
diff --git a/cpp/lib/client/Connector.h b/cpp/lib/client/Connector.h
index eccb931e6c..44112369dc 100644
--- a/cpp/lib/client/Connector.h
+++ b/cpp/lib/client/Connector.h
@@ -78,7 +78,7 @@ namespace client {
public:
Connector(const qpid::framing::ProtocolVersion& pVersion, bool debug = false, u_int32_t buffer_size = 1024);
virtual ~Connector();
- virtual void connect(const std::string& host, int port);
+ virtual void connect(const std::string& host, int port, bool tcpNoDelay=false);
virtual void init(qpid::framing::ProtocolInitiation* header);
virtual void close();
virtual void setInputHandler(qpid::framing::InputHandler* handler);
diff --git a/cpp/lib/common/sys/Socket.h b/cpp/lib/common/sys/Socket.h
index d793a240c6..b5f74847c2 100644
--- a/cpp/lib/common/sys/Socket.h
+++ b/cpp/lib/common/sys/Socket.h
@@ -47,6 +47,7 @@ class Socket
/** Set timeout for read and write */
void setTimeout(Time interval);
+ void setTcpNoDelay(bool on);
void connect(const std::string& host, int port);
diff --git a/cpp/lib/common/sys/apr/Socket.cpp b/cpp/lib/common/sys/apr/Socket.cpp
index ab98c07479..c2abf50c5f 100644
--- a/cpp/lib/common/sys/apr/Socket.cpp
+++ b/cpp/lib/common/sys/apr/Socket.cpp
@@ -87,4 +87,8 @@ ssize_t Socket::recv(void* data, size_t size)
return received;
}
+void Socket::setTcpNoDelay(bool on)
+{
+ CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_TCP_NODELAY, on ? 1 : 0));
+}
diff --git a/cpp/lib/common/sys/posix/Socket.cpp b/cpp/lib/common/sys/posix/Socket.cpp
index 5bd13742f6..e27ced9161 100644
--- a/cpp/lib/common/sys/posix/Socket.cpp
+++ b/cpp/lib/common/sys/posix/Socket.cpp
@@ -116,3 +116,5 @@ int Socket::fd()
{
return socket;
}
+
+void Socket::setTcpNoDelay(bool) {} //not yet implemented
diff --git a/cpp/tests/client_test.cpp b/cpp/tests/client_test.cpp
index f869cf4860..a5cc64d1e4 100644
--- a/cpp/tests/client_test.cpp
+++ b/cpp/tests/client_test.cpp
@@ -67,6 +67,7 @@ int main(int argc, char**)
Connection con(argc > 1);
+ con.setTcpNoDelay(true);
string host("localhost");
con.open(host, 5672, "guest", "guest", "/test");
std::cout << "Opened connection." << std::endl;