summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/AsynchIOHandler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/AsynchIOHandler.cpp')
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp54
1 files changed, 16 insertions, 38 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index 2e117a3fb7..cf08b482e6 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -51,15 +51,15 @@ struct ProtocolTimeoutTask : public sys::TimerTask {
}
};
-AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f) :
+AsynchIOHandler::AsynchIOHandler(const std::string& id, ConnectionCodec::Factory* f, bool isClient0, bool nodict0) :
identifier(id),
aio(0),
factory(f),
codec(0),
reads(0),
readError(false),
- isClient(false),
- readCredit(InfiniteCredit)
+ isClient(isClient0),
+ nodict(nodict0)
{}
AsynchIOHandler::~AsynchIOHandler() {
@@ -97,25 +97,20 @@ void AsynchIOHandler::abort() {
if (!readError) {
aio->requestCallback(boost::bind(&AsynchIOHandler::eof, this, _1));
}
+ aio->queueWriteClose();
}
void AsynchIOHandler::activateOutput() {
aio->notifyPendingWrite();
}
-// Input side
-void AsynchIOHandler::giveReadCredit(int32_t credit) {
- // Check whether we started in the don't about credit state
- if (readCredit.boolCompareAndSwap(InfiniteCredit, credit))
- return;
- // TODO In theory should be able to use an atomic operation before taking the lock
- // but in practice there seems to be an unexplained race in that case
- ScopedLock<Mutex> l(creditLock);
- if (readCredit.fetchAndAdd(credit) != 0)
- return;
- assert(readCredit.get() >= 0);
- if (readCredit.get() != 0)
- aio->startReading();
+namespace {
+ SecuritySettings getSecuritySettings(AsynchIO* aio, bool nodict)
+ {
+ SecuritySettings settings = aio->getSecuritySettings();
+ settings.nodict = nodict;
+ return settings;
+ }
}
void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
@@ -123,26 +118,6 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
return;
}
- // Check here for read credit
- if (readCredit.get() != InfiniteCredit) {
- if (readCredit.get() == 0) {
- // FIXME aconway 2009-10-01: Workaround to avoid "false wakeups".
- // readbuff is sometimes called with no credit.
- // This should be fixed somewhere else to avoid such calls.
- aio->unread(buff);
- return;
- }
- // TODO In theory should be able to use an atomic operation before taking the lock
- // but in practice there seems to be an unexplained race in that case
- ScopedLock<Mutex> l(creditLock);
- if (--readCredit == 0) {
- assert(readCredit.get() >= 0);
- if (readCredit.get() == 0) {
- aio->stopReading();
- }
- }
- }
-
++reads;
size_t decoded = 0;
if (codec) { // Already initiated
@@ -168,13 +143,16 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
QPID_LOG(debug, "RECV [" << identifier << "]: INIT(" << protocolInit << ")");
try {
- codec = factory->create(protocolInit.getVersion(), *this, identifier, SecuritySettings());
+ codec = factory->create(protocolInit.getVersion(), *this, identifier, getSecuritySettings(aio, nodict));
if (!codec) {
//TODO: may still want to revise this...
//send valid version header & close connection.
write(framing::ProtocolInitiation(framing::highestProtocolVersion));
readError = true;
aio->queueWriteClose();
+ } else {
+ //read any further data that may already have been sent
+ decoded += codec->decode(buff->bytes+buff->dataStart+in.getPosition(), buff->dataCount-in.getPosition());
}
} catch (const std::exception& e) {
QPID_LOG(error, e.what());
@@ -223,7 +201,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&) {
void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
- codec = factory->create(*this, identifier, SecuritySettings());
+ codec = factory->create(*this, identifier, getSecuritySettings(aio, nodict));
write(framing::ProtocolInitiation(codec->getVersion()));
// We've just sent the protocol negotiation so we can cancel the timeout for that
// This is not ideal, because we've not received anything yet, but heartbeats will