summaryrefslogtreecommitdiff
path: root/cpp/common/io/inc
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
committerRafael H. Schloming <rhs@apache.org>2006-09-19 22:06:50 +0000
commit913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch)
tree7ea442d6867d0076f1c9ea4f4265664059e7aff5 /cpp/common/io/inc
downloadqpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze Repository Root: https://etp.108.redhat.com/svn/etp Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48 Revision: 608 git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/common/io/inc')
-rw-r--r--cpp/common/io/inc/APRConnector.h95
-rw-r--r--cpp/common/io/inc/APRIOProcessor.h86
-rw-r--r--cpp/common/io/inc/APRSocket.h45
-rw-r--r--cpp/common/io/inc/Acceptor.h38
-rw-r--r--cpp/common/io/inc/BlockingAPRAcceptor.h62
-rw-r--r--cpp/common/io/inc/BlockingAPRSessionContext.h94
-rw-r--r--cpp/common/io/inc/Connector.h56
-rw-r--r--cpp/common/io/inc/ConnectorImpl.h53
-rw-r--r--cpp/common/io/inc/IOSession.h45
-rw-r--r--cpp/common/io/inc/LConnector.h48
-rw-r--r--cpp/common/io/inc/LFAcceptor.h71
-rw-r--r--cpp/common/io/inc/LFProcessor.h116
-rw-r--r--cpp/common/io/inc/LFSessionContext.h89
-rw-r--r--cpp/common/io/inc/SessionContext.h37
-rw-r--r--cpp/common/io/inc/SessionHandler.h42
-rw-r--r--cpp/common/io/inc/SessionHandlerFactory.h38
-rw-r--r--cpp/common/io/inc/SessionManager.h40
-rw-r--r--cpp/common/io/inc/ShutdownHandler.h34
-rw-r--r--cpp/common/io/inc/TimeoutHandler.h36
19 files changed, 1125 insertions, 0 deletions
diff --git a/cpp/common/io/inc/APRConnector.h b/cpp/common/io/inc/APRConnector.h
new file mode 100644
index 0000000000..c292c4d7e0
--- /dev/null
+++ b/cpp/common/io/inc/APRConnector.h
@@ -0,0 +1,95 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _APRConnector_
+#define _APRConnector_
+
+#include "apr_network_io.h"
+#include "apr_time.h"
+
+#include "InputHandler.h"
+#include "OutputHandler.h"
+#include "InitiationHandler.h"
+#include "ProtocolInitiation.h"
+#include "ShutdownHandler.h"
+#include "Thread.h"
+#include "ThreadFactory.h"
+#include "Connector.h"
+#include "APRMonitor.h"
+
+namespace qpid {
+namespace io {
+
+ class APRConnector : public virtual qpid::framing::OutputHandler,
+ public virtual Connector,
+ private virtual qpid::concurrent::Runnable
+ {
+ const bool debug;
+ const int receive_buffer_size;
+ const int send_buffer_size;
+
+ bool closed;
+
+ apr_time_t lastIn;
+ apr_time_t lastOut;
+ apr_interval_time_t timeout;
+ u_int32_t idleIn;
+ u_int32_t idleOut;
+
+ TimeoutHandler* timeoutHandler;
+ ShutdownHandler* shutdownHandler;
+ qpid::framing::InputHandler* input;
+ qpid::framing::InitiationHandler* initialiser;
+ qpid::framing::OutputHandler* output;
+
+ qpid::framing::Buffer inbuf;
+ qpid::framing::Buffer outbuf;
+
+ qpid::concurrent::APRMonitor* writeLock;
+ qpid::concurrent::ThreadFactory* threadFactory;
+ qpid::concurrent::Thread* receiver;
+
+ apr_pool_t* pool;
+ apr_socket_t* socket;
+
+ void checkIdle(apr_status_t status);
+ void writeBlock(qpid::framing::AMQDataBlock* data);
+ void writeToSocket(char* data, int available);
+ void setSocketTimeout();
+
+ void run();
+
+ public:
+ APRConnector(bool debug = false, u_int32_t buffer_size = 1024);
+ virtual ~APRConnector();
+ virtual void connect(const std::string& host, int port);
+ virtual void init(qpid::framing::ProtocolInitiation* header);
+ virtual void close();
+ virtual void setInputHandler(qpid::framing::InputHandler* handler);
+ virtual void setTimeoutHandler(TimeoutHandler* handler);
+ virtual void setShutdownHandler(ShutdownHandler* handler);
+ virtual qpid::framing::OutputHandler* getOutputHandler();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void setReadTimeout(u_int16_t timeout);
+ virtual void setWriteTimeout(u_int16_t timeout);
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/APRIOProcessor.h b/cpp/common/io/inc/APRIOProcessor.h
new file mode 100644
index 0000000000..de0d67a9c4
--- /dev/null
+++ b/cpp/common/io/inc/APRIOProcessor.h
@@ -0,0 +1,86 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _APRIOProcessor_
+#define _APRIOProcessor_
+
+#include "apr_poll.h"
+#include <queue>
+#include <iostream>
+#include "APRMonitor.h"
+#include "APRThread.h"
+#include "IOSession.h"
+#include "Runnable.h"
+
+namespace qpid {
+namespace io {
+
+ /**
+ * Manages non-blocking io through the APR polling
+ * routines. Interacts with the actual io tasks to be performed
+ * through the IOSession interface, an implementing instance of
+ * which must be set as the client_data of the apr_pollfd_t
+ * structures registered.
+ */
+ class APRIOProcessor : private virtual qpid::concurrent::Runnable
+ {
+ const int size;
+ const apr_interval_time_t timeout;
+ apr_pollset_t* pollset;
+ int count;
+ qpid::concurrent::APRThread thread;
+ qpid::concurrent::APRMonitor lock;
+ volatile bool stopped;
+
+ void poll();
+ virtual void run();
+
+ public:
+ APRIOProcessor(apr_pool_t* pool, int size, int timeout);
+ /**
+ * Add the fd to the poll set. Relies on the client_data being
+ * an instance implementing IOSession, through which the write
+ * and read operations will be performed when readiness is
+ * indicated by the poll response.
+ */
+ void add(apr_pollfd_t* const fd);
+ /**
+ * Remove the fd from the poll set.
+ */
+ void remove(apr_pollfd_t* const fd);
+ /**
+ * Indicates whether the capacity of this processor has been
+ * reached (or whether it can still handle further fd's).
+ */
+ bool full();
+ /**
+ * Indicates whether there are any fd's registered.
+ */
+ bool empty();
+ /**
+ * Stop processing.
+ */
+ void stop();
+
+ ~APRIOProcessor();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/APRSocket.h b/cpp/common/io/inc/APRSocket.h
new file mode 100644
index 0000000000..610cf0e175
--- /dev/null
+++ b/cpp/common/io/inc/APRSocket.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _APRSocket_
+#define _APRSocket_
+
+#include "apr_network_io.h"
+#include "Buffer.h"
+
+namespace qpid {
+namespace io {
+
+ class APRSocket
+ {
+ apr_socket_t* const socket;
+ volatile bool closed;
+ public:
+ APRSocket(apr_socket_t* socket);
+ void read(qpid::framing::Buffer& b);
+ void write(qpid::framing::Buffer& b);
+ void close();
+ bool isOpen();
+ u_int8_t read();
+ ~APRSocket();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/Acceptor.h b/cpp/common/io/inc/Acceptor.h
new file mode 100644
index 0000000000..5c690c546f
--- /dev/null
+++ b/cpp/common/io/inc/Acceptor.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Acceptor_
+#define _Acceptor_
+
+#include "SessionHandlerFactory.h"
+
+
+namespace qpid {
+namespace io {
+
+ class Acceptor
+ {
+ public:
+ virtual void bind(int port, SessionHandlerFactory* factory) = 0;
+ virtual ~Acceptor(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/BlockingAPRAcceptor.h b/cpp/common/io/inc/BlockingAPRAcceptor.h
new file mode 100644
index 0000000000..b77371b02e
--- /dev/null
+++ b/cpp/common/io/inc/BlockingAPRAcceptor.h
@@ -0,0 +1,62 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _BlockingAPRAcceptor_
+#define _BlockingAPRAcceptor_
+
+#include <vector>
+#include "apr_network_io.h"
+#include "apr_poll.h"
+#include "apr_time.h"
+
+#include "Acceptor.h"
+#include "APRMonitor.h"
+#include "BlockingAPRSessionContext.h"
+#include "Runnable.h"
+#include "SessionContext.h"
+#include "SessionHandlerFactory.h"
+#include "Thread.h"
+#include "ThreadFactory.h"
+#include "ThreadPool.h"
+
+namespace qpid {
+namespace io {
+
+ class BlockingAPRAcceptor : public virtual Acceptor
+ {
+ typedef std::vector<BlockingAPRSessionContext*>::iterator iterator;
+
+ const bool debug;
+ apr_pool_t* apr_pool;
+ qpid::concurrent::ThreadFactory* threadFactory;
+ std::vector<BlockingAPRSessionContext*> sessions;
+ apr_socket_t* socket;
+ const int connectionBacklog;
+ volatile bool running;
+
+ public:
+ BlockingAPRAcceptor(bool debug = false, int connectionBacklog = 10);
+ virtual void bind(int port, SessionHandlerFactory* factory);
+ virtual ~BlockingAPRAcceptor();
+ void closed(BlockingAPRSessionContext* session);
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/BlockingAPRSessionContext.h b/cpp/common/io/inc/BlockingAPRSessionContext.h
new file mode 100644
index 0000000000..038ebd6662
--- /dev/null
+++ b/cpp/common/io/inc/BlockingAPRSessionContext.h
@@ -0,0 +1,94 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _BlockingAPRSessionContext_
+#define _BlockingAPRSessionContext_
+
+#include <queue>
+#include <vector>
+
+#include "apr_network_io.h"
+#include "apr_time.h"
+
+#include "AMQFrame.h"
+#include "APRMonitor.h"
+#include "Buffer.h"
+#include "Runnable.h"
+#include "SessionContext.h"
+#include "SessionHandler.h"
+#include "SessionHandlerFactory.h"
+#include "ShutdownHandler.h"
+#include "Thread.h"
+#include "ThreadFactory.h"
+
+namespace qpid {
+namespace io {
+
+ class BlockingAPRAcceptor;
+
+ class BlockingAPRSessionContext : public virtual SessionContext
+ {
+ class Reader : public virtual qpid::concurrent::Runnable{
+ BlockingAPRSessionContext* parent;
+ public:
+ inline Reader(BlockingAPRSessionContext* p) : parent(p){}
+ inline virtual void run(){ parent->read(); }
+ inline virtual ~Reader(){}
+ };
+
+ class Writer : public virtual qpid::concurrent::Runnable{
+ BlockingAPRSessionContext* parent;
+ public:
+ inline Writer(BlockingAPRSessionContext* p) : parent(p){}
+ inline virtual void run(){ parent->write(); }
+ inline virtual ~Writer(){}
+ };
+
+ apr_socket_t* socket;
+ const bool debug;
+ SessionHandler* handler;
+ BlockingAPRAcceptor* acceptor;
+ std::queue<qpid::framing::AMQFrame*> outframes;
+ qpid::framing::Buffer inbuf;
+ qpid::framing::Buffer outbuf;
+ qpid::concurrent::APRMonitor outlock;
+ Reader* reader;
+ Writer* writer;
+ qpid::concurrent::Thread* rThread;
+ qpid::concurrent::Thread* wThread;
+
+ volatile bool closed;
+
+ void read();
+ void write();
+ public:
+ BlockingAPRSessionContext(apr_socket_t* socket,
+ qpid::concurrent::ThreadFactory* factory,
+ BlockingAPRAcceptor* acceptor,
+ bool debug = false);
+ ~BlockingAPRSessionContext();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void close();
+ void shutdown();
+ void init(SessionHandler* handler);
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/Connector.h b/cpp/common/io/inc/Connector.h
new file mode 100644
index 0000000000..52684329f1
--- /dev/null
+++ b/cpp/common/io/inc/Connector.h
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Connector_
+#define _Connector_
+
+#include "InputHandler.h"
+#include "OutputHandler.h"
+#include "InitiationHandler.h"
+#include "ProtocolInitiation.h"
+#include "ShutdownHandler.h"
+#include "TimeoutHandler.h"
+
+namespace qpid {
+namespace io {
+
+ class Connector
+ {
+ public:
+ virtual void connect(const std::string& host, int port) = 0;
+ virtual void init(qpid::framing::ProtocolInitiation* header) = 0;
+ virtual void close() = 0;
+ virtual void setInputHandler(qpid::framing::InputHandler* handler) = 0;
+ virtual void setTimeoutHandler(TimeoutHandler* handler) = 0;
+ virtual void setShutdownHandler(ShutdownHandler* handler) = 0;
+ virtual qpid::framing::OutputHandler* getOutputHandler() = 0;
+ /**
+ * Set the timeout for reads, in secs.
+ */
+ virtual void setReadTimeout(u_int16_t timeout) = 0;
+ /**
+ * Set the timeout for writes, in secs.
+ */
+ virtual void setWriteTimeout(u_int16_t timeout) = 0;
+ virtual ~Connector(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/ConnectorImpl.h b/cpp/common/io/inc/ConnectorImpl.h
new file mode 100644
index 0000000000..242f3aed49
--- /dev/null
+++ b/cpp/common/io/inc/ConnectorImpl.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _APRConnectorImpl_
+#define _APRConnectorImpl_
+
+#ifdef _USE_APR_IO_
+#include "APRConnector.h"
+#else
+#include "LConnector.h"
+#endif
+
+namespace qpid {
+namespace io {
+
+#ifdef _USE_APR_IO_
+ class ConnectorImpl : public virtual APRConnector
+ {
+
+ public:
+ ConnectorImpl(bool debug = false, u_int32_t buffer_size = 1024):APRConnector(debug,buffer_size){};
+ virtual ~ConnectorImpl(){};
+ };
+#else
+ class ConnectorImpl : public virtual LConnector
+ {
+
+ public:
+ ConnectorImpl(bool debug = false, u_int32_t buffer_size = 1024):LConnector(debug, buffer_size){};
+ virtual ~ConnectorImpl(){};
+ };
+
+#endif
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/IOSession.h b/cpp/common/io/inc/IOSession.h
new file mode 100644
index 0000000000..45e84d29b1
--- /dev/null
+++ b/cpp/common/io/inc/IOSession.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _IOSession_
+#define _IOSession_
+
+namespace qpid {
+namespace io {
+
+ class IOSession
+ {
+ public:
+ virtual void read() = 0;
+ virtual void write() = 0;
+ virtual ~IOSession(){}
+ };
+
+
+ class IOSessionHolder
+ {
+ IOSession* session;
+ public:
+ IOSessionHolder(IOSession* _session) : session(_session) {}
+ void read(){ session->read(); }
+ void write(){ session->write(); }
+ };
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/LConnector.h b/cpp/common/io/inc/LConnector.h
new file mode 100644
index 0000000000..59d95a6b57
--- /dev/null
+++ b/cpp/common/io/inc/LConnector.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LConnector_
+#define _LConnector_
+
+
+#include "InputHandler.h"
+#include "OutputHandler.h"
+#include "InitiationHandler.h"
+#include "ProtocolInitiation.h"
+#include "Thread.h"
+#include "ThreadFactory.h"
+#include "Connector.h"
+
+namespace qpid {
+namespace io {
+
+ class LConnector : public virtual qpid::framing::OutputHandler,
+ public virtual Connector,
+ private virtual qpid::concurrent::Runnable
+ {
+
+ public:
+ LConnector(bool debug = false, u_int32_t buffer_size = 1024){};
+ virtual ~LConnector(){};
+
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/LFAcceptor.h b/cpp/common/io/inc/LFAcceptor.h
new file mode 100644
index 0000000000..314f811827
--- /dev/null
+++ b/cpp/common/io/inc/LFAcceptor.h
@@ -0,0 +1,71 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LFAcceptor_
+#define _LFAcceptor_
+
+#include <vector>
+#include "apr_network_io.h"
+#include "apr_poll.h"
+#include "apr_time.h"
+
+#include "Acceptor.h"
+#include "APRMonitor.h"
+#include "APRThreadFactory.h"
+#include "APRThreadPool.h"
+#include "LFProcessor.h"
+#include "LFSessionContext.h"
+#include "Runnable.h"
+#include "SessionContext.h"
+#include "SessionHandlerFactory.h"
+#include "Thread.h"
+
+namespace qpid {
+namespace io {
+
+ class LFAcceptor : public virtual Acceptor
+ {
+ class APRPool{
+ public:
+ apr_pool_t* pool;
+ APRPool();
+ ~APRPool();
+ };
+
+ APRPool aprPool;
+ LFProcessor processor;
+
+ const int max_connections_per_processor;
+ const bool debug;
+ const int connectionBacklog;
+
+ volatile bool running;
+
+ public:
+ LFAcceptor(bool debug = false,
+ int connectionBacklog = 10,
+ int worker_threads = 5,
+ int max_connections_per_processor = 500);
+ virtual void bind(int port, SessionHandlerFactory* factory);
+ virtual ~LFAcceptor();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/LFProcessor.h b/cpp/common/io/inc/LFProcessor.h
new file mode 100644
index 0000000000..7d99d51943
--- /dev/null
+++ b/cpp/common/io/inc/LFProcessor.h
@@ -0,0 +1,116 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LFProcessor_
+#define _LFProcessor_
+
+#include "apr_poll.h"
+#include <iostream>
+#include <vector>
+#include "APRMonitor.h"
+#include "APRThreadFactory.h"
+#include "IOSession.h"
+#include "Runnable.h"
+
+namespace qpid {
+namespace io {
+
+ class LFSessionContext;
+
+ /**
+ * This class processes a poll set using the leaders-followers
+ * pattern for thread synchronization: the leader will poll and on
+ * the poll returning, it will remove a session, promote a
+ * follower to leadership, then process the session.
+ */
+ class LFProcessor : private virtual qpid::concurrent::Runnable
+ {
+ typedef std::vector<LFSessionContext*>::iterator iterator;
+
+ const int size;
+ const apr_interval_time_t timeout;
+ apr_pollset_t* pollset;
+ int signalledCount;
+ int current;
+ const apr_pollfd_t* signalledFDs;
+ int count;
+ const int workerCount;
+ qpid::concurrent::Thread** const workers;
+ qpid::concurrent::APRMonitor leadLock;
+ qpid::concurrent::APRMonitor countLock;
+ qpid::concurrent::APRThreadFactory factory;
+ std::vector<LFSessionContext*> sessions;
+ bool hasLeader;
+ volatile bool stopped;
+
+ const apr_pollfd_t* getNextEvent();
+ void waitToLead();
+ void relinquishLead();
+ void poll();
+ virtual void run();
+
+ public:
+ LFProcessor(apr_pool_t* pool, int workers, int size, int timeout);
+ /**
+ * Add the fd to the poll set. Relies on the client_data being
+ * an instance of LFSessionContext.
+ */
+ void add(const apr_pollfd_t* const fd);
+ /**
+ * Remove the fd from the poll set.
+ */
+ void remove(const apr_pollfd_t* const fd);
+ /**
+ * Signal that the fd passed in, already part of the pollset,
+ * has had its flags altered.
+ */
+ void update(const apr_pollfd_t* const fd);
+ /**
+ * Add an fd back to the poll set after deactivation.
+ */
+ void reactivate(const apr_pollfd_t* const fd);
+ /**
+ * Temporarily remove the fd from the poll set. Called when processing
+ * is about to begin.
+ */
+ void deactivate(const apr_pollfd_t* const fd);
+ /**
+ * Indicates whether the capacity of this processor has been
+ * reached (or whether it can still handle further fd's).
+ */
+ bool full();
+ /**
+ * Indicates whether there are any fd's registered.
+ */
+ bool empty();
+ /**
+ * Stop processing.
+ */
+ void stop();
+ /**
+ * Start processing.
+ */
+ void start();
+
+ ~LFProcessor();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/LFSessionContext.h b/cpp/common/io/inc/LFSessionContext.h
new file mode 100644
index 0000000000..ef6a0d07b0
--- /dev/null
+++ b/cpp/common/io/inc/LFSessionContext.h
@@ -0,0 +1,89 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LFSessionContext_
+#define _LFSessionContext_
+
+#include <queue>
+
+#include "apr_network_io.h"
+#include "apr_poll.h"
+#include "apr_time.h"
+
+#include "AMQFrame.h"
+#include "APRMonitor.h"
+#include "APRSocket.h"
+#include "Buffer.h"
+#include "IOSession.h"
+#include "LFProcessor.h"
+#include "SessionContext.h"
+#include "SessionHandler.h"
+
+namespace qpid {
+namespace io {
+
+
+ class LFSessionContext : public virtual SessionContext, public virtual IOSession
+ {
+ const bool debug;
+ APRSocket socket;
+ bool initiated;
+
+ qpid::framing::Buffer in;
+ qpid::framing::Buffer out;
+
+ SessionHandler* handler;
+ LFProcessor* const processor;
+
+ apr_pollfd_t fd;
+
+ std::queue<qpid::framing::AMQFrame*> framesToWrite;
+ qpid::concurrent::APRMonitor writeLock;
+
+ bool processing;
+ bool closing;
+
+ //these are just for debug, as a crude way of detecting concurrent access
+ volatile unsigned int reading;
+ volatile unsigned int writing;
+
+ static qpid::concurrent::APRMonitor logLock;
+ void log(const std::string& desc, qpid::framing::AMQFrame* const frame);
+
+ public:
+ LFSessionContext(apr_pool_t* pool, apr_socket_t* socket,
+ LFProcessor* const processor,
+ bool debug = false);
+ ~LFSessionContext();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void close();
+ virtual void read();
+ virtual void write();
+ void init(SessionHandler* handler);
+ void startProcessing();
+ void stopProcessing();
+ void handleClose();
+ void shutdown();
+ inline apr_pollfd_t* const getFd(){ return &fd; }
+ inline bool isClosed(){ return !socket.isOpen(); }
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/SessionContext.h b/cpp/common/io/inc/SessionContext.h
new file mode 100644
index 0000000000..f223a70daa
--- /dev/null
+++ b/cpp/common/io/inc/SessionContext.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionContext_
+#define _SessionContext_
+
+#include "OutputHandler.h"
+
+namespace qpid {
+namespace io {
+
+ class SessionContext : public virtual qpid::framing::OutputHandler
+ {
+ public:
+ virtual void close() = 0;
+ virtual ~SessionContext(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/SessionHandler.h b/cpp/common/io/inc/SessionHandler.h
new file mode 100644
index 0000000000..21a992ab65
--- /dev/null
+++ b/cpp/common/io/inc/SessionHandler.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionHandler_
+#define _SessionHandler_
+
+#include "InputHandler.h"
+#include "InitiationHandler.h"
+#include "ProtocolInitiation.h"
+#include "TimeoutHandler.h"
+
+namespace qpid {
+namespace io {
+
+ class SessionHandler : public virtual qpid::framing::InitiationHandler,
+ public virtual qpid::framing::InputHandler,
+ public virtual TimeoutHandler
+ {
+ public:
+ virtual void closed() = 0;
+ virtual ~SessionHandler(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/SessionHandlerFactory.h b/cpp/common/io/inc/SessionHandlerFactory.h
new file mode 100644
index 0000000000..67d968b72e
--- /dev/null
+++ b/cpp/common/io/inc/SessionHandlerFactory.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionHandlerFactory_
+#define _SessionHandlerFactory_
+
+#include "SessionContext.h"
+#include "SessionHandler.h"
+
+namespace qpid {
+namespace io {
+
+ class SessionHandlerFactory
+ {
+ public:
+ virtual SessionHandler* create(SessionContext* ctxt) = 0;
+ virtual ~SessionHandlerFactory(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/SessionManager.h b/cpp/common/io/inc/SessionManager.h
new file mode 100644
index 0000000000..30c5208532
--- /dev/null
+++ b/cpp/common/io/inc/SessionManager.h
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionManager_
+#define _SessionManager_
+
+#include "SessionContext.h"
+#include "SessionHandler.h"
+
+namespace qpid {
+namespace io {
+
+ class SessionManager
+ {
+ public:
+ virtual SessionHandler* init(SessionContext* ctxt) = 0;
+ virtual void close(SessionContext* ctxt) = 0;
+ virtual void updateInterest(SessionContext* ctxt, bool read, bool write) = 0;
+ virtual ~SessionManager(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/common/io/inc/ShutdownHandler.h b/cpp/common/io/inc/ShutdownHandler.h
new file mode 100644
index 0000000000..186d9eeca4
--- /dev/null
+++ b/cpp/common/io/inc/ShutdownHandler.h
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ShutdownHandler_
+#define _ShutdownHandler_
+
+namespace qpid {
+namespace io {
+
+ class ShutdownHandler
+ {
+ public:
+ virtual void shutdown() = 0;
+ virtual ~ShutdownHandler(){}
+ };
+
+}
+}
+
+#endif
diff --git a/cpp/common/io/inc/TimeoutHandler.h b/cpp/common/io/inc/TimeoutHandler.h
new file mode 100644
index 0000000000..c92220fd6e
--- /dev/null
+++ b/cpp/common/io/inc/TimeoutHandler.h
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TimeoutHandler_
+#define _TimeoutHandler_
+
+namespace qpid {
+namespace io {
+
+ class TimeoutHandler
+ {
+ public:
+ virtual void idleOut() = 0;
+ virtual void idleIn() = 0;
+ virtual ~TimeoutHandler(){}
+ };
+
+}
+}
+
+
+#endif