summaryrefslogtreecommitdiff
path: root/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp')
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp868
1 files changed, 0 insertions, 868 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
deleted file mode 100644
index f8189df0dd..0000000000
--- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
+++ /dev/null
@@ -1,868 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-#include <windows.h>
-#include <msclr\lock.h>
-
-#include "qpid/client/AsyncSession.h"
-#include "qpid/framing/FieldValue.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/client/SubscriptionManager.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/Message.h"
-#include "qpid/client/MessageListener.h"
-#include "qpid/client/Demux.h"
-#include "qpid/client/SessionImpl.h"
-#include "qpid/client/SessionBase_0_10Access.h"
-
-#include "MessageBodyStream.h"
-#include "AmqpMessage.h"
-#include "AmqpSession.h"
-#include "InputLink.h"
-#include "QpidMarshal.h"
-#include "QpidException.h"
-
-namespace Apache {
-namespace Qpid {
-namespace Interop {
-
-
-using namespace System;
-using namespace System::Runtime::InteropServices;
-using namespace System::Threading;
-using namespace msclr;
-
-using namespace qpid::client;
-using namespace qpid::framing;
-
-using namespace std;
-
-using namespace Apache::Qpid::AmqpTypes;
-
-// Scalability note: When using async methods, an async helper thread is created
-// to block on the Demux BlockingQueue. This design should be revised in line
-// with proposed changes to the native library to reduce the number of servicing
-// threads for large numbers of subscriptions.
-
-// synchronization is accomplished with locks, but also by ensuring that only one
-// MessageWaiter (the one at the front of the line) is ever active.
-// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch
-// thread (who deposits FrameSets into the local queue and is oblivious to the
-// managed space locks).
-
-
-// The folowing def must match the "Frames" private typedef.
-// TODO, make Qpid-cpp "Frames" definition visible.
-typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames;
-
-InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue,
- qpid::client::AsyncSession *qpidSessionp, qpid::client::SubscriptionManager *qpidSubsMgrp,
- bool exclusive,
- bool temporary, System::String^ filterKey, System::String^ exchange) :
- amqpSession(session),
- subscriptionp(NULL),
- localQueuep(NULL),
- queuePtrp(NULL),
- dequeuedFrameSetpp(NULL),
- disposed(false),
- finalizing(false)
-{
- bool success = false;
- System::Exception^ linkException = nullptr;
-
- waiters = gcnew Collections::Generic::List<MessageWaiter^>();
- linkLock = waiters; // private and available
- subscriptionLock = gcnew Object();
- qpidAddress = QpidAddress::CreateAddress(sourceQueue, true);
- qpidAddress->ResolveLink(session);
- browsing = qpidAddress->Browsing;
-
- try {
- std::string qname = QpidMarshal::ToNative(qpidAddress->LinkName);
-
- if (temporary) {
- qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true);
- qpidSessionp->exchangeBind(arg::exchange=QpidMarshal::ToNative(exchange),
- arg::queue=qname, arg::bindingKey=QpidMarshal::ToNative(filterKey));
- qpidSessionp->sync();
- }
-
- localQueuep = new LocalQueue;
- SubscriptionSettings settings;
- settings.flowControl = FlowControl::messageCredit(0);
- settings.completionMode = CompletionMode::MANUAL_COMPLETION;
-
- if (browsing) {
- settings.acquireMode = AcquireMode::ACQUIRE_MODE_NOT_ACQUIRED;
- settings.acceptMode = AcceptMode::ACCEPT_MODE_NONE;
- }
- else {
- settings.acquireMode = AcquireMode::ACQUIRE_MODE_PRE_ACQUIRED;
- settings.acceptMode = AcceptMode::ACCEPT_MODE_EXPLICIT;
- }
-
- Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings);
- subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup
-
- // the roundabout way to obtain localQueuep->queue
- SessionBase_0_10Access sa(*qpidSessionp);
- boost::shared_ptr<SessionImpl> simpl = sa.get();
- queuePtrp = new Demux::QueuePtr(simpl->getDemux().get(sub.getName()));
-
- success = true;
- } finally {
- if (!success) {
- Cleanup();
- linkException = gcnew QpidException ("InputLink creation failure");
- throw linkException;
- }
- }
-}
-
-// called with lock held
-void InputLink::ReleaseNative()
-{
- // involves talking to the Broker unless the connection is broken
-
- if ((subscriptionp != NULL) && !finalizing) {
- // TODO: find boost time error on cleanup when in finalizer thread
- try {
- subscriptionp->cancel();
- }
- catch (const std::exception& error) {
- // TODO: log this properly
- std::cout << "shutdown error " << error.what() << std::endl;
- }
- }
-
- // free native mem (or smart pointers) that we own
- if (subscriptionp != NULL) {
- delete subscriptionp;
- subscriptionp = NULL;
- }
- if (queuePtrp != NULL) {
- delete queuePtrp;
- queuePtrp = NULL;
- }
- if (localQueuep != NULL) {
- if (!finalizing) {
- // TODO: find boost time error on cleanup when in finalizer thread
- delete localQueuep;
- localQueuep = NULL;
- }
- }
- if (dequeuedFrameSetpp != NULL) {
- delete dequeuedFrameSetpp;
- dequeuedFrameSetpp = NULL;
- }
-}
-
-void InputLink::Cleanup()
-{
- {
- lock l(linkLock);
- if (disposed)
- return;
-
- disposed = true;
-
- // if the asyncHelper exists and is idle, unblock it
- if (asyncHelperWaitHandle != nullptr) {
- asyncHelperWaitHandle->Set();
- }
-
- // wakeup anyone waiting for messages
- if (queuePtrp != NULL)
- (*queuePtrp)->close();
-
- // wait for any sync operations on the subscription to complete before ReleaseNative
- lock l2(subscriptionLock);
-
- try {}
- finally
- {
- ReleaseNative();
- }
- }
-
- // Now that subscription is torn down, we can execute pending delete on remote node
- qpidAddress->CleanupLink(amqpSession);
- amqpSession->NotifyClosed();
-}
-
-InputLink::~InputLink()
-{
- Cleanup();
-}
-
-InputLink::!InputLink()
-{
- finalizing = true;
- Cleanup();
-}
-
-void InputLink::Close()
-{
- // Simulate Dispose()...
- Cleanup();
- GC::SuppressFinalize(this);
-}
-
-// call with lock held
-bool InputLink::haveMessage()
-{
- if (dequeuedFrameSetpp != NULL)
- return true;
-
- if (queuePtrp != NULL) {
- if ((*queuePtrp)->size() > 0)
- return true;
- }
- return false;
-}
-
-IntPtr InputLink::nextLocalMessage()
-{
- lock l(linkLock);
-
- if (disposed)
- return (IntPtr) NULL;
-
- // A message already pulled off BlockingQueue?
- if (dequeuedFrameSetpp != NULL) {
- QpidFrameSetPtr* rv = dequeuedFrameSetpp;
- dequeuedFrameSetpp = NULL;
- return (IntPtr) rv;
- }
-
- if ((*queuePtrp)->empty())
- return (IntPtr) NULL;
-
- bool received = false;
- QpidFrameSetPtr* frameSetpp = new QpidFrameSetPtr;
-
- try {
- received = (*queuePtrp)->pop(*frameSetpp, qpid::sys::TIME_INFINITE);
- if (received) {
- QpidFrameSetPtr* rv = frameSetpp;
- // no need to free native in finally block
- frameSetpp = NULL;
- return (IntPtr) rv;
- }
- } catch(const std::exception& error) {
- // should be no async tampering with queue since we hold the lock and have a
- // smart pointer ref to the native LocalQueue, even if the network connection fails...
- cout << "unknown exception in InputLink.nextLocalMessage() " << error.what() <<endl;
- // TODO: log this
- }
- finally {
- if (frameSetpp != NULL) {
- delete frameSetpp;
- }
- }
-
- return (IntPtr) NULL;
-}
-
-
-
-void InputLink::unblockWaiter()
-{
- // to be followed by resetQueue() below
- lock l(linkLock);
- if (disposed)
- return;
- (*queuePtrp)->close();
-}
-
-
-
-// Set things right after unblockWaiter(). Closing and opening a Qpid BlockingQueue unsticks
-// a blocking thread without interefering with queue contents or the ability to push
-// new incoming messages.
-
-void InputLink::resetQueue()
-{
- lock l(linkLock);
- if (disposed)
- return;
- if ((*queuePtrp)->isClosed()) {
- (*queuePtrp)->open();
- }
-}
-
-
-// returns true if there is a message to consume, i.e. nextLocalMessage() won't block
-
-bool InputLink::internalWaitForMessage()
-{
- Demux::QueuePtr demuxQueuePtr;
-
- bool received = false;
- QpidFrameSetPtr* frameSetpp = NULL;
- try {
- lock l(linkLock);
- if (disposed)
- return false;
- if (haveMessage())
- return true;
-
- AdjustCredit();
-
- // get a scoped smart ptr ref to guard against async close or hangup
- demuxQueuePtr = *queuePtrp;
- frameSetpp = new QpidFrameSetPtr;
-
- l.release();
- // Async cleanup is now possible. Only use demuxQueuePtr until lock reacquired.
- received = demuxQueuePtr->pop(*frameSetpp, qpid::sys::TIME_INFINITE);
- l.acquire();
-
- if (received) {
- dequeuedFrameSetpp = frameSetpp;
- frameSetpp = NULL; // native will eventually be freed in Cleanup or MessageBodyStream
- }
-
- return true;
- } catch(const std::exception& ) {
- // timeout or connection closed
- return false;
- }
- finally {
- if (frameSetpp != NULL) {
- delete frameSetpp;
- }
- }
-
- return false;
-}
-
-
-// call with lock held
-void InputLink::addWaiter(MessageWaiter^ waiter)
-{
- waiters->Add(waiter);
- if (waiters->Count == 1) {
- // mark this waiter as ready to run
- // Only the waiter at the head of the queue is active.
- waiter->Activate();
- }
-
- if (waiter->Assigned)
- return;
-
- if (asyncHelperWaitHandle == nullptr) {
- asyncHelperWaitHandle = gcnew ManualResetEvent(false);
- ThreadStart^ threadDelegate = gcnew ThreadStart(this, &InputLink::asyncHelper);
- (gcnew Thread(threadDelegate))->Start();
- }
-
- if (waiters->Count == 1) {
- // wake up the asyncHelper
- asyncHelperWaitHandle->Set();
- }
-}
-
-
-void InputLink::removeWaiter(MessageWaiter^ waiter) {
- // a waiter can be removed from anywhere in the list if timed out
-
- lock l(linkLock);
- int idx = waiters->IndexOf(waiter);
- if (idx == -1) {
- // TODO: assert or log
- if (asyncHelperWaitHandle != nullptr) {
- // just in case.
- asyncHelperWaitHandle->Set();
- }
- return;
- }
-
- waiters->RemoveAt(idx);
- if (waiter->TimedOut) {
- // may have to give back message if it arrives momentarily
- AdjustCredit();
- }
-
- // let the next waiter know it's his turn.
- if (waiters->Count > 0) {
- MessageWaiter^ nextWaiter = waiters[0];
-
- // wakeup the asyncHelper thread to help out if necessary.
- if (!nextWaiter->Assigned) {
- asyncHelperWaitHandle->Set();
- }
-
- l.release();
- nextWaiter->Activate();
- return;
- }
- else {
- if (disposed && (asyncHelperWaitHandle != nullptr)) {
- asyncHelperWaitHandle->Set();
- }
- }
-}
-
-
-void InputLink::asyncHelper()
-{
- lock l(linkLock);
-
- while (true) {
- if (disposed && (waiters->Count == 0)) {
- asyncHelperWaitHandle = nullptr;
- return;
- }
-
- if (waiters->Count > 0) {
- MessageWaiter^ waiter = waiters[0];
-
- l.release();
- if (waiter->AcceptForWork()) {
- waiter->Run();
- }
- l.acquire();
- }
-
- // sleep if more work may be coming or it is currently someone else's turn
- if (((waiters->Count == 0) && !disposed) || ((waiters->Count != 0) && waiters[0]->Assigned)) {
- // wait for something to do
- asyncHelperWaitHandle->Reset();
- l.release();
- asyncHelperWaitHandle->WaitOne();
- l.acquire();
- }
- }
-}
-
-void InputLink::sync()
-{
- // used by the MessageWaiter timeout thread to not run before fully initialized
- lock l(linkLock);
-}
-
-
-void InputLink::PrefetchLimit::set(int value)
-{
- lock l(linkLock);
- prefetchLimit = value;
-
- int delta = 0;
-
- // rough rule of thumb to keep the flow, but reduce chatter.
- // for small messages, the credit request is almost as expensive as the transfer itself.
- // experience may suggest a better heuristic or require a property for the low water mark
- if (prefetchLimit >= 3) {
- delta = prefetchLimit / 3;
- }
- minWorkingCredit = prefetchLimit - delta;
- AdjustCredit();
-}
-
-
-// call with lock held
-void InputLink::AdjustCredit()
-{
- if (creditSyncPending || disposed)
- return;
-
- // low watermark check
- if ((prefetchLimit != 0) &&
- (workingCredit >= minWorkingCredit) &&
- (workingCredit >= waiters->Count))
- return;
-
- // should have enough for all waiters or to satisfy the prefetch window
- int targetCredit = waiters->Count;
- if (targetCredit < prefetchLimit)
- targetCredit = prefetchLimit;
-
- if (targetCredit > workingCredit) {
- subscriptionp->grantMessageCredit(targetCredit - workingCredit);
- workingCredit = targetCredit;
- return;
- }
- if (targetCredit < workingCredit) {
- if ((targetCredit == 0) && (prefetchLimit == 0)) {
- creditSyncPending = true;
- ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit));
- }
- // TODO: also shrink credit when prefetchLimit != 0
- }
-}
-
-void InputLink::SyncCredit(Object ^unused)
-{
- lock l(linkLock);
-
- try {
- if (disposed)
- return;
-
- if (!amqpSession->MessageStop(subscriptionp->getName())) {
- // connection closed
- return;
- }
-
- l.release();
- // use setFlowControl to re-enable credit flow on the broker.
- // setFlowControl is a sync operation
- {
- lock l2(subscriptionLock);
- if (subscriptionp != NULL) {
- subscriptionp->setFlowControl(subscriptionp->getSettings().flowControl);
- }
- }
- l.acquire();
-
- if (disposed)
- return;
-
- // let existing waiters use up any messages that arrived.
- // local queue size can only decrease until more credit is issued
- while (true) {
- if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) {
- l.release();
- // a rare use case and not used in performance oriented code.
- // optimization can wait until the qpid/messaging api is used
- Thread::Sleep(10);
- l.acquire();
- if (disposed)
- return;
- }
- else {
- break;
- }
- }
-
- // At this point, the lock is held and we are fully synced with the broker
- // so we have a valid snapshot
-
- if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) {
- // can't be sure application will request a message again any time soon
- QpidFrameSetPtr frameSetp;
- while (!(*queuePtrp)->empty()) {
- (*queuePtrp)->pop(frameSetp);
- SequenceSet frameSetID(frameSetp->getId());
- subscriptionp->release(frameSetID);
- }
-
- // don't touch dequeuedFrameSetpp. It is spoken for: explicitely from a
- // MessageWaiter about to to get the nextLocalMessage(), or implicitely
- // from a WaitForMessage().
- }
- // TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit
-
- workingCredit = (*queuePtrp)->size();
- if (dequeuedFrameSetpp != NULL) {
- workingCredit++;
- }
- }
- finally {
- creditSyncPending = false;
- }
-
- AdjustCredit();
-}
-
-
-AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
-{
- QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer();
- bool ownFrameSet = true;
- bool haveProperties = false;
-
- try {
- MessageBodyStream^ mstream = gcnew MessageBodyStream(fspp);
- ownFrameSet = false; // stream releases on close/dispose
-
- AmqpMessage^ amqpMessage = gcnew AmqpMessage(mstream);
-
- AMQHeaderBody* headerBodyp = (*fspp)->getHeaders();
- uint64_t contentSize = (*fspp)->getContentSize();
- SequenceSet frameSetID((*fspp)->getId());
-
- // target managed representation
- AmqpProperties^ amqpProperties = gcnew AmqpProperties();
-
- // source native representation
- const DeliveryProperties* deliveryProperties = headerBodyp->get<DeliveryProperties>();
- const qpid::framing::MessageProperties* messageProperties = headerBodyp->get<qpid::framing::MessageProperties>();
-
- if (deliveryProperties) {
- if (deliveryProperties->hasRoutingKey()) {
- haveProperties = true;
-
- amqpProperties->RoutingKey = gcnew String(deliveryProperties->getRoutingKey().c_str());
- }
-
- if (deliveryProperties->hasDeliveryMode()) {
- if (deliveryProperties->getDeliveryMode() == qpid::framing::PERSISTENT)
- amqpProperties->Durable = true;
- }
-
- if (deliveryProperties->hasTtl()) {
- long long ticks = deliveryProperties->getTtl() * TimeSpan::TicksPerMillisecond;
- amqpProperties->TimeToLive = Nullable<TimeSpan>(TimeSpan::FromTicks(ticks));
- }
- }
-
- if (messageProperties) {
-
- if (messageProperties->hasReplyTo()) {
- haveProperties = true;
- const ReplyTo& rpto = messageProperties->getReplyTo();
- String^ rk = nullptr;
- String^ ex = nullptr;
- if (rpto.hasRoutingKey()) {
- rk = gcnew String(rpto.getRoutingKey().c_str());
- }
- if (rpto.hasExchange()) {
- ex = gcnew String(rpto.getExchange().c_str());
- }
- amqpProperties->SetReplyTo(ex,rk);
- }
-
- if (messageProperties->hasContentType()) {
- haveProperties = true;
- amqpProperties->ContentType = gcnew String(messageProperties->getContentType().c_str());
-
- if (messageProperties->hasContentEncoding()) {
- String^ enc = gcnew String(messageProperties->getContentEncoding().c_str());
- if (!String::IsNullOrEmpty(enc)) {
- // TODO: properly assemble 1.0 style to 0-10 for all cases
- amqpProperties->ContentType += "; charset=" + enc;
- }
- }
- }
-
- if (messageProperties->hasCorrelationId()) {
- haveProperties = true;
- const std::string& ncid = messageProperties->getCorrelationId();
- int len = ncid.size();
- array<unsigned char>^ mcid = gcnew array<unsigned char>(len);
- Marshal::Copy ((IntPtr) (void *) ncid.data(), mcid, 0, len);
- amqpProperties->CorrelationId = mcid;
- }
-
- if (messageProperties->hasUserId()) {
- haveProperties = true;
- const std::string& nuid = messageProperties->getUserId();
- int len = nuid.size();
- array<unsigned char>^ muid = gcnew array<unsigned char>(len);
- Marshal::Copy ((IntPtr) (void *) nuid.data(), muid, 0, len);
- amqpProperties->UserId = muid;
- }
-
- if (messageProperties->hasApplicationHeaders()) {
- haveProperties = true;
- const qpid::framing::FieldTable& fieldTable = messageProperties->getApplicationHeaders();
- int count = fieldTable.count();
-
- if (count > 0) {
- haveProperties = true;
- Collections::Generic::Dictionary<System::String^, AmqpType^>^ mmap =
- gcnew Collections::Generic::Dictionary<System::String^, AmqpType^>(count);
-
- for(qpid::framing::FieldTable::ValueMap::const_iterator i = fieldTable.begin(); i != fieldTable.end(); i++) {
-
- qpid::framing::FieldValue::Data &data = i->second->getData();
-
- // TODO: replace these generic int/string conversions with handler for each AMQP specific type:
- // uint8_t dataType = i->second->getType();
- // switch (dataType) { case TYPE_CODE_STR8: ... }
-
- if (data.convertsToInt()) {
- mmap->Add (gcnew String(i->first.data()), gcnew AmqpInt((int) i->second->getData().getInt()));
- }
- if (data.convertsToString()) {
- std::string ns = data.getString();
- String^ ms = gcnew String(ns.data(), 0, ns.size());
- mmap->Add (gcnew String(i->first.data()), gcnew AmqpString(ms));
- }
- }
-
- amqpProperties->PropertyMap = mmap;
- }
-
- }
- }
-
- if (haveProperties) {
- amqpMessage->Properties = amqpProperties;
- }
-
- // We have a message we can return to the caller.
- // Tell the broker we got it.
-
- // subscriptionp->accept(frameSetID) is a slow sync operation in the native API
- // so do it within the AsyncSession directly
- amqpSession->AcceptAndComplete(frameSetID, browsing);
-
- workingCredit--;
- // check if more messages need to be requested from broker
- AdjustCredit();
-
- return amqpMessage;
- }
- finally {
- if (ownFrameSet)
- delete (fspp);
- }
-}
-
- // As for IInputChannel:
- // if success, return true + amqpMessage
- // elseif timeout, return false
- // elseif closed/EOF, return true and amqpMessage = null
- // else throw an Exception
-
-bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage)
-{
- lock l(linkLock);
-
- if (waiters->Count == 0) {
- // see if there is a message already available without blocking
- IntPtr fspp = nextLocalMessage();
- if (fspp.ToPointer() != NULL) {
- amqpMessage = createAmqpMessage(fspp);
- return true;
- }
- }
-
- MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, false, nullptr, nullptr);
- addWaiter(waiter);
-
- l.release();
- waiter->Run();
- l.acquire();
-
- if (waiter->TimedOut) {
- return false;
- }
-
- IntPtr waiterMsg = waiter->Message;
- if (waiterMsg.ToPointer() == NULL) {
- if (disposed) {
- // indicate normal EOF on channel
- amqpMessage = nullptr;
- return true;
- }
- }
-
- amqpMessage = createAmqpMessage(waiterMsg);
- return true;
-}
-
-IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state)
-{
-
- //TODO: if haveMessage() complete synchronously
-
- lock l(linkLock);
- MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, true, callback, state);
- addWaiter(waiter);
- return waiter;
-}
-
-bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage)
-{
-
- // TODO: validate result
-
- MessageWaiter^ waiter = (MessageWaiter ^) result;
-
- waiter->WaitForCompletion();
-
- if (waiter->RunException != nullptr)
- throw waiter->RunException;
-
- if (waiter->TimedOut) {
- amqpMessage = nullptr;
- return false;
- }
-
- IntPtr waiterMsg = waiter->Message;
- if (waiterMsg.ToPointer() == NULL) {
- if (disposed) {
- // indicate normal EOF on channel
- amqpMessage = nullptr;
- return true;
- }
- }
-
- amqpMessage = createAmqpMessage(waiterMsg);
- return true;
-}
-
-
-bool InputLink::WaitForMessage(TimeSpan timeout)
-{
- lock l(linkLock);
-
- if (disposed)
- return false;
-
- if (waiters->Count == 0) {
- // see if there is a message already available without blocking
- if (haveMessage())
- return true;
- }
-
- // Same as for TryReceive, except consuming = false
- MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, false, nullptr, nullptr);
- addWaiter(waiter);
-
- l.release();
- waiter->Run();
- l.acquire();
-
- if (waiter->TimedOut) {
- return false;
- }
-
- return haveMessage();
-}
-
-IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state)
-{
- lock l(linkLock);
-
- // Same as for BeginTryReceive, except consuming = false
- MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, true, callback, state);
- addWaiter(waiter);
- return waiter;
-}
-
-bool InputLink::EndWaitForMessage(IAsyncResult^ result)
-{
- MessageWaiter^ waiter = (MessageWaiter ^) result;
-
- waiter->WaitForCompletion();
-
- if (waiter->TimedOut) {
- return false;
- }
-
- return haveMessage();
-}
-
-
-}}} // namespace Apache::Qpid::Interop