diff options
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp')
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp | 868 |
1 files changed, 0 insertions, 868 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp deleted file mode 100644 index f8189df0dd..0000000000 --- a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp +++ /dev/null @@ -1,868 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#include <windows.h> -#include <msclr\lock.h> - -#include "qpid/client/AsyncSession.h" -#include "qpid/framing/FieldValue.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Message.h" -#include "qpid/client/MessageListener.h" -#include "qpid/client/Demux.h" -#include "qpid/client/SessionImpl.h" -#include "qpid/client/SessionBase_0_10Access.h" - -#include "MessageBodyStream.h" -#include "AmqpMessage.h" -#include "AmqpSession.h" -#include "InputLink.h" -#include "QpidMarshal.h" -#include "QpidException.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - - -using namespace System; -using namespace System::Runtime::InteropServices; -using namespace System::Threading; -using namespace msclr; - -using namespace qpid::client; -using namespace qpid::framing; - -using namespace std; - -using namespace Apache::Qpid::AmqpTypes; - -// Scalability note: When using async methods, an async helper thread is created -// to block on the Demux BlockingQueue. This design should be revised in line -// with proposed changes to the native library to reduce the number of servicing -// threads for large numbers of subscriptions. - -// synchronization is accomplished with locks, but also by ensuring that only one -// MessageWaiter (the one at the front of the line) is ever active. -// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch -// thread (who deposits FrameSets into the local queue and is oblivious to the -// managed space locks). - - -// The folowing def must match the "Frames" private typedef. -// TODO, make Qpid-cpp "Frames" definition visible. -typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames; - -InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, - qpid::client::AsyncSession *qpidSessionp, qpid::client::SubscriptionManager *qpidSubsMgrp, - bool exclusive, - bool temporary, System::String^ filterKey, System::String^ exchange) : - amqpSession(session), - subscriptionp(NULL), - localQueuep(NULL), - queuePtrp(NULL), - dequeuedFrameSetpp(NULL), - disposed(false), - finalizing(false) -{ - bool success = false; - System::Exception^ linkException = nullptr; - - waiters = gcnew Collections::Generic::List<MessageWaiter^>(); - linkLock = waiters; // private and available - subscriptionLock = gcnew Object(); - qpidAddress = QpidAddress::CreateAddress(sourceQueue, true); - qpidAddress->ResolveLink(session); - browsing = qpidAddress->Browsing; - - try { - std::string qname = QpidMarshal::ToNative(qpidAddress->LinkName); - - if (temporary) { - qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true); - qpidSessionp->exchangeBind(arg::exchange=QpidMarshal::ToNative(exchange), - arg::queue=qname, arg::bindingKey=QpidMarshal::ToNative(filterKey)); - qpidSessionp->sync(); - } - - localQueuep = new LocalQueue; - SubscriptionSettings settings; - settings.flowControl = FlowControl::messageCredit(0); - settings.completionMode = CompletionMode::MANUAL_COMPLETION; - - if (browsing) { - settings.acquireMode = AcquireMode::ACQUIRE_MODE_NOT_ACQUIRED; - settings.acceptMode = AcceptMode::ACCEPT_MODE_NONE; - } - else { - settings.acquireMode = AcquireMode::ACQUIRE_MODE_PRE_ACQUIRED; - settings.acceptMode = AcceptMode::ACCEPT_MODE_EXPLICIT; - } - - Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings); - subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup - - // the roundabout way to obtain localQueuep->queue - SessionBase_0_10Access sa(*qpidSessionp); - boost::shared_ptr<SessionImpl> simpl = sa.get(); - queuePtrp = new Demux::QueuePtr(simpl->getDemux().get(sub.getName())); - - success = true; - } finally { - if (!success) { - Cleanup(); - linkException = gcnew QpidException ("InputLink creation failure"); - throw linkException; - } - } -} - -// called with lock held -void InputLink::ReleaseNative() -{ - // involves talking to the Broker unless the connection is broken - - if ((subscriptionp != NULL) && !finalizing) { - // TODO: find boost time error on cleanup when in finalizer thread - try { - subscriptionp->cancel(); - } - catch (const std::exception& error) { - // TODO: log this properly - std::cout << "shutdown error " << error.what() << std::endl; - } - } - - // free native mem (or smart pointers) that we own - if (subscriptionp != NULL) { - delete subscriptionp; - subscriptionp = NULL; - } - if (queuePtrp != NULL) { - delete queuePtrp; - queuePtrp = NULL; - } - if (localQueuep != NULL) { - if (!finalizing) { - // TODO: find boost time error on cleanup when in finalizer thread - delete localQueuep; - localQueuep = NULL; - } - } - if (dequeuedFrameSetpp != NULL) { - delete dequeuedFrameSetpp; - dequeuedFrameSetpp = NULL; - } -} - -void InputLink::Cleanup() -{ - { - lock l(linkLock); - if (disposed) - return; - - disposed = true; - - // if the asyncHelper exists and is idle, unblock it - if (asyncHelperWaitHandle != nullptr) { - asyncHelperWaitHandle->Set(); - } - - // wakeup anyone waiting for messages - if (queuePtrp != NULL) - (*queuePtrp)->close(); - - // wait for any sync operations on the subscription to complete before ReleaseNative - lock l2(subscriptionLock); - - try {} - finally - { - ReleaseNative(); - } - } - - // Now that subscription is torn down, we can execute pending delete on remote node - qpidAddress->CleanupLink(amqpSession); - amqpSession->NotifyClosed(); -} - -InputLink::~InputLink() -{ - Cleanup(); -} - -InputLink::!InputLink() -{ - finalizing = true; - Cleanup(); -} - -void InputLink::Close() -{ - // Simulate Dispose()... - Cleanup(); - GC::SuppressFinalize(this); -} - -// call with lock held -bool InputLink::haveMessage() -{ - if (dequeuedFrameSetpp != NULL) - return true; - - if (queuePtrp != NULL) { - if ((*queuePtrp)->size() > 0) - return true; - } - return false; -} - -IntPtr InputLink::nextLocalMessage() -{ - lock l(linkLock); - - if (disposed) - return (IntPtr) NULL; - - // A message already pulled off BlockingQueue? - if (dequeuedFrameSetpp != NULL) { - QpidFrameSetPtr* rv = dequeuedFrameSetpp; - dequeuedFrameSetpp = NULL; - return (IntPtr) rv; - } - - if ((*queuePtrp)->empty()) - return (IntPtr) NULL; - - bool received = false; - QpidFrameSetPtr* frameSetpp = new QpidFrameSetPtr; - - try { - received = (*queuePtrp)->pop(*frameSetpp, qpid::sys::TIME_INFINITE); - if (received) { - QpidFrameSetPtr* rv = frameSetpp; - // no need to free native in finally block - frameSetpp = NULL; - return (IntPtr) rv; - } - } catch(const std::exception& error) { - // should be no async tampering with queue since we hold the lock and have a - // smart pointer ref to the native LocalQueue, even if the network connection fails... - cout << "unknown exception in InputLink.nextLocalMessage() " << error.what() <<endl; - // TODO: log this - } - finally { - if (frameSetpp != NULL) { - delete frameSetpp; - } - } - - return (IntPtr) NULL; -} - - - -void InputLink::unblockWaiter() -{ - // to be followed by resetQueue() below - lock l(linkLock); - if (disposed) - return; - (*queuePtrp)->close(); -} - - - -// Set things right after unblockWaiter(). Closing and opening a Qpid BlockingQueue unsticks -// a blocking thread without interefering with queue contents or the ability to push -// new incoming messages. - -void InputLink::resetQueue() -{ - lock l(linkLock); - if (disposed) - return; - if ((*queuePtrp)->isClosed()) { - (*queuePtrp)->open(); - } -} - - -// returns true if there is a message to consume, i.e. nextLocalMessage() won't block - -bool InputLink::internalWaitForMessage() -{ - Demux::QueuePtr demuxQueuePtr; - - bool received = false; - QpidFrameSetPtr* frameSetpp = NULL; - try { - lock l(linkLock); - if (disposed) - return false; - if (haveMessage()) - return true; - - AdjustCredit(); - - // get a scoped smart ptr ref to guard against async close or hangup - demuxQueuePtr = *queuePtrp; - frameSetpp = new QpidFrameSetPtr; - - l.release(); - // Async cleanup is now possible. Only use demuxQueuePtr until lock reacquired. - received = demuxQueuePtr->pop(*frameSetpp, qpid::sys::TIME_INFINITE); - l.acquire(); - - if (received) { - dequeuedFrameSetpp = frameSetpp; - frameSetpp = NULL; // native will eventually be freed in Cleanup or MessageBodyStream - } - - return true; - } catch(const std::exception& ) { - // timeout or connection closed - return false; - } - finally { - if (frameSetpp != NULL) { - delete frameSetpp; - } - } - - return false; -} - - -// call with lock held -void InputLink::addWaiter(MessageWaiter^ waiter) -{ - waiters->Add(waiter); - if (waiters->Count == 1) { - // mark this waiter as ready to run - // Only the waiter at the head of the queue is active. - waiter->Activate(); - } - - if (waiter->Assigned) - return; - - if (asyncHelperWaitHandle == nullptr) { - asyncHelperWaitHandle = gcnew ManualResetEvent(false); - ThreadStart^ threadDelegate = gcnew ThreadStart(this, &InputLink::asyncHelper); - (gcnew Thread(threadDelegate))->Start(); - } - - if (waiters->Count == 1) { - // wake up the asyncHelper - asyncHelperWaitHandle->Set(); - } -} - - -void InputLink::removeWaiter(MessageWaiter^ waiter) { - // a waiter can be removed from anywhere in the list if timed out - - lock l(linkLock); - int idx = waiters->IndexOf(waiter); - if (idx == -1) { - // TODO: assert or log - if (asyncHelperWaitHandle != nullptr) { - // just in case. - asyncHelperWaitHandle->Set(); - } - return; - } - - waiters->RemoveAt(idx); - if (waiter->TimedOut) { - // may have to give back message if it arrives momentarily - AdjustCredit(); - } - - // let the next waiter know it's his turn. - if (waiters->Count > 0) { - MessageWaiter^ nextWaiter = waiters[0]; - - // wakeup the asyncHelper thread to help out if necessary. - if (!nextWaiter->Assigned) { - asyncHelperWaitHandle->Set(); - } - - l.release(); - nextWaiter->Activate(); - return; - } - else { - if (disposed && (asyncHelperWaitHandle != nullptr)) { - asyncHelperWaitHandle->Set(); - } - } -} - - -void InputLink::asyncHelper() -{ - lock l(linkLock); - - while (true) { - if (disposed && (waiters->Count == 0)) { - asyncHelperWaitHandle = nullptr; - return; - } - - if (waiters->Count > 0) { - MessageWaiter^ waiter = waiters[0]; - - l.release(); - if (waiter->AcceptForWork()) { - waiter->Run(); - } - l.acquire(); - } - - // sleep if more work may be coming or it is currently someone else's turn - if (((waiters->Count == 0) && !disposed) || ((waiters->Count != 0) && waiters[0]->Assigned)) { - // wait for something to do - asyncHelperWaitHandle->Reset(); - l.release(); - asyncHelperWaitHandle->WaitOne(); - l.acquire(); - } - } -} - -void InputLink::sync() -{ - // used by the MessageWaiter timeout thread to not run before fully initialized - lock l(linkLock); -} - - -void InputLink::PrefetchLimit::set(int value) -{ - lock l(linkLock); - prefetchLimit = value; - - int delta = 0; - - // rough rule of thumb to keep the flow, but reduce chatter. - // for small messages, the credit request is almost as expensive as the transfer itself. - // experience may suggest a better heuristic or require a property for the low water mark - if (prefetchLimit >= 3) { - delta = prefetchLimit / 3; - } - minWorkingCredit = prefetchLimit - delta; - AdjustCredit(); -} - - -// call with lock held -void InputLink::AdjustCredit() -{ - if (creditSyncPending || disposed) - return; - - // low watermark check - if ((prefetchLimit != 0) && - (workingCredit >= minWorkingCredit) && - (workingCredit >= waiters->Count)) - return; - - // should have enough for all waiters or to satisfy the prefetch window - int targetCredit = waiters->Count; - if (targetCredit < prefetchLimit) - targetCredit = prefetchLimit; - - if (targetCredit > workingCredit) { - subscriptionp->grantMessageCredit(targetCredit - workingCredit); - workingCredit = targetCredit; - return; - } - if (targetCredit < workingCredit) { - if ((targetCredit == 0) && (prefetchLimit == 0)) { - creditSyncPending = true; - ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit)); - } - // TODO: also shrink credit when prefetchLimit != 0 - } -} - -void InputLink::SyncCredit(Object ^unused) -{ - lock l(linkLock); - - try { - if (disposed) - return; - - if (!amqpSession->MessageStop(subscriptionp->getName())) { - // connection closed - return; - } - - l.release(); - // use setFlowControl to re-enable credit flow on the broker. - // setFlowControl is a sync operation - { - lock l2(subscriptionLock); - if (subscriptionp != NULL) { - subscriptionp->setFlowControl(subscriptionp->getSettings().flowControl); - } - } - l.acquire(); - - if (disposed) - return; - - // let existing waiters use up any messages that arrived. - // local queue size can only decrease until more credit is issued - while (true) { - if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) { - l.release(); - // a rare use case and not used in performance oriented code. - // optimization can wait until the qpid/messaging api is used - Thread::Sleep(10); - l.acquire(); - if (disposed) - return; - } - else { - break; - } - } - - // At this point, the lock is held and we are fully synced with the broker - // so we have a valid snapshot - - if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) { - // can't be sure application will request a message again any time soon - QpidFrameSetPtr frameSetp; - while (!(*queuePtrp)->empty()) { - (*queuePtrp)->pop(frameSetp); - SequenceSet frameSetID(frameSetp->getId()); - subscriptionp->release(frameSetID); - } - - // don't touch dequeuedFrameSetpp. It is spoken for: explicitely from a - // MessageWaiter about to to get the nextLocalMessage(), or implicitely - // from a WaitForMessage(). - } - // TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit - - workingCredit = (*queuePtrp)->size(); - if (dequeuedFrameSetpp != NULL) { - workingCredit++; - } - } - finally { - creditSyncPending = false; - } - - AdjustCredit(); -} - - -AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) -{ - QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer(); - bool ownFrameSet = true; - bool haveProperties = false; - - try { - MessageBodyStream^ mstream = gcnew MessageBodyStream(fspp); - ownFrameSet = false; // stream releases on close/dispose - - AmqpMessage^ amqpMessage = gcnew AmqpMessage(mstream); - - AMQHeaderBody* headerBodyp = (*fspp)->getHeaders(); - uint64_t contentSize = (*fspp)->getContentSize(); - SequenceSet frameSetID((*fspp)->getId()); - - // target managed representation - AmqpProperties^ amqpProperties = gcnew AmqpProperties(); - - // source native representation - const DeliveryProperties* deliveryProperties = headerBodyp->get<DeliveryProperties>(); - const qpid::framing::MessageProperties* messageProperties = headerBodyp->get<qpid::framing::MessageProperties>(); - - if (deliveryProperties) { - if (deliveryProperties->hasRoutingKey()) { - haveProperties = true; - - amqpProperties->RoutingKey = gcnew String(deliveryProperties->getRoutingKey().c_str()); - } - - if (deliveryProperties->hasDeliveryMode()) { - if (deliveryProperties->getDeliveryMode() == qpid::framing::PERSISTENT) - amqpProperties->Durable = true; - } - - if (deliveryProperties->hasTtl()) { - long long ticks = deliveryProperties->getTtl() * TimeSpan::TicksPerMillisecond; - amqpProperties->TimeToLive = Nullable<TimeSpan>(TimeSpan::FromTicks(ticks)); - } - } - - if (messageProperties) { - - if (messageProperties->hasReplyTo()) { - haveProperties = true; - const ReplyTo& rpto = messageProperties->getReplyTo(); - String^ rk = nullptr; - String^ ex = nullptr; - if (rpto.hasRoutingKey()) { - rk = gcnew String(rpto.getRoutingKey().c_str()); - } - if (rpto.hasExchange()) { - ex = gcnew String(rpto.getExchange().c_str()); - } - amqpProperties->SetReplyTo(ex,rk); - } - - if (messageProperties->hasContentType()) { - haveProperties = true; - amqpProperties->ContentType = gcnew String(messageProperties->getContentType().c_str()); - - if (messageProperties->hasContentEncoding()) { - String^ enc = gcnew String(messageProperties->getContentEncoding().c_str()); - if (!String::IsNullOrEmpty(enc)) { - // TODO: properly assemble 1.0 style to 0-10 for all cases - amqpProperties->ContentType += "; charset=" + enc; - } - } - } - - if (messageProperties->hasCorrelationId()) { - haveProperties = true; - const std::string& ncid = messageProperties->getCorrelationId(); - int len = ncid.size(); - array<unsigned char>^ mcid = gcnew array<unsigned char>(len); - Marshal::Copy ((IntPtr) (void *) ncid.data(), mcid, 0, len); - amqpProperties->CorrelationId = mcid; - } - - if (messageProperties->hasUserId()) { - haveProperties = true; - const std::string& nuid = messageProperties->getUserId(); - int len = nuid.size(); - array<unsigned char>^ muid = gcnew array<unsigned char>(len); - Marshal::Copy ((IntPtr) (void *) nuid.data(), muid, 0, len); - amqpProperties->UserId = muid; - } - - if (messageProperties->hasApplicationHeaders()) { - haveProperties = true; - const qpid::framing::FieldTable& fieldTable = messageProperties->getApplicationHeaders(); - int count = fieldTable.count(); - - if (count > 0) { - haveProperties = true; - Collections::Generic::Dictionary<System::String^, AmqpType^>^ mmap = - gcnew Collections::Generic::Dictionary<System::String^, AmqpType^>(count); - - for(qpid::framing::FieldTable::ValueMap::const_iterator i = fieldTable.begin(); i != fieldTable.end(); i++) { - - qpid::framing::FieldValue::Data &data = i->second->getData(); - - // TODO: replace these generic int/string conversions with handler for each AMQP specific type: - // uint8_t dataType = i->second->getType(); - // switch (dataType) { case TYPE_CODE_STR8: ... } - - if (data.convertsToInt()) { - mmap->Add (gcnew String(i->first.data()), gcnew AmqpInt((int) i->second->getData().getInt())); - } - if (data.convertsToString()) { - std::string ns = data.getString(); - String^ ms = gcnew String(ns.data(), 0, ns.size()); - mmap->Add (gcnew String(i->first.data()), gcnew AmqpString(ms)); - } - } - - amqpProperties->PropertyMap = mmap; - } - - } - } - - if (haveProperties) { - amqpMessage->Properties = amqpProperties; - } - - // We have a message we can return to the caller. - // Tell the broker we got it. - - // subscriptionp->accept(frameSetID) is a slow sync operation in the native API - // so do it within the AsyncSession directly - amqpSession->AcceptAndComplete(frameSetID, browsing); - - workingCredit--; - // check if more messages need to be requested from broker - AdjustCredit(); - - return amqpMessage; - } - finally { - if (ownFrameSet) - delete (fspp); - } -} - - // As for IInputChannel: - // if success, return true + amqpMessage - // elseif timeout, return false - // elseif closed/EOF, return true and amqpMessage = null - // else throw an Exception - -bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage) -{ - lock l(linkLock); - - if (waiters->Count == 0) { - // see if there is a message already available without blocking - IntPtr fspp = nextLocalMessage(); - if (fspp.ToPointer() != NULL) { - amqpMessage = createAmqpMessage(fspp); - return true; - } - } - - MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, false, nullptr, nullptr); - addWaiter(waiter); - - l.release(); - waiter->Run(); - l.acquire(); - - if (waiter->TimedOut) { - return false; - } - - IntPtr waiterMsg = waiter->Message; - if (waiterMsg.ToPointer() == NULL) { - if (disposed) { - // indicate normal EOF on channel - amqpMessage = nullptr; - return true; - } - } - - amqpMessage = createAmqpMessage(waiterMsg); - return true; -} - -IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state) -{ - - //TODO: if haveMessage() complete synchronously - - lock l(linkLock); - MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, true, callback, state); - addWaiter(waiter); - return waiter; -} - -bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage) -{ - - // TODO: validate result - - MessageWaiter^ waiter = (MessageWaiter ^) result; - - waiter->WaitForCompletion(); - - if (waiter->RunException != nullptr) - throw waiter->RunException; - - if (waiter->TimedOut) { - amqpMessage = nullptr; - return false; - } - - IntPtr waiterMsg = waiter->Message; - if (waiterMsg.ToPointer() == NULL) { - if (disposed) { - // indicate normal EOF on channel - amqpMessage = nullptr; - return true; - } - } - - amqpMessage = createAmqpMessage(waiterMsg); - return true; -} - - -bool InputLink::WaitForMessage(TimeSpan timeout) -{ - lock l(linkLock); - - if (disposed) - return false; - - if (waiters->Count == 0) { - // see if there is a message already available without blocking - if (haveMessage()) - return true; - } - - // Same as for TryReceive, except consuming = false - MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, false, nullptr, nullptr); - addWaiter(waiter); - - l.release(); - waiter->Run(); - l.acquire(); - - if (waiter->TimedOut) { - return false; - } - - return haveMessage(); -} - -IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state) -{ - lock l(linkLock); - - // Same as for BeginTryReceive, except consuming = false - MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, true, callback, state); - addWaiter(waiter); - return waiter; -} - -bool InputLink::EndWaitForMessage(IAsyncResult^ result) -{ - MessageWaiter^ waiter = (MessageWaiter ^) result; - - waiter->WaitForCompletion(); - - if (waiter->TimedOut) { - return false; - } - - return haveMessage(); -} - - -}}} // namespace Apache::Qpid::Interop |