summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-10-31 19:53:55 +0000
committerAlan Conway <aconway@apache.org>2006-10-31 19:53:55 +0000
commit9094d2b10ecadd66fa3b22169183e7573cc79629 (patch)
treebf3915f72be2a5f09932b800d2fa4309fb3ad64e /cpp
parent0487ea40bc6568765cdec75a36273eeb26fae854 (diff)
downloadqpid-python-9094d2b10ecadd66fa3b22169183e7573cc79629.tar.gz
IO refactor phase 1. Reduced dependencies, removed redundant classes.
Renamed pricipal APR classes in preparation for move to apr namespace. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469625 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/Makefile3
-rw-r--r--cpp/src/qpid/SharedObject.h52
-rw-r--r--cpp/src/qpid/broker/AutoDelete.h10
-rw-r--r--cpp/src/qpid/broker/Broker.cpp46
-rw-r--r--cpp/src/qpid/broker/Broker.h37
-rw-r--r--cpp/src/qpid/broker/Channel.h4
-rw-r--r--cpp/src/qpid/broker/Configuration.cpp10
-rw-r--r--cpp/src/qpid/broker/Configuration.h3
-rw-r--r--cpp/src/qpid/broker/DirectExchange.h4
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h4
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.h4
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.h4
-rw-r--r--cpp/src/qpid/broker/Message.cpp2
-rw-r--r--cpp/src/qpid/broker/Queue.cpp2
-rw-r--r--cpp/src/qpid/broker/Queue.h4
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp2
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h4
-rw-r--r--cpp/src/qpid/broker/TopicExchange.h4
-rw-r--r--cpp/src/qpid/client/Channel.cpp10
-rw-r--r--cpp/src/qpid/client/Connection.cpp4
-rw-r--r--cpp/src/qpid/client/ResponseHandler.cpp4
-rw-r--r--cpp/src/qpid/concurrent/APRMonitor.h48
-rw-r--r--cpp/src/qpid/concurrent/APRThread.h48
-rw-r--r--cpp/src/qpid/concurrent/APRThreadFactory.h44
-rw-r--r--cpp/src/qpid/concurrent/APRThreadPool.h67
-rw-r--r--cpp/src/qpid/concurrent/Monitor.cpp (renamed from cpp/src/qpid/concurrent/APRMonitor.cpp)18
-rw-r--r--cpp/src/qpid/concurrent/Monitor.h41
-rw-r--r--cpp/src/qpid/concurrent/MonitorImpl.h57
-rw-r--r--cpp/src/qpid/concurrent/Thread.cpp (renamed from cpp/src/qpid/concurrent/APRThread.cpp)14
-rw-r--r--cpp/src/qpid/concurrent/Thread.h19
-rw-r--r--cpp/src/qpid/concurrent/ThreadFactory.cpp (renamed from cpp/src/qpid/concurrent/APRThreadFactory.cpp)10
-rw-r--r--cpp/src/qpid/concurrent/ThreadFactory.h10
-rw-r--r--cpp/src/qpid/concurrent/ThreadFactoryImpl.h52
-rw-r--r--cpp/src/qpid/concurrent/ThreadPool.cpp (renamed from cpp/src/qpid/concurrent/APRThreadPool.cpp)18
-rw-r--r--cpp/src/qpid/concurrent/ThreadPool.h35
-rw-r--r--cpp/src/qpid/framing/InputHandler.cpp21
-rw-r--r--cpp/src/qpid/framing/InputHandler.h19
-rw-r--r--cpp/src/qpid/framing/OutputHandler.cpp21
-rw-r--r--cpp/src/qpid/framing/OutputHandler.h21
-rw-r--r--cpp/src/qpid/io/APRConnector.h95
-rw-r--r--cpp/src/qpid/io/APRPool.cpp (renamed from cpp/src/qpid/concurrent/LMonitor.h)35
-rw-r--r--cpp/src/qpid/io/APRPool.h (renamed from cpp/src/qpid/concurrent/LThreadFactory.h)38
-rw-r--r--cpp/src/qpid/io/Acceptor.cpp61
-rw-r--r--cpp/src/qpid/io/Acceptor.h57
-rw-r--r--cpp/src/qpid/io/BlockingAPRAcceptor.cpp101
-rw-r--r--cpp/src/qpid/io/BlockingAPRAcceptor.h65
-rw-r--r--cpp/src/qpid/io/BlockingAPRSessionContext.cpp177
-rw-r--r--cpp/src/qpid/io/BlockingAPRSessionContext.h94
-rw-r--r--cpp/src/qpid/io/Connector.cpp (renamed from cpp/src/qpid/io/APRConnector.cpp)42
-rw-r--r--cpp/src/qpid/io/Connector.h73
-rw-r--r--cpp/src/qpid/io/ConnectorImpl.h53
-rw-r--r--cpp/src/qpid/io/LConnector.h48
-rw-r--r--cpp/src/qpid/io/LFAcceptor.cpp94
-rw-r--r--cpp/src/qpid/io/LFAcceptor.h74
-rw-r--r--cpp/src/qpid/io/LFProcessor.h10
-rw-r--r--cpp/src/qpid/io/LFSessionContext.cpp6
-rw-r--r--cpp/src/qpid/io/LFSessionContext.h6
-rw-r--r--cpp/src/qpid/io/doxygen_summary.h (renamed from cpp/src/qpid/io/SessionManager.h)36
-rw-r--r--cpp/src/qpidd.cpp2
-rw-r--r--cpp/test/client/client_test.cpp4
-rw-r--r--cpp/test/client/topic_publisher.cpp4
-rw-r--r--cpp/test/unit/qpid/broker/ConfigurationTest.cpp19
62 files changed, 470 insertions, 1504 deletions
diff --git a/cpp/Makefile b/cpp/Makefile
index fe5e40bb26..e81e89a6a7 100644
--- a/cpp/Makefile
+++ b/cpp/Makefile
@@ -36,6 +36,7 @@ STYLESHEETS := $(XSL:%=$(CURDIR)/etc/stylesheets/%)
TRANSFORM := java -jar $(CURDIR)/tools/saxon8.jar -o results.out $(SPEC)
generate: $(GENDIR)/timestamp
$(GENDIR)/timestamp: $(wildcard etc/stylesheets/*.xsl) $(SPEC)
+ rm -rf $(GENDIR)
mkdir -p $(GENDIR)/qpid/framing
( cd $(GENDIR)/qpid/framing && for s in $(STYLESHEETS) ; do $(TRANSFORM) $$s ; done ) && echo > $(GENDIR)/timestamp
$(shell find $(GENDIR) -name *.cpp -o -name *.h): $(GENDIR)/timestamp
@@ -106,7 +107,7 @@ CLIENT_TEST_EXE := $(CLIENT_TEST_SRC:test/client/%.cpp=$(TESTDIR)/%)
all-nogen: $(CLIENT_TEST_EXE)
## #include dependencies
--include $(shell find src test -name '*.d') dummy-avoid-warning-if-none
+-include $(shell find $(GENDIR) $(OBJDIR) -name '*.d') dummy-avoid-warning-if-none
## Clean up
diff --git a/cpp/src/qpid/SharedObject.h b/cpp/src/qpid/SharedObject.h
new file mode 100644
index 0000000000..15f333173a
--- /dev/null
+++ b/cpp/src/qpid/SharedObject.h
@@ -0,0 +1,52 @@
+#ifndef _SharedObject_
+#define _SharedObject_
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/shared_ptr.hpp>
+#include <boost/noncopyable.hpp>
+
+namespace qpid {
+ /**
+ * Template to enforce shared object conventions.
+ * Shared object classes should inherit : public qpid::SharedObject
+ * That ensures Foo:
+ * - has typedef boost::shared_ptr<T> SharedPtr
+ * - has virtual destructor
+ * - is boost::noncopyable (no default copy or assign)
+ * - has a protected default constructor.
+ *
+ * Shared objects should not have public constructors.
+ * Make constructors protected and provide public statc create()
+ * functions that return a SharedPtr.
+ */
+ template <class T>
+ class SharedObject : private boost::noncopyable
+ {
+ public:
+ typedef boost::shared_ptr<T> SharedPtr;
+
+ virtual ~SharedObject() {};
+
+ protected:
+ SharedObject() {}
+ };
+}
+
+#endif /*!_SharedObject_*/
diff --git a/cpp/src/qpid/broker/AutoDelete.h b/cpp/src/qpid/broker/AutoDelete.h
index 9faa4aa4c4..509ac3bec1 100644
--- a/cpp/src/qpid/broker/AutoDelete.h
+++ b/cpp/src/qpid/broker/AutoDelete.h
@@ -20,17 +20,17 @@
#include <iostream>
#include <queue>
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/concurrent/ThreadFactoryImpl.h"
+#include "qpid/concurrent/ThreadFactory.h"
namespace qpid {
namespace broker{
class AutoDelete : private virtual qpid::concurrent::Runnable{
- qpid::concurrent::ThreadFactoryImpl factory;
- qpid::concurrent::MonitorImpl lock;
- qpid::concurrent::MonitorImpl monitor;
+ qpid::concurrent::ThreadFactory factory;
+ qpid::concurrent::Monitor lock;
+ qpid::concurrent::Monitor monitor;
std::queue<Queue::shared_ptr> queues;
QueueRegistry* const registry;
const u_int32_t period;
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index fe859b240b..7b5f9e3e32 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -18,60 +18,30 @@
#include <iostream>
#include <memory>
#include "qpid/broker/Broker.h"
-#include "qpid/io/Acceptor.h"
-#include "qpid/broker/Configuration.h"
-#include "qpid/QpidError.h"
-#include "qpid/broker/SessionHandlerFactoryImpl.h"
-#include "qpid/io/BlockingAPRAcceptor.h"
-#include "qpid/io/LFAcceptor.h"
using namespace qpid::broker;
using namespace qpid::io;
-namespace {
- Acceptor* createAcceptor(const Configuration& config){
- const string type(config.getAcceptor());
- if("blocking" == type){
- std::cout << "Using blocking acceptor " << std::endl;
- return new BlockingAPRAcceptor(config.isTrace(), config.getConnectionBacklog());
- }else if("non-blocking" == type){
- std::cout << "Using non-blocking acceptor " << std::endl;
- return new LFAcceptor(config.isTrace(),
- config.getConnectionBacklog(),
- config.getWorkerThreads(),
- config.getMaxConnections());
- }
- throw Configuration::ParseException("Unrecognised acceptor: " + type);
- }
-}
-
Broker::Broker(const Configuration& config) :
- acceptor(createAcceptor(config)),
- port(config.getPort()),
- isBound(false) {}
+ acceptor(new Acceptor(config.getPort(),
+ config.getConnectionBacklog(),
+ config.getWorkerThreads()))
+{ }
+
-Broker::shared_ptr Broker::create(int port)
+Broker::SharedPtr Broker::create(int16_t port)
{
Configuration config;
config.setPort(port);
return create(config);
}
-Broker::shared_ptr Broker::create(const Configuration& config) {
- return Broker::shared_ptr(new Broker(config));
+Broker::SharedPtr Broker::create(const Configuration& config) {
+ return Broker::SharedPtr(new Broker(config));
}
-int16_t Broker::bind()
-{
- if (!isBound) {
- port = acceptor->bind(port);
- }
- return port;
-}
-
void Broker::run() {
- bind();
acceptor->run(&factory);
}
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 8581093910..dd87c47909 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -19,47 +19,35 @@
*
*/
-#include "qpid/io/Acceptor.h"
#include "qpid/broker/Configuration.h"
-#include "qpid/concurrent/Runnable.h"
#include "qpid/broker/SessionHandlerFactoryImpl.h"
-#include <boost/noncopyable.hpp>
-#include <boost/shared_ptr.hpp>
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/io/Acceptor.h"
+#include <qpid/SharedObject.h>
namespace qpid {
namespace broker {
/**
* A broker instance.
*/
- class Broker : public qpid::concurrent::Runnable, private boost::noncopyable {
- Broker(const Configuration& config); // Private, use create()
- std::auto_ptr<qpid::io::Acceptor> acceptor;
- SessionHandlerFactoryImpl factory;
- int16_t port;
- bool isBound;
-
+ class Broker : public qpid::concurrent::Runnable,
+ public qpid::SharedObject<Broker>
+ {
public:
static const int16_t DEFAULT_PORT;
virtual ~Broker();
- typedef boost::shared_ptr<Broker> shared_ptr;
/**
* Create a broker.
* @param port Port to listen on or 0 to pick a port dynamically.
*/
- static shared_ptr create(int port = DEFAULT_PORT);
+ static SharedPtr create(int16_t port = DEFAULT_PORT);
/**
- * Create a broker from a Configuration.
+ * Create a broker using a Configuration.
*/
- static shared_ptr create(const Configuration& config);
-
- /**
- * Bind to the listening port.
- * @return The port number bound.
- */
- virtual int16_t bind();
+ static SharedPtr create(const Configuration& config);
/**
* Return listening port. If called before bind this is
@@ -67,7 +55,7 @@ namespace qpid {
* port, which will be different if the configured port is
* 0.
*/
- virtual int16_t getPort() { return port; }
+ virtual int16_t getPort() const { return acceptor->getPort(); }
/**
* Run the broker. Implements Runnable::run() so the broker
@@ -77,6 +65,11 @@ namespace qpid {
/** Shut down the broker */
virtual void shutdown();
+
+ private:
+ Broker(const Configuration& config);
+ qpid::io::Acceptor::SharedPtr acceptor;
+ SessionHandlerFactoryImpl factory;
};
}
}
diff --git a/cpp/src/qpid/broker/Channel.h b/cpp/src/qpid/broker/Channel.h
index f5aa0e45ed..13bd4cd450 100644
--- a/cpp/src/qpid/broker/Channel.h
+++ b/cpp/src/qpid/broker/Channel.h
@@ -37,7 +37,7 @@
#include "qpid/broker/TxAck.h"
#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/TxPublish.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/AMQContentBody.h"
#include "qpid/framing/AMQHeaderBody.h"
@@ -77,7 +77,7 @@ namespace qpid {
u_int32_t framesize;
NameGenerator tagGenerator;
std::list<DeliveryRecord> unacked;
- qpid::concurrent::MonitorImpl deliveryLock;
+ qpid::concurrent::Monitor deliveryLock;
TxBuffer txBuffer;
AccumulatedAck accumulatedAck;
TransactionalStore* store;
diff --git a/cpp/src/qpid/broker/Configuration.cpp b/cpp/src/qpid/broker/Configuration.cpp
index 2dcefd878d..550b283d62 100644
--- a/cpp/src/qpid/broker/Configuration.cpp
+++ b/cpp/src/qpid/broker/Configuration.cpp
@@ -24,10 +24,9 @@ using namespace std;
Configuration::Configuration() :
trace('t', "trace", "Print incoming & outgoing frames to the console (default=false)", false),
port('p', "port", "Sets the port to listen on (default=5672)", 5672),
- workerThreads("worker-threads", "Sets the number of worker threads to use (default=5). Only valid for non-blocking acceptor.", 5),
- maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500). Only valid for non-blocking acceptor.", 500),
+ workerThreads("worker-threads", "Sets the number of worker threads to use (default=5).", 5),
+ maxConnections("max-connections", "Sets the maximum number of connections the broker can accept (default=500).", 500),
connectionBacklog("connection-backlog", "Sets the connection backlog for the servers socket (default=10)", 10),
- acceptor('a', "acceptor", "Sets the acceptor to use. Currently only two values are recognised, blocking and non-blocking (which is the default)", "non-blocking"),
help("help", "Prints usage information", false)
{
options.push_back(&trace);
@@ -35,7 +34,6 @@ Configuration::Configuration() :
options.push_back(&workerThreads);
options.push_back(&maxConnections);
options.push_back(&connectionBacklog);
- options.push_back(&acceptor);
options.push_back(&help);
}
@@ -85,10 +83,6 @@ int Configuration::getConnectionBacklog() const {
return connectionBacklog.getValue();
}
-string Configuration::getAcceptor() const {
- return acceptor.getValue();
-}
-
Configuration::Option::Option(const char _flag, const string& _name, const string& _desc) :
flag(string("-") + _flag), name("--" +_name), desc(_desc) {}
diff --git a/cpp/src/qpid/broker/Configuration.h b/cpp/src/qpid/broker/Configuration.h
index 61ecc89ed9..e1e7c40947 100644
--- a/cpp/src/qpid/broker/Configuration.h
+++ b/cpp/src/qpid/broker/Configuration.h
@@ -92,7 +92,6 @@ namespace qpid {
IntOption workerThreads;
IntOption maxConnections;
IntOption connectionBacklog;
- StringOption acceptor;
BoolOption help;
typedef std::vector<Option*>::iterator op_iterator;
@@ -116,7 +115,6 @@ namespace qpid {
int getWorkerThreads() const;
int getMaxConnections() const;
int getConnectionBacklog() const;
- std::string getAcceptor() const;
void setHelp(bool b) { help.setValue(b); }
void setTrace(bool b) { trace.setValue(b); }
@@ -124,7 +122,6 @@ namespace qpid {
void setWorkerThreads(int i) { workerThreads.setValue(i); }
void setMaxConnections(int i) { maxConnections.setValue(i); }
void setConnectionBacklog(int i) { connectionBacklog.setValue(i); }
- void setAcceptor(const std::string& val) { acceptor.setValue(val); }
void usage();
};
diff --git a/cpp/src/qpid/broker/DirectExchange.h b/cpp/src/qpid/broker/DirectExchange.h
index 5c5f78d90a..2c3143cd3c 100644
--- a/cpp/src/qpid/broker/DirectExchange.h
+++ b/cpp/src/qpid/broker/DirectExchange.h
@@ -23,14 +23,14 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
namespace broker {
class DirectExchange : public virtual Exchange{
std::map<string, std::vector<Queue::shared_ptr> > bindings;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index fca5462e72..c574a97e14 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -20,7 +20,7 @@
#include <map>
#include "qpid/broker/Exchange.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
namespace qpid {
namespace broker {
@@ -29,7 +29,7 @@ namespace broker {
class ExchangeRegistry{
typedef std::map<string, Exchange::shared_ptr> ExchangeMap;
ExchangeMap exchanges;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
std::pair<Exchange::shared_ptr, bool> declare(const string& name, const string& type) throw(UnknownExchangeTypeException);
void destroy(const string& name);
diff --git a/cpp/src/qpid/broker/FanOutExchange.h b/cpp/src/qpid/broker/FanOutExchange.h
index 334f1ccdcc..83fcdb9b34 100644
--- a/cpp/src/qpid/broker/FanOutExchange.h
+++ b/cpp/src/qpid/broker/FanOutExchange.h
@@ -23,7 +23,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -31,7 +31,7 @@ namespace broker {
class FanOutExchange : public virtual Exchange {
std::vector<Queue::shared_ptr> bindings;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/HeadersExchange.h b/cpp/src/qpid/broker/HeadersExchange.h
index 2e2403361e..cf699ac455 100644
--- a/cpp/src/qpid/broker/HeadersExchange.h
+++ b/cpp/src/qpid/broker/HeadersExchange.h
@@ -22,7 +22,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -34,7 +34,7 @@ class HeadersExchange : public virtual Exchange {
typedef std::vector<Binding> Bindings;
Bindings bindings;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 962c74864e..e96cc65b95 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Message.h"
#include <iostream>
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 67fb6764be..88dad7aaf9 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -17,7 +17,7 @@
*/
#include "qpid/broker/Queue.h"
#include "qpid/broker/MessageStore.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include <iostream>
using namespace qpid::broker;
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 93570f59cc..f954e48c20 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -27,7 +27,7 @@
#include "qpid/broker/ConnectionToken.h"
#include "qpid/broker/Consumer.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
namespace qpid {
namespace broker {
@@ -56,7 +56,7 @@ namespace qpid {
bool queueing;
bool dispatching;
int next;
- mutable qpid::concurrent::MonitorImpl lock;
+ mutable qpid::concurrent::Monitor lock;
apr_time_t lastUsed;
Consumer* exclusive;
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index 973201fe64..949c194bbe 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -16,7 +16,7 @@
*
*/
#include "qpid/broker/QueueRegistry.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/SessionHandlerImpl.h"
#include <sstream>
#include <assert.h>
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index 6f80291192..4f9e4b882a 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -19,7 +19,7 @@
#define _QueueRegistry_
#include <map>
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -77,7 +77,7 @@ class QueueRegistry{
private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
QueueMap queues;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
int counter;
};
diff --git a/cpp/src/qpid/broker/TopicExchange.h b/cpp/src/qpid/broker/TopicExchange.h
index 19ea732fbc..cb773b9a56 100644
--- a/cpp/src/qpid/broker/TopicExchange.h
+++ b/cpp/src/qpid/broker/TopicExchange.h
@@ -23,7 +23,7 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Message.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/broker/Queue.h"
namespace qpid {
@@ -71,7 +71,7 @@ class TopicPattern : public Tokens
class TopicExchange : public virtual Exchange{
typedef std::map<TopicPattern, Queue::vector> BindingMap;
BindingMap bindings;
- qpid::concurrent::MonitorImpl lock;
+ qpid::concurrent::Monitor lock;
public:
static const std::string typeName;
diff --git a/cpp/src/qpid/client/Channel.cpp b/cpp/src/qpid/client/Channel.cpp
index 4d994f0510..4579b6126d 100644
--- a/cpp/src/qpid/client/Channel.cpp
+++ b/cpp/src/qpid/client/Channel.cpp
@@ -16,8 +16,8 @@
*
*/
#include "qpid/client/Channel.h"
-#include "qpid/concurrent/MonitorImpl.h"
-#include "qpid/concurrent/ThreadFactoryImpl.h"
+#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/ThreadFactory.h"
#include "qpid/client/Message.h"
#include "qpid/QpidError.h"
@@ -36,9 +36,9 @@ Channel::Channel(bool _transactional, u_int16_t _prefetch) :
prefetch(_prefetch),
transactional(_transactional)
{
- threadFactory = new ThreadFactoryImpl();
- dispatchMonitor = new MonitorImpl();
- retrievalMonitor = new MonitorImpl();
+ threadFactory = new ThreadFactory();
+ dispatchMonitor = new Monitor();
+ retrievalMonitor = new Monitor();
}
Channel::~Channel(){
diff --git a/cpp/src/qpid/client/Connection.cpp b/cpp/src/qpid/client/Connection.cpp
index b20df92e9b..acd4488813 100644
--- a/cpp/src/qpid/client/Connection.cpp
+++ b/cpp/src/qpid/client/Connection.cpp
@@ -17,7 +17,7 @@
*/
#include "qpid/client/Connection.h"
#include "qpid/client/Channel.h"
-#include "qpid/io/ConnectorImpl.h"
+#include "qpid/io/Connector.h"
#include "qpid/client/Message.h"
#include "qpid/QpidError.h"
#include <iostream>
@@ -30,7 +30,7 @@ using namespace qpid::concurrent;
u_int16_t Connection::channelIdCounter;
Connection::Connection(bool debug, u_int32_t _max_frame_size) : max_frame_size(_max_frame_size), closed(true){
- connector = new ConnectorImpl(debug, _max_frame_size);
+ connector = new Connector(debug, _max_frame_size);
}
Connection::~Connection(){
diff --git a/cpp/src/qpid/client/ResponseHandler.cpp b/cpp/src/qpid/client/ResponseHandler.cpp
index ec20dd1a10..fcbc76f625 100644
--- a/cpp/src/qpid/client/ResponseHandler.cpp
+++ b/cpp/src/qpid/client/ResponseHandler.cpp
@@ -16,11 +16,11 @@
*
*/
#include "qpid/client/ResponseHandler.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/QpidError.h"
qpid::client::ResponseHandler::ResponseHandler() : waiting(false){
- monitor = new qpid::concurrent::MonitorImpl();
+ monitor = new qpid::concurrent::Monitor();
}
qpid::client::ResponseHandler::~ResponseHandler(){
diff --git a/cpp/src/qpid/concurrent/APRMonitor.h b/cpp/src/qpid/concurrent/APRMonitor.h
deleted file mode 100644
index a396beab50..0000000000
--- a/cpp/src/qpid/concurrent/APRMonitor.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _APRMonitor_
-#define _APRMonitor_
-
-#include "apr-1/apr_thread_mutex.h"
-#include "apr-1/apr_thread_cond.h"
-#include "qpid/concurrent/Monitor.h"
-
-namespace qpid {
-namespace concurrent {
-
- class APRMonitor : public virtual Monitor
- {
- apr_pool_t* pool;
- apr_thread_mutex_t* mutex;
- apr_thread_cond_t* condition;
-
- public:
- APRMonitor();
- virtual ~APRMonitor();
- virtual void wait();
- virtual void wait(u_int64_t time);
- virtual void notify();
- virtual void notifyAll();
- virtual void acquire();
- virtual void release();
- };
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/APRThread.h b/cpp/src/qpid/concurrent/APRThread.h
deleted file mode 100644
index 6328765a06..0000000000
--- a/cpp/src/qpid/concurrent/APRThread.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _APRThread_
-#define _APRThread_
-
-#include "apr-1/apr_thread_proc.h"
-#include "qpid/concurrent/APRThread.h"
-#include "qpid/concurrent/Runnable.h"
-#include "qpid/concurrent/Thread.h"
-
-namespace qpid {
-namespace concurrent {
-
- class APRThread : public Thread
- {
- const Runnable* runnable;
- apr_pool_t* pool;
- apr_thread_t* runner;
-
- public:
- APRThread(apr_pool_t* pool, Runnable* runnable);
- virtual ~APRThread();
- virtual void start();
- virtual void join();
- virtual void interrupt();
- static unsigned int currentThread();
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/APRThreadFactory.h b/cpp/src/qpid/concurrent/APRThreadFactory.h
deleted file mode 100644
index 40e96fc2d1..0000000000
--- a/cpp/src/qpid/concurrent/APRThreadFactory.h
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _APRThreadFactory_
-#define _APRThreadFactory_
-
-#include "apr-1/apr_thread_proc.h"
-
-#include "qpid/concurrent/APRThread.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/concurrent/Runnable.h"
-
-namespace qpid {
-namespace concurrent {
-
- class APRThreadFactory : public virtual ThreadFactory
- {
- apr_pool_t* pool;
- public:
- APRThreadFactory();
- virtual ~APRThreadFactory();
- virtual Thread* create(Runnable* runnable);
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/APRThreadPool.h b/cpp/src/qpid/concurrent/APRThreadPool.h
deleted file mode 100644
index cab5bcc9ce..0000000000
--- a/cpp/src/qpid/concurrent/APRThreadPool.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _APRThreadPool_
-#define _APRThreadPool_
-
-#include <queue>
-#include <vector>
-#include "qpid/concurrent/APRMonitor.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/concurrent/ThreadPool.h"
-#include "qpid/concurrent/Runnable.h"
-
-namespace qpid {
-namespace concurrent {
-
- class APRThreadPool : public virtual ThreadPool
- {
- class Worker : public virtual Runnable{
- APRThreadPool* pool;
- public:
- inline Worker(APRThreadPool* _pool) : pool(_pool){}
- inline virtual void run(){
- while(pool->running){
- pool->runTask();
- }
- }
- };
- const bool deleteFactory;
- const int size;
- ThreadFactory* factory;
- APRMonitor lock;
- std::vector<Thread*> threads;
- std::queue<Runnable*> tasks;
- Worker* worker;
- volatile bool running;
-
- void runTask();
- public:
- APRThreadPool(int size);
- APRThreadPool(int size, ThreadFactory* factory);
- virtual void start();
- virtual void stop();
- virtual void addTask(Runnable* task);
- virtual ~APRThreadPool();
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/APRMonitor.cpp b/cpp/src/qpid/concurrent/Monitor.cpp
index cc5eda800f..ae68cf8751 100644
--- a/cpp/src/qpid/concurrent/APRMonitor.cpp
+++ b/cpp/src/qpid/concurrent/Monitor.cpp
@@ -16,45 +16,45 @@
*
*/
#include "qpid/concurrent/APRBase.h"
-#include "qpid/concurrent/APRMonitor.h"
+#include "qpid/concurrent/Monitor.h"
#include <iostream>
-qpid::concurrent::APRMonitor::APRMonitor(){
+qpid::concurrent::Monitor::Monitor(){
APRBase::increment();
CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
CHECK_APR_SUCCESS(apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_NESTED, pool));
CHECK_APR_SUCCESS(apr_thread_cond_create(&condition, pool));
}
-qpid::concurrent::APRMonitor::~APRMonitor(){
+qpid::concurrent::Monitor::~Monitor(){
CHECK_APR_SUCCESS(apr_thread_cond_destroy(condition));
CHECK_APR_SUCCESS(apr_thread_mutex_destroy(mutex));
apr_pool_destroy(pool);
APRBase::decrement();
}
-void qpid::concurrent::APRMonitor::wait(){
+void qpid::concurrent::Monitor::wait(){
CHECK_APR_SUCCESS(apr_thread_cond_wait(condition, mutex));
}
-void qpid::concurrent::APRMonitor::wait(u_int64_t time){
+void qpid::concurrent::Monitor::wait(u_int64_t time){
apr_status_t status = apr_thread_cond_timedwait(condition, mutex, time * 1000);
if(!status == APR_TIMEUP) CHECK_APR_SUCCESS(status);
}
-void qpid::concurrent::APRMonitor::notify(){
+void qpid::concurrent::Monitor::notify(){
CHECK_APR_SUCCESS(apr_thread_cond_signal(condition));
}
-void qpid::concurrent::APRMonitor::notifyAll(){
+void qpid::concurrent::Monitor::notifyAll(){
CHECK_APR_SUCCESS(apr_thread_cond_broadcast(condition));
}
-void qpid::concurrent::APRMonitor::acquire(){
+void qpid::concurrent::Monitor::acquire(){
CHECK_APR_SUCCESS(apr_thread_mutex_lock(mutex));
}
-void qpid::concurrent::APRMonitor::release(){
+void qpid::concurrent::Monitor::release(){
CHECK_APR_SUCCESS(apr_thread_mutex_unlock(mutex));
}
diff --git a/cpp/src/qpid/concurrent/Monitor.h b/cpp/src/qpid/concurrent/Monitor.h
index 42e88c0a48..a2777cb2f1 100644
--- a/cpp/src/qpid/concurrent/Monitor.h
+++ b/cpp/src/qpid/concurrent/Monitor.h
@@ -18,42 +18,39 @@
#ifndef _Monitor_
#define _Monitor_
-#include "qpid/framing/amqp_types.h"
+#include "apr-1/apr_thread_mutex.h"
+#include "apr-1/apr_thread_cond.h"
+#include "qpid/concurrent/Monitor.h"
namespace qpid {
namespace concurrent {
class Monitor
{
+ apr_pool_t* pool;
+ apr_thread_mutex_t* mutex;
+ apr_thread_cond_t* condition;
+
public:
- virtual ~Monitor(){}
- virtual void wait() = 0;
- virtual void wait(u_int64_t time) = 0;
- virtual void notify() = 0;
- virtual void notifyAll() = 0;
- virtual void acquire() = 0;
- virtual void release() = 0;
+ Monitor();
+ virtual ~Monitor();
+ virtual void wait();
+ virtual void wait(u_int64_t time);
+ virtual void notify();
+ virtual void notifyAll();
+ virtual void acquire();
+ virtual void release();
};
-/**
- * Scoped locker for a monitor.
- */
class Locker
{
public:
- Locker(Monitor& lock_) : lock(lock_) { lock.acquire(); }
- ~Locker() { lock.release(); }
-
+ Locker(Monitor& monitor_) : monitor(monitor_) { monitor.acquire(); }
+ ~Locker() { monitor.release(); }
private:
- Monitor& lock;
-
- // private and unimplemented to prevent copying
- Locker(const Locker&);
- void operator=(const Locker&);
+ Monitor& monitor;
};
-
-}
-}
+}}
#endif
diff --git a/cpp/src/qpid/concurrent/MonitorImpl.h b/cpp/src/qpid/concurrent/MonitorImpl.h
deleted file mode 100644
index 258ad140b3..0000000000
--- a/cpp/src/qpid/concurrent/MonitorImpl.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-
-#ifndef _MonitorImpl_
-#define _MonitorImpl_
-
-#ifdef _USE_APR_IO_
-#include "qpid/concurrent/APRMonitor.h"
-#else /* use POSIX Monitor */
-#include "qpid/concurrent/LMonitor.h"
-#endif
-
-
-namespace qpid {
-namespace concurrent {
-
-#ifdef _USE_APR_IO_
- class MonitorImpl : public virtual APRMonitor
- {
-
- public:
- MonitorImpl() : APRMonitor(){};
- virtual ~MonitorImpl(){};
-
- };
-#else
- class MonitorImpl : public virtual LMonitor
- {
-
- public:
- MonitorImpl() : LMonitor(){};
- virtual ~MonitorImpl(){};
-
- };
-#endif
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/APRThread.cpp b/cpp/src/qpid/concurrent/Thread.cpp
index d4d073cac6..9bbc2f8131 100644
--- a/cpp/src/qpid/concurrent/APRThread.cpp
+++ b/cpp/src/qpid/concurrent/Thread.cpp
@@ -16,7 +16,7 @@
*
*/
#include "qpid/concurrent/APRBase.h"
-#include "qpid/concurrent/APRThread.h"
+#include "qpid/concurrent/Thread.h"
#include "apr-1/apr_portable.h"
using namespace qpid::concurrent;
@@ -27,24 +27,24 @@ void* APR_THREAD_FUNC ExecRunnable(apr_thread_t* thread, void *data){
return NULL;
}
-APRThread::APRThread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {}
+Thread::Thread(apr_pool_t* _pool, Runnable* _runnable) : runnable(_runnable), pool(_pool), runner(0) {}
-APRThread::~APRThread(){
+Thread::~Thread(){
}
-void APRThread::start(){
+void Thread::start(){
CHECK_APR_SUCCESS(apr_thread_create(&runner, NULL, ExecRunnable,(void*) runnable, pool));
}
-void APRThread::join(){
+void Thread::join(){
apr_status_t status;
if (runner) CHECK_APR_SUCCESS(apr_thread_join(&status, runner));
}
-void APRThread::interrupt(){
+void Thread::interrupt(){
if (runner) CHECK_APR_SUCCESS(apr_thread_exit(runner, APR_SUCCESS));
}
-unsigned int qpid::concurrent::APRThread::currentThread(){
+unsigned int qpid::concurrent::Thread::currentThread(){
return apr_os_thread_current();
}
diff --git a/cpp/src/qpid/concurrent/Thread.h b/cpp/src/qpid/concurrent/Thread.h
index 6bd2a379ce..d18bc153bf 100644
--- a/cpp/src/qpid/concurrent/Thread.h
+++ b/cpp/src/qpid/concurrent/Thread.h
@@ -18,16 +18,27 @@
#ifndef _Thread_
#define _Thread_
+#include "apr-1/apr_thread_proc.h"
+#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/concurrent/Thread.h"
+
namespace qpid {
namespace concurrent {
class Thread
{
+ const Runnable* runnable;
+ apr_pool_t* pool;
+ apr_thread_t* runner;
+
public:
- virtual ~Thread(){}
- virtual void start() = 0;
- virtual void join() = 0;
- virtual void interrupt() = 0;
+ Thread(apr_pool_t* pool, Runnable* runnable);
+ virtual ~Thread();
+ virtual void start();
+ virtual void join();
+ virtual void interrupt();
+ static unsigned int currentThread();
};
}
diff --git a/cpp/src/qpid/concurrent/APRThreadFactory.cpp b/cpp/src/qpid/concurrent/ThreadFactory.cpp
index 1c99a3da33..b20f9f2b04 100644
--- a/cpp/src/qpid/concurrent/APRThreadFactory.cpp
+++ b/cpp/src/qpid/concurrent/ThreadFactory.cpp
@@ -16,20 +16,20 @@
*
*/
#include "qpid/concurrent/APRBase.h"
-#include "qpid/concurrent/APRThreadFactory.h"
+#include "qpid/concurrent/ThreadFactory.h"
using namespace qpid::concurrent;
-APRThreadFactory::APRThreadFactory(){
+ThreadFactory::ThreadFactory(){
APRBase::increment();
CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
}
-APRThreadFactory::~APRThreadFactory(){
+ThreadFactory::~ThreadFactory(){
apr_pool_destroy(pool);
APRBase::decrement();
}
-Thread* APRThreadFactory::create(Runnable* runnable){
- return new APRThread(pool, runnable);
+Thread* ThreadFactory::create(Runnable* runnable){
+ return new Thread(pool, runnable);
}
diff --git a/cpp/src/qpid/concurrent/ThreadFactory.h b/cpp/src/qpid/concurrent/ThreadFactory.h
index 60c8ad2556..572419cae6 100644
--- a/cpp/src/qpid/concurrent/ThreadFactory.h
+++ b/cpp/src/qpid/concurrent/ThreadFactory.h
@@ -18,7 +18,11 @@
#ifndef _ThreadFactory_
#define _ThreadFactory_
+#include "apr-1/apr_thread_proc.h"
+
+#include "qpid/concurrent/Thread.h"
#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
#include "qpid/concurrent/Runnable.h"
namespace qpid {
@@ -26,9 +30,11 @@ namespace concurrent {
class ThreadFactory
{
+ apr_pool_t* pool;
public:
- virtual ~ThreadFactory(){}
- virtual Thread* create(Runnable* runnable) = 0;
+ ThreadFactory();
+ virtual ~ThreadFactory();
+ virtual Thread* create(Runnable* runnable);
};
}
diff --git a/cpp/src/qpid/concurrent/ThreadFactoryImpl.h b/cpp/src/qpid/concurrent/ThreadFactoryImpl.h
deleted file mode 100644
index 352b77ac21..0000000000
--- a/cpp/src/qpid/concurrent/ThreadFactoryImpl.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _ThreadFactoryImpl_
-#define _ThreadFactoryImpl_
-
-
-#ifdef _USE_APR_IO_
-#include "qpid/concurrent/APRThreadFactory.h"
-#else
-#include "qpid/concurrent/LThreadFactory.h"
-#endif
-
-
-namespace qpid {
-namespace concurrent {
-
-
-#ifdef _USE_APR_IO_
- class ThreadFactoryImpl : public virtual APRThreadFactory
- {
- public:
- ThreadFactoryImpl(): APRThreadFactory() {};
- virtual ~ThreadFactoryImpl() {};
- };
-#else
- class ThreadFactoryImpl : public virtual LThreadFactory
- {
- public:
- ThreadFactoryImpl(): LThreadFactory() {};
- virtual ~ThreadFactoryImpl() {};
- };
-#endif
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/APRThreadPool.cpp b/cpp/src/qpid/concurrent/ThreadPool.cpp
index 3222c71b0c..5da19745a7 100644
--- a/cpp/src/qpid/concurrent/APRThreadPool.cpp
+++ b/cpp/src/qpid/concurrent/ThreadPool.cpp
@@ -15,33 +15,33 @@
* limitations under the License.
*
*/
-#include "qpid/concurrent/APRThreadFactory.h"
-#include "qpid/concurrent/APRThreadPool.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/concurrent/ThreadPool.h"
#include "qpid/QpidError.h"
#include <iostream>
using namespace qpid::concurrent;
-APRThreadPool::APRThreadPool(int _size) : deleteFactory(true), size(_size), factory(new APRThreadFactory()), running(false){
+ThreadPool::ThreadPool(int _size) : deleteFactory(true), size(_size), factory(new ThreadFactory()), running(false){
worker = new Worker(this);
}
-APRThreadPool::APRThreadPool(int _size, ThreadFactory* _factory) : deleteFactory(false), size(_size), factory(_factory), running(false){
+ThreadPool::ThreadPool(int _size, ThreadFactory* _factory) : deleteFactory(false), size(_size), factory(_factory), running(false){
worker = new Worker(this);
}
-APRThreadPool::~APRThreadPool(){
+ThreadPool::~ThreadPool(){
if(deleteFactory) delete factory;
}
-void APRThreadPool::addTask(Runnable* task){
+void ThreadPool::addTask(Runnable* task){
lock.acquire();
tasks.push(task);
lock.notifyAll();
lock.release();
}
-void APRThreadPool::runTask(){
+void ThreadPool::runTask(){
lock.acquire();
while(tasks.empty()){
lock.wait();
@@ -56,7 +56,7 @@ void APRThreadPool::runTask(){
}
}
-void APRThreadPool::start(){
+void ThreadPool::start(){
if(!running){
running = true;
for(int i = 0; i < size; i++){
@@ -67,7 +67,7 @@ void APRThreadPool::start(){
}
}
-void APRThreadPool::stop(){
+void ThreadPool::stop(){
if(!running){
running = false;
lock.acquire();
diff --git a/cpp/src/qpid/concurrent/ThreadPool.h b/cpp/src/qpid/concurrent/ThreadPool.h
index 925faa76de..11f0cc364f 100644
--- a/cpp/src/qpid/concurrent/ThreadPool.h
+++ b/cpp/src/qpid/concurrent/ThreadPool.h
@@ -18,7 +18,12 @@
#ifndef _ThreadPool_
#define _ThreadPool_
+#include <queue>
+#include <vector>
+#include "qpid/concurrent/Monitor.h"
#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/concurrent/ThreadPool.h"
#include "qpid/concurrent/Runnable.h"
namespace qpid {
@@ -26,11 +31,33 @@ namespace concurrent {
class ThreadPool
{
+ class Worker : public virtual Runnable{
+ ThreadPool* pool;
+ public:
+ inline Worker(ThreadPool* _pool) : pool(_pool){}
+ inline virtual void run(){
+ while(pool->running){
+ pool->runTask();
+ }
+ }
+ };
+ const bool deleteFactory;
+ const int size;
+ ThreadFactory* factory;
+ Monitor lock;
+ std::vector<Thread*> threads;
+ std::queue<Runnable*> tasks;
+ Worker* worker;
+ volatile bool running;
+
+ void runTask();
public:
- virtual void start() = 0;
- virtual void stop() = 0;
- virtual void addTask(Runnable* runnable) = 0;
- virtual ~ThreadPool(){}
+ ThreadPool(int size);
+ ThreadPool(int size, ThreadFactory* factory);
+ virtual void start();
+ virtual void stop();
+ virtual void addTask(Runnable* task);
+ virtual ~ThreadPool();
};
}
diff --git a/cpp/src/qpid/framing/InputHandler.cpp b/cpp/src/qpid/framing/InputHandler.cpp
deleted file mode 100644
index accf68421a..0000000000
--- a/cpp/src/qpid/framing/InputHandler.cpp
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/framing/InputHandler.h"
-
-qpid::framing::InputHandler::~InputHandler() {}
diff --git a/cpp/src/qpid/framing/InputHandler.h b/cpp/src/qpid/framing/InputHandler.h
index e2ad545993..8f56d176b8 100644
--- a/cpp/src/qpid/framing/InputHandler.h
+++ b/cpp/src/qpid/framing/InputHandler.h
@@ -1,3 +1,5 @@
+#ifndef _InputHandler_
+#define _InputHandler_
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -15,24 +17,19 @@
* limitations under the License.
*
*/
-#include <string>
-
-#ifndef _InputHandler_
-#define _InputHandler_
+#include <qpid/SharedObject.h>
#include "qpid/framing/AMQFrame.h"
namespace qpid {
namespace framing {
- class InputHandler{
- public:
- virtual ~InputHandler();
- virtual void received(AMQFrame* frame) = 0;
- };
+class InputHandler : public qpid::SharedObject<InputHandler> {
+ public:
+ virtual void received(AMQFrame* frame) = 0;
+};
-}
-}
+}}
#endif
diff --git a/cpp/src/qpid/framing/OutputHandler.cpp b/cpp/src/qpid/framing/OutputHandler.cpp
deleted file mode 100644
index 22de39b82a..0000000000
--- a/cpp/src/qpid/framing/OutputHandler.cpp
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "qpid/framing/OutputHandler.h"
-
-qpid::framing::OutputHandler::~OutputHandler() {}
diff --git a/cpp/src/qpid/framing/OutputHandler.h b/cpp/src/qpid/framing/OutputHandler.h
index ed38a321e5..16fb7e8afb 100644
--- a/cpp/src/qpid/framing/OutputHandler.h
+++ b/cpp/src/qpid/framing/OutputHandler.h
@@ -1,3 +1,6 @@
+#ifndef _OutputHandler_
+#define _OutputHandler_
+
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -15,24 +18,18 @@
* limitations under the License.
*
*/
-#include <string>
-
-#ifndef _OutputHandler_
-#define _OutputHandler_
-
+#include <qpid/SharedObject.h>
#include "qpid/framing/AMQFrame.h"
namespace qpid {
namespace framing {
- class OutputHandler{
- public:
- virtual ~OutputHandler();
- virtual void send(AMQFrame* frame) = 0;
- };
+class OutputHandler : public qpid::SharedObject<OutputHandler> {
+ public:
+ virtual void send(AMQFrame* frame) = 0;
+};
-}
-}
+}}
#endif
diff --git a/cpp/src/qpid/io/APRConnector.h b/cpp/src/qpid/io/APRConnector.h
deleted file mode 100644
index c835f30056..0000000000
--- a/cpp/src/qpid/io/APRConnector.h
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _APRConnector_
-#define _APRConnector_
-
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_time.h"
-
-#include "qpid/framing/InputHandler.h"
-#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/InitiationHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/io/ShutdownHandler.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/io/Connector.h"
-#include "qpid/concurrent/APRMonitor.h"
-
-namespace qpid {
-namespace io {
-
- class APRConnector : public virtual qpid::framing::OutputHandler,
- public virtual Connector,
- private virtual qpid::concurrent::Runnable
- {
- const bool debug;
- const int receive_buffer_size;
- const int send_buffer_size;
-
- bool closed;
-
- apr_time_t lastIn;
- apr_time_t lastOut;
- apr_interval_time_t timeout;
- u_int32_t idleIn;
- u_int32_t idleOut;
-
- TimeoutHandler* timeoutHandler;
- ShutdownHandler* shutdownHandler;
- qpid::framing::InputHandler* input;
- qpid::framing::InitiationHandler* initialiser;
- qpid::framing::OutputHandler* output;
-
- qpid::framing::Buffer inbuf;
- qpid::framing::Buffer outbuf;
-
- qpid::concurrent::APRMonitor* writeLock;
- qpid::concurrent::ThreadFactory* threadFactory;
- qpid::concurrent::Thread* receiver;
-
- apr_pool_t* pool;
- apr_socket_t* socket;
-
- void checkIdle(apr_status_t status);
- void writeBlock(qpid::framing::AMQDataBlock* data);
- void writeToSocket(char* data, size_t available);
- void setSocketTimeout();
-
- void run();
-
- public:
- APRConnector(bool debug = false, u_int32_t buffer_size = 1024);
- virtual ~APRConnector();
- virtual void connect(const std::string& host, int port);
- virtual void init(qpid::framing::ProtocolInitiation* header);
- virtual void close();
- virtual void setInputHandler(qpid::framing::InputHandler* handler);
- virtual void setTimeoutHandler(TimeoutHandler* handler);
- virtual void setShutdownHandler(ShutdownHandler* handler);
- virtual qpid::framing::OutputHandler* getOutputHandler();
- virtual void send(qpid::framing::AMQFrame* frame);
- virtual void setReadTimeout(u_int16_t timeout);
- virtual void setWriteTimeout(u_int16_t timeout);
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/concurrent/LMonitor.h b/cpp/src/qpid/io/APRPool.cpp
index 70e99b9807..edd434f16c 100644
--- a/cpp/src/qpid/concurrent/LMonitor.h
+++ b/cpp/src/qpid/io/APRPool.cpp
@@ -15,30 +15,25 @@
* limitations under the License.
*
*/
-#ifndef _LMonitor_
-#define _LMonitor_
-/* Native Linux Monitor - Based of Kernel patch 19/20 */
+#include "APRPool.h"
+#include "qpid/concurrent/APRBase.h"
+#include <boost/pool/singleton_pool.hpp>
-#include "qpid/concurrent/Monitor.h"
+using namespace qpid::io;
+using namespace qpid::concurrent;
-namespace qpid {
-namespace concurrent {
-
- class LMonitor : public virtual Monitor
- {
-
- public:
- LMonitor();
- virtual ~LMonitor();
- virtual void wait();
- virtual void notify();
- virtual void notifyAll();
- virtual void acquire();
- virtual void release();
- };
+APRPool::APRPool(){
+ APRBase::increment();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
}
+
+APRPool::~APRPool(){
+ apr_pool_destroy(pool);
+ APRBase::decrement();
}
+apr_pool_t* APRPool::get() {
+ return boost::details::pool::singleton_default<APRPool>::instance().pool;
+}
-#endif
diff --git a/cpp/src/qpid/concurrent/LThreadFactory.h b/cpp/src/qpid/io/APRPool.h
index 4a573d1bd1..063eedf1ee 100644
--- a/cpp/src/qpid/concurrent/LThreadFactory.h
+++ b/cpp/src/qpid/io/APRPool.h
@@ -1,3 +1,6 @@
+#ifndef _APRPool_
+#define _APRPool_
+
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -15,23 +18,30 @@
* limitations under the License.
*
*/
-#ifndef _LAPRThreadFactory_
-#define _LAPRThreadFactory_
-
+#include <boost/noncopyable.hpp>
+#include <apr-1/apr_pools.h>
namespace qpid {
-namespace concurrent {
+namespace io {
+/**
+ * Singleton APR memory pool.
+ */
+class APRPool : private boost::noncopyable {
+ public:
+ APRPool();
+ ~APRPool();
+
+ /** Get singleton instance */
+ static apr_pool_t* get();
+
+ private:
+ apr_pool_t* pool;
+};
+
+}}
+
- class LThreadFactory
- {
- public:
- LThreadFactory();
- virtual ~LThreadFactory();
- virtual Thread* create(Runnable* runnable);
- };
-}
-}
-#endif
+#endif /*!_APRPool_*/
diff --git a/cpp/src/qpid/io/Acceptor.cpp b/cpp/src/qpid/io/Acceptor.cpp
index 6b76bd4da2..f95d9448cf 100644
--- a/cpp/src/qpid/io/Acceptor.cpp
+++ b/cpp/src/qpid/io/Acceptor.cpp
@@ -15,7 +15,64 @@
* limitations under the License.
*
*/
-
#include "qpid/io/Acceptor.h"
+#include "qpid/concurrent/APRBase.h"
+#include "APRPool.h"
+
+using namespace qpid::concurrent;
+using namespace qpid::io;
+
+Acceptor::Acceptor(int16_t port_, int backlog, int threads) :
+ port(port_),
+ processor(APRPool::get(), threads, 1000, 5000000)
+{
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
+ CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
+ CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
+}
+
+int16_t Acceptor::getPort() const {
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
+ return address->port;
+}
+
+void Acceptor::run(SessionHandlerFactory* factory) {
+ running = true;
+ processor.start();
+ std::cout << "Listening on port " << getPort() << "..." << std::endl;
+ while(running){
+ apr_socket_t* client;
+ apr_status_t status = apr_socket_accept(&client, socket, APRPool::get());
+ if(status == APR_SUCCESS){
+ //make this socket non-blocking:
+ CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
+ LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false);
+ session->init(factory->create(session));
+ }else{
+ running = false;
+ if(status != APR_EINTR){
+ std::cout << "ERROR: " << get_desc(status) << std::endl;
+ }
+ }
+ }
+ shutdown();
+}
+
+void Acceptor::shutdown() {
+ // TODO aconway 2006-10-12: Cleanup, this is not thread safe.
+ if (running) {
+ running = false;
+ processor.stop();
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ }
+}
+
-qpid::io::Acceptor::~Acceptor() {}
diff --git a/cpp/src/qpid/io/Acceptor.h b/cpp/src/qpid/io/Acceptor.h
index a7f7ad66f0..bc189f7f6e 100644
--- a/cpp/src/qpid/io/Acceptor.h
+++ b/cpp/src/qpid/io/Acceptor.h
@@ -15,36 +15,43 @@
* limitations under the License.
*
*/
-#ifndef _Acceptor_
-#define _Acceptor_
-
+#ifndef _LFAcceptor_
+#define _LFAcceptor_
+
+#include "apr-1/apr_network_io.h"
+#include "apr-1/apr_poll.h"
+#include "apr-1/apr_time.h"
+
+#include "qpid/io/Acceptor.h"
+#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/concurrent/ThreadPool.h"
+#include "qpid/io/LFProcessor.h"
+#include "qpid/io/LFSessionContext.h"
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/io/SessionContext.h"
#include "qpid/io/SessionHandlerFactory.h"
+#include "qpid/concurrent/Thread.h"
+#include <qpid/SharedObject.h>
namespace qpid {
namespace io {
- class Acceptor
- {
- public:
- /**
- * Bind to port.
- * @param port Port to bind to, 0 to bind to dynamically chosen port.
- * @return The local bound port.
- */
- virtual int16_t bind(int16_t port) = 0;
-
- /**
- * Run the acceptor.
- */
- virtual void run(SessionHandlerFactory* factory) = 0;
-
- /**
- * Shut down the acceptor.
- */
- virtual void shutdown() = 0;
-
- virtual ~Acceptor();
- };
+/** APR Acceptor. */
+class Acceptor : public qpid::SharedObject<Acceptor>
+{
+ public:
+ Acceptor(int16_t port, int backlog, int threads);
+ virtual int16_t getPort() const;
+ virtual void run(SessionHandlerFactory* factory);
+ virtual void shutdown();
+
+ private:
+ int16_t port;
+ LFProcessor processor;
+ apr_socket_t* socket;
+ volatile bool running;
+};
}
}
diff --git a/cpp/src/qpid/io/BlockingAPRAcceptor.cpp b/cpp/src/qpid/io/BlockingAPRAcceptor.cpp
deleted file mode 100644
index 0e1fc535a2..0000000000
--- a/cpp/src/qpid/io/BlockingAPRAcceptor.cpp
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include "qpid/io/BlockingAPRAcceptor.h"
-#include "qpid/concurrent/APRBase.h"
-#include "qpid/concurrent/APRThreadFactory.h"
-
-using namespace qpid::concurrent;
-using namespace qpid::framing;
-using namespace qpid::io;
-
-BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) :
- debug(_debug),
- threadFactory(new APRThreadFactory()),
- connectionBacklog(c)
-{
- APRBase::increment();
- CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL));
-}
-
-int16_t BlockingAPRAcceptor::bind(int16_t _port){
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, apr_pool));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, apr_pool));
- CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
- CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog));
- return getPort();
-}
-
-int16_t BlockingAPRAcceptor::getPort() const {
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
- return address->port;
-}
-
-void BlockingAPRAcceptor::run(SessionHandlerFactory* factory)
-{
- running = true;
- std::cout << "Listening on port " << getPort() << "..." << std::endl;
- while(running){
- apr_socket_t* client;
- apr_status_t status = apr_socket_accept(&client, socket, apr_pool);
- if(status == APR_SUCCESS){
- //configure socket:
- CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 1000000/* i.e. 1 sec*/));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
-
- BlockingAPRSessionContext* session = new BlockingAPRSessionContext(client, threadFactory, this, debug);
- session->init(factory->create(session));
- sessions.push_back(session);
- }else{
- running = false;
- if(status != APR_EINTR){
- std::cout << "ERROR: " << get_desc(status) << std::endl;
- }
- }
- }
- shutdown();
-}
-
-void BlockingAPRAcceptor::shutdown()
-{
- // TODO aconway 2006-10-12: Not thread safe.
- if (running)
- {
- running = false;
- apr_socket_close(socket); // Don't check, exception safety.
- for(iterator i = sessions.begin(); i < sessions.end(); i++){
- (*i)->shutdown();
- }
- }
-}
-
-BlockingAPRAcceptor::~BlockingAPRAcceptor(){
- delete threadFactory;
- apr_pool_destroy(apr_pool);
- APRBase::decrement();
-}
-
-
-void BlockingAPRAcceptor::closed(BlockingAPRSessionContext* session){
- sessions.erase(find(sessions.begin(), sessions.end(), session));
-}
-
diff --git a/cpp/src/qpid/io/BlockingAPRAcceptor.h b/cpp/src/qpid/io/BlockingAPRAcceptor.h
deleted file mode 100644
index a3042605aa..0000000000
--- a/cpp/src/qpid/io/BlockingAPRAcceptor.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _BlockingAPRAcceptor_
-#define _BlockingAPRAcceptor_
-
-#include <vector>
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_poll.h"
-#include "apr-1/apr_time.h"
-
-#include "qpid/io/Acceptor.h"
-#include "qpid/concurrent/APRMonitor.h"
-#include "qpid/io/BlockingAPRSessionContext.h"
-#include "qpid/concurrent/Runnable.h"
-#include "qpid/io/SessionContext.h"
-#include "qpid/io/SessionHandlerFactory.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/concurrent/ThreadPool.h"
-
-namespace qpid {
-namespace io {
-
- class BlockingAPRAcceptor : public virtual Acceptor
- {
- typedef std::vector<BlockingAPRSessionContext*>::iterator iterator;
-
- const bool debug;
- apr_pool_t* apr_pool;
- qpid::concurrent::ThreadFactory* threadFactory;
- std::vector<BlockingAPRSessionContext*> sessions;
- apr_socket_t* socket;
- const int connectionBacklog;
- volatile bool running;
-
- public:
- BlockingAPRAcceptor(bool debug = false, int connectionBacklog = 10);
- virtual int16_t bind(int16_t port);
- virtual int16_t getPort() const;
- virtual void run(SessionHandlerFactory* factory);
- virtual void shutdown();
- virtual ~BlockingAPRAcceptor();
- void closed(BlockingAPRSessionContext* session);
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/BlockingAPRSessionContext.cpp b/cpp/src/qpid/io/BlockingAPRSessionContext.cpp
deleted file mode 100644
index 88e6b6b0fc..0000000000
--- a/cpp/src/qpid/io/BlockingAPRSessionContext.cpp
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <assert.h>
-#include <iostream>
-#include "qpid/io/BlockingAPRSessionContext.h"
-#include "qpid/io/BlockingAPRAcceptor.h"
-#include "qpid/concurrent/APRBase.h"
-#include "qpid/QpidError.h"
-
-using namespace qpid::concurrent;
-using namespace qpid::framing;
-using namespace qpid::io;
-
-
-BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket,
- ThreadFactory* factory,
- BlockingAPRAcceptor* _acceptor,
- bool _debug)
- : socket(_socket),
- debug(_debug),
- handler(0),
- acceptor(_acceptor),
- inbuf(65536),
- outbuf(65536),
- closed(false){
-
- reader = new Reader(this);
- writer = new Writer(this);
-
- rThread = factory->create(reader);
- wThread = factory->create(writer);
-}
-
-BlockingAPRSessionContext::~BlockingAPRSessionContext(){
- delete reader;
- delete writer;
-
- delete rThread;
- delete wThread;
-
- delete handler;
-}
-
-void BlockingAPRSessionContext::read(){
- try{
- bool initiated(false);
- while(!closed){
- apr_size_t bytes(inbuf.available());
- if(bytes < 1){
- THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
- }
- apr_status_t s = apr_socket_recv(socket, inbuf.start(), &bytes);
- if(APR_STATUS_IS_TIMEUP(s)){
- //timed out, check closed on loop
- }else if(APR_STATUS_IS_EOF(s) || bytes == 0){
- closed = true;
- }else{
- inbuf.move(bytes);
- inbuf.flip();
-
- if(!initiated){
- ProtocolInitiation* protocolInit = new ProtocolInitiation();
- if(protocolInit->decode(inbuf)){
- handler->initiated(protocolInit);
- if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl;
- initiated = true;
- }
- }else{
- AMQFrame frame;
- while(frame.decode(inbuf)){
- if(debug) std::cout << "RECV: [" << &socket << "]:" << frame << std::endl;
- handler->received(&frame);
- }
- }
- //need to compact buffer to preserve any 'extra' data
- inbuf.compact();
- }
- }
-
- //close socket
- }catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
- }
-}
-
-void BlockingAPRSessionContext::write(){
- while(!closed){
- //get next frame
- outlock.acquire();
- while(outframes.empty() && !closed){
- outlock.wait();
- }
- if(!closed){
- AMQFrame* frame = outframes.front();
- outframes.pop();
- outlock.release();
-
- //encode
- frame->encode(outbuf);
- if(debug) std::cout << "SENT [" << &socket << "]:" << *frame << std::endl;
- delete frame;
- outbuf.flip();
-
- //write from outbuf to socket
- char* data = outbuf.start();
- const int available = outbuf.available();
- int written = 0;
- apr_size_t bytes = available;
- while(available > written){
- apr_socket_send(socket, data + written, &bytes);
- written += bytes;
- bytes = available - written;
- }
- outbuf.clear();
- }else{
- outlock.release();
- }
- }
-}
-
-void BlockingAPRSessionContext::send(AMQFrame* frame){
- if(!closed){
- outlock.acquire();
- bool was_empty(outframes.empty());
- outframes.push(frame);
- if(was_empty){
- outlock.notify();
- }
- outlock.release();
- }else{
- std::cout << "WARNING: Session closed[" << &socket << "], dropping frame. " << &frame << std::endl;
- }
-}
-
-void BlockingAPRSessionContext::init(SessionHandler* _handler){
- handler = _handler;
- rThread->start();
- wThread->start();
-}
-
-void BlockingAPRSessionContext::close(){
- closed = true;
- wThread->join();
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- if(debug) std::cout << "RECV: [" << &socket << "]: Closed " << std::endl;
- handler->closed();
- acceptor->closed(this);
- delete this;
-}
-
-void BlockingAPRSessionContext::shutdown(){
- closed = true;
- outlock.acquire();
- outlock.notify();
- outlock.release();
-
- wThread->join();
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- rThread->join();
- handler->closed();
- delete this;
-}
diff --git a/cpp/src/qpid/io/BlockingAPRSessionContext.h b/cpp/src/qpid/io/BlockingAPRSessionContext.h
deleted file mode 100644
index c06142ace5..0000000000
--- a/cpp/src/qpid/io/BlockingAPRSessionContext.h
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _BlockingAPRSessionContext_
-#define _BlockingAPRSessionContext_
-
-#include <queue>
-#include <vector>
-
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_time.h"
-
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/concurrent/APRMonitor.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/concurrent/Runnable.h"
-#include "qpid/io/SessionContext.h"
-#include "qpid/io/SessionHandler.h"
-#include "qpid/io/SessionHandlerFactory.h"
-#include "qpid/io/ShutdownHandler.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-
-namespace qpid {
-namespace io {
-
- class BlockingAPRAcceptor;
-
- class BlockingAPRSessionContext : public virtual SessionContext
- {
- class Reader : public virtual qpid::concurrent::Runnable{
- BlockingAPRSessionContext* parent;
- public:
- inline Reader(BlockingAPRSessionContext* p) : parent(p){}
- inline virtual void run(){ parent->read(); }
- inline virtual ~Reader(){}
- };
-
- class Writer : public virtual qpid::concurrent::Runnable{
- BlockingAPRSessionContext* parent;
- public:
- inline Writer(BlockingAPRSessionContext* p) : parent(p){}
- inline virtual void run(){ parent->write(); }
- inline virtual ~Writer(){}
- };
-
- apr_socket_t* socket;
- const bool debug;
- SessionHandler* handler;
- BlockingAPRAcceptor* acceptor;
- std::queue<qpid::framing::AMQFrame*> outframes;
- qpid::framing::Buffer inbuf;
- qpid::framing::Buffer outbuf;
- qpid::concurrent::APRMonitor outlock;
- Reader* reader;
- Writer* writer;
- qpid::concurrent::Thread* rThread;
- qpid::concurrent::Thread* wThread;
-
- volatile bool closed;
-
- void read();
- void write();
- public:
- BlockingAPRSessionContext(apr_socket_t* socket,
- qpid::concurrent::ThreadFactory* factory,
- BlockingAPRAcceptor* acceptor,
- bool debug = false);
- ~BlockingAPRSessionContext();
- virtual void send(qpid::framing::AMQFrame* frame);
- virtual void close();
- void shutdown();
- void init(SessionHandler* handler);
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/APRConnector.cpp b/cpp/src/qpid/io/Connector.cpp
index 91cf01c842..ca487deb86 100644
--- a/cpp/src/qpid/io/APRConnector.cpp
+++ b/cpp/src/qpid/io/Connector.cpp
@@ -17,8 +17,8 @@
*/
#include <iostream>
#include "qpid/concurrent/APRBase.h"
-#include "qpid/io/APRConnector.h"
-#include "qpid/concurrent/APRThreadFactory.h"
+#include "qpid/io/Connector.h"
+#include "qpid/concurrent/ThreadFactory.h"
#include "qpid/QpidError.h"
using namespace qpid::io;
@@ -26,7 +26,7 @@ using namespace qpid::concurrent;
using namespace qpid::framing;
using qpid::QpidError;
-APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) :
+Connector::Connector(bool _debug, u_int32_t buffer_size) :
debug(_debug),
receive_buffer_size(buffer_size),
send_buffer_size(buffer_size),
@@ -44,11 +44,11 @@ APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) :
CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
- threadFactory = new APRThreadFactory();
- writeLock = new APRMonitor();
+ threadFactory = new ThreadFactory();
+ writeLock = new Monitor();
}
-APRConnector::~APRConnector(){
+Connector::~Connector(){
delete receiver;
delete writeLock;
delete threadFactory;
@@ -57,7 +57,7 @@ APRConnector::~APRConnector(){
APRBase::decrement();
}
-void APRConnector::connect(const std::string& host, int port){
+void Connector::connect(const std::string& host, int port){
apr_sockaddr_t* address;
CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
@@ -67,36 +67,36 @@ void APRConnector::connect(const std::string& host, int port){
receiver->start();
}
-void APRConnector::init(ProtocolInitiation* header){
+void Connector::init(ProtocolInitiation* header){
writeBlock(header);
delete header;
}
-void APRConnector::close(){
+void Connector::close(){
closed = true;
CHECK_APR_SUCCESS(apr_socket_close(socket));
receiver->join();
}
-void APRConnector::setInputHandler(InputHandler* handler){
+void Connector::setInputHandler(InputHandler* handler){
input = handler;
}
-void APRConnector::setShutdownHandler(ShutdownHandler* handler){
+void Connector::setShutdownHandler(ShutdownHandler* handler){
shutdownHandler = handler;
}
-OutputHandler* APRConnector::getOutputHandler(){
+OutputHandler* Connector::getOutputHandler(){
return this;
}
-void APRConnector::send(AMQFrame* frame){
+void Connector::send(AMQFrame* frame){
writeBlock(frame);
if(debug) std::cout << "SENT: " << *frame << std::endl;
delete frame;
}
-void APRConnector::writeBlock(AMQDataBlock* data){
+void Connector::writeBlock(AMQDataBlock* data){
writeLock->acquire();
data->encode(outbuf);
@@ -107,7 +107,7 @@ void APRConnector::writeBlock(AMQDataBlock* data){
writeLock->release();
}
-void APRConnector::writeToSocket(char* data, size_t available){
+void Connector::writeToSocket(char* data, size_t available){
apr_size_t bytes(available);
apr_size_t written(0);
while(written < available && !closed){
@@ -124,7 +124,7 @@ void APRConnector::writeToSocket(char* data, size_t available){
}
}
-void APRConnector::checkIdle(apr_status_t status){
+void Connector::checkIdle(apr_status_t status){
if(timeoutHandler){
apr_time_t now = apr_time_as_msec(apr_time_now());
if(APR_STATUS_IS_TIMEUP(status)){
@@ -144,7 +144,7 @@ void APRConnector::checkIdle(apr_status_t status){
}
}
-void APRConnector::setReadTimeout(u_int16_t t){
+void Connector::setReadTimeout(u_int16_t t){
idleIn = t * 1000;//t is in secs
if(idleIn && (!timeout || idleIn < timeout)){
timeout = idleIn;
@@ -153,7 +153,7 @@ void APRConnector::setReadTimeout(u_int16_t t){
}
-void APRConnector::setWriteTimeout(u_int16_t t){
+void Connector::setWriteTimeout(u_int16_t t){
idleOut = t * 1000;//t is in secs
if(idleOut && (!timeout || idleOut < timeout)){
timeout = idleOut;
@@ -161,7 +161,7 @@ void APRConnector::setWriteTimeout(u_int16_t t){
}
}
-void APRConnector::setSocketTimeout(){
+void Connector::setSocketTimeout(){
//interval is in microseconds, timeout in milliseconds
//want the interval to be a bit shorter than the timeout, hence multiply
//by 800 rather than 1000.
@@ -169,11 +169,11 @@ void APRConnector::setSocketTimeout(){
apr_socket_timeout_set(socket, interval);
}
-void APRConnector::setTimeoutHandler(TimeoutHandler* handler){
+void Connector::setTimeoutHandler(TimeoutHandler* handler){
timeoutHandler = handler;
}
-void APRConnector::run(){
+void Connector::run(){
try{
while(!closed){
apr_size_t bytes(inbuf.available());
diff --git a/cpp/src/qpid/io/Connector.h b/cpp/src/qpid/io/Connector.h
index d0a2f470a8..7c52f7e87b 100644
--- a/cpp/src/qpid/io/Connector.h
+++ b/cpp/src/qpid/io/Connector.h
@@ -18,35 +18,74 @@
#ifndef _Connector_
#define _Connector_
+#include "apr-1/apr_network_io.h"
+#include "apr-1/apr_time.h"
+
#include "qpid/framing/InputHandler.h"
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/InitiationHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/io/ShutdownHandler.h"
#include "qpid/io/TimeoutHandler.h"
+#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/io/Connector.h"
+#include "qpid/concurrent/Monitor.h"
namespace qpid {
namespace io {
- class Connector
+ class Connector : public virtual qpid::framing::OutputHandler,
+ private virtual qpid::concurrent::Runnable
{
+ const bool debug;
+ const int receive_buffer_size;
+ const int send_buffer_size;
+
+ bool closed;
+
+ apr_time_t lastIn;
+ apr_time_t lastOut;
+ apr_interval_time_t timeout;
+ u_int32_t idleIn;
+ u_int32_t idleOut;
+
+ TimeoutHandler* timeoutHandler;
+ ShutdownHandler* shutdownHandler;
+ qpid::framing::InputHandler* input;
+ qpid::framing::InitiationHandler* initialiser;
+ qpid::framing::OutputHandler* output;
+
+ qpid::framing::Buffer inbuf;
+ qpid::framing::Buffer outbuf;
+
+ qpid::concurrent::Monitor* writeLock;
+ qpid::concurrent::ThreadFactory* threadFactory;
+ qpid::concurrent::Thread* receiver;
+
+ apr_pool_t* pool;
+ apr_socket_t* socket;
+
+ void checkIdle(apr_status_t status);
+ void writeBlock(qpid::framing::AMQDataBlock* data);
+ void writeToSocket(char* data, size_t available);
+ void setSocketTimeout();
+
+ void run();
+
public:
- virtual void connect(const std::string& host, int port) = 0;
- virtual void init(qpid::framing::ProtocolInitiation* header) = 0;
- virtual void close() = 0;
- virtual void setInputHandler(qpid::framing::InputHandler* handler) = 0;
- virtual void setTimeoutHandler(TimeoutHandler* handler) = 0;
- virtual void setShutdownHandler(ShutdownHandler* handler) = 0;
- virtual qpid::framing::OutputHandler* getOutputHandler() = 0;
- /**
- * Set the timeout for reads, in secs.
- */
- virtual void setReadTimeout(u_int16_t timeout) = 0;
- /**
- * Set the timeout for writes, in secs.
- */
- virtual void setWriteTimeout(u_int16_t timeout) = 0;
- virtual ~Connector(){}
+ Connector(bool debug = false, u_int32_t buffer_size = 1024);
+ virtual ~Connector();
+ virtual void connect(const std::string& host, int port);
+ virtual void init(qpid::framing::ProtocolInitiation* header);
+ virtual void close();
+ virtual void setInputHandler(qpid::framing::InputHandler* handler);
+ virtual void setTimeoutHandler(TimeoutHandler* handler);
+ virtual void setShutdownHandler(ShutdownHandler* handler);
+ virtual qpid::framing::OutputHandler* getOutputHandler();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void setReadTimeout(u_int16_t timeout);
+ virtual void setWriteTimeout(u_int16_t timeout);
};
}
diff --git a/cpp/src/qpid/io/ConnectorImpl.h b/cpp/src/qpid/io/ConnectorImpl.h
deleted file mode 100644
index 55dcf7a2d4..0000000000
--- a/cpp/src/qpid/io/ConnectorImpl.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _APRConnectorImpl_
-#define _APRConnectorImpl_
-
-#ifdef _USE_APR_IO_
-#include "qpid/io/APRConnector.h"
-#else
-#include "qpid/io/LConnector.h"
-#endif
-
-namespace qpid {
-namespace io {
-
-#ifdef _USE_APR_IO_
- class ConnectorImpl : public virtual APRConnector
- {
-
- public:
- ConnectorImpl(bool _debug = false, u_int32_t buffer_size = 1024):APRConnector(_debug,buffer_size){};
- virtual ~ConnectorImpl(){};
- };
-#else
- class ConnectorImpl : public virtual LConnector
- {
-
- public:
- ConnectorImpl(bool _debug = false, u_int32_t buffer_size = 1024):LConnector(_debug, buffer_size){};
- virtual ~ConnectorImpl(){};
- };
-
-#endif
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/LConnector.h b/cpp/src/qpid/io/LConnector.h
deleted file mode 100644
index 5fc86597bd..0000000000
--- a/cpp/src/qpid/io/LConnector.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _LConnector_
-#define _LConnector_
-
-
-#include "qpid/framing/InputHandler.h"
-#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/InitiationHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/io/Connector.h"
-
-namespace qpid {
-namespace io {
-
- class LConnector : public virtual qpid::framing::OutputHandler,
- public virtual Connector,
- private virtual qpid::concurrent::Runnable
- {
-
- public:
- LConnector(bool debug = false, u_int32_t buffer_size = 1024){};
- virtual ~LConnector(){};
-
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/LFAcceptor.cpp b/cpp/src/qpid/io/LFAcceptor.cpp
deleted file mode 100644
index 7e51a550af..0000000000
--- a/cpp/src/qpid/io/LFAcceptor.cpp
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/io/LFAcceptor.h"
-#include "qpid/concurrent/APRBase.h"
-
-using namespace qpid::concurrent;
-using namespace qpid::io;
-
-LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) :
- processor(aprPool.pool, worker_threads, 1000, 5000000),
- max_connections_per_processor(m),
- debug(_debug),
- connectionBacklog(c)
-{ }
-
-
-int16_t LFAcceptor::bind(int16_t _port){
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, aprPool.pool));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, aprPool.pool));
- CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
- CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
- CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog));
- return getPort();
-}
-
-int16_t LFAcceptor::getPort() const {
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
- return address->port;
-}
-
-void LFAcceptor::run(SessionHandlerFactory* factory) {
- running = true;
- processor.start();
- std::cout << "Listening on port " << getPort() << "..." << std::endl;
- while(running){
- apr_socket_t* client;
- apr_status_t status = apr_socket_accept(&client, socket, aprPool.pool);
- if(status == APR_SUCCESS){
- //make this socket non-blocking:
- CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
- LFSessionContext* session = new LFSessionContext(aprPool.pool, client, &processor, debug);
- session->init(factory->create(session));
- }else{
- running = false;
- if(status != APR_EINTR){
- std::cout << "ERROR: " << get_desc(status) << std::endl;
- }
- }
- }
- shutdown();
-}
-
-void LFAcceptor::shutdown() {
- // TODO aconway 2006-10-12: Cleanup, this is not thread safe.
- if (running) {
- running = false;
- processor.stop();
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- }
-}
-
-
-LFAcceptor::~LFAcceptor(){}
-
-LFAcceptor::APRPool::APRPool(){
- APRBase::increment();
- CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
-}
-
-LFAcceptor::APRPool::~APRPool(){
- apr_pool_destroy(pool);
- APRBase::decrement();
-}
diff --git a/cpp/src/qpid/io/LFAcceptor.h b/cpp/src/qpid/io/LFAcceptor.h
deleted file mode 100644
index 35a556d500..0000000000
--- a/cpp/src/qpid/io/LFAcceptor.h
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _LFAcceptor_
-#define _LFAcceptor_
-
-#include <vector>
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_poll.h"
-#include "apr-1/apr_time.h"
-
-#include "qpid/io/Acceptor.h"
-#include "qpid/concurrent/APRMonitor.h"
-#include "qpid/concurrent/APRThreadFactory.h"
-#include "qpid/concurrent/APRThreadPool.h"
-#include "qpid/io/LFProcessor.h"
-#include "qpid/io/LFSessionContext.h"
-#include "qpid/concurrent/Runnable.h"
-#include "qpid/io/SessionContext.h"
-#include "qpid/io/SessionHandlerFactory.h"
-#include "qpid/concurrent/Thread.h"
-
-namespace qpid {
-namespace io {
-
- class LFAcceptor : public virtual Acceptor
- {
- class APRPool{
- public:
- apr_pool_t* pool;
- APRPool();
- ~APRPool();
- };
-
- APRPool aprPool;
- LFProcessor processor;
- apr_socket_t* socket;
- const int max_connections_per_processor;
- const bool debug;
- const int connectionBacklog;
-
- volatile bool running;
-
- public:
- LFAcceptor(bool debug = false,
- int connectionBacklog = 10,
- int worker_threads = 5,
- int max_connections_per_processor = 500);
- virtual int16_t bind(int16_t port);
- virtual int16_t getPort() const;
- virtual void run(SessionHandlerFactory* factory);
- virtual void shutdown();
- virtual ~LFAcceptor();
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/LFProcessor.h b/cpp/src/qpid/io/LFProcessor.h
index 16c789f0ff..5b61f444af 100644
--- a/cpp/src/qpid/io/LFProcessor.h
+++ b/cpp/src/qpid/io/LFProcessor.h
@@ -21,8 +21,8 @@
#include "apr-1/apr_poll.h"
#include <iostream>
#include <vector>
-#include "qpid/concurrent/APRMonitor.h"
-#include "qpid/concurrent/APRThreadFactory.h"
+#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/ThreadFactory.h"
#include "qpid/concurrent/Runnable.h"
namespace qpid {
@@ -50,9 +50,9 @@ namespace io {
const int workerCount;
bool hasLeader;
qpid::concurrent::Thread** const workers;
- qpid::concurrent::APRMonitor leadLock;
- qpid::concurrent::APRMonitor countLock;
- qpid::concurrent::APRThreadFactory factory;
+ qpid::concurrent::Monitor leadLock;
+ qpid::concurrent::Monitor countLock;
+ qpid::concurrent::ThreadFactory factory;
std::vector<LFSessionContext*> sessions;
volatile bool stopped;
diff --git a/cpp/src/qpid/io/LFSessionContext.cpp b/cpp/src/qpid/io/LFSessionContext.cpp
index 6d6d786841..ca1e6431a6 100644
--- a/cpp/src/qpid/io/LFSessionContext.cpp
+++ b/cpp/src/qpid/io/LFSessionContext.cpp
@@ -54,7 +54,7 @@ LFSessionContext::~LFSessionContext(){
void LFSessionContext::read(){
assert(!reading); // No concurrent read.
- reading = APRThread::currentThread();
+ reading = Thread::currentThread();
socket.read(in);
in.flip();
@@ -79,7 +79,7 @@ void LFSessionContext::read(){
void LFSessionContext::write(){
assert(!writing); // No concurrent writes.
- writing = APRThread::currentThread();
+ writing = Thread::currentThread();
bool done = isClosed();
while(!done){
@@ -186,4 +186,4 @@ void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){
logLock.release();
}
-APRMonitor LFSessionContext::logLock;
+Monitor LFSessionContext::logLock;
diff --git a/cpp/src/qpid/io/LFSessionContext.h b/cpp/src/qpid/io/LFSessionContext.h
index 9406bb97b6..8d30b54204 100644
--- a/cpp/src/qpid/io/LFSessionContext.h
+++ b/cpp/src/qpid/io/LFSessionContext.h
@@ -25,7 +25,7 @@
#include "apr-1/apr_time.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/concurrent/APRMonitor.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/io/APRSocket.h"
#include "qpid/framing/Buffer.h"
#include "qpid/io/LFProcessor.h"
@@ -51,7 +51,7 @@ namespace io {
apr_pollfd_t fd;
std::queue<qpid::framing::AMQFrame*> framesToWrite;
- qpid::concurrent::APRMonitor writeLock;
+ qpid::concurrent::Monitor writeLock;
bool processing;
bool closing;
@@ -60,7 +60,7 @@ namespace io {
volatile unsigned int reading;
volatile unsigned int writing;
- static qpid::concurrent::APRMonitor logLock;
+ static qpid::concurrent::Monitor logLock;
void log(const std::string& desc, qpid::framing::AMQFrame* const frame);
public:
diff --git a/cpp/src/qpid/io/SessionManager.h b/cpp/src/qpid/io/doxygen_summary.h
index e6b17451e4..1086f65f63 100644
--- a/cpp/src/qpid/io/SessionManager.h
+++ b/cpp/src/qpid/io/doxygen_summary.h
@@ -1,3 +1,6 @@
+#ifndef _doxygen_summary_
+#define _doxygen_summary_
+
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -15,26 +18,17 @@
* limitations under the License.
*
*/
-#ifndef _SessionManager_
-#define _SessionManager_
-
-#include "qpid/io/SessionContext.h"
-#include "qpid/io/SessionHandler.h"
-
-namespace qpid {
-namespace io {
-
- class SessionManager
- {
- public:
- virtual SessionHandler* init(SessionContext* ctxt) = 0;
- virtual void close(SessionContext* ctxt) = 0;
- virtual void updateInterest(SessionContext* ctxt, bool read, bool write) = 0;
- virtual ~SessionManager(){}
- };
-}
-}
+// No code just a doxygen comment for the namespace
-
-#endif
+/** \namspace qpid::io
+ * IO classes used by client and broker.
+ *
+ * This namespace contains platform-neutral classes. Platform
+ * specific classes are in a sub-namespace named after the
+ * platform. At build time the appropriate platform classes are
+ * imported into this namespace so other code does not need to be awre
+ * of the difference.
+ *
+ */
+#endif /*!_doxygen_summary_*/
diff --git a/cpp/src/qpidd.cpp b/cpp/src/qpidd.cpp
index e93676513a..40be6485f2 100644
--- a/cpp/src/qpidd.cpp
+++ b/cpp/src/qpidd.cpp
@@ -37,7 +37,7 @@ int main(int argc, char** argv)
config.usage();
}else{
apr_signal(SIGINT, handle_signal);
- Broker::shared_ptr broker = Broker::create(config);
+ Broker::SharedPtr broker = Broker::create(config);
broker->run();
}
return 0;
diff --git a/cpp/test/client/client_test.cpp b/cpp/test/client/client_test.cpp
index e1c7d56573..2a1f1a9747 100644
--- a/cpp/test/client/client_test.cpp
+++ b/cpp/test/client/client_test.cpp
@@ -22,7 +22,7 @@
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
#include "qpid/client/MessageListener.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/framing/FieldTable.h"
using namespace qpid::client;
@@ -65,7 +65,7 @@ int main(int argc, char**)
std::cout << "Bound queue to exchange." << std::endl;
//set up a message listener
- MonitorImpl monitor;
+ Monitor monitor;
SimpleListener listener(&monitor);
string tag("MyTag");
channel.consume(queue, tag, &listener);
diff --git a/cpp/test/client/topic_publisher.cpp b/cpp/test/client/topic_publisher.cpp
index 9652c6450f..e53b70cc75 100644
--- a/cpp/test/client/topic_publisher.cpp
+++ b/cpp/test/client/topic_publisher.cpp
@@ -21,7 +21,7 @@
#include "qpid/client/Exchange.h"
#include "qpid/client/MessageListener.h"
#include "qpid/client/Queue.h"
-#include "qpid/concurrent/MonitorImpl.h"
+#include "qpid/concurrent/Monitor.h"
#include "unistd.h"
#include <apr-1/apr_time.h>
#include <cstdlib>
@@ -34,7 +34,7 @@ class Publisher : public MessageListener{
Channel* const channel;
const std::string controlTopic;
const bool transactional;
- MonitorImpl monitor;
+ Monitor monitor;
int count;
void waitForCompletion(int msgs);
diff --git a/cpp/test/unit/qpid/broker/ConfigurationTest.cpp b/cpp/test/unit/qpid/broker/ConfigurationTest.cpp
index 1c24076ee3..7acee7c8b9 100644
--- a/cpp/test/unit/qpid/broker/ConfigurationTest.cpp
+++ b/cpp/test/unit/qpid/broker/ConfigurationTest.cpp
@@ -28,8 +28,6 @@ class ConfigurationTest : public CppUnit::TestCase
CPPUNIT_TEST(testIsHelp);
CPPUNIT_TEST(testPortLongForm);
CPPUNIT_TEST(testPortShortForm);
- CPPUNIT_TEST(testAcceptorLongForm);
- CPPUNIT_TEST(testAcceptorShortForm);
CPPUNIT_TEST(testVarious);
CPPUNIT_TEST_SUITE_END();
@@ -59,29 +57,12 @@ class ConfigurationTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(6789, conf.getPort());
}
- void testAcceptorLongForm()
- {
- Configuration conf;
- char* argv[] = {"ignore", "--acceptor", "blocking"};
- conf.parse(3, argv);
- CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor());
- }
-
- void testAcceptorShortForm()
- {
- Configuration conf;
- char* argv[] = {"ignore", "-a", "blocking"};
- conf.parse(3, argv);
- CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor());
- }
-
void testVarious()
{
Configuration conf;
char* argv[] = {"ignore", "-t", "--worker-threads", "10", "-a", "blocking"};
conf.parse(6, argv);
CPPUNIT_ASSERT_EQUAL(5672, conf.getPort());//default
- CPPUNIT_ASSERT_EQUAL(string("blocking"), conf.getAcceptor());
CPPUNIT_ASSERT_EQUAL(10, conf.getWorkerThreads());
CPPUNIT_ASSERT(conf.isTrace());
CPPUNIT_ASSERT(!conf.isHelp());