summaryrefslogtreecommitdiff
path: root/qpid/wcf/src/Apache/Qpid/Interop
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop')
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp32
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h3
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h1
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp150
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/InputLink.h15
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj4
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h4
7 files changed, 192 insertions, 17 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
index bab73da74e..425a592509 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
@@ -63,11 +63,12 @@ AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidCon
sessionp = new qpid::client::AsyncSession;
*sessionp = qpidConnectionp->newSession();
subs_mgrp = new SubscriptionManager (*sessionp);
- success = true;
waiters = gcnew Collections::Generic::List<CompletionWaiter^>();
+ success = true;
} finally {
if (!success) {
Cleanup();
+ // TODO: include inner exception information
throw gcnew QpidException ("session creation failure");
}
}
@@ -76,12 +77,6 @@ AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidCon
void AmqpSession::Cleanup()
{
- if (subscriptionp != NULL) {
- subscriptionp->cancel();
- delete subscriptionp;
- subscriptionp=NULL;
- }
-
if (subs_mgrp != NULL) {
subs_mgrp->stop();
delete subs_mgrp;
@@ -112,6 +107,7 @@ void AmqpSession::Cleanup()
void AmqpSession::ConnectionClosed()
{
+ lock l(waiters);
Cleanup();
}
@@ -283,5 +279,27 @@ void AmqpSession::asyncHelper(Object ^unused)
}
}
+bool AmqpSession::MessageStop(Completion &comp, std::string &name)
+{
+ lock l(waiters);
+
+ if (sessionp == NULL)
+ return false;
+
+ comp = sessionp->messageStop(name, true);
+ return true;
+}
+
+void AmqpSession::AcceptAndComplete(SequenceSet& transfers)
+{
+ lock l(waiters);
+
+ if (sessionp == NULL)
+ throw gcnew ObjectDisposedException("Accept");
+
+ sessionp->markCompleted(transfers, false);
+ sessionp->messageAccept(transfers, false);
+}
+
}}} // namespace Apache::Qpid::Cli
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
index b959a4123a..8306cdf720 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
@@ -44,7 +44,6 @@ private:
AsyncSession* sessionp;
SessionImpl* sessionImplp;
SubscriptionManager* subs_mgrp;
- Subscription* subscriptionp;
LocalQueue* localQueuep;
Collections::Generic::List<CompletionWaiter^>^ waiters;
bool helperRunning;
@@ -69,6 +68,8 @@ internal:
void ConnectionClosed();
void internalWaitForCompletion(IntPtr Future);
void removeWaiter(CompletionWaiter^ waiter);
+ bool MessageStop(Completion &comp, std::string &name);
+ void AcceptAndComplete(SequenceSet& transfers);
property AmqpConnection^ Connection {
AmqpConnection^ get () { return connection; }
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h
index 197ac632b0..88880c3721 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h
+++ b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h
@@ -32,7 +32,6 @@ private:
bool timedOut;
// has an owner thread
bool assigned;
- // can Run (i.e. earlier CompletionWaiters in the queue have completed)
System::Exception^ runException;
AsyncCallback^ asyncCallback;
Threading::Timer ^timer;
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
index cee394b05d..e12151d943 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
+++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
@@ -59,6 +59,12 @@ using namespace Apache::Qpid::AmqpTypes;
// with proposed changes to the native library to reduce the number of servicing
// threads for large numbers of subscriptions.
+// synchronization is accomplished with locks, but also by ensuring that only one
+// MessageWaiter (the one at the front of the line) is ever active.
+// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch
+// thread (who deposits FrameSets into the local queue and is oblivious to the
+// managed space locks).
+
// The folowing def must match the "Frames" private typedef.
// TODO, make Qpid-cpp "Frames" definition visible.
@@ -94,6 +100,8 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue,
localQueuep = new LocalQueue;
SubscriptionSettings settings;
settings.flowControl = FlowControl::messageCredit(0);
+ settings.completionMode = CompletionMode::MANUAL_COMPLETION;
+
Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings);
subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup
@@ -197,6 +205,7 @@ bool InputLink::haveMessage()
IntPtr InputLink::nextLocalMessage()
{
lock l(waiters);
+
if (disposed)
return (IntPtr) NULL;
@@ -279,8 +288,7 @@ bool InputLink::internalWaitForMessage()
if (haveMessage())
return true;
- // TODO: prefetch window of messages, compatible with both 0-10 and 1.0.
- subscriptionp->grantMessageCredit(1);
+ AdjustCredit();
// get a scoped smart ptr ref to guard against async close or hangup
demuxQueuePtr = *queuePtrp;
@@ -350,7 +358,12 @@ void InputLink::removeWaiter(MessageWaiter^ waiter) {
}
return;
}
+
waiters->RemoveAt(idx);
+ if (waiter->TimedOut) {
+ // may have to give back message if it arrives momentarily
+ AdjustCredit();
+ }
// let the next waiter know it's his turn.
if (waiters->Count > 0) {
@@ -411,6 +424,129 @@ void InputLink::sync()
}
+void InputLink::PrefetchLimit::set(int value)
+{
+ lock l(waiters);
+ prefetchLimit = value;
+
+ int delta = 0;
+
+ // rough rule of thumb to keep the flow, but reduce chatter.
+ // for small messages, the credit request is almost as expensive as the transfer itself.
+ // experience may suggest a better heuristic or require a property for the low water mark
+ if (prefetchLimit >= 3) {
+ delta = prefetchLimit / 3;
+ }
+ minWorkingCredit = prefetchLimit - delta;
+ AdjustCredit();
+}
+
+
+// call with lock held
+void InputLink::AdjustCredit()
+{
+ if (creditSyncPending || disposed)
+ return;
+
+ // low watermark check
+ if ((prefetchLimit != 0) &&
+ (workingCredit >= minWorkingCredit) &&
+ (workingCredit >= waiters->Count))
+ return;
+
+ // should have enough for all waiters or to satisfy the prefetch window
+ int targetCredit = waiters->Count;
+ if (targetCredit < prefetchLimit)
+ targetCredit = prefetchLimit;
+
+ if (targetCredit > workingCredit) {
+ subscriptionp->grantMessageCredit(targetCredit - workingCredit);
+ workingCredit = targetCredit;
+ return;
+ }
+ if (targetCredit < workingCredit) {
+ if ((targetCredit == 0) && (prefetchLimit == 0)) {
+ creditSyncPending = true;
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit));
+ }
+ // TODO: also shrink credit when prefetchLimit != 0
+ }
+}
+
+void InputLink::SyncCredit(Object ^unused)
+{
+ lock l(waiters);
+
+ try {
+ if (disposed)
+ return;
+
+ Completion comp;
+ if (!amqpSession->MessageStop(comp, subscriptionp->getName())) {
+ // connection closed
+ return;
+ }
+
+ // get a private scoped copy to use outside the lock
+ Subscription s(*subscriptionp);
+
+ l.release();
+ // use setFlowControl to re-enable credit flow on the broker.
+ // previously used comp.wait() here, but setFlowControl is a sync operation
+ s.setFlowControl(s.getSettings().flowControl);
+ l.acquire();
+
+ if (disposed)
+ return;
+
+ // let existing waiters use up any
+ // local queue size can only decrease until more credit is issued
+ while (true) {
+ if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) {
+ l.release();
+ // a rare use case and not used in performance oriented code.
+ // optimization can wait until the qpid/messaging api is used
+ Thread::Sleep(10);
+ l.acquire();
+ if (disposed)
+ return;
+ }
+ else {
+ break;
+ }
+ }
+
+ // At this point, the lock is held and we are fully synced with the broker
+ // so we have a valid snapshot
+
+ if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) {
+ // can't be sure application will request a message again any time soon
+ QpidFrameSetPtr frameSetp;
+ while (!(*queuePtrp)->empty()) {
+ (*queuePtrp)->pop(frameSetp);
+ SequenceSet frameSetID(frameSetp->getId());
+ subscriptionp->release(frameSetID);
+ }
+
+ // don't touch dequeuedFrameSetpp. It is spoken for: explicitely from a
+ // MessageWaiter about to to get the nextLocalMessage(), or implicitely
+ // from a WaitForMessage().
+ }
+ // TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit
+
+ workingCredit = (*queuePtrp)->size();
+ if (dequeuedFrameSetpp != NULL) {
+ workingCredit++;
+ }
+ }
+ finally {
+ creditSyncPending = false;
+ }
+
+ AdjustCredit();
+}
+
+
AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
{
QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer();
@@ -539,7 +675,15 @@ AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
// We have a message we can return to the caller.
// Tell the broker we got it.
- subscriptionp->accept(frameSetID);
+
+ // subscriptionp->accept(frameSetID) is a slow sync operation in the native API
+ // so do it within the AsyncSession directly
+ amqpSession->AcceptAndComplete(frameSetID);
+
+ workingCredit--;
+ // check if more messages need to be requested from broker
+ AdjustCredit();
+
return amqpMessage;
}
finally {
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
index 366780c137..f59a03a8c3 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
+++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
@@ -47,6 +47,14 @@ private:
bool finalizing;
QpidFrameSetPtr* dequeuedFrameSetpp;
ManualResetEvent^ asyncHelperWaitHandle;
+ // number of messages to buffer locally for future consumption
+ int prefetchLimit;
+ // the number of messages requested and not yet processed
+ int workingCredit;
+ // stopping and restarting the message flow
+ bool creditSyncPending;
+ // working credit low water mark
+ int minWorkingCredit;
void Cleanup();
void ReleaseNative();
@@ -54,6 +62,8 @@ private:
void addWaiter(MessageWaiter^ waiter);
void asyncHelper();
AmqpMessage^ createAmqpMessage(IntPtr msgp);
+ void AdjustCredit();
+ void SyncCredit(Object ^);
internal:
InputLink(AmqpSession^ session, System::String^ sourceQueue, qpid::client::AsyncSession *qpidSessionp,
@@ -80,6 +90,11 @@ public:
IAsyncResult^ BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state);
bool EndWaitForMessage(IAsyncResult^ result);
+ property int PrefetchLimit {
+ int get () { return prefetchLimit; }
+ void set (int value);
+ }
+
};
}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
index 32f78c8344..484f6898fb 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
+++ b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
@@ -65,7 +65,7 @@
Name="VCCLCompilerTool"
AdditionalOptions="/FU Debug\Apache.Qpid.AmqpTypes.netmodule"
Optimization="0"
- AdditionalIncludeDirectories="..\..\..\..\..\cpp\build\include;..\..\..\..\..\cpp\build\src;..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;&quot;$(BOOST_ROOT)&quot;"
+ AdditionalIncludeDirectories="&quot;$(QPID_BUILD_ROOT)\include&quot;;&quot;$(QPID_BUILD_ROOT)\src&quot;;..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;&quot;$(BOOST_ROOT)&quot;"
PreprocessorDefinitions="WIN32;_DEBUG;_CRT_NONSTDC_NO_WARNINGS;WIN32_LEAN_AND_MEAN;NOMINMAX;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
RuntimeLibrary="3"
UsePrecompiledHeader="0"
@@ -83,7 +83,7 @@
/>
<Tool
Name="VCLinkerTool"
- AdditionalOptions="..\..\..\..\..\cpp\build\src\Debug\qpidcommon.lib ..\..\..\..\..\cpp\build\src\Debug\qpidclient.lib Debug\Apache.Qpid.AmqpTypes.netmodule"
+ AdditionalOptions="$(QPID_BUILD_ROOT)\src\Debug\qpidcommond.lib $(QPID_BUILD_ROOT)\src\Debug\qpidclientd.lib Debug\Apache.Qpid.AmqpTypes.netmodule"
AdditionalDependencies="$(NoInherit)"
OutputFile="$(OutDir)\Apache.Qpid.Interop.dll"
LinkIncremental="2"
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h
index 3eb4919646..3737430844 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h
+++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h
@@ -66,7 +66,6 @@ private:
void Activate();
void WaitForCompletion();
-// inline void SetCompletedSynchronously (bool v) { completedSynchronously = v; }
property IntPtr Message {
IntPtr get () {
@@ -78,7 +77,6 @@ private:
GC::SuppressFinalize(this);
return v;
}
- // void set (IntPtr v) { message = v; }
}
property bool Assigned {
@@ -89,11 +87,11 @@ private:
bool get () { return timedOut; }
}
-
property System::Exception^ RunException {
System::Exception^ get() { return runException; }
}
+
public:
virtual property bool IsCompleted {