diff options
author | Alan Conway <aconway@apache.org> | 2006-10-12 18:52:49 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-10-12 18:52:49 +0000 |
commit | c256b20602a42a3d33f36bb0e8d9692906d282a6 (patch) | |
tree | 0fba6619fbb98c3511785143ca30647c5e6a4469 /cpp/common/io | |
parent | 1e6a034ccd8e260e615195bf193aed7d37b928a8 (diff) | |
download | qpid-python-c256b20602a42a3d33f36bb0e8d9692906d282a6.tar.gz |
Converted broker to a class for use in tests, plugins etc.
qpid::Exception base class for all exceptions, inherits std::exception.
Require boost on all platforms: http://www.boost.org, 'yum boost' on fedora.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@463376 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/common/io')
-rw-r--r-- | cpp/common/io/inc/Acceptor.h | 21 | ||||
-rw-r--r-- | cpp/common/io/inc/BlockingAPRAcceptor.h | 7 | ||||
-rw-r--r-- | cpp/common/io/inc/LFAcceptor.h | 7 | ||||
-rw-r--r-- | cpp/common/io/inc/LFProcessor.h | 6 | ||||
-rw-r--r-- | cpp/common/io/src/Acceptor.cpp | 21 | ||||
-rw-r--r-- | cpp/common/io/src/BlockingAPRAcceptor.cpp | 35 | ||||
-rw-r--r-- | cpp/common/io/src/LFAcceptor.cpp | 32 | ||||
-rw-r--r-- | cpp/common/io/src/LFProcessor.cpp | 4 |
8 files changed, 108 insertions, 25 deletions
diff --git a/cpp/common/io/inc/Acceptor.h b/cpp/common/io/inc/Acceptor.h index 5c690c546f..d7313b84db 100644 --- a/cpp/common/io/inc/Acceptor.h +++ b/cpp/common/io/inc/Acceptor.h @@ -20,15 +20,30 @@ #include "SessionHandlerFactory.h" - namespace qpid { namespace io { class Acceptor { public: - virtual void bind(int port, SessionHandlerFactory* factory) = 0; - virtual ~Acceptor(){} + /** + * Bind to port. + * @param port Port to bind to, 0 to bind to dynamically chosen port. + * @return The local bound port. + */ + virtual int16_t bind(int16_t port) = 0; + + /** + * Run the acceptor. + */ + virtual void run(SessionHandlerFactory* factory) = 0; + + /** + * Shut down the acceptor. + */ + virtual void shutdown() = 0; + + virtual ~Acceptor(); }; } diff --git a/cpp/common/io/inc/BlockingAPRAcceptor.h b/cpp/common/io/inc/BlockingAPRAcceptor.h index b77371b02e..bd069ed8db 100644 --- a/cpp/common/io/inc/BlockingAPRAcceptor.h +++ b/cpp/common/io/inc/BlockingAPRAcceptor.h @@ -47,10 +47,13 @@ namespace io { apr_socket_t* socket; const int connectionBacklog; volatile bool running; - + public: BlockingAPRAcceptor(bool debug = false, int connectionBacklog = 10); - virtual void bind(int port, SessionHandlerFactory* factory); + virtual int16_t bind(int16_t port); + virtual int16_t getPort() const; + virtual void run(SessionHandlerFactory* factory); + virtual void shutdown(); virtual ~BlockingAPRAcceptor(); void closed(BlockingAPRSessionContext* session); }; diff --git a/cpp/common/io/inc/LFAcceptor.h b/cpp/common/io/inc/LFAcceptor.h index 314f811827..9a40eed222 100644 --- a/cpp/common/io/inc/LFAcceptor.h +++ b/cpp/common/io/inc/LFAcceptor.h @@ -48,7 +48,7 @@ namespace io { APRPool aprPool; LFProcessor processor; - + apr_socket_t* socket; const int max_connections_per_processor; const bool debug; const int connectionBacklog; @@ -60,7 +60,10 @@ namespace io { int connectionBacklog = 10, int worker_threads = 5, int max_connections_per_processor = 500); - virtual void bind(int port, SessionHandlerFactory* factory); + virtual int16_t bind(int16_t port); + virtual int16_t getPort() const; + virtual void run(SessionHandlerFactory* factory); + virtual void shutdown(); virtual ~LFAcceptor(); }; diff --git a/cpp/common/io/inc/LFProcessor.h b/cpp/common/io/inc/LFProcessor.h index 6e67268906..25a3c8626c 100644 --- a/cpp/common/io/inc/LFProcessor.h +++ b/cpp/common/io/inc/LFProcessor.h @@ -104,7 +104,11 @@ namespace io { * Start processing. */ void start(); - + /** + * Is processing stopped? + */ + bool isStopped(); + ~LFProcessor(); }; diff --git a/cpp/common/io/src/Acceptor.cpp b/cpp/common/io/src/Acceptor.cpp new file mode 100644 index 0000000000..d1825c78fa --- /dev/null +++ b/cpp/common/io/src/Acceptor.cpp @@ -0,0 +1,21 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Acceptor.h" + +qpid::io::Acceptor::~Acceptor() {} diff --git a/cpp/common/io/src/BlockingAPRAcceptor.cpp b/cpp/common/io/src/BlockingAPRAcceptor.cpp index bf74260a55..4c55b9e2c8 100644 --- a/cpp/common/io/src/BlockingAPRAcceptor.cpp +++ b/cpp/common/io/src/BlockingAPRAcceptor.cpp @@ -33,14 +33,25 @@ BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) : CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL)); } -void BlockingAPRAcceptor::bind(int port, SessionHandlerFactory* factory){ +int16_t BlockingAPRAcceptor::bind(int16_t _port){ apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, apr_pool)); + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, apr_pool)); CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, apr_pool)); CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); + return getPort(); +} + +int16_t BlockingAPRAcceptor::getPort() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->port; +} + +void BlockingAPRAcceptor::run(SessionHandlerFactory* factory) +{ running = true; - std::cout << "Listening on port " << port << "..." << std::endl; + std::cout << "Listening on port " << getPort() << "..." << std::endl; while(running){ apr_socket_t* client; apr_status_t status = apr_socket_accept(&client, socket, apr_pool); @@ -61,11 +72,20 @@ void BlockingAPRAcceptor::bind(int port, SessionHandlerFactory* factory){ } } } - for(iterator i = sessions.begin(); i < sessions.end(); i++){ - (*i)->shutdown(); - } + shutdown(); +} - CHECK_APR_SUCCESS(apr_socket_close(socket)); +void BlockingAPRAcceptor::shutdown() +{ + // TODO aconway 2006-10-12: Not thread safe. + if (running) + { + running = false; + apr_socket_close(socket); // Don't check, exception safety. + for(iterator i = sessions.begin(); i < sessions.end(); i++){ + (*i)->shutdown(); + } + } } BlockingAPRAcceptor::~BlockingAPRAcceptor(){ @@ -77,6 +97,5 @@ BlockingAPRAcceptor::~BlockingAPRAcceptor(){ void BlockingAPRAcceptor::closed(BlockingAPRSessionContext* session){ sessions.erase(find(sessions.begin(), sessions.end(), session)); - delete this; } diff --git a/cpp/common/io/src/LFAcceptor.cpp b/cpp/common/io/src/LFAcceptor.cpp index bb5164f457..86f382afac 100644 --- a/cpp/common/io/src/LFAcceptor.cpp +++ b/cpp/common/io/src/LFAcceptor.cpp @@ -29,18 +29,26 @@ LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) : { } -void LFAcceptor::bind(int port, SessionHandlerFactory* factory){ - apr_socket_t* socket; +int16_t LFAcceptor::bind(int16_t _port){ apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, aprPool.pool)); + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, aprPool.pool)); CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, aprPool.pool)); CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); + return getPort(); +} + +int16_t LFAcceptor::getPort() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->port; +} + +void LFAcceptor::run(SessionHandlerFactory* factory) { running = true; processor.start(); - - std::cout << "Listening on port " << port << "..." << std::endl; + std::cout << "Listening on port " << getPort() << "..." << std::endl; while(running){ apr_socket_t* client; apr_status_t status = apr_socket_accept(&client, socket, aprPool.pool); @@ -60,14 +68,20 @@ void LFAcceptor::bind(int port, SessionHandlerFactory* factory){ } } } + shutdown(); +} - processor.stop(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); +void LFAcceptor::shutdown() { + // TODO aconway 2006-10-12: Cleanup, this is not thread safe. + if (running) { + running = false; + processor.stop(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + } } -LFAcceptor::~LFAcceptor(){ -} +LFAcceptor::~LFAcceptor(){} LFAcceptor::APRPool::APRPool(){ APRBase::increment(); diff --git a/cpp/common/io/src/LFProcessor.cpp b/cpp/common/io/src/LFProcessor.cpp index 3ac66576e3..65d7451767 100644 --- a/cpp/common/io/src/LFProcessor.cpp +++ b/cpp/common/io/src/LFProcessor.cpp @@ -25,6 +25,9 @@ using namespace qpid::io; using namespace qpid::concurrent; using qpid::QpidError; +// TODO aconway 2006-10-12: stopped is read outside locks. +// + LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : size(_size), timeout(_timeout), @@ -46,6 +49,7 @@ LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout LFProcessor::~LFProcessor(){ + if (!stopped) stop(); for(int i = 0; i < workerCount; i++){ delete workers[i]; } |