diff options
author | Alan Conway <aconway@apache.org> | 2006-11-09 01:29:59 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-11-09 01:29:59 +0000 |
commit | 295145d247a7523affdf43f8d870912b1a303caf (patch) | |
tree | e27b156b80f0904530d512177e35284def40ab27 /cpp/src/qpid/apr | |
parent | f6113838a8e6d271e46466fe74884c5bf9706ae0 (diff) | |
download | qpid-python-295145d247a7523affdf43f8d870912b1a303caf.tar.gz |
More separation of concerns with APR, client side complete.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@472732 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/apr')
-rw-r--r-- | cpp/src/qpid/apr/Connector.cpp | 190 | ||||
-rw-r--r-- | cpp/src/qpid/apr/Connector.h | 93 | ||||
-rw-r--r-- | cpp/src/qpid/apr/Monitor.cpp | 64 | ||||
-rw-r--r-- | cpp/src/qpid/apr/Monitor.h | 71 | ||||
-rw-r--r-- | cpp/src/qpid/apr/Socket.h | 107 |
5 files changed, 167 insertions, 358 deletions
diff --git a/cpp/src/qpid/apr/Connector.cpp b/cpp/src/qpid/apr/Connector.cpp deleted file mode 100644 index 4446731654..0000000000 --- a/cpp/src/qpid/apr/Connector.cpp +++ /dev/null @@ -1,190 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include <qpid/QpidError.h> -#include "APRBase.h" -#include "Connector.h" - -using namespace qpid::sys; -using namespace qpid::sys; -using namespace qpid::framing; -using qpid::QpidError; - -Connector::Connector(bool _debug, u_int32_t buffer_size) : - debug(_debug), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - closed(true), - lastIn(0), lastOut(0), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), - shutdownHandler(0), - inbuf(receive_buffer_size), - outbuf(send_buffer_size){ - - APRBase::increment(); - - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); -} - -Connector::~Connector(){ - apr_pool_destroy(pool); - - APRBase::decrement(); -} - -void Connector::connect(const std::string& host, int port){ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); - CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); - closed = false; - receiver = Thread(this); -} - -void Connector::init(ProtocolInitiation* header){ - writeBlock(header); - delete header; -} - -void Connector::close(){ - closed = true; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - receiver.join(); -} - -void Connector::setInputHandler(InputHandler* handler){ - input = handler; -} - -void Connector::setShutdownHandler(ShutdownHandler* handler){ - shutdownHandler = handler; -} - -OutputHandler* Connector::getOutputHandler(){ - return this; -} - -void Connector::send(AMQFrame* frame){ - writeBlock(frame); - if(debug) std::cout << "SENT: " << *frame << std::endl; - delete frame; -} - -void Connector::writeBlock(AMQDataBlock* data){ - Mutex::ScopedLock l(writeLock); - data->encode(outbuf); - //transfer data to wire - outbuf.flip(); - writeToSocket(outbuf.start(), outbuf.available()); - outbuf.clear(); -} - -void Connector::writeToSocket(char* data, size_t available){ - apr_size_t bytes(available); - apr_size_t written(0); - while(written < available && !closed){ - apr_status_t status = apr_socket_send(socket, data + written, &bytes); - if(status == APR_TIMEUP){ - std::cout << "Write request timed out." << std::endl; - } - if(bytes == 0){ - std::cout << "Write request wrote 0 bytes." << std::endl; - } - lastOut = apr_time_as_msec(apr_time_now()); - written += bytes; - bytes = available - written; - } -} - -void Connector::checkIdle(apr_status_t status){ - if(timeoutHandler){ - int64_t now = apr_time_as_msec(apr_time_now()); - if(APR_STATUS_IS_TIMEUP(status)){ - if(idleIn && (now - lastIn > idleIn)){ - timeoutHandler->idleIn(); - } - }else if(APR_STATUS_IS_EOF(status)){ - closed = true; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - if(shutdownHandler) shutdownHandler->shutdown(); - }else{ - lastIn = now; - } - if(idleOut && (now - lastOut > idleOut)){ - timeoutHandler->idleOut(); - } - } -} - -void Connector::setReadTimeout(u_int16_t t){ - idleIn = t * 1000;//t is in secs - if(idleIn && (!timeout || idleIn < timeout)){ - timeout = idleIn; - setSocketTimeout(); - } - -} - -void Connector::setWriteTimeout(u_int16_t t){ - idleOut = t * 1000;//t is in secs - if(idleOut && (!timeout || idleOut < timeout)){ - timeout = idleOut; - setSocketTimeout(); - } -} - -void Connector::setSocketTimeout(){ - //interval is in microseconds, timeout in milliseconds - //want the interval to be a bit shorter than the timeout, hence multiply - //by 800 rather than 1000. - apr_interval_time_t interval(timeout * 800); - apr_socket_timeout_set(socket, interval); -} - -void Connector::setTimeoutHandler(TimeoutHandler* handler){ - timeoutHandler = handler; -} - -void Connector::run(){ - try{ - while(!closed){ - apr_size_t bytes(inbuf.available()); - if(bytes < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes)); - - if(bytes > 0){ - inbuf.move(bytes); - inbuf.flip();//position = 0, limit = total data read - - AMQFrame frame; - while(frame.decode(inbuf)){ - if(debug) std::cout << "RECV: " << frame << std::endl; - input->received(&frame); - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); - } - } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} diff --git a/cpp/src/qpid/apr/Connector.h b/cpp/src/qpid/apr/Connector.h deleted file mode 100644 index e69a7205f3..0000000000 --- a/cpp/src/qpid/apr/Connector.h +++ /dev/null @@ -1,93 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Connector_ -#define _Connector_ - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/InputHandler.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/sys/ShutdownHandler.h" -#include "qpid/sys/TimeoutHandler.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Connector.h" -#include "qpid/sys/Monitor.h" - -namespace qpid { -namespace sys { - - class Connector : public qpid::framing::OutputHandler, - private qpid::sys::Runnable - { - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; - - bool closed; - - int64_t lastIn; - int64_t lastOut; - int64_t timeout; - u_int32_t idleIn; - u_int32_t idleOut; - - TimeoutHandler* timeoutHandler; - ShutdownHandler* shutdownHandler; - qpid::framing::InputHandler* input; - qpid::framing::InitiationHandler* initialiser; - qpid::framing::OutputHandler* output; - - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; - - qpid::sys::Mutex writeLock; - qpid::sys::Thread receiver; - - apr_pool_t* pool; - apr_socket_t* socket; - - void checkIdle(apr_status_t status); - void writeBlock(qpid::framing::AMQDataBlock* data); - void writeToSocket(char* data, size_t available); - void setSocketTimeout(); - - void run(); - - public: - Connector(bool debug = false, u_int32_t buffer_size = 1024); - virtual ~Connector(); - virtual void connect(const std::string& host, int port); - virtual void init(qpid::framing::ProtocolInitiation* header); - virtual void close(); - virtual void setInputHandler(qpid::framing::InputHandler* handler); - virtual void setTimeoutHandler(TimeoutHandler* handler); - virtual void setShutdownHandler(ShutdownHandler* handler); - virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void setReadTimeout(u_int16_t timeout); - virtual void setWriteTimeout(u_int16_t timeout); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/apr/Monitor.cpp b/cpp/src/qpid/apr/Monitor.cpp deleted file mode 100644 index 69fb2f6ffd..0000000000 --- a/cpp/src/qpid/apr/Monitor.cpp +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Monitor.h" -#include "APRPool.h" - -using namespace qpid::sys; - -Mutex::Mutex() -{ - APRBase::increment(); - // TODO aconway 2006-11-08: Switch to non-nested. - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); -} - -Mutex::~Mutex(){ - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); - APRBase::decrement(); -} - -Monitor::Monitor() -{ - CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); -} - -Monitor::~Monitor(){ - CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); -} - - - -void Monitor::wait(){ - CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); -} - -void Monitor::wait(int64_t nsecs){ - // APR uses microseconds. - apr_status_t status = apr_thread_cond_timedwait( - condition, mutex, nsecs/1000); - if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); -} - -void Monitor::notify(){ - CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); -} - -void Monitor::notifyAll(){ - CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); -} - diff --git a/cpp/src/qpid/apr/Monitor.h b/cpp/src/qpid/apr/Monitor.h index a51baf8d94..10bc20820e 100644 --- a/cpp/src/qpid/apr/Monitor.h +++ b/cpp/src/qpid/apr/Monitor.h @@ -23,6 +23,7 @@ #include "apr-1/apr_thread_mutex.h" #include "apr-1/apr_thread_cond.h" #include "APRBase.h" +#include "APRPool.h" namespace qpid { namespace sys { @@ -43,11 +44,11 @@ class Mutex : private boost::noncopyable public: typedef ScopedLock<Mutex> ScopedLock; - Mutex(); - ~Mutex(); - void lock() { CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); } - void unlock() { CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); } - void trylock() { CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); } + inline Mutex(); + inline ~Mutex(); + inline void lock(); + inline void unlock(); + inline void trylock(); protected: apr_thread_mutex_t* mutex; @@ -57,18 +58,66 @@ class Mutex : private boost::noncopyable class Monitor : public Mutex { public: - Monitor(); - ~Monitor(); - void wait(); - void wait(int64_t nsecs); - void notify(); - void notifyAll(); + inline Monitor(); + inline ~Monitor(); + inline void wait(); + inline void wait(int64_t nsecs); + inline void notify(); + inline void notifyAll(); private: apr_thread_cond_t* condition; }; + +Mutex::Mutex() { + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); +} + +Mutex::~Mutex(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); +} + +void Mutex::lock() { + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} +void Mutex::unlock() { + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} + +void Mutex::trylock() { + CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); +} + +Monitor::Monitor() { + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); +} + +Monitor::~Monitor() { + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); +} + +void Monitor::wait() { + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); +} + +void Monitor::wait(int64_t nsecs){ + // APR uses microseconds. + apr_status_t status = apr_thread_cond_timedwait( + condition, mutex, nsecs/1000); + if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); +} + +void Monitor::notify(){ + CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void Monitor::notifyAll(){ + CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + + }} diff --git a/cpp/src/qpid/apr/Socket.h b/cpp/src/qpid/apr/Socket.h new file mode 100644 index 0000000000..9a519e7391 --- /dev/null +++ b/cpp/src/qpid/apr/Socket.h @@ -0,0 +1,107 @@ +#ifndef _apr_Socket_h +#define _apr_Socket_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <apr-1/apr_network_io.h> +#include "APRBase.h" +#include "APRPool.h" + +namespace qpid { +namespace sys { + +class Socket +{ + public: + inline Socket(); + inline ~Socket(); + inline void setTimeout(long msecs); + inline void connect(const std::string& host, int port); + inline void close(); + + enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 }; + + inline ssize_t send(const char* data, size_t size); + inline ssize_t recv(char* data, size_t size); + + private: + apr_socket_t* socket; +}; + +inline +Socket::Socket() +{ + CHECK_APR_SUCCESS( + apr_socket_create( + &socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, + APRPool::get())); +} + +inline +Socket::~Socket() { } + +inline void +Socket::setTimeout(long msecs) +{ + apr_socket_timeout_set(socket, msecs*1000); +} + +inline void +Socket::connect(const std::string& host, int port) +{ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS( + apr_sockaddr_info_get( + &address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, + APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); +} + +inline void +Socket::close() +{ + CHECK_APR_SUCCESS(apr_socket_close(socket)); + socket = 0; +} + +inline ssize_t +Socket::send(const char* data, size_t size) +{ + apr_size_t sent = size; + apr_status_t status = apr_socket_send(socket, data, &sent); + if (!APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT; + if (!APR_STATUS_IS_EOF(status)) return SOCKET_EOF; + CHECK_APR_SUCCESS(status); + return sent; +} + +inline ssize_t +Socket::recv(char* data, size_t size) +{ + apr_size_t received = size; + apr_status_t status = apr_socket_recv(socket, data, &received); + if (!APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT; + CHECK_APR_SUCCESS(status); + return received; +} + +}} + + +#endif /*!_apr_Socket_h*/ |