diff options
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIOHandler.cpp')
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.cpp | 54 |
1 files changed, 16 insertions, 38 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index 2e117a3fb7..cf08b482e6 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -51,15 +51,15 @@ struct ProtocolTimeoutTask : public sys::TimerTask { } }; -AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) : +AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool isClient0, bool nodict0) : identifier(id), aio(0), factory(f), codec(0), reads(0), readError(false), - isClient(false), - readCredit(InfiniteCredit) + isClient(isClient0), + nodict(nodict0) {} AsynchIOHandler::~AsynchIOHandler() { @@ -97,25 +97,20 @@ void AsynchIOHandler::abort() { if (!readError) { aio->requestCallback(boost::bind(&AsynchIOHandler::eof, this, _1)); } + aio->queueWriteClose(); } void AsynchIOHandler::activateOutput() { aio->notifyPendingWrite(); } -// Input side -void AsynchIOHandler::giveReadCredit(int32_t credit) { - // Check whether we started in the don't about credit state - if (readCredit.boolCompareAndSwap(InfiniteCredit, credit)) - return; - // TODO In theory should be able to use an atomic operation before taking the lock - // but in practice there seems to be an unexplained race in that case - ScopedLock<Mutex> l(creditLock); - if (readCredit.fetchAndAdd(credit) != 0) - return; - assert(readCredit.get() >= 0); - if (readCredit.get() != 0) - aio->startReading(); +namespace { + SecuritySettings getSecuritySettings(AsynchIO* aio, bool nodict) + { + SecuritySettings settings = aio->getSecuritySettings(); + settings.nodict = nodict; + return settings; + } } void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { @@ -123,26 +118,6 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { return; } - // Check here for read credit - if (readCredit.get() != InfiniteCredit) { - if (readCredit.get() == 0) { - // FIXME aconway 2009-10-01: Workaround to avoid "false wakeups". - // readbuff is sometimes called with no credit. - // This should be fixed somewhere else to avoid such calls. - aio->unread(buff); - return; - } - // TODO In theory should be able to use an atomic operation before taking the lock - // but in practice there seems to be an unexplained race in that case - ScopedLock<Mutex> l(creditLock); - if (--readCredit == 0) { - assert(readCredit.get() >= 0); - if (readCredit.get() == 0) { - aio->stopReading(); - } - } - } - ++reads; size_t decoded = 0; if (codec) { // Already initiated @@ -168,13 +143,16 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); try { - codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings()); + codec = factory->create(protocolInit.getVersion(), *this, identifier, getSecuritySettings(aio, nodict)); if (!codec) { //TODO: may still want to revise this... //send valid version header & close connection. write(framing::ProtocolInitiation(framing::highestProtocolVersion)); readError = true; aio->queueWriteClose(); + } else { + //read any further data that may already have been sent + decoded += codec->decode(buff->bytes+buff->dataStart+in.getPosition(), buff->dataCount-in.getPosition()); } } catch (const std::exception& e) { QPID_LOG(error, e.what()); @@ -223,7 +201,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { void AsynchIOHandler::idle(AsynchIO&){ if (isClient && codec == 0) { - codec = factory->create(*this, identifier, SecuritySettings()); + codec = factory->create(*this, identifier, getSecuritySettings(aio, nodict)); write(framing::ProtocolInitiation(codec->getVersion())); // We've just sent the protocol negotiation so we can cancel the timeout for that // This is not ideal, because we've not received anything yet, but heartbeats will |