diff options
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Interop')
20 files changed, 3364 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp new file mode 100644 index 0000000000..02d6c7ab18 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp @@ -0,0 +1,165 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/framing/FrameSet.h" + +#include "AmqpConnection.h" +#include "AmqpSession.h" +#include "QpidMarshal.h" +#include "QpidException.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace msclr; + +using namespace qpid::client; +using namespace std; + + +// Note on locks: Use "this" for fast counting and idle/busy +// notifications. Use the "sessions" list to serialize session +// creation/reaping and overall tear down. +// TODO: switch "this" lock to separate non-visible Object. + + +AmqpConnection::AmqpConnection(String^ server, int port) : + connectionp(NULL), + busyCount(0), + disposed(false) +{ + bool success = false; + System::Exception^ openException = nullptr; + sessions = gcnew Collections::Generic::List<AmqpSession^>(); + + try { + connectionp = new Connection; + connectionp->open (QpidMarshal::ToNative(server), port); + // TODO: registerFailureCallback for failover + success = true; + const ConnectionSettings& settings = connectionp->getNegotiatedSettings(); + this->maxFrameSize = settings.maxFrameSize; + } catch (const qpid::Exception& error) { + String^ errmsg = gcnew String(error.what()); + openException = gcnew QpidException(errmsg); + } finally { + if (!success) { + Cleanup(); + if (openException == nullptr) { + openException = gcnew QpidException ("unknown connection failure"); + } + throw openException; + } + } +} + +void AmqpConnection::Cleanup() +{ + { + lock l(sessions); + if (disposed) + return; + disposed = true; + } + + try { + // let the child sessions clean up + for each(AmqpSession^ s in sessions) { + s->ConnectionClosed(); + } + } + finally + { + if (connectionp != NULL) { + connectionp->close(); + delete connectionp; + connectionp = NULL; + } + } +} + +AmqpConnection::~AmqpConnection() +{ + Cleanup(); +} + +AmqpConnection::!AmqpConnection() +{ + Cleanup(); +} + +void AmqpConnection::Close() +{ + // Simulate Dispose()... + Cleanup(); + GC::SuppressFinalize(this); +} + +AmqpSession^ AmqpConnection::CreateSession() +{ + lock l(sessions); + if (disposed) { + throw gcnew ObjectDisposedException("AmqpConnection"); + } + AmqpSession^ session = gcnew AmqpSession(this, connectionp); + sessions->Add(session); + return session; +} + +// called whenever a child session becomes newly busy (a first reader or writer since last idle) + +void AmqpConnection::NotifyBusy() +{ + bool changed = false; + { + lock l(this); + if (busyCount++ == 0) + changed = true; + } +} + +// called whenever a child session becomes newly idle (a last reader or writer has closed) +// The connection is idle when none of its child sessions are busy + +void AmqpConnection::NotifyIdle() +{ + bool connectionIdle = false; + { + lock l(this); + if (--busyCount == 0) + connectionIdle = true; + } + if (connectionIdle) { + OnConnectionIdle(this, System::EventArgs::Empty); + } +} + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h new file mode 100644 index 0000000000..2641391e82 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h @@ -0,0 +1,71 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace std; +using namespace qpid::client; + +ref class AmqpSession; + +public delegate void ConnectionIdleEventHandler(Object^ sender, EventArgs^ eventArgs); + +public ref class AmqpConnection +{ +private: + Connection* connectionp; + void Cleanup(); + bool disposed; + Collections::Generic::List<AmqpSession^>^ sessions; + bool isOpen; + int busyCount; + int maxFrameSize; + + internal: + void NotifyBusy(); + void NotifyIdle(); + + property int MaxFrameSize { + int get () { return maxFrameSize; } + } + +public: + AmqpConnection(System::String^ server, int port); + ~AmqpConnection(); + !AmqpConnection(); + void Close(); + AmqpSession^ CreateSession(); + event ConnectionIdleEventHandler^ OnConnectionIdle; + + property bool IsOpen { + bool get() { return isOpen; } + }; + + property bool IsIdle { + bool get() { return (busyCount == 0); } + } +}; + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp new file mode 100644 index 0000000000..5c333aff60 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp @@ -0,0 +1,76 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/AMQFrame.h" + +#include "MessageBodyStream.h" +#include "AmqpMessage.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; +using namespace msclr; + +using namespace Apache::Qpid::AmqpTypes; + +AmqpMessage::AmqpMessage(MessageBodyStream ^mbs) : + messageBodyStream(mbs), + disposed(false) +{ +} + +void AmqpMessage::Cleanup() +{ + { + lock l(this); + if (disposed) + return; + + disposed = true; + } + + messageBodyStream->Close(); +} + +AmqpMessage::~AmqpMessage() +{ + Cleanup(); +} + +AmqpMessage::!AmqpMessage() +{ + Cleanup(); +} + +void AmqpMessage::Close() +{ + // Simulate Dispose()... + Cleanup(); + GC::SuppressFinalize(this); +} + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h new file mode 100644 index 0000000000..f0801d30dc --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h @@ -0,0 +1,61 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; + +using namespace qpid::client; +using namespace std; + + + +public ref class AmqpMessage +{ +private: + MessageBodyStream^ messageBodyStream; + AmqpTypes::AmqpProperties^ amqpProperties; + bool disposed; + void Cleanup(); + +internal: + AmqpMessage(MessageBodyStream ^bstream); + +public: + ~AmqpMessage(); + !AmqpMessage(); + void Close(); + + property AmqpTypes::AmqpProperties^ Properties { + AmqpTypes::AmqpProperties^ get () { return amqpProperties; } + void set(AmqpTypes::AmqpProperties^ p) { amqpProperties = p; } + } + + property System::IO::Stream^ BodyStream { + System::IO::Stream^ get() { return messageBodyStream; } + } +}; + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp new file mode 100644 index 0000000000..bab73da74e --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp @@ -0,0 +1,287 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/Message.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/client/Future.h" + +#include "AmqpConnection.h" +#include "AmqpSession.h" +#include "AmqpMessage.h" +#include "MessageBodyStream.h" +#include "InputLink.h" +#include "OutputLink.h" +#include "QpidMarshal.h" +#include "QpidException.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace msclr; + +using namespace qpid::client; +using namespace std; + + +AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidConnectionp) : + connection(conn), + sessionp(NULL), + sessionImplp(NULL), + subs_mgrp(NULL), + openCount(0) +{ + bool success = false; + + try { + sessionp = new qpid::client::AsyncSession; + *sessionp = qpidConnectionp->newSession(); + subs_mgrp = new SubscriptionManager (*sessionp); + success = true; + waiters = gcnew Collections::Generic::List<CompletionWaiter^>(); + } finally { + if (!success) { + Cleanup(); + throw gcnew QpidException ("session creation failure"); + } + } +} + + +void AmqpSession::Cleanup() +{ + if (subscriptionp != NULL) { + subscriptionp->cancel(); + delete subscriptionp; + subscriptionp=NULL; + } + + if (subs_mgrp != NULL) { + subs_mgrp->stop(); + delete subs_mgrp; + subs_mgrp = NULL; + } + + if (localQueuep != NULL) { + delete localQueuep; + localQueuep = NULL; + } + + if (sessionp != NULL) { + sessionp->close(); + delete sessionp; + sessionp = NULL; + sessionImplp = NULL; + } + + if (connectionp != NULL) { + connectionp->close(); + delete connectionp; + connectionp = NULL; + } +} + + +// Called by the parent AmqpConnection + +void AmqpSession::ConnectionClosed() +{ + Cleanup(); +} + +InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue) +{ + return CreateInputLink(sourceQueue, true, false, nullptr, nullptr); +} + +InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary, + System::String^ filterKey, System::String^ exchange) +{ + InputLink^ link = gcnew InputLink (this, sourceQueue, sessionp, subs_mgrp, exclusive, temporary, filterKey, exchange); + { + lock l(waiters); + if (openCount == 0) { + connection->NotifyBusy(); + } + openCount++; + } + return link; +} + +OutputLink^ AmqpSession::CreateOutputLink(System::String^ targetQueue) +{ + OutputLink^ link = gcnew OutputLink (this, targetQueue); + + lock l(waiters); + + if (sessionImplp == NULL) { + // not needed unless sending messages + SessionBase_0_10Access sa(*sessionp); + boost::shared_ptr<SessionImpl> sip = sa.get(); + sessionImplp = sip.get(); + } + + if (openCount == 0) { + connection->NotifyBusy(); + } + openCount++; + + return link; +} + + +// called whenever a child InputLink or OutputLink is closed or finalized +void AmqpSession::NotifyClosed() +{ + lock l(waiters); + openCount--; + if (openCount == 0) { + connection->NotifyIdle(); + } +} + + +CompletionWaiter^ AmqpSession::SendMessage (System::String^ queue, MessageBodyStream ^mbody, TimeSpan timeout, bool async, AsyncCallback^ callback, Object^ state) +{ + lock l(waiters); + if (sessionp == NULL) + throw gcnew ObjectDisposedException("Send"); + + // create an AMQP message.transfer command to use with the partial frameset from the MessageBodyStream + + std::string exname = QpidMarshal::ToNative(queue); + FrameSet *framesetp = (FrameSet *) mbody->GetFrameSet().ToPointer(); + uint8_t acceptMode=1; + uint8_t acquireMode=0; + MessageTransferBody mtcmd(ProtocolVersion(0,10), exname, acceptMode, acquireMode); + // ask for a command completion + mtcmd.setSync(true); + + //send it + + Future *futurep = NULL; + try { + futurep = new Future(sessionImplp->send(mtcmd, *framesetp)); + + CompletionWaiter^ waiter = nullptr; + if (async || (timeout != TimeSpan::MaxValue)) { + waiter = gcnew CompletionWaiter(this, timeout, (IntPtr) futurep, callback, state); + // waiter is responsible for releasing the Future native resource + futurep = NULL; + addWaiter(waiter); + } + + l.release(); + + if (waiter != nullptr) + return waiter; + + // synchronous send with no timeout: no need to involve the asyncHelper thread + + internalWaitForCompletion((IntPtr) futurep); + } + finally { + if (futurep != NULL) + delete (futurep); + } + return nullptr; +} + +void AmqpSession::Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey) +{ + sessionp->exchangeBind(arg::queue=QpidMarshal::ToNative(queue), + arg::exchange=QpidMarshal::ToNative(exchange), + arg::bindingKey=QpidMarshal::ToNative(filterKey)); + +} + + +void AmqpSession::internalWaitForCompletion(IntPtr fp) +{ + lock l(waiters); + if (sessionp == NULL) + throw gcnew ObjectDisposedException("AmqpSession"); + + // increment the smart pointer count to sessionImplp to guard agains async close + Session sessionCopy(*sessionp); + + l.release(); + // Qpid native lib call to wait for the command completion + ((Future *)fp.ToPointer())->wait(*sessionImplp); +} + +// call with lock held +void AmqpSession::addWaiter(CompletionWaiter^ waiter) +{ + waiters->Add(waiter); + if (!helperRunning) { + helperRunning = true; + ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &AmqpSession::asyncHelper)); + } +} + + +void AmqpSession::removeWaiter(CompletionWaiter^ waiter) +{ + // a waiter can be removed from anywhere in the list if timed out + + lock l(waiters); + int idx = waiters->IndexOf(waiter); + if (idx == -1) { + // TODO: assert or log + } + else { + waiters->RemoveAt(idx); + } +} + + +// process CompletionWaiter list one at a time. + +void AmqpSession::asyncHelper(Object ^unused) +{ + lock l(waiters); + + while (true) { + if (waiters->Count == 0) { + helperRunning = false; + return; + } + + CompletionWaiter^ waiter = waiters[0]; + l.release(); + // can block, but for short time + // the waiter removes itself from the list, possibly as the timer thread on timeout + waiter->Run(); + l.acquire(); + } +} + + +}}} // namespace Apache::Qpid::Cli diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h new file mode 100644 index 0000000000..b959a4123a --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h @@ -0,0 +1,80 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +#include "AmqpConnection.h" +#include "MessageBodyStream.h" +#include "CompletionWaiter.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; + +using namespace qpid::client; +using namespace std; + +ref class InputLink; +ref class OutputLink; + +public ref class AmqpSession +{ +private: + AmqpConnection^ connection; + Connection* connectionp; + AsyncSession* sessionp; + SessionImpl* sessionImplp; + SubscriptionManager* subs_mgrp; + Subscription* subscriptionp; + LocalQueue* localQueuep; + Collections::Generic::List<CompletionWaiter^>^ waiters; + bool helperRunning; + int openCount; + + void Cleanup(); + void asyncHelper(Object ^); + void addWaiter(CompletionWaiter^ waiter); + +public: + OutputLink^ CreateOutputLink(System::String^ targetQueue); + InputLink^ CreateInputLink(System::String^ sourceQueue); + + // 0-10 specific support + InputLink^ CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary, System::String^ filterKey, System::String^ exchange); + void Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey); + +internal: + AmqpSession(AmqpConnection^ connection, qpid::client::Connection* qpidConnection); + void NotifyClosed(); + CompletionWaiter^ SendMessage (System::String^ queue, MessageBodyStream ^mbody, TimeSpan timeout, bool async, AsyncCallback^ callback, Object^ state); + void ConnectionClosed(); + void internalWaitForCompletion(IntPtr Future); + void removeWaiter(CompletionWaiter^ waiter); + + property AmqpConnection^ Connection { + AmqpConnection^ get () { return connection; } + } + + +}; + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp new file mode 100644 index 0000000000..91c23ae30a --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp @@ -0,0 +1,57 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +using namespace System; +using namespace System::Reflection; +using namespace System::Runtime::CompilerServices; +using namespace System::Runtime::InteropServices; +using namespace System::Security::Permissions; + +// +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +// +[assembly:AssemblyTitleAttribute("Apache.Qpid.Interop")]; +[assembly:AssemblyDescriptionAttribute("")]; +[assembly:AssemblyConfigurationAttribute("")]; +[assembly:AssemblyCompanyAttribute("")]; +[assembly:AssemblyProductAttribute("")]; +[assembly:AssemblyCopyrightAttribute("")]; +[assembly:AssemblyTrademarkAttribute("")]; +[assembly:AssemblyCultureAttribute("")]; + +// +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the value or you can default the Revision and Build Numbers +// by using the '*' as shown below: + +[assembly:AssemblyVersionAttribute("1.0.*")]; + +[assembly:ComVisible(false)]; + +[assembly:CLSCompliantAttribute(true)]; + +[assembly:SecurityPermission(SecurityAction::RequestMinimum, UnmanagedCode = true)]; diff --git a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp new file mode 100644 index 0000000000..e39ee1b1ae --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp @@ -0,0 +1,145 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/Demux.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" + +#include "MessageBodyStream.h" +#include "AmqpMessage.h" +#include "AmqpSession.h" +#include "InputLink.h" +#include "CompletionWaiter.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; +using namespace msclr; + +// A class to provide IAsyncResult semantics for a qpid AsyncSession command (i.e. 0-10 messageTransfer) +// when the client session receives a "Completion" notification from the Broker. + + +CompletionWaiter::CompletionWaiter(AmqpSession^ parent, TimeSpan timeSpan, IntPtr future, AsyncCallback^ callback, Object^ state) +{ + this->qpidFuture = future; + this->asyncCallback = callback; + this->state = state; + this->parent = parent; + this->thisLock = gcnew Object(); + // do this after the Completion Waiter is fully initialized, in case of + // very small timespan + if (timeSpan != TimeSpan::MaxValue) { + this->timer = gcnew Timer(timeoutCallback, this, timeSpan, TimeSpan::FromMilliseconds(-1)); + } +} + + +void CompletionWaiter::WaitForCompletion() +{ + if (isCompleted) + return; + + lock l(thisLock); + while (!isCompleted) { + Monitor::Wait(thisLock); + } +} + +void CompletionWaiter::Run() +{ + // no locks required in this method + if (isCompleted) + return; + + try { + // Wait for the arrival of the "AMQP Completion" indication from the Broker + parent->internalWaitForCompletion(qpidFuture); + } + catch (System::Exception^ e) { + runException = e; + } + finally { + delete(qpidFuture.ToPointer()); + qpidFuture = (IntPtr) NULL; + } + + if (timer != nullptr) { + timer->~Timer(); + timer = nullptr; + } + + Complete(false); +} + + +// "Complete" here means complete the AsyncResult, which may precede broker "command completion" if timed out + +void CompletionWaiter::Complete(bool isTimerThread) +{ + lock l(thisLock); + if (isCompleted) + return; + + isCompleted = true; + if (isTimerThread) + timedOut = true; + + Monitor::PulseAll(thisLock); + + // do this check and signal while locked + if (asyncWaitHandle != nullptr) + asyncWaitHandle->Set(); + + l.release(); + + parent->removeWaiter(this); + + if (asyncCallback != nullptr) { + // guard against application callback exception + try { + asyncCallback(this); + } + catch (System::Exception^) { + // log it? + } + } +} + + +void CompletionWaiter::TimeoutCallback(Object^ state) +{ + CompletionWaiter^ waiter = (CompletionWaiter^) state; + waiter->Complete(true); +} + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h new file mode 100644 index 0000000000..197ac632b0 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h @@ -0,0 +1,99 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; + +public ref class CompletionWaiter : IAsyncResult +{ +private: + bool timedOut; + // has an owner thread + bool assigned; + // can Run (i.e. earlier CompletionWaiters in the queue have completed) + System::Exception^ runException; + AsyncCallback^ asyncCallback; + Threading::Timer ^timer; + bool isCompleted; + Object^ state; + Object^ thisLock; + ManualResetEvent^ asyncWaitHandle; + AmqpSession^ parent; + IntPtr qpidFuture; + void Complete(bool isTimerThread); + static void TimeoutCallback(Object^ state); + static TimerCallback^ timeoutCallback = gcnew TimerCallback(CompletionWaiter::TimeoutCallback); + + internal: + CompletionWaiter(AmqpSession^ parent, TimeSpan timeSpan, IntPtr future, AsyncCallback ^callback, Object^ state); + + void Run(); + void WaitForCompletion(); + + property bool Assigned { + bool get () { return assigned; } + } + + property bool TimedOut { + bool get () { return timedOut; } + } + + + public: + + virtual property bool IsCompleted { + bool get () { return isCompleted; } + } + + virtual property bool CompletedSynchronously { + bool get () { return false; } + } + + virtual property WaitHandle^ AsyncWaitHandle { + WaitHandle^ get () { + if (asyncWaitHandle != nullptr) { + return asyncWaitHandle; + } + + msclr::lock l(thisLock); + if (asyncWaitHandle == nullptr) { + asyncWaitHandle = gcnew ManualResetEvent(isCompleted); + } + return asyncWaitHandle; + } + } + + + virtual property Object^ AsyncState { + Object^ get () { return state; } + } + + + + +}; + +}}} // namespace Apache::Qpid::Interop + diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp new file mode 100644 index 0000000000..cee394b05d --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp @@ -0,0 +1,685 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/Demux.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" + +#include "MessageBodyStream.h" +#include "AmqpMessage.h" +#include "AmqpSession.h" +#include "InputLink.h" +#include "QpidMarshal.h" +#include "QpidException.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace System::Threading; +using namespace msclr; + +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + +using namespace Apache::Qpid::AmqpTypes; + +// Scalability note: When using async methods, an async helper thread is created +// to block on the Demux BlockingQueue. This design should be revised in line +// with proposed changes to the native library to reduce the number of servicing +// threads for large numbers of subscriptions. + + +// The folowing def must match the "Frames" private typedef. +// TODO, make Qpid-cpp "Frames" definition visible. +typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames; + +InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, + qpid::client::AsyncSession *qpidSessionp, qpid::client::SubscriptionManager *qpidSubsMgrp, + bool exclusive, + bool temporary, System::String^ filterKey, System::String^ exchange) : + amqpSession(session), + subscriptionp(NULL), + localQueuep(NULL), + queuePtrp(NULL), + dequeuedFrameSetpp(NULL), + disposed(false), + finalizing(false) +{ + bool success = false; + System::Exception^ linkException = nullptr; + + waiters = gcnew Collections::Generic::List<MessageWaiter^>(); + + try { + std::string qname = QpidMarshal::ToNative(sourceQueue); + + if (temporary) { + qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true); + qpidSessionp->exchangeBind(arg::exchange=QpidMarshal::ToNative(exchange), + arg::queue=qname, arg::bindingKey=QpidMarshal::ToNative(filterKey)); + qpidSessionp->sync(); + } + + localQueuep = new LocalQueue; + SubscriptionSettings settings; + settings.flowControl = FlowControl::messageCredit(0); + Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings); + subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup + + // the roundabout way to obtain localQueuep->queue + SessionBase_0_10Access sa(*qpidSessionp); + boost::shared_ptr<SessionImpl> simpl = sa.get(); + queuePtrp = new Demux::QueuePtr(simpl->getDemux().get(sub.getName())); + + success = true; + } finally { + if (!success) { + Cleanup(); + linkException = gcnew QpidException ("InputLink creation failure"); + throw linkException; + } + } +} + +void InputLink::ReleaseNative() +{ + // involves talking to the Broker unless the connection is broken + if (subscriptionp != NULL) { + try { + subscriptionp->cancel(); + } + catch (const std::exception& error) { + // TODO: log this properly + std::cout << "shutdown error " << error.what() << std::endl; + } + } + + // free native mem (or smart pointers) that we own + if (subscriptionp != NULL) + delete subscriptionp; + if (queuePtrp != NULL) + delete queuePtrp; + if (localQueuep != NULL) + delete localQueuep; + if (dequeuedFrameSetpp != NULL) + delete dequeuedFrameSetpp; +} + +void InputLink::Cleanup() +{ + { + lock l(waiters); + if (disposed) + return; + + disposed = true; + + // if the asyncHelper exists and is idle, unblock it + if (asyncHelperWaitHandle != nullptr) { + asyncHelperWaitHandle->Set(); + } + + // wakeup anyone waiting for messages + if (queuePtrp != NULL) + (*queuePtrp)->close(); + + try {} + finally + { + ReleaseNative(); + } + + } + amqpSession->NotifyClosed(); +} + +InputLink::~InputLink() +{ + Cleanup(); +} + +InputLink::!InputLink() +{ + Cleanup(); +} + +void InputLink::Close() +{ + // Simulate Dispose()... + Cleanup(); + GC::SuppressFinalize(this); +} + +// call with lock held +bool InputLink::haveMessage() +{ + if (dequeuedFrameSetpp != NULL) + return true; + + if (queuePtrp != NULL) { + if ((*queuePtrp)->size() > 0) + return true; + } + return false; +} + +IntPtr InputLink::nextLocalMessage() +{ + lock l(waiters); + if (disposed) + return (IntPtr) NULL; + + // A message already pulled off BlockingQueue? + if (dequeuedFrameSetpp != NULL) { + QpidFrameSetPtr* rv = dequeuedFrameSetpp; + dequeuedFrameSetpp = NULL; + return (IntPtr) rv; + } + + if ((*queuePtrp)->empty()) + return (IntPtr) NULL; + + bool received = false; + QpidFrameSetPtr* frameSetpp = new QpidFrameSetPtr; + + try { + received = (*queuePtrp)->pop(*frameSetpp, qpid::sys::TIME_INFINITE); + if (received) { + QpidFrameSetPtr* rv = frameSetpp; + // no need to free native in finally block + frameSetpp = NULL; + return (IntPtr) rv; + } + } catch(const std::exception& error) { + // should be no async tampering with queue since we hold the lock and have a + // smart pointer ref to the native LocalQueue, even if the network connection fails... + cout << "unknown exception in InputLink.nextLocalMessage() " << error.what() <<endl; + // TODO: log this + } + finally { + if (frameSetpp != NULL) { + delete frameSetpp; + } + } + + return (IntPtr) NULL; +} + + + +void InputLink::unblockWaiter() +{ + // to be followed by resetQueue() below + lock l(waiters); + if (disposed) + return; + (*queuePtrp)->close(); +} + + + +// Set things right after unblockWaiter(). Closing and opening a Qpid BlockingQueue unsticks +// a blocking thread without interefering with queue contents or the ability to push +// new incoming messages. + +void InputLink::resetQueue() +{ + lock l(waiters); + if (disposed) + return; + if ((*queuePtrp)->isClosed()) { + (*queuePtrp)->open(); + } +} + + +// returns true if there is a message to consume, i.e. nextLocalMessage() won't block + +bool InputLink::internalWaitForMessage() +{ + Demux::QueuePtr demuxQueuePtr; + + bool received = false; + QpidFrameSetPtr* frameSetpp = NULL; + try { + lock l(waiters); + if (disposed) + return false; + if (haveMessage()) + return true; + + // TODO: prefetch window of messages, compatible with both 0-10 and 1.0. + subscriptionp->grantMessageCredit(1); + + // get a scoped smart ptr ref to guard against async close or hangup + demuxQueuePtr = *queuePtrp; + frameSetpp = new QpidFrameSetPtr; + + l.release(); + // Async cleanup is now possible. Only use demuxQueuePtr until lock reacquired. + received = demuxQueuePtr->pop(*frameSetpp, qpid::sys::TIME_INFINITE); + l.acquire(); + + if (received) { + dequeuedFrameSetpp = frameSetpp; + frameSetpp = NULL; // native will eventually be freed in Cleanup or MessageBodyStream + } + + return true; + } catch(const std::exception& ) { + // timeout or connection closed + return false; + } + finally { + if (frameSetpp != NULL) { + delete frameSetpp; + } + } + + return false; +} + + +// call with lock held +void InputLink::addWaiter(MessageWaiter^ waiter) +{ + waiters->Add(waiter); + if (waiters->Count == 1) { + // mark this waiter as ready to run + // Only the waiter at the head of the queue is active. + waiter->Activate(); + } + + if (waiter->Assigned) + return; + + if (asyncHelperWaitHandle == nullptr) { + asyncHelperWaitHandle = gcnew ManualResetEvent(false); + ThreadStart^ threadDelegate = gcnew ThreadStart(this, &InputLink::asyncHelper); + (gcnew Thread(threadDelegate))->Start(); + } + + if (waiters->Count == 1) { + // wake up the asyncHelper + asyncHelperWaitHandle->Set(); + } +} + + +void InputLink::removeWaiter(MessageWaiter^ waiter) { + // a waiter can be removed from anywhere in the list if timed out + + lock l(waiters); + int idx = waiters->IndexOf(waiter); + if (idx == -1) { + // TODO: assert or log + if (asyncHelperWaitHandle != nullptr) { + // just in case. + asyncHelperWaitHandle->Set(); + } + return; + } + waiters->RemoveAt(idx); + + // let the next waiter know it's his turn. + if (waiters->Count > 0) { + MessageWaiter^ nextWaiter = waiters[0]; + + // wakeup the asyncHelper thread to help out if necessary. + if (!nextWaiter->Assigned) { + asyncHelperWaitHandle->Set(); + } + + l.release(); + nextWaiter->Activate(); + return; + } + else { + if (disposed && (asyncHelperWaitHandle != nullptr)) { + asyncHelperWaitHandle->Set(); + } + } +} + + +void InputLink::asyncHelper() +{ + lock l(waiters); + + while (true) { + if (disposed && (waiters->Count == 0)) { + asyncHelperWaitHandle = nullptr; + return; + } + + if (waiters->Count > 0) { + MessageWaiter^ waiter = waiters[0]; + + l.release(); + if (waiter->AcceptForWork()) { + waiter->Run(); + } + l.acquire(); + } + + // sleep if more work may be coming or it is currently someone else's turn + if (((waiters->Count == 0) && !disposed) || ((waiters->Count != 0) && waiters[0]->Assigned)) { + // wait for something to do + asyncHelperWaitHandle->Reset(); + l.release(); + asyncHelperWaitHandle->WaitOne(); + l.acquire(); + } + } +} + +void InputLink::sync() +{ + // for the timeout thread + lock l(waiters); +} + + +AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) +{ + QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer(); + bool ownFrameSet = true; + bool haveProperties = false; + + try { + MessageBodyStream^ mstream = gcnew MessageBodyStream(fspp); + ownFrameSet = false; // stream releases on close/dispose + + AmqpMessage^ amqpMessage = gcnew AmqpMessage(mstream); + + AMQHeaderBody* headerBodyp = (*fspp)->getHeaders(); + uint64_t contentSize = (*fspp)->getContentSize(); + SequenceSet frameSetID((*fspp)->getId()); + + // target managed representation + AmqpProperties^ amqpProperties = gcnew AmqpProperties(); + + // source native representation + const DeliveryProperties* deliveryProperties = headerBodyp->get<DeliveryProperties>(); + const qpid::framing::MessageProperties* messageProperties = headerBodyp->get<qpid::framing::MessageProperties>(); + + if (deliveryProperties) { + if (deliveryProperties->hasRoutingKey()) { + haveProperties = true; + + amqpProperties->RoutingKey = gcnew String(deliveryProperties->getRoutingKey().c_str()); + } + + if (deliveryProperties->hasDeliveryMode()) { + if (deliveryProperties->getDeliveryMode() == qpid::framing::PERSISTENT) + amqpProperties->Durable = true; + } + + if (deliveryProperties->hasTtl()) { + long long ticks = deliveryProperties->getTtl() * TimeSpan::TicksPerMillisecond; + amqpProperties->TimeToLive = Nullable<TimeSpan>(TimeSpan::FromTicks(ticks)); + } + } + + if (messageProperties) { + + if (messageProperties->hasReplyTo()) { + haveProperties = true; + const ReplyTo& rpto = messageProperties->getReplyTo(); + String^ rk = nullptr; + String^ ex = nullptr; + if (rpto.hasRoutingKey()) { + rk = gcnew String(rpto.getRoutingKey().c_str()); + } + if (rpto.hasExchange()) { + ex = gcnew String(rpto.getExchange().c_str()); + } + amqpProperties->SetReplyTo(ex,rk); + } + + if (messageProperties->hasContentType()) { + haveProperties = true; + amqpProperties->ContentType = gcnew String(messageProperties->getContentType().c_str()); + + if (messageProperties->hasContentEncoding()) { + String^ enc = gcnew String(messageProperties->getContentEncoding().c_str()); + if (!String::IsNullOrEmpty(enc)) { + // TODO: properly assemble 1.0 style to 0-10 for all cases + amqpProperties->ContentType += "; charset=" + enc; + } + } + } + + if (messageProperties->hasCorrelationId()) { + haveProperties = true; + const std::string& ncid = messageProperties->getCorrelationId(); + int len = ncid.size(); + array<unsigned char>^ mcid = gcnew array<unsigned char>(len); + Marshal::Copy ((IntPtr) (void *) ncid.data(), mcid, 0, len); + amqpProperties->CorrelationId = mcid; + } + + if (messageProperties->hasUserId()) { + haveProperties = true; + const std::string& nuid = messageProperties->getUserId(); + int len = nuid.size(); + array<unsigned char>^ muid = gcnew array<unsigned char>(len); + Marshal::Copy ((IntPtr) (void *) nuid.data(), muid, 0, len); + amqpProperties->UserId = muid; + } + + if (messageProperties->hasApplicationHeaders()) { + haveProperties = true; + const qpid::framing::FieldTable& fieldTable = messageProperties->getApplicationHeaders(); + int count = fieldTable.count(); + + if (count > 0) { + haveProperties = true; + Collections::Generic::Dictionary<System::String^, AmqpType^>^ mmap = + gcnew Collections::Generic::Dictionary<System::String^, AmqpType^>(count); + + for(qpid::framing::FieldTable::ValueMap::const_iterator i = fieldTable.begin(); i != fieldTable.end(); i++) { + + qpid::framing::FieldValue::Data &data = i->second->getData(); + + // TODO: replace these generic int/string conversions with handler for each AMQP specific type: + // uint8_t dataType = i->second->getType(); + // switch (dataType) { case TYPE_CODE_STR8: ... } + + if (data.convertsToInt()) { + mmap->Add (gcnew String(i->first.data()), gcnew AmqpInt((int) i->second->getData().getInt())); + } + if (data.convertsToString()) { + std::string ns = data.getString(); + String^ ms = gcnew String(ns.data(), 0, ns.size()); + mmap->Add (gcnew String(i->first.data()), gcnew AmqpString(ms)); + } + } + + amqpProperties->PropertyMap = mmap; + } + + } + } + + if (haveProperties) { + amqpMessage->Properties = amqpProperties; + } + + // We have a message we can return to the caller. + // Tell the broker we got it. + subscriptionp->accept(frameSetID); + return amqpMessage; + } + finally { + if (ownFrameSet) + delete (fspp); + } +} + + // As for IInputChannel: + // if success, return true + amqpMessage + // elseif timeout, return false + // elseif closed/EOF, return true and amqpMessage = null + // else throw an Exception + +bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage) +{ + lock l(waiters); + + if (waiters->Count == 0) { + // see if there is a message already available without blocking + IntPtr fspp = nextLocalMessage(); + if (fspp.ToPointer() != NULL) { + amqpMessage = createAmqpMessage(fspp); + return true; + } + } + + MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, false, nullptr, nullptr); + addWaiter(waiter); + + l.release(); + waiter->Run(); + l.acquire(); + + if (waiter->TimedOut) { + return false; + } + + IntPtr waiterMsg = waiter->Message; + if (waiterMsg.ToPointer() == NULL) { + if (disposed) { + // indicate normal EOF on channel + amqpMessage = nullptr; + return true; + } + } + + amqpMessage = createAmqpMessage(waiterMsg); + return true; +} + +IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state) +{ + + //TODO: if haveMessage() complete synchronously + + lock l(waiters); + MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, true, callback, state); + addWaiter(waiter); + return waiter; +} + +bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage) +{ + + // TODO: validate result + + MessageWaiter^ waiter = (MessageWaiter ^) result; + + waiter->WaitForCompletion(); + + if (waiter->RunException != nullptr) + throw waiter->RunException; + + if (waiter->TimedOut) { + amqpMessage = nullptr; + return false; + } + + IntPtr waiterMsg = waiter->Message; + if (waiterMsg.ToPointer() == NULL) { + if (disposed) { + // indicate normal EOF on channel + amqpMessage = nullptr; + return true; + } + } + + amqpMessage = createAmqpMessage(waiterMsg); + return true; +} + + +bool InputLink::WaitForMessage(TimeSpan timeout) +{ + lock l(waiters); + + if (waiters->Count == 0) { + // see if there is a message already available without blocking + if (haveMessage()) + return true; + } + + // Same as for TryReceive, except consuming = false + MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, false, nullptr, nullptr); + addWaiter(waiter); + + l.release(); + waiter->Run(); + l.acquire(); + + if (waiter->TimedOut) { + return false; + } + + return true; +} + +IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state) +{ + lock l(waiters); + + // Same as for BeginTryReceive, except consuming = false + MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, true, callback, state); + addWaiter(waiter); + return waiter; +} + +bool InputLink::EndWaitForMessage(IAsyncResult^ result) +{ + MessageWaiter^ waiter = (MessageWaiter ^) result; + + waiter->WaitForCompletion(); + + if (waiter->TimedOut) { + return false; + } + + return true; +} + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h new file mode 100644 index 0000000000..366780c137 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h @@ -0,0 +1,85 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +#include "MessageWaiter.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; +using namespace System::Runtime::InteropServices; + +using namespace qpid::client; +using namespace std; + +// smart pointer to the low level AMQP 0-10 frames of the message +typedef qpid::framing::FrameSet::shared_ptr QpidFrameSetPtr; + +public ref class InputLink +{ +private: + AmqpSession^ amqpSession; + Subscription* subscriptionp; + LocalQueue* localQueuep; + Demux::QueuePtr* queuePtrp; + Collections::Generic::List<MessageWaiter^>^ waiters; + bool disposed; + bool finalizing; + QpidFrameSetPtr* dequeuedFrameSetpp; + ManualResetEvent^ asyncHelperWaitHandle; + + void Cleanup(); + void ReleaseNative(); + bool haveMessage(); + void addWaiter(MessageWaiter^ waiter); + void asyncHelper(); + AmqpMessage^ createAmqpMessage(IntPtr msgp); + +internal: + InputLink(AmqpSession^ session, System::String^ sourceQueue, qpid::client::AsyncSession *qpidSessionp, + qpid::client::SubscriptionManager *qpidSubsMgrp, bool exclusive, bool temporary, System::String^ filterKey, + System::String^ exchange); + + bool internalWaitForMessage(); + void unblockWaiter(); + void resetQueue(); + IntPtr nextLocalMessage(); + void removeWaiter(MessageWaiter^ waiter); + void sync(); + +public: + ~InputLink(); + !InputLink(); + void Close(); + + bool TryReceive(TimeSpan timeout, [Out] AmqpMessage ^% amqpMessage); + IAsyncResult^ BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state); + bool EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage); + + bool WaitForMessage(TimeSpan timeout); + IAsyncResult^ BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state); + bool EndWaitForMessage(IAsyncResult^ result); + +}; + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj new file mode 100644 index 0000000000..32f78c8344 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/Interop.vcproj @@ -0,0 +1,302 @@ +<?xml version="1.0" encoding="Windows-1252"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<VisualStudioProject
+ ProjectType="Visual C++"
+ Version="9.00"
+ Name="Interop"
+ ProjectGUID="{C9B6AC75-6332-47A4-B82B-0C20E0AF2D34}"
+ RootNamespace="Interop"
+ Keyword="ManagedCProj"
+ TargetFrameworkVersion="196613"
+ >
+ <Platforms>
+ <Platform
+ Name="Win32"
+ />
+ </Platforms>
+ <ToolFiles>
+ </ToolFiles>
+ <Configurations>
+ <Configuration
+ Name="Debug|Win32"
+ OutputDirectory="$(ProjectDir)$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)"
+ ConfigurationType="2"
+ CharacterSet="1"
+ ManagedExtensions="1"
+ WholeProgramOptimization="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ CommandLine="copy ..\AmqpTypes\obj\$(ConfigurationName)\Apache.Qpid.AmqpTypes.netmodule $(ConfigurationName)"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ AdditionalOptions="/FU Debug\Apache.Qpid.AmqpTypes.netmodule"
+ Optimization="0"
+ AdditionalIncludeDirectories="..\..\..\..\..\cpp\build\include;..\..\..\..\..\cpp\build\src;..\..\..\..\..\cpp\include;..\..\..\..\..\cpp\src;"$(BOOST_ROOT)""
+ PreprocessorDefinitions="WIN32;_DEBUG;_CRT_NONSTDC_NO_WARNINGS;WIN32_LEAN_AND_MEAN;NOMINMAX;_SCL_SECURE_NO_WARNINGS;BOOST_ALL_DYN_LINK"
+ RuntimeLibrary="3"
+ UsePrecompiledHeader="0"
+ WarningLevel="3"
+ DebugInformationFormat="3"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalOptions="..\..\..\..\..\cpp\build\src\Debug\qpidcommon.lib ..\..\..\..\..\cpp\build\src\Debug\qpidclient.lib Debug\Apache.Qpid.AmqpTypes.netmodule"
+ AdditionalDependencies="$(NoInherit)"
+ OutputFile="$(OutDir)\Apache.Qpid.Interop.dll"
+ LinkIncremental="2"
+ AdditionalLibraryDirectories=""$(BOOST_ROOT)\lib""
+ GenerateDebugInformation="true"
+ AssemblyDebug="1"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ <Configuration
+ Name="Release|Win32"
+ OutputDirectory="$(SolutionDir)$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)"
+ ConfigurationType="2"
+ CharacterSet="1"
+ ManagedExtensions="1"
+ WholeProgramOptimization="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ PreprocessorDefinitions="WIN32;NDEBUG"
+ RuntimeLibrary="2"
+ UsePrecompiledHeader="2"
+ WarningLevel="3"
+ DebugInformationFormat="3"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ AdditionalDependencies="$(NoInherit)"
+ LinkIncremental="1"
+ GenerateDebugInformation="true"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ </Configurations>
+ <References>
+ <AssemblyReference
+ RelativePath="System.dll"
+ AssemblyName="System, Version=2.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL"
+ MinFrameworkVersion="131072"
+ />
+ <AssemblyReference
+ RelativePath="System.Data.dll"
+ AssemblyName="System.Data, Version=2.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=x86"
+ MinFrameworkVersion="131072"
+ />
+ <AssemblyReference
+ RelativePath="System.XML.dll"
+ AssemblyName="System.Xml, Version=2.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL"
+ MinFrameworkVersion="131072"
+ />
+ <AssemblyReference
+ RelativePath="System.Runtime.Serialization.dll"
+ AssemblyName="System.Runtime.Serialization, Version=3.0.0.0, PublicKeyToken=b77a5c561934e089, processorArchitecture=MSIL"
+ MinFrameworkVersion="196608"
+ />
+ </References>
+ <Files>
+ <Filter
+ Name="Source Files"
+ Filter="cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx"
+ UniqueIdentifier="{4FC737F1-C7A5-4376-A066-2A32D752A2FF}"
+ >
+ <File
+ RelativePath=".\AmqpConnection.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\AmqpMessage.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\AmqpSession.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\AssemblyInfo.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\CompletionWaiter.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\InputLink.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\MessageBodyStream.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\MessageWaiter.cpp"
+ >
+ </File>
+ <File
+ RelativePath=".\OutputLink.cpp"
+ >
+ </File>
+ </Filter>
+ <Filter
+ Name="Header Files"
+ Filter="h;hpp;hxx;hm;inl;inc;xsd"
+ UniqueIdentifier="{93995380-89BD-4b04-88EB-625FBE52EBFB}"
+ >
+ <File
+ RelativePath=".\AmqpConnection.h"
+ >
+ </File>
+ <File
+ RelativePath=".\AmqpMessage.h"
+ >
+ </File>
+ <File
+ RelativePath=".\AmqpSession.h"
+ >
+ </File>
+ <File
+ RelativePath=".\CompletionWaiter.h"
+ >
+ </File>
+ <File
+ RelativePath=".\InputLink.h"
+ >
+ </File>
+ <File
+ RelativePath=".\MessageBodyStream.h"
+ >
+ </File>
+ <File
+ RelativePath=".\MessageWaiter.h"
+ >
+ </File>
+ <File
+ RelativePath=".\OutputLink.h"
+ >
+ </File>
+ <File
+ RelativePath=".\QpidException.h"
+ >
+ </File>
+ <File
+ RelativePath=".\QpidMarshal.h"
+ >
+ </File>
+ </Filter>
+ </Files>
+ <Globals>
+ </Globals>
+</VisualStudioProject>
diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp new file mode 100644 index 0000000000..f2cb5740d3 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp @@ -0,0 +1,337 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/AMQFrame.h" + +#include "MessageBodyStream.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace System::Threading; +using namespace msclr; + +using namespace qpid::client; +using namespace qpid::framing; + +// Thefolowing def must match "Frames" private typedef. +// TODO: make "Frames" publicly visible. +typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames; + +using namespace std; + +static void ThrowIfBadArgs (array<unsigned char>^ buffer, int offset, int count) +{ + if (buffer == nullptr) + throw gcnew ArgumentNullException("buffer"); + + if (offset < 0) + throw gcnew ArgumentOutOfRangeException("offset"); + + if (count < 0) + throw gcnew ArgumentOutOfRangeException("count"); + + if ((offset + count) > buffer->Length) + throw gcnew ArgumentException("offset + count"); +} + + +// Input stream constructor + +MessageBodyStream::MessageBodyStream(FrameSet::shared_ptr *fspp) +{ + isInputStream = true; + frameSetpp = fspp; + fragmentCount = 0; + length = 0; + position = 0; + currentFramep = NULL; + + const std::string *datap; // pointer to the fragment's string variable that holds the content + + for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) { + if (i->getBody()->type() == CONTENT_BODY) { + fragmentCount++; + datap = &(i->castBody<AMQContentBody>()->getData()); + length += datap->size(); + } + } + + // fragmentCount can be zero for an empty message + + fragmentIndex = 0; + fragmentPosition = 0; + + if (fragmentCount == 0) { + currentFragment = NULL; + fragmentLength = 0; + } + else if (fragmentCount == 1) { + currentFragment = datap->data(); + fragmentLength = (int) length; + } + else { + fragments = gcnew array<IntPtr>(fragmentCount); + fragmentIndex = 0; + for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) { + if (i->getBody()->type() == CONTENT_BODY) { + datap = &(i->castBody<AMQContentBody>()->getData()); + fragments[fragmentIndex++] = (IntPtr) (void *) datap; + } + } + fragmentIndex = 0; + datap = (const std::string *) fragments[0].ToPointer(); + currentFragment = datap->data(); + fragmentLength = datap->size(); + } +} + + +int MessageBodyStream::Read(array<unsigned char>^ buffer, int offset, int count) +{ + if (!isInputStream) + throw gcnew NotSupportedException(); + if (disposed) + throw gcnew ObjectDisposedException("Stream"); + if (count == 0) + return 0; + ThrowIfBadArgs(buffer, offset, count); + + int nRead = 0; + int remaining = count; + + while (nRead < count) { + int fragAvail = fragmentLength - fragmentPosition; + int copyCount = min (fragAvail, remaining); + if (copyCount == 0) { + // no more to read + return nRead; + } + + // copy from native space + IntPtr nativep = (IntPtr) (void *) (currentFragment + fragmentPosition); + Marshal::Copy (nativep, buffer, offset, copyCount); + nRead += copyCount; + remaining -= copyCount; + fragmentPosition += copyCount; + offset += copyCount; + + // advance to next fragment? + if (fragmentPosition == fragmentLength) { + if (++fragmentIndex < fragmentCount) { + const std::string *datap = (const std::string *) fragments[fragmentIndex].ToPointer(); + currentFragment = datap->data(); + fragmentLength = datap->size(); + fragmentPosition = 0; + } + } + } + + return nRead; +} + + +void MessageBodyStream::pushCurrentFrame(bool lastFrame) +{ + // set flags as in SessionImpl::sendContent. + if (currentFramep->getBody()->type() == CONTENT_BODY) { + + if ((fragmentCount == 1) && lastFrame) { + // only one content frame + currentFramep->setFirstSegment(false); + } + else { + currentFramep->setFirstSegment(false); + currentFramep->setLastSegment(true); + if (fragmentCount != 1) { + currentFramep->setFirstFrame(false); + } + if (!lastFrame) { + currentFramep->setLastFrame(false); + } + } + } + else { + // the header frame + currentFramep->setFirstSegment(false); + if (!lastFrame) { + // there will be at least one content frame + currentFramep->setLastSegment(false); + } + } + + // add to frame set. This makes a copy and ref counts the body + (*frameSetpp)->append(*currentFramep); + + delete currentFramep; + + currentFramep = NULL; +} + + +IntPtr MessageBodyStream::GetFrameSet() +{ + if (currentFramep != NULL) { + // No more content. Tidy up the pending (possibly single header) frame. + pushCurrentFrame(true); + } + + if (frameSetpp == NULL) { + return (IntPtr) NULL; + } + + // shared_ptr.get() + return (IntPtr) (void *) (*frameSetpp).get(); +} + +IntPtr MessageBodyStream::GetHeader() +{ + return (IntPtr) headerBodyp; +} + + +// Ouput stream constructor + +MessageBodyStream::MessageBodyStream(int maxFrameSize) +{ + isInputStream = false; + + maxFrameContentSize = maxFrameSize - AMQFrame::frameOverhead(); + SequenceNumber unused; // only meaningful on incoming frames + frameSetpp = new FrameSet::shared_ptr(new FrameSet(unused)); + fragmentCount = 0; + length = 0; + position = 0; + + // header goes first in the outgoing frameset + + boost::intrusive_ptr<AMQBody> headerBody(new AMQHeaderBody); + currentFramep = new AMQFrame(headerBody); + headerBodyp = static_cast<AMQHeaderBody*>(headerBody.get()); + + // mark this header frame as "full" to force the first write to create a new content frame + fragmentPosition = maxFrameContentSize; +} + +void MessageBodyStream::Write(array<unsigned char>^ buffer, int offset, int count) +{ + if (isInputStream) + throw gcnew NotSupportedException(); + if (disposed) + throw gcnew ObjectDisposedException("Stream"); + if (count == 0) + return; + ThrowIfBadArgs(buffer, offset, count); + + if (currentFramep == NULL) { + // GetFrameSet() has been called and we no longer exclusively own the underlying frames. + throw gcnew InvalidOperationException ("Mesage Body output already completed"); + } + + if (count <= 0) + return; + + // keep GC memory movement at bay while copying to native space + pin_ptr<unsigned char> pinnedBuf = &buffer[0]; + + string *datap; + + int remaining = count; + while (remaining > 0) { + if (fragmentPosition == maxFrameContentSize) { + // move to a new frame, but not until ready to add new content. + // zero content is valid, or the final write may exactly fill to maxFrameContentSize + + pushCurrentFrame(false); + + currentFramep = new AMQFrame(AMQContentBody()); + fragmentPosition = 0; + fragmentCount++; + } + + int copyCount = min (remaining, (maxFrameContentSize - fragmentPosition)); + datap = &(currentFramep->castBody<AMQContentBody>()->getData()); + + char *outp = (char *) pinnedBuf + offset; + if (fragmentPosition == 0) { + datap->assign(outp, copyCount); + } + else { + datap->append(outp, copyCount); + } + + position += copyCount; + fragmentPosition += copyCount; + remaining -= copyCount; + offset += copyCount; + } +} + + +void MessageBodyStream::Cleanup() +{ + { + lock l(this); + if (disposed) + return; + + disposed = true; + } + + try {} + finally + { + if (frameSetpp != NULL) { + delete frameSetpp; + frameSetpp = NULL; + } + if (currentFramep != NULL) { + delete currentFramep; + currentFramep = NULL; + } + } +} + +MessageBodyStream::~MessageBodyStream() +{ + Cleanup(); +} + +MessageBodyStream::!MessageBodyStream() +{ + Cleanup(); +} + +void MessageBodyStream::Close() +{ + // Simulate Dispose()... + Cleanup(); + GC::SuppressFinalize(this); +} + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h new file mode 100644 index 0000000000..fa8e3f6bde --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h @@ -0,0 +1,131 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; + +using namespace qpid::client; +using namespace qpid::framing; +using namespace std; + + +// This class provides memory streaming of the message body contents +// between native and managed space. To avoid additional memory copies +// in native space, it reads and writes directly to the low level Qpid +// frames. + +public ref class MessageBodyStream : System::IO::Stream +{ +private: + bool isInputStream; + long long length; + long long position; + + // the boost smart pointer that keeps the message body frames in memory + FrameSet::shared_ptr *frameSetpp; + + int fragmentCount; + int fragmentIndex; + const char* currentFragment; + int fragmentPosition; + int fragmentLength; + array<IntPtr>^ fragments; + + int maxFrameContentSize; + AMQFrame* currentFramep; + void* headerBodyp; + bool disposed; + bool finalizing; + void Cleanup(); + +internal: + // incoming message + MessageBodyStream(FrameSet::shared_ptr *fspp); + // outgoing message + MessageBodyStream(int maxFrameSize); + void pushCurrentFrame(bool last); +public: + ~MessageBodyStream(); + !MessageBodyStream(); + virtual void Close() override; + virtual int Read( + [InAttribute] [OutAttribute] array<unsigned char>^ buffer, + int offset, + int count) override; + + virtual void Write( + array<unsigned char>^ buffer, + int offset, + int count) override; + + + IntPtr GetFrameSet(); + IntPtr GetHeader(); + + virtual void Flush() override {} // noop + + + // TODO: see CanSeek below. + virtual long long Seek( + long long offset, + System::IO::SeekOrigin origin) override {throw gcnew System::NotSupportedException(); } + + // TODO: see CanSeek below. + virtual void SetLength( + long long value) override {throw gcnew System::NotSupportedException(); } + + virtual property long long Length { + long long get() override { return length; } + }; + + virtual property long long Position { + long long get() override { return position; } + void set(long long p) override { throw gcnew System::NotSupportedException(); } + }; + + + virtual property bool CanRead { + bool get () override { return isInputStream; } + } + + virtual property bool CanWrite { + bool get () override { return !isInputStream; } + } + + // Note: this class must return true to signal that the Length property works. + // Required by the raw message encoder. + // "If a class derived from Stream does not support seeking, calls to Length, + // SetLength, Position, and Seek throw a NotSupportedException". + + virtual property bool CanSeek { + bool get () override { return true; } + } + + virtual property bool CanTimeout { + bool get () override { return isInputStream; } + } +}; + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp new file mode 100644 index 0000000000..f7a28b0692 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp @@ -0,0 +1,251 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/Demux.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" + +#include "MessageBodyStream.h" +#include "AmqpMessage.h" +#include "AmqpSession.h" +#include "InputLink.h" +#include "MessageWaiter.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; +using namespace msclr; + + +MessageWaiter::MessageWaiter(InputLink^ parent, TimeSpan timeSpan, bool consuming, bool async, AsyncCallback ^callback, Object^ state) +{ + this->consuming = consuming; + if (!consuming) { + GC::SuppressFinalize(this); + } + + if (async) { + this->async = true; + this->asyncCallback = callback; + this->state = state; + } + else { + this->assigned = true; + } + this->parent = parent; + this->thisLock = gcnew Object(); + + // do this after the Message Waiter is fully initialized, in case of + // very small timespan + if (timeSpan != TimeSpan::MaxValue) { + this->timer = gcnew Timer(timeoutCallback, this, timeSpan, TimeSpan::FromMilliseconds(-1)); + } +} + +MessageWaiter::~MessageWaiter() +{ + if (message != IntPtr::Zero) { + try{} + finally { + delete message.ToPointer(); + message = IntPtr::Zero; + } + } +} + +MessageWaiter::!MessageWaiter() +{ + this->~MessageWaiter(); +} + + +void MessageWaiter::WaitForCompletion() +{ + if (isCompleted) + return; + + lock l(thisLock); + while (!isCompleted) { + Monitor::Wait(thisLock); + } +} + +void MessageWaiter::Activate() +{ + if (activated) + return; + + lock l(thisLock); + if (!activated) { + activated = true; + Monitor::PulseAll(thisLock); + } +} + + +void MessageWaiter::Run() +{ + lock l(thisLock); + + // wait until Activate(), i.e. our turn in the waiter list or a timeout + while (!activated) { + Monitor::Wait(thisLock); + } + bool haveMessage = false; + bool mustReset = false; + + if (!timedOut) + blocking = true; + + if (blocking) { + l.release(); + + try { + haveMessage = parent->internalWaitForMessage(); + } + catch (System::Exception^ e) { + runException = e; + } + + l.acquire(); + blocking = false; + if (timedOut) { + // TimeoutCallback() called parent->unblockWaiter() + mustReset = true; + // let the timer thread move past critical region + while (processingTimeout) { + Monitor::Wait(thisLock); + } + } + } + + if (timer != nullptr) { + timer->~Timer(); + timer = nullptr; + } + + if (haveMessage) { + timedOut = false; // for the case timeout and message arrival are essentially tied + if (!consuming) { + // just waiting + haveMessage = false; + } + } + + if (haveMessage || mustReset) { + l.release(); + if (haveMessage) { + // hang on to it for when the async caller gets around to retrieving + message = parent->nextLocalMessage(); + } + if (mustReset) { + parent->resetQueue(); + } + l.acquire(); + } + + isCompleted = true; + Monitor::PulseAll(thisLock); + + // do this check and signal while locked + if (asyncWaitHandle != nullptr) + asyncWaitHandle->Set(); + + l.release(); + parent->removeWaiter(this); + + + if (asyncCallback != nullptr) { + // guard against application callback exception + try { + asyncCallback(this); + } + catch (System::Exception^) { + // log it? + } + } + +} + +bool MessageWaiter::AcceptForWork() +{ + lock l(thisLock); + if (!assigned) { + assigned = true; + return true; + } + return false; +} + +void MessageWaiter::TimeoutCallback(Object^ state) +{ + MessageWaiter^ waiter = (MessageWaiter^) state; + if (waiter->isCompleted) + return; + + // make sure parent has finished initializing us before we get going + waiter->parent->sync(); + + lock l(waiter->thisLock); + if (waiter->timer == nullptr) { + // the waiter is in the clean up phase and doesn't need a wakeup + return; + } + + // timedOut, blocking and processingTimeout work as a unit + waiter->timedOut = true; + if (waiter->blocking) { + // let the waiter know that we are busy with an upcoming unblock operation + waiter->processingTimeout = true; + } + + waiter->Activate(); + + if (waiter->processingTimeout) { + // call with lock off + l.release(); + waiter->parent->unblockWaiter(); + + // synchronize with blocked thread + l.acquire(); + waiter->processingTimeout = false; + Monitor::PulseAll(waiter->thisLock); + } + + l.release(); + + // If waiter has no associated thread, we must move it to completion + if (waiter->AcceptForWork()) { + waiter->Run(); // does not block since timedOut == true + } +} + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h new file mode 100644 index 0000000000..3eb4919646 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h @@ -0,0 +1,127 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; + +public ref class MessageWaiter : IAsyncResult +{ +private: + // Receive() or WaitForMessage() + bool consuming; + bool consumed; + bool timedOut; + bool async; + // has an owner thread + bool assigned; + // can Run (i.e. earlier MessageWaiters in the queue have completed) + bool activated; + // is making a call to internalWaitForMessage() which (usually) blocks + bool blocking; + // the timeout timer thread is lurking + bool processingTimeout; + // the saved exception from within Run() for async delivery + System::Exception^ runException; + AsyncCallback^ asyncCallback; + Threading::Timer ^timer; + bool isCompleted; + bool completedSynchronously; + Object^ state; + Object^ thisLock; + ManualResetEvent^ asyncWaitHandle; + InputLink^ parent; + static void TimeoutCallback(Object^ state); + static TimerCallback^ timeoutCallback = gcnew TimerCallback(MessageWaiter::TimeoutCallback); + IntPtr message; + !MessageWaiter(); + ~MessageWaiter(); + + internal: + MessageWaiter(InputLink^ parent, TimeSpan timeSpan, bool consuming, bool async, AsyncCallback ^callback, Object^ state); + + void Run(); + bool AcceptForWork(); + void Activate(); + void WaitForCompletion(); + +// inline void SetCompletedSynchronously (bool v) { completedSynchronously = v; } + + property IntPtr Message { + IntPtr get () { + if (!consuming || consumed) + throw gcnew InvalidOperationException("Message property"); + consumed = true; + IntPtr v = message; + message = IntPtr::Zero; + GC::SuppressFinalize(this); + return v; + } + // void set (IntPtr v) { message = v; } + } + + property bool Assigned { + bool get () { return assigned; } + } + + property bool TimedOut { + bool get () { return timedOut; } + } + + + property System::Exception^ RunException { + System::Exception^ get() { return runException; } + } + + public: + + virtual property bool IsCompleted { + bool get () { return isCompleted; } + } + + virtual property bool CompletedSynchronously { + bool get () { return completedSynchronously; } + } + + virtual property WaitHandle^ AsyncWaitHandle { + WaitHandle^ get () { + if (asyncWaitHandle != nullptr) { + return asyncWaitHandle; + } + + msclr::lock l(thisLock); + if (asyncWaitHandle == nullptr) { + asyncWaitHandle = gcnew ManualResetEvent(isCompleted); + } + return asyncWaitHandle; + } + } + + virtual property Object^ AsyncState { + Object^ get () { return state; } + } +}; + +}}} // namespace Apache::Qpid::Interop + diff --git a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp new file mode 100644 index 0000000000..27725b8207 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp @@ -0,0 +1,251 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" + + +#include "AmqpSession.h" +#include "AmqpMessage.h" +#include "OutputLink.h" +#include "QpidMarshal.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace System::Threading; +using namespace msclr; + +using namespace qpid::client; +using namespace std; + +using namespace Apache::Qpid::AmqpTypes; + + +OutputLink::OutputLink(AmqpSession^ session, String^ defaultQueue) : + amqpSession(session), + queue(defaultQueue), + disposed(false), + maxFrameSize(session->Connection->MaxFrameSize), + finalizing(false) +{ +} + +void OutputLink::Cleanup() +{ + { + lock l(this); + if (disposed) + return; + + disposed = true; + } + + amqpSession->NotifyClosed(); +} + +OutputLink::~OutputLink() +{ + Cleanup(); +} + +OutputLink::!OutputLink() +{ + Cleanup(); +} + +void OutputLink::Close() +{ + // Simulate Dispose()... + Cleanup(); + GC::SuppressFinalize(this); +} + + +AmqpMessage^ OutputLink::CreateMessage() +{ + MessageBodyStream ^mbody = gcnew MessageBodyStream(maxFrameSize); + AmqpMessage ^amqpm = gcnew AmqpMessage(mbody); + return amqpm; +} + + +void OutputLink::ManagedToNative(AmqpMessage^ m) +{ + MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) m->BodyStream; + + AmqpProperties^ mprops = m->Properties; + + if (mprops != nullptr) { + AMQHeaderBody* bodyp = (AMQHeaderBody*) messageBodyStream->GetHeader().ToPointer(); + + if (mprops->HasDeliveryProperties) { + DeliveryProperties* deliveryPropertiesp = bodyp->get<DeliveryProperties>(true); + + if (mprops->RoutingKey != nullptr) { + deliveryPropertiesp->setRoutingKey(QpidMarshal::ToNative(mprops->RoutingKey)); + } + + if (mprops->Durable) { + deliveryPropertiesp->setDeliveryMode(qpid::framing::PERSISTENT); + } + + if (mprops->TimeToLive.HasValue) { + long long ttl = mprops->TimeToLive.Value.Ticks; + bool was_positive = (ttl > 0); + if (ttl < 0) + ttl = 0; + ttl = ttl / TimeSpan::TicksPerMillisecond; + if ((ttl == 0) && was_positive) + ttl = 1; + deliveryPropertiesp->setTtl(ttl); + } + } + + if (mprops->HasMessageProperties) { + qpid::framing::MessageProperties* messagePropertiesp = + bodyp->get<qpid::framing::MessageProperties>(true); + + String^ replyToExchange = mprops->ReplyToExchange; + String^ replyToRoutingKey = mprops->ReplyToRoutingKey; + if ((replyToExchange != nullptr) || (replyToRoutingKey != nullptr)) { + qpid::framing::ReplyTo nReplyTo; + if (replyToExchange != nullptr) { + nReplyTo.setExchange(QpidMarshal::ToNative(replyToExchange)); + } + if (replyToRoutingKey != nullptr) { + nReplyTo.setRoutingKey(QpidMarshal::ToNative(replyToRoutingKey)); + } + messagePropertiesp->setReplyTo(nReplyTo); + } + + // TODO: properly split 1.0 style to 0-10 content type + encoding + + String^ contentType = mprops->ContentType; + if (contentType != nullptr) { + String^ type = nullptr; + String^ enc = nullptr; + int idx = contentType->IndexOf(';'); + if (idx == -1) { + type = contentType; + } + else { + type = contentType->Substring(0, idx); + contentType = contentType->Substring(idx + 1); + idx = contentType->IndexOf('='); + if (idx != -1) { + enc = contentType->Substring(idx + 1); + enc = enc->Trim(); + } + } + if (!String::IsNullOrEmpty(type)) { + messagePropertiesp->setContentType(QpidMarshal::ToNative(type)); + } + if (!String::IsNullOrEmpty(enc)) { + messagePropertiesp->setContentEncoding(QpidMarshal::ToNative(enc)); + } + } + + + array<unsigned char>^ mbytes = mprops->CorrelationId; + if (mbytes != nullptr) { + pin_ptr<unsigned char> pinnedBuf = &mbytes[0]; + std::string s((char *) pinnedBuf, mbytes->Length); + messagePropertiesp->setCorrelationId(s); + } + + mbytes = mprops->UserId; + if (mbytes != nullptr) { + pin_ptr<unsigned char> pinnedBuf = &mbytes[0]; + std::string s((char *) pinnedBuf, mbytes->Length); + messagePropertiesp->setUserId(s); + } + + if (mprops->HasMappedProperties) { + qpid::framing::FieldTable fieldTable; + // TODO: add support for abitrary AMQP types + for each (Collections::Generic::KeyValuePair<System::String^, AmqpType^> kvp in mprops->PropertyMap) { + Type^ type = kvp.Value->GetType(); + if (type == AmqpInt::typeid) { + fieldTable.setInt(QpidMarshal::ToNative(kvp.Key), + ((AmqpInt ^) kvp.Value)->Value); + } + else if (type == AmqpString::typeid) { + AmqpString^ str = (AmqpString ^) kvp.Value; + // For now, FieldTable supports a single string type + fieldTable.setString(QpidMarshal::ToNative(kvp.Key), QpidMarshal::ToNative(str->Value)); + } + } + + messagePropertiesp->setApplicationHeaders(fieldTable); + } + } + } +} + + + +void OutputLink::Send(AmqpMessage^ amqpMessage, TimeSpan timeout) +{ + // copy properties from managed space to the native counterparts + ManagedToNative(amqpMessage); + + MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream; + CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, false, nullptr, nullptr); + + if (waiter != nullptr) { + waiter->WaitForCompletion(); + if (waiter->TimedOut) { + throw gcnew TimeoutException("Receive"); + } + } + // else: SendMessage() has already waited for the Completion + +} + +IAsyncResult^ OutputLink::BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout, AsyncCallback^ callback, Object^ state) +{ + ManagedToNative(amqpMessage); + + MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream; + CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, true, callback, state); + return waiter; +} + +void OutputLink::EndSend(IAsyncResult^ result) +{ + CompletionWaiter^ waiter = (CompletionWaiter ^) result; + waiter->WaitForCompletion(); + if (waiter->TimedOut) { + throw gcnew TimeoutException("Receive"); + } +} + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h new file mode 100644 index 0000000000..1f049a7412 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h @@ -0,0 +1,64 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; + +using namespace qpid::client; +using namespace std; + + +public ref class OutputLink +{ +private: + AmqpSession^ amqpSession; + String^ queue; + bool disposed; + bool finalizing; + void Cleanup(); + AmqpTypes::AmqpProperties^ defaultProperties; + void ManagedToNative(AmqpMessage^ m); + int maxFrameSize; + +internal: + OutputLink(AmqpSession^ session, String^ defaultQueue); + +public: + ~OutputLink(); + !OutputLink(); + void Close(); + AmqpMessage^ CreateMessage(); + void Send(AmqpMessage^ m, TimeSpan timeout); + IAsyncResult^ BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout, AsyncCallback^ callback, Object^ state); + void EndSend(IAsyncResult^ result); + + property AmqpTypes::AmqpProperties^ DefaultProperties { + AmqpTypes::AmqpProperties^ get () { return defaultProperties; } + void set(AmqpTypes::AmqpProperties^ p) { defaultProperties = p; } + } +}; + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h b/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h new file mode 100644 index 0000000000..91677a5e73 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h @@ -0,0 +1,37 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; + +public ref class QpidException : System::Exception +{ + public: + + QpidException() : System::Exception() {} + QpidException(String^ estring) : System::Exception(estring) {} + +}; + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h b/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h new file mode 100644 index 0000000000..3e22af7b39 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h @@ -0,0 +1,53 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Text; + + +// Helper functions for marshaling. + +private ref class QpidMarshal +{ + public: + + // marshal_as<T> not available in all Visual Studio editions. + + static std::string ToNative (System::String^ managed) { + if (managed->Length == 0) { + return std::string(); + } + array<unsigned char>^ mbytes = Encoding::UTF8->GetBytes(managed); + if (mbytes->Length == 0) { + return std::string(); + } + + pin_ptr<unsigned char> pinnedBuf = &mbytes[0]; + std::string native((char *) pinnedBuf, mbytes->Length); + return native; + } +}; + +}}} // namespace Apache::Qpid::Interop |