From 295145d247a7523affdf43f8d870912b1a303caf Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 9 Nov 2006 01:29:59 +0000 Subject: More separation of concerns with APR, client side complete. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@472732 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/apr/Connector.cpp | 190 -------------------------------------- cpp/src/qpid/apr/Connector.h | 93 ------------------- cpp/src/qpid/apr/Monitor.cpp | 64 ------------- cpp/src/qpid/apr/Monitor.h | 71 +++++++++++--- cpp/src/qpid/apr/Socket.h | 107 +++++++++++++++++++++ cpp/src/qpid/client/Connection.h | 4 +- cpp/src/qpid/client/Connector.cpp | 169 +++++++++++++++++++++++++++++++++ cpp/src/qpid/client/Connector.h | 90 ++++++++++++++++++ cpp/src/qpid/sys/Connector.h | 25 ----- cpp/src/qpid/sys/Socket.h | 27 ++++++ cpp/src/qpid/sys/Time.h | 6 +- 11 files changed, 458 insertions(+), 388 deletions(-) delete mode 100644 cpp/src/qpid/apr/Connector.cpp delete mode 100644 cpp/src/qpid/apr/Connector.h delete mode 100644 cpp/src/qpid/apr/Monitor.cpp create mode 100644 cpp/src/qpid/apr/Socket.h create mode 100644 cpp/src/qpid/client/Connector.cpp create mode 100644 cpp/src/qpid/client/Connector.h delete mode 100644 cpp/src/qpid/sys/Connector.h create mode 100644 cpp/src/qpid/sys/Socket.h (limited to 'cpp/src') diff --git a/cpp/src/qpid/apr/Connector.cpp b/cpp/src/qpid/apr/Connector.cpp deleted file mode 100644 index 4446731654..0000000000 --- a/cpp/src/qpid/apr/Connector.cpp +++ /dev/null @@ -1,190 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include -#include -#include "APRBase.h" -#include "Connector.h" - -using namespace qpid::sys; -using namespace qpid::sys; -using namespace qpid::framing; -using qpid::QpidError; - -Connector::Connector(bool _debug, u_int32_t buffer_size) : - debug(_debug), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - closed(true), - lastIn(0), lastOut(0), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), - shutdownHandler(0), - inbuf(receive_buffer_size), - outbuf(send_buffer_size){ - - APRBase::increment(); - - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); -} - -Connector::~Connector(){ - apr_pool_destroy(pool); - - APRBase::decrement(); -} - -void Connector::connect(const std::string& host, int port){ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); - CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); - closed = false; - receiver = Thread(this); -} - -void Connector::init(ProtocolInitiation* header){ - writeBlock(header); - delete header; -} - -void Connector::close(){ - closed = true; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - receiver.join(); -} - -void Connector::setInputHandler(InputHandler* handler){ - input = handler; -} - -void Connector::setShutdownHandler(ShutdownHandler* handler){ - shutdownHandler = handler; -} - -OutputHandler* Connector::getOutputHandler(){ - return this; -} - -void Connector::send(AMQFrame* frame){ - writeBlock(frame); - if(debug) std::cout << "SENT: " << *frame << std::endl; - delete frame; -} - -void Connector::writeBlock(AMQDataBlock* data){ - Mutex::ScopedLock l(writeLock); - data->encode(outbuf); - //transfer data to wire - outbuf.flip(); - writeToSocket(outbuf.start(), outbuf.available()); - outbuf.clear(); -} - -void Connector::writeToSocket(char* data, size_t available){ - apr_size_t bytes(available); - apr_size_t written(0); - while(written < available && !closed){ - apr_status_t status = apr_socket_send(socket, data + written, &bytes); - if(status == APR_TIMEUP){ - std::cout << "Write request timed out." << std::endl; - } - if(bytes == 0){ - std::cout << "Write request wrote 0 bytes." << std::endl; - } - lastOut = apr_time_as_msec(apr_time_now()); - written += bytes; - bytes = available - written; - } -} - -void Connector::checkIdle(apr_status_t status){ - if(timeoutHandler){ - int64_t now = apr_time_as_msec(apr_time_now()); - if(APR_STATUS_IS_TIMEUP(status)){ - if(idleIn && (now - lastIn > idleIn)){ - timeoutHandler->idleIn(); - } - }else if(APR_STATUS_IS_EOF(status)){ - closed = true; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - if(shutdownHandler) shutdownHandler->shutdown(); - }else{ - lastIn = now; - } - if(idleOut && (now - lastOut > idleOut)){ - timeoutHandler->idleOut(); - } - } -} - -void Connector::setReadTimeout(u_int16_t t){ - idleIn = t * 1000;//t is in secs - if(idleIn && (!timeout || idleIn < timeout)){ - timeout = idleIn; - setSocketTimeout(); - } - -} - -void Connector::setWriteTimeout(u_int16_t t){ - idleOut = t * 1000;//t is in secs - if(idleOut && (!timeout || idleOut < timeout)){ - timeout = idleOut; - setSocketTimeout(); - } -} - -void Connector::setSocketTimeout(){ - //interval is in microseconds, timeout in milliseconds - //want the interval to be a bit shorter than the timeout, hence multiply - //by 800 rather than 1000. - apr_interval_time_t interval(timeout * 800); - apr_socket_timeout_set(socket, interval); -} - -void Connector::setTimeoutHandler(TimeoutHandler* handler){ - timeoutHandler = handler; -} - -void Connector::run(){ - try{ - while(!closed){ - apr_size_t bytes(inbuf.available()); - if(bytes < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes)); - - if(bytes > 0){ - inbuf.move(bytes); - inbuf.flip();//position = 0, limit = total data read - - AMQFrame frame; - while(frame.decode(inbuf)){ - if(debug) std::cout << "RECV: " << frame << std::endl; - input->received(&frame); - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); - } - } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} diff --git a/cpp/src/qpid/apr/Connector.h b/cpp/src/qpid/apr/Connector.h deleted file mode 100644 index e69a7205f3..0000000000 --- a/cpp/src/qpid/apr/Connector.h +++ /dev/null @@ -1,93 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Connector_ -#define _Connector_ - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/InputHandler.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/sys/ShutdownHandler.h" -#include "qpid/sys/TimeoutHandler.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Connector.h" -#include "qpid/sys/Monitor.h" - -namespace qpid { -namespace sys { - - class Connector : public qpid::framing::OutputHandler, - private qpid::sys::Runnable - { - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; - - bool closed; - - int64_t lastIn; - int64_t lastOut; - int64_t timeout; - u_int32_t idleIn; - u_int32_t idleOut; - - TimeoutHandler* timeoutHandler; - ShutdownHandler* shutdownHandler; - qpid::framing::InputHandler* input; - qpid::framing::InitiationHandler* initialiser; - qpid::framing::OutputHandler* output; - - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; - - qpid::sys::Mutex writeLock; - qpid::sys::Thread receiver; - - apr_pool_t* pool; - apr_socket_t* socket; - - void checkIdle(apr_status_t status); - void writeBlock(qpid::framing::AMQDataBlock* data); - void writeToSocket(char* data, size_t available); - void setSocketTimeout(); - - void run(); - - public: - Connector(bool debug = false, u_int32_t buffer_size = 1024); - virtual ~Connector(); - virtual void connect(const std::string& host, int port); - virtual void init(qpid::framing::ProtocolInitiation* header); - virtual void close(); - virtual void setInputHandler(qpid::framing::InputHandler* handler); - virtual void setTimeoutHandler(TimeoutHandler* handler); - virtual void setShutdownHandler(ShutdownHandler* handler); - virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void setReadTimeout(u_int16_t timeout); - virtual void setWriteTimeout(u_int16_t timeout); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/apr/Monitor.cpp b/cpp/src/qpid/apr/Monitor.cpp deleted file mode 100644 index 69fb2f6ffd..0000000000 --- a/cpp/src/qpid/apr/Monitor.cpp +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "Monitor.h" -#include "APRPool.h" - -using namespace qpid::sys; - -Mutex::Mutex() -{ - APRBase::increment(); - // TODO aconway 2006-11-08: Switch to non-nested. - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); -} - -Mutex::~Mutex(){ - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); - APRBase::decrement(); -} - -Monitor::Monitor() -{ - CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); -} - -Monitor::~Monitor(){ - CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); -} - - - -void Monitor::wait(){ - CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); -} - -void Monitor::wait(int64_t nsecs){ - // APR uses microseconds. - apr_status_t status = apr_thread_cond_timedwait( - condition, mutex, nsecs/1000); - if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); -} - -void Monitor::notify(){ - CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); -} - -void Monitor::notifyAll(){ - CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); -} - diff --git a/cpp/src/qpid/apr/Monitor.h b/cpp/src/qpid/apr/Monitor.h index a51baf8d94..10bc20820e 100644 --- a/cpp/src/qpid/apr/Monitor.h +++ b/cpp/src/qpid/apr/Monitor.h @@ -23,6 +23,7 @@ #include "apr-1/apr_thread_mutex.h" #include "apr-1/apr_thread_cond.h" #include "APRBase.h" +#include "APRPool.h" namespace qpid { namespace sys { @@ -43,11 +44,11 @@ class Mutex : private boost::noncopyable public: typedef ScopedLock ScopedLock; - Mutex(); - ~Mutex(); - void lock() { CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); } - void unlock() { CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); } - void trylock() { CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); } + inline Mutex(); + inline ~Mutex(); + inline void lock(); + inline void unlock(); + inline void trylock(); protected: apr_thread_mutex_t* mutex; @@ -57,18 +58,66 @@ class Mutex : private boost::noncopyable class Monitor : public Mutex { public: - Monitor(); - ~Monitor(); - void wait(); - void wait(int64_t nsecs); - void notify(); - void notifyAll(); + inline Monitor(); + inline ~Monitor(); + inline void wait(); + inline void wait(int64_t nsecs); + inline void notify(); + inline void notifyAll(); private: apr_thread_cond_t* condition; }; + +Mutex::Mutex() { + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); +} + +Mutex::~Mutex(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); +} + +void Mutex::lock() { + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} +void Mutex::unlock() { + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} + +void Mutex::trylock() { + CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); +} + +Monitor::Monitor() { + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); +} + +Monitor::~Monitor() { + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); +} + +void Monitor::wait() { + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); +} + +void Monitor::wait(int64_t nsecs){ + // APR uses microseconds. + apr_status_t status = apr_thread_cond_timedwait( + condition, mutex, nsecs/1000); + if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); +} + +void Monitor::notify(){ + CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void Monitor::notifyAll(){ + CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + + }} diff --git a/cpp/src/qpid/apr/Socket.h b/cpp/src/qpid/apr/Socket.h new file mode 100644 index 0000000000..9a519e7391 --- /dev/null +++ b/cpp/src/qpid/apr/Socket.h @@ -0,0 +1,107 @@ +#ifndef _apr_Socket_h +#define _apr_Socket_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include "APRBase.h" +#include "APRPool.h" + +namespace qpid { +namespace sys { + +class Socket +{ + public: + inline Socket(); + inline ~Socket(); + inline void setTimeout(long msecs); + inline void connect(const std::string& host, int port); + inline void close(); + + enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 }; + + inline ssize_t send(const char* data, size_t size); + inline ssize_t recv(char* data, size_t size); + + private: + apr_socket_t* socket; +}; + +inline +Socket::Socket() +{ + CHECK_APR_SUCCESS( + apr_socket_create( + &socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, + APRPool::get())); +} + +inline +Socket::~Socket() { } + +inline void +Socket::setTimeout(long msecs) +{ + apr_socket_timeout_set(socket, msecs*1000); +} + +inline void +Socket::connect(const std::string& host, int port) +{ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS( + apr_sockaddr_info_get( + &address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, + APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); +} + +inline void +Socket::close() +{ + CHECK_APR_SUCCESS(apr_socket_close(socket)); + socket = 0; +} + +inline ssize_t +Socket::send(const char* data, size_t size) +{ + apr_size_t sent = size; + apr_status_t status = apr_socket_send(socket, data, &sent); + if (!APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT; + if (!APR_STATUS_IS_EOF(status)) return SOCKET_EOF; + CHECK_APR_SUCCESS(status); + return sent; +} + +inline ssize_t +Socket::recv(char* data, size_t size) +{ + apr_size_t received = size; + apr_status_t status = apr_socket_recv(socket, data, &received); + if (!APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT; + CHECK_APR_SUCCESS(status); + return received; +} + +}} + + +#endif /*!_apr_Socket_h*/ diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index da747d0e1d..4ff4e83859 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -22,7 +22,7 @@ #define _Connection_ #include "qpid/QpidError.h" -#include "qpid/sys/Connector.h" +#include "qpid/client/Connector.h" #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" @@ -52,7 +52,7 @@ class Connection : public virtual qpid::framing::InputHandler, int port; const u_int32_t max_frame_size; std::map channels; - qpid::sys::Connector* connector; + Connector* connector; qpid::framing::OutputHandler* out; ResponseHandler responses; volatile bool closed; diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp new file mode 100644 index 0000000000..5d3a20be6a --- /dev/null +++ b/cpp/src/qpid/client/Connector.cpp @@ -0,0 +1,169 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include +#include +#include +#include "Connector.h" + +using namespace qpid::sys; +using namespace qpid::client; +using namespace qpid::framing; +using qpid::QpidError; + +Connector::Connector(bool _debug, u_int32_t buffer_size) : + debug(_debug), + receive_buffer_size(buffer_size), + send_buffer_size(buffer_size), + closed(true), + lastIn(0), lastOut(0), + timeout(0), + idleIn(0), idleOut(0), + timeoutHandler(0), + shutdownHandler(0), + inbuf(receive_buffer_size), + outbuf(send_buffer_size){ } + +Connector::~Connector(){ } + +void Connector::connect(const std::string& host, int port){ + socket.connect(host, port); + closed = false; + receiver = Thread(this); +} + +void Connector::init(ProtocolInitiation* header){ + writeBlock(header); + delete header; +} + +void Connector::close(){ + closed = true; + socket.close(); + receiver.join(); +} + +void Connector::setInputHandler(InputHandler* handler){ + input = handler; +} + +void Connector::setShutdownHandler(ShutdownHandler* handler){ + shutdownHandler = handler; +} + +OutputHandler* Connector::getOutputHandler(){ + return this; +} + +void Connector::send(AMQFrame* frame){ + writeBlock(frame); + if(debug) std::cout << "SENT: " << *frame << std::endl; + delete frame; +} + +void Connector::writeBlock(AMQDataBlock* data){ + Mutex::ScopedLock l(writeLock); + data->encode(outbuf); + //transfer data to wire + outbuf.flip(); + writeToSocket(outbuf.start(), outbuf.available()); + outbuf.clear(); +} + +void Connector::writeToSocket(char* data, size_t available){ + size_t written = 0; + while(written < available && !closed){ + ssize_t sent = socket.send(data + written, available-written); + if(sent > 0) { + lastOut = getTimeMsecs(); + written += sent; + } + } +} + +void Connector::checkIdle(ssize_t status){ + if(timeoutHandler){ + int64_t now = getTimeMsecs(); + if(status == Socket::SOCKET_TIMEOUT) { + if(idleIn && (now - lastIn > idleIn)){ + timeoutHandler->idleIn(); + } + }else if(status == Socket::SOCKET_EOF){ + closed = true; + socket.close(); + if(shutdownHandler) shutdownHandler->shutdown(); + }else{ + lastIn = now; + } + if(idleOut && (now - lastOut > idleOut)){ + timeoutHandler->idleOut(); + } + } +} + +void Connector::setReadTimeout(u_int16_t t){ + idleIn = t * 1000;//t is in secs + if(idleIn && (!timeout || idleIn < timeout)){ + timeout = idleIn; + setSocketTimeout(); + } + +} + +void Connector::setWriteTimeout(u_int16_t t){ + idleOut = t * 1000;//t is in secs + if(idleOut && (!timeout || idleOut < timeout)){ + timeout = idleOut; + setSocketTimeout(); + } +} + +void Connector::setSocketTimeout(){ + socket.setTimeout(timeout); +} + +void Connector::setTimeoutHandler(TimeoutHandler* handler){ + timeoutHandler = handler; +} + +void Connector::run(){ + try{ + while(!closed){ + ssize_t available = inbuf.available(); + if(available < 1){ + THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); + } + ssize_t received = socket.recv(inbuf.start(), available); + checkIdle(received); + + if(received > 0){ + inbuf.move(received); + inbuf.flip();//position = 0, limit = total data read + + AMQFrame frame; + while(frame.decode(inbuf)){ + if(debug) std::cout << "RECV: " << frame << std::endl; + input->received(&frame); + } + //need to compact buffer to preserve any 'extra' data + inbuf.compact(); + } + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h new file mode 100644 index 0000000000..91ec58c95c --- /dev/null +++ b/cpp/src/qpid/client/Connector.h @@ -0,0 +1,90 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Connector_ +#define _Connector_ + + +#include "qpid/framing/InputHandler.h" +#include "qpid/framing/OutputHandler.h" +#include "qpid/framing/InitiationHandler.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/sys/ShutdownHandler.h" +#include "qpid/sys/TimeoutHandler.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Monitor.h" +#include + +namespace qpid { +namespace client { + + class Connector : public qpid::framing::OutputHandler, + private qpid::sys::Runnable + { + const bool debug; + const int receive_buffer_size; + const int send_buffer_size; + + bool closed; + + int64_t lastIn; + int64_t lastOut; + int64_t timeout; + u_int32_t idleIn; + u_int32_t idleOut; + + qpid::sys::TimeoutHandler* timeoutHandler; + qpid::sys::ShutdownHandler* shutdownHandler; + qpid::framing::InputHandler* input; + qpid::framing::InitiationHandler* initialiser; + qpid::framing::OutputHandler* output; + + qpid::framing::Buffer inbuf; + qpid::framing::Buffer outbuf; + + qpid::sys::Mutex writeLock; + qpid::sys::Thread receiver; + + qpid::sys::Socket socket; + + void checkIdle(ssize_t status); + void writeBlock(qpid::framing::AMQDataBlock* data); + void writeToSocket(char* data, size_t available); + void setSocketTimeout(); + + void run(); + + public: + Connector(bool debug = false, u_int32_t buffer_size = 1024); + virtual ~Connector(); + virtual void connect(const std::string& host, int port); + virtual void init(qpid::framing::ProtocolInitiation* header); + virtual void close(); + virtual void setInputHandler(qpid::framing::InputHandler* handler); + virtual void setTimeoutHandler(qpid::sys::TimeoutHandler* handler); + virtual void setShutdownHandler(qpid::sys::ShutdownHandler* handler); + virtual qpid::framing::OutputHandler* getOutputHandler(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void setReadTimeout(u_int16_t timeout); + virtual void setWriteTimeout(u_int16_t timeout); + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/Connector.h b/cpp/src/qpid/sys/Connector.h deleted file mode 100644 index 67e12b008c..0000000000 --- a/cpp/src/qpid/sys/Connector.h +++ /dev/null @@ -1,25 +0,0 @@ -#ifndef _sys_Connector_h -#define _sys_Connector_h - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "platform.h" -#include QPID_PLATFORM_H(Connector.h) - -#endif /*!_sys_Connector_h*/ diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h new file mode 100644 index 0000000000..243764353e --- /dev/null +++ b/cpp/src/qpid/sys/Socket.h @@ -0,0 +1,27 @@ +#ifndef _sys_Socket_h +#define _sys_Socket_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include QPID_PLATFORM_H(Socket.h) + + + +#endif /*!_sys_Socket_h*/ diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h index 79d17b433b..92e83116a5 100644 --- a/cpp/src/qpid/sys/Time.h +++ b/cpp/src/qpid/sys/Time.h @@ -1,5 +1,5 @@ -#ifndef _concurrent_Time_h -#define _concurrent_Time_h +#ifndef _sys_Time_h +#define _sys_Time_h /* * @@ -35,4 +35,4 @@ int64_t getTimeMsecs(); }} -#endif /*!_concurrent_Time_h*/ +#endif /*!_sys_Time_h*/ -- cgit v1.2.1