summaryrefslogtreecommitdiff
path: root/wcf/src/Apache/Qpid/Interop/InputLink.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'wcf/src/Apache/Qpid/Interop/InputLink.cpp')
-rw-r--r--wcf/src/Apache/Qpid/Interop/InputLink.cpp86
1 files changed, 55 insertions, 31 deletions
diff --git a/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/wcf/src/Apache/Qpid/Interop/InputLink.cpp
index e12151d943..3245cd3540 100644
--- a/wcf/src/Apache/Qpid/Interop/InputLink.cpp
+++ b/wcf/src/Apache/Qpid/Interop/InputLink.cpp
@@ -86,6 +86,8 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue,
System::Exception^ linkException = nullptr;
waiters = gcnew Collections::Generic::List<MessageWaiter^>();
+ linkLock = waiters; // private and available
+ subscriptionLock = gcnew Object();
try {
std::string qname = QpidMarshal::ToNative(sourceQueue);
@@ -120,10 +122,13 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue,
}
}
+// called with lock held
void InputLink::ReleaseNative()
{
// involves talking to the Broker unless the connection is broken
- if (subscriptionp != NULL) {
+
+ if ((subscriptionp != NULL) && !finalizing) {
+ // TODO: find boost time error on cleanup when in finalizer thread
try {
subscriptionp->cancel();
}
@@ -134,20 +139,31 @@ void InputLink::ReleaseNative()
}
// free native mem (or smart pointers) that we own
- if (subscriptionp != NULL)
+ if (subscriptionp != NULL) {
delete subscriptionp;
- if (queuePtrp != NULL)
+ subscriptionp = NULL;
+ }
+ if (queuePtrp != NULL) {
delete queuePtrp;
- if (localQueuep != NULL)
- delete localQueuep;
- if (dequeuedFrameSetpp != NULL)
+ queuePtrp = NULL;
+ }
+ if (localQueuep != NULL) {
+ if (!finalizing) {
+ // TODO: find boost time error on cleanup when in finalizer thread
+ delete localQueuep;
+ localQueuep = NULL;
+ }
+ }
+ if (dequeuedFrameSetpp != NULL) {
delete dequeuedFrameSetpp;
+ dequeuedFrameSetpp = NULL;
+ }
}
void InputLink::Cleanup()
{
{
- lock l(waiters);
+ lock l(linkLock);
if (disposed)
return;
@@ -162,6 +178,9 @@ void InputLink::Cleanup()
if (queuePtrp != NULL)
(*queuePtrp)->close();
+ // wait for any sync operations on the subscription to complete before ReleaseNative
+ lock l2(subscriptionLock);
+
try {}
finally
{
@@ -179,6 +198,7 @@ InputLink::~InputLink()
InputLink::!InputLink()
{
+ finalizing = true;
Cleanup();
}
@@ -204,7 +224,7 @@ bool InputLink::haveMessage()
IntPtr InputLink::nextLocalMessage()
{
- lock l(waiters);
+ lock l(linkLock);
if (disposed)
return (IntPtr) NULL;
@@ -250,7 +270,7 @@ IntPtr InputLink::nextLocalMessage()
void InputLink::unblockWaiter()
{
// to be followed by resetQueue() below
- lock l(waiters);
+ lock l(linkLock);
if (disposed)
return;
(*queuePtrp)->close();
@@ -264,7 +284,7 @@ void InputLink::unblockWaiter()
void InputLink::resetQueue()
{
- lock l(waiters);
+ lock l(linkLock);
if (disposed)
return;
if ((*queuePtrp)->isClosed()) {
@@ -282,7 +302,7 @@ bool InputLink::internalWaitForMessage()
bool received = false;
QpidFrameSetPtr* frameSetpp = NULL;
try {
- lock l(waiters);
+ lock l(linkLock);
if (disposed)
return false;
if (haveMessage())
@@ -348,7 +368,7 @@ void InputLink::addWaiter(MessageWaiter^ waiter)
void InputLink::removeWaiter(MessageWaiter^ waiter) {
// a waiter can be removed from anywhere in the list if timed out
- lock l(waiters);
+ lock l(linkLock);
int idx = waiters->IndexOf(waiter);
if (idx == -1) {
// TODO: assert or log
@@ -388,7 +408,7 @@ void InputLink::removeWaiter(MessageWaiter^ waiter) {
void InputLink::asyncHelper()
{
- lock l(waiters);
+ lock l(linkLock);
while (true) {
if (disposed && (waiters->Count == 0)) {
@@ -419,14 +439,14 @@ void InputLink::asyncHelper()
void InputLink::sync()
{
- // for the timeout thread
- lock l(waiters);
+ // used by the MessageWaiter timeout thread to not run before fully initialized
+ lock l(linkLock);
}
void InputLink::PrefetchLimit::set(int value)
{
- lock l(waiters);
+ lock l(linkLock);
prefetchLimit = value;
int delta = 0;
@@ -475,31 +495,32 @@ void InputLink::AdjustCredit()
void InputLink::SyncCredit(Object ^unused)
{
- lock l(waiters);
+ lock l(linkLock);
try {
if (disposed)
return;
- Completion comp;
- if (!amqpSession->MessageStop(comp, subscriptionp->getName())) {
+ if (!amqpSession->MessageStop(subscriptionp->getName())) {
// connection closed
return;
}
- // get a private scoped copy to use outside the lock
- Subscription s(*subscriptionp);
-
l.release();
// use setFlowControl to re-enable credit flow on the broker.
- // previously used comp.wait() here, but setFlowControl is a sync operation
- s.setFlowControl(s.getSettings().flowControl);
+ // setFlowControl is a sync operation
+ {
+ lock l2(subscriptionLock);
+ if (subscriptionp != NULL) {
+ subscriptionp->setFlowControl(subscriptionp->getSettings().flowControl);
+ }
+ }
l.acquire();
if (disposed)
return;
- // let existing waiters use up any
+ // let existing waiters use up any messages that arrived.
// local queue size can only decrease until more credit is issued
while (true) {
if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) {
@@ -700,7 +721,7 @@ AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage)
{
- lock l(waiters);
+ lock l(linkLock);
if (waiters->Count == 0) {
// see if there is a message already available without blocking
@@ -740,7 +761,7 @@ IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callba
//TODO: if haveMessage() complete synchronously
- lock l(waiters);
+ lock l(linkLock);
MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, true, callback, state);
addWaiter(waiter);
return waiter;
@@ -779,7 +800,10 @@ bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMess
bool InputLink::WaitForMessage(TimeSpan timeout)
{
- lock l(waiters);
+ lock l(linkLock);
+
+ if (disposed)
+ return false;
if (waiters->Count == 0) {
// see if there is a message already available without blocking
@@ -799,12 +823,12 @@ bool InputLink::WaitForMessage(TimeSpan timeout)
return false;
}
- return true;
+ return haveMessage();
}
IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state)
{
- lock l(waiters);
+ lock l(linkLock);
// Same as for BeginTryReceive, except consuming = false
MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, true, callback, state);
@@ -822,7 +846,7 @@ bool InputLink::EndWaitForMessage(IAsyncResult^ result)
return false;
}
- return true;
+ return haveMessage();
}