diff options
Diffstat (limited to 'cpp/src/qpid/sys')
30 files changed, 2040 insertions, 0 deletions
diff --git a/cpp/src/qpid/sys/APRBase.cpp b/cpp/src/qpid/sys/APRBase.cpp new file mode 100644 index 0000000000..91e2b9f428 --- /dev/null +++ b/cpp/src/qpid/sys/APRBase.cpp @@ -0,0 +1,96 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "qpid/sys/APRBase.h" +#include "qpid/QpidError.h" + +using namespace qpid::sys; + +APRBase* APRBase::instance = 0; + +APRBase* APRBase::getInstance(){ + if(instance == 0){ + instance = new APRBase(); + } + return instance; +} + + +APRBase::APRBase() : count(0){ + apr_initialize(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, 0)); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); +} + +APRBase::~APRBase(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); + apr_pool_destroy(pool); + apr_terminate(); +} + +bool APRBase::_increment(){ + bool deleted(false); + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); + if(this == instance){ + count++; + }else{ + deleted = true; + } + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); + return !deleted; +} + +void APRBase::_decrement(){ + APRBase* copy = 0; + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); + if(--count == 0){ + copy = instance; + instance = 0; + } + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); + if(copy != 0){ + delete copy; + } +} + +void APRBase::increment(){ + int count = 0; + while(count++ < 2 && !getInstance()->_increment()){ + std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl; + } +} + +void APRBase::decrement(){ + getInstance()->_decrement(); +} + +void qpid::sys::check(apr_status_t status, const std::string& file, const int line){ + if (status != APR_SUCCESS){ + const int size = 50; + char tmp[size]; + std::string msg(apr_strerror(status, tmp, size)); + throw QpidError(APR_ERROR + ((int) status), msg, file, line); + } +} + +std::string qpid::sys::get_desc(apr_status_t status){ + const int size = 50; + char tmp[size]; + return std::string(apr_strerror(status, tmp, size)); +} + diff --git a/cpp/src/qpid/sys/APRBase.h b/cpp/src/qpid/sys/APRBase.h new file mode 100644 index 0000000000..9eef07e4c4 --- /dev/null +++ b/cpp/src/qpid/sys/APRBase.h @@ -0,0 +1,63 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRBase_ +#define _APRBase_ + +#include <string> +#include "apr-1/apr_thread_mutex.h" +#include "apr-1/apr_errno.h" + +namespace qpid { +namespace sys { + + /** + * Use of APR libraries necessitates explicit init and terminate + * calls. Any class using APR libs should obtain the reference to + * this singleton and increment on construction, decrement on + * destruction. This class can then correctly initialise apr + * before the first use and terminate after the last use. + */ + class APRBase{ + static APRBase* instance; + apr_pool_t* pool; + apr_thread_mutex_t* mutex; + int count; + + APRBase(); + ~APRBase(); + static APRBase* getInstance(); + bool _increment(); + void _decrement(); + public: + static void increment(); + static void decrement(); + }; + + //this is also a convenient place for a helper function for error checking: + void check(apr_status_t status, const std::string& file, const int line); + std::string get_desc(apr_status_t status); + +#define CHECK_APR_SUCCESS(A) check(A, __FILE__, __LINE__); + +} +} + + + + +#endif diff --git a/cpp/src/qpid/sys/APRPool.cpp b/cpp/src/qpid/sys/APRPool.cpp new file mode 100644 index 0000000000..0f809ca93c --- /dev/null +++ b/cpp/src/qpid/sys/APRPool.cpp @@ -0,0 +1,39 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "APRPool.h" +#include "qpid/sys/APRBase.h" +#include <boost/pool/detail/singleton.hpp> + +using namespace qpid::sys; +using namespace qpid::sys; + +APRPool::APRPool(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); +} + +APRPool::~APRPool(){ + apr_pool_destroy(pool); + APRBase::decrement(); +} + +apr_pool_t* APRPool::get() { + return boost::details::pool::singleton_default<APRPool>::instance().pool; +} + diff --git a/cpp/src/qpid/sys/APRPool.h b/cpp/src/qpid/sys/APRPool.h new file mode 100644 index 0000000000..2196cd64e7 --- /dev/null +++ b/cpp/src/qpid/sys/APRPool.h @@ -0,0 +1,47 @@ +#ifndef _APRPool_ +#define _APRPool_ + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <boost/noncopyable.hpp> +#include <apr-1/apr_pools.h> + +namespace qpid { +namespace sys { +/** + * Singleton APR memory pool. + */ +class APRPool : private boost::noncopyable { + public: + APRPool(); + ~APRPool(); + + /** Get singleton instance */ + static apr_pool_t* get(); + + private: + apr_pool_t* pool; +}; + +}} + + + + + +#endif /*!_APRPool_*/ diff --git a/cpp/src/qpid/sys/APRSocket.cpp b/cpp/src/qpid/sys/APRSocket.cpp new file mode 100644 index 0000000000..586c03475f --- /dev/null +++ b/cpp/src/qpid/sys/APRSocket.cpp @@ -0,0 +1,76 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/APRBase.h" +#include "qpid/sys/APRSocket.h" +#include <assert.h> +#include <iostream> + +using namespace qpid::sys; +using namespace qpid::framing; +using namespace qpid::sys; + +APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ + +} + +void APRSocket::read(qpid::framing::Buffer& buffer){ + apr_size_t bytes; + bytes = buffer.available(); + apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes); + buffer.move(bytes); + if(APR_STATUS_IS_TIMEUP(s)){ + //timed out + }else if(APR_STATUS_IS_EOF(s)){ + close(); + } +} + +void APRSocket::write(qpid::framing::Buffer& buffer){ + apr_size_t bytes; + do{ + bytes = buffer.available(); + apr_socket_send(socket, buffer.start(), &bytes); + buffer.move(bytes); + }while(bytes > 0); +} + +void APRSocket::close(){ + if(!closed){ + std::cout << "Closing socket " << socket << "@" << this << std::endl; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + closed = true; + } +} + +bool APRSocket::isOpen(){ + return !closed; +} + +u_int8_t APRSocket::read(){ + char data[1]; + apr_size_t bytes = 1; + apr_status_t s = apr_socket_recv(socket, data, &bytes); + if(APR_STATUS_IS_EOF(s) || bytes == 0){ + return 0; + }else{ + return *data; + } +} + +APRSocket::~APRSocket(){ +} diff --git a/cpp/src/qpid/sys/APRSocket.h b/cpp/src/qpid/sys/APRSocket.h new file mode 100644 index 0000000000..f7e7ad107b --- /dev/null +++ b/cpp/src/qpid/sys/APRSocket.h @@ -0,0 +1,45 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRSocket_ +#define _APRSocket_ + +#include "apr-1/apr_network_io.h" +#include "qpid/framing/Buffer.h" + +namespace qpid { +namespace sys { + + class APRSocket + { + apr_socket_t* const socket; + volatile bool closed; + public: + APRSocket(apr_socket_t* socket); + void read(qpid::framing::Buffer& b); + void write(qpid::framing::Buffer& b); + void close(); + bool isOpen(); + u_int8_t read(); + ~APRSocket(); + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/Acceptor.cpp b/cpp/src/qpid/sys/Acceptor.cpp new file mode 100644 index 0000000000..f8e8504c6e --- /dev/null +++ b/cpp/src/qpid/sys/Acceptor.cpp @@ -0,0 +1,78 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/Acceptor.h" +#include "qpid/sys/APRBase.h" +#include "APRPool.h" + +using namespace qpid::sys; +using namespace qpid::sys; + +Acceptor::Acceptor(int16_t port_, int backlog, int threads) : + port(port_), + processor(APRPool::get(), threads, 1000, 5000000) +{ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); + CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); + CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); +} + +int16_t Acceptor::getPort() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->port; +} + +void Acceptor::run(SessionHandlerFactory* factory) { + running = true; + processor.start(); + std::cout << "Listening on port " << getPort() << "..." << std::endl; + while(running){ + apr_socket_t* client; + apr_status_t status = apr_socket_accept(&client, socket, APRPool::get()); + if(status == APR_SUCCESS){ + //make this socket non-blocking: + CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); + LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false); + session->init(factory->create(session)); + }else{ + running = false; + if(status != APR_EINTR){ + std::cout << "ERROR: " << get_desc(status) << std::endl; + } + } + } + shutdown(); +} + +void Acceptor::shutdown() { + // TODO aconway 2006-10-12: Cleanup, this is not thread safe. + if (running) { + running = false; + processor.stop(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + } +} + + diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h new file mode 100644 index 0000000000..f0f9d6feba --- /dev/null +++ b/cpp/src/qpid/sys/Acceptor.h @@ -0,0 +1,58 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFAcceptor_ +#define _LFAcceptor_ + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_poll.h" +#include "apr-1/apr_time.h" + +#include "qpid/sys/Acceptor.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/LFProcessor.h" +#include "qpid/sys/LFSessionContext.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/SessionContext.h" +#include "qpid/sys/SessionHandlerFactory.h" +#include "qpid/sys/Thread.h" +#include <qpid/SharedObject.h> + +namespace qpid { +namespace sys { + +/** APR Acceptor. */ +class Acceptor : public qpid::SharedObject<Acceptor> +{ + public: + Acceptor(int16_t port, int backlog, int threads); + virtual int16_t getPort() const; + virtual void run(SessionHandlerFactory* factory); + virtual void shutdown(); + + private: + int16_t port; + LFProcessor processor; + apr_socket_t* socket; + volatile bool running; +}; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/Connector.cpp b/cpp/src/qpid/sys/Connector.cpp new file mode 100644 index 0000000000..1d4b237d92 --- /dev/null +++ b/cpp/src/qpid/sys/Connector.cpp @@ -0,0 +1,201 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "qpid/sys/APRBase.h" +#include "qpid/sys/Connector.h" +#include "qpid/sys/ThreadFactory.h" +#include "qpid/QpidError.h" + +using namespace qpid::sys; +using namespace qpid::sys; +using namespace qpid::framing; +using qpid::QpidError; + +Connector::Connector(bool _debug, u_int32_t buffer_size) : + debug(_debug), + receive_buffer_size(buffer_size), + send_buffer_size(buffer_size), + closed(true), + lastIn(0), lastOut(0), + timeout(0), + idleIn(0), idleOut(0), + timeoutHandler(0), + shutdownHandler(0), + inbuf(receive_buffer_size), + outbuf(send_buffer_size){ + + APRBase::increment(); + + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); + + threadFactory = new ThreadFactory(); + writeLock = new Monitor(); +} + +Connector::~Connector(){ + delete receiver; + delete writeLock; + delete threadFactory; + apr_pool_destroy(pool); + + APRBase::decrement(); +} + +void Connector::connect(const std::string& host, int port){ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); + CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); + closed = false; + + receiver = threadFactory->create(this); + receiver->start(); +} + +void Connector::init(ProtocolInitiation* header){ + writeBlock(header); + delete header; +} + +void Connector::close(){ + closed = true; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + receiver->join(); +} + +void Connector::setInputHandler(InputHandler* handler){ + input = handler; +} + +void Connector::setShutdownHandler(ShutdownHandler* handler){ + shutdownHandler = handler; +} + +OutputHandler* Connector::getOutputHandler(){ + return this; +} + +void Connector::send(AMQFrame* frame){ + writeBlock(frame); + if(debug) std::cout << "SENT: " << *frame << std::endl; + delete frame; +} + +void Connector::writeBlock(AMQDataBlock* data){ + writeLock->acquire(); + data->encode(outbuf); + + //transfer data to wire + outbuf.flip(); + writeToSocket(outbuf.start(), outbuf.available()); + outbuf.clear(); + writeLock->release(); +} + +void Connector::writeToSocket(char* data, size_t available){ + apr_size_t bytes(available); + apr_size_t written(0); + while(written < available && !closed){ + apr_status_t status = apr_socket_send(socket, data + written, &bytes); + if(status == APR_TIMEUP){ + std::cout << "Write request timed out." << std::endl; + } + if(bytes == 0){ + std::cout << "Write request wrote 0 bytes." << std::endl; + } + lastOut = apr_time_as_msec(apr_time_now()); + written += bytes; + bytes = available - written; + } +} + +void Connector::checkIdle(apr_status_t status){ + if(timeoutHandler){ + apr_time_t now = apr_time_as_msec(apr_time_now()); + if(APR_STATUS_IS_TIMEUP(status)){ + if(idleIn && (now - lastIn > idleIn)){ + timeoutHandler->idleIn(); + } + }else if(APR_STATUS_IS_EOF(status)){ + closed = true; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + if(shutdownHandler) shutdownHandler->shutdown(); + }else{ + lastIn = now; + } + if(idleOut && (now - lastOut > idleOut)){ + timeoutHandler->idleOut(); + } + } +} + +void Connector::setReadTimeout(u_int16_t t){ + idleIn = t * 1000;//t is in secs + if(idleIn && (!timeout || idleIn < timeout)){ + timeout = idleIn; + setSocketTimeout(); + } + +} + +void Connector::setWriteTimeout(u_int16_t t){ + idleOut = t * 1000;//t is in secs + if(idleOut && (!timeout || idleOut < timeout)){ + timeout = idleOut; + setSocketTimeout(); + } +} + +void Connector::setSocketTimeout(){ + //interval is in microseconds, timeout in milliseconds + //want the interval to be a bit shorter than the timeout, hence multiply + //by 800 rather than 1000. + apr_interval_time_t interval(timeout * 800); + apr_socket_timeout_set(socket, interval); +} + +void Connector::setTimeoutHandler(TimeoutHandler* handler){ + timeoutHandler = handler; +} + +void Connector::run(){ + try{ + while(!closed){ + apr_size_t bytes(inbuf.available()); + if(bytes < 1){ + THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); + } + checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes)); + + if(bytes > 0){ + inbuf.move(bytes); + inbuf.flip();//position = 0, limit = total data read + + AMQFrame frame; + while(frame.decode(inbuf)){ + if(debug) std::cout << "RECV: " << frame << std::endl; + input->received(&frame); + } + //need to compact buffer to preserve any 'extra' data + inbuf.compact(); + } + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} diff --git a/cpp/src/qpid/sys/Connector.h b/cpp/src/qpid/sys/Connector.h new file mode 100644 index 0000000000..611acc417f --- /dev/null +++ b/cpp/src/qpid/sys/Connector.h @@ -0,0 +1,95 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Connector_ +#define _Connector_ + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_time.h" + +#include "qpid/framing/InputHandler.h" +#include "qpid/framing/OutputHandler.h" +#include "qpid/framing/InitiationHandler.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/sys/ShutdownHandler.h" +#include "qpid/sys/TimeoutHandler.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/ThreadFactory.h" +#include "qpid/sys/Connector.h" +#include "qpid/sys/Monitor.h" + +namespace qpid { +namespace sys { + + class Connector : public virtual qpid::framing::OutputHandler, + private virtual qpid::sys::Runnable + { + const bool debug; + const int receive_buffer_size; + const int send_buffer_size; + + bool closed; + + apr_time_t lastIn; + apr_time_t lastOut; + apr_interval_time_t timeout; + u_int32_t idleIn; + u_int32_t idleOut; + + TimeoutHandler* timeoutHandler; + ShutdownHandler* shutdownHandler; + qpid::framing::InputHandler* input; + qpid::framing::InitiationHandler* initialiser; + qpid::framing::OutputHandler* output; + + qpid::framing::Buffer inbuf; + qpid::framing::Buffer outbuf; + + qpid::sys::Monitor* writeLock; + qpid::sys::ThreadFactory* threadFactory; + qpid::sys::Thread* receiver; + + apr_pool_t* pool; + apr_socket_t* socket; + + void checkIdle(apr_status_t status); + void writeBlock(qpid::framing::AMQDataBlock* data); + void writeToSocket(char* data, size_t available); + void setSocketTimeout(); + + void run(); + + public: + Connector(bool debug = false, u_int32_t buffer_size = 1024); + virtual ~Connector(); + virtual void connect(const std::string& host, int port); + virtual void init(qpid::framing::ProtocolInitiation* header); + virtual void close(); + virtual void setInputHandler(qpid::framing::InputHandler* handler); + virtual void setTimeoutHandler(TimeoutHandler* handler); + virtual void setShutdownHandler(ShutdownHandler* handler); + virtual qpid::framing::OutputHandler* getOutputHandler(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void setReadTimeout(u_int16_t timeout); + virtual void setWriteTimeout(u_int16_t timeout); + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/LFProcessor.cpp b/cpp/src/qpid/sys/LFProcessor.cpp new file mode 100644 index 0000000000..8c53c86392 --- /dev/null +++ b/cpp/src/qpid/sys/LFProcessor.cpp @@ -0,0 +1,193 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/LFProcessor.h" +#include "qpid/sys/APRBase.h" +#include "qpid/sys/LFSessionContext.h" +#include "qpid/QpidError.h" +#include <sstream> + +using namespace qpid::sys; +using namespace qpid::sys; +using qpid::QpidError; + +// TODO aconway 2006-10-12: stopped is read outside locks. +// + +LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : + size(_size), + timeout(_timeout), + signalledCount(0), + current(0), + count(0), + workerCount(_workers), + hasLeader(false), + workers(new Thread*[_workers]), + stopped(false) +{ + + CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); + //create & start the required number of threads + for(int i = 0; i < workerCount; i++){ + workers[i] = factory.create(this); + } +} + + +LFProcessor::~LFProcessor(){ + if (!stopped) stop(); + for(int i = 0; i < workerCount; i++){ + delete workers[i]; + } + delete[] workers; + CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); +} + +void LFProcessor::start(){ + for(int i = 0; i < workerCount; i++){ + workers[i]->start(); + } +} + +void LFProcessor::add(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); + countLock.acquire(); + sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data)); + count++; + countLock.release(); +} + +void LFProcessor::remove(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + countLock.acquire(); + sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data))); + count--; + countLock.release(); +} + +void LFProcessor::reactivate(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +void LFProcessor::deactivate(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); +} + +void LFProcessor::update(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +bool LFProcessor::full(){ + Locker locker(countLock); + return count == size; +} + +bool LFProcessor::empty(){ + Locker locker(countLock); + return count == 0; +} + +void LFProcessor::poll() { + apr_status_t status = APR_EGENERAL; + do{ + current = 0; + if(!stopped){ + status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); + } + }while(status != APR_SUCCESS && !stopped); +} + +void LFProcessor::run(){ + try{ + while(!stopped){ + leadLock.acquire(); + waitToLead(); + if(!stopped){ + const apr_pollfd_t* evt = getNextEvent(); + if(evt){ + LFSessionContext* session = reinterpret_cast<LFSessionContext*>(evt->client_data); + session->startProcessing(); + + relinquishLead(); + leadLock.release(); + + //process event: + if(evt->rtnevents & APR_POLLIN) session->read(); + if(evt->rtnevents & APR_POLLOUT) session->write(); + + if(session->isClosed()){ + session->handleClose(); + countLock.acquire(); + sessions.erase(find(sessions.begin(), sessions.end(), session)); + count--; + countLock.release(); + }else{ + session->stopProcessing(); + } + + }else{ + leadLock.release(); + } + }else{ + leadLock.release(); + } + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} + +void LFProcessor::waitToLead(){ + while(hasLeader && !stopped) leadLock.wait(); + hasLeader = !stopped; +} + +void LFProcessor::relinquishLead(){ + hasLeader = false; + leadLock.notify(); +} + +const apr_pollfd_t* LFProcessor::getNextEvent(){ + while(true){ + if(stopped){ + return 0; + }else if(current < signalledCount){ + //use result of previous poll if one is available + return signalledFDs + (current++); + }else{ + //else poll to get new events + poll(); + } + } +} + +void LFProcessor::stop(){ + stopped = true; + leadLock.acquire(); + leadLock.notifyAll(); + leadLock.release(); + + for(int i = 0; i < workerCount; i++){ + workers[i]->join(); + } + + for(iterator i = sessions.begin(); i < sessions.end(); i++){ + (*i)->shutdown(); + } +} + diff --git a/cpp/src/qpid/sys/LFProcessor.h b/cpp/src/qpid/sys/LFProcessor.h new file mode 100644 index 0000000000..afbb9ea413 --- /dev/null +++ b/cpp/src/qpid/sys/LFProcessor.h @@ -0,0 +1,119 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFProcessor_ +#define _LFProcessor_ + +#include "apr-1/apr_poll.h" +#include <iostream> +#include <vector> +#include "qpid/sys/Monitor.h" +#include "qpid/sys/ThreadFactory.h" +#include "qpid/sys/Runnable.h" + +namespace qpid { +namespace sys { + + class LFSessionContext; + + /** + * This class processes a poll set using the leaders-followers + * pattern for thread synchronization: the leader will poll and on + * the poll returning, it will remove a session, promote a + * follower to leadership, then process the session. + */ + class LFProcessor : private virtual qpid::sys::Runnable + { + typedef std::vector<LFSessionContext*>::iterator iterator; + + const int size; + const apr_interval_time_t timeout; + apr_pollset_t* pollset; + int signalledCount; + int current; + const apr_pollfd_t* signalledFDs; + int count; + const int workerCount; + bool hasLeader; + qpid::sys::Thread** const workers; + qpid::sys::Monitor leadLock; + qpid::sys::Monitor countLock; + qpid::sys::ThreadFactory factory; + std::vector<LFSessionContext*> sessions; + volatile bool stopped; + + const apr_pollfd_t* getNextEvent(); + void waitToLead(); + void relinquishLead(); + void poll(); + virtual void run(); + + public: + LFProcessor(apr_pool_t* pool, int workers, int size, int timeout); + /** + * Add the fd to the poll set. Relies on the client_data being + * an instance of LFSessionContext. + */ + void add(const apr_pollfd_t* const fd); + /** + * Remove the fd from the poll set. + */ + void remove(const apr_pollfd_t* const fd); + /** + * Signal that the fd passed in, already part of the pollset, + * has had its flags altered. + */ + void update(const apr_pollfd_t* const fd); + /** + * Add an fd back to the poll set after deactivation. + */ + void reactivate(const apr_pollfd_t* const fd); + /** + * Temporarily remove the fd from the poll set. Called when processing + * is about to begin. + */ + void deactivate(const apr_pollfd_t* const fd); + /** + * Indicates whether the capacity of this processor has been + * reached (or whether it can still handle further fd's). + */ + bool full(); + /** + * Indicates whether there are any fd's registered. + */ + bool empty(); + /** + * Stop processing. + */ + void stop(); + /** + * Start processing. + */ + void start(); + /** + * Is processing stopped? + */ + bool isStopped(); + + ~LFProcessor(); + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/LFSessionContext.cpp b/cpp/src/qpid/sys/LFSessionContext.cpp new file mode 100644 index 0000000000..f2dff87fd0 --- /dev/null +++ b/cpp/src/qpid/sys/LFSessionContext.cpp @@ -0,0 +1,189 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/LFSessionContext.h" +#include "qpid/sys/APRBase.h" +#include "qpid/QpidError.h" +#include <assert.h> + +using namespace qpid::sys; +using namespace qpid::sys; +using namespace qpid::framing; + +LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, + LFProcessor* const _processor, + bool _debug) : + debug(_debug), + socket(_socket), + initiated(false), + in(32768), + out(32768), + processor(_processor), + processing(false), + closing(false), + reading(0), + writing(0) +{ + + fd.p = _pool; + fd.desc_type = APR_POLL_SOCKET; + fd.reqevents = APR_POLLIN; + fd.client_data = this; + fd.desc.s = _socket; + + out.flip(); +} + +LFSessionContext::~LFSessionContext(){ + +} + +void LFSessionContext::read(){ + assert(!reading); // No concurrent read. + reading = Thread::currentThread(); + + socket.read(in); + in.flip(); + if(initiated){ + AMQFrame frame; + while(frame.decode(in)){ + if(debug) log("RECV", &frame); + handler->received(&frame); + } + }else{ + ProtocolInitiation protocolInit; + if(protocolInit.decode(in)){ + handler->initiated(&protocolInit); + initiated = true; + if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; + } + } + in.compact(); + + reading = 0; +} + +void LFSessionContext::write(){ + assert(!writing); // No concurrent writes. + writing = Thread::currentThread(); + + bool done = isClosed(); + while(!done){ + if(out.available() > 0){ + socket.write(out); + if(out.available() > 0){ + writing = 0; + + //incomplete write, leave flags to receive notification of readiness to write + done = true;//finished processing for now, but write is still in progress + } + }else{ + //do we have any frames to write? + writeLock.acquire(); + if(!framesToWrite.empty()){ + out.clear(); + bool encoded(false); + AMQFrame* frame = framesToWrite.front(); + while(frame && out.available() >= frame->size()){ + encoded = true; + frame->encode(out); + if(debug) log("SENT", frame); + delete frame; + framesToWrite.pop(); + frame = framesToWrite.empty() ? 0 : framesToWrite.front(); + } + if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); + out.flip(); + }else{ + //reset flags, don't care about writability anymore + fd.reqevents = APR_POLLIN; + done = true; + + writing = 0; + + if(closing){ + socket.close(); + } + } + writeLock.release(); + } + } +} + +void LFSessionContext::send(AMQFrame* frame){ + writeLock.acquire(); + if(!closing){ + framesToWrite.push(frame); + if(!(fd.reqevents & APR_POLLOUT)){ + fd.reqevents |= APR_POLLOUT; + if(!processing){ + processor->update(&fd); + } + } + } + writeLock.release(); +} + +void LFSessionContext::startProcessing(){ + writeLock.acquire(); + processing = true; + processor->deactivate(&fd); + writeLock.release(); +} + +void LFSessionContext::stopProcessing(){ + writeLock.acquire(); + processor->reactivate(&fd); + processing = false; + writeLock.release(); +} + +void LFSessionContext::close(){ + closing = true; + writeLock.acquire(); + if(!processing){ + //allow pending frames to be written to socket + fd.reqevents = APR_POLLOUT; + processor->update(&fd); + } + writeLock.release(); +} + +void LFSessionContext::handleClose(){ + handler->closed(); + std::cout << "Session closed [" << &socket << "]" << std::endl; + delete handler; + delete this; +} + +void LFSessionContext::shutdown(){ + socket.close(); + handleClose(); +} + +void LFSessionContext::init(SessionHandler* _handler){ + handler = _handler; + processor->add(&fd); +} + +void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ + logLock.acquire(); + std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; + logLock.release(); +} + +Monitor LFSessionContext::logLock; diff --git a/cpp/src/qpid/sys/LFSessionContext.h b/cpp/src/qpid/sys/LFSessionContext.h new file mode 100644 index 0000000000..92f52ccf83 --- /dev/null +++ b/cpp/src/qpid/sys/LFSessionContext.h @@ -0,0 +1,88 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFSessionContext_ +#define _LFSessionContext_ + +#include <queue> + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_poll.h" +#include "apr-1/apr_time.h" + +#include "qpid/framing/AMQFrame.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/APRSocket.h" +#include "qpid/framing/Buffer.h" +#include "qpid/sys/LFProcessor.h" +#include "qpid/sys/SessionContext.h" +#include "qpid/sys/SessionHandler.h" + +namespace qpid { +namespace sys { + + + class LFSessionContext : public virtual SessionContext + { + const bool debug; + APRSocket socket; + bool initiated; + + qpid::framing::Buffer in; + qpid::framing::Buffer out; + + SessionHandler* handler; + LFProcessor* const processor; + + apr_pollfd_t fd; + + std::queue<qpid::framing::AMQFrame*> framesToWrite; + qpid::sys::Monitor writeLock; + + bool processing; + bool closing; + + //these are just for debug, as a crude way of detecting concurrent access + volatile unsigned int reading; + volatile unsigned int writing; + + static qpid::sys::Monitor logLock; + void log(const std::string& desc, qpid::framing::AMQFrame* const frame); + + public: + LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, + LFProcessor* const processor, + bool debug = false); + ~LFSessionContext(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void close(); + void read(); + void write(); + void init(SessionHandler* handler); + void startProcessing(); + void stopProcessing(); + void handleClose(); + void shutdown(); + inline apr_pollfd_t* const getFd(){ return &fd; } + inline bool isClosed(){ return !socket.isOpen(); } + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/Monitor.cpp b/cpp/src/qpid/sys/Monitor.cpp new file mode 100644 index 0000000000..79a29c219e --- /dev/null +++ b/cpp/src/qpid/sys/Monitor.cpp @@ -0,0 +1,60 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/APRBase.h" +#include "qpid/sys/Monitor.h" +#include <iostream> + +qpid::sys::Monitor::Monitor(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool)); +} + +qpid::sys::Monitor::~Monitor(){ + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); + apr_pool_destroy(pool); + APRBase::decrement(); +} + +void qpid::sys::Monitor::wait(){ + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); +} + + +void qpid::sys::Monitor::wait(u_int64_t time){ + apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000); + if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); +} + +void qpid::sys::Monitor::notify(){ + CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void qpid::sys::Monitor::notifyAll(){ + CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + +void qpid::sys::Monitor::acquire(){ + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); +} + +void qpid::sys::Monitor::release(){ + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); +} diff --git a/cpp/src/qpid/sys/Monitor.h b/cpp/src/qpid/sys/Monitor.h new file mode 100644 index 0000000000..ddda613b87 --- /dev/null +++ b/cpp/src/qpid/sys/Monitor.h @@ -0,0 +1,56 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Monitor_ +#define _Monitor_ + +#include "apr-1/apr_thread_mutex.h" +#include "apr-1/apr_thread_cond.h" +#include "qpid/sys/Monitor.h" + +namespace qpid { +namespace sys { + +class Monitor +{ + apr_pool_t* pool; + apr_thread_mutex_t* mutex; + apr_thread_cond_t* condition; + + public: + Monitor(); + virtual ~Monitor(); + virtual void wait(); + virtual void wait(u_int64_t time); + virtual void notify(); + virtual void notifyAll(); + virtual void acquire(); + virtual void release(); +}; + +class Locker +{ + public: + Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); } + ~Locker() { monitor.release(); } + private: + Monitor& monitor; +}; +}} + + +#endif diff --git a/cpp/src/qpid/sys/Runnable.cpp b/cpp/src/qpid/sys/Runnable.cpp new file mode 100644 index 0000000000..d7d9e968cc --- /dev/null +++ b/cpp/src/qpid/sys/Runnable.cpp @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/sys/Runnable.h" +qpid::sys::Runnable::~Runnable() {} diff --git a/cpp/src/qpid/sys/Runnable.h b/cpp/src/qpid/sys/Runnable.h new file mode 100644 index 0000000000..ce13eb2039 --- /dev/null +++ b/cpp/src/qpid/sys/Runnable.h @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Runnable_ +#define _Runnable_ + +namespace qpid { +namespace sys { + + class Runnable + { + public: + virtual ~Runnable(); + virtual void run() = 0; + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/SessionContext.h b/cpp/src/qpid/sys/SessionContext.h new file mode 100644 index 0000000000..1362b4f2f2 --- /dev/null +++ b/cpp/src/qpid/sys/SessionContext.h @@ -0,0 +1,38 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _SessionContext_ +#define _SessionContext_ + +#include "qpid/framing/OutputHandler.h" + +namespace qpid { +namespace sys { + +/** + * Provides the output handler associated with a connection. + */ +class SessionContext : public virtual qpid::framing::OutputHandler +{ + public: + virtual void close() = 0; +}; + +}} + + +#endif diff --git a/cpp/src/qpid/sys/SessionHandler.h b/cpp/src/qpid/sys/SessionHandler.h new file mode 100644 index 0000000000..130e69caf4 --- /dev/null +++ b/cpp/src/qpid/sys/SessionHandler.h @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _SessionHandler_ +#define _SessionHandler_ + +#include "qpid/framing/InputHandler.h" +#include "qpid/framing/InitiationHandler.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/sys/TimeoutHandler.h" + +namespace qpid { +namespace sys { + + class SessionHandler : + public qpid::framing::InitiationHandler, + public qpid::framing::InputHandler, + public TimeoutHandler + { + public: + virtual void closed() = 0; + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/SessionHandlerFactory.h b/cpp/src/qpid/sys/SessionHandlerFactory.h new file mode 100644 index 0000000000..da2f3e95eb --- /dev/null +++ b/cpp/src/qpid/sys/SessionHandlerFactory.h @@ -0,0 +1,41 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _SessionHandlerFactory_ +#define _SessionHandlerFactory_ + +namespace qpid { +namespace sys { + +class SessionContext; +class SessionHandler; + +/** + * Callback interface used by the Acceptor to + * create a SessionHandler for each new connection. + */ +class SessionHandlerFactory : private boost::noncopyable +{ + public: + virtual SessionHandler* create(SessionContext* ctxt) = 0; + virtual ~SessionHandlerFactory(){} +}; + +}} + + +#endif diff --git a/cpp/src/qpid/sys/ShutdownHandler.h b/cpp/src/qpid/sys/ShutdownHandler.h new file mode 100644 index 0000000000..4107f09f89 --- /dev/null +++ b/cpp/src/qpid/sys/ShutdownHandler.h @@ -0,0 +1,34 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _ShutdownHandler_ +#define _ShutdownHandler_ + +namespace qpid { +namespace sys { + + class ShutdownHandler + { + public: + virtual void shutdown() = 0; + virtual ~ShutdownHandler(){} + }; + +} +} + +#endif diff --git a/cpp/src/qpid/sys/Thread.cpp b/cpp/src/qpid/sys/Thread.cpp new file mode 100644 index 0000000000..4fb9915993 --- /dev/null +++ b/cpp/src/qpid/sys/Thread.cpp @@ -0,0 +1,50 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/APRBase.h" +#include "qpid/sys/Thread.h" +#include "apr-1/apr_portable.h" + +using namespace qpid::sys; + +void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){ + ((Runnable*) data)->run(); + CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); + return NULL; +} + +Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} + +Thread::~Thread(){ +} + +void Thread::start(){ + CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool)); +} + +void Thread::join(){ + apr_status_t status; + if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner)); +} + +void Thread::interrupt(){ + if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS)); +} + +unsigned int qpid::sys::Thread::currentThread(){ + return apr_os_thread_current(); +} diff --git a/cpp/src/qpid/sys/Thread.h b/cpp/src/qpid/sys/Thread.h new file mode 100644 index 0000000000..e86bd4a8d2 --- /dev/null +++ b/cpp/src/qpid/sys/Thread.h @@ -0,0 +1,48 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Thread_ +#define _Thread_ + +#include "apr-1/apr_thread_proc.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" + +namespace qpid { +namespace sys { + + class Thread + { + const Runnable* runnable; + apr_pool_t* pool; + apr_thread_t* runner; + + public: + Thread(apr_pool_t* pool, Runnable* runnable); + virtual ~Thread(); + virtual void start(); + virtual void join(); + virtual void interrupt(); + static unsigned int currentThread(); + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/ThreadFactory.cpp b/cpp/src/qpid/sys/ThreadFactory.cpp new file mode 100644 index 0000000000..d33872b9a2 --- /dev/null +++ b/cpp/src/qpid/sys/ThreadFactory.cpp @@ -0,0 +1,35 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "qpid/sys/APRBase.h" +#include "qpid/sys/ThreadFactory.h" + +using namespace qpid::sys; + +ThreadFactory::ThreadFactory(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); +} + +ThreadFactory::~ThreadFactory(){ + apr_pool_destroy(pool); + APRBase::decrement(); +} + +Thread* ThreadFactory::create(Runnable* runnable){ + return new Thread(pool, runnable); +} diff --git a/cpp/src/qpid/sys/ThreadFactory.h b/cpp/src/qpid/sys/ThreadFactory.h new file mode 100644 index 0000000000..9b7126272a --- /dev/null +++ b/cpp/src/qpid/sys/ThreadFactory.h @@ -0,0 +1,44 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _ThreadFactory_ +#define _ThreadFactory_ + +#include "apr-1/apr_thread_proc.h" + +#include "qpid/sys/Thread.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/ThreadFactory.h" +#include "qpid/sys/Runnable.h" + +namespace qpid { +namespace sys { + + class ThreadFactory + { + apr_pool_t* pool; + public: + ThreadFactory(); + virtual ~ThreadFactory(); + virtual Thread* create(Runnable* runnable); + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/Time.cpp b/cpp/src/qpid/sys/Time.cpp new file mode 100644 index 0000000000..c3512b8df3 --- /dev/null +++ b/cpp/src/qpid/sys/Time.cpp @@ -0,0 +1,29 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <qpid/sys/Time.h> +#include <apr-1/apr_time.h> + +namespace qpid { +namespace sys { + +Time Time::now() { + return Time(apr_time_now()*1000); +} + +}} diff --git a/cpp/src/qpid/sys/Time.h b/cpp/src/qpid/sys/Time.h new file mode 100644 index 0000000000..5c7cdfb005 --- /dev/null +++ b/cpp/src/qpid/sys/Time.h @@ -0,0 +1,52 @@ +#ifndef _concurrent_Time_h +#define _concurrent_Time_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <stdint.h> + +namespace qpid { +namespace sys { + +/** + * Time since the epoch. + */ +class Time +{ + public: + static const int64_t NANOS = 1000000000; + static const int64_t MICROS = 1000000; + static const int64_t MILLIS = 1000; + + static Time now(); + + Time(int64_t nsecs_) : ticks(nsecs_) {} + + int64_t nsecs() const { return ticks; } + int64_t usecs() const { return nsecs()/1000; } + int64_t msecs() const { return usecs()/1000; } + int64_t secs() const { return msecs()/1000; } + + private: + int64_t ticks; +}; + +}} + +#endif /*!_concurrent_Time_h*/ diff --git a/cpp/src/qpid/sys/TimeoutHandler.h b/cpp/src/qpid/sys/TimeoutHandler.h new file mode 100644 index 0000000000..845bd72a12 --- /dev/null +++ b/cpp/src/qpid/sys/TimeoutHandler.h @@ -0,0 +1,36 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _TimeoutHandler_ +#define _TimeoutHandler_ + +namespace qpid { +namespace sys { + + class TimeoutHandler + { + public: + virtual void idleOut() = 0; + virtual void idleIn() = 0; + virtual ~TimeoutHandler(){} + }; + +} +} + + +#endif diff --git a/cpp/src/qpid/sys/doxygen_summary.h b/cpp/src/qpid/sys/doxygen_summary.h new file mode 100644 index 0000000000..af89154fdf --- /dev/null +++ b/cpp/src/qpid/sys/doxygen_summary.h @@ -0,0 +1,34 @@ +#ifndef _doxygen_summary_ +#define _doxygen_summary_ + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// No code just a doxygen comment for the namespace + +/** \namspace qpid::sys + * IO classes used by client and broker. + * + * This namespace contains platform-neutral classes. Platform + * specific classes are in a sub-namespace named after the + * platform. At build time the appropriate platform classes are + * imported into this namespace so other code does not need to be awre + * of the difference. + * + */ +#endif /*!_doxygen_summary_*/ |