diff options
author | Alan Conway <aconway@apache.org> | 2006-10-12 18:52:49 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-10-12 18:52:49 +0000 |
commit | c256b20602a42a3d33f36bb0e8d9692906d282a6 (patch) | |
tree | 0fba6619fbb98c3511785143ca30647c5e6a4469 /cpp | |
parent | 1e6a034ccd8e260e615195bf193aed7d37b928a8 (diff) | |
download | qpid-python-c256b20602a42a3d33f36bb0e8d9692906d282a6.tar.gz |
Converted broker to a class for use in tests, plugins etc.
qpid::Exception base class for all exceptions, inherits std::exception.
Require boost on all platforms: http://www.boost.org, 'yum boost' on fedora.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@463376 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/Makefile | 9 | ||||
-rw-r--r-- | cpp/README | 2 | ||||
-rw-r--r-- | cpp/broker/Makefile | 18 | ||||
-rw-r--r-- | cpp/broker/inc/Broker.h | 86 | ||||
-rw-r--r-- | cpp/broker/inc/Configuration.h | 34 | ||||
-rw-r--r-- | cpp/broker/src/Broker.cpp | 94 | ||||
-rw-r--r-- | cpp/broker/src/Configuration.cpp | 14 | ||||
-rw-r--r-- | cpp/common/Makefile | 2 | ||||
-rw-r--r-- | cpp/common/error/inc/Exception.h | 46 | ||||
-rw-r--r-- | cpp/common/error/inc/QpidError.h | 25 | ||||
-rw-r--r-- | cpp/common/error/src/Exception.cpp (renamed from cpp/common/error/inc/QpidErrorIO.h) | 12 | ||||
-rw-r--r-- | cpp/common/error/src/QpidError.cpp | 32 | ||||
-rw-r--r-- | cpp/common/io/inc/Acceptor.h | 21 | ||||
-rw-r--r-- | cpp/common/io/inc/BlockingAPRAcceptor.h | 7 | ||||
-rw-r--r-- | cpp/common/io/inc/LFAcceptor.h | 7 | ||||
-rw-r--r-- | cpp/common/io/inc/LFProcessor.h | 6 | ||||
-rw-r--r-- | cpp/common/io/src/Acceptor.cpp | 21 | ||||
-rw-r--r-- | cpp/common/io/src/BlockingAPRAcceptor.cpp | 35 | ||||
-rw-r--r-- | cpp/common/io/src/LFAcceptor.cpp | 32 | ||||
-rw-r--r-- | cpp/common/io/src/LFProcessor.cpp | 4 | ||||
-rw-r--r-- | cpp/options.mk | 2 | ||||
-rw-r--r-- | cpp/qpidd/Makefile | 42 | ||||
-rw-r--r-- | cpp/qpidd/src/qpidd.cpp | 52 |
23 files changed, 465 insertions, 138 deletions
diff --git a/cpp/Makefile b/cpp/Makefile index 28f2212a7a..529ac1568f 100644 --- a/cpp/Makefile +++ b/cpp/Makefile @@ -21,6 +21,7 @@ # UNITTESTS=$(wildcard common/*/test/*.so broker/test/*.so) +SUBDIRS=common broker qpidd client .PHONY: all test unittest pythontest runtests clean doxygen @@ -37,14 +38,10 @@ runtests: $(MAKE) -k unittest pythontest all: - @$(MAKE) -C common all - @$(MAKE) -C broker all - @$(MAKE) -C client all + @for DIR in $(SUBDIRS) ; do $(MAKE) -C $$DIR all ; done clean: - @$(MAKE) -C common clean - @$(MAKE) -C broker clean - @$(MAKE) -C client clean + @for DIR in $(SUBDIRS) ; do $(MAKE) -C $$DIR clean ; done @$(MAKE) -C doxygen clean -@rm qpidd.log diff --git a/cpp/README b/cpp/README index 7d772dbf19..edd80a1fbc 100644 --- a/cpp/README +++ b/cpp/README @@ -7,6 +7,8 @@ Install in /usr/local/apr or update options.mk if installed elsewhere. CppUnit: http://cppunit.sourceforge.net +boost: http://www.boost.org + Optional: to generate source code documentation you need: * doxygen: http://sourceforge.net/projects/doxygen/ * graphviz - http://www.graphviz.org/ diff --git a/cpp/broker/Makefile b/cpp/broker/Makefile index afe93c455a..5c96589d95 100644 --- a/cpp/broker/Makefile +++ b/cpp/broker/Makefile @@ -15,31 +15,25 @@ # # -# Build broker library and executable. +# Build broker library. # QPID_HOME = ../.. include ${QPID_HOME}/cpp/options.mk - +TARGET=$(BROKER_LIB) SOURCES= $(wildcard src/*.cpp) OBJECTS= $(subst .cpp,.o,$(SOURCES)) -LIB_OBJECTS= $(subst src/Broker.o,,$(OBJECTS)) -EXE_OBJECTS= src/Broker.o - .PHONY: all clean -all: $(BROKER) +all: $(TARGET) @$(MAKE) -C test all clean: - -@rm -f ${OBJECTS} src/*.d ${BROKER} $(BROKER_LIB) + -@rm -f $(TARGET) ${OBJECTS} src/*.d @$(MAKE) -C test clean -$(BROKER): $(BROKER_LIB) $(EXE_OBJECTS) - ${CXX} -o $@ $(EXE_OBJECTS) $(LDFLAGS) -lapr-1 $(COMMON_LIB) $(BROKER_LIB) - -$(BROKER_LIB): $(LIB_OBJECTS) - $(CXX) -shared -o $@ $(LDFLAGS) $(LIB_OBJECTS) -lapr-1 $(COMMON_LIB) $(LIBDIR) +$(TARGET): $(OBJECTS) + $(CXX) -shared -o $@ $(LDFLAGS) $(OBJECTS) -lapr-1 $(COMMON_LIB) $(LIBDIR) -include $(SOURCES:.cpp=.d) diff --git a/cpp/broker/inc/Broker.h b/cpp/broker/inc/Broker.h new file mode 100644 index 0000000000..0cd2bd749e --- /dev/null +++ b/cpp/broker/inc/Broker.h @@ -0,0 +1,86 @@ +#ifndef _Broker_ +#define _Broker_ + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Acceptor.h" +#include "Configuration.h" +#include "Runnable.h" +#include "SessionHandlerFactoryImpl.h" +#include <boost/noncopyable.hpp> +#include <tr1/memory> + +namespace qpid { + namespace broker { + /** + * A broker instance. + */ + class Broker : public qpid::concurrent::Runnable, private boost::noncopyable { + Broker(const Configuration& config); // Private, use create() + std::auto_ptr<qpid::io::Acceptor> acceptor; + SessionHandlerFactoryImpl factory; + int16_t port; + bool isBound; + + public: + static const int16_t DEFAULT_PORT; + + virtual ~Broker(); + typedef std::tr1::shared_ptr<Broker> shared_ptr; + + /** + * Create a broker. + * @param port Port to listen on or 0 to pick a port dynamically. + */ + static shared_ptr create(int port = DEFAULT_PORT); + + /** + * Create a broker from a Configuration. + */ + static shared_ptr create(const Configuration& config); + + /** + * Bind to the listening port. + * @return The port number bound. + */ + virtual int16_t bind(); + + /** + * Return listening port. If called before bind this is + * the configured port. If called after it is the actual + * port, which will be different if the configured port is + * 0. + */ + virtual int16_t getPort() { return port; } + + /** + * Run the broker. Implements Runnable::run() so the broker + * can be run in a separate thread. + */ + virtual void run(); + + /** Shut down the broker */ + virtual void shutdown(); + }; + } +} + + + +#endif /*!_Broker_*/ diff --git a/cpp/broker/inc/Configuration.h b/cpp/broker/inc/Configuration.h index 5ec70a839b..aaabdd23a0 100644 --- a/cpp/broker/inc/Configuration.h +++ b/cpp/broker/inc/Configuration.h @@ -21,11 +21,12 @@ #include <cstdlib> #include <iostream> #include <vector> +#include "Exception.h" namespace qpid { namespace broker { class Configuration{ - class Option{ + class Option { const std::string flag; const std::string name; const std::string desc; @@ -56,6 +57,7 @@ namespace qpid { int getValue() const; virtual bool needsValue() const; virtual void setValue(const std::string& value); + virtual void setValue(int _value) { value = _value; } }; class StringOption : public Option{ @@ -82,6 +84,7 @@ namespace qpid { bool getValue() const; virtual bool needsValue() const; virtual void setValue(const std::string& value); + virtual void setValue(bool _value) { value = _value; } }; BoolOption trace; @@ -96,10 +99,9 @@ namespace qpid { std::vector<Option*> options; public: - class ParseException{ - public: - const std::string& error; - ParseException(const std::string& _error) : error(_error) {} + class ParseException : public Exception { + public: + ParseException(const std::string& msg) : Exception(msg) {} }; @@ -108,13 +110,21 @@ namespace qpid { void parse(int argc, char** argv); - bool isHelp(); - bool isTrace(); - int getPort(); - int getWorkerThreads(); - int getMaxConnections(); - int getConnectionBacklog(); - const std::string& getAcceptor(); + bool isHelp() const; + bool isTrace() const; + int getPort() const; + int getWorkerThreads() const; + int getMaxConnections() const; + int getConnectionBacklog() const; + std::string getAcceptor() const; + + void setHelp(bool b) { help.setValue(b); } + void setTrace(bool b) { trace.setValue(b); } + void setPort(int i) { port.setValue(i); } + void setWorkerThreads(int i) { workerThreads.setValue(i); } + void setMaxConnections(int i) { maxConnections.setValue(i); } + void setConnectionBacklog(int i) { connectionBacklog.setValue(i); } + void setAcceptor(const std::string& val) { acceptor.setValue(val); } void usage(); }; diff --git a/cpp/broker/src/Broker.cpp b/cpp/broker/src/Broker.cpp index 99cf8d6ce4..b6472d1729 100644 --- a/cpp/broker/src/Broker.cpp +++ b/cpp/broker/src/Broker.cpp @@ -17,76 +17,68 @@ */ #include <iostream> #include <memory> -#include "apr_signal.h" - +#include "Broker.h" #include "Acceptor.h" #include "Configuration.h" #include "QpidError.h" #include "SessionHandlerFactoryImpl.h" - -//optional includes: -#ifdef _USE_APR_IO_ - #include "BlockingAPRAcceptor.h" #include "LFAcceptor.h" -#endif using namespace qpid::broker; using namespace qpid::io; -void handle_signal(int signal); +namespace { + Acceptor* createAcceptor(const Configuration& config){ + const string type(config.getAcceptor()); + if("blocking" == type){ + std::cout << "Using blocking acceptor " << std::endl; + return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog()); + }else if("non-blocking" == type){ + std::cout << "Using non-blocking acceptor " << std::endl; + return new LFAcceptor(config.isTrace(), + config.getConnectionBacklog(), + config.getWorkerThreads(), + config.getMaxConnections()); + } + throw Configuration::ParseException("Unrecognised acceptor: " + type); + } +} -Acceptor* createAcceptor(Configuration& config); +Broker::Broker(const Configuration& config) : + acceptor(createAcceptor(config)), + port(config.getPort()), + isBound(false) {} -int main(int argc, char** argv) +Broker::shared_ptr Broker::create(int port) { - SessionHandlerFactoryImpl factory; Configuration config; - try{ + config.setPort(port); + return create(config); +} - config.parse(argc, argv); - if(config.isHelp()){ - config.usage(); - }else{ -#ifdef _USE_APR_IO_ - apr_signal(SIGINT, handle_signal); -#endif - try{ - std::auto_ptr<Acceptor> acceptor(createAcceptor(config)); - try{ - acceptor->bind(config.getPort(), &factory); - }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } - }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } - } - }catch(Configuration::ParseException error){ - std::cout << "Error: " << error.error << std::endl; +Broker::shared_ptr Broker::create(const Configuration& config) { + return Broker::shared_ptr(new Broker(config)); +} + +int16_t Broker::bind() +{ + if (!isBound) { + port = acceptor->bind(port); } - - return 1; + return port; } -Acceptor* createAcceptor(Configuration& config){ - const string type(config.getAcceptor()); -#ifdef _USE_APR_IO_ - if("blocking" == type){ - std::cout << "Using blocking acceptor " << std::endl; - return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog()); - }else if("non-blocking" == type){ - std::cout << "Using non-blocking acceptor " << std::endl; - return new LFAcceptor(config.isTrace(), - config.getConnectionBacklog(), - config.getWorkerThreads(), - config.getMaxConnections()); - } -#endif - throw Configuration::ParseException("Unrecognised acceptor: " + type); +void Broker::run() { + bind(); + acceptor->run(&factory); } -void handle_signal(int /*signal*/){ - std::cout << "Shutting down..." << std::endl; +void Broker::shutdown() { + acceptor->shutdown(); } + +Broker::~Broker() { } + +const int16_t Broker::DEFAULT_PORT(5672); diff --git a/cpp/broker/src/Configuration.cpp b/cpp/broker/src/Configuration.cpp index 11c2d374fe..6e7df7889e 100644 --- a/cpp/broker/src/Configuration.cpp +++ b/cpp/broker/src/Configuration.cpp @@ -61,31 +61,31 @@ void Configuration::usage(){ } } -bool Configuration::isHelp(){ +bool Configuration::isHelp() const { return help.getValue(); } -bool Configuration::isTrace(){ +bool Configuration::isTrace() const { return trace.getValue(); } -int Configuration::getPort(){ +int Configuration::getPort() const { return port.getValue(); } -int Configuration::getWorkerThreads(){ +int Configuration::getWorkerThreads() const { return workerThreads.getValue(); } -int Configuration::getMaxConnections(){ +int Configuration::getMaxConnections() const { return maxConnections.getValue(); } -int Configuration::getConnectionBacklog(){ +int Configuration::getConnectionBacklog() const { return connectionBacklog.getValue(); } -const string& Configuration::getAcceptor(){ +string Configuration::getAcceptor() const { return acceptor.getValue(); } diff --git a/cpp/common/Makefile b/cpp/common/Makefile index 98e4025390..766d5fbcde 100644 --- a/cpp/common/Makefile +++ b/cpp/common/Makefile @@ -21,7 +21,7 @@ QPID_HOME=../.. include $(QPID_HOME)/cpp/options.mk TARGET = $(LIB_DIR)/libqpid_common.so.1.0 -SOURCES = $(wildcard */src/*.cpp framing/generated/*.cpp) +SOURCES = $(wildcard */src/*.cpp framing/generated/*.cpp error/*.cpp) OBJECTS = $(SOURCES:.cpp=.o) DEPS = $(SOURCES:.cpp=.d) diff --git a/cpp/common/error/inc/Exception.h b/cpp/common/error/inc/Exception.h new file mode 100644 index 0000000000..709538c851 --- /dev/null +++ b/cpp/common/error/inc/Exception.h @@ -0,0 +1,46 @@ +#ifndef _Exception_ +#define _Exception_ + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <exception> +#include <string> + +namespace qpid +{ + /** + * Exception base class for all Qpid exceptions. + */ + class Exception : public std::exception + { + protected: + std::string whatStr; + + public: + Exception() throw() {} + Exception(const std::string& str) throw() : whatStr(str) {} + Exception(const char* str) throw() : whatStr(str) {} + virtual ~Exception() throw(); + + const char* what() const throw() { return whatStr.c_str(); } + virtual std::string toString() const throw() { return whatStr; } + }; +} + +#endif /*!_Exception_*/ diff --git a/cpp/common/error/inc/QpidError.h b/cpp/common/error/inc/QpidError.h index a739b506c7..3310c71a88 100644 --- a/cpp/common/error/inc/QpidError.h +++ b/cpp/common/error/inc/QpidError.h @@ -1,3 +1,5 @@ +#ifndef __QpidError__ +#define __QpidError__ /* * * Copyright (c) 2006 The Apache Software Foundation @@ -16,23 +18,20 @@ * */ #include <string> - -#ifndef __QpidError__ -#define __QpidError__ +#include "Exception.h" namespace qpid { -class QpidError{ -public: - const int code; - const std::string msg; - const std::string file; - const int line; + class QpidError : public Exception { + public: + const int code; + const std::string msg; + const std::string file; + const int line; - inline QpidError(int _code, const std::string& _msg, const std::string& _file, int _line) : code(_code), msg(_msg), file(_file), line(_line) {} - ~QpidError(){} - -}; + QpidError(int _code, const std::string& _msg, const std::string& _file, int _line) throw(); + ~QpidError() throw(); + }; #define THROW_QPID_ERROR(A, B) throw QpidError(A, B, __FILE__, __LINE__) diff --git a/cpp/common/error/inc/QpidErrorIO.h b/cpp/common/error/src/Exception.cpp index 4f9bd3ce26..69e5f135f2 100644 --- a/cpp/common/error/inc/QpidErrorIO.h +++ b/cpp/common/error/src/Exception.cpp @@ -16,14 +16,6 @@ * */ -#include "QpidError.h" -#include <ostream> - -std::ostream& operator <<(std::ostream& out, const QpidError& error) -{ - out << "Qpid Error [" << error.code << "] " << error.msg - << " (" << error.file << ":" << error.line << ")"; - return out; -} - +#include "Exception.h" +qpid::Exception::~Exception() throw() {} diff --git a/cpp/common/error/src/QpidError.cpp b/cpp/common/error/src/QpidError.cpp new file mode 100644 index 0000000000..c8049ab3cb --- /dev/null +++ b/cpp/common/error/src/QpidError.cpp @@ -0,0 +1,32 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "QpidError.h" +#include <sstream> + +using namespace qpid; + +QpidError::QpidError(int _code, const std::string& _msg, const std::string& _file, int _line) throw() + : code(_code), msg(_msg), file(_file), line(_line) +{ + std::ostringstream os; + os << "QpidError(" << code << ") " << msg << " (" << file << ":" << line << ")"; + whatStr = os.str(); +} + +QpidError::~QpidError() throw() {} diff --git a/cpp/common/io/inc/Acceptor.h b/cpp/common/io/inc/Acceptor.h index 5c690c546f..d7313b84db 100644 --- a/cpp/common/io/inc/Acceptor.h +++ b/cpp/common/io/inc/Acceptor.h @@ -20,15 +20,30 @@ #include "SessionHandlerFactory.h" - namespace qpid { namespace io { class Acceptor { public: - virtual void bind(int port, SessionHandlerFactory* factory) = 0; - virtual ~Acceptor(){} + /** + * Bind to port. + * @param port Port to bind to, 0 to bind to dynamically chosen port. + * @return The local bound port. + */ + virtual int16_t bind(int16_t port) = 0; + + /** + * Run the acceptor. + */ + virtual void run(SessionHandlerFactory* factory) = 0; + + /** + * Shut down the acceptor. + */ + virtual void shutdown() = 0; + + virtual ~Acceptor(); }; } diff --git a/cpp/common/io/inc/BlockingAPRAcceptor.h b/cpp/common/io/inc/BlockingAPRAcceptor.h index b77371b02e..bd069ed8db 100644 --- a/cpp/common/io/inc/BlockingAPRAcceptor.h +++ b/cpp/common/io/inc/BlockingAPRAcceptor.h @@ -47,10 +47,13 @@ namespace io { apr_socket_t* socket; const int connectionBacklog; volatile bool running; - + public: BlockingAPRAcceptor(bool debug = false, int connectionBacklog = 10); - virtual void bind(int port, SessionHandlerFactory* factory); + virtual int16_t bind(int16_t port); + virtual int16_t getPort() const; + virtual void run(SessionHandlerFactory* factory); + virtual void shutdown(); virtual ~BlockingAPRAcceptor(); void closed(BlockingAPRSessionContext* session); }; diff --git a/cpp/common/io/inc/LFAcceptor.h b/cpp/common/io/inc/LFAcceptor.h index 314f811827..9a40eed222 100644 --- a/cpp/common/io/inc/LFAcceptor.h +++ b/cpp/common/io/inc/LFAcceptor.h @@ -48,7 +48,7 @@ namespace io { APRPool aprPool; LFProcessor processor; - + apr_socket_t* socket; const int max_connections_per_processor; const bool debug; const int connectionBacklog; @@ -60,7 +60,10 @@ namespace io { int connectionBacklog = 10, int worker_threads = 5, int max_connections_per_processor = 500); - virtual void bind(int port, SessionHandlerFactory* factory); + virtual int16_t bind(int16_t port); + virtual int16_t getPort() const; + virtual void run(SessionHandlerFactory* factory); + virtual void shutdown(); virtual ~LFAcceptor(); }; diff --git a/cpp/common/io/inc/LFProcessor.h b/cpp/common/io/inc/LFProcessor.h index 6e67268906..25a3c8626c 100644 --- a/cpp/common/io/inc/LFProcessor.h +++ b/cpp/common/io/inc/LFProcessor.h @@ -104,7 +104,11 @@ namespace io { * Start processing. */ void start(); - + /** + * Is processing stopped? + */ + bool isStopped(); + ~LFProcessor(); }; diff --git a/cpp/common/io/src/Acceptor.cpp b/cpp/common/io/src/Acceptor.cpp new file mode 100644 index 0000000000..d1825c78fa --- /dev/null +++ b/cpp/common/io/src/Acceptor.cpp @@ -0,0 +1,21 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Acceptor.h" + +qpid::io::Acceptor::~Acceptor() {} diff --git a/cpp/common/io/src/BlockingAPRAcceptor.cpp b/cpp/common/io/src/BlockingAPRAcceptor.cpp index bf74260a55..4c55b9e2c8 100644 --- a/cpp/common/io/src/BlockingAPRAcceptor.cpp +++ b/cpp/common/io/src/BlockingAPRAcceptor.cpp @@ -33,14 +33,25 @@ BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) : CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL)); } -void BlockingAPRAcceptor::bind(int port, SessionHandlerFactory* factory){ +int16_t BlockingAPRAcceptor::bind(int16_t _port){ apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, apr_pool)); + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, apr_pool)); CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, apr_pool)); CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); + return getPort(); +} + +int16_t BlockingAPRAcceptor::getPort() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->port; +} + +void BlockingAPRAcceptor::run(SessionHandlerFactory* factory) +{ running = true; - std::cout << "Listening on port " << port << "..." << std::endl; + std::cout << "Listening on port " << getPort() << "..." << std::endl; while(running){ apr_socket_t* client; apr_status_t status = apr_socket_accept(&client, socket, apr_pool); @@ -61,11 +72,20 @@ void BlockingAPRAcceptor::bind(int port, SessionHandlerFactory* factory){ } } } - for(iterator i = sessions.begin(); i < sessions.end(); i++){ - (*i)->shutdown(); - } + shutdown(); +} - CHECK_APR_SUCCESS(apr_socket_close(socket)); +void BlockingAPRAcceptor::shutdown() +{ + // TODO aconway 2006-10-12: Not thread safe. + if (running) + { + running = false; + apr_socket_close(socket); // Don't check, exception safety. + for(iterator i = sessions.begin(); i < sessions.end(); i++){ + (*i)->shutdown(); + } + } } BlockingAPRAcceptor::~BlockingAPRAcceptor(){ @@ -77,6 +97,5 @@ BlockingAPRAcceptor::~BlockingAPRAcceptor(){ void BlockingAPRAcceptor::closed(BlockingAPRSessionContext* session){ sessions.erase(find(sessions.begin(), sessions.end(), session)); - delete this; } diff --git a/cpp/common/io/src/LFAcceptor.cpp b/cpp/common/io/src/LFAcceptor.cpp index bb5164f457..86f382afac 100644 --- a/cpp/common/io/src/LFAcceptor.cpp +++ b/cpp/common/io/src/LFAcceptor.cpp @@ -29,18 +29,26 @@ LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) : { } -void LFAcceptor::bind(int port, SessionHandlerFactory* factory){ - apr_socket_t* socket; +int16_t LFAcceptor::bind(int16_t _port){ apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, aprPool.pool)); + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, aprPool.pool)); CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, aprPool.pool)); CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); + return getPort(); +} + +int16_t LFAcceptor::getPort() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->port; +} + +void LFAcceptor::run(SessionHandlerFactory* factory) { running = true; processor.start(); - - std::cout << "Listening on port " << port << "..." << std::endl; + std::cout << "Listening on port " << getPort() << "..." << std::endl; while(running){ apr_socket_t* client; apr_status_t status = apr_socket_accept(&client, socket, aprPool.pool); @@ -60,14 +68,20 @@ void LFAcceptor::bind(int port, SessionHandlerFactory* factory){ } } } + shutdown(); +} - processor.stop(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); +void LFAcceptor::shutdown() { + // TODO aconway 2006-10-12: Cleanup, this is not thread safe. + if (running) { + running = false; + processor.stop(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + } } -LFAcceptor::~LFAcceptor(){ -} +LFAcceptor::~LFAcceptor(){} LFAcceptor::APRPool::APRPool(){ APRBase::increment(); diff --git a/cpp/common/io/src/LFProcessor.cpp b/cpp/common/io/src/LFProcessor.cpp index 3ac66576e3..65d7451767 100644 --- a/cpp/common/io/src/LFProcessor.cpp +++ b/cpp/common/io/src/LFProcessor.cpp @@ -25,6 +25,9 @@ using namespace qpid::io; using namespace qpid::concurrent; using qpid::QpidError; +// TODO aconway 2006-10-12: stopped is read outside locks. +// + LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : size(_size), timeout(_timeout), @@ -46,6 +49,7 @@ LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout LFProcessor::~LFProcessor(){ + if (!stopped) stop(); for(int i = 0; i < workerCount; i++){ delete workers[i]; } diff --git a/cpp/options.mk b/cpp/options.mk index f5d7d950f8..093017a1ed 100644 --- a/cpp/options.mk +++ b/cpp/options.mk @@ -30,7 +30,7 @@ OPTIMIZE = DEFINES = -D _USE_APR_IO_ APR_INCLUDES=-I ${APR_HOME}/include/apr-1/ COMMON_INCLUDES = -I ${COMMON_HOME}/framing/inc -I ${COMMON_HOME}/framing/generated -I ${COMMON_HOME}/concurrent/inc -I ${COMMON_HOME}/io/inc -I ${COMMON_HOME}/error/inc -I $(COMMON_HOME)/utils/inc ${APR_INCLUDES} -SRC_INCLUDES = $(COMMON_INCLUDES) -I inc +SRC_INCLUDES = $(COMMON_INCLUDES) -I inc TEST_INCLUDES = $(COMMON_INCLUDES) -I ../inc -I $(QPID_CPP_HOME)/test/include INCLUDES=$(SRC_INCLUDES) # Default to src diff --git a/cpp/qpidd/Makefile b/cpp/qpidd/Makefile new file mode 100644 index 0000000000..f2dca4df3b --- /dev/null +++ b/cpp/qpidd/Makefile @@ -0,0 +1,42 @@ +# +# Copyright (c) 2006 The Apache Software Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Build qpidd broker daemon executable. +# + +QPID_HOME = ../.. +include $(QPID_HOME)/cpp/options.mk +TARGET=$(BROKER) +SOURCES= $(wildcard src/*.cpp) +OBJECTS= $(subst .cpp,.o,$(SOURCES)) + + +.PHONY: all clean test + +all: $(TARGET) + +test: + @$(MAKE) -C test all + +clean: + -@rm -f $(OBJECTS) src/*.d $(TARGET) + @$(MAKE) -C test clean + +$(TARGET): $(OBJECTS) + $(CXX) -o $@ $(OBJECTS) $(LDFLAGS) -lapr-1 $(COMMON_LIB) $(BROKER_LIB) + +-include $(SOURCES:.cpp=.d) diff --git a/cpp/qpidd/src/qpidd.cpp b/cpp/qpidd/src/qpidd.cpp new file mode 100644 index 0000000000..51be80d51d --- /dev/null +++ b/cpp/qpidd/src/qpidd.cpp @@ -0,0 +1,52 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "../../broker/inc/../../broker/inc/Broker.h" +#include "../../broker/inc/Broker.h" +#include "../../broker/inc/Configuration.h" +#include "../../broker/inc/SessionHandlerFactoryImpl.h" +#include "Acceptor.h" +#include "QpidError.h" +#include "apr_signal.h" +#include <iostream> +#include <memory> + +using namespace qpid::broker; +using namespace qpid::io; + +void handle_signal(int /*signal*/){ + std::cout << "Shutting down..." << std::endl; +} + +int main(int argc, char** argv) +{ + Configuration config; + try { + config.parse(argc, argv); + if(config.isHelp()){ + config.usage(); + }else{ + apr_signal(SIGINT, handle_signal); + Broker::shared_ptr broker = Broker::create(config); + broker->run(); + } + return 0; + } catch(std::exception e) { + std::cout << e.what() << std::endl; + } + return 1; +} |