diff options
author | Alan Conway <aconway@apache.org> | 2006-10-16 13:50:26 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-10-16 13:50:26 +0000 |
commit | 8a6ab3aa61d441b9210c05c84dc9998acfc38737 (patch) | |
tree | 1eb9d7f39b5c2d04a85a1f66caef3d398567b740 /cpp/common/io/src | |
parent | 9a808fb13aba243d41bbdab75158dae5939a80a4 (diff) | |
download | qpid-python-8a6ab3aa61d441b9210c05c84dc9998acfc38737.tar.gz |
Build system reorg, see README and Makefile comments for details.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464494 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/common/io/src')
-rw-r--r-- | cpp/common/io/src/APRConnector.cpp | 201 | ||||
-rw-r--r-- | cpp/common/io/src/APRSocket.cpp | 78 | ||||
-rw-r--r-- | cpp/common/io/src/Acceptor.cpp | 21 | ||||
-rw-r--r-- | cpp/common/io/src/BlockingAPRAcceptor.cpp | 101 | ||||
-rw-r--r-- | cpp/common/io/src/BlockingAPRSessionContext.cpp | 178 | ||||
-rw-r--r-- | cpp/common/io/src/LFAcceptor.cpp | 94 | ||||
-rw-r--r-- | cpp/common/io/src/LFProcessor.cpp | 193 | ||||
-rw-r--r-- | cpp/common/io/src/LFSessionContext.cpp | 189 |
8 files changed, 0 insertions, 1055 deletions
diff --git a/cpp/common/io/src/APRConnector.cpp b/cpp/common/io/src/APRConnector.cpp deleted file mode 100644 index 5f3bfd6957..0000000000 --- a/cpp/common/io/src/APRConnector.cpp +++ /dev/null @@ -1,201 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include "APRBase.h" -#include "APRConnector.h" -#include "APRThreadFactory.h" -#include "QpidError.h" - -using namespace qpid::io; -using namespace qpid::concurrent; -using namespace qpid::framing; -using qpid::QpidError; - -APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) : - debug(_debug), - receive_buffer_size(buffer_size), - send_buffer_size(buffer_size), - closed(true), - lastIn(0), lastOut(0), - timeout(0), - idleIn(0), idleOut(0), - timeoutHandler(0), - shutdownHandler(0), - inbuf(receive_buffer_size), - outbuf(send_buffer_size){ - - APRBase::increment(); - - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool)); - - threadFactory = new APRThreadFactory(); - writeLock = new APRMonitor(); -} - -APRConnector::~APRConnector(){ - delete receiver; - delete writeLock; - delete threadFactory; - apr_pool_destroy(pool); - - APRBase::decrement(); -} - -void APRConnector::connect(const std::string& host, int port){ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool)); - CHECK_APR_SUCCESS(apr_socket_connect(socket, address)); - closed = false; - - receiver = threadFactory->create(this); - receiver->start(); -} - -void APRConnector::init(ProtocolInitiation* header){ - writeBlock(header); - delete header; -} - -void APRConnector::close(){ - closed = true; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - receiver->join(); -} - -void APRConnector::setInputHandler(InputHandler* handler){ - input = handler; -} - -void APRConnector::setShutdownHandler(ShutdownHandler* handler){ - shutdownHandler = handler; -} - -OutputHandler* APRConnector::getOutputHandler(){ - return this; -} - -void APRConnector::send(AMQFrame* frame){ - writeBlock(frame); - if(debug) std::cout << "SENT: " << *frame << std::endl; - delete frame; -} - -void APRConnector::writeBlock(AMQDataBlock* data){ - writeLock->acquire(); - data->encode(outbuf); - - //transfer data to wire - outbuf.flip(); - writeToSocket(outbuf.start(), outbuf.available()); - outbuf.clear(); - writeLock->release(); -} - -void APRConnector::writeToSocket(char* data, size_t available){ - apr_size_t bytes(available); - apr_size_t written(0); - while(written < available && !closed){ - apr_status_t status = apr_socket_send(socket, data + written, &bytes); - if(status == APR_TIMEUP){ - std::cout << "Write request timed out." << std::endl; - } - if(bytes == 0){ - std::cout << "Write request wrote 0 bytes." << std::endl; - } - lastOut = apr_time_as_msec(apr_time_now()); - written += bytes; - bytes = available - written; - } -} - -void APRConnector::checkIdle(apr_status_t status){ - if(timeoutHandler){ - apr_time_t now = apr_time_as_msec(apr_time_now()); - if(APR_STATUS_IS_TIMEUP(status)){ - if(idleIn && (now - lastIn > idleIn)){ - timeoutHandler->idleIn(); - } - }else if(APR_STATUS_IS_EOF(status)){ - closed = true; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - if(shutdownHandler) shutdownHandler->shutdown(); - }else{ - lastIn = now; - } - if(idleOut && (now - lastOut > idleOut)){ - timeoutHandler->idleOut(); - } - } -} - -void APRConnector::setReadTimeout(u_int16_t t){ - idleIn = t * 1000;//t is in secs - if(idleIn && (!timeout || idleIn < timeout)){ - timeout = idleIn; - setSocketTimeout(); - } - -} - -void APRConnector::setWriteTimeout(u_int16_t t){ - idleOut = t * 1000;//t is in secs - if(idleOut && (!timeout || idleOut < timeout)){ - timeout = idleOut; - setSocketTimeout(); - } -} - -void APRConnector::setSocketTimeout(){ - //interval is in microseconds, timeout in milliseconds - //want the interval to be a bit shorter than the timeout, hence multiply - //by 800 rather than 1000. - apr_interval_time_t interval(timeout * 800); - apr_socket_timeout_set(socket, interval); -} - -void APRConnector::setTimeoutHandler(TimeoutHandler* handler){ - timeoutHandler = handler; -} - -void APRConnector::run(){ - try{ - while(!closed){ - apr_size_t bytes(inbuf.available()); - if(bytes < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes)); - - if(bytes > 0){ - inbuf.move(bytes); - inbuf.flip();//position = 0, limit = total data read - - AMQFrame frame; - while(frame.decode(inbuf)){ - if(debug) std::cout << "RECV: " << frame << std::endl; - input->received(&frame); - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); - } - } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} diff --git a/cpp/common/io/src/APRSocket.cpp b/cpp/common/io/src/APRSocket.cpp deleted file mode 100644 index 1ef7e270a3..0000000000 --- a/cpp/common/io/src/APRSocket.cpp +++ /dev/null @@ -1,78 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "APRBase.h" -#include "APRSocket.h" -#include <assert.h> -#include <iostream> - -using namespace qpid::io; -using namespace qpid::framing; -using namespace qpid::concurrent; - -APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ - -} - -void APRSocket::read(qpid::framing::Buffer& buffer){ - apr_size_t bytes; - bytes = buffer.available(); - apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes); - buffer.move(bytes); - if(APR_STATUS_IS_TIMEUP(s)){ - //timed out - }else if(APR_STATUS_IS_EOF(s)){ - close(); - } -} - -void APRSocket::write(qpid::framing::Buffer& buffer){ - apr_size_t bytes; - do{ - bytes = buffer.available(); - apr_status_t s = apr_socket_send(socket, buffer.start(), &bytes); - // TODO aconway 2006-10-05: better error handling - assert(s == 0); - buffer.move(bytes); - }while(bytes > 0); -} - -void APRSocket::close(){ - if(!closed){ - std::cout << "Closing socket " << socket << "@" << this << std::endl; - CHECK_APR_SUCCESS(apr_socket_close(socket)); - closed = true; - } -} - -bool APRSocket::isOpen(){ - return !closed; -} - -u_int8_t APRSocket::read(){ - char data[1]; - apr_size_t bytes = 1; - apr_status_t s = apr_socket_recv(socket, data, &bytes); - if(APR_STATUS_IS_EOF(s) || bytes == 0){ - return 0; - }else{ - return *data; - } -} - -APRSocket::~APRSocket(){ -} diff --git a/cpp/common/io/src/Acceptor.cpp b/cpp/common/io/src/Acceptor.cpp deleted file mode 100644 index d1825c78fa..0000000000 --- a/cpp/common/io/src/Acceptor.cpp +++ /dev/null @@ -1,21 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "Acceptor.h" - -qpid::io::Acceptor::~Acceptor() {} diff --git a/cpp/common/io/src/BlockingAPRAcceptor.cpp b/cpp/common/io/src/BlockingAPRAcceptor.cpp deleted file mode 100644 index 4c55b9e2c8..0000000000 --- a/cpp/common/io/src/BlockingAPRAcceptor.cpp +++ /dev/null @@ -1,101 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <iostream> -#include "BlockingAPRAcceptor.h" -#include "APRBase.h" -#include "APRThreadFactory.h" - -using namespace qpid::concurrent; -using namespace qpid::framing; -using namespace qpid::io; - -BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) : - debug(_debug), - threadFactory(new APRThreadFactory()), - connectionBacklog(c) -{ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL)); -} - -int16_t BlockingAPRAcceptor::bind(int16_t _port){ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, apr_pool)); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, apr_pool)); - CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); - CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); - return getPort(); -} - -int16_t BlockingAPRAcceptor::getPort() const { - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); - return address->port; -} - -void BlockingAPRAcceptor::run(SessionHandlerFactory* factory) -{ - running = true; - std::cout << "Listening on port " << getPort() << "..." << std::endl; - while(running){ - apr_socket_t* client; - apr_status_t status = apr_socket_accept(&client, socket, apr_pool); - if(status == APR_SUCCESS){ - //configure socket: - CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 1000000/* i.e. 1 sec*/)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); - - BlockingAPRSessionContext* session = new BlockingAPRSessionContext(client, threadFactory, this, debug); - session->init(factory->create(session)); - sessions.push_back(session); - }else{ - running = false; - if(status != APR_EINTR){ - std::cout << "ERROR: " << get_desc(status) << std::endl; - } - } - } - shutdown(); -} - -void BlockingAPRAcceptor::shutdown() -{ - // TODO aconway 2006-10-12: Not thread safe. - if (running) - { - running = false; - apr_socket_close(socket); // Don't check, exception safety. - for(iterator i = sessions.begin(); i < sessions.end(); i++){ - (*i)->shutdown(); - } - } -} - -BlockingAPRAcceptor::~BlockingAPRAcceptor(){ - delete threadFactory; - apr_pool_destroy(apr_pool); - APRBase::decrement(); -} - - -void BlockingAPRAcceptor::closed(BlockingAPRSessionContext* session){ - sessions.erase(find(sessions.begin(), sessions.end(), session)); -} - diff --git a/cpp/common/io/src/BlockingAPRSessionContext.cpp b/cpp/common/io/src/BlockingAPRSessionContext.cpp deleted file mode 100644 index 6d1dc3470c..0000000000 --- a/cpp/common/io/src/BlockingAPRSessionContext.cpp +++ /dev/null @@ -1,178 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <assert.h> -#include <iostream> -#include "BlockingAPRSessionContext.h" -#include "BlockingAPRAcceptor.h" -#include "APRBase.h" -#include "QpidError.h" - -using namespace qpid::concurrent; -using namespace qpid::framing; -using namespace qpid::io; - - -BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket, - ThreadFactory* factory, - BlockingAPRAcceptor* _acceptor, - bool _debug) - : socket(_socket), - debug(_debug), - handler(0), - acceptor(_acceptor), - inbuf(65536), - outbuf(65536), - closed(false){ - - reader = new Reader(this); - writer = new Writer(this); - - rThread = factory->create(reader); - wThread = factory->create(writer); -} - -BlockingAPRSessionContext::~BlockingAPRSessionContext(){ - delete reader; - delete writer; - - delete rThread; - delete wThread; - - delete handler; -} - -void BlockingAPRSessionContext::read(){ - try{ - bool initiated(false); - while(!closed){ - apr_size_t bytes(inbuf.available()); - if(bytes < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - apr_status_t s = apr_socket_recv(socket, inbuf.start(), &bytes); - if(APR_STATUS_IS_TIMEUP(s)){ - //timed out, check closed on loop - }else if(APR_STATUS_IS_EOF(s) || bytes == 0){ - closed = true; - }else{ - inbuf.move(bytes); - inbuf.flip(); - - if(!initiated){ - ProtocolInitiation* protocolInit = new ProtocolInitiation(); - if(protocolInit->decode(inbuf)){ - handler->initiated(protocolInit); - if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl; - initiated = true; - } - }else{ - AMQFrame frame; - while(frame.decode(inbuf)){ - if(debug) std::cout << "RECV: [" << &socket << "]:" << frame << std::endl; - handler->received(&frame); - } - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); - } - } - - //close socket - }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} - -void BlockingAPRSessionContext::write(){ - while(!closed){ - //get next frame - outlock.acquire(); - while(outframes.empty() && !closed){ - outlock.wait(); - } - if(!closed){ - AMQFrame* frame = outframes.front(); - outframes.pop(); - outlock.release(); - - //encode - frame->encode(outbuf); - if(debug) std::cout << "SENT [" << &socket << "]:" << *frame << std::endl; - delete frame; - outbuf.flip(); - - //write from outbuf to socket - char* data = outbuf.start(); - const int available = outbuf.available(); - int written = 0; - apr_size_t bytes = available; - while(available > written){ - apr_status_t s = apr_socket_send(socket, data + written, &bytes); - assert(s == 0); // TODO aconway 2006-10-05: Error Handling. - written += bytes; - bytes = available - written; - } - outbuf.clear(); - }else{ - outlock.release(); - } - } -} - -void BlockingAPRSessionContext::send(AMQFrame* frame){ - if(!closed){ - outlock.acquire(); - bool was_empty(outframes.empty()); - outframes.push(frame); - if(was_empty){ - outlock.notify(); - } - outlock.release(); - }else{ - std::cout << "WARNING: Session closed[" << &socket << "], dropping frame. " << &frame << std::endl; - } -} - -void BlockingAPRSessionContext::init(SessionHandler* _handler){ - handler = _handler; - rThread->start(); - wThread->start(); -} - -void BlockingAPRSessionContext::close(){ - closed = true; - wThread->join(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - if(debug) std::cout << "RECV: [" << &socket << "]: Closed " << std::endl; - handler->closed(); - acceptor->closed(this); - delete this; -} - -void BlockingAPRSessionContext::shutdown(){ - closed = true; - outlock.acquire(); - outlock.notify(); - outlock.release(); - - wThread->join(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - rThread->join(); - handler->closed(); - delete this; -} diff --git a/cpp/common/io/src/LFAcceptor.cpp b/cpp/common/io/src/LFAcceptor.cpp deleted file mode 100644 index 86f382afac..0000000000 --- a/cpp/common/io/src/LFAcceptor.cpp +++ /dev/null @@ -1,94 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "LFAcceptor.h" -#include "APRBase.h" - -using namespace qpid::concurrent; -using namespace qpid::io; - -LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) : - processor(aprPool.pool, worker_threads, 1000, 5000000), - max_connections_per_processor(m), - debug(_debug), - connectionBacklog(c) -{ } - - -int16_t LFAcceptor::bind(int16_t _port){ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, aprPool.pool)); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, aprPool.pool)); - CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); - CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); - CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog)); - return getPort(); -} - -int16_t LFAcceptor::getPort() const { - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); - return address->port; -} - -void LFAcceptor::run(SessionHandlerFactory* factory) { - running = true; - processor.start(); - std::cout << "Listening on port " << getPort() << "..." << std::endl; - while(running){ - apr_socket_t* client; - apr_status_t status = apr_socket_accept(&client, socket, aprPool.pool); - if(status == APR_SUCCESS){ - //make this socket non-blocking: - CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); - LFSessionContext* session = new LFSessionContext(aprPool.pool, client, &processor, debug); - session->init(factory->create(session)); - }else{ - running = false; - if(status != APR_EINTR){ - std::cout << "ERROR: " << get_desc(status) << std::endl; - } - } - } - shutdown(); -} - -void LFAcceptor::shutdown() { - // TODO aconway 2006-10-12: Cleanup, this is not thread safe. - if (running) { - running = false; - processor.stop(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - } -} - - -LFAcceptor::~LFAcceptor(){} - -LFAcceptor::APRPool::APRPool(){ - APRBase::increment(); - CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL)); -} - -LFAcceptor::APRPool::~APRPool(){ - apr_pool_destroy(pool); - APRBase::decrement(); -} diff --git a/cpp/common/io/src/LFProcessor.cpp b/cpp/common/io/src/LFProcessor.cpp deleted file mode 100644 index 65d7451767..0000000000 --- a/cpp/common/io/src/LFProcessor.cpp +++ /dev/null @@ -1,193 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "LFProcessor.h" -#include "APRBase.h" -#include "LFSessionContext.h" -#include "QpidError.h" -#include <sstream> - -using namespace qpid::io; -using namespace qpid::concurrent; -using qpid::QpidError; - -// TODO aconway 2006-10-12: stopped is read outside locks. -// - -LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : - size(_size), - timeout(_timeout), - signalledCount(0), - current(0), - count(0), - workerCount(_workers), - hasLeader(false), - workers(new Thread*[_workers]), - stopped(false) -{ - - CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); - //create & start the required number of threads - for(int i = 0; i < workerCount; i++){ - workers[i] = factory.create(this); - } -} - - -LFProcessor::~LFProcessor(){ - if (!stopped) stop(); - for(int i = 0; i < workerCount; i++){ - delete workers[i]; - } - delete[] workers; - CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); -} - -void LFProcessor::start(){ - for(int i = 0; i < workerCount; i++){ - workers[i]->start(); - } -} - -void LFProcessor::add(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); - countLock.acquire(); - sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data)); - count++; - countLock.release(); -} - -void LFProcessor::remove(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); - countLock.acquire(); - sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data))); - count--; - countLock.release(); -} - -void LFProcessor::reactivate(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); -} - -void LFProcessor::deactivate(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); -} - -void LFProcessor::update(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); -} - -bool LFProcessor::full(){ - Locker locker(countLock); - return count == size; -} - -bool LFProcessor::empty(){ - Locker locker(countLock); - return count == 0; -} - -void LFProcessor::poll(){ - apr_status_t status; - do{ - current = 0; - if(!stopped){ - status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); - } - }while(status != APR_SUCCESS && !stopped); -} - -void LFProcessor::run(){ - try{ - while(!stopped){ - leadLock.acquire(); - waitToLead(); - if(!stopped){ - const apr_pollfd_t* evt = getNextEvent(); - if(evt){ - LFSessionContext* session = reinterpret_cast<LFSessionContext*>(evt->client_data); - session->startProcessing(); - - relinquishLead(); - leadLock.release(); - - //process event: - if(evt->rtnevents & APR_POLLIN) session->read(); - if(evt->rtnevents & APR_POLLOUT) session->write(); - - if(session->isClosed()){ - session->handleClose(); - countLock.acquire(); - sessions.erase(find(sessions.begin(), sessions.end(), session)); - count--; - countLock.release(); - }else{ - session->stopProcessing(); - } - - }else{ - leadLock.release(); - } - }else{ - leadLock.release(); - } - } - }catch(QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} - -void LFProcessor::waitToLead(){ - while(hasLeader && !stopped) leadLock.wait(); - hasLeader = !stopped; -} - -void LFProcessor::relinquishLead(){ - hasLeader = false; - leadLock.notify(); -} - -const apr_pollfd_t* LFProcessor::getNextEvent(){ - while(true){ - if(stopped){ - return 0; - }else if(current < signalledCount){ - //use result of previous poll if one is available - return signalledFDs + (current++); - }else{ - //else poll to get new events - poll(); - } - } -} - -void LFProcessor::stop(){ - stopped = true; - leadLock.acquire(); - leadLock.notifyAll(); - leadLock.release(); - - for(int i = 0; i < workerCount; i++){ - workers[i]->join(); - } - - for(iterator i = sessions.begin(); i < sessions.end(); i++){ - (*i)->shutdown(); - } -} - diff --git a/cpp/common/io/src/LFSessionContext.cpp b/cpp/common/io/src/LFSessionContext.cpp deleted file mode 100644 index 7b8208f704..0000000000 --- a/cpp/common/io/src/LFSessionContext.cpp +++ /dev/null @@ -1,189 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include "LFSessionContext.h" -#include "APRBase.h" -#include "QpidError.h" -#include <assert.h> - -using namespace qpid::concurrent; -using namespace qpid::io; -using namespace qpid::framing; - -LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, - LFProcessor* const _processor, - bool _debug) : - debug(_debug), - socket(_socket), - initiated(false), - in(32768), - out(32768), - processor(_processor), - processing(false), - closing(false), - reading(0), - writing(0) -{ - - fd.p = _pool; - fd.desc_type = APR_POLL_SOCKET; - fd.reqevents = APR_POLLIN; - fd.client_data = this; - fd.desc.s = _socket; - - out.flip(); -} - -LFSessionContext::~LFSessionContext(){ - -} - -void LFSessionContext::read(){ - assert(!reading); // No concurrent read. - reading = APRThread::currentThread(); - - socket.read(in); - in.flip(); - if(initiated){ - AMQFrame frame; - while(frame.decode(in)){ - if(debug) log("RECV", &frame); - handler->received(&frame); - } - }else{ - ProtocolInitiation protocolInit; - if(protocolInit.decode(in)){ - handler->initiated(&protocolInit); - initiated = true; - if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; - } - } - in.compact(); - - reading = 0; -} - -void LFSessionContext::write(){ - assert(!writing); // No concurrent writes. - writing = APRThread::currentThread(); - - bool done = isClosed(); - while(!done){ - if(out.available() > 0){ - socket.write(out); - if(out.available() > 0){ - writing = 0; - - //incomplete write, leave flags to receive notification of readiness to write - done = true;//finished processing for now, but write is still in progress - } - }else{ - //do we have any frames to write? - writeLock.acquire(); - if(!framesToWrite.empty()){ - out.clear(); - bool encoded(false); - AMQFrame* frame = framesToWrite.front(); - while(frame && out.available() >= frame->size()){ - encoded = true; - frame->encode(out); - if(debug) log("SENT", frame); - delete frame; - framesToWrite.pop(); - frame = framesToWrite.empty() ? 0 : framesToWrite.front(); - } - if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); - out.flip(); - }else{ - //reset flags, don't care about writability anymore - fd.reqevents = APR_POLLIN; - done = true; - - writing = 0; - - if(closing){ - socket.close(); - } - } - writeLock.release(); - } - } -} - -void LFSessionContext::send(AMQFrame* frame){ - writeLock.acquire(); - if(!closing){ - framesToWrite.push(frame); - if(!(fd.reqevents & APR_POLLOUT)){ - fd.reqevents |= APR_POLLOUT; - if(!processing){ - processor->update(&fd); - } - } - } - writeLock.release(); -} - -void LFSessionContext::startProcessing(){ - writeLock.acquire(); - processing = true; - processor->deactivate(&fd); - writeLock.release(); -} - -void LFSessionContext::stopProcessing(){ - writeLock.acquire(); - processor->reactivate(&fd); - processing = false; - writeLock.release(); -} - -void LFSessionContext::close(){ - closing = true; - writeLock.acquire(); - if(!processing){ - //allow pending frames to be written to socket - fd.reqevents = APR_POLLOUT; - processor->update(&fd); - } - writeLock.release(); -} - -void LFSessionContext::handleClose(){ - handler->closed(); - std::cout << "Session closed [" << &socket << "]" << std::endl; - delete handler; - delete this; -} - -void LFSessionContext::shutdown(){ - socket.close(); - handleClose(); -} - -void LFSessionContext::init(SessionHandler* _handler){ - handler = _handler; - processor->add(&fd); -} - -void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ - logLock.acquire(); - std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; - logLock.release(); -} - -APRMonitor LFSessionContext::logLock; |