summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-11-08 17:07:44 +0000
committerAlan Conway <aconway@apache.org>2006-11-08 17:07:44 +0000
commitf34ace35220726fa64f063a0fccc6eeaaa40af3c (patch)
tree77f817feb9560fac0abe22b55327c1057e94bcb9
parent21d986eff6cd096c0e4a3db4c603ad72f64270fa (diff)
downloadqpid-python-f34ace35220726fa64f063a0fccc6eeaaa40af3c.tar.gz
More reorg to separate APR/posix code, work in progress.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@472545 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/Makefile2
-rw-r--r--qpid/cpp/README4
-rw-r--r--qpid/cpp/options.mk3
-rw-r--r--qpid/cpp/src/qpid/apr/APRBase.cpp (renamed from qpid/cpp/src/qpid/sys/APRBase.cpp)4
-rw-r--r--qpid/cpp/src/qpid/apr/APRBase.h (renamed from qpid/cpp/src/qpid/sys/APRBase.h)2
-rw-r--r--qpid/cpp/src/qpid/apr/APRPool.cpp (renamed from qpid/cpp/src/qpid/sys/APRPool.cpp)3
-rw-r--r--qpid/cpp/src/qpid/apr/APRPool.h (renamed from qpid/cpp/src/qpid/sys/APRPool.h)0
-rw-r--r--qpid/cpp/src/qpid/apr/APRSocket.cpp (renamed from qpid/cpp/src/qpid/sys/APRSocket.cpp)5
-rw-r--r--qpid/cpp/src/qpid/apr/APRSocket.h (renamed from qpid/cpp/src/qpid/sys/APRSocket.h)0
-rw-r--r--qpid/cpp/src/qpid/apr/Acceptor.cpp (renamed from qpid/cpp/src/qpid/sys/Acceptor.cpp)5
-rw-r--r--qpid/cpp/src/qpid/apr/Acceptor.h (renamed from qpid/cpp/src/qpid/sys/Acceptor.h)11
-rw-r--r--qpid/cpp/src/qpid/apr/Connector.cpp (renamed from qpid/cpp/src/qpid/sys/Connector.cpp)25
-rw-r--r--qpid/cpp/src/qpid/apr/Connector.h (renamed from qpid/cpp/src/qpid/sys/Connector.h)16
-rw-r--r--qpid/cpp/src/qpid/apr/LFProcessor.cpp (renamed from qpid/cpp/src/qpid/sys/LFProcessor.cpp)93
-rw-r--r--qpid/cpp/src/qpid/apr/LFProcessor.h (renamed from qpid/cpp/src/qpid/sys/LFProcessor.h)9
-rw-r--r--qpid/cpp/src/qpid/apr/LFSessionContext.cpp (renamed from qpid/cpp/src/qpid/sys/LFSessionContext.cpp)39
-rw-r--r--qpid/cpp/src/qpid/apr/LFSessionContext.h87
-rw-r--r--qpid/cpp/src/qpid/apr/Monitor.cpp (renamed from qpid/cpp/src/qpid/sys/Monitor.cpp)50
-rw-r--r--qpid/cpp/src/qpid/apr/Monitor.h (renamed from qpid/cpp/src/qpid/sys/Monitor.h)53
-rw-r--r--qpid/cpp/src/qpid/apr/Thread.cpp (renamed from qpid/cpp/src/qpid/sys/Thread.cpp)39
-rw-r--r--qpid/cpp/src/qpid/apr/Thread.h (renamed from qpid/cpp/src/qpid/sys/ThreadFactory.h)39
-rw-r--r--qpid/cpp/src/qpid/apr/Time.cpp (renamed from qpid/cpp/src/qpid/sys/Time.cpp)16
-rw-r--r--qpid/cpp/src/qpid/broker/AutoDelete.cpp39
-rw-r--r--qpid/cpp/src/qpid/broker/AutoDelete.h13
-rw-r--r--qpid/cpp/src/qpid/broker/Channel.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/Channel.h2
-rw-r--r--qpid/cpp/src/qpid/broker/DirectExchange.cpp10
-rw-r--r--qpid/cpp/src/qpid/broker/DirectExchange.h2
-rw-r--r--qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/ExchangeRegistry.h2
-rw-r--r--qpid/cpp/src/qpid/broker/FanOutExchange.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/FanOutExchange.h2
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/HeadersExchange.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/Prefetch.h2
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp26
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h3
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/QueueRegistry.h2
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/TopicExchange.h2
-rw-r--r--qpid/cpp/src/qpid/client/Channel.cpp51
-rw-r--r--qpid/cpp/src/qpid/client/Channel.h10
-rw-r--r--qpid/cpp/src/qpid/client/ResponseHandler.cpp25
-rw-r--r--qpid/cpp/src/qpid/client/ResponseHandler.h4
-rw-r--r--qpid/cpp/src/qpid/sys/LFSessionContext.h88
-rw-r--r--qpid/cpp/src/qpid/sys/Runnable.cpp19
-rw-r--r--qpid/cpp/src/qpid/sys/Runnable.h16
-rw-r--r--qpid/cpp/src/qpid/sys/Thread.h33
-rw-r--r--qpid/cpp/src/qpid/sys/ThreadFactory.cpp35
-rw-r--r--qpid/cpp/src/qpid/sys/Time.h26
-rw-r--r--qpid/cpp/src/qpid/sys/platform.h (renamed from qpid/cpp/src/qpid/broker/Prefetch.cpp)15
-rw-r--r--qpid/cpp/src/qpidd.cpp5
-rw-r--r--qpid/cpp/test/client/client_test.cpp12
-rw-r--r--qpid/cpp/test/client/topic_listener.cpp11
-rw-r--r--qpid/cpp/test/client/topic_publisher.cpp52
-rw-r--r--qpid/cpp/test/unit/qpid/apr/APRBaseTest.cpp (renamed from qpid/cpp/test/unit/qpid/sys/APRBaseTest.cpp)2
-rw-r--r--qpid/cpp/test/unit/qpid/broker/MessageTest.cpp5
-rw-r--r--qpid/python/tests/basic.py1
60 files changed, 458 insertions, 612 deletions
diff --git a/qpid/cpp/Makefile b/qpid/cpp/Makefile
index 36a56425ac..ab542f7086 100644
--- a/qpid/cpp/Makefile
+++ b/qpid/cpp/Makefile
@@ -52,7 +52,7 @@ $(BUILDDIRS):
## Library rules
LIB_common := $(call LIBFILE,common,1.0)
-$(LIB_common): $(call OBJECTS,qpid qpid/framing qpid/sys)
+$(LIB_common): $(call OBJECTS,qpid qpid/framing qpid/sys qpid/$(PLATFORM))
$(LIB_COMMAND)
LIB_client :=$(call LIBFILE,client,1.0)
diff --git a/qpid/cpp/README b/qpid/cpp/README
index 789c535023..2653873b1a 100644
--- a/qpid/cpp/README
+++ b/qpid/cpp/README
@@ -52,8 +52,8 @@ The source tree is structured as follows:
* src/ - .h and .cpp source files, directories mirror namespaces.
* qpid/
* sys/ - system abstractions: threading, IO.
- * posix/ - posix implementation
- * apr/ - portable APR implementation (for client side)
+ * posix/ - posix implementations for sys
+ * apr/ - portable APR implementation for sys (client side)
* framing - encoding/decoding AMQP messages
* client - client classes.
* broker - broker classes.
diff --git a/qpid/cpp/options.mk b/qpid/cpp/options.mk
index ef14480113..59933fafd0 100644
--- a/qpid/cpp/options.mk
+++ b/qpid/cpp/options.mk
@@ -65,7 +65,8 @@ CXXFLAGS_release := -O3 -DNDEBUG
WARN := -Werror -pedantic -Wall -Wextra -Wshadow -Wpointer-arith -Wcast-qual -Wcast-align -Wno-long-long -Wvolatile-register-var -Winvalid-pch
INCLUDES := $(SRCDIRS:%=-I%) $(EXTRA_INCLUDES)
-LDFLAGS := -L$(LIBDIR) $(LDFLAGS_$(PLATFORM))
+DEFINES := -DPLATFORM=$(PLATFORM)
+LDFLAGS := -L$(LIBDIR) $(LDFLAGS_$(PLATFORM))
CXXFLAGS := $(DEFINES) $(WARN) -MMD -fpic $(INCLUDES) $(CXXFLAGS_$(PLATFORM)) $(CXXFLAGS_$(TYPE))
## Macros for linking, must be late evaluated
diff --git a/qpid/cpp/src/qpid/sys/APRBase.cpp b/qpid/cpp/src/qpid/apr/APRBase.cpp
index 91e2b9f428..f629a5381d 100644
--- a/qpid/cpp/src/qpid/sys/APRBase.cpp
+++ b/qpid/cpp/src/qpid/apr/APRBase.cpp
@@ -16,8 +16,8 @@
*
*/
#include <iostream>
-#include "qpid/sys/APRBase.h"
-#include "qpid/QpidError.h"
+#include <qpid/QpidError.h>
+#include "APRBase.h"
using namespace qpid::sys;
diff --git a/qpid/cpp/src/qpid/sys/APRBase.h b/qpid/cpp/src/qpid/apr/APRBase.h
index 9eef07e4c4..b84e9860df 100644
--- a/qpid/cpp/src/qpid/sys/APRBase.h
+++ b/qpid/cpp/src/qpid/apr/APRBase.h
@@ -52,7 +52,7 @@ namespace sys {
void check(apr_status_t status, const std::string& file, const int line);
std::string get_desc(apr_status_t status);
-#define CHECK_APR_SUCCESS(A) check(A, __FILE__, __LINE__);
+#define CHECK_APR_SUCCESS(A) qpid::sys::check(A, __FILE__, __LINE__);
}
}
diff --git a/qpid/cpp/src/qpid/sys/APRPool.cpp b/qpid/cpp/src/qpid/apr/APRPool.cpp
index 0f809ca93c..e465a6b40d 100644
--- a/qpid/cpp/src/qpid/sys/APRPool.cpp
+++ b/qpid/cpp/src/qpid/apr/APRPool.cpp
@@ -17,11 +17,10 @@
*/
#include "APRPool.h"
-#include "qpid/sys/APRBase.h"
+#include "APRBase.h"
#include <boost/pool/detail/singleton.hpp>
using namespace qpid::sys;
-using namespace qpid::sys;
APRPool::APRPool(){
APRBase::increment();
diff --git a/qpid/cpp/src/qpid/sys/APRPool.h b/qpid/cpp/src/qpid/apr/APRPool.h
index 2196cd64e7..2196cd64e7 100644
--- a/qpid/cpp/src/qpid/sys/APRPool.h
+++ b/qpid/cpp/src/qpid/apr/APRPool.h
diff --git a/qpid/cpp/src/qpid/sys/APRSocket.cpp b/qpid/cpp/src/qpid/apr/APRSocket.cpp
index 586c03475f..0c5a29c216 100644
--- a/qpid/cpp/src/qpid/sys/APRSocket.cpp
+++ b/qpid/cpp/src/qpid/apr/APRSocket.cpp
@@ -15,14 +15,13 @@
* limitations under the License.
*
*/
-#include "qpid/sys/APRBase.h"
-#include "qpid/sys/APRSocket.h"
+#include "APRBase.h"
+#include "APRSocket.h"
#include <assert.h>
#include <iostream>
using namespace qpid::sys;
using namespace qpid::framing;
-using namespace qpid::sys;
APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){
diff --git a/qpid/cpp/src/qpid/sys/APRSocket.h b/qpid/cpp/src/qpid/apr/APRSocket.h
index f7e7ad107b..f7e7ad107b 100644
--- a/qpid/cpp/src/qpid/sys/APRSocket.h
+++ b/qpid/cpp/src/qpid/apr/APRSocket.h
diff --git a/qpid/cpp/src/qpid/sys/Acceptor.cpp b/qpid/cpp/src/qpid/apr/Acceptor.cpp
index f8e8504c6e..cbeea9902b 100644
--- a/qpid/cpp/src/qpid/sys/Acceptor.cpp
+++ b/qpid/cpp/src/qpid/apr/Acceptor.cpp
@@ -15,12 +15,11 @@
* limitations under the License.
*
*/
-#include "qpid/sys/Acceptor.h"
-#include "qpid/sys/APRBase.h"
+#include "Acceptor.h"
+#include "APRBase.h"
#include "APRPool.h"
using namespace qpid::sys;
-using namespace qpid::sys;
Acceptor::Acceptor(int16_t port_, int backlog, int threads) :
port(port_),
diff --git a/qpid/cpp/src/qpid/sys/Acceptor.h b/qpid/cpp/src/qpid/apr/Acceptor.h
index f0f9d6feba..1813b391c1 100644
--- a/qpid/cpp/src/qpid/sys/Acceptor.h
+++ b/qpid/cpp/src/qpid/apr/Acceptor.h
@@ -18,18 +18,15 @@
#ifndef _LFAcceptor_
#define _LFAcceptor_
+#include "LFProcessor.h"
+#include "LFSessionContext.h"
#include "apr-1/apr_network_io.h"
#include "apr-1/apr_poll.h"
#include "apr-1/apr_time.h"
-
-#include "qpid/sys/Acceptor.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/sys/LFProcessor.h"
-#include "qpid/sys/LFSessionContext.h"
+#include "Monitor.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/SessionContext.h"
#include "qpid/sys/SessionHandlerFactory.h"
-#include "qpid/sys/Thread.h"
#include <qpid/SharedObject.h>
namespace qpid {
@@ -41,7 +38,7 @@ class Acceptor : public qpid::SharedObject<Acceptor>
public:
Acceptor(int16_t port, int backlog, int threads);
virtual int16_t getPort() const;
- virtual void run(SessionHandlerFactory* factory);
+ virtual void run(qpid::sys::SessionHandlerFactory* factory);
virtual void shutdown();
private:
diff --git a/qpid/cpp/src/qpid/sys/Connector.cpp b/qpid/cpp/src/qpid/apr/Connector.cpp
index 1d4b237d92..4446731654 100644
--- a/qpid/cpp/src/qpid/sys/Connector.cpp
+++ b/qpid/cpp/src/qpid/apr/Connector.cpp
@@ -16,10 +16,9 @@
*
*/
#include <iostream>
-#include "qpid/sys/APRBase.h"
-#include "qpid/sys/Connector.h"
-#include "qpid/sys/ThreadFactory.h"
-#include "qpid/QpidError.h"
+#include <qpid/QpidError.h>
+#include "APRBase.h"
+#include "Connector.h"
using namespace qpid::sys;
using namespace qpid::sys;
@@ -43,15 +42,9 @@ Connector::Connector(bool _debug, u_int32_t buffer_size) :
CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
-
- threadFactory = new ThreadFactory();
- writeLock = new Monitor();
}
Connector::~Connector(){
- delete receiver;
- delete writeLock;
- delete threadFactory;
apr_pool_destroy(pool);
APRBase::decrement();
@@ -62,9 +55,7 @@ void Connector::connect(const std::string& host, int port){
CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
closed = false;
-
- receiver = threadFactory->create(this);
- receiver->start();
+ receiver = Thread(this);
}
void Connector::init(ProtocolInitiation* header){
@@ -75,7 +66,7 @@ void Connector::init(ProtocolInitiation* header){
void Connector::close(){
closed = true;
CHECK_APR_SUCCESS(apr_socket_close(socket));
- receiver->join();
+ receiver.join();
}
void Connector::setInputHandler(InputHandler* handler){
@@ -97,14 +88,12 @@ void Connector::send(AMQFrame* frame){
}
void Connector::writeBlock(AMQDataBlock* data){
- writeLock->acquire();
+ Mutex::ScopedLock l(writeLock);
data->encode(outbuf);
-
//transfer data to wire
outbuf.flip();
writeToSocket(outbuf.start(), outbuf.available());
outbuf.clear();
- writeLock->release();
}
void Connector::writeToSocket(char* data, size_t available){
@@ -126,7 +115,7 @@ void Connector::writeToSocket(char* data, size_t available){
void Connector::checkIdle(apr_status_t status){
if(timeoutHandler){
- apr_time_t now = apr_time_as_msec(apr_time_now());
+ int64_t now = apr_time_as_msec(apr_time_now());
if(APR_STATUS_IS_TIMEUP(status)){
if(idleIn && (now - lastIn > idleIn)){
timeoutHandler->idleIn();
diff --git a/qpid/cpp/src/qpid/sys/Connector.h b/qpid/cpp/src/qpid/apr/Connector.h
index 611acc417f..e69a7205f3 100644
--- a/qpid/cpp/src/qpid/sys/Connector.h
+++ b/qpid/cpp/src/qpid/apr/Connector.h
@@ -28,15 +28,14 @@
#include "qpid/sys/ShutdownHandler.h"
#include "qpid/sys/TimeoutHandler.h"
#include "qpid/sys/Thread.h"
-#include "qpid/sys/ThreadFactory.h"
#include "qpid/sys/Connector.h"
#include "qpid/sys/Monitor.h"
namespace qpid {
namespace sys {
- class Connector : public virtual qpid::framing::OutputHandler,
- private virtual qpid::sys::Runnable
+ class Connector : public qpid::framing::OutputHandler,
+ private qpid::sys::Runnable
{
const bool debug;
const int receive_buffer_size;
@@ -44,9 +43,9 @@ namespace sys {
bool closed;
- apr_time_t lastIn;
- apr_time_t lastOut;
- apr_interval_time_t timeout;
+ int64_t lastIn;
+ int64_t lastOut;
+ int64_t timeout;
u_int32_t idleIn;
u_int32_t idleOut;
@@ -59,9 +58,8 @@ namespace sys {
qpid::framing::Buffer inbuf;
qpid::framing::Buffer outbuf;
- qpid::sys::Monitor* writeLock;
- qpid::sys::ThreadFactory* threadFactory;
- qpid::sys::Thread* receiver;
+ qpid::sys::Mutex writeLock;
+ qpid::sys::Thread receiver;
apr_pool_t* pool;
apr_socket_t* socket;
diff --git a/qpid/cpp/src/qpid/sys/LFProcessor.cpp b/qpid/cpp/src/qpid/apr/LFProcessor.cpp
index 8c53c86392..f4d4258f6f 100644
--- a/qpid/cpp/src/qpid/sys/LFProcessor.cpp
+++ b/qpid/cpp/src/qpid/apr/LFProcessor.cpp
@@ -15,14 +15,13 @@
* limitations under the License.
*
*/
-#include "qpid/sys/LFProcessor.h"
-#include "qpid/sys/APRBase.h"
-#include "qpid/sys/LFSessionContext.h"
-#include "qpid/QpidError.h"
#include <sstream>
+#include <qpid/QpidError.h>
+#include "LFProcessor.h"
+#include "APRBase.h"
+#include "LFSessionContext.h"
using namespace qpid::sys;
-using namespace qpid::sys;
using qpid::QpidError;
// TODO aconway 2006-10-12: stopped is read outside locks.
@@ -36,47 +35,38 @@ LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout
count(0),
workerCount(_workers),
hasLeader(false),
- workers(new Thread*[_workers]),
+ workers(new Thread[_workers]),
stopped(false)
{
CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
- //create & start the required number of threads
- for(int i = 0; i < workerCount; i++){
- workers[i] = factory.create(this);
- }
}
LFProcessor::~LFProcessor(){
if (!stopped) stop();
- for(int i = 0; i < workerCount; i++){
- delete workers[i];
- }
delete[] workers;
CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
}
void LFProcessor::start(){
for(int i = 0; i < workerCount; i++){
- workers[i]->start();
+ workers[i] = Thread(this);
}
}
void LFProcessor::add(const apr_pollfd_t* const fd){
CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
- countLock.acquire();
+ Monitor::ScopedLock l(countLock);
sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data));
count++;
- countLock.release();
}
void LFProcessor::remove(const apr_pollfd_t* const fd){
CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
- countLock.acquire();
+ Monitor::ScopedLock l(countLock);
sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data)));
count--;
- countLock.release();
}
void LFProcessor::reactivate(const apr_pollfd_t* const fd){
@@ -93,12 +83,12 @@ void LFProcessor::update(const apr_pollfd_t* const fd){
}
bool LFProcessor::full(){
- Locker locker(countLock);
+ Mutex::ScopedLock locker(countLock);
return count == size;
}
bool LFProcessor::empty(){
- Locker locker(countLock);
+ Mutex::ScopedLock locker(countLock);
return count == 0;
}
@@ -115,36 +105,30 @@ void LFProcessor::poll() {
void LFProcessor::run(){
try{
while(!stopped){
- leadLock.acquire();
- waitToLead();
- if(!stopped){
- const apr_pollfd_t* evt = getNextEvent();
- if(evt){
- LFSessionContext* session = reinterpret_cast<LFSessionContext*>(evt->client_data);
- session->startProcessing();
-
- relinquishLead();
- leadLock.release();
-
- //process event:
- if(evt->rtnevents & APR_POLLIN) session->read();
- if(evt->rtnevents & APR_POLLOUT) session->write();
-
- if(session->isClosed()){
- session->handleClose();
- countLock.acquire();
- sessions.erase(find(sessions.begin(), sessions.end(), session));
- count--;
- countLock.release();
- }else{
- session->stopProcessing();
- }
-
- }else{
- leadLock.release();
- }
+ const apr_pollfd_t* event = 0;
+ LFSessionContext* session = 0;
+ {
+ Monitor::ScopedLock l(leadLock);
+ waitToLead();
+ event = getNextEvent();
+ if(!event) return;
+ session = reinterpret_cast<LFSessionContext*>(
+ event->client_data);
+ session->startProcessing();
+ relinquishLead();
+ }
+
+ //process event:
+ if(event->rtnevents & APR_POLLIN) session->read();
+ if(event->rtnevents & APR_POLLOUT) session->write();
+
+ if(session->isClosed()){
+ session->handleClose();
+ Monitor::ScopedLock l(countLock);
+ sessions.erase(find(sessions.begin(),sessions.end(), session));
+ count--;
}else{
- leadLock.release();
+ session->stopProcessing();
}
}
}catch(QpidError error){
@@ -178,14 +162,13 @@ const apr_pollfd_t* LFProcessor::getNextEvent(){
void LFProcessor::stop(){
stopped = true;
- leadLock.acquire();
- leadLock.notifyAll();
- leadLock.release();
-
+ {
+ Monitor::ScopedLock l(leadLock);
+ leadLock.notifyAll();
+ }
for(int i = 0; i < workerCount; i++){
- workers[i]->join();
+ workers[i].join();
}
-
for(iterator i = sessions.begin(); i < sessions.end(); i++){
(*i)->shutdown();
}
diff --git a/qpid/cpp/src/qpid/sys/LFProcessor.h b/qpid/cpp/src/qpid/apr/LFProcessor.h
index afbb9ea413..dd85ad9e84 100644
--- a/qpid/cpp/src/qpid/sys/LFProcessor.h
+++ b/qpid/cpp/src/qpid/apr/LFProcessor.h
@@ -21,9 +21,9 @@
#include "apr-1/apr_poll.h"
#include <iostream>
#include <vector>
-#include "qpid/sys/Monitor.h"
-#include "qpid/sys/ThreadFactory.h"
+#include <qpid/sys/Monitor.h>
#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
namespace qpid {
namespace sys {
@@ -49,10 +49,9 @@ namespace sys {
int count;
const int workerCount;
bool hasLeader;
- qpid::sys::Thread** const workers;
+ qpid::sys::Thread* workers;
qpid::sys::Monitor leadLock;
- qpid::sys::Monitor countLock;
- qpid::sys::ThreadFactory factory;
+ qpid::sys::Mutex countLock;
std::vector<LFSessionContext*> sessions;
volatile bool stopped;
diff --git a/qpid/cpp/src/qpid/sys/LFSessionContext.cpp b/qpid/cpp/src/qpid/apr/LFSessionContext.cpp
index f2dff87fd0..4a704013a8 100644
--- a/qpid/cpp/src/qpid/sys/LFSessionContext.cpp
+++ b/qpid/cpp/src/qpid/apr/LFSessionContext.cpp
@@ -15,8 +15,8 @@
* limitations under the License.
*
*/
-#include "qpid/sys/LFSessionContext.h"
-#include "qpid/sys/APRBase.h"
+#include "LFSessionContext.h"
+#include "APRBase.h"
#include "qpid/QpidError.h"
#include <assert.h>
@@ -34,9 +34,7 @@ LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,
out(32768),
processor(_processor),
processing(false),
- closing(false),
- reading(0),
- writing(0)
+ closing(false)
{
fd.p = _pool;
@@ -53,9 +51,6 @@ LFSessionContext::~LFSessionContext(){
}
void LFSessionContext::read(){
- assert(!reading); // No concurrent read.
- reading = Thread::currentThread();
-
socket.read(in);
in.flip();
if(initiated){
@@ -73,27 +68,21 @@ void LFSessionContext::read(){
}
}
in.compact();
-
- reading = 0;
}
void LFSessionContext::write(){
- assert(!writing); // No concurrent writes.
- writing = Thread::currentThread();
-
bool done = isClosed();
while(!done){
if(out.available() > 0){
socket.write(out);
if(out.available() > 0){
- writing = 0;
//incomplete write, leave flags to receive notification of readiness to write
done = true;//finished processing for now, but write is still in progress
}
}else{
//do we have any frames to write?
- writeLock.acquire();
+ Mutex::ScopedLock l(writeLock);
if(!framesToWrite.empty()){
out.clear();
bool encoded(false);
@@ -113,19 +102,16 @@ void LFSessionContext::write(){
fd.reqevents = APR_POLLIN;
done = true;
- writing = 0;
-
if(closing){
socket.close();
}
}
- writeLock.release();
}
}
}
void LFSessionContext::send(AMQFrame* frame){
- writeLock.acquire();
+ Mutex::ScopedLock l(writeLock);
if(!closing){
framesToWrite.push(frame);
if(!(fd.reqevents & APR_POLLOUT)){
@@ -135,32 +121,28 @@ void LFSessionContext::send(AMQFrame* frame){
}
}
}
- writeLock.release();
}
void LFSessionContext::startProcessing(){
- writeLock.acquire();
+ Mutex::ScopedLock l(writeLock);
processing = true;
processor->deactivate(&fd);
- writeLock.release();
}
void LFSessionContext::stopProcessing(){
- writeLock.acquire();
+ Mutex::ScopedLock l(writeLock);
processor->reactivate(&fd);
processing = false;
- writeLock.release();
}
void LFSessionContext::close(){
closing = true;
- writeLock.acquire();
+ Mutex::ScopedLock l(writeLock);
if(!processing){
//allow pending frames to be written to socket
fd.reqevents = APR_POLLOUT;
processor->update(&fd);
}
- writeLock.release();
}
void LFSessionContext::handleClose(){
@@ -181,9 +163,8 @@ void LFSessionContext::init(SessionHandler* _handler){
}
void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){
- logLock.acquire();
+ Mutex::ScopedLock l(logLock);
std::cout << desc << " [" << &socket << "]: " << *frame << std::endl;
- logLock.release();
}
-Monitor LFSessionContext::logLock;
+Mutex LFSessionContext::logLock;
diff --git a/qpid/cpp/src/qpid/apr/LFSessionContext.h b/qpid/cpp/src/qpid/apr/LFSessionContext.h
new file mode 100644
index 0000000000..9b3104b085
--- /dev/null
+++ b/qpid/cpp/src/qpid/apr/LFSessionContext.h
@@ -0,0 +1,87 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LFSessionContext_
+#define _LFSessionContext_
+
+#include <queue>
+
+#include "apr-1/apr_network_io.h"
+#include "apr-1/apr_poll.h"
+#include "apr-1/apr_time.h"
+
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/SessionContext.h"
+#include "qpid/sys/SessionHandler.h"
+
+#include "APRSocket.h"
+#include "LFProcessor.h"
+
+namespace qpid {
+namespace sys {
+
+
+class LFSessionContext : public virtual qpid::sys::SessionContext
+{
+ const bool debug;
+ APRSocket socket;
+ bool initiated;
+
+ qpid::framing::Buffer in;
+ qpid::framing::Buffer out;
+
+ qpid::sys::SessionHandler* handler;
+ LFProcessor* const processor;
+
+ apr_pollfd_t fd;
+
+ std::queue<qpid::framing::AMQFrame*> framesToWrite;
+ qpid::sys::Mutex writeLock;
+
+ bool processing;
+ bool closing;
+
+ static qpid::sys::Mutex logLock;
+ void log(const std::string& desc,
+ qpid::framing::AMQFrame* const frame);
+
+
+ public:
+ LFSessionContext(apr_pool_t* pool, apr_socket_t* socket,
+ LFProcessor* const processor,
+ bool debug = false);
+ virtual ~LFSessionContext();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void close();
+ void read();
+ void write();
+ void init(qpid::sys::SessionHandler* handler);
+ void startProcessing();
+ void stopProcessing();
+ void handleClose();
+ void shutdown();
+ inline apr_pollfd_t* const getFd(){ return &fd; }
+ inline bool isClosed(){ return !socket.isOpen(); }
+};
+
+}
+}
+
+
+#endif
diff --git a/qpid/cpp/src/qpid/sys/Monitor.cpp b/qpid/cpp/src/qpid/apr/Monitor.cpp
index 79a29c219e..69fb2f6ffd 100644
--- a/qpid/cpp/src/qpid/sys/Monitor.cpp
+++ b/qpid/cpp/src/qpid/apr/Monitor.cpp
@@ -15,46 +15,50 @@
* limitations under the License.
*
*/
-#include "qpid/sys/APRBase.h"
-#include "qpid/sys/Monitor.h"
-#include <iostream>
+#include "Monitor.h"
+#include "APRPool.h"
-qpid::sys::Monitor::Monitor(){
+using namespace qpid::sys;
+
+Mutex::Mutex()
+{
APRBase::increment();
- CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
- CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
- CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool));
+ // TODO aconway 2006-11-08: Switch to non-nested.
+ CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, APRPool::get()));
}
-qpid::sys::Monitor::~Monitor(){
- CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
+Mutex::~Mutex(){
CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
- apr_pool_destroy(pool);
APRBase::decrement();
}
-void qpid::sys::Monitor::wait(){
- CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
+Monitor::Monitor()
+{
+ CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, APRPool::get()));
}
+Monitor::~Monitor(){
+ CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
+}
+
+
-void qpid::sys::Monitor::wait(u_int64_t time){
- apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000);
+void Monitor::wait(){
+ CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
+}
+
+void Monitor::wait(int64_t nsecs){
+ // APR uses microseconds.
+ apr_status_t status = apr_thread_cond_timedwait(
+ condition, mutex, nsecs/1000);
if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);
}
-void qpid::sys::Monitor::notify(){
+void Monitor::notify(){
CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
}
-void qpid::sys::Monitor::notifyAll(){
+void Monitor::notifyAll(){
CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
}
-void qpid::sys::Monitor::acquire(){
- CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
-}
-
-void qpid::sys::Monitor::release(){
- CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
-}
diff --git a/qpid/cpp/src/qpid/sys/Monitor.h b/qpid/cpp/src/qpid/apr/Monitor.h
index ddda613b87..a51baf8d94 100644
--- a/qpid/cpp/src/qpid/sys/Monitor.h
+++ b/qpid/cpp/src/qpid/apr/Monitor.h
@@ -18,38 +18,57 @@
#ifndef _Monitor_
#define _Monitor_
+#include <boost/noncopyable.hpp>
+#include <qpid/sys/Time.h>
#include "apr-1/apr_thread_mutex.h"
#include "apr-1/apr_thread_cond.h"
-#include "qpid/sys/Monitor.h"
+#include "APRBase.h"
namespace qpid {
namespace sys {
-class Monitor
+template <class L>
+class ScopedLock
{
- apr_pool_t* pool;
- apr_thread_mutex_t* mutex;
- apr_thread_cond_t* condition;
+ public:
+ ScopedLock(L& l) : mutex(l) { l.lock(); }
+ ~ScopedLock() { mutex.unlock(); }
+ private:
+ L& mutex;
+};
+
+class Mutex : private boost::noncopyable
+{
public:
- Monitor();
- virtual ~Monitor();
- virtual void wait();
- virtual void wait(u_int64_t time);
- virtual void notify();
- virtual void notifyAll();
- virtual void acquire();
- virtual void release();
+ typedef ScopedLock<Mutex> ScopedLock;
+
+ Mutex();
+ ~Mutex();
+ void lock() { CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex)); }
+ void unlock() { CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex)); }
+ void trylock() { CHECK_APR_SUCCESS(apr_thread_mutex_trylock(mutex)); }
+
+ protected:
+ apr_thread_mutex_t* mutex;
};
-class Locker
+/** A condition variable and a mutex */
+class Monitor : public Mutex
{
public:
- Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); }
- ~Locker() { monitor.release(); }
+ Monitor();
+ ~Monitor();
+ void wait();
+ void wait(int64_t nsecs);
+ void notify();
+ void notifyAll();
+
private:
- Monitor& monitor;
+ apr_thread_cond_t* condition;
};
+
+
}}
diff --git a/qpid/cpp/src/qpid/sys/Thread.cpp b/qpid/cpp/src/qpid/apr/Thread.cpp
index 4fb9915993..6d5cadb009 100644
--- a/qpid/cpp/src/qpid/sys/Thread.cpp
+++ b/qpid/cpp/src/qpid/apr/Thread.cpp
@@ -15,36 +15,41 @@
* limitations under the License.
*
*/
-#include "qpid/sys/APRBase.h"
-#include "qpid/sys/Thread.h"
-#include "apr-1/apr_portable.h"
+
+#include "Thread.h"
+#include "APRPool.h"
+#include "APRBase.h"
+#include <apr-1/apr_portable.h>
using namespace qpid::sys;
+using qpid::sys::Runnable;
-void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){
- ((Runnable*) data)->run();
+namespace {
+void* APR_THREAD_FUNC run(apr_thread_t* thread, void *data) {
+ reinterpret_cast<Runnable*>(data)->run();
CHECK_APR_SUCCESS(apr_thread_exit(thread, APR_SUCCESS));
return NULL;
}
-
-Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {}
-
-Thread::~Thread(){
}
-void Thread::start(){
- CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool));
+Thread::Thread() : thread(0) {}
+
+Thread::Thread(Runnable* runnable) {
+ CHECK_APR_SUCCESS(
+ apr_thread_create(&thread, NULL, run, runnable, APRPool::get()));
}
void Thread::join(){
apr_status_t status;
- if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner));
+ if (thread != 0)
+ CHECK_APR_SUCCESS(apr_thread_join(&status, thread));
}
-void Thread::interrupt(){
- if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS));
-}
+Thread::Thread(apr_thread_t* t) : thread(t) {}
-unsigned int qpid::sys::Thread::currentThread(){
- return apr_os_thread_current();
+Thread Thread::current(){
+ apr_thread_t* thr;
+ apr_os_thread_t osthr = apr_os_thread_current();
+ CHECK_APR_SUCCESS(apr_os_thread_put(&thr, &osthr, APRPool::get()));
+ return Thread(thr);
}
diff --git a/qpid/cpp/src/qpid/sys/ThreadFactory.h b/qpid/cpp/src/qpid/apr/Thread.h
index 9b7126272a..0c717dea70 100644
--- a/qpid/cpp/src/qpid/sys/ThreadFactory.h
+++ b/qpid/cpp/src/qpid/apr/Thread.h
@@ -1,3 +1,6 @@
+#ifndef _apr_Thread_h
+#define _apr_Thread_h
+
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -15,30 +18,28 @@
* limitations under the License.
*
*/
-#ifndef _ThreadFactory_
-#define _ThreadFactory_
-
-#include "apr-1/apr_thread_proc.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/ThreadFactory.h"
-#include "qpid/sys/Runnable.h"
+#include <apr-1/apr_thread_proc.h>
+#include <qpid/sys/Runnable.h>
namespace qpid {
namespace sys {
- class ThreadFactory
- {
- apr_pool_t* pool;
- public:
- ThreadFactory();
- virtual ~ThreadFactory();
- virtual Thread* create(Runnable* runnable);
- };
+class Thread
+{
+
+ public:
+ Thread();
+ explicit Thread(qpid::sys::Runnable*);
+ void join();
+ static Thread current();
-}
-}
+ private:
+ Thread(apr_thread_t*);
+
+ apr_thread_t* thread;
+};
+}}
-#endif
+#endif /*!_apr_Thread_h*/
diff --git a/qpid/cpp/src/qpid/sys/Time.cpp b/qpid/cpp/src/qpid/apr/Time.cpp
index c3512b8df3..8b5590481d 100644
--- a/qpid/cpp/src/qpid/sys/Time.cpp
+++ b/qpid/cpp/src/qpid/apr/Time.cpp
@@ -17,13 +17,23 @@
*/
#include <qpid/sys/Time.h>
-#include <apr-1/apr_time.h>
+#include "apr-1/apr_time.h"
namespace qpid {
namespace sys {
-Time Time::now() {
- return Time(apr_time_now()*1000);
+int64_t getTimeNsecs()
+{
+ // APR returns microseconds.
+ return apr_time_now() * 1000;
}
+int64_t getTimeMsecs()
+{
+ // APR returns microseconds.
+ return apr_time_now() / 1000;
+}
+
+
}}
+
diff --git a/qpid/cpp/src/qpid/broker/AutoDelete.cpp b/qpid/cpp/src/qpid/broker/AutoDelete.cpp
index 434bd4a3a0..d96105ba7d 100644
--- a/qpid/cpp/src/qpid/broker/AutoDelete.cpp
+++ b/qpid/cpp/src/qpid/broker/AutoDelete.cpp
@@ -18,26 +18,23 @@
#include "qpid/broker/AutoDelete.h"
using namespace qpid::broker;
+using namespace qpid::sys;
-AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period) : registry(_registry),
- period(_period),
- stopped(true),
- runner(0){}
+AutoDelete::AutoDelete(QueueRegistry* const _registry, u_int32_t _period)
+ : registry(_registry), period(_period), stopped(true) { }
void AutoDelete::add(Queue::shared_ptr const queue){
- lock.acquire();
+ Mutex::ScopedLock l(lock);
queues.push(queue);
- lock.release();
}
Queue::shared_ptr const AutoDelete::pop(){
Queue::shared_ptr next;
- lock.acquire();
+ Mutex::ScopedLock l(lock);
if(!queues.empty()){
next = queues.front();
queues.pop();
}
- lock.release();
return next;
}
@@ -59,35 +56,27 @@ void AutoDelete::process(){
}
void AutoDelete::run(){
- monitor.acquire();
+ Monitor::ScopedLock l(monitor);
while(!stopped){
process();
- monitor.wait(period);
+ monitor.wait(msecsToNsecs(period));
}
- monitor.release();
}
void AutoDelete::start(){
- monitor.acquire();
+ Monitor::ScopedLock l(monitor);
if(stopped){
- runner = factory.create(this);
stopped = false;
- monitor.release();
- runner->start();
- }else{
- monitor.release();
+ runner = Thread(this);
}
}
void AutoDelete::stop(){
- monitor.acquire();
- if(!stopped){
+ {
+ Monitor::ScopedLock l(monitor);
+ if(stopped) return;
stopped = true;
- monitor.notify();
- monitor.release();
- runner->join();
- delete runner;
- }else{
- monitor.release();
}
+ monitor.notify();
+ runner.join();
}
diff --git a/qpid/cpp/src/qpid/broker/AutoDelete.h b/qpid/cpp/src/qpid/broker/AutoDelete.h
index f3347e6cc5..e0efe7b399 100644
--- a/qpid/cpp/src/qpid/broker/AutoDelete.h
+++ b/qpid/cpp/src/qpid/broker/AutoDelete.h
@@ -20,22 +20,21 @@
#include <iostream>
#include <queue>
-#include "qpid/sys/Monitor.h"
+#include <qpid/sys/Monitor.h>
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/sys/ThreadFactory.h"
+#include "qpid/sys/Thread.h"
namespace qpid {
namespace broker{
- class AutoDelete : private virtual qpid::sys::Runnable{
- qpid::sys::ThreadFactory factory;
- qpid::sys::Monitor lock;
+ class AutoDelete : private qpid::sys::Runnable {
+ qpid::sys::Mutex lock;
qpid::sys::Monitor monitor;
std::queue<Queue::shared_ptr> queues;
QueueRegistry* const registry;
- const u_int32_t period;
+ u_int32_t period;
volatile bool stopped;
- qpid::sys::Thread* runner;
+ qpid::sys::Thread runner;
Queue::shared_ptr const pop();
void process();
diff --git a/qpid/cpp/src/qpid/broker/Channel.cpp b/qpid/cpp/src/qpid/broker/Channel.cpp
index 967c5855fa..947a97ae7c 100644
--- a/qpid/cpp/src/qpid/broker/Channel.cpp
+++ b/qpid/cpp/src/qpid/broker/Channel.cpp
@@ -105,7 +105,7 @@ void Channel::rollback(){
}
void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
- Locker locker(deliveryLock);
+ Mutex::ScopedLock locker(deliveryLock);
u_int64_t deliveryTag = currentDeliveryTag++;
if(ackExpected){
@@ -118,7 +118,7 @@ void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shar
}
bool Channel::checkPrefetch(Message::shared_ptr& msg){
- Locker locker(deliveryLock);
+ Mutex::ScopedLock locker(deliveryLock);
bool countOk = !prefetchCount || prefetchCount > unacked.size();
bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
return countOk && sizeOk;
@@ -191,7 +191,7 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){
//TODO: I think the outstanding prefetch size & count should be updated at this point...
//TODO: ...this may then necessitate dispatching to consumers
}else{
- Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag));
if(i == unacked.end()){
@@ -219,7 +219,7 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){
}
void Channel::recover(bool requeue){
- Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
if(requeue){
outstanding.reset();
@@ -234,7 +234,7 @@ void Channel::recover(bool requeue){
bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
Message::shared_ptr msg = queue->dequeue();
if(msg){
- Locker locker(deliveryLock);
+ Mutex::ScopedLock locker(deliveryLock);
u_int64_t myDeliveryTag = currentDeliveryTag++;
msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
if(ackExpected){
diff --git a/qpid/cpp/src/qpid/broker/Channel.h b/qpid/cpp/src/qpid/broker/Channel.h
index 56f0e6b4af..24dbf728ba 100644
--- a/qpid/cpp/src/qpid/broker/Channel.h
+++ b/qpid/cpp/src/qpid/broker/Channel.h
@@ -77,7 +77,7 @@ namespace qpid {
u_int32_t framesize;
NameGenerator tagGenerator;
std::list<DeliveryRecord> unacked;
- qpid::sys::Monitor deliveryLock;
+ qpid::sys::Mutex deliveryLock;
TxBuffer txBuffer;
AccumulatedAck accumulatedAck;
TransactionalStore* store;
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.cpp b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
index 46693f6f3c..3f9d23cdc7 100644
--- a/qpid/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/DirectExchange.cpp
@@ -21,24 +21,24 @@
using namespace qpid::broker;
using namespace qpid::framing;
+using namespace qpid::sys;
DirectExchange::DirectExchange(const string& _name) : Exchange(_name) {
}
void DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
- lock.acquire();
+ Mutex::ScopedLock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
if(i == queues.end()){
bindings[routingKey].push_back(queue);
queue->bound(new ExchangeBinding(this, queue, routingKey, args));
}
- lock.release();
}
void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){
- lock.acquire();
+ Mutex::ScopedLock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), queues.end(), queue);
@@ -48,11 +48,10 @@ void DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, F
bindings.erase(routingKey);
}
}
- lock.release();
}
void DirectExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){
- lock.acquire();
+ Mutex::ScopedLock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
int count(0);
for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != queues.end(); i++, count++){
@@ -61,7 +60,6 @@ void DirectExchange::route(Deliverable& msg, const string& routingKey, FieldTabl
if(!count){
std::cout << "WARNING: DirectExchange " << getName() << " could not route message with key " << routingKey << std::endl;
}
- lock.release();
}
DirectExchange::~DirectExchange(){
diff --git a/qpid/cpp/src/qpid/broker/DirectExchange.h b/qpid/cpp/src/qpid/broker/DirectExchange.h
index a452fe3b4b..0ee9ce2705 100644
--- a/qpid/cpp/src/qpid/broker/DirectExchange.h
+++ b/qpid/cpp/src/qpid/broker/DirectExchange.h
@@ -30,7 +30,7 @@ namespace qpid {
namespace broker {
class DirectExchange : public virtual Exchange{
std::map<string, std::vector<Queue::shared_ptr> > bindings;
- qpid::sys::Monitor lock;
+ qpid::sys::Mutex lock;
public:
static const std::string typeName;
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
index 6f6c759aa2..1c3a4af026 100644
--- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
@@ -26,7 +26,7 @@ using namespace qpid::sys;
using std::pair;
pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, const string& type) throw(UnknownExchangeTypeException){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i == exchanges.end()) {
Exchange::shared_ptr exchange;
@@ -50,12 +50,12 @@ pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, c
}
void ExchangeRegistry::destroy(const string& name){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
exchanges.erase(name);
}
Exchange::shared_ptr ExchangeRegistry::get(const string& name){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
return exchanges[name];
}
diff --git a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
index 33deb743f4..5d4cf10de8 100644
--- a/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -29,7 +29,7 @@ namespace broker {
class ExchangeRegistry{
typedef std::map<string, Exchange::shared_ptr> ExchangeMap;
ExchangeMap exchanges;
- qpid::sys::Monitor lock;
+ qpid::sys::Mutex lock;
public:
std::pair<Exchange::shared_ptr, bool> declare(const string& name, const string& type) throw(UnknownExchangeTypeException);
void destroy(const string& name);
diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
index 8f5143c8c0..2f8d4eadb2 100644
--- a/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -26,7 +26,7 @@ using namespace qpid::sys;
FanOutExchange::FanOutExchange(const std::string& _name) : Exchange(_name) {}
void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
// Add if not already present.
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
if (i == bindings.end()) {
@@ -36,7 +36,7 @@ void FanOutExchange::bind(Queue::shared_ptr queue, const string& routingKey, Fie
}
void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* /*args*/){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), queue);
if (i != bindings.end()) {
bindings.erase(i);
@@ -45,7 +45,7 @@ void FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*
}
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* /*args*/){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); ++i){
msg.deliverTo(*i);
}
diff --git a/qpid/cpp/src/qpid/broker/FanOutExchange.h b/qpid/cpp/src/qpid/broker/FanOutExchange.h
index 53b5c39789..910acdc203 100644
--- a/qpid/cpp/src/qpid/broker/FanOutExchange.h
+++ b/qpid/cpp/src/qpid/broker/FanOutExchange.h
@@ -31,7 +31,7 @@ namespace broker {
class FanOutExchange : public virtual Exchange {
std::vector<Queue::shared_ptr> bindings;
- qpid::sys::Monitor lock;
+ qpid::sys::Mutex lock;
public:
static const std::string typeName;
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
index 5d5cf2392c..0c4c290bbd 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -41,7 +41,7 @@ namespace {
HeadersExchange::HeadersExchange(const string& _name) : Exchange(_name) { }
void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
std::string what = args->getString("x-match");
if (what != all && what != any) {
THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to headers exchange.");
@@ -51,7 +51,7 @@ void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, Fi
}
void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey*/, FieldTable* args){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
Bindings::iterator i =
std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
if (i != bindings.end()) bindings.erase(i);
@@ -59,7 +59,7 @@ void HeadersExchange::unbind(Queue::shared_ptr queue, const string& /*routingKey
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, FieldTable* args){
- Locker locker(lock);;
+ Mutex::ScopedLock locker(lock);;
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (match(i->first, *args)) msg.deliverTo(i->second);
}
diff --git a/qpid/cpp/src/qpid/broker/HeadersExchange.h b/qpid/cpp/src/qpid/broker/HeadersExchange.h
index 3cd25739f7..77af612fe6 100644
--- a/qpid/cpp/src/qpid/broker/HeadersExchange.h
+++ b/qpid/cpp/src/qpid/broker/HeadersExchange.h
@@ -34,7 +34,7 @@ class HeadersExchange : public virtual Exchange {
typedef std::vector<Binding> Bindings;
Bindings bindings;
- qpid::sys::Monitor lock;
+ qpid::sys::Mutex lock;
public:
static const std::string typeName;
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 24fc996f1f..e3a98ae8f5 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -15,14 +15,12 @@
* limitations under the License.
*
*/
-#include "qpid/sys/Monitor.h"
#include "qpid/broker/Message.h"
#include <iostream>
using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
-using namespace qpid::sys;
Message::Message(const ConnectionToken* const _publisher,
const string& _exchange, const string& _routingKey,
diff --git a/qpid/cpp/src/qpid/broker/Prefetch.h b/qpid/cpp/src/qpid/broker/Prefetch.h
index 97abb4102d..d56799f835 100644
--- a/qpid/cpp/src/qpid/broker/Prefetch.h
+++ b/qpid/cpp/src/qpid/broker/Prefetch.h
@@ -30,7 +30,7 @@ namespace qpid {
u_int32_t size;
u_int16_t count;
- void reset();
+ void reset() { size = 0; count = 0; }
};
}
}
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 000552715b..46b14a23f5 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -38,7 +38,7 @@ Queue::Queue(const string& _name, u_int32_t _autodelete,
exclusive(0),
persistenceId(0)
{
- if(autodelete) lastUsed = Time::now().msecs();
+ if(autodelete) lastUsed = getTimeMsecs();
}
Queue::~Queue(){
@@ -58,7 +58,7 @@ void Queue::deliver(Message::shared_ptr& msg){
}
void Queue::process(Message::shared_ptr& msg){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
if(queueing || !dispatch(msg)){
queueing = true;
messages.push(msg);
@@ -90,7 +90,7 @@ bool Queue::dispatch(Message::shared_ptr& msg){
}
bool Queue::startDispatching(){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
if(queueing && !dispatching){
dispatching = true;
return true;
@@ -102,7 +102,7 @@ bool Queue::startDispatching(){
void Queue::dispatch(){
bool proceed = startDispatching();
while(proceed){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
if(!messages.empty() && dispatch(messages.front())){
messages.pop();
}else{
@@ -114,7 +114,7 @@ void Queue::dispatch(){
}
void Queue::consume(Consumer* c, bool requestExclusive){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
if(exclusive) throw ExclusiveAccessException();
if(requestExclusive){
if(!consumers.empty()) throw ExclusiveAccessException();
@@ -126,14 +126,14 @@ void Queue::consume(Consumer* c, bool requestExclusive){
}
void Queue::cancel(Consumer* c){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
consumers.erase(find(consumers.begin(), consumers.end(), c));
- if(autodelete && consumers.empty()) lastUsed = Time::now().msecs();
+ if(autodelete && consumers.empty()) lastUsed = getTimeMsecs();
if(exclusive == c) exclusive = 0;
}
Message::shared_ptr Queue::dequeue(){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
Message::shared_ptr msg;
if(!messages.empty()){
msg = messages.front();
@@ -143,25 +143,25 @@ Message::shared_ptr Queue::dequeue(){
}
u_int32_t Queue::purge(){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
int count = messages.size();
while(!messages.empty()) messages.pop();
return count;
}
u_int32_t Queue::getMessageCount() const{
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
return messages.size();
}
u_int32_t Queue::getConsumerCount() const{
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
return consumers.size();
}
bool Queue::canAutoDelete() const{
- Locker locker(lock);
- return lastUsed && (Time::now().msecs() - lastUsed > autodelete);
+ Mutex::ScopedLock locker(lock);
+ return lastUsed && (getTimeMsecs() - lastUsed > autodelete);
}
void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid)
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index fd0bad43ff..c146de1353 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -27,7 +27,6 @@
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Message.h"
#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Time.h"
namespace qpid {
namespace broker {
@@ -55,7 +54,7 @@ namespace qpid {
bool queueing;
bool dispatching;
int next;
- mutable qpid::sys::Monitor lock;
+ mutable qpid::sys::Mutex lock;
int64_t lastUsed;
Consumer* exclusive;
u_int64_t persistenceId;
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
index aa05db9a16..1976da812d 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -16,7 +16,6 @@
*
*/
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/sys/Monitor.h"
#include "qpid/broker/SessionHandlerImpl.h"
#include <sstream>
#include <assert.h>
@@ -32,7 +31,7 @@ std::pair<Queue::shared_ptr, bool>
QueueRegistry::declare(const string& declareName, bool durable,
u_int32_t autoDelete, const ConnectionToken* owner)
{
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
string name = declareName.empty() ? generateName() : declareName;
assert(!name.empty());
QueueMap::iterator i = queues.find(name);
@@ -46,12 +45,12 @@ QueueRegistry::declare(const string& declareName, bool durable,
}
void QueueRegistry::destroy(const string& name){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
queues.erase(name);
}
Queue::shared_ptr QueueRegistry::find(const string& name){
- Locker locker(lock);
+ Mutex::ScopedLock locker(lock);
QueueMap::iterator i = queues.find(name);
if (i == queues.end()) {
return Queue::shared_ptr();
diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h
index c2fc1cc830..e3d03a06b1 100644
--- a/qpid/cpp/src/qpid/broker/QueueRegistry.h
+++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h
@@ -76,7 +76,7 @@ class QueueRegistry{
private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
QueueMap queues;
- qpid::sys::Monitor lock;
+ qpid::sys::Mutex lock;
int counter;
MessageStore* const store;
};
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.cpp b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
index dc252d208f..eecd9918d4 100644
--- a/qpid/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.cpp
@@ -21,7 +21,7 @@
using namespace qpid::broker;
using namespace qpid::framing;
-
+using namespace qpid::sys;
// TODO aconway 2006-09-20: More efficient matching algorithm.
// Areas for improvement:
@@ -115,15 +115,14 @@ bool TopicPattern::match(const Tokens& target) const
TopicExchange::TopicExchange(const string& _name) : Exchange(_name) { }
void TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
- lock.acquire();
+ Monitor::ScopedLock l(lock);
TopicPattern routingPattern(routingKey);
bindings[routingPattern].push_back(queue);
queue->bound(new ExchangeBinding(this, queue, routingKey, args));
- lock.release();
}
void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* /*args*/){
- lock.acquire();
+ Monitor::ScopedLock l(lock);
BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
Queue::vector& qv(bi->second);
if (bi == bindings.end()) return;
@@ -131,12 +130,11 @@ void TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, Fi
if(q == qv.end()) return;
qv.erase(q);
if(qv.empty()) bindings.erase(bi);
- lock.release();
}
void TopicExchange::route(Deliverable& msg, const string& routingKey, FieldTable* /*args*/){
- lock.acquire();
+ Monitor::ScopedLock l(lock);
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (i->first.match(routingKey)) {
Queue::vector& qv(i->second);
@@ -145,7 +143,6 @@ void TopicExchange::route(Deliverable& msg, const string& routingKey, FieldTable
}
}
}
- lock.release();
}
TopicExchange::~TopicExchange() {}
diff --git a/qpid/cpp/src/qpid/broker/TopicExchange.h b/qpid/cpp/src/qpid/broker/TopicExchange.h
index e3b9040cb2..a3e133845f 100644
--- a/qpid/cpp/src/qpid/broker/TopicExchange.h
+++ b/qpid/cpp/src/qpid/broker/TopicExchange.h
@@ -71,7 +71,7 @@ class TopicPattern : public Tokens
class TopicExchange : public virtual Exchange{
typedef std::map<TopicPattern, Queue::vector> BindingMap;
BindingMap bindings;
- qpid::sys::Monitor lock;
+ qpid::sys::Mutex lock;
public:
static const std::string typeName;
diff --git a/qpid/cpp/src/qpid/client/Channel.cpp b/qpid/cpp/src/qpid/client/Channel.cpp
index a7b30f2f94..fad648f27d 100644
--- a/qpid/cpp/src/qpid/client/Channel.cpp
+++ b/qpid/cpp/src/qpid/client/Channel.cpp
@@ -17,7 +17,6 @@
*/
#include "qpid/client/Channel.h"
#include "qpid/sys/Monitor.h"
-#include "qpid/sys/ThreadFactory.h"
#include "qpid/client/Message.h"
#include "qpid/QpidError.h"
@@ -29,26 +28,15 @@ using namespace qpid::sys;
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
id(0),
con(0),
- dispatcher(0),
out(0),
incoming(0),
closed(true),
prefetch(_prefetch),
transactional(_transactional)
-{
- threadFactory = new ThreadFactory();
- dispatchMonitor = new Monitor();
- retrievalMonitor = new Monitor();
-}
+{ }
Channel::~Channel(){
- if(dispatcher){
- stop();
- delete dispatcher;
- }
- delete retrievalMonitor;
- delete dispatchMonitor;
- delete threadFactory;
+ stop();
}
void Channel::setPrefetch(u_int16_t _prefetch){
@@ -176,9 +164,9 @@ void Channel::cancelAll(){
}
void Channel::retrieve(Message& msg){
- retrievalMonitor->acquire();
+ Monitor::ScopedLock l(retrievalMonitor);
while(retrieved == 0){
- retrievalMonitor->wait();
+ retrievalMonitor.wait();
}
msg.header = retrieved->getHeader();
@@ -186,8 +174,6 @@ void Channel::retrieve(Message& msg){
retrieved->getData(msg.data);
delete retrieved;
retrieved = 0;
-
- retrievalMonitor->release();
}
bool Channel::get(Message& msg, const Queue& queue, int ackMode){
@@ -315,18 +301,16 @@ void Channel::handleHeartbeat(AMQHeartbeatBody::shared_ptr /*body*/){
}
void Channel::start(){
- dispatcher = threadFactory->create(this);
- dispatcher->start();
+ dispatcher = Thread(this);
}
void Channel::stop(){
- closed = true;
- dispatchMonitor->acquire();
- dispatchMonitor->notify();
- dispatchMonitor->release();
- if(dispatcher){
- dispatcher->join();
+ {
+ Monitor::ScopedLock l(dispatchMonitor);
+ closed = true;
+ dispatchMonitor.notify();
}
+ dispatcher.join();
}
void Channel::run(){
@@ -335,30 +319,27 @@ void Channel::run(){
void Channel::enqueue(){
if(incoming->isResponse()){
- retrievalMonitor->acquire();
+ Monitor::ScopedLock l(retrievalMonitor);
retrieved = incoming;
- retrievalMonitor->notify();
- retrievalMonitor->release();
+ retrievalMonitor.notify();
}else{
- dispatchMonitor->acquire();
+ Monitor::ScopedLock l(dispatchMonitor);
messages.push(incoming);
- dispatchMonitor->notify();
- dispatchMonitor->release();
+ dispatchMonitor.notify();
}
incoming = 0;
}
IncomingMessage* Channel::dequeue(){
- dispatchMonitor->acquire();
+ Monitor::ScopedLock l(dispatchMonitor);
while(messages.empty() && !closed){
- dispatchMonitor->wait();
+ dispatchMonitor.wait();
}
IncomingMessage* msg = 0;
if(!messages.empty()){
msg = messages.front();
messages.pop();
}
- dispatchMonitor->release();
return msg;
}
diff --git a/qpid/cpp/src/qpid/client/Channel.h b/qpid/cpp/src/qpid/client/Channel.h
index fa8cd3afe0..daf2b6f9d9 100644
--- a/qpid/cpp/src/qpid/client/Channel.h
+++ b/qpid/cpp/src/qpid/client/Channel.h
@@ -24,9 +24,6 @@
#define _Channel_
#include "qpid/framing/amqp_framing.h"
-
-#include "qpid/sys/ThreadFactory.h"
-
#include "qpid/client/Connection.h"
#include "qpid/client/Exchange.h"
#include "qpid/client/IncomingMessage.h"
@@ -51,15 +48,14 @@ namespace client {
u_int16_t id;
Connection* con;
- qpid::sys::ThreadFactory* threadFactory;
- qpid::sys::Thread* dispatcher;
+ qpid::sys::Thread dispatcher;
qpid::framing::OutputHandler* out;
IncomingMessage* incoming;
ResponseHandler responses;
std::queue<IncomingMessage*> messages;//holds returned messages or those delivered for a consume
IncomingMessage* retrieved;//holds response to basic.get
- qpid::sys::Monitor* dispatchMonitor;
- qpid::sys::Monitor* retrievalMonitor;
+ qpid::sys::Monitor dispatchMonitor;
+ qpid::sys::Monitor retrievalMonitor;
std::map<std::string, Consumer*> consumers;
ReturnedMessageHandler* returnsHandler;
bool closed;
diff --git a/qpid/cpp/src/qpid/client/ResponseHandler.cpp b/qpid/cpp/src/qpid/client/ResponseHandler.cpp
index 16989e2c25..5d2e03c9d9 100644
--- a/qpid/cpp/src/qpid/client/ResponseHandler.cpp
+++ b/qpid/cpp/src/qpid/client/ResponseHandler.cpp
@@ -19,40 +19,35 @@
#include "qpid/sys/Monitor.h"
#include "qpid/QpidError.h"
-qpid::client::ResponseHandler::ResponseHandler() : waiting(false){
- monitor = new qpid::sys::Monitor();
-}
+using namespace qpid::sys;
-qpid::client::ResponseHandler::~ResponseHandler(){
- delete monitor;
-}
+qpid::client::ResponseHandler::ResponseHandler() : waiting(false){}
+
+qpid::client::ResponseHandler::~ResponseHandler(){}
bool qpid::client::ResponseHandler::validate(const qpid::framing::AMQMethodBody& expected){
return expected.match(response.get());
}
void qpid::client::ResponseHandler::waitForResponse(){
- monitor->acquire();
+ Monitor::ScopedLock l(monitor);
if(waiting){
- monitor->wait();
+ monitor.wait();
}
- monitor->release();
}
void qpid::client::ResponseHandler::signalResponse(qpid::framing::AMQMethodBody::shared_ptr _response){
response = _response;
- monitor->acquire();
+ Monitor::ScopedLock l(monitor);
waiting = false;
- monitor->notify();
- monitor->release();
+ monitor.notify();
}
void qpid::client::ResponseHandler::receive(const qpid::framing::AMQMethodBody& expected){
- monitor->acquire();
+ Monitor::ScopedLock l(monitor);
if(waiting){
- monitor->wait();
+ monitor.wait();
}
- monitor->release();
if(!validate(expected)){
THROW_QPID_ERROR(PROTOCOL_ERROR, "Protocol Error");
}
diff --git a/qpid/cpp/src/qpid/client/ResponseHandler.h b/qpid/cpp/src/qpid/client/ResponseHandler.h
index ac4c351211..247c974c14 100644
--- a/qpid/cpp/src/qpid/client/ResponseHandler.h
+++ b/qpid/cpp/src/qpid/client/ResponseHandler.h
@@ -17,7 +17,7 @@
*/
#include <string>
#include "qpid/framing/amqp_framing.h"
-#include "qpid/sys/Monitor.h"
+#include <qpid/sys/Monitor.h>
#ifndef _ResponseHandler_
#define _ResponseHandler_
@@ -28,7 +28,7 @@ namespace qpid {
class ResponseHandler{
bool waiting;
qpid::framing::AMQMethodBody::shared_ptr response;
- qpid::sys::Monitor* monitor;
+ qpid::sys::Monitor monitor;
public:
ResponseHandler();
diff --git a/qpid/cpp/src/qpid/sys/LFSessionContext.h b/qpid/cpp/src/qpid/sys/LFSessionContext.h
deleted file mode 100644
index 92f52ccf83..0000000000
--- a/qpid/cpp/src/qpid/sys/LFSessionContext.h
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _LFSessionContext_
-#define _LFSessionContext_
-
-#include <queue>
-
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_poll.h"
-#include "apr-1/apr_time.h"
-
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/sys/APRSocket.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/sys/LFProcessor.h"
-#include "qpid/sys/SessionContext.h"
-#include "qpid/sys/SessionHandler.h"
-
-namespace qpid {
-namespace sys {
-
-
- class LFSessionContext : public virtual SessionContext
- {
- const bool debug;
- APRSocket socket;
- bool initiated;
-
- qpid::framing::Buffer in;
- qpid::framing::Buffer out;
-
- SessionHandler* handler;
- LFProcessor* const processor;
-
- apr_pollfd_t fd;
-
- std::queue<qpid::framing::AMQFrame*> framesToWrite;
- qpid::sys::Monitor writeLock;
-
- bool processing;
- bool closing;
-
- //these are just for debug, as a crude way of detecting concurrent access
- volatile unsigned int reading;
- volatile unsigned int writing;
-
- static qpid::sys::Monitor logLock;
- void log(const std::string& desc, qpid::framing::AMQFrame* const frame);
-
- public:
- LFSessionContext(apr_pool_t* pool, apr_socket_t* socket,
- LFProcessor* const processor,
- bool debug = false);
- ~LFSessionContext();
- virtual void send(qpid::framing::AMQFrame* frame);
- virtual void close();
- void read();
- void write();
- void init(SessionHandler* handler);
- void startProcessing();
- void stopProcessing();
- void handleClose();
- void shutdown();
- inline apr_pollfd_t* const getFd(){ return &fd; }
- inline bool isClosed(){ return !socket.isOpen(); }
- };
-
-}
-}
-
-
-#endif
diff --git a/qpid/cpp/src/qpid/sys/Runnable.cpp b/qpid/cpp/src/qpid/sys/Runnable.cpp
deleted file mode 100644
index d7d9e968cc..0000000000
--- a/qpid/cpp/src/qpid/sys/Runnable.cpp
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/sys/Runnable.h"
-qpid::sys::Runnable::~Runnable() {}
diff --git a/qpid/cpp/src/qpid/sys/Runnable.h b/qpid/cpp/src/qpid/sys/Runnable.h
index ce13eb2039..c06698bb93 100644
--- a/qpid/cpp/src/qpid/sys/Runnable.h
+++ b/qpid/cpp/src/qpid/sys/Runnable.h
@@ -21,15 +21,15 @@
namespace qpid {
namespace sys {
- class Runnable
- {
- public:
- virtual ~Runnable();
- virtual void run() = 0;
- };
+/** Base class for classes that run in a thread. */
+class Runnable
+{
+ public:
+ virtual ~Runnable() {}
+ virtual void run() = 0;
+};
-}
-}
+}}
#endif
diff --git a/qpid/cpp/src/qpid/sys/Thread.h b/qpid/cpp/src/qpid/sys/Thread.h
index e86bd4a8d2..d884add776 100644
--- a/qpid/cpp/src/qpid/sys/Thread.h
+++ b/qpid/cpp/src/qpid/sys/Thread.h
@@ -1,3 +1,6 @@
+#ifndef _sys_Thread_h
+#define _sys_Thread_h
+
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -15,34 +18,10 @@
* limitations under the License.
*
*/
-#ifndef _Thread_
-#define _Thread_
-
-#include "apr-1/apr_thread_proc.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Thread.h"
-
-namespace qpid {
-namespace sys {
-
- class Thread
- {
- const Runnable* runnable;
- apr_pool_t* pool;
- apr_thread_t* runner;
- public:
- Thread(apr_pool_t* pool, Runnable* runnable);
- virtual ~Thread();
- virtual void start();
- virtual void join();
- virtual void interrupt();
- static unsigned int currentThread();
- };
+#include <qpid/sys/platform.h>
+#include QPID_PLATFORM_H(Thread.h)
-}
-}
-#endif
+#endif /*!_sys_Thread_h*/
diff --git a/qpid/cpp/src/qpid/sys/ThreadFactory.cpp b/qpid/cpp/src/qpid/sys/ThreadFactory.cpp
deleted file mode 100644
index d33872b9a2..0000000000
--- a/qpid/cpp/src/qpid/sys/ThreadFactory.cpp
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/sys/APRBase.h"
-#include "qpid/sys/ThreadFactory.h"
-
-using namespace qpid::sys;
-
-ThreadFactory::ThreadFactory(){
- APRBase::increment();
- CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
-}
-
-ThreadFactory::~ThreadFactory(){
- apr_pool_destroy(pool);
- APRBase::decrement();
-}
-
-Thread* ThreadFactory::create(Runnable* runnable){
- return new Thread(pool, runnable);
-}
diff --git a/qpid/cpp/src/qpid/sys/Time.h b/qpid/cpp/src/qpid/sys/Time.h
index 5c7cdfb005..79d17b433b 100644
--- a/qpid/cpp/src/qpid/sys/Time.h
+++ b/qpid/cpp/src/qpid/sys/Time.h
@@ -24,28 +24,14 @@
namespace qpid {
namespace sys {
-/**
- * Time since the epoch.
- */
-class Time
-{
- public:
- static const int64_t NANOS = 1000000000;
- static const int64_t MICROS = 1000000;
- static const int64_t MILLIS = 1000;
-
- static Time now();
-
- Time(int64_t nsecs_) : ticks(nsecs_) {}
+inline int64_t msecsToNsecs(int64_t msecs) { return msecs * 1000 *1000; }
+inline int64_t nsecsToMsecs(int64_t nsecs) { return nsecs / (1000 *1000); }
- int64_t nsecs() const { return ticks; }
- int64_t usecs() const { return nsecs()/1000; }
- int64_t msecs() const { return usecs()/1000; }
- int64_t secs() const { return msecs()/1000; }
+/** Nanoseconds since epoch */
+int64_t getTimeNsecs();
- private:
- int64_t ticks;
-};
+/** Milliseconds since epoch */
+int64_t getTimeMsecs();
}}
diff --git a/qpid/cpp/src/qpid/broker/Prefetch.cpp b/qpid/cpp/src/qpid/sys/platform.h
index 6d9dbda13c..878c724953 100644
--- a/qpid/cpp/src/qpid/broker/Prefetch.cpp
+++ b/qpid/cpp/src/qpid/sys/platform.h
@@ -1,3 +1,6 @@
+#ifndef _sys_platform_h
+#define _sys_platform_h
+
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -16,11 +19,11 @@
*
*/
-#include "qpid/broker/Prefetch.h"
+/**
+ * Macros for including platform-specific headers and aliasing
+ * platform-specific classes into the qpid::sys namespace.
+ */
-using namespace qpid::broker;
+#define QPID_PLATFORM_H(HEADER) <qpid/PLATFORM/HEADER>
-void Prefetch::reset(){
- size = 0;
- count = 0;
-}
+#endif /*!_sys_platform_h*/
diff --git a/qpid/cpp/src/qpidd.cpp b/qpid/cpp/src/qpidd.cpp
index f170aed95e..2974cc8654 100644
--- a/qpid/cpp/src/qpidd.cpp
+++ b/qpid/cpp/src/qpidd.cpp
@@ -24,8 +24,11 @@
using namespace qpid::broker;
using namespace qpid::sys;
+Broker::shared_ptr broker;
+
void handle_signal(int /*signal*/){
std::cout << "Shutting down..." << std::endl;
+ broker->shutdown();
}
int main(int argc, char** argv)
@@ -36,8 +39,8 @@ int main(int argc, char** argv)
if(config.isHelp()){
config.usage();
}else{
+ broker = Broker::create(config);
apr_signal(SIGINT, handle_signal);
- Broker::shared_ptr broker = Broker::create(config);
broker->run();
}
return 0;
diff --git a/qpid/cpp/test/client/client_test.cpp b/qpid/cpp/test/client/client_test.cpp
index 0e57babbef..8e9c58179a 100644
--- a/qpid/cpp/test/client/client_test.cpp
+++ b/qpid/cpp/test/client/client_test.cpp
@@ -36,9 +36,7 @@ public:
inline virtual void received(Message& /*msg*/){
std::cout << "Received message " /**<< msg **/<< std::endl;
- monitor->acquire();
monitor->notify();
- monitor->release();
}
};
@@ -77,12 +75,12 @@ int main(int argc, char**)
msg.setData(data);
channel.publish(msg, exchange, "MyTopic");
std::cout << "Published message." << std::endl;
-
- monitor.acquire();
- monitor.wait();
- monitor.release();
-
+ {
+ Monitor::ScopedLock l(monitor);
+ monitor.wait();
+ }
+
con.closeChannel(&channel);
std::cout << "Closed channel." << std::endl;
con.close();
diff --git a/qpid/cpp/test/client/topic_listener.cpp b/qpid/cpp/test/client/topic_listener.cpp
index 0f383134b5..9aa93bc2b5 100644
--- a/qpid/cpp/test/client/topic_listener.cpp
+++ b/qpid/cpp/test/client/topic_listener.cpp
@@ -21,11 +21,12 @@
#include "qpid/client/Exchange.h"
#include "qpid/client/MessageListener.h"
#include "qpid/client/Queue.h"
-#include <apr-1/apr_time.h>
+#include <qpid/sys/Time.h>
#include <iostream>
#include <sstream>
using namespace qpid::client;
+using namespace qpid::sys;
class Listener : public MessageListener{
Channel* const channel;
@@ -33,7 +34,7 @@ class Listener : public MessageListener{
const bool transactional;
bool init;
int count;
- apr_time_t start;
+ int64_t start;
void shutdown();
void report();
@@ -101,7 +102,7 @@ Listener::Listener(Channel* _channel, const std::string& _responseq, bool tx) :
void Listener::received(Message& message){
if(!init){
- start = apr_time_as_msec(apr_time_now());
+ start = getTimeMsecs();
count = 0;
init = true;
}
@@ -123,8 +124,8 @@ void Listener::shutdown(){
}
void Listener::report(){
- apr_time_t finish = apr_time_as_msec(apr_time_now());
- apr_time_t time = finish - start;
+ int64_t finish = getTimeMsecs();
+ int64_t time = finish - start;
std::stringstream reportstr;
reportstr << "Received " << count << " messages in " << time << " ms.";
Message msg;
diff --git a/qpid/cpp/test/client/topic_publisher.cpp b/qpid/cpp/test/client/topic_publisher.cpp
index 119d275cfd..22c36ea9e3 100644
--- a/qpid/cpp/test/client/topic_publisher.cpp
+++ b/qpid/cpp/test/client/topic_publisher.cpp
@@ -23,7 +23,7 @@
#include "qpid/client/Queue.h"
#include "qpid/sys/Monitor.h"
#include "unistd.h"
-#include <apr-1/apr_time.h>
+#include <qpid/sys/Time.h>
#include <cstdlib>
#include <iostream>
@@ -43,7 +43,7 @@ class Publisher : public MessageListener{
public:
Publisher(Channel* channel, const std::string& controlTopic, bool tx);
virtual void received(Message& msg);
- apr_time_t publish(int msgs, int listeners, int size);
+ int64_t publish(int msgs, int listeners, int size);
void terminate();
};
@@ -105,19 +105,19 @@ int main(int argc, char** argv){
channel.start();
int batchSize(args.getBatches());
- apr_time_t max(0);
- apr_time_t min(0);
- apr_time_t sum(0);
+ int64_t max(0);
+ int64_t min(0);
+ int64_t sum(0);
for(int i = 0; i < batchSize; i++){
if(i > 0 && args.getDelay()) sleep(args.getDelay());
- apr_time_t time = publisher.publish(args.getMessages(), args.getSubscribers(), args.getSize());
+ int64_t time = publisher.publish(args.getMessages(), args.getSubscribers(), args.getSize());
if(!max || time > max) max = time;
if(!min || time < min) min = time;
sum += time;
- std::cout << "Completed " << (i+1) << " of " << batchSize << " in " << time << "ms" << std::endl;
+ std::cout << "Completed " << (i+1) << " of " << batchSize << " in " << nsecsToMsecs(time) << "ms" << std::endl;
}
publisher.terminate();
- apr_time_t avg = sum / batchSize;
+ int64_t avg = sum / batchSize;
if(batchSize > 1){
std::cout << batchSize << " batches completed. avg=" << avg <<
", max=" << max << ", min=" << min << std::endl;
@@ -135,12 +135,11 @@ Publisher::Publisher(Channel* _channel, const std::string& _controlTopic, bool t
void Publisher::received(Message& msg){
//count responses and when all are received end the current batch
- monitor.acquire();
+ Monitor::ScopedLock l(monitor);
if(--count == 0){
monitor.notify();
}
std::cout << "Received report: " << msg.getData() << " (" << count << " remaining)." << std::endl;
- monitor.release();
}
void Publisher::waitForCompletion(int msgs){
@@ -148,26 +147,27 @@ void Publisher::waitForCompletion(int msgs){
monitor.wait();
}
-apr_time_t Publisher::publish(int msgs, int listeners, int size){
- monitor.acquire();
+int64_t Publisher::publish(int msgs, int listeners, int size){
Message msg;
msg.setData(generateData(size));
- apr_time_t start(apr_time_as_msec(apr_time_now()));
- for(int i = 0; i < msgs; i++){
- channel->publish(msg, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
- }
- //send report request
- Message reportRequest;
- reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
- channel->publish(reportRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
- if(transactional){
- channel->commit();
+ int64_t start = getTimeMsecs();
+ {
+ Monitor::ScopedLock l(monitor);
+ for(int i = 0; i < msgs; i++){
+ channel->publish(msg, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
+ }
+ //send report request
+ Message reportRequest;
+ reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
+ channel->publish(reportRequest, Exchange::DEFAULT_TOPIC_EXCHANGE, controlTopic);
+ if(transactional){
+ channel->commit();
+ }
+
+ waitForCompletion(listeners);
}
- waitForCompletion(listeners);
- monitor.release();
- apr_time_t finish(apr_time_as_msec(apr_time_now()));
-
+ int64_t finish(getTimeMsecs());
return finish - start;
}
diff --git a/qpid/cpp/test/unit/qpid/sys/APRBaseTest.cpp b/qpid/cpp/test/unit/qpid/apr/APRBaseTest.cpp
index fc0c7dd9e1..a0f88f78db 100644
--- a/qpid/cpp/test/unit/qpid/sys/APRBaseTest.cpp
+++ b/qpid/cpp/test/unit/qpid/apr/APRBaseTest.cpp
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-#include "qpid/sys/APRBase.h"
+#include "qpid/apr/APRBase.h"
#include <qpid_test_plugin.h>
#include <iostream>
diff --git a/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp b/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp
index 1dbbeda827..c0b9225483 100644
--- a/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp
+++ b/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp
@@ -15,14 +15,12 @@
* limitations under the License.
*
*/
-#include "qpid/sys/APRBase.h"
-#include "qpid/broker/Message.h"
+#include <qpid/broker/Message.h>
#include <qpid_test_plugin.h>
#include <iostream>
using namespace qpid::broker;
using namespace qpid::framing;
-using namespace qpid::sys;
class MessageTest : public CppUnit::TestCase
{
@@ -34,7 +32,6 @@ class MessageTest : public CppUnit::TestCase
void testMe()
{
- APRBase::increment();
const int size(10);
for(int i = 0; i < size; i++){
Message::shared_ptr msg = Message::shared_ptr(new Message(0, "A", "B", true, true));
diff --git a/qpid/python/tests/basic.py b/qpid/python/tests/basic.py
index afdf1a4003..314c20c8a0 100644
--- a/qpid/python/tests/basic.py
+++ b/qpid/python/tests/basic.py
@@ -260,7 +260,6 @@ class BasicTests(TestBase):
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.content.body)
-
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected 6th message in original queue: " + extra.content.body)