diff options
Diffstat (limited to 'wcf/src/Apache/Qpid/Interop/InputLink.cpp')
-rw-r--r-- | wcf/src/Apache/Qpid/Interop/InputLink.cpp | 86 |
1 files changed, 55 insertions, 31 deletions
diff --git a/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/wcf/src/Apache/Qpid/Interop/InputLink.cpp index e12151d943..3245cd3540 100644 --- a/wcf/src/Apache/Qpid/Interop/InputLink.cpp +++ b/wcf/src/Apache/Qpid/Interop/InputLink.cpp @@ -86,6 +86,8 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, System::Exception^ linkException = nullptr; waiters = gcnew Collections::Generic::List<MessageWaiter^>(); + linkLock = waiters; // private and available + subscriptionLock = gcnew Object(); try { std::string qname = QpidMarshal::ToNative(sourceQueue); @@ -120,10 +122,13 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, } } +// called with lock held void InputLink::ReleaseNative() { // involves talking to the Broker unless the connection is broken - if (subscriptionp != NULL) { + + if ((subscriptionp != NULL) && !finalizing) { + // TODO: find boost time error on cleanup when in finalizer thread try { subscriptionp->cancel(); } @@ -134,20 +139,31 @@ void InputLink::ReleaseNative() } // free native mem (or smart pointers) that we own - if (subscriptionp != NULL) + if (subscriptionp != NULL) { delete subscriptionp; - if (queuePtrp != NULL) + subscriptionp = NULL; + } + if (queuePtrp != NULL) { delete queuePtrp; - if (localQueuep != NULL) - delete localQueuep; - if (dequeuedFrameSetpp != NULL) + queuePtrp = NULL; + } + if (localQueuep != NULL) { + if (!finalizing) { + // TODO: find boost time error on cleanup when in finalizer thread + delete localQueuep; + localQueuep = NULL; + } + } + if (dequeuedFrameSetpp != NULL) { delete dequeuedFrameSetpp; + dequeuedFrameSetpp = NULL; + } } void InputLink::Cleanup() { { - lock l(waiters); + lock l(linkLock); if (disposed) return; @@ -162,6 +178,9 @@ void InputLink::Cleanup() if (queuePtrp != NULL) (*queuePtrp)->close(); + // wait for any sync operations on the subscription to complete before ReleaseNative + lock l2(subscriptionLock); + try {} finally { @@ -179,6 +198,7 @@ InputLink::~InputLink() InputLink::!InputLink() { + finalizing = true; Cleanup(); } @@ -204,7 +224,7 @@ bool InputLink::haveMessage() IntPtr InputLink::nextLocalMessage() { - lock l(waiters); + lock l(linkLock); if (disposed) return (IntPtr) NULL; @@ -250,7 +270,7 @@ IntPtr InputLink::nextLocalMessage() void InputLink::unblockWaiter() { // to be followed by resetQueue() below - lock l(waiters); + lock l(linkLock); if (disposed) return; (*queuePtrp)->close(); @@ -264,7 +284,7 @@ void InputLink::unblockWaiter() void InputLink::resetQueue() { - lock l(waiters); + lock l(linkLock); if (disposed) return; if ((*queuePtrp)->isClosed()) { @@ -282,7 +302,7 @@ bool InputLink::internalWaitForMessage() bool received = false; QpidFrameSetPtr* frameSetpp = NULL; try { - lock l(waiters); + lock l(linkLock); if (disposed) return false; if (haveMessage()) @@ -348,7 +368,7 @@ void InputLink::addWaiter(MessageWaiter^ waiter) void InputLink::removeWaiter(MessageWaiter^ waiter) { // a waiter can be removed from anywhere in the list if timed out - lock l(waiters); + lock l(linkLock); int idx = waiters->IndexOf(waiter); if (idx == -1) { // TODO: assert or log @@ -388,7 +408,7 @@ void InputLink::removeWaiter(MessageWaiter^ waiter) { void InputLink::asyncHelper() { - lock l(waiters); + lock l(linkLock); while (true) { if (disposed && (waiters->Count == 0)) { @@ -419,14 +439,14 @@ void InputLink::asyncHelper() void InputLink::sync() { - // for the timeout thread - lock l(waiters); + // used by the MessageWaiter timeout thread to not run before fully initialized + lock l(linkLock); } void InputLink::PrefetchLimit::set(int value) { - lock l(waiters); + lock l(linkLock); prefetchLimit = value; int delta = 0; @@ -475,31 +495,32 @@ void InputLink::AdjustCredit() void InputLink::SyncCredit(Object ^unused) { - lock l(waiters); + lock l(linkLock); try { if (disposed) return; - Completion comp; - if (!amqpSession->MessageStop(comp, subscriptionp->getName())) { + if (!amqpSession->MessageStop(subscriptionp->getName())) { // connection closed return; } - // get a private scoped copy to use outside the lock - Subscription s(*subscriptionp); - l.release(); // use setFlowControl to re-enable credit flow on the broker. - // previously used comp.wait() here, but setFlowControl is a sync operation - s.setFlowControl(s.getSettings().flowControl); + // setFlowControl is a sync operation + { + lock l2(subscriptionLock); + if (subscriptionp != NULL) { + subscriptionp->setFlowControl(subscriptionp->getSettings().flowControl); + } + } l.acquire(); if (disposed) return; - // let existing waiters use up any + // let existing waiters use up any messages that arrived. // local queue size can only decrease until more credit is issued while (true) { if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) { @@ -700,7 +721,7 @@ AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage) { - lock l(waiters); + lock l(linkLock); if (waiters->Count == 0) { // see if there is a message already available without blocking @@ -740,7 +761,7 @@ IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callba //TODO: if haveMessage() complete synchronously - lock l(waiters); + lock l(linkLock); MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, true, callback, state); addWaiter(waiter); return waiter; @@ -779,7 +800,10 @@ bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMess bool InputLink::WaitForMessage(TimeSpan timeout) { - lock l(waiters); + lock l(linkLock); + + if (disposed) + return false; if (waiters->Count == 0) { // see if there is a message already available without blocking @@ -799,12 +823,12 @@ bool InputLink::WaitForMessage(TimeSpan timeout) return false; } - return true; + return haveMessage(); } IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state) { - lock l(waiters); + lock l(linkLock); // Same as for BeginTryReceive, except consuming = false MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, true, callback, state); @@ -822,7 +846,7 @@ bool InputLink::EndWaitForMessage(IAsyncResult^ result) return false; } - return true; + return haveMessage(); } |