/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* * This program parses strings of the form "node/subject;{options}" as * used in the Qpid messaging API. It provides basic wiring * capabilities to create/delete temporary queues (to topic * subsciptions) and unbound "point and shoot" queues. */ #include #include #include #include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/client/Connection.h" #include "qpid/client/SessionImpl.h" #include "qpid/client/SessionBase_0_10Access.h" #include "qpid/client/Message.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/client/Future.h" #include "AmqpConnection.h" #include "AmqpSession.h" #include "AmqpMessage.h" #include "MessageBodyStream.h" #include "InputLink.h" #include "OutputLink.h" #include "QpidMarshal.h" #include "QpidException.h" #include "QpidAddress.h" namespace Apache { namespace Qpid { namespace Interop { using namespace System; using namespace System::Runtime::InteropServices; using namespace msclr; using namespace qpid::client; using namespace std; QpidAddress::QpidAddress(String^ s, bool isInput) { address = s; nodeName = s; isInputChannel = isInput; isQueue = true; if (address->StartsWith("//")) { // special case old style address to default exchange, // no options, output only if ((s->IndexOf(';') != -1) || isInputChannel) throw gcnew ArgumentException("Invalid 0-10 address: " + address); nodeName = nodeName->Substring(2); return; } String^ options = nullptr; int pos = s->IndexOf(';'); if (pos != -1) { options = s->Substring(pos + 1); nodeName = s->Substring(0, pos); if (options->Length > 0) { if (!options->StartsWith("{") || !options->EndsWith("}")) throw gcnew ArgumentException("Invalid address: " + address); options = options->Substring(1, options->Length - 2); array^ subOpts = options->Split(String(",: ").ToCharArray(), StringSplitOptions::RemoveEmptyEntries); if ((subOpts->Length % 2) != 0) throw gcnew ArgumentException("Bad address (options): " + address); for (int i=0; i < subOpts->Length; i += 2) { String^ opt = subOpts[i]; String^ optArg = subOpts[i+1]; if (opt->Equals("create")) { creating = PolicyApplies(optArg); } else if (opt->Equals("delete")) { deleting = PolicyApplies(optArg); } else if (opt->Equals("mode")) { if (optArg->Equals("browse")) { browsing = isInputChannel; } else if (!optArg->Equals("consume")) { throw gcnew ArgumentException("Invalid browsing option: " + optArg); } } else if (opt->Equals("assert") || opt->Equals("node")) { throw gcnew ArgumentException("Unsupported address option: " + opt); } else { throw gcnew ArgumentException("Bad address option: " + opt); } } } else options = nullptr; } pos = nodeName->IndexOf('/'); if (pos != -1) { subject = nodeName->Substring(pos + 1); if (String::IsNullOrEmpty(subject)) subject = nullptr; nodeName = nodeName->Substring(0, pos); } } QpidAddress^ QpidAddress::CreateAddress(String^ s, bool isInput) { QpidAddress^ addr = gcnew QpidAddress(s, isInput); return addr; } void QpidAddress::ResolveLink(AmqpSession^ amqpSession) { AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer(); if (asyncSessionp == NULL) throw gcnew ObjectDisposedException("session"); deleteName = nullptr; isQueue = true; try { Session session = sync(*asyncSessionp); std::string n_name = QpidMarshal::ToNative(nodeName); ExchangeBoundResult result = session.exchangeBound(arg::exchange=n_name, arg::queue=n_name); bool queueFound = !result.getQueueNotFound(); bool exchangeFound = !result.getExchangeNotFound(); if (isInputChannel) { if (queueFound) { linkName = nodeName; if (deleting) deleteName = nodeName; } else if (exchangeFound) { isQueue = false; String^ tmpkey = nullptr; String^ tmpname = nodeName + "_" + Guid::NewGuid().ToString(); bool haveSubject = !String::IsNullOrEmpty(subject); FieldTable bindArgs; std::string exchangeType = session.exchangeQuery(n_name).getType(); if (exchangeType == "topic") { tmpkey = haveSubject ? subject : "#"; } else if (exchangeType == "fanout") { tmpkey = tmpname; } else if (exchangeType == "headers") { tmpkey = haveSubject ? subject : "match-all"; if (haveSubject) bindArgs.setString("qpid.subject", QpidMarshal::ToNative(subject)); bindArgs.setString("x-match", "all"); } else if (exchangeType == "xml") { tmpkey = haveSubject ? subject : ""; if (haveSubject) { String^ v = "declare variable $qpid.subject external; $qpid.subject = '" + subject + "'"; bindArgs.setString("xquery", QpidMarshal::ToNative(v)); } else bindArgs.setString("xquery", "true()"); } else { tmpkey = haveSubject ? subject : ""; } std::string qn = QpidMarshal::ToNative(tmpname); session.queueDeclare(arg::queue=qn, arg::autoDelete=true, arg::exclusive=true); bool success = false; try { session.exchangeBind(arg::exchange=n_name, arg::queue=qn, arg::bindingKey=QpidMarshal::ToNative(tmpkey), arg::arguments=bindArgs); bindKey = tmpkey; // remember for later cleanup success = true; } finally { if (!success) session.queueDelete(arg::queue=qn); } linkName = tmpname; deleteName = tmpname; deleting = true; } else if (creating) { // only create "point and shoot" queues for now session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName)); // leave unbound linkName = nodeName; if (deleting) deleteName = nodeName; } else { throw gcnew ArgumentException("AMQP broker node not found: " + nodeName); } } else { // Output channel bool oldStyleUri = address->StartsWith("//"); if (queueFound) { linkName = ""; // default exchange for point and shoot routingKey = nodeName; if (deleting) deleteName = nodeName; } else if (exchangeFound && !oldStyleUri) { isQueue = false; linkName = nodeName; routingKey = subject; } else if (creating) { // only create "point and shoot" queues for now session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName)); // leave unbound linkName = ""; routingKey = nodeName; if (deleting) deleteName = nodeName; } else { throw gcnew ArgumentException("AMQP broker node not found: " + nodeName); } } } finally { amqpSession->ReturnNativeSession(); } } void QpidAddress::CleanupLink(AmqpSession^ amqpSession) { if (deleteName == nullptr) return; AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer(); if (asyncSessionp == NULL) { // TODO: log it: can't undo tear down actions return; } try { Session session = sync(*asyncSessionp); std::string q = QpidMarshal::ToNative(deleteName); if (isInputChannel && !isQueue) { // undo the temp wiring to the topic session.exchangeUnbind(arg::exchange=QpidMarshal::ToNative(nodeName), arg::queue=q, arg::bindingKey=QpidMarshal::ToNative(bindKey)); } session.queueDelete(q); } catch (Exception^ e) { // TODO: log it } finally { amqpSession->ReturnNativeSession(); } } bool QpidAddress::PolicyApplies(String^ mode) { if (mode->Equals("always")) return true; if (mode->Equals("sender")) return !isInputChannel; if (mode->Equals("receiver")) return isInputChannel; if (mode->Equals("never")) return false; throw gcnew ArgumentException(String::Format("Bad address option {0} for {1}", mode, address)); } }}} // namespace Apache::Qpid::Interop