diff options
Diffstat (limited to 'trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp')
-rw-r--r-- | trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp | 251 |
1 files changed, 0 insertions, 251 deletions
diff --git a/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp b/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp deleted file mode 100644 index f7a28b0692..0000000000 --- a/trunk/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp +++ /dev/null @@ -1,251 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -#include <windows.h> -#include <msclr\lock.h> - -#include "qpid/client/AsyncSession.h" -#include "qpid/framing/FrameSet.h" -#include "qpid/client/SubscriptionManager.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Message.h" -#include "qpid/client/MessageListener.h" -#include "qpid/client/Demux.h" -#include "qpid/client/SessionImpl.h" -#include "qpid/client/SessionBase_0_10Access.h" - -#include "MessageBodyStream.h" -#include "AmqpMessage.h" -#include "AmqpSession.h" -#include "InputLink.h" -#include "MessageWaiter.h" - -namespace Apache { -namespace Qpid { -namespace Interop { - -using namespace System; -using namespace System::Threading; -using namespace msclr; - - -MessageWaiter::MessageWaiter(InputLink^ parent, TimeSpan timeSpan, bool consuming, bool async, AsyncCallback ^callback, Object^ state) -{ - this->consuming = consuming; - if (!consuming) { - GC::SuppressFinalize(this); - } - - if (async) { - this->async = true; - this->asyncCallback = callback; - this->state = state; - } - else { - this->assigned = true; - } - this->parent = parent; - this->thisLock = gcnew Object(); - - // do this after the Message Waiter is fully initialized, in case of - // very small timespan - if (timeSpan != TimeSpan::MaxValue) { - this->timer = gcnew Timer(timeoutCallback, this, timeSpan, TimeSpan::FromMilliseconds(-1)); - } -} - -MessageWaiter::~MessageWaiter() -{ - if (message != IntPtr::Zero) { - try{} - finally { - delete message.ToPointer(); - message = IntPtr::Zero; - } - } -} - -MessageWaiter::!MessageWaiter() -{ - this->~MessageWaiter(); -} - - -void MessageWaiter::WaitForCompletion() -{ - if (isCompleted) - return; - - lock l(thisLock); - while (!isCompleted) { - Monitor::Wait(thisLock); - } -} - -void MessageWaiter::Activate() -{ - if (activated) - return; - - lock l(thisLock); - if (!activated) { - activated = true; - Monitor::PulseAll(thisLock); - } -} - - -void MessageWaiter::Run() -{ - lock l(thisLock); - - // wait until Activate(), i.e. our turn in the waiter list or a timeout - while (!activated) { - Monitor::Wait(thisLock); - } - bool haveMessage = false; - bool mustReset = false; - - if (!timedOut) - blocking = true; - - if (blocking) { - l.release(); - - try { - haveMessage = parent->internalWaitForMessage(); - } - catch (System::Exception^ e) { - runException = e; - } - - l.acquire(); - blocking = false; - if (timedOut) { - // TimeoutCallback() called parent->unblockWaiter() - mustReset = true; - // let the timer thread move past critical region - while (processingTimeout) { - Monitor::Wait(thisLock); - } - } - } - - if (timer != nullptr) { - timer->~Timer(); - timer = nullptr; - } - - if (haveMessage) { - timedOut = false; // for the case timeout and message arrival are essentially tied - if (!consuming) { - // just waiting - haveMessage = false; - } - } - - if (haveMessage || mustReset) { - l.release(); - if (haveMessage) { - // hang on to it for when the async caller gets around to retrieving - message = parent->nextLocalMessage(); - } - if (mustReset) { - parent->resetQueue(); - } - l.acquire(); - } - - isCompleted = true; - Monitor::PulseAll(thisLock); - - // do this check and signal while locked - if (asyncWaitHandle != nullptr) - asyncWaitHandle->Set(); - - l.release(); - parent->removeWaiter(this); - - - if (asyncCallback != nullptr) { - // guard against application callback exception - try { - asyncCallback(this); - } - catch (System::Exception^) { - // log it? - } - } - -} - -bool MessageWaiter::AcceptForWork() -{ - lock l(thisLock); - if (!assigned) { - assigned = true; - return true; - } - return false; -} - -void MessageWaiter::TimeoutCallback(Object^ state) -{ - MessageWaiter^ waiter = (MessageWaiter^) state; - if (waiter->isCompleted) - return; - - // make sure parent has finished initializing us before we get going - waiter->parent->sync(); - - lock l(waiter->thisLock); - if (waiter->timer == nullptr) { - // the waiter is in the clean up phase and doesn't need a wakeup - return; - } - - // timedOut, blocking and processingTimeout work as a unit - waiter->timedOut = true; - if (waiter->blocking) { - // let the waiter know that we are busy with an upcoming unblock operation - waiter->processingTimeout = true; - } - - waiter->Activate(); - - if (waiter->processingTimeout) { - // call with lock off - l.release(); - waiter->parent->unblockWaiter(); - - // synchronize with blocked thread - l.acquire(); - waiter->processingTimeout = false; - Monitor::PulseAll(waiter->thisLock); - } - - l.release(); - - // If waiter has no associated thread, we must move it to completion - if (waiter->AcceptForWork()) { - waiter->Run(); // does not block since timedOut == true - } -} - -}}} // namespace Apache::Qpid::Interop |