diff options
60 files changed, 458 insertions, 612 deletions
diff --git a/qpid/cpp/Makefile b/qpid/cpp/Makefile index 36a56425ac..ab542f7086 100644 --- a/qpid/cpp/Makefile +++ b/qpid/cpp/Makefile @@ -52,7 +52,7 @@ $(BUILDDIRS): ## Library rules LIB_common := $(call LIBFILE,common,1.0) -$(LIB_common): $(call OBJECTS,qpid qpid/framing qpid/sys) +$(LIB_common): $(call OBJECTS,qpid qpid/framing qpid/sys qpid/$(PLATFORM)) $(LIB_COMMAND) LIB_client :=$(call LIBFILE,client,1.0) diff --git a/qpid/cpp/README b/qpid/cpp/README index 789c535023..2653873b1a 100644 --- a/qpid/cpp/README +++ b/qpid/cpp/README @@ -52,8 +52,8 @@ The source tree is structured as follows: * src/ - .h and .cpp source files, directories mirror namespaces. * qpid/ * sys/ - system abstractions: threading, IO. - * posix/ - posix implementation - * apr/ - portable APR implementation (for client side) + * posix/ - posix implementations for sys + * apr/ - portable APR implementation for sys (client side) * framing - encoding/decoding AMQP messages * client - client classes. * broker - broker classes. diff --git a/qpid/cpp/options.mk b/qpid/cpp/options.mk index ef14480113..59933fafd0 100644 --- a/qpid/cpp/options.mk +++ b/qpid/cpp/options.mk @@ -65,7 +65,8 @@ CXXFLAGS_release := -O3 -DNDEBUG WARN := -Werror -pedantic -Wall -Wextra -Wshadow -Wpointer-arith -Wcast-qual -Wcast-align -Wno-long-long -Wvolatile-register-var -Winvalid-pch INCLUDES := $(SRCDIRS:%=-I%) $(EXTRA_INCLUDES) -LDFLAGS := -L$(LIBDIR) $(LDFLAGS_$(PLATFORM)) +DEFINES := -DPLATFORM=$(PLATFORM) +LDFLAGS := -L$(LIBDIR) $(LDFLAGS_$(PLATFORM)) CXXFLAGS := $(DEFINES) $(WARN) -MMD -fpic $(INCLUDES) $(CXXFLAGS_$(PLATFORM)) $(CXXFLAGS_$(TYPE)) ## Macros for linking, must be late evaluated diff --git a/qpid/cpp/src/qpid/sys/APRBase.cpp b/qpid/cpp/src/qpid/apr/APRBase.cpp index 91e2b9f428..f629a5381d 100644 --- a/qpid/cpp/src/qpid/sys/APRBase.cpp +++ b/qpid/cpp/src/qpid/apr/APRBase.cpp @@ -16,8 +16,8 @@ * */ #include <iostream> -#include "qpid/sys/APRBase.h" -#include "qpid/QpidError.h" +#include <qpid/QpidError.h> +#include "APRBase.h" using namespace qpid::sys; diff --git a/qpid/cpp/src/qpid/sys/APRBase.h b/qpid/cpp/src/qpid/apr/APRBase.h index 9eef07e4c4..b84e9860df 100644 --- a/qpid/cpp/src/qpid/sys/APRBase.h +++ b/qpid/cpp/src/qpid/apr/APRBase.h @@ -52,7 +52,7 @@ namespace sys { void check(apr_status_t status, const std::string& file, const int line); std::string get_desc(apr_status_t status); -#define CHECK_APR_SUCCESS(A) check(A, __FILE__, __LINE__); +#define CHECK_APR_SUCCESS(A) qpid::sys::check(A, __FILE__, __LINE__); } } diff --git a/qpid/cpp/src/qpid/sys/APRPool.cpp b/qpid/cpp/src/qpid/apr/APRPool.cpp index 0f809ca93c..e465a6b40d 100644 --- a/qpid/cpp/src/qpid/sys/APRPool.cpp +++ b/qpid/cpp/src/qpid/apr/APRPool.cpp @@ -17,11 +17,10 @@ */ #include "APRPool.h" -#include "qpid/sys/APRBase.h" +#include "APRBase.h" #include <boost/pool/detail/singleton.hpp> using namespace qpid::sys; -using namespace qpid::sys; APRPool::APRPool(){ APRBase::increment(); diff --git a/qpid/cpp/src/qpid/sys/APRPool.h b/qpid/cpp/src/qpid/apr/APRPool.h index 2196cd64e7..2196cd64e7 100644 --- a/qpid/cpp/src/qpid/sys/APRPool.h +++ b/qpid/cpp/src/qpid/apr/APRPool.h diff --git a/qpid/cpp/src/qpid/sys/APRSocket.cpp b/qpid/cpp/src/qpid/apr/APRSocket.cpp index 586c03475f..0c5a29c216 100644 --- a/qpid/cpp/src/qpid/sys/APRSocket.cpp +++ b/qpid/cpp/src/qpid/apr/APRSocket.cpp @@ -15,14 +15,13 @@ * limitations under the License. * */ -#include "qpid/sys/APRBase.h" -#include "qpid/sys/APRSocket.h" +#include "APRBase.h" +#include "APRSocket.h" #include <assert.h> #include <iostream> using namespace qpid::sys; using namespace qpid::framing; -using namespace qpid::sys; APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ diff --git a/qpid/cpp/src/qpid/sys/APRSocket.h b/qpid/cpp/src/qpid/apr/APRSocket.h index f7e7ad107b..f7e7ad107b 100644 --- a/qpid/cpp/src/qpid/sys/APRSocket.h +++ b/qpid/cpp/src/qpid/apr/APRSocket.h diff --git a/qpid/cpp/src/qpid/sys/Acceptor.cpp b/qpid/cpp/src/qpid/apr/Acceptor.cpp index f8e8504c6e..cbeea9902b 100644 --- a/qpid/cpp/src/qpid/sys/Acceptor.cpp +++ b/qpid/cpp/src/qpid/apr/Acceptor.cpp @@ -15,12 +15,11 @@ * limitations under the License. * */ -#include "qpid/sys/Acceptor.h" -#include "qpid/sys/APRBase.h" +#include "Acceptor.h" +#include "APRBase.h" #include "APRPool.h" using namespace qpid::sys; -using namespace qpid::sys; Acceptor::Acceptor(int16_t port_, int backlog, int threads) : port(port_), diff --git a/qpid/cpp/src/qpid/sys/Acceptor.h b/qpid/cpp/src/qpid/apr/Acceptor.h index f0f9d6feba..1813b391c1 100644 --- a/qpid/cpp/src/qpid/sys/Acceptor.h +++ b/qpid/cpp/src/qpid/apr/Acceptor.h @@ -18,18 +18,15 @@ #ifndef _LFAcceptor_ #define _LFAcceptor_ +#include "LFProcessor.h" +#include "LFSessionContext.h" #include "apr-1/apr_network_io.h" #include "apr-1/apr_poll.h" #include "apr-1/apr_time.h" - -#include "qpid/sys/Acceptor.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/LFProcessor.h" -#include "qpid/sys/LFSessionContext.h" +#include "Monitor.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/SessionContext.h" #include "qpid/sys/SessionHandlerFactory.h" -#include "qpid/sys/Thread.h" #include <qpid/SharedObject.h> namespace qpid { @@ -41,7 +38,7 @@ class Acceptor : public qpid::SharedObject<Acceptor> public: Acceptor(int16_t port, int backlog, int threads); virtual int16_t getPort() const; - virtual void run(SessionHandlerFactory* factory); + virtual void run(qpid::sys::SessionHandlerFactory* factory); virtual void shutdown(); private: diff --git a/qpid/cpp/src/qpid/sys/Connector.cpp b/qpid/cpp/src/qpid/apr/Connector.cpp index 1d4b237d92..4446731654 100644 --- a/qpid/cpp/src/qpid/sys/Connector.cpp +++ b/qpid/cpp/src/qpid/apr/Connector.cpp @@ -16,10 +16,9 @@ * */ #include <iostream> -#include "qpid/sys/APRBase.h" -#include "qpid/sys/Connector.h" -#include "qpid/sys/ThreadFactory.h" -#include "qpid/QpidError.h" +#include <qpid/QpidError.h> +#include "APRBase.h" +#include "Connector.h" using namespace qpid::sys; using namespace qpid::sys; @@ -43,15 +42,9 @@ Connector::Connector(bool _debug, u_int32_t buffer_size) : CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); - - threadFactory = new ThreadFactory(); - writeLock = new Monitor(); } Connector::~Connector(){ - delete receiver; - delete writeLock; - delete threadFactory; apr_pool_destroy(pool); APRBase::decrement(); @@ -62,9 +55,7 @@ void Connector::connect(const std::string& host, int port){ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); closed = false; - - receiver = threadFactory->create(this); - receiver->start(); + receiver = Thread(this); } void Connector::init(ProtocolInitiation* header){ @@ -75,7 +66,7 @@ void Connector::init(ProtocolInitiation* header){ void Connector::close(){ closed = true; CHECK_APR_SUCCESS(apr_socket_close(socket)); - receiver->join(); + receiver.join(); } void Connector::setInputHandler(InputHandler* handler){ @@ -97,14 +88,12 @@ void Connector::send(AMQFrame* frame){ } void Connector::writeBlock(AMQDataBlock* data){ - writeLock->acquire(); + Mutex::ScopedLock l(writeLock); data->encode(outbuf); - //transfer data to wire outbuf.flip(); writeToSocket(outbuf.start(), outbuf.available()); outbuf.clear(); - writeLock->release(); } void Connector::writeToSocket(char* data, size_t available){ @@ -126,7 +115,7 @@ void Connector::writeToSocket(char* data, size_t available){ void Connector::checkIdle(apr_status_t status){ if(timeoutHandler){ - apr_time_t now = apr_time_as_msec(apr_time_now()); + int64_t now = apr_time_as_msec(apr_time_now()); if(APR_STATUS_IS_TIMEUP(status)){ if(idleIn && (now - lastIn > idleIn)){ timeoutHandler->idleIn(); diff --git a/qpid/cpp/src/qpid/sys/Connector.h b/qpid/cpp/src/qpid/apr/Connector.h index 611acc417f..e69a7205f3 100644 --- a/qpid/cpp/src/qpid/sys/Connector.h +++ b/qpid/cpp/src/qpid/apr/Connector.h @@ -28,15 +28,14 @@ #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" #include "qpid/sys/Thread.h" -#include "qpid/sys/ThreadFactory.h" #include "qpid/sys/Connector.h" #include "qpid/sys/Monitor.h" namespace qpid { namespace sys { - class Connector : public virtual qpid::framing::OutputHandler, - private virtual qpid::sys::Runnable + class Connector : public qpid::framing::OutputHandler, + private qpid::sys::Runnable { const bool debug; const int receive_buffer_size; @@ -44,9 +43,9 @@ namespace sys { bool closed; - apr_time_t lastIn; - apr_time_t lastOut; - apr_interval_time_t timeout; + int64_t lastIn; + int64_t lastOut; + int64_t timeout; u_int32_t idleIn; u_int32_t idleOut; @@ -59,9 +58,8 @@ namespace sys { qpid::framing::Buffer inbuf; qpid::framing::Buffer outbuf; - qpid::sys::Monitor* writeLock; - qpid::sys::ThreadFactory* threadFactory; - qpid::sys::Thread* receiver; + qpid::sys::Mutex writeLock; + qpid::sys::Thread receiver; apr_pool_t* pool; apr_socket_t* socket; diff --git a/qpid/cpp/src/qpid/sys/LFProcessor.cpp b/qpid/cpp/src/qpid/apr/LFProcessor.cpp index 8c53c86392..f4d4258f6f 100644 --- a/qpid/cpp/src/qpid/sys/LFProcessor.cpp +++ b/qpid/cpp/src/qpid/apr/LFProcessor.cpp @@ -15,14 +15,13 @@ * limitations under the License. * */ -#include "qpid/sys/LFProcessor.h" -#include "qpid/sys/APRBase.h" -#include "qpid/sys/LFSessionContext.h" -#include "qpid/QpidError.h" #include <sstream> +#include <qpid/QpidError.h> +#include "LFProcessor.h" +#include "APRBase.h" +#include "LFSessionContext.h" using namespace qpid::sys; -using namespace qpid::sys; using qpid::QpidError; // TODO aconway 2006-10-12: stopped is read outside locks. @@ -36,47 +35,38 @@ LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout count(0), workerCount(_workers), hasLeader(false), - workers(new Thread*[_workers]), + workers(new Thread[_workers]), stopped(false) { CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); - //create & start the required number of threads - for(int i = 0; i < workerCount; i++){ - workers[i] = factory.create(this); - } } LFProcessor::~LFProcessor(){ if (!stopped) stop(); - for(int i = 0; i < workerCount; i++){ - delete workers[i]; - } delete[] workers; CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); } void LFProcessor::start(){ for(int i = 0; i < workerCount; i++){ - workers[i]->start(); + workers[i] = Thread(this); } } void LFProcessor::add(const apr_pollfd_t* const fd){ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); - countLock.acquire(); + Monitor::ScopedLock l(countLock); sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data)); count++; - countLock.release(); } void LFProcessor::remove(const apr_pollfd_t* const fd){ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); - countLock.acquire(); + Monitor::ScopedLock l(countLock); sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data))); count--; - countLock.release(); } void LFProcessor::reactivate(const apr_pollfd_t* const fd){ @@ -93,12 +83,12 @@ void LFProcessor::update(const apr_pollfd_t* const fd){ } bool LFProcessor::full(){ - Locker locker(countLock); + Mutex::ScopedLock locker(countLock); return count == size; } bool LFProcessor::empty(){ - Locker locker(countLock); + Mutex::ScopedLock locker(countLock); return count == 0; } @@ -115,36 +105,30 @@ void LFProcessor::poll() { void LFProcessor::run(){ try{ while(!stopped){ - leadLock.acquire(); - waitToLead(); - if(!stopped){ - const apr_pollfd_t* evt = getNextEvent(); - if(evt){ - LFSessionContext* session = reinterpret_cast<LFSessionContext*>(evt->client_data); - session->startProcessing(); - - relinquishLead(); - leadLock.release(); - - //process event: - if(evt->rtnevents & APR_POLLIN) session->read(); - if(evt->rtnevents & APR_POLLOUT) session->write(); - - if(session->isClosed()){ - session->handleClose(); - countLock.acquire(); - sessions.erase(find(sessions.begin(), sessions.end(), session)); - count--; - countLock.release(); - }else{ - session->stopProcessing(); - } - - }else{ - leadLock.release(); - } + const apr_pollfd_t* event = 0; + LFSessionContext* session = 0; + { + Monitor::ScopedLock l(leadLock); + waitToLead(); + event = getNextEvent(); + if(!event) return; + session = reinterpret_cast<LFSessionContext*>( + event->client_data); + session->startProcessing(); + relinquishLead(); + } + + //process event: + if(event->rtnevents & APR_POLLIN) session->read(); + if(event->rtnevents & APR_POLLOUT) session->write(); + + if(session->isClosed()){ + session->handleClose(); + Monitor::ScopedLock l(countLock); + sessions.erase(find(sessions.begin(),sessions.end(), session)); + count--; }else{ - leadLock.release(); + session->stopProcessing(); } } }catch(QpidError error){ @@ -178,14 +162,13 @@ const apr_pollfd_t* LFProcessor::getNextEvent(){ void LFProcessor::stop(){ stopped = true; - leadLock.acquire(); - leadLock.notifyAll(); - leadLock.release(); - + { + Monitor::ScopedLock l(leadLock); + leadLock.notifyAll(); + } for(int i = 0; i < workerCount; i++){ - workers[i]->join(); + workers[i].join(); } - for(iterator i = sessions.begin(); i < sessions.end(); i++){ (*i)->shutdown(); } diff --git a/qpid/cpp/src/qpid/sys/LFProcessor.h b/qpid/cpp/src/qpid/apr/LFProcessor.h index afbb9ea413..dd85ad9e84 100644 --- a/qpid/cpp/src/qpid/sys/LFProcessor.h +++ b/qpid/cpp/src/qpid/apr/LFProcessor.h @@ -21,9 +21,9 @@ #include "apr-1/apr_poll.h" #include <iostream> #include <vector> -#include "qpid/sys/Monitor.h" -#include "qpid/sys/ThreadFactory.h" +#include <qpid/sys/Monitor.h> #include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" namespace qpid { namespace sys { @@ -49,10 +49,9 @@ namespace sys { int count; const int workerCount; bool hasLeader; - qpid::sys::Thread** const workers; + qpid::sys::Thread* workers; qpid::sys::Monitor leadLock; - qpid::sys::Monitor countLock; - qpid::sys::ThreadFactory factory; + qpid::sys::Mutex countLock; std::vector<LFSessionContext*> sessions; volatile bool stopped; diff --git a/qpid/cpp/src/qpid/sys/LFSessionContext.cpp b/qpid/cpp/src/qpid/apr/LFSessionContext.cpp index f2dff87fd0..4a704013a8 100644 --- a/qpid/cpp/src/qpid/sys/LFSessionContext.cpp +++ b/qpid/cpp/src/qpid/apr/LFSessionContext.cpp @@ -15,8 +15,8 @@ * limitations under the License. * */ -#include "qpid/sys/LFSessionContext.h" -#include "qpid/sys/APRBase.h" +#include "LFSessionContext.h" +#include "APRBase.h" #include "qpid/QpidError.h" #include <assert.h> @@ -34,9 +34,7 @@ LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, out(32768), processor(_processor), processing(false), - closing(false), - reading(0), - writing(0) + closing(false) { fd.p = _pool; @@ -53,9 +51,6 @@ LFSessionContext::~LFSessionContext(){ } void LFSessionContext::read(){ - assert(!reading); // No concurrent read. - reading = Thread::currentThread(); - socket.read(in); in.flip(); if(initiated){ @@ -73,27 +68,21 @@ void LFSessionContext::read(){ } } in.compact(); - - reading = 0; } void LFSessionContext::write(){ - assert(!writing); // No concurrent writes. - writing = Thread::currentThread(); - bool done = isClosed(); while(!done){ if(out.available() > 0){ socket.write(out); if(out.available() > 0){ - writing = 0; //incomplete write, leave flags to receive notification of readiness to write done = true;//finished processing for now, but write is still in progress } }else{ //do we have any frames to write? - writeLock.acquire(); + Mutex::ScopedLock l(writeLock); if(!framesToWrite.empty()){ out.clear(); bool encoded(false); @@ -113,19 +102,16 @@ void LFSessionContext::write(){ fd.reqevents = APR_POLLIN; done = true; - writing = 0; - if(closing){ socket.close(); } } - writeLock.release(); } } } void LFSessionContext::send(AMQFrame* frame){ - writeLock.acquire(); + Mutex::ScopedLock l(writeLock); if(!closing){ framesToWrite.push(frame); if(!(fd.reqevents & APR_POLLOUT)){ @@ -135,32 +121,28 @@ void LFSessionContext::send(AMQFrame* frame){ } } } - writeLock.release(); } void LFSessionContext::startProcessing(){ - writeLock.acquire(); + Mutex::ScopedLock l(writeLock); processing = true; processor->deactivate(&fd); - writeLock.release(); } void LFSessionContext::stopProcessing(){ - writeLock.acquire(); + Mutex::ScopedLock l(writeLock); processor->reactivate(&fd); processing = false; - writeLock.release(); } void LFSessionContext::close(){ closing = true; - writeLock.acquire(); + Mutex::ScopedLock l(writeLock); if(!processing){ //allow pending frames to be written to socket fd.reqevents = APR_POLLOUT; processor->update(&fd); } - writeLock.release(); } void LFSessionContext::handleClose(){ @@ -181,9 +163,8 @@ void LFSessionContext::init(SessionHandler* _handler){ } void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ - logLock.acquire(); + Mutex::ScopedLock l(logLock); std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; - logLock.release(); } -Monitor LFSessionContext::logLock; +Mutex LFSessionContext::logLock; diff --git a/qpid/cpp/src/qpid/apr/LFSessionContext.h b/qpid/cpp/src/qpid/apr/LFSessionContext.h new file mode 100644 index 0000000000..9b3104b085 --- /dev/null +++ b/qpid/cpp/src/qpid/apr/LFSessionContext.h @@ -0,0 +1,87 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#ifndef _LFSessionContext_ +#define _LFSessionContext_ + +#include <queue> + +#include "apr-1/apr_network_io.h" +#include "apr-1/apr_poll.h" +#include "apr-1/apr_time.h" + +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/Buffer.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/SessionContext.h" +#include "qpid/sys/SessionHandler.h" + +#include "APRSocket.h" +#include "LFProcessor.h" + +namespace qpid { +namespace sys { + + +class LFSessionContext : public virtual qpid::sys::SessionContext +{ + const bool debug; + APRSocket socket; + bool initiated; + + qpid::framing::Buffer in; + qpid::framing::Buffer out; + + qpid::sys::SessionHandler* handler; + LFProcessor* const processor; + + apr_pollfd_t fd; + + std::queue<qpid::framing::AMQFrame*> framesToWrite; + qpid::sys::Mutex writeLock; + + bool processing; + bool closing; + + static qpid::sys::Mutex logLock; + void log(const std::string& desc, + qpid::framing::AMQFrame* const frame); + + + public: + LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, + LFProcessor* const processor, + bool debug = false); + virtual ~LFSessionContext(); + virtual void send(qpid::framing::AMQFrame* frame); + virtual void close(); + void read(); + void write(); + void init(qpid::sys::SessionHandler* handler); + void startProcessing(); + void stopProcessing(); + void handleClose(); + void shutdown(); + inline apr_pollfd_t* const getFd(){ return &fd; } + inline bool isClosed(){ return !socket.isOpen(); } +}; + +} +} + + +#endif diff --git a/qpid/cpp/src/qpid/sys/Monitor.cpp b/qpid/cpp/src/qpid/apr/Monitor.cpp index 79a29c219e..69fb2f6ffd 100644 --- a/qpid/cpp/src/qpid/sys/Monitor.cpp +++ b/qpid/cpp/src/qpid/apr/Monitor.cpp @@ -15,46 +15,50 @@ * limitations under the License. * */ -#include "qpid/sys/APRBase.h" -#include "qpid/sys/Monitor.h" -#include <iostream> +#include "Monitor.h" +#include "APRPool.h" -qpid::sys::Monitor::Monitor(){ +using namespace qpid::sys; + +Mutex::Mutex() +{ APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); - CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool)); - CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool)); + // TODO aconway 2006-11-08: Switch to non-nested. + CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get())); } -qpid::sys::Monitor::~Monitor(){ - CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); +Mutex::~Mutex(){ CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex)); - apr_pool_destroy(pool); APRBase::decrement(); } -void qpid::sys::Monitor::wait(){ - CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); +Monitor::Monitor() +{ + CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get())); } +Monitor::~Monitor(){ + CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition)); +} + + -void qpid::sys::Monitor::wait(u_int64_t time){ - apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000); +void Monitor::wait(){ + CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex)); +} + +void Monitor::wait(int64_t nsecs){ + // APR uses microseconds. + apr_status_t status = apr_thread_cond_timedwait( + condition, mutex, nsecs/1000); if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status); } -void qpid::sys::Monitor::notify(){ +void Monitor::notify(){ CHECK_APR_SUCCESS(apr_thread_cond_signal(condition)); } -void qpid::sys::Monitor::notifyAll(){ +void Monitor::notifyAll(){ CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition)); } -void qpid::sys::Monitor::acquire(){ - CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); -} - -void qpid::sys::Monitor::release(){ - CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); -} diff --git a/qpid/cpp/src/qpid/sys/Monitor.h b/qpid/cpp/src/qpid/apr/Monitor.h index ddda613b87..a51baf8d94 100644 --- a/qpid/cpp/src/qpid/sys/Monitor.h +++ b/qpid/cpp/src/qpid/apr/Monitor.h @@ -18,38 +18,57 @@ #ifndef _Monitor_ #define _Monitor_ +#include <boost/noncopyable.hpp> +#include <qpid/sys/Time.h> #include "apr-1/apr_thread_mutex.h" #include "apr-1/apr_thread_cond.h" -#include "qpid/sys/Monitor.h" +#include "APRBase.h" namespace qpid { namespace sys { -class Monitor +template <class L> +class ScopedLock { - apr_pool_t* pool; - apr_thread_mutex_t* mutex; - apr_thread_cond_t* condition; + public: + ScopedLock(L& l) : mutex(l) { l.lock(); } + ~ScopedLock() { mutex.unlock(); } + private: + L& mutex; +}; + +class Mutex : private boost::noncopyable +{ public: - Monitor(); - virtual ~Monitor(); - virtual void wait(); - virtual void wait(u_int64_t time); - virtual void notify(); - virtual void notifyAll(); - virtual void acquire(); - virtual void release(); + typedef ScopedLock<Mutex> ScopedLock; + + Mutex(); + ~Mutex(); + void lock() { CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); } + void unlock() { CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); } + void trylock() { CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); } + + protected: + apr_thread_mutex_t* mutex; }; -class Locker +/** A condition variable and a mutex */ +class Monitor : public Mutex { public: - Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); } - ~Locker() { monitor.release(); } + Monitor(); + ~Monitor(); + void wait(); + void wait(int64_t nsecs); + void notify(); + void notifyAll(); + private: - Monitor& monitor; + apr_thread_cond_t* condition; }; + + }} diff --git a/qpid/cpp/src/qpid/sys/Thread.cpp b/qpid/cpp/src/qpid/apr/Thread.cpp index 4fb9915993..6d5cadb009 100644 --- a/qpid/cpp/src/qpid/sys/Thread.cpp +++ b/qpid/cpp/src/qpid/apr/Thread.cpp @@ -15,36 +15,41 @@ * limitations under the License. * */ -#include "qpid/sys/APRBase.h" -#include "qpid/sys/Thread.h" -#include "apr-1/apr_portable.h" + +#include "Thread.h" +#include "APRPool.h" +#include "APRBase.h" +#include <apr-1/apr_portable.h> using namespace qpid::sys; +using qpid::sys::Runnable; -void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){ - ((Runnable*) data)->run(); +namespace { +void* APR_THREAD_FUNC run(apr_thread_t* thread, void *data) { + reinterpret_cast<Runnable*>(data)->run(); CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS)); return NULL; } - -Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {} - -Thread::~Thread(){ } -void Thread::start(){ - CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool)); +Thread::Thread() : thread(0) {} + +Thread::Thread(Runnable* runnable) { + CHECK_APR_SUCCESS( + apr_thread_create(&thread, NULL, run, runnable, APRPool::get())); } void Thread::join(){ apr_status_t status; - if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner)); + if (thread != 0) + CHECK_APR_SUCCESS(apr_thread_join(&status, thread)); } -void Thread::interrupt(){ - if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS)); -} +Thread::Thread(apr_thread_t* t) : thread(t) {} -unsigned int qpid::sys::Thread::currentThread(){ - return apr_os_thread_current(); +Thread Thread::current(){ + apr_thread_t* thr; + apr_os_thread_t osthr = apr_os_thread_current(); + CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get())); + return Thread(thr); } diff --git a/qpid/cpp/src/qpid/sys/ThreadFactory.h b/qpid/cpp/src/qpid/apr/Thread.h index 9b7126272a..0c717dea70 100644 --- a/qpid/cpp/src/qpid/sys/ThreadFactory.h +++ b/qpid/cpp/src/qpid/apr/Thread.h @@ -1,3 +1,6 @@ +#ifndef _apr_Thread_h +#define _apr_Thread_h + /* * * Copyright (c) 2006 The Apache Software Foundation @@ -15,30 +18,28 @@ * limitations under the License. * */ -#ifndef _ThreadFactory_ -#define _ThreadFactory_ - -#include "apr-1/apr_thread_proc.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/ThreadFactory.h" -#include "qpid/sys/Runnable.h" +#include <apr-1/apr_thread_proc.h> +#include <qpid/sys/Runnable.h> namespace qpid { namespace sys { - class ThreadFactory - { - apr_pool_t* pool; - public: - ThreadFactory(); - virtual ~ThreadFactory(); - virtual Thread* create(Runnable* runnable); - }; +class Thread +{ + + public: + Thread(); + explicit Thread(qpid::sys::Runnable*); + void join(); + static Thread current(); -} -} + private: + Thread(apr_thread_t*); + + apr_thread_t* thread; +}; +}} -#endif +#endif /*!_apr_Thread_h*/ diff --git a/qpid/cpp/src/qpid/sys/Time.cpp b/qpid/cpp/src/qpid/apr/Time.cpp index c3512b8df3..8b5590481d 100644 --- a/qpid/cpp/src/qpid/sys/Time.cpp +++ b/qpid/cpp/src/qpid/apr/Time.cpp @@ -17,13 +17,23 @@ */ #include <qpid/sys/Time.h> -#include <apr-1/apr_time.h> +#include "apr-1/apr_time.h" namespace qpid { namespace sys { -Time Time::now() { - return Time(apr_time_now()*1000); +int64_t getTimeNsecs() +{ + // APR returns microseconds. + return apr_time_now() * 1000; } +int64_t getTimeMsecs() +{ + // APR returns microseconds. + return apr_time_now() / 1000; +} + + }} + diff --git a/qpid/cpp/src/qpid/broker/AutoDelete.cpp b/qpid/cpp/src/qpid/broker/AutoDelete.cpp index 434bd4a3a0..d96105ba7d 100644 --- a/qpid/cpp/src/qpid/broker/AutoDelete.cpp +++ b/qpid/cpp/src/qpid/broker/AutoDelete.cpp @@ -18,26 +18,23 @@ #include "qpid/broker/AutoDelete.h" using namespace qpid::broker; +using namespace qpid::sys; -AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) : registry(_registry), - period(_period), - stopped(true), - runner(0){} +AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) + : registry(_registry), period(_period), stopped(true) { } void AutoDelete::add(Queue::shared_ptr const queue){ - lock.acquire(); + Mutex::ScopedLock l(lock); queues.push(queue); - lock.release(); } Queue::shared_ptr const AutoDelete::pop(){ Queue::shared_ptr next; - lock.acquire(); + Mutex::ScopedLock l(lock); if(!queues.empty()){ next = queues.front(); queues.pop(); } - lock.release(); return next; } @@ -59,35 +56,27 @@ void AutoDelete::process(){ } void AutoDelete::run(){ - monitor.acquire(); + Monitor::ScopedLock l(monitor); while(!stopped){ process(); - monitor.wait(period); + monitor.wait(msecsToNsecs(period)); } - monitor.release(); } void AutoDelete::start(){ - monitor.acquire(); + Monitor::ScopedLock l(monitor); if(stopped){ - runner = factory.create(this); stopped = false; - monitor.release(); - runner->start(); - }else{ - monitor.release(); + runner = Thread(this); } } void AutoDelete::stop(){ - monitor.acquire(); - if(!stopped){ + { + Monitor::ScopedLock l(monitor); + if(stopped) return; stopped = true; - monitor.notify(); - monitor.release(); - runner->join(); - delete runner; - }else{ - monitor.release(); } + monitor.notify(); + runner.join(); } diff --git a/qpid/cpp/src/qpid/broker/AutoDelete.h b/qpid/cpp/src/qpid/broker/AutoDelete.h index f3347e6cc5..e0efe7b399 100644 --- a/qpid/cpp/src/qpid/broker/AutoDelete.h +++ b/qpid/cpp/src/qpid/broker/AutoDelete.h @@ -20,22 +20,21 @@ #include <iostream> #include <queue> -#include "qpid/sys/Monitor.h" +#include <qpid/sys/Monitor.h> #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" -#include "qpid/sys/ThreadFactory.h" +#include "qpid/sys/Thread.h" namespace qpid { namespace broker{ - class AutoDelete : private virtual qpid::sys::Runnable{ - qpid::sys::ThreadFactory factory; - qpid::sys::Monitor lock; + class AutoDelete : private qpid::sys::Runnable { + qpid::sys::Mutex lock; qpid::sys::Monitor monitor; std::queue<Queue::shared_ptr> queues; QueueRegistry* const registry; - const u_int32_t period; + u_int32_t period; volatile bool stopped; - qpid::sys::Thread* runner; + qpid::sys::Thread runner; Queue::shared_ptr const pop(); void process(); diff --git a/qpid/cpp/src/qpid/broker/Channel.cpp b/qpid/cpp/src/qpid/broker/Channel.cpp index 967c5855fa..947a97ae7c 100644 --- a/qpid/cpp/src/qpid/broker/Channel.cpp +++ b/qpid/cpp/src/qpid/broker/Channel.cpp @@ -105,7 +105,7 @@ void Channel::rollback(){ } void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ - Locker locker(deliveryLock); + Mutex::ScopedLock locker(deliveryLock); u_int64_t deliveryTag = currentDeliveryTag++; if(ackExpected){ @@ -118,7 +118,7 @@ void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shar } bool Channel::checkPrefetch(Message::shared_ptr& msg){ - Locker locker(deliveryLock); + Mutex::ScopedLock locker(deliveryLock); bool countOk = !prefetchCount || prefetchCount > unacked.size(); bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); return countOk && sizeOk; @@ -191,7 +191,7 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ //TODO: I think the outstanding prefetch size & count should be updated at this point... //TODO: ...this may then necessitate dispatching to consumers }else{ - Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag)); if(i == unacked.end()){ @@ -219,7 +219,7 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ } void Channel::recover(bool requeue){ - Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery if(requeue){ outstanding.reset(); @@ -234,7 +234,7 @@ void Channel::recover(bool requeue){ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ Message::shared_ptr msg = queue->dequeue(); if(msg){ - Locker locker(deliveryLock); + Mutex::ScopedLock locker(deliveryLock); u_int64_t myDeliveryTag = currentDeliveryTag++; msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); if(ackExpected){ diff --git a/qpid/cpp/src/qpid/broker/Channel.h b/qpid/cpp/src/qpid/broker/Channel.h index 56f0e6b4af..24dbf728ba 100644 --- a/qpid/cpp/src/qpid/broker/Channel.h +++ b/qpid/cpp/src/qpid/broker/Channel.h @@ -77,7 +77,7 @@ namespace qpid { u_int32_t framesize; NameGenerator tagGenerator; std::list<DeliveryRecord> unacked; - qpid::sys::Monitor deliveryLock; + qpid::sys::Mutex deliveryLock; TxBuffer txBuffer; AccumulatedAck accumulatedAck; TransactionalStore* store; diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp index 46693f6f3c..3f9d23cdc7 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp +++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp @@ -21,24 +21,24 @@ using namespace qpid::broker; using namespace qpid::framing; +using namespace qpid::sys; DirectExchange::DirectExchange(const string& _name) : Exchange(_name) { } void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - lock.acquire(); + Mutex::ScopedLock l(lock); std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); if(i == queues.end()){ bindings[routingKey].push_back(queue); queue->bound(new ExchangeBinding(this, queue, routingKey, args)); } - lock.release(); } void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); + Mutex::ScopedLock l(lock); std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue); @@ -48,11 +48,10 @@ void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, F bindings.erase(routingKey); } } - lock.release(); } void DirectExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); + Mutex::ScopedLock l(lock); std::vector<Queue::shared_ptr>& queues(bindings[routingKey]); int count(0); for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){ @@ -61,7 +60,6 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, FieldTabl if(!count){ std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl; } - lock.release(); } DirectExchange::~DirectExchange(){ diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.h b/qpid/cpp/src/qpid/broker/DirectExchange.h index a452fe3b4b..0ee9ce2705 100644 --- a/qpid/cpp/src/qpid/broker/DirectExchange.h +++ b/qpid/cpp/src/qpid/broker/DirectExchange.h @@ -30,7 +30,7 @@ namespace qpid { namespace broker { class DirectExchange : public virtual Exchange{ std::map<string, std::vector<Queue::shared_ptr> > bindings; - qpid::sys::Monitor lock; + qpid::sys::Mutex lock; public: static const std::string typeName; diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp index 6f6c759aa2..1c3a4af026 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -26,7 +26,7 @@ using namespace qpid::sys; using std::pair; pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); ExchangeMap::iterator i = exchanges.find(name); if (i == exchanges.end()) { Exchange::shared_ptr exchange; @@ -50,12 +50,12 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c } void ExchangeRegistry::destroy(const string& name){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); exchanges.erase(name); } Exchange::shared_ptr ExchangeRegistry::get(const string& name){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); return exchanges[name]; } diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h index 33deb743f4..5d4cf10de8 100644 --- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h @@ -29,7 +29,7 @@ namespace broker { class ExchangeRegistry{ typedef std::map<string, Exchange::shared_ptr> ExchangeMap; ExchangeMap exchanges; - qpid::sys::Monitor lock; + qpid::sys::Mutex lock; public: std::pair<Exchange::shared_ptr, bool> declare(const string& name, const string& type) throw(UnknownExchangeTypeException); void destroy(const string& name); diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp index 8f5143c8c0..2f8d4eadb2 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp @@ -26,7 +26,7 @@ using namespace qpid::sys; FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {} void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); // Add if not already present. Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); if (i == bindings.end()) { @@ -36,7 +36,7 @@ void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, Fie } void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* /*args*/){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue); if (i != bindings.end()) { bindings.erase(i); @@ -45,7 +45,7 @@ void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey* } void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* /*args*/){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){ msg.deliverTo(*i); } diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.h b/qpid/cpp/src/qpid/broker/FanOutExchange.h index 53b5c39789..910acdc203 100644 --- a/qpid/cpp/src/qpid/broker/FanOutExchange.h +++ b/qpid/cpp/src/qpid/broker/FanOutExchange.h @@ -31,7 +31,7 @@ namespace broker { class FanOutExchange : public virtual Exchange { std::vector<Queue::shared_ptr> bindings; - qpid::sys::Monitor lock; + qpid::sys::Mutex lock; public: static const std::string typeName; diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp index 5d5cf2392c..0c4c290bbd 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp @@ -41,7 +41,7 @@ namespace { HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { } void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); std::string what = args->getString("x-match"); if (what != all && what != any) { THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange."); @@ -51,7 +51,7 @@ void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, Fi } void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* args){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); Bindings::iterator i = std::find(bindings.begin(),bindings.end(), Binding(*args, queue)); if (i != bindings.end()) bindings.erase(i); @@ -59,7 +59,7 @@ void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* args){ - Locker locker(lock);; + Mutex::ScopedLock locker(lock);; for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (match(i->first, *args)) msg.deliverTo(i->second); } diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h index 3cd25739f7..77af612fe6 100644 --- a/qpid/cpp/src/qpid/broker/HeadersExchange.h +++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h @@ -34,7 +34,7 @@ class HeadersExchange : public virtual Exchange { typedef std::vector<Binding> Bindings; Bindings bindings; - qpid::sys::Monitor lock; + qpid::sys::Mutex lock; public: static const std::string typeName; diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 24fc996f1f..e3a98ae8f5 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -15,14 +15,12 @@ * limitations under the License. * */ -#include "qpid/sys/Monitor.h" #include "qpid/broker/Message.h" #include <iostream> using namespace boost; using namespace qpid::broker; using namespace qpid::framing; -using namespace qpid::sys; Message::Message(const ConnectionToken* const _publisher, const string& _exchange, const string& _routingKey, diff --git a/qpid/cpp/src/qpid/broker/Prefetch.h b/qpid/cpp/src/qpid/broker/Prefetch.h index 97abb4102d..d56799f835 100644 --- a/qpid/cpp/src/qpid/broker/Prefetch.h +++ b/qpid/cpp/src/qpid/broker/Prefetch.h @@ -30,7 +30,7 @@ namespace qpid { u_int32_t size; u_int16_t count; - void reset(); + void reset() { size = 0; count = 0; } }; } } diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 000552715b..46b14a23f5 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -38,7 +38,7 @@ Queue::Queue(const string& _name, u_int32_t _autodelete, exclusive(0), persistenceId(0) { - if(autodelete) lastUsed = Time::now().msecs(); + if(autodelete) lastUsed = getTimeMsecs(); } Queue::~Queue(){ @@ -58,7 +58,7 @@ void Queue::deliver(Message::shared_ptr& msg){ } void Queue::process(Message::shared_ptr& msg){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); if(queueing || !dispatch(msg)){ queueing = true; messages.push(msg); @@ -90,7 +90,7 @@ bool Queue::dispatch(Message::shared_ptr& msg){ } bool Queue::startDispatching(){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); if(queueing && !dispatching){ dispatching = true; return true; @@ -102,7 +102,7 @@ bool Queue::startDispatching(){ void Queue::dispatch(){ bool proceed = startDispatching(); while(proceed){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); if(!messages.empty() && dispatch(messages.front())){ messages.pop(); }else{ @@ -114,7 +114,7 @@ void Queue::dispatch(){ } void Queue::consume(Consumer* c, bool requestExclusive){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); if(exclusive) throw ExclusiveAccessException(); if(requestExclusive){ if(!consumers.empty()) throw ExclusiveAccessException(); @@ -126,14 +126,14 @@ void Queue::consume(Consumer* c, bool requestExclusive){ } void Queue::cancel(Consumer* c){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); consumers.erase(find(consumers.begin(), consumers.end(), c)); - if(autodelete && consumers.empty()) lastUsed = Time::now().msecs(); + if(autodelete && consumers.empty()) lastUsed = getTimeMsecs(); if(exclusive == c) exclusive = 0; } Message::shared_ptr Queue::dequeue(){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); Message::shared_ptr msg; if(!messages.empty()){ msg = messages.front(); @@ -143,25 +143,25 @@ Message::shared_ptr Queue::dequeue(){ } u_int32_t Queue::purge(){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); int count = messages.size(); while(!messages.empty()) messages.pop(); return count; } u_int32_t Queue::getMessageCount() const{ - Locker locker(lock); + Mutex::ScopedLock locker(lock); return messages.size(); } u_int32_t Queue::getConsumerCount() const{ - Locker locker(lock); + Mutex::ScopedLock locker(lock); return consumers.size(); } bool Queue::canAutoDelete() const{ - Locker locker(lock); - return lastUsed && (Time::now().msecs() - lastUsed > autodelete); + Mutex::ScopedLock locker(lock); + return lastUsed && (getTimeMsecs() - lastUsed > autodelete); } void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index fd0bad43ff..c146de1353 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -27,7 +27,6 @@ #include "qpid/broker/Consumer.h" #include "qpid/broker/Message.h" #include "qpid/sys/Monitor.h" -#include "qpid/sys/Time.h" namespace qpid { namespace broker { @@ -55,7 +54,7 @@ namespace qpid { bool queueing; bool dispatching; int next; - mutable qpid::sys::Monitor lock; + mutable qpid::sys::Mutex lock; int64_t lastUsed; Consumer* exclusive; u_int64_t persistenceId; diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index aa05db9a16..1976da812d 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -16,7 +16,6 @@ * */ #include "qpid/broker/QueueRegistry.h" -#include "qpid/sys/Monitor.h" #include "qpid/broker/SessionHandlerImpl.h" #include <sstream> #include <assert.h> @@ -32,7 +31,7 @@ std::pair<Queue::shared_ptr, bool> QueueRegistry::declare(const string& declareName, bool durable, u_int32_t autoDelete, const ConnectionToken* owner) { - Locker locker(lock); + Mutex::ScopedLock locker(lock); string name = declareName.empty() ? generateName() : declareName; assert(!name.empty()); QueueMap::iterator i = queues.find(name); @@ -46,12 +45,12 @@ QueueRegistry::declare(const string& declareName, bool durable, } void QueueRegistry::destroy(const string& name){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); queues.erase(name); } Queue::shared_ptr QueueRegistry::find(const string& name){ - Locker locker(lock); + Mutex::ScopedLock locker(lock); QueueMap::iterator i = queues.find(name); if (i == queues.end()) { return Queue::shared_ptr(); diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h index c2fc1cc830..e3d03a06b1 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.h +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h @@ -76,7 +76,7 @@ class QueueRegistry{ private: typedef std::map<string, Queue::shared_ptr> QueueMap; QueueMap queues; - qpid::sys::Monitor lock; + qpid::sys::Mutex lock; int counter; MessageStore* const store; }; diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp index dc252d208f..eecd9918d4 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp +++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp @@ -21,7 +21,7 @@ using namespace qpid::broker; using namespace qpid::framing; - +using namespace qpid::sys; // TODO aconway 2006-09-20: More efficient matching algorithm. // Areas for improvement: @@ -115,15 +115,14 @@ bool TopicPattern::match(const Tokens& target) const TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { } void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){ - lock.acquire(); + Monitor::ScopedLock l(lock); TopicPattern routingPattern(routingKey); bindings[routingPattern].push_back(queue); queue->bound(new ExchangeBinding(this, queue, routingKey, args)); - lock.release(); } void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); + Monitor::ScopedLock l(lock); BindingMap::iterator bi = bindings.find(TopicPattern(routingKey)); Queue::vector& qv(bi->second); if (bi == bindings.end()) return; @@ -131,12 +130,11 @@ void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, Fi if(q == qv.end()) return; qv.erase(q); if(qv.empty()) bindings.erase(bi); - lock.release(); } void TopicExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){ - lock.acquire(); + Monitor::ScopedLock l(lock); for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) { if (i->first.match(routingKey)) { Queue::vector& qv(i->second); @@ -145,7 +143,6 @@ void TopicExchange::route(Deliverable& msg, const string& routingKey, FieldTable } } } - lock.release(); } TopicExchange::~TopicExchange() {} diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.h b/qpid/cpp/src/qpid/broker/TopicExchange.h index e3b9040cb2..a3e133845f 100644 --- a/qpid/cpp/src/qpid/broker/TopicExchange.h +++ b/qpid/cpp/src/qpid/broker/TopicExchange.h @@ -71,7 +71,7 @@ class TopicPattern : public Tokens class TopicExchange : public virtual Exchange{ typedef std::map<TopicPattern, Queue::vector> BindingMap; BindingMap bindings; - qpid::sys::Monitor lock; + qpid::sys::Mutex lock; public: static const std::string typeName; diff --git a/qpid/cpp/src/qpid/client/Channel.cpp b/qpid/cpp/src/qpid/client/Channel.cpp index a7b30f2f94..fad648f27d 100644 --- a/qpid/cpp/src/qpid/client/Channel.cpp +++ b/qpid/cpp/src/qpid/client/Channel.cpp @@ -17,7 +17,6 @@ */ #include "qpid/client/Channel.h" #include "qpid/sys/Monitor.h" -#include "qpid/sys/ThreadFactory.h" #include "qpid/client/Message.h" #include "qpid/QpidError.h" @@ -29,26 +28,15 @@ using namespace qpid::sys; Channel::Channel(bool _transactional, u_int16_t _prefetch) : id(0), con(0), - dispatcher(0), out(0), incoming(0), closed(true), prefetch(_prefetch), transactional(_transactional) -{ - threadFactory = new ThreadFactory(); - dispatchMonitor = new Monitor(); - retrievalMonitor = new Monitor(); -} +{ } Channel::~Channel(){ - if(dispatcher){ - stop(); - delete dispatcher; - } - delete retrievalMonitor; - delete dispatchMonitor; - delete threadFactory; + stop(); } void Channel::setPrefetch(u_int16_t _prefetch){ @@ -176,9 +164,9 @@ void Channel::cancelAll(){ } void Channel::retrieve(Message& msg){ - retrievalMonitor->acquire(); + Monitor::ScopedLock l(retrievalMonitor); while(retrieved == 0){ - retrievalMonitor->wait(); + retrievalMonitor.wait(); } msg.header = retrieved->getHeader(); @@ -186,8 +174,6 @@ void Channel::retrieve(Message& msg){ retrieved->getData(msg.data); delete retrieved; retrieved = 0; - - retrievalMonitor->release(); } bool Channel::get(Message& msg, const Queue& queue, int ackMode){ @@ -315,18 +301,16 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){ } void Channel::start(){ - dispatcher = threadFactory->create(this); - dispatcher->start(); + dispatcher = Thread(this); } void Channel::stop(){ - closed = true; - dispatchMonitor->acquire(); - dispatchMonitor->notify(); - dispatchMonitor->release(); - if(dispatcher){ - dispatcher->join(); + { + Monitor::ScopedLock l(dispatchMonitor); + closed = true; + dispatchMonitor.notify(); } + dispatcher.join(); } void Channel::run(){ @@ -335,30 +319,27 @@ void Channel::run(){ void Channel::enqueue(){ if(incoming->isResponse()){ - retrievalMonitor->acquire(); + Monitor::ScopedLock l(retrievalMonitor); retrieved = incoming; - retrievalMonitor->notify(); - retrievalMonitor->release(); + retrievalMonitor.notify(); }else{ - dispatchMonitor->acquire(); + Monitor::ScopedLock l(dispatchMonitor); messages.push(incoming); - dispatchMonitor->notify(); - dispatchMonitor->release(); + dispatchMonitor.notify(); } incoming = 0; } IncomingMessage* Channel::dequeue(){ - dispatchMonitor->acquire(); + Monitor::ScopedLock l(dispatchMonitor); while(messages.empty() && !closed){ - dispatchMonitor->wait(); + dispatchMonitor.wait(); } IncomingMessage* msg = 0; if(!messages.empty()){ msg = messages.front(); messages.pop(); } - dispatchMonitor->release(); return msg; } diff --git a/qpid/cpp/src/qpid/client/Channel.h b/qpid/cpp/src/qpid/client/Channel.h index fa8cd3afe0..daf2b6f9d9 100644 --- a/qpid/cpp/src/qpid/client/Channel.h +++ b/qpid/cpp/src/qpid/client/Channel.h @@ -24,9 +24,6 @@ #define _Channel_ #include "qpid/framing/amqp_framing.h" - -#include "qpid/sys/ThreadFactory.h" - #include "qpid/client/Connection.h" #include "qpid/client/Exchange.h" #include "qpid/client/IncomingMessage.h" @@ -51,15 +48,14 @@ namespace client { u_int16_t id; Connection* con; - qpid::sys::ThreadFactory* threadFactory; - qpid::sys::Thread* dispatcher; + qpid::sys::Thread dispatcher; qpid::framing::OutputHandler* out; IncomingMessage* incoming; ResponseHandler responses; std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume IncomingMessage* retrieved;//holds response to basic.get - qpid::sys::Monitor* dispatchMonitor; - qpid::sys::Monitor* retrievalMonitor; + qpid::sys::Monitor dispatchMonitor; + qpid::sys::Monitor retrievalMonitor; std::map<std::string, Consumer*> consumers; ReturnedMessageHandler* returnsHandler; bool closed; diff --git a/qpid/cpp/src/qpid/client/ResponseHandler.cpp b/qpid/cpp/src/qpid/client/ResponseHandler.cpp index 16989e2c25..5d2e03c9d9 100644 --- a/qpid/cpp/src/qpid/client/ResponseHandler.cpp +++ b/qpid/cpp/src/qpid/client/ResponseHandler.cpp @@ -19,40 +19,35 @@ #include "qpid/sys/Monitor.h" #include "qpid/QpidError.h" -qpid::client::ResponseHandler::ResponseHandler() : waiting(false){ - monitor = new qpid::sys::Monitor(); -} +using namespace qpid::sys; -qpid::client::ResponseHandler::~ResponseHandler(){ - delete monitor; -} +qpid::client::ResponseHandler::ResponseHandler() : waiting(false){} + +qpid::client::ResponseHandler::~ResponseHandler(){} bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){ return expected.match(response.get()); } void qpid::client::ResponseHandler::waitForResponse(){ - monitor->acquire(); + Monitor::ScopedLock l(monitor); if(waiting){ - monitor->wait(); + monitor.wait(); } - monitor->release(); } void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){ response = _response; - monitor->acquire(); + Monitor::ScopedLock l(monitor); waiting = false; - monitor->notify(); - monitor->release(); + monitor.notify(); } void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){ - monitor->acquire(); + Monitor::ScopedLock l(monitor); if(waiting){ - monitor->wait(); + monitor.wait(); } - monitor->release(); if(!validate(expected)){ THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error"); } diff --git a/qpid/cpp/src/qpid/client/ResponseHandler.h b/qpid/cpp/src/qpid/client/ResponseHandler.h index ac4c351211..247c974c14 100644 --- a/qpid/cpp/src/qpid/client/ResponseHandler.h +++ b/qpid/cpp/src/qpid/client/ResponseHandler.h @@ -17,7 +17,7 @@ */ #include <string> #include "qpid/framing/amqp_framing.h" -#include "qpid/sys/Monitor.h" +#include <qpid/sys/Monitor.h> #ifndef _ResponseHandler_ #define _ResponseHandler_ @@ -28,7 +28,7 @@ namespace qpid { class ResponseHandler{ bool waiting; qpid::framing::AMQMethodBody::shared_ptr response; - qpid::sys::Monitor* monitor; + qpid::sys::Monitor monitor; public: ResponseHandler(); diff --git a/qpid/cpp/src/qpid/sys/LFSessionContext.h b/qpid/cpp/src/qpid/sys/LFSessionContext.h deleted file mode 100644 index 92f52ccf83..0000000000 --- a/qpid/cpp/src/qpid/sys/LFSessionContext.h +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#ifndef _LFSessionContext_ -#define _LFSessionContext_ - -#include <queue> - -#include "apr-1/apr_network_io.h" -#include "apr-1/apr_poll.h" -#include "apr-1/apr_time.h" - -#include "qpid/framing/AMQFrame.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/APRSocket.h" -#include "qpid/framing/Buffer.h" -#include "qpid/sys/LFProcessor.h" -#include "qpid/sys/SessionContext.h" -#include "qpid/sys/SessionHandler.h" - -namespace qpid { -namespace sys { - - - class LFSessionContext : public virtual SessionContext - { - const bool debug; - APRSocket socket; - bool initiated; - - qpid::framing::Buffer in; - qpid::framing::Buffer out; - - SessionHandler* handler; - LFProcessor* const processor; - - apr_pollfd_t fd; - - std::queue<qpid::framing::AMQFrame*> framesToWrite; - qpid::sys::Monitor writeLock; - - bool processing; - bool closing; - - //these are just for debug, as a crude way of detecting concurrent access - volatile unsigned int reading; - volatile unsigned int writing; - - static qpid::sys::Monitor logLock; - void log(const std::string& desc, qpid::framing::AMQFrame* const frame); - - public: - LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, - LFProcessor* const processor, - bool debug = false); - ~LFSessionContext(); - virtual void send(qpid::framing::AMQFrame* frame); - virtual void close(); - void read(); - void write(); - void init(SessionHandler* handler); - void startProcessing(); - void stopProcessing(); - void handleClose(); - void shutdown(); - inline apr_pollfd_t* const getFd(){ return &fd; } - inline bool isClosed(){ return !socket.isOpen(); } - }; - -} -} - - -#endif diff --git a/qpid/cpp/src/qpid/sys/Runnable.cpp b/qpid/cpp/src/qpid/sys/Runnable.cpp deleted file mode 100644 index d7d9e968cc..0000000000 --- a/qpid/cpp/src/qpid/sys/Runnable.cpp +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "qpid/sys/Runnable.h" -qpid::sys::Runnable::~Runnable() {} diff --git a/qpid/cpp/src/qpid/sys/Runnable.h b/qpid/cpp/src/qpid/sys/Runnable.h index ce13eb2039..c06698bb93 100644 --- a/qpid/cpp/src/qpid/sys/Runnable.h +++ b/qpid/cpp/src/qpid/sys/Runnable.h @@ -21,15 +21,15 @@ namespace qpid { namespace sys { - class Runnable - { - public: - virtual ~Runnable(); - virtual void run() = 0; - }; +/** Base class for classes that run in a thread. */ +class Runnable +{ + public: + virtual ~Runnable() {} + virtual void run() = 0; +}; -} -} +}} #endif diff --git a/qpid/cpp/src/qpid/sys/Thread.h b/qpid/cpp/src/qpid/sys/Thread.h index e86bd4a8d2..d884add776 100644 --- a/qpid/cpp/src/qpid/sys/Thread.h +++ b/qpid/cpp/src/qpid/sys/Thread.h @@ -1,3 +1,6 @@ +#ifndef _sys_Thread_h +#define _sys_Thread_h + /* * * Copyright (c) 2006 The Apache Software Foundation @@ -15,34 +18,10 @@ * limitations under the License. * */ -#ifndef _Thread_ -#define _Thread_ - -#include "apr-1/apr_thread_proc.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Thread.h" - -namespace qpid { -namespace sys { - - class Thread - { - const Runnable* runnable; - apr_pool_t* pool; - apr_thread_t* runner; - public: - Thread(apr_pool_t* pool, Runnable* runnable); - virtual ~Thread(); - virtual void start(); - virtual void join(); - virtual void interrupt(); - static unsigned int currentThread(); - }; +#include <qpid/sys/platform.h> +#include QPID_PLATFORM_H(Thread.h) -} -} -#endif +#endif /*!_sys_Thread_h*/ diff --git a/qpid/cpp/src/qpid/sys/ThreadFactory.cpp b/qpid/cpp/src/qpid/sys/ThreadFactory.cpp deleted file mode 100644 index d33872b9a2..0000000000 --- a/qpid/cpp/src/qpid/sys/ThreadFactory.cpp +++ /dev/null @@ -1,35 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "qpid/sys/APRBase.h" -#include "qpid/sys/ThreadFactory.h" - -using namespace qpid::sys; - -ThreadFactory::ThreadFactory(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); -} - -ThreadFactory::~ThreadFactory(){ - apr_pool_destroy(pool); - APRBase::decrement(); -} - -Thread* ThreadFactory::create(Runnable* runnable){ - return new Thread(pool, runnable); -} diff --git a/qpid/cpp/src/qpid/sys/Time.h b/qpid/cpp/src/qpid/sys/Time.h index 5c7cdfb005..79d17b433b 100644 --- a/qpid/cpp/src/qpid/sys/Time.h +++ b/qpid/cpp/src/qpid/sys/Time.h @@ -24,28 +24,14 @@ namespace qpid { namespace sys { -/** - * Time since the epoch. - */ -class Time -{ - public: - static const int64_t NANOS = 1000000000; - static const int64_t MICROS = 1000000; - static const int64_t MILLIS = 1000; - - static Time now(); - - Time(int64_t nsecs_) : ticks(nsecs_) {} +inline int64_t msecsToNsecs(int64_t msecs) { return msecs * 1000 *1000; } +inline int64_t nsecsToMsecs(int64_t nsecs) { return nsecs / (1000 *1000); } - int64_t nsecs() const { return ticks; } - int64_t usecs() const { return nsecs()/1000; } - int64_t msecs() const { return usecs()/1000; } - int64_t secs() const { return msecs()/1000; } +/** Nanoseconds since epoch */ +int64_t getTimeNsecs(); - private: - int64_t ticks; -}; +/** Milliseconds since epoch */ +int64_t getTimeMsecs(); }} diff --git a/qpid/cpp/src/qpid/broker/Prefetch.cpp b/qpid/cpp/src/qpid/sys/platform.h index 6d9dbda13c..878c724953 100644 --- a/qpid/cpp/src/qpid/broker/Prefetch.cpp +++ b/qpid/cpp/src/qpid/sys/platform.h @@ -1,3 +1,6 @@ +#ifndef _sys_platform_h +#define _sys_platform_h + /* * * Copyright (c) 2006 The Apache Software Foundation @@ -16,11 +19,11 @@ * */ -#include "qpid/broker/Prefetch.h" +/** + * Macros for including platform-specific headers and aliasing + * platform-specific classes into the qpid::sys namespace. + */ -using namespace qpid::broker; +#define QPID_PLATFORM_H(HEADER) <qpid/PLATFORM/HEADER> -void Prefetch::reset(){ - size = 0; - count = 0; -} +#endif /*!_sys_platform_h*/ diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp index f170aed95e..2974cc8654 100644 --- a/qpid/cpp/src/qpidd.cpp +++ b/qpid/cpp/src/qpidd.cpp @@ -24,8 +24,11 @@ using namespace qpid::broker; using namespace qpid::sys; +Broker::shared_ptr broker; + void handle_signal(int /*signal*/){ std::cout << "Shutting down..." << std::endl; + broker->shutdown(); } int main(int argc, char** argv) @@ -36,8 +39,8 @@ int main(int argc, char** argv) if(config.isHelp()){ config.usage(); }else{ + broker = Broker::create(config); apr_signal(SIGINT, handle_signal); - Broker::shared_ptr broker = Broker::create(config); broker->run(); } return 0; diff --git a/qpid/cpp/test/client/client_test.cpp b/qpid/cpp/test/client/client_test.cpp index 0e57babbef..8e9c58179a 100644 --- a/qpid/cpp/test/client/client_test.cpp +++ b/qpid/cpp/test/client/client_test.cpp @@ -36,9 +36,7 @@ public: inline virtual void received(Message& /*msg*/){ std::cout << "Received message " /**<< msg **/<< std::endl; - monitor->acquire(); monitor->notify(); - monitor->release(); } }; @@ -77,12 +75,12 @@ int main(int argc, char**) msg.setData(data); channel.publish(msg, exchange, "MyTopic"); std::cout << "Published message." << std::endl; - - monitor.acquire(); - monitor.wait(); - monitor.release(); - + { + Monitor::ScopedLock l(monitor); + monitor.wait(); + } + con.closeChannel(&channel); std::cout << "Closed channel." << std::endl; con.close(); diff --git a/qpid/cpp/test/client/topic_listener.cpp b/qpid/cpp/test/client/topic_listener.cpp index 0f383134b5..9aa93bc2b5 100644 --- a/qpid/cpp/test/client/topic_listener.cpp +++ b/qpid/cpp/test/client/topic_listener.cpp @@ -21,11 +21,12 @@ #include "qpid/client/Exchange.h" #include "qpid/client/MessageListener.h" #include "qpid/client/Queue.h" -#include <apr-1/apr_time.h> +#include <qpid/sys/Time.h> #include <iostream> #include <sstream> using namespace qpid::client; +using namespace qpid::sys; class Listener : public MessageListener{ Channel* const channel; @@ -33,7 +34,7 @@ class Listener : public MessageListener{ const bool transactional; bool init; int count; - apr_time_t start; + int64_t start; void shutdown(); void report(); @@ -101,7 +102,7 @@ Listener::Listener(Channel* _channel, const std::string& _responseq, bool tx) : void Listener::received(Message& message){ if(!init){ - start = apr_time_as_msec(apr_time_now()); + start = getTimeMsecs(); count = 0; init = true; } @@ -123,8 +124,8 @@ void Listener::shutdown(){ } void Listener::report(){ - apr_time_t finish = apr_time_as_msec(apr_time_now()); - apr_time_t time = finish - start; + int64_t finish = getTimeMsecs(); + int64_t time = finish - start; std::stringstream reportstr; reportstr << "Received " << count << " messages in " << time << " ms."; Message msg; diff --git a/qpid/cpp/test/client/topic_publisher.cpp b/qpid/cpp/test/client/topic_publisher.cpp index 119d275cfd..22c36ea9e3 100644 --- a/qpid/cpp/test/client/topic_publisher.cpp +++ b/qpid/cpp/test/client/topic_publisher.cpp @@ -23,7 +23,7 @@ #include "qpid/client/Queue.h" #include "qpid/sys/Monitor.h" #include "unistd.h" -#include <apr-1/apr_time.h> +#include <qpid/sys/Time.h> #include <cstdlib> #include <iostream> @@ -43,7 +43,7 @@ class Publisher : public MessageListener{ public: Publisher(Channel* channel, const std::string& controlTopic, bool tx); virtual void received(Message& msg); - apr_time_t publish(int msgs, int listeners, int size); + int64_t publish(int msgs, int listeners, int size); void terminate(); }; @@ -105,19 +105,19 @@ int main(int argc, char** argv){ channel.start(); int batchSize(args.getBatches()); - apr_time_t max(0); - apr_time_t min(0); - apr_time_t sum(0); + int64_t max(0); + int64_t min(0); + int64_t sum(0); for(int i = 0; i < batchSize; i++){ if(i > 0 && args.getDelay()) sleep(args.getDelay()); - apr_time_t time = publisher.publish(args.getMessages(), args.getSubscribers(), args.getSize()); + int64_t time = publisher.publish(args.getMessages(), args.getSubscribers(), args.getSize()); if(!max || time > max) max = time; if(!min || time < min) min = time; sum += time; - std::cout << "Completed " << (i+1) << " of " << batchSize << " in " << time << "ms" << std::endl; + std::cout << "Completed " << (i+1) << " of " << batchSize << " in " << nsecsToMsecs(time) << "ms" << std::endl; } publisher.terminate(); - apr_time_t avg = sum / batchSize; + int64_t avg = sum / batchSize; if(batchSize > 1){ std::cout << batchSize << " batches completed. avg=" << avg << ", max=" << max << ", min=" << min << std::endl; @@ -135,12 +135,11 @@ Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool t void Publisher::received(Message& msg){ //count responses and when all are received end the current batch - monitor.acquire(); + Monitor::ScopedLock l(monitor); if(--count == 0){ monitor.notify(); } std::cout << "Received report: " << msg.getData() << " (" << count << " remaining)." << std::endl; - monitor.release(); } void Publisher::waitForCompletion(int msgs){ @@ -148,26 +147,27 @@ void Publisher::waitForCompletion(int msgs){ monitor.wait(); } -apr_time_t Publisher::publish(int msgs, int listeners, int size){ - monitor.acquire(); +int64_t Publisher::publish(int msgs, int listeners, int size){ Message msg; msg.setData(generateData(size)); - apr_time_t start(apr_time_as_msec(apr_time_now())); - for(int i = 0; i < msgs; i++){ - channel->publish(msg, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic); - } - //send report request - Message reportRequest; - reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); - channel->publish(reportRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic); - if(transactional){ - channel->commit(); + int64_t start = getTimeMsecs(); + { + Monitor::ScopedLock l(monitor); + for(int i = 0; i < msgs; i++){ + channel->publish(msg, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic); + } + //send report request + Message reportRequest; + reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); + channel->publish(reportRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic); + if(transactional){ + channel->commit(); + } + + waitForCompletion(listeners); } - waitForCompletion(listeners); - monitor.release(); - apr_time_t finish(apr_time_as_msec(apr_time_now())); - + int64_t finish(getTimeMsecs()); return finish - start; } diff --git a/qpid/cpp/test/unit/qpid/sys/APRBaseTest.cpp b/qpid/cpp/test/unit/qpid/apr/APRBaseTest.cpp index fc0c7dd9e1..a0f88f78db 100644 --- a/qpid/cpp/test/unit/qpid/sys/APRBaseTest.cpp +++ b/qpid/cpp/test/unit/qpid/apr/APRBaseTest.cpp @@ -15,7 +15,7 @@ * limitations under the License. * */ -#include "qpid/sys/APRBase.h" +#include "qpid/apr/APRBase.h" #include <qpid_test_plugin.h> #include <iostream> diff --git a/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp b/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp index 1dbbeda827..c0b9225483 100644 --- a/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp +++ b/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp @@ -15,14 +15,12 @@ * limitations under the License. * */ -#include "qpid/sys/APRBase.h" -#include "qpid/broker/Message.h" +#include <qpid/broker/Message.h> #include <qpid_test_plugin.h> #include <iostream> using namespace qpid::broker; using namespace qpid::framing; -using namespace qpid::sys; class MessageTest : public CppUnit::TestCase { @@ -34,7 +32,6 @@ class MessageTest : public CppUnit::TestCase void testMe() { - APRBase::increment(); const int size(10); for(int i = 0; i < size; i++){ Message::shared_ptr msg = Message::shared_ptr(new Message(0, "A", "B", true, true)); diff --git a/qpid/python/tests/basic.py b/qpid/python/tests/basic.py index afdf1a4003..314c20c8a0 100644 --- a/qpid/python/tests/basic.py +++ b/qpid/python/tests/basic.py @@ -260,7 +260,6 @@ class BasicTests(TestBase): for i in range(1, 6): msg = queue.get(timeout=1) self.assertEqual("Message %d" % i, msg.content.body) - try: extra = queue.get(timeout=1) self.fail("Got unexpected 6th message in original queue: " + extra.content.body) |