summaryrefslogtreecommitdiff
path: root/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp')
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp150
1 files changed, 147 insertions, 3 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
index cee394b05d..e12151d943 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
+++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
@@ -59,6 +59,12 @@ using namespace Apache::Qpid::AmqpTypes;
// with proposed changes to the native library to reduce the number of servicing
// threads for large numbers of subscriptions.
+// synchronization is accomplished with locks, but also by ensuring that only one
+// MessageWaiter (the one at the front of the line) is ever active.
+// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch
+// thread (who deposits FrameSets into the local queue and is oblivious to the
+// managed space locks).
+
// The folowing def must match the "Frames" private typedef.
// TODO, make Qpid-cpp "Frames" definition visible.
@@ -94,6 +100,8 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue,
localQueuep = new LocalQueue;
SubscriptionSettings settings;
settings.flowControl = FlowControl::messageCredit(0);
+ settings.completionMode = CompletionMode::MANUAL_COMPLETION;
+
Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings);
subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup
@@ -197,6 +205,7 @@ bool InputLink::haveMessage()
IntPtr InputLink::nextLocalMessage()
{
lock l(waiters);
+
if (disposed)
return (IntPtr) NULL;
@@ -279,8 +288,7 @@ bool InputLink::internalWaitForMessage()
if (haveMessage())
return true;
- // TODO: prefetch window of messages, compatible with both 0-10 and 1.0.
- subscriptionp->grantMessageCredit(1);
+ AdjustCredit();
// get a scoped smart ptr ref to guard against async close or hangup
demuxQueuePtr = *queuePtrp;
@@ -350,7 +358,12 @@ void InputLink::removeWaiter(MessageWaiter^ waiter) {
}
return;
}
+
waiters->RemoveAt(idx);
+ if (waiter->TimedOut) {
+ // may have to give back message if it arrives momentarily
+ AdjustCredit();
+ }
// let the next waiter know it's his turn.
if (waiters->Count > 0) {
@@ -411,6 +424,129 @@ void InputLink::sync()
}
+void InputLink::PrefetchLimit::set(int value)
+{
+ lock l(waiters);
+ prefetchLimit = value;
+
+ int delta = 0;
+
+ // rough rule of thumb to keep the flow, but reduce chatter.
+ // for small messages, the credit request is almost as expensive as the transfer itself.
+ // experience may suggest a better heuristic or require a property for the low water mark
+ if (prefetchLimit >= 3) {
+ delta = prefetchLimit / 3;
+ }
+ minWorkingCredit = prefetchLimit - delta;
+ AdjustCredit();
+}
+
+
+// call with lock held
+void InputLink::AdjustCredit()
+{
+ if (creditSyncPending || disposed)
+ return;
+
+ // low watermark check
+ if ((prefetchLimit != 0) &&
+ (workingCredit >= minWorkingCredit) &&
+ (workingCredit >= waiters->Count))
+ return;
+
+ // should have enough for all waiters or to satisfy the prefetch window
+ int targetCredit = waiters->Count;
+ if (targetCredit < prefetchLimit)
+ targetCredit = prefetchLimit;
+
+ if (targetCredit > workingCredit) {
+ subscriptionp->grantMessageCredit(targetCredit - workingCredit);
+ workingCredit = targetCredit;
+ return;
+ }
+ if (targetCredit < workingCredit) {
+ if ((targetCredit == 0) && (prefetchLimit == 0)) {
+ creditSyncPending = true;
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit));
+ }
+ // TODO: also shrink credit when prefetchLimit != 0
+ }
+}
+
+void InputLink::SyncCredit(Object ^unused)
+{
+ lock l(waiters);
+
+ try {
+ if (disposed)
+ return;
+
+ Completion comp;
+ if (!amqpSession->MessageStop(comp, subscriptionp->getName())) {
+ // connection closed
+ return;
+ }
+
+ // get a private scoped copy to use outside the lock
+ Subscription s(*subscriptionp);
+
+ l.release();
+ // use setFlowControl to re-enable credit flow on the broker.
+ // previously used comp.wait() here, but setFlowControl is a sync operation
+ s.setFlowControl(s.getSettings().flowControl);
+ l.acquire();
+
+ if (disposed)
+ return;
+
+ // let existing waiters use up any
+ // local queue size can only decrease until more credit is issued
+ while (true) {
+ if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) {
+ l.release();
+ // a rare use case and not used in performance oriented code.
+ // optimization can wait until the qpid/messaging api is used
+ Thread::Sleep(10);
+ l.acquire();
+ if (disposed)
+ return;
+ }
+ else {
+ break;
+ }
+ }
+
+ // At this point, the lock is held and we are fully synced with the broker
+ // so we have a valid snapshot
+
+ if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) {
+ // can't be sure application will request a message again any time soon
+ QpidFrameSetPtr frameSetp;
+ while (!(*queuePtrp)->empty()) {
+ (*queuePtrp)->pop(frameSetp);
+ SequenceSet frameSetID(frameSetp->getId());
+ subscriptionp->release(frameSetID);
+ }
+
+ // don't touch dequeuedFrameSetpp. It is spoken for: explicitely from a
+ // MessageWaiter about to to get the nextLocalMessage(), or implicitely
+ // from a WaitForMessage().
+ }
+ // TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit
+
+ workingCredit = (*queuePtrp)->size();
+ if (dequeuedFrameSetpp != NULL) {
+ workingCredit++;
+ }
+ }
+ finally {
+ creditSyncPending = false;
+ }
+
+ AdjustCredit();
+}
+
+
AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
{
QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer();
@@ -539,7 +675,15 @@ AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
// We have a message we can return to the caller.
// Tell the broker we got it.
- subscriptionp->accept(frameSetID);
+
+ // subscriptionp->accept(frameSetID) is a slow sync operation in the native API
+ // so do it within the AsyncSession directly
+ amqpSession->AcceptAndComplete(frameSetID);
+
+ workingCredit--;
+ // check if more messages need to be requested from broker
+ AdjustCredit();
+
return amqpMessage;
}
finally {