diff options
Diffstat (limited to 'cpp/common/io/src/BlockingAPRSessionContext.cpp')
-rw-r--r-- | cpp/common/io/src/BlockingAPRSessionContext.cpp | 177 |
1 files changed, 177 insertions, 0 deletions
diff --git a/cpp/common/io/src/BlockingAPRSessionContext.cpp b/cpp/common/io/src/BlockingAPRSessionContext.cpp new file mode 100644 index 0000000000..99352c90d5 --- /dev/null +++ b/cpp/common/io/src/BlockingAPRSessionContext.cpp @@ -0,0 +1,177 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <iostream> +#include "BlockingAPRSessionContext.h" +#include "BlockingAPRAcceptor.h" +#include "APRBase.h" +#include "QpidError.h" + +using namespace qpid::concurrent; +using namespace qpid::framing; +using namespace qpid::io; + + +BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket, + ThreadFactory* factory, + BlockingAPRAcceptor* _acceptor, + bool _debug) + : socket(_socket), + debug(_debug), + inbuf(65536), + outbuf(65536), + handler(0), + acceptor(_acceptor), + closed(false){ + + reader = new Reader(this); + writer = new Writer(this); + + rThread = factory->create(reader); + wThread = factory->create(writer); +} + +BlockingAPRSessionContext::~BlockingAPRSessionContext(){ + delete reader; + delete writer; + + delete rThread; + delete wThread; + + delete handler; +} + +void BlockingAPRSessionContext::read(){ + try{ + bool initiated(false); + while(!closed){ + apr_size_t bytes(inbuf.available()); + if(bytes < 1){ + THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size."); + } + apr_status_t s = apr_socket_recv(socket, inbuf.start(), &bytes); + if(APR_STATUS_IS_TIMEUP(s)){ + //timed out, check closed on loop + }else if(APR_STATUS_IS_EOF(s) || bytes == 0){ + closed = true; + }else{ + inbuf.move(bytes); + inbuf.flip(); + + if(!initiated){ + ProtocolInitiation* init = new ProtocolInitiation(); + if(init->decode(inbuf)){ + handler->initiated(init); + if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl; + initiated = true; + } + }else{ + AMQFrame frame; + while(frame.decode(inbuf)){ + if(debug) std::cout << "RECV: [" << &socket << "]:" << frame << std::endl; + handler->received(&frame); + } + } + //need to compact buffer to preserve any 'extra' data + inbuf.compact(); + } + } + + //close socket + }catch(qpid::QpidError error){ + std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl; + } +} + +void BlockingAPRSessionContext::write(){ + while(!closed){ + //get next frame + outlock.acquire(); + while(outframes.empty() && !closed){ + outlock.wait(); + } + if(!closed){ + AMQFrame* frame = outframes.front(); + outframes.pop(); + outlock.release(); + + //encode + frame->encode(outbuf); + if(debug) std::cout << "SENT [" << &socket << "]:" << *frame << std::endl; + delete frame; + outbuf.flip(); + + //write from outbuf to socket + char* data = outbuf.start(); + const int available = outbuf.available(); + int written = 0; + apr_size_t bytes = available; + while(available > written){ + apr_status_t s = apr_socket_send(socket, data + written, &bytes); + written += bytes; + bytes = available - written; + } + outbuf.clear(); + }else{ + outlock.release(); + } + } +} + +void BlockingAPRSessionContext::send(AMQFrame* frame){ + if(!closed){ + outlock.acquire(); + bool was_empty(outframes.empty()); + outframes.push(frame); + if(was_empty){ + outlock.notify(); + } + outlock.release(); + }else{ + std::cout << "WARNING: Session closed[" << &socket << "], dropping frame. " << &frame << std::endl; + } +} + +void BlockingAPRSessionContext::init(SessionHandler* handler){ + this->handler = handler; + //start the threads + rThread->start(); + wThread->start(); +} + +void BlockingAPRSessionContext::close(){ + closed = true; + wThread->join(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + if(debug) std::cout << "RECV: [" << &socket << "]: Closed " << std::endl; + handler->closed(); + acceptor->closed(this); + delete this; +} + +void BlockingAPRSessionContext::shutdown(){ + closed = true; + outlock.acquire(); + outlock.notify(); + outlock.release(); + + wThread->join(); + CHECK_APR_SUCCESS(apr_socket_close(socket)); + rThread->join(); + handler->closed(); + delete this; +} |