diff options
author | Alan Conway <aconway@apache.org> | 2006-11-09 01:29:59 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-11-09 01:29:59 +0000 |
commit | 295145d247a7523affdf43f8d870912b1a303caf (patch) | |
tree | e27b156b80f0904530d512177e35284def40ab27 /cpp/src/qpid | |
parent | f6113838a8e6d271e46466fe74884c5bf9706ae0 (diff) | |
download | qpid-python-295145d247a7523affdf43f8d870912b1a303caf.tar.gz |
More separation of concerns with APR, client side complete.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@472732 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r-- | cpp/src/qpid/apr/Monitor.cpp | 64 | ||||
-rw-r--r-- | cpp/src/qpid/apr/Monitor.h | 71 | ||||
-rw-r--r-- | cpp/src/qpid/apr/Socket.h | 107 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connection.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.cpp (renamed from cpp/src/qpid/apr/Connector.cpp) | 67 | ||||
-rw-r--r-- | cpp/src/qpid/client/Connector.h (renamed from cpp/src/qpid/apr/Connector.h) | 21 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Socket.h (renamed from cpp/src/qpid/sys/Connector.h) | 12 | ||||
-rw-r--r-- | cpp/src/qpid/sys/Time.h | 6 |
8 files changed, 211 insertions, 141 deletions
diff --git a/cpp/src/qpid/apr/Monitor.cpp b/cpp/src/qpid/apr/Monitor.cpp deleted file mode 100644 index 69fb2f6ffd..0000000000 --- a/cpp/src/qpid/apr/Monitor.cpp +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Monitor.h" -#include "APRPool.h" - -using namespace qpid::sys; - -Mutex::Mutex() -{ - APRBase::increment(); - // TODO aconway 2006-11-08: Switch to non-nested. - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); -} - -Mutex::~Mutex(){ - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); - APRBase::decrement(); -} - -Monitor::Monitor() -{ - CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); -} - -Monitor::~Monitor(){ - CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); -} - - - -void Monitor::wait(){ - CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); -} - -void Monitor::wait(int64_t nsecs){ - // APR uses microseconds. - apr_status_t status = apr_thread_cond_timedwait( - condition, mutex, nsecs/1000); - if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); -} - -void Monitor::notify(){ - CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); -} - -void Monitor::notifyAll(){ - CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); -} - diff --git a/cpp/src/qpid/apr/Monitor.h b/cpp/src/qpid/apr/Monitor.h index a51baf8d94..10bc20820e 100644 --- a/cpp/src/qpid/apr/Monitor.h +++ b/cpp/src/qpid/apr/Monitor.h @@ -23,6 +23,7 @@ #include "apr-1/apr_thread_mutex.h" #include "apr-1/apr_thread_cond.h" #include "APRBase.h" +#include "APRPool.h" namespace qpid { namespace sys { @@ -43,11 +44,11 @@ class Mutex : private boost::noncopyable public: typedef ScopedLock<Mutex> ScopedLock; - Mutex(); - ~Mutex(); - void lock() { CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); } - void unlock() { CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); } - void trylock() { CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); } + inline Mutex(); + inline ~Mutex(); + inline void lock(); + inline void unlock(); + inline void trylock(); protected: apr_thread_mutex_t* mutex; @@ -57,18 +58,66 @@ class Mutex : private boost::noncopyable class Monitor : public Mutex { public: - Monitor(); - ~Monitor(); - void wait(); - void wait(int64_t nsecs); - void notify(); - void notifyAll(); + inline Monitor(); + inline ~Monitor(); + inline void wait(); + inline void wait(int64_t nsecs); + inline void notify(); + inline void notifyAll(); private: apr_thread_cond_t* condition; }; + +Mutex::Mutex() { + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); +} + +Mutex::~Mutex(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); +} + +void Mutex::lock() { + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} +void Mutex::unlock() { + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} + +void Mutex::trylock() { + CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); +} + +Monitor::Monitor() { + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); +} + +Monitor::~Monitor() { + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); +} + +void Monitor::wait() { + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); +} + +void Monitor::wait(int64_t nsecs){ + // APR uses microseconds. + apr_status_t status = apr_thread_cond_timedwait( + condition, mutex, nsecs/1000); + if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); +} + +void Monitor::notify(){ + CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void Monitor::notifyAll(){ + CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + + }} diff --git a/cpp/src/qpid/apr/Socket.h b/cpp/src/qpid/apr/Socket.h new file mode 100644 index 0000000000..9a519e7391 --- /dev/null +++ b/cpp/src/qpid/apr/Socket.h @@ -0,0 +1,107 @@ +#ifndef _apr_Socket_h +#define _apr_Socket_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <apr-1/apr_network_io.h> +#include "APRBase.h" +#include "APRPool.h" + +namespace qpid { +namespace sys { + +class Socket +{ + public: + inline Socket(); + inline ~Socket(); + inline void setTimeout(long msecs); + inline void connect(const std::string& host, int port); + inline void close(); + + enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 }; + + inline ssize_t send(const char* data, size_t size); + inline ssize_t recv(char* data, size_t size); + + private: + apr_socket_t* socket; +}; + +inline +Socket::Socket() +{ + CHECK_APR_SUCCESS( + apr_socket_create( + &socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, + APRPool::get())); +} + +inline +Socket::~Socket() { } + +inline void +Socket::setTimeout(long msecs) +{ + apr_socket_timeout_set(socket, msecs*1000); +} + +inline void +Socket::connect(const std::string& host, int port) +{ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS( + apr_sockaddr_info_get( + &address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, + APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); +} + +inline void +Socket::close() +{ + CHECK_APR_SUCCESS(apr_socket_close(socket)); + socket = 0; +} + +inline ssize_t +Socket::send(const char* data, size_t size) +{ + apr_size_t sent = size; + apr_status_t status = apr_socket_send(socket, data, &sent); + if (!APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT; + if (!APR_STATUS_IS_EOF(status)) return SOCKET_EOF; + CHECK_APR_SUCCESS(status); + return sent; +} + +inline ssize_t +Socket::recv(char* data, size_t size) +{ + apr_size_t received = size; + apr_status_t status = apr_socket_recv(socket, data, &received); + if (!APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT; + CHECK_APR_SUCCESS(status); + return received; +} + +}} + + +#endif /*!_apr_Socket_h*/ diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index da747d0e1d..4ff4e83859 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -22,7 +22,7 @@ #define _Connection_ #include "qpid/QpidError.h" -#include "qpid/sys/Connector.h" +#include "qpid/client/Connector.h" #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" @@ -52,7 +52,7 @@ class Connection : public virtual qpid::framing::InputHandler, int port; const u_int32_t max_frame_size; std::map<int, Channel*> channels; - qpid::sys::Connector* connector; + Connector* connector; qpid::framing::OutputHandler* out; ResponseHandler responses; volatile bool closed; diff --git a/cpp/src/qpid/apr/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 4446731654..5d3a20be6a 100644 --- a/cpp/src/qpid/apr/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -17,11 +17,11 @@ */ #include <iostream> #include <qpid/QpidError.h> -#include "APRBase.h" +#include <qpid/sys/Time.h> #include "Connector.h" using namespace qpid::sys; -using namespace qpid::sys; +using namespace qpid::client; using namespace qpid::framing; using qpid::QpidError; @@ -36,24 +36,12 @@ Connector::Connector(bool _debug, u_int32_t buffer_size) : timeoutHandler(0), shutdownHandler(0), inbuf(receive_buffer_size), - outbuf(send_buffer_size){ - - APRBase::increment(); - - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); -} + outbuf(send_buffer_size){ } -Connector::~Connector(){ - apr_pool_destroy(pool); - - APRBase::decrement(); -} +Connector::~Connector(){ } void Connector::connect(const std::string& host, int port){ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); - CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); + socket.connect(host, port); closed = false; receiver = Thread(this); } @@ -65,7 +53,7 @@ void Connector::init(ProtocolInitiation* header){ void Connector::close(){ closed = true; - CHECK_APR_SUCCESS(apr_socket_close(socket)); + socket.close(); receiver.join(); } @@ -97,32 +85,26 @@ void Connector::writeBlock(AMQDataBlock* data){ } void Connector::writeToSocket(char* data, size_t available){ - apr_size_t bytes(available); - apr_size_t written(0); + size_t written = 0; while(written < available && !closed){ - apr_status_t status = apr_socket_send(socket, data + written, &bytes); - if(status == APR_TIMEUP){ - std::cout << "Write request timed out." << std::endl; - } - if(bytes == 0){ - std::cout << "Write request wrote 0 bytes." << std::endl; + ssize_t sent = socket.send(data + written, available-written); + if(sent > 0) { + lastOut = getTimeMsecs(); + written += sent; } - lastOut = apr_time_as_msec(apr_time_now()); - written += bytes; - bytes = available - written; } } -void Connector::checkIdle(apr_status_t status){ +void Connector::checkIdle(ssize_t status){ if(timeoutHandler){ - int64_t now = apr_time_as_msec(apr_time_now()); - if(APR_STATUS_IS_TIMEUP(status)){ + int64_t now = getTimeMsecs(); + if(status == Socket::SOCKET_TIMEOUT) { if(idleIn && (now - lastIn > idleIn)){ timeoutHandler->idleIn(); } - }else if(APR_STATUS_IS_EOF(status)){ + }else if(status == Socket::SOCKET_EOF){ closed = true; - CHECK_APR_SUCCESS(apr_socket_close(socket)); + socket.close(); if(shutdownHandler) shutdownHandler->shutdown(); }else{ lastIn = now; @@ -151,11 +133,7 @@ void Connector::setWriteTimeout(u_int16_t t){ } void Connector::setSocketTimeout(){ - //interval is in microseconds, timeout in milliseconds - //want the interval to be a bit shorter than the timeout, hence multiply - //by 800 rather than 1000. - apr_interval_time_t interval(timeout * 800); - apr_socket_timeout_set(socket, interval); + socket.setTimeout(timeout); } void Connector::setTimeoutHandler(TimeoutHandler* handler){ @@ -165,14 +143,15 @@ void Connector::setTimeoutHandler(TimeoutHandler* handler){ void Connector::run(){ try{ while(!closed){ - apr_size_t bytes(inbuf.available()); - if(bytes < 1){ + ssize_t available = inbuf.available(); + if(available < 1){ THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); } - checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes)); + ssize_t received = socket.recv(inbuf.start(), available); + checkIdle(received); - if(bytes > 0){ - inbuf.move(bytes); + if(received > 0){ + inbuf.move(received); inbuf.flip();//position = 0, limit = total data read AMQFrame frame; diff --git a/cpp/src/qpid/apr/Connector.h b/cpp/src/qpid/client/Connector.h index e69a7205f3..91ec58c95c 100644 --- a/cpp/src/qpid/apr/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -18,8 +18,6 @@ #ifndef _Connector_ #define _Connector_ -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_time.h" #include "qpid/framing/InputHandler.h" #include "qpid/framing/OutputHandler.h" @@ -28,11 +26,11 @@ #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" #include "qpid/sys/Thread.h" -#include "qpid/sys/Connector.h" #include "qpid/sys/Monitor.h" +#include <qpid/sys/Socket.h> namespace qpid { -namespace sys { +namespace client { class Connector : public qpid::framing::OutputHandler, private qpid::sys::Runnable @@ -49,8 +47,8 @@ namespace sys { u_int32_t idleIn; u_int32_t idleOut; - TimeoutHandler* timeoutHandler; - ShutdownHandler* shutdownHandler; + qpid::sys::TimeoutHandler* timeoutHandler; + qpid::sys::ShutdownHandler* shutdownHandler; qpid::framing::InputHandler* input; qpid::framing::InitiationHandler* initialiser; qpid::framing::OutputHandler* output; @@ -61,10 +59,9 @@ namespace sys { qpid::sys::Mutex writeLock; qpid::sys::Thread receiver; - apr_pool_t* pool; - apr_socket_t* socket; - - void checkIdle(apr_status_t status); + qpid::sys::Socket socket; + + void checkIdle(ssize_t status); void writeBlock(qpid::framing::AMQDataBlock* data); void writeToSocket(char* data, size_t available); void setSocketTimeout(); @@ -78,8 +75,8 @@ namespace sys { virtual void init(qpid::framing::ProtocolInitiation* header); virtual void close(); virtual void setInputHandler(qpid::framing::InputHandler* handler); - virtual void setTimeoutHandler(TimeoutHandler* handler); - virtual void setShutdownHandler(ShutdownHandler* handler); + virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler); + virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler); virtual qpid::framing::OutputHandler* getOutputHandler(); virtual void send(qpid::framing::AMQFrame* frame); virtual void setReadTimeout(u_int16_t timeout); diff --git a/cpp/src/qpid/sys/Connector.h b/cpp/src/qpid/sys/Socket.h index 67e12b008c..243764353e 100644 --- a/cpp/src/qpid/sys/Connector.h +++ b/cpp/src/qpid/sys/Socket.h @@ -1,5 +1,5 @@ -#ifndef _sys_Connector_h -#define _sys_Connector_h +#ifndef _sys_Socket_h +#define _sys_Socket_h /* * @@ -19,7 +19,9 @@ * */ -#include "platform.h" -#include QPID_PLATFORM_H(Connector.h) +#include <qpid/sys/platform.h> +#include QPID_PLATFORM_H(Socket.h) -#endif /*!_sys_Connector_h*/ + + +#endif /*!_sys_Socket_h*/ diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h index 79d17b433b..92e83116a5 100644 --- a/cpp/src/qpid/sys/Time.h +++ b/cpp/src/qpid/sys/Time.h @@ -1,5 +1,5 @@ -#ifndef _concurrent_Time_h -#define _concurrent_Time_h +#ifndef _sys_Time_h +#define _sys_Time_h /* * @@ -35,4 +35,4 @@ int64_t getTimeMsecs(); }} -#endif /*!_concurrent_Time_h*/ +#endif /*!_sys_Time_h*/ |