/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include "qpid/client/AsyncSession.h" #include "qpid/framing/FrameSet.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" #include "qpid/client/MessageListener.h" #include "AmqpSession.h" #include "AmqpMessage.h" #include "OutputLink.h" #include "QpidMarshal.h" namespace Apache { namespace Qpid { namespace Interop { using namespace System; using namespace System::Runtime::InteropServices; using namespace System::Threading; using namespace msclr; using namespace qpid::client; using namespace std; using namespace Apache::Qpid::AmqpTypes; OutputLink::OutputLink(AmqpSession^ session, String^ address) : amqpSession(session), disposed(false), maxFrameSize(session->Connection->MaxFrameSize), finalizing(false) { qpidAddress = QpidAddress::CreateAddress(address, false); qpidAddress->ResolveLink(session); } void OutputLink::Cleanup() { { lock l(this); if (disposed) return; disposed = true; } // process any pending queue delete qpidAddress->CleanupLink(amqpSession); amqpSession->NotifyClosed(); } OutputLink::~OutputLink() { Cleanup(); } OutputLink::!OutputLink() { Cleanup(); } void OutputLink::Close() { // Simulate Dispose()... Cleanup(); GC::SuppressFinalize(this); } AmqpMessage^ OutputLink::CreateMessage() { MessageBodyStream ^mbody = gcnew MessageBodyStream(maxFrameSize); AmqpMessage ^amqpm = gcnew AmqpMessage(mbody); return amqpm; } void OutputLink::ManagedToNative(AmqpMessage^ m) { MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) m->BodyStream; AmqpProperties^ mprops = m->Properties; if (mprops != nullptr) { AMQHeaderBody* bodyp = (AMQHeaderBody*) messageBodyStream->GetHeader().ToPointer(); if (mprops->HasDeliveryProperties) { DeliveryProperties* deliveryPropertiesp = bodyp->get(true); if (mprops->RoutingKey != nullptr) { deliveryPropertiesp->setRoutingKey(QpidMarshal::ToNative(mprops->RoutingKey)); } if (mprops->Durable) { deliveryPropertiesp->setDeliveryMode(qpid::framing::PERSISTENT); } if (mprops->TimeToLive.HasValue) { long long ttl = mprops->TimeToLive.Value.Ticks; bool was_positive = (ttl > 0); if (ttl < 0) ttl = 0; ttl = ttl / TimeSpan::TicksPerMillisecond; if ((ttl == 0) && was_positive) ttl = 1; deliveryPropertiesp->setTtl(ttl); } } if (mprops->HasMessageProperties) { qpid::framing::MessageProperties* messagePropertiesp = bodyp->get(true); String^ replyToExchange = mprops->ReplyToExchange; String^ replyToRoutingKey = mprops->ReplyToRoutingKey; if ((replyToExchange != nullptr) || (replyToRoutingKey != nullptr)) { qpid::framing::ReplyTo nReplyTo; if (replyToExchange != nullptr) { nReplyTo.setExchange(QpidMarshal::ToNative(replyToExchange)); } if (replyToRoutingKey != nullptr) { nReplyTo.setRoutingKey(QpidMarshal::ToNative(replyToRoutingKey)); } messagePropertiesp->setReplyTo(nReplyTo); } // TODO: properly split 1.0 style to 0-10 content type + encoding String^ contentType = mprops->ContentType; if (contentType != nullptr) { String^ type = nullptr; String^ enc = nullptr; int idx = contentType->IndexOf(';'); if (idx == -1) { type = contentType; } else { type = contentType->Substring(0, idx); contentType = contentType->Substring(idx + 1); idx = contentType->IndexOf('='); if (idx != -1) { enc = contentType->Substring(idx + 1); enc = enc->Trim(); } } if (!String::IsNullOrEmpty(type)) { messagePropertiesp->setContentType(QpidMarshal::ToNative(type)); } if (!String::IsNullOrEmpty(enc)) { messagePropertiesp->setContentEncoding(QpidMarshal::ToNative(enc)); } } array^ mbytes = mprops->CorrelationId; if (mbytes != nullptr) { pin_ptr pinnedBuf = &mbytes[0]; std::string s((char *) pinnedBuf, mbytes->Length); messagePropertiesp->setCorrelationId(s); } mbytes = mprops->UserId; if (mbytes != nullptr) { pin_ptr pinnedBuf = &mbytes[0]; std::string s((char *) pinnedBuf, mbytes->Length); messagePropertiesp->setUserId(s); } if (mprops->HasMappedProperties) { qpid::framing::FieldTable fieldTable; // TODO: add support for abitrary AMQP types for each (Collections::Generic::KeyValuePair kvp in mprops->PropertyMap) { Type^ type = kvp.Value->GetType(); if (type == AmqpInt::typeid) { fieldTable.setInt(QpidMarshal::ToNative(kvp.Key), ((AmqpInt ^) kvp.Value)->Value); } else if (type == AmqpString::typeid) { AmqpString^ str = (AmqpString ^) kvp.Value; // For now, FieldTable supports a single string type fieldTable.setString(QpidMarshal::ToNative(kvp.Key), QpidMarshal::ToNative(str->Value)); } } messagePropertiesp->setApplicationHeaders(fieldTable); } } } } void OutputLink::Send(AmqpMessage^ amqpMessage, TimeSpan timeout) { // copy properties from managed space to the native counterparts ManagedToNative(amqpMessage); MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream; CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream, timeout, false, nullptr, nullptr); if (waiter != nullptr) { waiter->WaitForCompletion(); if (waiter->TimedOut) { throw gcnew TimeoutException("Receive"); } } // else: SendMessage() has already waited for the Completion } IAsyncResult^ OutputLink::BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout, AsyncCallback^ callback, Object^ state) { ManagedToNative(amqpMessage); MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream; CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream, timeout, true, callback, state); return waiter; } void OutputLink::EndSend(IAsyncResult^ result) { CompletionWaiter^ waiter = (CompletionWaiter ^) result; waiter->WaitForCompletion(); if (waiter->TimedOut) { throw gcnew TimeoutException("Receive"); } } }}} // namespace Apache::Qpid::Interop