summaryrefslogtreecommitdiff
path: root/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp')
-rw-r--r--trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp251
1 files changed, 0 insertions, 251 deletions
diff --git a/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp b/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp
deleted file mode 100644
index f7a28b0692..0000000000
--- a/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-#include <windows.h>
-#include <msclr\lock.h>
-
-#include "qpid/client/AsyncSession.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/client/SubscriptionManager.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/Message.h"
-#include "qpid/client/MessageListener.h"
-#include "qpid/client/Demux.h"
-#include "qpid/client/SessionImpl.h"
-#include "qpid/client/SessionBase_0_10Access.h"
-
-#include "MessageBodyStream.h"
-#include "AmqpMessage.h"
-#include "AmqpSession.h"
-#include "InputLink.h"
-#include "MessageWaiter.h"
-
-namespace Apache {
-namespace Qpid {
-namespace Interop {
-
-using namespace System;
-using namespace System::Threading;
-using namespace msclr;
-
-
-MessageWaiter::MessageWaiter(InputLink^ parent, TimeSpan timeSpan, bool consuming, bool async, AsyncCallback ^callback, Object^ state)
-{
- this->consuming = consuming;
- if (!consuming) {
- GC::SuppressFinalize(this);
- }
-
- if (async) {
- this->async = true;
- this->asyncCallback = callback;
- this->state = state;
- }
- else {
- this->assigned = true;
- }
- this->parent = parent;
- this->thisLock = gcnew Object();
-
- // do this after the Message Waiter is fully initialized, in case of
- // very small timespan
- if (timeSpan != TimeSpan::MaxValue) {
- this->timer = gcnew Timer(timeoutCallback, this, timeSpan, TimeSpan::FromMilliseconds(-1));
- }
-}
-
-MessageWaiter::~MessageWaiter()
-{
- if (message != IntPtr::Zero) {
- try{}
- finally {
- delete message.ToPointer();
- message = IntPtr::Zero;
- }
- }
-}
-
-MessageWaiter::!MessageWaiter()
-{
- this->~MessageWaiter();
-}
-
-
-void MessageWaiter::WaitForCompletion()
-{
- if (isCompleted)
- return;
-
- lock l(thisLock);
- while (!isCompleted) {
- Monitor::Wait(thisLock);
- }
-}
-
-void MessageWaiter::Activate()
-{
- if (activated)
- return;
-
- lock l(thisLock);
- if (!activated) {
- activated = true;
- Monitor::PulseAll(thisLock);
- }
-}
-
-
-void MessageWaiter::Run()
-{
- lock l(thisLock);
-
- // wait until Activate(), i.e. our turn in the waiter list or a timeout
- while (!activated) {
- Monitor::Wait(thisLock);
- }
- bool haveMessage = false;
- bool mustReset = false;
-
- if (!timedOut)
- blocking = true;
-
- if (blocking) {
- l.release();
-
- try {
- haveMessage = parent->internalWaitForMessage();
- }
- catch (System::Exception^ e) {
- runException = e;
- }
-
- l.acquire();
- blocking = false;
- if (timedOut) {
- // TimeoutCallback() called parent->unblockWaiter()
- mustReset = true;
- // let the timer thread move past critical region
- while (processingTimeout) {
- Monitor::Wait(thisLock);
- }
- }
- }
-
- if (timer != nullptr) {
- timer->~Timer();
- timer = nullptr;
- }
-
- if (haveMessage) {
- timedOut = false; // for the case timeout and message arrival are essentially tied
- if (!consuming) {
- // just waiting
- haveMessage = false;
- }
- }
-
- if (haveMessage || mustReset) {
- l.release();
- if (haveMessage) {
- // hang on to it for when the async caller gets around to retrieving
- message = parent->nextLocalMessage();
- }
- if (mustReset) {
- parent->resetQueue();
- }
- l.acquire();
- }
-
- isCompleted = true;
- Monitor::PulseAll(thisLock);
-
- // do this check and signal while locked
- if (asyncWaitHandle != nullptr)
- asyncWaitHandle->Set();
-
- l.release();
- parent->removeWaiter(this);
-
-
- if (asyncCallback != nullptr) {
- // guard against application callback exception
- try {
- asyncCallback(this);
- }
- catch (System::Exception^) {
- // log it?
- }
- }
-
-}
-
-bool MessageWaiter::AcceptForWork()
-{
- lock l(thisLock);
- if (!assigned) {
- assigned = true;
- return true;
- }
- return false;
-}
-
-void MessageWaiter::TimeoutCallback(Object^ state)
-{
- MessageWaiter^ waiter = (MessageWaiter^) state;
- if (waiter->isCompleted)
- return;
-
- // make sure parent has finished initializing us before we get going
- waiter->parent->sync();
-
- lock l(waiter->thisLock);
- if (waiter->timer == nullptr) {
- // the waiter is in the clean up phase and doesn't need a wakeup
- return;
- }
-
- // timedOut, blocking and processingTimeout work as a unit
- waiter->timedOut = true;
- if (waiter->blocking) {
- // let the waiter know that we are busy with an upcoming unblock operation
- waiter->processingTimeout = true;
- }
-
- waiter->Activate();
-
- if (waiter->processingTimeout) {
- // call with lock off
- l.release();
- waiter->parent->unblockWaiter();
-
- // synchronize with blocked thread
- l.acquire();
- waiter->processingTimeout = false;
- Monitor::PulseAll(waiter->thisLock);
- }
-
- l.release();
-
- // If waiter has no associated thread, we must move it to completion
- if (waiter->AcceptForWork()) {
- waiter->Run(); // does not block since timedOut == true
- }
-}
-
-}}} // namespace Apache::Qpid::Interop