summaryrefslogtreecommitdiff
path: root/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp')
-rw-r--r--trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp251
1 files changed, 0 insertions, 251 deletions
diff --git a/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp b/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp
deleted file mode 100644
index 27725b8207..0000000000
--- a/trunk/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-#include <windows.h>
-#include <msclr\lock.h>
-
-#include "qpid/client/AsyncSession.h"
-#include "qpid/framing/FrameSet.h"
-#include "qpid/client/SubscriptionManager.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/Message.h"
-#include "qpid/client/MessageListener.h"
-
-
-#include "AmqpSession.h"
-#include "AmqpMessage.h"
-#include "OutputLink.h"
-#include "QpidMarshal.h"
-
-namespace Apache {
-namespace Qpid {
-namespace Interop {
-
-using namespace System;
-using namespace System::Runtime::InteropServices;
-using namespace System::Threading;
-using namespace msclr;
-
-using namespace qpid::client;
-using namespace std;
-
-using namespace Apache::Qpid::AmqpTypes;
-
-
-OutputLink::OutputLink(AmqpSession^ session, String^ defaultQueue) :
- amqpSession(session),
- queue(defaultQueue),
- disposed(false),
- maxFrameSize(session->Connection->MaxFrameSize),
- finalizing(false)
-{
-}
-
-void OutputLink::Cleanup()
-{
- {
- lock l(this);
- if (disposed)
- return;
-
- disposed = true;
- }
-
- amqpSession->NotifyClosed();
-}
-
-OutputLink::~OutputLink()
-{
- Cleanup();
-}
-
-OutputLink::!OutputLink()
-{
- Cleanup();
-}
-
-void OutputLink::Close()
-{
- // Simulate Dispose()...
- Cleanup();
- GC::SuppressFinalize(this);
-}
-
-
-AmqpMessage^ OutputLink::CreateMessage()
-{
- MessageBodyStream ^mbody = gcnew MessageBodyStream(maxFrameSize);
- AmqpMessage ^amqpm = gcnew AmqpMessage(mbody);
- return amqpm;
-}
-
-
-void OutputLink::ManagedToNative(AmqpMessage^ m)
-{
- MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) m->BodyStream;
-
- AmqpProperties^ mprops = m->Properties;
-
- if (mprops != nullptr) {
- AMQHeaderBody* bodyp = (AMQHeaderBody*) messageBodyStream->GetHeader().ToPointer();
-
- if (mprops->HasDeliveryProperties) {
- DeliveryProperties* deliveryPropertiesp = bodyp->get<DeliveryProperties>(true);
-
- if (mprops->RoutingKey != nullptr) {
- deliveryPropertiesp->setRoutingKey(QpidMarshal::ToNative(mprops->RoutingKey));
- }
-
- if (mprops->Durable) {
- deliveryPropertiesp->setDeliveryMode(qpid::framing::PERSISTENT);
- }
-
- if (mprops->TimeToLive.HasValue) {
- long long ttl = mprops->TimeToLive.Value.Ticks;
- bool was_positive = (ttl > 0);
- if (ttl < 0)
- ttl = 0;
- ttl = ttl / TimeSpan::TicksPerMillisecond;
- if ((ttl == 0) && was_positive)
- ttl = 1;
- deliveryPropertiesp->setTtl(ttl);
- }
- }
-
- if (mprops->HasMessageProperties) {
- qpid::framing::MessageProperties* messagePropertiesp =
- bodyp->get<qpid::framing::MessageProperties>(true);
-
- String^ replyToExchange = mprops->ReplyToExchange;
- String^ replyToRoutingKey = mprops->ReplyToRoutingKey;
- if ((replyToExchange != nullptr) || (replyToRoutingKey != nullptr)) {
- qpid::framing::ReplyTo nReplyTo;
- if (replyToExchange != nullptr) {
- nReplyTo.setExchange(QpidMarshal::ToNative(replyToExchange));
- }
- if (replyToRoutingKey != nullptr) {
- nReplyTo.setRoutingKey(QpidMarshal::ToNative(replyToRoutingKey));
- }
- messagePropertiesp->setReplyTo(nReplyTo);
- }
-
- // TODO: properly split 1.0 style to 0-10 content type + encoding
-
- String^ contentType = mprops->ContentType;
- if (contentType != nullptr) {
- String^ type = nullptr;
- String^ enc = nullptr;
- int idx = contentType->IndexOf(';');
- if (idx == -1) {
- type = contentType;
- }
- else {
- type = contentType->Substring(0, idx);
- contentType = contentType->Substring(idx + 1);
- idx = contentType->IndexOf('=');
- if (idx != -1) {
- enc = contentType->Substring(idx + 1);
- enc = enc->Trim();
- }
- }
- if (!String::IsNullOrEmpty(type)) {
- messagePropertiesp->setContentType(QpidMarshal::ToNative(type));
- }
- if (!String::IsNullOrEmpty(enc)) {
- messagePropertiesp->setContentEncoding(QpidMarshal::ToNative(enc));
- }
- }
-
-
- array<unsigned char>^ mbytes = mprops->CorrelationId;
- if (mbytes != nullptr) {
- pin_ptr<unsigned char> pinnedBuf = &mbytes[0];
- std::string s((char *) pinnedBuf, mbytes->Length);
- messagePropertiesp->setCorrelationId(s);
- }
-
- mbytes = mprops->UserId;
- if (mbytes != nullptr) {
- pin_ptr<unsigned char> pinnedBuf = &mbytes[0];
- std::string s((char *) pinnedBuf, mbytes->Length);
- messagePropertiesp->setUserId(s);
- }
-
- if (mprops->HasMappedProperties) {
- qpid::framing::FieldTable fieldTable;
- // TODO: add support for abitrary AMQP types
- for each (Collections::Generic::KeyValuePair<System::String^, AmqpType^> kvp in mprops->PropertyMap) {
- Type^ type = kvp.Value->GetType();
- if (type == AmqpInt::typeid) {
- fieldTable.setInt(QpidMarshal::ToNative(kvp.Key),
- ((AmqpInt ^) kvp.Value)->Value);
- }
- else if (type == AmqpString::typeid) {
- AmqpString^ str = (AmqpString ^) kvp.Value;
- // For now, FieldTable supports a single string type
- fieldTable.setString(QpidMarshal::ToNative(kvp.Key), QpidMarshal::ToNative(str->Value));
- }
- }
-
- messagePropertiesp->setApplicationHeaders(fieldTable);
- }
- }
- }
-}
-
-
-
-void OutputLink::Send(AmqpMessage^ amqpMessage, TimeSpan timeout)
-{
- // copy properties from managed space to the native counterparts
- ManagedToNative(amqpMessage);
-
- MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream;
- CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, false, nullptr, nullptr);
-
- if (waiter != nullptr) {
- waiter->WaitForCompletion();
- if (waiter->TimedOut) {
- throw gcnew TimeoutException("Receive");
- }
- }
- // else: SendMessage() has already waited for the Completion
-
-}
-
-IAsyncResult^ OutputLink::BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout, AsyncCallback^ callback, Object^ state)
-{
- ManagedToNative(amqpMessage);
-
- MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream;
- CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, true, callback, state);
- return waiter;
-}
-
-void OutputLink::EndSend(IAsyncResult^ result)
-{
- CompletionWaiter^ waiter = (CompletionWaiter ^) result;
- waiter->WaitForCompletion();
- if (waiter->TimedOut) {
- throw gcnew TimeoutException("Receive");
- }
-}
-
-
-}}} // namespace Apache::Qpid::Interop