diff options
author | Alan Conway <aconway@apache.org> | 2006-10-16 13:50:26 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2006-10-16 13:50:26 +0000 |
commit | 8a6ab3aa61d441b9210c05c84dc9998acfc38737 (patch) | |
tree | 1eb9d7f39b5c2d04a85a1f66caef3d398567b740 /cpp/common/io/src/BlockingAPRSessionContext.cpp | |
parent | 9a808fb13aba243d41bbdab75158dae5939a80a4 (diff) | |
download | qpid-python-8a6ab3aa61d441b9210c05c84dc9998acfc38737.tar.gz |
Build system reorg, see README and Makefile comments for details.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464494 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/common/io/src/BlockingAPRSessionContext.cpp')
-rw-r--r-- | cpp/common/io/src/BlockingAPRSessionContext.cpp | 178 |
1 files changed, 0 insertions, 178 deletions
diff --git a/cpp/common/io/src/BlockingAPRSessionContext.cpp b/cpp/common/io/src/BlockingAPRSessionContext.cpp deleted file mode 100644 index 6d1dc3470c..0000000000 --- a/cpp/common/io/src/BlockingAPRSessionContext.cpp +++ /dev/null @@ -1,178 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#include <assert.h> -#include <iostream> -#include "BlockingAPRSessionContext.h" -#include "BlockingAPRAcceptor.h" -#include "APRBase.h" -#include "QpidError.h" - -using namespace qpid::concurrent; -using namespace qpid::framing; -using namespace qpid::io; - - -BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket, - ThreadFactory* factory, - BlockingAPRAcceptor* _acceptor, - bool _debug) - : socket(_socket), - debug(_debug), - handler(0), - acceptor(_acceptor), - inbuf(65536), - outbuf(65536), - closed(false){ - - reader = new Reader(this); - writer = new Writer(this); - - rThread = factory->create(reader); - wThread = factory->create(writer); -} - -BlockingAPRSessionContext::~BlockingAPRSessionContext(){ - delete reader; - delete writer; - - delete rThread; - delete wThread; - - delete handler; -} - -void BlockingAPRSessionContext::read(){ - try{ - bool initiated(false); - while(!closed){ - apr_size_t bytes(inbuf.available()); - if(bytes < 1){ - THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); - } - apr_status_t s = apr_socket_recv(socket, inbuf.start(), &bytes); - if(APR_STATUS_IS_TIMEUP(s)){ - //timed out, check closed on loop - }else if(APR_STATUS_IS_EOF(s) || bytes == 0){ - closed = true; - }else{ - inbuf.move(bytes); - inbuf.flip(); - - if(!initiated){ - ProtocolInitiation* protocolInit = new ProtocolInitiation(); - if(protocolInit->decode(inbuf)){ - handler->initiated(protocolInit); - if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl; - initiated = true; - } - }else{ - AMQFrame frame; - while(frame.decode(inbuf)){ - if(debug) std::cout << "RECV: [" << &socket << "]:" << frame << std::endl; - handler->received(&frame); - } - } - //need to compact buffer to preserve any 'extra' data - inbuf.compact(); - } - } - - //close socket - }catch(qpid::QpidError error){ - std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; - } -} - -void BlockingAPRSessionContext::write(){ - while(!closed){ - //get next frame - outlock.acquire(); - while(outframes.empty() && !closed){ - outlock.wait(); - } - if(!closed){ - AMQFrame* frame = outframes.front(); - outframes.pop(); - outlock.release(); - - //encode - frame->encode(outbuf); - if(debug) std::cout << "SENT [" << &socket << "]:" << *frame << std::endl; - delete frame; - outbuf.flip(); - - //write from outbuf to socket - char* data = outbuf.start(); - const int available = outbuf.available(); - int written = 0; - apr_size_t bytes = available; - while(available > written){ - apr_status_t s = apr_socket_send(socket, data + written, &bytes); - assert(s == 0); // TODO aconway 2006-10-05: Error Handling. - written += bytes; - bytes = available - written; - } - outbuf.clear(); - }else{ - outlock.release(); - } - } -} - -void BlockingAPRSessionContext::send(AMQFrame* frame){ - if(!closed){ - outlock.acquire(); - bool was_empty(outframes.empty()); - outframes.push(frame); - if(was_empty){ - outlock.notify(); - } - outlock.release(); - }else{ - std::cout << "WARNING: Session closed[" << &socket << "], dropping frame. " << &frame << std::endl; - } -} - -void BlockingAPRSessionContext::init(SessionHandler* _handler){ - handler = _handler; - rThread->start(); - wThread->start(); -} - -void BlockingAPRSessionContext::close(){ - closed = true; - wThread->join(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - if(debug) std::cout << "RECV: [" << &socket << "]: Closed " << std::endl; - handler->closed(); - acceptor->closed(this); - delete this; -} - -void BlockingAPRSessionContext::shutdown(){ - closed = true; - outlock.acquire(); - outlock.notify(); - outlock.release(); - - wThread->join(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - rThread->join(); - handler->closed(); - delete this; -} |