diff options
author | Clifford Allan Jansen <cliffjansen@apache.org> | 2010-10-24 23:29:37 +0000 |
---|---|---|
committer | Clifford Allan Jansen <cliffjansen@apache.org> | 2010-10-24 23:29:37 +0000 |
commit | 0286ee3221df7b3f1a61f54ff55f9ae06ae2adae (patch) | |
tree | 8247dc2f83ef66f797b16301fe2fb5a16cf04f46 | |
parent | 3b6ae6bbdc08e20e985ef17a5705558de6e831ef (diff) | |
download | qpid-python-0286ee3221df7b3f1a61f54ff55f9ae06ae2adae.tar.gz |
QPID-2646 patches
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1026915 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | wcf/samples/Integration/Drain/Drain.cs | 146 | ||||
-rw-r--r-- | wcf/samples/Integration/Drain/Drain.csproj | 80 | ||||
-rw-r--r-- | wcf/samples/Integration/Integration.sln | 46 | ||||
-rw-r--r-- | wcf/samples/Integration/Spout/Spout.cs | 109 | ||||
-rw-r--r-- | wcf/samples/Integration/Spout/Spout.csproj | 81 | ||||
-rw-r--r-- | wcf/samples/Integration/Util/Options.cs | 157 | ||||
-rw-r--r-- | wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs | 23 | ||||
-rw-r--r-- | wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs | 113 | ||||
-rw-r--r-- | wcf/src/Apache/Qpid/Channel/Channel.csproj | 1 | ||||
-rw-r--r-- | wcf/src/Apache/Qpid/Interop/AmqpSession.cpp | 29 | ||||
-rw-r--r-- | wcf/src/Apache/Qpid/Interop/AmqpSession.h | 6 | ||||
-rw-r--r-- | wcf/src/Apache/Qpid/Interop/InputLink.cpp | 20 | ||||
-rw-r--r-- | wcf/src/Apache/Qpid/Interop/InputLink.h | 8 | ||||
-rw-r--r-- | wcf/src/Apache/Qpid/Interop/Interop.vcproj | 8 | ||||
-rw-r--r-- | wcf/src/Apache/Qpid/Interop/OutputLink.cpp | 12 | ||||
-rw-r--r-- | wcf/src/Apache/Qpid/Interop/OutputLink.h | 13 | ||||
-rw-r--r-- | wcf/src/Apache/Qpid/Interop/QpidAddress.cpp | 304 | ||||
-rw-r--r-- | wcf/src/Apache/Qpid/Interop/QpidAddress.h | 89 |
18 files changed, 1191 insertions, 54 deletions
diff --git a/wcf/samples/Integration/Drain/Drain.cs b/wcf/samples/Integration/Drain/Drain.cs new file mode 100644 index 0000000000..7a88494458 --- /dev/null +++ b/wcf/samples/Integration/Drain/Drain.cs @@ -0,0 +1,146 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Samples.Integration
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.IO;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.ServiceModel.Description;
+ using System.Text;
+ using System.Xml;
+ using Apache.Qpid.Channel;
+ using Apache.Qpid.AmqpTypes;
+
+ class Drain
+ {
+ // delimit multiple values
+ private static void Append(StringBuilder sb, string s)
+ {
+ if (sb.Length > 0)
+ {
+ sb.Append(", ");
+ }
+
+ sb.Append(s);
+ }
+
+ private static string MessagePropertiesAsString(AmqpProperties props)
+ {
+ StringBuilder sb = new StringBuilder();
+
+ if (props.PropertyMap != null)
+ {
+ foreach (KeyValuePair<string, AmqpType> kvp in props.PropertyMap)
+ {
+ string propval;
+ if (kvp.Value is AmqpString)
+ {
+ AmqpString amqps = (AmqpString)kvp.Value;
+ propval = amqps.Value;
+ }
+ else
+ {
+ propval = kvp.Value.ToString();
+ }
+
+ Append(sb, kvp.Key + ":" + propval);
+ }
+ }
+
+ return sb.ToString();
+ }
+
+ private static string MessageContentAsString(Message msg, AmqpProperties props)
+ {
+ // AmqpBinaryBinding provides message content as a single XML "Binary" element
+ XmlDictionaryReader reader = msg.GetReaderAtBodyContents();
+ while (!reader.HasValue)
+ {
+ reader.Read();
+ if (reader.EOF)
+ {
+ throw new InvalidDataException("empty reader for message");
+ }
+ }
+
+ byte[] rawdata = reader.ReadContentAsBase64();
+
+ string ct = props.ContentType;
+ if (ct != null)
+ {
+ if (ct.Equals("amqp/map"))
+ {
+ return "mapdata (coming soon)";
+ }
+ }
+
+ return Encoding.UTF8.GetString(rawdata);
+ }
+
+ static void Main(string[] args)
+ {
+ try
+ {
+ Options options = new Options(args);
+
+ AmqpBinaryBinding binding = new AmqpBinaryBinding();
+ binding.BrokerHost = options.Broker;
+ binding.BrokerPort = options.Port;
+ binding.TransferMode = TransferMode.Streamed;
+
+ IChannelFactory<IInputChannel> factory = binding.BuildChannelFactory<IInputChannel>();
+
+ factory.Open();
+ try
+ {
+ System.ServiceModel.EndpointAddress addr = options.Address;
+ IInputChannel receiver = factory.CreateChannel(addr);
+ receiver.Open();
+
+ TimeSpan timeout = options.Timeout;
+ System.ServiceModel.Channels.Message message;
+
+ while (receiver.TryReceive(timeout, out message))
+ {
+ AmqpProperties props = (AmqpProperties)message.Properties["AmqpProperties"];
+
+ Console.WriteLine("Message(properties=" +
+ MessagePropertiesAsString(props) +
+ ", content='" +
+ MessageContentAsString(message, props) +
+ "')");
+ }
+ }
+ finally
+ {
+ factory.Close();
+ }
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Drain: " + e);
+ }
+ }
+ }
+}
diff --git a/wcf/samples/Integration/Drain/Drain.csproj b/wcf/samples/Integration/Drain/Drain.csproj new file mode 100644 index 0000000000..06c32f5064 --- /dev/null +++ b/wcf/samples/Integration/Drain/Drain.csproj @@ -0,0 +1,80 @@ +<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{A67D9B60-34A5-462F-84A2-72C22F623749}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <RootNamespace>Drain</RootNamespace>
+ <AssemblyName>Drain</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, PublicKeyToken=679e1f50b62dbace, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Channel.dll</HintPath>
+ </Reference>
+ <Reference Include="Apache.Qpid.Interop, Version=1.0.3796.12140, Culture=neutral, PublicKeyToken=679e1f50b62dbace, processorArchitecture=AMD64">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Interop.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Runtime.Serialization">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Drain.cs" />
+ <Compile Include="..\Util\Options.cs" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
diff --git a/wcf/samples/Integration/Integration.sln b/wcf/samples/Integration/Integration.sln new file mode 100644 index 0000000000..59b228e92a --- /dev/null +++ b/wcf/samples/Integration/Integration.sln @@ -0,0 +1,46 @@ +
+Microsoft Visual Studio Solution File, Format Version 10.00
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+#
+# Visual Studio 2008
+
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Drain", "Drain\Drain.csproj", "{A67D9B60-34A5-462F-84A2-72C22F623749}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Spout", "Spout\Spout.csproj", "{347A531B-38DB-4848-9E4D-4E5E7F9C97E7}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {A67D9B60-34A5-462F-84A2-72C22F623749}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {A67D9B60-34A5-462F-84A2-72C22F623749}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A67D9B60-34A5-462F-84A2-72C22F623749}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {A67D9B60-34A5-462F-84A2-72C22F623749}.Release|Any CPU.Build.0 = Release|Any CPU
+ {347A531B-38DB-4848-9E4D-4E5E7F9C97E7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {347A531B-38DB-4848-9E4D-4E5E7F9C97E7}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {347A531B-38DB-4848-9E4D-4E5E7F9C97E7}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {347A531B-38DB-4848-9E4D-4E5E7F9C97E7}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+EndGlobal
diff --git a/wcf/samples/Integration/Spout/Spout.cs b/wcf/samples/Integration/Spout/Spout.cs new file mode 100644 index 0000000000..651566fbd6 --- /dev/null +++ b/wcf/samples/Integration/Spout/Spout.cs @@ -0,0 +1,109 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Samples.Integration
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.IO;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.ServiceModel.Description;
+ using System.Text;
+ using System.Xml;
+ using Apache.Qpid.Channel;
+ using Apache.Qpid.AmqpTypes;
+
+ class Spout
+ {
+ static void Main(string[] args)
+ {
+ try
+ {
+ Options options = new Options(args);
+
+ AmqpBinaryBinding binding = new AmqpBinaryBinding();
+ binding.BrokerHost = options.Broker;
+ binding.BrokerPort = options.Port;
+ binding.TransferMode = TransferMode.Streamed;
+
+ IChannelFactory<IOutputChannel> factory = binding.BuildChannelFactory<IOutputChannel>();
+
+ factory.Open();
+ try
+ {
+ System.ServiceModel.EndpointAddress addr = options.Address;
+ IOutputChannel sender = factory.CreateChannel(addr);
+ sender.Open();
+
+ MyRawBodyWriter.Initialize(options.Content);
+ DateTime end = DateTime.Now.Add(options.Timeout);
+ System.ServiceModel.Channels.Message message;
+
+ for (int count = 0; ((count < options.Count) || (options.Count == 0)) &&
+ ((options.Timeout == TimeSpan.Zero) || (end.CompareTo(DateTime.Now) > 0)); count++)
+ {
+ message = Message.CreateMessage(MessageVersion.None, "", new MyRawBodyWriter());
+ AmqpProperties props = new AmqpProperties();
+ props.ContentType = "text/plain";
+
+ string id = Guid.NewGuid().ToString() + ":" + count;
+ props.PropertyMap.Add("spout-id", new AmqpString(id));
+
+ message.Properties["AmqpProperties"] = props;
+ sender.Send(message);
+ }
+ }
+ finally
+ {
+ factory.Close();
+ }
+ }
+ catch (Exception e)
+ {
+ Console.WriteLine("Spout: " + e);
+ }
+ }
+
+
+ public class MyRawBodyWriter : BodyWriter
+ {
+ static byte[] body;
+
+ public MyRawBodyWriter()
+ : base(false)
+ {
+ }
+
+ public static void Initialize(String content)
+ {
+ body = Encoding.UTF8.GetBytes(content);
+ }
+
+ // invoked by the binary encoder when the message is written
+ protected override void OnWriteBodyContents(XmlDictionaryWriter writer)
+ {
+ writer.WriteStartElement("Binary");
+ writer.WriteBase64(body, 0, body.Length);
+ }
+ }
+ }
+}
diff --git a/wcf/samples/Integration/Spout/Spout.csproj b/wcf/samples/Integration/Spout/Spout.csproj new file mode 100644 index 0000000000..b104000ad2 --- /dev/null +++ b/wcf/samples/Integration/Spout/Spout.csproj @@ -0,0 +1,81 @@ +<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{347A531B-38DB-4848-9E4D-4E5E7F9C97E7}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Spout</RootNamespace>
+ <AssemblyName>Spout</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, PublicKeyToken=679e1f50b62dbace, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Channel.dll</HintPath>
+ </Reference>
+ <Reference Include="Apache.Qpid.Interop, Version=1.0.3796.12140, Culture=neutral, PublicKeyToken=679e1f50b62dbace, processorArchitecture=AMD64">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\..\..\src\Apache\Qpid\Channel\bin\Release\Apache.Qpid.Interop.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Runtime.Serialization">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="..\Util\Options.cs" />
+ <Compile Include="Spout.cs" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
diff --git a/wcf/samples/Integration/Util/Options.cs b/wcf/samples/Integration/Util/Options.cs new file mode 100644 index 0000000000..a929f8f2de --- /dev/null +++ b/wcf/samples/Integration/Util/Options.cs @@ -0,0 +1,157 @@ +/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Samples.Integration
+{
+ using System;
+ using System.Collections;
+ using System.Collections.Generic;
+ using System.Diagnostics;
+ using System.IO;
+ using System.ServiceModel;
+ using System.ServiceModel.Channels;
+ using System.ServiceModel.Description;
+ using System.Text;
+ using System.Xml;
+
+ public class Options
+ {
+ private string broker;
+ private int port;
+ private int messageCount;
+ private EndpointAddress address;
+ private TimeSpan timeout;
+ private string content;
+
+ public Options(string[] args)
+ {
+ this.broker = "127.0.0.1";
+ this.port = 5672;
+ this.messageCount = 1;
+ this.timeout = TimeSpan.FromSeconds(0);
+ Parse(args);
+ }
+
+ private void Parse(string[] args)
+ {
+ int argCount = args.Length;
+ int current = 0;
+ bool typeSelected = false;
+
+ while ((current + 1) < argCount)
+ {
+ string arg = args[current];
+ if (arg == "--count")
+ {
+ arg = args[++current];
+ int i = Int32.Parse(arg);
+ if (i >= 0)
+ {
+ this.messageCount = i;
+ }
+ }
+ else if (arg == "--broker")
+ {
+ this.broker = args[++current];
+ }
+ else if (arg == "--port")
+ {
+ arg = args[++current];
+ int i = int.Parse(arg);
+ if (i > 0)
+ {
+ this.port = i;
+ }
+ }
+ else if (arg == "--timeout")
+ {
+ arg = args[++current];
+ int i = int.Parse(arg);
+ if (i > 0)
+ {
+ this.timeout = TimeSpan.FromSeconds(i);
+ }
+ }
+
+ else if (arg == "--content")
+ {
+ this.content = args[++current];
+ }
+
+ else
+ {
+ throw new ArgumentException(String.Format("unknown argument \"{0}\"", arg));
+ }
+
+ current++;
+ }
+
+ if (current == argCount)
+ {
+ throw new ArgumentException("missing argument: address");
+ }
+
+ address = new EndpointAddress("amqp:" + args[current]);
+
+ if (timeout < TimeSpan.FromMilliseconds(100))
+ {
+ // WCF timeout of 0 really means no time for even a single message transfer
+ timeout = TimeSpan.FromMilliseconds(100);
+ }
+ }
+
+ public EndpointAddress Address
+ {
+ get { return this.address; }
+ }
+
+ public string Broker
+ {
+ get { return this.broker; }
+ }
+
+ public string Content
+ {
+ get
+ {
+ if (content == null)
+ {
+ return String.Empty;
+ }
+ return content;
+ }
+ }
+
+
+ public int Count
+ {
+ get { return this.messageCount; }
+ }
+
+ public int Port
+ {
+ get { return this.port; }
+ }
+
+ public TimeSpan Timeout
+ {
+ get { return this.timeout; }
+ }
+ }
+}
diff --git a/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs b/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs index 0f649dcd36..4099571fe0 100644 --- a/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs +++ b/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs @@ -29,7 +29,7 @@ namespace Apache.Qpid.AmqpTypes // AMQP 0-10 delivery properties private bool durable; private Nullable<TimeSpan> timeToLive; - private string routingKey; + private string subject; // AMQP 0-10 message properties private string replyToExchange; @@ -50,7 +50,7 @@ namespace Apache.Qpid.AmqpTypes { get { - return ((this.routingKey != null) || this.durable || this.timeToLive.HasValue); + return ((this.subject != null) || this.durable || this.timeToLive.HasValue); } } @@ -163,10 +163,19 @@ namespace Apache.Qpid.AmqpTypes set { this.timeToLive = value; } } + /// <summary> + /// Obsolete: switch to AMQP 1.0 "Subject" naming + /// </summary> public string RoutingKey { - get { return this.routingKey; } - set { this.routingKey = value; } + get { return this.subject; } + set { this.subject = value; } + } + + public string Subject + { + get { return this.subject; } + set { this.subject = value; } } public string ReplyToExchange @@ -200,7 +209,7 @@ namespace Apache.Qpid.AmqpTypes public void Clear() { this.timeToLive = null; - this.routingKey = null; + this.subject = null; this.replyToRoutingKey = null; this.replyToExchange = null; this.durable = false; @@ -251,9 +260,9 @@ namespace Apache.Qpid.AmqpTypes this.replyToRoutingKey = other.replyToRoutingKey; } - if (other.routingKey != null) + if (other.subject != null) { - this.routingKey = other.routingKey; + this.subject = other.subject; } if (other.durable) diff --git a/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs b/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs index a6f6ee6800..6f0ffd9815 100644 --- a/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs +++ b/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs @@ -32,6 +32,7 @@ namespace Apache.Qpid.Channel using System.Text; using System.Threading; using System.Globalization; + using System.Web; using System.Xml; // the thin interop layer that provides access to the Qpid AMQP client libraries @@ -52,11 +53,11 @@ namespace Apache.Qpid.Channel private bool shared; private int prefetchLimit; private string encoderContentType; + // AMQP subject/routing key + private string subject; + // Qpid addressing value for "qpid.subject" property + private string qpidSubject; - // input = 0-10 queue, output = 0-10 exchange - private string queueName; - - private String routingKey; private BufferManager bufferManager; private AmqpProperties outputMessageProperties; @@ -85,7 +86,7 @@ namespace Apache.Qpid.Channel this.remoteAddress = remoteAddress; // pull out host, port, queue, and connection arguments - this.ParseAmqpUri(remoteAddress.Uri); + string qpidAddress = this.UriToQpidAddress(remoteAddress.Uri, out subject); this.encoder = msgEncoder; string ct = String.Empty; @@ -129,12 +130,14 @@ namespace Apache.Qpid.Channel if (this.isInputChannel) { - this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, this.queueName); + this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, qpidAddress); this.inputLink.PrefetchLimit = this.prefetchLimit; } else { - this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, this.queueName); + this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, qpidAddress); + this.subject = this.outputLink.DefaultSubject; + this.qpidSubject = this.outputLink.QpidSubject; } } @@ -423,9 +426,14 @@ namespace Apache.Qpid.Channel outgoingProperties.MergeFrom(this.factoryChannelProperties.DefaultMessageProperties); } - if (this.routingKey != null) + if (this.subject != null) { - outgoingProperties.RoutingKey = this.routingKey; + outgoingProperties.RoutingKey = this.subject; + } + + if (this.qpidSubject != null) + { + outgoingProperties.PropertyMap["qpid.subject"] = new AmqpString(this.qpidSubject); } // Add the Properties set by the application on this particular message. @@ -544,8 +552,7 @@ namespace Apache.Qpid.Channel this.bufferManager.Clear(); } - // "amqp:queue1" | "amqp:stocks@broker1.com" | "amqp:queue3?routingkey=key" - private void ParseAmqpUri(Uri uri) + private string UriToQpidAddress(Uri uri, out string subject) { if (uri.Scheme != AmqpConstants.Scheme) { @@ -553,43 +560,83 @@ namespace Apache.Qpid.Channel "The scheme {0} specified in address is not supported.", uri.Scheme), "uri"); } - this.queueName = uri.LocalPath; + subject = ""; + string path = uri.LocalPath; + string query = uri.Query; + + // legacy... convert old style myqueue?routingkey=key to myqueue/key - if ((this.queueName.IndexOf('@') != -1) && this.isInputChannel) + if (query.Length > 0) { - throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, - "Invalid input queue name: \"{0}\" specified.", this.queueName), "uri"); - } + if (!query.StartsWith("?")) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "Invalid query argument."), "uri"); + } - // search out session parameters in the query portion of the URI + string routingParseKey = "routingkey="; + string subjectParseKey = "subject="; + char[] charSeparators = new char[] { '?', ';' }; + string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries); + foreach (string s in args) + { + if (s.StartsWith(routingParseKey)) + { + subject = s.Substring(routingParseKey.Length); + } + else if (s.StartsWith(subjectParseKey)) + { + subject = s.Substring(subjectParseKey.Length); + } + else + { + if (s.Length > 0) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "Invalid query argument {0}.", s), "uri"); + } + } + } - string routingParseKey = "routingkey="; - char[] charSeparators = new char[] { '?', ';' }; - string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries); - foreach (string s in args) - { - if (s.StartsWith(routingParseKey)) + if (path.Contains("/")) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "Invalid queue name {0}.", path), "uri"); + } + + if (path.Length == 0) { - this.routingKey = s.Substring(routingParseKey.Length); + // special case, user wants default exchange + return "//" + subject; } + + return path + "/" + subject; } - if (this.queueName == String.Empty) + // find subject in "myqueue/mysubject;{mode:browse}" + int pos = path.IndexOf('/'); + if ((pos > -1) && (pos < path.Length + 1)) { - if (this.isInputChannel) + subject = path.Substring(pos); + pos = subject.IndexOf(';'); + if (pos == 0) { throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, - "Empty queue target specifier not allowed."), "uri"); + "Empty subject in address {0}.", path), "uri"); } - else + + if (pos > 0) { - if (this.routingKey == null) - { - throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, - "No target queue or routing key specified."), "uri"); - } + subject = subject.Substring(0, pos); } } + + if (subject.Length > 0) + { + subject = HttpUtility.UrlDecode(subject); + } + + return HttpUtility.UrlDecode(path); } } } diff --git a/wcf/src/Apache/Qpid/Channel/Channel.csproj b/wcf/src/Apache/Qpid/Channel/Channel.csproj index 6bb059daf6..1eb811b425 100644 --- a/wcf/src/Apache/Qpid/Channel/Channel.csproj +++ b/wcf/src/Apache/Qpid/Channel/Channel.csproj @@ -97,6 +97,7 @@ under the License. <RequiredTargetFramework>3.0</RequiredTargetFramework>
</Reference>
<Reference Include="System.Transactions" />
+ <Reference Include="System.Web" />
<Reference Include="System.XML" />
</ItemGroup>
<ItemGroup>
diff --git a/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp b/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp index d2adb41205..ac7c777d1f 100644 --- a/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp +++ b/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp @@ -357,17 +357,20 @@ bool AmqpSession::MessageStop(std::string &name) return true; } -void AmqpSession::AcceptAndComplete(SequenceSet& transfers) +void AmqpSession::AcceptAndComplete(SequenceSet& transfers, bool browsing) { lock l(sessionLock); - // delimit with session dtx commands depending on the transaction context - UpdateTransactionState(%l); + if (!browsing) { + // delimit with session dtx commands depending on the transaction context + UpdateTransactionState(%l); + } CheckOpen(); sessionp->markCompleted(transfers, false); - sessionp->messageAccept(transfers, false); + if (!browsing) + sessionp->messageAccept(transfers, false); } @@ -609,4 +612,22 @@ void AmqpSession::ReleaseCompletion(IntPtr completion) { delete completion.ToPointer(); } + +// Non-exclusive borrowing for a "brief" period. I.e. several synced +// commands (address resolution) + +IntPtr AmqpSession::BorrowNativeSession() { + lock l(sessionLock); + if (closing) + return IntPtr::Zero; + + IncrementSyncs(); + return (IntPtr) sessionp; +} + +void AmqpSession::ReturnNativeSession() { + lock l(sessionLock); + DecrementSyncs(); +} + }}} // namespace Apache::Qpid::Cli diff --git a/wcf/src/Apache/Qpid/Interop/AmqpSession.h b/wcf/src/Apache/Qpid/Interop/AmqpSession.h index 88ffd18dcc..7a49496805 100644 --- a/wcf/src/Apache/Qpid/Interop/AmqpSession.h +++ b/wcf/src/Apache/Qpid/Interop/AmqpSession.h @@ -78,7 +78,7 @@ public: OutputLink^ CreateOutputLink(System::String^ targetQueue); InputLink^ CreateInputLink(System::String^ sourceQueue); - // 0-10 specific support + // 0-10 specific support; deprecated in favor of Qpid messaging addresses InputLink^ CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary, System::String^ filterKey, System::String^ exchange); void Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey); @@ -90,7 +90,7 @@ internal: void internalWaitForCompletion(IntPtr future); void removeWaiter(CompletionWaiter^ waiter); bool MessageStop(std::string &name); - void AcceptAndComplete(SequenceSet& transfers); + void AcceptAndComplete(SequenceSet& transfers, bool browsing); IntPtr BeginPhase0Flush(XaTransaction^); void EndPhase0Flush(XaTransaction^, IntPtr); IntPtr DtxStart(IntPtr xidp, bool, bool); @@ -98,6 +98,8 @@ internal: IntPtr DtxCommit(IntPtr xidp, bool onePhase); IntPtr DtxRollback(IntPtr xidp); void ReleaseCompletion(IntPtr completion); + IntPtr BorrowNativeSession(); + void ReturnNativeSession(); property AmqpConnection^ Connection { AmqpConnection^ get () { return connection; } diff --git a/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/wcf/src/Apache/Qpid/Interop/InputLink.cpp index 3245cd3540..2b0119e338 100644 --- a/wcf/src/Apache/Qpid/Interop/InputLink.cpp +++ b/wcf/src/Apache/Qpid/Interop/InputLink.cpp @@ -88,9 +88,12 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, waiters = gcnew Collections::Generic::List<MessageWaiter^>(); linkLock = waiters; // private and available subscriptionLock = gcnew Object(); + qpidAddress = QpidAddress::CreateAddress(sourceQueue, true); + qpidAddress->ResolveLink(session); + browsing = qpidAddress->Browsing; try { - std::string qname = QpidMarshal::ToNative(sourceQueue); + std::string qname = QpidMarshal::ToNative(qpidAddress->LinkName); if (temporary) { qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true); @@ -104,6 +107,15 @@ InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, settings.flowControl = FlowControl::messageCredit(0); settings.completionMode = CompletionMode::MANUAL_COMPLETION; + if (browsing) { + settings.acquireMode = AcquireMode::ACQUIRE_MODE_NOT_ACQUIRED; + settings.acceptMode = AcceptMode::ACCEPT_MODE_NONE; + } + else { + settings.acquireMode = AcquireMode::ACQUIRE_MODE_PRE_ACQUIRED; + settings.acceptMode = AcceptMode::ACCEPT_MODE_EXPLICIT; + } + Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings); subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup @@ -186,8 +198,10 @@ void InputLink::Cleanup() { ReleaseNative(); } - } + + // Now that subscription is torn down, we can execute pending delete on remote node + qpidAddress->CleanupLink(amqpSession); amqpSession->NotifyClosed(); } @@ -699,7 +713,7 @@ AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) // subscriptionp->accept(frameSetID) is a slow sync operation in the native API // so do it within the AsyncSession directly - amqpSession->AcceptAndComplete(frameSetID); + amqpSession->AcceptAndComplete(frameSetID, browsing); workingCredit--; // check if more messages need to be requested from broker diff --git a/wcf/src/Apache/Qpid/Interop/InputLink.h b/wcf/src/Apache/Qpid/Interop/InputLink.h index 2f96b91944..136d53d280 100644 --- a/wcf/src/Apache/Qpid/Interop/InputLink.h +++ b/wcf/src/Apache/Qpid/Interop/InputLink.h @@ -20,6 +20,7 @@ #pragma once #include "MessageWaiter.h" +#include "QpidAddress.h" namespace Apache { namespace Qpid { @@ -58,6 +59,9 @@ private: // working credit low water mark int minWorkingCredit; + bool browsing; + QpidAddress^ qpidAddress; + void Cleanup(); void ReleaseNative(); bool haveMessage(); @@ -97,6 +101,10 @@ public: void set (int value); } + property bool Browsing { + bool get () { return browsing; } + } + }; }}} // namespace Apache::Qpid::Interop diff --git a/wcf/src/Apache/Qpid/Interop/Interop.vcproj b/wcf/src/Apache/Qpid/Interop/Interop.vcproj index 2056c97d57..fe288cbe76 100644 --- a/wcf/src/Apache/Qpid/Interop/Interop.vcproj +++ b/wcf/src/Apache/Qpid/Interop/Interop.vcproj @@ -409,6 +409,10 @@ >
</File>
<File
+ RelativePath=".\QpidAddress.cpp"
+ >
+ </File>
+ <File
RelativePath=".\InputLink.cpp"
>
</File>
@@ -455,6 +459,10 @@ >
</File>
<File
+ RelativePath=".\QpidAddress.h"
+ >
+ </File>
+ <File
RelativePath=".\InputLink.h"
>
</File>
diff --git a/wcf/src/Apache/Qpid/Interop/OutputLink.cpp b/wcf/src/Apache/Qpid/Interop/OutputLink.cpp index 27725b8207..de7141dadb 100644 --- a/wcf/src/Apache/Qpid/Interop/OutputLink.cpp +++ b/wcf/src/Apache/Qpid/Interop/OutputLink.cpp @@ -48,13 +48,14 @@ using namespace std; using namespace Apache::Qpid::AmqpTypes; -OutputLink::OutputLink(AmqpSession^ session, String^ defaultQueue) : +OutputLink::OutputLink(AmqpSession^ session, String^ address) : amqpSession(session), - queue(defaultQueue), disposed(false), maxFrameSize(session->Connection->MaxFrameSize), finalizing(false) { + qpidAddress = QpidAddress::CreateAddress(address, false); + qpidAddress->ResolveLink(session); } void OutputLink::Cleanup() @@ -67,6 +68,8 @@ void OutputLink::Cleanup() disposed = true; } + // process any pending queue delete + qpidAddress->CleanupLink(amqpSession); amqpSession->NotifyClosed(); } @@ -217,7 +220,8 @@ void OutputLink::Send(AmqpMessage^ amqpMessage, TimeSpan timeout) ManagedToNative(amqpMessage); MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream; - CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, false, nullptr, nullptr); + CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream, + timeout, false, nullptr, nullptr); if (waiter != nullptr) { waiter->WaitForCompletion(); @@ -234,7 +238,7 @@ IAsyncResult^ OutputLink::BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout, ManagedToNative(amqpMessage); MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream; - CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, true, callback, state); + CompletionWaiter^ waiter = amqpSession->SendMessage(qpidAddress->LinkName, messageBodyStream, timeout, true, callback, state); return waiter; } diff --git a/wcf/src/Apache/Qpid/Interop/OutputLink.h b/wcf/src/Apache/Qpid/Interop/OutputLink.h index 1f049a7412..e30d1cc79f 100644 --- a/wcf/src/Apache/Qpid/Interop/OutputLink.h +++ b/wcf/src/Apache/Qpid/Interop/OutputLink.h @@ -19,6 +19,8 @@ #pragma once +#include "QpidAddress.h" + namespace Apache { namespace Qpid { namespace Interop { @@ -34,7 +36,7 @@ public ref class OutputLink { private: AmqpSession^ amqpSession; - String^ queue; + QpidAddress^ qpidAddress; bool disposed; bool finalizing; void Cleanup(); @@ -58,6 +60,15 @@ public: AmqpTypes::AmqpProperties^ get () { return defaultProperties; } void set(AmqpTypes::AmqpProperties^ p) { defaultProperties = p; } } + + property String^ DefaultSubject { + String^ get() { return (qpidAddress == nullptr) ? nullptr : qpidAddress->RoutingKey; } + } + + property String^ QpidSubject { + String^ get() { return (qpidAddress == nullptr) ? nullptr : qpidAddress->Subject; } + } + }; diff --git a/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp b/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp new file mode 100644 index 0000000000..bfae1ab313 --- /dev/null +++ b/wcf/src/Apache/Qpid/Interop/QpidAddress.cpp @@ -0,0 +1,304 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + + +/* + * This program parses strings of the form "node/subject;{options}" as + * used in the Qpid messaging API. It provides basic wiring + * capabilities to create/delete temporary queues (to topic + * subsciptions) and unbound "point and shoot" queues. + */ + + +#include <windows.h> +#include <msclr\lock.h> +#include <oletx2xa.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/Message.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/client/Future.h" + +#include "AmqpConnection.h" +#include "AmqpSession.h" +#include "AmqpMessage.h" +#include "MessageBodyStream.h" +#include "InputLink.h" +#include "OutputLink.h" +#include "QpidMarshal.h" +#include "QpidException.h" +#include "QpidAddress.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace msclr; + +using namespace qpid::client; +using namespace std; + +QpidAddress::QpidAddress(String^ s, bool isInput) { + address = s; + nodeName = s; + isInputChannel = isInput; + isQueue = true; + + if (address->StartsWith("//")) { + // special case old style address to default exchange, + // no options, output only + if ((s->IndexOf(';') != -1) || isInputChannel) + throw gcnew ArgumentException("Invalid 0-10 address: " + address); + nodeName = nodeName->Substring(2); + return; + } + + String^ options = nullptr; + int pos = s->IndexOf(';'); + if (pos != -1) { + options = s->Substring(pos + 1); + nodeName = s->Substring(0, pos); + + if (options->Length > 0) { + if (!options->StartsWith("{") || !options->EndsWith("}")) + throw gcnew ArgumentException("Invalid address: " + address); + options = options->Substring(1, options->Length - 2); + array<String^>^ subOpts = options->Split(String(",: ").ToCharArray(), StringSplitOptions::RemoveEmptyEntries); + + if ((subOpts->Length % 2) != 0) + throw gcnew ArgumentException("Bad address (options): " + address); + + for (int i=0; i < subOpts->Length; i += 2) { + String^ opt = subOpts[i]; + String^ optArg = subOpts[i+1]; + if (opt->Equals("create")) { + creating = PolicyApplies(optArg); + } + else if (opt->Equals("delete")) { + deleting = PolicyApplies(optArg); + } + else if (opt->Equals("mode")) { + if (optArg->Equals("browse")) { + browsing = isInputChannel; + } + else if (!optArg->Equals("consume")) { + throw gcnew ArgumentException("Invalid browsing option: " + optArg); + } + } + else if (opt->Equals("assert") || opt->Equals("node")) { + throw gcnew ArgumentException("Unsupported address option: " + opt); + } + else { + throw gcnew ArgumentException("Bad address option: " + opt); + } + } + } + else + options = nullptr; + } + + pos = nodeName->IndexOf('/'); + if (pos != -1) { + subject = nodeName->Substring(pos + 1); + if (String::IsNullOrEmpty(subject)) + subject = nullptr; + nodeName = nodeName->Substring(0, pos); + } +} + + +QpidAddress^ QpidAddress::CreateAddress(String^ s, bool isInput) { + QpidAddress^ addr = gcnew QpidAddress(s, isInput); + return addr; +} + + +void QpidAddress::ResolveLink(AmqpSession^ amqpSession) { + + AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer(); + if (asyncSessionp == NULL) + throw gcnew ObjectDisposedException("session"); + + deleteName = nullptr; + isQueue = true; + + try { + Session session = sync(*asyncSessionp); + std::string n_name = QpidMarshal::ToNative(nodeName); + ExchangeBoundResult result = session.exchangeBound(arg::exchange=n_name, arg::queue=n_name); + + bool queueFound = !result.getQueueNotFound(); + bool exchangeFound = !result.getExchangeNotFound(); + + if (isInputChannel) { + + if (queueFound) { + linkName = nodeName; + if (deleting) + deleteName = nodeName; + } + else if (exchangeFound) { + isQueue = false; + String^ tmpkey = nullptr; + String^ tmpname = nodeName + "_" + Guid::NewGuid().ToString(); + bool haveSubject = !String::IsNullOrEmpty(subject); + FieldTable bindArgs; + + std::string exchangeType = session.exchangeQuery(n_name).getType(); + if (exchangeType == "topic") { + tmpkey = haveSubject ? subject : "#"; + } + else if (exchangeType == "fanout") { + tmpkey = tmpname; + } + else if (exchangeType == "headers") { + tmpkey = haveSubject ? subject : "match-all"; + if (haveSubject) + bindArgs.setString("qpid.subject", QpidMarshal::ToNative(subject)); + bindArgs.setString("x-match", "all"); + } + else if (exchangeType == "xml") { + tmpkey = haveSubject ? subject : ""; + if (haveSubject) { + String^ v = "declare variable $qpid.subject external; $qpid.subject = '" + + subject + "'"; + bindArgs.setString("xquery", QpidMarshal::ToNative(v)); + } + else + bindArgs.setString("xquery", "true()"); + } + else { + tmpkey = haveSubject ? subject : ""; + } + + std::string qn = QpidMarshal::ToNative(tmpname); + session.queueDeclare(arg::queue=qn, arg::autoDelete=true, arg::exclusive=true); + bool success = false; + try { + session.exchangeBind(arg::exchange=n_name, arg::queue=qn, + arg::bindingKey=QpidMarshal::ToNative(tmpkey), + arg::arguments=bindArgs); + bindKey = tmpkey; // remember for later cleanup + success = true; + } + finally { + if (!success) + session.queueDelete(arg::queue=qn); + } + linkName = tmpname; + deleteName = tmpname; + deleting = true; + } + else if (creating) { + // only create "point and shoot" queues for now + session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName)); + // leave unbound + + linkName = nodeName; + + if (deleting) + deleteName = nodeName; + } + else { + throw gcnew ArgumentException("AMQP broker node not found: " + nodeName); + } + } + else { + // Output channel + + bool oldStyleUri = address->StartsWith("//"); + + if (queueFound) { + linkName = ""; // default exchange for point and shoot + routingKey = nodeName; + if (deleting) + deleteName = nodeName; + } + else if (exchangeFound && !oldStyleUri) { + isQueue = false; + linkName = nodeName; + routingKey = subject; + } + else if (creating) { + // only create "point and shoot" queues for now + session.queueDeclare(arg::queue=QpidMarshal::ToNative(nodeName)); + // leave unbound + linkName = ""; + routingKey = nodeName; + if (deleting) + deleteName = nodeName; + } + else { + throw gcnew ArgumentException("AMQP broker node not found: " + nodeName); + } + } + } + finally { + amqpSession->ReturnNativeSession(); + } +} + +void QpidAddress::CleanupLink(AmqpSession^ amqpSession) { + if (deleteName == nullptr) + return; + + AsyncSession* asyncSessionp = (AsyncSession *) amqpSession->BorrowNativeSession().ToPointer(); + if (asyncSessionp == NULL) { + // TODO: log it: can't undo tear down actions + return; + } + + try { + Session session = sync(*asyncSessionp); + std::string q = QpidMarshal::ToNative(deleteName); + if (isInputChannel && !isQueue) { + // undo the temp wiring to the topic + session.exchangeUnbind(arg::exchange=QpidMarshal::ToNative(nodeName), arg::queue=q, + arg::bindingKey=QpidMarshal::ToNative(bindKey)); + } + session.queueDelete(q); + } + catch (Exception^ e) { + // TODO: log it + } + finally { + amqpSession->ReturnNativeSession(); + } +} + +bool QpidAddress::PolicyApplies(String^ mode) { + if (mode->Equals("always")) + return true; + if (mode->Equals("sender")) + return !isInputChannel; + if (mode->Equals("receiver")) + return isInputChannel; + if (mode->Equals("never")) + return false; + + throw gcnew ArgumentException(String::Format("Bad address option {0} for {1}", mode, address)); +} + +}}} // namespace Apache::Qpid::Interop diff --git a/wcf/src/Apache/Qpid/Interop/QpidAddress.h b/wcf/src/Apache/Qpid/Interop/QpidAddress.h new file mode 100644 index 0000000000..d24317c2aa --- /dev/null +++ b/wcf/src/Apache/Qpid/Interop/QpidAddress.h @@ -0,0 +1,89 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +#include "MessageWaiter.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; +using namespace System::Runtime::InteropServices; + +using namespace qpid::client; +using namespace std; + + +public ref class QpidAddress +{ +private: + QpidAddress(String^ address, bool isInput); + + // the original Qpid messaging address string, with WCF uri sematics removed, and URL decoded + String^ address; + + String^ nodeName; + // "qpid.subject" + String^ subject; + // 0-10 routing key (Output channels only) + String^ routingKey; + + String^ linkName; + String^ deleteName; + String^ bindKey; + + // node type: queue/topic + bool isQueue; + + // direction + bool isInputChannel; + + bool creating; + bool deleting; + bool browsing; + + bool PolicyApplies(String^ mode); + +internal: + static QpidAddress^ CreateAddress(String ^s, bool isInput); + void ResolveLink(AmqpSession^ amqpSession); + void CleanupLink(AmqpSession^ amqpSession); + + property String^ LinkName { + String^ get () { return linkName; } + } + + property String^ Subject { + String^ get () { return subject; } + } + + property String^ RoutingKey { + String^ get () { return routingKey; } + } + + property bool Browsing { + bool get () { return browsing; } + } + +}; + +}}} // namespace Apache::Qpid::Interop |