diff options
author | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2006-09-19 22:06:50 +0000 |
commit | 913489deb2ee9dbf44455de5f407ddaf4bd8c540 (patch) | |
tree | 7ea442d6867d0076f1c9ea4f4265664059e7aff5 /cpp/common/io/src/LFSessionContext.cpp | |
download | qpid-python-913489deb2ee9dbf44455de5f407ddaf4bd8c540.tar.gz |
Import of qpid from etp:
URL: https://etp.108.redhat.com/svn/etp/trunk/blaze
Repository Root: https://etp.108.redhat.com/svn/etp
Repository UUID: 06e15bec-b515-0410-bef0-cc27a458cf48
Revision: 608
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@447994 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/common/io/src/LFSessionContext.cpp')
-rw-r--r-- | cpp/common/io/src/LFSessionContext.cpp | 187 |
1 files changed, 187 insertions, 0 deletions
diff --git a/cpp/common/io/src/LFSessionContext.cpp b/cpp/common/io/src/LFSessionContext.cpp new file mode 100644 index 0000000000..d786cb5e98 --- /dev/null +++ b/cpp/common/io/src/LFSessionContext.cpp @@ -0,0 +1,187 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include "LFSessionContext.h" +#include "APRBase.h" +#include "QpidError.h" +#include <assert.h> + +using namespace qpid::concurrent; +using namespace qpid::io; +using namespace qpid::framing; + +LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, + LFProcessor* const _processor, + bool _debug) : socket(_socket), + processor(_processor), + initiated(false), + processing(false), + closing(false), + in(32768), + out(32768), + reading(0), + writing(0), + debug(_debug){ + + fd.p = _pool; + fd.desc_type = APR_POLL_SOCKET; + fd.reqevents = APR_POLLIN; + fd.client_data = this; + fd.desc.s = _socket; + + out.flip(); +} + +LFSessionContext::~LFSessionContext(){ + +} + +void LFSessionContext::read(){ + assert(!reading); // No concurrent read. + reading = APRThread::currentThread(); + + socket.read(in); + in.flip(); + if(initiated){ + AMQFrame frame; + while(frame.decode(in)){ + if(debug) log("RECV", &frame); + handler->received(&frame); + } + }else{ + ProtocolInitiation init; + if(init.decode(in)){ + handler->initiated(&init); + initiated = true; + if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; + } + } + in.compact(); + + reading = 0; +} + +void LFSessionContext::write(){ + assert(!writing); // No concurrent writes. + writing = APRThread::currentThread(); + + bool done = isClosed(); + while(!done){ + if(out.available() > 0){ + socket.write(out); + if(out.available() > 0){ + writing = 0; + + //incomplete write, leave flags to receive notification of readiness to write + done = true;//finished processing for now, but write is still in progress + } + }else{ + //do we have any frames to write? + writeLock.acquire(); + if(!framesToWrite.empty()){ + out.clear(); + bool encoded(false); + AMQFrame* frame = framesToWrite.front(); + while(frame && out.available() >= frame->size()){ + encoded = true; + frame->encode(out); + if(debug) log("SENT", frame); + delete frame; + framesToWrite.pop(); + frame = framesToWrite.empty() ? 0 : framesToWrite.front(); + } + if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); + out.flip(); + }else{ + //reset flags, don't care about writability anymore + fd.reqevents = APR_POLLIN; + done = true; + + writing = 0; + + if(closing){ + socket.close(); + } + } + writeLock.release(); + } + } +} + +void LFSessionContext::send(AMQFrame* frame){ + writeLock.acquire(); + if(!closing){ + framesToWrite.push(frame); + if(!(fd.reqevents & APR_POLLOUT)){ + fd.reqevents |= APR_POLLOUT; + if(!processing){ + processor->update(&fd); + } + } + } + writeLock.release(); +} + +void LFSessionContext::startProcessing(){ + writeLock.acquire(); + processing = true; + processor->deactivate(&fd); + writeLock.release(); +} + +void LFSessionContext::stopProcessing(){ + writeLock.acquire(); + processor->reactivate(&fd); + processing = false; + writeLock.release(); +} + +void LFSessionContext::close(){ + closing = true; + writeLock.acquire(); + if(!processing){ + //allow pending frames to be written to socket + fd.reqevents = APR_POLLOUT; + processor->update(&fd); + } + writeLock.release(); +} + +void LFSessionContext::handleClose(){ + handler->closed(); + std::cout << "Session closed [" << &socket << "]" << std::endl; + delete handler; + delete this; +} + +void LFSessionContext::shutdown(){ + socket.close(); + handleClose(); +} + +void LFSessionContext::init(SessionHandler* handler){ + this->handler = handler; + processor->add(&fd); +} + +void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ + logLock.acquire(); + std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; + logLock.release(); +} + +APRMonitor LFSessionContext::logLock; |