diff options
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp')
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp | 150 |
1 files changed, 147 insertions, 3 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp index cee394b05d..e12151d943 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp +++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp @@ -59,6 +59,12 @@ using namespace Apache::Qpid::AmqpTypes; // with proposed changes to the native library to reduce the number of servicing // threads for large numbers of subscriptions. +// synchronization is accomplished with locks, but also by ensuring that only one +// MessageWaiter (the one at the front of the line) is ever active. +// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch +// thread (who deposits FrameSets into the local queue and is oblivious to the +// managed space locks). + // The folowing def must match the "Frames" private typedef. // TODO, make Qpid-cpp "Frames" definition visible. @@ -94,6 +100,8 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, localQueuep = new LocalQueue; SubscriptionSettings settings; settings.flowControl = FlowControl::messageCredit(0); + settings.completionMode = CompletionMode::MANUAL_COMPLETION; + Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings); subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup @@ -197,6 +205,7 @@ bool InputLink::haveMessage() IntPtr InputLink::nextLocalMessage() { lock l(waiters); + if (disposed) return (IntPtr) NULL; @@ -279,8 +288,7 @@ bool InputLink::internalWaitForMessage() if (haveMessage()) return true; - // TODO: prefetch window of messages, compatible with both 0-10 and 1.0. - subscriptionp->grantMessageCredit(1); + AdjustCredit(); // get a scoped smart ptr ref to guard against async close or hangup demuxQueuePtr = *queuePtrp; @@ -350,7 +358,12 @@ void InputLink::removeWaiter(MessageWaiter^ waiter) { } return; } + waiters->RemoveAt(idx); + if (waiter->TimedOut) { + // may have to give back message if it arrives momentarily + AdjustCredit(); + } // let the next waiter know it's his turn. if (waiters->Count > 0) { @@ -411,6 +424,129 @@ void InputLink::sync() } +void InputLink::PrefetchLimit::set(int value) +{ + lock l(waiters); + prefetchLimit = value; + + int delta = 0; + + // rough rule of thumb to keep the flow, but reduce chatter. + // for small messages, the credit request is almost as expensive as the transfer itself. + // experience may suggest a better heuristic or require a property for the low water mark + if (prefetchLimit >= 3) { + delta = prefetchLimit / 3; + } + minWorkingCredit = prefetchLimit - delta; + AdjustCredit(); +} + + +// call with lock held +void InputLink::AdjustCredit() +{ + if (creditSyncPending || disposed) + return; + + // low watermark check + if ((prefetchLimit != 0) && + (workingCredit >= minWorkingCredit) && + (workingCredit >= waiters->Count)) + return; + + // should have enough for all waiters or to satisfy the prefetch window + int targetCredit = waiters->Count; + if (targetCredit < prefetchLimit) + targetCredit = prefetchLimit; + + if (targetCredit > workingCredit) { + subscriptionp->grantMessageCredit(targetCredit - workingCredit); + workingCredit = targetCredit; + return; + } + if (targetCredit < workingCredit) { + if ((targetCredit == 0) && (prefetchLimit == 0)) { + creditSyncPending = true; + ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit)); + } + // TODO: also shrink credit when prefetchLimit != 0 + } +} + +void InputLink::SyncCredit(Object ^unused) +{ + lock l(waiters); + + try { + if (disposed) + return; + + Completion comp; + if (!amqpSession->MessageStop(comp, subscriptionp->getName())) { + // connection closed + return; + } + + // get a private scoped copy to use outside the lock + Subscription s(*subscriptionp); + + l.release(); + // use setFlowControl to re-enable credit flow on the broker. + // previously used comp.wait() here, but setFlowControl is a sync operation + s.setFlowControl(s.getSettings().flowControl); + l.acquire(); + + if (disposed) + return; + + // let existing waiters use up any + // local queue size can only decrease until more credit is issued + while (true) { + if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) { + l.release(); + // a rare use case and not used in performance oriented code. + // optimization can wait until the qpid/messaging api is used + Thread::Sleep(10); + l.acquire(); + if (disposed) + return; + } + else { + break; + } + } + + // At this point, the lock is held and we are fully synced with the broker + // so we have a valid snapshot + + if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) { + // can't be sure application will request a message again any time soon + QpidFrameSetPtr frameSetp; + while (!(*queuePtrp)->empty()) { + (*queuePtrp)->pop(frameSetp); + SequenceSet frameSetID(frameSetp->getId()); + subscriptionp->release(frameSetID); + } + + // don't touch dequeuedFrameSetpp. It is spoken for: explicitely from a + // MessageWaiter about to to get the nextLocalMessage(), or implicitely + // from a WaitForMessage(). + } + // TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit + + workingCredit = (*queuePtrp)->size(); + if (dequeuedFrameSetpp != NULL) { + workingCredit++; + } + } + finally { + creditSyncPending = false; + } + + AdjustCredit(); +} + + AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) { QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer(); @@ -539,7 +675,15 @@ AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) // We have a message we can return to the caller. // Tell the broker we got it. - subscriptionp->accept(frameSetID); + + // subscriptionp->accept(frameSetID) is a slow sync operation in the native API + // so do it within the AsyncSession directly + amqpSession->AcceptAndComplete(frameSetID); + + workingCredit--; + // check if more messages need to be requested from broker + AdjustCredit(); + return amqpMessage; } finally { |