summaryrefslogtreecommitdiff
path: root/qpid/wcf/src/Apache/Qpid/Interop
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop')
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp276
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h97
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp76
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h61
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp633
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h109
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp57
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp145
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h98
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp285
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.h76
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp867
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/InputLink.h110
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj501
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp337
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h131
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp251
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h125
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp255
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h75
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp304
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h89
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/QpidException.h37
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h53
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp525
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.h96
26 files changed, 5669 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
new file mode 100644
index 0000000000..1bc9a15d92
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
@@ -0,0 +1,276 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+#include <oletx2xa.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/framing/FrameSet.h"
+
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "QpidMarshal.h"
+#include "QpidException.h"
+#include "DtxResourceManager.h"
+#include "XaTransaction.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace std;
+
+
+// Note on locks: Use thisLock for fast counting and idle/busy
+// notifications. Use the "sessions" list to serialize session
+// creation/reaping and overall tear down.
+
+
+AmqpConnection::AmqpConnection(String^ server, int port) :
+ connectionp(NULL),
+ busyCount(0),
+ disposed(false)
+{
+ initialize (server, port, false, false, nullptr, nullptr);
+}
+
+AmqpConnection::AmqpConnection(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password) :
+ connectionp(NULL),
+ busyCount(0),
+ disposed(false)
+{
+ initialize (server, port, ssl, saslPlain, username, password);
+}
+
+void AmqpConnection::initialize(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password)
+{
+ if (server == nullptr)
+ throw gcnew ArgumentNullException("AMQP server");
+ if (saslPlain) {
+ if (username == nullptr)
+ throw gcnew ArgumentNullException("username");
+ if (username == nullptr)
+ throw gcnew ArgumentNullException("password");
+ }
+
+ bool success = false;
+ System::Exception^ openException = nullptr;
+ sessions = gcnew Collections::Generic::List<AmqpSession^>();
+ thisLock = gcnew Object();
+
+ try {
+ connectionp = new Connection;
+
+ if (ssl || saslPlain) {
+ ConnectionSettings proposedSettings;
+ proposedSettings.host = QpidMarshal::ToNative(server);
+ proposedSettings.port = port;
+ if (ssl)
+ proposedSettings.protocol = "ssl";
+
+ if (saslPlain) {
+ proposedSettings.username = QpidMarshal::ToNative(username);
+ proposedSettings.password = QpidMarshal::ToNative(password);
+ proposedSettings.mechanism = "PLAIN";
+ }
+
+ connectionp->open (proposedSettings);
+ }
+ else {
+ connectionp->open (QpidMarshal::ToNative(server), port);
+ }
+
+ // TODO: registerFailureCallback for failover
+ success = true;
+ const ConnectionSettings& settings = connectionp->getNegotiatedSettings();
+ this->maxFrameSize = settings.maxFrameSize;
+ this->host = server;
+ this->port = port;
+ this->ssl = ssl;
+ this->saslPlain = saslPlain;
+ this->username = username;
+ this->password = password;
+ this->isOpen = true;
+ } catch (const qpid::Exception& error) {
+ String^ errmsg = gcnew String(error.what());
+ openException = gcnew QpidException(errmsg);
+ } finally {
+ if (!success) {
+ Cleanup();
+ if (openException == nullptr) {
+ openException = gcnew QpidException ("unknown connection failure");
+ }
+ throw openException;
+ }
+ }
+}
+
+AmqpConnection^ AmqpConnection::Clone() {
+ if (disposed)
+ throw gcnew ObjectDisposedException("AmqpConnection.Clone");
+ return gcnew AmqpConnection (this->host, this->port, this->ssl, this->saslPlain, this->username, this->password);
+}
+
+void AmqpConnection::Cleanup()
+{
+ {
+ lock l(sessions);
+ if (disposed)
+ return;
+ disposed = true;
+ }
+
+ try {
+ // let the child sessions clean up
+
+ for each(AmqpSession^ s in sessions) {
+ s->ConnectionClosed();
+ }
+ }
+ finally
+ {
+ if (connectionp != NULL) {
+ isOpen = false;
+ connectionp->close();
+ delete connectionp;
+ connectionp = NULL;
+ }
+ }
+}
+
+AmqpConnection::~AmqpConnection()
+{
+ Cleanup();
+}
+
+AmqpConnection::!AmqpConnection()
+{
+ Cleanup();
+}
+
+void AmqpConnection::Close()
+{
+ // Simulate Dispose()...
+ Cleanup();
+ GC::SuppressFinalize(this);
+}
+
+AmqpSession^ AmqpConnection::CreateSession()
+{
+ lock l(sessions);
+ if (disposed) {
+ throw gcnew ObjectDisposedException("AmqpConnection");
+ }
+ AmqpSession^ session = gcnew AmqpSession(this, connectionp);
+ sessions->Add(session);
+ return session;
+}
+
+// called whenever a child session becomes newly busy (a first reader or writer since last idle)
+
+void AmqpConnection::NotifyBusy()
+{
+ bool changed = false;
+ {
+ lock l(thisLock);
+ if (busyCount++ == 0)
+ changed = true;
+ }
+}
+
+// called whenever a child session becomes newly idle (a last reader or writer has closed)
+// The connection is idle when none of its child sessions are busy
+
+void AmqpConnection::NotifyIdle()
+{
+ bool connectionIdle = false;
+ {
+ lock l(thisLock);
+ if (--busyCount == 0)
+ connectionIdle = true;
+ }
+ if (connectionIdle) {
+ OnConnectionIdle(this, System::EventArgs::Empty);
+ }
+}
+
+void HexAppend(StringBuilder^ sb, String^ s) {
+ if (s->Length > 0) {
+ array<unsigned char>^ bytes = Encoding::UTF8->GetBytes(s);
+ for each (unsigned char b in bytes) {
+ sb->Append(String::Format("{0:x2}", b));
+ }
+ }
+ sb->Append(".");
+}
+
+
+// Note: any change to this format has to be reflected in the DTC plugin's xa_open()
+// for now: "QPIDdsnV2.port.host.instance_id.SSL_tf.SASL_mech.username.password"
+// This extended info is needed so that the DTC can make a separate connection to the broker
+// for recovery.
+
+String^ AmqpConnection::DataSourceName::get() {
+ if (dataSourceName == nullptr) {
+ StringBuilder^ sb = gcnew StringBuilder();
+ sb->Append("QPIDdsnV2.");
+
+ sb->Append(this->port);
+ sb->Append(".");
+
+ HexAppend(sb, this->host);
+
+ sb->Append(System::Diagnostics::Process::GetCurrentProcess()->Id);
+ sb->Append("-");
+ sb->Append(AppDomain::CurrentDomain->Id);
+ sb->Append(".");
+
+ if (this->ssl)
+ sb->Append("T");
+ else
+ sb->Append("F");
+ sb->Append(".");
+
+ if (this->saslPlain) {
+ sb->Append("P.");
+ HexAppend(sb, this->username);
+ HexAppend(sb, this->password);
+ }
+ else {
+ // SASL anonymous
+ sb->Append("A.");
+ }
+
+ dataSourceName = sb->ToString();
+ }
+ return dataSourceName;
+}
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h
new file mode 100644
index 0000000000..ef4d0e3f37
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h
@@ -0,0 +1,97 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace std;
+using namespace qpid::client;
+
+ref class AmqpSession;
+ref class DtxResourceManager;
+
+public delegate void ConnectionIdleEventHandler(Object^ sender, EventArgs^ eventArgs);
+
+public ref class AmqpConnection
+{
+private:
+ Connection* connectionp;
+ bool disposed;
+ Collections::Generic::List<AmqpSession^>^ sessions;
+ bool isOpen;
+ int busyCount;
+ int maxFrameSize;
+ DtxResourceManager^ dtxResourceManager;
+ // unique string used for distributed transactions
+ String^ dataSourceName;
+ Object ^thisLock;
+
+ // properties needed to allow DTC to do transactions (see DataSourceName
+ String^ host;
+ int port;
+ bool ssl;
+ bool saslPlain;
+ String^ username;
+ String^ password;
+
+ void Cleanup();
+ void initialize (System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password);
+
+ internal:
+ void NotifyBusy();
+ void NotifyIdle();
+ AmqpConnection^ Clone();
+
+ property int MaxFrameSize {
+ int get () { return maxFrameSize; }
+ }
+
+ property DtxResourceManager^ CachedResourceManager {
+ DtxResourceManager^ get () { return dtxResourceManager; }
+ void set (DtxResourceManager^ value) { dtxResourceManager = value; }
+ }
+
+ property String^ DataSourceName {
+ String^ get();
+ }
+
+public:
+ AmqpConnection(System::String^ server, int port);
+ AmqpConnection(System::String^ server, int port, bool ssl, bool saslPlain, System::String^ username, System::String^ password);
+ ~AmqpConnection();
+ !AmqpConnection();
+ void Close();
+ AmqpSession^ CreateSession();
+ event ConnectionIdleEventHandler^ OnConnectionIdle;
+
+ property bool IsOpen {
+ bool get() { return isOpen; }
+ };
+
+ property bool IsIdle {
+ bool get() { return (busyCount == 0); }
+ }
+};
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp
new file mode 100644
index 0000000000..5c333aff60
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp
@@ -0,0 +1,76 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/AMQFrame.h"
+
+#include "MessageBodyStream.h"
+#include "AmqpMessage.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace msclr;
+
+using namespace Apache::Qpid::AmqpTypes;
+
+AmqpMessage::AmqpMessage(MessageBodyStream ^mbs) :
+ messageBodyStream(mbs),
+ disposed(false)
+{
+}
+
+void AmqpMessage::Cleanup()
+{
+ {
+ lock l(this);
+ if (disposed)
+ return;
+
+ disposed = true;
+ }
+
+ messageBodyStream->Close();
+}
+
+AmqpMessage::~AmqpMessage()
+{
+ Cleanup();
+}
+
+AmqpMessage::!AmqpMessage()
+{
+ Cleanup();
+}
+
+void AmqpMessage::Close()
+{
+ // Simulate Dispose()...
+ Cleanup();
+ GC::SuppressFinalize(this);
+}
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h
new file mode 100644
index 0000000000..f0801d30dc
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+
+using namespace qpid::client;
+using namespace std;
+
+
+
+public ref class AmqpMessage
+{
+private:
+ MessageBodyStream^ messageBodyStream;
+ AmqpTypes::AmqpProperties^ amqpProperties;
+ bool disposed;
+ void Cleanup();
+
+internal:
+ AmqpMessage(MessageBodyStream ^bstream);
+
+public:
+ ~AmqpMessage();
+ !AmqpMessage();
+ void Close();
+
+ property AmqpTypes::AmqpProperties^ Properties {
+ AmqpTypes::AmqpProperties^ get () { return amqpProperties; }
+ void set(AmqpTypes::AmqpProperties^ p) { amqpProperties = p; }
+ }
+
+ property System::IO::Stream^ BodyStream {
+ System::IO::Stream^ get() { return messageBodyStream; }
+ }
+};
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
new file mode 100644
index 0000000000..ac7c777d1f
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
@@ -0,0 +1,633 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+#include <oletx2xa.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/Message.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/client/Future.h"
+#include "qpid/framing/Xid.h"
+
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "AmqpMessage.h"
+#include "MessageBodyStream.h"
+#include "InputLink.h"
+#include "OutputLink.h"
+#include "QpidMarshal.h"
+#include "QpidException.h"
+#include "XaTransaction.h"
+#include "DtxResourceManager.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Transactions;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace std;
+
+
+AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidConnectionp) :
+ connection(conn),
+ sessionp(NULL),
+ sessionImplp(NULL),
+ subs_mgrp(NULL),
+ helperRunning(false),
+ openCount(0),
+ syncCount(0),
+ closing(false),
+ dtxEnabled(false)
+{
+ bool success = false;
+ try {
+ sessionp = new qpid::client::AsyncSession;
+ *sessionp = qpidConnectionp->newSession();
+ subs_mgrp = new SubscriptionManager (*sessionp);
+ waiters = gcnew Collections::Generic::List<CompletionWaiter^>();
+ sessionLock = waiters; // waiters convenient and not publicly visible
+ openCloseLock = gcnew Object();
+ success = true;
+ } finally {
+ if (!success) {
+ Cleanup();
+ // TODO: include inner exception information
+ throw gcnew QpidException ("session creation failure");
+ }
+ }
+}
+
+
+void AmqpSession::Cleanup()
+{
+ bool connected = connection->IsOpen;
+
+ if (subs_mgrp != NULL) {
+ if (connected)
+ subs_mgrp->stop();
+ delete subs_mgrp;
+ subs_mgrp = NULL;
+ }
+
+ if (sessionp != NULL) {
+ if (connected) {
+ sessionp->close();
+ }
+ delete sessionp;
+ sessionp = NULL;
+ sessionImplp = NULL;
+ }
+}
+
+
+static qpid::framing::Xid& getXid(XaTransaction^ xaTx)
+{
+ return *((qpid::framing::Xid *)xaTx->XidHandle.ToPointer());
+}
+
+
+void AmqpSession::CheckOpen()
+{
+ if (closing)
+ throw gcnew ObjectDisposedException("AmqpSession");
+}
+
+
+// Called by the parent AmqpConnection
+
+void AmqpSession::ConnectionClosed()
+{
+ lock l(sessionLock);
+
+ if (closing)
+ return;
+
+ closing = true;
+
+ if (connection->IsOpen) {
+ // send closing handshakes...
+
+ if (dtxEnabled) {
+ // session may close before all its transactions complete, at least force the phase 0 flush
+ if (pendingTransactions->Count > 0) {
+ array<XaTransaction^>^ txArray = pendingTransactions->ToArray();
+ l.release();
+ for each (XaTransaction^ xaTx in txArray) {
+ //xaTx->SessionClosing(this);
+ xaTx->WaitForCompletion();
+ }
+ l.acquire();
+ }
+ }
+
+ WaitLastSync (%l);
+ // Assert pendingTransactions->Count == 0
+
+ if (openXaTransaction != nullptr) {
+ // send final dtxend
+ sessionp->dtxEnd(getXid(openXaTransaction), false, true, false);
+ openXaTransaction = nullptr;
+ openSystemTransaction = nullptr;
+ // this operation will complete by the time Cleanup() returns
+ }
+ }
+
+ Cleanup();
+}
+
+InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue)
+{
+ return CreateInputLink(sourceQueue, true, false, nullptr, nullptr);
+}
+
+InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary,
+ System::String^ filterKey, System::String^ exchange)
+{
+ lock ocl(openCloseLock);
+ lock l(sessionLock);
+ CheckOpen();
+
+ InputLink^ link = gcnew InputLink (this, sourceQueue, sessionp, subs_mgrp, exclusive, temporary, filterKey, exchange);
+ {
+ if (openCount == 0) {
+ l.release();
+ connection->NotifyBusy();
+ }
+ openCount++;
+ }
+ return link;
+}
+
+OutputLink^ AmqpSession::CreateOutputLink(System::String^ targetQueue)
+{
+ lock ocl(openCloseLock);
+ lock l(sessionLock);
+ CheckOpen();
+
+ OutputLink^ link = gcnew OutputLink (this, targetQueue);
+
+ if (sessionImplp == NULL) {
+ // not needed unless sending messages
+ SessionBase_0_10Access sa(*sessionp);
+ boost::shared_ptr<SessionImpl> sip = sa.get();
+ sessionImplp = sip.get();
+ }
+
+ if (openCount == 0) {
+ l.release();
+ connection->NotifyBusy();
+ }
+ openCount++;
+
+ return link;
+}
+
+
+// called whenever a child InputLink or OutputLink is closed or finalized
+void AmqpSession::NotifyClosed()
+{
+ lock ocl(openCloseLock);
+ openCount--;
+ if (openCount == 0) {
+ connection->NotifyIdle();
+ }
+}
+
+
+CompletionWaiter^ AmqpSession::SendMessage (System::String^ queue, MessageBodyStream ^mbody, TimeSpan timeout, bool async, AsyncCallback^ callback, Object^ state)
+{
+ lock l(sessionLock);
+
+ // delimit with session dtx commands depending on the transaction context
+ UpdateTransactionState(%l);
+
+ CheckOpen();
+
+ bool syncPending = false;
+
+ // create an AMQP message.transfer command to use with the partial frameset from the MessageBodyStream
+
+ std::string exname = QpidMarshal::ToNative(queue);
+ FrameSet *framesetp = (FrameSet *) mbody->GetFrameSet().ToPointer();
+ uint8_t acceptMode=1;
+ uint8_t acquireMode=0;
+ MessageTransferBody mtcmd(ProtocolVersion(0,10), exname, acceptMode, acquireMode);
+ // ask for a command completion
+ mtcmd.setSync(true);
+
+ //send it
+
+ Future *futurep = NULL;
+ try {
+ futurep = new Future(sessionImplp->send(mtcmd, *framesetp));
+
+ CompletionWaiter^ waiter = nullptr;
+ if (async || (timeout != TimeSpan::MaxValue)) {
+ waiter = gcnew CompletionWaiter(this, timeout, (IntPtr) futurep, callback, state);
+ // waiter is responsible for releasing the Future native resource
+ futurep = NULL;
+ addWaiter(waiter);
+ return waiter;
+ }
+
+ // synchronous send with no timeout: no need to involve the asyncHelper thread
+
+ IncrementSyncs();
+ syncPending = true;
+ l.release();
+ internalWaitForCompletion((IntPtr) futurep);
+ }
+ finally {
+ if (syncPending) {
+ if (!l.is_locked())
+ l.acquire();
+ DecrementSyncs();
+ }
+ if (futurep != NULL)
+ delete (futurep);
+ }
+ return nullptr;
+}
+
+
+void AmqpSession::Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey)
+{
+ lock l(sessionLock);
+ CheckOpen();
+
+ sessionp->exchangeBind(arg::queue=QpidMarshal::ToNative(queue),
+ arg::exchange=QpidMarshal::ToNative(exchange),
+ arg::bindingKey=QpidMarshal::ToNative(filterKey));
+
+}
+
+
+void AmqpSession::internalWaitForCompletion(IntPtr fp)
+{
+ Debug::Assert(syncCount > 0, "sync counter mismatch");
+
+ // Qpid native lib call to wait for the command completion
+ ((Future *)fp.ToPointer())->wait(*sessionImplp);
+}
+
+// call with lock held
+void AmqpSession::addWaiter(CompletionWaiter^ waiter)
+{
+ IncrementSyncs();
+ waiters->Add(waiter);
+ if (!helperRunning) {
+ helperRunning = true;
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &AmqpSession::asyncHelper));
+ }
+}
+
+
+void AmqpSession::removeWaiter(CompletionWaiter^ waiter)
+{
+ // a waiter can be removed from anywhere in the list if timed out
+
+ lock l(sessionLock);
+ int idx = waiters->IndexOf(waiter);
+ if (idx == -1) {
+ // TODO: assert or log
+ }
+ else {
+ waiters->RemoveAt(idx);
+ DecrementSyncs();
+ }
+}
+
+
+// process CompletionWaiter list one at a time.
+
+void AmqpSession::asyncHelper(Object ^unused)
+{
+ lock l(sessionLock);
+
+ while (true) {
+ if (waiters->Count == 0) {
+ helperRunning = false;
+ return;
+ }
+
+ CompletionWaiter^ waiter = waiters[0];
+ l.release();
+ // can block, but for short time
+ // the waiter removes itself from the list, possibly as the timer thread on timeout
+ waiter->Run();
+ l.acquire();
+ }
+}
+
+bool AmqpSession::MessageStop(std::string &name)
+{
+ lock l(sessionLock);
+
+ if (closing)
+ return false;
+
+ sessionp->messageStop(name, true);
+ return true;
+}
+
+void AmqpSession::AcceptAndComplete(SequenceSet& transfers, bool browsing)
+{
+ lock l(sessionLock);
+
+ if (!browsing) {
+ // delimit with session dtx commands depending on the transaction context
+ UpdateTransactionState(%l);
+ }
+
+ CheckOpen();
+
+ sessionp->markCompleted(transfers, false);
+ if (!browsing)
+ sessionp->messageAccept(transfers, false);
+}
+
+
+// call with session lock held
+
+void AmqpSession::UpdateTransactionState(lock^ slock)
+{
+ Transaction^ currentTx = Transaction::Current;
+ if ((currentTx == nullptr) && !dtxEnabled) {
+ // no transaction scope and no previous dtx work to monitor
+ return;
+ }
+
+ if (currentTx == openSystemTransaction) {
+ // no change
+ return;
+ }
+
+ if (!dtxEnabled) {
+ // AMQP requires that this be the first dtx-related command on the session
+ sessionp->dtxSelect(false);
+ dtxEnabled = true;
+ pendingTransactions = gcnew Collections::Generic::List<XaTransaction^>();
+ }
+
+ bool notify = false; // unless the System.Transaction is no longer active
+ XaTransaction^ oldXaTx = openXaTransaction;
+ if (openSystemTransaction != nullptr) {
+ // The application may start a new transaction before the phase0 on rollback
+ try {
+ if (openSystemTransaction->TransactionInformation->Status != TransactionStatus::Active) {
+ notify = true;
+ }
+ } catch (System::ObjectDisposedException^) {
+ notify = true;
+ }
+ }
+
+ slock->release();
+ // only use stack variables until lock re-acquired
+
+ if (notify) {
+ // will do call back to all enlisted sessions. call with session lock released.
+ // If NotifyPhase0() wins the race to start phase 0, openXaTransaction will be null
+ oldXaTx->NotifyPhase0();
+ }
+
+ XaTransaction^ newXaTx = nullptr;
+ if (currentTx != nullptr) {
+ // This must be called with locks released. The DTC and System.Transactions methods that
+ // will be called hold locks that interfere with the ITransactionResourceAsync callbacks.
+ newXaTx = DtxResourceManager::GetXaTransaction(this, currentTx);
+ }
+
+ slock->acquire();
+
+ if (closing)
+ return;
+
+ if (openSystemTransaction != nullptr) {
+ // some other transaction has the dtx window open
+ // close the XID window, suspend = true... in case it is used again
+ sessionp->dtxEnd(getXid(openXaTransaction), false, true, false);
+ openSystemTransaction = nullptr;
+ openXaTransaction = nullptr;
+ }
+
+
+ // Call enlist with session lock held. The XaTransaction will call DtxStart before returning.
+ if (newXaTx != nullptr) {
+ if (!pendingTransactions->Contains(newXaTx)) {
+ pendingTransactions->Add(newXaTx);
+ }
+
+ newXaTx->Enlist(this);
+ }
+
+ openXaTransaction = newXaTx;
+ openSystemTransaction = currentTx;
+}
+
+
+typedef TypedResult<qpid::framing::XaResult> XaResultCompletion;
+
+
+// send the required closing dtx.End before Phase 1
+
+IntPtr AmqpSession::BeginPhase0Flush(XaTransaction ^xaTx) {
+
+ lock l(sessionLock);
+ IntPtr completionp = IntPtr::Zero;
+ try {
+ if (sessionp != NULL) {
+
+ // proceed even if "closing == true", the phase 0 is part of the transition from closing to closed
+
+ if (xaTx != openXaTransaction) {
+ // a different transaction (or none) is in scope, so xaTx was previously suspended.
+ // must re-open it to close it properly
+ if (openXaTransaction != nullptr) {
+ // suspend the session's current pending transaction
+ // it wil be reopened in a future enlistment or phase 0 flush.
+ sessionp->dtxEnd(getXid(openXaTransaction), false, true, false);
+ }
+ // resuming
+ sessionp->dtxStart(getXid(xaTx), false, true, false);
+ }
+
+ // the closing (i.e. non-suspended) dtxEnd happens here (exactly once for a given transaction)
+ // set the sync bit since phase0 is a precondition to prepare or abort
+ completionp = (IntPtr) new XaResultCompletion(sessionp->dtxEnd(getXid(xaTx), false, false, true));
+ IncrementSyncs();
+ }
+ }
+ catch (System::Exception^ ) {
+ // all the caller wants to know is if completionp is non-null
+ }
+
+ openXaTransaction = nullptr;
+ openSystemTransaction = nullptr;
+ return completionp;
+}
+
+
+void AmqpSession::EndPhase0Flush(XaTransaction ^xaTx, IntPtr intptr) {
+ XaResultCompletion *completionp = (XaResultCompletion *) intptr.ToPointer();
+ lock l(sessionLock);
+
+ if (completionp != NULL) {
+ try {
+ l.release();
+ completionp->wait();
+ pendingTransactions->Remove(xaTx);
+ }
+ catch (System::Exception^) {
+ // connection closed or network drop
+ }
+ finally {
+ l.acquire();
+ DecrementSyncs();
+ delete completionp;
+ }
+ }
+}
+
+
+IntPtr AmqpSession::DtxStart(IntPtr ip, bool join, bool resume) {
+ // called with session lock held (as a callback from the Enlist())
+ // The XaTransaction knows if this should be the originating dtxStart, or a join/resume
+ IntPtr rv = IntPtr::Zero;
+ qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer();
+ if (join || resume) {
+ sessionp->dtxStart(*xidp, join, resume, false);
+ }
+ else {
+ // The XaTransaction needs to track when the first dtxStart completes to safely request a join
+ IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs
+ rv = (IntPtr) new XaResultCompletion(sessionp->dtxStart(*xidp, join, resume, false));
+ }
+
+ return rv;
+}
+
+
+IntPtr AmqpSession::DtxPrepare(IntPtr ip) {
+ qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer();
+ lock l(sessionLock);
+
+ if (closing)
+ return IntPtr::Zero;
+
+ IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs
+ return (IntPtr) new XaResultCompletion(sessionp->dtxPrepare(*xidp, true));
+}
+
+
+IntPtr AmqpSession::DtxCommit(IntPtr ip, bool onePhase) {
+ qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer();
+ lock l(sessionLock);
+
+ if (closing)
+ return IntPtr::Zero;
+
+ IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs
+ return (IntPtr) new XaResultCompletion(sessionp->dtxCommit(*xidp, onePhase, true));
+}
+
+
+IntPtr AmqpSession::DtxRollback(IntPtr ip) {
+ qpid::framing::Xid* xidp = (qpid::framing::Xid *) ip.ToPointer();
+ lock l(sessionLock);
+ if (closing)
+ return IntPtr::Zero;
+
+ IncrementSyncs(); // caller must use ReleaseCompletion() for corresponding DecrementSyncs
+
+ return (IntPtr) new XaResultCompletion(sessionp->dtxRollback(*xidp, true));
+}
+
+
+//call with lock held
+void AmqpSession::IncrementSyncs() {
+ syncCount++;
+}
+
+
+//call with lock held
+void AmqpSession::DecrementSyncs() {
+ syncCount--;
+ Debug::Assert(syncCount >= 0, "sync counter underrun");
+ if (syncCount == 0) {
+ if (closeWaitHandle != nullptr) {
+ // now OK to move from closing to closed
+ closeWaitHandle->Set();
+ }
+ }
+}
+
+
+// call with lock held
+void AmqpSession::WaitLastSync(lock ^l) {
+ if (syncCount == 0)
+ return;
+ if (AppDomain::CurrentDomain->IsFinalizingForUnload()) {
+ // a wait would be a hang. No more syncs coming
+ return;
+ }
+ if (closeWaitHandle == nullptr)
+ closeWaitHandle = gcnew ManualResetEvent(false);
+ l->release();
+ closeWaitHandle->WaitOne();
+ l->acquire();
+}
+
+
+void AmqpSession::ReleaseCompletion(IntPtr completion) {
+ lock l(sessionLock);
+ DecrementSyncs();
+ delete completion.ToPointer();
+}
+
+
+// Non-exclusive borrowing for a "brief" period. I.e. several synced
+// commands (address resolution)
+
+IntPtr AmqpSession::BorrowNativeSession() {
+ lock l(sessionLock);
+ if (closing)
+ return IntPtr::Zero;
+
+ IncrementSyncs();
+ return (IntPtr) sessionp;
+}
+
+void AmqpSession::ReturnNativeSession() {
+ lock l(sessionLock);
+ DecrementSyncs();
+}
+
+}}} // namespace Apache::Qpid::Cli
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
new file mode 100644
index 0000000000..7a49496805
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
@@ -0,0 +1,109 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+#include "AmqpConnection.h"
+#include "MessageBodyStream.h"
+#include "CompletionWaiter.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Transactions;
+using namespace System::Diagnostics;
+
+
+using namespace qpid::client;
+using namespace std;
+
+ref class InputLink;
+ref class OutputLink;
+ref class XaTransaction;
+
+public ref class AmqpSession
+{
+private:
+ Object^ sessionLock;
+ Object^ openCloseLock;
+ AmqpConnection^ connection;
+ AsyncSession* sessionp;
+ SessionImpl* sessionImplp;
+ SubscriptionManager* subs_mgrp;
+ Collections::Generic::List<CompletionWaiter^>^ waiters;
+ bool helperRunning;
+
+ // number of active InputLinks and OutputLinks
+ int openCount;
+
+ // the number of async commands sent to the broker that need completion confirmation
+ int syncCount;
+
+ bool closing;
+ ManualResetEvent^ closeWaitHandle;
+ bool dtxEnabled;
+ Transaction^ openSystemTransaction;
+ XaTransaction^ openXaTransaction;
+ Collections::Generic::List<XaTransaction^>^ pendingTransactions;
+
+ void Cleanup();
+ void CheckOpen();
+ void asyncHelper(Object ^);
+ void addWaiter(CompletionWaiter^ waiter);
+ void UpdateTransactionState(msclr::lock^ sessionLock);
+ void IncrementSyncs();
+ void DecrementSyncs();
+ void WaitLastSync(msclr::lock^ l);
+
+public:
+ OutputLink^ CreateOutputLink(System::String^ targetQueue);
+ InputLink^ CreateInputLink(System::String^ sourceQueue);
+
+ // 0-10 specific support; deprecated in favor of Qpid messaging addresses
+ InputLink^ CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary, System::String^ filterKey, System::String^ exchange);
+ void Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey);
+
+internal:
+ AmqpSession(AmqpConnection^ connection, qpid::client::Connection* qpidConnection);
+ void NotifyClosed();
+ CompletionWaiter^ SendMessage (System::String^ queue, MessageBodyStream ^mbody, TimeSpan timeout, bool async, AsyncCallback^ callback, Object^ state);
+ void ConnectionClosed();
+ void internalWaitForCompletion(IntPtr future);
+ void removeWaiter(CompletionWaiter^ waiter);
+ bool MessageStop(std::string &name);
+ void AcceptAndComplete(SequenceSet& transfers, bool browsing);
+ IntPtr BeginPhase0Flush(XaTransaction^);
+ void EndPhase0Flush(XaTransaction^, IntPtr);
+ IntPtr DtxStart(IntPtr xidp, bool, bool);
+ IntPtr DtxPrepare(IntPtr xidp);
+ IntPtr DtxCommit(IntPtr xidp, bool onePhase);
+ IntPtr DtxRollback(IntPtr xidp);
+ void ReleaseCompletion(IntPtr completion);
+ IntPtr BorrowNativeSession();
+ void ReturnNativeSession();
+
+ property AmqpConnection^ Connection {
+ AmqpConnection^ get () { return connection; }
+ }
+};
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp
new file mode 100644
index 0000000000..91c23ae30a
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp
@@ -0,0 +1,57 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+using namespace System;
+using namespace System::Reflection;
+using namespace System::Runtime::CompilerServices;
+using namespace System::Runtime::InteropServices;
+using namespace System::Security::Permissions;
+
+//
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+//
+[assembly:AssemblyTitleAttribute("Apache.Qpid.Interop")];
+[assembly:AssemblyDescriptionAttribute("")];
+[assembly:AssemblyConfigurationAttribute("")];
+[assembly:AssemblyCompanyAttribute("")];
+[assembly:AssemblyProductAttribute("")];
+[assembly:AssemblyCopyrightAttribute("")];
+[assembly:AssemblyTrademarkAttribute("")];
+[assembly:AssemblyCultureAttribute("")];
+
+//
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the value or you can default the Revision and Build Numbers
+// by using the '*' as shown below:
+
+[assembly:AssemblyVersionAttribute("1.0.*")];
+
+[assembly:ComVisible(false)];
+
+[assembly:CLSCompliantAttribute(true)];
+
+[assembly:SecurityPermission(SecurityAction::RequestMinimum, UnmanagedCode = true)];
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp
new file mode 100644
index 0000000000..e39ee1b1ae
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp
@@ -0,0 +1,145 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/Demux.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+
+#include "MessageBodyStream.h"
+#include "AmqpMessage.h"
+#include "AmqpSession.h"
+#include "InputLink.h"
+#include "CompletionWaiter.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace msclr;
+
+// A class to provide IAsyncResult semantics for a qpid AsyncSession command (i.e. 0-10 messageTransfer)
+// when the client session receives a "Completion" notification from the Broker.
+
+
+CompletionWaiter::CompletionWaiter(AmqpSession^ parent, TimeSpan timeSpan, IntPtr future, AsyncCallback^ callback, Object^ state)
+{
+ this->qpidFuture = future;
+ this->asyncCallback = callback;
+ this->state = state;
+ this->parent = parent;
+ this->thisLock = gcnew Object();
+ // do this after the Completion Waiter is fully initialized, in case of
+ // very small timespan
+ if (timeSpan != TimeSpan::MaxValue) {
+ this->timer = gcnew Timer(timeoutCallback, this, timeSpan, TimeSpan::FromMilliseconds(-1));
+ }
+}
+
+
+void CompletionWaiter::WaitForCompletion()
+{
+ if (isCompleted)
+ return;
+
+ lock l(thisLock);
+ while (!isCompleted) {
+ Monitor::Wait(thisLock);
+ }
+}
+
+void CompletionWaiter::Run()
+{
+ // no locks required in this method
+ if (isCompleted)
+ return;
+
+ try {
+ // Wait for the arrival of the "AMQP Completion" indication from the Broker
+ parent->internalWaitForCompletion(qpidFuture);
+ }
+ catch (System::Exception^ e) {
+ runException = e;
+ }
+ finally {
+ delete(qpidFuture.ToPointer());
+ qpidFuture = (IntPtr) NULL;
+ }
+
+ if (timer != nullptr) {
+ timer->~Timer();
+ timer = nullptr;
+ }
+
+ Complete(false);
+}
+
+
+// "Complete" here means complete the AsyncResult, which may precede broker "command completion" if timed out
+
+void CompletionWaiter::Complete(bool isTimerThread)
+{
+ lock l(thisLock);
+ if (isCompleted)
+ return;
+
+ isCompleted = true;
+ if (isTimerThread)
+ timedOut = true;
+
+ Monitor::PulseAll(thisLock);
+
+ // do this check and signal while locked
+ if (asyncWaitHandle != nullptr)
+ asyncWaitHandle->Set();
+
+ l.release();
+
+ parent->removeWaiter(this);
+
+ if (asyncCallback != nullptr) {
+ // guard against application callback exception
+ try {
+ asyncCallback(this);
+ }
+ catch (System::Exception^) {
+ // log it?
+ }
+ }
+}
+
+
+void CompletionWaiter::TimeoutCallback(Object^ state)
+{
+ CompletionWaiter^ waiter = (CompletionWaiter^) state;
+ waiter->Complete(true);
+}
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h
new file mode 100644
index 0000000000..88880c3721
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h
@@ -0,0 +1,98 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+
+public ref class CompletionWaiter : IAsyncResult
+{
+private:
+ bool timedOut;
+ // has an owner thread
+ bool assigned;
+ System::Exception^ runException;
+ AsyncCallback^ asyncCallback;
+ Threading::Timer ^timer;
+ bool isCompleted;
+ Object^ state;
+ Object^ thisLock;
+ ManualResetEvent^ asyncWaitHandle;
+ AmqpSession^ parent;
+ IntPtr qpidFuture;
+ void Complete(bool isTimerThread);
+ static void TimeoutCallback(Object^ state);
+ static TimerCallback^ timeoutCallback = gcnew TimerCallback(CompletionWaiter::TimeoutCallback);
+
+ internal:
+ CompletionWaiter(AmqpSession^ parent, TimeSpan timeSpan, IntPtr future, AsyncCallback ^callback, Object^ state);
+
+ void Run();
+ void WaitForCompletion();
+
+ property bool Assigned {
+ bool get () { return assigned; }
+ }
+
+ property bool TimedOut {
+ bool get () { return timedOut; }
+ }
+
+
+ public:
+
+ virtual property bool IsCompleted {
+ bool get () { return isCompleted; }
+ }
+
+ virtual property bool CompletedSynchronously {
+ bool get () { return false; }
+ }
+
+ virtual property WaitHandle^ AsyncWaitHandle {
+ WaitHandle^ get () {
+ if (asyncWaitHandle != nullptr) {
+ return asyncWaitHandle;
+ }
+
+ msclr::lock l(thisLock);
+ if (asyncWaitHandle == nullptr) {
+ asyncWaitHandle = gcnew ManualResetEvent(isCompleted);
+ }
+ return asyncWaitHandle;
+ }
+ }
+
+
+ virtual property Object^ AsyncState {
+ Object^ get () { return state; }
+ }
+
+
+
+
+};
+
+}}} // namespace Apache::Qpid::Interop
+
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp b/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp
new file mode 100644
index 0000000000..6ea31f8401
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.cpp
@@ -0,0 +1,285 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+#include <transact.h>
+#include <xolehlp.h>
+#include <txdtc.h>
+#include <oletx2xa.h>
+#include <iostream>
+#include <fstream>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/framing/FrameSet.h"
+
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "DtxResourceManager.h"
+#include "XaTransaction.h"
+#include "QpidException.h"
+#include "QpidMarshal.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Transactions;
+using namespace msclr;
+
+
+/*
+ * There is one DtxResourceManager per broker and per application process.
+ *
+ * Each RM manages a collection of active XaTransaction objects. Participating AmqpSessions enlist
+ * (or re-enlist) with an XaTransaction indexed by the corresponding System.Transaction object. The
+ * RM maintains its own AmqpSession for sending 2PC commnds (dtxPrepare, dtxCommit etc.). The
+ * XaTransaction object works through the lifecycle of the Transaction, including prompting the
+ * enlisted sessions to send their delimiting dtxEnd commands.
+ *
+ * A separate DtcPlugin.cpp file provides the recovery logic when needed in a library named
+ * qpidxarm.dll. The MSDTC maintans recovery info in its log and tracks when there may be
+ * transactions in doubt. See the documentation for IDtcToXaHelperSinglePipe.
+ *
+ * To enable transaction support:
+ * DTC requires a registry key to find the plugin
+ * [HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\MSDTC\XADLL] qpidxarm.dll -> [path to qpidxarm.dll]
+ * DTC needs to be configured for XA
+ * cmdprompt -> dcomcnfg -> Component services -> My Computer -> DTC -> Local DTC -> right click properties -> Security -> Enable XA Transactions
+ *
+ */
+
+// TODO: provide shutdown mechanism, perhaps callback from Connection Idle for enlisted connections.
+// But note that a new RM registration with the DTC is very expensive.
+
+
+DtxResourceManager::DtxResourceManager(AmqpConnection^ appConnection) {
+ dtcComp = NULL;
+ xaHelperp = NULL;
+ rmCookie = 0;
+ doubtCount = 0;
+ tmDown = false;
+ AmqpConnection^ clonedCon = appConnection->Clone();
+ dtxControlSession = clonedCon->CreateSession();
+ dataSourceName = clonedCon->DataSourceName;
+ transactionMap = gcnew Collections::Generic::Dictionary<Transaction^, XaTransaction^>();
+
+ HRESULT hr;
+
+ try {
+ // instead of pinning this instance, just use tmp stack variables for small stuff
+ IUnknown* tmp = NULL;
+ // request the default DTC
+ hr = DtcGetTransactionManager(NULL, NULL, IID_IUnknown, 0, 0, 0, (void **)&tmp);
+ if (hr != S_OK)
+ throw gcnew QpidException("connection failure to DTC service");
+ dtcComp = tmp;
+
+ IDtcToXaHelperSinglePipe *tmp2 = NULL;
+ hr = ((IUnknown *)dtcComp)->QueryInterface(IID_IDtcToXaHelperSinglePipe, (void**) &tmp2);
+ if (hr != S_OK)
+ throw gcnew QpidException("DTC XA unavailable");
+ xaHelperp = tmp2;
+
+ std::string native_dsn = QpidMarshal::ToNative(dataSourceName);
+ DWORD tmp3;
+
+ // This call doesn't return until the DTC has opened and closed a connection to the broker
+ // and written a recovery entry in its log.
+ hr = ((IDtcToXaHelperSinglePipe *) xaHelperp)->XARMCreate(const_cast<char *>(native_dsn.c_str()), "qpidxarm.dll", &tmp3);
+ if (hr != S_OK) {
+ switch (hr) {
+ case E_FAIL:
+ throw gcnew QpidException("Resource Manager DLL configuration error");
+ case E_INVALIDARG:
+ throw gcnew QpidException("Resource Manager internal error");
+ case E_OUTOFMEMORY:
+ throw gcnew QpidException("Resource Manager out of memory");
+ case E_UNEXPECTED:
+ throw gcnew QpidException("Resource Manager internal failure");
+ case XACT_E_TMNOTAVAILABLE:
+ case XACT_E_CONNECTION_DOWN:
+ throw gcnew QpidException("MSDTC unavailable");
+
+ default:
+ throw gcnew QpidException("Resource Manager Registration failed");
+ }
+ }
+
+ rmCookie = tmp3;
+ }
+ finally {
+ if (rmCookie == 0) {
+ // undo partial construction
+ Cleanup();
+ }
+ }
+}
+
+
+DtxResourceManager::!DtxResourceManager() {
+ Cleanup();
+}
+
+
+DtxResourceManager::~DtxResourceManager() {
+ GC::SuppressFinalize(this);
+ Cleanup();
+}
+
+
+// Called when the DTC COM proxy sends TMDOWN to a pending XaTransaction
+// called once for each outstanding tx
+
+void DtxResourceManager::TmDown() {
+ // this block is the only place where both locks are held
+ lock l1(transactionMap);
+ lock l2(resourceManagerMap);
+ if (tmDown)
+ return;
+
+ tmDown = true;
+ resourceManagerMap->Remove(this->dataSourceName);
+ // defer cleanup until last TmDown notification received
+}
+
+
+
+void DtxResourceManager::Cleanup() {
+ for each (Collections::Generic::KeyValuePair<Transaction^, XaTransaction^> kvp in transactionMap) {
+ XaTransaction^ xaTr = kvp.Value;
+ xaTr->ChildFinalize();
+ }
+
+ try {
+ if (rmCookie != 0) {
+ // implies no recovery needed
+ bool cleanSession = (doubtCount == 0) && (transactionMap->Count == 0);
+ ((IDtcToXaHelperSinglePipe *)xaHelperp)->ReleaseRMCookie(rmCookie, cleanSession);
+ rmCookie = 0;
+ }
+
+
+ if (xaHelperp != NULL) {
+ ((IDtcToXaHelperSinglePipe *) xaHelperp)->Release();
+ xaHelperp = NULL;
+ }
+
+ if (dtcComp != NULL) {
+ ((IUnknown *) dtcComp)->Release();
+ dtcComp = NULL;
+ }
+
+ if (dtxControlSession != nullptr) {
+ dtxControlSession->Connection->Close();
+ }
+
+ }
+ catch (Exception^) {}
+}
+
+
+XaTransaction^ DtxResourceManager::GetXaTransaction(AmqpSession^ appSession, Transaction^ transaction) {
+ // find or create the RM instance associated with the session's broker
+ AmqpConnection^ connection = appSession->Connection;
+ DtxResourceManager^ instance = connection->CachedResourceManager;
+
+ // try cached rm first
+ if (instance != nullptr) {
+ XaTransaction^ xaTx = instance->InternalGetXaTransaction(appSession, transaction);
+ if (xaTx != nullptr)
+ return xaTx;
+ else {
+ // cached version no longer available, force new rm creation
+ connection->CachedResourceManager = nullptr;
+ }
+ }
+
+ lock l(resourceManagerMap);
+ String^ dsn = connection->DataSourceName;
+ if (!resourceManagerMap->TryGetValue(dsn, instance)) {
+ instance = gcnew DtxResourceManager(connection->Clone());
+ resourceManagerMap->Add(dsn, instance);
+ connection->CachedResourceManager = instance;
+ }
+ l.release();
+
+ return instance->InternalGetXaTransaction(appSession, transaction);
+}
+
+
+XaTransaction^ DtxResourceManager::InternalGetXaTransaction(AmqpSession^ appSession, Transaction^ transaction) {
+ // find or create the tx proxy instance associated with the DTC transaction
+ lock l(transactionMap);
+ if (tmDown)
+ return nullptr;
+
+ XaTransaction^ xaTransaction = nullptr;
+ if (!transactionMap->TryGetValue(transaction, xaTransaction)) {
+ xaTransaction = gcnew XaTransaction(transaction, (IDtcToXaHelperSinglePipe *) xaHelperp, rmCookie, this);
+ transactionMap->Add(transaction, xaTransaction);
+ }
+
+ return xaTransaction;
+}
+
+void DtxResourceManager::Complete(Transaction ^tx) {
+ lock l(transactionMap);
+ transactionMap->Remove(tx);
+
+ if (tmDown && (transactionMap->Count == 0)) {
+ // no more activity on this instance
+ GC::SuppressFinalize(this);
+ Cleanup();
+ }
+}
+
+
+void DtxResourceManager::IncrementDoubt() {
+ Interlocked::Increment(doubtCount);
+}
+
+
+void DtxResourceManager::DecrementDoubt() {
+ Interlocked::Decrement(doubtCount);
+}
+
+
+#ifdef QPID_RECOVERY_TEST_HOOK
+void DtxResourceManager::ForceRecovery(Transaction ^tx) {
+ lock l(resourceManagerMap);
+ for each (Collections::Generic::KeyValuePair<System::String^, DtxResourceManager^> kvp in resourceManagerMap) {
+
+ Collections::Generic::Dictionary<Transaction^, XaTransaction^>^ txmap = kvp.Value->transactionMap;
+ XaTransaction^ xaTransaction = nullptr;
+ lock l2(txmap);
+ if (txmap->TryGetValue(tx, xaTransaction)) {
+ xaTransaction->ForceRecovery();
+ }
+ }
+}
+#endif
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.h b/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.h
new file mode 100644
index 0000000000..7df491eec2
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/DtxResourceManager.h
@@ -0,0 +1,76 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace System::Transactions;
+
+ref class XaTransaction;
+
+public ref class DtxResourceManager
+{
+private:
+ // Receive() or WaitForMessage()
+ AmqpSession^ dtxControlSession;
+ String^ dataSourceName;
+ bool consumed;
+ DWORD rmCookie;
+ void* xaHelperp;
+ void* dtcComp;
+ int doubtCount;
+ DtxResourceManager(AmqpConnection^);
+ XaTransaction^ InternalGetXaTransaction (AmqpSession^ session, Transaction^ transaction);
+ bool tmDown;
+
+ // The active transactions
+ Collections::Generic::Dictionary<Transaction^, XaTransaction^>^ transactionMap;
+
+ // one resource manager per AMQP broker per process
+ static Collections::Generic::Dictionary<System::String^, DtxResourceManager^>^ resourceManagerMap =
+ gcnew Collections::Generic::Dictionary<System::String^, DtxResourceManager^>();
+
+ void Cleanup();
+ ~DtxResourceManager();
+ !DtxResourceManager();
+
+internal:
+ static XaTransaction^ GetXaTransaction (AmqpSession^ session, Transaction^ transaction);
+ void Complete(Transaction ^tx);
+ void TmDown();
+
+ property AmqpSession^ DtxControlSession {
+ AmqpSession^ get () { return dtxControlSession; }
+ }
+
+ void IncrementDoubt();
+ void DecrementDoubt();
+
+#ifdef QPID_RECOVERY_TEST_HOOK
+public:
+ static void ForceRecovery(Transaction ^tx);
+#endif
+};
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
new file mode 100644
index 0000000000..2b0119e338
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
@@ -0,0 +1,867 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/Demux.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+
+#include "MessageBodyStream.h"
+#include "AmqpMessage.h"
+#include "AmqpSession.h"
+#include "InputLink.h"
+#include "QpidMarshal.h"
+#include "QpidException.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Threading;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+using namespace Apache::Qpid::AmqpTypes;
+
+// Scalability note: When using async methods, an async helper thread is created
+// to block on the Demux BlockingQueue. This design should be revised in line
+// with proposed changes to the native library to reduce the number of servicing
+// threads for large numbers of subscriptions.
+
+// synchronization is accomplished with locks, but also by ensuring that only one
+// MessageWaiter (the one at the front of the line) is ever active.
+// async threads to watch for: Close/finalizer, Timers, SyncCredit and the native Dispatch
+// thread (who deposits FrameSets into the local queue and is oblivious to the
+// managed space locks).
+
+
+// The folowing def must match the "Frames" private typedef.
+// TODO, make Qpid-cpp "Frames" definition visible.
+typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames;
+
+InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue,
+ qpid::client::AsyncSession *qpidSessionp, qpid::client::SubscriptionManager *qpidSubsMgrp,
+ bool exclusive,
+ bool temporary, System::String^ filterKey, System::String^ exchange) :
+ amqpSession(session),
+ subscriptionp(NULL),
+ localQueuep(NULL),
+ queuePtrp(NULL),
+ dequeuedFrameSetpp(NULL),
+ disposed(false),
+ finalizing(false)
+{
+ bool success = false;
+ System::Exception^ linkException = nullptr;
+
+ waiters = gcnew Collections::Generic::List<MessageWaiter^>();
+ linkLock = waiters; // private and available
+ subscriptionLock = gcnew Object();
+ qpidAddress = QpidAddress::CreateAddress(sourceQueue, true);
+ qpidAddress->ResolveLink(session);
+ browsing = qpidAddress->Browsing;
+
+ try {
+ std::string qname = QpidMarshal::ToNative(qpidAddress->LinkName);
+
+ if (temporary) {
+ qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true);
+ qpidSessionp->exchangeBind(arg::exchange=QpidMarshal::ToNative(exchange),
+ arg::queue=qname, arg::bindingKey=QpidMarshal::ToNative(filterKey));
+ qpidSessionp->sync();
+ }
+
+ localQueuep = new LocalQueue;
+ SubscriptionSettings settings;
+ settings.flowControl = FlowControl::messageCredit(0);
+ settings.completionMode = CompletionMode::MANUAL_COMPLETION;
+
+ if (browsing) {
+ settings.acquireMode = AcquireMode::ACQUIRE_MODE_NOT_ACQUIRED;
+ settings.acceptMode = AcceptMode::ACCEPT_MODE_NONE;
+ }
+ else {
+ settings.acquireMode = AcquireMode::ACQUIRE_MODE_PRE_ACQUIRED;
+ settings.acceptMode = AcceptMode::ACCEPT_MODE_EXPLICIT;
+ }
+
+ Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings);
+ subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup
+
+ // the roundabout way to obtain localQueuep->queue
+ SessionBase_0_10Access sa(*qpidSessionp);
+ boost::shared_ptr<SessionImpl> simpl = sa.get();
+ queuePtrp = new Demux::QueuePtr(simpl->getDemux().get(sub.getName()));
+
+ success = true;
+ } finally {
+ if (!success) {
+ Cleanup();
+ linkException = gcnew QpidException ("InputLink creation failure");
+ throw linkException;
+ }
+ }
+}
+
+// called with lock held
+void InputLink::ReleaseNative()
+{
+ // involves talking to the Broker unless the connection is broken
+
+ if ((subscriptionp != NULL) && !finalizing) {
+ // TODO: find boost time error on cleanup when in finalizer thread
+ try {
+ subscriptionp->cancel();
+ }
+ catch (const std::exception& error) {
+ // TODO: log this properly
+ std::cout << "shutdown error " << error.what() << std::endl;
+ }
+ }
+
+ // free native mem (or smart pointers) that we own
+ if (subscriptionp != NULL) {
+ delete subscriptionp;
+ subscriptionp = NULL;
+ }
+ if (queuePtrp != NULL) {
+ delete queuePtrp;
+ queuePtrp = NULL;
+ }
+ if (localQueuep != NULL) {
+ if (!finalizing) {
+ // TODO: find boost time error on cleanup when in finalizer thread
+ delete localQueuep;
+ localQueuep = NULL;
+ }
+ }
+ if (dequeuedFrameSetpp != NULL) {
+ delete dequeuedFrameSetpp;
+ dequeuedFrameSetpp = NULL;
+ }
+}
+
+void InputLink::Cleanup()
+{
+ {
+ lock l(linkLock);
+ if (disposed)
+ return;
+
+ disposed = true;
+
+ // if the asyncHelper exists and is idle, unblock it
+ if (asyncHelperWaitHandle != nullptr) {
+ asyncHelperWaitHandle->Set();
+ }
+
+ // wakeup anyone waiting for messages
+ if (queuePtrp != NULL)
+ (*queuePtrp)->close();
+
+ // wait for any sync operations on the subscription to complete before ReleaseNative
+ lock l2(subscriptionLock);
+
+ try {}
+ finally
+ {
+ ReleaseNative();
+ }
+ }
+
+ // Now that subscription is torn down, we can execute pending delete on remote node
+ qpidAddress->CleanupLink(amqpSession);
+ amqpSession->NotifyClosed();
+}
+
+InputLink::~InputLink()
+{
+ Cleanup();
+}
+
+InputLink::!InputLink()
+{
+ finalizing = true;
+ Cleanup();
+}
+
+void InputLink::Close()
+{
+ // Simulate Dispose()...
+ Cleanup();
+ GC::SuppressFinalize(this);
+}
+
+// call with lock held
+bool InputLink::haveMessage()
+{
+ if (dequeuedFrameSetpp != NULL)
+ return true;
+
+ if (queuePtrp != NULL) {
+ if ((*queuePtrp)->size() > 0)
+ return true;
+ }
+ return false;
+}
+
+IntPtr InputLink::nextLocalMessage()
+{
+ lock l(linkLock);
+
+ if (disposed)
+ return (IntPtr) NULL;
+
+ // A message already pulled off BlockingQueue?
+ if (dequeuedFrameSetpp != NULL) {
+ QpidFrameSetPtr* rv = dequeuedFrameSetpp;
+ dequeuedFrameSetpp = NULL;
+ return (IntPtr) rv;
+ }
+
+ if ((*queuePtrp)->empty())
+ return (IntPtr) NULL;
+
+ bool received = false;
+ QpidFrameSetPtr* frameSetpp = new QpidFrameSetPtr;
+
+ try {
+ received = (*queuePtrp)->pop(*frameSetpp, qpid::sys::TIME_INFINITE);
+ if (received) {
+ QpidFrameSetPtr* rv = frameSetpp;
+ // no need to free native in finally block
+ frameSetpp = NULL;
+ return (IntPtr) rv;
+ }
+ } catch(const std::exception& error) {
+ // should be no async tampering with queue since we hold the lock and have a
+ // smart pointer ref to the native LocalQueue, even if the network connection fails...
+ cout << "unknown exception in InputLink.nextLocalMessage() " << error.what() <<endl;
+ // TODO: log this
+ }
+ finally {
+ if (frameSetpp != NULL) {
+ delete frameSetpp;
+ }
+ }
+
+ return (IntPtr) NULL;
+}
+
+
+
+void InputLink::unblockWaiter()
+{
+ // to be followed by resetQueue() below
+ lock l(linkLock);
+ if (disposed)
+ return;
+ (*queuePtrp)->close();
+}
+
+
+
+// Set things right after unblockWaiter(). Closing and opening a Qpid BlockingQueue unsticks
+// a blocking thread without interefering with queue contents or the ability to push
+// new incoming messages.
+
+void InputLink::resetQueue()
+{
+ lock l(linkLock);
+ if (disposed)
+ return;
+ if ((*queuePtrp)->isClosed()) {
+ (*queuePtrp)->open();
+ }
+}
+
+
+// returns true if there is a message to consume, i.e. nextLocalMessage() won't block
+
+bool InputLink::internalWaitForMessage()
+{
+ Demux::QueuePtr demuxQueuePtr;
+
+ bool received = false;
+ QpidFrameSetPtr* frameSetpp = NULL;
+ try {
+ lock l(linkLock);
+ if (disposed)
+ return false;
+ if (haveMessage())
+ return true;
+
+ AdjustCredit();
+
+ // get a scoped smart ptr ref to guard against async close or hangup
+ demuxQueuePtr = *queuePtrp;
+ frameSetpp = new QpidFrameSetPtr;
+
+ l.release();
+ // Async cleanup is now possible. Only use demuxQueuePtr until lock reacquired.
+ received = demuxQueuePtr->pop(*frameSetpp, qpid::sys::TIME_INFINITE);
+ l.acquire();
+
+ if (received) {
+ dequeuedFrameSetpp = frameSetpp;
+ frameSetpp = NULL; // native will eventually be freed in Cleanup or MessageBodyStream
+ }
+
+ return true;
+ } catch(const std::exception& ) {
+ // timeout or connection closed
+ return false;
+ }
+ finally {
+ if (frameSetpp != NULL) {
+ delete frameSetpp;
+ }
+ }
+
+ return false;
+}
+
+
+// call with lock held
+void InputLink::addWaiter(MessageWaiter^ waiter)
+{
+ waiters->Add(waiter);
+ if (waiters->Count == 1) {
+ // mark this waiter as ready to run
+ // Only the waiter at the head of the queue is active.
+ waiter->Activate();
+ }
+
+ if (waiter->Assigned)
+ return;
+
+ if (asyncHelperWaitHandle == nullptr) {
+ asyncHelperWaitHandle = gcnew ManualResetEvent(false);
+ ThreadStart^ threadDelegate = gcnew ThreadStart(this, &InputLink::asyncHelper);
+ (gcnew Thread(threadDelegate))->Start();
+ }
+
+ if (waiters->Count == 1) {
+ // wake up the asyncHelper
+ asyncHelperWaitHandle->Set();
+ }
+}
+
+
+void InputLink::removeWaiter(MessageWaiter^ waiter) {
+ // a waiter can be removed from anywhere in the list if timed out
+
+ lock l(linkLock);
+ int idx = waiters->IndexOf(waiter);
+ if (idx == -1) {
+ // TODO: assert or log
+ if (asyncHelperWaitHandle != nullptr) {
+ // just in case.
+ asyncHelperWaitHandle->Set();
+ }
+ return;
+ }
+
+ waiters->RemoveAt(idx);
+ if (waiter->TimedOut) {
+ // may have to give back message if it arrives momentarily
+ AdjustCredit();
+ }
+
+ // let the next waiter know it's his turn.
+ if (waiters->Count > 0) {
+ MessageWaiter^ nextWaiter = waiters[0];
+
+ // wakeup the asyncHelper thread to help out if necessary.
+ if (!nextWaiter->Assigned) {
+ asyncHelperWaitHandle->Set();
+ }
+
+ l.release();
+ nextWaiter->Activate();
+ return;
+ }
+ else {
+ if (disposed && (asyncHelperWaitHandle != nullptr)) {
+ asyncHelperWaitHandle->Set();
+ }
+ }
+}
+
+
+void InputLink::asyncHelper()
+{
+ lock l(linkLock);
+
+ while (true) {
+ if (disposed && (waiters->Count == 0)) {
+ asyncHelperWaitHandle = nullptr;
+ return;
+ }
+
+ if (waiters->Count > 0) {
+ MessageWaiter^ waiter = waiters[0];
+
+ l.release();
+ if (waiter->AcceptForWork()) {
+ waiter->Run();
+ }
+ l.acquire();
+ }
+
+ // sleep if more work may be coming or it is currently someone else's turn
+ if (((waiters->Count == 0) && !disposed) || ((waiters->Count != 0) && waiters[0]->Assigned)) {
+ // wait for something to do
+ asyncHelperWaitHandle->Reset();
+ l.release();
+ asyncHelperWaitHandle->WaitOne();
+ l.acquire();
+ }
+ }
+}
+
+void InputLink::sync()
+{
+ // used by the MessageWaiter timeout thread to not run before fully initialized
+ lock l(linkLock);
+}
+
+
+void InputLink::PrefetchLimit::set(int value)
+{
+ lock l(linkLock);
+ prefetchLimit = value;
+
+ int delta = 0;
+
+ // rough rule of thumb to keep the flow, but reduce chatter.
+ // for small messages, the credit request is almost as expensive as the transfer itself.
+ // experience may suggest a better heuristic or require a property for the low water mark
+ if (prefetchLimit >= 3) {
+ delta = prefetchLimit / 3;
+ }
+ minWorkingCredit = prefetchLimit - delta;
+ AdjustCredit();
+}
+
+
+// call with lock held
+void InputLink::AdjustCredit()
+{
+ if (creditSyncPending || disposed)
+ return;
+
+ // low watermark check
+ if ((prefetchLimit != 0) &&
+ (workingCredit >= minWorkingCredit) &&
+ (workingCredit >= waiters->Count))
+ return;
+
+ // should have enough for all waiters or to satisfy the prefetch window
+ int targetCredit = waiters->Count;
+ if (targetCredit < prefetchLimit)
+ targetCredit = prefetchLimit;
+
+ if (targetCredit > workingCredit) {
+ subscriptionp->grantMessageCredit(targetCredit - workingCredit);
+ workingCredit = targetCredit;
+ return;
+ }
+ if (targetCredit < workingCredit) {
+ if ((targetCredit == 0) && (prefetchLimit == 0)) {
+ creditSyncPending = true;
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &InputLink::SyncCredit));
+ }
+ // TODO: also shrink credit when prefetchLimit != 0
+ }
+}
+
+void InputLink::SyncCredit(Object ^unused)
+{
+ lock l(linkLock);
+
+ try {
+ if (disposed)
+ return;
+
+ if (!amqpSession->MessageStop(subscriptionp->getName())) {
+ // connection closed
+ return;
+ }
+
+ l.release();
+ // use setFlowControl to re-enable credit flow on the broker.
+ // setFlowControl is a sync operation
+ {
+ lock l2(subscriptionLock);
+ if (subscriptionp != NULL) {
+ subscriptionp->setFlowControl(subscriptionp->getSettings().flowControl);
+ }
+ }
+ l.acquire();
+
+ if (disposed)
+ return;
+
+ // let existing waiters use up any messages that arrived.
+ // local queue size can only decrease until more credit is issued
+ while (true) {
+ if ((waiters->Count > 0) && ((*queuePtrp)->size() > 0)) {
+ l.release();
+ // a rare use case and not used in performance oriented code.
+ // optimization can wait until the qpid/messaging api is used
+ Thread::Sleep(10);
+ l.acquire();
+ if (disposed)
+ return;
+ }
+ else {
+ break;
+ }
+ }
+
+ // At this point, the lock is held and we are fully synced with the broker
+ // so we have a valid snapshot
+
+ if ((prefetchLimit == 0) && ((*queuePtrp)->size() > 0)) {
+ // can't be sure application will request a message again any time soon
+ QpidFrameSetPtr frameSetp;
+ while (!(*queuePtrp)->empty()) {
+ (*queuePtrp)->pop(frameSetp);
+ SequenceSet frameSetID(frameSetp->getId());
+ subscriptionp->release(frameSetID);
+ }
+
+ // don't touch dequeuedFrameSetpp. It is spoken for: explicitely from a
+ // MessageWaiter about to to get the nextLocalMessage(), or implicitely
+ // from a WaitForMessage().
+ }
+ // TODO: if prefetchLimit != 0, release messages from back of the queue that exceed targetCredit
+
+ workingCredit = (*queuePtrp)->size();
+ if (dequeuedFrameSetpp != NULL) {
+ workingCredit++;
+ }
+ }
+ finally {
+ creditSyncPending = false;
+ }
+
+ AdjustCredit();
+}
+
+
+AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
+{
+ QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer();
+ bool ownFrameSet = true;
+ bool haveProperties = false;
+
+ try {
+ MessageBodyStream^ mstream = gcnew MessageBodyStream(fspp);
+ ownFrameSet = false; // stream releases on close/dispose
+
+ AmqpMessage^ amqpMessage = gcnew AmqpMessage(mstream);
+
+ AMQHeaderBody* headerBodyp = (*fspp)->getHeaders();
+ uint64_t contentSize = (*fspp)->getContentSize();
+ SequenceSet frameSetID((*fspp)->getId());
+
+ // target managed representation
+ AmqpProperties^ amqpProperties = gcnew AmqpProperties();
+
+ // source native representation
+ const DeliveryProperties* deliveryProperties = headerBodyp->get<DeliveryProperties>();
+ const qpid::framing::MessageProperties* messageProperties = headerBodyp->get<qpid::framing::MessageProperties>();
+
+ if (deliveryProperties) {
+ if (deliveryProperties->hasRoutingKey()) {
+ haveProperties = true;
+
+ amqpProperties->RoutingKey = gcnew String(deliveryProperties->getRoutingKey().c_str());
+ }
+
+ if (deliveryProperties->hasDeliveryMode()) {
+ if (deliveryProperties->getDeliveryMode() == qpid::framing::PERSISTENT)
+ amqpProperties->Durable = true;
+ }
+
+ if (deliveryProperties->hasTtl()) {
+ long long ticks = deliveryProperties->getTtl() * TimeSpan::TicksPerMillisecond;
+ amqpProperties->TimeToLive = Nullable<TimeSpan>(TimeSpan::FromTicks(ticks));
+ }
+ }
+
+ if (messageProperties) {
+
+ if (messageProperties->hasReplyTo()) {
+ haveProperties = true;
+ const ReplyTo& rpto = messageProperties->getReplyTo();
+ String^ rk = nullptr;
+ String^ ex = nullptr;
+ if (rpto.hasRoutingKey()) {
+ rk = gcnew String(rpto.getRoutingKey().c_str());
+ }
+ if (rpto.hasExchange()) {
+ ex = gcnew String(rpto.getExchange().c_str());
+ }
+ amqpProperties->SetReplyTo(ex,rk);
+ }
+
+ if (messageProperties->hasContentType()) {
+ haveProperties = true;
+ amqpProperties->ContentType = gcnew String(messageProperties->getContentType().c_str());
+
+ if (messageProperties->hasContentEncoding()) {
+ String^ enc = gcnew String(messageProperties->getContentEncoding().c_str());
+ if (!String::IsNullOrEmpty(enc)) {
+ // TODO: properly assemble 1.0 style to 0-10 for all cases
+ amqpProperties->ContentType += "; charset=" + enc;
+ }
+ }
+ }
+
+ if (messageProperties->hasCorrelationId()) {
+ haveProperties = true;
+ const std::string& ncid = messageProperties->getCorrelationId();
+ int len = ncid.size();
+ array<unsigned char>^ mcid = gcnew array<unsigned char>(len);
+ Marshal::Copy ((IntPtr) (void *) ncid.data(), mcid, 0, len);
+ amqpProperties->CorrelationId = mcid;
+ }
+
+ if (messageProperties->hasUserId()) {
+ haveProperties = true;
+ const std::string& nuid = messageProperties->getUserId();
+ int len = nuid.size();
+ array<unsigned char>^ muid = gcnew array<unsigned char>(len);
+ Marshal::Copy ((IntPtr) (void *) nuid.data(), muid, 0, len);
+ amqpProperties->UserId = muid;
+ }
+
+ if (messageProperties->hasApplicationHeaders()) {
+ haveProperties = true;
+ const qpid::framing::FieldTable& fieldTable = messageProperties->getApplicationHeaders();
+ int count = fieldTable.count();
+
+ if (count > 0) {
+ haveProperties = true;
+ Collections::Generic::Dictionary<System::String^, AmqpType^>^ mmap =
+ gcnew Collections::Generic::Dictionary<System::String^, AmqpType^>(count);
+
+ for(qpid::framing::FieldTable::ValueMap::const_iterator i = fieldTable.begin(); i != fieldTable.end(); i++) {
+
+ qpid::framing::FieldValue::Data &data = i->second->getData();
+
+ // TODO: replace these generic int/string conversions with handler for each AMQP specific type:
+ // uint8_t dataType = i->second->getType();
+ // switch (dataType) { case TYPE_CODE_STR8: ... }
+
+ if (data.convertsToInt()) {
+ mmap->Add (gcnew String(i->first.data()), gcnew AmqpInt((int) i->second->getData().getInt()));
+ }
+ if (data.convertsToString()) {
+ std::string ns = data.getString();
+ String^ ms = gcnew String(ns.data(), 0, ns.size());
+ mmap->Add (gcnew String(i->first.data()), gcnew AmqpString(ms));
+ }
+ }
+
+ amqpProperties->PropertyMap = mmap;
+ }
+
+ }
+ }
+
+ if (haveProperties) {
+ amqpMessage->Properties = amqpProperties;
+ }
+
+ // We have a message we can return to the caller.
+ // Tell the broker we got it.
+
+ // subscriptionp->accept(frameSetID) is a slow sync operation in the native API
+ // so do it within the AsyncSession directly
+ amqpSession->AcceptAndComplete(frameSetID, browsing);
+
+ workingCredit--;
+ // check if more messages need to be requested from broker
+ AdjustCredit();
+
+ return amqpMessage;
+ }
+ finally {
+ if (ownFrameSet)
+ delete (fspp);
+ }
+}
+
+ // As for IInputChannel:
+ // if success, return true + amqpMessage
+ // elseif timeout, return false
+ // elseif closed/EOF, return true and amqpMessage = null
+ // else throw an Exception
+
+bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage)
+{
+ lock l(linkLock);
+
+ if (waiters->Count == 0) {
+ // see if there is a message already available without blocking
+ IntPtr fspp = nextLocalMessage();
+ if (fspp.ToPointer() != NULL) {
+ amqpMessage = createAmqpMessage(fspp);
+ return true;
+ }
+ }
+
+ MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, false, nullptr, nullptr);
+ addWaiter(waiter);
+
+ l.release();
+ waiter->Run();
+ l.acquire();
+
+ if (waiter->TimedOut) {
+ return false;
+ }
+
+ IntPtr waiterMsg = waiter->Message;
+ if (waiterMsg.ToPointer() == NULL) {
+ if (disposed) {
+ // indicate normal EOF on channel
+ amqpMessage = nullptr;
+ return true;
+ }
+ }
+
+ amqpMessage = createAmqpMessage(waiterMsg);
+ return true;
+}
+
+IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state)
+{
+
+ //TODO: if haveMessage() complete synchronously
+
+ lock l(linkLock);
+ MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, true, callback, state);
+ addWaiter(waiter);
+ return waiter;
+}
+
+bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage)
+{
+
+ // TODO: validate result
+
+ MessageWaiter^ waiter = (MessageWaiter ^) result;
+
+ waiter->WaitForCompletion();
+
+ if (waiter->RunException != nullptr)
+ throw waiter->RunException;
+
+ if (waiter->TimedOut) {
+ amqpMessage = nullptr;
+ return false;
+ }
+
+ IntPtr waiterMsg = waiter->Message;
+ if (waiterMsg.ToPointer() == NULL) {
+ if (disposed) {
+ // indicate normal EOF on channel
+ amqpMessage = nullptr;
+ return true;
+ }
+ }
+
+ amqpMessage = createAmqpMessage(waiterMsg);
+ return true;
+}
+
+
+bool InputLink::WaitForMessage(TimeSpan timeout)
+{
+ lock l(linkLock);
+
+ if (disposed)
+ return false;
+
+ if (waiters->Count == 0) {
+ // see if there is a message already available without blocking
+ if (haveMessage())
+ return true;
+ }
+
+ // Same as for TryReceive, except consuming = false
+ MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, false, nullptr, nullptr);
+ addWaiter(waiter);
+
+ l.release();
+ waiter->Run();
+ l.acquire();
+
+ if (waiter->TimedOut) {
+ return false;
+ }
+
+ return haveMessage();
+}
+
+IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state)
+{
+ lock l(linkLock);
+
+ // Same as for BeginTryReceive, except consuming = false
+ MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, true, callback, state);
+ addWaiter(waiter);
+ return waiter;
+}
+
+bool InputLink::EndWaitForMessage(IAsyncResult^ result)
+{
+ MessageWaiter^ waiter = (MessageWaiter ^) result;
+
+ waiter->WaitForCompletion();
+
+ if (waiter->TimedOut) {
+ return false;
+ }
+
+ return haveMessage();
+}
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
new file mode 100644
index 0000000000..136d53d280
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
@@ -0,0 +1,110 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+#include "MessageWaiter.h"
+#include "QpidAddress.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace System::Runtime::InteropServices;
+
+using namespace qpid::client;
+using namespace std;
+
+// smart pointer to the low level AMQP 0-10 frames of the message
+typedef qpid::framing::FrameSet::shared_ptr QpidFrameSetPtr;
+
+public ref class InputLink
+{
+private:
+ AmqpSession^ amqpSession;
+ Subscription* subscriptionp;
+ LocalQueue* localQueuep;
+ Demux::QueuePtr* queuePtrp;
+ Collections::Generic::List<MessageWaiter^>^ waiters;
+ bool disposed;
+ bool finalizing;
+ Object^ linkLock;
+ Object^ subscriptionLock;
+ QpidFrameSetPtr* dequeuedFrameSetpp;
+ ManualResetEvent^ asyncHelperWaitHandle;
+ // number of messages to buffer locally for future consumption
+ int prefetchLimit;
+ // the number of messages requested and not yet processed
+ int workingCredit;
+ // stopping and restarting the message flow
+ bool creditSyncPending;
+ // working credit low water mark
+ int minWorkingCredit;
+
+ bool browsing;
+ QpidAddress^ qpidAddress;
+
+ void Cleanup();
+ void ReleaseNative();
+ bool haveMessage();
+ void addWaiter(MessageWaiter^ waiter);
+ void asyncHelper();
+ AmqpMessage^ createAmqpMessage(IntPtr msgp);
+ void AdjustCredit();
+ void SyncCredit(Object ^);
+
+internal:
+ InputLink(AmqpSession^ session, System::String^ sourceQueue, qpid::client::AsyncSession *qpidSessionp,
+ qpid::client::SubscriptionManager *qpidSubsMgrp, bool exclusive, bool temporary, System::String^ filterKey,
+ System::String^ exchange);
+
+ bool internalWaitForMessage();
+ void unblockWaiter();
+ void resetQueue();
+ IntPtr nextLocalMessage();
+ void removeWaiter(MessageWaiter^ waiter);
+ void sync();
+
+public:
+ ~InputLink();
+ !InputLink();
+ void Close();
+
+ bool TryReceive(TimeSpan timeout, [Out] AmqpMessage ^% amqpMessage);
+ IAsyncResult^ BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state);
+ bool EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage);
+
+ bool WaitForMessage(TimeSpan timeout);
+ IAsyncResult^ BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state);
+ bool EndWaitForMessage(IAsyncResult^ result);
+
+ property int PrefetchLimit {
+ int get () { return prefetchLimit; }
+ void set (int value);
+ }
+
+ property bool Browsing {
+ bool get () { return browsing; }
+ }
+
+};
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
new file mode 100644
index 0000000000..fe288cbe76
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
@@ -0,0 +1,501 @@
+<?xml version="1.0" encoding="Windows-1252"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<VisualStudioProject
+ ProjectType="Visual C++"
+ Version="9.00"
+ Name="Interop"
+ ProjectGUID="{C9B6AC75-6332-47A4-B82B-0C20E0AF2D34}"
+ RootNamespace="Interop"
+ Keyword="ManagedCProj"
+ TargetFrameworkVersion="196613"
+ >
+ <Platforms>
+ <Platform
+ Name="Win32"
+ />
+ <Platform
+ Name="x64"
+ />
+ </Platforms>
+ <ToolFiles>
+ </ToolFiles>
+ <Configurations>
+ <Configuration
+ Name="Debug|Win32"
+ OutputDirectory="$(ProjectDir)$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)"
+ ConfigurationType="2"
+ CharacterSet="1"
+ ManagedExtensions="1"
+ WholeProgramOptimization="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ CommandLine="copy ..\AmqpTypes\obj\$(ConfigurationName)\Apache.Qpid.AmqpTypes.netmodule $(ConfigurationName)"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ AdditionalOptions=" /Zm1000 /wd4244 /wd4800 /wd4355 /FU $(ConfigurationName)\Apache.Qpid.AmqpTypes.netmodule"
+ Optimization="0"
+ AdditionalIncludeDirectories="&quot;$(QPID_BUILD_ROOT)\include&quot;;&quot;$(QPID_BUILD_ROOT)\src&quot;;..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;&quot;$(BOOST_ROOT)&quot;"
+ PreprocessorDefinitions="WIN32,_WINDOWS,_DEBUG,BOOST_ALL_DYN_LINK,_CRT_NONSTDC_NO_WARNINGS,NOMINMAX,WIN32_LEAN_AND_MEAN,_SCL_SECURE_NO_WARNINGS,HAVE_CONFIG_H"
+ RuntimeLibrary="3"
+ UsePrecompiledHeader="0"
+ WarningLevel="3"
+ DebugInformationFormat="3"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalOptions=" /STACK:10000000 /machine:I386 $(ConfigurationName)\Apache.Qpid.AmqpTypes.netmodule"
+ AdditionalDependencies="$(NOINHERIT) kernel32.lib user32.lib gdi32.lib winspool.lib shell32.lib ole32.lib oleaut32.lib uuid.lib comdlg32.lib advapi32.lib xolehlp.lib $(QPID_BUILD_ROOT)\src\Debug\qpidclientd.lib $(QPID_BUILD_ROOT)\src\Debug\qpidcommond.lib rpcrt4.lib ws2_32.lib "
+ OutputFile="$(OutDir)\Apache.Qpid.Interop.dll"
+ LinkIncremental="2"
+ AdditionalLibraryDirectories="&quot;$(BOOST_ROOT)\lib&quot;"
+ GenerateDebugInformation="true"
+ AssemblyDebug="1"
+ TargetMachine="1"
+ KeyFile="$(SolutionDir)\src\wcfnet.snk"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ <Configuration
+ Name="Release|Win32"
+ OutputDirectory="$(ProjectDir)$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)"
+ ConfigurationType="2"
+ CharacterSet="1"
+ ManagedExtensions="1"
+ WholeProgramOptimization="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ CommandLine="copy ..\AmqpTypes\obj\$(ConfigurationName)\Apache.Qpid.AmqpTypes.netmodule $(ConfigurationName)"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ AdditionalOptions=" /Zm1000 /wd4244 /wd4800 /wd4355 /FU $(ConfigurationName)\Apache.Qpid.AmqpTypes.netmodule"
+ AdditionalIncludeDirectories="&quot;$(QPID_BUILD_ROOT)\include&quot;;&quot;$(QPID_BUILD_ROOT)\src&quot;;..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;&quot;$(BOOST_ROOT)&quot;"
+ PreprocessorDefinitions="WIN32,_WINDOWS,NDEBUG,BOOST_ALL_DYN_LINK,_CRT_NONSTDC_NO_WARNINGS,NOMINMAX,WIN32_LEAN_AND_MEAN,_SCL_SECURE_NO_WARNINGS,HAVE_CONFIG_H"
+ Optimization="2"
+ RuntimeLibrary="2"
+ UsePrecompiledHeader="0"
+ WarningLevel="3"
+ DebugInformationFormat="3"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalOptions=" /STACK:10000000 /machine:I386 $(ConfigurationName)\Apache.Qpid.AmqpTypes.netmodule"
+ AdditionalDependencies="$(NOINHERIT) kernel32.lib user32.lib gdi32.lib winspool.lib shell32.lib ole32.lib oleaut32.lib uuid.lib comdlg32.lib advapi32.lib xolehlp.lib $(QPID_BUILD_ROOT)\src\Release\qpidclient.lib $(QPID_BUILD_ROOT)\src\Release\qpidcommon.lib rpcrt4.lib ws2_32.lib "
+ LinkIncremental="2"
+ OutputFile="$(OutDir)\Apache.Qpid.Interop.dll"
+ AdditionalLibraryDirectories="&quot;$(BOOST_ROOT)\lib&quot;"
+ KeyFile="$(SolutionDir)\src\wcfnet.snk"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ <Configuration
+ Name="Debug|x64"
+ OutputDirectory="$(ProjectDir)$(PlatformName)\$(ConfigurationName)"
+ IntermediateDirectory="$(PlatformName)\$(ConfigurationName)"
+ ConfigurationType="2"
+ CharacterSet="1"
+ ManagedExtensions="1"
+ WholeProgramOptimization="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ CommandLine="copy ..\AmqpTypes\obj\$(ConfigurationName)\Apache.Qpid.AmqpTypes.netmodule $(PlatformName)\$(ConfigurationName)"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ TargetEnvironment="3"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ AdditionalOptions=" /Zm1000 /wd4244 /wd4800 /wd4355 /FU $(PlatformName)\$(ConfigurationName)\Apache.Qpid.AmqpTypes.netmodule"
+ Optimization="0"
+ AdditionalIncludeDirectories="&quot;$(QPID_BUILD_ROOT)\include&quot;;&quot;$(QPID_BUILD_ROOT)\src&quot;;..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;&quot;$(BOOST_ROOT)&quot;"
+ PreprocessorDefinitions="WIN32,_WINDOWS,_DEBUG,BOOST_ALL_DYN_LINK,_CRT_NONSTDC_NO_WARNINGS,NOMINMAX,WIN32_LEAN_AND_MEAN,_SCL_SECURE_NO_WARNINGS,HAVE_CONFIG_H"
+ RuntimeLibrary="3"
+ UsePrecompiledHeader="0"
+ WarningLevel="3"
+ DebugInformationFormat="3"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalOptions=" /STACK:10000000 /machine:x64 /debug $(PlatformName)\$(ConfigurationName)\Apache.Qpid.AmqpTypes.netmodule"
+ AdditionalDependencies="$(NoInherit) kernel32.lib user32.lib gdi32.lib winspool.lib shell32.lib ole32.lib oleaut32.lib uuid.lib comdlg32.lib advapi32.lib rpcrt4.lib ws2_32.lib xolehlp.lib $(QPID_BUILD_ROOT)\src\Debug\qpidcommond.lib $(QPID_BUILD_ROOT)\src\Debug\qpidclientd.lib"
+ OutputFile="$(OutDir)\Apache.Qpid.Interop.dll"
+ LinkIncremental="2"
+ AdditionalLibraryDirectories="&quot;$(BOOST_ROOT)\lib&quot;"
+ GenerateDebugInformation="true"
+ AssemblyDebug="1"
+ TargetMachine="17"
+ KeyFile="$(SolutionDir)\src\wcfnet.snk"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ <Configuration
+ Name="Release|x64"
+ OutputDirectory="$(ProjectDir)$(PlatformName)\$(ConfigurationName)"
+ IntermediateDirectory="$(PlatformName)\$(ConfigurationName)"
+ ConfigurationType="2"
+ CharacterSet="1"
+ ManagedExtensions="1"
+ WholeProgramOptimization="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ CommandLine="copy ..\AmqpTypes\obj\$(ConfigurationName)\Apache.Qpid.AmqpTypes.netmodule $(PlatformName)\$(ConfigurationName)"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ TargetEnvironment="3"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ AdditionalOptions=" /Zm1000 /wd4244 /wd4800 /wd4355 /FU $(PlatformName)\$(ConfigurationName)\Apache.Qpid.AmqpTypes.netmodule"
+ AdditionalIncludeDirectories="&quot;$(QPID_BUILD_ROOT)\include&quot;;&quot;$(QPID_BUILD_ROOT)\src&quot;;..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;&quot;$(BOOST_ROOT)&quot;"
+ PreprocessorDefinitions="WIN32,_WINDOWS,NDEBUG,BOOST_ALL_DYN_LINK,_CRT_NONSTDC_NO_WARNINGS,NOMINMAX,WIN32_LEAN_AND_MEAN,_SCL_SECURE_NO_WARNINGS,HAVE_CONFIG_H"
+ InlineFunctionExpansion="2"
+ Optimization="2"
+ RuntimeLibrary="2"
+ UsePrecompiledHeader="0"
+ WarningLevel="3"
+ DebugInformationFormat="3"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalOptions=" /STACK:10000000 /machine:x64 $(PlatformName)\$(ConfigurationName)\Apache.Qpid.AmqpTypes.netmodule"
+ AdditionalDependencies="$(NoInherit) kernel32.lib user32.lib gdi32.lib winspool.lib shell32.lib ole32.lib oleaut32.lib uuid.lib comdlg32.lib advapi32.lib xolehlp.lib $(QPID_BUILD_ROOT)\src\Release\qpidclient.lib $(QPID_BUILD_ROOT)\src\Release\qpidcommon.lib rpcrt4.lib ws2_32.lib"
+ LinkIncremental="2"
+ OutputFile="$(OutDir)\Apache.Qpid.Interop.dll"
+ AdditionalLibraryDirectories="&quot;$(BOOST_ROOT)\lib&quot;"
+ KeyFile="$(SolutionDir)\src\wcfnet.snk"
+ TargetMachine="17"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ </Configurations>
+ <References>
+ <AssemblyReference
+ RelativePath="System.dll"
+ AssemblyName="System, Version=2.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL"
+ MinFrameworkVersion="131072"
+ />
+ <AssemblyReference
+ RelativePath="System.Data.dll"
+ AssemblyName="System.Data, Version=2.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=x86"
+ MinFrameworkVersion="131072"
+ />
+ <AssemblyReference
+ RelativePath="System.XML.dll"
+ AssemblyName="System.Xml, Version=2.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL"
+ MinFrameworkVersion="131072"
+ />
+ <AssemblyReference
+ RelativePath="System.Runtime.Serialization.dll"
+ AssemblyName="System.Runtime.Serialization, Version=3.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL"
+ MinFrameworkVersion="196608"
+ />
+ <AssemblyReference
+ RelativePath="System.Transactions.dll"
+ AssemblyName="System.Transactions, Version=2.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=x86"
+ MinFrameworkVersion="131072"
+ />
+ </References>
+ <Files>
+ <Filter
+ Name="Source Files"
+ Filter="cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx"
+ UniqueIdentifier="{4FC737F1-C7A5-4376-A066-2A32D752A2FF}"
+ >
+ <File
+ RelativePath=".\AmqpConnection.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\AmqpMessage.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\AmqpSession.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\AssemblyInfo.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\CompletionWaiter.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\QpidAddress.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\InputLink.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\MessageBodyStream.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\MessageWaiter.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\OutputLink.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\DtxResourceManager.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\XaTransaction.cpp"
+ >
+ </File>
+ </Filter>
+ <Filter
+ Name="Header Files"
+ Filter="h;hpp;hxx;hm;inl;inc;xsd"
+ UniqueIdentifier="{93995380-89BD-4b04-88EB-625FBE52EBFB}"
+ >
+ <File
+ RelativePath=".\AmqpConnection.h"
+ >
+ </File>
+ <File
+ RelativePath=".\AmqpMessage.h"
+ >
+ </File>
+ <File
+ RelativePath=".\AmqpSession.h"
+ >
+ </File>
+ <File
+ RelativePath=".\CompletionWaiter.h"
+ >
+ </File>
+ <File
+ RelativePath=".\QpidAddress.h"
+ >
+ </File>
+ <File
+ RelativePath=".\InputLink.h"
+ >
+ </File>
+ <File
+ RelativePath=".\MessageBodyStream.h"
+ >
+ </File>
+ <File
+ RelativePath=".\MessageWaiter.h"
+ >
+ </File>
+ <File
+ RelativePath=".\OutputLink.h"
+ >
+ </File>
+ <File
+ RelativePath=".\QpidException.h"
+ >
+ </File>
+ <File
+ RelativePath=".\QpidMarshal.h"
+ >
+ </File>
+ <File
+ RelativePath=".\DtxResourceManager.h"
+ >
+ </File>
+ <File
+ RelativePath=".\XaTransaction.h"
+ >
+ </File>
+ </Filter>
+ </Files>
+ <Globals>
+ </Globals>
+</VisualStudioProject>
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp
new file mode 100644
index 0000000000..f2cb5740d3
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp
@@ -0,0 +1,337 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/AMQFrame.h"
+
+#include "MessageBodyStream.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Threading;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+// Thefolowing def must match "Frames" private typedef.
+// TODO: make "Frames" publicly visible.
+typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames;
+
+using namespace std;
+
+static void ThrowIfBadArgs (array<unsigned char>^ buffer, int offset, int count)
+{
+ if (buffer == nullptr)
+ throw gcnew ArgumentNullException("buffer");
+
+ if (offset < 0)
+ throw gcnew ArgumentOutOfRangeException("offset");
+
+ if (count < 0)
+ throw gcnew ArgumentOutOfRangeException("count");
+
+ if ((offset + count) > buffer->Length)
+ throw gcnew ArgumentException("offset + count");
+}
+
+
+// Input stream constructor
+
+MessageBodyStream::MessageBodyStream(FrameSet::shared_ptr *fspp)
+{
+ isInputStream = true;
+ frameSetpp = fspp;
+ fragmentCount = 0;
+ length = 0;
+ position = 0;
+ currentFramep = NULL;
+
+ const std::string *datap; // pointer to the fragment's string variable that holds the content
+
+ for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) {
+ if (i->getBody()->type() == CONTENT_BODY) {
+ fragmentCount++;
+ datap = &(i->castBody<AMQContentBody>()->getData());
+ length += datap->size();
+ }
+ }
+
+ // fragmentCount can be zero for an empty message
+
+ fragmentIndex = 0;
+ fragmentPosition = 0;
+
+ if (fragmentCount == 0) {
+ currentFragment = NULL;
+ fragmentLength = 0;
+ }
+ else if (fragmentCount == 1) {
+ currentFragment = datap->data();
+ fragmentLength = (int) length;
+ }
+ else {
+ fragments = gcnew array<IntPtr>(fragmentCount);
+ fragmentIndex = 0;
+ for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) {
+ if (i->getBody()->type() == CONTENT_BODY) {
+ datap = &(i->castBody<AMQContentBody>()->getData());
+ fragments[fragmentIndex++] = (IntPtr) (void *) datap;
+ }
+ }
+ fragmentIndex = 0;
+ datap = (const std::string *) fragments[0].ToPointer();
+ currentFragment = datap->data();
+ fragmentLength = datap->size();
+ }
+}
+
+
+int MessageBodyStream::Read(array<unsigned char>^ buffer, int offset, int count)
+{
+ if (!isInputStream)
+ throw gcnew NotSupportedException();
+ if (disposed)
+ throw gcnew ObjectDisposedException("Stream");
+ if (count == 0)
+ return 0;
+ ThrowIfBadArgs(buffer, offset, count);
+
+ int nRead = 0;
+ int remaining = count;
+
+ while (nRead < count) {
+ int fragAvail = fragmentLength - fragmentPosition;
+ int copyCount = min (fragAvail, remaining);
+ if (copyCount == 0) {
+ // no more to read
+ return nRead;
+ }
+
+ // copy from native space
+ IntPtr nativep = (IntPtr) (void *) (currentFragment + fragmentPosition);
+ Marshal::Copy (nativep, buffer, offset, copyCount);
+ nRead += copyCount;
+ remaining -= copyCount;
+ fragmentPosition += copyCount;
+ offset += copyCount;
+
+ // advance to next fragment?
+ if (fragmentPosition == fragmentLength) {
+ if (++fragmentIndex < fragmentCount) {
+ const std::string *datap = (const std::string *) fragments[fragmentIndex].ToPointer();
+ currentFragment = datap->data();
+ fragmentLength = datap->size();
+ fragmentPosition = 0;
+ }
+ }
+ }
+
+ return nRead;
+}
+
+
+void MessageBodyStream::pushCurrentFrame(bool lastFrame)
+{
+ // set flags as in SessionImpl::sendContent.
+ if (currentFramep->getBody()->type() == CONTENT_BODY) {
+
+ if ((fragmentCount == 1) && lastFrame) {
+ // only one content frame
+ currentFramep->setFirstSegment(false);
+ }
+ else {
+ currentFramep->setFirstSegment(false);
+ currentFramep->setLastSegment(true);
+ if (fragmentCount != 1) {
+ currentFramep->setFirstFrame(false);
+ }
+ if (!lastFrame) {
+ currentFramep->setLastFrame(false);
+ }
+ }
+ }
+ else {
+ // the header frame
+ currentFramep->setFirstSegment(false);
+ if (!lastFrame) {
+ // there will be at least one content frame
+ currentFramep->setLastSegment(false);
+ }
+ }
+
+ // add to frame set. This makes a copy and ref counts the body
+ (*frameSetpp)->append(*currentFramep);
+
+ delete currentFramep;
+
+ currentFramep = NULL;
+}
+
+
+IntPtr MessageBodyStream::GetFrameSet()
+{
+ if (currentFramep != NULL) {
+ // No more content. Tidy up the pending (possibly single header) frame.
+ pushCurrentFrame(true);
+ }
+
+ if (frameSetpp == NULL) {
+ return (IntPtr) NULL;
+ }
+
+ // shared_ptr.get()
+ return (IntPtr) (void *) (*frameSetpp).get();
+}
+
+IntPtr MessageBodyStream::GetHeader()
+{
+ return (IntPtr) headerBodyp;
+}
+
+
+// Ouput stream constructor
+
+MessageBodyStream::MessageBodyStream(int maxFrameSize)
+{
+ isInputStream = false;
+
+ maxFrameContentSize = maxFrameSize - AMQFrame::frameOverhead();
+ SequenceNumber unused; // only meaningful on incoming frames
+ frameSetpp = new FrameSet::shared_ptr(new FrameSet(unused));
+ fragmentCount = 0;
+ length = 0;
+ position = 0;
+
+ // header goes first in the outgoing frameset
+
+ boost::intrusive_ptr<AMQBody> headerBody(new AMQHeaderBody);
+ currentFramep = new AMQFrame(headerBody);
+ headerBodyp = static_cast<AMQHeaderBody*>(headerBody.get());
+
+ // mark this header frame as "full" to force the first write to create a new content frame
+ fragmentPosition = maxFrameContentSize;
+}
+
+void MessageBodyStream::Write(array<unsigned char>^ buffer, int offset, int count)
+{
+ if (isInputStream)
+ throw gcnew NotSupportedException();
+ if (disposed)
+ throw gcnew ObjectDisposedException("Stream");
+ if (count == 0)
+ return;
+ ThrowIfBadArgs(buffer, offset, count);
+
+ if (currentFramep == NULL) {
+ // GetFrameSet() has been called and we no longer exclusively own the underlying frames.
+ throw gcnew InvalidOperationException ("Mesage Body output already completed");
+ }
+
+ if (count <= 0)
+ return;
+
+ // keep GC memory movement at bay while copying to native space
+ pin_ptr<unsigned char> pinnedBuf = &buffer[0];
+
+ string *datap;
+
+ int remaining = count;
+ while (remaining > 0) {
+ if (fragmentPosition == maxFrameContentSize) {
+ // move to a new frame, but not until ready to add new content.
+ // zero content is valid, or the final write may exactly fill to maxFrameContentSize
+
+ pushCurrentFrame(false);
+
+ currentFramep = new AMQFrame(AMQContentBody());
+ fragmentPosition = 0;
+ fragmentCount++;
+ }
+
+ int copyCount = min (remaining, (maxFrameContentSize - fragmentPosition));
+ datap = &(currentFramep->castBody<AMQContentBody>()->getData());
+
+ char *outp = (char *) pinnedBuf + offset;
+ if (fragmentPosition == 0) {
+ datap->assign(outp, copyCount);
+ }
+ else {
+ datap->append(outp, copyCount);
+ }
+
+ position += copyCount;
+ fragmentPosition += copyCount;
+ remaining -= copyCount;
+ offset += copyCount;
+ }
+}
+
+
+void MessageBodyStream::Cleanup()
+{
+ {
+ lock l(this);
+ if (disposed)
+ return;
+
+ disposed = true;
+ }
+
+ try {}
+ finally
+ {
+ if (frameSetpp != NULL) {
+ delete frameSetpp;
+ frameSetpp = NULL;
+ }
+ if (currentFramep != NULL) {
+ delete currentFramep;
+ currentFramep = NULL;
+ }
+ }
+}
+
+MessageBodyStream::~MessageBodyStream()
+{
+ Cleanup();
+}
+
+MessageBodyStream::!MessageBodyStream()
+{
+ Cleanup();
+}
+
+void MessageBodyStream::Close()
+{
+ // Simulate Dispose()...
+ Cleanup();
+ GC::SuppressFinalize(this);
+}
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h
new file mode 100644
index 0000000000..fa8e3f6bde
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h
@@ -0,0 +1,131 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace std;
+
+
+// This class provides memory streaming of the message body contents
+// between native and managed space. To avoid additional memory copies
+// in native space, it reads and writes directly to the low level Qpid
+// frames.
+
+public ref class MessageBodyStream : System::IO::Stream
+{
+private:
+ bool isInputStream;
+ long long length;
+ long long position;
+
+ // the boost smart pointer that keeps the message body frames in memory
+ FrameSet::shared_ptr *frameSetpp;
+
+ int fragmentCount;
+ int fragmentIndex;
+ const char* currentFragment;
+ int fragmentPosition;
+ int fragmentLength;
+ array<IntPtr>^ fragments;
+
+ int maxFrameContentSize;
+ AMQFrame* currentFramep;
+ void* headerBodyp;
+ bool disposed;
+ bool finalizing;
+ void Cleanup();
+
+internal:
+ // incoming message
+ MessageBodyStream(FrameSet::shared_ptr *fspp);
+ // outgoing message
+ MessageBodyStream(int maxFrameSize);
+ void pushCurrentFrame(bool last);
+public:
+ ~MessageBodyStream();
+ !MessageBodyStream();
+ virtual void Close() override;
+ virtual int Read(
+ [InAttribute] [OutAttribute] array<unsigned char>^ buffer,
+ int offset,
+ int count) override;
+
+ virtual void Write(
+ array<unsigned char>^ buffer,
+ int offset,
+ int count) override;
+
+
+ IntPtr GetFrameSet();
+ IntPtr GetHeader();
+
+ virtual void Flush() override {} // noop
+
+
+ // TODO: see CanSeek below.
+ virtual long long Seek(
+ long long offset,
+ System::IO::SeekOrigin origin) override {throw gcnew System::NotSupportedException(); }
+
+ // TODO: see CanSeek below.
+ virtual void SetLength(
+ long long value) override {throw gcnew System::NotSupportedException(); }
+
+ virtual property long long Length {
+ long long get() override { return length; }
+ };
+
+ virtual property long long Position {
+ long long get() override { return position; }
+ void set(long long p) override { throw gcnew System::NotSupportedException(); }
+ };
+
+
+ virtual property bool CanRead {
+ bool get () override { return isInputStream; }
+ }
+
+ virtual property bool CanWrite {
+ bool get () override { return !isInputStream; }
+ }
+
+ // Note: this class must return true to signal that the Length property works.
+ // Required by the raw message encoder.
+ // "If a class derived from Stream does not support seeking, calls to Length,
+ // SetLength, Position, and Seek throw a NotSupportedException".
+
+ virtual property bool CanSeek {
+ bool get () override { return true; }
+ }
+
+ virtual property bool CanTimeout {
+ bool get () override { return isInputStream; }
+ }
+};
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp
new file mode 100644
index 0000000000..f7a28b0692
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp
@@ -0,0 +1,251 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/Demux.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+
+#include "MessageBodyStream.h"
+#include "AmqpMessage.h"
+#include "AmqpSession.h"
+#include "InputLink.h"
+#include "MessageWaiter.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace msclr;
+
+
+MessageWaiter::MessageWaiter(InputLink^ parent, TimeSpan timeSpan, bool consuming, bool async, AsyncCallback ^callback, Object^ state)
+{
+ this->consuming = consuming;
+ if (!consuming) {
+ GC::SuppressFinalize(this);
+ }
+
+ if (async) {
+ this->async = true;
+ this->asyncCallback = callback;
+ this->state = state;
+ }
+ else {
+ this->assigned = true;
+ }
+ this->parent = parent;
+ this->thisLock = gcnew Object();
+
+ // do this after the Message Waiter is fully initialized, in case of
+ // very small timespan
+ if (timeSpan != TimeSpan::MaxValue) {
+ this->timer = gcnew Timer(timeoutCallback, this, timeSpan, TimeSpan::FromMilliseconds(-1));
+ }
+}
+
+MessageWaiter::~MessageWaiter()
+{
+ if (message != IntPtr::Zero) {
+ try{}
+ finally {
+ delete message.ToPointer();
+ message = IntPtr::Zero;
+ }
+ }
+}
+
+MessageWaiter::!MessageWaiter()
+{
+ this->~MessageWaiter();
+}
+
+
+void MessageWaiter::WaitForCompletion()
+{
+ if (isCompleted)
+ return;
+
+ lock l(thisLock);
+ while (!isCompleted) {
+ Monitor::Wait(thisLock);
+ }
+}
+
+void MessageWaiter::Activate()
+{
+ if (activated)
+ return;
+
+ lock l(thisLock);
+ if (!activated) {
+ activated = true;
+ Monitor::PulseAll(thisLock);
+ }
+}
+
+
+void MessageWaiter::Run()
+{
+ lock l(thisLock);
+
+ // wait until Activate(), i.e. our turn in the waiter list or a timeout
+ while (!activated) {
+ Monitor::Wait(thisLock);
+ }
+ bool haveMessage = false;
+ bool mustReset = false;
+
+ if (!timedOut)
+ blocking = true;
+
+ if (blocking) {
+ l.release();
+
+ try {
+ haveMessage = parent->internalWaitForMessage();
+ }
+ catch (System::Exception^ e) {
+ runException = e;
+ }
+
+ l.acquire();
+ blocking = false;
+ if (timedOut) {
+ // TimeoutCallback() called parent->unblockWaiter()
+ mustReset = true;
+ // let the timer thread move past critical region
+ while (processingTimeout) {
+ Monitor::Wait(thisLock);
+ }
+ }
+ }
+
+ if (timer != nullptr) {
+ timer->~Timer();
+ timer = nullptr;
+ }
+
+ if (haveMessage) {
+ timedOut = false; // for the case timeout and message arrival are essentially tied
+ if (!consuming) {
+ // just waiting
+ haveMessage = false;
+ }
+ }
+
+ if (haveMessage || mustReset) {
+ l.release();
+ if (haveMessage) {
+ // hang on to it for when the async caller gets around to retrieving
+ message = parent->nextLocalMessage();
+ }
+ if (mustReset) {
+ parent->resetQueue();
+ }
+ l.acquire();
+ }
+
+ isCompleted = true;
+ Monitor::PulseAll(thisLock);
+
+ // do this check and signal while locked
+ if (asyncWaitHandle != nullptr)
+ asyncWaitHandle->Set();
+
+ l.release();
+ parent->removeWaiter(this);
+
+
+ if (asyncCallback != nullptr) {
+ // guard against application callback exception
+ try {
+ asyncCallback(this);
+ }
+ catch (System::Exception^) {
+ // log it?
+ }
+ }
+
+}
+
+bool MessageWaiter::AcceptForWork()
+{
+ lock l(thisLock);
+ if (!assigned) {
+ assigned = true;
+ return true;
+ }
+ return false;
+}
+
+void MessageWaiter::TimeoutCallback(Object^ state)
+{
+ MessageWaiter^ waiter = (MessageWaiter^) state;
+ if (waiter->isCompleted)
+ return;
+
+ // make sure parent has finished initializing us before we get going
+ waiter->parent->sync();
+
+ lock l(waiter->thisLock);
+ if (waiter->timer == nullptr) {
+ // the waiter is in the clean up phase and doesn't need a wakeup
+ return;
+ }
+
+ // timedOut, blocking and processingTimeout work as a unit
+ waiter->timedOut = true;
+ if (waiter->blocking) {
+ // let the waiter know that we are busy with an upcoming unblock operation
+ waiter->processingTimeout = true;
+ }
+
+ waiter->Activate();
+
+ if (waiter->processingTimeout) {
+ // call with lock off
+ l.release();
+ waiter->parent->unblockWaiter();
+
+ // synchronize with blocked thread
+ l.acquire();
+ waiter->processingTimeout = false;
+ Monitor::PulseAll(waiter->thisLock);
+ }
+
+ l.release();
+
+ // If waiter has no associated thread, we must move it to completion
+ if (waiter->AcceptForWork()) {
+ waiter->Run(); // does not block since timedOut == true
+ }
+}
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h
new file mode 100644
index 0000000000..3737430844
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h
@@ -0,0 +1,125 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+
+public ref class MessageWaiter : IAsyncResult
+{
+private:
+ // Receive() or WaitForMessage()
+ bool consuming;
+ bool consumed;
+ bool timedOut;
+ bool async;
+ // has an owner thread
+ bool assigned;
+ // can Run (i.e. earlier MessageWaiters in the queue have completed)
+ bool activated;
+ // is making a call to internalWaitForMessage() which (usually) blocks
+ bool blocking;
+ // the timeout timer thread is lurking
+ bool processingTimeout;
+ // the saved exception from within Run() for async delivery
+ System::Exception^ runException;
+ AsyncCallback^ asyncCallback;
+ Threading::Timer ^timer;
+ bool isCompleted;
+ bool completedSynchronously;
+ Object^ state;
+ Object^ thisLock;
+ ManualResetEvent^ asyncWaitHandle;
+ InputLink^ parent;
+ static void TimeoutCallback(Object^ state);
+ static TimerCallback^ timeoutCallback = gcnew TimerCallback(MessageWaiter::TimeoutCallback);
+ IntPtr message;
+ !MessageWaiter();
+ ~MessageWaiter();
+
+ internal:
+ MessageWaiter(InputLink^ parent, TimeSpan timeSpan, bool consuming, bool async, AsyncCallback ^callback, Object^ state);
+
+ void Run();
+ bool AcceptForWork();
+ void Activate();
+ void WaitForCompletion();
+
+
+ property IntPtr Message {
+ IntPtr get () {
+ if (!consuming || consumed)
+ throw gcnew InvalidOperationException("Message property");
+ consumed = true;
+ IntPtr v = message;
+ message = IntPtr::Zero;
+ GC::SuppressFinalize(this);
+ return v;
+ }
+ }
+
+ property bool Assigned {
+ bool get () { return assigned; }
+ }
+
+ property bool TimedOut {
+ bool get () { return timedOut; }
+ }
+
+ property System::Exception^ RunException {
+ System::Exception^ get() { return runException; }
+ }
+
+
+ public:
+
+ virtual property bool IsCompleted {
+ bool get () { return isCompleted; }
+ }
+
+ virtual property bool CompletedSynchronously {
+ bool get () { return completedSynchronously; }
+ }
+
+ virtual property WaitHandle^ AsyncWaitHandle {
+ WaitHandle^ get () {
+ if (asyncWaitHandle != nullptr) {
+ return asyncWaitHandle;
+ }
+
+ msclr::lock l(thisLock);
+ if (asyncWaitHandle == nullptr) {
+ asyncWaitHandle = gcnew ManualResetEvent(isCompleted);
+ }
+ return asyncWaitHandle;
+ }
+ }
+
+ virtual property Object^ AsyncState {
+ Object^ get () { return state; }
+ }
+};
+
+}}} // namespace Apache::Qpid::Interop
+
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp
new file mode 100644
index 0000000000..de7141dadb
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp
@@ -0,0 +1,255 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+
+
+#include "AmqpSession.h"
+#include "AmqpMessage.h"
+#include "OutputLink.h"
+#include "QpidMarshal.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Threading;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace std;
+
+using namespace Apache::Qpid::AmqpTypes;
+
+
+OutputLink::OutputLink(AmqpSession^ session, String^ address) :
+ amqpSession(session),
+ disposed(false),
+ maxFrameSize(session->Connection->MaxFrameSize),
+ finalizing(false)
+{
+ qpidAddress = QpidAddress::CreateAddress(address, false);
+ qpidAddress->ResolveLink(session);
+}
+
+void OutputLink::Cleanup()
+{
+ {
+ lock l(this);
+ if (disposed)
+ return;
+
+ disposed = true;
+ }
+
+ // process any pending queue delete
+ qpidAddress->CleanupLink(amqpSession);
+ amqpSession->NotifyClosed();
+}
+
+OutputLink::~OutputLink()
+{
+ Cleanup();
+}
+
+OutputLink::!OutputLink()
+{
+ Cleanup();
+}
+
+void OutputLink::Close()
+{
+ // Simulate Dispose()...
+ Cleanup();
+ GC::SuppressFinalize(this);
+}
+
+
+AmqpMessage^ OutputLink::CreateMessage()
+{
+ MessageBodyStream ^mbody = gcnew MessageBodyStream(maxFrameSize);
+ AmqpMessage ^amqpm = gcnew AmqpMessage(mbody);
+ return amqpm;
+}
+
+
+void OutputLink::ManagedToNative(AmqpMessage^ m)
+{
+ MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) m->BodyStream;
+
+ AmqpProperties^ mprops = m->Properties;
+
+ if (mprops != nullptr) {
+ AMQHeaderBody* bodyp = (AMQHeaderBody*) messageBodyStream->GetHeader().ToPointer();
+
+ if (mprops->HasDeliveryProperties) {
+ DeliveryProperties* deliveryPropertiesp = bodyp->get<DeliveryProperties>(true);
+
+ if (mprops->RoutingKey != nullptr) {
+ deliveryPropertiesp->setRoutingKey(QpidMarshal::ToNative(mprops->RoutingKey));
+ }
+
+ if (mprops->Durable) {
+ deliveryPropertiesp->setDeliveryMode(qpid::framing::PERSISTENT);
+ }
+
+ if (mprops->TimeToLive.HasValue) {
+ long long ttl = mprops->TimeToLive.Value.Ticks;
+ bool was_positive = (ttl > 0);
+ if (ttl < 0)
+ ttl = 0;
+ ttl = ttl / TimeSpan::TicksPerMillisecond;
+ if ((ttl == 0) && was_positive)
+ ttl = 1;
+ deliveryPropertiesp->setTtl(ttl);
+ }
+ }
+
+ if (mprops->HasMessageProperties) {
+ qpid::framing::MessageProperties* messagePropertiesp =
+ bodyp->get<qpid::framing::MessageProperties>(true);
+
+ String^ replyToExchange = mprops->ReplyToExchange;
+ String^ replyToRoutingKey = mprops->ReplyToRoutingKey;
+ if ((replyToExchange != nullptr) || (replyToRoutingKey != nullptr)) {
+ qpid::framing::ReplyTo nReplyTo;
+ if (replyToExchange != nullptr) {
+ nReplyTo.setExchange(QpidMarshal::ToNative(replyToExchange));
+ }
+ if (replyToRoutingKey != nullptr) {
+ nReplyTo.setRoutingKey(QpidMarshal::ToNative(replyToRoutingKey));
+ }
+ messagePropertiesp->setReplyTo(nReplyTo);
+ }
+
+ // TODO: properly split 1.0 style to 0-10 content type + encoding
+
+ String^ contentType = mprops->ContentType;
+ if (contentType != nullptr) {
+ String^ type = nullptr;
+ String^ enc = nullptr;
+ int idx = contentType->IndexOf(';');
+ if (idx == -1) {
+ type = contentType;
+ }
+ else {
+ type = contentType->Substring(0, idx);
+ contentType = contentType->Substring(idx + 1);
+ idx = contentType->IndexOf('=');
+ if (idx != -1) {
+ enc = contentType->Substring(idx + 1);
+ enc = enc->Trim();
+ }
+ }
+ if (!String::IsNullOrEmpty(type)) {
+ messagePropertiesp->setContentType(QpidMarshal::ToNative(type));
+ }
+ if (!String::IsNullOrEmpty(enc)) {
+ messagePropertiesp->setContentEncoding(QpidMarshal::ToNative(enc));
+ }
+ }
+
+
+ array<unsigned char>^ mbytes = mprops->CorrelationId;
+ if (mbytes != nullptr) {
+ pin_ptr<unsigned char> pinnedBuf = &mbytes[0];
+ std::string s((char *) pinnedBuf, mbytes->Length);
+ messagePropertiesp->setCorrelationId(s);
+ }
+
+ mbytes = mprops->UserId;
+ if (mbytes != nullptr) {
+ pin_ptr<unsigned char> pinnedBuf = &mbytes[0];
+ std::string s((char *) pinnedBuf, mbytes->Length);
+ messagePropertiesp->setUserId(s);
+ }
+
+ if (mprops->HasMappedProperties) {
+ qpid::framing::FieldTable fieldTable;
+ // TODO: add support for abitrary AMQP types
+ for each (Collections::Generic::KeyValuePair<System::String^, AmqpType^> kvp in mprops->PropertyMap) {
+ Type^ type = kvp.Value->GetType();
+ if (type == AmqpInt::typeid) {
+ fieldTable.setInt(QpidMarshal::ToNative(kvp.Key),
+ ((AmqpInt ^) kvp.Value)->Value);
+ }
+ else if (type == AmqpString::typeid) {
+ AmqpString^ str = (AmqpString ^) kvp.Value;
+ // For now, FieldTable supports a single string type
+ fieldTable.setString(QpidMarshal::ToNative(kvp.Key), QpidMarshal::ToNative(str->Value));
+ }
+ }
+
+ messagePropertiesp->setApplicationHeaders(fieldTable);
+ }
+ }
+ }
+}
+
+
+
+void OutputLink::Send(AmqpMessage^ amqpMessage, TimeSpan timeout)
+{
+ // copy properties from managed space to the native counterparts
+ ManagedToNative(amqpMessage);
+
+ MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream;
+ CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream,
+ timeout, false, nullptr, nullptr);
+
+ if (waiter != nullptr) {
+ waiter->WaitForCompletion();
+ if (waiter->TimedOut) {
+ throw gcnew TimeoutException("Receive");
+ }
+ }
+ // else: SendMessage() has already waited for the Completion
+
+}
+
+IAsyncResult^ OutputLink::BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout, AsyncCallback^ callback, Object^ state)
+{
+ ManagedToNative(amqpMessage);
+
+ MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream;
+ CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream, timeout, true, callback, state);
+ return waiter;
+}
+
+void OutputLink::EndSend(IAsyncResult^ result)
+{
+ CompletionWaiter^ waiter = (CompletionWaiter ^) result;
+ waiter->WaitForCompletion();
+ if (waiter->TimedOut) {
+ throw gcnew TimeoutException("Receive");
+ }
+}
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h
new file mode 100644
index 0000000000..e30d1cc79f
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h
@@ -0,0 +1,75 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+#include "QpidAddress.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+
+using namespace qpid::client;
+using namespace std;
+
+
+public ref class OutputLink
+{
+private:
+ AmqpSession^ amqpSession;
+ QpidAddress^ qpidAddress;
+ bool disposed;
+ bool finalizing;
+ void Cleanup();
+ AmqpTypes::AmqpProperties^ defaultProperties;
+ void ManagedToNative(AmqpMessage^ m);
+ int maxFrameSize;
+
+internal:
+ OutputLink(AmqpSession^ session, String^ defaultQueue);
+
+public:
+ ~OutputLink();
+ !OutputLink();
+ void Close();
+ AmqpMessage^ CreateMessage();
+ void Send(AmqpMessage^ m, TimeSpan timeout);
+ IAsyncResult^ BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout, AsyncCallback^ callback, Object^ state);
+ void EndSend(IAsyncResult^ result);
+
+ property AmqpTypes::AmqpProperties^ DefaultProperties {
+ AmqpTypes::AmqpProperties^ get () { return defaultProperties; }
+ void set(AmqpTypes::AmqpProperties^ p) { defaultProperties = p; }
+ }
+
+ property String^ DefaultSubject {
+ String^ get() { return (qpidAddress == nullptr) ? nullptr : qpidAddress->RoutingKey; }
+ }
+
+ property String^ QpidSubject {
+ String^ get() { return (qpidAddress == nullptr) ? nullptr : qpidAddress->Subject; }
+ }
+
+};
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp b/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp
new file mode 100644
index 0000000000..bfae1ab313
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp
@@ -0,0 +1,304 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+
+/*
+ * This program parses strings of the form "node/subject;{options}" as
+ * used in the Qpid messaging API. It provides basic wiring
+ * capabilities to create/delete temporary queues (to topic
+ * subsciptions) and unbound "point and shoot" queues.
+ */
+
+
+#include <windows.h>
+#include <msclr\lock.h>
+#include <oletx2xa.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/Message.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/client/Future.h"
+
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "AmqpMessage.h"
+#include "MessageBodyStream.h"
+#include "InputLink.h"
+#include "OutputLink.h"
+#include "QpidMarshal.h"
+#include "QpidException.h"
+#include "QpidAddress.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace std;
+
+QpidAddress::QpidAddress(String^ s, bool isInput) {
+ address = s;
+ nodeName = s;
+ isInputChannel = isInput;
+ isQueue = true;
+
+ if (address->StartsWith("//")) {
+ // special case old style address to default exchange,
+ // no options, output only
+ if ((s->IndexOf(';') != -1) || isInputChannel)
+ throw gcnew ArgumentException("Invalid 0-10 address: " + address);
+ nodeName = nodeName->Substring(2);
+ return;
+ }
+
+ String^ options = nullptr;
+ int pos = s->IndexOf(';');
+ if (pos != -1) {
+ options = s->Substring(pos + 1);
+ nodeName = s->Substring(0, pos);
+
+ if (options->Length > 0) {
+ if (!options->StartsWith("{") || !options->EndsWith("}"))
+ throw gcnew ArgumentException("Invalid address: " + address);
+ options = options->Substring(1, options->Length - 2);
+ array<String^>^ subOpts = options->Split(String(",: ").ToCharArray(), StringSplitOptions::RemoveEmptyEntries);
+
+ if ((subOpts->Length % 2) != 0)
+ throw gcnew ArgumentException("Bad address (options): " + address);
+
+ for (int i=0; i < subOpts->Length; i += 2) {
+ String^ opt = subOpts[i];
+ String^ optArg = subOpts[i+1];
+ if (opt->Equals("create")) {
+ creating = PolicyApplies(optArg);
+ }
+ else if (opt->Equals("delete")) {
+ deleting = PolicyApplies(optArg);
+ }
+ else if (opt->Equals("mode")) {
+ if (optArg->Equals("browse")) {
+ browsing = isInputChannel;
+ }
+ else if (!optArg->Equals("consume")) {
+ throw gcnew ArgumentException("Invalid browsing option: " + optArg);
+ }
+ }
+ else if (opt->Equals("assert") || opt->Equals("node")) {
+ throw gcnew ArgumentException("Unsupported address option: " + opt);
+ }
+ else {
+ throw gcnew ArgumentException("Bad address option: " + opt);
+ }
+ }
+ }
+ else
+ options = nullptr;
+ }
+
+ pos = nodeName->IndexOf('/');
+ if (pos != -1) {
+ subject = nodeName->Substring(pos + 1);
+ if (String::IsNullOrEmpty(subject))
+ subject = nullptr;
+ nodeName = nodeName->Substring(0, pos);
+ }
+}
+
+
+QpidAddress^ QpidAddress::CreateAddress(String^ s, bool isInput) {
+ QpidAddress^ addr = gcnew QpidAddress(s, isInput);
+ return addr;
+}
+
+
+void QpidAddress::ResolveLink(AmqpSession^ amqpSession) {
+
+ AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer();
+ if (asyncSessionp == NULL)
+ throw gcnew ObjectDisposedException("session");
+
+ deleteName = nullptr;
+ isQueue = true;
+
+ try {
+ Session session = sync(*asyncSessionp);
+ std::string n_name = QpidMarshal::ToNative(nodeName);
+ ExchangeBoundResult result = session.exchangeBound(arg::exchange=n_name, arg::queue=n_name);
+
+ bool queueFound = !result.getQueueNotFound();
+ bool exchangeFound = !result.getExchangeNotFound();
+
+ if (isInputChannel) {
+
+ if (queueFound) {
+ linkName = nodeName;
+ if (deleting)
+ deleteName = nodeName;
+ }
+ else if (exchangeFound) {
+ isQueue = false;
+ String^ tmpkey = nullptr;
+ String^ tmpname = nodeName + "_" + Guid::NewGuid().ToString();
+ bool haveSubject = !String::IsNullOrEmpty(subject);
+ FieldTable bindArgs;
+
+ std::string exchangeType = session.exchangeQuery(n_name).getType();
+ if (exchangeType == "topic") {
+ tmpkey = haveSubject ? subject : "#";
+ }
+ else if (exchangeType == "fanout") {
+ tmpkey = tmpname;
+ }
+ else if (exchangeType == "headers") {
+ tmpkey = haveSubject ? subject : "match-all";
+ if (haveSubject)
+ bindArgs.setString("qpid.subject", QpidMarshal::ToNative(subject));
+ bindArgs.setString("x-match", "all");
+ }
+ else if (exchangeType == "xml") {
+ tmpkey = haveSubject ? subject : "";
+ if (haveSubject) {
+ String^ v = "declare variable $qpid.subject external; $qpid.subject = '" +
+ subject + "'";
+ bindArgs.setString("xquery", QpidMarshal::ToNative(v));
+ }
+ else
+ bindArgs.setString("xquery", "true()");
+ }
+ else {
+ tmpkey = haveSubject ? subject : "";
+ }
+
+ std::string qn = QpidMarshal::ToNative(tmpname);
+ session.queueDeclare(arg::queue=qn, arg::autoDelete=true, arg::exclusive=true);
+ bool success = false;
+ try {
+ session.exchangeBind(arg::exchange=n_name, arg::queue=qn,
+ arg::bindingKey=QpidMarshal::ToNative(tmpkey),
+ arg::arguments=bindArgs);
+ bindKey = tmpkey; // remember for later cleanup
+ success = true;
+ }
+ finally {
+ if (!success)
+ session.queueDelete(arg::queue=qn);
+ }
+ linkName = tmpname;
+ deleteName = tmpname;
+ deleting = true;
+ }
+ else if (creating) {
+ // only create "point and shoot" queues for now
+ session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName));
+ // leave unbound
+
+ linkName = nodeName;
+
+ if (deleting)
+ deleteName = nodeName;
+ }
+ else {
+ throw gcnew ArgumentException("AMQP broker node not found: " + nodeName);
+ }
+ }
+ else {
+ // Output channel
+
+ bool oldStyleUri = address->StartsWith("//");
+
+ if (queueFound) {
+ linkName = ""; // default exchange for point and shoot
+ routingKey = nodeName;
+ if (deleting)
+ deleteName = nodeName;
+ }
+ else if (exchangeFound && !oldStyleUri) {
+ isQueue = false;
+ linkName = nodeName;
+ routingKey = subject;
+ }
+ else if (creating) {
+ // only create "point and shoot" queues for now
+ session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName));
+ // leave unbound
+ linkName = "";
+ routingKey = nodeName;
+ if (deleting)
+ deleteName = nodeName;
+ }
+ else {
+ throw gcnew ArgumentException("AMQP broker node not found: " + nodeName);
+ }
+ }
+ }
+ finally {
+ amqpSession->ReturnNativeSession();
+ }
+}
+
+void QpidAddress::CleanupLink(AmqpSession^ amqpSession) {
+ if (deleteName == nullptr)
+ return;
+
+ AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer();
+ if (asyncSessionp == NULL) {
+ // TODO: log it: can't undo tear down actions
+ return;
+ }
+
+ try {
+ Session session = sync(*asyncSessionp);
+ std::string q = QpidMarshal::ToNative(deleteName);
+ if (isInputChannel && !isQueue) {
+ // undo the temp wiring to the topic
+ session.exchangeUnbind(arg::exchange=QpidMarshal::ToNative(nodeName), arg::queue=q,
+ arg::bindingKey=QpidMarshal::ToNative(bindKey));
+ }
+ session.queueDelete(q);
+ }
+ catch (Exception^ e) {
+ // TODO: log it
+ }
+ finally {
+ amqpSession->ReturnNativeSession();
+ }
+}
+
+bool QpidAddress::PolicyApplies(String^ mode) {
+ if (mode->Equals("always"))
+ return true;
+ if (mode->Equals("sender"))
+ return !isInputChannel;
+ if (mode->Equals("receiver"))
+ return isInputChannel;
+ if (mode->Equals("never"))
+ return false;
+
+ throw gcnew ArgumentException(String::Format("Bad address option {0} for {1}", mode, address));
+}
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h b/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h
new file mode 100644
index 0000000000..d24317c2aa
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/QpidAddress.h
@@ -0,0 +1,89 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+#include "MessageWaiter.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace System::Runtime::InteropServices;
+
+using namespace qpid::client;
+using namespace std;
+
+
+public ref class QpidAddress
+{
+private:
+ QpidAddress(String^ address, bool isInput);
+
+ // the original Qpid messaging address string, with WCF uri sematics removed, and URL decoded
+ String^ address;
+
+ String^ nodeName;
+ // "qpid.subject"
+ String^ subject;
+ // 0-10 routing key (Output channels only)
+ String^ routingKey;
+
+ String^ linkName;
+ String^ deleteName;
+ String^ bindKey;
+
+ // node type: queue/topic
+ bool isQueue;
+
+ // direction
+ bool isInputChannel;
+
+ bool creating;
+ bool deleting;
+ bool browsing;
+
+ bool PolicyApplies(String^ mode);
+
+internal:
+ static QpidAddress^ CreateAddress(String ^s, bool isInput);
+ void ResolveLink(AmqpSession^ amqpSession);
+ void CleanupLink(AmqpSession^ amqpSession);
+
+ property String^ LinkName {
+ String^ get () { return linkName; }
+ }
+
+ property String^ Subject {
+ String^ get () { return subject; }
+ }
+
+ property String^ RoutingKey {
+ String^ get () { return routingKey; }
+ }
+
+ property bool Browsing {
+ bool get () { return browsing; }
+ }
+
+};
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h b/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h
new file mode 100644
index 0000000000..91677a5e73
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h
@@ -0,0 +1,37 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+
+public ref class QpidException : System::Exception
+{
+ public:
+
+ QpidException() : System::Exception() {}
+ QpidException(String^ estring) : System::Exception(estring) {}
+
+};
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h b/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h
new file mode 100644
index 0000000000..3e22af7b39
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h
@@ -0,0 +1,53 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Text;
+
+
+// Helper functions for marshaling.
+
+private ref class QpidMarshal
+{
+ public:
+
+ // marshal_as<T> not available in all Visual Studio editions.
+
+ static std::string ToNative (System::String^ managed) {
+ if (managed->Length == 0) {
+ return std::string();
+ }
+ array<unsigned char>^ mbytes = Encoding::UTF8->GetBytes(managed);
+ if (mbytes->Length == 0) {
+ return std::string();
+ }
+
+ pin_ptr<unsigned char> pinnedBuf = &mbytes[0];
+ std::string native((char *) pinnedBuf, mbytes->Length);
+ return native;
+ }
+};
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp b/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp
new file mode 100644
index 0000000000..23743316ff
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.cpp
@@ -0,0 +1,525 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+#include <transact.h>
+#include <xolehlp.h>
+#include <txdtc.h>
+#include <oletx2xa.h>
+#include <iostream>
+#include <fstream>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/Xid.h"
+
+#include "QpidException.h"
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "DtxResourceManager.h"
+#include "XaTransaction.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Transactions;
+using namespace msclr;
+
+using namespace qpid::framing::dtx;
+
+// ------------------------------------------------------------------------
+// Start of a pure native code section
+#pragma unmanaged
+// ------------------------------------------------------------------------
+
+// This is the native COM object the DTC expects to talk to for coordination.
+// There is exactly one native instance of this for each managed XaTransaction object.
+
+
+class DtcCallbackHandler : public ITransactionResourceAsync
+{
+private:
+ long useCount;
+ DtcCallbackFp managedCallback;
+public:
+ ITransactionEnlistmentAsync *txHandle;
+ DtcCallbackHandler(DtcCallbackFp cbp) : managedCallback(cbp), useCount(0) {}
+ ~DtcCallbackHandler() {}
+ virtual HRESULT __stdcall PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase);
+ virtual HRESULT __stdcall CommitRequest(DWORD grfrm, XACTUOW *unused);
+ virtual HRESULT __stdcall AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3);
+
+ virtual HRESULT __stdcall TMDown();
+ virtual HRESULT __stdcall DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject);
+ virtual ULONG __stdcall DtcCallbackHandler::AddRef();
+ virtual ULONG __stdcall DtcCallbackHandler::Release();
+ void __stdcall AbortRequestDone();
+};
+
+
+HRESULT DtcCallbackHandler::PrepareRequest(BOOL unused, DWORD grfrm, BOOL unused2, BOOL singlePhase)
+{
+ if (singlePhase) {
+ return managedCallback(DTC_SINGLE_PHASE) ? S_OK : E_FAIL;
+ }
+
+ return managedCallback(DTC_PREPARE) ? S_OK : E_FAIL;
+}
+
+
+HRESULT DtcCallbackHandler::CommitRequest(DWORD grfrm, XACTUOW *unused)
+{
+ return managedCallback(DTC_COMMIT) ? S_OK : E_FAIL;
+}
+
+HRESULT DtcCallbackHandler::AbortRequest(BOID *unused, BOOL unused2, XACTUOW *unused3)
+{
+ return managedCallback(DTC_ABORT) ? S_OK : E_FAIL;
+}
+
+
+HRESULT DtcCallbackHandler::TMDown()
+{
+ return managedCallback(DTC_TMDOWN) ? S_OK : E_FAIL;
+}
+
+
+HRESULT DtcCallbackHandler::QueryInterface (REFIID riid, void **ppvObject)
+{
+ *ppvObject = NULL;
+
+ if ((riid == IID_IUnknown) || (riid == IID_IResourceManagerSink))
+ *ppvObject = this;
+ else
+ return ResultFromScode(E_NOINTERFACE);
+
+ this->AddRef();
+ return S_OK;
+}
+
+
+ULONG DtcCallbackHandler::AddRef()
+{
+ return InterlockedIncrement(&useCount);
+}
+
+
+ULONG DtcCallbackHandler::Release()
+{
+ long uc = InterlockedDecrement(&useCount);
+
+ if (uc)
+ return uc;
+
+ delete this;
+ return 0;
+}
+
+
+// ------------------------------------------------------------------------
+// End of pure native code section
+#pragma managed
+// ------------------------------------------------------------------------
+
+#ifdef QPID_RECOVERY_TEST_HOOK
+void XaTransaction::ForceRecovery() {
+ debugFailMode = true;
+}
+#endif
+
+// ------------------------------------------------------------------------
+// ------------------------------------------------------------------------
+
+
+XaTransaction::XaTransaction(Transaction^ t, IDtcToXaHelperSinglePipe *xaHelperp, DWORD rmCookie, DtxResourceManager^ rm) {
+ bool success = false;
+ xidp = NULL;
+ commandCompletionp = NULL;
+ firstDtxStartCompletionp = NULL;
+ nativeHandler = NULL;
+ resourceManager = rm;
+ controlSession = rm->DtxControlSession;
+ active = true;
+ preparing = false;
+ systemTransaction = t;
+ IntPtr comTxp = IntPtr::Zero;
+ completionHandle = gcnew ManualResetEvent(false);
+
+ try {
+ enlistedSessions = gcnew Collections::Generic::List<AmqpSession^>();
+
+ // take a System.Transactions.Transaction and obtain
+ // the corresponding DTC COM object.
+ IDtcTransaction^ dtcTransaction = TransactionInterop::GetDtcTransaction(t);
+ comTxp = Marshal::GetIUnknownForObject(dtcTransaction);
+ XID winXid;
+ HRESULT hr = xaHelperp->ConvertTridToXID((DWORD *)comTxp.ToPointer(), rmCookie, &winXid);
+ if (hr != S_OK)
+ throw gcnew QpidException("get XA XID");
+
+ // Convert the X/Open format to the internal Qpid format
+ xidp = new qpid::framing::Xid();
+ xidp->setFormat((uint32_t) winXid.formatID);
+ int bqualPos = 0;
+ if (winXid.gtrid_length > 0) {
+ xidp->setGlobalId(std::string(winXid.data, winXid.gtrid_length));
+ bqualPos = winXid.gtrid_length;
+ }
+ if (winXid.bqual_length > 0) {
+ xidp->setBranchId(std::string(winXid.data + bqualPos, winXid.bqual_length));
+ }
+
+ // create the callback chain: DTC proxy -> DtcCallbackHandler -> this
+ inboundDelegate = gcnew DtcCallbackDelegate(this, &XaTransaction::DtcCallback);
+ IntPtr ip = Marshal::GetFunctionPointerForDelegate(inboundDelegate);
+ nativeHandler = new DtcCallbackHandler(static_cast<DtcCallbackFp>(ip.ToPointer()));
+ // add myself for later smart pointer destruction
+ nativeHandler->AddRef();
+
+ hr = xaHelperp->EnlistWithRM(rmCookie, (ITransaction *)comTxp.ToPointer(), nativeHandler, &(nativeHandler->txHandle));
+
+ if (hr != S_OK)
+ throw gcnew QpidException("Enlist");
+
+ success = true;
+ }
+ finally {
+ if (!success)
+ Cleanup();
+ if (comTxp != IntPtr::Zero)
+ ((IUnknown *) comTxp.ToPointer())->Release();
+ }
+}
+
+
+void XaTransaction::Cleanup() {
+ if (firstDtxStartCompletionp != NULL) {
+ try {
+ firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
+ }
+ catch (...) {
+ // TODO: log it?
+ }
+
+ firstDtxStartCompletionp = NULL;
+ }
+
+ if (nativeHandler != NULL) {
+ nativeHandler->Release();
+ nativeHandler = NULL;
+ }
+ if (xidp != NULL) {
+ delete xidp;
+ xidp = NULL;
+ }
+}
+
+
+XaTransaction^ XaTransaction::Enlist (AmqpSession ^session) {
+ lock l(enlistedSessions);
+ if (!active)
+ throw gcnew QpidException("transaction enlistment internal error");
+ if (!enlistedSessions->Contains(session)) {
+ enlistedSessions->Add(session);
+ if (firstEnlistedSession == nullptr) {
+ firstEnlistedSession = session;
+ IntPtr intptr = session->DtxStart((IntPtr) xidp, false, false);
+ firstDtxStartCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ }
+ else {
+ // the broker must see the dtxStart as a join operation, and it must arrive
+ // at the broker after the first dtx start
+ if (firstDtxStartCompletionp != NULL)
+ firstDtxStartCompletionp->wait();
+ session->DtxStart((IntPtr) xidp, true, false);
+ }
+ }
+ else {
+ // already started once, so resume is true
+ session->DtxStart((IntPtr) xidp, false, true);
+ }
+ return this;
+}
+
+
+void XaTransaction::SessionClosing(AmqpSession^ session) {
+ lock l(enlistedSessions);
+ if (!enlistedSessions->Contains(session))
+ return;
+
+ enlistedSessions->Remove(session);
+ if (!active) {
+ // Phase0Flush already done on all sessions
+ l.release();
+ return;
+ }
+
+ IntPtr completion = session->BeginPhase0Flush(this);
+ session->EndPhase0Flush(this, completion);
+
+ if (session == firstEnlistedSession) {
+ // if we just completed the dtxEnd, we know the dtxStart completed before that
+ if (firstDtxStartCompletionp != NULL) {
+ firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
+ firstDtxStartCompletionp = NULL;
+ }
+ }
+}
+
+
+void XaTransaction::Phase0Flush() {
+ // let each session delimit their transactional work with an AMQP dtx.end protocol frame
+ lock l(enlistedSessions);
+ if (!active)
+ return;
+
+ active = false; // no more enlistments
+ int scount = enlistedSessions->Count;
+
+ if (scount > 0) {
+ array<IntPtr> ^completions = gcnew array<IntPtr>(scount);
+ for (int i = 0; i < scount; i++) {
+
+ // TODO: skip phase0 flush for rollback case
+
+ completions[i] = enlistedSessions[i]->BeginPhase0Flush(this);
+ }
+
+ for (int i = 0; i < scount; i++) {
+ // without each session.sync(), session commands are queued up in the right order,
+ // but on their separate outbound channels, and destined for receipt at separate Broker inbound
+ // channels. It is not clear how to be sure Phase 0 dtx.End is processed in the
+ // correct order before commit on the broker without the sync.
+ enlistedSessions[i]->EndPhase0Flush(this, completions[i]);
+ }
+ }
+
+ // since all dtxEnds have completed, we know all starts have too
+ if (firstDtxStartCompletionp != NULL) {
+ try {
+ firstEnlistedSession->ReleaseCompletion((IntPtr) firstDtxStartCompletionp);
+ }
+ catch (...) {
+ // TODO: log it?
+ }
+
+ firstDtxStartCompletionp = NULL;
+ }
+}
+
+
+bool XaTransaction::DtcCallback (DtcCallbackType callback) {
+ // called by the DTC proxy thread. Be brief and don't block (but Phase0Flush?)
+
+ if (AppDomain::CurrentDomain->IsFinalizingForUnload())
+ return false;
+
+ IntPtr intptr = IntPtr::Zero;
+ currentCommand = callback;
+
+ try {
+ switch (callback) {
+ case DTC_PREPARE:
+ Phase0Flush();
+ try {
+ intptr = controlSession->DtxPrepare((IntPtr) xidp);
+ preparing = true;
+ resourceManager->IncrementDoubt();
+ }
+ catch (System::Exception^ ) {
+ // intptr remains nullptr
+ }
+ commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+
+ case DTC_COMMIT:
+#ifdef QPID_RECOVERY_TEST_HOOK
+ if (debugFailMode){ return; }
+#endif
+ // no phase 0 required. always preceded by a prepare
+ try {
+ intptr = controlSession->DtxCommit((IntPtr) xidp, false);
+ }
+ catch (System::Exception^ ) {
+ // intptr remains nullptr
+ }
+ commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+
+ case DTC_ABORT:
+ Phase0Flush();
+#ifdef QPID_RECOVERY_TEST_HOOK
+ if (debugFailMode){ return; }
+#endif
+ try {
+ intptr = controlSession->DtxRollback((IntPtr) xidp);
+ }
+ catch (System::Exception^ ) {
+ // intptr remains nullptr
+ }
+ commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+
+ case DTC_SINGLE_PHASE:
+ Phase0Flush();
+ try {
+ intptr = controlSession->DtxCommit((IntPtr) xidp, true);
+ }
+ catch (System::Exception^ ) {
+ // intptr remains nullptr
+ }
+ commandCompletionp = (TypedResult<qpid::framing::XaResult> *) intptr.ToPointer();
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+
+ case DTC_TMDOWN:
+ commandCompletionp = NULL;
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &XaTransaction::AsyncCompleter));
+ break;
+ }
+ return true;
+ }
+ catch (System::Exception^ e) {
+ // TODO: log it
+ Console::WriteLine("Unexpected DtcCallback exception: {0}", e->ToString());
+ }
+ catch (...) {
+ // TODO: log it
+ }
+ return false;
+}
+
+
+// this handles the case where the application regains control for
+// a new transaction before we are notified (abort/rollback
+// optimization in DTC).
+
+void XaTransaction::NotifyPhase0() {
+ if (active)
+ Phase0Flush();
+}
+
+
+void XaTransaction::AsyncCompleter(Object ^unused) {
+ bool success = false;
+
+ if (commandCompletionp != NULL) {
+ try {
+ // waits for the AMQP broker's response and returns the decoded content
+ XaResult& xaResult = commandCompletionp->get();
+ if (xaResult.hasStatus()) {
+ if (xaResult.getStatus() == XaStatus::XA_STATUS_XA_OK) {
+ success = true;
+ }
+ }
+ }
+ catch (...) {
+ // TODO: log it?
+ }
+ try {
+ controlSession->ReleaseCompletion((IntPtr) commandCompletionp);
+ }
+ catch (...) {
+ // TODO: log it?
+ }
+
+ commandCompletionp = NULL;
+ }
+
+ ITransactionEnlistmentAsync *dtcTxHandle = nativeHandler->txHandle;
+
+ HRESULT hr = success ? S_OK : E_FAIL;
+
+ switch (currentCommand) {
+ case DTC_PREPARE:
+ dtcTxHandle->PrepareRequestDone(hr, NULL, NULL);
+ break;
+
+ case DTC_COMMIT:
+ dtcTxHandle->CommitRequestDone(hr);
+ if (success)
+ resourceManager->DecrementDoubt();
+ Complete();
+ break;
+
+ case DTC_ABORT:
+ dtcTxHandle->AbortRequestDone(hr);
+ if (success) {
+ if (preparing) {
+ preparing = false;
+ resourceManager->DecrementDoubt();
+ }
+ }
+ Complete();
+ break;
+
+ case DTC_SINGLE_PHASE:
+ if (success)
+ hr = XACT_S_SINGLEPHASE;
+ dtcTxHandle->PrepareRequestDone(hr, NULL, NULL);
+ Complete();
+ break;
+
+ case DTC_TMDOWN:
+ // Stop the RM from accepting new enlistments
+ resourceManager->TmDown();
+ Complete();
+ break;
+ }
+}
+
+
+void XaTransaction::Complete() {
+ Cleanup();
+ resourceManager->Complete(systemTransaction);
+ completionHandle->Set();
+}
+
+
+void XaTransaction::WaitForCompletion() {
+ completionHandle->WaitOne();
+}
+
+
+ /*
+void XaTransaction::WaitForFlush() {
+ isFlushedHandle->WaitOne();
+}
+ */
+
+// called from DtxResourceManager Finalize
+
+void XaTransaction::ChildFinalize() {
+ lock l(enlistedSessions);
+ Phase0Flush();
+ Cleanup();
+}
+
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.h b/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.h
new file mode 100644
index 0000000000..8ff9f99893
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/XaTransaction.h
@@ -0,0 +1,96 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace System::Transactions;
+
+enum DtcCallbackType{
+ DTC_PREPARE,
+ DTC_COMMIT,
+ DTC_ABORT,
+ DTC_SINGLE_PHASE,
+ DTC_TMDOWN
+};
+
+
+ref class DtxResourceManager;
+class DtcCallbackHandler;
+
+// Function pointer declaratiom for managed space delegate
+typedef bool (__stdcall *DtcCallbackFp)(DtcCallbackType);
+
+// and the delegate with the same signature
+public delegate bool DtcCallbackDelegate(DtcCallbackType);
+
+
+
+public ref class XaTransaction
+{
+private:
+ bool active;
+ DtxResourceManager^ resourceManager;
+ Transaction^ systemTransaction;
+ AmqpSession^ controlSession;
+ Collections::Generic::List<AmqpSession^>^ enlistedSessions;
+ qpid::framing::Xid* xidp;
+ DtcCallbackHandler* nativeHandler;
+ bool preparing;
+ DtcCallbackDelegate^ inboundDelegate;
+ // the Qpid async result of the AMQP dtx prepare/commit commands
+ TypedResult<qpid::framing::XaResult>* commandCompletionp;
+ // the Qpid async result of the first session to do dtx start
+ TypedResult<qpid::framing::XaResult>* firstDtxStartCompletionp;
+ ManualResetEvent^ completionHandle;
+
+ AmqpSession^ firstEnlistedSession;
+ DtcCallbackType currentCommand;
+ void AsyncCompleter(Object ^);
+ void Phase0Flush();
+ void Cleanup();
+ void Complete();
+
+internal:
+ XaTransaction(Transaction^ t, IDtcToXaHelperSinglePipe *pXaHelper, DWORD rmCookie, DtxResourceManager^ rm);
+ XaTransaction^ Enlist (AmqpSession ^session);
+ bool DtcCallback (DtcCallbackType callback);
+ void NotifyPhase0();
+ void ChildFinalize();
+ void SessionClosing(AmqpSession^ session);
+ void WaitForCompletion();
+
+ property IntPtr XidHandle {
+ IntPtr get () { return (IntPtr) xidp; }
+ }
+
+#ifdef QPID_RECOVERY_TEST_HOOK
+ void ForceRecovery();
+ bool debugFailMode;
+#endif
+
+};
+
+}}} // namespace Apache::Qpid::Interop
+