diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/sys/AsynchIOHandler.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIOHandler.cpp')
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.cpp | 54 |
1 files changed, 16 insertions, 38 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index 2e117a3fb7..cf08b482e6 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -51,15 +51,15 @@ struct ProtocolTimeoutTask : public sys::TimerTask { } }; -AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) : +AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool isClient0, bool nodict0) : identifier(id), aio(0), factory(f), codec(0), reads(0), readError(false), - isClient(false), - readCredit(InfiniteCredit) + isClient(isClient0), + nodict(nodict0) {} AsynchIOHandler::~AsynchIOHandler() { @@ -97,25 +97,20 @@ void AsynchIOHandler::abort() { if (!readError) { aio->requestCallback(boost::bind(&AsynchIOHandler::eof, this, _1)); } + aio->queueWriteClose(); } void AsynchIOHandler::activateOutput() { aio->notifyPendingWrite(); } -// Input side -void AsynchIOHandler::giveReadCredit(int32_t credit) { - // Check whether we started in the don't about credit state - if (readCredit.boolCompareAndSwap(InfiniteCredit, credit)) - return; - // TODO In theory should be able to use an atomic operation before taking the lock - // but in practice there seems to be an unexplained race in that case - ScopedLock<Mutex> l(creditLock); - if (readCredit.fetchAndAdd(credit) != 0) - return; - assert(readCredit.get() >= 0); - if (readCredit.get() != 0) - aio->startReading(); +namespace { + SecuritySettings getSecuritySettings(AsynchIO* aio, bool nodict) + { + SecuritySettings settings = aio->getSecuritySettings(); + settings.nodict = nodict; + return settings; + } } void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { @@ -123,26 +118,6 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { return; } - // Check here for read credit - if (readCredit.get() != InfiniteCredit) { - if (readCredit.get() == 0) { - // FIXME aconway 2009-10-01: Workaround to avoid "false wakeups". - // readbuff is sometimes called with no credit. - // This should be fixed somewhere else to avoid such calls. - aio->unread(buff); - return; - } - // TODO In theory should be able to use an atomic operation before taking the lock - // but in practice there seems to be an unexplained race in that case - ScopedLock<Mutex> l(creditLock); - if (--readCredit == 0) { - assert(readCredit.get() >= 0); - if (readCredit.get() == 0) { - aio->stopReading(); - } - } - } - ++reads; size_t decoded = 0; if (codec) { // Already initiated @@ -168,13 +143,16 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")"); try { - codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings()); + codec = factory->create(protocolInit.getVersion(), *this, identifier, getSecuritySettings(aio, nodict)); if (!codec) { //TODO: may still want to revise this... //send valid version header & close connection. write(framing::ProtocolInitiation(framing::highestProtocolVersion)); readError = true; aio->queueWriteClose(); + } else { + //read any further data that may already have been sent + decoded += codec->decode(buff->bytes+buff->dataStart+in.getPosition(), buff->dataCount-in.getPosition()); } } catch (const std::exception& e) { QPID_LOG(error, e.what()); @@ -223,7 +201,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&) { void AsynchIOHandler::idle(AsynchIO&){ if (isClient && codec == 0) { - codec = factory->create(*this, identifier, SecuritySettings()); + codec = factory->create(*this, identifier, getSecuritySettings(aio, nodict)); write(framing::ProtocolInitiation(codec->getVersion())); // We've just sent the protocol negotiation so we can cancel the timeout for that // This is not ideal, because we've not received anything yet, but heartbeats will |