diff options
Diffstat (limited to 'cpp/common/io/src')
-rw-r--r-- | cpp/common/io/src/APRConnector.cpp | 198 | ||||
-rw-r--r-- | cpp/common/io/src/APRIOProcessor.cpp | 100 | ||||
-rw-r--r-- | cpp/common/io/src/APRSocket.cpp | 76 | ||||
-rw-r--r-- | cpp/common/io/src/BlockingAPRAcceptor.cpp | 81 | ||||
-rw-r--r-- | cpp/common/io/src/BlockingAPRSessionContext.cpp | 177 | ||||
-rw-r--r-- | cpp/common/io/src/LFAcceptor.cpp | 80 | ||||
-rw-r--r-- | cpp/common/io/src/LFProcessor.cpp | 191 | ||||
-rw-r--r-- | cpp/common/io/src/LFSessionContext.cpp | 187 |
8 files changed, 1090 insertions, 0 deletions
diff --git a/cpp/common/io/src/APRConnector.cpp b/cpp/common/io/src/APRConnector.cpp new file mode 100644 index 0000000000..0e022a8c73 --- /dev/null +++ b/cpp/common/io/src/APRConnector.cpp @@ -0,0 +1,198 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "APRBase.h" +#include "APRConnector.h" +#include "APRThreadFactory.h" +#include "QpidError.h" + +using namespace qpid::io; +using namespace qpid::concurrent; +using namespace qpid::framing; +using qpid::QpidError; + +APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) : closed(true), debug(_debug), + idleIn(0), idleOut(0), timeout(0), + timeoutHandler(0), + shutdownHandler(0), + lastIn(0), lastOut(0), + receive_buffer_size(buffer_size), + send_buffer_size(buffer_size), + inbuf(receive_buffer_size), + outbuf(send_buffer_size){ + + APRBase::increment(); + + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); + + threadFactory = new APRThreadFactory(); + writeLock = new APRMonitor(); +} + +APRConnector::~APRConnector(){ + delete receiver; + delete writeLock; + delete threadFactory; + apr_pool_destroy(pool); + + APRBase::decrement(); +} + +void APRConnector::connect(const std::string& host, int port){ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); + CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); + closed = false; + + receiver = threadFactory->create(this); + receiver->start(); +} + +void APRConnector::init(ProtocolInitiation* header){ + writeBlock(header); + delete header; +} + +void APRConnector::close(){ + closed = true; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + receiver->join(); +} + +void APRConnector::setInputHandler(InputHandler* handler){ + input = handler; +} + +void APRConnector::setShutdownHandler(ShutdownHandler* handler){ + shutdownHandler = handler; +} + +OutputHandler* APRConnector::getOutputHandler(){ + return this; +} + +void APRConnector::send(AMQFrame* frame){ + writeBlock(frame); + if(debug) std::cout << "SENT: " << *frame << std::endl; + delete frame; +} + +void APRConnector::writeBlock(AMQDataBlock* data){ + writeLock->acquire(); + data->encode(outbuf); + + //transfer data to wire + outbuf.flip(); + writeToSocket(outbuf.start(), outbuf.available()); + outbuf.clear(); + writeLock->release(); +} + +void APRConnector::writeToSocket(char* data, int available){ + apr_size_t bytes(available); + apr_size_t written(0); + while(written < available && !closed){ + apr_status_t status = apr_socket_send(socket, data + written, &bytes); + if(status == APR_TIMEUP){ + std::cout << "Write request timed out." << std::endl; + } + if(bytes == 0){ + std::cout << "Write request wrote 0 bytes." << std::endl; + } + lastOut = apr_time_as_msec(apr_time_now()); + written += bytes; + bytes = available - written; + } +} + +void APRConnector::checkIdle(apr_status_t status){ + if(timeoutHandler){ + apr_time_t now = apr_time_as_msec(apr_time_now()); + if(APR_STATUS_IS_TIMEUP(status)){ + if(idleIn && (now - lastIn > idleIn)){ + timeoutHandler->idleIn(); + } + }else if(APR_STATUS_IS_EOF(status)){ + closed = true; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + if(shutdownHandler) shutdownHandler->shutdown(); + }else{ + lastIn = now; + } + if(idleOut && (now - lastOut > idleOut)){ + timeoutHandler->idleOut(); + } + } +} + +void APRConnector::setReadTimeout(u_int16_t t){ + idleIn = t * 1000;//t is in secs + if(idleIn && (!timeout || idleIn < timeout)){ + timeout = idleIn; + setSocketTimeout(); + } + +} + +void APRConnector::setWriteTimeout(u_int16_t t){ + idleOut = t * 1000;//t is in secs + if(idleOut && (!timeout || idleOut < timeout)){ + timeout = idleOut; + setSocketTimeout(); + } +} + +void APRConnector::setSocketTimeout(){ + //interval is in microseconds, timeout in milliseconds + //want the interval to be a bit shorter than the timeout, hence multiply + //by 800 rather than 1000. + apr_interval_time_t interval(timeout * 800); + apr_socket_timeout_set(socket, interval); +} + +void APRConnector::setTimeoutHandler(TimeoutHandler* handler){ + timeoutHandler = handler; +} + +void APRConnector::run(){ + try{ + while(!closed){ + apr_size_t bytes(inbuf.available()); + if(bytes < 1){ + THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); + } + checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes)); + + if(bytes > 0){ + inbuf.move(bytes); + inbuf.flip();//position = 0, limit = total data read + + AMQFrame frame; + while(frame.decode(inbuf)){ + if(debug) std::cout << "RECV: " << frame << std::endl; + input->received(&frame); + } + //need to compact buffer to preserve any 'extra' data + inbuf.compact(); + } + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} diff --git a/cpp/common/io/src/APRIOProcessor.cpp b/cpp/common/io/src/APRIOProcessor.cpp new file mode 100644 index 0000000000..d630f2f315 --- /dev/null +++ b/cpp/common/io/src/APRIOProcessor.cpp @@ -0,0 +1,100 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "APRIOProcessor.h" +#include "APRBase.h" +#include "QpidError.h" + +using namespace qpid::io; +using namespace qpid::concurrent; + +APRIOProcessor::APRIOProcessor(apr_pool_t* pool, int _size, int _timeout) : size(_size), + timeout(_timeout), + count(0), + thread(pool, this), + stopped(false){ + + CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); + thread.start(); +} + +void APRIOProcessor::add(apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); + lock.acquire(); + if(!count++) lock.notify(); + lock.release(); +} + +void APRIOProcessor::remove(apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + lock.acquire(); + count--; + lock.release(); +} + +bool APRIOProcessor::full(){ + lock.acquire(); + bool full = count == size; + lock.release(); + return full; +} + +bool APRIOProcessor::empty(){ + lock.acquire(); + bool empty = count == 0; + lock.release(); + return empty; +} + +void APRIOProcessor::poll(){ + try{ + int signalledCount; + const apr_pollfd_t* signalledFDs; + apr_status_t status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); + if(status == APR_SUCCESS){ + for(int i = 0; i < signalledCount; i++){ + IOSessionHolder* session = reinterpret_cast<IOSessionHolder*>(signalledFDs[i].client_data); + if(signalledFDs[i].rtnevents & APR_POLLIN) session->read(); + if(signalledFDs[i].rtnevents & APR_POLLOUT) session->write(); + } + } + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } + +} + +void APRIOProcessor::run(){ + while(!stopped){ + lock.acquire(); + while(count == 0) lock.wait(); + lock.release(); + poll(); + } +} + +void APRIOProcessor::stop(){ + lock.acquire(); + stopped = true; + lock.notify(); + lock.release(); +} + +APRIOProcessor::~APRIOProcessor(){ + CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); +} + diff --git a/cpp/common/io/src/APRSocket.cpp b/cpp/common/io/src/APRSocket.cpp new file mode 100644 index 0000000000..32861ea442 --- /dev/null +++ b/cpp/common/io/src/APRSocket.cpp @@ -0,0 +1,76 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "APRBase.h" +#include "APRSocket.h" + +#include <iostream> + +using namespace qpid::io; +using namespace qpid::framing; +using namespace qpid::concurrent; + +APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ + +} + +void APRSocket::read(qpid::framing::Buffer& buffer){ + apr_size_t bytes; + bytes = buffer.available(); + apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes); + buffer.move(bytes); + if(APR_STATUS_IS_TIMEUP(s)){ + //timed out + }else if(APR_STATUS_IS_EOF(s)){ + close(); + } +} + +void APRSocket::write(qpid::framing::Buffer& buffer){ + apr_size_t bytes; + do{ + bytes = buffer.available(); + apr_status_t s = apr_socket_send(socket, buffer.start(), &bytes); + buffer.move(bytes); + }while(bytes > 0); +} + +void APRSocket::close(){ + if(!closed){ + std::cout << "Closing socket " << socket << "@" << this << std::endl; + CHECK_APR_SUCCESS(apr_socket_close(socket)); + closed = true; + } +} + +bool APRSocket::isOpen(){ + return !closed; +} + +u_int8_t APRSocket::read(){ + char data[1]; + apr_size_t bytes = 1; + apr_status_t s = apr_socket_recv(socket, data, &bytes); + if(APR_STATUS_IS_EOF(s) || bytes == 0){ + return 0; + }else{ + return *data; + } +} + +APRSocket::~APRSocket(){ +} diff --git a/cpp/common/io/src/BlockingAPRAcceptor.cpp b/cpp/common/io/src/BlockingAPRAcceptor.cpp new file mode 100644 index 0000000000..380318bcfa --- /dev/null +++ b/cpp/common/io/src/BlockingAPRAcceptor.cpp @@ -0,0 +1,81 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "BlockingAPRAcceptor.h" +#include "APRBase.h" +#include "APRThreadFactory.h" + +using namespace qpid::concurrent; +using namespace qpid::framing; +using namespace qpid::io; + +BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) : connectionBacklog(c), + threadFactory(new APRThreadFactory()), + debug(_debug){ + + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL)); +} + +void BlockingAPRAcceptor::bind(int port, SessionHandlerFactory* factory){ + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, apr_pool)); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, apr_pool)); + CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); + CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); + running = true; + std::cout << "Listening on port " << port << "..." << std::endl; + while(running){ + apr_socket_t* client; + apr_status_t status = apr_socket_accept(&client, socket, apr_pool); + if(status == APR_SUCCESS){ + //configure socket: + CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 1000000/* i.e. 1 sec*/)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); + + BlockingAPRSessionContext* session = new BlockingAPRSessionContext(client, threadFactory, this, debug); + session->init(factory->create(session)); + sessions.push_back(session); + }else{ + running = false; + if(status != APR_EINTR){ + std::cout << "ERROR: " << get_desc(status) << std::endl; + } + } + } + for(iterator i = sessions.begin(); i < sessions.end(); i++){ + (*i)->shutdown(); + } + + CHECK_APR_SUCCESS(apr_socket_close(socket)); +} + +BlockingAPRAcceptor::~BlockingAPRAcceptor(){ + delete threadFactory; + apr_pool_destroy(apr_pool); + APRBase::decrement(); +} + + +void BlockingAPRAcceptor::closed(BlockingAPRSessionContext* session){ + sessions.erase(find(sessions.begin(), sessions.end(), session)); + delete this; +} + diff --git a/cpp/common/io/src/BlockingAPRSessionContext.cpp b/cpp/common/io/src/BlockingAPRSessionContext.cpp new file mode 100644 index 0000000000..99352c90d5 --- /dev/null +++ b/cpp/common/io/src/BlockingAPRSessionContext.cpp @@ -0,0 +1,177 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "BlockingAPRSessionContext.h" +#include "BlockingAPRAcceptor.h" +#include "APRBase.h" +#include "QpidError.h" + +using namespace qpid::concurrent; +using namespace qpid::framing; +using namespace qpid::io; + + +BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket, + ThreadFactory* factory, + BlockingAPRAcceptor* _acceptor, + bool _debug) + : socket(_socket), + debug(_debug), + inbuf(65536), + outbuf(65536), + handler(0), + acceptor(_acceptor), + closed(false){ + + reader = new Reader(this); + writer = new Writer(this); + + rThread = factory->create(reader); + wThread = factory->create(writer); +} + +BlockingAPRSessionContext::~BlockingAPRSessionContext(){ + delete reader; + delete writer; + + delete rThread; + delete wThread; + + delete handler; +} + +void BlockingAPRSessionContext::read(){ + try{ + bool initiated(false); + while(!closed){ + apr_size_t bytes(inbuf.available()); + if(bytes < 1){ + THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); + } + apr_status_t s = apr_socket_recv(socket, inbuf.start(), &bytes); + if(APR_STATUS_IS_TIMEUP(s)){ + //timed out, check closed on loop + }else if(APR_STATUS_IS_EOF(s) || bytes == 0){ + closed = true; + }else{ + inbuf.move(bytes); + inbuf.flip(); + + if(!initiated){ + ProtocolInitiation* init = new ProtocolInitiation(); + if(init->decode(inbuf)){ + handler->initiated(init); + if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl; + initiated = true; + } + }else{ + AMQFrame frame; + while(frame.decode(inbuf)){ + if(debug) std::cout << "RECV: [" << &socket << "]:" << frame << std::endl; + handler->received(&frame); + } + } + //need to compact buffer to preserve any 'extra' data + inbuf.compact(); + } + } + + //close socket + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} + +void BlockingAPRSessionContext::write(){ + while(!closed){ + //get next frame + outlock.acquire(); + while(outframes.empty() && !closed){ + outlock.wait(); + } + if(!closed){ + AMQFrame* frame = outframes.front(); + outframes.pop(); + outlock.release(); + + //encode + frame->encode(outbuf); + if(debug) std::cout << "SENT [" << &socket << "]:" << *frame << std::endl; + delete frame; + outbuf.flip(); + + //write from outbuf to socket + char* data = outbuf.start(); + const int available = outbuf.available(); + int written = 0; + apr_size_t bytes = available; + while(available > written){ + apr_status_t s = apr_socket_send(socket, data + written, &bytes); + written += bytes; + bytes = available - written; + } + outbuf.clear(); + }else{ + outlock.release(); + } + } +} + +void BlockingAPRSessionContext::send(AMQFrame* frame){ + if(!closed){ + outlock.acquire(); + bool was_empty(outframes.empty()); + outframes.push(frame); + if(was_empty){ + outlock.notify(); + } + outlock.release(); + }else{ + std::cout << "WARNING: Session closed[" << &socket << "], dropping frame. " << &frame << std::endl; + } +} + +void BlockingAPRSessionContext::init(SessionHandler* handler){ + this->handler = handler; + //start the threads + rThread->start(); + wThread->start(); +} + +void BlockingAPRSessionContext::close(){ + closed = true; + wThread->join(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + if(debug) std::cout << "RECV: [" << &socket << "]: Closed " << std::endl; + handler->closed(); + acceptor->closed(this); + delete this; +} + +void BlockingAPRSessionContext::shutdown(){ + closed = true; + outlock.acquire(); + outlock.notify(); + outlock.release(); + + wThread->join(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + rThread->join(); + handler->closed(); + delete this; +} diff --git a/cpp/common/io/src/LFAcceptor.cpp b/cpp/common/io/src/LFAcceptor.cpp new file mode 100644 index 0000000000..6653e926db --- /dev/null +++ b/cpp/common/io/src/LFAcceptor.cpp @@ -0,0 +1,80 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "LFAcceptor.h" +#include "APRBase.h" + +using namespace qpid::concurrent; +using namespace qpid::io; + +LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) : processor(aprPool.pool, worker_threads, 1000, 5000000), + connectionBacklog(c), + max_connections_per_processor(m), + debug(_debug){ + +} + + +void LFAcceptor::bind(int port, SessionHandlerFactory* factory){ + apr_socket_t* socket; + apr_sockaddr_t* address; + CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, aprPool.pool)); + CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, aprPool.pool)); + CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); + CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); + CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); + running = true; + processor.start(); + + std::cout << "Listening on port " << port << "..." << std::endl; + while(running){ + apr_socket_t* client; + apr_status_t status = apr_socket_accept(&client, socket, aprPool.pool); + if(status == APR_SUCCESS){ + //make this socket non-blocking: + CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); + CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); + LFSessionContext* session = new LFSessionContext(aprPool.pool, client, &processor, debug); + session->init(factory->create(session)); + }else{ + running = false; + if(status != APR_EINTR){ + std::cout << "ERROR: " << get_desc(status) << std::endl; + } + } + } + + processor.stop(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); +} + + +LFAcceptor::~LFAcceptor(){ +} + +LFAcceptor::APRPool::APRPool(){ + APRBase::increment(); + CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); +} + +LFAcceptor::APRPool::~APRPool(){ + apr_pool_destroy(pool); + APRBase::decrement(); +} diff --git a/cpp/common/io/src/LFProcessor.cpp b/cpp/common/io/src/LFProcessor.cpp new file mode 100644 index 0000000000..8ef3543b8f --- /dev/null +++ b/cpp/common/io/src/LFProcessor.cpp @@ -0,0 +1,191 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "LFProcessor.h" +#include "APRBase.h" +#include "LFSessionContext.h" +#include "QpidError.h" +#include <sstream> + +using namespace qpid::io; +using namespace qpid::concurrent; +using qpid::QpidError; + +LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : size(_size), + timeout(_timeout), + signalledCount(0), + current(0), + count(0), + hasLeader(false), + workerCount(_workers), + workers(new Thread*[_workers]), + stopped(false){ + + CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); + //create & start the required number of threads + for(int i = 0; i < workerCount; i++){ + workers[i] = factory.create(this); + } +} + + +LFProcessor::~LFProcessor(){ + for(int i = 0; i < workerCount; i++){ + delete workers[i]; + } + delete[] workers; + CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); +} + +void LFProcessor::start(){ + for(int i = 0; i < workerCount; i++){ + workers[i]->start(); + } +} + +void LFProcessor::add(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); + countLock.acquire(); + sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data)); + count++; + countLock.release(); +} + +void LFProcessor::remove(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + countLock.acquire(); + sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data))); + count--; + countLock.release(); +} + +void LFProcessor::reactivate(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +void LFProcessor::deactivate(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); +} + +void LFProcessor::update(const apr_pollfd_t* const fd){ + CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); + CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); +} + +bool LFProcessor::full(){ + countLock.acquire(); + bool full = count == size; + countLock.release(); + return full; +} + +bool LFProcessor::empty(){ + countLock.acquire(); + bool empty = count == 0; + countLock.release(); + return empty; +} + +void LFProcessor::poll(){ + apr_status_t status; + do{ + current = 0; + if(!stopped){ + status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); + } + }while(status != APR_SUCCESS && !stopped); +} + +void LFProcessor::run(){ + try{ + while(!stopped){ + leadLock.acquire(); + waitToLead(); + if(!stopped){ + const apr_pollfd_t* evt = getNextEvent(); + if(evt){ + LFSessionContext* session = reinterpret_cast<LFSessionContext*>(evt->client_data); + session->startProcessing(); + + relinquishLead(); + leadLock.release(); + + //process event: + if(evt->rtnevents & APR_POLLIN) session->read(); + if(evt->rtnevents & APR_POLLOUT) session->write(); + + if(session->isClosed()){ + session->handleClose(); + countLock.acquire(); + sessions.erase(find(sessions.begin(), sessions.end(), session)); + count--; + countLock.release(); + }else{ + session->stopProcessing(); + } + + }else{ + leadLock.release(); + } + }else{ + leadLock.release(); + } + } + }catch(QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} + +void LFProcessor::waitToLead(){ + while(hasLeader && !stopped) leadLock.wait(); + hasLeader = !stopped; +} + +void LFProcessor::relinquishLead(){ + hasLeader = false; + leadLock.notify(); +} + +const apr_pollfd_t* LFProcessor::getNextEvent(){ + while(true){ + if(stopped){ + return 0; + }else if(current < signalledCount){ + //use result of previous poll if one is available + return signalledFDs + (current++); + }else{ + //else poll to get new events + poll(); + } + } +} + +void LFProcessor::stop(){ + stopped = true; + leadLock.acquire(); + leadLock.notifyAll(); + leadLock.release(); + + for(int i = 0; i < workerCount; i++){ + workers[i]->join(); + } + + for(iterator i = sessions.begin(); i < sessions.end(); i++){ + (*i)->shutdown(); + } +} + diff --git a/cpp/common/io/src/LFSessionContext.cpp b/cpp/common/io/src/LFSessionContext.cpp new file mode 100644 index 0000000000..d786cb5e98 --- /dev/null +++ b/cpp/common/io/src/LFSessionContext.cpp @@ -0,0 +1,187 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "LFSessionContext.h" +#include "APRBase.h" +#include "QpidError.h" +#include <assert.h> + +using namespace qpid::concurrent; +using namespace qpid::io; +using namespace qpid::framing; + +LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, + LFProcessor* const _processor, + bool _debug) : socket(_socket), + processor(_processor), + initiated(false), + processing(false), + closing(false), + in(32768), + out(32768), + reading(0), + writing(0), + debug(_debug){ + + fd.p = _pool; + fd.desc_type = APR_POLL_SOCKET; + fd.reqevents = APR_POLLIN; + fd.client_data = this; + fd.desc.s = _socket; + + out.flip(); +} + +LFSessionContext::~LFSessionContext(){ + +} + +void LFSessionContext::read(){ + assert(!reading); // No concurrent read. + reading = APRThread::currentThread(); + + socket.read(in); + in.flip(); + if(initiated){ + AMQFrame frame; + while(frame.decode(in)){ + if(debug) log("RECV", &frame); + handler->received(&frame); + } + }else{ + ProtocolInitiation init; + if(init.decode(in)){ + handler->initiated(&init); + initiated = true; + if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; + } + } + in.compact(); + + reading = 0; +} + +void LFSessionContext::write(){ + assert(!writing); // No concurrent writes. + writing = APRThread::currentThread(); + + bool done = isClosed(); + while(!done){ + if(out.available() > 0){ + socket.write(out); + if(out.available() > 0){ + writing = 0; + + //incomplete write, leave flags to receive notification of readiness to write + done = true;//finished processing for now, but write is still in progress + } + }else{ + //do we have any frames to write? + writeLock.acquire(); + if(!framesToWrite.empty()){ + out.clear(); + bool encoded(false); + AMQFrame* frame = framesToWrite.front(); + while(frame && out.available() >= frame->size()){ + encoded = true; + frame->encode(out); + if(debug) log("SENT", frame); + delete frame; + framesToWrite.pop(); + frame = framesToWrite.empty() ? 0 : framesToWrite.front(); + } + if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); + out.flip(); + }else{ + //reset flags, don't care about writability anymore + fd.reqevents = APR_POLLIN; + done = true; + + writing = 0; + + if(closing){ + socket.close(); + } + } + writeLock.release(); + } + } +} + +void LFSessionContext::send(AMQFrame* frame){ + writeLock.acquire(); + if(!closing){ + framesToWrite.push(frame); + if(!(fd.reqevents & APR_POLLOUT)){ + fd.reqevents |= APR_POLLOUT; + if(!processing){ + processor->update(&fd); + } + } + } + writeLock.release(); +} + +void LFSessionContext::startProcessing(){ + writeLock.acquire(); + processing = true; + processor->deactivate(&fd); + writeLock.release(); +} + +void LFSessionContext::stopProcessing(){ + writeLock.acquire(); + processor->reactivate(&fd); + processing = false; + writeLock.release(); +} + +void LFSessionContext::close(){ + closing = true; + writeLock.acquire(); + if(!processing){ + //allow pending frames to be written to socket + fd.reqevents = APR_POLLOUT; + processor->update(&fd); + } + writeLock.release(); +} + +void LFSessionContext::handleClose(){ + handler->closed(); + std::cout << "Session closed [" << &socket << "]" << std::endl; + delete handler; + delete this; +} + +void LFSessionContext::shutdown(){ + socket.close(); + handleClose(); +} + +void LFSessionContext::init(SessionHandler* handler){ + this->handler = handler; + processor->add(&fd); +} + +void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ + logLock.acquire(); + std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; + logLock.release(); +} + +APRMonitor LFSessionContext::logLock; |