diff options
author | Alan Conway <aconway@apache.org> | 2006-11-06 16:43:31 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-11-06 16:43:31 +0000 |
commit | 0c3f1084652f7d81f1ca992676e90c158eeb3e65 (patch) | |
tree | 7488712d1ce7cc8ac5dcda656a36ee303d2d2e14 /cpp/src/qpid | |
parent | 7847c1c0326e654845868ab4ab4ec27863a3e777 (diff) | |
download | qpid-python-0c3f1084652f7d81f1ca992676e90c158eeb3e65.tar.gz |
Minor source reorg, see README.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@471789 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
58 files changed, 1796 insertions, 87 deletions
diff --git a/cpp/src/qpid/broker/AutoDelete.h b/cpp/src/qpid/broker/AutoDelete.h index 509ac3bec1..f3347e6cc5 100644 --- a/cpp/src/qpid/broker/AutoDelete.h +++ b/cpp/src/qpid/broker/AutoDelete.h @@ -20,22 +20,22 @@ #include <iostream> #include <queue> -#include "qpid/concurrent/Monitor.h" +#include "qpid/sys/Monitor.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" -#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/sys/ThreadFactory.h" namespace qpid { namespace broker{ - class AutoDelete : private virtual qpid::concurrent::Runnable{ - qpid::concurrent::ThreadFactory factory; - qpid::concurrent::Monitor lock; - qpid::concurrent::Monitor monitor; + class AutoDelete : private virtual qpid::sys::Runnable{ + qpid::sys::ThreadFactory factory; + qpid::sys::Monitor lock; + qpid::sys::Monitor monitor; std::queue<Queue::shared_ptr> queues; QueueRegistry* const registry; const u_int32_t period; volatile bool stopped; - qpid::concurrent::Thread* runner; + qpid::sys::Thread* runner; Queue::shared_ptr const pop(); void process(); diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 0456b9f133..90a705f173 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -21,7 +21,7 @@ using namespace qpid::broker; -using namespace qpid::io; +using namespace qpid::sys; Broker::Broker(const Configuration& config) : acceptor(new Acceptor(config.getPort(), diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 55ae17571a..c4a3a3fd4a 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -21,8 +21,8 @@ #include "qpid/broker/Configuration.h" #include "qpid/broker/SessionHandlerFactoryImpl.h" -#include "qpid/concurrent/Runnable.h" -#include "qpid/io/Acceptor.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Acceptor.h" #include <qpid/SharedObject.h> namespace qpid { @@ -30,7 +30,7 @@ namespace qpid { /** * A broker instance. */ - class Broker : public qpid::concurrent::Runnable, + class Broker : public qpid::sys::Runnable, public qpid::SharedObject<Broker> { public: @@ -68,7 +68,7 @@ namespace qpid { private: Broker(const Configuration& config); - qpid::io::Acceptor::shared_ptr acceptor; + qpid::sys::Acceptor::shared_ptr acceptor; SessionHandlerFactoryImpl factory; }; } diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp index eae0a743db..45375a9fd3 100644 --- a/cpp/src/qpid/broker/Channel.cpp +++ b/cpp/src/qpid/broker/Channel.cpp @@ -25,7 +25,7 @@ using std::mem_fun_ref; using std::bind2nd; using namespace qpid::broker; using namespace qpid::framing; -using namespace qpid::concurrent; +using namespace qpid::sys; Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h index 13bd4cd450..56f0e6b4af 100644 --- a/cpp/src/qpid/broker/Channel.h +++ b/cpp/src/qpid/broker/Channel.h @@ -37,7 +37,7 @@ #include "qpid/broker/TxAck.h" #include "qpid/broker/TxBuffer.h" #include "qpid/broker/TxPublish.h" -#include "qpid/concurrent/Monitor.h" +#include "qpid/sys/Monitor.h" #include "qpid/framing/OutputHandler.h" #include "qpid/framing/AMQContentBody.h" #include "qpid/framing/AMQHeaderBody.h" @@ -77,7 +77,7 @@ namespace qpid { u_int32_t framesize; NameGenerator tagGenerator; std::list<DeliveryRecord> unacked; - qpid::concurrent::Monitor deliveryLock; + qpid::sys::Monitor deliveryLock; TxBuffer txBuffer; AccumulatedAck accumulatedAck; TransactionalStore* store; diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h index 2c3143cd3c..a452fe3b4b 100644 --- a/cpp/src/qpid/broker/DirectExchange.h +++ b/cpp/src/qpid/broker/DirectExchange.h @@ -23,14 +23,14 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/Monitor.h" +#include "qpid/sys/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { namespace broker { class DirectExchange : public virtual Exchange{ std::map<string, std::vector<Queue::shared_ptr> > bindings; - qpid::concurrent::Monitor lock; + qpid::sys::Monitor lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index b2d2afa5f4..6f6c759aa2 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -22,7 +22,7 @@ #include "qpid/broker/TopicExchange.h" using namespace qpid::broker; -using namespace qpid::concurrent; +using namespace qpid::sys; using std::pair; pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){ diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index c574a97e14..33deb743f4 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -20,7 +20,7 @@ #include <map> #include "qpid/broker/Exchange.h" -#include "qpid/concurrent/Monitor.h" +#include "qpid/sys/Monitor.h" namespace qpid { namespace broker { @@ -29,7 +29,7 @@ namespace broker { class ExchangeRegistry{ typedef std::map<string, Exchange::shared_ptr> ExchangeMap; ExchangeMap exchanges; - qpid::concurrent::Monitor lock; + qpid::sys::Monitor lock; public: std::pair<Exchange::shared_ptr, bool> declare(const string& name, const string& type) throw(UnknownExchangeTypeException); void destroy(const string& name); diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index c519771132..8f5143c8c0 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -21,7 +21,7 @@ using namespace qpid::broker; using namespace qpid::framing; -using namespace qpid::concurrent; +using namespace qpid::sys; FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {} diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h index 83fcdb9b34..53b5c39789 100644 --- a/cpp/src/qpid/broker/FanOutExchange.h +++ b/cpp/src/qpid/broker/FanOutExchange.h @@ -23,7 +23,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/Monitor.h" +#include "qpid/sys/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { @@ -31,7 +31,7 @@ namespace broker { class FanOutExchange : public virtual Exchange { std::vector<Queue::shared_ptr> bindings; - qpid::concurrent::Monitor lock; + qpid::sys::Monitor lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index aece22a413..5d5cf2392c 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -24,7 +24,7 @@ using namespace qpid::broker; using namespace qpid::framing; -using namespace qpid::concurrent; +using namespace qpid::sys; // TODO aconway 2006-09-20: More efficient matching algorithm. // The current search algorithm really sucks. diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h index cf699ac455..3cd25739f7 100644 --- a/cpp/src/qpid/broker/HeadersExchange.h +++ b/cpp/src/qpid/broker/HeadersExchange.h @@ -22,7 +22,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/Monitor.h" +#include "qpid/sys/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { @@ -34,7 +34,7 @@ class HeadersExchange : public virtual Exchange { typedef std::vector<Binding> Bindings; Bindings bindings; - qpid::concurrent::Monitor lock; + qpid::sys::Monitor lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index baa1b0d915..24fc996f1f 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -15,14 +15,14 @@ * limitations under the License. * */ -#include "qpid/concurrent/Monitor.h" +#include "qpid/sys/Monitor.h" #include "qpid/broker/Message.h" #include <iostream> using namespace boost; using namespace qpid::broker; using namespace qpid::framing; -using namespace qpid::concurrent; +using namespace qpid::sys; Message::Message(const ConnectionToken* const _publisher, const string& _exchange, const string& _routingKey, diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 8a81b07aef..7f3cfdc470 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -17,11 +17,11 @@ */ #include "qpid/broker/Queue.h" #include "qpid/broker/MessageStore.h" -#include "qpid/concurrent/Monitor.h" +#include "qpid/sys/Monitor.h" #include <iostream> using namespace qpid::broker; -using namespace qpid::concurrent; +using namespace qpid::sys; Queue::Queue(const string& _name, u_int32_t _autodelete, MessageStore* const _store, diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 393ca6b196..8473465cab 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -26,8 +26,8 @@ #include "qpid/broker/ConnectionToken.h" #include "qpid/broker/Consumer.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/Monitor.h" -#include "qpid/concurrent/Time.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Time.h" namespace qpid { namespace broker { @@ -55,7 +55,7 @@ namespace qpid { bool queueing; bool dispatching; int next; - mutable qpid::concurrent::Monitor lock; + mutable qpid::sys::Monitor lock; int64_t lastUsed; Consumer* exclusive; diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index 56452ca907..aa05db9a16 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -16,13 +16,13 @@ * */ #include "qpid/broker/QueueRegistry.h" -#include "qpid/concurrent/Monitor.h" +#include "qpid/sys/Monitor.h" #include "qpid/broker/SessionHandlerImpl.h" #include <sstream> #include <assert.h> using namespace qpid::broker; -using namespace qpid::concurrent; +using namespace qpid::sys; QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){} diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index fb22ef148a..c2fc1cc830 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -19,7 +19,7 @@ #define _QueueRegistry_ #include <map> -#include "qpid/concurrent/Monitor.h" +#include "qpid/sys/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { @@ -76,7 +76,7 @@ class QueueRegistry{ private: typedef std::map<string, Queue::shared_ptr> QueueMap; QueueMap queues; - qpid::concurrent::Monitor lock; + qpid::sys::Monitor lock; int counter; MessageStore* const store; }; diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp index 76723881dc..8d4f955270 100644 --- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp @@ -22,7 +22,7 @@ #include "qpid/broker/DirectExchange.h" using namespace qpid::broker; -using namespace qpid::io; +using namespace qpid::sys; namespace { diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h index cea5c0fa00..6e679592ff 100644 --- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h +++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h @@ -24,16 +24,16 @@ #include "qpid/broker/QueueRegistry.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ProtocolInitiation.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandler.h" -#include "qpid/io/SessionHandlerFactory.h" -#include "qpid/io/TimeoutHandler.h" +#include "qpid/sys/SessionContext.h" +#include "qpid/sys/SessionHandler.h" +#include "qpid/sys/SessionHandlerFactory.h" +#include "qpid/sys/TimeoutHandler.h" #include <memory> namespace qpid { namespace broker { - class SessionHandlerFactoryImpl : public virtual qpid::io::SessionHandlerFactory + class SessionHandlerFactoryImpl : public virtual qpid::sys::SessionHandlerFactory { std::auto_ptr<MessageStore> store; QueueRegistry queues; @@ -43,7 +43,7 @@ namespace qpid { public: SessionHandlerFactoryImpl(u_int32_t timeout = 30000); void recover(); - virtual qpid::io::SessionHandler* create(qpid::io::SessionContext* ctxt); + virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt); virtual ~SessionHandlerFactoryImpl(); }; diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp index 7a03132671..5c46ed1b1b 100644 --- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp +++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp @@ -24,9 +24,9 @@ using namespace boost; using namespace qpid::broker; -using namespace qpid::io; +using namespace qpid::sys; using namespace qpid::framing; -using namespace qpid::concurrent; +using namespace qpid::sys; SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, QueueRegistry* _queues, diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.h b/cpp/src/qpid/broker/SessionHandlerImpl.h index 62e7ecd4c9..55cc2170ec 100644 --- a/cpp/src/qpid/broker/SessionHandlerImpl.h +++ b/cpp/src/qpid/broker/SessionHandlerImpl.h @@ -33,9 +33,9 @@ #include "qpid/framing/OutputHandler.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/broker/QueueRegistry.h" -#include "qpid/io/SessionContext.h" -#include "qpid/io/SessionHandler.h" -#include "qpid/io/TimeoutHandler.h" +#include "qpid/sys/SessionContext.h" +#include "qpid/sys/SessionHandler.h" +#include "qpid/sys/TimeoutHandler.h" #include "qpid/broker/TopicExchange.h" namespace qpid { @@ -57,14 +57,14 @@ struct ConnectionException : public std::exception { const char* what() const throw() { return text.c_str(); } }; -class SessionHandlerImpl : public virtual qpid::io::SessionHandler, +class SessionHandlerImpl : public virtual qpid::sys::SessionHandler, public virtual qpid::framing::AMQP_ServerOperations, public virtual ConnectionToken { typedef std::map<u_int16_t, Channel*>::iterator channel_iterator; typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; - qpid::io::SessionContext* context; + qpid::sys::SessionContext* context; qpid::framing::AMQP_ClientProxy client; QueueRegistry* queues; ExchangeRegistry* const exchanges; @@ -100,7 +100,7 @@ class SessionHandlerImpl : public virtual qpid::io::SessionHandler, Exchange::shared_ptr findExchange(const string& name); public: - SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry* queues, + SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues, ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout); virtual void received(qpid::framing::AMQFrame* frame); virtual void initiated(qpid::framing::ProtocolInitiation* header); diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h index cb773b9a56..e3b9040cb2 100644 --- a/cpp/src/qpid/broker/TopicExchange.h +++ b/cpp/src/qpid/broker/TopicExchange.h @@ -23,7 +23,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Message.h" -#include "qpid/concurrent/Monitor.h" +#include "qpid/sys/Monitor.h" #include "qpid/broker/Queue.h" namespace qpid { @@ -71,7 +71,7 @@ class TopicPattern : public Tokens class TopicExchange : public virtual Exchange{ typedef std::map<TopicPattern, Queue::vector> BindingMap; BindingMap bindings; - qpid::concurrent::Monitor lock; + qpid::sys::Monitor lock; public: static const std::string typeName; diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp index 4579b6126d..a7b30f2f94 100644 --- a/cpp/src/qpid/client/Channel.cpp +++ b/cpp/src/qpid/client/Channel.cpp @@ -16,15 +16,15 @@ * */ #include "qpid/client/Channel.h" -#include "qpid/concurrent/Monitor.h" -#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/ThreadFactory.h" #include "qpid/client/Message.h" #include "qpid/QpidError.h" using namespace boost; //to use dynamic_pointer_cast using namespace qpid::client; using namespace qpid::framing; -using namespace qpid::concurrent; +using namespace qpid::sys; Channel::Channel(bool _transactional, u_int16_t _prefetch) : id(0), diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h index 0b60a827b7..fa8cd3afe0 100644 --- a/cpp/src/qpid/client/Channel.h +++ b/cpp/src/qpid/client/Channel.h @@ -25,7 +25,7 @@ #include "qpid/framing/amqp_framing.h" -#include "qpid/concurrent/ThreadFactory.h" +#include "qpid/sys/ThreadFactory.h" #include "qpid/client/Connection.h" #include "qpid/client/Exchange.h" @@ -40,7 +40,7 @@ namespace qpid { namespace client { enum ack_modes {NO_ACK=0, AUTO_ACK=1, LAZY_ACK=2, CLIENT_ACK=3}; - class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::concurrent::Runnable{ + class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::sys::Runnable{ struct Consumer{ MessageListener* listener; int ackMode; @@ -51,15 +51,15 @@ namespace client { u_int16_t id; Connection* con; - qpid::concurrent::ThreadFactory* threadFactory; - qpid::concurrent::Thread* dispatcher; + qpid::sys::ThreadFactory* threadFactory; + qpid::sys::Thread* dispatcher; qpid::framing::OutputHandler* out; IncomingMessage* incoming; ResponseHandler responses; std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume IncomingMessage* retrieved;//holds response to basic.get - qpid::concurrent::Monitor* dispatchMonitor; - qpid::concurrent::Monitor* retrievalMonitor; + qpid::sys::Monitor* dispatchMonitor; + qpid::sys::Monitor* retrievalMonitor; std::map<std::string, Consumer*> consumers; ReturnedMessageHandler* returnsHandler; bool closed; diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp index acd4488813..5a104bd302 100644 --- a/cpp/src/qpid/client/Connection.cpp +++ b/cpp/src/qpid/client/Connection.cpp @@ -17,15 +17,15 @@ */ #include "qpid/client/Connection.h" #include "qpid/client/Channel.h" -#include "qpid/io/Connector.h" +#include "qpid/sys/Connector.h" #include "qpid/client/Message.h" #include "qpid/QpidError.h" #include <iostream> using namespace qpid::client; using namespace qpid::framing; -using namespace qpid::io; -using namespace qpid::concurrent; +using namespace qpid::sys; +using namespace qpid::sys; u_int16_t Connection::channelIdCounter; diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index 5f0b222196..da747d0e1d 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -22,9 +22,9 @@ #define _Connection_ #include "qpid/QpidError.h" -#include "qpid/io/Connector.h" -#include "qpid/io/ShutdownHandler.h" -#include "qpid/io/TimeoutHandler.h" +#include "qpid/sys/Connector.h" +#include "qpid/sys/ShutdownHandler.h" +#include "qpid/sys/TimeoutHandler.h" #include "qpid/framing/amqp_framing.h" #include "qpid/client/Exchange.h" @@ -40,8 +40,8 @@ namespace client { class Channel; class Connection : public virtual qpid::framing::InputHandler, - public virtual qpid::io::TimeoutHandler, - public virtual qpid::io::ShutdownHandler, + public virtual qpid::sys::TimeoutHandler, + public virtual qpid::sys::ShutdownHandler, private virtual qpid::framing::BodyHandler{ typedef std::map<int, Channel*>::iterator iterator; @@ -52,7 +52,7 @@ class Connection : public virtual qpid::framing::InputHandler, int port; const u_int32_t max_frame_size; std::map<int, Channel*> channels; - qpid::io::Connector* connector; + qpid::sys::Connector* connector; qpid::framing::OutputHandler* out; ResponseHandler responses; volatile bool closed; diff --git a/cpp/src/qpid/client/ResponseHandler.cpp b/cpp/src/qpid/client/ResponseHandler.cpp index fcbc76f625..16989e2c25 100644 --- a/cpp/src/qpid/client/ResponseHandler.cpp +++ b/cpp/src/qpid/client/ResponseHandler.cpp @@ -16,11 +16,11 @@ * */ #include "qpid/client/ResponseHandler.h" -#include "qpid/concurrent/Monitor.h" +#include "qpid/sys/Monitor.h" #include "qpid/QpidError.h" qpid::client::ResponseHandler::ResponseHandler() : waiting(false){ - monitor = new qpid::concurrent::Monitor(); + monitor = new qpid::sys::Monitor(); } qpid::client::ResponseHandler::~ResponseHandler(){ diff --git a/cpp/src/qpid/client/ResponseHandler.h b/cpp/src/qpid/client/ResponseHandler.h index 179daa1f1f..ac4c351211 100644 --- a/cpp/src/qpid/client/ResponseHandler.h +++ b/cpp/src/qpid/client/ResponseHandler.h @@ -17,7 +17,7 @@ */ #include <string> #include "qpid/framing/amqp_framing.h" -#include "qpid/concurrent/Monitor.h" +#include "qpid/sys/Monitor.h" #ifndef _ResponseHandler_ #define _ResponseHandler_ @@ -28,7 +28,7 @@ namespace qpid { class ResponseHandler{ bool waiting; qpid::framing::AMQMethodBody::shared_ptr response; - qpid::concurrent::Monitor* monitor; + qpid::sys::Monitor* monitor; public: ResponseHandler(); diff --git a/cpp/src/qpid/sys/APRBase.cpp b/cpp/src/qpid/sys/APRBase.cpp new file mode 100644 index 0000000000..91e2b9f428 --- /dev/null +++ b/cpp/src/qpid/sys/APRBase.cpp @@ -0,0 +1,96 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "qpid/sys/APRBase.h" +#include "qpid/QpidError.h" + +using namespace qpid::sys; + +APRBase* APRBase::instance = 0; + +APRBase* APRBase::getInstance(){ + if(instance == 0){ + instance = new APRBase(); + } + return instance; +} + + +APRBase::APRBase() : count(0){ + apr_initialize(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, 0)); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); +} + +APRBase::~APRBase(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); + apr_pool_destroy(pool); + apr_terminate(); +} + +bool APRBase::_increment(){ + bool deleted(false); + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); + if(this == instance){ + count++; + }else{ + deleted = true; + } + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); + return !deleted; +} + +void APRBase::_decrement(){ + APRBase* copy = 0; + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); + if(--count == 0){ + copy = instance; + instance = 0; + } + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); + if(copy != 0){ + delete copy; + } +} + +void APRBase::increment(){ + int count = 0; + while(count++ < 2 && !getInstance()->_increment()){ + std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl; + } +} + +void APRBase::decrement(){ + getInstance()->_decrement(); +} + +void qpid::sys::check(apr_status_t status, const std::string& file, const int line){ + if (status != APR_SUCCESS){ + const int size = 50; + char tmp[size]; + std::string msg(apr_strerror(status, tmp, size)); + throw QpidError(APR_ERROR + ((int) status), msg, file, line); + } +} + +std::string qpid::sys::get_desc(apr_status_t status){ + const int size = 50; + char tmp[size]; + return std::string(apr_strerror(status, tmp, size)); +} + diff --git a/cpp/src/qpid/sys/APRBase.h b/cpp/src/qpid/sys/APRBase.h new file mode 100644 index 0000000000..9eef07e4c4 --- /dev/null +++ b/cpp/src/qpid/sys/APRBase.h @@ -0,0 +1,63 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRBase_ +#define _APRBase_ + +#include <string> +#include "apr-1/apr_thread_mutex.h" +#include "apr-1/apr_errno.h" + +namespace qpid { +namespace sys { + + /** + * Use of APR libraries necessitates explicit init and terminate + * calls. Any class using APR libs should obtain the reference to + * this singleton and increment on construction, decrement on + * destruction. This class can then correctly initialise apr + * before the first use and terminate after the last use. + */ + class APRBase{ + static APRBase* instance; + apr_pool_t* pool; + apr_thread_mutex_t* mutex; + int count; + + APRBase(); + ~APRBase(); + static APRBase* getInstance(); + bool _increment(); + void _decrement(); + public: + static void increment(); + static void decrement(); + }; + + //this is also a convenient place for a helper function for error checking: + void check(apr_status_t status, const std::string& file, const int line); + std::string get_desc(apr_status_t status); + +#define CHECK_APR_SUCCESS(A) check(A, __FILE__, __LINE__); + +} +} + + + + +#endif diff --git a/cpp/src/qpid/sys/APRPool.cpp b/cpp/src/qpid/sys/APRPool.cpp new file mode 100644 index 0000000000..0f809ca93c --- /dev/null +++ b/cpp/src/qpid/sys/APRPool.cpp @@ -0,0 +1,39 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "APRPool.h" +#include "qpid/sys/APRBase.h" +#include <boost/pool/detail/singleton.hpp> + +using namespace qpid::sys; +using namespace qpid::sys; + +APRPool::APRPool(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); +} + +APRPool::~APRPool(){ + apr_pool_destroy(pool); + APRBase::decrement(); +} + +apr_pool_t* APRPool::get() { + return boost::details::pool::singleton_default<APRPool>::instance().pool; +} + diff --git a/cpp/src/qpid/sys/APRPool.h b/cpp/src/qpid/sys/APRPool.h new file mode 100644 index 0000000000..2196cd64e7 --- /dev/null +++ b/cpp/src/qpid/sys/APRPool.h @@ -0,0 +1,47 @@ +#ifndef _APRPool_ +#define _APRPool_ + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <boost/noncopyable.hpp> +#include <apr-1/apr_pools.h> + +namespace qpid { +namespace sys { +/** + * Singleton APR memory pool. + */ +class APRPool : private boost::noncopyable { + public: + APRPool(); + ~APRPool(); + + /** Get singleton instance */ + static apr_pool_t* get(); + + private: + apr_pool_t* pool; +}; + +}} + + + + + +#endif /*!_APRPool_*/ diff --git a/cpp/src/qpid/sys/APRSocket.cpp b/cpp/src/qpid/sys/APRSocket.cpp new file mode 100644 index 0000000000..586c03475f --- /dev/null +++ b/cpp/src/qpid/sys/APRSocket.cpp @@ -0,0 +1,76 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/APRBase.h" +#include "qpid/sys/APRSocket.h" +#include <assert.h> +#include <iostream> + +using namespace qpid::sys; +using namespace qpid::framing; +using namespace qpid::sys; + +APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ + +} + +void APRSocket::read(qpid::framing::Buffer& buffer){ + apr_size_t bytes; + bytes = buffer.available(); + apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes); + buffer.move(bytes); + if(APR_STATUS_IS_TIMEUP(s)){ + //timed out + }else if(APR_STATUS_IS_EOF(s)){ + close(); + } +} + +void APRSocket::write(qpid::framing::Buffer& buffer){ + apr_size_t bytes; + do{ + bytes = buffer.available(); + apr_socket_send(socket, buffer.start(), &bytes); + buffer.move(bytes); + }while(bytes > 0); +} + +void APRSocket::close(){ + if(!closed){ + std::cout << "Closing socket " << socket << "@" << this << std::endl; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + closed = true; + } +} + +bool APRSocket::isOpen(){ + return !closed; +} + +u_int8_t APRSocket::read(){ + char data[1]; + apr_size_t bytes = 1; + apr_status_t s = apr_socket_recv(socket, data, &bytes); + if(APR_STATUS_IS_EOF(s) || bytes == 0){ + return 0; + }else{ + return *data; + } +} + +APRSocket::~APRSocket(){ +} diff --git a/cpp/src/qpid/sys/APRSocket.h b/cpp/src/qpid/sys/APRSocket.h new file mode 100644 index 0000000000..f7e7ad107b --- /dev/null +++ b/cpp/src/qpid/sys/APRSocket.h @@ -0,0 +1,45 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRSocket_ +#define _APRSocket_ + +#include "apr-1/apr_network_io.h" +#include "qpid/framing/Buffer.h" + +namespace qpid { +namespace sys { + + class APRSocket + { + apr_socket_t* const socket; + volatile bool closed; + public: + APRSocket(apr_socket_t* socket); + void read(qpid::framing::Buffer& b); + void write(qpid::framing::Buffer& b); + void close(); + bool isOpen(); + u_int8_t read(); + ~APRSocket(); + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/Acceptor.cpp b/cpp/src/qpid/sys/Acceptor.cpp new file mode 100644 index 0000000000..f8e8504c6e --- /dev/null +++ b/cpp/src/qpid/sys/Acceptor.cpp @@ -0,0 +1,78 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/Acceptor.h" +#include "qpid/sys/APRBase.h" +#include "APRPool.h" + +using namespace qpid::sys; +using namespace qpid::sys; + +Acceptor::Acceptor(int16_t port_, int backlog, int threads) : + port(port_), + processor(APRPool::get(), threads, 1000, 5000000) +{ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); + CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); + CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); +} + +int16_t Acceptor::getPort() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->port; +} + +void Acceptor::run(SessionHandlerFactory* factory) { + running = true; + processor.start(); + std::cout << "Listening on port " << getPort() << "..." << std::endl; + while(running){ + apr_socket_t* client; + apr_status_t status = apr_socket_accept(&client, socket, APRPool::get()); + if(status == APR_SUCCESS){ + //make this socket non-blocking: + CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); + LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false); + session->init(factory->create(session)); + }else{ + running = false; + if(status != APR_EINTR){ + std::cout << "ERROR: " << get_desc(status) << std::endl; + } + } + } + shutdown(); +} + +void Acceptor::shutdown() { + // TODO aconway 2006-10-12: Cleanup, this is not thread safe. + if (running) { + running = false; + processor.stop(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + } +} + + diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h new file mode 100644 index 0000000000..f0f9d6feba --- /dev/null +++ b/cpp/src/qpid/sys/Acceptor.h @@ -0,0 +1,58 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFAcceptor_ +#define _LFAcceptor_ + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_poll.h" +#include "apr-1/apr_time.h" + +#include "qpid/sys/Acceptor.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/LFProcessor.h" +#include "qpid/sys/LFSessionContext.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/SessionContext.h" +#include "qpid/sys/SessionHandlerFactory.h" +#include "qpid/sys/Thread.h" +#include <qpid/SharedObject.h> + +namespace qpid { +namespace sys { + +/** APR Acceptor. */ +class Acceptor : public qpid::SharedObject<Acceptor> +{ + public: + Acceptor(int16_t port, int backlog, int threads); + virtual int16_t getPort() const; + virtual void run(SessionHandlerFactory* factory); + virtual void shutdown(); + + private: + int16_t port; + LFProcessor processor; + apr_socket_t* socket; + volatile bool running; +}; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/Connector.cpp b/cpp/src/qpid/sys/Connector.cpp new file mode 100644 index 0000000000..1d4b237d92 --- /dev/null +++ b/cpp/src/qpid/sys/Connector.cpp @@ -0,0 +1,201 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "qpid/sys/APRBase.h" +#include "qpid/sys/Connector.h" +#include "qpid/sys/ThreadFactory.h" +#include "qpid/QpidError.h" + +using namespace qpid::sys; +using namespace qpid::sys; +using namespace qpid::framing; +using qpid::QpidError; + +Connector::Connector(bool _debug, u_int32_t buffer_size) : + debug(_debug), + receive_buffer_size(buffer_size), + send_buffer_size(buffer_size), + closed(true), + lastIn(0), lastOut(0), + timeout(0), + idleIn(0), idleOut(0), + timeoutHandler(0), + shutdownHandler(0), + inbuf(receive_buffer_size), + outbuf(send_buffer_size){ + + APRBase::increment(); + + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); + + threadFactory = new ThreadFactory(); + writeLock = new Monitor(); +} + +Connector::~Connector(){ + delete receiver; + delete writeLock; + delete threadFactory; + apr_pool_destroy(pool); + + APRBase::decrement(); +} + +void Connector::connect(const std::string& host, int port){ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); + CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); + closed = false; + + receiver = threadFactory->create(this); + receiver->start(); +} + +void Connector::init(ProtocolInitiation* header){ + writeBlock(header); + delete header; +} + +void Connector::close(){ + closed = true; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + receiver->join(); +} + +void Connector::setInputHandler(InputHandler* handler){ + input = handler; +} + +void Connector::setShutdownHandler(ShutdownHandler* handler){ + shutdownHandler = handler; +} + +OutputHandler* Connector::getOutputHandler(){ + return this; +} + +void Connector::send(AMQFrame* frame){ + writeBlock(frame); + if(debug) std::cout << "SENT: " << *frame << std::endl; + delete frame; +} + +void Connector::writeBlock(AMQDataBlock* data){ + writeLock->acquire(); + data->encode(outbuf); + + //transfer data to wire + outbuf.flip(); + writeToSocket(outbuf.start(), outbuf.available()); + outbuf.clear(); + writeLock->release(); +} + +void Connector::writeToSocket(char* data, size_t available){ + apr_size_t bytes(available); + apr_size_t written(0); + while(written < available && !closed){ + apr_status_t status = apr_socket_send(socket, data + written, &bytes); + if(status == APR_TIMEUP){ + std::cout << "Write request timed out." << std::endl; + } + if(bytes == 0){ + std::cout << "Write request wrote 0 bytes." << std::endl; + } + lastOut = apr_time_as_msec(apr_time_now()); + written += bytes; + bytes = available - written; + } +} + +void Connector::checkIdle(apr_status_t status){ + if(timeoutHandler){ + apr_time_t now = apr_time_as_msec(apr_time_now()); + if(APR_STATUS_IS_TIMEUP(status)){ + if(idleIn && (now - lastIn > idleIn)){ + timeoutHandler->idleIn(); + } + }else if(APR_STATUS_IS_EOF(status)){ + closed = true; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + if(shutdownHandler) shutdownHandler->shutdown(); + }else{ + lastIn = now; + } + if(idleOut && (now - lastOut > idleOut)){ + timeoutHandler->idleOut(); + } + } +} + +void Connector::setReadTimeout(u_int16_t t){ + idleIn = t * 1000;//t is in secs + if(idleIn && (!timeout || idleIn < timeout)){ + timeout = idleIn; + setSocketTimeout(); + } + +} + +void Connector::setWriteTimeout(u_int16_t t){ + idleOut = t * 1000;//t is in secs + if(idleOut && (!timeout || idleOut < timeout)){ + timeout = idleOut; + setSocketTimeout(); + } +} + +void Connector::setSocketTimeout(){ + //interval is in microseconds, timeout in milliseconds + //want the interval to be a bit shorter than the timeout, hence multiply + //by 800 rather than 1000. + apr_interval_time_t interval(timeout * 800); + apr_socket_timeout_set(socket, interval); +} + +void Connector::setTimeoutHandler(TimeoutHandler* handler){ + timeoutHandler = handler; +} + +void Connector::run(){ + try{ + while(!closed){ + apr_size_t bytes(inbuf.available()); + if(bytes < 1){ + THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); + } + checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes)); + + if(bytes > 0){ + inbuf.move(bytes); + inbuf.flip();//position = 0, limit = total data read + + AMQFrame frame; + while(frame.decode(inbuf)){ + if(debug) std::cout << "RECV: " << frame << std::endl; + input->received(&frame); + } + //need to compact buffer to preserve any 'extra' data + inbuf.compact(); + } + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} diff --git a/cpp/src/qpid/sys/Connector.h b/cpp/src/qpid/sys/Connector.h new file mode 100644 index 0000000000..611acc417f --- /dev/null +++ b/cpp/src/qpid/sys/Connector.h @@ -0,0 +1,95 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Connector_ +#define _Connector_ + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_time.h" + +#include "qpid/framing/InputHandler.h" +#include "qpid/framing/OutputHandler.h" +#include "qpid/framing/InitiationHandler.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/sys/ShutdownHandler.h" +#include "qpid/sys/TimeoutHandler.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/ThreadFactory.h" +#include "qpid/sys/Connector.h" +#include "qpid/sys/Monitor.h" + +namespace qpid { +namespace sys { + + class Connector : public virtual qpid::framing::OutputHandler, + private virtual qpid::sys::Runnable + { + const bool debug; + const int receive_buffer_size; + const int send_buffer_size; + + bool closed; + + apr_time_t lastIn; + apr_time_t lastOut; + apr_interval_time_t timeout; + u_int32_t idleIn; + u_int32_t idleOut; + + TimeoutHandler* timeoutHandler; + ShutdownHandler* shutdownHandler; + qpid::framing::InputHandler* input; + qpid::framing::InitiationHandler* initialiser; + qpid::framing::OutputHandler* output; + + qpid::framing::Buffer inbuf; + qpid::framing::Buffer outbuf; + + qpid::sys::Monitor* writeLock; + qpid::sys::ThreadFactory* threadFactory; + qpid::sys::Thread* receiver; + + apr_pool_t* pool; + apr_socket_t* socket; + + void checkIdle(apr_status_t status); + void writeBlock(qpid::framing::AMQDataBlock* data); + void writeToSocket(char* data, size_t available); + void setSocketTimeout(); + + void run(); + + public: + Connector(bool debug = false, u_int32_t buffer_size = 1024); + virtual ~Connector(); + virtual void connect(const std::string& host, int port); + virtual void init(qpid::framing::ProtocolInitiation* header); + virtual void close(); + virtual void setInputHandler(qpid::framing::InputHandler* handler); + virtual void setTimeoutHandler(TimeoutHandler* handler); + virtual void setShutdownHandler(ShutdownHandler* handler); + virtual qpid::framing::OutputHandler* getOutputHandler(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void setReadTimeout(u_int16_t timeout); + virtual void setWriteTimeout(u_int16_t timeout); + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/LFProcessor.cpp b/cpp/src/qpid/sys/LFProcessor.cpp new file mode 100644 index 0000000000..8c53c86392 --- /dev/null +++ b/cpp/src/qpid/sys/LFProcessor.cpp @@ -0,0 +1,193 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/LFProcessor.h" +#include "qpid/sys/APRBase.h" +#include "qpid/sys/LFSessionContext.h" +#include "qpid/QpidError.h" +#include <sstream> + +using namespace qpid::sys; +using namespace qpid::sys; +using qpid::QpidError; + +// TODO aconway 2006-10-12: stopped is read outside locks. +// + +LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : + size(_size), + timeout(_timeout), + signalledCount(0), + current(0), + count(0), + workerCount(_workers), + hasLeader(false), + workers(new Thread*[_workers]), + stopped(false) +{ + + CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); + //create & start the required number of threads + for(int i = 0; i < workerCount; i++){ + workers[i] = factory.create(this); + } +} + + +LFProcessor::~LFProcessor(){ + if (!stopped) stop(); + for(int i = 0; i < workerCount; i++){ + delete workers[i]; + } + delete[] workers; + CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); +} + +void LFProcessor::start(){ + for(int i = 0; i < workerCount; i++){ + workers[i]->start(); + } +} + +void LFProcessor::add(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); + countLock.acquire(); + sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data)); + count++; + countLock.release(); +} + +void LFProcessor::remove(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + countLock.acquire(); + sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data))); + count--; + countLock.release(); +} + +void LFProcessor::reactivate(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +void LFProcessor::deactivate(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); +} + +void LFProcessor::update(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +bool LFProcessor::full(){ + Locker locker(countLock); + return count == size; +} + +bool LFProcessor::empty(){ + Locker locker(countLock); + return count == 0; +} + +void LFProcessor::poll() { + apr_status_t status = APR_EGENERAL; + do{ + current = 0; + if(!stopped){ + status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); + } + }while(status != APR_SUCCESS && !stopped); +} + +void LFProcessor::run(){ + try{ + while(!stopped){ + leadLock.acquire(); + waitToLead(); + if(!stopped){ + const apr_pollfd_t* evt = getNextEvent(); + if(evt){ + LFSessionContext* session = reinterpret_cast<LFSessionContext*>(evt->client_data); + session->startProcessing(); + + relinquishLead(); + leadLock.release(); + + //process event: + if(evt->rtnevents & APR_POLLIN) session->read(); + if(evt->rtnevents & APR_POLLOUT) session->write(); + + if(session->isClosed()){ + session->handleClose(); + countLock.acquire(); + sessions.erase(find(sessions.begin(), sessions.end(), session)); + count--; + countLock.release(); + }else{ + session->stopProcessing(); + } + + }else{ + leadLock.release(); + } + }else{ + leadLock.release(); + } + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} + +void LFProcessor::waitToLead(){ + while(hasLeader && !stopped) leadLock.wait(); + hasLeader = !stopped; +} + +void LFProcessor::relinquishLead(){ + hasLeader = false; + leadLock.notify(); +} + +const apr_pollfd_t* LFProcessor::getNextEvent(){ + while(true){ + if(stopped){ + return 0; + }else if(current < signalledCount){ + //use result of previous poll if one is available + return signalledFDs + (current++); + }else{ + //else poll to get new events + poll(); + } + } +} + +void LFProcessor::stop(){ + stopped = true; + leadLock.acquire(); + leadLock.notifyAll(); + leadLock.release(); + + for(int i = 0; i < workerCount; i++){ + workers[i]->join(); + } + + for(iterator i = sessions.begin(); i < sessions.end(); i++){ + (*i)->shutdown(); + } +} + diff --git a/cpp/src/qpid/sys/LFProcessor.h b/cpp/src/qpid/sys/LFProcessor.h new file mode 100644 index 0000000000..afbb9ea413 --- /dev/null +++ b/cpp/src/qpid/sys/LFProcessor.h @@ -0,0 +1,119 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFProcessor_ +#define _LFProcessor_ + +#include "apr-1/apr_poll.h" +#include <iostream> +#include <vector> +#include "qpid/sys/Monitor.h" +#include "qpid/sys/ThreadFactory.h" +#include "qpid/sys/Runnable.h" + +namespace qpid { +namespace sys { + + class LFSessionContext; + + /** + * This class processes a poll set using the leaders-followers + * pattern for thread synchronization: the leader will poll and on + * the poll returning, it will remove a session, promote a + * follower to leadership, then process the session. + */ + class LFProcessor : private virtual qpid::sys::Runnable + { + typedef std::vector<LFSessionContext*>::iterator iterator; + + const int size; + const apr_interval_time_t timeout; + apr_pollset_t* pollset; + int signalledCount; + int current; + const apr_pollfd_t* signalledFDs; + int count; + const int workerCount; + bool hasLeader; + qpid::sys::Thread** const workers; + qpid::sys::Monitor leadLock; + qpid::sys::Monitor countLock; + qpid::sys::ThreadFactory factory; + std::vector<LFSessionContext*> sessions; + volatile bool stopped; + + const apr_pollfd_t* getNextEvent(); + void waitToLead(); + void relinquishLead(); + void poll(); + virtual void run(); + + public: + LFProcessor(apr_pool_t* pool, int workers, int size, int timeout); + /** + * Add the fd to the poll set. Relies on the client_data being + * an instance of LFSessionContext. + */ + void add(const apr_pollfd_t* const fd); + /** + * Remove the fd from the poll set. + */ + void remove(const apr_pollfd_t* const fd); + /** + * Signal that the fd passed in, already part of the pollset, + * has had its flags altered. + */ + void update(const apr_pollfd_t* const fd); + /** + * Add an fd back to the poll set after deactivation. + */ + void reactivate(const apr_pollfd_t* const fd); + /** + * Temporarily remove the fd from the poll set. Called when processing + * is about to begin. + */ + void deactivate(const apr_pollfd_t* const fd); + /** + * Indicates whether the capacity of this processor has been + * reached (or whether it can still handle further fd's). + */ + bool full(); + /** + * Indicates whether there are any fd's registered. + */ + bool empty(); + /** + * Stop processing. + */ + void stop(); + /** + * Start processing. + */ + void start(); + /** + * Is processing stopped? + */ + bool isStopped(); + + ~LFProcessor(); + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/LFSessionContext.cpp b/cpp/src/qpid/sys/LFSessionContext.cpp new file mode 100644 index 0000000000..f2dff87fd0 --- /dev/null +++ b/cpp/src/qpid/sys/LFSessionContext.cpp @@ -0,0 +1,189 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/LFSessionContext.h" +#include "qpid/sys/APRBase.h" +#include "qpid/QpidError.h" +#include <assert.h> + +using namespace qpid::sys; +using namespace qpid::sys; +using namespace qpid::framing; + +LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, + LFProcessor* const _processor, + bool _debug) : + debug(_debug), + socket(_socket), + initiated(false), + in(32768), + out(32768), + processor(_processor), + processing(false), + closing(false), + reading(0), + writing(0) +{ + + fd.p = _pool; + fd.desc_type = APR_POLL_SOCKET; + fd.reqevents = APR_POLLIN; + fd.client_data = this; + fd.desc.s = _socket; + + out.flip(); +} + +LFSessionContext::~LFSessionContext(){ + +} + +void LFSessionContext::read(){ + assert(!reading); // No concurrent read. + reading = Thread::currentThread(); + + socket.read(in); + in.flip(); + if(initiated){ + AMQFrame frame; + while(frame.decode(in)){ + if(debug) log("RECV", &frame); + handler->received(&frame); + } + }else{ + ProtocolInitiation protocolInit; + if(protocolInit.decode(in)){ + handler->initiated(&protocolInit); + initiated = true; + if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; + } + } + in.compact(); + + reading = 0; +} + +void LFSessionContext::write(){ + assert(!writing); // No concurrent writes. + writing = Thread::currentThread(); + + bool done = isClosed(); + while(!done){ + if(out.available() > 0){ + socket.write(out); + if(out.available() > 0){ + writing = 0; + + //incomplete write, leave flags to receive notification of readiness to write + done = true;//finished processing for now, but write is still in progress + } + }else{ + //do we have any frames to write? + writeLock.acquire(); + if(!framesToWrite.empty()){ + out.clear(); + bool encoded(false); + AMQFrame* frame = framesToWrite.front(); + while(frame && out.available() >= frame->size()){ + encoded = true; + frame->encode(out); + if(debug) log("SENT", frame); + delete frame; + framesToWrite.pop(); + frame = framesToWrite.empty() ? 0 : framesToWrite.front(); + } + if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); + out.flip(); + }else{ + //reset flags, don't care about writability anymore + fd.reqevents = APR_POLLIN; + done = true; + + writing = 0; + + if(closing){ + socket.close(); + } + } + writeLock.release(); + } + } +} + +void LFSessionContext::send(AMQFrame* frame){ + writeLock.acquire(); + if(!closing){ + framesToWrite.push(frame); + if(!(fd.reqevents & APR_POLLOUT)){ + fd.reqevents |= APR_POLLOUT; + if(!processing){ + processor->update(&fd); + } + } + } + writeLock.release(); +} + +void LFSessionContext::startProcessing(){ + writeLock.acquire(); + processing = true; + processor->deactivate(&fd); + writeLock.release(); +} + +void LFSessionContext::stopProcessing(){ + writeLock.acquire(); + processor->reactivate(&fd); + processing = false; + writeLock.release(); +} + +void LFSessionContext::close(){ + closing = true; + writeLock.acquire(); + if(!processing){ + //allow pending frames to be written to socket + fd.reqevents = APR_POLLOUT; + processor->update(&fd); + } + writeLock.release(); +} + +void LFSessionContext::handleClose(){ + handler->closed(); + std::cout << "Session closed [" << &socket << "]" << std::endl; + delete handler; + delete this; +} + +void LFSessionContext::shutdown(){ + socket.close(); + handleClose(); +} + +void LFSessionContext::init(SessionHandler* _handler){ + handler = _handler; + processor->add(&fd); +} + +void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ + logLock.acquire(); + std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; + logLock.release(); +} + +Monitor LFSessionContext::logLock; diff --git a/cpp/src/qpid/sys/LFSessionContext.h b/cpp/src/qpid/sys/LFSessionContext.h new file mode 100644 index 0000000000..92f52ccf83 --- /dev/null +++ b/cpp/src/qpid/sys/LFSessionContext.h @@ -0,0 +1,88 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFSessionContext_ +#define _LFSessionContext_ + +#include <queue> + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_poll.h" +#include "apr-1/apr_time.h" + +#include "qpid/framing/AMQFrame.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/APRSocket.h" +#include "qpid/framing/Buffer.h" +#include "qpid/sys/LFProcessor.h" +#include "qpid/sys/SessionContext.h" +#include "qpid/sys/SessionHandler.h" + +namespace qpid { +namespace sys { + + + class LFSessionContext : public virtual SessionContext + { + const bool debug; + APRSocket socket; + bool initiated; + + qpid::framing::Buffer in; + qpid::framing::Buffer out; + + SessionHandler* handler; + LFProcessor* const processor; + + apr_pollfd_t fd; + + std::queue<qpid::framing::AMQFrame*> framesToWrite; + qpid::sys::Monitor writeLock; + + bool processing; + bool closing; + + //these are just for debug, as a crude way of detecting concurrent access + volatile unsigned int reading; + volatile unsigned int writing; + + static qpid::sys::Monitor logLock; + void log(const std::string& desc, qpid::framing::AMQFrame* const frame); + + public: + LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, + LFProcessor* const processor, + bool debug = false); + ~LFSessionContext(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void close(); + void read(); + void write(); + void init(SessionHandler* handler); + void startProcessing(); + void stopProcessing(); + void handleClose(); + void shutdown(); + inline apr_pollfd_t* const getFd(){ return &fd; } + inline bool isClosed(){ return !socket.isOpen(); } + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/Monitor.cpp b/cpp/src/qpid/sys/Monitor.cpp new file mode 100644 index 0000000000..79a29c219e --- /dev/null +++ b/cpp/src/qpid/sys/Monitor.cpp @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/APRBase.h" +#include "qpid/sys/Monitor.h" +#include <iostream> + +qpid::sys::Monitor::Monitor(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool)); +} + +qpid::sys::Monitor::~Monitor(){ + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); + apr_pool_destroy(pool); + APRBase::decrement(); +} + +void qpid::sys::Monitor::wait(){ + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); +} + + +void qpid::sys::Monitor::wait(u_int64_t time){ + apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000); + if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); +} + +void qpid::sys::Monitor::notify(){ + CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void qpid::sys::Monitor::notifyAll(){ + CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + +void qpid::sys::Monitor::acquire(){ + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} + +void qpid::sys::Monitor::release(){ + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} diff --git a/cpp/src/qpid/sys/Monitor.h b/cpp/src/qpid/sys/Monitor.h new file mode 100644 index 0000000000..ddda613b87 --- /dev/null +++ b/cpp/src/qpid/sys/Monitor.h @@ -0,0 +1,56 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Monitor_ +#define _Monitor_ + +#include "apr-1/apr_thread_mutex.h" +#include "apr-1/apr_thread_cond.h" +#include "qpid/sys/Monitor.h" + +namespace qpid { +namespace sys { + +class Monitor +{ + apr_pool_t* pool; + apr_thread_mutex_t* mutex; + apr_thread_cond_t* condition; + + public: + Monitor(); + virtual ~Monitor(); + virtual void wait(); + virtual void wait(u_int64_t time); + virtual void notify(); + virtual void notifyAll(); + virtual void acquire(); + virtual void release(); +}; + +class Locker +{ + public: + Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); } + ~Locker() { monitor.release(); } + private: + Monitor& monitor; +}; +}} + + +#endif diff --git a/cpp/src/qpid/concurrent/Runnable.cpp b/cpp/src/qpid/sys/Runnable.cpp index 5b18ccfab6..d7d9e968cc 100644 --- a/cpp/src/qpid/concurrent/Runnable.cpp +++ b/cpp/src/qpid/sys/Runnable.cpp @@ -15,5 +15,5 @@ * */ -#include "qpid/concurrent/Runnable.h" -qpid::concurrent::Runnable::~Runnable() {} +#include "qpid/sys/Runnable.h" +qpid::sys::Runnable::~Runnable() {} diff --git a/cpp/src/qpid/concurrent/Runnable.h b/cpp/src/qpid/sys/Runnable.h index 9753a1ad0a..ce13eb2039 100644 --- a/cpp/src/qpid/concurrent/Runnable.h +++ b/cpp/src/qpid/sys/Runnable.h @@ -19,7 +19,7 @@ #define _Runnable_ namespace qpid { -namespace concurrent { +namespace sys { class Runnable { diff --git a/cpp/src/qpid/io/SessionContext.h b/cpp/src/qpid/sys/SessionContext.h index c9a2ce49f2..1362b4f2f2 100644 --- a/cpp/src/qpid/io/SessionContext.h +++ b/cpp/src/qpid/sys/SessionContext.h @@ -21,7 +21,7 @@ #include "qpid/framing/OutputHandler.h" namespace qpid { -namespace io { +namespace sys { /** * Provides the output handler associated with a connection. diff --git a/cpp/src/qpid/io/SessionHandler.h b/cpp/src/qpid/sys/SessionHandler.h index ac455122d6..130e69caf4 100644 --- a/cpp/src/qpid/io/SessionHandler.h +++ b/cpp/src/qpid/sys/SessionHandler.h @@ -21,10 +21,10 @@ #include "qpid/framing/InputHandler.h" #include "qpid/framing/InitiationHandler.h" #include "qpid/framing/ProtocolInitiation.h" -#include "qpid/io/TimeoutHandler.h" +#include "qpid/sys/TimeoutHandler.h" namespace qpid { -namespace io { +namespace sys { class SessionHandler : public qpid::framing::InitiationHandler, diff --git a/cpp/src/qpid/io/SessionHandlerFactory.h b/cpp/src/qpid/sys/SessionHandlerFactory.h index 441b8e9fd6..da2f3e95eb 100644 --- a/cpp/src/qpid/io/SessionHandlerFactory.h +++ b/cpp/src/qpid/sys/SessionHandlerFactory.h @@ -19,7 +19,7 @@ #define _SessionHandlerFactory_ namespace qpid { -namespace io { +namespace sys { class SessionContext; class SessionHandler; diff --git a/cpp/src/qpid/io/ShutdownHandler.h b/cpp/src/qpid/sys/ShutdownHandler.h index 186d9eeca4..4107f09f89 100644 --- a/cpp/src/qpid/io/ShutdownHandler.h +++ b/cpp/src/qpid/sys/ShutdownHandler.h @@ -19,7 +19,7 @@ #define _ShutdownHandler_ namespace qpid { -namespace io { +namespace sys { class ShutdownHandler { diff --git a/cpp/src/qpid/sys/Thread.cpp b/cpp/src/qpid/sys/Thread.cpp new file mode 100644 index 0000000000..4fb9915993 --- /dev/null +++ b/cpp/src/qpid/sys/Thread.cpp @@ -0,0 +1,50 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/APRBase.h" +#include "qpid/sys/Thread.h" +#include "apr-1/apr_portable.h" + +using namespace qpid::sys; + +void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){ + ((Runnable*) data)->run(); + CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); + return NULL; +} + +Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} + +Thread::~Thread(){ +} + +void Thread::start(){ + CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool)); +} + +void Thread::join(){ + apr_status_t status; + if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner)); +} + +void Thread::interrupt(){ + if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS)); +} + +unsigned int qpid::sys::Thread::currentThread(){ + return apr_os_thread_current(); +} diff --git a/cpp/src/qpid/sys/Thread.h b/cpp/src/qpid/sys/Thread.h new file mode 100644 index 0000000000..e86bd4a8d2 --- /dev/null +++ b/cpp/src/qpid/sys/Thread.h @@ -0,0 +1,48 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Thread_ +#define _Thread_ + +#include "apr-1/apr_thread_proc.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" + +namespace qpid { +namespace sys { + + class Thread + { + const Runnable* runnable; + apr_pool_t* pool; + apr_thread_t* runner; + + public: + Thread(apr_pool_t* pool, Runnable* runnable); + virtual ~Thread(); + virtual void start(); + virtual void join(); + virtual void interrupt(); + static unsigned int currentThread(); + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/ThreadFactory.cpp b/cpp/src/qpid/sys/ThreadFactory.cpp new file mode 100644 index 0000000000..d33872b9a2 --- /dev/null +++ b/cpp/src/qpid/sys/ThreadFactory.cpp @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/APRBase.h" +#include "qpid/sys/ThreadFactory.h" + +using namespace qpid::sys; + +ThreadFactory::ThreadFactory(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); +} + +ThreadFactory::~ThreadFactory(){ + apr_pool_destroy(pool); + APRBase::decrement(); +} + +Thread* ThreadFactory::create(Runnable* runnable){ + return new Thread(pool, runnable); +} diff --git a/cpp/src/qpid/sys/ThreadFactory.h b/cpp/src/qpid/sys/ThreadFactory.h new file mode 100644 index 0000000000..9b7126272a --- /dev/null +++ b/cpp/src/qpid/sys/ThreadFactory.h @@ -0,0 +1,44 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _ThreadFactory_ +#define _ThreadFactory_ + +#include "apr-1/apr_thread_proc.h" + +#include "qpid/sys/Thread.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/ThreadFactory.h" +#include "qpid/sys/Runnable.h" + +namespace qpid { +namespace sys { + + class ThreadFactory + { + apr_pool_t* pool; + public: + ThreadFactory(); + virtual ~ThreadFactory(); + virtual Thread* create(Runnable* runnable); + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/Time.cpp b/cpp/src/qpid/sys/Time.cpp new file mode 100644 index 0000000000..c3512b8df3 --- /dev/null +++ b/cpp/src/qpid/sys/Time.cpp @@ -0,0 +1,29 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <qpid/sys/Time.h> +#include <apr-1/apr_time.h> + +namespace qpid { +namespace sys { + +Time Time::now() { + return Time(apr_time_now()*1000); +} + +}} diff --git a/cpp/src/qpid/concurrent/Time.h b/cpp/src/qpid/sys/Time.h index ec64ce8a85..5c7cdfb005 100644 --- a/cpp/src/qpid/concurrent/Time.h +++ b/cpp/src/qpid/sys/Time.h @@ -22,7 +22,7 @@ #include <stdint.h> namespace qpid { -namespace concurrent { +namespace sys { /** * Time since the epoch. diff --git a/cpp/src/qpid/io/TimeoutHandler.h b/cpp/src/qpid/sys/TimeoutHandler.h index c92220fd6e..845bd72a12 100644 --- a/cpp/src/qpid/io/TimeoutHandler.h +++ b/cpp/src/qpid/sys/TimeoutHandler.h @@ -19,7 +19,7 @@ #define _TimeoutHandler_ namespace qpid { -namespace io { +namespace sys { class TimeoutHandler { diff --git a/cpp/src/qpid/io/doxygen_summary.h b/cpp/src/qpid/sys/doxygen_summary.h index 1086f65f63..af89154fdf 100644 --- a/cpp/src/qpid/io/doxygen_summary.h +++ b/cpp/src/qpid/sys/doxygen_summary.h @@ -21,7 +21,7 @@ // No code just a doxygen comment for the namespace -/** \namspace qpid::io +/** \namspace qpid::sys * IO classes used by client and broker. * * This namespace contains platform-neutral classes. Platform |