diff options
author | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
commit | 913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch) | |
tree | 7ea442d6867d0076f1c9ea4f4265664059e7aff5 /cpp/common/io/inc | |
download | qpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz |
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze
Repository Root: https://etp.108.redhat.com/svn/etp
Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48
Revision: 608
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/common/io/inc')
-rw-r--r-- | cpp/common/io/inc/APRConnector.h | 95 | ||||
-rw-r--r-- | cpp/common/io/inc/APRIOProcessor.h | 86 | ||||
-rw-r--r-- | cpp/common/io/inc/APRSocket.h | 45 | ||||
-rw-r--r-- | cpp/common/io/inc/Acceptor.h | 38 | ||||
-rw-r--r-- | cpp/common/io/inc/BlockingAPRAcceptor.h | 62 | ||||
-rw-r--r-- | cpp/common/io/inc/BlockingAPRSessionContext.h | 94 | ||||
-rw-r--r-- | cpp/common/io/inc/Connector.h | 56 | ||||
-rw-r--r-- | cpp/common/io/inc/ConnectorImpl.h | 53 | ||||
-rw-r--r-- | cpp/common/io/inc/IOSession.h | 45 | ||||
-rw-r--r-- | cpp/common/io/inc/LConnector.h | 48 | ||||
-rw-r--r-- | cpp/common/io/inc/LFAcceptor.h | 71 | ||||
-rw-r--r-- | cpp/common/io/inc/LFProcessor.h | 116 | ||||
-rw-r--r-- | cpp/common/io/inc/LFSessionContext.h | 89 | ||||
-rw-r--r-- | cpp/common/io/inc/SessionContext.h | 37 | ||||
-rw-r--r-- | cpp/common/io/inc/SessionHandler.h | 42 | ||||
-rw-r--r-- | cpp/common/io/inc/SessionHandlerFactory.h | 38 | ||||
-rw-r--r-- | cpp/common/io/inc/SessionManager.h | 40 | ||||
-rw-r--r-- | cpp/common/io/inc/ShutdownHandler.h | 34 | ||||
-rw-r--r-- | cpp/common/io/inc/TimeoutHandler.h | 36 |
19 files changed, 1125 insertions, 0 deletions
diff --git a/cpp/common/io/inc/APRConnector.h b/cpp/common/io/inc/APRConnector.h new file mode 100644 index 0000000000..c292c4d7e0 --- /dev/null +++ b/cpp/common/io/inc/APRConnector.h @@ -0,0 +1,95 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRConnector_ +#define _APRConnector_ + +#include "apr_network_io.h" +#include "apr_time.h" + +#include "InputHandler.h" +#include "OutputHandler.h" +#include "InitiationHandler.h" +#include "ProtocolInitiation.h" +#include "ShutdownHandler.h" +#include "Thread.h" +#include "ThreadFactory.h" +#include "Connector.h" +#include "APRMonitor.h" + +namespace qpid { +namespace io { + + class APRConnector : public virtual qpid::framing::OutputHandler, + public virtual Connector, + private virtual qpid::concurrent::Runnable + { + const bool debug; + const int receive_buffer_size; + const int send_buffer_size; + + bool closed; + + apr_time_t lastIn; + apr_time_t lastOut; + apr_interval_time_t timeout; + u_int32_t idleIn; + u_int32_t idleOut; + + TimeoutHandler* timeoutHandler; + ShutdownHandler* shutdownHandler; + qpid::framing::InputHandler* input; + qpid::framing::InitiationHandler* initialiser; + qpid::framing::OutputHandler* output; + + qpid::framing::Buffer inbuf; + qpid::framing::Buffer outbuf; + + qpid::concurrent::APRMonitor* writeLock; + qpid::concurrent::ThreadFactory* threadFactory; + qpid::concurrent::Thread* receiver; + + apr_pool_t* pool; + apr_socket_t* socket; + + void checkIdle(apr_status_t status); + void writeBlock(qpid::framing::AMQDataBlock* data); + void writeToSocket(char* data, int available); + void setSocketTimeout(); + + void run(); + + public: + APRConnector(bool debug = false, u_int32_t buffer_size = 1024); + virtual ~APRConnector(); + virtual void connect(const std::string& host, int port); + virtual void init(qpid::framing::ProtocolInitiation* header); + virtual void close(); + virtual void setInputHandler(qpid::framing::InputHandler* handler); + virtual void setTimeoutHandler(TimeoutHandler* handler); + virtual void setShutdownHandler(ShutdownHandler* handler); + virtual qpid::framing::OutputHandler* getOutputHandler(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void setReadTimeout(u_int16_t timeout); + virtual void setWriteTimeout(u_int16_t timeout); + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/APRIOProcessor.h b/cpp/common/io/inc/APRIOProcessor.h new file mode 100644 index 0000000000..de0d67a9c4 --- /dev/null +++ b/cpp/common/io/inc/APRIOProcessor.h @@ -0,0 +1,86 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRIOProcessor_ +#define _APRIOProcessor_ + +#include "apr_poll.h" +#include <queue> +#include <iostream> +#include "APRMonitor.h" +#include "APRThread.h" +#include "IOSession.h" +#include "Runnable.h" + +namespace qpid { +namespace io { + + /** + * Manages non-blocking io through the APR polling + * routines. Interacts with the actual io tasks to be performed + * through the IOSession interface, an implementing instance of + * which must be set as the client_data of the apr_pollfd_t + * structures registered. + */ + class APRIOProcessor : private virtual qpid::concurrent::Runnable + { + const int size; + const apr_interval_time_t timeout; + apr_pollset_t* pollset; + int count; + qpid::concurrent::APRThread thread; + qpid::concurrent::APRMonitor lock; + volatile bool stopped; + + void poll(); + virtual void run(); + + public: + APRIOProcessor(apr_pool_t* pool, int size, int timeout); + /** + * Add the fd to the poll set. Relies on the client_data being + * an instance implementing IOSession, through which the write + * and read operations will be performed when readiness is + * indicated by the poll response. + */ + void add(apr_pollfd_t* const fd); + /** + * Remove the fd from the poll set. + */ + void remove(apr_pollfd_t* const fd); + /** + * Indicates whether the capacity of this processor has been + * reached (or whether it can still handle further fd's). + */ + bool full(); + /** + * Indicates whether there are any fd's registered. + */ + bool empty(); + /** + * Stop processing. + */ + void stop(); + + ~APRIOProcessor(); + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/APRSocket.h b/cpp/common/io/inc/APRSocket.h new file mode 100644 index 0000000000..610cf0e175 --- /dev/null +++ b/cpp/common/io/inc/APRSocket.h @@ -0,0 +1,45 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRSocket_ +#define _APRSocket_ + +#include "apr_network_io.h" +#include "Buffer.h" + +namespace qpid { +namespace io { + + class APRSocket + { + apr_socket_t* const socket; + volatile bool closed; + public: + APRSocket(apr_socket_t* socket); + void read(qpid::framing::Buffer& b); + void write(qpid::framing::Buffer& b); + void close(); + bool isOpen(); + u_int8_t read(); + ~APRSocket(); + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/Acceptor.h b/cpp/common/io/inc/Acceptor.h new file mode 100644 index 0000000000..5c690c546f --- /dev/null +++ b/cpp/common/io/inc/Acceptor.h @@ -0,0 +1,38 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Acceptor_ +#define _Acceptor_ + +#include "SessionHandlerFactory.h" + + +namespace qpid { +namespace io { + + class Acceptor + { + public: + virtual void bind(int port, SessionHandlerFactory* factory) = 0; + virtual ~Acceptor(){} + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/BlockingAPRAcceptor.h b/cpp/common/io/inc/BlockingAPRAcceptor.h new file mode 100644 index 0000000000..b77371b02e --- /dev/null +++ b/cpp/common/io/inc/BlockingAPRAcceptor.h @@ -0,0 +1,62 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _BlockingAPRAcceptor_ +#define _BlockingAPRAcceptor_ + +#include <vector> +#include "apr_network_io.h" +#include "apr_poll.h" +#include "apr_time.h" + +#include "Acceptor.h" +#include "APRMonitor.h" +#include "BlockingAPRSessionContext.h" +#include "Runnable.h" +#include "SessionContext.h" +#include "SessionHandlerFactory.h" +#include "Thread.h" +#include "ThreadFactory.h" +#include "ThreadPool.h" + +namespace qpid { +namespace io { + + class BlockingAPRAcceptor : public virtual Acceptor + { + typedef std::vector<BlockingAPRSessionContext*>::iterator iterator; + + const bool debug; + apr_pool_t* apr_pool; + qpid::concurrent::ThreadFactory* threadFactory; + std::vector<BlockingAPRSessionContext*> sessions; + apr_socket_t* socket; + const int connectionBacklog; + volatile bool running; + + public: + BlockingAPRAcceptor(bool debug = false, int connectionBacklog = 10); + virtual void bind(int port, SessionHandlerFactory* factory); + virtual ~BlockingAPRAcceptor(); + void closed(BlockingAPRSessionContext* session); + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/BlockingAPRSessionContext.h b/cpp/common/io/inc/BlockingAPRSessionContext.h new file mode 100644 index 0000000000..038ebd6662 --- /dev/null +++ b/cpp/common/io/inc/BlockingAPRSessionContext.h @@ -0,0 +1,94 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _BlockingAPRSessionContext_ +#define _BlockingAPRSessionContext_ + +#include <queue> +#include <vector> + +#include "apr_network_io.h" +#include "apr_time.h" + +#include "AMQFrame.h" +#include "APRMonitor.h" +#include "Buffer.h" +#include "Runnable.h" +#include "SessionContext.h" +#include "SessionHandler.h" +#include "SessionHandlerFactory.h" +#include "ShutdownHandler.h" +#include "Thread.h" +#include "ThreadFactory.h" + +namespace qpid { +namespace io { + + class BlockingAPRAcceptor; + + class BlockingAPRSessionContext : public virtual SessionContext + { + class Reader : public virtual qpid::concurrent::Runnable{ + BlockingAPRSessionContext* parent; + public: + inline Reader(BlockingAPRSessionContext* p) : parent(p){} + inline virtual void run(){ parent->read(); } + inline virtual ~Reader(){} + }; + + class Writer : public virtual qpid::concurrent::Runnable{ + BlockingAPRSessionContext* parent; + public: + inline Writer(BlockingAPRSessionContext* p) : parent(p){} + inline virtual void run(){ parent->write(); } + inline virtual ~Writer(){} + }; + + apr_socket_t* socket; + const bool debug; + SessionHandler* handler; + BlockingAPRAcceptor* acceptor; + std::queue<qpid::framing::AMQFrame*> outframes; + qpid::framing::Buffer inbuf; + qpid::framing::Buffer outbuf; + qpid::concurrent::APRMonitor outlock; + Reader* reader; + Writer* writer; + qpid::concurrent::Thread* rThread; + qpid::concurrent::Thread* wThread; + + volatile bool closed; + + void read(); + void write(); + public: + BlockingAPRSessionContext(apr_socket_t* socket, + qpid::concurrent::ThreadFactory* factory, + BlockingAPRAcceptor* acceptor, + bool debug = false); + ~BlockingAPRSessionContext(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void close(); + void shutdown(); + void init(SessionHandler* handler); + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/Connector.h b/cpp/common/io/inc/Connector.h new file mode 100644 index 0000000000..52684329f1 --- /dev/null +++ b/cpp/common/io/inc/Connector.h @@ -0,0 +1,56 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Connector_ +#define _Connector_ + +#include "InputHandler.h" +#include "OutputHandler.h" +#include "InitiationHandler.h" +#include "ProtocolInitiation.h" +#include "ShutdownHandler.h" +#include "TimeoutHandler.h" + +namespace qpid { +namespace io { + + class Connector + { + public: + virtual void connect(const std::string& host, int port) = 0; + virtual void init(qpid::framing::ProtocolInitiation* header) = 0; + virtual void close() = 0; + virtual void setInputHandler(qpid::framing::InputHandler* handler) = 0; + virtual void setTimeoutHandler(TimeoutHandler* handler) = 0; + virtual void setShutdownHandler(ShutdownHandler* handler) = 0; + virtual qpid::framing::OutputHandler* getOutputHandler() = 0; + /** + * Set the timeout for reads, in secs. + */ + virtual void setReadTimeout(u_int16_t timeout) = 0; + /** + * Set the timeout for writes, in secs. + */ + virtual void setWriteTimeout(u_int16_t timeout) = 0; + virtual ~Connector(){} + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/ConnectorImpl.h b/cpp/common/io/inc/ConnectorImpl.h new file mode 100644 index 0000000000..242f3aed49 --- /dev/null +++ b/cpp/common/io/inc/ConnectorImpl.h @@ -0,0 +1,53 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRConnectorImpl_ +#define _APRConnectorImpl_ + +#ifdef _USE_APR_IO_ +#include "APRConnector.h" +#else +#include "LConnector.h" +#endif + +namespace qpid { +namespace io { + +#ifdef _USE_APR_IO_ + class ConnectorImpl : public virtual APRConnector + { + + public: + ConnectorImpl(bool debug = false, u_int32_t buffer_size = 1024):APRConnector(debug,buffer_size){}; + virtual ~ConnectorImpl(){}; + }; +#else + class ConnectorImpl : public virtual LConnector + { + + public: + ConnectorImpl(bool debug = false, u_int32_t buffer_size = 1024):LConnector(debug, buffer_size){}; + virtual ~ConnectorImpl(){}; + }; + +#endif + +} +} + + +#endif diff --git a/cpp/common/io/inc/IOSession.h b/cpp/common/io/inc/IOSession.h new file mode 100644 index 0000000000..45e84d29b1 --- /dev/null +++ b/cpp/common/io/inc/IOSession.h @@ -0,0 +1,45 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _IOSession_ +#define _IOSession_ + +namespace qpid { +namespace io { + + class IOSession + { + public: + virtual void read() = 0; + virtual void write() = 0; + virtual ~IOSession(){} + }; + + + class IOSessionHolder + { + IOSession* session; + public: + IOSessionHolder(IOSession* _session) : session(_session) {} + void read(){ session->read(); } + void write(){ session->write(); } + }; +} +} + + +#endif diff --git a/cpp/common/io/inc/LConnector.h b/cpp/common/io/inc/LConnector.h new file mode 100644 index 0000000000..59d95a6b57 --- /dev/null +++ b/cpp/common/io/inc/LConnector.h @@ -0,0 +1,48 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LConnector_ +#define _LConnector_ + + +#include "InputHandler.h" +#include "OutputHandler.h" +#include "InitiationHandler.h" +#include "ProtocolInitiation.h" +#include "Thread.h" +#include "ThreadFactory.h" +#include "Connector.h" + +namespace qpid { +namespace io { + + class LConnector : public virtual qpid::framing::OutputHandler, + public virtual Connector, + private virtual qpid::concurrent::Runnable + { + + public: + LConnector(bool debug = false, u_int32_t buffer_size = 1024){}; + virtual ~LConnector(){}; + + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/LFAcceptor.h b/cpp/common/io/inc/LFAcceptor.h new file mode 100644 index 0000000000..314f811827 --- /dev/null +++ b/cpp/common/io/inc/LFAcceptor.h @@ -0,0 +1,71 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFAcceptor_ +#define _LFAcceptor_ + +#include <vector> +#include "apr_network_io.h" +#include "apr_poll.h" +#include "apr_time.h" + +#include "Acceptor.h" +#include "APRMonitor.h" +#include "APRThreadFactory.h" +#include "APRThreadPool.h" +#include "LFProcessor.h" +#include "LFSessionContext.h" +#include "Runnable.h" +#include "SessionContext.h" +#include "SessionHandlerFactory.h" +#include "Thread.h" + +namespace qpid { +namespace io { + + class LFAcceptor : public virtual Acceptor + { + class APRPool{ + public: + apr_pool_t* pool; + APRPool(); + ~APRPool(); + }; + + APRPool aprPool; + LFProcessor processor; + + const int max_connections_per_processor; + const bool debug; + const int connectionBacklog; + + volatile bool running; + + public: + LFAcceptor(bool debug = false, + int connectionBacklog = 10, + int worker_threads = 5, + int max_connections_per_processor = 500); + virtual void bind(int port, SessionHandlerFactory* factory); + virtual ~LFAcceptor(); + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/LFProcessor.h b/cpp/common/io/inc/LFProcessor.h new file mode 100644 index 0000000000..7d99d51943 --- /dev/null +++ b/cpp/common/io/inc/LFProcessor.h @@ -0,0 +1,116 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFProcessor_ +#define _LFProcessor_ + +#include "apr_poll.h" +#include <iostream> +#include <vector> +#include "APRMonitor.h" +#include "APRThreadFactory.h" +#include "IOSession.h" +#include "Runnable.h" + +namespace qpid { +namespace io { + + class LFSessionContext; + + /** + * This class processes a poll set using the leaders-followers + * pattern for thread synchronization: the leader will poll and on + * the poll returning, it will remove a session, promote a + * follower to leadership, then process the session. + */ + class LFProcessor : private virtual qpid::concurrent::Runnable + { + typedef std::vector<LFSessionContext*>::iterator iterator; + + const int size; + const apr_interval_time_t timeout; + apr_pollset_t* pollset; + int signalledCount; + int current; + const apr_pollfd_t* signalledFDs; + int count; + const int workerCount; + qpid::concurrent::Thread** const workers; + qpid::concurrent::APRMonitor leadLock; + qpid::concurrent::APRMonitor countLock; + qpid::concurrent::APRThreadFactory factory; + std::vector<LFSessionContext*> sessions; + bool hasLeader; + volatile bool stopped; + + const apr_pollfd_t* getNextEvent(); + void waitToLead(); + void relinquishLead(); + void poll(); + virtual void run(); + + public: + LFProcessor(apr_pool_t* pool, int workers, int size, int timeout); + /** + * Add the fd to the poll set. Relies on the client_data being + * an instance of LFSessionContext. + */ + void add(const apr_pollfd_t* const fd); + /** + * Remove the fd from the poll set. + */ + void remove(const apr_pollfd_t* const fd); + /** + * Signal that the fd passed in, already part of the pollset, + * has had its flags altered. + */ + void update(const apr_pollfd_t* const fd); + /** + * Add an fd back to the poll set after deactivation. + */ + void reactivate(const apr_pollfd_t* const fd); + /** + * Temporarily remove the fd from the poll set. Called when processing + * is about to begin. + */ + void deactivate(const apr_pollfd_t* const fd); + /** + * Indicates whether the capacity of this processor has been + * reached (or whether it can still handle further fd's). + */ + bool full(); + /** + * Indicates whether there are any fd's registered. + */ + bool empty(); + /** + * Stop processing. + */ + void stop(); + /** + * Start processing. + */ + void start(); + + ~LFProcessor(); + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/LFSessionContext.h b/cpp/common/io/inc/LFSessionContext.h new file mode 100644 index 0000000000..ef6a0d07b0 --- /dev/null +++ b/cpp/common/io/inc/LFSessionContext.h @@ -0,0 +1,89 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFSessionContext_ +#define _LFSessionContext_ + +#include <queue> + +#include "apr_network_io.h" +#include "apr_poll.h" +#include "apr_time.h" + +#include "AMQFrame.h" +#include "APRMonitor.h" +#include "APRSocket.h" +#include "Buffer.h" +#include "IOSession.h" +#include "LFProcessor.h" +#include "SessionContext.h" +#include "SessionHandler.h" + +namespace qpid { +namespace io { + + + class LFSessionContext : public virtual SessionContext, public virtual IOSession + { + const bool debug; + APRSocket socket; + bool initiated; + + qpid::framing::Buffer in; + qpid::framing::Buffer out; + + SessionHandler* handler; + LFProcessor* const processor; + + apr_pollfd_t fd; + + std::queue<qpid::framing::AMQFrame*> framesToWrite; + qpid::concurrent::APRMonitor writeLock; + + bool processing; + bool closing; + + //these are just for debug, as a crude way of detecting concurrent access + volatile unsigned int reading; + volatile unsigned int writing; + + static qpid::concurrent::APRMonitor logLock; + void log(const std::string& desc, qpid::framing::AMQFrame* const frame); + + public: + LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, + LFProcessor* const processor, + bool debug = false); + ~LFSessionContext(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void close(); + virtual void read(); + virtual void write(); + void init(SessionHandler* handler); + void startProcessing(); + void stopProcessing(); + void handleClose(); + void shutdown(); + inline apr_pollfd_t* const getFd(){ return &fd; } + inline bool isClosed(){ return !socket.isOpen(); } + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/SessionContext.h b/cpp/common/io/inc/SessionContext.h new file mode 100644 index 0000000000..f223a70daa --- /dev/null +++ b/cpp/common/io/inc/SessionContext.h @@ -0,0 +1,37 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _SessionContext_ +#define _SessionContext_ + +#include "OutputHandler.h" + +namespace qpid { +namespace io { + + class SessionContext : public virtual qpid::framing::OutputHandler + { + public: + virtual void close() = 0; + virtual ~SessionContext(){} + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/SessionHandler.h b/cpp/common/io/inc/SessionHandler.h new file mode 100644 index 0000000000..21a992ab65 --- /dev/null +++ b/cpp/common/io/inc/SessionHandler.h @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _SessionHandler_ +#define _SessionHandler_ + +#include "InputHandler.h" +#include "InitiationHandler.h" +#include "ProtocolInitiation.h" +#include "TimeoutHandler.h" + +namespace qpid { +namespace io { + + class SessionHandler : public virtual qpid::framing::InitiationHandler, + public virtual qpid::framing::InputHandler, + public virtual TimeoutHandler + { + public: + virtual void closed() = 0; + virtual ~SessionHandler(){} + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/SessionHandlerFactory.h b/cpp/common/io/inc/SessionHandlerFactory.h new file mode 100644 index 0000000000..67d968b72e --- /dev/null +++ b/cpp/common/io/inc/SessionHandlerFactory.h @@ -0,0 +1,38 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _SessionHandlerFactory_ +#define _SessionHandlerFactory_ + +#include "SessionContext.h" +#include "SessionHandler.h" + +namespace qpid { +namespace io { + + class SessionHandlerFactory + { + public: + virtual SessionHandler* create(SessionContext* ctxt) = 0; + virtual ~SessionHandlerFactory(){} + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/SessionManager.h b/cpp/common/io/inc/SessionManager.h new file mode 100644 index 0000000000..30c5208532 --- /dev/null +++ b/cpp/common/io/inc/SessionManager.h @@ -0,0 +1,40 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _SessionManager_ +#define _SessionManager_ + +#include "SessionContext.h" +#include "SessionHandler.h" + +namespace qpid { +namespace io { + + class SessionManager + { + public: + virtual SessionHandler* init(SessionContext* ctxt) = 0; + virtual void close(SessionContext* ctxt) = 0; + virtual void updateInterest(SessionContext* ctxt, bool read, bool write) = 0; + virtual ~SessionManager(){} + }; + +} +} + + +#endif diff --git a/cpp/common/io/inc/ShutdownHandler.h b/cpp/common/io/inc/ShutdownHandler.h new file mode 100644 index 0000000000..186d9eeca4 --- /dev/null +++ b/cpp/common/io/inc/ShutdownHandler.h @@ -0,0 +1,34 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _ShutdownHandler_ +#define _ShutdownHandler_ + +namespace qpid { +namespace io { + + class ShutdownHandler + { + public: + virtual void shutdown() = 0; + virtual ~ShutdownHandler(){} + }; + +} +} + +#endif diff --git a/cpp/common/io/inc/TimeoutHandler.h b/cpp/common/io/inc/TimeoutHandler.h new file mode 100644 index 0000000000..c92220fd6e --- /dev/null +++ b/cpp/common/io/inc/TimeoutHandler.h @@ -0,0 +1,36 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _TimeoutHandler_ +#define _TimeoutHandler_ + +namespace qpid { +namespace io { + + class TimeoutHandler + { + public: + virtual void idleOut() = 0; + virtual void idleIn() = 0; + virtual ~TimeoutHandler(){} + }; + +} +} + + +#endif |