summaryrefslogtreecommitdiff
path: root/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp')
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp867
1 files changed, 867 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
new file mode 100644
index 0000000000..2b0119e338
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
@@ -0,0 +1,867 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/Demux.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+
+#include "MessageBodyStream.h"
+#include "AmqpMessage.h"
+#include "AmqpSession.h"
+#include "InputLink.h"
+#include "QpidMarshal.h"
+#include "QpidException.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Threading;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+using namespace Apache::Qpid::AmqpTypes;
+
+// Scalability note: When using async methods, an async helper thread is created
+// to block on the Demux BlockingQueue. This design should be revised in line
+// with proposed changes to the native library to reduce the number of servicing
+// threads for large numbers of subscriptions.
+
+// synchronization is accomplished with locks, but also by ensuring that only one
+// MessageWaiter (the one at the front of the line) is ever active.
+// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch
+// thread (who deposits FrameSets into the local queue and is oblivious to the
+// managed space locks).
+
+
+// The folowing def must match the "Frames" private typedef.
+// TODO, make Qpid-cpp "Frames" definition visible.
+typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames;
+
+InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue,
+ qpid::client::AsyncSession *qpidSessionp, qpid::client::SubscriptionManager *qpidSubsMgrp,
+ bool exclusive,
+ bool temporary, System::String^ filterKey, System::String^ exchange) :
+ amqpSession(session),
+ subscriptionp(NULL),
+ localQueuep(NULL),
+ queuePtrp(NULL),
+ dequeuedFrameSetpp(NULL),
+ disposed(false),
+ finalizing(false)
+{
+ bool success = false;
+ System::Exception^ linkException = nullptr;
+
+ waiters = gcnew Collections::Generic::List<MessageWaiter^>();
+ linkLock = waiters; // private and available
+ subscriptionLock = gcnew Object();
+ qpidAddress = QpidAddress::CreateAddress(sourceQueue, true);
+ qpidAddress->ResolveLink(session);
+ browsing = qpidAddress->Browsing;
+
+ try {
+ std::string qname = QpidMarshal::ToNative(qpidAddress->LinkName);
+
+ if (temporary) {
+ qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true);
+ qpidSessionp->exchangeBind(arg::exchange=QpidMarshal::ToNative(exchange),
+ arg::queue=qname, arg::bindingKey=QpidMarshal::ToNative(filterKey));
+ qpidSessionp->sync();
+ }
+
+ localQueuep = new LocalQueue;
+ SubscriptionSettings settings;
+ settings.flowControl = FlowControl::messageCredit(0);
+ settings.completionMode = CompletionMode::MANUAL_COMPLETION;
+
+ if (browsing) {
+ settings.acquireMode = AcquireMode::ACQUIRE_MODE_NOT_ACQUIRED;
+ settings.acceptMode = AcceptMode::ACCEPT_MODE_NONE;
+ }
+ else {
+ settings.acquireMode = AcquireMode::ACQUIRE_MODE_PRE_ACQUIRED;
+ settings.acceptMode = AcceptMode::ACCEPT_MODE_EXPLICIT;
+ }
+
+ Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings);
+ subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup
+
+ // the roundabout way to obtain localQueuep->queue
+ SessionBase_0_10Access sa(*qpidSessionp);
+ boost::shared_ptr<SessionImpl> simpl = sa.get();
+ queuePtrp = new Demux::QueuePtr(simpl->getDemux().get(sub.getName()));
+
+ success = true;
+ } finally {
+ if (!success) {
+ Cleanup();
+ linkException = gcnew QpidException ("InputLink creation failure");
+ throw linkException;
+ }
+ }
+}
+
+// called with lock held
+void InputLink::ReleaseNative()
+{
+ // involves talking to the Broker unless the connection is broken
+
+ if ((subscriptionp != NULL) && !finalizing) {
+ // TODO: find boost time error on cleanup when in finalizer thread
+ try {
+ subscriptionp->cancel();
+ }
+ catch (const std::exception& error) {
+ // TODO: log this properly
+ std::cout << "shutdown error " << error.what() << std::endl;
+ }
+ }
+
+ // free native mem (or smart pointers) that we own
+ if (subscriptionp != NULL) {
+ delete subscriptionp;
+ subscriptionp = NULL;
+ }
+ if (queuePtrp != NULL) {
+ delete queuePtrp;
+ queuePtrp = NULL;
+ }
+ if (localQueuep != NULL) {
+ if (!finalizing) {
+ // TODO: find boost time error on cleanup when in finalizer thread
+ delete localQueuep;
+ localQueuep = NULL;
+ }
+ }
+ if (dequeuedFrameSetpp != NULL) {
+ delete dequeuedFrameSetpp;
+ dequeuedFrameSetpp = NULL;
+ }
+}
+
+void InputLink::Cleanup()
+{
+ {
+ lock l(linkLock);
+ if (disposed)
+ return;
+
+ disposed = true;
+
+ // if the asyncHelper exists and is idle, unblock it
+ if (asyncHelperWaitHandle != nullptr) {
+ asyncHelperWaitHandle->Set();
+ }
+
+ // wakeup anyone waiting for messages
+ if (queuePtrp != NULL)
+ (*queuePtrp)->close();
+
+ // wait for any sync operations on the subscription to complete before ReleaseNative
+ lock l2(subscriptionLock);
+
+ try {}
+ finally
+ {
+ ReleaseNative();
+ }
+ }
+
+ // Now that subscription is torn down, we can execute pending delete on remote node
+ qpidAddress->CleanupLink(amqpSession);
+ amqpSession->NotifyClosed();
+}
+
+InputLink::~InputLink()
+{
+ Cleanup();
+}
+
+InputLink::!InputLink()
+{
+ finalizing = true;
+ Cleanup();
+}
+
+void InputLink::Close()
+{
+ // Simulate Dispose()...
+ Cleanup();
+ GC::SuppressFinalize(this);
+}
+
+// call with lock held
+bool InputLink::haveMessage()
+{
+ if (dequeuedFrameSetpp != NULL)
+ return true;
+
+ if (queuePtrp != NULL) {
+ if ((*queuePtrp)->size() > 0)
+ return true;
+ }
+ return false;
+}
+
+IntPtr InputLink::nextLocalMessage()
+{
+ lock l(linkLock);
+
+ if (disposed)
+ return (IntPtr) NULL;
+
+ // A message already pulled off BlockingQueue?
+ if (dequeuedFrameSetpp != NULL) {
+ QpidFrameSetPtr* rv = dequeuedFrameSetpp;
+ dequeuedFrameSetpp = NULL;
+ return (IntPtr) rv;
+ }
+
+ if ((*queuePtrp)->empty())
+ return (IntPtr) NULL;
+
+ bool received = false;
+ QpidFrameSetPtr* frameSetpp = new QpidFrameSetPtr;
+
+ try {
+ received = (*queuePtrp)->pop(*frameSetpp, qpid::sys::TIME_INFINITE);
+ if (received) {
+ QpidFrameSetPtr* rv = frameSetpp;
+ // no need to free native in finally block
+ frameSetpp = NULL;
+ return (IntPtr) rv;
+ }
+ } catch(const std::exception& error) {
+ // should be no async tampering with queue since we hold the lock and have a
+ // smart pointer ref to the native LocalQueue, even if the network connection fails...
+ cout << "unknown exception in InputLink.nextLocalMessage() " << error.what() <<endl;
+ // TODO: log this
+ }
+ finally {
+ if (frameSetpp != NULL) {
+ delete frameSetpp;
+ }
+ }
+
+ return (IntPtr) NULL;
+}
+
+
+
+void InputLink::unblockWaiter()
+{
+ // to be followed by resetQueue() below
+ lock l(linkLock);
+ if (disposed)
+ return;
+ (*queuePtrp)->close();
+}
+
+
+
+// Set things right after unblockWaiter(). Closing and opening a Qpid BlockingQueue unsticks
+// a blocking thread without interefering with queue contents or the ability to push
+// new incoming messages.
+
+void InputLink::resetQueue()
+{
+ lock l(linkLock);
+ if (disposed)
+ return;
+ if ((*queuePtrp)->isClosed()) {
+ (*queuePtrp)->open();
+ }
+}
+
+
+// returns true if there is a message to consume, i.e. nextLocalMessage() won't block
+
+bool InputLink::internalWaitForMessage()
+{
+ Demux::QueuePtr demuxQueuePtr;
+
+ bool received = false;
+ QpidFrameSetPtr* frameSetpp = NULL;
+ try {
+ lock l(linkLock);
+ if (disposed)
+ return false;
+ if (haveMessage())
+ return true;
+
+ AdjustCredit();
+
+ // get a scoped smart ptr ref to guard against async close or hangup
+ demuxQueuePtr = *queuePtrp;
+ frameSetpp = new QpidFrameSetPtr;
+
+ l.release();
+ // Async cleanup is now possible. Only use demuxQueuePtr until lock reacquired.
+ received = demuxQueuePtr->pop(*frameSetpp, qpid::sys::TIME_INFINITE);
+ l.acquire();
+
+ if (received) {
+ dequeuedFrameSetpp = frameSetpp;
+ frameSetpp = NULL; // native will eventually be freed in Cleanup or MessageBodyStream
+ }
+
+ return true;
+ } catch(const std::exception& ) {
+ // timeout or connection closed
+ return false;
+ }
+ finally {
+ if (frameSetpp != NULL) {
+ delete frameSetpp;
+ }
+ }
+
+ return false;
+}
+
+
+// call with lock held
+void InputLink::addWaiter(MessageWaiter^ waiter)
+{
+ waiters->Add(waiter);
+ if (waiters->Count == 1) {
+ // mark this waiter as ready to run
+ // Only the waiter at the head of the queue is active.
+ waiter->Activate();
+ }
+
+ if (waiter->Assigned)
+ return;
+
+ if (asyncHelperWaitHandle == nullptr) {
+ asyncHelperWaitHandle = gcnew ManualResetEvent(false);
+ ThreadStart^ threadDelegate = gcnew ThreadStart(this, &InputLink::asyncHelper);
+ (gcnew Thread(threadDelegate))->Start();
+ }
+
+ if (waiters->Count == 1) {
+ // wake up the asyncHelper
+ asyncHelperWaitHandle->Set();
+ }
+}
+
+
+void InputLink::removeWaiter(MessageWaiter^ waiter) {
+ // a waiter can be removed from anywhere in the list if timed out
+
+ lock l(linkLock);
+ int idx = waiters->IndexOf(waiter);
+ if (idx == -1) {
+ // TODO: assert or log
+ if (asyncHelperWaitHandle != nullptr) {
+ // just in case.
+ asyncHelperWaitHandle->Set();
+ }
+ return;
+ }
+
+ waiters->RemoveAt(idx);
+ if (waiter->TimedOut) {
+ // may have to give back message if it arrives momentarily
+ AdjustCredit();
+ }
+
+ // let the next waiter know it's his turn.
+ if (waiters->Count > 0) {
+ MessageWaiter^ nextWaiter = waiters[0];
+
+ // wakeup the asyncHelper thread to help out if necessary.
+ if (!nextWaiter->Assigned) {
+ asyncHelperWaitHandle->Set();
+ }
+
+ l.release();
+ nextWaiter->Activate();
+ return;
+ }
+ else {
+ if (disposed && (asyncHelperWaitHandle != nullptr)) {
+ asyncHelperWaitHandle->Set();
+ }
+ }
+}
+
+
+void InputLink::asyncHelper()
+{
+ lock l(linkLock);
+
+ while (true) {
+ if (disposed && (waiters->Count == 0)) {
+ asyncHelperWaitHandle = nullptr;
+ return;
+ }
+
+ if (waiters->Count > 0) {
+ MessageWaiter^ waiter = waiters[0];
+
+ l.release();
+ if (waiter->AcceptForWork()) {
+ waiter->Run();
+ }
+ l.acquire();
+ }
+
+ // sleep if more work may be coming or it is currently someone else's turn
+ if (((waiters->Count == 0) && !disposed) || ((waiters->Count != 0) && waiters[0]->Assigned)) {
+ // wait for something to do
+ asyncHelperWaitHandle->Reset();
+ l.release();
+ asyncHelperWaitHandle->WaitOne();
+ l.acquire();
+ }
+ }
+}
+
+void InputLink::sync()
+{
+ // used by the MessageWaiter timeout thread to not run before fully initialized
+ lock l(linkLock);
+}
+
+
+void InputLink::PrefetchLimit::set(int value)
+{
+ lock l(linkLock);
+ prefetchLimit = value;
+
+ int delta = 0;
+
+ // rough rule of thumb to keep the flow, but reduce chatter.
+ // for small messages, the credit request is almost as expensive as the transfer itself.
+ // experience may suggest a better heuristic or require a property for the low water mark
+ if (prefetchLimit >= 3) {
+ delta = prefetchLimit / 3;
+ }
+ minWorkingCredit = prefetchLimit - delta;
+ AdjustCredit();
+}
+
+
+// call with lock held
+void InputLink::AdjustCredit()
+{
+ if (creditSyncPending || disposed)
+ return;
+
+ // low watermark check
+ if ((prefetchLimit != 0) &&
+ (workingCredit >= minWorkingCredit) &&
+ (workingCredit >= waiters->Count))
+ return;
+
+ // should have enough for all waiters or to satisfy the prefetch window
+ int targetCredit = waiters->Count;
+ if (targetCredit < prefetchLimit)
+ targetCredit = prefetchLimit;
+
+ if (targetCredit > workingCredit) {
+ subscriptionp->grantMessageCredit(targetCredit - workingCredit);
+ workingCredit = targetCredit;
+ return;
+ }
+ if (targetCredit < workingCredit) {
+ if ((targetCredit == 0) && (prefetchLimit == 0)) {
+ creditSyncPending = true;
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit));
+ }
+ // TODO: also shrink credit when prefetchLimit != 0
+ }
+}
+
+void InputLink::SyncCredit(Object ^unused)
+{
+ lock l(linkLock);
+
+ try {
+ if (disposed)
+ return;
+
+ if (!amqpSession->MessageStop(subscriptionp->getName())) {
+ // connection closed
+ return;
+ }
+
+ l.release();
+ // use setFlowControl to re-enable credit flow on the broker.
+ // setFlowControl is a sync operation
+ {
+ lock l2(subscriptionLock);
+ if (subscriptionp != NULL) {
+ subscriptionp->setFlowControl(subscriptionp->getSettings().flowControl);
+ }
+ }
+ l.acquire();
+
+ if (disposed)
+ return;
+
+ // let existing waiters use up any messages that arrived.
+ // local queue size can only decrease until more credit is issued
+ while (true) {
+ if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) {
+ l.release();
+ // a rare use case and not used in performance oriented code.
+ // optimization can wait until the qpid/messaging api is used
+ Thread::Sleep(10);
+ l.acquire();
+ if (disposed)
+ return;
+ }
+ else {
+ break;
+ }
+ }
+
+ // At this point, the lock is held and we are fully synced with the broker
+ // so we have a valid snapshot
+
+ if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) {
+ // can't be sure application will request a message again any time soon
+ QpidFrameSetPtr frameSetp;
+ while (!(*queuePtrp)->empty()) {
+ (*queuePtrp)->pop(frameSetp);
+ SequenceSet frameSetID(frameSetp->getId());
+ subscriptionp->release(frameSetID);
+ }
+
+ // don't touch dequeuedFrameSetpp. It is spoken for: explicitely from a
+ // MessageWaiter about to to get the nextLocalMessage(), or implicitely
+ // from a WaitForMessage().
+ }
+ // TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit
+
+ workingCredit = (*queuePtrp)->size();
+ if (dequeuedFrameSetpp != NULL) {
+ workingCredit++;
+ }
+ }
+ finally {
+ creditSyncPending = false;
+ }
+
+ AdjustCredit();
+}
+
+
+AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
+{
+ QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer();
+ bool ownFrameSet = true;
+ bool haveProperties = false;
+
+ try {
+ MessageBodyStream^ mstream = gcnew MessageBodyStream(fspp);
+ ownFrameSet = false; // stream releases on close/dispose
+
+ AmqpMessage^ amqpMessage = gcnew AmqpMessage(mstream);
+
+ AMQHeaderBody* headerBodyp = (*fspp)->getHeaders();
+ uint64_t contentSize = (*fspp)->getContentSize();
+ SequenceSet frameSetID((*fspp)->getId());
+
+ // target managed representation
+ AmqpProperties^ amqpProperties = gcnew AmqpProperties();
+
+ // source native representation
+ const DeliveryProperties* deliveryProperties = headerBodyp->get<DeliveryProperties>();
+ const qpid::framing::MessageProperties* messageProperties = headerBodyp->get<qpid::framing::MessageProperties>();
+
+ if (deliveryProperties) {
+ if (deliveryProperties->hasRoutingKey()) {
+ haveProperties = true;
+
+ amqpProperties->RoutingKey = gcnew String(deliveryProperties->getRoutingKey().c_str());
+ }
+
+ if (deliveryProperties->hasDeliveryMode()) {
+ if (deliveryProperties->getDeliveryMode() == qpid::framing::PERSISTENT)
+ amqpProperties->Durable = true;
+ }
+
+ if (deliveryProperties->hasTtl()) {
+ long long ticks = deliveryProperties->getTtl() * TimeSpan::TicksPerMillisecond;
+ amqpProperties->TimeToLive = Nullable<TimeSpan>(TimeSpan::FromTicks(ticks));
+ }
+ }
+
+ if (messageProperties) {
+
+ if (messageProperties->hasReplyTo()) {
+ haveProperties = true;
+ const ReplyTo& rpto = messageProperties->getReplyTo();
+ String^ rk = nullptr;
+ String^ ex = nullptr;
+ if (rpto.hasRoutingKey()) {
+ rk = gcnew String(rpto.getRoutingKey().c_str());
+ }
+ if (rpto.hasExchange()) {
+ ex = gcnew String(rpto.getExchange().c_str());
+ }
+ amqpProperties->SetReplyTo(ex,rk);
+ }
+
+ if (messageProperties->hasContentType()) {
+ haveProperties = true;
+ amqpProperties->ContentType = gcnew String(messageProperties->getContentType().c_str());
+
+ if (messageProperties->hasContentEncoding()) {
+ String^ enc = gcnew String(messageProperties->getContentEncoding().c_str());
+ if (!String::IsNullOrEmpty(enc)) {
+ // TODO: properly assemble 1.0 style to 0-10 for all cases
+ amqpProperties->ContentType += "; charset=" + enc;
+ }
+ }
+ }
+
+ if (messageProperties->hasCorrelationId()) {
+ haveProperties = true;
+ const std::string& ncid = messageProperties->getCorrelationId();
+ int len = ncid.size();
+ array<unsigned char>^ mcid = gcnew array<unsigned char>(len);
+ Marshal::Copy ((IntPtr) (void *) ncid.data(), mcid, 0, len);
+ amqpProperties->CorrelationId = mcid;
+ }
+
+ if (messageProperties->hasUserId()) {
+ haveProperties = true;
+ const std::string& nuid = messageProperties->getUserId();
+ int len = nuid.size();
+ array<unsigned char>^ muid = gcnew array<unsigned char>(len);
+ Marshal::Copy ((IntPtr) (void *) nuid.data(), muid, 0, len);
+ amqpProperties->UserId = muid;
+ }
+
+ if (messageProperties->hasApplicationHeaders()) {
+ haveProperties = true;
+ const qpid::framing::FieldTable& fieldTable = messageProperties->getApplicationHeaders();
+ int count = fieldTable.count();
+
+ if (count > 0) {
+ haveProperties = true;
+ Collections::Generic::Dictionary<System::String^, AmqpType^>^ mmap =
+ gcnew Collections::Generic::Dictionary<System::String^, AmqpType^>(count);
+
+ for(qpid::framing::FieldTable::ValueMap::const_iterator i = fieldTable.begin(); i != fieldTable.end(); i++) {
+
+ qpid::framing::FieldValue::Data &data = i->second->getData();
+
+ // TODO: replace these generic int/string conversions with handler for each AMQP specific type:
+ // uint8_t dataType = i->second->getType();
+ // switch (dataType) { case TYPE_CODE_STR8: ... }
+
+ if (data.convertsToInt()) {
+ mmap->Add (gcnew String(i->first.data()), gcnew AmqpInt((int) i->second->getData().getInt()));
+ }
+ if (data.convertsToString()) {
+ std::string ns = data.getString();
+ String^ ms = gcnew String(ns.data(), 0, ns.size());
+ mmap->Add (gcnew String(i->first.data()), gcnew AmqpString(ms));
+ }
+ }
+
+ amqpProperties->PropertyMap = mmap;
+ }
+
+ }
+ }
+
+ if (haveProperties) {
+ amqpMessage->Properties = amqpProperties;
+ }
+
+ // We have a message we can return to the caller.
+ // Tell the broker we got it.
+
+ // subscriptionp->accept(frameSetID) is a slow sync operation in the native API
+ // so do it within the AsyncSession directly
+ amqpSession->AcceptAndComplete(frameSetID, browsing);
+
+ workingCredit--;
+ // check if more messages need to be requested from broker
+ AdjustCredit();
+
+ return amqpMessage;
+ }
+ finally {
+ if (ownFrameSet)
+ delete (fspp);
+ }
+}
+
+ // As for IInputChannel:
+ // if success, return true + amqpMessage
+ // elseif timeout, return false
+ // elseif closed/EOF, return true and amqpMessage = null
+ // else throw an Exception
+
+bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage)
+{
+ lock l(linkLock);
+
+ if (waiters->Count == 0) {
+ // see if there is a message already available without blocking
+ IntPtr fspp = nextLocalMessage();
+ if (fspp.ToPointer() != NULL) {
+ amqpMessage = createAmqpMessage(fspp);
+ return true;
+ }
+ }
+
+ MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, false, nullptr, nullptr);
+ addWaiter(waiter);
+
+ l.release();
+ waiter->Run();
+ l.acquire();
+
+ if (waiter->TimedOut) {
+ return false;
+ }
+
+ IntPtr waiterMsg = waiter->Message;
+ if (waiterMsg.ToPointer() == NULL) {
+ if (disposed) {
+ // indicate normal EOF on channel
+ amqpMessage = nullptr;
+ return true;
+ }
+ }
+
+ amqpMessage = createAmqpMessage(waiterMsg);
+ return true;
+}
+
+IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state)
+{
+
+ //TODO: if haveMessage() complete synchronously
+
+ lock l(linkLock);
+ MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, true, callback, state);
+ addWaiter(waiter);
+ return waiter;
+}
+
+bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage)
+{
+
+ // TODO: validate result
+
+ MessageWaiter^ waiter = (MessageWaiter ^) result;
+
+ waiter->WaitForCompletion();
+
+ if (waiter->RunException != nullptr)
+ throw waiter->RunException;
+
+ if (waiter->TimedOut) {
+ amqpMessage = nullptr;
+ return false;
+ }
+
+ IntPtr waiterMsg = waiter->Message;
+ if (waiterMsg.ToPointer() == NULL) {
+ if (disposed) {
+ // indicate normal EOF on channel
+ amqpMessage = nullptr;
+ return true;
+ }
+ }
+
+ amqpMessage = createAmqpMessage(waiterMsg);
+ return true;
+}
+
+
+bool InputLink::WaitForMessage(TimeSpan timeout)
+{
+ lock l(linkLock);
+
+ if (disposed)
+ return false;
+
+ if (waiters->Count == 0) {
+ // see if there is a message already available without blocking
+ if (haveMessage())
+ return true;
+ }
+
+ // Same as for TryReceive, except consuming = false
+ MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, false, nullptr, nullptr);
+ addWaiter(waiter);
+
+ l.release();
+ waiter->Run();
+ l.acquire();
+
+ if (waiter->TimedOut) {
+ return false;
+ }
+
+ return haveMessage();
+}
+
+IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state)
+{
+ lock l(linkLock);
+
+ // Same as for BeginTryReceive, except consuming = false
+ MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, true, callback, state);
+ addWaiter(waiter);
+ return waiter;
+}
+
+bool InputLink::EndWaitForMessage(IAsyncResult^ result)
+{
+ MessageWaiter^ waiter = (MessageWaiter ^) result;
+
+ waiter->WaitForCompletion();
+
+ if (waiter->TimedOut) {
+ return false;
+ }
+
+ return haveMessage();
+}
+
+
+}}} // namespace Apache::Qpid::Interop