diff options
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp')
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp | 248 |
1 files changed, 248 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp new file mode 100644 index 0000000000..4868b9efce --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp @@ -0,0 +1,248 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/Demux.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" + +#include "MessageBodyStream.h" +#include "AmqpMessage.h" +#include "AmqpSession.h" +#include "InputLink.h" +#include "MessageWaiter.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; +using namespace msclr; + + +MessageWaiter::MessageWaiter(InputLink^ parent, TimeSpan timeSpan, bool consuming, bool async, AsyncCallback ^callback, Object^ state) +{ + this->consuming = consuming; + if (!consuming) { + GC::SuppressFinalize(this); + } + + if (async) { + this->async = true; + this->asyncCallback = callback; + this->state = state; + } + else { + this->assigned = true; + } + if (timeSpan != TimeSpan::MaxValue) { + this->timer = gcnew Timer(timeoutCallback, this, timeSpan, TimeSpan::FromMilliseconds(-1)); + } + this->parent = parent; + this->thisLock = gcnew Object(); +} + +MessageWaiter::~MessageWaiter() +{ + if (message != IntPtr::Zero) { + try{} + finally { + delete message.ToPointer(); + message = IntPtr::Zero; + } + } +} + +MessageWaiter::!MessageWaiter() +{ + this->~MessageWaiter(); +} + + +void MessageWaiter::WaitForCompletion() +{ + if (isCompleted) + return; + + lock l(thisLock); + while (!isCompleted) { + Monitor::Wait(thisLock); + } +} + +void MessageWaiter::Activate() +{ + if (activated) + return; + + lock l(thisLock); + if (!activated) { + activated = true; + Monitor::PulseAll(thisLock); + } +} + + +void MessageWaiter::Run() +{ + lock l(thisLock); + + // wait until Activate(), i.e. our turn in the waiter list or a timeout + while (!activated) { + Monitor::Wait(thisLock); + } + bool haveMessage = false; + bool mustReset = false; + + if (!timedOut) + blocking = true; + + if (blocking) { + l.release(); + + try { + haveMessage = parent->internalWaitForMessage(); + } + catch (System::Exception^ e) { + runException = e; + } + + l.acquire(); + blocking = false; + if (timedOut) { + // TimeoutCallback() called parent->unblockWaiter() + mustReset = true; + // let the timer thread move past critical region + while (processingTimeout) { + Monitor::Wait(thisLock); + } + } + } + + if (timer != nullptr) { + timer->~Timer(); + timer = nullptr; + } + + if (haveMessage) { + timedOut = false; // for the case timeout and message arrival are essentially tied + if (!consuming) { + // just waiting + haveMessage = false; + } + } + + if (haveMessage || mustReset) { + l.release(); + if (haveMessage) { + // hang on to it for when the async caller gets around to retrieving + message = parent->nextLocalMessage(); + } + if (mustReset) { + parent->resetQueue(); + } + l.acquire(); + } + + isCompleted = true; + Monitor::PulseAll(thisLock); + + // do this check and signal while locked + if (asyncWaitHandle != nullptr) + asyncWaitHandle->Set(); + + l.release(); + parent->removeWaiter(this); + + + if (asyncCallback != nullptr) { + // guard against application callback exception + try { + asyncCallback(this); + } + catch (System::Exception^) { + // log it? + } + } + +} + +bool MessageWaiter::AcceptForWork() +{ + lock l(thisLock); + if (!assigned) { + assigned = true; + return true; + } + return false; +} + +void MessageWaiter::TimeoutCallback(Object^ state) +{ + MessageWaiter^ waiter = (MessageWaiter^) state; + if (waiter->isCompleted) + return; + + // make sure parent has finished initializing us before we get going + waiter->parent->sync(); + + lock l(waiter->thisLock); + if (waiter->timer == nullptr) { + // the waiter is in the clean up phase and doesn't need a wakeup + return; + } + + // timedOut, blocking and processingTimeout work as a unit + waiter->timedOut = true; + if (waiter->blocking) { + // let the waiter know that we are busy with an upcoming unblock operation + waiter->processingTimeout = true; + } + + waiter->Activate(); + + if (waiter->processingTimeout) { + // call with lock off + l.release(); + waiter->parent->unblockWaiter(); + + // synchronize with blocked thread + l.acquire(); + waiter->processingTimeout = false; + Monitor::PulseAll(waiter->thisLock); + } + + l.release(); + + // If waiter has no associated thread, we must move it to completion + if (waiter->AcceptForWork()) { + waiter->Run(); // does not block since timedOut == true + } +} + +}}} // namespace Apache::Qpid::Interop |