diff options
author | Alan Conway <aconway@apache.org> | 2006-10-31 19:53:55 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-10-31 19:53:55 +0000 |
commit | 9094d2b10ecadd66fa3b22169183e7573cc79629 (patch) | |
tree | bf3915f72be2a5f09932b800d2fa4309fb3ad64e /cpp | |
parent | 0487ea40bc6568765cdec75a36273eeb26fae854 (diff) | |
download | qpid-python-9094d2b10ecadd66fa3b22169183e7573cc79629.tar.gz |
IO refactor phase 1. Reduced dependencies, removed redundant classes.
Renamed pricipal APR classes in preparation for move to apr namespace.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469625 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
62 files changed, 470 insertions, 1504 deletions
diff --git a/cpp/Makefile b/cpp/Makefile index fe5e40bb26..e81e89a6a7 100644 --- a/cpp/Makefile +++ b/cpp/Makefile @@ -36,6 +36,7 @@ STYLESHEETS := $(XSL:%=$(CURDIR)/etc/stylesheets/%) TRANSFORM := java -jar $(CURDIR)/tools/saxon8.jar -o results.out $(SPEC) generate: $(GENDIR)/timestamp $(GENDIR)/timestamp: $(wildcard etc/stylesheets/*.xsl) $(SPEC) + rm -rf $(GENDIR) mkdir -p $(GENDIR)/qpid/framing ( cd $(GENDIR)/qpid/framing && for s in $(STYLESHEETS) ; do $(TRANSFORM) $$s ; done ) && echo > $(GENDIR)/timestamp $(shell find $(GENDIR) -name *.cpp -o -name *.h): $(GENDIR)/timestamp @@ -106,7 +107,7 @@ CLIENT_TEST_EXE := $(CLIENT_TEST_SRC:test/client/%.cpp=$(TESTDIR)/%) all-nogen: $(CLIENT_TEST_EXE) ## #include dependencies --include $(shell find src test -name '*.d') dummy-avoid-warning-if-none +-include $(shell find $(GENDIR) $(OBJDIR) -name '*.d') dummy-avoid-warning-if-none ## Clean up diff --git a/cpp/src/qpid/SharedObject.h b/cpp/src/qpid/SharedObject.h new file mode 100644 index 0000000000..15f333173a --- /dev/null +++ b/cpp/src/qpid/SharedObject.h @@ -0,0 +1,52 @@ +#ifndef _SharedObject_ +#define _SharedObject_ + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <boost/shared_ptr.hpp> +#include <boost/noncopyable.hpp> + +namespace qpid { + /** + * Template to enforce shared object conventions. + * Shared object classes should inherit : public qpid::SharedObject + * That ensures Foo: + * - has typedef boost::shared_ptr<T> SharedPtr + * - has virtual destructor + * - is boost::noncopyable (no default copy or assign) + * - has a protected default constructor. + * + * Shared objects should not have public constructors. + * Make constructors protected and provide public statc create() + * functions that return a SharedPtr. + */ + template <class T> + class SharedObject : private boost::noncopyable + { + public: + typedef boost::shared_ptr<T> SharedPtr; + + virtual ~SharedObject() {}; + + protected: + SharedObject() {} + }; +} + +#endif /*!_SharedObject_*/ diff --git a/cpp/src/qpid/broker/AutoDelete.h b/cpp/src/qpid/broker/AutoDelete.h index 9faa4aa4c4..509ac3bec1 100644 --- a/cpp/src/qpid/broker/AutoDelete.h +++ b/cpp/src/qpid/broker/AutoDelete.h @@ -20,17 +20,17 @@ #include <iostream> #include <queue> -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" -#include "qpid/concurrent/ThreadFactoryImpl.h" +#include "qpid/concurrent/ThreadFactory.h" namespace qpid { namespace broker{ class AutoDelete : private virtual qpid::concurrent::Runnable{ - qpid::concurrent::ThreadFactoryImpl factory; - qpid::concurrent::MonitorImpl lock; - qpid::concurrent::MonitorImpl monitor; + qpid::concurrent::ThreadFactory factory; + qpid::concurrent::Monitor lock; + qpid::concurrent::Monitor monitor; std::queue<Queue::shared_ptr> queues; QueueRegistry* const registry; const u_int32_t period; diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index fe859b240b..7b5f9e3e32 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -18,60 +18,30 @@ #include <iostream> #include <memory> #include "qpid/broker/Broker.h" -#include "qpid/io/Acceptor.h" -#include "qpid/broker/Configuration.h" -#include "qpid/QpidError.h" -#include "qpid/broker/SessionHandlerFactoryImpl.h" -#include "qpid/io/BlockingAPRAcceptor.h" -#include "qpid/io/LFAcceptor.h" using namespace qpid::broker; using namespace qpid::io; -namespace { - Acceptor* createAcceptor(const Configuration& config){ - const string type(config.getAcceptor()); - if("blocking" == type){ - std::cout << "Using blocking acceptor " << std::endl; - return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog()); - }else if("non-blocking" == type){ - std::cout << "Using non-blocking acceptor " << std::endl; - return new LFAcceptor(config.isTrace(), - config.getConnectionBacklog(), - config.getWorkerThreads(), - config.getMaxConnections()); - } - throw Configuration::ParseException("Unrecognised acceptor: " + type); - } -} - Broker::Broker(const Configuration& config) : - acceptor(createAcceptor(config)), - port(config.getPort()), - isBound(false) {} + acceptor(new Acceptor(config.getPort(), + config.getConnectionBacklog(), + config.getWorkerThreads())) +{ } + -Broker::shared_ptr Broker::create(int port) +Broker::SharedPtr Broker::create(int16_t port) { Configuration config; config.setPort(port); return create(config); } -Broker::shared_ptr Broker::create(const Configuration& config) { - return Broker::shared_ptr(new Broker(config)); +Broker::SharedPtr Broker::create(const Configuration& config) { + return Broker::SharedPtr(new Broker(config)); } -int16_t Broker::bind() -{ - if (!isBound) { - port = acceptor->bind(port); - } - return port; -} - void Broker::run() { - bind(); acceptor->run(&factory); } diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 8581093910..dd87c47909 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -19,47 +19,35 @@ * */ -#include "qpid/io/Acceptor.h" #include "qpid/broker/Configuration.h" -#include "qpid/concurrent/Runnable.h" #include "qpid/broker/SessionHandlerFactoryImpl.h" -#include <boost/noncopyable.hpp> -#include <boost/shared_ptr.hpp> +#include "qpid/concurrent/Runnable.h" +#include "qpid/io/Acceptor.h" +#include <qpid/SharedObject.h> namespace qpid { namespace broker { /** * A broker instance. */ - class Broker : public qpid::concurrent::Runnable, private boost::noncopyable { - Broker(const Configuration& config); // Private, use create() - std::auto_ptr<qpid::io::Acceptor> acceptor; - SessionHandlerFactoryImpl factory; - int16_t port; - bool isBound; - + class Broker : public qpid::concurrent::Runnable, + public qpid::SharedObject<Broker> + { public: static const int16_t DEFAULT_PORT; virtual ~Broker(); - typedef boost::shared_ptr<Broker> shared_ptr; /** * Create a broker. * @param port Port to listen on or 0 to pick a port dynamically. */ - static shared_ptr create(int port = DEFAULT_PORT); + static SharedPtr create(int16_t port = DEFAULT_PORT); /** - * Create a broker from a Configuration. + * Create a broker using a Configuration. */ - static shared_ptr create(const Configuration& config); - - /** - * Bind to the listening port. - * @return The port number bound. - */ - virtual int16_t bind(); + static SharedPtr create(const Configuration& config); /** * Return listening port. If called before bind this is @@ -67,7 +55,7 @@ namespace qpid { * port, which will be different if the configured port is * 0. */ - virtual int16_t getPort() { return port; } + virtual int16_t getPort() const { return acceptor->getPort(); } /** * Run the broker. Implements Runnable::run() so the broker @@ -77,6 +65,11 @@ namespace qpid { /** Shut down the broker */ virtual void shutdown(); + + private: + Broker(const Configuration& config); + qpid::io::Acceptor::SharedPtr acceptor; + SessionHandlerFactoryImpl factory; }; } } diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h index f5aa0e45ed..13bd4cd450 100644 --- a/cpp/src/qpid/broker/Channel.h +++ b/cpp/src/qpid/broker/Channel.h @@ -37,7 +37,7 @@ #include "qpid/broker/TxAck.h" #include "qpid/broker/TxBuffer.h" #include "qpid/broker/TxPublish.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/framing/OutputHandler.h" #include "qpid/framing/AMQContentBody.h" #include "qpid/framing/AMQHeaderBody.h" @@ -77,7 +77,7 @@ namespace qpid { u_int32_t framesize; NameGenerator tagGenerator; std::list<DeliveryRecord> unacked; - qpid::concurrent::MonitorImpl deliveryLock; + qpid::concurrent::Monitor deliveryLock; TxBuffer txBuffer; AccumulatedAck accumulatedAck; TransactionalStore* store; diff --git a/cpp/src/qpid/broker/Configuration.cpp b/cpp/src/qpid/broker/Configuration.cpp index 2dcefd878d..550b283d62 100644 --- a/cpp/src/qpid/broker/Configuration.cpp +++ b/cpp/src/qpid/broker/Configuration.cpp @@ -24,10 +24,9 @@ using namespace std; Configuration::Configuration() : trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false), port('p', "port", "Sets the port to listen on (default=5672)", 5672), - workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5), - maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500), + workerThreads("worker-threads", "Sets the number of worker threads to use (default=5).", 5), + maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500).", 500), connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10), - acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"), help("help", "Prints usage information", false) { options.push_back(&trace); @@ -35,7 +34,6 @@ Configuration::Configuration() : options.push_back(&workerThreads); options.push_back(&maxConnections); options.push_back(&connectionBacklog); - options.push_back(&acceptor); options.push_back(&help); } @@ -85,10 +83,6 @@ int Configuration::getConnectionBacklog() const { return connectionBacklog.getValue(); } -string Configuration::getAcceptor() const { - return acceptor.getValue(); -} - Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) : flag(string("-") + _flag), name("--" +_name), desc(_desc) {} diff --git a/cpp/src/qpid/broker/Configuration.h b/cpp/src/qpid/broker/Configuration.h index 61ecc89ed9..e1e7c40947 100644 --- a/cpp/src/qpid/broker/Configuration.h +++ b/cpp/src/qpid/broker/Configuration.h @@ -92,7 +92,6 @@ namespace qpid { IntOption workerThreads; IntOption maxConnections; IntOption connectionBacklog; - StringOption acceptor; BoolOption help; typedef std::vector<Option*>::iterator op_iterator; @@ -116,7 +115,6 @@ namespace qpid { int getWorkerThreads() const; int getMaxConnections() const; int getConnectionBacklog() const; - std::string getAcceptor() const; void setHelp(bool b) { help.setValue(b); } void setTrace(bool b) { trace.setValue(b); } @@ -124,7 +122,6 @@ namespace qpid { void setWorkerThreads(int i) { workerThreads.setValue(i); } void setMaxConnections(int i) { maxConnections.setValue(i); } void setConnectionBacklog(int i) { connectionBacklog.setValue(i); } - void setAcceptor(const std::string& val) { acceptor.setValue(val); } void usage(); }; diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 5c5f78d90a..2c3143cd3c 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -23,14 +23,14 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { namespace broker { class DirectExchange : public virtual Exchange{ std::map<string, std::vector<Queue::shared_ptr> > bindings; - qpid::concurrent::MonitorImpl lock; + qpid::concurrent::Monitor lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index fca5462e72..c574a97e14 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -20,7 +20,7 @@ #include <map> #include "qpid/broker/Exchange.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" namespace qpid { namespace broker { @@ -29,7 +29,7 @@ namespace broker { class ExchangeRegistry{ typedef std::map<string, Exchange::shared_ptr> ExchangeMap; ExchangeMap exchanges; - qpid::concurrent::MonitorImpl lock; + qpid::concurrent::Monitor lock; public: std::pair<Exchange::shared_ptr, bool> declare(const string& name, const string& type) throw(UnknownExchangeTypeException); void destroy(const string& name); diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index 334f1ccdcc..83fcdb9b34 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -23,7 +23,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { @@ -31,7 +31,7 @@ namespace broker { class FanOutExchange : public virtual Exchange { std::vector<Queue::shared_ptr> bindings; - qpid::concurrent::MonitorImpl lock; + qpid::concurrent::Monitor lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index 2e2403361e..cf699ac455 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -22,7 +22,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { @@ -34,7 +34,7 @@ class HeadersExchange : public virtual Exchange { typedef std::vector<Binding> Bindings; Bindings bindings; - qpid::concurrent::MonitorImpl lock; + qpid::concurrent::Monitor lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 962c74864e..e96cc65b95 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -15,7 +15,7 @@ * limitations under the License. * */ -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/Message.h" #include <iostream> diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 67fb6764be..88dad7aaf9 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -17,7 +17,7 @@ */ #include "qpid/broker/Queue.h" #include "qpid/broker/MessageStore.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include <iostream> using namespace qpid::broker; diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 93570f59cc..f954e48c20 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -27,7 +27,7 @@ #include "qpid/broker/ConnectionToken.h" #include "qpid/broker/Consumer.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" namespace qpid { namespace broker { @@ -56,7 +56,7 @@ namespace qpid { bool queueing; bool dispatching; int next; - mutable qpid::concurrent::MonitorImpl lock; + mutable qpid::concurrent::Monitor lock; apr_time_t lastUsed; Consumer* exclusive; diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 973201fe64..949c194bbe 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -16,7 +16,7 @@ * */ #include "qpid/broker/QueueRegistry.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/SessionHandlerImpl.h" #include <sstream> #include <assert.h> diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 6f80291192..4f9e4b882a 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -19,7 +19,7 @@ #define _QueueRegistry_ #include <map> -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { @@ -77,7 +77,7 @@ class QueueRegistry{ private: typedef std::map<string, Queue::shared_ptr> QueueMap; QueueMap queues; - qpid::concurrent::MonitorImpl lock; + qpid::concurrent::Monitor lock; int counter; }; diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index 19ea732fbc..cb773b9a56 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -23,7 +23,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { @@ -71,7 +71,7 @@ class TopicPattern : public Tokens class TopicExchange : public virtual Exchange{ typedef std::map<TopicPattern, Queue::vector> BindingMap; BindingMap bindings; - qpid::concurrent::MonitorImpl lock; + qpid::concurrent::Monitor lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index 4d994f0510..4579b6126d 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -16,8 +16,8 @@ * */ #include "qpid/client/Channel.h" -#include "qpid/concurrent/MonitorImpl.h" -#include "qpid/concurrent/ThreadFactoryImpl.h" +#include "qpid/concurrent/Monitor.h" +#include "qpid/concurrent/ThreadFactory.h" #include "qpid/client/Message.h" #include "qpid/QpidError.h" @@ -36,9 +36,9 @@ Channel::Channel(bool _transactional, u_int16_t _prefetch) : prefetch(_prefetch), transactional(_transactional) { - threadFactory = new ThreadFactoryImpl(); - dispatchMonitor = new MonitorImpl(); - retrievalMonitor = new MonitorImpl(); + threadFactory = new ThreadFactory(); + dispatchMonitor = new Monitor(); + retrievalMonitor = new Monitor(); } Channel::~Channel(){ diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index b20df92e9b..acd4488813 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -17,7 +17,7 @@ */ #include "qpid/client/Connection.h" #include "qpid/client/Channel.h" -#include "qpid/io/ConnectorImpl.h" +#include "qpid/io/Connector.h" #include "qpid/client/Message.h" #include "qpid/QpidError.h" #include <iostream> @@ -30,7 +30,7 @@ using namespace qpid::concurrent; u_int16_t Connection::channelIdCounter; Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){ - connector = new ConnectorImpl(debug, _max_frame_size); + connector = new Connector(debug, _max_frame_size); } Connection::~Connection(){ diff --git a/cpp/src/qpid/client/ResponseHandler.cpp b/cpp/src/qpid/client/ResponseHandler.cpp index ec20dd1a10..fcbc76f625 100644 --- a/cpp/src/qpid/client/ResponseHandler.cpp +++ b/cpp/src/qpid/client/ResponseHandler.cpp @@ -16,11 +16,11 @@ * */ #include "qpid/client/ResponseHandler.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/QpidError.h" qpid::client::ResponseHandler::ResponseHandler() : waiting(false){ - monitor = new qpid::concurrent::MonitorImpl(); + monitor = new qpid::concurrent::Monitor(); } qpid::client::ResponseHandler::~ResponseHandler(){ diff --git a/cpp/src/qpid/concurrent/APRMonitor.h b/cpp/src/qpid/concurrent/APRMonitor.h deleted file mode 100644 index a396beab50..0000000000 --- a/cpp/src/qpid/concurrent/APRMonitor.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRMonitor_ -#define _APRMonitor_ - -#include "apr-1/apr_thread_mutex.h" -#include "apr-1/apr_thread_cond.h" -#include "qpid/concurrent/Monitor.h" - -namespace qpid { -namespace concurrent { - - class APRMonitor : public virtual Monitor - { - apr_pool_t* pool; - apr_thread_mutex_t* mutex; - apr_thread_cond_t* condition; - - public: - APRMonitor(); - virtual ~APRMonitor(); - virtual void wait(); - virtual void wait(u_int64_t time); - virtual void notify(); - virtual void notifyAll(); - virtual void acquire(); - virtual void release(); - }; -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThread.h b/cpp/src/qpid/concurrent/APRThread.h deleted file mode 100644 index 6328765a06..0000000000 --- a/cpp/src/qpid/concurrent/APRThread.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRThread_ -#define _APRThread_ - -#include "apr-1/apr_thread_proc.h" -#include "qpid/concurrent/APRThread.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/concurrent/Thread.h" - -namespace qpid { -namespace concurrent { - - class APRThread : public Thread - { - const Runnable* runnable; - apr_pool_t* pool; - apr_thread_t* runner; - - public: - APRThread(apr_pool_t* pool, Runnable* runnable); - virtual ~APRThread(); - virtual void start(); - virtual void join(); - virtual void interrupt(); - static unsigned int currentThread(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThreadFactory.h b/cpp/src/qpid/concurrent/APRThreadFactory.h deleted file mode 100644 index 40e96fc2d1..0000000000 --- a/cpp/src/qpid/concurrent/APRThreadFactory.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRThreadFactory_ -#define _APRThreadFactory_ - -#include "apr-1/apr_thread_proc.h" - -#include "qpid/concurrent/APRThread.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace concurrent { - - class APRThreadFactory : public virtual ThreadFactory - { - apr_pool_t* pool; - public: - APRThreadFactory(); - virtual ~APRThreadFactory(); - virtual Thread* create(Runnable* runnable); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThreadPool.h b/cpp/src/qpid/concurrent/APRThreadPool.h deleted file mode 100644 index cab5bcc9ce..0000000000 --- a/cpp/src/qpid/concurrent/APRThreadPool.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRThreadPool_ -#define _APRThreadPool_ - -#include <queue> -#include <vector> -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" -#include "qpid/concurrent/Runnable.h" - -namespace qpid { -namespace concurrent { - - class APRThreadPool : public virtual ThreadPool - { - class Worker : public virtual Runnable{ - APRThreadPool* pool; - public: - inline Worker(APRThreadPool* _pool) : pool(_pool){} - inline virtual void run(){ - while(pool->running){ - pool->runTask(); - } - } - }; - const bool deleteFactory; - const int size; - ThreadFactory* factory; - APRMonitor lock; - std::vector<Thread*> threads; - std::queue<Runnable*> tasks; - Worker* worker; - volatile bool running; - - void runTask(); - public: - APRThreadPool(int size); - APRThreadPool(int size, ThreadFactory* factory); - virtual void start(); - virtual void stop(); - virtual void addTask(Runnable* task); - virtual ~APRThreadPool(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRMonitor.cpp b/cpp/src/qpid/concurrent/Monitor.cpp index cc5eda800f..ae68cf8751 100644 --- a/cpp/src/qpid/concurrent/APRMonitor.cpp +++ b/cpp/src/qpid/concurrent/Monitor.cpp @@ -16,45 +16,45 @@ * */ #include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRMonitor.h" +#include "qpid/concurrent/Monitor.h" #include <iostream> -qpid::concurrent::APRMonitor::APRMonitor(){ +qpid::concurrent::Monitor::Monitor(){ APRBase::increment(); CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool)); } -qpid::concurrent::APRMonitor::~APRMonitor(){ +qpid::concurrent::Monitor::~Monitor(){ CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); apr_pool_destroy(pool); APRBase::decrement(); } -void qpid::concurrent::APRMonitor::wait(){ +void qpid::concurrent::Monitor::wait(){ CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); } -void qpid::concurrent::APRMonitor::wait(u_int64_t time){ +void qpid::concurrent::Monitor::wait(u_int64_t time){ apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000); if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); } -void qpid::concurrent::APRMonitor::notify(){ +void qpid::concurrent::Monitor::notify(){ CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); } -void qpid::concurrent::APRMonitor::notifyAll(){ +void qpid::concurrent::Monitor::notifyAll(){ CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); } -void qpid::concurrent::APRMonitor::acquire(){ +void qpid::concurrent::Monitor::acquire(){ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); } -void qpid::concurrent::APRMonitor::release(){ +void qpid::concurrent::Monitor::release(){ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); } diff --git a/cpp/src/qpid/concurrent/Monitor.h b/cpp/src/qpid/concurrent/Monitor.h index 42e88c0a48..a2777cb2f1 100644 --- a/cpp/src/qpid/concurrent/Monitor.h +++ b/cpp/src/qpid/concurrent/Monitor.h @@ -18,42 +18,39 @@ #ifndef _Monitor_ #define _Monitor_ -#include "qpid/framing/amqp_types.h" +#include "apr-1/apr_thread_mutex.h" +#include "apr-1/apr_thread_cond.h" +#include "qpid/concurrent/Monitor.h" namespace qpid { namespace concurrent { class Monitor { + apr_pool_t* pool; + apr_thread_mutex_t* mutex; + apr_thread_cond_t* condition; + public: - virtual ~Monitor(){} - virtual void wait() = 0; - virtual void wait(u_int64_t time) = 0; - virtual void notify() = 0; - virtual void notifyAll() = 0; - virtual void acquire() = 0; - virtual void release() = 0; + Monitor(); + virtual ~Monitor(); + virtual void wait(); + virtual void wait(u_int64_t time); + virtual void notify(); + virtual void notifyAll(); + virtual void acquire(); + virtual void release(); }; -/** - * Scoped locker for a monitor. - */ class Locker { public: - Locker(Monitor& lock_) : lock(lock_) { lock.acquire(); } - ~Locker() { lock.release(); } - + Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); } + ~Locker() { monitor.release(); } private: - Monitor& lock; - - // private and unimplemented to prevent copying - Locker(const Locker&); - void operator=(const Locker&); + Monitor& monitor; }; - -} -} +}} #endif diff --git a/cpp/src/qpid/concurrent/MonitorImpl.h b/cpp/src/qpid/concurrent/MonitorImpl.h deleted file mode 100644 index 258ad140b3..0000000000 --- a/cpp/src/qpid/concurrent/MonitorImpl.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - - -#ifndef _MonitorImpl_ -#define _MonitorImpl_ - -#ifdef _USE_APR_IO_ -#include "qpid/concurrent/APRMonitor.h" -#else /* use POSIX Monitor */ -#include "qpid/concurrent/LMonitor.h" -#endif - - -namespace qpid { -namespace concurrent { - -#ifdef _USE_APR_IO_ - class MonitorImpl : public virtual APRMonitor - { - - public: - MonitorImpl() : APRMonitor(){}; - virtual ~MonitorImpl(){}; - - }; -#else - class MonitorImpl : public virtual LMonitor - { - - public: - MonitorImpl() : LMonitor(){}; - virtual ~MonitorImpl(){}; - - }; -#endif - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThread.cpp b/cpp/src/qpid/concurrent/Thread.cpp index d4d073cac6..9bbc2f8131 100644 --- a/cpp/src/qpid/concurrent/APRThread.cpp +++ b/cpp/src/qpid/concurrent/Thread.cpp @@ -16,7 +16,7 @@ * */ #include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRThread.h" +#include "qpid/concurrent/Thread.h" #include "apr-1/apr_portable.h" using namespace qpid::concurrent; @@ -27,24 +27,24 @@ void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){ return NULL; } -APRThread::APRThread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} +Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} -APRThread::~APRThread(){ +Thread::~Thread(){ } -void APRThread::start(){ +void Thread::start(){ CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool)); } -void APRThread::join(){ +void Thread::join(){ apr_status_t status; if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner)); } -void APRThread::interrupt(){ +void Thread::interrupt(){ if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS)); } -unsigned int qpid::concurrent::APRThread::currentThread(){ +unsigned int qpid::concurrent::Thread::currentThread(){ return apr_os_thread_current(); } diff --git a/cpp/src/qpid/concurrent/Thread.h b/cpp/src/qpid/concurrent/Thread.h index 6bd2a379ce..d18bc153bf 100644 --- a/cpp/src/qpid/concurrent/Thread.h +++ b/cpp/src/qpid/concurrent/Thread.h @@ -18,16 +18,27 @@ #ifndef _Thread_ #define _Thread_ +#include "apr-1/apr_thread_proc.h" +#include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/Runnable.h" +#include "qpid/concurrent/Thread.h" + namespace qpid { namespace concurrent { class Thread { + const Runnable* runnable; + apr_pool_t* pool; + apr_thread_t* runner; + public: - virtual ~Thread(){} - virtual void start() = 0; - virtual void join() = 0; - virtual void interrupt() = 0; + Thread(apr_pool_t* pool, Runnable* runnable); + virtual ~Thread(); + virtual void start(); + virtual void join(); + virtual void interrupt(); + static unsigned int currentThread(); }; } diff --git a/cpp/src/qpid/concurrent/APRThreadFactory.cpp b/cpp/src/qpid/concurrent/ThreadFactory.cpp index 1c99a3da33..b20f9f2b04 100644 --- a/cpp/src/qpid/concurrent/APRThreadFactory.cpp +++ b/cpp/src/qpid/concurrent/ThreadFactory.cpp @@ -16,20 +16,20 @@ * */ #include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRThreadFactory.h" +#include "qpid/concurrent/ThreadFactory.h" using namespace qpid::concurrent; -APRThreadFactory::APRThreadFactory(){ +ThreadFactory::ThreadFactory(){ APRBase::increment(); CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); } -APRThreadFactory::~APRThreadFactory(){ +ThreadFactory::~ThreadFactory(){ apr_pool_destroy(pool); APRBase::decrement(); } -Thread* APRThreadFactory::create(Runnable* runnable){ - return new APRThread(pool, runnable); +Thread* ThreadFactory::create(Runnable* runnable){ + return new Thread(pool, runnable); } diff --git a/cpp/src/qpid/concurrent/ThreadFactory.h b/cpp/src/qpid/concurrent/ThreadFactory.h index 60c8ad2556..572419cae6 100644 --- a/cpp/src/qpid/concurrent/ThreadFactory.h +++ b/cpp/src/qpid/concurrent/ThreadFactory.h @@ -18,7 +18,11 @@ #ifndef _ThreadFactory_ #define _ThreadFactory_ +#include "apr-1/apr_thread_proc.h" + +#include "qpid/concurrent/Thread.h" #include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/ThreadFactory.h" #include "qpid/concurrent/Runnable.h" namespace qpid { @@ -26,9 +30,11 @@ namespace concurrent { class ThreadFactory { + apr_pool_t* pool; public: - virtual ~ThreadFactory(){} - virtual Thread* create(Runnable* runnable) = 0; + ThreadFactory(); + virtual ~ThreadFactory(); + virtual Thread* create(Runnable* runnable); }; } diff --git a/cpp/src/qpid/concurrent/ThreadFactoryImpl.h b/cpp/src/qpid/concurrent/ThreadFactoryImpl.h deleted file mode 100644 index 352b77ac21..0000000000 --- a/cpp/src/qpid/concurrent/ThreadFactoryImpl.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _ThreadFactoryImpl_ -#define _ThreadFactoryImpl_ - - -#ifdef _USE_APR_IO_ -#include "qpid/concurrent/APRThreadFactory.h" -#else -#include "qpid/concurrent/LThreadFactory.h" -#endif - - -namespace qpid { -namespace concurrent { - - -#ifdef _USE_APR_IO_ - class ThreadFactoryImpl : public virtual APRThreadFactory - { - public: - ThreadFactoryImpl(): APRThreadFactory() {}; - virtual ~ThreadFactoryImpl() {}; - }; -#else - class ThreadFactoryImpl : public virtual LThreadFactory - { - public: - ThreadFactoryImpl(): LThreadFactory() {}; - virtual ~ThreadFactoryImpl() {}; - }; -#endif -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/APRThreadPool.cpp b/cpp/src/qpid/concurrent/ThreadPool.cpp index 3222c71b0c..5da19745a7 100644 --- a/cpp/src/qpid/concurrent/APRThreadPool.cpp +++ b/cpp/src/qpid/concurrent/ThreadPool.cpp @@ -15,33 +15,33 @@ * limitations under the License. * */ -#include "qpid/concurrent/APRThreadFactory.h" -#include "qpid/concurrent/APRThreadPool.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/ThreadPool.h" #include "qpid/QpidError.h" #include <iostream> using namespace qpid::concurrent; -APRThreadPool::APRThreadPool(int _size) : deleteFactory(true), size(_size), factory(new APRThreadFactory()), running(false){ +ThreadPool::ThreadPool(int _size) : deleteFactory(true), size(_size), factory(new ThreadFactory()), running(false){ worker = new Worker(this); } -APRThreadPool::APRThreadPool(int _size, ThreadFactory* _factory) : deleteFactory(false), size(_size), factory(_factory), running(false){ +ThreadPool::ThreadPool(int _size, ThreadFactory* _factory) : deleteFactory(false), size(_size), factory(_factory), running(false){ worker = new Worker(this); } -APRThreadPool::~APRThreadPool(){ +ThreadPool::~ThreadPool(){ if(deleteFactory) delete factory; } -void APRThreadPool::addTask(Runnable* task){ +void ThreadPool::addTask(Runnable* task){ lock.acquire(); tasks.push(task); lock.notifyAll(); lock.release(); } -void APRThreadPool::runTask(){ +void ThreadPool::runTask(){ lock.acquire(); while(tasks.empty()){ lock.wait(); @@ -56,7 +56,7 @@ void APRThreadPool::runTask(){ } } -void APRThreadPool::start(){ +void ThreadPool::start(){ if(!running){ running = true; for(int i = 0; i < size; i++){ @@ -67,7 +67,7 @@ void APRThreadPool::start(){ } } -void APRThreadPool::stop(){ +void ThreadPool::stop(){ if(!running){ running = false; lock.acquire(); diff --git a/cpp/src/qpid/concurrent/ThreadPool.h b/cpp/src/qpid/concurrent/ThreadPool.h index 925faa76de..11f0cc364f 100644 --- a/cpp/src/qpid/concurrent/ThreadPool.h +++ b/cpp/src/qpid/concurrent/ThreadPool.h @@ -18,7 +18,12 @@ #ifndef _ThreadPool_ #define _ThreadPool_ +#include <queue> +#include <vector> +#include "qpid/concurrent/Monitor.h" #include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/ThreadPool.h" #include "qpid/concurrent/Runnable.h" namespace qpid { @@ -26,11 +31,33 @@ namespace concurrent { class ThreadPool { + class Worker : public virtual Runnable{ + ThreadPool* pool; + public: + inline Worker(ThreadPool* _pool) : pool(_pool){} + inline virtual void run(){ + while(pool->running){ + pool->runTask(); + } + } + }; + const bool deleteFactory; + const int size; + ThreadFactory* factory; + Monitor lock; + std::vector<Thread*> threads; + std::queue<Runnable*> tasks; + Worker* worker; + volatile bool running; + + void runTask(); public: - virtual void start() = 0; - virtual void stop() = 0; - virtual void addTask(Runnable* runnable) = 0; - virtual ~ThreadPool(){} + ThreadPool(int size); + ThreadPool(int size, ThreadFactory* factory); + virtual void start(); + virtual void stop(); + virtual void addTask(Runnable* task); + virtual ~ThreadPool(); }; } diff --git a/cpp/src/qpid/framing/InputHandler.cpp b/cpp/src/qpid/framing/InputHandler.cpp deleted file mode 100644 index accf68421a..0000000000 --- a/cpp/src/qpid/framing/InputHandler.cpp +++ /dev/null @@ -1,21 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/framing/InputHandler.h" - -qpid::framing::InputHandler::~InputHandler() {} diff --git a/cpp/src/qpid/framing/InputHandler.h b/cpp/src/qpid/framing/InputHandler.h index e2ad545993..8f56d176b8 100644 --- a/cpp/src/qpid/framing/InputHandler.h +++ b/cpp/src/qpid/framing/InputHandler.h @@ -1,3 +1,5 @@ +#ifndef _InputHandler_ +#define _InputHandler_ /* * * Copyright (c) 2006 The Apache Software Foundation @@ -15,24 +17,19 @@ * limitations under the License. * */ -#include <string> - -#ifndef _InputHandler_ -#define _InputHandler_ +#include <qpid/SharedObject.h> #include "qpid/framing/AMQFrame.h" namespace qpid { namespace framing { - class InputHandler{ - public: - virtual ~InputHandler(); - virtual void received(AMQFrame* frame) = 0; - }; +class InputHandler : public qpid::SharedObject<InputHandler> { + public: + virtual void received(AMQFrame* frame) = 0; +}; -} -} +}} #endif diff --git a/cpp/src/qpid/framing/OutputHandler.cpp b/cpp/src/qpid/framing/OutputHandler.cpp deleted file mode 100644 index 22de39b82a..0000000000 --- a/cpp/src/qpid/framing/OutputHandler.cpp +++ /dev/null @@ -1,21 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/framing/OutputHandler.h" - -qpid::framing::OutputHandler::~OutputHandler() {} diff --git a/cpp/src/qpid/framing/OutputHandler.h b/cpp/src/qpid/framing/OutputHandler.h index ed38a321e5..16fb7e8afb 100644 --- a/cpp/src/qpid/framing/OutputHandler.h +++ b/cpp/src/qpid/framing/OutputHandler.h @@ -1,3 +1,6 @@ +#ifndef _OutputHandler_ +#define _OutputHandler_ + /* * * Copyright (c) 2006 The Apache Software Foundation @@ -15,24 +18,18 @@ * limitations under the License. * */ -#include <string> - -#ifndef _OutputHandler_ -#define _OutputHandler_ - +#include <qpid/SharedObject.h> #include "qpid/framing/AMQFrame.h" namespace qpid { namespace framing { - class OutputHandler{ - public: - virtual ~OutputHandler(); - virtual void send(AMQFrame* frame) = 0; - }; +class OutputHandler : public qpid::SharedObject<OutputHandler> { + public: + virtual void send(AMQFrame* frame) = 0; +}; -} -} +}} #endif diff --git a/cpp/src/qpid/io/APRConnector.h b/cpp/src/qpid/io/APRConnector.h deleted file mode 100644 index c835f30056..0000000000 --- a/cpp/src/qpid/io/APRConnector.h +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRConnector_ -#define _APRConnector_ - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/InputHandler.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/io/ShutdownHandler.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/io/Connector.h" -#include "qpid/concurrent/APRMonitor.h" - -namespace qpid { -namespace io { - - class APRConnector : public virtual qpid::framing::OutputHandler, - public virtual Connector, - private virtual qpid::concurrent::Runnable - { - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; - - bool closed; - - apr_time_t lastIn; - apr_time_t lastOut; - apr_interval_time_t timeout; - u_int32_t idleIn; - u_int32_t idleOut; - - TimeoutHandler* timeoutHandler; - ShutdownHandler* shutdownHandler; - qpid::framing::InputHandler* input; - qpid::framing::InitiationHandler* initialiser; - qpid::framing::OutputHandler* output; - - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; - - qpid::concurrent::APRMonitor* writeLock; - qpid::concurrent::ThreadFactory* threadFactory; - qpid::concurrent::Thread* receiver; - - apr_pool_t* pool; - apr_socket_t* socket; - - void checkIdle(apr_status_t status); - void writeBlock(qpid::framing::AMQDataBlock* data); - void writeToSocket(char* data, size_t available); - void setSocketTimeout(); - - void run(); - - public: - APRConnector(bool debug = false, u_int32_t buffer_size = 1024); - virtual ~APRConnector(); - virtual void connect(const std::string& host, int port); - virtual void init(qpid::framing::ProtocolInitiation* header); - virtual void close(); - virtual void setInputHandler(qpid::framing::InputHandler* handler); - virtual void setTimeoutHandler(TimeoutHandler* handler); - virtual void setShutdownHandler(ShutdownHandler* handler); - virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void setReadTimeout(u_int16_t timeout); - virtual void setWriteTimeout(u_int16_t timeout); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/concurrent/LMonitor.h b/cpp/src/qpid/io/APRPool.cpp index 70e99b9807..edd434f16c 100644 --- a/cpp/src/qpid/concurrent/LMonitor.h +++ b/cpp/src/qpid/io/APRPool.cpp @@ -15,30 +15,25 @@ * limitations under the License. * */ -#ifndef _LMonitor_ -#define _LMonitor_ -/* Native Linux Monitor - Based of Kernel patch 19/20 */ +#include "APRPool.h" +#include "qpid/concurrent/APRBase.h" +#include <boost/pool/singleton_pool.hpp> -#include "qpid/concurrent/Monitor.h" +using namespace qpid::io; +using namespace qpid::concurrent; -namespace qpid { -namespace concurrent { - - class LMonitor : public virtual Monitor - { - - public: - LMonitor(); - virtual ~LMonitor(); - virtual void wait(); - virtual void notify(); - virtual void notifyAll(); - virtual void acquire(); - virtual void release(); - }; +APRPool::APRPool(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); } + +APRPool::~APRPool(){ + apr_pool_destroy(pool); + APRBase::decrement(); } +apr_pool_t* APRPool::get() { + return boost::details::pool::singleton_default<APRPool>::instance().pool; +} -#endif diff --git a/cpp/src/qpid/concurrent/LThreadFactory.h b/cpp/src/qpid/io/APRPool.h index 4a573d1bd1..063eedf1ee 100644 --- a/cpp/src/qpid/concurrent/LThreadFactory.h +++ b/cpp/src/qpid/io/APRPool.h @@ -1,3 +1,6 @@ +#ifndef _APRPool_ +#define _APRPool_ + /* * * Copyright (c) 2006 The Apache Software Foundation @@ -15,23 +18,30 @@ * limitations under the License. * */ -#ifndef _LAPRThreadFactory_ -#define _LAPRThreadFactory_ - +#include <boost/noncopyable.hpp> +#include <apr-1/apr_pools.h> namespace qpid { -namespace concurrent { +namespace io { +/** + * Singleton APR memory pool. + */ +class APRPool : private boost::noncopyable { + public: + APRPool(); + ~APRPool(); + + /** Get singleton instance */ + static apr_pool_t* get(); + + private: + apr_pool_t* pool; +}; + +}} + - class LThreadFactory - { - public: - LThreadFactory(); - virtual ~LThreadFactory(); - virtual Thread* create(Runnable* runnable); - }; -} -} -#endif +#endif /*!_APRPool_*/ diff --git a/cpp/src/qpid/io/Acceptor.cpp b/cpp/src/qpid/io/Acceptor.cpp index 6b76bd4da2..f95d9448cf 100644 --- a/cpp/src/qpid/io/Acceptor.cpp +++ b/cpp/src/qpid/io/Acceptor.cpp @@ -15,7 +15,64 @@ * limitations under the License. * */ - #include "qpid/io/Acceptor.h" +#include "qpid/concurrent/APRBase.h" +#include "APRPool.h" + +using namespace qpid::concurrent; +using namespace qpid::io; + +Acceptor::Acceptor(int16_t port_, int backlog, int threads) : + port(port_), + processor(APRPool::get(), threads, 1000, 5000000) +{ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); + CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); + CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); +} + +int16_t Acceptor::getPort() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->port; +} + +void Acceptor::run(SessionHandlerFactory* factory) { + running = true; + processor.start(); + std::cout << "Listening on port " << getPort() << "..." << std::endl; + while(running){ + apr_socket_t* client; + apr_status_t status = apr_socket_accept(&client, socket, APRPool::get()); + if(status == APR_SUCCESS){ + //make this socket non-blocking: + CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); + LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false); + session->init(factory->create(session)); + }else{ + running = false; + if(status != APR_EINTR){ + std::cout << "ERROR: " << get_desc(status) << std::endl; + } + } + } + shutdown(); +} + +void Acceptor::shutdown() { + // TODO aconway 2006-10-12: Cleanup, this is not thread safe. + if (running) { + running = false; + processor.stop(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + } +} + -qpid::io::Acceptor::~Acceptor() {} diff --git a/cpp/src/qpid/io/Acceptor.h b/cpp/src/qpid/io/Acceptor.h index a7f7ad66f0..bc189f7f6e 100644 --- a/cpp/src/qpid/io/Acceptor.h +++ b/cpp/src/qpid/io/Acceptor.h @@ -15,36 +15,43 @@ * limitations under the License. * */ -#ifndef _Acceptor_ -#define _Acceptor_ - +#ifndef _LFAcceptor_ +#define _LFAcceptor_ + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_poll.h" +#include "apr-1/apr_time.h" + +#include "qpid/io/Acceptor.h" +#include "qpid/concurrent/Monitor.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/concurrent/ThreadPool.h" +#include "qpid/io/LFProcessor.h" +#include "qpid/io/LFSessionContext.h" +#include "qpid/concurrent/Runnable.h" +#include "qpid/io/SessionContext.h" #include "qpid/io/SessionHandlerFactory.h" +#include "qpid/concurrent/Thread.h" +#include <qpid/SharedObject.h> namespace qpid { namespace io { - class Acceptor - { - public: - /** - * Bind to port. - * @param port Port to bind to, 0 to bind to dynamically chosen port. - * @return The local bound port. - */ - virtual int16_t bind(int16_t port) = 0; - - /** - * Run the acceptor. - */ - virtual void run(SessionHandlerFactory* factory) = 0; - - /** - * Shut down the acceptor. - */ - virtual void shutdown() = 0; - - virtual ~Acceptor(); - }; +/** APR Acceptor. */ +class Acceptor : public qpid::SharedObject<Acceptor> +{ + public: + Acceptor(int16_t port, int backlog, int threads); + virtual int16_t getPort() const; + virtual void run(SessionHandlerFactory* factory); + virtual void shutdown(); + + private: + int16_t port; + LFProcessor processor; + apr_socket_t* socket; + volatile bool running; +}; } } diff --git a/cpp/src/qpid/io/BlockingAPRAcceptor.cpp b/cpp/src/qpid/io/BlockingAPRAcceptor.cpp deleted file mode 100644 index 0e1fc535a2..0000000000 --- a/cpp/src/qpid/io/BlockingAPRAcceptor.cpp +++ /dev/null @@ -1,101 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include "qpid/io/BlockingAPRAcceptor.h" -#include "qpid/concurrent/APRBase.h" -#include "qpid/concurrent/APRThreadFactory.h" - -using namespace qpid::concurrent; -using namespace qpid::framing; -using namespace qpid::io; - -BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) : - debug(_debug), - threadFactory(new APRThreadFactory()), - connectionBacklog(c) -{ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL)); -} - -int16_t BlockingAPRAcceptor::bind(int16_t _port){ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, apr_pool)); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, apr_pool)); - CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); - CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); - return getPort(); -} - -int16_t BlockingAPRAcceptor::getPort() const { - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); - return address->port; -} - -void BlockingAPRAcceptor::run(SessionHandlerFactory* factory) -{ - running = true; - std::cout << "Listening on port " << getPort() << "..." << std::endl; - while(running){ - apr_socket_t* client; - apr_status_t status = apr_socket_accept(&client, socket, apr_pool); - if(status == APR_SUCCESS){ - //configure socket: - CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 1000000/* i.e. 1 sec*/)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); - - BlockingAPRSessionContext* session = new BlockingAPRSessionContext(client, threadFactory, this, debug); - session->init(factory->create(session)); - sessions.push_back(session); - }else{ - running = false; - if(status != APR_EINTR){ - std::cout << "ERROR: " << get_desc(status) << std::endl; - } - } - } - shutdown(); -} - -void BlockingAPRAcceptor::shutdown() -{ - // TODO aconway 2006-10-12: Not thread safe. - if (running) - { - running = false; - apr_socket_close(socket); // Don't check, exception safety. - for(iterator i = sessions.begin(); i < sessions.end(); i++){ - (*i)->shutdown(); - } - } -} - -BlockingAPRAcceptor::~BlockingAPRAcceptor(){ - delete threadFactory; - apr_pool_destroy(apr_pool); - APRBase::decrement(); -} - - -void BlockingAPRAcceptor::closed(BlockingAPRSessionContext* session){ - sessions.erase(find(sessions.begin(), sessions.end(), session)); -} - diff --git a/cpp/src/qpid/io/BlockingAPRAcceptor.h b/cpp/src/qpid/io/BlockingAPRAcceptor.h deleted file mode 100644 index a3042605aa..0000000000 --- a/cpp/src/qpid/io/BlockingAPRAcceptor.h +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _BlockingAPRAcceptor_ -#define _BlockingAPRAcceptor_ - -#include <vector> -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_poll.h" -#include "apr-1/apr_time.h" - -#include "qpid/io/Acceptor.h" -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/io/BlockingAPRSessionContext.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandlerFactory.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/concurrent/ThreadPool.h" - -namespace qpid { -namespace io { - - class BlockingAPRAcceptor : public virtual Acceptor - { - typedef std::vector<BlockingAPRSessionContext*>::iterator iterator; - - const bool debug; - apr_pool_t* apr_pool; - qpid::concurrent::ThreadFactory* threadFactory; - std::vector<BlockingAPRSessionContext*> sessions; - apr_socket_t* socket; - const int connectionBacklog; - volatile bool running; - - public: - BlockingAPRAcceptor(bool debug = false, int connectionBacklog = 10); - virtual int16_t bind(int16_t port); - virtual int16_t getPort() const; - virtual void run(SessionHandlerFactory* factory); - virtual void shutdown(); - virtual ~BlockingAPRAcceptor(); - void closed(BlockingAPRSessionContext* session); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/BlockingAPRSessionContext.cpp b/cpp/src/qpid/io/BlockingAPRSessionContext.cpp deleted file mode 100644 index 88e6b6b0fc..0000000000 --- a/cpp/src/qpid/io/BlockingAPRSessionContext.cpp +++ /dev/null @@ -1,177 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <assert.h> -#include <iostream> -#include "qpid/io/BlockingAPRSessionContext.h" -#include "qpid/io/BlockingAPRAcceptor.h" -#include "qpid/concurrent/APRBase.h" -#include "qpid/QpidError.h" - -using namespace qpid::concurrent; -using namespace qpid::framing; -using namespace qpid::io; - - -BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket, - ThreadFactory* factory, - BlockingAPRAcceptor* _acceptor, - bool _debug) - : socket(_socket), - debug(_debug), - handler(0), - acceptor(_acceptor), - inbuf(65536), - outbuf(65536), - closed(false){ - - reader = new Reader(this); - writer = new Writer(this); - - rThread = factory->create(reader); - wThread = factory->create(writer); -} - -BlockingAPRSessionContext::~BlockingAPRSessionContext(){ - delete reader; - delete writer; - - delete rThread; - delete wThread; - - delete handler; -} - -void BlockingAPRSessionContext::read(){ - try{ - bool initiated(false); - while(!closed){ - apr_size_t bytes(inbuf.available()); - if(bytes < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - apr_status_t s = apr_socket_recv(socket, inbuf.start(), &bytes); - if(APR_STATUS_IS_TIMEUP(s)){ - //timed out, check closed on loop - }else if(APR_STATUS_IS_EOF(s) || bytes == 0){ - closed = true; - }else{ - inbuf.move(bytes); - inbuf.flip(); - - if(!initiated){ - ProtocolInitiation* protocolInit = new ProtocolInitiation(); - if(protocolInit->decode(inbuf)){ - handler->initiated(protocolInit); - if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl; - initiated = true; - } - }else{ - AMQFrame frame; - while(frame.decode(inbuf)){ - if(debug) std::cout << "RECV: [" << &socket << "]:" << frame << std::endl; - handler->received(&frame); - } - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); - } - } - - //close socket - }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} - -void BlockingAPRSessionContext::write(){ - while(!closed){ - //get next frame - outlock.acquire(); - while(outframes.empty() && !closed){ - outlock.wait(); - } - if(!closed){ - AMQFrame* frame = outframes.front(); - outframes.pop(); - outlock.release(); - - //encode - frame->encode(outbuf); - if(debug) std::cout << "SENT [" << &socket << "]:" << *frame << std::endl; - delete frame; - outbuf.flip(); - - //write from outbuf to socket - char* data = outbuf.start(); - const int available = outbuf.available(); - int written = 0; - apr_size_t bytes = available; - while(available > written){ - apr_socket_send(socket, data + written, &bytes); - written += bytes; - bytes = available - written; - } - outbuf.clear(); - }else{ - outlock.release(); - } - } -} - -void BlockingAPRSessionContext::send(AMQFrame* frame){ - if(!closed){ - outlock.acquire(); - bool was_empty(outframes.empty()); - outframes.push(frame); - if(was_empty){ - outlock.notify(); - } - outlock.release(); - }else{ - std::cout << "WARNING: Session closed[" << &socket << "], dropping frame. " << &frame << std::endl; - } -} - -void BlockingAPRSessionContext::init(SessionHandler* _handler){ - handler = _handler; - rThread->start(); - wThread->start(); -} - -void BlockingAPRSessionContext::close(){ - closed = true; - wThread->join(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - if(debug) std::cout << "RECV: [" << &socket << "]: Closed " << std::endl; - handler->closed(); - acceptor->closed(this); - delete this; -} - -void BlockingAPRSessionContext::shutdown(){ - closed = true; - outlock.acquire(); - outlock.notify(); - outlock.release(); - - wThread->join(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - rThread->join(); - handler->closed(); - delete this; -} diff --git a/cpp/src/qpid/io/BlockingAPRSessionContext.h b/cpp/src/qpid/io/BlockingAPRSessionContext.h deleted file mode 100644 index c06142ace5..0000000000 --- a/cpp/src/qpid/io/BlockingAPRSessionContext.h +++ /dev/null @@ -1,94 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _BlockingAPRSessionContext_ -#define _BlockingAPRSessionContext_ - -#include <queue> -#include <vector> - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/AMQFrame.h" -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/framing/Buffer.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandler.h" -#include "qpid/io/SessionHandlerFactory.h" -#include "qpid/io/ShutdownHandler.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" - -namespace qpid { -namespace io { - - class BlockingAPRAcceptor; - - class BlockingAPRSessionContext : public virtual SessionContext - { - class Reader : public virtual qpid::concurrent::Runnable{ - BlockingAPRSessionContext* parent; - public: - inline Reader(BlockingAPRSessionContext* p) : parent(p){} - inline virtual void run(){ parent->read(); } - inline virtual ~Reader(){} - }; - - class Writer : public virtual qpid::concurrent::Runnable{ - BlockingAPRSessionContext* parent; - public: - inline Writer(BlockingAPRSessionContext* p) : parent(p){} - inline virtual void run(){ parent->write(); } - inline virtual ~Writer(){} - }; - - apr_socket_t* socket; - const bool debug; - SessionHandler* handler; - BlockingAPRAcceptor* acceptor; - std::queue<qpid::framing::AMQFrame*> outframes; - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; - qpid::concurrent::APRMonitor outlock; - Reader* reader; - Writer* writer; - qpid::concurrent::Thread* rThread; - qpid::concurrent::Thread* wThread; - - volatile bool closed; - - void read(); - void write(); - public: - BlockingAPRSessionContext(apr_socket_t* socket, - qpid::concurrent::ThreadFactory* factory, - BlockingAPRAcceptor* acceptor, - bool debug = false); - ~BlockingAPRSessionContext(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void close(); - void shutdown(); - void init(SessionHandler* handler); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/APRConnector.cpp b/cpp/src/qpid/io/Connector.cpp index 91cf01c842..ca487deb86 100644 --- a/cpp/src/qpid/io/APRConnector.cpp +++ b/cpp/src/qpid/io/Connector.cpp @@ -17,8 +17,8 @@ */ #include <iostream> #include "qpid/concurrent/APRBase.h" -#include "qpid/io/APRConnector.h" -#include "qpid/concurrent/APRThreadFactory.h" +#include "qpid/io/Connector.h" +#include "qpid/concurrent/ThreadFactory.h" #include "qpid/QpidError.h" using namespace qpid::io; @@ -26,7 +26,7 @@ using namespace qpid::concurrent; using namespace qpid::framing; using qpid::QpidError; -APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) : +Connector::Connector(bool _debug, u_int32_t buffer_size) : debug(_debug), receive_buffer_size(buffer_size), send_buffer_size(buffer_size), @@ -44,11 +44,11 @@ APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) : CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); - threadFactory = new APRThreadFactory(); - writeLock = new APRMonitor(); + threadFactory = new ThreadFactory(); + writeLock = new Monitor(); } -APRConnector::~APRConnector(){ +Connector::~Connector(){ delete receiver; delete writeLock; delete threadFactory; @@ -57,7 +57,7 @@ APRConnector::~APRConnector(){ APRBase::decrement(); } -void APRConnector::connect(const std::string& host, int port){ +void Connector::connect(const std::string& host, int port){ apr_sockaddr_t* address; CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); @@ -67,36 +67,36 @@ void APRConnector::connect(const std::string& host, int port){ receiver->start(); } -void APRConnector::init(ProtocolInitiation* header){ +void Connector::init(ProtocolInitiation* header){ writeBlock(header); delete header; } -void APRConnector::close(){ +void Connector::close(){ closed = true; CHECK_APR_SUCCESS(apr_socket_close(socket)); receiver->join(); } -void APRConnector::setInputHandler(InputHandler* handler){ +void Connector::setInputHandler(InputHandler* handler){ input = handler; } -void APRConnector::setShutdownHandler(ShutdownHandler* handler){ +void Connector::setShutdownHandler(ShutdownHandler* handler){ shutdownHandler = handler; } -OutputHandler* APRConnector::getOutputHandler(){ +OutputHandler* Connector::getOutputHandler(){ return this; } -void APRConnector::send(AMQFrame* frame){ +void Connector::send(AMQFrame* frame){ writeBlock(frame); if(debug) std::cout << "SENT: " << *frame << std::endl; delete frame; } -void APRConnector::writeBlock(AMQDataBlock* data){ +void Connector::writeBlock(AMQDataBlock* data){ writeLock->acquire(); data->encode(outbuf); @@ -107,7 +107,7 @@ void APRConnector::writeBlock(AMQDataBlock* data){ writeLock->release(); } -void APRConnector::writeToSocket(char* data, size_t available){ +void Connector::writeToSocket(char* data, size_t available){ apr_size_t bytes(available); apr_size_t written(0); while(written < available && !closed){ @@ -124,7 +124,7 @@ void APRConnector::writeToSocket(char* data, size_t available){ } } -void APRConnector::checkIdle(apr_status_t status){ +void Connector::checkIdle(apr_status_t status){ if(timeoutHandler){ apr_time_t now = apr_time_as_msec(apr_time_now()); if(APR_STATUS_IS_TIMEUP(status)){ @@ -144,7 +144,7 @@ void APRConnector::checkIdle(apr_status_t status){ } } -void APRConnector::setReadTimeout(u_int16_t t){ +void Connector::setReadTimeout(u_int16_t t){ idleIn = t * 1000;//t is in secs if(idleIn && (!timeout || idleIn < timeout)){ timeout = idleIn; @@ -153,7 +153,7 @@ void APRConnector::setReadTimeout(u_int16_t t){ } -void APRConnector::setWriteTimeout(u_int16_t t){ +void Connector::setWriteTimeout(u_int16_t t){ idleOut = t * 1000;//t is in secs if(idleOut && (!timeout || idleOut < timeout)){ timeout = idleOut; @@ -161,7 +161,7 @@ void APRConnector::setWriteTimeout(u_int16_t t){ } } -void APRConnector::setSocketTimeout(){ +void Connector::setSocketTimeout(){ //interval is in microseconds, timeout in milliseconds //want the interval to be a bit shorter than the timeout, hence multiply //by 800 rather than 1000. @@ -169,11 +169,11 @@ void APRConnector::setSocketTimeout(){ apr_socket_timeout_set(socket, interval); } -void APRConnector::setTimeoutHandler(TimeoutHandler* handler){ +void Connector::setTimeoutHandler(TimeoutHandler* handler){ timeoutHandler = handler; } -void APRConnector::run(){ +void Connector::run(){ try{ while(!closed){ apr_size_t bytes(inbuf.available()); diff --git a/cpp/src/qpid/io/Connector.h b/cpp/src/qpid/io/Connector.h index d0a2f470a8..7c52f7e87b 100644 --- a/cpp/src/qpid/io/Connector.h +++ b/cpp/src/qpid/io/Connector.h @@ -18,35 +18,74 @@ #ifndef _Connector_ #define _Connector_ +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_time.h" + #include "qpid/framing/InputHandler.h" #include "qpid/framing/OutputHandler.h" #include "qpid/framing/InitiationHandler.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/io/ShutdownHandler.h" #include "qpid/io/TimeoutHandler.h" +#include "qpid/concurrent/Thread.h" +#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/io/Connector.h" +#include "qpid/concurrent/Monitor.h" namespace qpid { namespace io { - class Connector + class Connector : public virtual qpid::framing::OutputHandler, + private virtual qpid::concurrent::Runnable { + const bool debug; + const int receive_buffer_size; + const int send_buffer_size; + + bool closed; + + apr_time_t lastIn; + apr_time_t lastOut; + apr_interval_time_t timeout; + u_int32_t idleIn; + u_int32_t idleOut; + + TimeoutHandler* timeoutHandler; + ShutdownHandler* shutdownHandler; + qpid::framing::InputHandler* input; + qpid::framing::InitiationHandler* initialiser; + qpid::framing::OutputHandler* output; + + qpid::framing::Buffer inbuf; + qpid::framing::Buffer outbuf; + + qpid::concurrent::Monitor* writeLock; + qpid::concurrent::ThreadFactory* threadFactory; + qpid::concurrent::Thread* receiver; + + apr_pool_t* pool; + apr_socket_t* socket; + + void checkIdle(apr_status_t status); + void writeBlock(qpid::framing::AMQDataBlock* data); + void writeToSocket(char* data, size_t available); + void setSocketTimeout(); + + void run(); + public: - virtual void connect(const std::string& host, int port) = 0; - virtual void init(qpid::framing::ProtocolInitiation* header) = 0; - virtual void close() = 0; - virtual void setInputHandler(qpid::framing::InputHandler* handler) = 0; - virtual void setTimeoutHandler(TimeoutHandler* handler) = 0; - virtual void setShutdownHandler(ShutdownHandler* handler) = 0; - virtual qpid::framing::OutputHandler* getOutputHandler() = 0; - /** - * Set the timeout for reads, in secs. - */ - virtual void setReadTimeout(u_int16_t timeout) = 0; - /** - * Set the timeout for writes, in secs. - */ - virtual void setWriteTimeout(u_int16_t timeout) = 0; - virtual ~Connector(){} + Connector(bool debug = false, u_int32_t buffer_size = 1024); + virtual ~Connector(); + virtual void connect(const std::string& host, int port); + virtual void init(qpid::framing::ProtocolInitiation* header); + virtual void close(); + virtual void setInputHandler(qpid::framing::InputHandler* handler); + virtual void setTimeoutHandler(TimeoutHandler* handler); + virtual void setShutdownHandler(ShutdownHandler* handler); + virtual qpid::framing::OutputHandler* getOutputHandler(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void setReadTimeout(u_int16_t timeout); + virtual void setWriteTimeout(u_int16_t timeout); }; } diff --git a/cpp/src/qpid/io/ConnectorImpl.h b/cpp/src/qpid/io/ConnectorImpl.h deleted file mode 100644 index 55dcf7a2d4..0000000000 --- a/cpp/src/qpid/io/ConnectorImpl.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRConnectorImpl_ -#define _APRConnectorImpl_ - -#ifdef _USE_APR_IO_ -#include "qpid/io/APRConnector.h" -#else -#include "qpid/io/LConnector.h" -#endif - -namespace qpid { -namespace io { - -#ifdef _USE_APR_IO_ - class ConnectorImpl : public virtual APRConnector - { - - public: - ConnectorImpl(bool _debug = false, u_int32_t buffer_size = 1024):APRConnector(_debug,buffer_size){}; - virtual ~ConnectorImpl(){}; - }; -#else - class ConnectorImpl : public virtual LConnector - { - - public: - ConnectorImpl(bool _debug = false, u_int32_t buffer_size = 1024):LConnector(_debug, buffer_size){}; - virtual ~ConnectorImpl(){}; - }; - -#endif - -} -} - - -#endif diff --git a/cpp/src/qpid/io/LConnector.h b/cpp/src/qpid/io/LConnector.h deleted file mode 100644 index 5fc86597bd..0000000000 --- a/cpp/src/qpid/io/LConnector.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LConnector_ -#define _LConnector_ - - -#include "qpid/framing/InputHandler.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/concurrent/Thread.h" -#include "qpid/concurrent/ThreadFactory.h" -#include "qpid/io/Connector.h" - -namespace qpid { -namespace io { - - class LConnector : public virtual qpid::framing::OutputHandler, - public virtual Connector, - private virtual qpid::concurrent::Runnable - { - - public: - LConnector(bool debug = false, u_int32_t buffer_size = 1024){}; - virtual ~LConnector(){}; - - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/LFAcceptor.cpp b/cpp/src/qpid/io/LFAcceptor.cpp deleted file mode 100644 index 7e51a550af..0000000000 --- a/cpp/src/qpid/io/LFAcceptor.cpp +++ /dev/null @@ -1,94 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/io/LFAcceptor.h" -#include "qpid/concurrent/APRBase.h" - -using namespace qpid::concurrent; -using namespace qpid::io; - -LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) : - processor(aprPool.pool, worker_threads, 1000, 5000000), - max_connections_per_processor(m), - debug(_debug), - connectionBacklog(c) -{ } - - -int16_t LFAcceptor::bind(int16_t _port){ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, aprPool.pool)); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, aprPool.pool)); - CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); - CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); - CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); - return getPort(); -} - -int16_t LFAcceptor::getPort() const { - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); - return address->port; -} - -void LFAcceptor::run(SessionHandlerFactory* factory) { - running = true; - processor.start(); - std::cout << "Listening on port " << getPort() << "..." << std::endl; - while(running){ - apr_socket_t* client; - apr_status_t status = apr_socket_accept(&client, socket, aprPool.pool); - if(status == APR_SUCCESS){ - //make this socket non-blocking: - CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); - LFSessionContext* session = new LFSessionContext(aprPool.pool, client, &processor, debug); - session->init(factory->create(session)); - }else{ - running = false; - if(status != APR_EINTR){ - std::cout << "ERROR: " << get_desc(status) << std::endl; - } - } - } - shutdown(); -} - -void LFAcceptor::shutdown() { - // TODO aconway 2006-10-12: Cleanup, this is not thread safe. - if (running) { - running = false; - processor.stop(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - } -} - - -LFAcceptor::~LFAcceptor(){} - -LFAcceptor::APRPool::APRPool(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); -} - -LFAcceptor::APRPool::~APRPool(){ - apr_pool_destroy(pool); - APRBase::decrement(); -} diff --git a/cpp/src/qpid/io/LFAcceptor.h b/cpp/src/qpid/io/LFAcceptor.h deleted file mode 100644 index 35a556d500..0000000000 --- a/cpp/src/qpid/io/LFAcceptor.h +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LFAcceptor_ -#define _LFAcceptor_ - -#include <vector> -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_poll.h" -#include "apr-1/apr_time.h" - -#include "qpid/io/Acceptor.h" -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/concurrent/APRThreadFactory.h" -#include "qpid/concurrent/APRThreadPool.h" -#include "qpid/io/LFProcessor.h" -#include "qpid/io/LFSessionContext.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandlerFactory.h" -#include "qpid/concurrent/Thread.h" - -namespace qpid { -namespace io { - - class LFAcceptor : public virtual Acceptor - { - class APRPool{ - public: - apr_pool_t* pool; - APRPool(); - ~APRPool(); - }; - - APRPool aprPool; - LFProcessor processor; - apr_socket_t* socket; - const int max_connections_per_processor; - const bool debug; - const int connectionBacklog; - - volatile bool running; - - public: - LFAcceptor(bool debug = false, - int connectionBacklog = 10, - int worker_threads = 5, - int max_connections_per_processor = 500); - virtual int16_t bind(int16_t port); - virtual int16_t getPort() const; - virtual void run(SessionHandlerFactory* factory); - virtual void shutdown(); - virtual ~LFAcceptor(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/io/LFProcessor.h b/cpp/src/qpid/io/LFProcessor.h index 16c789f0ff..5b61f444af 100644 --- a/cpp/src/qpid/io/LFProcessor.h +++ b/cpp/src/qpid/io/LFProcessor.h @@ -21,8 +21,8 @@ #include "apr-1/apr_poll.h" #include <iostream> #include <vector> -#include "qpid/concurrent/APRMonitor.h" -#include "qpid/concurrent/APRThreadFactory.h" +#include "qpid/concurrent/Monitor.h" +#include "qpid/concurrent/ThreadFactory.h" #include "qpid/concurrent/Runnable.h" namespace qpid { @@ -50,9 +50,9 @@ namespace io { const int workerCount; bool hasLeader; qpid::concurrent::Thread** const workers; - qpid::concurrent::APRMonitor leadLock; - qpid::concurrent::APRMonitor countLock; - qpid::concurrent::APRThreadFactory factory; + qpid::concurrent::Monitor leadLock; + qpid::concurrent::Monitor countLock; + qpid::concurrent::ThreadFactory factory; std::vector<LFSessionContext*> sessions; volatile bool stopped; diff --git a/cpp/src/qpid/io/LFSessionContext.cpp b/cpp/src/qpid/io/LFSessionContext.cpp index 6d6d786841..ca1e6431a6 100644 --- a/cpp/src/qpid/io/LFSessionContext.cpp +++ b/cpp/src/qpid/io/LFSessionContext.cpp @@ -54,7 +54,7 @@ LFSessionContext::~LFSessionContext(){ void LFSessionContext::read(){ assert(!reading); // No concurrent read. - reading = APRThread::currentThread(); + reading = Thread::currentThread(); socket.read(in); in.flip(); @@ -79,7 +79,7 @@ void LFSessionContext::read(){ void LFSessionContext::write(){ assert(!writing); // No concurrent writes. - writing = APRThread::currentThread(); + writing = Thread::currentThread(); bool done = isClosed(); while(!done){ @@ -186,4 +186,4 @@ void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ logLock.release(); } -APRMonitor LFSessionContext::logLock; +Monitor LFSessionContext::logLock; diff --git a/cpp/src/qpid/io/LFSessionContext.h b/cpp/src/qpid/io/LFSessionContext.h index 9406bb97b6..8d30b54204 100644 --- a/cpp/src/qpid/io/LFSessionContext.h +++ b/cpp/src/qpid/io/LFSessionContext.h @@ -25,7 +25,7 @@ #include "apr-1/apr_time.h" #include "qpid/framing/AMQFrame.h" -#include "qpid/concurrent/APRMonitor.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/io/APRSocket.h" #include "qpid/framing/Buffer.h" #include "qpid/io/LFProcessor.h" @@ -51,7 +51,7 @@ namespace io { apr_pollfd_t fd; std::queue<qpid::framing::AMQFrame*> framesToWrite; - qpid::concurrent::APRMonitor writeLock; + qpid::concurrent::Monitor writeLock; bool processing; bool closing; @@ -60,7 +60,7 @@ namespace io { volatile unsigned int reading; volatile unsigned int writing; - static qpid::concurrent::APRMonitor logLock; + static qpid::concurrent::Monitor logLock; void log(const std::string& desc, qpid::framing::AMQFrame* const frame); public: diff --git a/cpp/src/qpid/io/SessionManager.h b/cpp/src/qpid/io/doxygen_summary.h index e6b17451e4..1086f65f63 100644 --- a/cpp/src/qpid/io/SessionManager.h +++ b/cpp/src/qpid/io/doxygen_summary.h @@ -1,3 +1,6 @@ +#ifndef _doxygen_summary_ +#define _doxygen_summary_ + /* * * Copyright (c) 2006 The Apache Software Foundation @@ -15,26 +18,17 @@ * limitations under the License. * */ -#ifndef _SessionManager_ -#define _SessionManager_ - -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandler.h" - -namespace qpid { -namespace io { - - class SessionManager - { - public: - virtual SessionHandler* init(SessionContext* ctxt) = 0; - virtual void close(SessionContext* ctxt) = 0; - virtual void updateInterest(SessionContext* ctxt, bool read, bool write) = 0; - virtual ~SessionManager(){} - }; -} -} +// No code just a doxygen comment for the namespace - -#endif +/** \namspace qpid::io + * IO classes used by client and broker. + * + * This namespace contains platform-neutral classes. Platform + * specific classes are in a sub-namespace named after the + * platform. At build time the appropriate platform classes are + * imported into this namespace so other code does not need to be awre + * of the difference. + * + */ +#endif /*!_doxygen_summary_*/ diff --git a/cpp/src/qpidd.cpp b/cpp/src/qpidd.cpp index e93676513a..40be6485f2 100644 --- a/cpp/src/qpidd.cpp +++ b/cpp/src/qpidd.cpp @@ -37,7 +37,7 @@ int main(int argc, char** argv) config.usage(); }else{ apr_signal(SIGINT, handle_signal); - Broker::shared_ptr broker = Broker::create(config); + Broker::SharedPtr broker = Broker::create(config); broker->run(); } return 0; diff --git a/cpp/test/client/client_test.cpp b/cpp/test/client/client_test.cpp index e1c7d56573..2a1f1a9747 100644 --- a/cpp/test/client/client_test.cpp +++ b/cpp/test/client/client_test.cpp @@ -22,7 +22,7 @@ #include "qpid/client/Connection.h" #include "qpid/client/Message.h" #include "qpid/client/MessageListener.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "qpid/framing/FieldTable.h" using namespace qpid::client; @@ -65,7 +65,7 @@ int main(int argc, char**) std::cout << "Bound queue to exchange." << std::endl; //set up a message listener - MonitorImpl monitor; + Monitor monitor; SimpleListener listener(&monitor); string tag("MyTag"); channel.consume(queue, tag, &listener); diff --git a/cpp/test/client/topic_publisher.cpp b/cpp/test/client/topic_publisher.cpp index 9652c6450f..e53b70cc75 100644 --- a/cpp/test/client/topic_publisher.cpp +++ b/cpp/test/client/topic_publisher.cpp @@ -21,7 +21,7 @@ #include "qpid/client/Exchange.h" #include "qpid/client/MessageListener.h" #include "qpid/client/Queue.h" -#include "qpid/concurrent/MonitorImpl.h" +#include "qpid/concurrent/Monitor.h" #include "unistd.h" #include <apr-1/apr_time.h> #include <cstdlib> @@ -34,7 +34,7 @@ class Publisher : public MessageListener{ Channel* const channel; const std::string controlTopic; const bool transactional; - MonitorImpl monitor; + Monitor monitor; int count; void waitForCompletion(int msgs); diff --git a/cpp/test/unit/qpid/broker/ConfigurationTest.cpp b/cpp/test/unit/qpid/broker/ConfigurationTest.cpp index 1c24076ee3..7acee7c8b9 100644 --- a/cpp/test/unit/qpid/broker/ConfigurationTest.cpp +++ b/cpp/test/unit/qpid/broker/ConfigurationTest.cpp @@ -28,8 +28,6 @@ class ConfigurationTest : public CppUnit::TestCase CPPUNIT_TEST(testIsHelp); CPPUNIT_TEST(testPortLongForm); CPPUNIT_TEST(testPortShortForm); - CPPUNIT_TEST(testAcceptorLongForm); - CPPUNIT_TEST(testAcceptorShortForm); CPPUNIT_TEST(testVarious); CPPUNIT_TEST_SUITE_END(); @@ -59,29 +57,12 @@ class ConfigurationTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(6789, conf.getPort()); } - void testAcceptorLongForm() - { - Configuration conf; - char* argv[] = {"ignore", "--acceptor", "blocking"}; - conf.parse(3, argv); - CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor()); - } - - void testAcceptorShortForm() - { - Configuration conf; - char* argv[] = {"ignore", "-a", "blocking"}; - conf.parse(3, argv); - CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor()); - } - void testVarious() { Configuration conf; char* argv[] = {"ignore", "-t", "--worker-threads", "10", "-a", "blocking"}; conf.parse(6, argv); CPPUNIT_ASSERT_EQUAL(5672, conf.getPort());//default - CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor()); CPPUNIT_ASSERT_EQUAL(10, conf.getWorkerThreads()); CPPUNIT_ASSERT(conf.isTrace()); CPPUNIT_ASSERT(!conf.isHelp()); |