diff options
Diffstat (limited to 'cpp/broker/src')
-rw-r--r-- | cpp/broker/src/Broker.cpp | 94 | ||||
-rw-r--r-- | cpp/broker/src/Configuration.cpp | 14 |
2 files changed, 50 insertions, 58 deletions
diff --git a/cpp/broker/src/Broker.cpp b/cpp/broker/src/Broker.cpp index 99cf8d6ce4..b6472d1729 100644 --- a/cpp/broker/src/Broker.cpp +++ b/cpp/broker/src/Broker.cpp @@ -17,76 +17,68 @@ */ #include <iostream> #include <memory> -#include "apr_signal.h" - +#include "Broker.h" #include "Acceptor.h" #include "Configuration.h" #include "QpidError.h" #include "SessionHandlerFactoryImpl.h" - -//optional includes: -#ifdef _USE_APR_IO_ - #include "BlockingAPRAcceptor.h" #include "LFAcceptor.h" -#endif using namespace qpid::broker; using namespace qpid::io; -void handle_signal(int signal); +namespace { + Acceptor* createAcceptor(const Configuration& config){ + const string type(config.getAcceptor()); + if("blocking" == type){ + std::cout << "Using blocking acceptor " << std::endl; + return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog()); + }else if("non-blocking" == type){ + std::cout << "Using non-blocking acceptor " << std::endl; + return new LFAcceptor(config.isTrace(), + config.getConnectionBacklog(), + config.getWorkerThreads(), + config.getMaxConnections()); + } + throw Configuration::ParseException("Unrecognised acceptor: " + type); + } +} -Acceptor* createAcceptor(Configuration& config); +Broker::Broker(const Configuration& config) : + acceptor(createAcceptor(config)), + port(config.getPort()), + isBound(false) {} -int main(int argc, char** argv) +Broker::shared_ptr Broker::create(int port) { - SessionHandlerFactoryImpl factory; Configuration config; - try{ + config.setPort(port); + return create(config); +} - config.parse(argc, argv); - if(config.isHelp()){ - config.usage(); - }else{ -#ifdef _USE_APR_IO_ - apr_signal(SIGINT, handle_signal); -#endif - try{ - std::auto_ptr<Acceptor> acceptor(createAcceptor(config)); - try{ - acceptor->bind(config.getPort(), &factory); - }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } - }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } - } - }catch(Configuration::ParseException error){ - std::cout << "Error: " << error.error << std::endl; +Broker::shared_ptr Broker::create(const Configuration& config) { + return Broker::shared_ptr(new Broker(config)); +} + +int16_t Broker::bind() +{ + if (!isBound) { + port = acceptor->bind(port); } - - return 1; + return port; } -Acceptor* createAcceptor(Configuration& config){ - const string type(config.getAcceptor()); -#ifdef _USE_APR_IO_ - if("blocking" == type){ - std::cout << "Using blocking acceptor " << std::endl; - return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog()); - }else if("non-blocking" == type){ - std::cout << "Using non-blocking acceptor " << std::endl; - return new LFAcceptor(config.isTrace(), - config.getConnectionBacklog(), - config.getWorkerThreads(), - config.getMaxConnections()); - } -#endif - throw Configuration::ParseException("Unrecognised acceptor: " + type); +void Broker::run() { + bind(); + acceptor->run(&factory); } -void handle_signal(int /*signal*/){ - std::cout << "Shutting down..." << std::endl; +void Broker::shutdown() { + acceptor->shutdown(); } + +Broker::~Broker() { } + +const int16_t Broker::DEFAULT_PORT(5672); diff --git a/cpp/broker/src/Configuration.cpp b/cpp/broker/src/Configuration.cpp index 11c2d374fe..6e7df7889e 100644 --- a/cpp/broker/src/Configuration.cpp +++ b/cpp/broker/src/Configuration.cpp @@ -61,31 +61,31 @@ void Configuration::usage(){ } } -bool Configuration::isHelp(){ +bool Configuration::isHelp() const { return help.getValue(); } -bool Configuration::isTrace(){ +bool Configuration::isTrace() const { return trace.getValue(); } -int Configuration::getPort(){ +int Configuration::getPort() const { return port.getValue(); } -int Configuration::getWorkerThreads(){ +int Configuration::getWorkerThreads() const { return workerThreads.getValue(); } -int Configuration::getMaxConnections(){ +int Configuration::getMaxConnections() const { return maxConnections.getValue(); } -int Configuration::getConnectionBacklog(){ +int Configuration::getConnectionBacklog() const { return connectionBacklog.getValue(); } -const string& Configuration::getAcceptor(){ +string Configuration::getAcceptor() const { return acceptor.getValue(); } |