summaryrefslogtreecommitdiff
path: root/qpid/wcf
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf')
-rw-r--r--qpid/wcf/ReadMe.txt8
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs6
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs11
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs4
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs2
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs4
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs8
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs11
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp32
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h3
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h1
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp150
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/InputLink.h15
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj4
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h4
-rwxr-xr-xqpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat10
-rw-r--r--qpid/wcf/tools/QCreate/QCreate.vcproj6
17 files changed, 243 insertions, 36 deletions
diff --git a/qpid/wcf/ReadMe.txt b/qpid/wcf/ReadMe.txt
index 0ef3e06ce5..6f118ceac3 100644
--- a/qpid/wcf/ReadMe.txt
+++ b/qpid/wcf/ReadMe.txt
@@ -49,9 +49,9 @@ NOTE: In the following instructions %QPID_ROOT% refers to the root of
qpid source code location e.g. C:\trunk\qpid
5. Build Qpid cpp
-Run CMake and choose "%QPID_ROOT%\cpp\build" as the location for "Where to
-build the binaries". Build at least the "qpidd", "qpidclient" and
-"qpidcommon" projects.
+Build at least the "qpidd", "qpidclient" and "qpidcommon" projects.
+Create an environment variable called QPID_BUILD_ROOT and store the
+path to the Qpid build directory in it.
4. Building the solution file
@@ -81,7 +81,7 @@ System Development Edition, or Team System Team Suite SKU)
%QPID_ROOT%\wcf\test\Apache\Qpid\Test\Channel\Functional\RunTests.bat has the correct
values for the nunit_exe, qpid_dll_location and configuration_name variables as per
your installation.
-2. Start the qpid broker from the qpid build folder e.g. %QPID_ROOT%\cpp\build\src\Debug.
+2. Start the qpid broker from the qpid build folder e.g. %QPID_BUILD_ROOT%\src\Debug.
3. Execute RunTests.bat from its location e.g. %QPID_ROOT%\wcf\test\Apache\Qpid\Test\Channel\Functional.
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs
index b952faf9e5..b0b71c87f3 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs
@@ -64,6 +64,12 @@ namespace Apache.Qpid.Channel
set { transport.BrokerPort = value; }
}
+ public int PrefetchLimit
+ {
+ get { return transport.PrefetchLimit; }
+ set { transport.PrefetchLimit = value; }
+ }
+
public bool Shared
{
get { return transport.Shared; }
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs
index 3ec62e809d..554824cea9 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs
@@ -63,6 +63,13 @@ namespace Apache.Qpid.Channel
set { brokerPort = value; }
}
+ [ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, DefaultValue = false)]
+ public int PrefetchLimit
+ {
+ get { return (int)base[AmqpConfigurationStrings.PrefetchLimit]; }
+ set { base[AmqpConfigurationStrings.PrefetchLimit] = value; }
+ }
+
[ConfigurationProperty(AmqpConfigurationStrings.Shared, DefaultValue = false)]
public bool Shared
{
@@ -95,6 +102,8 @@ namespace Apache.Qpid.Channel
get
{
ConfigurationPropertyCollection properties = base.Properties;
+ properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit,
+ typeof(int), 0, null, null, ConfigurationPropertyOptions.None));
properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.Shared,
typeof(bool), false, null, null, ConfigurationPropertyOptions.None));
properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.TransferMode,
@@ -112,6 +121,7 @@ namespace Apache.Qpid.Channel
this.BrokerPort = amqpBinding.BrokerPort;
this.TransferMode = amqpBinding.TransferMode;
this.Shared = amqpBinding.Shared;
+ this.PrefetchLimit = amqpBinding.PrefetchLimit;
AmqpProperties props = amqpBinding.DefaultMessageProperties;
}
@@ -133,6 +143,7 @@ namespace Apache.Qpid.Channel
amqpBinding.BrokerPort = this.BrokerPort;
amqpBinding.TransferMode = this.TransferMode;
amqpBinding.Shared = this.Shared;
+ amqpBinding.PrefetchLimit = this.PrefetchLimit;
}
protected override void PostDeserialize()
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
index b8e2811527..542f1a00a8 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
@@ -32,6 +32,7 @@ namespace Apache.Qpid.Channel
AmqpChannelProperties channelProperties;
long maxBufferPoolSize;
bool shared;
+ int prefetchLimit;
internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context)
: base(context.Binding)
@@ -39,6 +40,7 @@ namespace Apache.Qpid.Channel
this.bindingElement = bindingElement;
this.channelProperties = bindingElement.ChannelProperties.Clone();
this.shared = bindingElement.Shared;
+ this.prefetchLimit = bindingElement.PrefetchLimit;
this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize;
Collection<MessageEncodingBindingElement> messageEncoderBindingElements
= context.BindingParameters.FindAll<MessageEncodingBindingElement>();
@@ -91,7 +93,7 @@ namespace Apache.Qpid.Channel
protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via)
{
- return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared);
+ return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit);
}
}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs
index f1de30406a..0853b3d6f3 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs
@@ -46,6 +46,7 @@ namespace Apache.Qpid.Channel
public const string TransferMode = "transferMode";
public const string Brokers = "brokers";
public const string Shared = "shared";
+ public const string PrefetchLimit = "prefetchLimit";
public const string MaxBufferPoolSize = "maxBufferPoolSize";
public const string MaxReceivedMessageSize = "maxReceivedMessageSize";
}
@@ -55,7 +56,6 @@ namespace Apache.Qpid.Channel
internal const string BrokerHost = "localhost";
internal const int BrokerPort = 5672;
internal const TransferMode TransferMode = System.ServiceModel.TransferMode.Buffered;
- internal const byte Priority = 4;
internal const long MaxBufferPoolSize = 64 * 1024;
internal const int MaxReceivedMessageSize = 5 * 1024 * 1024; //64 * 1024;
}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
index 44fecdaf62..8894b68584 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
@@ -32,6 +32,7 @@ namespace Apache.Qpid.Channel
AmqpTransportBindingElement bindingElement;
AmqpChannelProperties channelProperties;
bool shared;
+ int prefetchLimit;
long maxBufferPoolSize;
Uri uri;
AmqpTransportChannel amqpTransportChannel;
@@ -45,6 +46,7 @@ namespace Apache.Qpid.Channel
this.bindingElement = bindingElement;
this.channelProperties = bindingElement.ChannelProperties.Clone();
this.shared = bindingElement.Shared;
+ this.prefetchLimit = bindingElement.PrefetchLimit;
this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize;
@@ -132,7 +134,7 @@ namespace Apache.Qpid.Channel
{
amqpTransportChannel = new AmqpTransportChannel(this, this.channelProperties,
new EndpointAddress(uri), messageEncoderFactory.Encoder,
- maxBufferPoolSize, this.shared);
+ maxBufferPoolSize, this.shared, this.prefetchLimit);
return (IInputChannel)(object) amqpTransportChannel;
}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
index f23b8072e9..08c565af18 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
@@ -29,6 +29,7 @@ namespace Apache.Qpid.Channel
{
AmqpChannelProperties channelProperties;
bool shared;
+ int prefetchLimit;
public AmqpTransportBindingElement()
{
@@ -41,6 +42,7 @@ namespace Apache.Qpid.Channel
{
this.channelProperties = other.channelProperties.Clone();
this.shared = other.shared;
+ this.prefetchLimit = other.prefetchLimit;
}
public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
@@ -98,6 +100,12 @@ namespace Apache.Qpid.Channel
set { this.channelProperties.BrokerPort = value; }
}
+ public int PrefetchLimit
+ {
+ get { return this.prefetchLimit; }
+ set { this.prefetchLimit = value; }
+ }
+
public bool Shared
{
get { return this.shared; }
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
index ca9c10be69..5924142046 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
@@ -50,6 +50,7 @@ namespace Apache.Qpid.Channel
private MessageEncoder encoder;
private AmqpChannelProperties factoryChannelProperties;
private bool shared;
+ private int prefetchLimit;
private string encoderContentType;
// input = 0-10 queue, output = 0-10 exchange
@@ -68,7 +69,7 @@ namespace Apache.Qpid.Channel
private AsyncTimeSpanCaller asyncOpenCaller;
private AsyncTimeSpanCaller asyncCloseCaller;
- internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection)
+ internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection, int prefetchLimit)
: base(factory)
{
this.isInputChannel = (factory is ChannelListenerBase) || (factory is AmqpChannelFactory<IInputChannel>);
@@ -80,6 +81,7 @@ namespace Apache.Qpid.Channel
this.factoryChannelProperties = channelProperties;
this.shared = sharedConnection;
+ this.prefetchLimit = prefetchLimit;
this.remoteAddress = remoteAddress;
// pull out host, port, queue, and connection arguments
@@ -128,6 +130,7 @@ namespace Apache.Qpid.Channel
if (this.isInputChannel)
{
this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, this.queueName);
+ this.inputLink.PrefetchLimit = this.prefetchLimit;
}
else
{
@@ -287,7 +290,7 @@ namespace Apache.Qpid.Channel
return false;
}
-
+
public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
{
return this.inputLink.BeginTryReceive(timeout, callback, state);
@@ -464,7 +467,7 @@ namespace Apache.Qpid.Channel
}
return amqpMessage;
}
-
+
private Message QpidToWcf(AmqpMessage amqpMessage)
{
@@ -531,7 +534,7 @@ namespace Apache.Qpid.Channel
{
this.bufferManager.ReturnBuffer(managedBuffer);
}
- }
+ }
return wcfMessage;
}
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
index bab73da74e..425a592509 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
@@ -63,11 +63,12 @@ AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidCon
sessionp = new qpid::client::AsyncSession;
*sessionp = qpidConnectionp->newSession();
subs_mgrp = new SubscriptionManager (*sessionp);
- success = true;
waiters = gcnew Collections::Generic::List<CompletionWaiter^>();
+ success = true;
} finally {
if (!success) {
Cleanup();
+ // TODO: include inner exception information
throw gcnew QpidException ("session creation failure");
}
}
@@ -76,12 +77,6 @@ AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidCon
void AmqpSession::Cleanup()
{
- if (subscriptionp != NULL) {
- subscriptionp->cancel();
- delete subscriptionp;
- subscriptionp=NULL;
- }
-
if (subs_mgrp != NULL) {
subs_mgrp->stop();
delete subs_mgrp;
@@ -112,6 +107,7 @@ void AmqpSession::Cleanup()
void AmqpSession::ConnectionClosed()
{
+ lock l(waiters);
Cleanup();
}
@@ -283,5 +279,27 @@ void AmqpSession::asyncHelper(Object ^unused)
}
}
+bool AmqpSession::MessageStop(Completion &comp, std::string &name)
+{
+ lock l(waiters);
+
+ if (sessionp == NULL)
+ return false;
+
+ comp = sessionp->messageStop(name, true);
+ return true;
+}
+
+void AmqpSession::AcceptAndComplete(SequenceSet& transfers)
+{
+ lock l(waiters);
+
+ if (sessionp == NULL)
+ throw gcnew ObjectDisposedException("Accept");
+
+ sessionp->markCompleted(transfers, false);
+ sessionp->messageAccept(transfers, false);
+}
+
}}} // namespace Apache::Qpid::Cli
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
index b959a4123a..8306cdf720 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
@@ -44,7 +44,6 @@ private:
AsyncSession* sessionp;
SessionImpl* sessionImplp;
SubscriptionManager* subs_mgrp;
- Subscription* subscriptionp;
LocalQueue* localQueuep;
Collections::Generic::List<CompletionWaiter^>^ waiters;
bool helperRunning;
@@ -69,6 +68,8 @@ internal:
void ConnectionClosed();
void internalWaitForCompletion(IntPtr Future);
void removeWaiter(CompletionWaiter^ waiter);
+ bool MessageStop(Completion &comp, std::string &name);
+ void AcceptAndComplete(SequenceSet& transfers);
property AmqpConnection^ Connection {
AmqpConnection^ get () { return connection; }
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h
index 197ac632b0..88880c3721 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h
+++ b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h
@@ -32,7 +32,6 @@ private:
bool timedOut;
// has an owner thread
bool assigned;
- // can Run (i.e. earlier CompletionWaiters in the queue have completed)
System::Exception^ runException;
AsyncCallback^ asyncCallback;
Threading::Timer ^timer;
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
index cee394b05d..e12151d943 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
+++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
@@ -59,6 +59,12 @@ using namespace Apache::Qpid::AmqpTypes;
// with proposed changes to the native library to reduce the number of servicing
// threads for large numbers of subscriptions.
+// synchronization is accomplished with locks, but also by ensuring that only one
+// MessageWaiter (the one at the front of the line) is ever active.
+// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch
+// thread (who deposits FrameSets into the local queue and is oblivious to the
+// managed space locks).
+
// The folowing def must match the "Frames" private typedef.
// TODO, make Qpid-cpp "Frames" definition visible.
@@ -94,6 +100,8 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue,
localQueuep = new LocalQueue;
SubscriptionSettings settings;
settings.flowControl = FlowControl::messageCredit(0);
+ settings.completionMode = CompletionMode::MANUAL_COMPLETION;
+
Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings);
subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup
@@ -197,6 +205,7 @@ bool InputLink::haveMessage()
IntPtr InputLink::nextLocalMessage()
{
lock l(waiters);
+
if (disposed)
return (IntPtr) NULL;
@@ -279,8 +288,7 @@ bool InputLink::internalWaitForMessage()
if (haveMessage())
return true;
- // TODO: prefetch window of messages, compatible with both 0-10 and 1.0.
- subscriptionp->grantMessageCredit(1);
+ AdjustCredit();
// get a scoped smart ptr ref to guard against async close or hangup
demuxQueuePtr = *queuePtrp;
@@ -350,7 +358,12 @@ void InputLink::removeWaiter(MessageWaiter^ waiter) {
}
return;
}
+
waiters->RemoveAt(idx);
+ if (waiter->TimedOut) {
+ // may have to give back message if it arrives momentarily
+ AdjustCredit();
+ }
// let the next waiter know it's his turn.
if (waiters->Count > 0) {
@@ -411,6 +424,129 @@ void InputLink::sync()
}
+void InputLink::PrefetchLimit::set(int value)
+{
+ lock l(waiters);
+ prefetchLimit = value;
+
+ int delta = 0;
+
+ // rough rule of thumb to keep the flow, but reduce chatter.
+ // for small messages, the credit request is almost as expensive as the transfer itself.
+ // experience may suggest a better heuristic or require a property for the low water mark
+ if (prefetchLimit >= 3) {
+ delta = prefetchLimit / 3;
+ }
+ minWorkingCredit = prefetchLimit - delta;
+ AdjustCredit();
+}
+
+
+// call with lock held
+void InputLink::AdjustCredit()
+{
+ if (creditSyncPending || disposed)
+ return;
+
+ // low watermark check
+ if ((prefetchLimit != 0) &&
+ (workingCredit >= minWorkingCredit) &&
+ (workingCredit >= waiters->Count))
+ return;
+
+ // should have enough for all waiters or to satisfy the prefetch window
+ int targetCredit = waiters->Count;
+ if (targetCredit < prefetchLimit)
+ targetCredit = prefetchLimit;
+
+ if (targetCredit > workingCredit) {
+ subscriptionp->grantMessageCredit(targetCredit - workingCredit);
+ workingCredit = targetCredit;
+ return;
+ }
+ if (targetCredit < workingCredit) {
+ if ((targetCredit == 0) && (prefetchLimit == 0)) {
+ creditSyncPending = true;
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit));
+ }
+ // TODO: also shrink credit when prefetchLimit != 0
+ }
+}
+
+void InputLink::SyncCredit(Object ^unused)
+{
+ lock l(waiters);
+
+ try {
+ if (disposed)
+ return;
+
+ Completion comp;
+ if (!amqpSession->MessageStop(comp, subscriptionp->getName())) {
+ // connection closed
+ return;
+ }
+
+ // get a private scoped copy to use outside the lock
+ Subscription s(*subscriptionp);
+
+ l.release();
+ // use setFlowControl to re-enable credit flow on the broker.
+ // previously used comp.wait() here, but setFlowControl is a sync operation
+ s.setFlowControl(s.getSettings().flowControl);
+ l.acquire();
+
+ if (disposed)
+ return;
+
+ // let existing waiters use up any
+ // local queue size can only decrease until more credit is issued
+ while (true) {
+ if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) {
+ l.release();
+ // a rare use case and not used in performance oriented code.
+ // optimization can wait until the qpid/messaging api is used
+ Thread::Sleep(10);
+ l.acquire();
+ if (disposed)
+ return;
+ }
+ else {
+ break;
+ }
+ }
+
+ // At this point, the lock is held and we are fully synced with the broker
+ // so we have a valid snapshot
+
+ if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) {
+ // can't be sure application will request a message again any time soon
+ QpidFrameSetPtr frameSetp;
+ while (!(*queuePtrp)->empty()) {
+ (*queuePtrp)->pop(frameSetp);
+ SequenceSet frameSetID(frameSetp->getId());
+ subscriptionp->release(frameSetID);
+ }
+
+ // don't touch dequeuedFrameSetpp. It is spoken for: explicitely from a
+ // MessageWaiter about to to get the nextLocalMessage(), or implicitely
+ // from a WaitForMessage().
+ }
+ // TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit
+
+ workingCredit = (*queuePtrp)->size();
+ if (dequeuedFrameSetpp != NULL) {
+ workingCredit++;
+ }
+ }
+ finally {
+ creditSyncPending = false;
+ }
+
+ AdjustCredit();
+}
+
+
AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
{
QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer();
@@ -539,7 +675,15 @@ AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
// We have a message we can return to the caller.
// Tell the broker we got it.
- subscriptionp->accept(frameSetID);
+
+ // subscriptionp->accept(frameSetID) is a slow sync operation in the native API
+ // so do it within the AsyncSession directly
+ amqpSession->AcceptAndComplete(frameSetID);
+
+ workingCredit--;
+ // check if more messages need to be requested from broker
+ AdjustCredit();
+
return amqpMessage;
}
finally {
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
index 366780c137..f59a03a8c3 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
+++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
@@ -47,6 +47,14 @@ private:
bool finalizing;
QpidFrameSetPtr* dequeuedFrameSetpp;
ManualResetEvent^ asyncHelperWaitHandle;
+ // number of messages to buffer locally for future consumption
+ int prefetchLimit;
+ // the number of messages requested and not yet processed
+ int workingCredit;
+ // stopping and restarting the message flow
+ bool creditSyncPending;
+ // working credit low water mark
+ int minWorkingCredit;
void Cleanup();
void ReleaseNative();
@@ -54,6 +62,8 @@ private:
void addWaiter(MessageWaiter^ waiter);
void asyncHelper();
AmqpMessage^ createAmqpMessage(IntPtr msgp);
+ void AdjustCredit();
+ void SyncCredit(Object ^);
internal:
InputLink(AmqpSession^ session, System::String^ sourceQueue, qpid::client::AsyncSession *qpidSessionp,
@@ -80,6 +90,11 @@ public:
IAsyncResult^ BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state);
bool EndWaitForMessage(IAsyncResult^ result);
+ property int PrefetchLimit {
+ int get () { return prefetchLimit; }
+ void set (int value);
+ }
+
};
}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
index 32f78c8344..484f6898fb 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
+++ b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
@@ -65,7 +65,7 @@
Name="VCCLCompilerTool"
AdditionalOptions="/FU Debug\Apache.Qpid.AmqpTypes.netmodule"
Optimization="0"
- AdditionalIncludeDirectories="..\..\..\..\..\cpp\build\include;..\..\..\..\..\cpp\build\src;..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;&quot;$(BOOST_ROOT)&quot;"
+ AdditionalIncludeDirectories="&quot;$(QPID_BUILD_ROOT)\include&quot;;&quot;$(QPID_BUILD_ROOT)\src&quot;;..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;&quot;$(BOOST_ROOT)&quot;"
PreprocessorDefinitions="WIN32;_DEBUG;_CRT_NONSTDC_NO_WARNINGS;WIN32_LEAN_AND_MEAN;NOMINMAX;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
RuntimeLibrary="3"
UsePrecompiledHeader="0"
@@ -83,7 +83,7 @@
/>
<Tool
Name="VCLinkerTool"
- AdditionalOptions="..\..\..\..\..\cpp\build\src\Debug\qpidcommon.lib ..\..\..\..\..\cpp\build\src\Debug\qpidclient.lib Debug\Apache.Qpid.AmqpTypes.netmodule"
+ AdditionalOptions="$(QPID_BUILD_ROOT)\src\Debug\qpidcommond.lib $(QPID_BUILD_ROOT)\src\Debug\qpidclientd.lib Debug\Apache.Qpid.AmqpTypes.netmodule"
AdditionalDependencies="$(NoInherit)"
OutputFile="$(OutDir)\Apache.Qpid.Interop.dll"
LinkIncremental="2"
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h
index 3eb4919646..3737430844 100644
--- a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h
+++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h
@@ -66,7 +66,6 @@ private:
void Activate();
void WaitForCompletion();
-// inline void SetCompletedSynchronously (bool v) { completedSynchronously = v; }
property IntPtr Message {
IntPtr get () {
@@ -78,7 +77,6 @@ private:
GC::SuppressFinalize(this);
return v;
}
- // void set (IntPtr v) { message = v; }
}
property bool Assigned {
@@ -89,11 +87,11 @@ private:
bool get () { return timedOut; }
}
-
property System::Exception^ RunException {
System::Exception^ get() { return runException; }
}
+
public:
virtual property bool IsCompleted {
diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat
index 4b83993257..a5eed8839b 100755
--- a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat
+++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat
@@ -19,15 +19,15 @@ REM under the License.
set nunit_exe=%programfiles%\NUnit 2.5.1\bin\net-2.0\nunit-console.exe
-set qpid_dll_location=..\..\..\..\..\..\..\cpp\build\src\Debug
+set qpid_dll_location=%QPID_BUILD_ROOT%\src\Debug
set configuration_name=bin\Debug
set qcreate_location=..\..\..\..\..\..\tools\QCreate\Debug
-copy %qpid_dll_location%\qpidclient.dll %configuration_name%
-copy %qpid_dll_location%\qpidcommon.dll %configuration_name%
+copy %qpid_dll_location%\qpidclientd.dll %configuration_name%
+copy %qpid_dll_location%\qpidcommond.dll %configuration_name%
-copy %qpid_dll_location%\qpidclient.dll %qcreate_location%
-copy %qpid_dll_location%\qpidcommon.dll %qcreate_location%
+copy %qpid_dll_location%\qpidclientd.dll %qcreate_location%
+copy %qpid_dll_location%\qpidcommond.dll %qcreate_location%
%qcreate_location%\QCreate.exe amq.direct routing_key message_queue
diff --git a/qpid/wcf/tools/QCreate/QCreate.vcproj b/qpid/wcf/tools/QCreate/QCreate.vcproj
index e58077d78c..ba77952966 100644
--- a/qpid/wcf/tools/QCreate/QCreate.vcproj
+++ b/qpid/wcf/tools/QCreate/QCreate.vcproj
@@ -61,7 +61,7 @@
<Tool
Name="VCCLCompilerTool"
Optimization="0"
- AdditionalIncludeDirectories="&quot;$(BOOST_ROOT)\include\$(BOOST_VERSION)&quot;;&quot;$(BOOST_ROOT)\.&quot;;..\..\..\cpp\include;..\..\..\cpp\build\include"
+ AdditionalIncludeDirectories="&quot;$(BOOST_ROOT)\include\$(BOOST_VERSION)&quot;;&quot;$(BOOST_ROOT)\.&quot;;..\..\..\cpp\include;&quot;$(QPID_BUILD_ROOT)\include&quot;"
PreprocessorDefinitions="WIN32;_DEBUG;_CONSOLE"
MinimalRebuild="true"
BasicRuntimeChecks="3"
@@ -81,9 +81,9 @@
/>
<Tool
Name="VCLinkerTool"
- AdditionalDependencies="qpidcommon.lib qpidclient.lib"
+ AdditionalDependencies="qpidcommond.lib qpidclientd.lib"
LinkIncremental="2"
- AdditionalLibraryDirectories=".;&quot;$(BOOST_ROOT)\lib&quot;;..\..\..\cpp\build\src\Debug"
+ AdditionalLibraryDirectories=".;&quot;$(BOOST_ROOT)\lib&quot;;&quot;$(QPID_BUILD_ROOT)\src\Debug&quot;"
GenerateDebugInformation="true"
SubSystem="1"
TargetMachine="1"