From f34ace35220726fa64f063a0fccc6eeaaa40af3c Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Wed, 8 Nov 2006 17:07:44 +0000 Subject: More reorg to separate APR/posix code, work in progress. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@472545 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/apr/APRBase.cpp | 96 ++++++++++++ qpid/cpp/src/qpid/apr/APRBase.h | 63 ++++++++ qpid/cpp/src/qpid/apr/APRPool.cpp | 38 +++++ qpid/cpp/src/qpid/apr/APRPool.h | 47 ++++++ qpid/cpp/src/qpid/apr/APRSocket.cpp | 75 ++++++++++ qpid/cpp/src/qpid/apr/APRSocket.h | 45 ++++++ qpid/cpp/src/qpid/apr/Acceptor.cpp | 77 ++++++++++ qpid/cpp/src/qpid/apr/Acceptor.h | 55 +++++++ qpid/cpp/src/qpid/apr/Connector.cpp | 190 ++++++++++++++++++++++++ qpid/cpp/src/qpid/apr/Connector.h | 93 ++++++++++++ qpid/cpp/src/qpid/apr/LFProcessor.cpp | 176 ++++++++++++++++++++++ qpid/cpp/src/qpid/apr/LFProcessor.h | 118 +++++++++++++++ qpid/cpp/src/qpid/apr/LFSessionContext.cpp | 170 ++++++++++++++++++++++ qpid/cpp/src/qpid/apr/LFSessionContext.h | 87 +++++++++++ qpid/cpp/src/qpid/apr/Monitor.cpp | 64 ++++++++ qpid/cpp/src/qpid/apr/Monitor.h | 75 ++++++++++ qpid/cpp/src/qpid/apr/Thread.cpp | 55 +++++++ qpid/cpp/src/qpid/apr/Thread.h | 45 ++++++ qpid/cpp/src/qpid/apr/Time.cpp | 39 +++++ qpid/cpp/src/qpid/broker/AutoDelete.cpp | 39 ++--- qpid/cpp/src/qpid/broker/AutoDelete.h | 13 +- qpid/cpp/src/qpid/broker/Channel.cpp | 10 +- qpid/cpp/src/qpid/broker/Channel.h | 2 +- qpid/cpp/src/qpid/broker/DirectExchange.cpp | 10 +- qpid/cpp/src/qpid/broker/DirectExchange.h | 2 +- qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp | 6 +- qpid/cpp/src/qpid/broker/ExchangeRegistry.h | 2 +- qpid/cpp/src/qpid/broker/FanOutExchange.cpp | 6 +- qpid/cpp/src/qpid/broker/FanOutExchange.h | 2 +- qpid/cpp/src/qpid/broker/HeadersExchange.cpp | 6 +- qpid/cpp/src/qpid/broker/HeadersExchange.h | 2 +- qpid/cpp/src/qpid/broker/Message.cpp | 2 - qpid/cpp/src/qpid/broker/Prefetch.cpp | 26 ---- qpid/cpp/src/qpid/broker/Prefetch.h | 2 +- qpid/cpp/src/qpid/broker/Queue.cpp | 26 ++-- qpid/cpp/src/qpid/broker/Queue.h | 3 +- qpid/cpp/src/qpid/broker/QueueRegistry.cpp | 7 +- qpid/cpp/src/qpid/broker/QueueRegistry.h | 2 +- qpid/cpp/src/qpid/broker/TopicExchange.cpp | 11 +- qpid/cpp/src/qpid/broker/TopicExchange.h | 2 +- qpid/cpp/src/qpid/client/Channel.cpp | 51 ++----- qpid/cpp/src/qpid/client/Channel.h | 10 +- qpid/cpp/src/qpid/client/ResponseHandler.cpp | 25 ++-- qpid/cpp/src/qpid/client/ResponseHandler.h | 4 +- qpid/cpp/src/qpid/sys/APRBase.cpp | 96 ------------ qpid/cpp/src/qpid/sys/APRBase.h | 63 -------- qpid/cpp/src/qpid/sys/APRPool.cpp | 39 ----- qpid/cpp/src/qpid/sys/APRPool.h | 47 ------ qpid/cpp/src/qpid/sys/APRSocket.cpp | 76 ---------- qpid/cpp/src/qpid/sys/APRSocket.h | 45 ------ qpid/cpp/src/qpid/sys/Acceptor.cpp | 78 ---------- qpid/cpp/src/qpid/sys/Acceptor.h | 58 -------- qpid/cpp/src/qpid/sys/Connector.cpp | 201 -------------------------- qpid/cpp/src/qpid/sys/Connector.h | 95 ------------ qpid/cpp/src/qpid/sys/LFProcessor.cpp | 193 ------------------------- qpid/cpp/src/qpid/sys/LFProcessor.h | 119 --------------- qpid/cpp/src/qpid/sys/LFSessionContext.cpp | 189 ------------------------ qpid/cpp/src/qpid/sys/LFSessionContext.h | 88 ----------- qpid/cpp/src/qpid/sys/Monitor.cpp | 60 -------- qpid/cpp/src/qpid/sys/Monitor.h | 56 ------- qpid/cpp/src/qpid/sys/Runnable.cpp | 19 --- qpid/cpp/src/qpid/sys/Runnable.h | 16 +- qpid/cpp/src/qpid/sys/Thread.cpp | 50 ------- qpid/cpp/src/qpid/sys/Thread.h | 33 +---- qpid/cpp/src/qpid/sys/ThreadFactory.cpp | 35 ----- qpid/cpp/src/qpid/sys/ThreadFactory.h | 44 ------ qpid/cpp/src/qpid/sys/Time.cpp | 29 ---- qpid/cpp/src/qpid/sys/Time.h | 26 +--- qpid/cpp/src/qpid/sys/platform.h | 29 ++++ qpid/cpp/src/qpidd.cpp | 5 +- 70 files changed, 1759 insertions(+), 1909 deletions(-) create mode 100644 qpid/cpp/src/qpid/apr/APRBase.cpp create mode 100644 qpid/cpp/src/qpid/apr/APRBase.h create mode 100644 qpid/cpp/src/qpid/apr/APRPool.cpp create mode 100644 qpid/cpp/src/qpid/apr/APRPool.h create mode 100644 qpid/cpp/src/qpid/apr/APRSocket.cpp create mode 100644 qpid/cpp/src/qpid/apr/APRSocket.h create mode 100644 qpid/cpp/src/qpid/apr/Acceptor.cpp create mode 100644 qpid/cpp/src/qpid/apr/Acceptor.h create mode 100644 qpid/cpp/src/qpid/apr/Connector.cpp create mode 100644 qpid/cpp/src/qpid/apr/Connector.h create mode 100644 qpid/cpp/src/qpid/apr/LFProcessor.cpp create mode 100644 qpid/cpp/src/qpid/apr/LFProcessor.h create mode 100644 qpid/cpp/src/qpid/apr/LFSessionContext.cpp create mode 100644 qpid/cpp/src/qpid/apr/LFSessionContext.h create mode 100644 qpid/cpp/src/qpid/apr/Monitor.cpp create mode 100644 qpid/cpp/src/qpid/apr/Monitor.h create mode 100644 qpid/cpp/src/qpid/apr/Thread.cpp create mode 100644 qpid/cpp/src/qpid/apr/Thread.h create mode 100644 qpid/cpp/src/qpid/apr/Time.cpp delete mode 100644 qpid/cpp/src/qpid/broker/Prefetch.cpp delete mode 100644 qpid/cpp/src/qpid/sys/APRBase.cpp delete mode 100644 qpid/cpp/src/qpid/sys/APRBase.h delete mode 100644 qpid/cpp/src/qpid/sys/APRPool.cpp delete mode 100644 qpid/cpp/src/qpid/sys/APRPool.h delete mode 100644 qpid/cpp/src/qpid/sys/APRSocket.cpp delete mode 100644 qpid/cpp/src/qpid/sys/APRSocket.h delete mode 100644 qpid/cpp/src/qpid/sys/Acceptor.cpp delete mode 100644 qpid/cpp/src/qpid/sys/Acceptor.h delete mode 100644 qpid/cpp/src/qpid/sys/Connector.cpp delete mode 100644 qpid/cpp/src/qpid/sys/Connector.h delete mode 100644 qpid/cpp/src/qpid/sys/LFProcessor.cpp delete mode 100644 qpid/cpp/src/qpid/sys/LFProcessor.h delete mode 100644 qpid/cpp/src/qpid/sys/LFSessionContext.cpp delete mode 100644 qpid/cpp/src/qpid/sys/LFSessionContext.h delete mode 100644 qpid/cpp/src/qpid/sys/Monitor.cpp delete mode 100644 qpid/cpp/src/qpid/sys/Monitor.h delete mode 100644 qpid/cpp/src/qpid/sys/Runnable.cpp delete mode 100644 qpid/cpp/src/qpid/sys/Thread.cpp delete mode 100644 qpid/cpp/src/qpid/sys/ThreadFactory.cpp delete mode 100644 qpid/cpp/src/qpid/sys/ThreadFactory.h delete mode 100644 qpid/cpp/src/qpid/sys/Time.cpp create mode 100644 qpid/cpp/src/qpid/sys/platform.h (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/apr/APRBase.cpp b/qpid/cpp/src/qpid/apr/APRBase.cpp new file mode 100644 index 0000000000..f629a5381d --- /dev/null +++ b/qpid/cpp/src/qpid/apr/APRBase.cpp @@ -0,0 +1,96 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include +#include +#include "APRBase.h" + +using namespace qpid::sys; + +APRBase* APRBase::instance = 0; + +APRBase* APRBase::getInstance(){ + if(instance == 0){ + instance = new APRBase(); + } + return instance; +} + + +APRBase::APRBase() : count(0){ + apr_initialize(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, 0)); + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); +} + +APRBase::~APRBase(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); + apr_pool_destroy(pool); + apr_terminate(); +} + +bool APRBase::_increment(){ + bool deleted(false); + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); + if(this == instance){ + count++; + }else{ + deleted = true; + } + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); + return !deleted; +} + +void APRBase::_decrement(){ + APRBase* copy = 0; + CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); + if(--count == 0){ + copy = instance; + instance = 0; + } + CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); + if(copy != 0){ + delete copy; + } +} + +void APRBase::increment(){ + int count = 0; + while(count++ < 2 && !getInstance()->_increment()){ + std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl; + } +} + +void APRBase::decrement(){ + getInstance()->_decrement(); +} + +void qpid::sys::check(apr_status_t status, const std::string& file, const int line){ + if (status != APR_SUCCESS){ + const int size = 50; + char tmp[size]; + std::string msg(apr_strerror(status, tmp, size)); + throw QpidError(APR_ERROR + ((int) status), msg, file, line); + } +} + +std::string qpid::sys::get_desc(apr_status_t status){ + const int size = 50; + char tmp[size]; + return std::string(apr_strerror(status, tmp, size)); +} + diff --git a/qpid/cpp/src/qpid/apr/APRBase.h b/qpid/cpp/src/qpid/apr/APRBase.h new file mode 100644 index 0000000000..b84e9860df --- /dev/null +++ b/qpid/cpp/src/qpid/apr/APRBase.h @@ -0,0 +1,63 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRBase_ +#define _APRBase_ + +#include +#include "apr-1/apr_thread_mutex.h" +#include "apr-1/apr_errno.h" + +namespace qpid { +namespace sys { + + /** + * Use of APR libraries necessitates explicit init and terminate + * calls. Any class using APR libs should obtain the reference to + * this singleton and increment on construction, decrement on + * destruction. This class can then correctly initialise apr + * before the first use and terminate after the last use. + */ + class APRBase{ + static APRBase* instance; + apr_pool_t* pool; + apr_thread_mutex_t* mutex; + int count; + + APRBase(); + ~APRBase(); + static APRBase* getInstance(); + bool _increment(); + void _decrement(); + public: + static void increment(); + static void decrement(); + }; + + //this is also a convenient place for a helper function for error checking: + void check(apr_status_t status, const std::string& file, const int line); + std::string get_desc(apr_status_t status); + +#define CHECK_APR_SUCCESS(A) qpid::sys::check(A, __FILE__, __LINE__); + +} +} + + + + +#endif diff --git a/qpid/cpp/src/qpid/apr/APRPool.cpp b/qpid/cpp/src/qpid/apr/APRPool.cpp new file mode 100644 index 0000000000..e465a6b40d --- /dev/null +++ b/qpid/cpp/src/qpid/apr/APRPool.cpp @@ -0,0 +1,38 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "APRPool.h" +#include "APRBase.h" +#include + +using namespace qpid::sys; + +APRPool::APRPool(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); +} + +APRPool::~APRPool(){ + apr_pool_destroy(pool); + APRBase::decrement(); +} + +apr_pool_t* APRPool::get() { + return boost::details::pool::singleton_default::instance().pool; +} + diff --git a/qpid/cpp/src/qpid/apr/APRPool.h b/qpid/cpp/src/qpid/apr/APRPool.h new file mode 100644 index 0000000000..2196cd64e7 --- /dev/null +++ b/qpid/cpp/src/qpid/apr/APRPool.h @@ -0,0 +1,47 @@ +#ifndef _APRPool_ +#define _APRPool_ + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include +#include + +namespace qpid { +namespace sys { +/** + * Singleton APR memory pool. + */ +class APRPool : private boost::noncopyable { + public: + APRPool(); + ~APRPool(); + + /** Get singleton instance */ + static apr_pool_t* get(); + + private: + apr_pool_t* pool; +}; + +}} + + + + + +#endif /*!_APRPool_*/ diff --git a/qpid/cpp/src/qpid/apr/APRSocket.cpp b/qpid/cpp/src/qpid/apr/APRSocket.cpp new file mode 100644 index 0000000000..0c5a29c216 --- /dev/null +++ b/qpid/cpp/src/qpid/apr/APRSocket.cpp @@ -0,0 +1,75 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "APRBase.h" +#include "APRSocket.h" +#include +#include + +using namespace qpid::sys; +using namespace qpid::framing; + +APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ + +} + +void APRSocket::read(qpid::framing::Buffer& buffer){ + apr_size_t bytes; + bytes = buffer.available(); + apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes); + buffer.move(bytes); + if(APR_STATUS_IS_TIMEUP(s)){ + //timed out + }else if(APR_STATUS_IS_EOF(s)){ + close(); + } +} + +void APRSocket::write(qpid::framing::Buffer& buffer){ + apr_size_t bytes; + do{ + bytes = buffer.available(); + apr_socket_send(socket, buffer.start(), &bytes); + buffer.move(bytes); + }while(bytes > 0); +} + +void APRSocket::close(){ + if(!closed){ + std::cout << "Closing socket " << socket << "@" << this << std::endl; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + closed = true; + } +} + +bool APRSocket::isOpen(){ + return !closed; +} + +u_int8_t APRSocket::read(){ + char data[1]; + apr_size_t bytes = 1; + apr_status_t s = apr_socket_recv(socket, data, &bytes); + if(APR_STATUS_IS_EOF(s) || bytes == 0){ + return 0; + }else{ + return *data; + } +} + +APRSocket::~APRSocket(){ +} diff --git a/qpid/cpp/src/qpid/apr/APRSocket.h b/qpid/cpp/src/qpid/apr/APRSocket.h new file mode 100644 index 0000000000..f7e7ad107b --- /dev/null +++ b/qpid/cpp/src/qpid/apr/APRSocket.h @@ -0,0 +1,45 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _APRSocket_ +#define _APRSocket_ + +#include "apr-1/apr_network_io.h" +#include "qpid/framing/Buffer.h" + +namespace qpid { +namespace sys { + + class APRSocket + { + apr_socket_t* const socket; + volatile bool closed; + public: + APRSocket(apr_socket_t* socket); + void read(qpid::framing::Buffer& b); + void write(qpid::framing::Buffer& b); + void close(); + bool isOpen(); + u_int8_t read(); + ~APRSocket(); + }; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/apr/Acceptor.cpp b/qpid/cpp/src/qpid/apr/Acceptor.cpp new file mode 100644 index 0000000000..cbeea9902b --- /dev/null +++ b/qpid/cpp/src/qpid/apr/Acceptor.cpp @@ -0,0 +1,77 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Acceptor.h" +#include "APRBase.h" +#include "APRPool.h" + +using namespace qpid::sys; + +Acceptor::Acceptor(int16_t port_, int backlog, int threads) : + port(port_), + processor(APRPool::get(), threads, 1000, 5000000) +{ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); + CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); + CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); + CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); +} + +int16_t Acceptor::getPort() const { + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); + return address->port; +} + +void Acceptor::run(SessionHandlerFactory* factory) { + running = true; + processor.start(); + std::cout << "Listening on port " << getPort() << "..." << std::endl; + while(running){ + apr_socket_t* client; + apr_status_t status = apr_socket_accept(&client, socket, APRPool::get()); + if(status == APR_SUCCESS){ + //make this socket non-blocking: + CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); + LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false); + session->init(factory->create(session)); + }else{ + running = false; + if(status != APR_EINTR){ + std::cout << "ERROR: " << get_desc(status) << std::endl; + } + } + } + shutdown(); +} + +void Acceptor::shutdown() { + // TODO aconway 2006-10-12: Cleanup, this is not thread safe. + if (running) { + running = false; + processor.stop(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + } +} + + diff --git a/qpid/cpp/src/qpid/apr/Acceptor.h b/qpid/cpp/src/qpid/apr/Acceptor.h new file mode 100644 index 0000000000..1813b391c1 --- /dev/null +++ b/qpid/cpp/src/qpid/apr/Acceptor.h @@ -0,0 +1,55 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFAcceptor_ +#define _LFAcceptor_ + +#include "LFProcessor.h" +#include "LFSessionContext.h" +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_poll.h" +#include "apr-1/apr_time.h" +#include "Monitor.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/SessionContext.h" +#include "qpid/sys/SessionHandlerFactory.h" +#include + +namespace qpid { +namespace sys { + +/** APR Acceptor. */ +class Acceptor : public qpid::SharedObject +{ + public: + Acceptor(int16_t port, int backlog, int threads); + virtual int16_t getPort() const; + virtual void run(qpid::sys::SessionHandlerFactory* factory); + virtual void shutdown(); + + private: + int16_t port; + LFProcessor processor; + apr_socket_t* socket; + volatile bool running; +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/apr/Connector.cpp b/qpid/cpp/src/qpid/apr/Connector.cpp new file mode 100644 index 0000000000..4446731654 --- /dev/null +++ b/qpid/cpp/src/qpid/apr/Connector.cpp @@ -0,0 +1,190 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include +#include +#include "APRBase.h" +#include "Connector.h" + +using namespace qpid::sys; +using namespace qpid::sys; +using namespace qpid::framing; +using qpid::QpidError; + +Connector::Connector(bool _debug, u_int32_t buffer_size) : + debug(_debug), + receive_buffer_size(buffer_size), + send_buffer_size(buffer_size), + closed(true), + lastIn(0), lastOut(0), + timeout(0), + idleIn(0), idleOut(0), + timeoutHandler(0), + shutdownHandler(0), + inbuf(receive_buffer_size), + outbuf(send_buffer_size){ + + APRBase::increment(); + + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); +} + +Connector::~Connector(){ + apr_pool_destroy(pool); + + APRBase::decrement(); +} + +void Connector::connect(const std::string& host, int port){ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); + CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); + closed = false; + receiver = Thread(this); +} + +void Connector::init(ProtocolInitiation* header){ + writeBlock(header); + delete header; +} + +void Connector::close(){ + closed = true; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + receiver.join(); +} + +void Connector::setInputHandler(InputHandler* handler){ + input = handler; +} + +void Connector::setShutdownHandler(ShutdownHandler* handler){ + shutdownHandler = handler; +} + +OutputHandler* Connector::getOutputHandler(){ + return this; +} + +void Connector::send(AMQFrame* frame){ + writeBlock(frame); + if(debug) std::cout << "SENT: " << *frame << std::endl; + delete frame; +} + +void Connector::writeBlock(AMQDataBlock* data){ + Mutex::ScopedLock l(writeLock); + data->encode(outbuf); + //transfer data to wire + outbuf.flip(); + writeToSocket(outbuf.start(), outbuf.available()); + outbuf.clear(); +} + +void Connector::writeToSocket(char* data, size_t available){ + apr_size_t bytes(available); + apr_size_t written(0); + while(written < available && !closed){ + apr_status_t status = apr_socket_send(socket, data + written, &bytes); + if(status == APR_TIMEUP){ + std::cout << "Write request timed out." << std::endl; + } + if(bytes == 0){ + std::cout << "Write request wrote 0 bytes." << std::endl; + } + lastOut = apr_time_as_msec(apr_time_now()); + written += bytes; + bytes = available - written; + } +} + +void Connector::checkIdle(apr_status_t status){ + if(timeoutHandler){ + int64_t now = apr_time_as_msec(apr_time_now()); + if(APR_STATUS_IS_TIMEUP(status)){ + if(idleIn && (now - lastIn > idleIn)){ + timeoutHandler->idleIn(); + } + }else if(APR_STATUS_IS_EOF(status)){ + closed = true; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + if(shutdownHandler) shutdownHandler->shutdown(); + }else{ + lastIn = now; + } + if(idleOut && (now - lastOut > idleOut)){ + timeoutHandler->idleOut(); + } + } +} + +void Connector::setReadTimeout(u_int16_t t){ + idleIn = t * 1000;//t is in secs + if(idleIn && (!timeout || idleIn < timeout)){ + timeout = idleIn; + setSocketTimeout(); + } + +} + +void Connector::setWriteTimeout(u_int16_t t){ + idleOut = t * 1000;//t is in secs + if(idleOut && (!timeout || idleOut < timeout)){ + timeout = idleOut; + setSocketTimeout(); + } +} + +void Connector::setSocketTimeout(){ + //interval is in microseconds, timeout in milliseconds + //want the interval to be a bit shorter than the timeout, hence multiply + //by 800 rather than 1000. + apr_interval_time_t interval(timeout * 800); + apr_socket_timeout_set(socket, interval); +} + +void Connector::setTimeoutHandler(TimeoutHandler* handler){ + timeoutHandler = handler; +} + +void Connector::run(){ + try{ + while(!closed){ + apr_size_t bytes(inbuf.available()); + if(bytes < 1){ + THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); + } + checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes)); + + if(bytes > 0){ + inbuf.move(bytes); + inbuf.flip();//position = 0, limit = total data read + + AMQFrame frame; + while(frame.decode(inbuf)){ + if(debug) std::cout << "RECV: " << frame << std::endl; + input->received(&frame); + } + //need to compact buffer to preserve any 'extra' data + inbuf.compact(); + } + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} diff --git a/qpid/cpp/src/qpid/apr/Connector.h b/qpid/cpp/src/qpid/apr/Connector.h new file mode 100644 index 0000000000..e69a7205f3 --- /dev/null +++ b/qpid/cpp/src/qpid/apr/Connector.h @@ -0,0 +1,93 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Connector_ +#define _Connector_ + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_time.h" + +#include "qpid/framing/InputHandler.h" +#include "qpid/framing/OutputHandler.h" +#include "qpid/framing/InitiationHandler.h" +#include "qpid/framing/ProtocolInitiation.h" +#include "qpid/sys/ShutdownHandler.h" +#include "qpid/sys/TimeoutHandler.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Connector.h" +#include "qpid/sys/Monitor.h" + +namespace qpid { +namespace sys { + + class Connector : public qpid::framing::OutputHandler, + private qpid::sys::Runnable + { + const bool debug; + const int receive_buffer_size; + const int send_buffer_size; + + bool closed; + + int64_t lastIn; + int64_t lastOut; + int64_t timeout; + u_int32_t idleIn; + u_int32_t idleOut; + + TimeoutHandler* timeoutHandler; + ShutdownHandler* shutdownHandler; + qpid::framing::InputHandler* input; + qpid::framing::InitiationHandler* initialiser; + qpid::framing::OutputHandler* output; + + qpid::framing::Buffer inbuf; + qpid::framing::Buffer outbuf; + + qpid::sys::Mutex writeLock; + qpid::sys::Thread receiver; + + apr_pool_t* pool; + apr_socket_t* socket; + + void checkIdle(apr_status_t status); + void writeBlock(qpid::framing::AMQDataBlock* data); + void writeToSocket(char* data, size_t available); + void setSocketTimeout(); + + void run(); + + public: + Connector(bool debug = false, u_int32_t buffer_size = 1024); + virtual ~Connector(); + virtual void connect(const std::string& host, int port); + virtual void init(qpid::framing::ProtocolInitiation* header); + virtual void close(); + virtual void setInputHandler(qpid::framing::InputHandler* handler); + virtual void setTimeoutHandler(TimeoutHandler* handler); + virtual void setShutdownHandler(ShutdownHandler* handler); + virtual qpid::framing::OutputHandler* getOutputHandler(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void setReadTimeout(u_int16_t timeout); + virtual void setWriteTimeout(u_int16_t timeout); + }; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/apr/LFProcessor.cpp b/qpid/cpp/src/qpid/apr/LFProcessor.cpp new file mode 100644 index 0000000000..f4d4258f6f --- /dev/null +++ b/qpid/cpp/src/qpid/apr/LFProcessor.cpp @@ -0,0 +1,176 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include +#include +#include "LFProcessor.h" +#include "APRBase.h" +#include "LFSessionContext.h" + +using namespace qpid::sys; +using qpid::QpidError; + +// TODO aconway 2006-10-12: stopped is read outside locks. +// + +LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : + size(_size), + timeout(_timeout), + signalledCount(0), + current(0), + count(0), + workerCount(_workers), + hasLeader(false), + workers(new Thread[_workers]), + stopped(false) +{ + + CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); +} + + +LFProcessor::~LFProcessor(){ + if (!stopped) stop(); + delete[] workers; + CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); +} + +void LFProcessor::start(){ + for(int i = 0; i < workerCount; i++){ + workers[i] = Thread(this); + } +} + +void LFProcessor::add(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); + Monitor::ScopedLock l(countLock); + sessions.push_back(reinterpret_cast(fd->client_data)); + count++; +} + +void LFProcessor::remove(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + Monitor::ScopedLock l(countLock); + sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast(fd->client_data))); + count--; +} + +void LFProcessor::reactivate(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +void LFProcessor::deactivate(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); +} + +void LFProcessor::update(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +bool LFProcessor::full(){ + Mutex::ScopedLock locker(countLock); + return count == size; +} + +bool LFProcessor::empty(){ + Mutex::ScopedLock locker(countLock); + return count == 0; +} + +void LFProcessor::poll() { + apr_status_t status = APR_EGENERAL; + do{ + current = 0; + if(!stopped){ + status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); + } + }while(status != APR_SUCCESS && !stopped); +} + +void LFProcessor::run(){ + try{ + while(!stopped){ + const apr_pollfd_t* event = 0; + LFSessionContext* session = 0; + { + Monitor::ScopedLock l(leadLock); + waitToLead(); + event = getNextEvent(); + if(!event) return; + session = reinterpret_cast( + event->client_data); + session->startProcessing(); + relinquishLead(); + } + + //process event: + if(event->rtnevents & APR_POLLIN) session->read(); + if(event->rtnevents & APR_POLLOUT) session->write(); + + if(session->isClosed()){ + session->handleClose(); + Monitor::ScopedLock l(countLock); + sessions.erase(find(sessions.begin(),sessions.end(), session)); + count--; + }else{ + session->stopProcessing(); + } + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} + +void LFProcessor::waitToLead(){ + while(hasLeader && !stopped) leadLock.wait(); + hasLeader = !stopped; +} + +void LFProcessor::relinquishLead(){ + hasLeader = false; + leadLock.notify(); +} + +const apr_pollfd_t* LFProcessor::getNextEvent(){ + while(true){ + if(stopped){ + return 0; + }else if(current < signalledCount){ + //use result of previous poll if one is available + return signalledFDs + (current++); + }else{ + //else poll to get new events + poll(); + } + } +} + +void LFProcessor::stop(){ + stopped = true; + { + Monitor::ScopedLock l(leadLock); + leadLock.notifyAll(); + } + for(int i = 0; i < workerCount; i++){ + workers[i].join(); + } + for(iterator i = sessions.begin(); i < sessions.end(); i++){ + (*i)->shutdown(); + } +} + diff --git a/qpid/cpp/src/qpid/apr/LFProcessor.h b/qpid/cpp/src/qpid/apr/LFProcessor.h new file mode 100644 index 0000000000..dd85ad9e84 --- /dev/null +++ b/qpid/cpp/src/qpid/apr/LFProcessor.h @@ -0,0 +1,118 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFProcessor_ +#define _LFProcessor_ + +#include "apr-1/apr_poll.h" +#include +#include +#include +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" + +namespace qpid { +namespace sys { + + class LFSessionContext; + + /** + * This class processes a poll set using the leaders-followers + * pattern for thread synchronization: the leader will poll and on + * the poll returning, it will remove a session, promote a + * follower to leadership, then process the session. + */ + class LFProcessor : private virtual qpid::sys::Runnable + { + typedef std::vector::iterator iterator; + + const int size; + const apr_interval_time_t timeout; + apr_pollset_t* pollset; + int signalledCount; + int current; + const apr_pollfd_t* signalledFDs; + int count; + const int workerCount; + bool hasLeader; + qpid::sys::Thread* workers; + qpid::sys::Monitor leadLock; + qpid::sys::Mutex countLock; + std::vector sessions; + volatile bool stopped; + + const apr_pollfd_t* getNextEvent(); + void waitToLead(); + void relinquishLead(); + void poll(); + virtual void run(); + + public: + LFProcessor(apr_pool_t* pool, int workers, int size, int timeout); + /** + * Add the fd to the poll set. Relies on the client_data being + * an instance of LFSessionContext. + */ + void add(const apr_pollfd_t* const fd); + /** + * Remove the fd from the poll set. + */ + void remove(const apr_pollfd_t* const fd); + /** + * Signal that the fd passed in, already part of the pollset, + * has had its flags altered. + */ + void update(const apr_pollfd_t* const fd); + /** + * Add an fd back to the poll set after deactivation. + */ + void reactivate(const apr_pollfd_t* const fd); + /** + * Temporarily remove the fd from the poll set. Called when processing + * is about to begin. + */ + void deactivate(const apr_pollfd_t* const fd); + /** + * Indicates whether the capacity of this processor has been + * reached (or whether it can still handle further fd's). + */ + bool full(); + /** + * Indicates whether there are any fd's registered. + */ + bool empty(); + /** + * Stop processing. + */ + void stop(); + /** + * Start processing. + */ + void start(); + /** + * Is processing stopped? + */ + bool isStopped(); + + ~LFProcessor(); + }; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/apr/LFSessionContext.cpp b/qpid/cpp/src/qpid/apr/LFSessionContext.cpp new file mode 100644 index 0000000000..4a704013a8 --- /dev/null +++ b/qpid/cpp/src/qpid/apr/LFSessionContext.cpp @@ -0,0 +1,170 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "LFSessionContext.h" +#include "APRBase.h" +#include "qpid/QpidError.h" +#include + +using namespace qpid::sys; +using namespace qpid::sys; +using namespace qpid::framing; + +LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, + LFProcessor* const _processor, + bool _debug) : + debug(_debug), + socket(_socket), + initiated(false), + in(32768), + out(32768), + processor(_processor), + processing(false), + closing(false) +{ + + fd.p = _pool; + fd.desc_type = APR_POLL_SOCKET; + fd.reqevents = APR_POLLIN; + fd.client_data = this; + fd.desc.s = _socket; + + out.flip(); +} + +LFSessionContext::~LFSessionContext(){ + +} + +void LFSessionContext::read(){ + socket.read(in); + in.flip(); + if(initiated){ + AMQFrame frame; + while(frame.decode(in)){ + if(debug) log("RECV", &frame); + handler->received(&frame); + } + }else{ + ProtocolInitiation protocolInit; + if(protocolInit.decode(in)){ + handler->initiated(&protocolInit); + initiated = true; + if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; + } + } + in.compact(); +} + +void LFSessionContext::write(){ + bool done = isClosed(); + while(!done){ + if(out.available() > 0){ + socket.write(out); + if(out.available() > 0){ + + //incomplete write, leave flags to receive notification of readiness to write + done = true;//finished processing for now, but write is still in progress + } + }else{ + //do we have any frames to write? + Mutex::ScopedLock l(writeLock); + if(!framesToWrite.empty()){ + out.clear(); + bool encoded(false); + AMQFrame* frame = framesToWrite.front(); + while(frame && out.available() >= frame->size()){ + encoded = true; + frame->encode(out); + if(debug) log("SENT", frame); + delete frame; + framesToWrite.pop(); + frame = framesToWrite.empty() ? 0 : framesToWrite.front(); + } + if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); + out.flip(); + }else{ + //reset flags, don't care about writability anymore + fd.reqevents = APR_POLLIN; + done = true; + + if(closing){ + socket.close(); + } + } + } + } +} + +void LFSessionContext::send(AMQFrame* frame){ + Mutex::ScopedLock l(writeLock); + if(!closing){ + framesToWrite.push(frame); + if(!(fd.reqevents & APR_POLLOUT)){ + fd.reqevents |= APR_POLLOUT; + if(!processing){ + processor->update(&fd); + } + } + } +} + +void LFSessionContext::startProcessing(){ + Mutex::ScopedLock l(writeLock); + processing = true; + processor->deactivate(&fd); +} + +void LFSessionContext::stopProcessing(){ + Mutex::ScopedLock l(writeLock); + processor->reactivate(&fd); + processing = false; +} + +void LFSessionContext::close(){ + closing = true; + Mutex::ScopedLock l(writeLock); + if(!processing){ + //allow pending frames to be written to socket + fd.reqevents = APR_POLLOUT; + processor->update(&fd); + } +} + +void LFSessionContext::handleClose(){ + handler->closed(); + std::cout << "Session closed [" << &socket << "]" << std::endl; + delete handler; + delete this; +} + +void LFSessionContext::shutdown(){ + socket.close(); + handleClose(); +} + +void LFSessionContext::init(SessionHandler* _handler){ + handler = _handler; + processor->add(&fd); +} + +void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ + Mutex::ScopedLock l(logLock); + std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; +} + +Mutex LFSessionContext::logLock; diff --git a/qpid/cpp/src/qpid/apr/LFSessionContext.h b/qpid/cpp/src/qpid/apr/LFSessionContext.h new file mode 100644 index 0000000000..9b3104b085 --- /dev/null +++ b/qpid/cpp/src/qpid/apr/LFSessionContext.h @@ -0,0 +1,87 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFSessionContext_ +#define _LFSessionContext_ + +#include + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_poll.h" +#include "apr-1/apr_time.h" + +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/Buffer.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/SessionContext.h" +#include "qpid/sys/SessionHandler.h" + +#include "APRSocket.h" +#include "LFProcessor.h" + +namespace qpid { +namespace sys { + + +class LFSessionContext : public virtual qpid::sys::SessionContext +{ + const bool debug; + APRSocket socket; + bool initiated; + + qpid::framing::Buffer in; + qpid::framing::Buffer out; + + qpid::sys::SessionHandler* handler; + LFProcessor* const processor; + + apr_pollfd_t fd; + + std::queue framesToWrite; + qpid::sys::Mutex writeLock; + + bool processing; + bool closing; + + static qpid::sys::Mutex logLock; + void log(const std::string& desc, + qpid::framing::AMQFrame* const frame); + + + public: + LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, + LFProcessor* const processor, + bool debug = false); + virtual ~LFSessionContext(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void close(); + void read(); + void write(); + void init(qpid::sys::SessionHandler* handler); + void startProcessing(); + void stopProcessing(); + void handleClose(); + void shutdown(); + inline apr_pollfd_t* const getFd(){ return &fd; } + inline bool isClosed(){ return !socket.isOpen(); } +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/apr/Monitor.cpp b/qpid/cpp/src/qpid/apr/Monitor.cpp new file mode 100644 index 0000000000..69fb2f6ffd --- /dev/null +++ b/qpid/cpp/src/qpid/apr/Monitor.cpp @@ -0,0 +1,64 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "Monitor.h" +#include "APRPool.h" + +using namespace qpid::sys; + +Mutex::Mutex() +{ + APRBase::increment(); + // TODO aconway 2006-11-08: Switch to non-nested. + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); +} + +Mutex::~Mutex(){ + CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); + APRBase::decrement(); +} + +Monitor::Monitor() +{ + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); +} + +Monitor::~Monitor(){ + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); +} + + + +void Monitor::wait(){ + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); +} + +void Monitor::wait(int64_t nsecs){ + // APR uses microseconds. + apr_status_t status = apr_thread_cond_timedwait( + condition, mutex, nsecs/1000); + if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); +} + +void Monitor::notify(){ + CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); +} + +void Monitor::notifyAll(){ + CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); +} + diff --git a/qpid/cpp/src/qpid/apr/Monitor.h b/qpid/cpp/src/qpid/apr/Monitor.h new file mode 100644 index 0000000000..a51baf8d94 --- /dev/null +++ b/qpid/cpp/src/qpid/apr/Monitor.h @@ -0,0 +1,75 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _Monitor_ +#define _Monitor_ + +#include +#include +#include "apr-1/apr_thread_mutex.h" +#include "apr-1/apr_thread_cond.h" +#include "APRBase.h" + +namespace qpid { +namespace sys { + +template +class ScopedLock +{ + public: + ScopedLock(L& l) : mutex(l) { l.lock(); } + ~ScopedLock() { mutex.unlock(); } + private: + L& mutex; +}; + + +class Mutex : private boost::noncopyable +{ + public: + typedef ScopedLock ScopedLock; + + Mutex(); + ~Mutex(); + void lock() { CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); } + void unlock() { CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); } + void trylock() { CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); } + + protected: + apr_thread_mutex_t* mutex; +}; + +/** A condition variable and a mutex */ +class Monitor : public Mutex +{ + public: + Monitor(); + ~Monitor(); + void wait(); + void wait(int64_t nsecs); + void notify(); + void notifyAll(); + + private: + apr_thread_cond_t* condition; +}; + + +}} + + +#endif diff --git a/qpid/cpp/src/qpid/apr/Thread.cpp b/qpid/cpp/src/qpid/apr/Thread.cpp new file mode 100644 index 0000000000..6d5cadb009 --- /dev/null +++ b/qpid/cpp/src/qpid/apr/Thread.cpp @@ -0,0 +1,55 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Thread.h" +#include "APRPool.h" +#include "APRBase.h" +#include + +using namespace qpid::sys; +using qpid::sys::Runnable; + +namespace { +void* APR_THREAD_FUNC run(apr_thread_t* thread, void *data) { + reinterpret_cast(data)->run(); + CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); + return NULL; +} +} + +Thread::Thread() : thread(0) {} + +Thread::Thread(Runnable* runnable) { + CHECK_APR_SUCCESS( + apr_thread_create(&thread, NULL, run, runnable, APRPool::get())); +} + +void Thread::join(){ + apr_status_t status; + if (thread != 0) + CHECK_APR_SUCCESS(apr_thread_join(&status, thread)); +} + +Thread::Thread(apr_thread_t* t) : thread(t) {} + +Thread Thread::current(){ + apr_thread_t* thr; + apr_os_thread_t osthr = apr_os_thread_current(); + CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get())); + return Thread(thr); +} diff --git a/qpid/cpp/src/qpid/apr/Thread.h b/qpid/cpp/src/qpid/apr/Thread.h new file mode 100644 index 0000000000..0c717dea70 --- /dev/null +++ b/qpid/cpp/src/qpid/apr/Thread.h @@ -0,0 +1,45 @@ +#ifndef _apr_Thread_h +#define _apr_Thread_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include + +namespace qpid { +namespace sys { + +class Thread +{ + + public: + Thread(); + explicit Thread(qpid::sys::Runnable*); + void join(); + static Thread current(); + + private: + Thread(apr_thread_t*); + + apr_thread_t* thread; +}; + +}} + +#endif /*!_apr_Thread_h*/ diff --git a/qpid/cpp/src/qpid/apr/Time.cpp b/qpid/cpp/src/qpid/apr/Time.cpp new file mode 100644 index 0000000000..8b5590481d --- /dev/null +++ b/qpid/cpp/src/qpid/apr/Time.cpp @@ -0,0 +1,39 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include "apr-1/apr_time.h" + +namespace qpid { +namespace sys { + +int64_t getTimeNsecs() +{ + // APR returns microseconds. + return apr_time_now() * 1000; +} + +int64_t getTimeMsecs() +{ + // APR returns microseconds. + return apr_time_now() / 1000; +} + + +}} + diff --git a/qpid/cpp/src/qpid/broker/AutoDelete.cpp b/qpid/cpp/src/qpid/broker/AutoDelete.cpp index 434bd4a3a0..d96105ba7d 100644 --- a/qpid/cpp/src/qpid/broker/AutoDelete.cpp +++ b/qpid/cpp/src/qpid/broker/AutoDelete.cpp @@ -18,26 +18,23 @@ #include "qpid/broker/AutoDelete.h" using namespace qpid::broker; +using namespace qpid::sys; -AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) : registry(_registry), - period(_period), - stopped(true), - runner(0){} +AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) + : registry(_registry), period(_period), stopped(true) { } void AutoDelete::add(Queue::shared_ptr const queue){ - lock.acquire(); + Mutex::ScopedLock l(lock); queues.push(queue); - lock.release(); } Queue::shared_ptr const AutoDelete::pop(){ Queue::shared_ptr next; - lock.acquire(); + Mutex::ScopedLock l(lock); if(!queues.empty()){ next = queues.front(); queues.pop(); } - lock.release(); return next; } @@ -59,35 +56,27 @@ void AutoDelete::process(){ } void AutoDelete::run(){ - monitor.acquire(); + Monitor::ScopedLock l(monitor); while(!stopped){ process(); - monitor.wait(period); + monitor.wait(msecsToNsecs(period)); } - monitor.release(); } void AutoDelete::start(){ - monitor.acquire(); + Monitor::ScopedLock l(monitor); if(stopped){ - runner = factory.create(this); stopped = false; - monitor.release(); - runner->start(); - }else{ - monitor.release(); + runner = Thread(this); } } void AutoDelete::stop(){ - monitor.acquire(); - if(!stopped){ + { + Monitor::ScopedLock l(monitor); + if(stopped) return; stopped = true; - monitor.notify(); - monitor.release(); - runner->join(); - delete runner; - }else{ - monitor.release(); } + monitor.notify(); + runner.join(); } diff --git a/qpid/cpp/src/qpid/broker/AutoDelete.h b/qpid/cpp/src/qpid/broker/AutoDelete.h index f3347e6cc5..e0efe7b399 100644 --- a/qpid/cpp/src/qpid/broker/AutoDelete.h +++ b/qpid/cpp/src/qpid/broker/AutoDelete.h @@ -20,22 +20,21 @@ #include #include -#include "qpid/sys/Monitor.h" +#include #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" -#include "qpid/sys/ThreadFactory.h" +#include "qpid/sys/Thread.h" namespace qpid { namespace broker{ - class AutoDelete : private virtual qpid::sys::Runnable{ - qpid::sys::ThreadFactory factory; - qpid::sys::Monitor lock; + class AutoDelete : private qpid::sys::Runnable { + qpid::sys::Mutex lock; qpid::sys::Monitor monitor; std::queue queues; QueueRegistry* const registry; - const u_int32_t period; + u_int32_t period; volatile bool stopped; - qpid::sys::Thread* runner; + qpid::sys::Thread runner; Queue::shared_ptr const pop(); void process(); diff --git a/qpid/cpp/src/qpid/broker/Channel.cpp b/qpid/cpp/src/qpid/broker/Channel.cpp index 967c5855fa..947a97ae7c 100644 --- a/qpid/cpp/src/qpid/broker/Channel.cpp +++ b/qpid/cpp/src/qpid/broker/Channel.cpp @@ -105,7 +105,7 @@ void Channel::rollback(){ } void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ - Locker locker(deliveryLock); + Mutex::ScopedLock locker(deliveryLock); u_int64_t deliveryTag = currentDeliveryTag++; if(ackExpected){ @@ -118,7 +118,7 @@ void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shar } bool Channel::checkPrefetch(Message::shared_ptr& msg){ - Locker locker(deliveryLock); + Mutex::ScopedLock locker(deliveryLock); bool countOk = !prefetchCount || prefetchCount > unacked.size(); bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); return countOk && sizeOk; @@ -191,7 +191,7 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ //TODO: I think the outstanding prefetch size & count should be updated at this point... //TODO: ...this may then necessitate dispatching to consumers }else{ - Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag)); if(i == unacked.end()){ @@ -219,7 +219,7 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ } void Channel::recover(bool requeue){ - Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery if(requeue){ outstanding.reset(); @@ -234,7 +234,7 @@ void Channel::recover(bool requeue){ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ Message::shared_ptr msg = queue->dequeue(); if(msg){ - Locker locker(deliveryLock); + Mutex::ScopedLock locker(deliveryLock); u_int64_t myDeliveryTag = currentDeliveryTag++; msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); if(ackExpected){ diff --git a/qpid/cpp/src/qpid/broker/Channel.h b/qpid/cpp/src/qpid/broker/Channel.h index 56f0e6b4af..24dbf728ba 100644 --- a/qpid/cpp/src/qpid/broker/Channel.h +++ b/qpid/cpp/src/qpid/broker/Channel.h @@ -77,7 +77,7 @@ namespace qpid { u_int32_t framesize; NameGenerator tagGenerator; std::list unacked; - qpid::sys::Monitor deliveryLock; + qpid::sys::Mutex deliveryLock; TxBuffer txBuffer; AccumulatedAck accumulatedAck; TransactionalStore* store; diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index 46693f6f3c..3f9d23cdc7 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -21,24 +21,24 @@ using namespace qpid::broker; using namespace qpid::framing; +using namespace qpid::sys; DirectExchange::DirectExchange(const string& _name) : Exchange(_name) { } void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - lock.acquire(); + Mutex::ScopedLock l(lock); std::vector& queues(bindings[routingKey]); std::vector::iterator i = find(queues.begin(), queues.end(), queue); if(i == queues.end()){ bindings[routingKey].push_back(queue); queue->bound(new ExchangeBinding(this, queue, routingKey, args)); } - lock.release(); } void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); + Mutex::ScopedLock l(lock); std::vector& queues(bindings[routingKey]); std::vector::iterator i = find(queues.begin(), queues.end(), queue); @@ -48,11 +48,10 @@ void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, F bindings.erase(routingKey); } } - lock.release(); } void DirectExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); + Mutex::ScopedLock l(lock); std::vector& queues(bindings[routingKey]); int count(0); for(std::vector::iterator i = queues.begin(); i != queues.end(); i++, count++){ @@ -61,7 +60,6 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, FieldTabl if(!count){ std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl; } - lock.release(); } DirectExchange::~DirectExchange(){ diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.h b/qpid/cpp/src/qpid/broker/DirectExchange.h index a452fe3b4b..0ee9ce2705 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.h +++ b/qpid/cpp/src/qpid/broker/DirectExchange.h @@ -30,7 +30,7 @@ namespace qpid { namespace broker { class DirectExchange : public virtual Exchange{ std::map > bindings; - qpid::sys::Monitor lock; + qpid::sys::Mutex lock; public: static const std::string typeName; diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp index 6f6c759aa2..1c3a4af026 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -26,7 +26,7 @@ using namespace qpid::sys; using std::pair; pair ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); ExchangeMap::iterator i = exchanges.find(name); if (i == exchanges.end()) { Exchange::shared_ptr exchange; @@ -50,12 +50,12 @@ pair ExchangeRegistry::declare(const string& name, c } void ExchangeRegistry::destroy(const string& name){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); exchanges.erase(name); } Exchange::shared_ptr ExchangeRegistry::get(const string& name){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); return exchanges[name]; } diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h index 33deb743f4..5d4cf10de8 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h @@ -29,7 +29,7 @@ namespace broker { class ExchangeRegistry{ typedef std::map ExchangeMap; ExchangeMap exchanges; - qpid::sys::Monitor lock; + qpid::sys::Mutex lock; public: std::pair declare(const string& name, const string& type) throw(UnknownExchangeTypeException); void destroy(const string& name); diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index 8f5143c8c0..2f8d4eadb2 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -26,7 +26,7 @@ using namespace qpid::sys; FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {} void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); // Add if not already present. Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); if (i == bindings.end()) { @@ -36,7 +36,7 @@ void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, Fie } void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* /*args*/){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); if (i != bindings.end()) { bindings.erase(i); @@ -45,7 +45,7 @@ void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* } void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* /*args*/){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){ msg.deliverTo(*i); } diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.h b/qpid/cpp/src/qpid/broker/FanOutExchange.h index 53b5c39789..910acdc203 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.h +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.h @@ -31,7 +31,7 @@ namespace broker { class FanOutExchange : public virtual Exchange { std::vector bindings; - qpid::sys::Monitor lock; + qpid::sys::Mutex lock; public: static const std::string typeName; diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index 5d5cf2392c..0c4c290bbd 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -41,7 +41,7 @@ namespace { HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { } void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); std::string what = args->getString("x-match"); if (what != all && what != any) { THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange."); @@ -51,7 +51,7 @@ void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, Fi } void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* args){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); Bindings::iterator i = std::find(bindings.begin(),bindings.end(), Binding(*args, queue)); if (i != bindings.end()) bindings.erase(i); @@ -59,7 +59,7 @@ void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* args){ - Locker locker(lock);; + Mutex::ScopedLock locker(lock);; for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (match(i->first, *args)) msg.deliverTo(i->second); } diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h index 3cd25739f7..77af612fe6 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.h +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h @@ -34,7 +34,7 @@ class HeadersExchange : public virtual Exchange { typedef std::vector Bindings; Bindings bindings; - qpid::sys::Monitor lock; + qpid::sys::Mutex lock; public: static const std::string typeName; diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 24fc996f1f..e3a98ae8f5 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -15,14 +15,12 @@ * limitations under the License. * */ -#include "qpid/sys/Monitor.h" #include "qpid/broker/Message.h" #include using namespace boost; using namespace qpid::broker; using namespace qpid::framing; -using namespace qpid::sys; Message::Message(const ConnectionToken* const _publisher, const string& _exchange, const string& _routingKey, diff --git a/qpid/cpp/src/qpid/broker/Prefetch.cpp b/qpid/cpp/src/qpid/broker/Prefetch.cpp deleted file mode 100644 index 6d9dbda13c..0000000000 --- a/qpid/cpp/src/qpid/broker/Prefetch.cpp +++ /dev/null @@ -1,26 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/broker/Prefetch.h" - -using namespace qpid::broker; - -void Prefetch::reset(){ - size = 0; - count = 0; -} diff --git a/qpid/cpp/src/qpid/broker/Prefetch.h b/qpid/cpp/src/qpid/broker/Prefetch.h index 97abb4102d..d56799f835 100644 --- a/qpid/cpp/src/qpid/broker/Prefetch.h +++ b/qpid/cpp/src/qpid/broker/Prefetch.h @@ -30,7 +30,7 @@ namespace qpid { u_int32_t size; u_int16_t count; - void reset(); + void reset() { size = 0; count = 0; } }; } } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 000552715b..46b14a23f5 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -38,7 +38,7 @@ Queue::Queue(const string& _name, u_int32_t _autodelete, exclusive(0), persistenceId(0) { - if(autodelete) lastUsed = Time::now().msecs(); + if(autodelete) lastUsed = getTimeMsecs(); } Queue::~Queue(){ @@ -58,7 +58,7 @@ void Queue::deliver(Message::shared_ptr& msg){ } void Queue::process(Message::shared_ptr& msg){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); if(queueing || !dispatch(msg)){ queueing = true; messages.push(msg); @@ -90,7 +90,7 @@ bool Queue::dispatch(Message::shared_ptr& msg){ } bool Queue::startDispatching(){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); if(queueing && !dispatching){ dispatching = true; return true; @@ -102,7 +102,7 @@ bool Queue::startDispatching(){ void Queue::dispatch(){ bool proceed = startDispatching(); while(proceed){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); if(!messages.empty() && dispatch(messages.front())){ messages.pop(); }else{ @@ -114,7 +114,7 @@ void Queue::dispatch(){ } void Queue::consume(Consumer* c, bool requestExclusive){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); if(exclusive) throw ExclusiveAccessException(); if(requestExclusive){ if(!consumers.empty()) throw ExclusiveAccessException(); @@ -126,14 +126,14 @@ void Queue::consume(Consumer* c, bool requestExclusive){ } void Queue::cancel(Consumer* c){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); consumers.erase(find(consumers.begin(), consumers.end(), c)); - if(autodelete && consumers.empty()) lastUsed = Time::now().msecs(); + if(autodelete && consumers.empty()) lastUsed = getTimeMsecs(); if(exclusive == c) exclusive = 0; } Message::shared_ptr Queue::dequeue(){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); Message::shared_ptr msg; if(!messages.empty()){ msg = messages.front(); @@ -143,25 +143,25 @@ Message::shared_ptr Queue::dequeue(){ } u_int32_t Queue::purge(){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); int count = messages.size(); while(!messages.empty()) messages.pop(); return count; } u_int32_t Queue::getMessageCount() const{ - Locker locker(lock); + Mutex::ScopedLock locker(lock); return messages.size(); } u_int32_t Queue::getConsumerCount() const{ - Locker locker(lock); + Mutex::ScopedLock locker(lock); return consumers.size(); } bool Queue::canAutoDelete() const{ - Locker locker(lock); - return lastUsed && (Time::now().msecs() - lastUsed > autodelete); + Mutex::ScopedLock locker(lock); + return lastUsed && (getTimeMsecs() - lastUsed > autodelete); } void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index fd0bad43ff..c146de1353 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -27,7 +27,6 @@ #include "qpid/broker/Consumer.h" #include "qpid/broker/Message.h" #include "qpid/sys/Monitor.h" -#include "qpid/sys/Time.h" namespace qpid { namespace broker { @@ -55,7 +54,7 @@ namespace qpid { bool queueing; bool dispatching; int next; - mutable qpid::sys::Monitor lock; + mutable qpid::sys::Mutex lock; int64_t lastUsed; Consumer* exclusive; u_int64_t persistenceId; diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index aa05db9a16..1976da812d 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -16,7 +16,6 @@ * */ #include "qpid/broker/QueueRegistry.h" -#include "qpid/sys/Monitor.h" #include "qpid/broker/SessionHandlerImpl.h" #include #include @@ -32,7 +31,7 @@ std::pair QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner) { - Locker locker(lock); + Mutex::ScopedLock locker(lock); string name = declareName.empty() ? generateName() : declareName; assert(!name.empty()); QueueMap::iterator i = queues.find(name); @@ -46,12 +45,12 @@ QueueRegistry::declare(const string& declareName, bool durable, } void QueueRegistry::destroy(const string& name){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); queues.erase(name); } Queue::shared_ptr QueueRegistry::find(const string& name){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); QueueMap::iterator i = queues.find(name); if (i == queues.end()) { return Queue::shared_ptr(); diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h index c2fc1cc830..e3d03a06b1 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.h +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h @@ -76,7 +76,7 @@ class QueueRegistry{ private: typedef std::map QueueMap; QueueMap queues; - qpid::sys::Monitor lock; + qpid::sys::Mutex lock; int counter; MessageStore* const store; }; diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index dc252d208f..eecd9918d4 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -21,7 +21,7 @@ using namespace qpid::broker; using namespace qpid::framing; - +using namespace qpid::sys; // TODO aconway 2006-09-20: More efficient matching algorithm. // Areas for improvement: @@ -115,15 +115,14 @@ bool TopicPattern::match(const Tokens& target) const TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { } void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - lock.acquire(); + Monitor::ScopedLock l(lock); TopicPattern routingPattern(routingKey); bindings[routingPattern].push_back(queue); queue->bound(new ExchangeBinding(this, queue, routingKey, args)); - lock.release(); } void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); + Monitor::ScopedLock l(lock); BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); Queue::vector& qv(bi->second); if (bi == bindings.end()) return; @@ -131,12 +130,11 @@ void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, Fi if(q == qv.end()) return; qv.erase(q); if(qv.empty()) bindings.erase(bi); - lock.release(); } void TopicExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); + Monitor::ScopedLock l(lock); for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (i->first.match(routingKey)) { Queue::vector& qv(i->second); @@ -145,7 +143,6 @@ void TopicExchange::route(Deliverable& msg, const string& routingKey, FieldTable } } } - lock.release(); } TopicExchange::~TopicExchange() {} diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.h b/qpid/cpp/src/qpid/broker/TopicExchange.h index e3b9040cb2..a3e133845f 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.h +++ b/qpid/cpp/src/qpid/broker/TopicExchange.h @@ -71,7 +71,7 @@ class TopicPattern : public Tokens class TopicExchange : public virtual Exchange{ typedef std::map BindingMap; BindingMap bindings; - qpid::sys::Monitor lock; + qpid::sys::Mutex lock; public: static const std::string typeName; diff --git a/qpid/cpp/src/qpid/client/Channel.cpp b/qpid/cpp/src/qpid/client/Channel.cpp index a7b30f2f94..fad648f27d 100644 --- a/qpid/cpp/src/qpid/client/Channel.cpp +++ b/qpid/cpp/src/qpid/client/Channel.cpp @@ -17,7 +17,6 @@ */ #include "qpid/client/Channel.h" #include "qpid/sys/Monitor.h" -#include "qpid/sys/ThreadFactory.h" #include "qpid/client/Message.h" #include "qpid/QpidError.h" @@ -29,26 +28,15 @@ using namespace qpid::sys; Channel::Channel(bool _transactional, u_int16_t _prefetch) : id(0), con(0), - dispatcher(0), out(0), incoming(0), closed(true), prefetch(_prefetch), transactional(_transactional) -{ - threadFactory = new ThreadFactory(); - dispatchMonitor = new Monitor(); - retrievalMonitor = new Monitor(); -} +{ } Channel::~Channel(){ - if(dispatcher){ - stop(); - delete dispatcher; - } - delete retrievalMonitor; - delete dispatchMonitor; - delete threadFactory; + stop(); } void Channel::setPrefetch(u_int16_t _prefetch){ @@ -176,9 +164,9 @@ void Channel::cancelAll(){ } void Channel::retrieve(Message& msg){ - retrievalMonitor->acquire(); + Monitor::ScopedLock l(retrievalMonitor); while(retrieved == 0){ - retrievalMonitor->wait(); + retrievalMonitor.wait(); } msg.header = retrieved->getHeader(); @@ -186,8 +174,6 @@ void Channel::retrieve(Message& msg){ retrieved->getData(msg.data); delete retrieved; retrieved = 0; - - retrievalMonitor->release(); } bool Channel::get(Message& msg, const Queue& queue, int ackMode){ @@ -315,18 +301,16 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ } void Channel::start(){ - dispatcher = threadFactory->create(this); - dispatcher->start(); + dispatcher = Thread(this); } void Channel::stop(){ - closed = true; - dispatchMonitor->acquire(); - dispatchMonitor->notify(); - dispatchMonitor->release(); - if(dispatcher){ - dispatcher->join(); + { + Monitor::ScopedLock l(dispatchMonitor); + closed = true; + dispatchMonitor.notify(); } + dispatcher.join(); } void Channel::run(){ @@ -335,30 +319,27 @@ void Channel::run(){ void Channel::enqueue(){ if(incoming->isResponse()){ - retrievalMonitor->acquire(); + Monitor::ScopedLock l(retrievalMonitor); retrieved = incoming; - retrievalMonitor->notify(); - retrievalMonitor->release(); + retrievalMonitor.notify(); }else{ - dispatchMonitor->acquire(); + Monitor::ScopedLock l(dispatchMonitor); messages.push(incoming); - dispatchMonitor->notify(); - dispatchMonitor->release(); + dispatchMonitor.notify(); } incoming = 0; } IncomingMessage* Channel::dequeue(){ - dispatchMonitor->acquire(); + Monitor::ScopedLock l(dispatchMonitor); while(messages.empty() && !closed){ - dispatchMonitor->wait(); + dispatchMonitor.wait(); } IncomingMessage* msg = 0; if(!messages.empty()){ msg = messages.front(); messages.pop(); } - dispatchMonitor->release(); return msg; } diff --git a/qpid/cpp/src/qpid/client/Channel.h b/qpid/cpp/src/qpid/client/Channel.h index fa8cd3afe0..daf2b6f9d9 100644 --- a/qpid/cpp/src/qpid/client/Channel.h +++ b/qpid/cpp/src/qpid/client/Channel.h @@ -24,9 +24,6 @@ #define _Channel_ #include "qpid/framing/amqp_framing.h" - -#include "qpid/sys/ThreadFactory.h" - #include "qpid/client/Connection.h" #include "qpid/client/Exchange.h" #include "qpid/client/IncomingMessage.h" @@ -51,15 +48,14 @@ namespace client { u_int16_t id; Connection* con; - qpid::sys::ThreadFactory* threadFactory; - qpid::sys::Thread* dispatcher; + qpid::sys::Thread dispatcher; qpid::framing::OutputHandler* out; IncomingMessage* incoming; ResponseHandler responses; std::queue messages;//holds returned messages or those delivered for a consume IncomingMessage* retrieved;//holds response to basic.get - qpid::sys::Monitor* dispatchMonitor; - qpid::sys::Monitor* retrievalMonitor; + qpid::sys::Monitor dispatchMonitor; + qpid::sys::Monitor retrievalMonitor; std::map consumers; ReturnedMessageHandler* returnsHandler; bool closed; diff --git a/qpid/cpp/src/qpid/client/ResponseHandler.cpp b/qpid/cpp/src/qpid/client/ResponseHandler.cpp index 16989e2c25..5d2e03c9d9 100644 --- a/qpid/cpp/src/qpid/client/ResponseHandler.cpp +++ b/qpid/cpp/src/qpid/client/ResponseHandler.cpp @@ -19,40 +19,35 @@ #include "qpid/sys/Monitor.h" #include "qpid/QpidError.h" -qpid::client::ResponseHandler::ResponseHandler() : waiting(false){ - monitor = new qpid::sys::Monitor(); -} +using namespace qpid::sys; -qpid::client::ResponseHandler::~ResponseHandler(){ - delete monitor; -} +qpid::client::ResponseHandler::ResponseHandler() : waiting(false){} + +qpid::client::ResponseHandler::~ResponseHandler(){} bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){ return expected.match(response.get()); } void qpid::client::ResponseHandler::waitForResponse(){ - monitor->acquire(); + Monitor::ScopedLock l(monitor); if(waiting){ - monitor->wait(); + monitor.wait(); } - monitor->release(); } void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){ response = _response; - monitor->acquire(); + Monitor::ScopedLock l(monitor); waiting = false; - monitor->notify(); - monitor->release(); + monitor.notify(); } void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){ - monitor->acquire(); + Monitor::ScopedLock l(monitor); if(waiting){ - monitor->wait(); + monitor.wait(); } - monitor->release(); if(!validate(expected)){ THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error"); } diff --git a/qpid/cpp/src/qpid/client/ResponseHandler.h b/qpid/cpp/src/qpid/client/ResponseHandler.h index ac4c351211..247c974c14 100644 --- a/qpid/cpp/src/qpid/client/ResponseHandler.h +++ b/qpid/cpp/src/qpid/client/ResponseHandler.h @@ -17,7 +17,7 @@ */ #include #include "qpid/framing/amqp_framing.h" -#include "qpid/sys/Monitor.h" +#include #ifndef _ResponseHandler_ #define _ResponseHandler_ @@ -28,7 +28,7 @@ namespace qpid { class ResponseHandler{ bool waiting; qpid::framing::AMQMethodBody::shared_ptr response; - qpid::sys::Monitor* monitor; + qpid::sys::Monitor monitor; public: ResponseHandler(); diff --git a/qpid/cpp/src/qpid/sys/APRBase.cpp b/qpid/cpp/src/qpid/sys/APRBase.cpp deleted file mode 100644 index 91e2b9f428..0000000000 --- a/qpid/cpp/src/qpid/sys/APRBase.cpp +++ /dev/null @@ -1,96 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include -#include "qpid/sys/APRBase.h" -#include "qpid/QpidError.h" - -using namespace qpid::sys; - -APRBase* APRBase::instance = 0; - -APRBase* APRBase::getInstance(){ - if(instance == 0){ - instance = new APRBase(); - } - return instance; -} - - -APRBase::APRBase() : count(0){ - apr_initialize(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, 0)); - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); -} - -APRBase::~APRBase(){ - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); - apr_pool_destroy(pool); - apr_terminate(); -} - -bool APRBase::_increment(){ - bool deleted(false); - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); - if(this == instance){ - count++; - }else{ - deleted = true; - } - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); - return !deleted; -} - -void APRBase::_decrement(){ - APRBase* copy = 0; - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); - if(--count == 0){ - copy = instance; - instance = 0; - } - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); - if(copy != 0){ - delete copy; - } -} - -void APRBase::increment(){ - int count = 0; - while(count++ < 2 && !getInstance()->_increment()){ - std::cout << "WARNING: APR initialization triggered concurrently with termination." << std::endl; - } -} - -void APRBase::decrement(){ - getInstance()->_decrement(); -} - -void qpid::sys::check(apr_status_t status, const std::string& file, const int line){ - if (status != APR_SUCCESS){ - const int size = 50; - char tmp[size]; - std::string msg(apr_strerror(status, tmp, size)); - throw QpidError(APR_ERROR + ((int) status), msg, file, line); - } -} - -std::string qpid::sys::get_desc(apr_status_t status){ - const int size = 50; - char tmp[size]; - return std::string(apr_strerror(status, tmp, size)); -} - diff --git a/qpid/cpp/src/qpid/sys/APRBase.h b/qpid/cpp/src/qpid/sys/APRBase.h deleted file mode 100644 index 9eef07e4c4..0000000000 --- a/qpid/cpp/src/qpid/sys/APRBase.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRBase_ -#define _APRBase_ - -#include -#include "apr-1/apr_thread_mutex.h" -#include "apr-1/apr_errno.h" - -namespace qpid { -namespace sys { - - /** - * Use of APR libraries necessitates explicit init and terminate - * calls. Any class using APR libs should obtain the reference to - * this singleton and increment on construction, decrement on - * destruction. This class can then correctly initialise apr - * before the first use and terminate after the last use. - */ - class APRBase{ - static APRBase* instance; - apr_pool_t* pool; - apr_thread_mutex_t* mutex; - int count; - - APRBase(); - ~APRBase(); - static APRBase* getInstance(); - bool _increment(); - void _decrement(); - public: - static void increment(); - static void decrement(); - }; - - //this is also a convenient place for a helper function for error checking: - void check(apr_status_t status, const std::string& file, const int line); - std::string get_desc(apr_status_t status); - -#define CHECK_APR_SUCCESS(A) check(A, __FILE__, __LINE__); - -} -} - - - - -#endif diff --git a/qpid/cpp/src/qpid/sys/APRPool.cpp b/qpid/cpp/src/qpid/sys/APRPool.cpp deleted file mode 100644 index 0f809ca93c..0000000000 --- a/qpid/cpp/src/qpid/sys/APRPool.cpp +++ /dev/null @@ -1,39 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "APRPool.h" -#include "qpid/sys/APRBase.h" -#include - -using namespace qpid::sys; -using namespace qpid::sys; - -APRPool::APRPool(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); -} - -APRPool::~APRPool(){ - apr_pool_destroy(pool); - APRBase::decrement(); -} - -apr_pool_t* APRPool::get() { - return boost::details::pool::singleton_default::instance().pool; -} - diff --git a/qpid/cpp/src/qpid/sys/APRPool.h b/qpid/cpp/src/qpid/sys/APRPool.h deleted file mode 100644 index 2196cd64e7..0000000000 --- a/qpid/cpp/src/qpid/sys/APRPool.h +++ /dev/null @@ -1,47 +0,0 @@ -#ifndef _APRPool_ -#define _APRPool_ - -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include -#include - -namespace qpid { -namespace sys { -/** - * Singleton APR memory pool. - */ -class APRPool : private boost::noncopyable { - public: - APRPool(); - ~APRPool(); - - /** Get singleton instance */ - static apr_pool_t* get(); - - private: - apr_pool_t* pool; -}; - -}} - - - - - -#endif /*!_APRPool_*/ diff --git a/qpid/cpp/src/qpid/sys/APRSocket.cpp b/qpid/cpp/src/qpid/sys/APRSocket.cpp deleted file mode 100644 index 586c03475f..0000000000 --- a/qpid/cpp/src/qpid/sys/APRSocket.cpp +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/sys/APRBase.h" -#include "qpid/sys/APRSocket.h" -#include -#include - -using namespace qpid::sys; -using namespace qpid::framing; -using namespace qpid::sys; - -APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ - -} - -void APRSocket::read(qpid::framing::Buffer& buffer){ - apr_size_t bytes; - bytes = buffer.available(); - apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes); - buffer.move(bytes); - if(APR_STATUS_IS_TIMEUP(s)){ - //timed out - }else if(APR_STATUS_IS_EOF(s)){ - close(); - } -} - -void APRSocket::write(qpid::framing::Buffer& buffer){ - apr_size_t bytes; - do{ - bytes = buffer.available(); - apr_socket_send(socket, buffer.start(), &bytes); - buffer.move(bytes); - }while(bytes > 0); -} - -void APRSocket::close(){ - if(!closed){ - std::cout << "Closing socket " << socket << "@" << this << std::endl; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - closed = true; - } -} - -bool APRSocket::isOpen(){ - return !closed; -} - -u_int8_t APRSocket::read(){ - char data[1]; - apr_size_t bytes = 1; - apr_status_t s = apr_socket_recv(socket, data, &bytes); - if(APR_STATUS_IS_EOF(s) || bytes == 0){ - return 0; - }else{ - return *data; - } -} - -APRSocket::~APRSocket(){ -} diff --git a/qpid/cpp/src/qpid/sys/APRSocket.h b/qpid/cpp/src/qpid/sys/APRSocket.h deleted file mode 100644 index f7e7ad107b..0000000000 --- a/qpid/cpp/src/qpid/sys/APRSocket.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _APRSocket_ -#define _APRSocket_ - -#include "apr-1/apr_network_io.h" -#include "qpid/framing/Buffer.h" - -namespace qpid { -namespace sys { - - class APRSocket - { - apr_socket_t* const socket; - volatile bool closed; - public: - APRSocket(apr_socket_t* socket); - void read(qpid::framing::Buffer& b); - void write(qpid::framing::Buffer& b); - void close(); - bool isOpen(); - u_int8_t read(); - ~APRSocket(); - }; - -} -} - - -#endif diff --git a/qpid/cpp/src/qpid/sys/Acceptor.cpp b/qpid/cpp/src/qpid/sys/Acceptor.cpp deleted file mode 100644 index f8e8504c6e..0000000000 --- a/qpid/cpp/src/qpid/sys/Acceptor.cpp +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/sys/Acceptor.h" -#include "qpid/sys/APRBase.h" -#include "APRPool.h" - -using namespace qpid::sys; -using namespace qpid::sys; - -Acceptor::Acceptor(int16_t port_, int backlog, int threads) : - port(port_), - processor(APRPool::get(), threads, 1000, 5000000) -{ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); - CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); - CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); - CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); -} - -int16_t Acceptor::getPort() const { - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); - return address->port; -} - -void Acceptor::run(SessionHandlerFactory* factory) { - running = true; - processor.start(); - std::cout << "Listening on port " << getPort() << "..." << std::endl; - while(running){ - apr_socket_t* client; - apr_status_t status = apr_socket_accept(&client, socket, APRPool::get()); - if(status == APR_SUCCESS){ - //make this socket non-blocking: - CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); - LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false); - session->init(factory->create(session)); - }else{ - running = false; - if(status != APR_EINTR){ - std::cout << "ERROR: " << get_desc(status) << std::endl; - } - } - } - shutdown(); -} - -void Acceptor::shutdown() { - // TODO aconway 2006-10-12: Cleanup, this is not thread safe. - if (running) { - running = false; - processor.stop(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - } -} - - diff --git a/qpid/cpp/src/qpid/sys/Acceptor.h b/qpid/cpp/src/qpid/sys/Acceptor.h deleted file mode 100644 index f0f9d6feba..0000000000 --- a/qpid/cpp/src/qpid/sys/Acceptor.h +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LFAcceptor_ -#define _LFAcceptor_ - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_poll.h" -#include "apr-1/apr_time.h" - -#include "qpid/sys/Acceptor.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/LFProcessor.h" -#include "qpid/sys/LFSessionContext.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/SessionContext.h" -#include "qpid/sys/SessionHandlerFactory.h" -#include "qpid/sys/Thread.h" -#include - -namespace qpid { -namespace sys { - -/** APR Acceptor. */ -class Acceptor : public qpid::SharedObject -{ - public: - Acceptor(int16_t port, int backlog, int threads); - virtual int16_t getPort() const; - virtual void run(SessionHandlerFactory* factory); - virtual void shutdown(); - - private: - int16_t port; - LFProcessor processor; - apr_socket_t* socket; - volatile bool running; -}; - -} -} - - -#endif diff --git a/qpid/cpp/src/qpid/sys/Connector.cpp b/qpid/cpp/src/qpid/sys/Connector.cpp deleted file mode 100644 index 1d4b237d92..0000000000 --- a/qpid/cpp/src/qpid/sys/Connector.cpp +++ /dev/null @@ -1,201 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include -#include "qpid/sys/APRBase.h" -#include "qpid/sys/Connector.h" -#include "qpid/sys/ThreadFactory.h" -#include "qpid/QpidError.h" - -using namespace qpid::sys; -using namespace qpid::sys; -using namespace qpid::framing; -using qpid::QpidError; - -Connector::Connector(bool _debug, u_int32_t buffer_size) : - debug(_debug), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - closed(true), - lastIn(0), lastOut(0), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), - shutdownHandler(0), - inbuf(receive_buffer_size), - outbuf(send_buffer_size){ - - APRBase::increment(); - - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); - - threadFactory = new ThreadFactory(); - writeLock = new Monitor(); -} - -Connector::~Connector(){ - delete receiver; - delete writeLock; - delete threadFactory; - apr_pool_destroy(pool); - - APRBase::decrement(); -} - -void Connector::connect(const std::string& host, int port){ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); - CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); - closed = false; - - receiver = threadFactory->create(this); - receiver->start(); -} - -void Connector::init(ProtocolInitiation* header){ - writeBlock(header); - delete header; -} - -void Connector::close(){ - closed = true; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - receiver->join(); -} - -void Connector::setInputHandler(InputHandler* handler){ - input = handler; -} - -void Connector::setShutdownHandler(ShutdownHandler* handler){ - shutdownHandler = handler; -} - -OutputHandler* Connector::getOutputHandler(){ - return this; -} - -void Connector::send(AMQFrame* frame){ - writeBlock(frame); - if(debug) std::cout << "SENT: " << *frame << std::endl; - delete frame; -} - -void Connector::writeBlock(AMQDataBlock* data){ - writeLock->acquire(); - data->encode(outbuf); - - //transfer data to wire - outbuf.flip(); - writeToSocket(outbuf.start(), outbuf.available()); - outbuf.clear(); - writeLock->release(); -} - -void Connector::writeToSocket(char* data, size_t available){ - apr_size_t bytes(available); - apr_size_t written(0); - while(written < available && !closed){ - apr_status_t status = apr_socket_send(socket, data + written, &bytes); - if(status == APR_TIMEUP){ - std::cout << "Write request timed out." << std::endl; - } - if(bytes == 0){ - std::cout << "Write request wrote 0 bytes." << std::endl; - } - lastOut = apr_time_as_msec(apr_time_now()); - written += bytes; - bytes = available - written; - } -} - -void Connector::checkIdle(apr_status_t status){ - if(timeoutHandler){ - apr_time_t now = apr_time_as_msec(apr_time_now()); - if(APR_STATUS_IS_TIMEUP(status)){ - if(idleIn && (now - lastIn > idleIn)){ - timeoutHandler->idleIn(); - } - }else if(APR_STATUS_IS_EOF(status)){ - closed = true; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - if(shutdownHandler) shutdownHandler->shutdown(); - }else{ - lastIn = now; - } - if(idleOut && (now - lastOut > idleOut)){ - timeoutHandler->idleOut(); - } - } -} - -void Connector::setReadTimeout(u_int16_t t){ - idleIn = t * 1000;//t is in secs - if(idleIn && (!timeout || idleIn < timeout)){ - timeout = idleIn; - setSocketTimeout(); - } - -} - -void Connector::setWriteTimeout(u_int16_t t){ - idleOut = t * 1000;//t is in secs - if(idleOut && (!timeout || idleOut < timeout)){ - timeout = idleOut; - setSocketTimeout(); - } -} - -void Connector::setSocketTimeout(){ - //interval is in microseconds, timeout in milliseconds - //want the interval to be a bit shorter than the timeout, hence multiply - //by 800 rather than 1000. - apr_interval_time_t interval(timeout * 800); - apr_socket_timeout_set(socket, interval); -} - -void Connector::setTimeoutHandler(TimeoutHandler* handler){ - timeoutHandler = handler; -} - -void Connector::run(){ - try{ - while(!closed){ - apr_size_t bytes(inbuf.available()); - if(bytes < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes)); - - if(bytes > 0){ - inbuf.move(bytes); - inbuf.flip();//position = 0, limit = total data read - - AMQFrame frame; - while(frame.decode(inbuf)){ - if(debug) std::cout << "RECV: " << frame << std::endl; - input->received(&frame); - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); - } - } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} diff --git a/qpid/cpp/src/qpid/sys/Connector.h b/qpid/cpp/src/qpid/sys/Connector.h deleted file mode 100644 index 611acc417f..0000000000 --- a/qpid/cpp/src/qpid/sys/Connector.h +++ /dev/null @@ -1,95 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Connector_ -#define _Connector_ - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/InputHandler.h" -#include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" -#include "qpid/sys/ShutdownHandler.h" -#include "qpid/sys/TimeoutHandler.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/ThreadFactory.h" -#include "qpid/sys/Connector.h" -#include "qpid/sys/Monitor.h" - -namespace qpid { -namespace sys { - - class Connector : public virtual qpid::framing::OutputHandler, - private virtual qpid::sys::Runnable - { - const bool debug; - const int receive_buffer_size; - const int send_buffer_size; - - bool closed; - - apr_time_t lastIn; - apr_time_t lastOut; - apr_interval_time_t timeout; - u_int32_t idleIn; - u_int32_t idleOut; - - TimeoutHandler* timeoutHandler; - ShutdownHandler* shutdownHandler; - qpid::framing::InputHandler* input; - qpid::framing::InitiationHandler* initialiser; - qpid::framing::OutputHandler* output; - - qpid::framing::Buffer inbuf; - qpid::framing::Buffer outbuf; - - qpid::sys::Monitor* writeLock; - qpid::sys::ThreadFactory* threadFactory; - qpid::sys::Thread* receiver; - - apr_pool_t* pool; - apr_socket_t* socket; - - void checkIdle(apr_status_t status); - void writeBlock(qpid::framing::AMQDataBlock* data); - void writeToSocket(char* data, size_t available); - void setSocketTimeout(); - - void run(); - - public: - Connector(bool debug = false, u_int32_t buffer_size = 1024); - virtual ~Connector(); - virtual void connect(const std::string& host, int port); - virtual void init(qpid::framing::ProtocolInitiation* header); - virtual void close(); - virtual void setInputHandler(qpid::framing::InputHandler* handler); - virtual void setTimeoutHandler(TimeoutHandler* handler); - virtual void setShutdownHandler(ShutdownHandler* handler); - virtual qpid::framing::OutputHandler* getOutputHandler(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void setReadTimeout(u_int16_t timeout); - virtual void setWriteTimeout(u_int16_t timeout); - }; - -} -} - - -#endif diff --git a/qpid/cpp/src/qpid/sys/LFProcessor.cpp b/qpid/cpp/src/qpid/sys/LFProcessor.cpp deleted file mode 100644 index 8c53c86392..0000000000 --- a/qpid/cpp/src/qpid/sys/LFProcessor.cpp +++ /dev/null @@ -1,193 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/sys/LFProcessor.h" -#include "qpid/sys/APRBase.h" -#include "qpid/sys/LFSessionContext.h" -#include "qpid/QpidError.h" -#include - -using namespace qpid::sys; -using namespace qpid::sys; -using qpid::QpidError; - -// TODO aconway 2006-10-12: stopped is read outside locks. -// - -LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : - size(_size), - timeout(_timeout), - signalledCount(0), - current(0), - count(0), - workerCount(_workers), - hasLeader(false), - workers(new Thread*[_workers]), - stopped(false) -{ - - CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); - //create & start the required number of threads - for(int i = 0; i < workerCount; i++){ - workers[i] = factory.create(this); - } -} - - -LFProcessor::~LFProcessor(){ - if (!stopped) stop(); - for(int i = 0; i < workerCount; i++){ - delete workers[i]; - } - delete[] workers; - CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); -} - -void LFProcessor::start(){ - for(int i = 0; i < workerCount; i++){ - workers[i]->start(); - } -} - -void LFProcessor::add(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); - countLock.acquire(); - sessions.push_back(reinterpret_cast(fd->client_data)); - count++; - countLock.release(); -} - -void LFProcessor::remove(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); - countLock.acquire(); - sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast(fd->client_data))); - count--; - countLock.release(); -} - -void LFProcessor::reactivate(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); -} - -void LFProcessor::deactivate(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); -} - -void LFProcessor::update(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); -} - -bool LFProcessor::full(){ - Locker locker(countLock); - return count == size; -} - -bool LFProcessor::empty(){ - Locker locker(countLock); - return count == 0; -} - -void LFProcessor::poll() { - apr_status_t status = APR_EGENERAL; - do{ - current = 0; - if(!stopped){ - status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); - } - }while(status != APR_SUCCESS && !stopped); -} - -void LFProcessor::run(){ - try{ - while(!stopped){ - leadLock.acquire(); - waitToLead(); - if(!stopped){ - const apr_pollfd_t* evt = getNextEvent(); - if(evt){ - LFSessionContext* session = reinterpret_cast(evt->client_data); - session->startProcessing(); - - relinquishLead(); - leadLock.release(); - - //process event: - if(evt->rtnevents & APR_POLLIN) session->read(); - if(evt->rtnevents & APR_POLLOUT) session->write(); - - if(session->isClosed()){ - session->handleClose(); - countLock.acquire(); - sessions.erase(find(sessions.begin(), sessions.end(), session)); - count--; - countLock.release(); - }else{ - session->stopProcessing(); - } - - }else{ - leadLock.release(); - } - }else{ - leadLock.release(); - } - } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} - -void LFProcessor::waitToLead(){ - while(hasLeader && !stopped) leadLock.wait(); - hasLeader = !stopped; -} - -void LFProcessor::relinquishLead(){ - hasLeader = false; - leadLock.notify(); -} - -const apr_pollfd_t* LFProcessor::getNextEvent(){ - while(true){ - if(stopped){ - return 0; - }else if(current < signalledCount){ - //use result of previous poll if one is available - return signalledFDs + (current++); - }else{ - //else poll to get new events - poll(); - } - } -} - -void LFProcessor::stop(){ - stopped = true; - leadLock.acquire(); - leadLock.notifyAll(); - leadLock.release(); - - for(int i = 0; i < workerCount; i++){ - workers[i]->join(); - } - - for(iterator i = sessions.begin(); i < sessions.end(); i++){ - (*i)->shutdown(); - } -} - diff --git a/qpid/cpp/src/qpid/sys/LFProcessor.h b/qpid/cpp/src/qpid/sys/LFProcessor.h deleted file mode 100644 index afbb9ea413..0000000000 --- a/qpid/cpp/src/qpid/sys/LFProcessor.h +++ /dev/null @@ -1,119 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LFProcessor_ -#define _LFProcessor_ - -#include "apr-1/apr_poll.h" -#include -#include -#include "qpid/sys/Monitor.h" -#include "qpid/sys/ThreadFactory.h" -#include "qpid/sys/Runnable.h" - -namespace qpid { -namespace sys { - - class LFSessionContext; - - /** - * This class processes a poll set using the leaders-followers - * pattern for thread synchronization: the leader will poll and on - * the poll returning, it will remove a session, promote a - * follower to leadership, then process the session. - */ - class LFProcessor : private virtual qpid::sys::Runnable - { - typedef std::vector::iterator iterator; - - const int size; - const apr_interval_time_t timeout; - apr_pollset_t* pollset; - int signalledCount; - int current; - const apr_pollfd_t* signalledFDs; - int count; - const int workerCount; - bool hasLeader; - qpid::sys::Thread** const workers; - qpid::sys::Monitor leadLock; - qpid::sys::Monitor countLock; - qpid::sys::ThreadFactory factory; - std::vector sessions; - volatile bool stopped; - - const apr_pollfd_t* getNextEvent(); - void waitToLead(); - void relinquishLead(); - void poll(); - virtual void run(); - - public: - LFProcessor(apr_pool_t* pool, int workers, int size, int timeout); - /** - * Add the fd to the poll set. Relies on the client_data being - * an instance of LFSessionContext. - */ - void add(const apr_pollfd_t* const fd); - /** - * Remove the fd from the poll set. - */ - void remove(const apr_pollfd_t* const fd); - /** - * Signal that the fd passed in, already part of the pollset, - * has had its flags altered. - */ - void update(const apr_pollfd_t* const fd); - /** - * Add an fd back to the poll set after deactivation. - */ - void reactivate(const apr_pollfd_t* const fd); - /** - * Temporarily remove the fd from the poll set. Called when processing - * is about to begin. - */ - void deactivate(const apr_pollfd_t* const fd); - /** - * Indicates whether the capacity of this processor has been - * reached (or whether it can still handle further fd's). - */ - bool full(); - /** - * Indicates whether there are any fd's registered. - */ - bool empty(); - /** - * Stop processing. - */ - void stop(); - /** - * Start processing. - */ - void start(); - /** - * Is processing stopped? - */ - bool isStopped(); - - ~LFProcessor(); - }; - -} -} - - -#endif diff --git a/qpid/cpp/src/qpid/sys/LFSessionContext.cpp b/qpid/cpp/src/qpid/sys/LFSessionContext.cpp deleted file mode 100644 index f2dff87fd0..0000000000 --- a/qpid/cpp/src/qpid/sys/LFSessionContext.cpp +++ /dev/null @@ -1,189 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/sys/LFSessionContext.h" -#include "qpid/sys/APRBase.h" -#include "qpid/QpidError.h" -#include - -using namespace qpid::sys; -using namespace qpid::sys; -using namespace qpid::framing; - -LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, - LFProcessor* const _processor, - bool _debug) : - debug(_debug), - socket(_socket), - initiated(false), - in(32768), - out(32768), - processor(_processor), - processing(false), - closing(false), - reading(0), - writing(0) -{ - - fd.p = _pool; - fd.desc_type = APR_POLL_SOCKET; - fd.reqevents = APR_POLLIN; - fd.client_data = this; - fd.desc.s = _socket; - - out.flip(); -} - -LFSessionContext::~LFSessionContext(){ - -} - -void LFSessionContext::read(){ - assert(!reading); // No concurrent read. - reading = Thread::currentThread(); - - socket.read(in); - in.flip(); - if(initiated){ - AMQFrame frame; - while(frame.decode(in)){ - if(debug) log("RECV", &frame); - handler->received(&frame); - } - }else{ - ProtocolInitiation protocolInit; - if(protocolInit.decode(in)){ - handler->initiated(&protocolInit); - initiated = true; - if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; - } - } - in.compact(); - - reading = 0; -} - -void LFSessionContext::write(){ - assert(!writing); // No concurrent writes. - writing = Thread::currentThread(); - - bool done = isClosed(); - while(!done){ - if(out.available() > 0){ - socket.write(out); - if(out.available() > 0){ - writing = 0; - - //incomplete write, leave flags to receive notification of readiness to write - done = true;//finished processing for now, but write is still in progress - } - }else{ - //do we have any frames to write? - writeLock.acquire(); - if(!framesToWrite.empty()){ - out.clear(); - bool encoded(false); - AMQFrame* frame = framesToWrite.front(); - while(frame && out.available() >= frame->size()){ - encoded = true; - frame->encode(out); - if(debug) log("SENT", frame); - delete frame; - framesToWrite.pop(); - frame = framesToWrite.empty() ? 0 : framesToWrite.front(); - } - if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); - out.flip(); - }else{ - //reset flags, don't care about writability anymore - fd.reqevents = APR_POLLIN; - done = true; - - writing = 0; - - if(closing){ - socket.close(); - } - } - writeLock.release(); - } - } -} - -void LFSessionContext::send(AMQFrame* frame){ - writeLock.acquire(); - if(!closing){ - framesToWrite.push(frame); - if(!(fd.reqevents & APR_POLLOUT)){ - fd.reqevents |= APR_POLLOUT; - if(!processing){ - processor->update(&fd); - } - } - } - writeLock.release(); -} - -void LFSessionContext::startProcessing(){ - writeLock.acquire(); - processing = true; - processor->deactivate(&fd); - writeLock.release(); -} - -void LFSessionContext::stopProcessing(){ - writeLock.acquire(); - processor->reactivate(&fd); - processing = false; - writeLock.release(); -} - -void LFSessionContext::close(){ - closing = true; - writeLock.acquire(); - if(!processing){ - //allow pending frames to be written to socket - fd.reqevents = APR_POLLOUT; - processor->update(&fd); - } - writeLock.release(); -} - -void LFSessionContext::handleClose(){ - handler->closed(); - std::cout << "Session closed [" << &socket << "]" << std::endl; - delete handler; - delete this; -} - -void LFSessionContext::shutdown(){ - socket.close(); - handleClose(); -} - -void LFSessionContext::init(SessionHandler* _handler){ - handler = _handler; - processor->add(&fd); -} - -void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ - logLock.acquire(); - std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; - logLock.release(); -} - -Monitor LFSessionContext::logLock; diff --git a/qpid/cpp/src/qpid/sys/LFSessionContext.h b/qpid/cpp/src/qpid/sys/LFSessionContext.h deleted file mode 100644 index 92f52ccf83..0000000000 --- a/qpid/cpp/src/qpid/sys/LFSessionContext.h +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LFSessionContext_ -#define _LFSessionContext_ - -#include - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_poll.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/AMQFrame.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/APRSocket.h" -#include "qpid/framing/Buffer.h" -#include "qpid/sys/LFProcessor.h" -#include "qpid/sys/SessionContext.h" -#include "qpid/sys/SessionHandler.h" - -namespace qpid { -namespace sys { - - - class LFSessionContext : public virtual SessionContext - { - const bool debug; - APRSocket socket; - bool initiated; - - qpid::framing::Buffer in; - qpid::framing::Buffer out; - - SessionHandler* handler; - LFProcessor* const processor; - - apr_pollfd_t fd; - - std::queue framesToWrite; - qpid::sys::Monitor writeLock; - - bool processing; - bool closing; - - //these are just for debug, as a crude way of detecting concurrent access - volatile unsigned int reading; - volatile unsigned int writing; - - static qpid::sys::Monitor logLock; - void log(const std::string& desc, qpid::framing::AMQFrame* const frame); - - public: - LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, - LFProcessor* const processor, - bool debug = false); - ~LFSessionContext(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void close(); - void read(); - void write(); - void init(SessionHandler* handler); - void startProcessing(); - void stopProcessing(); - void handleClose(); - void shutdown(); - inline apr_pollfd_t* const getFd(){ return &fd; } - inline bool isClosed(){ return !socket.isOpen(); } - }; - -} -} - - -#endif diff --git a/qpid/cpp/src/qpid/sys/Monitor.cpp b/qpid/cpp/src/qpid/sys/Monitor.cpp deleted file mode 100644 index 79a29c219e..0000000000 --- a/qpid/cpp/src/qpid/sys/Monitor.cpp +++ /dev/null @@ -1,60 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/sys/APRBase.h" -#include "qpid/sys/Monitor.h" -#include - -qpid::sys::Monitor::Monitor(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); - CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool)); -} - -qpid::sys::Monitor::~Monitor(){ - CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); - CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); - apr_pool_destroy(pool); - APRBase::decrement(); -} - -void qpid::sys::Monitor::wait(){ - CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); -} - - -void qpid::sys::Monitor::wait(u_int64_t time){ - apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000); - if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); -} - -void qpid::sys::Monitor::notify(){ - CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); -} - -void qpid::sys::Monitor::notifyAll(){ - CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); -} - -void qpid::sys::Monitor::acquire(){ - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); -} - -void qpid::sys::Monitor::release(){ - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); -} diff --git a/qpid/cpp/src/qpid/sys/Monitor.h b/qpid/cpp/src/qpid/sys/Monitor.h deleted file mode 100644 index ddda613b87..0000000000 --- a/qpid/cpp/src/qpid/sys/Monitor.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _Monitor_ -#define _Monitor_ - -#include "apr-1/apr_thread_mutex.h" -#include "apr-1/apr_thread_cond.h" -#include "qpid/sys/Monitor.h" - -namespace qpid { -namespace sys { - -class Monitor -{ - apr_pool_t* pool; - apr_thread_mutex_t* mutex; - apr_thread_cond_t* condition; - - public: - Monitor(); - virtual ~Monitor(); - virtual void wait(); - virtual void wait(u_int64_t time); - virtual void notify(); - virtual void notifyAll(); - virtual void acquire(); - virtual void release(); -}; - -class Locker -{ - public: - Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); } - ~Locker() { monitor.release(); } - private: - Monitor& monitor; -}; -}} - - -#endif diff --git a/qpid/cpp/src/qpid/sys/Runnable.cpp b/qpid/cpp/src/qpid/sys/Runnable.cpp deleted file mode 100644 index d7d9e968cc..0000000000 --- a/qpid/cpp/src/qpid/sys/Runnable.cpp +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/sys/Runnable.h" -qpid::sys::Runnable::~Runnable() {} diff --git a/qpid/cpp/src/qpid/sys/Runnable.h b/qpid/cpp/src/qpid/sys/Runnable.h index ce13eb2039..c06698bb93 100644 --- a/qpid/cpp/src/qpid/sys/Runnable.h +++ b/qpid/cpp/src/qpid/sys/Runnable.h @@ -21,15 +21,15 @@ namespace qpid { namespace sys { - class Runnable - { - public: - virtual ~Runnable(); - virtual void run() = 0; - }; +/** Base class for classes that run in a thread. */ +class Runnable +{ + public: + virtual ~Runnable() {} + virtual void run() = 0; +}; -} -} +}} #endif diff --git a/qpid/cpp/src/qpid/sys/Thread.cpp b/qpid/cpp/src/qpid/sys/Thread.cpp deleted file mode 100644 index 4fb9915993..0000000000 --- a/qpid/cpp/src/qpid/sys/Thread.cpp +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/sys/APRBase.h" -#include "qpid/sys/Thread.h" -#include "apr-1/apr_portable.h" - -using namespace qpid::sys; - -void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){ - ((Runnable*) data)->run(); - CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); - return NULL; -} - -Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} - -Thread::~Thread(){ -} - -void Thread::start(){ - CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool)); -} - -void Thread::join(){ - apr_status_t status; - if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner)); -} - -void Thread::interrupt(){ - if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS)); -} - -unsigned int qpid::sys::Thread::currentThread(){ - return apr_os_thread_current(); -} diff --git a/qpid/cpp/src/qpid/sys/Thread.h b/qpid/cpp/src/qpid/sys/Thread.h index e86bd4a8d2..d884add776 100644 --- a/qpid/cpp/src/qpid/sys/Thread.h +++ b/qpid/cpp/src/qpid/sys/Thread.h @@ -1,3 +1,6 @@ +#ifndef _sys_Thread_h +#define _sys_Thread_h + /* * * Copyright (c) 2006 The Apache Software Foundation @@ -15,34 +18,10 @@ * limitations under the License. * */ -#ifndef _Thread_ -#define _Thread_ - -#include "apr-1/apr_thread_proc.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Thread.h" - -namespace qpid { -namespace sys { - - class Thread - { - const Runnable* runnable; - apr_pool_t* pool; - apr_thread_t* runner; - public: - Thread(apr_pool_t* pool, Runnable* runnable); - virtual ~Thread(); - virtual void start(); - virtual void join(); - virtual void interrupt(); - static unsigned int currentThread(); - }; +#include +#include QPID_PLATFORM_H(Thread.h) -} -} -#endif +#endif /*!_sys_Thread_h*/ diff --git a/qpid/cpp/src/qpid/sys/ThreadFactory.cpp b/qpid/cpp/src/qpid/sys/ThreadFactory.cpp deleted file mode 100644 index d33872b9a2..0000000000 --- a/qpid/cpp/src/qpid/sys/ThreadFactory.cpp +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/sys/APRBase.h" -#include "qpid/sys/ThreadFactory.h" - -using namespace qpid::sys; - -ThreadFactory::ThreadFactory(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); -} - -ThreadFactory::~ThreadFactory(){ - apr_pool_destroy(pool); - APRBase::decrement(); -} - -Thread* ThreadFactory::create(Runnable* runnable){ - return new Thread(pool, runnable); -} diff --git a/qpid/cpp/src/qpid/sys/ThreadFactory.h b/qpid/cpp/src/qpid/sys/ThreadFactory.h deleted file mode 100644 index 9b7126272a..0000000000 --- a/qpid/cpp/src/qpid/sys/ThreadFactory.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _ThreadFactory_ -#define _ThreadFactory_ - -#include "apr-1/apr_thread_proc.h" - -#include "qpid/sys/Thread.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/ThreadFactory.h" -#include "qpid/sys/Runnable.h" - -namespace qpid { -namespace sys { - - class ThreadFactory - { - apr_pool_t* pool; - public: - ThreadFactory(); - virtual ~ThreadFactory(); - virtual Thread* create(Runnable* runnable); - }; - -} -} - - -#endif diff --git a/qpid/cpp/src/qpid/sys/Time.cpp b/qpid/cpp/src/qpid/sys/Time.cpp deleted file mode 100644 index c3512b8df3..0000000000 --- a/qpid/cpp/src/qpid/sys/Time.cpp +++ /dev/null @@ -1,29 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include -#include - -namespace qpid { -namespace sys { - -Time Time::now() { - return Time(apr_time_now()*1000); -} - -}} diff --git a/qpid/cpp/src/qpid/sys/Time.h b/qpid/cpp/src/qpid/sys/Time.h index 5c7cdfb005..79d17b433b 100644 --- a/qpid/cpp/src/qpid/sys/Time.h +++ b/qpid/cpp/src/qpid/sys/Time.h @@ -24,28 +24,14 @@ namespace qpid { namespace sys { -/** - * Time since the epoch. - */ -class Time -{ - public: - static const int64_t NANOS = 1000000000; - static const int64_t MICROS = 1000000; - static const int64_t MILLIS = 1000; - - static Time now(); - - Time(int64_t nsecs_) : ticks(nsecs_) {} +inline int64_t msecsToNsecs(int64_t msecs) { return msecs * 1000 *1000; } +inline int64_t nsecsToMsecs(int64_t nsecs) { return nsecs / (1000 *1000); } - int64_t nsecs() const { return ticks; } - int64_t usecs() const { return nsecs()/1000; } - int64_t msecs() const { return usecs()/1000; } - int64_t secs() const { return msecs()/1000; } +/** Nanoseconds since epoch */ +int64_t getTimeNsecs(); - private: - int64_t ticks; -}; +/** Milliseconds since epoch */ +int64_t getTimeMsecs(); }} diff --git a/qpid/cpp/src/qpid/sys/platform.h b/qpid/cpp/src/qpid/sys/platform.h new file mode 100644 index 0000000000..878c724953 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/platform.h @@ -0,0 +1,29 @@ +#ifndef _sys_platform_h +#define _sys_platform_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/** + * Macros for including platform-specific headers and aliasing + * platform-specific classes into the qpid::sys namespace. + */ + +#define QPID_PLATFORM_H(HEADER) + +#endif /*!_sys_platform_h*/ diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp index f170aed95e..2974cc8654 100644 --- a/qpid/cpp/src/qpidd.cpp +++ b/qpid/cpp/src/qpidd.cpp @@ -24,8 +24,11 @@ using namespace qpid::broker; using namespace qpid::sys; +Broker::shared_ptr broker; + void handle_signal(int /*signal*/){ std::cout << "Shutting down..." << std::endl; + broker->shutdown(); } int main(int argc, char** argv) @@ -36,8 +39,8 @@ int main(int argc, char** argv) if(config.isHelp()){ config.usage(); }else{ + broker = Broker::create(config); apr_signal(SIGINT, handle_signal); - Broker::shared_ptr broker = Broker::create(config); broker->run(); } return 0; -- cgit v1.2.1