diff options
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp')
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp | 867 |
1 files changed, 867 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp new file mode 100644 index 0000000000..2b0119e338 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp @@ -0,0 +1,867 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/Demux.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" + +#include "MessageBodyStream.h" +#include "AmqpMessage.h" +#include "AmqpSession.h" +#include "InputLink.h" +#include "QpidMarshal.h" +#include "QpidException.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace System::Threading; +using namespace msclr; + +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + +using namespace Apache::Qpid::AmqpTypes; + +// Scalability note: When using async methods, an async helper thread is created +// to block on the Demux BlockingQueue. This design should be revised in line +// with proposed changes to the native library to reduce the number of servicing +// threads for large numbers of subscriptions. + +// synchronization is accomplished with locks, but also by ensuring that only one +// MessageWaiter (the one at the front of the line) is ever active. +// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch +// thread (who deposits FrameSets into the local queue and is oblivious to the +// managed space locks). + + +// The folowing def must match the "Frames" private typedef. +// TODO, make Qpid-cpp "Frames" definition visible. +typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames; + +InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, + qpid::client::AsyncSession *qpidSessionp, qpid::client::SubscriptionManager *qpidSubsMgrp, + bool exclusive, + bool temporary, System::String^ filterKey, System::String^ exchange) : + amqpSession(session), + subscriptionp(NULL), + localQueuep(NULL), + queuePtrp(NULL), + dequeuedFrameSetpp(NULL), + disposed(false), + finalizing(false) +{ + bool success = false; + System::Exception^ linkException = nullptr; + + waiters = gcnew Collections::Generic::List<MessageWaiter^>(); + linkLock = waiters; // private and available + subscriptionLock = gcnew Object(); + qpidAddress = QpidAddress::CreateAddress(sourceQueue, true); + qpidAddress->ResolveLink(session); + browsing = qpidAddress->Browsing; + + try { + std::string qname = QpidMarshal::ToNative(qpidAddress->LinkName); + + if (temporary) { + qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true); + qpidSessionp->exchangeBind(arg::exchange=QpidMarshal::ToNative(exchange), + arg::queue=qname, arg::bindingKey=QpidMarshal::ToNative(filterKey)); + qpidSessionp->sync(); + } + + localQueuep = new LocalQueue; + SubscriptionSettings settings; + settings.flowControl = FlowControl::messageCredit(0); + settings.completionMode = CompletionMode::MANUAL_COMPLETION; + + if (browsing) { + settings.acquireMode = AcquireMode::ACQUIRE_MODE_NOT_ACQUIRED; + settings.acceptMode = AcceptMode::ACCEPT_MODE_NONE; + } + else { + settings.acquireMode = AcquireMode::ACQUIRE_MODE_PRE_ACQUIRED; + settings.acceptMode = AcceptMode::ACCEPT_MODE_EXPLICIT; + } + + Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings); + subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup + + // the roundabout way to obtain localQueuep->queue + SessionBase_0_10Access sa(*qpidSessionp); + boost::shared_ptr<SessionImpl> simpl = sa.get(); + queuePtrp = new Demux::QueuePtr(simpl->getDemux().get(sub.getName())); + + success = true; + } finally { + if (!success) { + Cleanup(); + linkException = gcnew QpidException ("InputLink creation failure"); + throw linkException; + } + } +} + +// called with lock held +void InputLink::ReleaseNative() +{ + // involves talking to the Broker unless the connection is broken + + if ((subscriptionp != NULL) && !finalizing) { + // TODO: find boost time error on cleanup when in finalizer thread + try { + subscriptionp->cancel(); + } + catch (const std::exception& error) { + // TODO: log this properly + std::cout << "shutdown error " << error.what() << std::endl; + } + } + + // free native mem (or smart pointers) that we own + if (subscriptionp != NULL) { + delete subscriptionp; + subscriptionp = NULL; + } + if (queuePtrp != NULL) { + delete queuePtrp; + queuePtrp = NULL; + } + if (localQueuep != NULL) { + if (!finalizing) { + // TODO: find boost time error on cleanup when in finalizer thread + delete localQueuep; + localQueuep = NULL; + } + } + if (dequeuedFrameSetpp != NULL) { + delete dequeuedFrameSetpp; + dequeuedFrameSetpp = NULL; + } +} + +void InputLink::Cleanup() +{ + { + lock l(linkLock); + if (disposed) + return; + + disposed = true; + + // if the asyncHelper exists and is idle, unblock it + if (asyncHelperWaitHandle != nullptr) { + asyncHelperWaitHandle->Set(); + } + + // wakeup anyone waiting for messages + if (queuePtrp != NULL) + (*queuePtrp)->close(); + + // wait for any sync operations on the subscription to complete before ReleaseNative + lock l2(subscriptionLock); + + try {} + finally + { + ReleaseNative(); + } + } + + // Now that subscription is torn down, we can execute pending delete on remote node + qpidAddress->CleanupLink(amqpSession); + amqpSession->NotifyClosed(); +} + +InputLink::~InputLink() +{ + Cleanup(); +} + +InputLink::!InputLink() +{ + finalizing = true; + Cleanup(); +} + +void InputLink::Close() +{ + // Simulate Dispose()... + Cleanup(); + GC::SuppressFinalize(this); +} + +// call with lock held +bool InputLink::haveMessage() +{ + if (dequeuedFrameSetpp != NULL) + return true; + + if (queuePtrp != NULL) { + if ((*queuePtrp)->size() > 0) + return true; + } + return false; +} + +IntPtr InputLink::nextLocalMessage() +{ + lock l(linkLock); + + if (disposed) + return (IntPtr) NULL; + + // A message already pulled off BlockingQueue? + if (dequeuedFrameSetpp != NULL) { + QpidFrameSetPtr* rv = dequeuedFrameSetpp; + dequeuedFrameSetpp = NULL; + return (IntPtr) rv; + } + + if ((*queuePtrp)->empty()) + return (IntPtr) NULL; + + bool received = false; + QpidFrameSetPtr* frameSetpp = new QpidFrameSetPtr; + + try { + received = (*queuePtrp)->pop(*frameSetpp, qpid::sys::TIME_INFINITE); + if (received) { + QpidFrameSetPtr* rv = frameSetpp; + // no need to free native in finally block + frameSetpp = NULL; + return (IntPtr) rv; + } + } catch(const std::exception& error) { + // should be no async tampering with queue since we hold the lock and have a + // smart pointer ref to the native LocalQueue, even if the network connection fails... + cout << "unknown exception in InputLink.nextLocalMessage() " << error.what() <<endl; + // TODO: log this + } + finally { + if (frameSetpp != NULL) { + delete frameSetpp; + } + } + + return (IntPtr) NULL; +} + + + +void InputLink::unblockWaiter() +{ + // to be followed by resetQueue() below + lock l(linkLock); + if (disposed) + return; + (*queuePtrp)->close(); +} + + + +// Set things right after unblockWaiter(). Closing and opening a Qpid BlockingQueue unsticks +// a blocking thread without interefering with queue contents or the ability to push +// new incoming messages. + +void InputLink::resetQueue() +{ + lock l(linkLock); + if (disposed) + return; + if ((*queuePtrp)->isClosed()) { + (*queuePtrp)->open(); + } +} + + +// returns true if there is a message to consume, i.e. nextLocalMessage() won't block + +bool InputLink::internalWaitForMessage() +{ + Demux::QueuePtr demuxQueuePtr; + + bool received = false; + QpidFrameSetPtr* frameSetpp = NULL; + try { + lock l(linkLock); + if (disposed) + return false; + if (haveMessage()) + return true; + + AdjustCredit(); + + // get a scoped smart ptr ref to guard against async close or hangup + demuxQueuePtr = *queuePtrp; + frameSetpp = new QpidFrameSetPtr; + + l.release(); + // Async cleanup is now possible. Only use demuxQueuePtr until lock reacquired. + received = demuxQueuePtr->pop(*frameSetpp, qpid::sys::TIME_INFINITE); + l.acquire(); + + if (received) { + dequeuedFrameSetpp = frameSetpp; + frameSetpp = NULL; // native will eventually be freed in Cleanup or MessageBodyStream + } + + return true; + } catch(const std::exception& ) { + // timeout or connection closed + return false; + } + finally { + if (frameSetpp != NULL) { + delete frameSetpp; + } + } + + return false; +} + + +// call with lock held +void InputLink::addWaiter(MessageWaiter^ waiter) +{ + waiters->Add(waiter); + if (waiters->Count == 1) { + // mark this waiter as ready to run + // Only the waiter at the head of the queue is active. + waiter->Activate(); + } + + if (waiter->Assigned) + return; + + if (asyncHelperWaitHandle == nullptr) { + asyncHelperWaitHandle = gcnew ManualResetEvent(false); + ThreadStart^ threadDelegate = gcnew ThreadStart(this, &InputLink::asyncHelper); + (gcnew Thread(threadDelegate))->Start(); + } + + if (waiters->Count == 1) { + // wake up the asyncHelper + asyncHelperWaitHandle->Set(); + } +} + + +void InputLink::removeWaiter(MessageWaiter^ waiter) { + // a waiter can be removed from anywhere in the list if timed out + + lock l(linkLock); + int idx = waiters->IndexOf(waiter); + if (idx == -1) { + // TODO: assert or log + if (asyncHelperWaitHandle != nullptr) { + // just in case. + asyncHelperWaitHandle->Set(); + } + return; + } + + waiters->RemoveAt(idx); + if (waiter->TimedOut) { + // may have to give back message if it arrives momentarily + AdjustCredit(); + } + + // let the next waiter know it's his turn. + if (waiters->Count > 0) { + MessageWaiter^ nextWaiter = waiters[0]; + + // wakeup the asyncHelper thread to help out if necessary. + if (!nextWaiter->Assigned) { + asyncHelperWaitHandle->Set(); + } + + l.release(); + nextWaiter->Activate(); + return; + } + else { + if (disposed && (asyncHelperWaitHandle != nullptr)) { + asyncHelperWaitHandle->Set(); + } + } +} + + +void InputLink::asyncHelper() +{ + lock l(linkLock); + + while (true) { + if (disposed && (waiters->Count == 0)) { + asyncHelperWaitHandle = nullptr; + return; + } + + if (waiters->Count > 0) { + MessageWaiter^ waiter = waiters[0]; + + l.release(); + if (waiter->AcceptForWork()) { + waiter->Run(); + } + l.acquire(); + } + + // sleep if more work may be coming or it is currently someone else's turn + if (((waiters->Count == 0) && !disposed) || ((waiters->Count != 0) && waiters[0]->Assigned)) { + // wait for something to do + asyncHelperWaitHandle->Reset(); + l.release(); + asyncHelperWaitHandle->WaitOne(); + l.acquire(); + } + } +} + +void InputLink::sync() +{ + // used by the MessageWaiter timeout thread to not run before fully initialized + lock l(linkLock); +} + + +void InputLink::PrefetchLimit::set(int value) +{ + lock l(linkLock); + prefetchLimit = value; + + int delta = 0; + + // rough rule of thumb to keep the flow, but reduce chatter. + // for small messages, the credit request is almost as expensive as the transfer itself. + // experience may suggest a better heuristic or require a property for the low water mark + if (prefetchLimit >= 3) { + delta = prefetchLimit / 3; + } + minWorkingCredit = prefetchLimit - delta; + AdjustCredit(); +} + + +// call with lock held +void InputLink::AdjustCredit() +{ + if (creditSyncPending || disposed) + return; + + // low watermark check + if ((prefetchLimit != 0) && + (workingCredit >= minWorkingCredit) && + (workingCredit >= waiters->Count)) + return; + + // should have enough for all waiters or to satisfy the prefetch window + int targetCredit = waiters->Count; + if (targetCredit < prefetchLimit) + targetCredit = prefetchLimit; + + if (targetCredit > workingCredit) { + subscriptionp->grantMessageCredit(targetCredit - workingCredit); + workingCredit = targetCredit; + return; + } + if (targetCredit < workingCredit) { + if ((targetCredit == 0) && (prefetchLimit == 0)) { + creditSyncPending = true; + ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit)); + } + // TODO: also shrink credit when prefetchLimit != 0 + } +} + +void InputLink::SyncCredit(Object ^unused) +{ + lock l(linkLock); + + try { + if (disposed) + return; + + if (!amqpSession->MessageStop(subscriptionp->getName())) { + // connection closed + return; + } + + l.release(); + // use setFlowControl to re-enable credit flow on the broker. + // setFlowControl is a sync operation + { + lock l2(subscriptionLock); + if (subscriptionp != NULL) { + subscriptionp->setFlowControl(subscriptionp->getSettings().flowControl); + } + } + l.acquire(); + + if (disposed) + return; + + // let existing waiters use up any messages that arrived. + // local queue size can only decrease until more credit is issued + while (true) { + if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) { + l.release(); + // a rare use case and not used in performance oriented code. + // optimization can wait until the qpid/messaging api is used + Thread::Sleep(10); + l.acquire(); + if (disposed) + return; + } + else { + break; + } + } + + // At this point, the lock is held and we are fully synced with the broker + // so we have a valid snapshot + + if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) { + // can't be sure application will request a message again any time soon + QpidFrameSetPtr frameSetp; + while (!(*queuePtrp)->empty()) { + (*queuePtrp)->pop(frameSetp); + SequenceSet frameSetID(frameSetp->getId()); + subscriptionp->release(frameSetID); + } + + // don't touch dequeuedFrameSetpp. It is spoken for: explicitely from a + // MessageWaiter about to to get the nextLocalMessage(), or implicitely + // from a WaitForMessage(). + } + // TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit + + workingCredit = (*queuePtrp)->size(); + if (dequeuedFrameSetpp != NULL) { + workingCredit++; + } + } + finally { + creditSyncPending = false; + } + + AdjustCredit(); +} + + +AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) +{ + QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer(); + bool ownFrameSet = true; + bool haveProperties = false; + + try { + MessageBodyStream^ mstream = gcnew MessageBodyStream(fspp); + ownFrameSet = false; // stream releases on close/dispose + + AmqpMessage^ amqpMessage = gcnew AmqpMessage(mstream); + + AMQHeaderBody* headerBodyp = (*fspp)->getHeaders(); + uint64_t contentSize = (*fspp)->getContentSize(); + SequenceSet frameSetID((*fspp)->getId()); + + // target managed representation + AmqpProperties^ amqpProperties = gcnew AmqpProperties(); + + // source native representation + const DeliveryProperties* deliveryProperties = headerBodyp->get<DeliveryProperties>(); + const qpid::framing::MessageProperties* messageProperties = headerBodyp->get<qpid::framing::MessageProperties>(); + + if (deliveryProperties) { + if (deliveryProperties->hasRoutingKey()) { + haveProperties = true; + + amqpProperties->RoutingKey = gcnew String(deliveryProperties->getRoutingKey().c_str()); + } + + if (deliveryProperties->hasDeliveryMode()) { + if (deliveryProperties->getDeliveryMode() == qpid::framing::PERSISTENT) + amqpProperties->Durable = true; + } + + if (deliveryProperties->hasTtl()) { + long long ticks = deliveryProperties->getTtl() * TimeSpan::TicksPerMillisecond; + amqpProperties->TimeToLive = Nullable<TimeSpan>(TimeSpan::FromTicks(ticks)); + } + } + + if (messageProperties) { + + if (messageProperties->hasReplyTo()) { + haveProperties = true; + const ReplyTo& rpto = messageProperties->getReplyTo(); + String^ rk = nullptr; + String^ ex = nullptr; + if (rpto.hasRoutingKey()) { + rk = gcnew String(rpto.getRoutingKey().c_str()); + } + if (rpto.hasExchange()) { + ex = gcnew String(rpto.getExchange().c_str()); + } + amqpProperties->SetReplyTo(ex,rk); + } + + if (messageProperties->hasContentType()) { + haveProperties = true; + amqpProperties->ContentType = gcnew String(messageProperties->getContentType().c_str()); + + if (messageProperties->hasContentEncoding()) { + String^ enc = gcnew String(messageProperties->getContentEncoding().c_str()); + if (!String::IsNullOrEmpty(enc)) { + // TODO: properly assemble 1.0 style to 0-10 for all cases + amqpProperties->ContentType += "; charset=" + enc; + } + } + } + + if (messageProperties->hasCorrelationId()) { + haveProperties = true; + const std::string& ncid = messageProperties->getCorrelationId(); + int len = ncid.size(); + array<unsigned char>^ mcid = gcnew array<unsigned char>(len); + Marshal::Copy ((IntPtr) (void *) ncid.data(), mcid, 0, len); + amqpProperties->CorrelationId = mcid; + } + + if (messageProperties->hasUserId()) { + haveProperties = true; + const std::string& nuid = messageProperties->getUserId(); + int len = nuid.size(); + array<unsigned char>^ muid = gcnew array<unsigned char>(len); + Marshal::Copy ((IntPtr) (void *) nuid.data(), muid, 0, len); + amqpProperties->UserId = muid; + } + + if (messageProperties->hasApplicationHeaders()) { + haveProperties = true; + const qpid::framing::FieldTable& fieldTable = messageProperties->getApplicationHeaders(); + int count = fieldTable.count(); + + if (count > 0) { + haveProperties = true; + Collections::Generic::Dictionary<System::String^, AmqpType^>^ mmap = + gcnew Collections::Generic::Dictionary<System::String^, AmqpType^>(count); + + for(qpid::framing::FieldTable::ValueMap::const_iterator i = fieldTable.begin(); i != fieldTable.end(); i++) { + + qpid::framing::FieldValue::Data &data = i->second->getData(); + + // TODO: replace these generic int/string conversions with handler for each AMQP specific type: + // uint8_t dataType = i->second->getType(); + // switch (dataType) { case TYPE_CODE_STR8: ... } + + if (data.convertsToInt()) { + mmap->Add (gcnew String(i->first.data()), gcnew AmqpInt((int) i->second->getData().getInt())); + } + if (data.convertsToString()) { + std::string ns = data.getString(); + String^ ms = gcnew String(ns.data(), 0, ns.size()); + mmap->Add (gcnew String(i->first.data()), gcnew AmqpString(ms)); + } + } + + amqpProperties->PropertyMap = mmap; + } + + } + } + + if (haveProperties) { + amqpMessage->Properties = amqpProperties; + } + + // We have a message we can return to the caller. + // Tell the broker we got it. + + // subscriptionp->accept(frameSetID) is a slow sync operation in the native API + // so do it within the AsyncSession directly + amqpSession->AcceptAndComplete(frameSetID, browsing); + + workingCredit--; + // check if more messages need to be requested from broker + AdjustCredit(); + + return amqpMessage; + } + finally { + if (ownFrameSet) + delete (fspp); + } +} + + // As for IInputChannel: + // if success, return true + amqpMessage + // elseif timeout, return false + // elseif closed/EOF, return true and amqpMessage = null + // else throw an Exception + +bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage) +{ + lock l(linkLock); + + if (waiters->Count == 0) { + // see if there is a message already available without blocking + IntPtr fspp = nextLocalMessage(); + if (fspp.ToPointer() != NULL) { + amqpMessage = createAmqpMessage(fspp); + return true; + } + } + + MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, false, nullptr, nullptr); + addWaiter(waiter); + + l.release(); + waiter->Run(); + l.acquire(); + + if (waiter->TimedOut) { + return false; + } + + IntPtr waiterMsg = waiter->Message; + if (waiterMsg.ToPointer() == NULL) { + if (disposed) { + // indicate normal EOF on channel + amqpMessage = nullptr; + return true; + } + } + + amqpMessage = createAmqpMessage(waiterMsg); + return true; +} + +IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state) +{ + + //TODO: if haveMessage() complete synchronously + + lock l(linkLock); + MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, true, callback, state); + addWaiter(waiter); + return waiter; +} + +bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage) +{ + + // TODO: validate result + + MessageWaiter^ waiter = (MessageWaiter ^) result; + + waiter->WaitForCompletion(); + + if (waiter->RunException != nullptr) + throw waiter->RunException; + + if (waiter->TimedOut) { + amqpMessage = nullptr; + return false; + } + + IntPtr waiterMsg = waiter->Message; + if (waiterMsg.ToPointer() == NULL) { + if (disposed) { + // indicate normal EOF on channel + amqpMessage = nullptr; + return true; + } + } + + amqpMessage = createAmqpMessage(waiterMsg); + return true; +} + + +bool InputLink::WaitForMessage(TimeSpan timeout) +{ + lock l(linkLock); + + if (disposed) + return false; + + if (waiters->Count == 0) { + // see if there is a message already available without blocking + if (haveMessage()) + return true; + } + + // Same as for TryReceive, except consuming = false + MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, false, nullptr, nullptr); + addWaiter(waiter); + + l.release(); + waiter->Run(); + l.acquire(); + + if (waiter->TimedOut) { + return false; + } + + return haveMessage(); +} + +IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state) +{ + lock l(linkLock); + + // Same as for BeginTryReceive, except consuming = false + MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, true, callback, state); + addWaiter(waiter); + return waiter; +} + +bool InputLink::EndWaitForMessage(IAsyncResult^ result) +{ + MessageWaiter^ waiter = (MessageWaiter ^) result; + + waiter->WaitForCompletion(); + + if (waiter->TimedOut) { + return false; + } + + return haveMessage(); +} + + +}}} // namespace Apache::Qpid::Interop |