diff options
author | Alan Conway <aconway@apache.org> | 2006-10-12 18:52:49 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-10-12 18:52:49 +0000 |
commit | c256b20602a42a3d33f36bb0e8d9692906d282a6 (patch) | |
tree | 0fba6619fbb98c3511785143ca30647c5e6a4469 /cpp/common | |
parent | 1e6a034ccd8e260e615195bf193aed7d37b928a8 (diff) | |
download | qpid-python-c256b20602a42a3d33f36bb0e8d9692906d282a6.tar.gz |
Converted broker to a class for use in tests, plugins etc.
qpid::Exception base class for all exceptions, inherits std::exception.
Require boost on all platforms: http://www.boost.org, 'yum boost' on fedora.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@463376 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/common')
-rw-r--r-- | cpp/common/Makefile | 2 | ||||
-rw-r--r-- | cpp/common/error/inc/Exception.h | 46 | ||||
-rw-r--r-- | cpp/common/error/inc/QpidError.h | 25 | ||||
-rw-r--r-- | cpp/common/error/src/Exception.cpp (renamed from cpp/common/error/inc/QpidErrorIO.h) | 12 | ||||
-rw-r--r-- | cpp/common/error/src/QpidError.cpp | 32 | ||||
-rw-r--r-- | cpp/common/io/inc/Acceptor.h | 21 | ||||
-rw-r--r-- | cpp/common/io/inc/BlockingAPRAcceptor.h | 7 | ||||
-rw-r--r-- | cpp/common/io/inc/LFAcceptor.h | 7 | ||||
-rw-r--r-- | cpp/common/io/inc/LFProcessor.h | 6 | ||||
-rw-r--r-- | cpp/common/io/src/Acceptor.cpp | 21 | ||||
-rw-r--r-- | cpp/common/io/src/BlockingAPRAcceptor.cpp | 35 | ||||
-rw-r--r-- | cpp/common/io/src/LFAcceptor.cpp | 32 | ||||
-rw-r--r-- | cpp/common/io/src/LFProcessor.cpp | 4 |
13 files changed, 201 insertions, 49 deletions
diff --git a/cpp/common/Makefile b/cpp/common/Makefile index 98e4025390..766d5fbcde 100644 --- a/cpp/common/Makefile +++ b/cpp/common/Makefile @@ -21,7 +21,7 @@ QPID_HOME=../.. include $(QPID_HOME)/cpp/options.mk TARGET = $(LIB_DIR)/libqpid_common.so.1.0 -SOURCES = $(wildcard */src/*.cpp framing/generated/*.cpp) +SOURCES = $(wildcard */src/*.cpp framing/generated/*.cpp error/*.cpp) OBJECTS = $(SOURCES:.cpp=.o) DEPS = $(SOURCES:.cpp=.d) diff --git a/cpp/common/error/inc/Exception.h b/cpp/common/error/inc/Exception.h new file mode 100644 index 0000000000..709538c851 --- /dev/null +++ b/cpp/common/error/inc/Exception.h @@ -0,0 +1,46 @@ +#ifndef _Exception_ +#define _Exception_ + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <exception> +#include <string> + +namespace qpid +{ + /** + * Exception base class for all Qpid exceptions. + */ + class Exception : public std::exception + { + protected: + std::string whatStr; + + public: + Exception() throw() {} + Exception(const std::string& str) throw() : whatStr(str) {} + Exception(const char* str) throw() : whatStr(str) {} + virtual ~Exception() throw(); + + const char* what() const throw() { return whatStr.c_str(); } + virtual std::string toString() const throw() { return whatStr; } + }; +} + +#endif /*!_Exception_*/ diff --git a/cpp/common/error/inc/QpidError.h b/cpp/common/error/inc/QpidError.h index a739b506c7..3310c71a88 100644 --- a/cpp/common/error/inc/QpidError.h +++ b/cpp/common/error/inc/QpidError.h @@ -1,3 +1,5 @@ +#ifndef __QpidError__ +#define __QpidError__ /* * * Copyright (c) 2006 The Apache Software Foundation @@ -16,23 +18,20 @@ * */ #include <string> - -#ifndef __QpidError__ -#define __QpidError__ +#include "Exception.h" namespace qpid { -class QpidError{ -public: - const int code; - const std::string msg; - const std::string file; - const int line; + class QpidError : public Exception { + public: + const int code; + const std::string msg; + const std::string file; + const int line; - inline QpidError(int _code, const std::string& _msg, const std::string& _file, int _line) : code(_code), msg(_msg), file(_file), line(_line) {} - ~QpidError(){} - -}; + QpidError(int _code, const std::string& _msg, const std::string& _file, int _line) throw(); + ~QpidError() throw(); + }; #define THROW_QPID_ERROR(A, B) throw QpidError(A, B, __FILE__, __LINE__) diff --git a/cpp/common/error/inc/QpidErrorIO.h b/cpp/common/error/src/Exception.cpp index 4f9bd3ce26..69e5f135f2 100644 --- a/cpp/common/error/inc/QpidErrorIO.h +++ b/cpp/common/error/src/Exception.cpp @@ -16,14 +16,6 @@ * */ -#include "QpidError.h" -#include <ostream> - -std::ostream& operator <<(std::ostream& out, const QpidError& error) -{ - out << "Qpid Error [" << error.code << "] " << error.msg - << " (" << error.file << ":" << error.line << ")"; - return out; -} - +#include "Exception.h" +qpid::Exception::~Exception() throw() {} diff --git a/cpp/common/error/src/QpidError.cpp b/cpp/common/error/src/QpidError.cpp new file mode 100644 index 0000000000..c8049ab3cb --- /dev/null +++ b/cpp/common/error/src/QpidError.cpp @@ -0,0 +1,32 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "QpidError.h" +#include <sstream> + +using namespace qpid; + +QpidError::QpidError(int _code, const std::string& _msg, const std::string& _file, int _line) throw() + : code(_code), msg(_msg), file(_file), line(_line) +{ + std::ostringstream os; + os << "QpidError(" << code << ") " << msg << " (" << file << ":" << line << ")"; + whatStr = os.str(); +} + +QpidError::~QpidError() throw() {} diff --git a/cpp/common/io/inc/Acceptor.h b/cpp/common/io/inc/Acceptor.h index 5c690c546f..d7313b84db 100644 --- a/cpp/common/io/inc/Acceptor.h +++ b/cpp/common/io/inc/Acceptor.h @@ -20,15 +20,30 @@ #include "SessionHandlerFactory.h" - namespace qpid { namespace io { class Acceptor { public: - virtual void bind(int port, SessionHandlerFactory* factory) = 0; - virtual ~Acceptor(){} + /** + * Bind to port. + * @param port Port to bind to, 0 to bind to dynamically chosen port. + * @return The local bound port. + */ + virtual int16_t bind(int16_t port) = 0; + + /** + * Run the acceptor. + */ + virtual void run(SessionHandlerFactory* factory) = 0; + + /** + * Shut down the acceptor. + */ + virtual void shutdown() = 0; + + virtual ~Acceptor(); }; } diff --git a/cpp/common/io/inc/BlockingAPRAcceptor.h b/cpp/common/io/inc/BlockingAPRAcceptor.h index b77371b02e..bd069ed8db 100644 --- a/cpp/common/io/inc/BlockingAPRAcceptor.h +++ b/cpp/common/io/inc/BlockingAPRAcceptor.h @@ -47,10 +47,13 @@ namespace io { apr_socket_t* socket; const int connectionBacklog; volatile bool running; - + public: BlockingAPRAcceptor(bool debug = false, int connectionBacklog = 10); - virtual void bind(int port, SessionHandlerFactory* factory); + virtual int16_t bind(int16_t port); + virtual int16_t getPort() const; + virtual void run(SessionHandlerFactory* factory); + virtual void shutdown(); virtual ~BlockingAPRAcceptor(); void closed(BlockingAPRSessionContext* session); }; diff --git a/cpp/common/io/inc/LFAcceptor.h b/cpp/common/io/inc/LFAcceptor.h index 314f811827..9a40eed222 100644 --- a/cpp/common/io/inc/LFAcceptor.h +++ b/cpp/common/io/inc/LFAcceptor.h @@ -48,7 +48,7 @@ namespace io { APRPool aprPool; LFProcessor processor; - + apr_socket_t* socket; const int max_connections_per_processor; const bool debug; const int connectionBacklog; @@ -60,7 +60,10 @@ namespace io { int connectionBacklog = 10, int worker_threads = 5, int max_connections_per_processor = 500); - virtual void bind(int port, SessionHandlerFactory* factory); + virtual int16_t bind(int16_t port); + virtual int16_t getPort() const; + virtual void run(SessionHandlerFactory* factory); + virtual void shutdown(); virtual ~LFAcceptor(); }; diff --git a/cpp/common/io/inc/LFProcessor.h b/cpp/common/io/inc/LFProcessor.h index 6e67268906..25a3c8626c 100644 --- a/cpp/common/io/inc/LFProcessor.h +++ b/cpp/common/io/inc/LFProcessor.h @@ -104,7 +104,11 @@ namespace io { * Start processing. */ void start(); - + /** + * Is processing stopped? + */ + bool isStopped(); + ~LFProcessor(); }; diff --git a/cpp/common/io/src/Acceptor.cpp b/cpp/common/io/src/Acceptor.cpp new file mode 100644 index 0000000000..d1825c78fa --- /dev/null +++ b/cpp/common/io/src/Acceptor.cpp @@ -0,0 +1,21 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Acceptor.h" + +qpid::io::Acceptor::~Acceptor() {} diff --git a/cpp/common/io/src/BlockingAPRAcceptor.cpp b/cpp/common/io/src/BlockingAPRAcceptor.cpp index bf74260a55..4c55b9e2c8 100644 --- a/cpp/common/io/src/BlockingAPRAcceptor.cpp +++ b/cpp/common/io/src/BlockingAPRAcceptor.cpp @@ -33,14 +33,25 @@ BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) : CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL)); } -void BlockingAPRAcceptor::bind(int port, SessionHandlerFactory* factory){ +int16_t BlockingAPRAcceptor::bind(int16_t _port){ apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, apr_pool)); + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, apr_pool)); CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, apr_pool)); CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); + return getPort(); +} + +int16_t BlockingAPRAcceptor::getPort() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->port; +} + +void BlockingAPRAcceptor::run(SessionHandlerFactory* factory) +{ running = true; - std::cout << "Listening on port " << port << "..." << std::endl; + std::cout << "Listening on port " << getPort() << "..." << std::endl; while(running){ apr_socket_t* client; apr_status_t status = apr_socket_accept(&client, socket, apr_pool); @@ -61,11 +72,20 @@ void BlockingAPRAcceptor::bind(int port, SessionHandlerFactory* factory){ } } } - for(iterator i = sessions.begin(); i < sessions.end(); i++){ - (*i)->shutdown(); - } + shutdown(); +} - CHECK_APR_SUCCESS(apr_socket_close(socket)); +void BlockingAPRAcceptor::shutdown() +{ + // TODO aconway 2006-10-12: Not thread safe. + if (running) + { + running = false; + apr_socket_close(socket); // Don't check, exception safety. + for(iterator i = sessions.begin(); i < sessions.end(); i++){ + (*i)->shutdown(); + } + } } BlockingAPRAcceptor::~BlockingAPRAcceptor(){ @@ -77,6 +97,5 @@ BlockingAPRAcceptor::~BlockingAPRAcceptor(){ void BlockingAPRAcceptor::closed(BlockingAPRSessionContext* session){ sessions.erase(find(sessions.begin(), sessions.end(), session)); - delete this; } diff --git a/cpp/common/io/src/LFAcceptor.cpp b/cpp/common/io/src/LFAcceptor.cpp index bb5164f457..86f382afac 100644 --- a/cpp/common/io/src/LFAcceptor.cpp +++ b/cpp/common/io/src/LFAcceptor.cpp @@ -29,18 +29,26 @@ LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) : { } -void LFAcceptor::bind(int port, SessionHandlerFactory* factory){ - apr_socket_t* socket; +int16_t LFAcceptor::bind(int16_t _port){ apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, aprPool.pool)); + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, aprPool.pool)); CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, aprPool.pool)); CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); + return getPort(); +} + +int16_t LFAcceptor::getPort() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->port; +} + +void LFAcceptor::run(SessionHandlerFactory* factory) { running = true; processor.start(); - - std::cout << "Listening on port " << port << "..." << std::endl; + std::cout << "Listening on port " << getPort() << "..." << std::endl; while(running){ apr_socket_t* client; apr_status_t status = apr_socket_accept(&client, socket, aprPool.pool); @@ -60,14 +68,20 @@ void LFAcceptor::bind(int port, SessionHandlerFactory* factory){ } } } + shutdown(); +} - processor.stop(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); +void LFAcceptor::shutdown() { + // TODO aconway 2006-10-12: Cleanup, this is not thread safe. + if (running) { + running = false; + processor.stop(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + } } -LFAcceptor::~LFAcceptor(){ -} +LFAcceptor::~LFAcceptor(){} LFAcceptor::APRPool::APRPool(){ APRBase::increment(); diff --git a/cpp/common/io/src/LFProcessor.cpp b/cpp/common/io/src/LFProcessor.cpp index 3ac66576e3..65d7451767 100644 --- a/cpp/common/io/src/LFProcessor.cpp +++ b/cpp/common/io/src/LFProcessor.cpp @@ -25,6 +25,9 @@ using namespace qpid::io; using namespace qpid::concurrent; using qpid::QpidError; +// TODO aconway 2006-10-12: stopped is read outside locks. +// + LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : size(_size), timeout(_timeout), @@ -46,6 +49,7 @@ LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout LFProcessor::~LFProcessor(){ + if (!stopped) stop(); for(int i = 0; i < workerCount; i++){ delete workers[i]; } |