diff options
author | Alan Conway <aconway@apache.org> | 2006-10-31 19:53:55 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-10-31 19:53:55 +0000 |
commit | 9094d2b10ecadd66fa3b22169183e7573cc79629 (patch) | |
tree | bf3915f72be2a5f09932b800d2fa4309fb3ad64e /cpp/src/qpid/io | |
parent | 0487ea40bc6568765cdec75a36273eeb26fae854 (diff) | |
download | qpid-python-9094d2b10ecadd66fa3b22169183e7573cc79629.tar.gz |
IO refactor phase 1. Reduced dependencies, removed redundant classes.
Renamed pricipal APR classes in preparation for move to apr namespace.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469625 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/io')
-rw-r--r-- | cpp/src/qpid/io/APRConnector.h | 95 | ||||
-rw-r--r-- | cpp/src/qpid/io/APRPool.cpp (renamed from cpp/src/qpid/io/LConnector.h) | 39 | ||||
-rw-r--r-- | cpp/src/qpid/io/APRPool.h (renamed from cpp/src/qpid/io/SessionManager.h) | 39 | ||||
-rw-r--r-- | cpp/src/qpid/io/Acceptor.cpp | 61 | ||||
-rw-r--r-- | cpp/src/qpid/io/Acceptor.h | 57 | ||||
-rw-r--r-- | cpp/src/qpid/io/BlockingAPRAcceptor.cpp | 101 | ||||
-rw-r--r-- | cpp/src/qpid/io/BlockingAPRAcceptor.h | 65 | ||||
-rw-r--r-- | cpp/src/qpid/io/BlockingAPRSessionContext.cpp | 177 | ||||
-rw-r--r-- | cpp/src/qpid/io/BlockingAPRSessionContext.h | 94 | ||||
-rw-r--r-- | cpp/src/qpid/io/Connector.cpp (renamed from cpp/src/qpid/io/APRConnector.cpp) | 42 | ||||
-rw-r--r-- | cpp/src/qpid/io/Connector.h | 73 | ||||
-rw-r--r-- | cpp/src/qpid/io/ConnectorImpl.h | 53 | ||||
-rw-r--r-- | cpp/src/qpid/io/LFAcceptor.cpp | 94 | ||||
-rw-r--r-- | cpp/src/qpid/io/LFAcceptor.h | 74 | ||||
-rw-r--r-- | cpp/src/qpid/io/LFProcessor.h | 10 | ||||
-rw-r--r-- | cpp/src/qpid/io/LFSessionContext.cpp | 6 | ||||
-rw-r--r-- | cpp/src/qpid/io/LFSessionContext.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/io/doxygen_summary.h | 34 |
18 files changed, 251 insertions, 869 deletions
diff --git a/cpp/src/qpid/io/APRConnector.h b/cpp/src/qpid/io/APRConnector.h deleted file mode 100644 index c835f30056..0000000000 --- a/cpp/src/qpid/io/APRConnector.h +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRConnector_ -#define _APRConnector_ - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/InputHandler.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/io/ShutdownHandler.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/io/Connector.h" -#include "qpid/concurrent/APRMonitor.h" - -namespace qpid { -namespace io { - - class APRConnector : public virtual qpid::framing::OutputHandler, - public virtual Connector, - private virtual qpid::concurrent::Runnable - { - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; - - bool closed; - - apr_time_t lastIn; - apr_time_t lastOut; - apr_interval_time_t timeout; - u_int32_t idleIn; - u_int32_t idleOut; - - TimeoutHandler* timeoutHandler; - ShutdownHandler* shutdownHandler; - qpid::framing::InputHandler* input; - qpid::framing::InitiationHandler* initialiser; - qpid::framing::OutputHandler* output; - - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; - - qpid::concurrent::APRMonitor* writeLock; - qpid::concurrent::ThreadFactory* threadFactory; - qpid::concurrent::Thread* receiver; - - apr_pool_t* pool; - apr_socket_t* socket; - - void checkIdle(apr_status_t status); - void writeBlock(qpid::framing::AMQDataBlock* data); - void writeToSocket(char* data, size_t available); - void setSocketTimeout(); - - void run(); - - public: - APRConnector(bool debug = false, u_int32_t buffer_size = 1024); - virtual ~APRConnector(); - virtual void connect(const std::string& host, int port); - virtual void init(qpid::framing::ProtocolInitiation* header); - virtual void close(); - virtual void setInputHandler(qpid::framing::InputHandler* handler); - virtual void setTimeoutHandler(TimeoutHandler* handler); - virtual void setShutdownHandler(ShutdownHandler* handler); - virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void setReadTimeout(u_int16_t timeout); - virtual void setWriteTimeout(u_int16_t timeout); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/LConnector.h b/cpp/src/qpid/io/APRPool.cpp index 5fc86597bd..edd434f16c 100644 --- a/cpp/src/qpid/io/LConnector.h +++ b/cpp/src/qpid/io/APRPool.cpp @@ -15,34 +15,25 @@ * limitations under the License. * */ -#ifndef _LConnector_ -#define _LConnector_ +#include "APRPool.h" +#include "qpid/concurrent/APRBase.h" +#include <boost/pool/singleton_pool.hpp> -#include "qpid/framing/InputHandler.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/io/Connector.h" - -namespace qpid { -namespace io { - - class LConnector : public virtual qpid::framing::OutputHandler, - public virtual Connector, - private virtual qpid::concurrent::Runnable - { - - public: - LConnector(bool debug = false, u_int32_t buffer_size = 1024){}; - virtual ~LConnector(){}; - - }; +using namespace qpid::io; +using namespace qpid::concurrent; +APRPool::APRPool(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); } + +APRPool::~APRPool(){ + apr_pool_destroy(pool); + APRBase::decrement(); } +apr_pool_t* APRPool::get() { + return boost::details::pool::singleton_default<APRPool>::instance().pool; +} -#endif diff --git a/cpp/src/qpid/io/SessionManager.h b/cpp/src/qpid/io/APRPool.h index e6b17451e4..063eedf1ee 100644 --- a/cpp/src/qpid/io/SessionManager.h +++ b/cpp/src/qpid/io/APRPool.h @@ -1,3 +1,6 @@ +#ifndef _APRPool_ +#define _APRPool_ + /* * * Copyright (c) 2006 The Apache Software Foundation @@ -15,26 +18,30 @@ * limitations under the License. * */ -#ifndef _SessionManager_ -#define _SessionManager_ - -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandler.h" +#include <boost/noncopyable.hpp> +#include <apr-1/apr_pools.h> namespace qpid { namespace io { +/** + * Singleton APR memory pool. + */ +class APRPool : private boost::noncopyable { + public: + APRPool(); + ~APRPool(); + + /** Get singleton instance */ + static apr_pool_t* get(); + + private: + apr_pool_t* pool; +}; + +}} + - class SessionManager - { - public: - virtual SessionHandler* init(SessionContext* ctxt) = 0; - virtual void close(SessionContext* ctxt) = 0; - virtual void updateInterest(SessionContext* ctxt, bool read, bool write) = 0; - virtual ~SessionManager(){} - }; -} -} -#endif +#endif /*!_APRPool_*/ diff --git a/cpp/src/qpid/io/Acceptor.cpp b/cpp/src/qpid/io/Acceptor.cpp index 6b76bd4da2..f95d9448cf 100644 --- a/cpp/src/qpid/io/Acceptor.cpp +++ b/cpp/src/qpid/io/Acceptor.cpp @@ -15,7 +15,64 @@ * limitations under the License. * */ - #include "qpid/io/Acceptor.h" +#include "qpid/concurrent/APRBase.h" +#include "APRPool.h" + +using namespace qpid::concurrent; +using namespace qpid::io; + +Acceptor::Acceptor(int16_t port_, int backlog, int threads) : + port(port_), + processor(APRPool::get(), threads, 1000, 5000000) +{ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); + CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); + CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); +} + +int16_t Acceptor::getPort() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->port; +} + +void Acceptor::run(SessionHandlerFactory* factory) { + running = true; + processor.start(); + std::cout << "Listening on port " << getPort() << "..." << std::endl; + while(running){ + apr_socket_t* client; + apr_status_t status = apr_socket_accept(&client, socket, APRPool::get()); + if(status == APR_SUCCESS){ + //make this socket non-blocking: + CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); + LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false); + session->init(factory->create(session)); + }else{ + running = false; + if(status != APR_EINTR){ + std::cout << "ERROR: " << get_desc(status) << std::endl; + } + } + } + shutdown(); +} + +void Acceptor::shutdown() { + // TODO aconway 2006-10-12: Cleanup, this is not thread safe. + if (running) { + running = false; + processor.stop(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + } +} + -qpid::io::Acceptor::~Acceptor() {} diff --git a/cpp/src/qpid/io/Acceptor.h b/cpp/src/qpid/io/Acceptor.h index a7f7ad66f0..bc189f7f6e 100644 --- a/cpp/src/qpid/io/Acceptor.h +++ b/cpp/src/qpid/io/Acceptor.h @@ -15,36 +15,43 @@ * limitations under the License. * */ -#ifndef _Acceptor_ -#define _Acceptor_ - +#ifndef _LFAcceptor_ +#define _LFAcceptor_ + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_poll.h" +#include "apr-1/apr_time.h" + +#include "qpid/io/Acceptor.h" +#include "qpid/concurrent/Monitor.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/ThreadPool.h" +#include "qpid/io/LFProcessor.h" +#include "qpid/io/LFSessionContext.h" +#include "qpid/concurrent/Runnable.h" +#include "qpid/io/SessionContext.h" #include "qpid/io/SessionHandlerFactory.h" +#include "qpid/concurrent/Thread.h" +#include <qpid/SharedObject.h> namespace qpid { namespace io { - class Acceptor - { - public: - /** - * Bind to port. - * @param port Port to bind to, 0 to bind to dynamically chosen port. - * @return The local bound port. - */ - virtual int16_t bind(int16_t port) = 0; - - /** - * Run the acceptor. - */ - virtual void run(SessionHandlerFactory* factory) = 0; - - /** - * Shut down the acceptor. - */ - virtual void shutdown() = 0; - - virtual ~Acceptor(); - }; +/** APR Acceptor. */ +class Acceptor : public qpid::SharedObject<Acceptor> +{ + public: + Acceptor(int16_t port, int backlog, int threads); + virtual int16_t getPort() const; + virtual void run(SessionHandlerFactory* factory); + virtual void shutdown(); + + private: + int16_t port; + LFProcessor processor; + apr_socket_t* socket; + volatile bool running; +}; } } diff --git a/cpp/src/qpid/io/BlockingAPRAcceptor.cpp b/cpp/src/qpid/io/BlockingAPRAcceptor.cpp deleted file mode 100644 index 0e1fc535a2..0000000000 --- a/cpp/src/qpid/io/BlockingAPRAcceptor.cpp +++ /dev/null @@ -1,101 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include "qpid/io/BlockingAPRAcceptor.h" -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRThreadFactory.h" - -using namespace qpid::concurrent; -using namespace qpid::framing; -using namespace qpid::io; - -BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) : - debug(_debug), - threadFactory(new APRThreadFactory()), - connectionBacklog(c) -{ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL)); -} - -int16_t BlockingAPRAcceptor::bind(int16_t _port){ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, apr_pool)); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, apr_pool)); - CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); - CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); - return getPort(); -} - -int16_t BlockingAPRAcceptor::getPort() const { - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); - return address->port; -} - -void BlockingAPRAcceptor::run(SessionHandlerFactory* factory) -{ - running = true; - std::cout << "Listening on port " << getPort() << "..." << std::endl; - while(running){ - apr_socket_t* client; - apr_status_t status = apr_socket_accept(&client, socket, apr_pool); - if(status == APR_SUCCESS){ - //configure socket: - CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 1000000/* i.e. 1 sec*/)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); - - BlockingAPRSessionContext* session = new BlockingAPRSessionContext(client, threadFactory, this, debug); - session->init(factory->create(session)); - sessions.push_back(session); - }else{ - running = false; - if(status != APR_EINTR){ - std::cout << "ERROR: " << get_desc(status) << std::endl; - } - } - } - shutdown(); -} - -void BlockingAPRAcceptor::shutdown() -{ - // TODO aconway 2006-10-12: Not thread safe. - if (running) - { - running = false; - apr_socket_close(socket); // Don't check, exception safety. - for(iterator i = sessions.begin(); i < sessions.end(); i++){ - (*i)->shutdown(); - } - } -} - -BlockingAPRAcceptor::~BlockingAPRAcceptor(){ - delete threadFactory; - apr_pool_destroy(apr_pool); - APRBase::decrement(); -} - - -void BlockingAPRAcceptor::closed(BlockingAPRSessionContext* session){ - sessions.erase(find(sessions.begin(), sessions.end(), session)); -} - diff --git a/cpp/src/qpid/io/BlockingAPRAcceptor.h b/cpp/src/qpid/io/BlockingAPRAcceptor.h deleted file mode 100644 index a3042605aa..0000000000 --- a/cpp/src/qpid/io/BlockingAPRAcceptor.h +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _BlockingAPRAcceptor_ -#define _BlockingAPRAcceptor_ - -#include <vector> -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_poll.h" -#include "apr-1/apr_time.h" - -#include "qpid/io/Acceptor.h" -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/io/BlockingAPRSessionContext.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandlerFactory.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" - -namespace qpid { -namespace io { - - class BlockingAPRAcceptor : public virtual Acceptor - { - typedef std::vector<BlockingAPRSessionContext*>::iterator iterator; - - const bool debug; - apr_pool_t* apr_pool; - qpid::concurrent::ThreadFactory* threadFactory; - std::vector<BlockingAPRSessionContext*> sessions; - apr_socket_t* socket; - const int connectionBacklog; - volatile bool running; - - public: - BlockingAPRAcceptor(bool debug = false, int connectionBacklog = 10); - virtual int16_t bind(int16_t port); - virtual int16_t getPort() const; - virtual void run(SessionHandlerFactory* factory); - virtual void shutdown(); - virtual ~BlockingAPRAcceptor(); - void closed(BlockingAPRSessionContext* session); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/BlockingAPRSessionContext.cpp b/cpp/src/qpid/io/BlockingAPRSessionContext.cpp deleted file mode 100644 index 88e6b6b0fc..0000000000 --- a/cpp/src/qpid/io/BlockingAPRSessionContext.cpp +++ /dev/null @@ -1,177 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <assert.h> -#include <iostream> -#include "qpid/io/BlockingAPRSessionContext.h" -#include "qpid/io/BlockingAPRAcceptor.h" -#include "qpid/concurrent/APRBase.h" -#include "qpid/QpidError.h" - -using namespace qpid::concurrent; -using namespace qpid::framing; -using namespace qpid::io; - - -BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket, - ThreadFactory* factory, - BlockingAPRAcceptor* _acceptor, - bool _debug) - : socket(_socket), - debug(_debug), - handler(0), - acceptor(_acceptor), - inbuf(65536), - outbuf(65536), - closed(false){ - - reader = new Reader(this); - writer = new Writer(this); - - rThread = factory->create(reader); - wThread = factory->create(writer); -} - -BlockingAPRSessionContext::~BlockingAPRSessionContext(){ - delete reader; - delete writer; - - delete rThread; - delete wThread; - - delete handler; -} - -void BlockingAPRSessionContext::read(){ - try{ - bool initiated(false); - while(!closed){ - apr_size_t bytes(inbuf.available()); - if(bytes < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - apr_status_t s = apr_socket_recv(socket, inbuf.start(), &bytes); - if(APR_STATUS_IS_TIMEUP(s)){ - //timed out, check closed on loop - }else if(APR_STATUS_IS_EOF(s) || bytes == 0){ - closed = true; - }else{ - inbuf.move(bytes); - inbuf.flip(); - - if(!initiated){ - ProtocolInitiation* protocolInit = new ProtocolInitiation(); - if(protocolInit->decode(inbuf)){ - handler->initiated(protocolInit); - if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl; - initiated = true; - } - }else{ - AMQFrame frame; - while(frame.decode(inbuf)){ - if(debug) std::cout << "RECV: [" << &socket << "]:" << frame << std::endl; - handler->received(&frame); - } - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); - } - } - - //close socket - }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} - -void BlockingAPRSessionContext::write(){ - while(!closed){ - //get next frame - outlock.acquire(); - while(outframes.empty() && !closed){ - outlock.wait(); - } - if(!closed){ - AMQFrame* frame = outframes.front(); - outframes.pop(); - outlock.release(); - - //encode - frame->encode(outbuf); - if(debug) std::cout << "SENT [" << &socket << "]:" << *frame << std::endl; - delete frame; - outbuf.flip(); - - //write from outbuf to socket - char* data = outbuf.start(); - const int available = outbuf.available(); - int written = 0; - apr_size_t bytes = available; - while(available > written){ - apr_socket_send(socket, data + written, &bytes); - written += bytes; - bytes = available - written; - } - outbuf.clear(); - }else{ - outlock.release(); - } - } -} - -void BlockingAPRSessionContext::send(AMQFrame* frame){ - if(!closed){ - outlock.acquire(); - bool was_empty(outframes.empty()); - outframes.push(frame); - if(was_empty){ - outlock.notify(); - } - outlock.release(); - }else{ - std::cout << "WARNING: Session closed[" << &socket << "], dropping frame. " << &frame << std::endl; - } -} - -void BlockingAPRSessionContext::init(SessionHandler* _handler){ - handler = _handler; - rThread->start(); - wThread->start(); -} - -void BlockingAPRSessionContext::close(){ - closed = true; - wThread->join(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - if(debug) std::cout << "RECV: [" << &socket << "]: Closed " << std::endl; - handler->closed(); - acceptor->closed(this); - delete this; -} - -void BlockingAPRSessionContext::shutdown(){ - closed = true; - outlock.acquire(); - outlock.notify(); - outlock.release(); - - wThread->join(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - rThread->join(); - handler->closed(); - delete this; -} diff --git a/cpp/src/qpid/io/BlockingAPRSessionContext.h b/cpp/src/qpid/io/BlockingAPRSessionContext.h deleted file mode 100644 index c06142ace5..0000000000 --- a/cpp/src/qpid/io/BlockingAPRSessionContext.h +++ /dev/null @@ -1,94 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _BlockingAPRSessionContext_ -#define _BlockingAPRSessionContext_ - -#include <queue> -#include <vector> - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/AMQFrame.h" -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/framing/Buffer.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandler.h" -#include "qpid/io/SessionHandlerFactory.h" -#include "qpid/io/ShutdownHandler.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" - -namespace qpid { -namespace io { - - class BlockingAPRAcceptor; - - class BlockingAPRSessionContext : public virtual SessionContext - { - class Reader : public virtual qpid::concurrent::Runnable{ - BlockingAPRSessionContext* parent; - public: - inline Reader(BlockingAPRSessionContext* p) : parent(p){} - inline virtual void run(){ parent->read(); } - inline virtual ~Reader(){} - }; - - class Writer : public virtual qpid::concurrent::Runnable{ - BlockingAPRSessionContext* parent; - public: - inline Writer(BlockingAPRSessionContext* p) : parent(p){} - inline virtual void run(){ parent->write(); } - inline virtual ~Writer(){} - }; - - apr_socket_t* socket; - const bool debug; - SessionHandler* handler; - BlockingAPRAcceptor* acceptor; - std::queue<qpid::framing::AMQFrame*> outframes; - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; - qpid::concurrent::APRMonitor outlock; - Reader* reader; - Writer* writer; - qpid::concurrent::Thread* rThread; - qpid::concurrent::Thread* wThread; - - volatile bool closed; - - void read(); - void write(); - public: - BlockingAPRSessionContext(apr_socket_t* socket, - qpid::concurrent::ThreadFactory* factory, - BlockingAPRAcceptor* acceptor, - bool debug = false); - ~BlockingAPRSessionContext(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void close(); - void shutdown(); - void init(SessionHandler* handler); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/APRConnector.cpp b/cpp/src/qpid/io/Connector.cpp index 91cf01c842..ca487deb86 100644 --- a/cpp/src/qpid/io/APRConnector.cpp +++ b/cpp/src/qpid/io/Connector.cpp @@ -17,8 +17,8 @@ */ #include <iostream> #include "qpid/concurrent/APRBase.h" -#include "qpid/io/APRConnector.h" -#include "qpid/concurrent/APRThreadFactory.h" +#include "qpid/io/Connector.h" +#include "qpid/concurrent/ThreadFactory.h" #include "qpid/QpidError.h" using namespace qpid::io; @@ -26,7 +26,7 @@ using namespace qpid::concurrent; using namespace qpid::framing; using qpid::QpidError; -APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) : +Connector::Connector(bool _debug, u_int32_t buffer_size) : debug(_debug), receive_buffer_size(buffer_size), send_buffer_size(buffer_size), @@ -44,11 +44,11 @@ APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) : CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); - threadFactory = new APRThreadFactory(); - writeLock = new APRMonitor(); + threadFactory = new ThreadFactory(); + writeLock = new Monitor(); } -APRConnector::~APRConnector(){ +Connector::~Connector(){ delete receiver; delete writeLock; delete threadFactory; @@ -57,7 +57,7 @@ APRConnector::~APRConnector(){ APRBase::decrement(); } -void APRConnector::connect(const std::string& host, int port){ +void Connector::connect(const std::string& host, int port){ apr_sockaddr_t* address; CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); @@ -67,36 +67,36 @@ void APRConnector::connect(const std::string& host, int port){ receiver->start(); } -void APRConnector::init(ProtocolInitiation* header){ +void Connector::init(ProtocolInitiation* header){ writeBlock(header); delete header; } -void APRConnector::close(){ +void Connector::close(){ closed = true; CHECK_APR_SUCCESS(apr_socket_close(socket)); receiver->join(); } -void APRConnector::setInputHandler(InputHandler* handler){ +void Connector::setInputHandler(InputHandler* handler){ input = handler; } -void APRConnector::setShutdownHandler(ShutdownHandler* handler){ +void Connector::setShutdownHandler(ShutdownHandler* handler){ shutdownHandler = handler; } -OutputHandler* APRConnector::getOutputHandler(){ +OutputHandler* Connector::getOutputHandler(){ return this; } -void APRConnector::send(AMQFrame* frame){ +void Connector::send(AMQFrame* frame){ writeBlock(frame); if(debug) std::cout << "SENT: " << *frame << std::endl; delete frame; } -void APRConnector::writeBlock(AMQDataBlock* data){ +void Connector::writeBlock(AMQDataBlock* data){ writeLock->acquire(); data->encode(outbuf); @@ -107,7 +107,7 @@ void APRConnector::writeBlock(AMQDataBlock* data){ writeLock->release(); } -void APRConnector::writeToSocket(char* data, size_t available){ +void Connector::writeToSocket(char* data, size_t available){ apr_size_t bytes(available); apr_size_t written(0); while(written < available && !closed){ @@ -124,7 +124,7 @@ void APRConnector::writeToSocket(char* data, size_t available){ } } -void APRConnector::checkIdle(apr_status_t status){ +void Connector::checkIdle(apr_status_t status){ if(timeoutHandler){ apr_time_t now = apr_time_as_msec(apr_time_now()); if(APR_STATUS_IS_TIMEUP(status)){ @@ -144,7 +144,7 @@ void APRConnector::checkIdle(apr_status_t status){ } } -void APRConnector::setReadTimeout(u_int16_t t){ +void Connector::setReadTimeout(u_int16_t t){ idleIn = t * 1000;//t is in secs if(idleIn && (!timeout || idleIn < timeout)){ timeout = idleIn; @@ -153,7 +153,7 @@ void APRConnector::setReadTimeout(u_int16_t t){ } -void APRConnector::setWriteTimeout(u_int16_t t){ +void Connector::setWriteTimeout(u_int16_t t){ idleOut = t * 1000;//t is in secs if(idleOut && (!timeout || idleOut < timeout)){ timeout = idleOut; @@ -161,7 +161,7 @@ void APRConnector::setWriteTimeout(u_int16_t t){ } } -void APRConnector::setSocketTimeout(){ +void Connector::setSocketTimeout(){ //interval is in microseconds, timeout in milliseconds //want the interval to be a bit shorter than the timeout, hence multiply //by 800 rather than 1000. @@ -169,11 +169,11 @@ void APRConnector::setSocketTimeout(){ apr_socket_timeout_set(socket, interval); } -void APRConnector::setTimeoutHandler(TimeoutHandler* handler){ +void Connector::setTimeoutHandler(TimeoutHandler* handler){ timeoutHandler = handler; } -void APRConnector::run(){ +void Connector::run(){ try{ while(!closed){ apr_size_t bytes(inbuf.available()); diff --git a/cpp/src/qpid/io/Connector.h b/cpp/src/qpid/io/Connector.h index d0a2f470a8..7c52f7e87b 100644 --- a/cpp/src/qpid/io/Connector.h +++ b/cpp/src/qpid/io/Connector.h @@ -18,35 +18,74 @@ #ifndef _Connector_ #define _Connector_ +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_time.h" + #include "qpid/framing/InputHandler.h" #include "qpid/framing/OutputHandler.h" #include "qpid/framing/InitiationHandler.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/io/ShutdownHandler.h" #include "qpid/io/TimeoutHandler.h" +#include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/io/Connector.h" +#include "qpid/concurrent/Monitor.h" namespace qpid { namespace io { - class Connector + class Connector : public virtual qpid::framing::OutputHandler, + private virtual qpid::concurrent::Runnable { + const bool debug; + const int receive_buffer_size; + const int send_buffer_size; + + bool closed; + + apr_time_t lastIn; + apr_time_t lastOut; + apr_interval_time_t timeout; + u_int32_t idleIn; + u_int32_t idleOut; + + TimeoutHandler* timeoutHandler; + ShutdownHandler* shutdownHandler; + qpid::framing::InputHandler* input; + qpid::framing::InitiationHandler* initialiser; + qpid::framing::OutputHandler* output; + + qpid::framing::Buffer inbuf; + qpid::framing::Buffer outbuf; + + qpid::concurrent::Monitor* writeLock; + qpid::concurrent::ThreadFactory* threadFactory; + qpid::concurrent::Thread* receiver; + + apr_pool_t* pool; + apr_socket_t* socket; + + void checkIdle(apr_status_t status); + void writeBlock(qpid::framing::AMQDataBlock* data); + void writeToSocket(char* data, size_t available); + void setSocketTimeout(); + + void run(); + public: - virtual void connect(const std::string& host, int port) = 0; - virtual void init(qpid::framing::ProtocolInitiation* header) = 0; - virtual void close() = 0; - virtual void setInputHandler(qpid::framing::InputHandler* handler) = 0; - virtual void setTimeoutHandler(TimeoutHandler* handler) = 0; - virtual void setShutdownHandler(ShutdownHandler* handler) = 0; - virtual qpid::framing::OutputHandler* getOutputHandler() = 0; - /** - * Set the timeout for reads, in secs. - */ - virtual void setReadTimeout(u_int16_t timeout) = 0; - /** - * Set the timeout for writes, in secs. - */ - virtual void setWriteTimeout(u_int16_t timeout) = 0; - virtual ~Connector(){} + Connector(bool debug = false, u_int32_t buffer_size = 1024); + virtual ~Connector(); + virtual void connect(const std::string& host, int port); + virtual void init(qpid::framing::ProtocolInitiation* header); + virtual void close(); + virtual void setInputHandler(qpid::framing::InputHandler* handler); + virtual void setTimeoutHandler(TimeoutHandler* handler); + virtual void setShutdownHandler(ShutdownHandler* handler); + virtual qpid::framing::OutputHandler* getOutputHandler(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void setReadTimeout(u_int16_t timeout); + virtual void setWriteTimeout(u_int16_t timeout); }; } diff --git a/cpp/src/qpid/io/ConnectorImpl.h b/cpp/src/qpid/io/ConnectorImpl.h deleted file mode 100644 index 55dcf7a2d4..0000000000 --- a/cpp/src/qpid/io/ConnectorImpl.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRConnectorImpl_ -#define _APRConnectorImpl_ - -#ifdef _USE_APR_IO_ -#include "qpid/io/APRConnector.h" -#else -#include "qpid/io/LConnector.h" -#endif - -namespace qpid { -namespace io { - -#ifdef _USE_APR_IO_ - class ConnectorImpl : public virtual APRConnector - { - - public: - ConnectorImpl(bool _debug = false, u_int32_t buffer_size = 1024):APRConnector(_debug,buffer_size){}; - virtual ~ConnectorImpl(){}; - }; -#else - class ConnectorImpl : public virtual LConnector - { - - public: - ConnectorImpl(bool _debug = false, u_int32_t buffer_size = 1024):LConnector(_debug, buffer_size){}; - virtual ~ConnectorImpl(){}; - }; - -#endif - -} -} - - -#endif diff --git a/cpp/src/qpid/io/LFAcceptor.cpp b/cpp/src/qpid/io/LFAcceptor.cpp deleted file mode 100644 index 7e51a550af..0000000000 --- a/cpp/src/qpid/io/LFAcceptor.cpp +++ /dev/null @@ -1,94 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/io/LFAcceptor.h" -#include "qpid/concurrent/APRBase.h" - -using namespace qpid::concurrent; -using namespace qpid::io; - -LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) : - processor(aprPool.pool, worker_threads, 1000, 5000000), - max_connections_per_processor(m), - debug(_debug), - connectionBacklog(c) -{ } - - -int16_t LFAcceptor::bind(int16_t _port){ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, aprPool.pool)); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, aprPool.pool)); - CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); - CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); - CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); - return getPort(); -} - -int16_t LFAcceptor::getPort() const { - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); - return address->port; -} - -void LFAcceptor::run(SessionHandlerFactory* factory) { - running = true; - processor.start(); - std::cout << "Listening on port " << getPort() << "..." << std::endl; - while(running){ - apr_socket_t* client; - apr_status_t status = apr_socket_accept(&client, socket, aprPool.pool); - if(status == APR_SUCCESS){ - //make this socket non-blocking: - CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); - LFSessionContext* session = new LFSessionContext(aprPool.pool, client, &processor, debug); - session->init(factory->create(session)); - }else{ - running = false; - if(status != APR_EINTR){ - std::cout << "ERROR: " << get_desc(status) << std::endl; - } - } - } - shutdown(); -} - -void LFAcceptor::shutdown() { - // TODO aconway 2006-10-12: Cleanup, this is not thread safe. - if (running) { - running = false; - processor.stop(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - } -} - - -LFAcceptor::~LFAcceptor(){} - -LFAcceptor::APRPool::APRPool(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); -} - -LFAcceptor::APRPool::~APRPool(){ - apr_pool_destroy(pool); - APRBase::decrement(); -} diff --git a/cpp/src/qpid/io/LFAcceptor.h b/cpp/src/qpid/io/LFAcceptor.h deleted file mode 100644 index 35a556d500..0000000000 --- a/cpp/src/qpid/io/LFAcceptor.h +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LFAcceptor_ -#define _LFAcceptor_ - -#include <vector> -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_poll.h" -#include "apr-1/apr_time.h" - -#include "qpid/io/Acceptor.h" -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/concurrent/APRThreadFactory.h" -#include "qpid/concurrent/APRThreadPool.h" -#include "qpid/io/LFProcessor.h" -#include "qpid/io/LFSessionContext.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandlerFactory.h" -#include "qpid/concurrent/Thread.h" - -namespace qpid { -namespace io { - - class LFAcceptor : public virtual Acceptor - { - class APRPool{ - public: - apr_pool_t* pool; - APRPool(); - ~APRPool(); - }; - - APRPool aprPool; - LFProcessor processor; - apr_socket_t* socket; - const int max_connections_per_processor; - const bool debug; - const int connectionBacklog; - - volatile bool running; - - public: - LFAcceptor(bool debug = false, - int connectionBacklog = 10, - int worker_threads = 5, - int max_connections_per_processor = 500); - virtual int16_t bind(int16_t port); - virtual int16_t getPort() const; - virtual void run(SessionHandlerFactory* factory); - virtual void shutdown(); - virtual ~LFAcceptor(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/LFProcessor.h b/cpp/src/qpid/io/LFProcessor.h index 16c789f0ff..5b61f444af 100644 --- a/cpp/src/qpid/io/LFProcessor.h +++ b/cpp/src/qpid/io/LFProcessor.h @@ -21,8 +21,8 @@ #include "apr-1/apr_poll.h" #include <iostream> #include <vector> -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/concurrent/APRThreadFactory.h" +#include "qpid/concurrent/Monitor.h" +#include "qpid/concurrent/ThreadFactory.h" #include "qpid/concurrent/Runnable.h" namespace qpid { @@ -50,9 +50,9 @@ namespace io { const int workerCount; bool hasLeader; qpid::concurrent::Thread** const workers; - qpid::concurrent::APRMonitor leadLock; - qpid::concurrent::APRMonitor countLock; - qpid::concurrent::APRThreadFactory factory; + qpid::concurrent::Monitor leadLock; + qpid::concurrent::Monitor countLock; + qpid::concurrent::ThreadFactory factory; std::vector<LFSessionContext*> sessions; volatile bool stopped; diff --git a/cpp/src/qpid/io/LFSessionContext.cpp b/cpp/src/qpid/io/LFSessionContext.cpp index 6d6d786841..ca1e6431a6 100644 --- a/cpp/src/qpid/io/LFSessionContext.cpp +++ b/cpp/src/qpid/io/LFSessionContext.cpp @@ -54,7 +54,7 @@ LFSessionContext::~LFSessionContext(){ void LFSessionContext::read(){ assert(!reading); // No concurrent read. - reading = APRThread::currentThread(); + reading = Thread::currentThread(); socket.read(in); in.flip(); @@ -79,7 +79,7 @@ void LFSessionContext::read(){ void LFSessionContext::write(){ assert(!writing); // No concurrent writes. - writing = APRThread::currentThread(); + writing = Thread::currentThread(); bool done = isClosed(); while(!done){ @@ -186,4 +186,4 @@ void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ logLock.release(); } -APRMonitor LFSessionContext::logLock; +Monitor LFSessionContext::logLock; diff --git a/cpp/src/qpid/io/LFSessionContext.h b/cpp/src/qpid/io/LFSessionContext.h index 9406bb97b6..8d30b54204 100644 --- a/cpp/src/qpid/io/LFSessionContext.h +++ b/cpp/src/qpid/io/LFSessionContext.h @@ -25,7 +25,7 @@ #include "apr-1/apr_time.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/concurrent/APRMonitor.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/io/APRSocket.h" #include "qpid/framing/Buffer.h" #include "qpid/io/LFProcessor.h" @@ -51,7 +51,7 @@ namespace io { apr_pollfd_t fd; std::queue<qpid::framing::AMQFrame*> framesToWrite; - qpid::concurrent::APRMonitor writeLock; + qpid::concurrent::Monitor writeLock; bool processing; bool closing; @@ -60,7 +60,7 @@ namespace io { volatile unsigned int reading; volatile unsigned int writing; - static qpid::concurrent::APRMonitor logLock; + static qpid::concurrent::Monitor logLock; void log(const std::string& desc, qpid::framing::AMQFrame* const frame); public: diff --git a/cpp/src/qpid/io/doxygen_summary.h b/cpp/src/qpid/io/doxygen_summary.h new file mode 100644 index 0000000000..1086f65f63 --- /dev/null +++ b/cpp/src/qpid/io/doxygen_summary.h @@ -0,0 +1,34 @@ +#ifndef _doxygen_summary_ +#define _doxygen_summary_ + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// No code just a doxygen comment for the namespace + +/** \namspace qpid::io + * IO classes used by client and broker. + * + * This namespace contains platform-neutral classes. Platform + * specific classes are in a sub-namespace named after the + * platform. At build time the appropriate platform classes are + * imported into this namespace so other code does not need to be awre + * of the difference. + * + */ +#endif /*!_doxygen_summary_*/ |