summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-11-06 16:43:31 +0000
committerAlan Conway <aconway@apache.org>2006-11-06 16:43:31 +0000
commit0c3f1084652f7d81f1ca992676e90c158eeb3e65 (patch)
tree7488712d1ce7cc8ac5dcda656a36ee303d2d2e14 /cpp/src/qpid
parent7847c1c0326e654845868ab4ab4ec27863a3e777 (diff)
downloadqpid-python-0c3f1084652f7d81f1ca992676e90c158eeb3e65.tar.gz
Minor source reorg, see README.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@471789 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/AutoDelete.h14
-rw-r--r--cpp/src/qpid/broker/Broker.cpp2
-rw-r--r--cpp/src/qpid/broker/Broker.h8
-rw-r--r--cpp/src/qpid/broker/Channel.cpp2
-rw-r--r--cpp/src/qpid/broker/Channel.h4
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h4
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.cpp2
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h4
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp2
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h4
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp2
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h4
-rw-r--r--cpp/src/qpid/broker/Message.cpp4
-rw-r--r--cpp/src/qpid/broker/Queue.cpp4
-rw-r--r--cpp/src/qpid/broker/Queue.h6
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp4
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h4
-rw-r--r--cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp2
-rw-r--r--cpp/src/qpid/broker/SessionHandlerFactoryImpl.h12
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.cpp4
-rw-r--r--cpp/src/qpid/broker/SessionHandlerImpl.h12
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h4
-rw-r--r--cpp/src/qpid/client/Channel.cpp6
-rw-r--r--cpp/src/qpid/client/Channel.h12
-rw-r--r--cpp/src/qpid/client/Connection.cpp6
-rw-r--r--cpp/src/qpid/client/Connection.h12
-rw-r--r--cpp/src/qpid/client/ResponseHandler.cpp4
-rw-r--r--cpp/src/qpid/client/ResponseHandler.h4
-rw-r--r--cpp/src/qpid/sys/APRBase.cpp96
-rw-r--r--cpp/src/qpid/sys/APRBase.h63
-rw-r--r--cpp/src/qpid/sys/APRPool.cpp39
-rw-r--r--cpp/src/qpid/sys/APRPool.h47
-rw-r--r--cpp/src/qpid/sys/APRSocket.cpp76
-rw-r--r--cpp/src/qpid/sys/APRSocket.h45
-rw-r--r--cpp/src/qpid/sys/Acceptor.cpp78
-rw-r--r--cpp/src/qpid/sys/Acceptor.h58
-rw-r--r--cpp/src/qpid/sys/Connector.cpp201
-rw-r--r--cpp/src/qpid/sys/Connector.h95
-rw-r--r--cpp/src/qpid/sys/LFProcessor.cpp193
-rw-r--r--cpp/src/qpid/sys/LFProcessor.h119
-rw-r--r--cpp/src/qpid/sys/LFSessionContext.cpp189
-rw-r--r--cpp/src/qpid/sys/LFSessionContext.h88
-rw-r--r--cpp/src/qpid/sys/Monitor.cpp60
-rw-r--r--cpp/src/qpid/sys/Monitor.h56
-rw-r--r--cpp/src/qpid/sys/Runnable.cpp (renamed from cpp/src/qpid/concurrent/Runnable.cpp)4
-rw-r--r--cpp/src/qpid/sys/Runnable.h (renamed from cpp/src/qpid/concurrent/Runnable.h)2
-rw-r--r--cpp/src/qpid/sys/SessionContext.h (renamed from cpp/src/qpid/io/SessionContext.h)2
-rw-r--r--cpp/src/qpid/sys/SessionHandler.h (renamed from cpp/src/qpid/io/SessionHandler.h)4
-rw-r--r--cpp/src/qpid/sys/SessionHandlerFactory.h (renamed from cpp/src/qpid/io/SessionHandlerFactory.h)2
-rw-r--r--cpp/src/qpid/sys/ShutdownHandler.h (renamed from cpp/src/qpid/io/ShutdownHandler.h)2
-rw-r--r--cpp/src/qpid/sys/Thread.cpp50
-rw-r--r--cpp/src/qpid/sys/Thread.h48
-rw-r--r--cpp/src/qpid/sys/ThreadFactory.cpp35
-rw-r--r--cpp/src/qpid/sys/ThreadFactory.h44
-rw-r--r--cpp/src/qpid/sys/Time.cpp29
-rw-r--r--cpp/src/qpid/sys/Time.h (renamed from cpp/src/qpid/concurrent/Time.h)2
-rw-r--r--cpp/src/qpid/sys/TimeoutHandler.h (renamed from cpp/src/qpid/io/TimeoutHandler.h)2
-rw-r--r--cpp/src/qpid/sys/doxygen_summary.h (renamed from cpp/src/qpid/io/doxygen_summary.h)2
58 files changed, 1796 insertions, 87 deletions
diff --git a/cpp/src/qpid/broker/AutoDelete.h b/cpp/src/qpid/broker/AutoDelete.h
index 509ac3bec1..f3347e6cc5 100644
--- a/cpp/src/qpid/broker/AutoDelete.h
+++ b/cpp/src/qpid/broker/AutoDelete.h
@@ -20,22 +20,22 @@
#include <iostream>
#include <queue>
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/sys/ThreadFactory.h"
namespace qpid {
namespace broker{
- class AutoDelete : private virtual qpid::concurrent::Runnable{
- qpid::concurrent::ThreadFactory factory;
- qpid::concurrent::Monitor lock;
- qpid::concurrent::Monitor monitor;
+ class AutoDelete : private virtual qpid::sys::Runnable{
+ qpid::sys::ThreadFactory factory;
+ qpid::sys::Monitor lock;
+ qpid::sys::Monitor monitor;
std::queue<Queue::shared_ptr> queues;
QueueRegistry* const registry;
const u_int32_t period;
volatile bool stopped;
- qpid::concurrent::Thread* runner;
+ qpid::sys::Thread* runner;
Queue::shared_ptr const pop();
void process();
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 0456b9f133..90a705f173 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -21,7 +21,7 @@
using namespace qpid::broker;
-using namespace qpid::io;
+using namespace qpid::sys;
Broker::Broker(const Configuration& config) :
acceptor(new Acceptor(config.getPort(),
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 55ae17571a..c4a3a3fd4a 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -21,8 +21,8 @@
#include "qpid/broker/Configuration.h"
#include "qpid/broker/SessionHandlerFactoryImpl.h"
-#include "qpid/concurrent/Runnable.h"
-#include "qpid/io/Acceptor.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Acceptor.h"
#include <qpid/SharedObject.h>
namespace qpid {
@@ -30,7 +30,7 @@ namespace qpid {
/**
* A broker instance.
*/
- class Broker : public qpid::concurrent::Runnable,
+ class Broker : public qpid::sys::Runnable,
public qpid::SharedObject<Broker>
{
public:
@@ -68,7 +68,7 @@ namespace qpid {
private:
Broker(const Configuration& config);
- qpid::io::Acceptor::shared_ptr acceptor;
+ qpid::sys::Acceptor::shared_ptr acceptor;
SessionHandlerFactoryImpl factory;
};
}
diff --git a/cpp/src/qpid/broker/Channel.cpp b/cpp/src/qpid/broker/Channel.cpp
index eae0a743db..45375a9fd3 100644
--- a/cpp/src/qpid/broker/Channel.cpp
+++ b/cpp/src/qpid/broker/Channel.cpp
@@ -25,7 +25,7 @@ using std::mem_fun_ref;
using std::bind2nd;
using namespace qpid::broker;
using namespace qpid::framing;
-using namespace qpid::concurrent;
+using namespace qpid::sys;
Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) :
diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h
index 13bd4cd450..56f0e6b4af 100644
--- a/cpp/src/qpid/broker/Channel.h
+++ b/cpp/src/qpid/broker/Channel.h
@@ -37,7 +37,7 @@
#include "qpid/broker/TxAck.h"
#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/TxPublish.h"
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
@@ -77,7 +77,7 @@ namespace qpid {
u_int32_t framesize;
NameGenerator tagGenerator;
std::list<DeliveryRecord> unacked;
- qpid::concurrent::Monitor deliveryLock;
+ qpid::sys::Monitor deliveryLock;
TxBuffer txBuffer;
AccumulatedAck accumulatedAck;
TransactionalStore* store;
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 2c3143cd3c..a452fe3b4b 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -23,14 +23,14 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
namespace broker {
class DirectExchange : public virtual Exchange{
std::map<string, std::vector<Queue::shared_ptr> > bindings;
- qpid::concurrent::Monitor lock;
+ qpid::sys::Monitor lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp
index b2d2afa5f4..6f6c759aa2 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -22,7 +22,7 @@
#include "qpid/broker/TopicExchange.h"
using namespace qpid::broker;
-using namespace qpid::concurrent;
+using namespace qpid::sys;
using std::pair;
pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index c574a97e14..33deb743f4 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -20,7 +20,7 @@
#include <map>
#include "qpid/broker/Exchange.h"
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/sys/Monitor.h"
namespace qpid {
namespace broker {
@@ -29,7 +29,7 @@ namespace broker {
class ExchangeRegistry{
typedef std::map<string, Exchange::shared_ptr> ExchangeMap;
ExchangeMap exchanges;
- qpid::concurrent::Monitor lock;
+ qpid::sys::Monitor lock;
public:
std::pair<Exchange::shared_ptr, bool> declare(const string& name, const string& type) throw(UnknownExchangeTypeException);
void destroy(const string& name);
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index c519771132..8f5143c8c0 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -21,7 +21,7 @@
using namespace qpid::broker;
using namespace qpid::framing;
-using namespace qpid::concurrent;
+using namespace qpid::sys;
FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index 83fcdb9b34..53b5c39789 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -23,7 +23,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -31,7 +31,7 @@ namespace broker {
class FanOutExchange : public virtual Exchange {
std::vector<Queue::shared_ptr> bindings;
- qpid::concurrent::Monitor lock;
+ qpid::sys::Monitor lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index aece22a413..5d5cf2392c 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -24,7 +24,7 @@
using namespace qpid::broker;
using namespace qpid::framing;
-using namespace qpid::concurrent;
+using namespace qpid::sys;
// TODO aconway 2006-09-20: More efficient matching algorithm.
// The current search algorithm really sucks.
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index cf699ac455..3cd25739f7 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -22,7 +22,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -34,7 +34,7 @@ class HeadersExchange : public virtual Exchange {
typedef std::vector<Binding> Bindings;
Bindings bindings;
- qpid::concurrent::Monitor lock;
+ qpid::sys::Monitor lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index baa1b0d915..24fc996f1f 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -15,14 +15,14 @@
* limitations under the License.
*
*/
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/broker/Message.h"
#include <iostream>
using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
-using namespace qpid::concurrent;
+using namespace qpid::sys;
Message::Message(const ConnectionToken* const _publisher,
const string& _exchange, const string& _routingKey,
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 8a81b07aef..7f3cfdc470 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -17,11 +17,11 @@
*/
#include "qpid/broker/Queue.h"
#include "qpid/broker/MessageStore.h"
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/sys/Monitor.h"
#include <iostream>
using namespace qpid::broker;
-using namespace qpid::concurrent;
+using namespace qpid::sys;
Queue::Queue(const string& _name, u_int32_t _autodelete,
MessageStore* const _store,
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 393ca6b196..8473465cab 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -26,8 +26,8 @@
#include "qpid/broker/ConnectionToken.h"
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/Monitor.h"
-#include "qpid/concurrent/Time.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Time.h"
namespace qpid {
namespace broker {
@@ -55,7 +55,7 @@ namespace qpid {
bool queueing;
bool dispatching;
int next;
- mutable qpid::concurrent::Monitor lock;
+ mutable qpid::sys::Monitor lock;
int64_t lastUsed;
Consumer* exclusive;
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 56452ca907..aa05db9a16 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -16,13 +16,13 @@
*
*/
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/broker/SessionHandlerImpl.h"
#include <sstream>
#include <assert.h>
using namespace qpid::broker;
-using namespace qpid::concurrent;
+using namespace qpid::sys;
QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){}
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index fb22ef148a..c2fc1cc830 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -19,7 +19,7 @@
#define _QueueRegistry_
#include <map>
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -76,7 +76,7 @@ class QueueRegistry{
private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
QueueMap queues;
- qpid::concurrent::Monitor lock;
+ qpid::sys::Monitor lock;
int counter;
MessageStore* const store;
};
diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
index 76723881dc..8d4f955270 100644
--- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.cpp
@@ -22,7 +22,7 @@
#include "qpid/broker/DirectExchange.h"
using namespace qpid::broker;
-using namespace qpid::io;
+using namespace qpid::sys;
namespace
{
diff --git a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
index cea5c0fa00..6e679592ff 100644
--- a/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
+++ b/cpp/src/qpid/broker/SessionHandlerFactoryImpl.h
@@ -24,16 +24,16 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/io/SessionContext.h"
-#include "qpid/io/SessionHandler.h"
-#include "qpid/io/SessionHandlerFactory.h"
-#include "qpid/io/TimeoutHandler.h"
+#include "qpid/sys/SessionContext.h"
+#include "qpid/sys/SessionHandler.h"
+#include "qpid/sys/SessionHandlerFactory.h"
+#include "qpid/sys/TimeoutHandler.h"
#include <memory>
namespace qpid {
namespace broker {
- class SessionHandlerFactoryImpl : public virtual qpid::io::SessionHandlerFactory
+ class SessionHandlerFactoryImpl : public virtual qpid::sys::SessionHandlerFactory
{
std::auto_ptr<MessageStore> store;
QueueRegistry queues;
@@ -43,7 +43,7 @@ namespace qpid {
public:
SessionHandlerFactoryImpl(u_int32_t timeout = 30000);
void recover();
- virtual qpid::io::SessionHandler* create(qpid::io::SessionContext* ctxt);
+ virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt);
virtual ~SessionHandlerFactoryImpl();
};
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.cpp b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
index 7a03132671..5c46ed1b1b 100644
--- a/cpp/src/qpid/broker/SessionHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.cpp
@@ -24,9 +24,9 @@
using namespace boost;
using namespace qpid::broker;
-using namespace qpid::io;
+using namespace qpid::sys;
using namespace qpid::framing;
-using namespace qpid::concurrent;
+using namespace qpid::sys;
SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context,
QueueRegistry* _queues,
diff --git a/cpp/src/qpid/broker/SessionHandlerImpl.h b/cpp/src/qpid/broker/SessionHandlerImpl.h
index 62e7ecd4c9..55cc2170ec 100644
--- a/cpp/src/qpid/broker/SessionHandlerImpl.h
+++ b/cpp/src/qpid/broker/SessionHandlerImpl.h
@@ -33,9 +33,9 @@
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/io/SessionContext.h"
-#include "qpid/io/SessionHandler.h"
-#include "qpid/io/TimeoutHandler.h"
+#include "qpid/sys/SessionContext.h"
+#include "qpid/sys/SessionHandler.h"
+#include "qpid/sys/TimeoutHandler.h"
#include "qpid/broker/TopicExchange.h"
namespace qpid {
@@ -57,14 +57,14 @@ struct ConnectionException : public std::exception {
const char* what() const throw() { return text.c_str(); }
};
-class SessionHandlerImpl : public virtual qpid::io::SessionHandler,
+class SessionHandlerImpl : public virtual qpid::sys::SessionHandler,
public virtual qpid::framing::AMQP_ServerOperations,
public virtual ConnectionToken
{
typedef std::map<u_int16_t, Channel*>::iterator channel_iterator;
typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;
- qpid::io::SessionContext* context;
+ qpid::sys::SessionContext* context;
qpid::framing::AMQP_ClientProxy client;
QueueRegistry* queues;
ExchangeRegistry* const exchanges;
@@ -100,7 +100,7 @@ class SessionHandlerImpl : public virtual qpid::io::SessionHandler,
Exchange::shared_ptr findExchange(const string& name);
public:
- SessionHandlerImpl(qpid::io::SessionContext* context, QueueRegistry* queues,
+ SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues,
ExchangeRegistry* exchanges, AutoDelete* cleaner, const u_int32_t timeout);
virtual void received(qpid::framing::AMQFrame* frame);
virtual void initiated(qpid::framing::ProtocolInitiation* header);
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index cb773b9a56..e3b9040cb2 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -23,7 +23,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -71,7 +71,7 @@ class TopicPattern : public Tokens
class TopicExchange : public virtual Exchange{
typedef std::map<TopicPattern, Queue::vector> BindingMap;
BindingMap bindings;
- qpid::concurrent::Monitor lock;
+ qpid::sys::Monitor lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index 4579b6126d..a7b30f2f94 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -16,15 +16,15 @@
*
*/
#include "qpid/client/Channel.h"
-#include "qpid/concurrent/Monitor.h"
-#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/ThreadFactory.h"
#include "qpid/client/Message.h"
#include "qpid/QpidError.h"
using namespace boost; //to use dynamic_pointer_cast
using namespace qpid::client;
using namespace qpid::framing;
-using namespace qpid::concurrent;
+using namespace qpid::sys;
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
id(0),
diff --git a/cpp/src/qpid/client/Channel.h b/cpp/src/qpid/client/Channel.h
index 0b60a827b7..fa8cd3afe0 100644
--- a/cpp/src/qpid/client/Channel.h
+++ b/cpp/src/qpid/client/Channel.h
@@ -25,7 +25,7 @@
#include "qpid/framing/amqp_framing.h"
-#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/sys/ThreadFactory.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Exchange.h"
@@ -40,7 +40,7 @@ namespace qpid {
namespace client {
enum ack_modes {NO_ACK=0, AUTO_ACK=1, LAZY_ACK=2, CLIENT_ACK=3};
- class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::concurrent::Runnable{
+ class Channel : private virtual qpid::framing::BodyHandler, public virtual qpid::sys::Runnable{
struct Consumer{
MessageListener* listener;
int ackMode;
@@ -51,15 +51,15 @@ namespace client {
u_int16_t id;
Connection* con;
- qpid::concurrent::ThreadFactory* threadFactory;
- qpid::concurrent::Thread* dispatcher;
+ qpid::sys::ThreadFactory* threadFactory;
+ qpid::sys::Thread* dispatcher;
qpid::framing::OutputHandler* out;
IncomingMessage* incoming;
ResponseHandler responses;
std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
IncomingMessage* retrieved;//holds response to basic.get
- qpid::concurrent::Monitor* dispatchMonitor;
- qpid::concurrent::Monitor* retrievalMonitor;
+ qpid::sys::Monitor* dispatchMonitor;
+ qpid::sys::Monitor* retrievalMonitor;
std::map<std::string, Consumer*> consumers;
ReturnedMessageHandler* returnsHandler;
bool closed;
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index acd4488813..5a104bd302 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -17,15 +17,15 @@
*/
#include "qpid/client/Connection.h"
#include "qpid/client/Channel.h"
-#include "qpid/io/Connector.h"
+#include "qpid/sys/Connector.h"
#include "qpid/client/Message.h"
#include "qpid/QpidError.h"
#include <iostream>
using namespace qpid::client;
using namespace qpid::framing;
-using namespace qpid::io;
-using namespace qpid::concurrent;
+using namespace qpid::sys;
+using namespace qpid::sys;
u_int16_t Connection::channelIdCounter;
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index 5f0b222196..da747d0e1d 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -22,9 +22,9 @@
#define _Connection_
#include "qpid/QpidError.h"
-#include "qpid/io/Connector.h"
-#include "qpid/io/ShutdownHandler.h"
-#include "qpid/io/TimeoutHandler.h"
+#include "qpid/sys/Connector.h"
+#include "qpid/sys/ShutdownHandler.h"
+#include "qpid/sys/TimeoutHandler.h"
#include "qpid/framing/amqp_framing.h"
#include "qpid/client/Exchange.h"
@@ -40,8 +40,8 @@ namespace client {
class Channel;
class Connection : public virtual qpid::framing::InputHandler,
- public virtual qpid::io::TimeoutHandler,
- public virtual qpid::io::ShutdownHandler,
+ public virtual qpid::sys::TimeoutHandler,
+ public virtual qpid::sys::ShutdownHandler,
private virtual qpid::framing::BodyHandler{
typedef std::map<int, Channel*>::iterator iterator;
@@ -52,7 +52,7 @@ class Connection : public virtual qpid::framing::InputHandler,
int port;
const u_int32_t max_frame_size;
std::map<int, Channel*> channels;
- qpid::io::Connector* connector;
+ qpid::sys::Connector* connector;
qpid::framing::OutputHandler* out;
ResponseHandler responses;
volatile bool closed;
diff --git a/cpp/src/qpid/client/ResponseHandler.cpp b/cpp/src/qpid/client/ResponseHandler.cpp
index fcbc76f625..16989e2c25 100644
--- a/cpp/src/qpid/client/ResponseHandler.cpp
+++ b/cpp/src/qpid/client/ResponseHandler.cpp
@@ -16,11 +16,11 @@
*
*/
#include "qpid/client/ResponseHandler.h"
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/sys/Monitor.h"
#include "qpid/QpidError.h"
qpid::client::ResponseHandler::ResponseHandler() : waiting(false){
- monitor = new qpid::concurrent::Monitor();
+ monitor = new qpid::sys::Monitor();
}
qpid::client::ResponseHandler::~ResponseHandler(){
diff --git a/cpp/src/qpid/client/ResponseHandler.h b/cpp/src/qpid/client/ResponseHandler.h
index 179daa1f1f..ac4c351211 100644
--- a/cpp/src/qpid/client/ResponseHandler.h
+++ b/cpp/src/qpid/client/ResponseHandler.h
@@ -17,7 +17,7 @@
*/
#include <string>
#include "qpid/framing/amqp_framing.h"
-#include "qpid/concurrent/Monitor.h"
+#include "qpid/sys/Monitor.h"
#ifndef _ResponseHandler_
#define _ResponseHandler_
@@ -28,7 +28,7 @@ namespace qpid {
class ResponseHandler{
bool waiting;
qpid::framing::AMQMethodBody::shared_ptr response;
- qpid::concurrent::Monitor* monitor;
+ qpid::sys::Monitor* monitor;
public:
ResponseHandler();
diff --git a/cpp/src/qpid/sys/APRBase.cpp b/cpp/src/qpid/sys/APRBase.cpp
new file mode 100644
index 0000000000..91e2b9f428
--- /dev/null
+++ b/cpp/src/qpid/sys/APRBase.cpp
@@ -0,0 +1,96 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "qpid/sys/APRBase.h"
+#include "qpid/QpidError.h"
+
+using namespace qpid::sys;
+
+APRBase* APRBase::instance = 0;
+
+APRBase* APRBase::getInstance(){
+ if(instance == 0){
+ instance = new APRBase();
+ }
+ return instance;
+}
+
+
+APRBase::APRBase() : count(0){
+ apr_initialize();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, 0));
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
+}
+
+APRBase::~APRBase(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+ apr_pool_destroy(pool);
+ apr_terminate();
+}
+
+bool APRBase::_increment(){
+ bool deleted(false);
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+ if(this == instance){
+ count++;
+ }else{
+ deleted = true;
+ }
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+ return !deleted;
+}
+
+void APRBase::_decrement(){
+ APRBase* copy = 0;
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+ if(--count == 0){
+ copy = instance;
+ instance = 0;
+ }
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+ if(copy != 0){
+ delete copy;
+ }
+}
+
+void APRBase::increment(){
+ int count = 0;
+ while(count++ < 2 && !getInstance()->_increment()){
+ std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl;
+ }
+}
+
+void APRBase::decrement(){
+ getInstance()->_decrement();
+}
+
+void qpid::sys::check(apr_status_t status, const std::string& file, const int line){
+ if (status != APR_SUCCESS){
+ const int size = 50;
+ char tmp[size];
+ std::string msg(apr_strerror(status, tmp, size));
+ throw QpidError(APR_ERROR + ((int) status), msg, file, line);
+ }
+}
+
+std::string qpid::sys::get_desc(apr_status_t status){
+ const int size = 50;
+ char tmp[size];
+ return std::string(apr_strerror(status, tmp, size));
+}
+
diff --git a/cpp/src/qpid/sys/APRBase.h b/cpp/src/qpid/sys/APRBase.h
new file mode 100644
index 0000000000..9eef07e4c4
--- /dev/null
+++ b/cpp/src/qpid/sys/APRBase.h
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _APRBase_
+#define _APRBase_
+
+#include <string>
+#include "apr-1/apr_thread_mutex.h"
+#include "apr-1/apr_errno.h"
+
+namespace qpid {
+namespace sys {
+
+ /**
+ * Use of APR libraries necessitates explicit init and terminate
+ * calls. Any class using APR libs should obtain the reference to
+ * this singleton and increment on construction, decrement on
+ * destruction. This class can then correctly initialise apr
+ * before the first use and terminate after the last use.
+ */
+ class APRBase{
+ static APRBase* instance;
+ apr_pool_t* pool;
+ apr_thread_mutex_t* mutex;
+ int count;
+
+ APRBase();
+ ~APRBase();
+ static APRBase* getInstance();
+ bool _increment();
+ void _decrement();
+ public:
+ static void increment();
+ static void decrement();
+ };
+
+ //this is also a convenient place for a helper function for error checking:
+ void check(apr_status_t status, const std::string& file, const int line);
+ std::string get_desc(apr_status_t status);
+
+#define CHECK_APR_SUCCESS(A) check(A, __FILE__, __LINE__);
+
+}
+}
+
+
+
+
+#endif
diff --git a/cpp/src/qpid/sys/APRPool.cpp b/cpp/src/qpid/sys/APRPool.cpp
new file mode 100644
index 0000000000..0f809ca93c
--- /dev/null
+++ b/cpp/src/qpid/sys/APRPool.cpp
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "APRPool.h"
+#include "qpid/sys/APRBase.h"
+#include <boost/pool/detail/singleton.hpp>
+
+using namespace qpid::sys;
+using namespace qpid::sys;
+
+APRPool::APRPool(){
+ APRBase::increment();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+}
+
+APRPool::~APRPool(){
+ apr_pool_destroy(pool);
+ APRBase::decrement();
+}
+
+apr_pool_t* APRPool::get() {
+ return boost::details::pool::singleton_default<APRPool>::instance().pool;
+}
+
diff --git a/cpp/src/qpid/sys/APRPool.h b/cpp/src/qpid/sys/APRPool.h
new file mode 100644
index 0000000000..2196cd64e7
--- /dev/null
+++ b/cpp/src/qpid/sys/APRPool.h
@@ -0,0 +1,47 @@
+#ifndef _APRPool_
+#define _APRPool_
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <boost/noncopyable.hpp>
+#include <apr-1/apr_pools.h>
+
+namespace qpid {
+namespace sys {
+/**
+ * Singleton APR memory pool.
+ */
+class APRPool : private boost::noncopyable {
+ public:
+ APRPool();
+ ~APRPool();
+
+ /** Get singleton instance */
+ static apr_pool_t* get();
+
+ private:
+ apr_pool_t* pool;
+};
+
+}}
+
+
+
+
+
+#endif /*!_APRPool_*/
diff --git a/cpp/src/qpid/sys/APRSocket.cpp b/cpp/src/qpid/sys/APRSocket.cpp
new file mode 100644
index 0000000000..586c03475f
--- /dev/null
+++ b/cpp/src/qpid/sys/APRSocket.cpp
@@ -0,0 +1,76 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/sys/APRBase.h"
+#include "qpid/sys/APRSocket.h"
+#include <assert.h>
+#include <iostream>
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace qpid::sys;
+
+APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){
+
+}
+
+void APRSocket::read(qpid::framing::Buffer& buffer){
+ apr_size_t bytes;
+ bytes = buffer.available();
+ apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes);
+ buffer.move(bytes);
+ if(APR_STATUS_IS_TIMEUP(s)){
+ //timed out
+ }else if(APR_STATUS_IS_EOF(s)){
+ close();
+ }
+}
+
+void APRSocket::write(qpid::framing::Buffer& buffer){
+ apr_size_t bytes;
+ do{
+ bytes = buffer.available();
+ apr_socket_send(socket, buffer.start(), &bytes);
+ buffer.move(bytes);
+ }while(bytes > 0);
+}
+
+void APRSocket::close(){
+ if(!closed){
+ std::cout << "Closing socket " << socket << "@" << this << std::endl;
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ closed = true;
+ }
+}
+
+bool APRSocket::isOpen(){
+ return !closed;
+}
+
+u_int8_t APRSocket::read(){
+ char data[1];
+ apr_size_t bytes = 1;
+ apr_status_t s = apr_socket_recv(socket, data, &bytes);
+ if(APR_STATUS_IS_EOF(s) || bytes == 0){
+ return 0;
+ }else{
+ return *data;
+ }
+}
+
+APRSocket::~APRSocket(){
+}
diff --git a/cpp/src/qpid/sys/APRSocket.h b/cpp/src/qpid/sys/APRSocket.h
new file mode 100644
index 0000000000..f7e7ad107b
--- /dev/null
+++ b/cpp/src/qpid/sys/APRSocket.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _APRSocket_
+#define _APRSocket_
+
+#include "apr-1/apr_network_io.h"
+#include "qpid/framing/Buffer.h"
+
+namespace qpid {
+namespace sys {
+
+ class APRSocket
+ {
+ apr_socket_t* const socket;
+ volatile bool closed;
+ public:
+ APRSocket(apr_socket_t* socket);
+ void read(qpid::framing::Buffer& b);
+ void write(qpid::framing::Buffer& b);
+ void close();
+ bool isOpen();
+ u_int8_t read();
+ ~APRSocket();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/sys/Acceptor.cpp b/cpp/src/qpid/sys/Acceptor.cpp
new file mode 100644
index 0000000000..f8e8504c6e
--- /dev/null
+++ b/cpp/src/qpid/sys/Acceptor.cpp
@@ -0,0 +1,78 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/sys/Acceptor.h"
+#include "qpid/sys/APRBase.h"
+#include "APRPool.h"
+
+using namespace qpid::sys;
+using namespace qpid::sys;
+
+Acceptor::Acceptor(int16_t port_, int backlog, int threads) :
+ port(port_),
+ processor(APRPool::get(), threads, 1000, 5000000)
+{
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
+ CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
+ CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
+}
+
+int16_t Acceptor::getPort() const {
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
+ return address->port;
+}
+
+void Acceptor::run(SessionHandlerFactory* factory) {
+ running = true;
+ processor.start();
+ std::cout << "Listening on port " << getPort() << "..." << std::endl;
+ while(running){
+ apr_socket_t* client;
+ apr_status_t status = apr_socket_accept(&client, socket, APRPool::get());
+ if(status == APR_SUCCESS){
+ //make this socket non-blocking:
+ CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
+ LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false);
+ session->init(factory->create(session));
+ }else{
+ running = false;
+ if(status != APR_EINTR){
+ std::cout << "ERROR: " << get_desc(status) << std::endl;
+ }
+ }
+ }
+ shutdown();
+}
+
+void Acceptor::shutdown() {
+ // TODO aconway 2006-10-12: Cleanup, this is not thread safe.
+ if (running) {
+ running = false;
+ processor.stop();
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ }
+}
+
+
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h
new file mode 100644
index 0000000000..f0f9d6feba
--- /dev/null
+++ b/cpp/src/qpid/sys/Acceptor.h
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LFAcceptor_
+#define _LFAcceptor_
+
+#include "apr-1/apr_network_io.h"
+#include "apr-1/apr_poll.h"
+#include "apr-1/apr_time.h"
+
+#include "qpid/sys/Acceptor.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/LFProcessor.h"
+#include "qpid/sys/LFSessionContext.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/SessionContext.h"
+#include "qpid/sys/SessionHandlerFactory.h"
+#include "qpid/sys/Thread.h"
+#include <qpid/SharedObject.h>
+
+namespace qpid {
+namespace sys {
+
+/** APR Acceptor. */
+class Acceptor : public qpid::SharedObject<Acceptor>
+{
+ public:
+ Acceptor(int16_t port, int backlog, int threads);
+ virtual int16_t getPort() const;
+ virtual void run(SessionHandlerFactory* factory);
+ virtual void shutdown();
+
+ private:
+ int16_t port;
+ LFProcessor processor;
+ apr_socket_t* socket;
+ volatile bool running;
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/sys/Connector.cpp b/cpp/src/qpid/sys/Connector.cpp
new file mode 100644
index 0000000000..1d4b237d92
--- /dev/null
+++ b/cpp/src/qpid/sys/Connector.cpp
@@ -0,0 +1,201 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "qpid/sys/APRBase.h"
+#include "qpid/sys/Connector.h"
+#include "qpid/sys/ThreadFactory.h"
+#include "qpid/QpidError.h"
+
+using namespace qpid::sys;
+using namespace qpid::sys;
+using namespace qpid::framing;
+using qpid::QpidError;
+
+Connector::Connector(bool _debug, u_int32_t buffer_size) :
+ debug(_debug),
+ receive_buffer_size(buffer_size),
+ send_buffer_size(buffer_size),
+ closed(true),
+ lastIn(0), lastOut(0),
+ timeout(0),
+ idleIn(0), idleOut(0),
+ timeoutHandler(0),
+ shutdownHandler(0),
+ inbuf(receive_buffer_size),
+ outbuf(send_buffer_size){
+
+ APRBase::increment();
+
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
+
+ threadFactory = new ThreadFactory();
+ writeLock = new Monitor();
+}
+
+Connector::~Connector(){
+ delete receiver;
+ delete writeLock;
+ delete threadFactory;
+ apr_pool_destroy(pool);
+
+ APRBase::decrement();
+}
+
+void Connector::connect(const std::string& host, int port){
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
+ CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
+ closed = false;
+
+ receiver = threadFactory->create(this);
+ receiver->start();
+}
+
+void Connector::init(ProtocolInitiation* header){
+ writeBlock(header);
+ delete header;
+}
+
+void Connector::close(){
+ closed = true;
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ receiver->join();
+}
+
+void Connector::setInputHandler(InputHandler* handler){
+ input = handler;
+}
+
+void Connector::setShutdownHandler(ShutdownHandler* handler){
+ shutdownHandler = handler;
+}
+
+OutputHandler* Connector::getOutputHandler(){
+ return this;
+}
+
+void Connector::send(AMQFrame* frame){
+ writeBlock(frame);
+ if(debug) std::cout << "SENT: " << *frame << std::endl;
+ delete frame;
+}
+
+void Connector::writeBlock(AMQDataBlock* data){
+ writeLock->acquire();
+ data->encode(outbuf);
+
+ //transfer data to wire
+ outbuf.flip();
+ writeToSocket(outbuf.start(), outbuf.available());
+ outbuf.clear();
+ writeLock->release();
+}
+
+void Connector::writeToSocket(char* data, size_t available){
+ apr_size_t bytes(available);
+ apr_size_t written(0);
+ while(written < available && !closed){
+ apr_status_t status = apr_socket_send(socket, data + written, &bytes);
+ if(status == APR_TIMEUP){
+ std::cout << "Write request timed out." << std::endl;
+ }
+ if(bytes == 0){
+ std::cout << "Write request wrote 0 bytes." << std::endl;
+ }
+ lastOut = apr_time_as_msec(apr_time_now());
+ written += bytes;
+ bytes = available - written;
+ }
+}
+
+void Connector::checkIdle(apr_status_t status){
+ if(timeoutHandler){
+ apr_time_t now = apr_time_as_msec(apr_time_now());
+ if(APR_STATUS_IS_TIMEUP(status)){
+ if(idleIn && (now - lastIn > idleIn)){
+ timeoutHandler->idleIn();
+ }
+ }else if(APR_STATUS_IS_EOF(status)){
+ closed = true;
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ if(shutdownHandler) shutdownHandler->shutdown();
+ }else{
+ lastIn = now;
+ }
+ if(idleOut && (now - lastOut > idleOut)){
+ timeoutHandler->idleOut();
+ }
+ }
+}
+
+void Connector::setReadTimeout(u_int16_t t){
+ idleIn = t * 1000;//t is in secs
+ if(idleIn && (!timeout || idleIn < timeout)){
+ timeout = idleIn;
+ setSocketTimeout();
+ }
+
+}
+
+void Connector::setWriteTimeout(u_int16_t t){
+ idleOut = t * 1000;//t is in secs
+ if(idleOut && (!timeout || idleOut < timeout)){
+ timeout = idleOut;
+ setSocketTimeout();
+ }
+}
+
+void Connector::setSocketTimeout(){
+ //interval is in microseconds, timeout in milliseconds
+ //want the interval to be a bit shorter than the timeout, hence multiply
+ //by 800 rather than 1000.
+ apr_interval_time_t interval(timeout * 800);
+ apr_socket_timeout_set(socket, interval);
+}
+
+void Connector::setTimeoutHandler(TimeoutHandler* handler){
+ timeoutHandler = handler;
+}
+
+void Connector::run(){
+ try{
+ while(!closed){
+ apr_size_t bytes(inbuf.available());
+ if(bytes < 1){
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
+ }
+ checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes));
+
+ if(bytes > 0){
+ inbuf.move(bytes);
+ inbuf.flip();//position = 0, limit = total data read
+
+ AMQFrame frame;
+ while(frame.decode(inbuf)){
+ if(debug) std::cout << "RECV: " << frame << std::endl;
+ input->received(&frame);
+ }
+ //need to compact buffer to preserve any 'extra' data
+ inbuf.compact();
+ }
+ }
+ }catch(QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+}
diff --git a/cpp/src/qpid/sys/Connector.h b/cpp/src/qpid/sys/Connector.h
new file mode 100644
index 0000000000..611acc417f
--- /dev/null
+++ b/cpp/src/qpid/sys/Connector.h
@@ -0,0 +1,95 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Connector_
+#define _Connector_
+
+#include "apr-1/apr_network_io.h"
+#include "apr-1/apr_time.h"
+
+#include "qpid/framing/InputHandler.h"
+#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/InitiationHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/sys/ShutdownHandler.h"
+#include "qpid/sys/TimeoutHandler.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/ThreadFactory.h"
+#include "qpid/sys/Connector.h"
+#include "qpid/sys/Monitor.h"
+
+namespace qpid {
+namespace sys {
+
+ class Connector : public virtual qpid::framing::OutputHandler,
+ private virtual qpid::sys::Runnable
+ {
+ const bool debug;
+ const int receive_buffer_size;
+ const int send_buffer_size;
+
+ bool closed;
+
+ apr_time_t lastIn;
+ apr_time_t lastOut;
+ apr_interval_time_t timeout;
+ u_int32_t idleIn;
+ u_int32_t idleOut;
+
+ TimeoutHandler* timeoutHandler;
+ ShutdownHandler* shutdownHandler;
+ qpid::framing::InputHandler* input;
+ qpid::framing::InitiationHandler* initialiser;
+ qpid::framing::OutputHandler* output;
+
+ qpid::framing::Buffer inbuf;
+ qpid::framing::Buffer outbuf;
+
+ qpid::sys::Monitor* writeLock;
+ qpid::sys::ThreadFactory* threadFactory;
+ qpid::sys::Thread* receiver;
+
+ apr_pool_t* pool;
+ apr_socket_t* socket;
+
+ void checkIdle(apr_status_t status);
+ void writeBlock(qpid::framing::AMQDataBlock* data);
+ void writeToSocket(char* data, size_t available);
+ void setSocketTimeout();
+
+ void run();
+
+ public:
+ Connector(bool debug = false, u_int32_t buffer_size = 1024);
+ virtual ~Connector();
+ virtual void connect(const std::string& host, int port);
+ virtual void init(qpid::framing::ProtocolInitiation* header);
+ virtual void close();
+ virtual void setInputHandler(qpid::framing::InputHandler* handler);
+ virtual void setTimeoutHandler(TimeoutHandler* handler);
+ virtual void setShutdownHandler(ShutdownHandler* handler);
+ virtual qpid::framing::OutputHandler* getOutputHandler();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void setReadTimeout(u_int16_t timeout);
+ virtual void setWriteTimeout(u_int16_t timeout);
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/sys/LFProcessor.cpp b/cpp/src/qpid/sys/LFProcessor.cpp
new file mode 100644
index 0000000000..8c53c86392
--- /dev/null
+++ b/cpp/src/qpid/sys/LFProcessor.cpp
@@ -0,0 +1,193 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/sys/LFProcessor.h"
+#include "qpid/sys/APRBase.h"
+#include "qpid/sys/LFSessionContext.h"
+#include "qpid/QpidError.h"
+#include <sstream>
+
+using namespace qpid::sys;
+using namespace qpid::sys;
+using qpid::QpidError;
+
+// TODO aconway 2006-10-12: stopped is read outside locks.
+//
+
+LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) :
+ size(_size),
+ timeout(_timeout),
+ signalledCount(0),
+ current(0),
+ count(0),
+ workerCount(_workers),
+ hasLeader(false),
+ workers(new Thread*[_workers]),
+ stopped(false)
+{
+
+ CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
+ //create & start the required number of threads
+ for(int i = 0; i < workerCount; i++){
+ workers[i] = factory.create(this);
+ }
+}
+
+
+LFProcessor::~LFProcessor(){
+ if (!stopped) stop();
+ for(int i = 0; i < workerCount; i++){
+ delete workers[i];
+ }
+ delete[] workers;
+ CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
+}
+
+void LFProcessor::start(){
+ for(int i = 0; i < workerCount; i++){
+ workers[i]->start();
+ }
+}
+
+void LFProcessor::add(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+ countLock.acquire();
+ sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data));
+ count++;
+ countLock.release();
+}
+
+void LFProcessor::remove(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+ countLock.acquire();
+ sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data)));
+ count--;
+ countLock.release();
+}
+
+void LFProcessor::reactivate(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+void LFProcessor::deactivate(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+}
+
+void LFProcessor::update(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+bool LFProcessor::full(){
+ Locker locker(countLock);
+ return count == size;
+}
+
+bool LFProcessor::empty(){
+ Locker locker(countLock);
+ return count == 0;
+}
+
+void LFProcessor::poll() {
+ apr_status_t status = APR_EGENERAL;
+ do{
+ current = 0;
+ if(!stopped){
+ status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs);
+ }
+ }while(status != APR_SUCCESS && !stopped);
+}
+
+void LFProcessor::run(){
+ try{
+ while(!stopped){
+ leadLock.acquire();
+ waitToLead();
+ if(!stopped){
+ const apr_pollfd_t* evt = getNextEvent();
+ if(evt){
+ LFSessionContext* session = reinterpret_cast<LFSessionContext*>(evt->client_data);
+ session->startProcessing();
+
+ relinquishLead();
+ leadLock.release();
+
+ //process event:
+ if(evt->rtnevents & APR_POLLIN) session->read();
+ if(evt->rtnevents & APR_POLLOUT) session->write();
+
+ if(session->isClosed()){
+ session->handleClose();
+ countLock.acquire();
+ sessions.erase(find(sessions.begin(), sessions.end(), session));
+ count--;
+ countLock.release();
+ }else{
+ session->stopProcessing();
+ }
+
+ }else{
+ leadLock.release();
+ }
+ }else{
+ leadLock.release();
+ }
+ }
+ }catch(QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+}
+
+void LFProcessor::waitToLead(){
+ while(hasLeader && !stopped) leadLock.wait();
+ hasLeader = !stopped;
+}
+
+void LFProcessor::relinquishLead(){
+ hasLeader = false;
+ leadLock.notify();
+}
+
+const apr_pollfd_t* LFProcessor::getNextEvent(){
+ while(true){
+ if(stopped){
+ return 0;
+ }else if(current < signalledCount){
+ //use result of previous poll if one is available
+ return signalledFDs + (current++);
+ }else{
+ //else poll to get new events
+ poll();
+ }
+ }
+}
+
+void LFProcessor::stop(){
+ stopped = true;
+ leadLock.acquire();
+ leadLock.notifyAll();
+ leadLock.release();
+
+ for(int i = 0; i < workerCount; i++){
+ workers[i]->join();
+ }
+
+ for(iterator i = sessions.begin(); i < sessions.end(); i++){
+ (*i)->shutdown();
+ }
+}
+
diff --git a/cpp/src/qpid/sys/LFProcessor.h b/cpp/src/qpid/sys/LFProcessor.h
new file mode 100644
index 0000000000..afbb9ea413
--- /dev/null
+++ b/cpp/src/qpid/sys/LFProcessor.h
@@ -0,0 +1,119 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LFProcessor_
+#define _LFProcessor_
+
+#include "apr-1/apr_poll.h"
+#include <iostream>
+#include <vector>
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/ThreadFactory.h"
+#include "qpid/sys/Runnable.h"
+
+namespace qpid {
+namespace sys {
+
+ class LFSessionContext;
+
+ /**
+ * This class processes a poll set using the leaders-followers
+ * pattern for thread synchronization: the leader will poll and on
+ * the poll returning, it will remove a session, promote a
+ * follower to leadership, then process the session.
+ */
+ class LFProcessor : private virtual qpid::sys::Runnable
+ {
+ typedef std::vector<LFSessionContext*>::iterator iterator;
+
+ const int size;
+ const apr_interval_time_t timeout;
+ apr_pollset_t* pollset;
+ int signalledCount;
+ int current;
+ const apr_pollfd_t* signalledFDs;
+ int count;
+ const int workerCount;
+ bool hasLeader;
+ qpid::sys::Thread** const workers;
+ qpid::sys::Monitor leadLock;
+ qpid::sys::Monitor countLock;
+ qpid::sys::ThreadFactory factory;
+ std::vector<LFSessionContext*> sessions;
+ volatile bool stopped;
+
+ const apr_pollfd_t* getNextEvent();
+ void waitToLead();
+ void relinquishLead();
+ void poll();
+ virtual void run();
+
+ public:
+ LFProcessor(apr_pool_t* pool, int workers, int size, int timeout);
+ /**
+ * Add the fd to the poll set. Relies on the client_data being
+ * an instance of LFSessionContext.
+ */
+ void add(const apr_pollfd_t* const fd);
+ /**
+ * Remove the fd from the poll set.
+ */
+ void remove(const apr_pollfd_t* const fd);
+ /**
+ * Signal that the fd passed in, already part of the pollset,
+ * has had its flags altered.
+ */
+ void update(const apr_pollfd_t* const fd);
+ /**
+ * Add an fd back to the poll set after deactivation.
+ */
+ void reactivate(const apr_pollfd_t* const fd);
+ /**
+ * Temporarily remove the fd from the poll set. Called when processing
+ * is about to begin.
+ */
+ void deactivate(const apr_pollfd_t* const fd);
+ /**
+ * Indicates whether the capacity of this processor has been
+ * reached (or whether it can still handle further fd's).
+ */
+ bool full();
+ /**
+ * Indicates whether there are any fd's registered.
+ */
+ bool empty();
+ /**
+ * Stop processing.
+ */
+ void stop();
+ /**
+ * Start processing.
+ */
+ void start();
+ /**
+ * Is processing stopped?
+ */
+ bool isStopped();
+
+ ~LFProcessor();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/sys/LFSessionContext.cpp b/cpp/src/qpid/sys/LFSessionContext.cpp
new file mode 100644
index 0000000000..f2dff87fd0
--- /dev/null
+++ b/cpp/src/qpid/sys/LFSessionContext.cpp
@@ -0,0 +1,189 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/sys/LFSessionContext.h"
+#include "qpid/sys/APRBase.h"
+#include "qpid/QpidError.h"
+#include <assert.h>
+
+using namespace qpid::sys;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,
+ LFProcessor* const _processor,
+ bool _debug) :
+ debug(_debug),
+ socket(_socket),
+ initiated(false),
+ in(32768),
+ out(32768),
+ processor(_processor),
+ processing(false),
+ closing(false),
+ reading(0),
+ writing(0)
+{
+
+ fd.p = _pool;
+ fd.desc_type = APR_POLL_SOCKET;
+ fd.reqevents = APR_POLLIN;
+ fd.client_data = this;
+ fd.desc.s = _socket;
+
+ out.flip();
+}
+
+LFSessionContext::~LFSessionContext(){
+
+}
+
+void LFSessionContext::read(){
+ assert(!reading); // No concurrent read.
+ reading = Thread::currentThread();
+
+ socket.read(in);
+ in.flip();
+ if(initiated){
+ AMQFrame frame;
+ while(frame.decode(in)){
+ if(debug) log("RECV", &frame);
+ handler->received(&frame);
+ }
+ }else{
+ ProtocolInitiation protocolInit;
+ if(protocolInit.decode(in)){
+ handler->initiated(&protocolInit);
+ initiated = true;
+ if(debug) std::cout << "INIT [" << &socket << "]" << std::endl;
+ }
+ }
+ in.compact();
+
+ reading = 0;
+}
+
+void LFSessionContext::write(){
+ assert(!writing); // No concurrent writes.
+ writing = Thread::currentThread();
+
+ bool done = isClosed();
+ while(!done){
+ if(out.available() > 0){
+ socket.write(out);
+ if(out.available() > 0){
+ writing = 0;
+
+ //incomplete write, leave flags to receive notification of readiness to write
+ done = true;//finished processing for now, but write is still in progress
+ }
+ }else{
+ //do we have any frames to write?
+ writeLock.acquire();
+ if(!framesToWrite.empty()){
+ out.clear();
+ bool encoded(false);
+ AMQFrame* frame = framesToWrite.front();
+ while(frame && out.available() >= frame->size()){
+ encoded = true;
+ frame->encode(out);
+ if(debug) log("SENT", frame);
+ delete frame;
+ framesToWrite.pop();
+ frame = framesToWrite.empty() ? 0 : framesToWrite.front();
+ }
+ if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
+ out.flip();
+ }else{
+ //reset flags, don't care about writability anymore
+ fd.reqevents = APR_POLLIN;
+ done = true;
+
+ writing = 0;
+
+ if(closing){
+ socket.close();
+ }
+ }
+ writeLock.release();
+ }
+ }
+}
+
+void LFSessionContext::send(AMQFrame* frame){
+ writeLock.acquire();
+ if(!closing){
+ framesToWrite.push(frame);
+ if(!(fd.reqevents & APR_POLLOUT)){
+ fd.reqevents |= APR_POLLOUT;
+ if(!processing){
+ processor->update(&fd);
+ }
+ }
+ }
+ writeLock.release();
+}
+
+void LFSessionContext::startProcessing(){
+ writeLock.acquire();
+ processing = true;
+ processor->deactivate(&fd);
+ writeLock.release();
+}
+
+void LFSessionContext::stopProcessing(){
+ writeLock.acquire();
+ processor->reactivate(&fd);
+ processing = false;
+ writeLock.release();
+}
+
+void LFSessionContext::close(){
+ closing = true;
+ writeLock.acquire();
+ if(!processing){
+ //allow pending frames to be written to socket
+ fd.reqevents = APR_POLLOUT;
+ processor->update(&fd);
+ }
+ writeLock.release();
+}
+
+void LFSessionContext::handleClose(){
+ handler->closed();
+ std::cout << "Session closed [" << &socket << "]" << std::endl;
+ delete handler;
+ delete this;
+}
+
+void LFSessionContext::shutdown(){
+ socket.close();
+ handleClose();
+}
+
+void LFSessionContext::init(SessionHandler* _handler){
+ handler = _handler;
+ processor->add(&fd);
+}
+
+void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){
+ logLock.acquire();
+ std::cout << desc << " [" << &socket << "]: " << *frame << std::endl;
+ logLock.release();
+}
+
+Monitor LFSessionContext::logLock;
diff --git a/cpp/src/qpid/sys/LFSessionContext.h b/cpp/src/qpid/sys/LFSessionContext.h
new file mode 100644
index 0000000000..92f52ccf83
--- /dev/null
+++ b/cpp/src/qpid/sys/LFSessionContext.h
@@ -0,0 +1,88 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LFSessionContext_
+#define _LFSessionContext_
+
+#include <queue>
+
+#include "apr-1/apr_network_io.h"
+#include "apr-1/apr_poll.h"
+#include "apr-1/apr_time.h"
+
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/APRSocket.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/sys/LFProcessor.h"
+#include "qpid/sys/SessionContext.h"
+#include "qpid/sys/SessionHandler.h"
+
+namespace qpid {
+namespace sys {
+
+
+ class LFSessionContext : public virtual SessionContext
+ {
+ const bool debug;
+ APRSocket socket;
+ bool initiated;
+
+ qpid::framing::Buffer in;
+ qpid::framing::Buffer out;
+
+ SessionHandler* handler;
+ LFProcessor* const processor;
+
+ apr_pollfd_t fd;
+
+ std::queue<qpid::framing::AMQFrame*> framesToWrite;
+ qpid::sys::Monitor writeLock;
+
+ bool processing;
+ bool closing;
+
+ //these are just for debug, as a crude way of detecting concurrent access
+ volatile unsigned int reading;
+ volatile unsigned int writing;
+
+ static qpid::sys::Monitor logLock;
+ void log(const std::string& desc, qpid::framing::AMQFrame* const frame);
+
+ public:
+ LFSessionContext(apr_pool_t* pool, apr_socket_t* socket,
+ LFProcessor* const processor,
+ bool debug = false);
+ ~LFSessionContext();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void close();
+ void read();
+ void write();
+ void init(SessionHandler* handler);
+ void startProcessing();
+ void stopProcessing();
+ void handleClose();
+ void shutdown();
+ inline apr_pollfd_t* const getFd(){ return &fd; }
+ inline bool isClosed(){ return !socket.isOpen(); }
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/sys/Monitor.cpp b/cpp/src/qpid/sys/Monitor.cpp
new file mode 100644
index 0000000000..79a29c219e
--- /dev/null
+++ b/cpp/src/qpid/sys/Monitor.cpp
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/sys/APRBase.h"
+#include "qpid/sys/Monitor.h"
+#include <iostream>
+
+qpid::sys::Monitor::Monitor(){
+ APRBase::increment();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
+ CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool));
+}
+
+qpid::sys::Monitor::~Monitor(){
+ CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
+ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
+ apr_pool_destroy(pool);
+ APRBase::decrement();
+}
+
+void qpid::sys::Monitor::wait(){
+ CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
+}
+
+
+void qpid::sys::Monitor::wait(u_int64_t time){
+ apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000);
+ if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);
+}
+
+void qpid::sys::Monitor::notify(){
+ CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
+}
+
+void qpid::sys::Monitor::notifyAll(){
+ CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
+}
+
+void qpid::sys::Monitor::acquire(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
+}
+
+void qpid::sys::Monitor::release(){
+ CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
+}
diff --git a/cpp/src/qpid/sys/Monitor.h b/cpp/src/qpid/sys/Monitor.h
new file mode 100644
index 0000000000..ddda613b87
--- /dev/null
+++ b/cpp/src/qpid/sys/Monitor.h
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Monitor_
+#define _Monitor_
+
+#include "apr-1/apr_thread_mutex.h"
+#include "apr-1/apr_thread_cond.h"
+#include "qpid/sys/Monitor.h"
+
+namespace qpid {
+namespace sys {
+
+class Monitor
+{
+ apr_pool_t* pool;
+ apr_thread_mutex_t* mutex;
+ apr_thread_cond_t* condition;
+
+ public:
+ Monitor();
+ virtual ~Monitor();
+ virtual void wait();
+ virtual void wait(u_int64_t time);
+ virtual void notify();
+ virtual void notifyAll();
+ virtual void acquire();
+ virtual void release();
+};
+
+class Locker
+{
+ public:
+ Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); }
+ ~Locker() { monitor.release(); }
+ private:
+ Monitor& monitor;
+};
+}}
+
+
+#endif
diff --git a/cpp/src/qpid/concurrent/Runnable.cpp b/cpp/src/qpid/sys/Runnable.cpp
index 5b18ccfab6..d7d9e968cc 100644
--- a/cpp/src/qpid/concurrent/Runnable.cpp
+++ b/cpp/src/qpid/sys/Runnable.cpp
@@ -15,5 +15,5 @@
*
*/
-#include "qpid/concurrent/Runnable.h"
-qpid::concurrent::Runnable::~Runnable() {}
+#include "qpid/sys/Runnable.h"
+qpid::sys::Runnable::~Runnable() {}
diff --git a/cpp/src/qpid/concurrent/Runnable.h b/cpp/src/qpid/sys/Runnable.h
index 9753a1ad0a..ce13eb2039 100644
--- a/cpp/src/qpid/concurrent/Runnable.h
+++ b/cpp/src/qpid/sys/Runnable.h
@@ -19,7 +19,7 @@
#define _Runnable_
namespace qpid {
-namespace concurrent {
+namespace sys {
class Runnable
{
diff --git a/cpp/src/qpid/io/SessionContext.h b/cpp/src/qpid/sys/SessionContext.h
index c9a2ce49f2..1362b4f2f2 100644
--- a/cpp/src/qpid/io/SessionContext.h
+++ b/cpp/src/qpid/sys/SessionContext.h
@@ -21,7 +21,7 @@
#include "qpid/framing/OutputHandler.h"
namespace qpid {
-namespace io {
+namespace sys {
/**
* Provides the output handler associated with a connection.
diff --git a/cpp/src/qpid/io/SessionHandler.h b/cpp/src/qpid/sys/SessionHandler.h
index ac455122d6..130e69caf4 100644
--- a/cpp/src/qpid/io/SessionHandler.h
+++ b/cpp/src/qpid/sys/SessionHandler.h
@@ -21,10 +21,10 @@
#include "qpid/framing/InputHandler.h"
#include "qpid/framing/InitiationHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/io/TimeoutHandler.h"
+#include "qpid/sys/TimeoutHandler.h"
namespace qpid {
-namespace io {
+namespace sys {
class SessionHandler :
public qpid::framing::InitiationHandler,
diff --git a/cpp/src/qpid/io/SessionHandlerFactory.h b/cpp/src/qpid/sys/SessionHandlerFactory.h
index 441b8e9fd6..da2f3e95eb 100644
--- a/cpp/src/qpid/io/SessionHandlerFactory.h
+++ b/cpp/src/qpid/sys/SessionHandlerFactory.h
@@ -19,7 +19,7 @@
#define _SessionHandlerFactory_
namespace qpid {
-namespace io {
+namespace sys {
class SessionContext;
class SessionHandler;
diff --git a/cpp/src/qpid/io/ShutdownHandler.h b/cpp/src/qpid/sys/ShutdownHandler.h
index 186d9eeca4..4107f09f89 100644
--- a/cpp/src/qpid/io/ShutdownHandler.h
+++ b/cpp/src/qpid/sys/ShutdownHandler.h
@@ -19,7 +19,7 @@
#define _ShutdownHandler_
namespace qpid {
-namespace io {
+namespace sys {
class ShutdownHandler
{
diff --git a/cpp/src/qpid/sys/Thread.cpp b/cpp/src/qpid/sys/Thread.cpp
new file mode 100644
index 0000000000..4fb9915993
--- /dev/null
+++ b/cpp/src/qpid/sys/Thread.cpp
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/sys/APRBase.h"
+#include "qpid/sys/Thread.h"
+#include "apr-1/apr_portable.h"
+
+using namespace qpid::sys;
+
+void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){
+ ((Runnable*) data)->run();
+ CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS));
+ return NULL;
+}
+
+Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {}
+
+Thread::~Thread(){
+}
+
+void Thread::start(){
+ CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool));
+}
+
+void Thread::join(){
+ apr_status_t status;
+ if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner));
+}
+
+void Thread::interrupt(){
+ if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS));
+}
+
+unsigned int qpid::sys::Thread::currentThread(){
+ return apr_os_thread_current();
+}
diff --git a/cpp/src/qpid/sys/Thread.h b/cpp/src/qpid/sys/Thread.h
new file mode 100644
index 0000000000..e86bd4a8d2
--- /dev/null
+++ b/cpp/src/qpid/sys/Thread.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Thread_
+#define _Thread_
+
+#include "apr-1/apr_thread_proc.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
+
+namespace qpid {
+namespace sys {
+
+ class Thread
+ {
+ const Runnable* runnable;
+ apr_pool_t* pool;
+ apr_thread_t* runner;
+
+ public:
+ Thread(apr_pool_t* pool, Runnable* runnable);
+ virtual ~Thread();
+ virtual void start();
+ virtual void join();
+ virtual void interrupt();
+ static unsigned int currentThread();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/sys/ThreadFactory.cpp b/cpp/src/qpid/sys/ThreadFactory.cpp
new file mode 100644
index 0000000000..d33872b9a2
--- /dev/null
+++ b/cpp/src/qpid/sys/ThreadFactory.cpp
@@ -0,0 +1,35 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/sys/APRBase.h"
+#include "qpid/sys/ThreadFactory.h"
+
+using namespace qpid::sys;
+
+ThreadFactory::ThreadFactory(){
+ APRBase::increment();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+}
+
+ThreadFactory::~ThreadFactory(){
+ apr_pool_destroy(pool);
+ APRBase::decrement();
+}
+
+Thread* ThreadFactory::create(Runnable* runnable){
+ return new Thread(pool, runnable);
+}
diff --git a/cpp/src/qpid/sys/ThreadFactory.h b/cpp/src/qpid/sys/ThreadFactory.h
new file mode 100644
index 0000000000..9b7126272a
--- /dev/null
+++ b/cpp/src/qpid/sys/ThreadFactory.h
@@ -0,0 +1,44 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ThreadFactory_
+#define _ThreadFactory_
+
+#include "apr-1/apr_thread_proc.h"
+
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/ThreadFactory.h"
+#include "qpid/sys/Runnable.h"
+
+namespace qpid {
+namespace sys {
+
+ class ThreadFactory
+ {
+ apr_pool_t* pool;
+ public:
+ ThreadFactory();
+ virtual ~ThreadFactory();
+ virtual Thread* create(Runnable* runnable);
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/sys/Time.cpp b/cpp/src/qpid/sys/Time.cpp
new file mode 100644
index 0000000000..c3512b8df3
--- /dev/null
+++ b/cpp/src/qpid/sys/Time.cpp
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <qpid/sys/Time.h>
+#include <apr-1/apr_time.h>
+
+namespace qpid {
+namespace sys {
+
+Time Time::now() {
+ return Time(apr_time_now()*1000);
+}
+
+}}
diff --git a/cpp/src/qpid/concurrent/Time.h b/cpp/src/qpid/sys/Time.h
index ec64ce8a85..5c7cdfb005 100644
--- a/cpp/src/qpid/concurrent/Time.h
+++ b/cpp/src/qpid/sys/Time.h
@@ -22,7 +22,7 @@
#include <stdint.h>
namespace qpid {
-namespace concurrent {
+namespace sys {
/**
* Time since the epoch.
diff --git a/cpp/src/qpid/io/TimeoutHandler.h b/cpp/src/qpid/sys/TimeoutHandler.h
index c92220fd6e..845bd72a12 100644
--- a/cpp/src/qpid/io/TimeoutHandler.h
+++ b/cpp/src/qpid/sys/TimeoutHandler.h
@@ -19,7 +19,7 @@
#define _TimeoutHandler_
namespace qpid {
-namespace io {
+namespace sys {
class TimeoutHandler
{
diff --git a/cpp/src/qpid/io/doxygen_summary.h b/cpp/src/qpid/sys/doxygen_summary.h
index 1086f65f63..af89154fdf 100644
--- a/cpp/src/qpid/io/doxygen_summary.h
+++ b/cpp/src/qpid/sys/doxygen_summary.h
@@ -21,7 +21,7 @@
// No code just a doxygen comment for the namespace
-/** \namspace qpid::io
+/** \namspace qpid::sys
* IO classes used by client and broker.
*
* This namespace contains platform-neutral classes. Platform