diff options
Diffstat (limited to 'cpp/src/qpid/apr/LFSessionContext.cpp')
-rw-r--r-- | cpp/src/qpid/apr/LFSessionContext.cpp | 173 |
1 files changed, 0 insertions, 173 deletions
diff --git a/cpp/src/qpid/apr/LFSessionContext.cpp b/cpp/src/qpid/apr/LFSessionContext.cpp deleted file mode 100644 index 7dc70ca245..0000000000 --- a/cpp/src/qpid/apr/LFSessionContext.cpp +++ /dev/null @@ -1,173 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "LFSessionContext.h" -#include "APRBase.h" -#include <qpid/QpidError.h> -#include <assert.h> - -using namespace qpid::sys; -using namespace qpid::sys; -using namespace qpid::framing; - -LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, - LFProcessor* const _processor, - bool _debug) : - debug(_debug), - socket(_socket), - initiated(false), - in(65536), - out(65536), - processor(_processor), - processing(false), - closing(false) -{ - - fd.p = _pool; - fd.desc_type = APR_POLL_SOCKET; - fd.reqevents = APR_POLLIN; - fd.client_data = this; - fd.desc.s = _socket; - - out.flip(); -} - -LFSessionContext::~LFSessionContext(){ - -} - -void LFSessionContext::read(){ - socket.read(in); - in.flip(); - if(initiated){ - AMQFrame frame; - while(frame.decode(in)){ - if(debug) log("RECV", &frame); - handler->received(&frame); - } - }else{ - ProtocolInitiation protocolInit; - if(protocolInit.decode(in)){ - handler->initiated(&protocolInit); - initiated = true; - if(debug) std::cout << "INIT [" << &socket << "]" << std::endl; - } - } - in.compact(); -} - -void LFSessionContext::write(){ - bool done = isClosed(); - while(!done){ - if(out.available() > 0){ - socket.write(out); - if(out.available() > 0){ - - //incomplete write, leave flags to receive notification of readiness to write - done = true;//finished processing for now, but write is still in progress - } - }else{ - //do we have any frames to write? - Mutex::ScopedLock l(writeLock); - if(!framesToWrite.empty()){ - out.clear(); - bool encoded(false); - AMQFrame* frame = framesToWrite.front(); - while(frame && out.available() >= frame->size()){ - encoded = true; - frame->encode(out); - if(debug) log("SENT", frame); - delete frame; - framesToWrite.pop(); - frame = framesToWrite.empty() ? 0 : framesToWrite.front(); - } - if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); - out.flip(); - }else{ - //reset flags, don't care about writability anymore - fd.reqevents = APR_POLLIN; - done = true; - - if(closing){ - socket.close(); - } - } - } - } -} - -void LFSessionContext::send(AMQFrame* frame){ - Mutex::ScopedLock l(writeLock); - if(!closing){ - framesToWrite.push(frame); - if(!(fd.reqevents & APR_POLLOUT)){ - fd.reqevents |= APR_POLLOUT; - if(!processing){ - processor->update(&fd); - } - } - } -} - -void LFSessionContext::startProcessing(){ - Mutex::ScopedLock l(writeLock); - processing = true; - processor->deactivate(&fd); -} - -void LFSessionContext::stopProcessing(){ - Mutex::ScopedLock l(writeLock); - processor->reactivate(&fd); - processing = false; -} - -void LFSessionContext::close(){ - closing = true; - Mutex::ScopedLock l(writeLock); - if(!processing){ - //allow pending frames to be written to socket - fd.reqevents = APR_POLLOUT; - processor->update(&fd); - } -} - -void LFSessionContext::handleClose(){ - handler->closed(); - std::cout << "Session closed [" << &socket << "]" << std::endl; - delete handler; - delete this; -} - -void LFSessionContext::shutdown(){ - socket.close(); - handleClose(); -} - -void LFSessionContext::init(SessionHandler* _handler){ - handler = _handler; - processor->add(&fd); -} - -void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){ - Mutex::ScopedLock l(logLock); - std::cout << desc << " [" << &socket << "]: " << *frame << std::endl; -} - -Mutex LFSessionContext::logLock; |