summaryrefslogtreecommitdiff
path: root/cpp/common/io/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-10-16 13:50:26 +0000
committerAlan Conway <aconway@apache.org>2006-10-16 13:50:26 +0000
commit8a6ab3aa61d441b9210c05c84dc9998acfc38737 (patch)
tree1eb9d7f39b5c2d04a85a1f66caef3d398567b740 /cpp/common/io/src
parent9a808fb13aba243d41bbdab75158dae5939a80a4 (diff)
downloadqpid-python-8a6ab3aa61d441b9210c05c84dc9998acfc38737.tar.gz
Build system reorg, see README and Makefile comments for details.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464494 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/common/io/src')
-rw-r--r--cpp/common/io/src/APRConnector.cpp201
-rw-r--r--cpp/common/io/src/APRSocket.cpp78
-rw-r--r--cpp/common/io/src/Acceptor.cpp21
-rw-r--r--cpp/common/io/src/BlockingAPRAcceptor.cpp101
-rw-r--r--cpp/common/io/src/BlockingAPRSessionContext.cpp178
-rw-r--r--cpp/common/io/src/LFAcceptor.cpp94
-rw-r--r--cpp/common/io/src/LFProcessor.cpp193
-rw-r--r--cpp/common/io/src/LFSessionContext.cpp189
8 files changed, 0 insertions, 1055 deletions
diff --git a/cpp/common/io/src/APRConnector.cpp b/cpp/common/io/src/APRConnector.cpp
deleted file mode 100644
index 5f3bfd6957..0000000000
--- a/cpp/common/io/src/APRConnector.cpp
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include "APRBase.h"
-#include "APRConnector.h"
-#include "APRThreadFactory.h"
-#include "QpidError.h"
-
-using namespace qpid::io;
-using namespace qpid::concurrent;
-using namespace qpid::framing;
-using qpid::QpidError;
-
-APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) :
- debug(_debug),
- receive_buffer_size(buffer_size),
- send_buffer_size(buffer_size),
- closed(true),
- lastIn(0), lastOut(0),
- timeout(0),
- idleIn(0), idleOut(0),
- timeoutHandler(0),
- shutdownHandler(0),
- inbuf(receive_buffer_size),
- outbuf(send_buffer_size){
-
- APRBase::increment();
-
- CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
-
- threadFactory = new APRThreadFactory();
- writeLock = new APRMonitor();
-}
-
-APRConnector::~APRConnector(){
- delete receiver;
- delete writeLock;
- delete threadFactory;
- apr_pool_destroy(pool);
-
- APRBase::decrement();
-}
-
-void APRConnector::connect(const std::string& host, int port){
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
- CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
- closed = false;
-
- receiver = threadFactory->create(this);
- receiver->start();
-}
-
-void APRConnector::init(ProtocolInitiation* header){
- writeBlock(header);
- delete header;
-}
-
-void APRConnector::close(){
- closed = true;
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- receiver->join();
-}
-
-void APRConnector::setInputHandler(InputHandler* handler){
- input = handler;
-}
-
-void APRConnector::setShutdownHandler(ShutdownHandler* handler){
- shutdownHandler = handler;
-}
-
-OutputHandler* APRConnector::getOutputHandler(){
- return this;
-}
-
-void APRConnector::send(AMQFrame* frame){
- writeBlock(frame);
- if(debug) std::cout << "SENT: " << *frame << std::endl;
- delete frame;
-}
-
-void APRConnector::writeBlock(AMQDataBlock* data){
- writeLock->acquire();
- data->encode(outbuf);
-
- //transfer data to wire
- outbuf.flip();
- writeToSocket(outbuf.start(), outbuf.available());
- outbuf.clear();
- writeLock->release();
-}
-
-void APRConnector::writeToSocket(char* data, size_t available){
- apr_size_t bytes(available);
- apr_size_t written(0);
- while(written < available && !closed){
- apr_status_t status = apr_socket_send(socket, data + written, &bytes);
- if(status == APR_TIMEUP){
- std::cout << "Write request timed out." << std::endl;
- }
- if(bytes == 0){
- std::cout << "Write request wrote 0 bytes." << std::endl;
- }
- lastOut = apr_time_as_msec(apr_time_now());
- written += bytes;
- bytes = available - written;
- }
-}
-
-void APRConnector::checkIdle(apr_status_t status){
- if(timeoutHandler){
- apr_time_t now = apr_time_as_msec(apr_time_now());
- if(APR_STATUS_IS_TIMEUP(status)){
- if(idleIn && (now - lastIn > idleIn)){
- timeoutHandler->idleIn();
- }
- }else if(APR_STATUS_IS_EOF(status)){
- closed = true;
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- if(shutdownHandler) shutdownHandler->shutdown();
- }else{
- lastIn = now;
- }
- if(idleOut && (now - lastOut > idleOut)){
- timeoutHandler->idleOut();
- }
- }
-}
-
-void APRConnector::setReadTimeout(u_int16_t t){
- idleIn = t * 1000;//t is in secs
- if(idleIn && (!timeout || idleIn < timeout)){
- timeout = idleIn;
- setSocketTimeout();
- }
-
-}
-
-void APRConnector::setWriteTimeout(u_int16_t t){
- idleOut = t * 1000;//t is in secs
- if(idleOut && (!timeout || idleOut < timeout)){
- timeout = idleOut;
- setSocketTimeout();
- }
-}
-
-void APRConnector::setSocketTimeout(){
- //interval is in microseconds, timeout in milliseconds
- //want the interval to be a bit shorter than the timeout, hence multiply
- //by 800 rather than 1000.
- apr_interval_time_t interval(timeout * 800);
- apr_socket_timeout_set(socket, interval);
-}
-
-void APRConnector::setTimeoutHandler(TimeoutHandler* handler){
- timeoutHandler = handler;
-}
-
-void APRConnector::run(){
- try{
- while(!closed){
- apr_size_t bytes(inbuf.available());
- if(bytes < 1){
- THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
- }
- checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes));
-
- if(bytes > 0){
- inbuf.move(bytes);
- inbuf.flip();//position = 0, limit = total data read
-
- AMQFrame frame;
- while(frame.decode(inbuf)){
- if(debug) std::cout << "RECV: " << frame << std::endl;
- input->received(&frame);
- }
- //need to compact buffer to preserve any 'extra' data
- inbuf.compact();
- }
- }
- }catch(QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
- }
-}
diff --git a/cpp/common/io/src/APRSocket.cpp b/cpp/common/io/src/APRSocket.cpp
deleted file mode 100644
index 1ef7e270a3..0000000000
--- a/cpp/common/io/src/APRSocket.cpp
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "APRBase.h"
-#include "APRSocket.h"
-#include <assert.h>
-#include <iostream>
-
-using namespace qpid::io;
-using namespace qpid::framing;
-using namespace qpid::concurrent;
-
-APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){
-
-}
-
-void APRSocket::read(qpid::framing::Buffer& buffer){
- apr_size_t bytes;
- bytes = buffer.available();
- apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes);
- buffer.move(bytes);
- if(APR_STATUS_IS_TIMEUP(s)){
- //timed out
- }else if(APR_STATUS_IS_EOF(s)){
- close();
- }
-}
-
-void APRSocket::write(qpid::framing::Buffer& buffer){
- apr_size_t bytes;
- do{
- bytes = buffer.available();
- apr_status_t s = apr_socket_send(socket, buffer.start(), &bytes);
- // TODO aconway 2006-10-05: better error handling
- assert(s == 0);
- buffer.move(bytes);
- }while(bytes > 0);
-}
-
-void APRSocket::close(){
- if(!closed){
- std::cout << "Closing socket " << socket << "@" << this << std::endl;
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- closed = true;
- }
-}
-
-bool APRSocket::isOpen(){
- return !closed;
-}
-
-u_int8_t APRSocket::read(){
- char data[1];
- apr_size_t bytes = 1;
- apr_status_t s = apr_socket_recv(socket, data, &bytes);
- if(APR_STATUS_IS_EOF(s) || bytes == 0){
- return 0;
- }else{
- return *data;
- }
-}
-
-APRSocket::~APRSocket(){
-}
diff --git a/cpp/common/io/src/Acceptor.cpp b/cpp/common/io/src/Acceptor.cpp
deleted file mode 100644
index d1825c78fa..0000000000
--- a/cpp/common/io/src/Acceptor.cpp
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-#include "Acceptor.h"
-
-qpid::io::Acceptor::~Acceptor() {}
diff --git a/cpp/common/io/src/BlockingAPRAcceptor.cpp b/cpp/common/io/src/BlockingAPRAcceptor.cpp
deleted file mode 100644
index 4c55b9e2c8..0000000000
--- a/cpp/common/io/src/BlockingAPRAcceptor.cpp
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <iostream>
-#include "BlockingAPRAcceptor.h"
-#include "APRBase.h"
-#include "APRThreadFactory.h"
-
-using namespace qpid::concurrent;
-using namespace qpid::framing;
-using namespace qpid::io;
-
-BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) :
- debug(_debug),
- threadFactory(new APRThreadFactory()),
- connectionBacklog(c)
-{
- APRBase::increment();
- CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL));
-}
-
-int16_t BlockingAPRAcceptor::bind(int16_t _port){
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, apr_pool));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, apr_pool));
- CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
- CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog));
- return getPort();
-}
-
-int16_t BlockingAPRAcceptor::getPort() const {
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
- return address->port;
-}
-
-void BlockingAPRAcceptor::run(SessionHandlerFactory* factory)
-{
- running = true;
- std::cout << "Listening on port " << getPort() << "..." << std::endl;
- while(running){
- apr_socket_t* client;
- apr_status_t status = apr_socket_accept(&client, socket, apr_pool);
- if(status == APR_SUCCESS){
- //configure socket:
- CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 1000000/* i.e. 1 sec*/));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
-
- BlockingAPRSessionContext* session = new BlockingAPRSessionContext(client, threadFactory, this, debug);
- session->init(factory->create(session));
- sessions.push_back(session);
- }else{
- running = false;
- if(status != APR_EINTR){
- std::cout << "ERROR: " << get_desc(status) << std::endl;
- }
- }
- }
- shutdown();
-}
-
-void BlockingAPRAcceptor::shutdown()
-{
- // TODO aconway 2006-10-12: Not thread safe.
- if (running)
- {
- running = false;
- apr_socket_close(socket); // Don't check, exception safety.
- for(iterator i = sessions.begin(); i < sessions.end(); i++){
- (*i)->shutdown();
- }
- }
-}
-
-BlockingAPRAcceptor::~BlockingAPRAcceptor(){
- delete threadFactory;
- apr_pool_destroy(apr_pool);
- APRBase::decrement();
-}
-
-
-void BlockingAPRAcceptor::closed(BlockingAPRSessionContext* session){
- sessions.erase(find(sessions.begin(), sessions.end(), session));
-}
-
diff --git a/cpp/common/io/src/BlockingAPRSessionContext.cpp b/cpp/common/io/src/BlockingAPRSessionContext.cpp
deleted file mode 100644
index 6d1dc3470c..0000000000
--- a/cpp/common/io/src/BlockingAPRSessionContext.cpp
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include <assert.h>
-#include <iostream>
-#include "BlockingAPRSessionContext.h"
-#include "BlockingAPRAcceptor.h"
-#include "APRBase.h"
-#include "QpidError.h"
-
-using namespace qpid::concurrent;
-using namespace qpid::framing;
-using namespace qpid::io;
-
-
-BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket,
- ThreadFactory* factory,
- BlockingAPRAcceptor* _acceptor,
- bool _debug)
- : socket(_socket),
- debug(_debug),
- handler(0),
- acceptor(_acceptor),
- inbuf(65536),
- outbuf(65536),
- closed(false){
-
- reader = new Reader(this);
- writer = new Writer(this);
-
- rThread = factory->create(reader);
- wThread = factory->create(writer);
-}
-
-BlockingAPRSessionContext::~BlockingAPRSessionContext(){
- delete reader;
- delete writer;
-
- delete rThread;
- delete wThread;
-
- delete handler;
-}
-
-void BlockingAPRSessionContext::read(){
- try{
- bool initiated(false);
- while(!closed){
- apr_size_t bytes(inbuf.available());
- if(bytes < 1){
- THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
- }
- apr_status_t s = apr_socket_recv(socket, inbuf.start(), &bytes);
- if(APR_STATUS_IS_TIMEUP(s)){
- //timed out, check closed on loop
- }else if(APR_STATUS_IS_EOF(s) || bytes == 0){
- closed = true;
- }else{
- inbuf.move(bytes);
- inbuf.flip();
-
- if(!initiated){
- ProtocolInitiation* protocolInit = new ProtocolInitiation();
- if(protocolInit->decode(inbuf)){
- handler->initiated(protocolInit);
- if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl;
- initiated = true;
- }
- }else{
- AMQFrame frame;
- while(frame.decode(inbuf)){
- if(debug) std::cout << "RECV: [" << &socket << "]:" << frame << std::endl;
- handler->received(&frame);
- }
- }
- //need to compact buffer to preserve any 'extra' data
- inbuf.compact();
- }
- }
-
- //close socket
- }catch(qpid::QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
- }
-}
-
-void BlockingAPRSessionContext::write(){
- while(!closed){
- //get next frame
- outlock.acquire();
- while(outframes.empty() && !closed){
- outlock.wait();
- }
- if(!closed){
- AMQFrame* frame = outframes.front();
- outframes.pop();
- outlock.release();
-
- //encode
- frame->encode(outbuf);
- if(debug) std::cout << "SENT [" << &socket << "]:" << *frame << std::endl;
- delete frame;
- outbuf.flip();
-
- //write from outbuf to socket
- char* data = outbuf.start();
- const int available = outbuf.available();
- int written = 0;
- apr_size_t bytes = available;
- while(available > written){
- apr_status_t s = apr_socket_send(socket, data + written, &bytes);
- assert(s == 0); // TODO aconway 2006-10-05: Error Handling.
- written += bytes;
- bytes = available - written;
- }
- outbuf.clear();
- }else{
- outlock.release();
- }
- }
-}
-
-void BlockingAPRSessionContext::send(AMQFrame* frame){
- if(!closed){
- outlock.acquire();
- bool was_empty(outframes.empty());
- outframes.push(frame);
- if(was_empty){
- outlock.notify();
- }
- outlock.release();
- }else{
- std::cout << "WARNING: Session closed[" << &socket << "], dropping frame. " << &frame << std::endl;
- }
-}
-
-void BlockingAPRSessionContext::init(SessionHandler* _handler){
- handler = _handler;
- rThread->start();
- wThread->start();
-}
-
-void BlockingAPRSessionContext::close(){
- closed = true;
- wThread->join();
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- if(debug) std::cout << "RECV: [" << &socket << "]: Closed " << std::endl;
- handler->closed();
- acceptor->closed(this);
- delete this;
-}
-
-void BlockingAPRSessionContext::shutdown(){
- closed = true;
- outlock.acquire();
- outlock.notify();
- outlock.release();
-
- wThread->join();
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- rThread->join();
- handler->closed();
- delete this;
-}
diff --git a/cpp/common/io/src/LFAcceptor.cpp b/cpp/common/io/src/LFAcceptor.cpp
deleted file mode 100644
index 86f382afac..0000000000
--- a/cpp/common/io/src/LFAcceptor.cpp
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "LFAcceptor.h"
-#include "APRBase.h"
-
-using namespace qpid::concurrent;
-using namespace qpid::io;
-
-LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) :
- processor(aprPool.pool, worker_threads, 1000, 5000000),
- max_connections_per_processor(m),
- debug(_debug),
- connectionBacklog(c)
-{ }
-
-
-int16_t LFAcceptor::bind(int16_t _port){
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, aprPool.pool));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, aprPool.pool));
- CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
- CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
- CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog));
- return getPort();
-}
-
-int16_t LFAcceptor::getPort() const {
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
- return address->port;
-}
-
-void LFAcceptor::run(SessionHandlerFactory* factory) {
- running = true;
- processor.start();
- std::cout << "Listening on port " << getPort() << "..." << std::endl;
- while(running){
- apr_socket_t* client;
- apr_status_t status = apr_socket_accept(&client, socket, aprPool.pool);
- if(status == APR_SUCCESS){
- //make this socket non-blocking:
- CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
- LFSessionContext* session = new LFSessionContext(aprPool.pool, client, &processor, debug);
- session->init(factory->create(session));
- }else{
- running = false;
- if(status != APR_EINTR){
- std::cout << "ERROR: " << get_desc(status) << std::endl;
- }
- }
- }
- shutdown();
-}
-
-void LFAcceptor::shutdown() {
- // TODO aconway 2006-10-12: Cleanup, this is not thread safe.
- if (running) {
- running = false;
- processor.stop();
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- }
-}
-
-
-LFAcceptor::~LFAcceptor(){}
-
-LFAcceptor::APRPool::APRPool(){
- APRBase::increment();
- CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
-}
-
-LFAcceptor::APRPool::~APRPool(){
- apr_pool_destroy(pool);
- APRBase::decrement();
-}
diff --git a/cpp/common/io/src/LFProcessor.cpp b/cpp/common/io/src/LFProcessor.cpp
deleted file mode 100644
index 65d7451767..0000000000
--- a/cpp/common/io/src/LFProcessor.cpp
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "LFProcessor.h"
-#include "APRBase.h"
-#include "LFSessionContext.h"
-#include "QpidError.h"
-#include <sstream>
-
-using namespace qpid::io;
-using namespace qpid::concurrent;
-using qpid::QpidError;
-
-// TODO aconway 2006-10-12: stopped is read outside locks.
-//
-
-LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) :
- size(_size),
- timeout(_timeout),
- signalledCount(0),
- current(0),
- count(0),
- workerCount(_workers),
- hasLeader(false),
- workers(new Thread*[_workers]),
- stopped(false)
-{
-
- CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
- //create & start the required number of threads
- for(int i = 0; i < workerCount; i++){
- workers[i] = factory.create(this);
- }
-}
-
-
-LFProcessor::~LFProcessor(){
- if (!stopped) stop();
- for(int i = 0; i < workerCount; i++){
- delete workers[i];
- }
- delete[] workers;
- CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
-}
-
-void LFProcessor::start(){
- for(int i = 0; i < workerCount; i++){
- workers[i]->start();
- }
-}
-
-void LFProcessor::add(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
- countLock.acquire();
- sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data));
- count++;
- countLock.release();
-}
-
-void LFProcessor::remove(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
- countLock.acquire();
- sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data)));
- count--;
- countLock.release();
-}
-
-void LFProcessor::reactivate(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
-}
-
-void LFProcessor::deactivate(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
-}
-
-void LFProcessor::update(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
- CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
-}
-
-bool LFProcessor::full(){
- Locker locker(countLock);
- return count == size;
-}
-
-bool LFProcessor::empty(){
- Locker locker(countLock);
- return count == 0;
-}
-
-void LFProcessor::poll(){
- apr_status_t status;
- do{
- current = 0;
- if(!stopped){
- status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs);
- }
- }while(status != APR_SUCCESS && !stopped);
-}
-
-void LFProcessor::run(){
- try{
- while(!stopped){
- leadLock.acquire();
- waitToLead();
- if(!stopped){
- const apr_pollfd_t* evt = getNextEvent();
- if(evt){
- LFSessionContext* session = reinterpret_cast<LFSessionContext*>(evt->client_data);
- session->startProcessing();
-
- relinquishLead();
- leadLock.release();
-
- //process event:
- if(evt->rtnevents & APR_POLLIN) session->read();
- if(evt->rtnevents & APR_POLLOUT) session->write();
-
- if(session->isClosed()){
- session->handleClose();
- countLock.acquire();
- sessions.erase(find(sessions.begin(), sessions.end(), session));
- count--;
- countLock.release();
- }else{
- session->stopProcessing();
- }
-
- }else{
- leadLock.release();
- }
- }else{
- leadLock.release();
- }
- }
- }catch(QpidError error){
- std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
- }
-}
-
-void LFProcessor::waitToLead(){
- while(hasLeader && !stopped) leadLock.wait();
- hasLeader = !stopped;
-}
-
-void LFProcessor::relinquishLead(){
- hasLeader = false;
- leadLock.notify();
-}
-
-const apr_pollfd_t* LFProcessor::getNextEvent(){
- while(true){
- if(stopped){
- return 0;
- }else if(current < signalledCount){
- //use result of previous poll if one is available
- return signalledFDs + (current++);
- }else{
- //else poll to get new events
- poll();
- }
- }
-}
-
-void LFProcessor::stop(){
- stopped = true;
- leadLock.acquire();
- leadLock.notifyAll();
- leadLock.release();
-
- for(int i = 0; i < workerCount; i++){
- workers[i]->join();
- }
-
- for(iterator i = sessions.begin(); i < sessions.end(); i++){
- (*i)->shutdown();
- }
-}
-
diff --git a/cpp/common/io/src/LFSessionContext.cpp b/cpp/common/io/src/LFSessionContext.cpp
deleted file mode 100644
index 7b8208f704..0000000000
--- a/cpp/common/io/src/LFSessionContext.cpp
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-#include "LFSessionContext.h"
-#include "APRBase.h"
-#include "QpidError.h"
-#include <assert.h>
-
-using namespace qpid::concurrent;
-using namespace qpid::io;
-using namespace qpid::framing;
-
-LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,
- LFProcessor* const _processor,
- bool _debug) :
- debug(_debug),
- socket(_socket),
- initiated(false),
- in(32768),
- out(32768),
- processor(_processor),
- processing(false),
- closing(false),
- reading(0),
- writing(0)
-{
-
- fd.p = _pool;
- fd.desc_type = APR_POLL_SOCKET;
- fd.reqevents = APR_POLLIN;
- fd.client_data = this;
- fd.desc.s = _socket;
-
- out.flip();
-}
-
-LFSessionContext::~LFSessionContext(){
-
-}
-
-void LFSessionContext::read(){
- assert(!reading); // No concurrent read.
- reading = APRThread::currentThread();
-
- socket.read(in);
- in.flip();
- if(initiated){
- AMQFrame frame;
- while(frame.decode(in)){
- if(debug) log("RECV", &frame);
- handler->received(&frame);
- }
- }else{
- ProtocolInitiation protocolInit;
- if(protocolInit.decode(in)){
- handler->initiated(&protocolInit);
- initiated = true;
- if(debug) std::cout << "INIT [" << &socket << "]" << std::endl;
- }
- }
- in.compact();
-
- reading = 0;
-}
-
-void LFSessionContext::write(){
- assert(!writing); // No concurrent writes.
- writing = APRThread::currentThread();
-
- bool done = isClosed();
- while(!done){
- if(out.available() > 0){
- socket.write(out);
- if(out.available() > 0){
- writing = 0;
-
- //incomplete write, leave flags to receive notification of readiness to write
- done = true;//finished processing for now, but write is still in progress
- }
- }else{
- //do we have any frames to write?
- writeLock.acquire();
- if(!framesToWrite.empty()){
- out.clear();
- bool encoded(false);
- AMQFrame* frame = framesToWrite.front();
- while(frame && out.available() >= frame->size()){
- encoded = true;
- frame->encode(out);
- if(debug) log("SENT", frame);
- delete frame;
- framesToWrite.pop();
- frame = framesToWrite.empty() ? 0 : framesToWrite.front();
- }
- if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
- out.flip();
- }else{
- //reset flags, don't care about writability anymore
- fd.reqevents = APR_POLLIN;
- done = true;
-
- writing = 0;
-
- if(closing){
- socket.close();
- }
- }
- writeLock.release();
- }
- }
-}
-
-void LFSessionContext::send(AMQFrame* frame){
- writeLock.acquire();
- if(!closing){
- framesToWrite.push(frame);
- if(!(fd.reqevents & APR_POLLOUT)){
- fd.reqevents |= APR_POLLOUT;
- if(!processing){
- processor->update(&fd);
- }
- }
- }
- writeLock.release();
-}
-
-void LFSessionContext::startProcessing(){
- writeLock.acquire();
- processing = true;
- processor->deactivate(&fd);
- writeLock.release();
-}
-
-void LFSessionContext::stopProcessing(){
- writeLock.acquire();
- processor->reactivate(&fd);
- processing = false;
- writeLock.release();
-}
-
-void LFSessionContext::close(){
- closing = true;
- writeLock.acquire();
- if(!processing){
- //allow pending frames to be written to socket
- fd.reqevents = APR_POLLOUT;
- processor->update(&fd);
- }
- writeLock.release();
-}
-
-void LFSessionContext::handleClose(){
- handler->closed();
- std::cout << "Session closed [" << &socket << "]" << std::endl;
- delete handler;
- delete this;
-}
-
-void LFSessionContext::shutdown(){
- socket.close();
- handleClose();
-}
-
-void LFSessionContext::init(SessionHandler* _handler){
- handler = _handler;
- processor->add(&fd);
-}
-
-void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){
- logLock.acquire();
- std::cout << desc << " [" << &socket << "]: " << *frame << std::endl;
- logLock.release();
-}
-
-APRMonitor LFSessionContext::logLock;