summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/apr
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-11-09 01:29:59 +0000
committerAlan Conway <aconway@apache.org>2006-11-09 01:29:59 +0000
commit295145d247a7523affdf43f8d870912b1a303caf (patch)
treee27b156b80f0904530d512177e35284def40ab27 /cpp/src/qpid/apr
parentf6113838a8e6d271e46466fe74884c5bf9706ae0 (diff)
downloadqpid-python-295145d247a7523affdf43f8d870912b1a303caf.tar.gz
More separation of concerns with APR, client side complete.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@472732 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/apr')
-rw-r--r--cpp/src/qpid/apr/Connector.cpp190
-rw-r--r--cpp/src/qpid/apr/Connector.h93
-rw-r--r--cpp/src/qpid/apr/Monitor.cpp64
-rw-r--r--cpp/src/qpid/apr/Monitor.h71
-rw-r--r--cpp/src/qpid/apr/Socket.h107
5 files changed, 167 insertions, 358 deletions
diff --git a/cpp/src/qpid/apr/Connector.cpp b/cpp/src/qpid/apr/Connector.cpp
deleted file mode 100644
index 4446731654..0000000000
--- a/cpp/src/qpid/apr/Connector.cpp
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include <qpid/QpidError.h>
-#include "APRBase.h"
-#include "Connector.h"
-
-using namespace qpid::sys;
-using namespace qpid::sys;
-using namespace qpid::framing;
-using qpid::QpidError;
-
-Connector::Connector(bool _debug, u_int32_t buffer_size) :
- debug(_debug),
- receive_buffer_size(buffer_size),
- send_buffer_size(buffer_size),
- closed(true),
- lastIn(0), lastOut(0),
- timeout(0),
- idleIn(0), idleOut(0),
- timeoutHandler(0),
- shutdownHandler(0),
- inbuf(receive_buffer_size),
- outbuf(send_buffer_size){
-
- APRBase::increment();
-
- CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
-}
-
-Connector::~Connector(){
- apr_pool_destroy(pool);
-
- APRBase::decrement();
-}
-
-void Connector::connect(const std::string& host, int port){
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
- CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
- closed = false;
- receiver = Thread(this);
-}
-
-void Connector::init(ProtocolInitiation* header){
- writeBlock(header);
- delete header;
-}
-
-void Connector::close(){
- closed = true;
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- receiver.join();
-}
-
-void Connector::setInputHandler(InputHandler* handler){
- input = handler;
-}
-
-void Connector::setShutdownHandler(ShutdownHandler* handler){
- shutdownHandler = handler;
-}
-
-OutputHandler* Connector::getOutputHandler(){
- return this;
-}
-
-void Connector::send(AMQFrame* frame){
- writeBlock(frame);
- if(debug) std::cout << "SENT: " << *frame << std::endl;
- delete frame;
-}
-
-void Connector::writeBlock(AMQDataBlock* data){
- Mutex::ScopedLock l(writeLock);
- data->encode(outbuf);
- //transfer data to wire
- outbuf.flip();
- writeToSocket(outbuf.start(), outbuf.available());
- outbuf.clear();
-}
-
-void Connector::writeToSocket(char* data, size_t available){
- apr_size_t bytes(available);
- apr_size_t written(0);
- while(written < available && !closed){
- apr_status_t status = apr_socket_send(socket, data + written, &bytes);
- if(status == APR_TIMEUP){
- std::cout << "Write request timed out." << std::endl;
- }
- if(bytes == 0){
- std::cout << "Write request wrote 0 bytes." << std::endl;
- }
- lastOut = apr_time_as_msec(apr_time_now());
- written += bytes;
- bytes = available - written;
- }
-}
-
-void Connector::checkIdle(apr_status_t status){
- if(timeoutHandler){
- int64_t now = apr_time_as_msec(apr_time_now());
- if(APR_STATUS_IS_TIMEUP(status)){
- if(idleIn && (now - lastIn > idleIn)){
- timeoutHandler->idleIn();
- }
- }else if(APR_STATUS_IS_EOF(status)){
- closed = true;
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- if(shutdownHandler) shutdownHandler->shutdown();
- }else{
- lastIn = now;
- }
- if(idleOut && (now - lastOut > idleOut)){
- timeoutHandler->idleOut();
- }
- }
-}
-
-void Connector::setReadTimeout(u_int16_t t){
- idleIn = t * 1000;//t is in secs
- if(idleIn && (!timeout || idleIn < timeout)){
- timeout = idleIn;
- setSocketTimeout();
- }
-
-}
-
-void Connector::setWriteTimeout(u_int16_t t){
- idleOut = t * 1000;//t is in secs
- if(idleOut && (!timeout || idleOut < timeout)){
- timeout = idleOut;
- setSocketTimeout();
- }
-}
-
-void Connector::setSocketTimeout(){
- //interval is in microseconds, timeout in milliseconds
- //want the interval to be a bit shorter than the timeout, hence multiply
- //by 800 rather than 1000.
- apr_interval_time_t interval(timeout * 800);
- apr_socket_timeout_set(socket, interval);
-}
-
-void Connector::setTimeoutHandler(TimeoutHandler* handler){
- timeoutHandler = handler;
-}
-
-void Connector::run(){
- try{
- while(!closed){
- apr_size_t bytes(inbuf.available());
- if(bytes < 1){
- THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
- }
- checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes));
-
- if(bytes > 0){
- inbuf.move(bytes);
- inbuf.flip();//position = 0, limit = total data read
-
- AMQFrame frame;
- while(frame.decode(inbuf)){
- if(debug) std::cout << "RECV: " << frame << std::endl;
- input->received(&frame);
- }
- //need to compact buffer to preserve any 'extra' data
- inbuf.compact();
- }
- }
- }catch(QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
- }
-}
diff --git a/cpp/src/qpid/apr/Connector.h b/cpp/src/qpid/apr/Connector.h
deleted file mode 100644
index e69a7205f3..0000000000
--- a/cpp/src/qpid/apr/Connector.h
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _Connector_
-#define _Connector_
-
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_time.h"
-
-#include "qpid/framing/InputHandler.h"
-#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/InitiationHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/sys/ShutdownHandler.h"
-#include "qpid/sys/TimeoutHandler.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Connector.h"
-#include "qpid/sys/Monitor.h"
-
-namespace qpid {
-namespace sys {
-
- class Connector : public qpid::framing::OutputHandler,
- private qpid::sys::Runnable
- {
- const bool debug;
- const int receive_buffer_size;
- const int send_buffer_size;
-
- bool closed;
-
- int64_t lastIn;
- int64_t lastOut;
- int64_t timeout;
- u_int32_t idleIn;
- u_int32_t idleOut;
-
- TimeoutHandler* timeoutHandler;
- ShutdownHandler* shutdownHandler;
- qpid::framing::InputHandler* input;
- qpid::framing::InitiationHandler* initialiser;
- qpid::framing::OutputHandler* output;
-
- qpid::framing::Buffer inbuf;
- qpid::framing::Buffer outbuf;
-
- qpid::sys::Mutex writeLock;
- qpid::sys::Thread receiver;
-
- apr_pool_t* pool;
- apr_socket_t* socket;
-
- void checkIdle(apr_status_t status);
- void writeBlock(qpid::framing::AMQDataBlock* data);
- void writeToSocket(char* data, size_t available);
- void setSocketTimeout();
-
- void run();
-
- public:
- Connector(bool debug = false, u_int32_t buffer_size = 1024);
- virtual ~Connector();
- virtual void connect(const std::string& host, int port);
- virtual void init(qpid::framing::ProtocolInitiation* header);
- virtual void close();
- virtual void setInputHandler(qpid::framing::InputHandler* handler);
- virtual void setTimeoutHandler(TimeoutHandler* handler);
- virtual void setShutdownHandler(ShutdownHandler* handler);
- virtual qpid::framing::OutputHandler* getOutputHandler();
- virtual void send(qpid::framing::AMQFrame* frame);
- virtual void setReadTimeout(u_int16_t timeout);
- virtual void setWriteTimeout(u_int16_t timeout);
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/apr/Monitor.cpp b/cpp/src/qpid/apr/Monitor.cpp
deleted file mode 100644
index 69fb2f6ffd..0000000000
--- a/cpp/src/qpid/apr/Monitor.cpp
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "Monitor.h"
-#include "APRPool.h"
-
-using namespace qpid::sys;
-
-Mutex::Mutex()
-{
- APRBase::increment();
- // TODO aconway 2006-11-08: Switch to non-nested.
- CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get()));
-}
-
-Mutex::~Mutex(){
- CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
- APRBase::decrement();
-}
-
-Monitor::Monitor()
-{
- CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
-}
-
-Monitor::~Monitor(){
- CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
-}
-
-
-
-void Monitor::wait(){
- CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
-}
-
-void Monitor::wait(int64_t nsecs){
- // APR uses microseconds.
- apr_status_t status = apr_thread_cond_timedwait(
- condition, mutex, nsecs/1000);
- if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);
-}
-
-void Monitor::notify(){
- CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
-}
-
-void Monitor::notifyAll(){
- CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
-}
-
diff --git a/cpp/src/qpid/apr/Monitor.h b/cpp/src/qpid/apr/Monitor.h
index a51baf8d94..10bc20820e 100644
--- a/cpp/src/qpid/apr/Monitor.h
+++ b/cpp/src/qpid/apr/Monitor.h
@@ -23,6 +23,7 @@
#include "apr-1/apr_thread_mutex.h"
#include "apr-1/apr_thread_cond.h"
#include "APRBase.h"
+#include "APRPool.h"
namespace qpid {
namespace sys {
@@ -43,11 +44,11 @@ class Mutex : private boost::noncopyable
public:
typedef ScopedLock<Mutex> ScopedLock;
- Mutex();
- ~Mutex();
- void lock() { CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); }
- void unlock() { CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); }
- void trylock() { CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); }
+ inline Mutex();
+ inline ~Mutex();
+ inline void lock();
+ inline void unlock();
+ inline void trylock();
protected:
apr_thread_mutex_t* mutex;
@@ -57,18 +58,66 @@ class Mutex : private boost::noncopyable
class Monitor : public Mutex
{
public:
- Monitor();
- ~Monitor();
- void wait();
- void wait(int64_t nsecs);
- void notify();
- void notifyAll();
+ inline Monitor();
+ inline ~Monitor();
+ inline void wait();
+ inline void wait(int64_t nsecs);
+ inline void notify();
+ inline void notifyAll();
private:
apr_thread_cond_t* condition;
};
+
+Mutex::Mutex() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get()));
+}
+
+Mutex::~Mutex(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+}
+
+void Mutex::lock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+}
+void Mutex::unlock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+}
+
+void Mutex::trylock() {
+ CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex));
+}
+
+Monitor::Monitor() {
+ CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
+}
+
+Monitor::~Monitor() {
+ CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
+}
+
+void Monitor::wait() {
+ CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
+}
+
+void Monitor::wait(int64_t nsecs){
+ // APR uses microseconds.
+ apr_status_t status = apr_thread_cond_timedwait(
+ condition, mutex, nsecs/1000);
+ if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);
+}
+
+void Monitor::notify(){
+ CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
+}
+
+void Monitor::notifyAll(){
+ CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
+}
+
+
}}
diff --git a/cpp/src/qpid/apr/Socket.h b/cpp/src/qpid/apr/Socket.h
new file mode 100644
index 0000000000..9a519e7391
--- /dev/null
+++ b/cpp/src/qpid/apr/Socket.h
@@ -0,0 +1,107 @@
+#ifndef _apr_Socket_h
+#define _apr_Socket_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <apr-1/apr_network_io.h>
+#include "APRBase.h"
+#include "APRPool.h"
+
+namespace qpid {
+namespace sys {
+
+class Socket
+{
+ public:
+ inline Socket();
+ inline ~Socket();
+ inline void setTimeout(long msecs);
+ inline void connect(const std::string& host, int port);
+ inline void close();
+
+ enum { SOCKET_TIMEOUT=-2, SOCKET_EOF=-3 };
+
+ inline ssize_t send(const char* data, size_t size);
+ inline ssize_t recv(char* data, size_t size);
+
+ private:
+ apr_socket_t* socket;
+};
+
+inline
+Socket::Socket()
+{
+ CHECK_APR_SUCCESS(
+ apr_socket_create(
+ &socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP,
+ APRPool::get()));
+}
+
+inline
+Socket::~Socket() { }
+
+inline void
+Socket::setTimeout(long msecs)
+{
+ apr_socket_timeout_set(socket, msecs*1000);
+}
+
+inline void
+Socket::connect(const std::string& host, int port)
+{
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(
+ apr_sockaddr_info_get(
+ &address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK,
+ APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
+}
+
+inline void
+Socket::close()
+{
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ socket = 0;
+}
+
+inline ssize_t
+Socket::send(const char* data, size_t size)
+{
+ apr_size_t sent = size;
+ apr_status_t status = apr_socket_send(socket, data, &sent);
+ if (!APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
+ if (!APR_STATUS_IS_EOF(status)) return SOCKET_EOF;
+ CHECK_APR_SUCCESS(status);
+ return sent;
+}
+
+inline ssize_t
+Socket::recv(char* data, size_t size)
+{
+ apr_size_t received = size;
+ apr_status_t status = apr_socket_recv(socket, data, &received);
+ if (!APR_STATUS_IS_TIMEUP(status)) return SOCKET_TIMEOUT;
+ CHECK_APR_SUCCESS(status);
+ return received;
+}
+
+}}
+
+
+#endif /*!_apr_Socket_h*/