diff options
Diffstat (limited to 'qpid/cpp/src/qpid')
19 files changed, 152 insertions, 97 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index f183ff8e0c..3f854c7510 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -31,9 +31,11 @@ #include <fstream> #include <boost/lexical_cast.hpp> +namespace qpid { +namespace management { + using namespace qpid::client; using namespace qpid::framing; -using namespace qpid::management; using namespace qpid::sys; using namespace std; using std::stringstream; @@ -1260,7 +1262,7 @@ void ManagementAgentImpl::ConnectionThread::run() int totalSleep = 0; do { sys::Mutex::ScopedUnlock _unlock(connLock); - ::sleep(delayMin); + qpid::sys::sleep(delayMin); totalSleep += delayMin; } while (totalSleep < delay && !shutdown); sleeping = false; @@ -1396,8 +1398,10 @@ void ManagementAgentImpl::PublishThread::run() sleepTime = 1; while (totalSleep < agent.getInterval() && !shutdown) { - ::sleep(sleepTime); + qpid::sys::sleep(sleepTime); totalSleep += sleepTime; } } } + +}}
\ No newline at end of file diff --git a/qpid/cpp/src/qpid/broker/Exchange.cpp b/qpid/cpp/src/qpid/broker/Exchange.cpp index f311b79578..ad688ba314 100644 --- a/qpid/cpp/src/qpid/broker/Exchange.cpp +++ b/qpid/cpp/src/qpid/broker/Exchange.cpp @@ -32,7 +32,9 @@ #include "qpid/sys/ExceptionHolder.h" #include <stdexcept> -using namespace qpid::broker; +namespace qpid { +namespace broker { + using namespace qpid::framing; using qpid::framing::Buffer; using qpid::framing::FieldTable; @@ -408,3 +410,5 @@ bool Exchange::routeWithAlternate(Deliverable& msg) } return msg.delivered; } + +}}
\ No newline at end of file diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 7093a68d6c..40dfba39f4 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -131,12 +131,10 @@ uint32_t Message::getRequiredCredit() void Message::encode(framing::Buffer& buffer) const { - { - sys::Mutex::ScopedLock l(lock); // prevent header modifications while encoding - //encode method and header frames - EncodeFrame f1(buffer); - frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>()); - } + sys::Mutex::ScopedLock l(lock); + //encode method and header frames + EncodeFrame f1(buffer); + frames.map_if(f1, TypeFilter2<METHOD_BODY, HEADER_BODY>()); //then encode the payload of each content frame framing::EncodeBody f2(buffer); @@ -145,6 +143,7 @@ void Message::encode(framing::Buffer& buffer) const void Message::encodeContent(framing::Buffer& buffer) const { + sys::Mutex::ScopedLock l(lock); //encode the payload of each content frame EncodeBody f2(buffer); frames.map_if(f2, TypeFilter<CONTENT_BODY>()); @@ -157,6 +156,7 @@ uint32_t Message::encodedSize() const uint32_t Message::encodedContentSize() const { + sys::Mutex::ScopedLock l(lock); return frames.getContentSize(); } @@ -222,8 +222,9 @@ void Message::releaseContent() store->stage(pmsg); staged = true; } - //ensure required credit is cached before content frames are released + //ensure required credit and size is cached before content frames are released getRequiredCredit(); + contentSize(); //remove any content frames from the frameset frames.remove(TypeFilter<CONTENT_BODY>()); setContentReleased(); diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp index 5f450cd556..22253532cb 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.cpp @@ -43,9 +43,18 @@ const std::string MessageGroupManager::qpidSharedGroup("qpid.shared_msg_group"); const std::string MessageGroupManager::qpidMessageGroupTimestamp("qpid.group_timestamp"); +/** return an iterator to the message at position, or members.end() if not found */ +MessageGroupManager::GroupState::MessageFifo::iterator +MessageGroupManager::GroupState::findMsg(const qpid::framing::SequenceNumber &position) +{ + MessageState mState(position); + MessageFifo::iterator found = std::lower_bound(members.begin(), members.end(), mState); + return (found->position == position) ? found : members.end(); +} + void MessageGroupManager::unFree( const GroupState& state ) { - GroupFifo::iterator pos = freeGroups.find( state.members.front() ); + GroupFifo::iterator pos = freeGroups.find( state.members.front().position ); assert( pos != freeGroups.end() && pos->second == &state ); freeGroups.erase( pos ); } @@ -60,8 +69,8 @@ void MessageGroupManager::disown( GroupState& state ) { state.owner.clear(); assert(state.members.size()); - assert(freeGroups.find(state.members.front()) == freeGroups.end()); - freeGroups[state.members.front()] = &state; + assert(freeGroups.find(state.members.front().position) == freeGroups.end()); + freeGroups[state.members.front().position] = &state; } MessageGroupManager::GroupState& MessageGroupManager::findGroup( const QueuedMessage& qm ) @@ -106,7 +115,8 @@ void MessageGroupManager::enqueued( const QueuedMessage& qm ) // @todo KAG optimization - store reference to group state in QueuedMessage // issue: const-ness?? GroupState& state = findGroup(qm); - state.members.push_back(qm.position); + GroupState::MessageState mState(qm.position); + state.members.push_back(mState); uint32_t total = state.members.size(); QPID_LOG( trace, "group queue " << qName << ": added message to group id=" << state.group << " total=" << total ); @@ -123,7 +133,9 @@ void MessageGroupManager::acquired( const QueuedMessage& qm ) // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? GroupState& state = findGroup(qm); - assert(state.members.size()); // there are msgs present + GroupState::MessageFifo::iterator m = state.findMsg(qm.position); + assert(m != state.members.end()); + m->acquired = true; state.acquired += 1; QPID_LOG( trace, "group queue " << qName << ": acquired message in group id=" << state.group << " acquired=" << state.acquired ); @@ -137,6 +149,9 @@ void MessageGroupManager::requeued( const QueuedMessage& qm ) GroupState& state = findGroup(qm); assert( state.acquired != 0 ); state.acquired -= 1; + GroupState::MessageFifo::iterator m = state.findMsg(qm.position); + assert(m != state.members.end()); + m->acquired = false; if (state.acquired == 0 && state.owned()) { QPID_LOG( trace, "group queue " << qName << ": consumer name=" << state.owner << " released group id=" << state.group); @@ -152,13 +167,17 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) // @todo KAG avoid lookup: retrieve direct reference to group state from QueuedMessage // issue: const-ness?? GroupState& state = findGroup(qm); - assert( state.members.size() != 0 ); - assert( state.acquired != 0 ); - state.acquired -= 1; + GroupState::MessageFifo::iterator m = state.findMsg(qm.position); + assert(m != state.members.end()); + if (m->acquired) { + assert( state.acquired != 0 ); + state.acquired -= 1; + } - // likely to be at or near begin() if dequeued in order + // special case if qm is first (oldest) message in the group: + // may need to re-insert it back on the freeGroups list, as the index will change bool reFreeNeeded = false; - if (state.members.front() == qm.position) { + if (m == state.members.begin()) { if (!state.owned()) { // will be on the freeGroups list if mgmt is dequeueing rather than a consumer! // if on freelist, it is indexed by first member, which is about to be removed! @@ -167,15 +186,7 @@ void MessageGroupManager::dequeued( const QueuedMessage& qm ) } state.members.pop_front(); } else { - GroupState::PositionFifo::iterator pos = state.members.begin() + 1; - GroupState::PositionFifo::iterator end = state.members.end(); - while (pos != end) { - if (*pos == qm.position) { - state.members.erase(pos); - break; - } - ++pos; - } + state.members.erase(m); } uint32_t total = state.members.size(); @@ -220,11 +231,11 @@ bool MessageGroupManager::nextConsumableMessage( Consumer::shared_ptr& c, Queued GroupState& group = findGroup(next); if (!group.owned()) { //TODO: make acquire more efficient when we already have the message in question - if (group.members.front() == next.position && messages.acquire(next.position, next)) { // only take from head! + if (group.members.front().position == next.position && messages.acquire(next.position, next)) { // only take from head! return true; } QPID_LOG(debug, "Skipping " << next.position << " since group " << group.group - << "'s head message still pending. pos=" << group.members.front()); + << "'s head message still pending. pos=" << group.members.front().position); } else if (group.owner == c->getName() && messages.acquire(next.position, next)) { return true; } @@ -284,7 +295,7 @@ void MessageGroupManager::query(qpid::types::Variant::Map& status) const info[GROUP_TIMESTAMP] = 0; if (g->second.members.size() != 0) { QueuedMessage qm; - if (messages.find(g->second.members.front(), qm) && + if (messages.find(g->second.members.front().position, qm) && qm.payload && qm.payload->hasProperties<framing::DeliveryProperties>()) { info[GROUP_TIMESTAMP] = qm.payload->getProperties<framing::DeliveryProperties>()->getTimestamp(); @@ -353,6 +364,7 @@ namespace { const std::string GROUP_OWNER("owner"); const std::string GROUP_ACQUIRED_CT("acquired-ct"); const std::string GROUP_POSITIONS("positions"); + const std::string GROUP_ACQUIRED_MSGS("acquired-msgs"); const std::string GROUP_STATE("group-state"); } @@ -371,10 +383,14 @@ void MessageGroupManager::getState(qpid::framing::FieldTable& state ) const group.setString(GROUP_OWNER, g->second.owner); group.setInt(GROUP_ACQUIRED_CT, g->second.acquired); framing::Array positions(TYPE_CODE_UINT32); - for (GroupState::PositionFifo::const_iterator p = g->second.members.begin(); - p != g->second.members.end(); ++p) - positions.push_back(framing::Array::ValuePtr(new IntegerValue( *p ))); + framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN); + for (GroupState::MessageFifo::const_iterator p = g->second.members.begin(); + p != g->second.members.end(); ++p) { + positions.push_back(framing::Array::ValuePtr(new IntegerValue( p->position ))); + acquiredMsgs.push_back(framing::Array::ValuePtr(new BoolValue( p->acquired ))); + } group.setArray(GROUP_POSITIONS, positions); + group.setArray(GROUP_ACQUIRED_MSGS, acquiredMsgs); groupState.push_back(framing::Array::ValuePtr(new FieldTableValue(group))); } state.setArray(GROUP_STATE, groupState); @@ -425,13 +441,25 @@ void MessageGroupManager::setState(const qpid::framing::FieldTable& state) qName << "\": position encoding error!"); return; } + framing::Array acquiredMsgs(TYPE_CODE_BOOLEAN); + ok = group.getArray(GROUP_ACQUIRED_MSGS, acquiredMsgs); + if (!ok || positions.count() != acquiredMsgs.count()) { + QPID_LOG(error, "Invalid message group state information for queue \"" << + qName << "\": acquired flag encoding error!"); + return; + } + + Array::const_iterator a = acquiredMsgs.begin(); + for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) { + GroupState::MessageState mState((*p)->getIntegerValue<uint32_t, 4>()); + mState.acquired = (*a++)->getIntegerValue<bool>(); + state.members.push_back(mState); + } - for (Array::const_iterator p = positions.begin(); p != positions.end(); ++p) - state.members.push_back((*p)->getIntegerValue<uint32_t, 4>()); messageGroups[state.group] = state; if (!state.owned()) { assert(state.members.size()); - freeGroups[state.members.front()] = &messageGroups[state.group]; + freeGroups[state.members.front().position] = &messageGroups[state.group]; } } diff --git a/qpid/cpp/src/qpid/broker/MessageGroupManager.h b/qpid/cpp/src/qpid/broker/MessageGroupManager.h index f4bffc4760..340ebbc56a 100644 --- a/qpid/cpp/src/qpid/broker/MessageGroupManager.h +++ b/qpid/cpp/src/qpid/broker/MessageGroupManager.h @@ -45,19 +45,29 @@ class MessageGroupManager : public StatefulQueueObserver, public MessageDistribu struct GroupState { // note: update getState()/setState() when changing this object's state implementation - typedef std::deque<framing::SequenceNumber> PositionFifo; + + // track which messages are in this group, and if they have been acquired + struct MessageState { + qpid::framing::SequenceNumber position; + bool acquired; + MessageState() : acquired(false) {} + MessageState(const qpid::framing::SequenceNumber& p) : position(p), acquired(false) {} + bool operator<(const MessageState& b) const { return position < b.position; } + }; + typedef std::deque<MessageState> MessageFifo; std::string group; // group identifier std::string owner; // consumer with outstanding acquired messages uint32_t acquired; // count of outstanding acquired messages - PositionFifo members; // msgs belonging to this group + MessageFifo members; // msgs belonging to this group, in enqueue order GroupState() : acquired(0) {} bool owned() const {return !owner.empty();} + MessageFifo::iterator findMsg(const qpid::framing::SequenceNumber &); }; typedef sys::unordered_map<std::string, struct GroupState> GroupMap; - typedef std::map<framing::SequenceNumber, struct GroupState *> GroupFifo; + typedef std::map<qpid::framing::SequenceNumber, struct GroupState *> GroupFifo; GroupMap messageGroups; // index: group name GroupFifo freeGroups; // ordered by oldest free msg diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 3bd9233791..fdd95ae3bd 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -56,7 +56,9 @@ #include <boost/intrusive_ptr.hpp> -using namespace qpid::broker; +namespace qpid { +namespace broker { + using namespace qpid::sys; using namespace qpid::framing; using qpid::management::ManagementAgent; @@ -1469,7 +1471,7 @@ struct AutoDeleteTask : qpid::sys::TimerTask Queue::shared_ptr queue; AutoDeleteTask(Broker& b, Queue::shared_ptr q, AbsTime fireTime) - : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion"), broker(b), queue(q) {} + : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), broker(b), queue(q) {} void fire() { @@ -1824,3 +1826,5 @@ void Queue::UsageBarrier::destroy() parent.deleted = true; while (count) parent.messageLock.wait(); } + +}}
\ No newline at end of file diff --git a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp index b4f7d00f38..90e4fa9d4d 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterTimer.cpp @@ -24,6 +24,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/ClusterTimerWakeupBody.h" #include "qpid/framing/ClusterTimerDropBody.h" +#include "qpid/sys/ClusterSafe.h" namespace qpid { namespace cluster { @@ -107,6 +108,7 @@ void ClusterTimer::drop(intrusive_ptr<TimerTask> t) { // Deliver thread void ClusterTimer::deliverWakeup(const std::string& name) { QPID_LOG(trace, "Cluster timer wakeup delivered for " << name); + qpid::sys::assertClusterSafe(); Map::iterator i = map.find(name); if (i == map.end()) throw Exception(QPID_MSG("Cluster timer wakeup non-existent task " << name)); diff --git a/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp b/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp index ffa7633e3b..8459938e5c 100644 --- a/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp +++ b/qpid/cpp/src/qpid/log/posix/SinkOptions.cpp @@ -30,6 +30,10 @@ using std::string; using qpid::Exception; +namespace qpid { +namespace log { +namespace posix { + namespace { // SyslogFacilities maps from syslog values to the text equivalents. @@ -110,10 +114,6 @@ std::string basename(const std::string path) { } // namespace -namespace qpid { -namespace log { -namespace posix { - std::ostream& operator<<(std::ostream& o, const SyslogFacility& f) { return o << SyslogFacilities().name(f.value); } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 8c2cb95faa..cb07d5d047 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -44,14 +44,15 @@ #include <sstream> #include <typeinfo> +namespace qpid { +namespace management { + using boost::intrusive_ptr; using qpid::framing::Uuid; using qpid::types::Variant; using qpid::amqp_0_10::MapCodec; using qpid::amqp_0_10::ListCodec; -using qpid::sys::Mutex; using namespace qpid::framing; -using namespace qpid::management; using namespace qpid::broker; using namespace qpid; using namespace std; @@ -2961,9 +2962,6 @@ bool ManagementAgent::moveDeletedObjectsLH() { return !deleteList.empty(); } -namespace qpid { -namespace management { - namespace { QPID_TSS const qpid::broker::ConnectionState* executionContext = 0; } @@ -2977,4 +2975,4 @@ const qpid::broker::ConnectionState* getManagementExecutionContext() return executionContext; } -}} +}}
\ No newline at end of file diff --git a/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp index 2a8d971987..20231bf910 100644 --- a/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp +++ b/qpid/cpp/src/qpid/store/MessageStorePlugin.cpp @@ -28,6 +28,9 @@ #include "qpid/DataDir.h" #include "qpid/log/Statement.h" +namespace qpid { +namespace store { + /* * The MessageStore pointer given to the Broker points to static storage. * Thus, it cannot be deleted, especially by the broker. To prevent deletion, @@ -42,9 +45,6 @@ namespace { }; } -namespace qpid { -namespace store { - static MessageStorePlugin static_instance_registers_plugin; diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp index 14d63a4cd4..849a0a44e8 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp +++ b/qpid/cpp/src/qpid/store/ms-clfs/MessageLog.cpp @@ -32,6 +32,10 @@ #include "MessageLog.h" #include "Lsn.h" +namespace qpid { +namespace store { +namespace ms_clfs { + namespace { // Structures that hold log records. Each has a type field at the start. @@ -97,10 +101,6 @@ struct MessageDequeue { } // namespace -namespace qpid { -namespace store { -namespace ms_clfs { - void MessageLog::initialize() { diff --git a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp index 04780e83e8..0ef046d7c8 100644 --- a/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp +++ b/qpid/cpp/src/qpid/store/ms-clfs/TransactionLog.cpp @@ -33,6 +33,10 @@ #include "Transaction.h" #include "Lsn.h" +namespace qpid { +namespace store { +namespace ms_clfs { + namespace { // Structures that hold log records. Each has a type field at the start. @@ -95,10 +99,6 @@ struct TransactionDelete { } // namespace -namespace qpid { -namespace store { -namespace ms_clfs { - void TransactionLog::initialize() { diff --git a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp index c25159985e..01ff8b6bfa 100644 --- a/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp @@ -41,7 +41,9 @@ #include <boost/bind.hpp> #include <boost/lexical_cast.hpp> -using namespace qpid::sys; +namespace qpid { +namespace sys { +namespace posix { namespace { @@ -71,10 +73,6 @@ __thread int64_t threadMaxIoTimeNs = 2 * 1000000; // start at 2ms /* * Asynch Acceptor */ -namespace qpid { -namespace sys { -namespace posix { - class AsynchAcceptor : public qpid::sys::AsynchAcceptor { public: AsynchAcceptor(const Socket& s, AsynchAcceptor::Callback callback); diff --git a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp index b22a615a54..abff8a5be8 100644 --- a/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp +++ b/qpid/cpp/src/qpid/sys/posix/PollableCondition.cpp @@ -1,6 +1,3 @@ -#ifndef QPID_SYS_LINUX_POLLABLECONDITION_CPP -#define QPID_SYS_LINUX_POLLABLECONDITION_CPP - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -120,5 +117,3 @@ void PollableCondition::set() { impl->set(); } void PollableCondition::clear() { impl->clear(); } }} // namespace qpid::sys - -#endif /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/ diff --git a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp index 73f15617dc..2a7cf16923 100644 --- a/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp +++ b/qpid/cpp/src/qpid/sys/ssl/SslIo.cpp @@ -37,8 +37,9 @@ #include <boost/bind.hpp> -using namespace qpid::sys; -using namespace qpid::sys::ssl; +namespace qpid { +namespace sys { +namespace ssl { namespace { @@ -448,3 +449,5 @@ SecuritySettings SslIO::getSecuritySettings() { settings.authid = socket.getClientAuthId(); return settings; } + +}}} diff --git a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp index 30378d4c5f..fb8df5ddf8 100644 --- a/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/AsynchIO.cpp @@ -295,6 +295,8 @@ private: volatile bool queuedDelete; // Socket close requested, but there are operations in progress. volatile bool queuedClose; + // Most recent asynch read request + volatile AsynchReadResult* pendingRead; private: // Dispatch events that have completed. @@ -374,6 +376,7 @@ AsynchIO::AsynchIO(const Socket& s, writeInProgress(false), queuedDelete(false), queuedClose(false), + pendingRead(0), working(false) { } @@ -504,6 +507,7 @@ void AsynchIO::startReading() { } } // On status 0 or WSA_IO_PENDING, completion will handle the rest. + pendingRead = result; } else { notifyBuffersEmpty(); @@ -617,16 +621,17 @@ void AsynchIO::readComplete(AsynchReadResult *result) { int status = result->getStatus(); size_t bytes = result->getTransferred(); if (status == 0 && bytes > 0) { - bool restartRead = true; // May not if receiver doesn't want more if (readCallback) readCallback(*this, result->getBuff()); - if (restartRead) - startReading(); + startReading(); } else { // No data read, so put the buffer back. It may be partially filled, // so "unread" it back to the front of the queue. unread(result->getBuff()); + if (queuedClose && status == ERROR_OPERATION_ABORTED) { + return; // Expected reap from CancelIoEx + } notifyEof(); if (status != 0) { @@ -697,8 +702,11 @@ void AsynchIO::completion(AsynchIoResult *result) { { ScopedUnlock<Mutex> ul(completionLock); AsynchReadResult *r = dynamic_cast<AsynchReadResult*>(result); - if (r != 0) + if (r != 0) { readComplete(r); + // Set pendingRead to 0 if it's still pointing to (newly completed) r + InterlockedCompareExchangePointer((void * volatile *)&pendingRead, 0, r); + } else { AsynchWriteResult *w = dynamic_cast<AsynchWriteResult*>(result); @@ -732,6 +740,15 @@ void AsynchIO::completion(AsynchIoResult *result) { else if (queuedDelete) delete this; } + else { + if (queuedClose && pendingRead) { + // Force outstanding read to completion. Layer above will + // call back. + CancelIoEx((HANDLE)toSocketHandle(socket), + ((AsynchReadResult *)pendingRead)->overlapped()); + pendingRead = 0; + } + } } } // namespace windows diff --git a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp index 6a1d9045b4..bb637be0a6 100644 --- a/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp +++ b/qpid/cpp/src/qpid/sys/windows/PollableCondition.cpp @@ -1,6 +1,3 @@ -#ifndef QPID_SYS_WINDOWS_POLLABLECONDITION_CPP -#define QPID_SYS_WINDOWS_POLLABLECONDITION_CPP - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -110,5 +107,3 @@ void PollableCondition::clear() { } }} // namespace qpid::sys - -#endif /*!QPID_SYS_WINDOWS_POLLABLECONDITION_CPP*/ diff --git a/qpid/cpp/src/qpid/sys/windows/Socket.cpp b/qpid/cpp/src/qpid/sys/windows/Socket.cpp index 1fa4768329..b085f67539 100644 --- a/qpid/cpp/src/qpid/sys/windows/Socket.cpp +++ b/qpid/cpp/src/qpid/sys/windows/Socket.cpp @@ -32,6 +32,9 @@ #include <winsock2.h> +namespace qpid { +namespace sys { + // Need to initialize WinSock. Ideally, this would be a singleton or embedded // in some one-time initialization function. I tried boost singleton and could // not get it to compile (and others located in google had the same problem). @@ -76,13 +79,6 @@ protected: static WinSockSetup setup; -} /* namespace */ - -namespace qpid { -namespace sys { - -namespace { - std::string getName(SOCKET fd, bool local) { ::sockaddr_storage name_s; // big enough for any socket address diff --git a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp index 11a3389e45..25cc94b290 100644 --- a/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp +++ b/qpid/cpp/src/qpid/sys/windows/SslAsynchIO.cpp @@ -38,6 +38,10 @@ #include <queue> #include <boost/bind.hpp> +namespace qpid { +namespace sys { +namespace windows { + namespace { /* @@ -66,10 +70,6 @@ namespace { }; } -namespace qpid { -namespace sys { -namespace windows { - SslAsynchIO::SslAsynchIO(const qpid::sys::Socket& s, CredHandle hCred, ReadCallback rCb, |