summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/io
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-10-31 19:53:55 +0000
committerAlan Conway <aconway@apache.org>2006-10-31 19:53:55 +0000
commit9094d2b10ecadd66fa3b22169183e7573cc79629 (patch)
treebf3915f72be2a5f09932b800d2fa4309fb3ad64e /cpp/src/qpid/io
parent0487ea40bc6568765cdec75a36273eeb26fae854 (diff)
downloadqpid-python-9094d2b10ecadd66fa3b22169183e7573cc79629.tar.gz
IO refactor phase 1. Reduced dependencies, removed redundant classes.
Renamed pricipal APR classes in preparation for move to apr namespace. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469625 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/io')
-rw-r--r--cpp/src/qpid/io/APRConnector.h95
-rw-r--r--cpp/src/qpid/io/APRPool.cpp (renamed from cpp/src/qpid/io/LConnector.h)39
-rw-r--r--cpp/src/qpid/io/APRPool.h (renamed from cpp/src/qpid/io/SessionManager.h)39
-rw-r--r--cpp/src/qpid/io/Acceptor.cpp61
-rw-r--r--cpp/src/qpid/io/Acceptor.h57
-rw-r--r--cpp/src/qpid/io/BlockingAPRAcceptor.cpp101
-rw-r--r--cpp/src/qpid/io/BlockingAPRAcceptor.h65
-rw-r--r--cpp/src/qpid/io/BlockingAPRSessionContext.cpp177
-rw-r--r--cpp/src/qpid/io/BlockingAPRSessionContext.h94
-rw-r--r--cpp/src/qpid/io/Connector.cpp (renamed from cpp/src/qpid/io/APRConnector.cpp)42
-rw-r--r--cpp/src/qpid/io/Connector.h73
-rw-r--r--cpp/src/qpid/io/ConnectorImpl.h53
-rw-r--r--cpp/src/qpid/io/LFAcceptor.cpp94
-rw-r--r--cpp/src/qpid/io/LFAcceptor.h74
-rw-r--r--cpp/src/qpid/io/LFProcessor.h10
-rw-r--r--cpp/src/qpid/io/LFSessionContext.cpp6
-rw-r--r--cpp/src/qpid/io/LFSessionContext.h6
-rw-r--r--cpp/src/qpid/io/doxygen_summary.h34
18 files changed, 251 insertions, 869 deletions
diff --git a/cpp/src/qpid/io/APRConnector.h b/cpp/src/qpid/io/APRConnector.h
deleted file mode 100644
index c835f30056..0000000000
--- a/cpp/src/qpid/io/APRConnector.h
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _APRConnector_
-#define _APRConnector_
-
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_time.h"
-
-#include "qpid/framing/InputHandler.h"
-#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/InitiationHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/io/ShutdownHandler.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/io/Connector.h"
-#include "qpid/concurrent/APRMonitor.h"
-
-namespace qpid {
-namespace io {
-
- class APRConnector : public virtual qpid::framing::OutputHandler,
- public virtual Connector,
- private virtual qpid::concurrent::Runnable
- {
- const bool debug;
- const int receive_buffer_size;
- const int send_buffer_size;
-
- bool closed;
-
- apr_time_t lastIn;
- apr_time_t lastOut;
- apr_interval_time_t timeout;
- u_int32_t idleIn;
- u_int32_t idleOut;
-
- TimeoutHandler* timeoutHandler;
- ShutdownHandler* shutdownHandler;
- qpid::framing::InputHandler* input;
- qpid::framing::InitiationHandler* initialiser;
- qpid::framing::OutputHandler* output;
-
- qpid::framing::Buffer inbuf;
- qpid::framing::Buffer outbuf;
-
- qpid::concurrent::APRMonitor* writeLock;
- qpid::concurrent::ThreadFactory* threadFactory;
- qpid::concurrent::Thread* receiver;
-
- apr_pool_t* pool;
- apr_socket_t* socket;
-
- void checkIdle(apr_status_t status);
- void writeBlock(qpid::framing::AMQDataBlock* data);
- void writeToSocket(char* data, size_t available);
- void setSocketTimeout();
-
- void run();
-
- public:
- APRConnector(bool debug = false, u_int32_t buffer_size = 1024);
- virtual ~APRConnector();
- virtual void connect(const std::string& host, int port);
- virtual void init(qpid::framing::ProtocolInitiation* header);
- virtual void close();
- virtual void setInputHandler(qpid::framing::InputHandler* handler);
- virtual void setTimeoutHandler(TimeoutHandler* handler);
- virtual void setShutdownHandler(ShutdownHandler* handler);
- virtual qpid::framing::OutputHandler* getOutputHandler();
- virtual void send(qpid::framing::AMQFrame* frame);
- virtual void setReadTimeout(u_int16_t timeout);
- virtual void setWriteTimeout(u_int16_t timeout);
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/LConnector.h b/cpp/src/qpid/io/APRPool.cpp
index 5fc86597bd..edd434f16c 100644
--- a/cpp/src/qpid/io/LConnector.h
+++ b/cpp/src/qpid/io/APRPool.cpp
@@ -15,34 +15,25 @@
* limitations under the License.
*
*/
-#ifndef _LConnector_
-#define _LConnector_
+#include "APRPool.h"
+#include "qpid/concurrent/APRBase.h"
+#include <boost/pool/singleton_pool.hpp>
-#include "qpid/framing/InputHandler.h"
-#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/InitiationHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/io/Connector.h"
-
-namespace qpid {
-namespace io {
-
- class LConnector : public virtual qpid::framing::OutputHandler,
- public virtual Connector,
- private virtual qpid::concurrent::Runnable
- {
-
- public:
- LConnector(bool debug = false, u_int32_t buffer_size = 1024){};
- virtual ~LConnector(){};
-
- };
+using namespace qpid::io;
+using namespace qpid::concurrent;
+APRPool::APRPool(){
+ APRBase::increment();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
}
+
+APRPool::~APRPool(){
+ apr_pool_destroy(pool);
+ APRBase::decrement();
}
+apr_pool_t* APRPool::get() {
+ return boost::details::pool::singleton_default<APRPool>::instance().pool;
+}
-#endif
diff --git a/cpp/src/qpid/io/SessionManager.h b/cpp/src/qpid/io/APRPool.h
index e6b17451e4..063eedf1ee 100644
--- a/cpp/src/qpid/io/SessionManager.h
+++ b/cpp/src/qpid/io/APRPool.h
@@ -1,3 +1,6 @@
+#ifndef _APRPool_
+#define _APRPool_
+
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -15,26 +18,30 @@
* limitations under the License.
*
*/
-#ifndef _SessionManager_
-#define _SessionManager_
-
-#include "qpid/io/SessionContext.h"
-#include "qpid/io/SessionHandler.h"
+#include <boost/noncopyable.hpp>
+#include <apr-1/apr_pools.h>
namespace qpid {
namespace io {
+/**
+ * Singleton APR memory pool.
+ */
+class APRPool : private boost::noncopyable {
+ public:
+ APRPool();
+ ~APRPool();
+
+ /** Get singleton instance */
+ static apr_pool_t* get();
+
+ private:
+ apr_pool_t* pool;
+};
+
+}}
+
- class SessionManager
- {
- public:
- virtual SessionHandler* init(SessionContext* ctxt) = 0;
- virtual void close(SessionContext* ctxt) = 0;
- virtual void updateInterest(SessionContext* ctxt, bool read, bool write) = 0;
- virtual ~SessionManager(){}
- };
-}
-}
-#endif
+#endif /*!_APRPool_*/
diff --git a/cpp/src/qpid/io/Acceptor.cpp b/cpp/src/qpid/io/Acceptor.cpp
index 6b76bd4da2..f95d9448cf 100644
--- a/cpp/src/qpid/io/Acceptor.cpp
+++ b/cpp/src/qpid/io/Acceptor.cpp
@@ -15,7 +15,64 @@
* limitations under the License.
*
*/
-
#include "qpid/io/Acceptor.h"
+#include "qpid/concurrent/APRBase.h"
+#include "APRPool.h"
+
+using namespace qpid::concurrent;
+using namespace qpid::io;
+
+Acceptor::Acceptor(int16_t port_, int backlog, int threads) :
+ port(port_),
+ processor(APRPool::get(), threads, 1000, 5000000)
+{
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
+ CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
+ CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
+}
+
+int16_t Acceptor::getPort() const {
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
+ return address->port;
+}
+
+void Acceptor::run(SessionHandlerFactory* factory) {
+ running = true;
+ processor.start();
+ std::cout << "Listening on port " << getPort() << "..." << std::endl;
+ while(running){
+ apr_socket_t* client;
+ apr_status_t status = apr_socket_accept(&client, socket, APRPool::get());
+ if(status == APR_SUCCESS){
+ //make this socket non-blocking:
+ CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
+ LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false);
+ session->init(factory->create(session));
+ }else{
+ running = false;
+ if(status != APR_EINTR){
+ std::cout << "ERROR: " << get_desc(status) << std::endl;
+ }
+ }
+ }
+ shutdown();
+}
+
+void Acceptor::shutdown() {
+ // TODO aconway 2006-10-12: Cleanup, this is not thread safe.
+ if (running) {
+ running = false;
+ processor.stop();
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ }
+}
+
-qpid::io::Acceptor::~Acceptor() {}
diff --git a/cpp/src/qpid/io/Acceptor.h b/cpp/src/qpid/io/Acceptor.h
index a7f7ad66f0..bc189f7f6e 100644
--- a/cpp/src/qpid/io/Acceptor.h
+++ b/cpp/src/qpid/io/Acceptor.h
@@ -15,36 +15,43 @@
* limitations under the License.
*
*/
-#ifndef _Acceptor_
-#define _Acceptor_
-
+#ifndef _LFAcceptor_
+#define _LFAcceptor_
+
+#include "apr-1/apr_network_io.h"
+#include "apr-1/apr_poll.h"
+#include "apr-1/apr_time.h"
+
+#include "qpid/io/Acceptor.h"
+#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/concurrent/ThreadPool.h"
+#include "qpid/io/LFProcessor.h"
+#include "qpid/io/LFSessionContext.h"
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/io/SessionContext.h"
#include "qpid/io/SessionHandlerFactory.h"
+#include "qpid/concurrent/Thread.h"
+#include <qpid/SharedObject.h>
namespace qpid {
namespace io {
- class Acceptor
- {
- public:
- /**
- * Bind to port.
- * @param port Port to bind to, 0 to bind to dynamically chosen port.
- * @return The local bound port.
- */
- virtual int16_t bind(int16_t port) = 0;
-
- /**
- * Run the acceptor.
- */
- virtual void run(SessionHandlerFactory* factory) = 0;
-
- /**
- * Shut down the acceptor.
- */
- virtual void shutdown() = 0;
-
- virtual ~Acceptor();
- };
+/** APR Acceptor. */
+class Acceptor : public qpid::SharedObject<Acceptor>
+{
+ public:
+ Acceptor(int16_t port, int backlog, int threads);
+ virtual int16_t getPort() const;
+ virtual void run(SessionHandlerFactory* factory);
+ virtual void shutdown();
+
+ private:
+ int16_t port;
+ LFProcessor processor;
+ apr_socket_t* socket;
+ volatile bool running;
+};
}
}
diff --git a/cpp/src/qpid/io/BlockingAPRAcceptor.cpp b/cpp/src/qpid/io/BlockingAPRAcceptor.cpp
deleted file mode 100644
index 0e1fc535a2..0000000000
--- a/cpp/src/qpid/io/BlockingAPRAcceptor.cpp
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include "qpid/io/BlockingAPRAcceptor.h"
-#include "qpid/concurrent/APRBase.h"
-#include "qpid/concurrent/APRThreadFactory.h"
-
-using namespace qpid::concurrent;
-using namespace qpid::framing;
-using namespace qpid::io;
-
-BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) :
- debug(_debug),
- threadFactory(new APRThreadFactory()),
- connectionBacklog(c)
-{
- APRBase::increment();
- CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL));
-}
-
-int16_t BlockingAPRAcceptor::bind(int16_t _port){
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, apr_pool));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, apr_pool));
- CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
- CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog));
- return getPort();
-}
-
-int16_t BlockingAPRAcceptor::getPort() const {
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
- return address->port;
-}
-
-void BlockingAPRAcceptor::run(SessionHandlerFactory* factory)
-{
- running = true;
- std::cout << "Listening on port " << getPort() << "..." << std::endl;
- while(running){
- apr_socket_t* client;
- apr_status_t status = apr_socket_accept(&client, socket, apr_pool);
- if(status == APR_SUCCESS){
- //configure socket:
- CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 1000000/* i.e. 1 sec*/));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
-
- BlockingAPRSessionContext* session = new BlockingAPRSessionContext(client, threadFactory, this, debug);
- session->init(factory->create(session));
- sessions.push_back(session);
- }else{
- running = false;
- if(status != APR_EINTR){
- std::cout << "ERROR: " << get_desc(status) << std::endl;
- }
- }
- }
- shutdown();
-}
-
-void BlockingAPRAcceptor::shutdown()
-{
- // TODO aconway 2006-10-12: Not thread safe.
- if (running)
- {
- running = false;
- apr_socket_close(socket); // Don't check, exception safety.
- for(iterator i = sessions.begin(); i < sessions.end(); i++){
- (*i)->shutdown();
- }
- }
-}
-
-BlockingAPRAcceptor::~BlockingAPRAcceptor(){
- delete threadFactory;
- apr_pool_destroy(apr_pool);
- APRBase::decrement();
-}
-
-
-void BlockingAPRAcceptor::closed(BlockingAPRSessionContext* session){
- sessions.erase(find(sessions.begin(), sessions.end(), session));
-}
-
diff --git a/cpp/src/qpid/io/BlockingAPRAcceptor.h b/cpp/src/qpid/io/BlockingAPRAcceptor.h
deleted file mode 100644
index a3042605aa..0000000000
--- a/cpp/src/qpid/io/BlockingAPRAcceptor.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _BlockingAPRAcceptor_
-#define _BlockingAPRAcceptor_
-
-#include <vector>
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_poll.h"
-#include "apr-1/apr_time.h"
-
-#include "qpid/io/Acceptor.h"
-#include "qpid/concurrent/APRMonitor.h"
-#include "qpid/io/BlockingAPRSessionContext.h"
-#include "qpid/concurrent/Runnable.h"
-#include "qpid/io/SessionContext.h"
-#include "qpid/io/SessionHandlerFactory.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-#include "qpid/concurrent/ThreadPool.h"
-
-namespace qpid {
-namespace io {
-
- class BlockingAPRAcceptor : public virtual Acceptor
- {
- typedef std::vector<BlockingAPRSessionContext*>::iterator iterator;
-
- const bool debug;
- apr_pool_t* apr_pool;
- qpid::concurrent::ThreadFactory* threadFactory;
- std::vector<BlockingAPRSessionContext*> sessions;
- apr_socket_t* socket;
- const int connectionBacklog;
- volatile bool running;
-
- public:
- BlockingAPRAcceptor(bool debug = false, int connectionBacklog = 10);
- virtual int16_t bind(int16_t port);
- virtual int16_t getPort() const;
- virtual void run(SessionHandlerFactory* factory);
- virtual void shutdown();
- virtual ~BlockingAPRAcceptor();
- void closed(BlockingAPRSessionContext* session);
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/BlockingAPRSessionContext.cpp b/cpp/src/qpid/io/BlockingAPRSessionContext.cpp
deleted file mode 100644
index 88e6b6b0fc..0000000000
--- a/cpp/src/qpid/io/BlockingAPRSessionContext.cpp
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <assert.h>
-#include <iostream>
-#include "qpid/io/BlockingAPRSessionContext.h"
-#include "qpid/io/BlockingAPRAcceptor.h"
-#include "qpid/concurrent/APRBase.h"
-#include "qpid/QpidError.h"
-
-using namespace qpid::concurrent;
-using namespace qpid::framing;
-using namespace qpid::io;
-
-
-BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket,
- ThreadFactory* factory,
- BlockingAPRAcceptor* _acceptor,
- bool _debug)
- : socket(_socket),
- debug(_debug),
- handler(0),
- acceptor(_acceptor),
- inbuf(65536),
- outbuf(65536),
- closed(false){
-
- reader = new Reader(this);
- writer = new Writer(this);
-
- rThread = factory->create(reader);
- wThread = factory->create(writer);
-}
-
-BlockingAPRSessionContext::~BlockingAPRSessionContext(){
- delete reader;
- delete writer;
-
- delete rThread;
- delete wThread;
-
- delete handler;
-}
-
-void BlockingAPRSessionContext::read(){
- try{
- bool initiated(false);
- while(!closed){
- apr_size_t bytes(inbuf.available());
- if(bytes < 1){
- THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
- }
- apr_status_t s = apr_socket_recv(socket, inbuf.start(), &bytes);
- if(APR_STATUS_IS_TIMEUP(s)){
- //timed out, check closed on loop
- }else if(APR_STATUS_IS_EOF(s) || bytes == 0){
- closed = true;
- }else{
- inbuf.move(bytes);
- inbuf.flip();
-
- if(!initiated){
- ProtocolInitiation* protocolInit = new ProtocolInitiation();
- if(protocolInit->decode(inbuf)){
- handler->initiated(protocolInit);
- if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl;
- initiated = true;
- }
- }else{
- AMQFrame frame;
- while(frame.decode(inbuf)){
- if(debug) std::cout << "RECV: [" << &socket << "]:" << frame << std::endl;
- handler->received(&frame);
- }
- }
- //need to compact buffer to preserve any 'extra' data
- inbuf.compact();
- }
- }
-
- //close socket
- }catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
- }
-}
-
-void BlockingAPRSessionContext::write(){
- while(!closed){
- //get next frame
- outlock.acquire();
- while(outframes.empty() && !closed){
- outlock.wait();
- }
- if(!closed){
- AMQFrame* frame = outframes.front();
- outframes.pop();
- outlock.release();
-
- //encode
- frame->encode(outbuf);
- if(debug) std::cout << "SENT [" << &socket << "]:" << *frame << std::endl;
- delete frame;
- outbuf.flip();
-
- //write from outbuf to socket
- char* data = outbuf.start();
- const int available = outbuf.available();
- int written = 0;
- apr_size_t bytes = available;
- while(available > written){
- apr_socket_send(socket, data + written, &bytes);
- written += bytes;
- bytes = available - written;
- }
- outbuf.clear();
- }else{
- outlock.release();
- }
- }
-}
-
-void BlockingAPRSessionContext::send(AMQFrame* frame){
- if(!closed){
- outlock.acquire();
- bool was_empty(outframes.empty());
- outframes.push(frame);
- if(was_empty){
- outlock.notify();
- }
- outlock.release();
- }else{
- std::cout << "WARNING: Session closed[" << &socket << "], dropping frame. " << &frame << std::endl;
- }
-}
-
-void BlockingAPRSessionContext::init(SessionHandler* _handler){
- handler = _handler;
- rThread->start();
- wThread->start();
-}
-
-void BlockingAPRSessionContext::close(){
- closed = true;
- wThread->join();
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- if(debug) std::cout << "RECV: [" << &socket << "]: Closed " << std::endl;
- handler->closed();
- acceptor->closed(this);
- delete this;
-}
-
-void BlockingAPRSessionContext::shutdown(){
- closed = true;
- outlock.acquire();
- outlock.notify();
- outlock.release();
-
- wThread->join();
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- rThread->join();
- handler->closed();
- delete this;
-}
diff --git a/cpp/src/qpid/io/BlockingAPRSessionContext.h b/cpp/src/qpid/io/BlockingAPRSessionContext.h
deleted file mode 100644
index c06142ace5..0000000000
--- a/cpp/src/qpid/io/BlockingAPRSessionContext.h
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _BlockingAPRSessionContext_
-#define _BlockingAPRSessionContext_
-
-#include <queue>
-#include <vector>
-
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_time.h"
-
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/concurrent/APRMonitor.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/concurrent/Runnable.h"
-#include "qpid/io/SessionContext.h"
-#include "qpid/io/SessionHandler.h"
-#include "qpid/io/SessionHandlerFactory.h"
-#include "qpid/io/ShutdownHandler.h"
-#include "qpid/concurrent/Thread.h"
-#include "qpid/concurrent/ThreadFactory.h"
-
-namespace qpid {
-namespace io {
-
- class BlockingAPRAcceptor;
-
- class BlockingAPRSessionContext : public virtual SessionContext
- {
- class Reader : public virtual qpid::concurrent::Runnable{
- BlockingAPRSessionContext* parent;
- public:
- inline Reader(BlockingAPRSessionContext* p) : parent(p){}
- inline virtual void run(){ parent->read(); }
- inline virtual ~Reader(){}
- };
-
- class Writer : public virtual qpid::concurrent::Runnable{
- BlockingAPRSessionContext* parent;
- public:
- inline Writer(BlockingAPRSessionContext* p) : parent(p){}
- inline virtual void run(){ parent->write(); }
- inline virtual ~Writer(){}
- };
-
- apr_socket_t* socket;
- const bool debug;
- SessionHandler* handler;
- BlockingAPRAcceptor* acceptor;
- std::queue<qpid::framing::AMQFrame*> outframes;
- qpid::framing::Buffer inbuf;
- qpid::framing::Buffer outbuf;
- qpid::concurrent::APRMonitor outlock;
- Reader* reader;
- Writer* writer;
- qpid::concurrent::Thread* rThread;
- qpid::concurrent::Thread* wThread;
-
- volatile bool closed;
-
- void read();
- void write();
- public:
- BlockingAPRSessionContext(apr_socket_t* socket,
- qpid::concurrent::ThreadFactory* factory,
- BlockingAPRAcceptor* acceptor,
- bool debug = false);
- ~BlockingAPRSessionContext();
- virtual void send(qpid::framing::AMQFrame* frame);
- virtual void close();
- void shutdown();
- void init(SessionHandler* handler);
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/APRConnector.cpp b/cpp/src/qpid/io/Connector.cpp
index 91cf01c842..ca487deb86 100644
--- a/cpp/src/qpid/io/APRConnector.cpp
+++ b/cpp/src/qpid/io/Connector.cpp
@@ -17,8 +17,8 @@
*/
#include <iostream>
#include "qpid/concurrent/APRBase.h"
-#include "qpid/io/APRConnector.h"
-#include "qpid/concurrent/APRThreadFactory.h"
+#include "qpid/io/Connector.h"
+#include "qpid/concurrent/ThreadFactory.h"
#include "qpid/QpidError.h"
using namespace qpid::io;
@@ -26,7 +26,7 @@ using namespace qpid::concurrent;
using namespace qpid::framing;
using qpid::QpidError;
-APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) :
+Connector::Connector(bool _debug, u_int32_t buffer_size) :
debug(_debug),
receive_buffer_size(buffer_size),
send_buffer_size(buffer_size),
@@ -44,11 +44,11 @@ APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) :
CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
- threadFactory = new APRThreadFactory();
- writeLock = new APRMonitor();
+ threadFactory = new ThreadFactory();
+ writeLock = new Monitor();
}
-APRConnector::~APRConnector(){
+Connector::~Connector(){
delete receiver;
delete writeLock;
delete threadFactory;
@@ -57,7 +57,7 @@ APRConnector::~APRConnector(){
APRBase::decrement();
}
-void APRConnector::connect(const std::string& host, int port){
+void Connector::connect(const std::string& host, int port){
apr_sockaddr_t* address;
CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
@@ -67,36 +67,36 @@ void APRConnector::connect(const std::string& host, int port){
receiver->start();
}
-void APRConnector::init(ProtocolInitiation* header){
+void Connector::init(ProtocolInitiation* header){
writeBlock(header);
delete header;
}
-void APRConnector::close(){
+void Connector::close(){
closed = true;
CHECK_APR_SUCCESS(apr_socket_close(socket));
receiver->join();
}
-void APRConnector::setInputHandler(InputHandler* handler){
+void Connector::setInputHandler(InputHandler* handler){
input = handler;
}
-void APRConnector::setShutdownHandler(ShutdownHandler* handler){
+void Connector::setShutdownHandler(ShutdownHandler* handler){
shutdownHandler = handler;
}
-OutputHandler* APRConnector::getOutputHandler(){
+OutputHandler* Connector::getOutputHandler(){
return this;
}
-void APRConnector::send(AMQFrame* frame){
+void Connector::send(AMQFrame* frame){
writeBlock(frame);
if(debug) std::cout << "SENT: " << *frame << std::endl;
delete frame;
}
-void APRConnector::writeBlock(AMQDataBlock* data){
+void Connector::writeBlock(AMQDataBlock* data){
writeLock->acquire();
data->encode(outbuf);
@@ -107,7 +107,7 @@ void APRConnector::writeBlock(AMQDataBlock* data){
writeLock->release();
}
-void APRConnector::writeToSocket(char* data, size_t available){
+void Connector::writeToSocket(char* data, size_t available){
apr_size_t bytes(available);
apr_size_t written(0);
while(written < available && !closed){
@@ -124,7 +124,7 @@ void APRConnector::writeToSocket(char* data, size_t available){
}
}
-void APRConnector::checkIdle(apr_status_t status){
+void Connector::checkIdle(apr_status_t status){
if(timeoutHandler){
apr_time_t now = apr_time_as_msec(apr_time_now());
if(APR_STATUS_IS_TIMEUP(status)){
@@ -144,7 +144,7 @@ void APRConnector::checkIdle(apr_status_t status){
}
}
-void APRConnector::setReadTimeout(u_int16_t t){
+void Connector::setReadTimeout(u_int16_t t){
idleIn = t * 1000;//t is in secs
if(idleIn && (!timeout || idleIn < timeout)){
timeout = idleIn;
@@ -153,7 +153,7 @@ void APRConnector::setReadTimeout(u_int16_t t){
}
-void APRConnector::setWriteTimeout(u_int16_t t){
+void Connector::setWriteTimeout(u_int16_t t){
idleOut = t * 1000;//t is in secs
if(idleOut && (!timeout || idleOut < timeout)){
timeout = idleOut;
@@ -161,7 +161,7 @@ void APRConnector::setWriteTimeout(u_int16_t t){
}
}
-void APRConnector::setSocketTimeout(){
+void Connector::setSocketTimeout(){
//interval is in microseconds, timeout in milliseconds
//want the interval to be a bit shorter than the timeout, hence multiply
//by 800 rather than 1000.
@@ -169,11 +169,11 @@ void APRConnector::setSocketTimeout(){
apr_socket_timeout_set(socket, interval);
}
-void APRConnector::setTimeoutHandler(TimeoutHandler* handler){
+void Connector::setTimeoutHandler(TimeoutHandler* handler){
timeoutHandler = handler;
}
-void APRConnector::run(){
+void Connector::run(){
try{
while(!closed){
apr_size_t bytes(inbuf.available());
diff --git a/cpp/src/qpid/io/Connector.h b/cpp/src/qpid/io/Connector.h
index d0a2f470a8..7c52f7e87b 100644
--- a/cpp/src/qpid/io/Connector.h
+++ b/cpp/src/qpid/io/Connector.h
@@ -18,35 +18,74 @@
#ifndef _Connector_
#define _Connector_
+#include "apr-1/apr_network_io.h"
+#include "apr-1/apr_time.h"
+
#include "qpid/framing/InputHandler.h"
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/InitiationHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/io/ShutdownHandler.h"
#include "qpid/io/TimeoutHandler.h"
+#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/io/Connector.h"
+#include "qpid/concurrent/Monitor.h"
namespace qpid {
namespace io {
- class Connector
+ class Connector : public virtual qpid::framing::OutputHandler,
+ private virtual qpid::concurrent::Runnable
{
+ const bool debug;
+ const int receive_buffer_size;
+ const int send_buffer_size;
+
+ bool closed;
+
+ apr_time_t lastIn;
+ apr_time_t lastOut;
+ apr_interval_time_t timeout;
+ u_int32_t idleIn;
+ u_int32_t idleOut;
+
+ TimeoutHandler* timeoutHandler;
+ ShutdownHandler* shutdownHandler;
+ qpid::framing::InputHandler* input;
+ qpid::framing::InitiationHandler* initialiser;
+ qpid::framing::OutputHandler* output;
+
+ qpid::framing::Buffer inbuf;
+ qpid::framing::Buffer outbuf;
+
+ qpid::concurrent::Monitor* writeLock;
+ qpid::concurrent::ThreadFactory* threadFactory;
+ qpid::concurrent::Thread* receiver;
+
+ apr_pool_t* pool;
+ apr_socket_t* socket;
+
+ void checkIdle(apr_status_t status);
+ void writeBlock(qpid::framing::AMQDataBlock* data);
+ void writeToSocket(char* data, size_t available);
+ void setSocketTimeout();
+
+ void run();
+
public:
- virtual void connect(const std::string& host, int port) = 0;
- virtual void init(qpid::framing::ProtocolInitiation* header) = 0;
- virtual void close() = 0;
- virtual void setInputHandler(qpid::framing::InputHandler* handler) = 0;
- virtual void setTimeoutHandler(TimeoutHandler* handler) = 0;
- virtual void setShutdownHandler(ShutdownHandler* handler) = 0;
- virtual qpid::framing::OutputHandler* getOutputHandler() = 0;
- /**
- * Set the timeout for reads, in secs.
- */
- virtual void setReadTimeout(u_int16_t timeout) = 0;
- /**
- * Set the timeout for writes, in secs.
- */
- virtual void setWriteTimeout(u_int16_t timeout) = 0;
- virtual ~Connector(){}
+ Connector(bool debug = false, u_int32_t buffer_size = 1024);
+ virtual ~Connector();
+ virtual void connect(const std::string& host, int port);
+ virtual void init(qpid::framing::ProtocolInitiation* header);
+ virtual void close();
+ virtual void setInputHandler(qpid::framing::InputHandler* handler);
+ virtual void setTimeoutHandler(TimeoutHandler* handler);
+ virtual void setShutdownHandler(ShutdownHandler* handler);
+ virtual qpid::framing::OutputHandler* getOutputHandler();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void setReadTimeout(u_int16_t timeout);
+ virtual void setWriteTimeout(u_int16_t timeout);
};
}
diff --git a/cpp/src/qpid/io/ConnectorImpl.h b/cpp/src/qpid/io/ConnectorImpl.h
deleted file mode 100644
index 55dcf7a2d4..0000000000
--- a/cpp/src/qpid/io/ConnectorImpl.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _APRConnectorImpl_
-#define _APRConnectorImpl_
-
-#ifdef _USE_APR_IO_
-#include "qpid/io/APRConnector.h"
-#else
-#include "qpid/io/LConnector.h"
-#endif
-
-namespace qpid {
-namespace io {
-
-#ifdef _USE_APR_IO_
- class ConnectorImpl : public virtual APRConnector
- {
-
- public:
- ConnectorImpl(bool _debug = false, u_int32_t buffer_size = 1024):APRConnector(_debug,buffer_size){};
- virtual ~ConnectorImpl(){};
- };
-#else
- class ConnectorImpl : public virtual LConnector
- {
-
- public:
- ConnectorImpl(bool _debug = false, u_int32_t buffer_size = 1024):LConnector(_debug, buffer_size){};
- virtual ~ConnectorImpl(){};
- };
-
-#endif
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/LFAcceptor.cpp b/cpp/src/qpid/io/LFAcceptor.cpp
deleted file mode 100644
index 7e51a550af..0000000000
--- a/cpp/src/qpid/io/LFAcceptor.cpp
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "qpid/io/LFAcceptor.h"
-#include "qpid/concurrent/APRBase.h"
-
-using namespace qpid::concurrent;
-using namespace qpid::io;
-
-LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) :
- processor(aprPool.pool, worker_threads, 1000, 5000000),
- max_connections_per_processor(m),
- debug(_debug),
- connectionBacklog(c)
-{ }
-
-
-int16_t LFAcceptor::bind(int16_t _port){
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, aprPool.pool));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, aprPool.pool));
- CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
- CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
- CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog));
- return getPort();
-}
-
-int16_t LFAcceptor::getPort() const {
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
- return address->port;
-}
-
-void LFAcceptor::run(SessionHandlerFactory* factory) {
- running = true;
- processor.start();
- std::cout << "Listening on port " << getPort() << "..." << std::endl;
- while(running){
- apr_socket_t* client;
- apr_status_t status = apr_socket_accept(&client, socket, aprPool.pool);
- if(status == APR_SUCCESS){
- //make this socket non-blocking:
- CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
- LFSessionContext* session = new LFSessionContext(aprPool.pool, client, &processor, debug);
- session->init(factory->create(session));
- }else{
- running = false;
- if(status != APR_EINTR){
- std::cout << "ERROR: " << get_desc(status) << std::endl;
- }
- }
- }
- shutdown();
-}
-
-void LFAcceptor::shutdown() {
- // TODO aconway 2006-10-12: Cleanup, this is not thread safe.
- if (running) {
- running = false;
- processor.stop();
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- }
-}
-
-
-LFAcceptor::~LFAcceptor(){}
-
-LFAcceptor::APRPool::APRPool(){
- APRBase::increment();
- CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
-}
-
-LFAcceptor::APRPool::~APRPool(){
- apr_pool_destroy(pool);
- APRBase::decrement();
-}
diff --git a/cpp/src/qpid/io/LFAcceptor.h b/cpp/src/qpid/io/LFAcceptor.h
deleted file mode 100644
index 35a556d500..0000000000
--- a/cpp/src/qpid/io/LFAcceptor.h
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#ifndef _LFAcceptor_
-#define _LFAcceptor_
-
-#include <vector>
-#include "apr-1/apr_network_io.h"
-#include "apr-1/apr_poll.h"
-#include "apr-1/apr_time.h"
-
-#include "qpid/io/Acceptor.h"
-#include "qpid/concurrent/APRMonitor.h"
-#include "qpid/concurrent/APRThreadFactory.h"
-#include "qpid/concurrent/APRThreadPool.h"
-#include "qpid/io/LFProcessor.h"
-#include "qpid/io/LFSessionContext.h"
-#include "qpid/concurrent/Runnable.h"
-#include "qpid/io/SessionContext.h"
-#include "qpid/io/SessionHandlerFactory.h"
-#include "qpid/concurrent/Thread.h"
-
-namespace qpid {
-namespace io {
-
- class LFAcceptor : public virtual Acceptor
- {
- class APRPool{
- public:
- apr_pool_t* pool;
- APRPool();
- ~APRPool();
- };
-
- APRPool aprPool;
- LFProcessor processor;
- apr_socket_t* socket;
- const int max_connections_per_processor;
- const bool debug;
- const int connectionBacklog;
-
- volatile bool running;
-
- public:
- LFAcceptor(bool debug = false,
- int connectionBacklog = 10,
- int worker_threads = 5,
- int max_connections_per_processor = 500);
- virtual int16_t bind(int16_t port);
- virtual int16_t getPort() const;
- virtual void run(SessionHandlerFactory* factory);
- virtual void shutdown();
- virtual ~LFAcceptor();
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/io/LFProcessor.h b/cpp/src/qpid/io/LFProcessor.h
index 16c789f0ff..5b61f444af 100644
--- a/cpp/src/qpid/io/LFProcessor.h
+++ b/cpp/src/qpid/io/LFProcessor.h
@@ -21,8 +21,8 @@
#include "apr-1/apr_poll.h"
#include <iostream>
#include <vector>
-#include "qpid/concurrent/APRMonitor.h"
-#include "qpid/concurrent/APRThreadFactory.h"
+#include "qpid/concurrent/Monitor.h"
+#include "qpid/concurrent/ThreadFactory.h"
#include "qpid/concurrent/Runnable.h"
namespace qpid {
@@ -50,9 +50,9 @@ namespace io {
const int workerCount;
bool hasLeader;
qpid::concurrent::Thread** const workers;
- qpid::concurrent::APRMonitor leadLock;
- qpid::concurrent::APRMonitor countLock;
- qpid::concurrent::APRThreadFactory factory;
+ qpid::concurrent::Monitor leadLock;
+ qpid::concurrent::Monitor countLock;
+ qpid::concurrent::ThreadFactory factory;
std::vector<LFSessionContext*> sessions;
volatile bool stopped;
diff --git a/cpp/src/qpid/io/LFSessionContext.cpp b/cpp/src/qpid/io/LFSessionContext.cpp
index 6d6d786841..ca1e6431a6 100644
--- a/cpp/src/qpid/io/LFSessionContext.cpp
+++ b/cpp/src/qpid/io/LFSessionContext.cpp
@@ -54,7 +54,7 @@ LFSessionContext::~LFSessionContext(){
void LFSessionContext::read(){
assert(!reading); // No concurrent read.
- reading = APRThread::currentThread();
+ reading = Thread::currentThread();
socket.read(in);
in.flip();
@@ -79,7 +79,7 @@ void LFSessionContext::read(){
void LFSessionContext::write(){
assert(!writing); // No concurrent writes.
- writing = APRThread::currentThread();
+ writing = Thread::currentThread();
bool done = isClosed();
while(!done){
@@ -186,4 +186,4 @@ void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){
logLock.release();
}
-APRMonitor LFSessionContext::logLock;
+Monitor LFSessionContext::logLock;
diff --git a/cpp/src/qpid/io/LFSessionContext.h b/cpp/src/qpid/io/LFSessionContext.h
index 9406bb97b6..8d30b54204 100644
--- a/cpp/src/qpid/io/LFSessionContext.h
+++ b/cpp/src/qpid/io/LFSessionContext.h
@@ -25,7 +25,7 @@
#include "apr-1/apr_time.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/concurrent/APRMonitor.h"
+#include "qpid/concurrent/Monitor.h"
#include "qpid/io/APRSocket.h"
#include "qpid/framing/Buffer.h"
#include "qpid/io/LFProcessor.h"
@@ -51,7 +51,7 @@ namespace io {
apr_pollfd_t fd;
std::queue<qpid::framing::AMQFrame*> framesToWrite;
- qpid::concurrent::APRMonitor writeLock;
+ qpid::concurrent::Monitor writeLock;
bool processing;
bool closing;
@@ -60,7 +60,7 @@ namespace io {
volatile unsigned int reading;
volatile unsigned int writing;
- static qpid::concurrent::APRMonitor logLock;
+ static qpid::concurrent::Monitor logLock;
void log(const std::string& desc, qpid::framing::AMQFrame* const frame);
public:
diff --git a/cpp/src/qpid/io/doxygen_summary.h b/cpp/src/qpid/io/doxygen_summary.h
new file mode 100644
index 0000000000..1086f65f63
--- /dev/null
+++ b/cpp/src/qpid/io/doxygen_summary.h
@@ -0,0 +1,34 @@
+#ifndef _doxygen_summary_
+#define _doxygen_summary_
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// No code just a doxygen comment for the namespace
+
+/** \namspace qpid::io
+ * IO classes used by client and broker.
+ *
+ * This namespace contains platform-neutral classes. Platform
+ * specific classes are in a sub-namespace named after the
+ * platform. At build time the appropriate platform classes are
+ * imported into this namespace so other code does not need to be awre
+ * of the difference.
+ *
+ */
+#endif /*!_doxygen_summary_*/