summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/apr/Monitor.cpp64
-rw-r--r--cpp/src/qpid/apr/Monitor.h71
-rw-r--r--cpp/src/qpid/apr/Socket.h107
-rw-r--r--cpp/src/qpid/client/Connection.h4
-rw-r--r--cpp/src/qpid/client/Connector.cpp (renamed from cpp/src/qpid/apr/Connector.cpp)67
-rw-r--r--cpp/src/qpid/client/Connector.h (renamed from cpp/src/qpid/apr/Connector.h)21
-rw-r--r--cpp/src/qpid/sys/Socket.h (renamed from cpp/src/qpid/sys/Connector.h)12
-rw-r--r--cpp/src/qpid/sys/Time.h6
-rw-r--r--cpp/test/client/topic_publisher.cpp5
9 files changed, 213 insertions, 144 deletions
diff --git a/cpp/src/qpid/apr/Monitor.cpp b/cpp/src/qpid/apr/Monitor.cpp
deleted file mode 100644
index 69fb2f6ffd..0000000000
--- a/cpp/src/qpid/apr/Monitor.cpp
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Monitor.h"
-#include "APRPool.h"
-
-using namespace qpid::sys;
-
-Mutex::Mutex()
-{
- APRBase::increment();
- // TODO aconway 2006-11-08: Switch to non-nested.
- CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get()));
-}
-
-Mutex::~Mutex(){
- CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
- APRBase::decrement();
-}
-
-Monitor::Monitor()
-{
- CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
-}
-
-Monitor::~Monitor(){
- CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
-}
-
-
-
-void Monitor::wait(){
- CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
-}
-
-void Monitor::wait(int64_t nsecs){
- // APR uses microseconds.
- apr_status_t status = apr_thread_cond_timedwait(
- condition, mutex, nsecs/1000);
- if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);
-}
-
-void Monitor::notify(){
- CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
-}
-
-void Monitor::notifyAll(){
- CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
-}
-
diff --git a/cpp/src/qpid/apr/Monitor.h b/cpp/src/qpid/apr/Monitor.h
index a51baf8d94..10bc20820e 100644
--- a/cpp/src/qpid/apr/Monitor.h
+++ b/cpp/src/qpid/apr/Monitor.h
@@ -23,6 +23,7 @@
#include "apr-1/apr_thread_mutex.h"
#include "apr-1/apr_thread_cond.h"
#include "APRBase.h"
+#include "APRPool.h"
namespace qpid {
namespace sys {
@@ -43,11 +44,11 @@ class Mutex : private boost::noncopyable
public:
typedef ScopedLock<Mutex> ScopedLock;
- Mutex();
- ~Mutex();
- void lock() { CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); }
- void unlock() { CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); }
- void trylock() { CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); }
+ inline Mutex();
+ inline ~Mutex();
+ inline void lock();
+ inline void unlock();
+ inline void trylock();
protected:
apr_thread_mutex_t* mutex;
@@ -57,18 +58,66 @@ class Mutex : private boost::noncopyable
class Monitor : public Mutex
{
public:
- Monitor();
- ~Monitor();
- void wait();
- void wait(int64_t nsecs);
- void notify();
- void notifyAll();
+ inline Monitor();
+ inline ~Monitor();
+ inline void wait();
+ inline void wait(int64_t nsecs);
+ inline void notify();
+ inline void notifyAll();
private:
apr_thread_cond_t* condition;
};
+
+Mutex::Mutex() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get()));
+}
+
+Mutex::~Mutex(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+}
+
+void Mutex::lock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+}
+void Mutex::unlock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+}
+
+void Mutex::trylock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex));
+}
+
+Monitor::Monitor() {
+ CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
+}
+
+Monitor::~Monitor() {
+ CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
+}
+
+void Monitor::wait() {
+ CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
+}
+
+void Monitor::wait(int64_t nsecs){
+ // APR uses microseconds.
+ apr_status_t status = apr_thread_cond_timedwait(
+ condition, mutex, nsecs/1000);
+ if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);
+}
+
+void Monitor::notify(){
+ CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
+}
+
+void Monitor::notifyAll(){
+ CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
+}
+
+
}}
diff --git a/cpp/src/qpid/apr/Socket.h b/cpp/src/qpid/apr/Socket.h
new file mode 100644
index 0000000000..9a519e7391
--- /dev/null
+++ b/cpp/src/qpid/apr/Socket.h
@@ -0,0 +1,107 @@
+#ifndef _apr_Socket_h
+#define _apr_Socket_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <apr-1/apr_network_io.h>
+#include "APRBase.h"
+#include "APRPool.h"
+
+namespace qpid {
+namespace sys {
+
+class Socket
+{
+ public:
+ inline Socket();
+ inline ~Socket();
+ inline void setTimeout(long msecs);
+ inline void connect(const std::string& host, int port);
+ inline void close();
+
+ enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 };
+
+ inline ssize_t send(const char* data, size_t size);
+ inline ssize_t recv(char* data, size_t size);
+
+ private:
+ apr_socket_t* socket;
+};
+
+inline
+Socket::Socket()
+{
+ CHECK_APR_SUCCESS(
+ apr_socket_create(
+ &socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
+ APRPool::get()));
+}
+
+inline
+Socket::~Socket() { }
+
+inline void
+Socket::setTimeout(long msecs)
+{
+ apr_socket_timeout_set(socket, msecs*1000);
+}
+
+inline void
+Socket::connect(const std::string& host, int port)
+{
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(
+ apr_sockaddr_info_get(
+ &address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK,
+ APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
+}
+
+inline void
+Socket::close()
+{
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ socket = 0;
+}
+
+inline ssize_t
+Socket::send(const char* data, size_t size)
+{
+ apr_size_t sent = size;
+ apr_status_t status = apr_socket_send(socket, data, &sent);
+ if (!APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
+ if (!APR_STATUS_IS_EOF(status)) return SOCKET_EOF;
+ CHECK_APR_SUCCESS(status);
+ return sent;
+}
+
+inline ssize_t
+Socket::recv(char* data, size_t size)
+{
+ apr_size_t received = size;
+ apr_status_t status = apr_socket_recv(socket, data, &received);
+ if (!APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
+ CHECK_APR_SUCCESS(status);
+ return received;
+}
+
+}}
+
+
+#endif /*!_apr_Socket_h*/
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index da747d0e1d..4ff4e83859 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -22,7 +22,7 @@
#define _Connection_
#include "qpid/QpidError.h"
-#include "qpid/sys/Connector.h"
+#include "qpid/client/Connector.h"
#include "qpid/sys/ShutdownHandler.h"
#include "qpid/sys/TimeoutHandler.h"
@@ -52,7 +52,7 @@ class Connection : public virtual qpid::framing::InputHandler,
int port;
const u_int32_t max_frame_size;
std::map<int, Channel*> channels;
- qpid::sys::Connector* connector;
+ Connector* connector;
qpid::framing::OutputHandler* out;
ResponseHandler responses;
volatile bool closed;
diff --git a/cpp/src/qpid/apr/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 4446731654..5d3a20be6a 100644
--- a/cpp/src/qpid/apr/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -17,11 +17,11 @@
*/
#include <iostream>
#include <qpid/QpidError.h>
-#include "APRBase.h"
+#include <qpid/sys/Time.h>
#include "Connector.h"
using namespace qpid::sys;
-using namespace qpid::sys;
+using namespace qpid::client;
using namespace qpid::framing;
using qpid::QpidError;
@@ -36,24 +36,12 @@ Connector::Connector(bool _debug, u_int32_t buffer_size) :
timeoutHandler(0),
shutdownHandler(0),
inbuf(receive_buffer_size),
- outbuf(send_buffer_size){
-
- APRBase::increment();
-
- CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
-}
+ outbuf(send_buffer_size){ }
-Connector::~Connector(){
- apr_pool_destroy(pool);
-
- APRBase::decrement();
-}
+Connector::~Connector(){ }
void Connector::connect(const std::string& host, int port){
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
- CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
+ socket.connect(host, port);
closed = false;
receiver = Thread(this);
}
@@ -65,7 +53,7 @@ void Connector::init(ProtocolInitiation* header){
void Connector::close(){
closed = true;
- CHECK_APR_SUCCESS(apr_socket_close(socket));
+ socket.close();
receiver.join();
}
@@ -97,32 +85,26 @@ void Connector::writeBlock(AMQDataBlock* data){
}
void Connector::writeToSocket(char* data, size_t available){
- apr_size_t bytes(available);
- apr_size_t written(0);
+ size_t written = 0;
while(written < available && !closed){
- apr_status_t status = apr_socket_send(socket, data + written, &bytes);
- if(status == APR_TIMEUP){
- std::cout << "Write request timed out." << std::endl;
- }
- if(bytes == 0){
- std::cout << "Write request wrote 0 bytes." << std::endl;
+ ssize_t sent = socket.send(data + written, available-written);
+ if(sent > 0) {
+ lastOut = getTimeMsecs();
+ written += sent;
}
- lastOut = apr_time_as_msec(apr_time_now());
- written += bytes;
- bytes = available - written;
}
}
-void Connector::checkIdle(apr_status_t status){
+void Connector::checkIdle(ssize_t status){
if(timeoutHandler){
- int64_t now = apr_time_as_msec(apr_time_now());
- if(APR_STATUS_IS_TIMEUP(status)){
+ int64_t now = getTimeMsecs();
+ if(status == Socket::SOCKET_TIMEOUT) {
if(idleIn && (now - lastIn > idleIn)){
timeoutHandler->idleIn();
}
- }else if(APR_STATUS_IS_EOF(status)){
+ }else if(status == Socket::SOCKET_EOF){
closed = true;
- CHECK_APR_SUCCESS(apr_socket_close(socket));
+ socket.close();
if(shutdownHandler) shutdownHandler->shutdown();
}else{
lastIn = now;
@@ -151,11 +133,7 @@ void Connector::setWriteTimeout(u_int16_t t){
}
void Connector::setSocketTimeout(){
- //interval is in microseconds, timeout in milliseconds
- //want the interval to be a bit shorter than the timeout, hence multiply
- //by 800 rather than 1000.
- apr_interval_time_t interval(timeout * 800);
- apr_socket_timeout_set(socket, interval);
+ socket.setTimeout(timeout);
}
void Connector::setTimeoutHandler(TimeoutHandler* handler){
@@ -165,14 +143,15 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){
void Connector::run(){
try{
while(!closed){
- apr_size_t bytes(inbuf.available());
- if(bytes < 1){
+ ssize_t available = inbuf.available();
+ if(available < 1){
THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
}
- checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes));
+ ssize_t received = socket.recv(inbuf.start(), available);
+ checkIdle(received);
- if(bytes > 0){
- inbuf.move(bytes);
+ if(received > 0){
+ inbuf.move(received);
inbuf.flip();//position = 0, limit = total data read
AMQFrame frame;
diff --git a/cpp/src/qpid/apr/Connector.h b/cpp/src/qpid/client/Connector.h
index e69a7205f3..91ec58c95c 100644
--- a/cpp/src/qpid/apr/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -18,8 +18,6 @@
#ifndef _Connector_
#define _Connector_
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_time.h"
#include "qpid/framing/InputHandler.h"
#include "qpid/framing/OutputHandler.h"
@@ -28,11 +26,11 @@
#include "qpid/sys/ShutdownHandler.h"
#include "qpid/sys/TimeoutHandler.h"
#include "qpid/sys/Thread.h"
-#include "qpid/sys/Connector.h"
#include "qpid/sys/Monitor.h"
+#include <qpid/sys/Socket.h>
namespace qpid {
-namespace sys {
+namespace client {
class Connector : public qpid::framing::OutputHandler,
private qpid::sys::Runnable
@@ -49,8 +47,8 @@ namespace sys {
u_int32_t idleIn;
u_int32_t idleOut;
- TimeoutHandler* timeoutHandler;
- ShutdownHandler* shutdownHandler;
+ qpid::sys::TimeoutHandler* timeoutHandler;
+ qpid::sys::ShutdownHandler* shutdownHandler;
qpid::framing::InputHandler* input;
qpid::framing::InitiationHandler* initialiser;
qpid::framing::OutputHandler* output;
@@ -61,10 +59,9 @@ namespace sys {
qpid::sys::Mutex writeLock;
qpid::sys::Thread receiver;
- apr_pool_t* pool;
- apr_socket_t* socket;
-
- void checkIdle(apr_status_t status);
+ qpid::sys::Socket socket;
+
+ void checkIdle(ssize_t status);
void writeBlock(qpid::framing::AMQDataBlock* data);
void writeToSocket(char* data, size_t available);
void setSocketTimeout();
@@ -78,8 +75,8 @@ namespace sys {
virtual void init(qpid::framing::ProtocolInitiation* header);
virtual void close();
virtual void setInputHandler(qpid::framing::InputHandler* handler);
- virtual void setTimeoutHandler(TimeoutHandler* handler);
- virtual void setShutdownHandler(ShutdownHandler* handler);
+ virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler);
+ virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler);
virtual qpid::framing::OutputHandler* getOutputHandler();
virtual void send(qpid::framing::AMQFrame* frame);
virtual void setReadTimeout(u_int16_t timeout);
diff --git a/cpp/src/qpid/sys/Connector.h b/cpp/src/qpid/sys/Socket.h
index 67e12b008c..243764353e 100644
--- a/cpp/src/qpid/sys/Connector.h
+++ b/cpp/src/qpid/sys/Socket.h
@@ -1,5 +1,5 @@
-#ifndef _sys_Connector_h
-#define _sys_Connector_h
+#ifndef _sys_Socket_h
+#define _sys_Socket_h
/*
*
@@ -19,7 +19,9 @@
*
*/
-#include "platform.h"
-#include QPID_PLATFORM_H(Connector.h)
+#include <qpid/sys/platform.h>
+#include QPID_PLATFORM_H(Socket.h)
-#endif /*!_sys_Connector_h*/
+
+
+#endif /*!_sys_Socket_h*/
diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h
index 79d17b433b..92e83116a5 100644
--- a/cpp/src/qpid/sys/Time.h
+++ b/cpp/src/qpid/sys/Time.h
@@ -1,5 +1,5 @@
-#ifndef _concurrent_Time_h
-#define _concurrent_Time_h
+#ifndef _sys_Time_h
+#define _sys_Time_h
/*
*
@@ -35,4 +35,4 @@ int64_t getTimeMsecs();
}}
-#endif /*!_concurrent_Time_h*/
+#endif /*!_sys_Time_h*/
diff --git a/cpp/test/client/topic_publisher.cpp b/cpp/test/client/topic_publisher.cpp
index 22c36ea9e3..1e1a91dfff 100644
--- a/cpp/test/client/topic_publisher.cpp
+++ b/cpp/test/client/topic_publisher.cpp
@@ -114,7 +114,7 @@ int main(int argc, char** argv){
if(!max || time > max) max = time;
if(!min || time < min) min = time;
sum += time;
- std::cout << "Completed " << (i+1) << " of " << batchSize << " in " << nsecsToMsecs(time) << "ms" << std::endl;
+ std::cout << "Completed " << (i+1) << " of " << batchSize << " in " << time << "ms" << std::endl;
}
publisher.terminate();
int64_t avg = sum / batchSize;
@@ -133,13 +133,12 @@ int main(int argc, char** argv){
Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool tx) :
channel(_channel), controlTopic(_controlTopic), transactional(tx){}
-void Publisher::received(Message& msg){
+void Publisher::received(Message& ){
//count responses and when all are received end the current batch
Monitor::ScopedLock l(monitor);
if(--count == 0){
monitor.notify();
}
- std::cout << "Received report: " << msg.getData() << " (" << count << " remaining)." << std::endl;
}
void Publisher::waitForCompletion(int msgs){