summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/Connector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/Connector.cpp')
-rw-r--r--qpid/cpp/src/qpid/client/Connector.cpp297
1 files changed, 0 insertions, 297 deletions
diff --git a/qpid/cpp/src/qpid/client/Connector.cpp b/qpid/cpp/src/qpid/client/Connector.cpp
deleted file mode 100644
index a0be05fbbc..0000000000
--- a/qpid/cpp/src/qpid/client/Connector.cpp
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <iostream>
-#include "qpid/log/Statement.h"
-#include "qpid/sys/Time.h"
-#include "qpid/framing/AMQFrame.h"
-#include "Connector.h"
-
-#include "qpid/sys/AsynchIO.h"
-#include "qpid/sys/Dispatcher.h"
-#include "qpid/sys/Poller.h"
-#include "qpid/Msg.h"
-#include <boost/bind.hpp>
-#include <boost/format.hpp>
-
-namespace qpid {
-namespace client {
-
-using namespace qpid::sys;
-using namespace qpid::framing;
-using boost::format;
-using boost::str;
-
-Connector::Connector(
- ProtocolVersion ver, bool _debug, uint32_t buffer_size
-) : debug(_debug),
- receive_buffer_size(buffer_size),
- send_buffer_size(buffer_size),
- version(ver),
- closed(true),
- joined(true),
- timeout(0),
- idleIn(0), idleOut(0),
- timeoutHandler(0),
- shutdownHandler(0),
- aio(0)
-{}
-
-Connector::~Connector() {
- close();
-}
-
-void Connector::connect(const std::string& host, int port){
- Mutex::ScopedLock l(closedLock);
- assert(closed);
- socket.connect(host, port);
- identifier=str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress());
- closed = false;
- poller = Poller::shared_ptr(new Poller);
- aio = new AsynchIO(socket,
- boost::bind(&Connector::readbuff, this, _1, _2),
- boost::bind(&Connector::eof, this, _1),
- boost::bind(&Connector::eof, this, _1),
- 0, // closed
- 0, // nobuffs
- boost::bind(&Connector::writebuff, this, _1));
- writer.setAio(aio);
-}
-
-void Connector::init(){
- Mutex::ScopedLock l(closedLock);
- assert(joined);
- ProtocolInitiation init(version);
- writeDataBlock(init);
- joined = false;
- receiver = Thread(this);
-}
-
-bool Connector::closeInternal() {
- Mutex::ScopedLock l(closedLock);
- bool ret = !closed;
- if (!closed) {
- closed = true;
- poller->shutdown();
- }
- if (!joined && receiver.id() != Thread::current().id()) {
- joined = true;
- Mutex::ScopedUnlock u(closedLock);
- receiver.join();
- }
- return ret;
-}
-
-void Connector::close() {
- closeInternal();
-}
-
-void Connector::setInputHandler(InputHandler* handler){
- input = handler;
-}
-
-void Connector::setShutdownHandler(ShutdownHandler* handler){
- shutdownHandler = handler;
-}
-
-OutputHandler* Connector::getOutputHandler(){
- return this;
-}
-
-void Connector::send(AMQFrame& frame) {
- writer.handle(frame);
-}
-
-void Connector::handleClosed() {
- if (closeInternal() && shutdownHandler)
- shutdownHandler->shutdown();
-}
-
-// TODO: astitcher 20070908: This version of the code can never time out, so the idle processing
-// can never be called. The timeut processing needs to be added into the underlying Dispatcher code
-//
-// TODO: astitcher 20070908: EOF is dealt with separately now via a callback to eof
-void Connector::checkIdle(ssize_t status){
- if(timeoutHandler){
- AbsTime t = now();
- if(status == Socket::SOCKET_TIMEOUT) {
- if(idleIn && (Duration(lastIn, t) > idleIn)){
- timeoutHandler->idleIn();
- }
- }
- else if(status == 0 || status == Socket::SOCKET_EOF) {
- handleClosed();
- }
- else {
- lastIn = t;
- }
- if(idleOut && (Duration(lastOut, t) > idleOut)){
- timeoutHandler->idleOut();
- }
- }
-}
-
-void Connector::setReadTimeout(uint16_t t){
- idleIn = t * TIME_SEC;//t is in secs
- if(idleIn && (!timeout || idleIn < timeout)){
- timeout = idleIn;
- setSocketTimeout();
- }
-
-}
-
-void Connector::setWriteTimeout(uint16_t t){
- idleOut = t * TIME_SEC;//t is in secs
- if(idleOut && (!timeout || idleOut < timeout)){
- timeout = idleOut;
- setSocketTimeout();
- }
-}
-
-void Connector::setSocketTimeout(){
- socket.setTimeout(timeout);
-}
-
-void Connector::setTimeoutHandler(TimeoutHandler* handler){
- timeoutHandler = handler;
-}
-
-struct Connector::Buff : public AsynchIO::BufferBase {
- Buff() : AsynchIO::BufferBase(new char[65536], 65536) {}
- ~Buff() { delete [] bytes;}
-};
-
-Connector::Writer::Writer() : aio(0), buffer(0), lastEof(0)
-{
-}
-
-Connector::Writer::~Writer() { delete buffer; }
-
-void Connector::Writer::setAio(sys::AsynchIO* a) {
- Mutex::ScopedLock l(lock);
- aio = a;
- newBuffer(l);
- identifier = str(format("[%1% %2%]") % aio->getSocket().getLocalPort() % aio->getSocket().getPeerAddress());
-}
-
-void Connector::Writer::handle(framing::AMQFrame& frame) {
- Mutex::ScopedLock l(lock);
- frames.push_back(frame);
- if (frame.getEof()) {
- lastEof = frames.size();
- aio->notifyPendingWrite();
- }
- QPID_LOG(trace, "SENT " << identifier << ": " << frame);
-}
-
-void Connector::Writer::writeOne(const Mutex::ScopedLock& l) {
- assert(buffer);
- QPID_LOG(trace, "Write buffer " << encode.getPosition()
- << " bytes " << framesEncoded << " frames ");
- framesEncoded = 0;
-
- buffer->dataStart = 0;
- buffer->dataCount = encode.getPosition();
- aio->queueWrite(buffer);
- newBuffer(l);
-}
-
-void Connector::Writer::newBuffer(const Mutex::ScopedLock&) {
- buffer = aio->getQueuedBuffer();
- if (!buffer) buffer = new Buff();
- encode = framing::Buffer(buffer->bytes, buffer->byteCount);
- framesEncoded = 0;
-}
-
-// Called in IO thread.
-void Connector::Writer::write(sys::AsynchIO&) {
- Mutex::ScopedLock l(lock);
- assert(buffer);
- for (size_t i = 0; i < lastEof; ++i) {
- AMQFrame& frame = frames[i];
- if (frame.size() > encode.available()) writeOne(l);
- assert(frame.size() <= encode.available());
- frame.encode(encode);
- ++framesEncoded;
- }
- frames.erase(frames.begin(), frames.begin()+lastEof);
- lastEof = 0;
- if (encode.getPosition() > 0) writeOne(l);
-}
-
-void Connector::readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff) {
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
-
- AMQFrame frame;
- while(frame.decode(in)){
- QPID_LOG(trace, "RECV " << identifier << ": " << frame);
- input->received(frame);
- }
- // TODO: unreading needs to go away, and when we can cope
- // with multiple sub-buffers in the general buffer scheme, it will
- if (in.available() != 0) {
- // Adjust buffer for used bytes and then "unread them"
- buff->dataStart += buff->dataCount-in.available();
- buff->dataCount = in.available();
- aio.unread(buff);
- } else {
- // Give whole buffer back to aio subsystem
- aio.queueReadBuffer(buff);
- }
-}
-
-void Connector::writebuff(AsynchIO& aio_) {
- writer.write(aio_);
-}
-
-void Connector::writeDataBlock(const AMQDataBlock& data) {
- AsynchIO::BufferBase* buff = new Buff;
- framing::Buffer out(buff->bytes, buff->byteCount);
- data.encode(out);
- buff->dataCount = data.size();
- aio->queueWrite(buff);
-}
-
-void Connector::eof(AsynchIO&) {
- handleClosed();
-}
-
-// TODO: astitcher 20070908 This version of the code can never time out, so the idle processing
-// will never be called
-void Connector::run(){
- try {
- Dispatcher d(poller);
-
- for (int i = 0; i < 32; i++) {
- aio->queueReadBuffer(new Buff);
- }
-
- aio->start(poller);
- d.run();
- aio->queueForDeletion();
- socket.close();
- } catch (const std::exception& e) {
- QPID_LOG(error, e.what());
- handleClosed();
- }
-}
-
-
-}} // namespace qpid::client