summaryrefslogtreecommitdiff
path: root/qpid/wcf/src/Apache/Qpid/Interop
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop')
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp165
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h71
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp76
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h61
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp287
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h80
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp57
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp145
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h99
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp685
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/InputLink.h85
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj302
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp337
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h131
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp251
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h127
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp251
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h64
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/QpidException.h37
-rw-r--r--qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h53
20 files changed, 3364 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
new file mode 100644
index 0000000000..02d6c7ab18
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp
@@ -0,0 +1,165 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/framing/FrameSet.h"
+
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "QpidMarshal.h"
+#include "QpidException.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace std;
+
+
+// Note on locks: Use "this" for fast counting and idle/busy
+// notifications. Use the "sessions" list to serialize session
+// creation/reaping and overall tear down.
+// TODO: switch "this" lock to separate non-visible Object.
+
+
+AmqpConnection::AmqpConnection(String^ server, int port) :
+ connectionp(NULL),
+ busyCount(0),
+ disposed(false)
+{
+ bool success = false;
+ System::Exception^ openException = nullptr;
+ sessions = gcnew Collections::Generic::List<AmqpSession^>();
+
+ try {
+ connectionp = new Connection;
+ connectionp->open (QpidMarshal::ToNative(server), port);
+ // TODO: registerFailureCallback for failover
+ success = true;
+ const ConnectionSettings& settings = connectionp->getNegotiatedSettings();
+ this->maxFrameSize = settings.maxFrameSize;
+ } catch (const qpid::Exception& error) {
+ String^ errmsg = gcnew String(error.what());
+ openException = gcnew QpidException(errmsg);
+ } finally {
+ if (!success) {
+ Cleanup();
+ if (openException == nullptr) {
+ openException = gcnew QpidException ("unknown connection failure");
+ }
+ throw openException;
+ }
+ }
+}
+
+void AmqpConnection::Cleanup()
+{
+ {
+ lock l(sessions);
+ if (disposed)
+ return;
+ disposed = true;
+ }
+
+ try {
+ // let the child sessions clean up
+ for each(AmqpSession^ s in sessions) {
+ s->ConnectionClosed();
+ }
+ }
+ finally
+ {
+ if (connectionp != NULL) {
+ connectionp->close();
+ delete connectionp;
+ connectionp = NULL;
+ }
+ }
+}
+
+AmqpConnection::~AmqpConnection()
+{
+ Cleanup();
+}
+
+AmqpConnection::!AmqpConnection()
+{
+ Cleanup();
+}
+
+void AmqpConnection::Close()
+{
+ // Simulate Dispose()...
+ Cleanup();
+ GC::SuppressFinalize(this);
+}
+
+AmqpSession^ AmqpConnection::CreateSession()
+{
+ lock l(sessions);
+ if (disposed) {
+ throw gcnew ObjectDisposedException("AmqpConnection");
+ }
+ AmqpSession^ session = gcnew AmqpSession(this, connectionp);
+ sessions->Add(session);
+ return session;
+}
+
+// called whenever a child session becomes newly busy (a first reader or writer since last idle)
+
+void AmqpConnection::NotifyBusy()
+{
+ bool changed = false;
+ {
+ lock l(this);
+ if (busyCount++ == 0)
+ changed = true;
+ }
+}
+
+// called whenever a child session becomes newly idle (a last reader or writer has closed)
+// The connection is idle when none of its child sessions are busy
+
+void AmqpConnection::NotifyIdle()
+{
+ bool connectionIdle = false;
+ {
+ lock l(this);
+ if (--busyCount == 0)
+ connectionIdle = true;
+ }
+ if (connectionIdle) {
+ OnConnectionIdle(this, System::EventArgs::Empty);
+ }
+}
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h
new file mode 100644
index 0000000000..2641391e82
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h
@@ -0,0 +1,71 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace std;
+using namespace qpid::client;
+
+ref class AmqpSession;
+
+public delegate void ConnectionIdleEventHandler(Object^ sender, EventArgs^ eventArgs);
+
+public ref class AmqpConnection
+{
+private:
+ Connection* connectionp;
+ void Cleanup();
+ bool disposed;
+ Collections::Generic::List<AmqpSession^>^ sessions;
+ bool isOpen;
+ int busyCount;
+ int maxFrameSize;
+
+ internal:
+ void NotifyBusy();
+ void NotifyIdle();
+
+ property int MaxFrameSize {
+ int get () { return maxFrameSize; }
+ }
+
+public:
+ AmqpConnection(System::String^ server, int port);
+ ~AmqpConnection();
+ !AmqpConnection();
+ void Close();
+ AmqpSession^ CreateSession();
+ event ConnectionIdleEventHandler^ OnConnectionIdle;
+
+ property bool IsOpen {
+ bool get() { return isOpen; }
+ };
+
+ property bool IsIdle {
+ bool get() { return (busyCount == 0); }
+ }
+};
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp
new file mode 100644
index 0000000000..5c333aff60
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp
@@ -0,0 +1,76 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/AMQFrame.h"
+
+#include "MessageBodyStream.h"
+#include "AmqpMessage.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace msclr;
+
+using namespace Apache::Qpid::AmqpTypes;
+
+AmqpMessage::AmqpMessage(MessageBodyStream ^mbs) :
+ messageBodyStream(mbs),
+ disposed(false)
+{
+}
+
+void AmqpMessage::Cleanup()
+{
+ {
+ lock l(this);
+ if (disposed)
+ return;
+
+ disposed = true;
+ }
+
+ messageBodyStream->Close();
+}
+
+AmqpMessage::~AmqpMessage()
+{
+ Cleanup();
+}
+
+AmqpMessage::!AmqpMessage()
+{
+ Cleanup();
+}
+
+void AmqpMessage::Close()
+{
+ // Simulate Dispose()...
+ Cleanup();
+ GC::SuppressFinalize(this);
+}
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h
new file mode 100644
index 0000000000..f0801d30dc
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h
@@ -0,0 +1,61 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+
+using namespace qpid::client;
+using namespace std;
+
+
+
+public ref class AmqpMessage
+{
+private:
+ MessageBodyStream^ messageBodyStream;
+ AmqpTypes::AmqpProperties^ amqpProperties;
+ bool disposed;
+ void Cleanup();
+
+internal:
+ AmqpMessage(MessageBodyStream ^bstream);
+
+public:
+ ~AmqpMessage();
+ !AmqpMessage();
+ void Close();
+
+ property AmqpTypes::AmqpProperties^ Properties {
+ AmqpTypes::AmqpProperties^ get () { return amqpProperties; }
+ void set(AmqpTypes::AmqpProperties^ p) { amqpProperties = p; }
+ }
+
+ property System::IO::Stream^ BodyStream {
+ System::IO::Stream^ get() { return messageBodyStream; }
+ }
+};
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
new file mode 100644
index 0000000000..bab73da74e
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp
@@ -0,0 +1,287 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/Message.h"
+#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/client/Future.h"
+
+#include "AmqpConnection.h"
+#include "AmqpSession.h"
+#include "AmqpMessage.h"
+#include "MessageBodyStream.h"
+#include "InputLink.h"
+#include "OutputLink.h"
+#include "QpidMarshal.h"
+#include "QpidException.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace std;
+
+
+AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidConnectionp) :
+ connection(conn),
+ sessionp(NULL),
+ sessionImplp(NULL),
+ subs_mgrp(NULL),
+ openCount(0)
+{
+ bool success = false;
+
+ try {
+ sessionp = new qpid::client::AsyncSession;
+ *sessionp = qpidConnectionp->newSession();
+ subs_mgrp = new SubscriptionManager (*sessionp);
+ success = true;
+ waiters = gcnew Collections::Generic::List<CompletionWaiter^>();
+ } finally {
+ if (!success) {
+ Cleanup();
+ throw gcnew QpidException ("session creation failure");
+ }
+ }
+}
+
+
+void AmqpSession::Cleanup()
+{
+ if (subscriptionp != NULL) {
+ subscriptionp->cancel();
+ delete subscriptionp;
+ subscriptionp=NULL;
+ }
+
+ if (subs_mgrp != NULL) {
+ subs_mgrp->stop();
+ delete subs_mgrp;
+ subs_mgrp = NULL;
+ }
+
+ if (localQueuep != NULL) {
+ delete localQueuep;
+ localQueuep = NULL;
+ }
+
+ if (sessionp != NULL) {
+ sessionp->close();
+ delete sessionp;
+ sessionp = NULL;
+ sessionImplp = NULL;
+ }
+
+ if (connectionp != NULL) {
+ connectionp->close();
+ delete connectionp;
+ connectionp = NULL;
+ }
+}
+
+
+// Called by the parent AmqpConnection
+
+void AmqpSession::ConnectionClosed()
+{
+ Cleanup();
+}
+
+InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue)
+{
+ return CreateInputLink(sourceQueue, true, false, nullptr, nullptr);
+}
+
+InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary,
+ System::String^ filterKey, System::String^ exchange)
+{
+ InputLink^ link = gcnew InputLink (this, sourceQueue, sessionp, subs_mgrp, exclusive, temporary, filterKey, exchange);
+ {
+ lock l(waiters);
+ if (openCount == 0) {
+ connection->NotifyBusy();
+ }
+ openCount++;
+ }
+ return link;
+}
+
+OutputLink^ AmqpSession::CreateOutputLink(System::String^ targetQueue)
+{
+ OutputLink^ link = gcnew OutputLink (this, targetQueue);
+
+ lock l(waiters);
+
+ if (sessionImplp == NULL) {
+ // not needed unless sending messages
+ SessionBase_0_10Access sa(*sessionp);
+ boost::shared_ptr<SessionImpl> sip = sa.get();
+ sessionImplp = sip.get();
+ }
+
+ if (openCount == 0) {
+ connection->NotifyBusy();
+ }
+ openCount++;
+
+ return link;
+}
+
+
+// called whenever a child InputLink or OutputLink is closed or finalized
+void AmqpSession::NotifyClosed()
+{
+ lock l(waiters);
+ openCount--;
+ if (openCount == 0) {
+ connection->NotifyIdle();
+ }
+}
+
+
+CompletionWaiter^ AmqpSession::SendMessage (System::String^ queue, MessageBodyStream ^mbody, TimeSpan timeout, bool async, AsyncCallback^ callback, Object^ state)
+{
+ lock l(waiters);
+ if (sessionp == NULL)
+ throw gcnew ObjectDisposedException("Send");
+
+ // create an AMQP message.transfer command to use with the partial frameset from the MessageBodyStream
+
+ std::string exname = QpidMarshal::ToNative(queue);
+ FrameSet *framesetp = (FrameSet *) mbody->GetFrameSet().ToPointer();
+ uint8_t acceptMode=1;
+ uint8_t acquireMode=0;
+ MessageTransferBody mtcmd(ProtocolVersion(0,10), exname, acceptMode, acquireMode);
+ // ask for a command completion
+ mtcmd.setSync(true);
+
+ //send it
+
+ Future *futurep = NULL;
+ try {
+ futurep = new Future(sessionImplp->send(mtcmd, *framesetp));
+
+ CompletionWaiter^ waiter = nullptr;
+ if (async || (timeout != TimeSpan::MaxValue)) {
+ waiter = gcnew CompletionWaiter(this, timeout, (IntPtr) futurep, callback, state);
+ // waiter is responsible for releasing the Future native resource
+ futurep = NULL;
+ addWaiter(waiter);
+ }
+
+ l.release();
+
+ if (waiter != nullptr)
+ return waiter;
+
+ // synchronous send with no timeout: no need to involve the asyncHelper thread
+
+ internalWaitForCompletion((IntPtr) futurep);
+ }
+ finally {
+ if (futurep != NULL)
+ delete (futurep);
+ }
+ return nullptr;
+}
+
+void AmqpSession::Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey)
+{
+ sessionp->exchangeBind(arg::queue=QpidMarshal::ToNative(queue),
+ arg::exchange=QpidMarshal::ToNative(exchange),
+ arg::bindingKey=QpidMarshal::ToNative(filterKey));
+
+}
+
+
+void AmqpSession::internalWaitForCompletion(IntPtr fp)
+{
+ lock l(waiters);
+ if (sessionp == NULL)
+ throw gcnew ObjectDisposedException("AmqpSession");
+
+ // increment the smart pointer count to sessionImplp to guard agains async close
+ Session sessionCopy(*sessionp);
+
+ l.release();
+ // Qpid native lib call to wait for the command completion
+ ((Future *)fp.ToPointer())->wait(*sessionImplp);
+}
+
+// call with lock held
+void AmqpSession::addWaiter(CompletionWaiter^ waiter)
+{
+ waiters->Add(waiter);
+ if (!helperRunning) {
+ helperRunning = true;
+ ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &AmqpSession::asyncHelper));
+ }
+}
+
+
+void AmqpSession::removeWaiter(CompletionWaiter^ waiter)
+{
+ // a waiter can be removed from anywhere in the list if timed out
+
+ lock l(waiters);
+ int idx = waiters->IndexOf(waiter);
+ if (idx == -1) {
+ // TODO: assert or log
+ }
+ else {
+ waiters->RemoveAt(idx);
+ }
+}
+
+
+// process CompletionWaiter list one at a time.
+
+void AmqpSession::asyncHelper(Object ^unused)
+{
+ lock l(waiters);
+
+ while (true) {
+ if (waiters->Count == 0) {
+ helperRunning = false;
+ return;
+ }
+
+ CompletionWaiter^ waiter = waiters[0];
+ l.release();
+ // can block, but for short time
+ // the waiter removes itself from the list, possibly as the timer thread on timeout
+ waiter->Run();
+ l.acquire();
+ }
+}
+
+
+}}} // namespace Apache::Qpid::Cli
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
new file mode 100644
index 0000000000..b959a4123a
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h
@@ -0,0 +1,80 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+#include "AmqpConnection.h"
+#include "MessageBodyStream.h"
+#include "CompletionWaiter.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+
+using namespace qpid::client;
+using namespace std;
+
+ref class InputLink;
+ref class OutputLink;
+
+public ref class AmqpSession
+{
+private:
+ AmqpConnection^ connection;
+ Connection* connectionp;
+ AsyncSession* sessionp;
+ SessionImpl* sessionImplp;
+ SubscriptionManager* subs_mgrp;
+ Subscription* subscriptionp;
+ LocalQueue* localQueuep;
+ Collections::Generic::List<CompletionWaiter^>^ waiters;
+ bool helperRunning;
+ int openCount;
+
+ void Cleanup();
+ void asyncHelper(Object ^);
+ void addWaiter(CompletionWaiter^ waiter);
+
+public:
+ OutputLink^ CreateOutputLink(System::String^ targetQueue);
+ InputLink^ CreateInputLink(System::String^ sourceQueue);
+
+ // 0-10 specific support
+ InputLink^ CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary, System::String^ filterKey, System::String^ exchange);
+ void Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey);
+
+internal:
+ AmqpSession(AmqpConnection^ connection, qpid::client::Connection* qpidConnection);
+ void NotifyClosed();
+ CompletionWaiter^ SendMessage (System::String^ queue, MessageBodyStream ^mbody, TimeSpan timeout, bool async, AsyncCallback^ callback, Object^ state);
+ void ConnectionClosed();
+ void internalWaitForCompletion(IntPtr Future);
+ void removeWaiter(CompletionWaiter^ waiter);
+
+ property AmqpConnection^ Connection {
+ AmqpConnection^ get () { return connection; }
+ }
+
+
+};
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp
new file mode 100644
index 0000000000..91c23ae30a
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp
@@ -0,0 +1,57 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+using namespace System;
+using namespace System::Reflection;
+using namespace System::Runtime::CompilerServices;
+using namespace System::Runtime::InteropServices;
+using namespace System::Security::Permissions;
+
+//
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+//
+[assembly:AssemblyTitleAttribute("Apache.Qpid.Interop")];
+[assembly:AssemblyDescriptionAttribute("")];
+[assembly:AssemblyConfigurationAttribute("")];
+[assembly:AssemblyCompanyAttribute("")];
+[assembly:AssemblyProductAttribute("")];
+[assembly:AssemblyCopyrightAttribute("")];
+[assembly:AssemblyTrademarkAttribute("")];
+[assembly:AssemblyCultureAttribute("")];
+
+//
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the value or you can default the Revision and Build Numbers
+// by using the '*' as shown below:
+
+[assembly:AssemblyVersionAttribute("1.0.*")];
+
+[assembly:ComVisible(false)];
+
+[assembly:CLSCompliantAttribute(true)];
+
+[assembly:SecurityPermission(SecurityAction::RequestMinimum, UnmanagedCode = true)];
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp
new file mode 100644
index 0000000000..e39ee1b1ae
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp
@@ -0,0 +1,145 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/Demux.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+
+#include "MessageBodyStream.h"
+#include "AmqpMessage.h"
+#include "AmqpSession.h"
+#include "InputLink.h"
+#include "CompletionWaiter.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace msclr;
+
+// A class to provide IAsyncResult semantics for a qpid AsyncSession command (i.e. 0-10 messageTransfer)
+// when the client session receives a "Completion" notification from the Broker.
+
+
+CompletionWaiter::CompletionWaiter(AmqpSession^ parent, TimeSpan timeSpan, IntPtr future, AsyncCallback^ callback, Object^ state)
+{
+ this->qpidFuture = future;
+ this->asyncCallback = callback;
+ this->state = state;
+ this->parent = parent;
+ this->thisLock = gcnew Object();
+ // do this after the Completion Waiter is fully initialized, in case of
+ // very small timespan
+ if (timeSpan != TimeSpan::MaxValue) {
+ this->timer = gcnew Timer(timeoutCallback, this, timeSpan, TimeSpan::FromMilliseconds(-1));
+ }
+}
+
+
+void CompletionWaiter::WaitForCompletion()
+{
+ if (isCompleted)
+ return;
+
+ lock l(thisLock);
+ while (!isCompleted) {
+ Monitor::Wait(thisLock);
+ }
+}
+
+void CompletionWaiter::Run()
+{
+ // no locks required in this method
+ if (isCompleted)
+ return;
+
+ try {
+ // Wait for the arrival of the "AMQP Completion" indication from the Broker
+ parent->internalWaitForCompletion(qpidFuture);
+ }
+ catch (System::Exception^ e) {
+ runException = e;
+ }
+ finally {
+ delete(qpidFuture.ToPointer());
+ qpidFuture = (IntPtr) NULL;
+ }
+
+ if (timer != nullptr) {
+ timer->~Timer();
+ timer = nullptr;
+ }
+
+ Complete(false);
+}
+
+
+// "Complete" here means complete the AsyncResult, which may precede broker "command completion" if timed out
+
+void CompletionWaiter::Complete(bool isTimerThread)
+{
+ lock l(thisLock);
+ if (isCompleted)
+ return;
+
+ isCompleted = true;
+ if (isTimerThread)
+ timedOut = true;
+
+ Monitor::PulseAll(thisLock);
+
+ // do this check and signal while locked
+ if (asyncWaitHandle != nullptr)
+ asyncWaitHandle->Set();
+
+ l.release();
+
+ parent->removeWaiter(this);
+
+ if (asyncCallback != nullptr) {
+ // guard against application callback exception
+ try {
+ asyncCallback(this);
+ }
+ catch (System::Exception^) {
+ // log it?
+ }
+ }
+}
+
+
+void CompletionWaiter::TimeoutCallback(Object^ state)
+{
+ CompletionWaiter^ waiter = (CompletionWaiter^) state;
+ waiter->Complete(true);
+}
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h
new file mode 100644
index 0000000000..197ac632b0
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h
@@ -0,0 +1,99 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+
+public ref class CompletionWaiter : IAsyncResult
+{
+private:
+ bool timedOut;
+ // has an owner thread
+ bool assigned;
+ // can Run (i.e. earlier CompletionWaiters in the queue have completed)
+ System::Exception^ runException;
+ AsyncCallback^ asyncCallback;
+ Threading::Timer ^timer;
+ bool isCompleted;
+ Object^ state;
+ Object^ thisLock;
+ ManualResetEvent^ asyncWaitHandle;
+ AmqpSession^ parent;
+ IntPtr qpidFuture;
+ void Complete(bool isTimerThread);
+ static void TimeoutCallback(Object^ state);
+ static TimerCallback^ timeoutCallback = gcnew TimerCallback(CompletionWaiter::TimeoutCallback);
+
+ internal:
+ CompletionWaiter(AmqpSession^ parent, TimeSpan timeSpan, IntPtr future, AsyncCallback ^callback, Object^ state);
+
+ void Run();
+ void WaitForCompletion();
+
+ property bool Assigned {
+ bool get () { return assigned; }
+ }
+
+ property bool TimedOut {
+ bool get () { return timedOut; }
+ }
+
+
+ public:
+
+ virtual property bool IsCompleted {
+ bool get () { return isCompleted; }
+ }
+
+ virtual property bool CompletedSynchronously {
+ bool get () { return false; }
+ }
+
+ virtual property WaitHandle^ AsyncWaitHandle {
+ WaitHandle^ get () {
+ if (asyncWaitHandle != nullptr) {
+ return asyncWaitHandle;
+ }
+
+ msclr::lock l(thisLock);
+ if (asyncWaitHandle == nullptr) {
+ asyncWaitHandle = gcnew ManualResetEvent(isCompleted);
+ }
+ return asyncWaitHandle;
+ }
+ }
+
+
+ virtual property Object^ AsyncState {
+ Object^ get () { return state; }
+ }
+
+
+
+
+};
+
+}}} // namespace Apache::Qpid::Interop
+
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
new file mode 100644
index 0000000000..cee394b05d
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp
@@ -0,0 +1,685 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/Demux.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+
+#include "MessageBodyStream.h"
+#include "AmqpMessage.h"
+#include "AmqpSession.h"
+#include "InputLink.h"
+#include "QpidMarshal.h"
+#include "QpidException.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Threading;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using namespace std;
+
+using namespace Apache::Qpid::AmqpTypes;
+
+// Scalability note: When using async methods, an async helper thread is created
+// to block on the Demux BlockingQueue. This design should be revised in line
+// with proposed changes to the native library to reduce the number of servicing
+// threads for large numbers of subscriptions.
+
+
+// The folowing def must match the "Frames" private typedef.
+// TODO, make Qpid-cpp "Frames" definition visible.
+typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames;
+
+InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue,
+ qpid::client::AsyncSession *qpidSessionp, qpid::client::SubscriptionManager *qpidSubsMgrp,
+ bool exclusive,
+ bool temporary, System::String^ filterKey, System::String^ exchange) :
+ amqpSession(session),
+ subscriptionp(NULL),
+ localQueuep(NULL),
+ queuePtrp(NULL),
+ dequeuedFrameSetpp(NULL),
+ disposed(false),
+ finalizing(false)
+{
+ bool success = false;
+ System::Exception^ linkException = nullptr;
+
+ waiters = gcnew Collections::Generic::List<MessageWaiter^>();
+
+ try {
+ std::string qname = QpidMarshal::ToNative(sourceQueue);
+
+ if (temporary) {
+ qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true);
+ qpidSessionp->exchangeBind(arg::exchange=QpidMarshal::ToNative(exchange),
+ arg::queue=qname, arg::bindingKey=QpidMarshal::ToNative(filterKey));
+ qpidSessionp->sync();
+ }
+
+ localQueuep = new LocalQueue;
+ SubscriptionSettings settings;
+ settings.flowControl = FlowControl::messageCredit(0);
+ Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings);
+ subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup
+
+ // the roundabout way to obtain localQueuep->queue
+ SessionBase_0_10Access sa(*qpidSessionp);
+ boost::shared_ptr<SessionImpl> simpl = sa.get();
+ queuePtrp = new Demux::QueuePtr(simpl->getDemux().get(sub.getName()));
+
+ success = true;
+ } finally {
+ if (!success) {
+ Cleanup();
+ linkException = gcnew QpidException ("InputLink creation failure");
+ throw linkException;
+ }
+ }
+}
+
+void InputLink::ReleaseNative()
+{
+ // involves talking to the Broker unless the connection is broken
+ if (subscriptionp != NULL) {
+ try {
+ subscriptionp->cancel();
+ }
+ catch (const std::exception& error) {
+ // TODO: log this properly
+ std::cout << "shutdown error " << error.what() << std::endl;
+ }
+ }
+
+ // free native mem (or smart pointers) that we own
+ if (subscriptionp != NULL)
+ delete subscriptionp;
+ if (queuePtrp != NULL)
+ delete queuePtrp;
+ if (localQueuep != NULL)
+ delete localQueuep;
+ if (dequeuedFrameSetpp != NULL)
+ delete dequeuedFrameSetpp;
+}
+
+void InputLink::Cleanup()
+{
+ {
+ lock l(waiters);
+ if (disposed)
+ return;
+
+ disposed = true;
+
+ // if the asyncHelper exists and is idle, unblock it
+ if (asyncHelperWaitHandle != nullptr) {
+ asyncHelperWaitHandle->Set();
+ }
+
+ // wakeup anyone waiting for messages
+ if (queuePtrp != NULL)
+ (*queuePtrp)->close();
+
+ try {}
+ finally
+ {
+ ReleaseNative();
+ }
+
+ }
+ amqpSession->NotifyClosed();
+}
+
+InputLink::~InputLink()
+{
+ Cleanup();
+}
+
+InputLink::!InputLink()
+{
+ Cleanup();
+}
+
+void InputLink::Close()
+{
+ // Simulate Dispose()...
+ Cleanup();
+ GC::SuppressFinalize(this);
+}
+
+// call with lock held
+bool InputLink::haveMessage()
+{
+ if (dequeuedFrameSetpp != NULL)
+ return true;
+
+ if (queuePtrp != NULL) {
+ if ((*queuePtrp)->size() > 0)
+ return true;
+ }
+ return false;
+}
+
+IntPtr InputLink::nextLocalMessage()
+{
+ lock l(waiters);
+ if (disposed)
+ return (IntPtr) NULL;
+
+ // A message already pulled off BlockingQueue?
+ if (dequeuedFrameSetpp != NULL) {
+ QpidFrameSetPtr* rv = dequeuedFrameSetpp;
+ dequeuedFrameSetpp = NULL;
+ return (IntPtr) rv;
+ }
+
+ if ((*queuePtrp)->empty())
+ return (IntPtr) NULL;
+
+ bool received = false;
+ QpidFrameSetPtr* frameSetpp = new QpidFrameSetPtr;
+
+ try {
+ received = (*queuePtrp)->pop(*frameSetpp, qpid::sys::TIME_INFINITE);
+ if (received) {
+ QpidFrameSetPtr* rv = frameSetpp;
+ // no need to free native in finally block
+ frameSetpp = NULL;
+ return (IntPtr) rv;
+ }
+ } catch(const std::exception& error) {
+ // should be no async tampering with queue since we hold the lock and have a
+ // smart pointer ref to the native LocalQueue, even if the network connection fails...
+ cout << "unknown exception in InputLink.nextLocalMessage() " << error.what() <<endl;
+ // TODO: log this
+ }
+ finally {
+ if (frameSetpp != NULL) {
+ delete frameSetpp;
+ }
+ }
+
+ return (IntPtr) NULL;
+}
+
+
+
+void InputLink::unblockWaiter()
+{
+ // to be followed by resetQueue() below
+ lock l(waiters);
+ if (disposed)
+ return;
+ (*queuePtrp)->close();
+}
+
+
+
+// Set things right after unblockWaiter(). Closing and opening a Qpid BlockingQueue unsticks
+// a blocking thread without interefering with queue contents or the ability to push
+// new incoming messages.
+
+void InputLink::resetQueue()
+{
+ lock l(waiters);
+ if (disposed)
+ return;
+ if ((*queuePtrp)->isClosed()) {
+ (*queuePtrp)->open();
+ }
+}
+
+
+// returns true if there is a message to consume, i.e. nextLocalMessage() won't block
+
+bool InputLink::internalWaitForMessage()
+{
+ Demux::QueuePtr demuxQueuePtr;
+
+ bool received = false;
+ QpidFrameSetPtr* frameSetpp = NULL;
+ try {
+ lock l(waiters);
+ if (disposed)
+ return false;
+ if (haveMessage())
+ return true;
+
+ // TODO: prefetch window of messages, compatible with both 0-10 and 1.0.
+ subscriptionp->grantMessageCredit(1);
+
+ // get a scoped smart ptr ref to guard against async close or hangup
+ demuxQueuePtr = *queuePtrp;
+ frameSetpp = new QpidFrameSetPtr;
+
+ l.release();
+ // Async cleanup is now possible. Only use demuxQueuePtr until lock reacquired.
+ received = demuxQueuePtr->pop(*frameSetpp, qpid::sys::TIME_INFINITE);
+ l.acquire();
+
+ if (received) {
+ dequeuedFrameSetpp = frameSetpp;
+ frameSetpp = NULL; // native will eventually be freed in Cleanup or MessageBodyStream
+ }
+
+ return true;
+ } catch(const std::exception& ) {
+ // timeout or connection closed
+ return false;
+ }
+ finally {
+ if (frameSetpp != NULL) {
+ delete frameSetpp;
+ }
+ }
+
+ return false;
+}
+
+
+// call with lock held
+void InputLink::addWaiter(MessageWaiter^ waiter)
+{
+ waiters->Add(waiter);
+ if (waiters->Count == 1) {
+ // mark this waiter as ready to run
+ // Only the waiter at the head of the queue is active.
+ waiter->Activate();
+ }
+
+ if (waiter->Assigned)
+ return;
+
+ if (asyncHelperWaitHandle == nullptr) {
+ asyncHelperWaitHandle = gcnew ManualResetEvent(false);
+ ThreadStart^ threadDelegate = gcnew ThreadStart(this, &InputLink::asyncHelper);
+ (gcnew Thread(threadDelegate))->Start();
+ }
+
+ if (waiters->Count == 1) {
+ // wake up the asyncHelper
+ asyncHelperWaitHandle->Set();
+ }
+}
+
+
+void InputLink::removeWaiter(MessageWaiter^ waiter) {
+ // a waiter can be removed from anywhere in the list if timed out
+
+ lock l(waiters);
+ int idx = waiters->IndexOf(waiter);
+ if (idx == -1) {
+ // TODO: assert or log
+ if (asyncHelperWaitHandle != nullptr) {
+ // just in case.
+ asyncHelperWaitHandle->Set();
+ }
+ return;
+ }
+ waiters->RemoveAt(idx);
+
+ // let the next waiter know it's his turn.
+ if (waiters->Count > 0) {
+ MessageWaiter^ nextWaiter = waiters[0];
+
+ // wakeup the asyncHelper thread to help out if necessary.
+ if (!nextWaiter->Assigned) {
+ asyncHelperWaitHandle->Set();
+ }
+
+ l.release();
+ nextWaiter->Activate();
+ return;
+ }
+ else {
+ if (disposed && (asyncHelperWaitHandle != nullptr)) {
+ asyncHelperWaitHandle->Set();
+ }
+ }
+}
+
+
+void InputLink::asyncHelper()
+{
+ lock l(waiters);
+
+ while (true) {
+ if (disposed && (waiters->Count == 0)) {
+ asyncHelperWaitHandle = nullptr;
+ return;
+ }
+
+ if (waiters->Count > 0) {
+ MessageWaiter^ waiter = waiters[0];
+
+ l.release();
+ if (waiter->AcceptForWork()) {
+ waiter->Run();
+ }
+ l.acquire();
+ }
+
+ // sleep if more work may be coming or it is currently someone else's turn
+ if (((waiters->Count == 0) && !disposed) || ((waiters->Count != 0) && waiters[0]->Assigned)) {
+ // wait for something to do
+ asyncHelperWaitHandle->Reset();
+ l.release();
+ asyncHelperWaitHandle->WaitOne();
+ l.acquire();
+ }
+ }
+}
+
+void InputLink::sync()
+{
+ // for the timeout thread
+ lock l(waiters);
+}
+
+
+AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp)
+{
+ QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer();
+ bool ownFrameSet = true;
+ bool haveProperties = false;
+
+ try {
+ MessageBodyStream^ mstream = gcnew MessageBodyStream(fspp);
+ ownFrameSet = false; // stream releases on close/dispose
+
+ AmqpMessage^ amqpMessage = gcnew AmqpMessage(mstream);
+
+ AMQHeaderBody* headerBodyp = (*fspp)->getHeaders();
+ uint64_t contentSize = (*fspp)->getContentSize();
+ SequenceSet frameSetID((*fspp)->getId());
+
+ // target managed representation
+ AmqpProperties^ amqpProperties = gcnew AmqpProperties();
+
+ // source native representation
+ const DeliveryProperties* deliveryProperties = headerBodyp->get<DeliveryProperties>();
+ const qpid::framing::MessageProperties* messageProperties = headerBodyp->get<qpid::framing::MessageProperties>();
+
+ if (deliveryProperties) {
+ if (deliveryProperties->hasRoutingKey()) {
+ haveProperties = true;
+
+ amqpProperties->RoutingKey = gcnew String(deliveryProperties->getRoutingKey().c_str());
+ }
+
+ if (deliveryProperties->hasDeliveryMode()) {
+ if (deliveryProperties->getDeliveryMode() == qpid::framing::PERSISTENT)
+ amqpProperties->Durable = true;
+ }
+
+ if (deliveryProperties->hasTtl()) {
+ long long ticks = deliveryProperties->getTtl() * TimeSpan::TicksPerMillisecond;
+ amqpProperties->TimeToLive = Nullable<TimeSpan>(TimeSpan::FromTicks(ticks));
+ }
+ }
+
+ if (messageProperties) {
+
+ if (messageProperties->hasReplyTo()) {
+ haveProperties = true;
+ const ReplyTo& rpto = messageProperties->getReplyTo();
+ String^ rk = nullptr;
+ String^ ex = nullptr;
+ if (rpto.hasRoutingKey()) {
+ rk = gcnew String(rpto.getRoutingKey().c_str());
+ }
+ if (rpto.hasExchange()) {
+ ex = gcnew String(rpto.getExchange().c_str());
+ }
+ amqpProperties->SetReplyTo(ex,rk);
+ }
+
+ if (messageProperties->hasContentType()) {
+ haveProperties = true;
+ amqpProperties->ContentType = gcnew String(messageProperties->getContentType().c_str());
+
+ if (messageProperties->hasContentEncoding()) {
+ String^ enc = gcnew String(messageProperties->getContentEncoding().c_str());
+ if (!String::IsNullOrEmpty(enc)) {
+ // TODO: properly assemble 1.0 style to 0-10 for all cases
+ amqpProperties->ContentType += "; charset=" + enc;
+ }
+ }
+ }
+
+ if (messageProperties->hasCorrelationId()) {
+ haveProperties = true;
+ const std::string& ncid = messageProperties->getCorrelationId();
+ int len = ncid.size();
+ array<unsigned char>^ mcid = gcnew array<unsigned char>(len);
+ Marshal::Copy ((IntPtr) (void *) ncid.data(), mcid, 0, len);
+ amqpProperties->CorrelationId = mcid;
+ }
+
+ if (messageProperties->hasUserId()) {
+ haveProperties = true;
+ const std::string& nuid = messageProperties->getUserId();
+ int len = nuid.size();
+ array<unsigned char>^ muid = gcnew array<unsigned char>(len);
+ Marshal::Copy ((IntPtr) (void *) nuid.data(), muid, 0, len);
+ amqpProperties->UserId = muid;
+ }
+
+ if (messageProperties->hasApplicationHeaders()) {
+ haveProperties = true;
+ const qpid::framing::FieldTable& fieldTable = messageProperties->getApplicationHeaders();
+ int count = fieldTable.count();
+
+ if (count > 0) {
+ haveProperties = true;
+ Collections::Generic::Dictionary<System::String^, AmqpType^>^ mmap =
+ gcnew Collections::Generic::Dictionary<System::String^, AmqpType^>(count);
+
+ for(qpid::framing::FieldTable::ValueMap::const_iterator i = fieldTable.begin(); i != fieldTable.end(); i++) {
+
+ qpid::framing::FieldValue::Data &data = i->second->getData();
+
+ // TODO: replace these generic int/string conversions with handler for each AMQP specific type:
+ // uint8_t dataType = i->second->getType();
+ // switch (dataType) { case TYPE_CODE_STR8: ... }
+
+ if (data.convertsToInt()) {
+ mmap->Add (gcnew String(i->first.data()), gcnew AmqpInt((int) i->second->getData().getInt()));
+ }
+ if (data.convertsToString()) {
+ std::string ns = data.getString();
+ String^ ms = gcnew String(ns.data(), 0, ns.size());
+ mmap->Add (gcnew String(i->first.data()), gcnew AmqpString(ms));
+ }
+ }
+
+ amqpProperties->PropertyMap = mmap;
+ }
+
+ }
+ }
+
+ if (haveProperties) {
+ amqpMessage->Properties = amqpProperties;
+ }
+
+ // We have a message we can return to the caller.
+ // Tell the broker we got it.
+ subscriptionp->accept(frameSetID);
+ return amqpMessage;
+ }
+ finally {
+ if (ownFrameSet)
+ delete (fspp);
+ }
+}
+
+ // As for IInputChannel:
+ // if success, return true + amqpMessage
+ // elseif timeout, return false
+ // elseif closed/EOF, return true and amqpMessage = null
+ // else throw an Exception
+
+bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage)
+{
+ lock l(waiters);
+
+ if (waiters->Count == 0) {
+ // see if there is a message already available without blocking
+ IntPtr fspp = nextLocalMessage();
+ if (fspp.ToPointer() != NULL) {
+ amqpMessage = createAmqpMessage(fspp);
+ return true;
+ }
+ }
+
+ MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, false, nullptr, nullptr);
+ addWaiter(waiter);
+
+ l.release();
+ waiter->Run();
+ l.acquire();
+
+ if (waiter->TimedOut) {
+ return false;
+ }
+
+ IntPtr waiterMsg = waiter->Message;
+ if (waiterMsg.ToPointer() == NULL) {
+ if (disposed) {
+ // indicate normal EOF on channel
+ amqpMessage = nullptr;
+ return true;
+ }
+ }
+
+ amqpMessage = createAmqpMessage(waiterMsg);
+ return true;
+}
+
+IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state)
+{
+
+ //TODO: if haveMessage() complete synchronously
+
+ lock l(waiters);
+ MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, true, callback, state);
+ addWaiter(waiter);
+ return waiter;
+}
+
+bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage)
+{
+
+ // TODO: validate result
+
+ MessageWaiter^ waiter = (MessageWaiter ^) result;
+
+ waiter->WaitForCompletion();
+
+ if (waiter->RunException != nullptr)
+ throw waiter->RunException;
+
+ if (waiter->TimedOut) {
+ amqpMessage = nullptr;
+ return false;
+ }
+
+ IntPtr waiterMsg = waiter->Message;
+ if (waiterMsg.ToPointer() == NULL) {
+ if (disposed) {
+ // indicate normal EOF on channel
+ amqpMessage = nullptr;
+ return true;
+ }
+ }
+
+ amqpMessage = createAmqpMessage(waiterMsg);
+ return true;
+}
+
+
+bool InputLink::WaitForMessage(TimeSpan timeout)
+{
+ lock l(waiters);
+
+ if (waiters->Count == 0) {
+ // see if there is a message already available without blocking
+ if (haveMessage())
+ return true;
+ }
+
+ // Same as for TryReceive, except consuming = false
+ MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, false, nullptr, nullptr);
+ addWaiter(waiter);
+
+ l.release();
+ waiter->Run();
+ l.acquire();
+
+ if (waiter->TimedOut) {
+ return false;
+ }
+
+ return true;
+}
+
+IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state)
+{
+ lock l(waiters);
+
+ // Same as for BeginTryReceive, except consuming = false
+ MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, true, callback, state);
+ addWaiter(waiter);
+ return waiter;
+}
+
+bool InputLink::EndWaitForMessage(IAsyncResult^ result)
+{
+ MessageWaiter^ waiter = (MessageWaiter ^) result;
+
+ waiter->WaitForCompletion();
+
+ if (waiter->TimedOut) {
+ return false;
+ }
+
+ return true;
+}
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
new file mode 100644
index 0000000000..366780c137
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h
@@ -0,0 +1,85 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+#include "MessageWaiter.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace System::Runtime::InteropServices;
+
+using namespace qpid::client;
+using namespace std;
+
+// smart pointer to the low level AMQP 0-10 frames of the message
+typedef qpid::framing::FrameSet::shared_ptr QpidFrameSetPtr;
+
+public ref class InputLink
+{
+private:
+ AmqpSession^ amqpSession;
+ Subscription* subscriptionp;
+ LocalQueue* localQueuep;
+ Demux::QueuePtr* queuePtrp;
+ Collections::Generic::List<MessageWaiter^>^ waiters;
+ bool disposed;
+ bool finalizing;
+ QpidFrameSetPtr* dequeuedFrameSetpp;
+ ManualResetEvent^ asyncHelperWaitHandle;
+
+ void Cleanup();
+ void ReleaseNative();
+ bool haveMessage();
+ void addWaiter(MessageWaiter^ waiter);
+ void asyncHelper();
+ AmqpMessage^ createAmqpMessage(IntPtr msgp);
+
+internal:
+ InputLink(AmqpSession^ session, System::String^ sourceQueue, qpid::client::AsyncSession *qpidSessionp,
+ qpid::client::SubscriptionManager *qpidSubsMgrp, bool exclusive, bool temporary, System::String^ filterKey,
+ System::String^ exchange);
+
+ bool internalWaitForMessage();
+ void unblockWaiter();
+ void resetQueue();
+ IntPtr nextLocalMessage();
+ void removeWaiter(MessageWaiter^ waiter);
+ void sync();
+
+public:
+ ~InputLink();
+ !InputLink();
+ void Close();
+
+ bool TryReceive(TimeSpan timeout, [Out] AmqpMessage ^% amqpMessage);
+ IAsyncResult^ BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state);
+ bool EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage);
+
+ bool WaitForMessage(TimeSpan timeout);
+ IAsyncResult^ BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state);
+ bool EndWaitForMessage(IAsyncResult^ result);
+
+};
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
new file mode 100644
index 0000000000..32f78c8344
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj
@@ -0,0 +1,302 @@
+<?xml version="1.0" encoding="Windows-1252"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<VisualStudioProject
+ ProjectType="Visual C++"
+ Version="9.00"
+ Name="Interop"
+ ProjectGUID="{C9B6AC75-6332-47A4-B82B-0C20E0AF2D34}"
+ RootNamespace="Interop"
+ Keyword="ManagedCProj"
+ TargetFrameworkVersion="196613"
+ >
+ <Platforms>
+ <Platform
+ Name="Win32"
+ />
+ </Platforms>
+ <ToolFiles>
+ </ToolFiles>
+ <Configurations>
+ <Configuration
+ Name="Debug|Win32"
+ OutputDirectory="$(ProjectDir)$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)"
+ ConfigurationType="2"
+ CharacterSet="1"
+ ManagedExtensions="1"
+ WholeProgramOptimization="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ CommandLine="copy ..\AmqpTypes\obj\$(ConfigurationName)\Apache.Qpid.AmqpTypes.netmodule $(ConfigurationName)"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ AdditionalOptions="/FU Debug\Apache.Qpid.AmqpTypes.netmodule"
+ Optimization="0"
+ AdditionalIncludeDirectories="..\..\..\..\..\cpp\build\include;..\..\..\..\..\cpp\build\src;..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;&quot;$(BOOST_ROOT)&quot;"
+ PreprocessorDefinitions="WIN32;_DEBUG;_CRT_NONSTDC_NO_WARNINGS;WIN32_LEAN_AND_MEAN;NOMINMAX;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
+ RuntimeLibrary="3"
+ UsePrecompiledHeader="0"
+ WarningLevel="3"
+ DebugInformationFormat="3"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalOptions="..\..\..\..\..\cpp\build\src\Debug\qpidcommon.lib ..\..\..\..\..\cpp\build\src\Debug\qpidclient.lib Debug\Apache.Qpid.AmqpTypes.netmodule"
+ AdditionalDependencies="$(NoInherit)"
+ OutputFile="$(OutDir)\Apache.Qpid.Interop.dll"
+ LinkIncremental="2"
+ AdditionalLibraryDirectories="&quot;$(BOOST_ROOT)\lib&quot;"
+ GenerateDebugInformation="true"
+ AssemblyDebug="1"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ <Configuration
+ Name="Release|Win32"
+ OutputDirectory="$(SolutionDir)$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)"
+ ConfigurationType="2"
+ CharacterSet="1"
+ ManagedExtensions="1"
+ WholeProgramOptimization="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ PreprocessorDefinitions="WIN32;NDEBUG"
+ RuntimeLibrary="2"
+ UsePrecompiledHeader="2"
+ WarningLevel="3"
+ DebugInformationFormat="3"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalDependencies="$(NoInherit)"
+ LinkIncremental="1"
+ GenerateDebugInformation="true"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ </Configurations>
+ <References>
+ <AssemblyReference
+ RelativePath="System.dll"
+ AssemblyName="System, Version=2.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL"
+ MinFrameworkVersion="131072"
+ />
+ <AssemblyReference
+ RelativePath="System.Data.dll"
+ AssemblyName="System.Data, Version=2.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=x86"
+ MinFrameworkVersion="131072"
+ />
+ <AssemblyReference
+ RelativePath="System.XML.dll"
+ AssemblyName="System.Xml, Version=2.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL"
+ MinFrameworkVersion="131072"
+ />
+ <AssemblyReference
+ RelativePath="System.Runtime.Serialization.dll"
+ AssemblyName="System.Runtime.Serialization, Version=3.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL"
+ MinFrameworkVersion="196608"
+ />
+ </References>
+ <Files>
+ <Filter
+ Name="Source Files"
+ Filter="cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx"
+ UniqueIdentifier="{4FC737F1-C7A5-4376-A066-2A32D752A2FF}"
+ >
+ <File
+ RelativePath=".\AmqpConnection.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\AmqpMessage.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\AmqpSession.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\AssemblyInfo.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\CompletionWaiter.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\InputLink.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\MessageBodyStream.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\MessageWaiter.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\OutputLink.cpp"
+ >
+ </File>
+ </Filter>
+ <Filter
+ Name="Header Files"
+ Filter="h;hpp;hxx;hm;inl;inc;xsd"
+ UniqueIdentifier="{93995380-89BD-4b04-88EB-625FBE52EBFB}"
+ >
+ <File
+ RelativePath=".\AmqpConnection.h"
+ >
+ </File>
+ <File
+ RelativePath=".\AmqpMessage.h"
+ >
+ </File>
+ <File
+ RelativePath=".\AmqpSession.h"
+ >
+ </File>
+ <File
+ RelativePath=".\CompletionWaiter.h"
+ >
+ </File>
+ <File
+ RelativePath=".\InputLink.h"
+ >
+ </File>
+ <File
+ RelativePath=".\MessageBodyStream.h"
+ >
+ </File>
+ <File
+ RelativePath=".\MessageWaiter.h"
+ >
+ </File>
+ <File
+ RelativePath=".\OutputLink.h"
+ >
+ </File>
+ <File
+ RelativePath=".\QpidException.h"
+ >
+ </File>
+ <File
+ RelativePath=".\QpidMarshal.h"
+ >
+ </File>
+ </Filter>
+ </Files>
+ <Globals>
+ </Globals>
+</VisualStudioProject>
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp
new file mode 100644
index 0000000000..f2cb5740d3
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp
@@ -0,0 +1,337 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/framing/AMQFrame.h"
+
+#include "MessageBodyStream.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Threading;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+// Thefolowing def must match "Frames" private typedef.
+// TODO: make "Frames" publicly visible.
+typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames;
+
+using namespace std;
+
+static void ThrowIfBadArgs (array<unsigned char>^ buffer, int offset, int count)
+{
+ if (buffer == nullptr)
+ throw gcnew ArgumentNullException("buffer");
+
+ if (offset < 0)
+ throw gcnew ArgumentOutOfRangeException("offset");
+
+ if (count < 0)
+ throw gcnew ArgumentOutOfRangeException("count");
+
+ if ((offset + count) > buffer->Length)
+ throw gcnew ArgumentException("offset + count");
+}
+
+
+// Input stream constructor
+
+MessageBodyStream::MessageBodyStream(FrameSet::shared_ptr *fspp)
+{
+ isInputStream = true;
+ frameSetpp = fspp;
+ fragmentCount = 0;
+ length = 0;
+ position = 0;
+ currentFramep = NULL;
+
+ const std::string *datap; // pointer to the fragment's string variable that holds the content
+
+ for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) {
+ if (i->getBody()->type() == CONTENT_BODY) {
+ fragmentCount++;
+ datap = &(i->castBody<AMQContentBody>()->getData());
+ length += datap->size();
+ }
+ }
+
+ // fragmentCount can be zero for an empty message
+
+ fragmentIndex = 0;
+ fragmentPosition = 0;
+
+ if (fragmentCount == 0) {
+ currentFragment = NULL;
+ fragmentLength = 0;
+ }
+ else if (fragmentCount == 1) {
+ currentFragment = datap->data();
+ fragmentLength = (int) length;
+ }
+ else {
+ fragments = gcnew array<IntPtr>(fragmentCount);
+ fragmentIndex = 0;
+ for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) {
+ if (i->getBody()->type() == CONTENT_BODY) {
+ datap = &(i->castBody<AMQContentBody>()->getData());
+ fragments[fragmentIndex++] = (IntPtr) (void *) datap;
+ }
+ }
+ fragmentIndex = 0;
+ datap = (const std::string *) fragments[0].ToPointer();
+ currentFragment = datap->data();
+ fragmentLength = datap->size();
+ }
+}
+
+
+int MessageBodyStream::Read(array<unsigned char>^ buffer, int offset, int count)
+{
+ if (!isInputStream)
+ throw gcnew NotSupportedException();
+ if (disposed)
+ throw gcnew ObjectDisposedException("Stream");
+ if (count == 0)
+ return 0;
+ ThrowIfBadArgs(buffer, offset, count);
+
+ int nRead = 0;
+ int remaining = count;
+
+ while (nRead < count) {
+ int fragAvail = fragmentLength - fragmentPosition;
+ int copyCount = min (fragAvail, remaining);
+ if (copyCount == 0) {
+ // no more to read
+ return nRead;
+ }
+
+ // copy from native space
+ IntPtr nativep = (IntPtr) (void *) (currentFragment + fragmentPosition);
+ Marshal::Copy (nativep, buffer, offset, copyCount);
+ nRead += copyCount;
+ remaining -= copyCount;
+ fragmentPosition += copyCount;
+ offset += copyCount;
+
+ // advance to next fragment?
+ if (fragmentPosition == fragmentLength) {
+ if (++fragmentIndex < fragmentCount) {
+ const std::string *datap = (const std::string *) fragments[fragmentIndex].ToPointer();
+ currentFragment = datap->data();
+ fragmentLength = datap->size();
+ fragmentPosition = 0;
+ }
+ }
+ }
+
+ return nRead;
+}
+
+
+void MessageBodyStream::pushCurrentFrame(bool lastFrame)
+{
+ // set flags as in SessionImpl::sendContent.
+ if (currentFramep->getBody()->type() == CONTENT_BODY) {
+
+ if ((fragmentCount == 1) && lastFrame) {
+ // only one content frame
+ currentFramep->setFirstSegment(false);
+ }
+ else {
+ currentFramep->setFirstSegment(false);
+ currentFramep->setLastSegment(true);
+ if (fragmentCount != 1) {
+ currentFramep->setFirstFrame(false);
+ }
+ if (!lastFrame) {
+ currentFramep->setLastFrame(false);
+ }
+ }
+ }
+ else {
+ // the header frame
+ currentFramep->setFirstSegment(false);
+ if (!lastFrame) {
+ // there will be at least one content frame
+ currentFramep->setLastSegment(false);
+ }
+ }
+
+ // add to frame set. This makes a copy and ref counts the body
+ (*frameSetpp)->append(*currentFramep);
+
+ delete currentFramep;
+
+ currentFramep = NULL;
+}
+
+
+IntPtr MessageBodyStream::GetFrameSet()
+{
+ if (currentFramep != NULL) {
+ // No more content. Tidy up the pending (possibly single header) frame.
+ pushCurrentFrame(true);
+ }
+
+ if (frameSetpp == NULL) {
+ return (IntPtr) NULL;
+ }
+
+ // shared_ptr.get()
+ return (IntPtr) (void *) (*frameSetpp).get();
+}
+
+IntPtr MessageBodyStream::GetHeader()
+{
+ return (IntPtr) headerBodyp;
+}
+
+
+// Ouput stream constructor
+
+MessageBodyStream::MessageBodyStream(int maxFrameSize)
+{
+ isInputStream = false;
+
+ maxFrameContentSize = maxFrameSize - AMQFrame::frameOverhead();
+ SequenceNumber unused; // only meaningful on incoming frames
+ frameSetpp = new FrameSet::shared_ptr(new FrameSet(unused));
+ fragmentCount = 0;
+ length = 0;
+ position = 0;
+
+ // header goes first in the outgoing frameset
+
+ boost::intrusive_ptr<AMQBody> headerBody(new AMQHeaderBody);
+ currentFramep = new AMQFrame(headerBody);
+ headerBodyp = static_cast<AMQHeaderBody*>(headerBody.get());
+
+ // mark this header frame as "full" to force the first write to create a new content frame
+ fragmentPosition = maxFrameContentSize;
+}
+
+void MessageBodyStream::Write(array<unsigned char>^ buffer, int offset, int count)
+{
+ if (isInputStream)
+ throw gcnew NotSupportedException();
+ if (disposed)
+ throw gcnew ObjectDisposedException("Stream");
+ if (count == 0)
+ return;
+ ThrowIfBadArgs(buffer, offset, count);
+
+ if (currentFramep == NULL) {
+ // GetFrameSet() has been called and we no longer exclusively own the underlying frames.
+ throw gcnew InvalidOperationException ("Mesage Body output already completed");
+ }
+
+ if (count <= 0)
+ return;
+
+ // keep GC memory movement at bay while copying to native space
+ pin_ptr<unsigned char> pinnedBuf = &buffer[0];
+
+ string *datap;
+
+ int remaining = count;
+ while (remaining > 0) {
+ if (fragmentPosition == maxFrameContentSize) {
+ // move to a new frame, but not until ready to add new content.
+ // zero content is valid, or the final write may exactly fill to maxFrameContentSize
+
+ pushCurrentFrame(false);
+
+ currentFramep = new AMQFrame(AMQContentBody());
+ fragmentPosition = 0;
+ fragmentCount++;
+ }
+
+ int copyCount = min (remaining, (maxFrameContentSize - fragmentPosition));
+ datap = &(currentFramep->castBody<AMQContentBody>()->getData());
+
+ char *outp = (char *) pinnedBuf + offset;
+ if (fragmentPosition == 0) {
+ datap->assign(outp, copyCount);
+ }
+ else {
+ datap->append(outp, copyCount);
+ }
+
+ position += copyCount;
+ fragmentPosition += copyCount;
+ remaining -= copyCount;
+ offset += copyCount;
+ }
+}
+
+
+void MessageBodyStream::Cleanup()
+{
+ {
+ lock l(this);
+ if (disposed)
+ return;
+
+ disposed = true;
+ }
+
+ try {}
+ finally
+ {
+ if (frameSetpp != NULL) {
+ delete frameSetpp;
+ frameSetpp = NULL;
+ }
+ if (currentFramep != NULL) {
+ delete currentFramep;
+ currentFramep = NULL;
+ }
+ }
+}
+
+MessageBodyStream::~MessageBodyStream()
+{
+ Cleanup();
+}
+
+MessageBodyStream::!MessageBodyStream()
+{
+ Cleanup();
+}
+
+void MessageBodyStream::Close()
+{
+ // Simulate Dispose()...
+ Cleanup();
+ GC::SuppressFinalize(this);
+}
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h
new file mode 100644
index 0000000000..fa8e3f6bde
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h
@@ -0,0 +1,131 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+
+using namespace qpid::client;
+using namespace qpid::framing;
+using namespace std;
+
+
+// This class provides memory streaming of the message body contents
+// between native and managed space. To avoid additional memory copies
+// in native space, it reads and writes directly to the low level Qpid
+// frames.
+
+public ref class MessageBodyStream : System::IO::Stream
+{
+private:
+ bool isInputStream;
+ long long length;
+ long long position;
+
+ // the boost smart pointer that keeps the message body frames in memory
+ FrameSet::shared_ptr *frameSetpp;
+
+ int fragmentCount;
+ int fragmentIndex;
+ const char* currentFragment;
+ int fragmentPosition;
+ int fragmentLength;
+ array<IntPtr>^ fragments;
+
+ int maxFrameContentSize;
+ AMQFrame* currentFramep;
+ void* headerBodyp;
+ bool disposed;
+ bool finalizing;
+ void Cleanup();
+
+internal:
+ // incoming message
+ MessageBodyStream(FrameSet::shared_ptr *fspp);
+ // outgoing message
+ MessageBodyStream(int maxFrameSize);
+ void pushCurrentFrame(bool last);
+public:
+ ~MessageBodyStream();
+ !MessageBodyStream();
+ virtual void Close() override;
+ virtual int Read(
+ [InAttribute] [OutAttribute] array<unsigned char>^ buffer,
+ int offset,
+ int count) override;
+
+ virtual void Write(
+ array<unsigned char>^ buffer,
+ int offset,
+ int count) override;
+
+
+ IntPtr GetFrameSet();
+ IntPtr GetHeader();
+
+ virtual void Flush() override {} // noop
+
+
+ // TODO: see CanSeek below.
+ virtual long long Seek(
+ long long offset,
+ System::IO::SeekOrigin origin) override {throw gcnew System::NotSupportedException(); }
+
+ // TODO: see CanSeek below.
+ virtual void SetLength(
+ long long value) override {throw gcnew System::NotSupportedException(); }
+
+ virtual property long long Length {
+ long long get() override { return length; }
+ };
+
+ virtual property long long Position {
+ long long get() override { return position; }
+ void set(long long p) override { throw gcnew System::NotSupportedException(); }
+ };
+
+
+ virtual property bool CanRead {
+ bool get () override { return isInputStream; }
+ }
+
+ virtual property bool CanWrite {
+ bool get () override { return !isInputStream; }
+ }
+
+ // Note: this class must return true to signal that the Length property works.
+ // Required by the raw message encoder.
+ // "If a class derived from Stream does not support seeking, calls to Length,
+ // SetLength, Position, and Seek throw a NotSupportedException".
+
+ virtual property bool CanSeek {
+ bool get () override { return true; }
+ }
+
+ virtual property bool CanTimeout {
+ bool get () override { return isInputStream; }
+ }
+};
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp
new file mode 100644
index 0000000000..f7a28b0692
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp
@@ -0,0 +1,251 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/client/Demux.h"
+#include "qpid/client/SessionImpl.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+
+#include "MessageBodyStream.h"
+#include "AmqpMessage.h"
+#include "AmqpSession.h"
+#include "InputLink.h"
+#include "MessageWaiter.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+using namespace msclr;
+
+
+MessageWaiter::MessageWaiter(InputLink^ parent, TimeSpan timeSpan, bool consuming, bool async, AsyncCallback ^callback, Object^ state)
+{
+ this->consuming = consuming;
+ if (!consuming) {
+ GC::SuppressFinalize(this);
+ }
+
+ if (async) {
+ this->async = true;
+ this->asyncCallback = callback;
+ this->state = state;
+ }
+ else {
+ this->assigned = true;
+ }
+ this->parent = parent;
+ this->thisLock = gcnew Object();
+
+ // do this after the Message Waiter is fully initialized, in case of
+ // very small timespan
+ if (timeSpan != TimeSpan::MaxValue) {
+ this->timer = gcnew Timer(timeoutCallback, this, timeSpan, TimeSpan::FromMilliseconds(-1));
+ }
+}
+
+MessageWaiter::~MessageWaiter()
+{
+ if (message != IntPtr::Zero) {
+ try{}
+ finally {
+ delete message.ToPointer();
+ message = IntPtr::Zero;
+ }
+ }
+}
+
+MessageWaiter::!MessageWaiter()
+{
+ this->~MessageWaiter();
+}
+
+
+void MessageWaiter::WaitForCompletion()
+{
+ if (isCompleted)
+ return;
+
+ lock l(thisLock);
+ while (!isCompleted) {
+ Monitor::Wait(thisLock);
+ }
+}
+
+void MessageWaiter::Activate()
+{
+ if (activated)
+ return;
+
+ lock l(thisLock);
+ if (!activated) {
+ activated = true;
+ Monitor::PulseAll(thisLock);
+ }
+}
+
+
+void MessageWaiter::Run()
+{
+ lock l(thisLock);
+
+ // wait until Activate(), i.e. our turn in the waiter list or a timeout
+ while (!activated) {
+ Monitor::Wait(thisLock);
+ }
+ bool haveMessage = false;
+ bool mustReset = false;
+
+ if (!timedOut)
+ blocking = true;
+
+ if (blocking) {
+ l.release();
+
+ try {
+ haveMessage = parent->internalWaitForMessage();
+ }
+ catch (System::Exception^ e) {
+ runException = e;
+ }
+
+ l.acquire();
+ blocking = false;
+ if (timedOut) {
+ // TimeoutCallback() called parent->unblockWaiter()
+ mustReset = true;
+ // let the timer thread move past critical region
+ while (processingTimeout) {
+ Monitor::Wait(thisLock);
+ }
+ }
+ }
+
+ if (timer != nullptr) {
+ timer->~Timer();
+ timer = nullptr;
+ }
+
+ if (haveMessage) {
+ timedOut = false; // for the case timeout and message arrival are essentially tied
+ if (!consuming) {
+ // just waiting
+ haveMessage = false;
+ }
+ }
+
+ if (haveMessage || mustReset) {
+ l.release();
+ if (haveMessage) {
+ // hang on to it for when the async caller gets around to retrieving
+ message = parent->nextLocalMessage();
+ }
+ if (mustReset) {
+ parent->resetQueue();
+ }
+ l.acquire();
+ }
+
+ isCompleted = true;
+ Monitor::PulseAll(thisLock);
+
+ // do this check and signal while locked
+ if (asyncWaitHandle != nullptr)
+ asyncWaitHandle->Set();
+
+ l.release();
+ parent->removeWaiter(this);
+
+
+ if (asyncCallback != nullptr) {
+ // guard against application callback exception
+ try {
+ asyncCallback(this);
+ }
+ catch (System::Exception^) {
+ // log it?
+ }
+ }
+
+}
+
+bool MessageWaiter::AcceptForWork()
+{
+ lock l(thisLock);
+ if (!assigned) {
+ assigned = true;
+ return true;
+ }
+ return false;
+}
+
+void MessageWaiter::TimeoutCallback(Object^ state)
+{
+ MessageWaiter^ waiter = (MessageWaiter^) state;
+ if (waiter->isCompleted)
+ return;
+
+ // make sure parent has finished initializing us before we get going
+ waiter->parent->sync();
+
+ lock l(waiter->thisLock);
+ if (waiter->timer == nullptr) {
+ // the waiter is in the clean up phase and doesn't need a wakeup
+ return;
+ }
+
+ // timedOut, blocking and processingTimeout work as a unit
+ waiter->timedOut = true;
+ if (waiter->blocking) {
+ // let the waiter know that we are busy with an upcoming unblock operation
+ waiter->processingTimeout = true;
+ }
+
+ waiter->Activate();
+
+ if (waiter->processingTimeout) {
+ // call with lock off
+ l.release();
+ waiter->parent->unblockWaiter();
+
+ // synchronize with blocked thread
+ l.acquire();
+ waiter->processingTimeout = false;
+ Monitor::PulseAll(waiter->thisLock);
+ }
+
+ l.release();
+
+ // If waiter has no associated thread, we must move it to completion
+ if (waiter->AcceptForWork()) {
+ waiter->Run(); // does not block since timedOut == true
+ }
+}
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h
new file mode 100644
index 0000000000..3eb4919646
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h
@@ -0,0 +1,127 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Threading;
+
+public ref class MessageWaiter : IAsyncResult
+{
+private:
+ // Receive() or WaitForMessage()
+ bool consuming;
+ bool consumed;
+ bool timedOut;
+ bool async;
+ // has an owner thread
+ bool assigned;
+ // can Run (i.e. earlier MessageWaiters in the queue have completed)
+ bool activated;
+ // is making a call to internalWaitForMessage() which (usually) blocks
+ bool blocking;
+ // the timeout timer thread is lurking
+ bool processingTimeout;
+ // the saved exception from within Run() for async delivery
+ System::Exception^ runException;
+ AsyncCallback^ asyncCallback;
+ Threading::Timer ^timer;
+ bool isCompleted;
+ bool completedSynchronously;
+ Object^ state;
+ Object^ thisLock;
+ ManualResetEvent^ asyncWaitHandle;
+ InputLink^ parent;
+ static void TimeoutCallback(Object^ state);
+ static TimerCallback^ timeoutCallback = gcnew TimerCallback(MessageWaiter::TimeoutCallback);
+ IntPtr message;
+ !MessageWaiter();
+ ~MessageWaiter();
+
+ internal:
+ MessageWaiter(InputLink^ parent, TimeSpan timeSpan, bool consuming, bool async, AsyncCallback ^callback, Object^ state);
+
+ void Run();
+ bool AcceptForWork();
+ void Activate();
+ void WaitForCompletion();
+
+// inline void SetCompletedSynchronously (bool v) { completedSynchronously = v; }
+
+ property IntPtr Message {
+ IntPtr get () {
+ if (!consuming || consumed)
+ throw gcnew InvalidOperationException("Message property");
+ consumed = true;
+ IntPtr v = message;
+ message = IntPtr::Zero;
+ GC::SuppressFinalize(this);
+ return v;
+ }
+ // void set (IntPtr v) { message = v; }
+ }
+
+ property bool Assigned {
+ bool get () { return assigned; }
+ }
+
+ property bool TimedOut {
+ bool get () { return timedOut; }
+ }
+
+
+ property System::Exception^ RunException {
+ System::Exception^ get() { return runException; }
+ }
+
+ public:
+
+ virtual property bool IsCompleted {
+ bool get () { return isCompleted; }
+ }
+
+ virtual property bool CompletedSynchronously {
+ bool get () { return completedSynchronously; }
+ }
+
+ virtual property WaitHandle^ AsyncWaitHandle {
+ WaitHandle^ get () {
+ if (asyncWaitHandle != nullptr) {
+ return asyncWaitHandle;
+ }
+
+ msclr::lock l(thisLock);
+ if (asyncWaitHandle == nullptr) {
+ asyncWaitHandle = gcnew ManualResetEvent(isCompleted);
+ }
+ return asyncWaitHandle;
+ }
+ }
+
+ virtual property Object^ AsyncState {
+ Object^ get () { return state; }
+ }
+};
+
+}}} // namespace Apache::Qpid::Interop
+
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp
new file mode 100644
index 0000000000..27725b8207
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp
@@ -0,0 +1,251 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#include <windows.h>
+#include <msclr\lock.h>
+
+#include "qpid/client/AsyncSession.h"
+#include "qpid/framing/FrameSet.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+
+
+#include "AmqpSession.h"
+#include "AmqpMessage.h"
+#include "OutputLink.h"
+#include "QpidMarshal.h"
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+using namespace System::Threading;
+using namespace msclr;
+
+using namespace qpid::client;
+using namespace std;
+
+using namespace Apache::Qpid::AmqpTypes;
+
+
+OutputLink::OutputLink(AmqpSession^ session, String^ defaultQueue) :
+ amqpSession(session),
+ queue(defaultQueue),
+ disposed(false),
+ maxFrameSize(session->Connection->MaxFrameSize),
+ finalizing(false)
+{
+}
+
+void OutputLink::Cleanup()
+{
+ {
+ lock l(this);
+ if (disposed)
+ return;
+
+ disposed = true;
+ }
+
+ amqpSession->NotifyClosed();
+}
+
+OutputLink::~OutputLink()
+{
+ Cleanup();
+}
+
+OutputLink::!OutputLink()
+{
+ Cleanup();
+}
+
+void OutputLink::Close()
+{
+ // Simulate Dispose()...
+ Cleanup();
+ GC::SuppressFinalize(this);
+}
+
+
+AmqpMessage^ OutputLink::CreateMessage()
+{
+ MessageBodyStream ^mbody = gcnew MessageBodyStream(maxFrameSize);
+ AmqpMessage ^amqpm = gcnew AmqpMessage(mbody);
+ return amqpm;
+}
+
+
+void OutputLink::ManagedToNative(AmqpMessage^ m)
+{
+ MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) m->BodyStream;
+
+ AmqpProperties^ mprops = m->Properties;
+
+ if (mprops != nullptr) {
+ AMQHeaderBody* bodyp = (AMQHeaderBody*) messageBodyStream->GetHeader().ToPointer();
+
+ if (mprops->HasDeliveryProperties) {
+ DeliveryProperties* deliveryPropertiesp = bodyp->get<DeliveryProperties>(true);
+
+ if (mprops->RoutingKey != nullptr) {
+ deliveryPropertiesp->setRoutingKey(QpidMarshal::ToNative(mprops->RoutingKey));
+ }
+
+ if (mprops->Durable) {
+ deliveryPropertiesp->setDeliveryMode(qpid::framing::PERSISTENT);
+ }
+
+ if (mprops->TimeToLive.HasValue) {
+ long long ttl = mprops->TimeToLive.Value.Ticks;
+ bool was_positive = (ttl > 0);
+ if (ttl < 0)
+ ttl = 0;
+ ttl = ttl / TimeSpan::TicksPerMillisecond;
+ if ((ttl == 0) && was_positive)
+ ttl = 1;
+ deliveryPropertiesp->setTtl(ttl);
+ }
+ }
+
+ if (mprops->HasMessageProperties) {
+ qpid::framing::MessageProperties* messagePropertiesp =
+ bodyp->get<qpid::framing::MessageProperties>(true);
+
+ String^ replyToExchange = mprops->ReplyToExchange;
+ String^ replyToRoutingKey = mprops->ReplyToRoutingKey;
+ if ((replyToExchange != nullptr) || (replyToRoutingKey != nullptr)) {
+ qpid::framing::ReplyTo nReplyTo;
+ if (replyToExchange != nullptr) {
+ nReplyTo.setExchange(QpidMarshal::ToNative(replyToExchange));
+ }
+ if (replyToRoutingKey != nullptr) {
+ nReplyTo.setRoutingKey(QpidMarshal::ToNative(replyToRoutingKey));
+ }
+ messagePropertiesp->setReplyTo(nReplyTo);
+ }
+
+ // TODO: properly split 1.0 style to 0-10 content type + encoding
+
+ String^ contentType = mprops->ContentType;
+ if (contentType != nullptr) {
+ String^ type = nullptr;
+ String^ enc = nullptr;
+ int idx = contentType->IndexOf(';');
+ if (idx == -1) {
+ type = contentType;
+ }
+ else {
+ type = contentType->Substring(0, idx);
+ contentType = contentType->Substring(idx + 1);
+ idx = contentType->IndexOf('=');
+ if (idx != -1) {
+ enc = contentType->Substring(idx + 1);
+ enc = enc->Trim();
+ }
+ }
+ if (!String::IsNullOrEmpty(type)) {
+ messagePropertiesp->setContentType(QpidMarshal::ToNative(type));
+ }
+ if (!String::IsNullOrEmpty(enc)) {
+ messagePropertiesp->setContentEncoding(QpidMarshal::ToNative(enc));
+ }
+ }
+
+
+ array<unsigned char>^ mbytes = mprops->CorrelationId;
+ if (mbytes != nullptr) {
+ pin_ptr<unsigned char> pinnedBuf = &mbytes[0];
+ std::string s((char *) pinnedBuf, mbytes->Length);
+ messagePropertiesp->setCorrelationId(s);
+ }
+
+ mbytes = mprops->UserId;
+ if (mbytes != nullptr) {
+ pin_ptr<unsigned char> pinnedBuf = &mbytes[0];
+ std::string s((char *) pinnedBuf, mbytes->Length);
+ messagePropertiesp->setUserId(s);
+ }
+
+ if (mprops->HasMappedProperties) {
+ qpid::framing::FieldTable fieldTable;
+ // TODO: add support for abitrary AMQP types
+ for each (Collections::Generic::KeyValuePair<System::String^, AmqpType^> kvp in mprops->PropertyMap) {
+ Type^ type = kvp.Value->GetType();
+ if (type == AmqpInt::typeid) {
+ fieldTable.setInt(QpidMarshal::ToNative(kvp.Key),
+ ((AmqpInt ^) kvp.Value)->Value);
+ }
+ else if (type == AmqpString::typeid) {
+ AmqpString^ str = (AmqpString ^) kvp.Value;
+ // For now, FieldTable supports a single string type
+ fieldTable.setString(QpidMarshal::ToNative(kvp.Key), QpidMarshal::ToNative(str->Value));
+ }
+ }
+
+ messagePropertiesp->setApplicationHeaders(fieldTable);
+ }
+ }
+ }
+}
+
+
+
+void OutputLink::Send(AmqpMessage^ amqpMessage, TimeSpan timeout)
+{
+ // copy properties from managed space to the native counterparts
+ ManagedToNative(amqpMessage);
+
+ MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream;
+ CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, false, nullptr, nullptr);
+
+ if (waiter != nullptr) {
+ waiter->WaitForCompletion();
+ if (waiter->TimedOut) {
+ throw gcnew TimeoutException("Receive");
+ }
+ }
+ // else: SendMessage() has already waited for the Completion
+
+}
+
+IAsyncResult^ OutputLink::BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout, AsyncCallback^ callback, Object^ state)
+{
+ ManagedToNative(amqpMessage);
+
+ MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream;
+ CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, true, callback, state);
+ return waiter;
+}
+
+void OutputLink::EndSend(IAsyncResult^ result)
+{
+ CompletionWaiter^ waiter = (CompletionWaiter ^) result;
+ waiter->WaitForCompletion();
+ if (waiter->TimedOut) {
+ throw gcnew TimeoutException("Receive");
+ }
+}
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h
new file mode 100644
index 0000000000..1f049a7412
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h
@@ -0,0 +1,64 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Runtime::InteropServices;
+
+using namespace qpid::client;
+using namespace std;
+
+
+public ref class OutputLink
+{
+private:
+ AmqpSession^ amqpSession;
+ String^ queue;
+ bool disposed;
+ bool finalizing;
+ void Cleanup();
+ AmqpTypes::AmqpProperties^ defaultProperties;
+ void ManagedToNative(AmqpMessage^ m);
+ int maxFrameSize;
+
+internal:
+ OutputLink(AmqpSession^ session, String^ defaultQueue);
+
+public:
+ ~OutputLink();
+ !OutputLink();
+ void Close();
+ AmqpMessage^ CreateMessage();
+ void Send(AmqpMessage^ m, TimeSpan timeout);
+ IAsyncResult^ BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout, AsyncCallback^ callback, Object^ state);
+ void EndSend(IAsyncResult^ result);
+
+ property AmqpTypes::AmqpProperties^ DefaultProperties {
+ AmqpTypes::AmqpProperties^ get () { return defaultProperties; }
+ void set(AmqpTypes::AmqpProperties^ p) { defaultProperties = p; }
+ }
+};
+
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h b/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h
new file mode 100644
index 0000000000..91677a5e73
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h
@@ -0,0 +1,37 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+
+public ref class QpidException : System::Exception
+{
+ public:
+
+ QpidException() : System::Exception() {}
+ QpidException(String^ estring) : System::Exception(estring) {}
+
+};
+
+}}} // namespace Apache::Qpid::Interop
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h b/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h
new file mode 100644
index 0000000000..3e22af7b39
--- /dev/null
+++ b/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h
@@ -0,0 +1,53 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+#pragma once
+
+namespace Apache {
+namespace Qpid {
+namespace Interop {
+
+using namespace System;
+using namespace System::Text;
+
+
+// Helper functions for marshaling.
+
+private ref class QpidMarshal
+{
+ public:
+
+ // marshal_as<T> not available in all Visual Studio editions.
+
+ static std::string ToNative (System::String^ managed) {
+ if (managed->Length == 0) {
+ return std::string();
+ }
+ array<unsigned char>^ mbytes = Encoding::UTF8->GetBytes(managed);
+ if (mbytes->Length == 0) {
+ return std::string();
+ }
+
+ pin_ptr<unsigned char> pinnedBuf = &mbytes[0];
+ std::string native((char *) pinnedBuf, mbytes->Length);
+ return native;
+ }
+};
+
+}}} // namespace Apache::Qpid::Interop