diff options
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid')
14 files changed, 231 insertions, 24 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs index b952faf9e5..b0b71c87f3 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs @@ -64,6 +64,12 @@ namespace Apache.Qpid.Channel set { transport.BrokerPort = value; } } + public int PrefetchLimit + { + get { return transport.PrefetchLimit; } + set { transport.PrefetchLimit = value; } + } + public bool Shared { get { return transport.Shared; } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs index 3ec62e809d..554824cea9 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs @@ -63,6 +63,13 @@ namespace Apache.Qpid.Channel set { brokerPort = value; } } + [ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, DefaultValue = false)] + public int PrefetchLimit + { + get { return (int)base[AmqpConfigurationStrings.PrefetchLimit]; } + set { base[AmqpConfigurationStrings.PrefetchLimit] = value; } + } + [ConfigurationProperty(AmqpConfigurationStrings.Shared, DefaultValue = false)] public bool Shared { @@ -95,6 +102,8 @@ namespace Apache.Qpid.Channel get { ConfigurationPropertyCollection properties = base.Properties; + properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, + typeof(int), 0, null, null, ConfigurationPropertyOptions.None)); properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.Shared, typeof(bool), false, null, null, ConfigurationPropertyOptions.None)); properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.TransferMode, @@ -112,6 +121,7 @@ namespace Apache.Qpid.Channel this.BrokerPort = amqpBinding.BrokerPort; this.TransferMode = amqpBinding.TransferMode; this.Shared = amqpBinding.Shared; + this.PrefetchLimit = amqpBinding.PrefetchLimit; AmqpProperties props = amqpBinding.DefaultMessageProperties; } @@ -133,6 +143,7 @@ namespace Apache.Qpid.Channel amqpBinding.BrokerPort = this.BrokerPort; amqpBinding.TransferMode = this.TransferMode; amqpBinding.Shared = this.Shared; + amqpBinding.PrefetchLimit = this.PrefetchLimit; } protected override void PostDeserialize() diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs index b8e2811527..542f1a00a8 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs @@ -32,6 +32,7 @@ namespace Apache.Qpid.Channel AmqpChannelProperties channelProperties; long maxBufferPoolSize; bool shared; + int prefetchLimit; internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context) : base(context.Binding) @@ -39,6 +40,7 @@ namespace Apache.Qpid.Channel this.bindingElement = bindingElement; this.channelProperties = bindingElement.ChannelProperties.Clone(); this.shared = bindingElement.Shared; + this.prefetchLimit = bindingElement.PrefetchLimit; this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize; Collection<MessageEncodingBindingElement> messageEncoderBindingElements = context.BindingParameters.FindAll<MessageEncodingBindingElement>(); @@ -91,7 +93,7 @@ namespace Apache.Qpid.Channel protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via) { - return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared); + return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit); } } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs index f1de30406a..0853b3d6f3 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs @@ -46,6 +46,7 @@ namespace Apache.Qpid.Channel public const string TransferMode = "transferMode"; public const string Brokers = "brokers"; public const string Shared = "shared"; + public const string PrefetchLimit = "prefetchLimit"; public const string MaxBufferPoolSize = "maxBufferPoolSize"; public const string MaxReceivedMessageSize = "maxReceivedMessageSize"; } @@ -55,7 +56,6 @@ namespace Apache.Qpid.Channel internal const string BrokerHost = "localhost"; internal const int BrokerPort = 5672; internal const TransferMode TransferMode = System.ServiceModel.TransferMode.Buffered; - internal const byte Priority = 4; internal const long MaxBufferPoolSize = 64 * 1024; internal const int MaxReceivedMessageSize = 5 * 1024 * 1024; //64 * 1024; } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs index 44fecdaf62..8894b68584 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs @@ -32,6 +32,7 @@ namespace Apache.Qpid.Channel AmqpTransportBindingElement bindingElement; AmqpChannelProperties channelProperties; bool shared; + int prefetchLimit; long maxBufferPoolSize; Uri uri; AmqpTransportChannel amqpTransportChannel; @@ -45,6 +46,7 @@ namespace Apache.Qpid.Channel this.bindingElement = bindingElement; this.channelProperties = bindingElement.ChannelProperties.Clone(); this.shared = bindingElement.Shared; + this.prefetchLimit = bindingElement.PrefetchLimit; this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize; @@ -132,7 +134,7 @@ namespace Apache.Qpid.Channel { amqpTransportChannel = new AmqpTransportChannel(this, this.channelProperties, new EndpointAddress(uri), messageEncoderFactory.Encoder, - maxBufferPoolSize, this.shared); + maxBufferPoolSize, this.shared, this.prefetchLimit); return (IInputChannel)(object) amqpTransportChannel; } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs index f23b8072e9..08c565af18 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs @@ -29,6 +29,7 @@ namespace Apache.Qpid.Channel { AmqpChannelProperties channelProperties; bool shared; + int prefetchLimit; public AmqpTransportBindingElement() { @@ -41,6 +42,7 @@ namespace Apache.Qpid.Channel { this.channelProperties = other.channelProperties.Clone(); this.shared = other.shared; + this.prefetchLimit = other.prefetchLimit; } public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context) @@ -98,6 +100,12 @@ namespace Apache.Qpid.Channel set { this.channelProperties.BrokerPort = value; } } + public int PrefetchLimit + { + get { return this.prefetchLimit; } + set { this.prefetchLimit = value; } + } + public bool Shared { get { return this.shared; } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs index ca9c10be69..5924142046 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs @@ -50,6 +50,7 @@ namespace Apache.Qpid.Channel private MessageEncoder encoder; private AmqpChannelProperties factoryChannelProperties; private bool shared; + private int prefetchLimit; private string encoderContentType; // input = 0-10 queue, output = 0-10 exchange @@ -68,7 +69,7 @@ namespace Apache.Qpid.Channel private AsyncTimeSpanCaller asyncOpenCaller; private AsyncTimeSpanCaller asyncCloseCaller; - internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection) + internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection, int prefetchLimit) : base(factory) { this.isInputChannel = (factory is ChannelListenerBase) || (factory is AmqpChannelFactory<IInputChannel>); @@ -80,6 +81,7 @@ namespace Apache.Qpid.Channel this.factoryChannelProperties = channelProperties; this.shared = sharedConnection; + this.prefetchLimit = prefetchLimit; this.remoteAddress = remoteAddress; // pull out host, port, queue, and connection arguments @@ -128,6 +130,7 @@ namespace Apache.Qpid.Channel if (this.isInputChannel) { this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, this.queueName); + this.inputLink.PrefetchLimit = this.prefetchLimit; } else { @@ -287,7 +290,7 @@ namespace Apache.Qpid.Channel return false; } - + public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state) { return this.inputLink.BeginTryReceive(timeout, callback, state); @@ -464,7 +467,7 @@ namespace Apache.Qpid.Channel } return amqpMessage; } - + private Message QpidToWcf(AmqpMessage amqpMessage) { @@ -531,7 +534,7 @@ namespace Apache.Qpid.Channel { this.bufferManager.ReturnBuffer(managedBuffer); } - } + } return wcfMessage; } diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp index bab73da74e..425a592509 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp @@ -63,11 +63,12 @@ AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidCon sessionp = new qpid::client::AsyncSession; *sessionp = qpidConnectionp->newSession(); subs_mgrp = new SubscriptionManager (*sessionp); - success = true; waiters = gcnew Collections::Generic::List<CompletionWaiter^>(); + success = true; } finally { if (!success) { Cleanup(); + // TODO: include inner exception information throw gcnew QpidException ("session creation failure"); } } @@ -76,12 +77,6 @@ AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidCon void AmqpSession::Cleanup() { - if (subscriptionp != NULL) { - subscriptionp->cancel(); - delete subscriptionp; - subscriptionp=NULL; - } - if (subs_mgrp != NULL) { subs_mgrp->stop(); delete subs_mgrp; @@ -112,6 +107,7 @@ void AmqpSession::Cleanup() void AmqpSession::ConnectionClosed() { + lock l(waiters); Cleanup(); } @@ -283,5 +279,27 @@ void AmqpSession::asyncHelper(Object ^unused) } } +bool AmqpSession::MessageStop(Completion &comp, std::string &name) +{ + lock l(waiters); + + if (sessionp == NULL) + return false; + + comp = sessionp->messageStop(name, true); + return true; +} + +void AmqpSession::AcceptAndComplete(SequenceSet& transfers) +{ + lock l(waiters); + + if (sessionp == NULL) + throw gcnew ObjectDisposedException("Accept"); + + sessionp->markCompleted(transfers, false); + sessionp->messageAccept(transfers, false); +} + }}} // namespace Apache::Qpid::Cli diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h index b959a4123a..8306cdf720 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h @@ -44,7 +44,6 @@ private: AsyncSession* sessionp; SessionImpl* sessionImplp; SubscriptionManager* subs_mgrp; - Subscription* subscriptionp; LocalQueue* localQueuep; Collections::Generic::List<CompletionWaiter^>^ waiters; bool helperRunning; @@ -69,6 +68,8 @@ internal: void ConnectionClosed(); void internalWaitForCompletion(IntPtr Future); void removeWaiter(CompletionWaiter^ waiter); + bool MessageStop(Completion &comp, std::string &name); + void AcceptAndComplete(SequenceSet& transfers); property AmqpConnection^ Connection { AmqpConnection^ get () { return connection; } diff --git a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h index 197ac632b0..88880c3721 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h +++ b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h @@ -32,7 +32,6 @@ private: bool timedOut; // has an owner thread bool assigned; - // can Run (i.e. earlier CompletionWaiters in the queue have completed) System::Exception^ runException; AsyncCallback^ asyncCallback; Threading::Timer ^timer; diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp index cee394b05d..e12151d943 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp +++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp @@ -59,6 +59,12 @@ using namespace Apache::Qpid::AmqpTypes; // with proposed changes to the native library to reduce the number of servicing // threads for large numbers of subscriptions. +// synchronization is accomplished with locks, but also by ensuring that only one +// MessageWaiter (the one at the front of the line) is ever active. +// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch +// thread (who deposits FrameSets into the local queue and is oblivious to the +// managed space locks). + // The folowing def must match the "Frames" private typedef. // TODO, make Qpid-cpp "Frames" definition visible. @@ -94,6 +100,8 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, localQueuep = new LocalQueue; SubscriptionSettings settings; settings.flowControl = FlowControl::messageCredit(0); + settings.completionMode = CompletionMode::MANUAL_COMPLETION; + Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings); subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup @@ -197,6 +205,7 @@ bool InputLink::haveMessage() IntPtr InputLink::nextLocalMessage() { lock l(waiters); + if (disposed) return (IntPtr) NULL; @@ -279,8 +288,7 @@ bool InputLink::internalWaitForMessage() if (haveMessage()) return true; - // TODO: prefetch window of messages, compatible with both 0-10 and 1.0. - subscriptionp->grantMessageCredit(1); + AdjustCredit(); // get a scoped smart ptr ref to guard against async close or hangup demuxQueuePtr = *queuePtrp; @@ -350,7 +358,12 @@ void InputLink::removeWaiter(MessageWaiter^ waiter) { } return; } + waiters->RemoveAt(idx); + if (waiter->TimedOut) { + // may have to give back message if it arrives momentarily + AdjustCredit(); + } // let the next waiter know it's his turn. if (waiters->Count > 0) { @@ -411,6 +424,129 @@ void InputLink::sync() } +void InputLink::PrefetchLimit::set(int value) +{ + lock l(waiters); + prefetchLimit = value; + + int delta = 0; + + // rough rule of thumb to keep the flow, but reduce chatter. + // for small messages, the credit request is almost as expensive as the transfer itself. + // experience may suggest a better heuristic or require a property for the low water mark + if (prefetchLimit >= 3) { + delta = prefetchLimit / 3; + } + minWorkingCredit = prefetchLimit - delta; + AdjustCredit(); +} + + +// call with lock held +void InputLink::AdjustCredit() +{ + if (creditSyncPending || disposed) + return; + + // low watermark check + if ((prefetchLimit != 0) && + (workingCredit >= minWorkingCredit) && + (workingCredit >= waiters->Count)) + return; + + // should have enough for all waiters or to satisfy the prefetch window + int targetCredit = waiters->Count; + if (targetCredit < prefetchLimit) + targetCredit = prefetchLimit; + + if (targetCredit > workingCredit) { + subscriptionp->grantMessageCredit(targetCredit - workingCredit); + workingCredit = targetCredit; + return; + } + if (targetCredit < workingCredit) { + if ((targetCredit == 0) && (prefetchLimit == 0)) { + creditSyncPending = true; + ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit)); + } + // TODO: also shrink credit when prefetchLimit != 0 + } +} + +void InputLink::SyncCredit(Object ^unused) +{ + lock l(waiters); + + try { + if (disposed) + return; + + Completion comp; + if (!amqpSession->MessageStop(comp, subscriptionp->getName())) { + // connection closed + return; + } + + // get a private scoped copy to use outside the lock + Subscription s(*subscriptionp); + + l.release(); + // use setFlowControl to re-enable credit flow on the broker. + // previously used comp.wait() here, but setFlowControl is a sync operation + s.setFlowControl(s.getSettings().flowControl); + l.acquire(); + + if (disposed) + return; + + // let existing waiters use up any + // local queue size can only decrease until more credit is issued + while (true) { + if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) { + l.release(); + // a rare use case and not used in performance oriented code. + // optimization can wait until the qpid/messaging api is used + Thread::Sleep(10); + l.acquire(); + if (disposed) + return; + } + else { + break; + } + } + + // At this point, the lock is held and we are fully synced with the broker + // so we have a valid snapshot + + if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) { + // can't be sure application will request a message again any time soon + QpidFrameSetPtr frameSetp; + while (!(*queuePtrp)->empty()) { + (*queuePtrp)->pop(frameSetp); + SequenceSet frameSetID(frameSetp->getId()); + subscriptionp->release(frameSetID); + } + + // don't touch dequeuedFrameSetpp. It is spoken for: explicitely from a + // MessageWaiter about to to get the nextLocalMessage(), or implicitely + // from a WaitForMessage(). + } + // TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit + + workingCredit = (*queuePtrp)->size(); + if (dequeuedFrameSetpp != NULL) { + workingCredit++; + } + } + finally { + creditSyncPending = false; + } + + AdjustCredit(); +} + + AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) { QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer(); @@ -539,7 +675,15 @@ AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) // We have a message we can return to the caller. // Tell the broker we got it. - subscriptionp->accept(frameSetID); + + // subscriptionp->accept(frameSetID) is a slow sync operation in the native API + // so do it within the AsyncSession directly + amqpSession->AcceptAndComplete(frameSetID); + + workingCredit--; + // check if more messages need to be requested from broker + AdjustCredit(); + return amqpMessage; } finally { diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h index 366780c137..f59a03a8c3 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h +++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h @@ -47,6 +47,14 @@ private: bool finalizing; QpidFrameSetPtr* dequeuedFrameSetpp; ManualResetEvent^ asyncHelperWaitHandle; + // number of messages to buffer locally for future consumption + int prefetchLimit; + // the number of messages requested and not yet processed + int workingCredit; + // stopping and restarting the message flow + bool creditSyncPending; + // working credit low water mark + int minWorkingCredit; void Cleanup(); void ReleaseNative(); @@ -54,6 +62,8 @@ private: void addWaiter(MessageWaiter^ waiter); void asyncHelper(); AmqpMessage^ createAmqpMessage(IntPtr msgp); + void AdjustCredit(); + void SyncCredit(Object ^); internal: InputLink(AmqpSession^ session, System::String^ sourceQueue, qpid::client::AsyncSession *qpidSessionp, @@ -80,6 +90,11 @@ public: IAsyncResult^ BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state); bool EndWaitForMessage(IAsyncResult^ result); + property int PrefetchLimit { + int get () { return prefetchLimit; } + void set (int value); + } + }; }}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj index 32f78c8344..484f6898fb 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj +++ b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj @@ -65,7 +65,7 @@ Name="VCCLCompilerTool"
AdditionalOptions="/FU Debug\Apache.Qpid.AmqpTypes.netmodule"
Optimization="0"
- AdditionalIncludeDirectories="..\..\..\..\..\cpp\build\include;..\..\..\..\..\cpp\build\src;..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;"$(BOOST_ROOT)""
+ AdditionalIncludeDirectories=""$(QPID_BUILD_ROOT)\include";"$(QPID_BUILD_ROOT)\src";..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;"$(BOOST_ROOT)""
PreprocessorDefinitions="WIN32;_DEBUG;_CRT_NONSTDC_NO_WARNINGS;WIN32_LEAN_AND_MEAN;NOMINMAX;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
RuntimeLibrary="3"
UsePrecompiledHeader="0"
@@ -83,7 +83,7 @@ />
<Tool
Name="VCLinkerTool"
- AdditionalOptions="..\..\..\..\..\cpp\build\src\Debug\qpidcommon.lib ..\..\..\..\..\cpp\build\src\Debug\qpidclient.lib Debug\Apache.Qpid.AmqpTypes.netmodule"
+ AdditionalOptions="$(QPID_BUILD_ROOT)\src\Debug\qpidcommond.lib $(QPID_BUILD_ROOT)\src\Debug\qpidclientd.lib Debug\Apache.Qpid.AmqpTypes.netmodule"
AdditionalDependencies="$(NoInherit)"
OutputFile="$(OutDir)\Apache.Qpid.Interop.dll"
LinkIncremental="2"
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h index 3eb4919646..3737430844 100644 --- a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h +++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h @@ -66,7 +66,6 @@ private: void Activate(); void WaitForCompletion(); -// inline void SetCompletedSynchronously (bool v) { completedSynchronously = v; } property IntPtr Message { IntPtr get () { @@ -78,7 +77,6 @@ private: GC::SuppressFinalize(this); return v; } - // void set (IntPtr v) { message = v; } } property bool Assigned { @@ -89,11 +87,11 @@ private: bool get () { return timedOut; } } - property System::Exception^ RunException { System::Exception^ get() { return runException; } } + public: virtual property bool IsCompleted { |