diff options
author | Stephen D. Huston <shuston@apache.org> | 2009-09-02 23:38:03 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2009-09-02 23:38:03 +0000 |
commit | d70a26aa665c7eebc7cc4d1bb91ce7928f3c7d89 (patch) | |
tree | 1ff32f66e0733a5b08d8701255b194b47d3b710b /qpid | |
parent | d743ea32e78e9ab4dbd2a61f117de2e40b879a34 (diff) | |
download | qpid-python-d70a26aa665c7eebc7cc4d1bb91ce7928f3c7d89.tar.gz |
Add WCF channel code: patches from QPID-2065, QPID-2066, QPID-2067
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@810734 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
87 files changed, 10044 insertions, 0 deletions
diff --git a/qpid/wcf/ReadMe.txt b/qpid/wcf/ReadMe.txt new file mode 100644 index 0000000000..0ef3e06ce5 --- /dev/null +++ b/qpid/wcf/ReadMe.txt @@ -0,0 +1,162 @@ +1. WCF supported features +========================= + +1. WCF service model programming using one way contracts +2. WCF channel model programming using IInputChannel and IOutputChannel based factories +3. Programmatic access to AMQP message properties on WCF messages +4. AMQP version 0-10 (as provided by the Qpid C++ native client library) +5. Shared connections for multiple channels based on binding parameters +6. WCF to WCF applications (using SOAP message encoders) +7. WCF to non-WCF applications (using raw content encoders) +8. Rudimentary AMQP type support for headers (Int and String) +9. Channel functional tests using NUnit +10. Programming samples + + +2. Planned features (not yet available) +======================================= + +1. Full AMQP type support, including maps and arrays +2. System.Transactions integration (local and distributed with dynamic escalation) +3. Prefetch window for inbound messages +4. Shared sessions +5. Connection failover with AMQP broker clusters +6. Temporary queues +7. Broker management +8. System logging and tracing +9. CMake build system support +10. Transport and message based security + + +3. Prerequisites +================ + +1. Qpid C++ client and common libraries for Windows including BOOST +Ensure the location of the Boost library (e.g. %BOOST_ROOT%\lib) is +included in your PATH environment variable. + +2. .NET Framework 3.5 SP1 +Install the .NET Framework from http://www.microsoft.com/net/ + +3. Windows SDK +Install the Windows SDK for the version of Windows that you are using +from http://msdn.microsoft.com/en-us/windows/bb980924.aspx + +4. NUnit +Install NUnit from http://www.nunit.org + +NOTE: In the following instructions %QPID_ROOT% refers to the root of +qpid source code location e.g. C:\trunk\qpid + +5. Build Qpid cpp +Run CMake and choose "%QPID_ROOT%\cpp\build" as the location for "Where to +build the binaries". Build at least the "qpidd", "qpidclient" and +"qpidcommon" projects. + + +4. Building the solution file +============================= + +Option 1: Using MSBuild + +1. %systemroot%\Microsoft.NET\Framework\v3.5\MSBuild.exe %QPID_ROOT%\wcf\QpidWcf.sln +2. %systemroot%\Microsoft.NET\Framework\v3.5\MSBuild.exe %QPID_ROOT%\wcf\tools\QCreate\QCreate.sln + + +Option 2: Using Visual Studio 2008 (the Professional Edition, Team +System Development Edition, or Team System Team Suite SKU) + +1. Open the solution file QpidWcf.sln in Visual Studio. +2. Make sure that the reference to 'nunit.framework.dll' by the 'FunctionalTests' + project is appropriately resolved. +3. Select the Debug configuration. +3. Right-click the solution file in the Solution Explorer and select 'Build Solution'. +4. Follow the above steps to build %QPID_ROOT%\wcf\tools\QCreate.sln as well. + + +5. Executing tests +================== + +1. Make sure that the batch file + %QPID_ROOT%\wcf\test\Apache\Qpid\Test\Channel\Functional\RunTests.bat has the correct + values for the nunit_exe, qpid_dll_location and configuration_name variables as per + your installation. +2. Start the qpid broker from the qpid build folder e.g. %QPID_ROOT%\cpp\build\src\Debug. +3. Execute RunTests.bat from its location e.g. %QPID_ROOT%\wcf\test\Apache\Qpid\Test\Channel\Functional. + + +6. Building and executing samples +================================= + +WCFToWCFDirect + +1. Copy the dlls Apache.Qpid.Channel.dll and Apache.Qpid.Interop.dll that you built + in step 2 to the %QPID_ROOT%\wcf\samples\Channel\WCFToWCFDirect folder. + +2. Build the solution WCFToWCFDirect.sln. + +3. Copy qpidclient.dll and qpidcommon.dll from the Qpid build folder + e.g. %QPID_ROOT%\cpp\build\src\Debug to the same location as the exe files + e.g. bin\Debug of each of the projects. These dlls are needed at runtime. + +4. Copy qpidclient.dll and qpidcommon.dll to %QPID_ROOT%\wcf\tools\QCreate\Debug folder. + +5. Start the qpid broker from the qpid build folder e.g. %QPID_ROOT%\cpp\build\src\Debug. + +6. Create queue required using the QCreate tool located at + %QPID_ROOT%\wcf\tools\QCreate\Debug. The syntax is QCreate %QPID_ROOT%. For + this sample you should do + + QCreate amq.direct routing_key message_queue + +7. Start Service.exe from + %QPID_ROOT%\wcf\samples\Channel\WCFToWCFDirect\Service\bin\Debug. + +8. Start Client.exe from + %QPID_ROOT%\wcf\samples\Channel\WCFToWCFDirect\Client\bin\Debug. + + +WCFToWCFPubSub + +1. Copy the dlls Apache.Qpid.Channel.dll and Apache.Qpid.Interop.dll that you built + in step 2 to the %QPID_ROOT%\wcf\samples\Channel\WCFToWCFPubSub folder. + +2. Build the solution WCFToWCFPubSub.sln. + +3. Copy qpidclient.dll and qpidcommon.dll from the Qpid build folder + e.g. %QPID_ROOT%\cpp\build\src\Debug to the same location as the exe files + e.g. bin\Debug of each of the projects. These dlls are needed at runtime. + +4. Copy qpidclient.dll and qpidcommon.dll to %QPID_ROOT%\wcf\tools\QCreate\Debug folder. + +5. Start the qpid broker from the qpid build folder e.g. %QPID_ROOT%\cpp\build\src\Debug. + +6. Create queues required using the QCreate tool located at + \wcf\tools\QCreate\Debug. The syntax is QCreate %QPID_ROOT%. For this sample you + should do + + QCreate amq.topic usa.# usa + QCreate amq.topic #.news news + +7. Start Topic_Consumer.exe from + %QPID_ROOT%\wcf\samples\Channel\WCFToWCFPubSub\Topic_Consumer\bin\Debug. + +8. Start Another_Topic_Consumer.exe from + %QPID_ROOT%\wcf\samples\Channel\WCFToWCFPubSub\Another_Topic_Consumer\bin\Debug. + +9. Start Topic_Producer.exe from + %QPID_ROOT%\wcf\samples\Channel\WCFToWCFPubSub\Topic_Producer\bin\Debug. + + +7. Known Issues +=============== + +1. The Release configuration of the build (specified using the + /p:Configuration=Release switch with MSBuild) fails. + +2. The AmqpChannelListener is limited to single threaded use and the async methods + throw NotImplementedException. + +3. The AmqpChannelListener can hang on close for 60 seconds. + + diff --git a/qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Client.cs b/qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Client.cs new file mode 100644 index 0000000000..93ac97bc66 --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Client.cs @@ -0,0 +1,68 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + + +namespace Apache.Qpid.Samples.Channel.WCFToWCFDirect +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using Apache.Qpid.Channel; + + class Client + { + static void Main(string[] args) + { + try + { + // Create binding for the service endpoint. + CustomBinding amqpBinding = new CustomBinding(); + amqpBinding.Elements.Add(new BinaryMessageEncodingBindingElement()); + amqpBinding.Elements.Add(new AmqpTransportBindingElement()); + + // Create endpoint address. + Uri amqpClientUri = new Uri("amqp:amq.direct?routingkey=routing_key"); + EndpointAddress endpointAddress = new EndpointAddress(amqpClientUri); + + // Create a client with given client endpoint configuration. + ChannelFactory<IHelloService> channelFactory = new ChannelFactory<IHelloService>(amqpBinding, endpointAddress); + IHelloService clientProxy = channelFactory.CreateChannel(); + + Console.WriteLine(); + + string name = "name"; + for (int i = 0; i < 5; i++) + { + Console.WriteLine("Sending message: " + name + (i + 1)); + clientProxy.SayHello(name + (i + 1)); + } + + Console.WriteLine(); + Console.WriteLine("Press <ENTER> to terminate client."); + Console.ReadLine(); + + channelFactory.Close(); + } + catch (Exception e) + { + Console.WriteLine("Exception: {0}", e); + } + } + } +} diff --git a/qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Client.csproj b/qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Client.csproj new file mode 100644 index 0000000000..7e1d2d9f5d --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Client.csproj @@ -0,0 +1,90 @@ +<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{0CCD5711-2072-47B8-B902-07EC12C04159}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Client</RootNamespace>
+ <AssemblyName>Client</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\Apache.Qpid.Channel.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Xml.Linq">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data.DataSetExtensions">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Client.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Service\Service.csproj">
+ <Project>{D0A46136-B4E3-4C50-AB6D-FB2BC6683D6E}</Project>
+ <Name>Service</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file diff --git a/qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Properties/AssemblyInfo.cs b/qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..414a3b5858 --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFDirect/Client/Properties/AssemblyInfo.cs @@ -0,0 +1,55 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Client")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("MSIT")] +[assembly: AssemblyProduct("Client")] +[assembly: AssemblyCopyright("Copyright © MSIT 2009")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("c3743ce0-3054-4188-8cd7-3a22734ee313")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Properties/AssemblyInfo.cs b/qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..2b75210ce3 --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Properties/AssemblyInfo.cs @@ -0,0 +1,55 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Service")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("MSIT")] +[assembly: AssemblyProduct("Service")] +[assembly: AssemblyCopyright("Copyright © MSIT 2009")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("5447546e-8547-4b0c-981a-1757ab8d9ec5")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Service.cs b/qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Service.cs new file mode 100644 index 0000000000..0342097ed9 --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Service.cs @@ -0,0 +1,83 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + + +namespace Apache.Qpid.Samples.Channel.WCFToWCFDirect +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Description; + using Apache.Qpid.Channel; + + // Define a service contract. + [ServiceContract] + public interface IHelloService + { + [OperationContract(IsOneWay = true, Action="*")] + void SayHello(string name); + } + + // Service class which implements the service contract. + [ServiceBehavior(AddressFilterMode = AddressFilterMode.Any)] + public class HelloService : IHelloService + { + [OperationBehavior] + public void SayHello(string name) + { + Console.WriteLine("Hello " + name); + } + } + + class Service + { + static void Main(string[] args) + { + // Create binding for the service endpoint. + AmqpBinding amqpBinding = new AmqpBinding(); + + // Create ServiceHost. + ServiceHost serviceHost = new ServiceHost(typeof(HelloService), new Uri[] { new Uri("http://localhost:8080/HelloService") }); + + // Add behavior for our MEX endpoint. + ServiceMetadataBehavior mexBehavior = new ServiceMetadataBehavior(); + mexBehavior.HttpGetEnabled = true; + serviceHost.Description.Behaviors.Add(mexBehavior); + + // Add MEX endpoint. + serviceHost.AddServiceEndpoint(typeof(IMetadataExchange), new BasicHttpBinding(), "MEX"); + + // Add AMQP endpoint. + Uri amqpUri = new Uri("amqp:message_queue"); + serviceHost.AddServiceEndpoint(typeof(IHelloService), amqpBinding, amqpUri.ToString()); + + serviceHost.Open(); + + Console.WriteLine(); + Console.WriteLine("The service is ready."); + Console.WriteLine("Press <ENTER> to terminate service."); + Console.WriteLine(); + Console.ReadLine(); + + if (serviceHost.State != CommunicationState.Faulted) + { + serviceHost.Close(); + } + } + } +} diff --git a/qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Service.csproj b/qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Service.csproj new file mode 100644 index 0000000000..3252380c98 --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFDirect/Service/Service.csproj @@ -0,0 +1,85 @@ +<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{D0A46136-B4E3-4C50-AB6D-FB2BC6683D6E}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Service</RootNamespace>
+ <AssemblyName>Service</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\Apache.Qpid.Channel.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Messaging" />
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Xml.Linq">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data.DataSetExtensions">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Service.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file diff --git a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Another_Topic_Consumer.cs b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Another_Topic_Consumer.cs new file mode 100644 index 0000000000..c1e3ebbc88 --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Another_Topic_Consumer.cs @@ -0,0 +1,67 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + + +namespace Apache.Qpid.Samples.Channel.WCFToWCFPubSub +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Description; + using Apache.Qpid.Channel; + + class Another_Topic_Consumer + { + static void Main(string[] args) + { + // Create binding for the service endpoint. + CustomBinding amqpBinding = new CustomBinding(); + amqpBinding.Elements.Add(new BinaryMessageEncodingBindingElement()); + amqpBinding.Elements.Add(new AmqpTransportBindingElement()); + + // Create ServiceHost. + ServiceHost serviceHost = new ServiceHost(typeof(HelloService), new Uri[] { new Uri("http://localhost:8080/HelloService2") }); + + // Add behavior for our MEX endpoint. + ServiceMetadataBehavior mexBehavior = new ServiceMetadataBehavior(); + mexBehavior.HttpGetEnabled = true; + serviceHost.Description.Behaviors.Add(mexBehavior); + + // Add MEX endpoint. + serviceHost.AddServiceEndpoint(typeof(IMetadataExchange), new BasicHttpBinding(), "MEX"); + + // Add AMQP endpoint. + Uri amqpUri = new Uri("amqp:news"); + serviceHost.AddServiceEndpoint(typeof(IHelloService), amqpBinding, amqpUri.ToString()); + + serviceHost.Open(); + + Console.WriteLine(); + Console.WriteLine("The consumer is now listening on the queue \"news\"."); + Console.WriteLine("Press <ENTER> to terminate service."); + Console.WriteLine(); + Console.ReadLine(); + + if (serviceHost.State != CommunicationState.Faulted) + { + serviceHost.Close(); + } + } + } +} diff --git a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Another_Topic_Consumer.csproj b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Another_Topic_Consumer.csproj new file mode 100644 index 0000000000..47769e086d --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Another_Topic_Consumer.csproj @@ -0,0 +1,90 @@ +<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{6AC32E9D-EFB2-4DEF-81D7-F70A0D7A606F}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Another_Topic_Consumer</RootNamespace>
+ <AssemblyName>Another_Topic_Consumer</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\Apache.Qpid.Channel.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Xml.Linq">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data.DataSetExtensions">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Another_Topic_Consumer.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Topic_Consumer\Topic_Consumer.csproj">
+ <Project>{248A3A0B-FDC4-4E70-8428-BE0AF5AB021B}</Project>
+ <Name>Topic_Consumer</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file diff --git a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Properties/AssemblyInfo.cs b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..8c22cb6d1f --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Another_Topic_Consumer/Properties/AssemblyInfo.cs @@ -0,0 +1,55 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Another_Topic_Consumer")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("MSIT")] +[assembly: AssemblyProduct("Another_Topic_Consumer")] +[assembly: AssemblyCopyright("Copyright © MSIT 2009")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("ba584c88-26a8-4910-a9a1-b4632b9adf01")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Properties/AssemblyInfo.cs b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..19fea85618 --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Properties/AssemblyInfo.cs @@ -0,0 +1,55 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Topic_Consumer")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("MSIT")] +[assembly: AssemblyProduct("Topic_Consumer")] +[assembly: AssemblyCopyright("Copyright © MSIT 2009")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("3facd6d1-f604-4ac9-ace3-7b7acff471eb")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Topic_Consumer.cs b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Topic_Consumer.cs new file mode 100644 index 0000000000..c4dd1e2256 --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Topic_Consumer.cs @@ -0,0 +1,85 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + + +namespace Apache.Qpid.Samples.Channel.WCFToWCFPubSub +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Description; + using Apache.Qpid.Channel; + + // Define a service contract. + [ServiceContract] + public interface IHelloService + { + [OperationContract(IsOneWay = true)] + void SayHello(string name); + } + + // Service class which implements the service contract. + public class HelloService : IHelloService + { + [OperationBehavior] + public void SayHello(string name) + { + Console.WriteLine("Hello " + name); + } + } + + class Consumer + { + static void Main(string[] args) + { + // Create binding for the service endpoint. + CustomBinding amqpBinding = new CustomBinding(); + amqpBinding.Elements.Add(new BinaryMessageEncodingBindingElement()); + amqpBinding.Elements.Add(new AmqpTransportBindingElement()); + + // Create ServiceHost. + ServiceHost serviceHost = new ServiceHost(typeof(HelloService), new Uri[] { new Uri("http://localhost:8080/HelloService1") }); + + // Add behavior for our MEX endpoint. + ServiceMetadataBehavior mexBehavior = new ServiceMetadataBehavior(); + mexBehavior.HttpGetEnabled = true; + serviceHost.Description.Behaviors.Add(mexBehavior); + + // Add MEX endpoint. + serviceHost.AddServiceEndpoint(typeof(IMetadataExchange), new BasicHttpBinding(), "MEX"); + + // Add AMQP endpoint. + Uri amqpUri = new Uri("amqp:usa"); + serviceHost.AddServiceEndpoint(typeof(IHelloService), amqpBinding, amqpUri.ToString()); + + serviceHost.Open(); + + Console.WriteLine(); + Console.WriteLine("The consumer is now listening on the queue \"usa\"."); + Console.WriteLine("Press <ENTER> to terminate service."); + Console.WriteLine(); + Console.ReadLine(); + + if (serviceHost.State != CommunicationState.Faulted) + { + serviceHost.Close(); + } + } + } +} diff --git a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Topic_Consumer.csproj b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Topic_Consumer.csproj new file mode 100644 index 0000000000..b2151c0631 --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Consumer/Topic_Consumer.csproj @@ -0,0 +1,84 @@ +<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{248A3A0B-FDC4-4E70-8428-BE0AF5AB021B}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Topic_Consumer</RootNamespace>
+ <AssemblyName>Topic_Consumer</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\Apache.Qpid.Channel.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Xml.Linq">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data.DataSetExtensions">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Topic_Consumer.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file diff --git a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Properties/AssemblyInfo.cs b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..87310bf92a --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Properties/AssemblyInfo.cs @@ -0,0 +1,55 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Topic_Producer")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("MSIT")] +[assembly: AssemblyProduct("Topic_Producer")] +[assembly: AssemblyCopyright("Copyright © MSIT 2009")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("a70e852d-a510-4e00-af72-68bb8547696f")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Topic_Producer.cs b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Topic_Producer.cs new file mode 100644 index 0000000000..e3850eb4c0 --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Topic_Producer.cs @@ -0,0 +1,68 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + + +namespace Apache.Qpid.Samples.Channel.WCFToWCFPubSub +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using Apache.Qpid.Channel; + + class Topic_Producer + { + static void Main(string[] args) + { + try + { + // Create binding for the service endpoint. + CustomBinding amqpBinding = new CustomBinding(); + amqpBinding.Elements.Add(new BinaryMessageEncodingBindingElement()); + amqpBinding.Elements.Add(new AmqpTransportBindingElement()); + + // Create endpoint address. + Uri amqpClientUri = new Uri("amqp:amq.topic?routingkey=usa.news"); + EndpointAddress endpointAddress = new EndpointAddress(amqpClientUri); + + // Create a client with given client endpoint configuration. + ChannelFactory<IHelloService> channelFactory = new ChannelFactory<IHelloService>(amqpBinding, endpointAddress); + IHelloService clientProxy = channelFactory.CreateChannel(); + + Console.WriteLine(); + + string name = "name"; + for (int i = 0; i < 5; i++) + { + Console.WriteLine("Sending message: " + name + (i + 1)); + clientProxy.SayHello(name + (i+1)); + } + + Console.WriteLine(); + Console.WriteLine("Press <ENTER> to terminate client."); + Console.ReadLine(); + + channelFactory.Close(); + } + catch (Exception e) + { + Console.WriteLine("Exception: {0}", e); + } + } + } +} diff --git a/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Topic_Producer.csproj b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Topic_Producer.csproj new file mode 100644 index 0000000000..b4318ead3f --- /dev/null +++ b/qpid/wcf/samples/Channel/WCFToWCFPubSub/Topic_Producer/Topic_Producer.csproj @@ -0,0 +1,90 @@ +<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{67B413EF-3B9C-4988-87DE-0386C209D368}</ProjectGuid>
+ <OutputType>Exe</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Topic_Producer</RootNamespace>
+ <AssemblyName>Topic_Producer</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="Apache.Qpid.Channel, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
+ <SpecificVersion>False</SpecificVersion>
+ <HintPath>..\Apache.Qpid.Channel.dll</HintPath>
+ </Reference>
+ <Reference Include="System" />
+ <Reference Include="System.Core">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Xml.Linq">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data.DataSetExtensions">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="Topic_Producer.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Topic_Consumer\Topic_Consumer.csproj">
+ <Project>{248A3A0B-FDC4-4E70-8428-BE0AF5AB021B}</Project>
+ <Name>Topic_Consumer</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
\ No newline at end of file diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpBoolean.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpBoolean.cs new file mode 100644 index 0000000000..980ae78361 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpBoolean.cs @@ -0,0 +1,57 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.AmqpTypes +{ + using System; + using System.IO; + using System.Collections.Generic; + using System.Text; + + public class AmqpBoolean : AmqpType + { + bool value; + + public AmqpBoolean(bool i) + { + this.value = i; + } + + public override void Encode(byte[] bufer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override int EncodedSize + { + get { throw new NotImplementedException(); } + } + + public override AmqpType Clone() + { + return new AmqpBoolean(this.value); + } + + public bool Value + { + get { return this.value; } + set { this.value = value; } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpInt.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpInt.cs new file mode 100644 index 0000000000..c114e98a71 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpInt.cs @@ -0,0 +1,57 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.AmqpTypes +{ + using System; + using System.IO; + using System.Collections.Generic; + using System.Text; + + public class AmqpInt : AmqpType + { + int value; + + public AmqpInt(int i) + { + this.value = i; + } + + public override void Encode(byte[] bufer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override int EncodedSize + { + get { throw new NotImplementedException(); } + } + + public override AmqpType Clone() + { + return new AmqpInt(this.value); + } + + public int Value + { + get { return this.value; } + set { this.value = value; } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs new file mode 100644 index 0000000000..0f649dcd36 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpProperties.cs @@ -0,0 +1,292 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.AmqpTypes +{ + using System; + using System.IO; + using System.Collections.Generic; + using System.Text; + + public class AmqpProperties + { + // AMQP 0-10 delivery properties + private bool durable; + private Nullable<TimeSpan> timeToLive; + private string routingKey; + + // AMQP 0-10 message properties + private string replyToExchange; + private string replyToRoutingKey; + private byte[] userId; + private byte[] correlationId; + private string contentType; + + // for application and vendor properties + Dictionary<String, AmqpType> propertyMap; + + public AmqpProperties() + { + } + + // AMQP 0-10 "message.delivery-properties + internal bool HasDeliveryProperties + { + get + { + return ((this.routingKey != null) || this.durable || this.timeToLive.HasValue); + } + } + + internal bool HasMappedProperties + { + get + { + if (this.propertyMap != null) + { + if (this.propertyMap.Count > 0) + { + return true; + } + } + + return false; + } + } + + // AMQP 0-10 "message.message-properties" + internal bool HasMessageProperties + { + get + { + if ((this.replyToExchange != null) || + (this.replyToRoutingKey != null) || + (this.userId != null) || + (this.correlationId != null) || + (this.contentType != null)) + { + return true; + } + + if (this.propertyMap == null) + { + return false; + } + + return (this.propertyMap.Count != 0); + } + } + + public Dictionary<String, AmqpType> PropertyMap + { + get + { + if (this.propertyMap == null) + { + this.propertyMap = new Dictionary<string, AmqpType>(); + } + return propertyMap; + } + set { this.propertyMap = value; } + } + + internal bool Empty + { + get + { + if (this.HasDeliveryProperties || this.HasMessageProperties) + { + return true; + } + return false; + } + } + + public string ContentType + { + get { return contentType; } + // TODO: validate + set { contentType = value; } + } + + public byte[] CorrelationId + { + get { return correlationId; } + set + { + if (value != null) + { + if (value.Length > 65535) + { + throw new ArgumentException("CorrelationId too big"); + } + } + correlationId = value; + } + } + + public byte[] UserId + { + get { return userId; } + set + { + if (value != null) + { + if (value.Length > 65535) + { + throw new ArgumentException("UserId too big"); + } + } + userId = value; + } + } + + public TimeSpan? TimeToLive + { + get { return this.timeToLive; } + set { this.timeToLive = value; } + } + + public string RoutingKey + { + get { return this.routingKey; } + set { this.routingKey = value; } + } + + public string ReplyToExchange + { + get { return this.replyToExchange; } + } + + public string ReplyToRoutingKey + { + get { return this.replyToRoutingKey; } + } + + // this changes from 0-10 to 1.0 + public void SetReplyTo(string exchange, string routingKey) + { + if ((exchange == null && routingKey == null)) + { + throw new ArgumentNullException("SetReplyTo"); + } + + this.replyToExchange = exchange; + this.replyToRoutingKey = routingKey; + } + + public bool Durable + { + get { return durable; } + set { durable = value; } + } + + public void Clear() + { + this.timeToLive = null; + this.routingKey = null; + this.replyToRoutingKey = null; + this.replyToExchange = null; + this.durable = false; + this.contentType = null; + this.userId = null; + this.correlationId = null; + this.propertyMap = null; + } + + public AmqpProperties Clone() + { + // memberwise clone ok for string, byte[], and value types + AmqpProperties clonedProps = (AmqpProperties)this.MemberwiseClone(); + + // deeper copy for the dictionary + if (this.propertyMap != null) + { + if (this.propertyMap.Count > 0) + { + Dictionary<string, AmqpType> clonedDictionary = new Dictionary<string, AmqpType>(this.propertyMap.Count); + foreach (KeyValuePair<string, AmqpType> original in this.propertyMap) + { + clonedDictionary.Add(original.Key, original.Value.Clone()); + } + + clonedProps.propertyMap = clonedDictionary; + } + else + { + clonedProps.propertyMap = null; + } + } + return clonedProps; + } + + // adds/replaces from the other AmqpProperty object. + // just inserts references, i.e. provides shallow copy semantics (see Clone for deep copy) + public void MergeFrom(AmqpProperties other) + { + if (other.timeToLive.HasValue) + { + this.timeToLive = other.timeToLive; + } + + if ((other.replyToRoutingKey != null) || (other.replyToExchange != null)) + { + this.replyToExchange = other.replyToExchange; + this.replyToRoutingKey = other.replyToRoutingKey; + } + + if (other.routingKey != null) + { + this.routingKey = other.routingKey; + } + + if (other.durable) + { + this.durable = true; + } + + if (other.contentType != null) + { + this.contentType = other.contentType; + } + + if (other.correlationId != null) + { + this.correlationId = other.correlationId; + } + + if (other.userId != null) + { + this.userId = other.userId; + } + + if (other.propertyMap != null) + { + if (other.propertyMap.Count > 0) + { + Dictionary<string, AmqpType> thisMap = this.PropertyMap; + foreach (KeyValuePair<string, AmqpType> kvp in other.propertyMap) + { + thisMap[kvp.Key] = kvp.Value; + } + } + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpString.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpString.cs new file mode 100644 index 0000000000..87cebe878c --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpString.cs @@ -0,0 +1,91 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.AmqpTypes +{ + using System; + using System.IO; + using System.Collections.Generic; + using System.Text; + + // for big strings: str16 in 0-10 and str32 in 1.0 + + public class AmqpString : AmqpType + { + string value; + Encoding encoding; + + public AmqpString(string s) + { + this.value = s; + this.encoding = Encoding.UTF8; + } + + public AmqpString(string s, Encoding enc) + { + ValidateEncoding(enc); + this.value = s; + this.encoding = enc; + } + + public Encoding Encoding + { + get { return encoding; } + set + { + ValidateEncoding(value); + encoding = value; + } + } + + private void ValidateEncoding(Encoding enc) + { + if (value == null) + { + throw new ArgumentNullException("Encoding"); + } + + if ((enc != Encoding.UTF8) && (enc != Encoding.Unicode)) + { + throw new ArgumentException("Encoding not one of UTF8 or Unicode"); + } + } + + public override void Encode(byte[] bufer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override int EncodedSize + { + get { throw new NotImplementedException(); } + } + + public override AmqpType Clone() + { + return new AmqpString(this.value); + } + + public string Value + { + get { return this.value; } + set { this.value = value; } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpType.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpType.cs new file mode 100644 index 0000000000..8cd3ac9e4a --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpType.cs @@ -0,0 +1,33 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.AmqpTypes +{ + using System; + using System.IO; + using System.Collections.Generic; + using System.Text; + + public abstract class AmqpType + { + public abstract void Encode(byte[] bufer, int offset, int count); + public abstract int EncodedSize { get; } + public abstract AmqpType Clone(); + } +} diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj new file mode 100644 index 0000000000..9c13d47296 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpTypes.csproj @@ -0,0 +1,153 @@ +<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{820BFC34-A40F-46BA-B86B-05334854CA17}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Apache.Qpid.AmqpTypes</RootNamespace>
+ <AssemblyName>Apache.Qpid.AmqpTypes</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <RunPostBuildEvent>OnBuildSuccess</RunPostBuildEvent>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="System" />
+ <Reference Include="System.Core">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Xml.Linq">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data.DataSetExtensions">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="AmqpBoolean.cs" />
+ <Compile Include="AmqpInt.cs" />
+ <Compile Include="AmqpProperties.cs" />
+ <Compile Include="AmqpString.cs" />
+ <Compile Include="AmqpType.cs" />
+ <Compile Include="AmqpUbyte.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="PropertyName.cs" />
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+<Message Text="yet another debug line" />
+ </Target>
+ <Target Name="AfterBuild"
+ >
+<Message Text="a debug line before banana.netmodule" />
+ <PropertyGroup Condition="('$(TargetFrameworkVersion)' != 'v1.0') and ('$(TargetFrameworkVersion)' != 'v1.1')">
+ <NoWarn>$(NoWarn);1701;1702</NoWarn>
+ </PropertyGroup>
+
+ <Csc
+ AdditionalLibPaths="$(AdditionalLibPaths)"
+ AddModules="@(AddModules)"
+ AllowUnsafeBlocks="$(AllowUnsafeBlocks)"
+ BaseAddress="$(BaseAddress)"
+ CheckForOverflowUnderflow="$(CheckForOverflowUnderflow)"
+ CodePage="$(CodePage)"
+ DebugType="$(DebugType)"
+ DefineConstants="$(DefineConstants)"
+ DelaySign="$(DelaySign)"
+ DisabledWarnings="$(NoWarn)"
+ DocumentationFile="@(DocFileItem)"
+ EmitDebugInformation="$(DebugSymbols)"
+ ErrorReport="$(ErrorReport)"
+ FileAlignment="$(FileAlignment)"
+ GenerateFullPaths="$(GenerateFullPaths)"
+ KeyContainer="$(KeyContainerName)"
+ KeyFile="$(KeyOriginatorFile)"
+ LangVersion="$(LangVersion)"
+ MainEntryPoint="$(StartupObject)"
+ ModuleAssemblyName="banana"
+ NoConfig="true"
+ NoLogo="$(NoLogo)"
+ NoStandardLib="$(NoStdLib)"
+ NoWin32Manifest="$(NoWin32Manifest)"
+ Optimize="$(Optimize)"
+ OutputAssembly="@(IntermediateAssembly)"
+ PdbFile="$(PdbFile)"
+ Platform="$(PlatformTarget)"
+ References="@(ReferencePath)"
+ Resources="@(_CoreCompileResourceInputs);@(CompiledLicenseFile)"
+ ResponseFiles="$(CompilerResponseFile)"
+ Sources="@(Compile)"
+ TargetType="module"
+ ToolExe="$(CscToolExe)"
+ ToolPath="$(CscToolPath)"
+ TreatWarningsAsErrors="$(TreatWarningsAsErrors)"
+ UseHostCompilerIfAvailable="$(UseHostCompilerIfAvailable)"
+ Utf8Output="$(Utf8Output)"
+ WarningLevel="$(WarningLevel)"
+ WarningsAsErrors="$(WarningsAsErrors)"
+ WarningsNotAsErrors="$(WarningsNotAsErrors)"
+ Win32Icon="$(ApplicationIcon)"
+ Win32Manifest="$(Win32Manifest)"
+ Win32Resource="$(Win32Resource)"
+ />
+
+ <ItemGroup>
+ <_CoreCompileResourceInputs Remove="@(_CoreCompileResourceInputs)" />
+ </ItemGroup>
+
+<Message Text="a debug line after banana.netmodule" />
+ </Target>
+ -->
+ <PropertyGroup>
+ <PostBuildEvent>cd "$(ProjectDir)bin\$(ConfigurationName)"
+del $(AssemblyName).dll
+del $(AssemblyName).pdb
+cd "$(ProjectDir)obj\$(ConfigurationName)"
+del $(AssemblyName).dll
+del $(AssemblyName).pdb
+cd "$(ProjectDir)"
+CreateNetModule.bat</PostBuildEvent>
+ </PropertyGroup>
+</Project>
\ No newline at end of file diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpUbyte.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpUbyte.cs new file mode 100644 index 0000000000..5ec8a732cf --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/AmqpUbyte.cs @@ -0,0 +1,57 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.AmqpTypes +{ + using System; + using System.IO; + using System.Collections.Generic; + using System.Text; + + public class AmqpUbyte : AmqpType + { + byte value; + + public AmqpUbyte(byte i) + { + this.value = i; + } + + public override void Encode(byte[] bufer, int offset, int count) + { + throw new NotImplementedException(); + } + + public override int EncodedSize + { + get { throw new NotImplementedException(); } + } + + public override AmqpType Clone() + { + return new AmqpUbyte(this.value); + } + + public byte Value + { + get { return this.value; } + set { this.value = value; } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/CreateNetModule.bat b/qpid/wcf/src/Apache/Qpid/AmqpTypes/CreateNetModule.bat new file mode 100755 index 0000000000..ddbe1407a7 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/CreateNetModule.bat @@ -0,0 +1,19 @@ + +REM Licensed to the Apache Software Foundation (ASF) under one +REM or more contributor license agreements. See the NOTICE file +REM distributed with this work for additional information +REM regarding copyright ownership. The ASF licenses this file +REM to you under the Apache License, Version 2.0 (the +REM "License"); you may not use this file except in compliance +REM with the License. You may obtain a copy of the License at +REM +REM http://www.apache.org/licenses/LICENSE-2.0 +REM +REM Unless required by applicable law or agreed to in writing, +REM software distributed under the License is distributed on an +REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +REM KIND, either express or implied. See the License for the +REM specific language governing permissions and limitations +REM under the License. + +%systemroot%\Microsoft.NET\Framework\v3.5\Csc.exe /noconfig /nowarn:1701,1702 /errorreport:prompt /warn:4 /define:DEBUG;TRACE /reference:"%programfiles%\Reference Assemblies\Microsoft\Framework\v3.5\System.Core.dll" /reference:"%programfiles%\Reference Assemblies\Microsoft\Framework\v3.5\System.Data.DataSetExtensions.dll" /reference:%systemroot%\Microsoft.NET\Framework\v2.0.50727\System.Data.dll /reference:%systemroot%\Microsoft.NET\Framework\v2.0.50727\System.dll /reference:%systemroot%\Microsoft.NET\Framework\v2.0.50727\System.Xml.dll /reference:"%programfiles%\Reference Assemblies\Microsoft\Framework\v3.5\System.Xml.Linq.dll" /debug+ /debug:full /filealign:512 /optimize- /out:obj\Debug\Apache.Qpid.AmqpTypes.netmodule /target:module *.cs
\ No newline at end of file diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/Properties/AssemblyInfo.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..0bce6f9795 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/Properties/AssemblyInfo.cs @@ -0,0 +1,55 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Apache.Qpid.AmqpTypes")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("")] +[assembly: AssemblyCopyright("")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("79b8b5d9-047d-4f3b-8610-7fe112ce6416")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/qpid/wcf/src/Apache/Qpid/AmqpTypes/PropertyName.cs b/qpid/wcf/src/Apache/Qpid/AmqpTypes/PropertyName.cs new file mode 100644 index 0000000000..b80f8b9e9e --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/AmqpTypes/PropertyName.cs @@ -0,0 +1,35 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.AmqpTypes +{ + using System; + using System.IO; + using System.Collections.Generic; + using System.Text; + + public sealed class PropertyName + { + public const string Priority = "amqpx.priority"; + public const string ContentType = "amqp.content-type"; + public const string ReplyTo = "amqp.reply-to"; + public const string ReplyToExchange = "amqpx.qpid0-10.reply-to-exchange"; + public const string RoutingKey = "amqpx.qpid0-10.routing-key"; + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBinding.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBinding.cs new file mode 100644 index 0000000000..e207f2fe45 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBinding.cs @@ -0,0 +1,60 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.Collections.Generic; + using System.Collections.ObjectModel; + using System.Configuration; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Configuration; + + using Apache.Qpid.AmqpTypes; + + public class AmqpBinaryBinding : AmqpBinding + { + public AmqpBinaryBinding() +: base (new RawMessageEncodingBindingElement()) + { + } + + public AmqpBinaryBinding(string configurationName) + : this() + { + ApplyConfiguration(configurationName); + } + + private void ApplyConfiguration(string configurationName) + { + AmqpBinaryBindingCollectionElement section = (AmqpBinaryBindingCollectionElement)ConfigurationManager.GetSection(AmqpConstants.AmqpBinaryBindingSectionName); + AmqpBinaryBindingConfigurationElement element = section.Bindings[configurationName]; + if (element == null) + { + throw new ConfigurationErrorsException(string.Format(System.Globalization.CultureInfo.CurrentCulture, + "There is no binding named {0} at {1}.", configurationName, section.BindingName)); + } + else + { + element.ApplyConfiguration(this); + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingCollectionElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingCollectionElement.cs new file mode 100644 index 0000000000..de263bc4ef --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingCollectionElement.cs @@ -0,0 +1,29 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + /// <summary> + /// Implement application configuration of bindingExtensions for AmqpBinaryBinding + /// </summary> + public class AmqpBinaryBindingCollectionElement + : System.ServiceModel.Configuration.StandardBindingCollectionElement<AmqpBinaryBinding, AmqpBinaryBindingConfigurationElement> + { + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingConfigurationElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingConfigurationElement.cs new file mode 100644 index 0000000000..a537a6c6c3 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinaryBindingConfigurationElement.cs @@ -0,0 +1,79 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.Collections.Generic; + using System.Collections.ObjectModel; + using System.Configuration; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Configuration; + using Apache.Qpid.AmqpTypes; + + public class AmqpBinaryBindingConfigurationElement : AmqpBindingConfigurationElement + { + public AmqpBinaryBindingConfigurationElement(string configurationName) + : base(configurationName) + { + } + + public AmqpBinaryBindingConfigurationElement() + : this(null) + { + } + + protected override Type BindingElementType + { + get { return typeof(AmqpBinaryBinding); } + } + + protected override ConfigurationPropertyCollection Properties + { + get + { + ConfigurationPropertyCollection properties = base.Properties; + + return properties; + } + } + + protected override void InitializeFrom(Binding binding) + { + base.InitializeFrom(binding); + AmqpBinaryBinding amqpBinding = (AmqpBinaryBinding)binding; + } + + protected override void OnApplyConfiguration(Binding binding) + { + if (binding == null) + throw new ArgumentNullException("binding"); + + if (binding.GetType() != typeof(AmqpBinaryBinding)) + { + throw new ArgumentException(string.Format("Invalid type for configuring an AMQP binding. Expected type: {0}. Type passed in: {1}.", + typeof(AmqpBinaryBinding).AssemblyQualifiedName, + binding.GetType().AssemblyQualifiedName)); + } + + base.OnApplyConfiguration(binding); + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs new file mode 100644 index 0000000000..b952faf9e5 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs @@ -0,0 +1,115 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.Collections.Generic; + using System.Collections.ObjectModel; + using System.Configuration; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Configuration; + + using Apache.Qpid.AmqpTypes; + + public class AmqpBinding : Binding + { + protected AmqpTransportBindingElement transport; + protected MessageEncodingBindingElement encoding; + + public AmqpBinding() + { + transport = new AmqpTransportBindingElement(); + encoding = new BinaryMessageEncodingBindingElement(); + } + + protected AmqpBinding(MessageEncodingBindingElement encoding) + { + this.encoding = encoding; + transport = new AmqpTransportBindingElement(); + } + + public AmqpBinding(string configurationName) + : this() + { + ApplyConfiguration(configurationName); + } + + public string BrokerHost + { + get { return transport.BrokerHost; } + set { transport.BrokerHost = value; } + } + + public int BrokerPort + { + get { return transport.BrokerPort; } + set { transport.BrokerPort = value; } + } + + public bool Shared + { + get { return transport.Shared; } + set { transport.Shared = value; } + } + + public TransferMode TransferMode + { + get { return transport.TransferMode; } + set { transport.TransferMode = value; } + } + + public AmqpProperties DefaultMessageProperties + { + get { return transport.DefaultMessageProperties; } + set { transport.DefaultMessageProperties = value; } + } + + public override string Scheme + { + get { return AmqpConstants.Scheme; } + } + + public override BindingElementCollection CreateBindingElements() + { + BindingElementCollection bindingElements = new BindingElementCollection(); + + bindingElements.Add(encoding); + bindingElements.Add(transport); + + return bindingElements.Clone(); + } + + private void ApplyConfiguration(string configurationName) + { + AmqpBindingCollectionElement section = (AmqpBindingCollectionElement)ConfigurationManager.GetSection(AmqpConstants.AmqpBindingSectionName); + AmqpBindingConfigurationElement element = section.Bindings[configurationName]; + if (element == null) + { + throw new ConfigurationErrorsException(string.Format(System.Globalization.CultureInfo.CurrentCulture, + "There is no binding named {0} at {1}.", configurationName, section.BindingName)); + } + else + { + element.ApplyConfiguration(this); + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingCollectionElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingCollectionElement.cs new file mode 100644 index 0000000000..e8d3b6fad4 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingCollectionElement.cs @@ -0,0 +1,29 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + /// <summary> + /// Implement application configuration of bindingExtensions for AmqpBinding + /// </summary> + public class AmqpBindingCollectionElement + : System.ServiceModel.Configuration.StandardBindingCollectionElement<AmqpBinding, AmqpBindingConfigurationElement> + { + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs new file mode 100644 index 0000000000..3ec62e809d --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs @@ -0,0 +1,258 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.Collections.Generic; + using System.Collections.ObjectModel; + using System.Configuration; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Configuration; + using Apache.Qpid.AmqpTypes; + + public class AmqpBindingConfigurationElement : StandardBindingElement + { + // not regular config elements. See PostDeserialize + string brokerHost; + int brokerPort; + + public AmqpBindingConfigurationElement(string configurationName) + : base(configurationName) + { + brokerHost = AmqpDefaults.BrokerHost; + brokerPort = AmqpDefaults.BrokerPort; + } + + public AmqpBindingConfigurationElement() + : this(null) + { + } + + protected override Type BindingElementType + { + get { return typeof(AmqpBinding); } + } + + public string BrokerHost + { + get { return brokerHost; } + set { brokerHost = value; } + } + + public int BrokerPort + { + get { return brokerPort; } + set { brokerPort = value; } + } + + [ConfigurationProperty(AmqpConfigurationStrings.Shared, DefaultValue = false)] + public bool Shared + { + get { return (bool)base[AmqpConfigurationStrings.Shared]; } + set { base[AmqpConfigurationStrings.Shared] = value; } + } + + [ConfigurationProperty(AmqpConfigurationStrings.TransferMode, DefaultValue = AmqpDefaults.TransferMode)] + public TransferMode TransferMode + { + get { return (TransferMode)base[AmqpConfigurationStrings.TransferMode]; } + set { base[AmqpConfigurationStrings.TransferMode] = value; } + } + + [ConfigurationProperty(AmqpConfigurationStrings.Brokers)] + public BrokerCollection Brokers + { + get + { + return (BrokerCollection)base[AmqpConfigurationStrings.Brokers]; + } + set + { + base[AmqpConfigurationStrings.Brokers] = value; + } + } + + protected override ConfigurationPropertyCollection Properties + { + get + { + ConfigurationPropertyCollection properties = base.Properties; + properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.Shared, + typeof(bool), false, null, null, ConfigurationPropertyOptions.None)); + properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.TransferMode, + typeof(TransferMode), AmqpDefaults.TransferMode, null, null, ConfigurationPropertyOptions.None)); + properties.Add(new ConfigurationProperty("brokers", typeof(BrokerCollection), null)); + return properties; + } + } + + protected override void InitializeFrom(Binding binding) + { + base.InitializeFrom(binding); + AmqpBinding amqpBinding = (AmqpBinding)binding; + this.BrokerHost = amqpBinding.BrokerHost; + this.BrokerPort = amqpBinding.BrokerPort; + this.TransferMode = amqpBinding.TransferMode; + this.Shared = amqpBinding.Shared; + + AmqpProperties props = amqpBinding.DefaultMessageProperties; + } + + protected override void OnApplyConfiguration(Binding binding) + { + if (binding == null) + throw new ArgumentNullException("binding"); + + if (!(binding is AmqpBinding)) + { + throw new ArgumentException(string.Format("Invalid type for configuring an AMQP binding. Expected type: {0}. Type passed in: {1}.", + typeof(AmqpBinding).AssemblyQualifiedName, + binding.GetType().AssemblyQualifiedName)); + } + + AmqpBinding amqpBinding = (AmqpBinding)binding; + amqpBinding.BrokerHost = this.BrokerHost; + amqpBinding.BrokerPort = this.BrokerPort; + amqpBinding.TransferMode = this.TransferMode; + amqpBinding.Shared = this.Shared; + } + + protected override void PostDeserialize() + { + base.PostDeserialize(); + + BrokerCollection brokers = Brokers; + if (brokers != null) + { + if (brokers.Count > 0) + { + // just grab the first element until failover is supported + System.Collections.IEnumerator brokersEnum = brokers.GetEnumerator(); + // move to first element + brokersEnum.MoveNext(); + BrokerElement be = (BrokerElement)brokersEnum.Current; + this.BrokerHost = be.Host; + this.BrokerPort = be.Port; + } + } + } + } + + public class BrokerCollection : ConfigurationElementCollection + { + public BrokerCollection() + { + //this.AddElementName = "broker"; + } + + protected override ConfigurationElement CreateNewElement() + { + return new BrokerElement(); + } + + protected override void BaseAdd(ConfigurationElement element) + { + BrokerElement be = (BrokerElement)element; + if (this.BaseGet((Object)be.Key) != null) + { + throw new ConfigurationErrorsException("duplicate broker definition at line " + element.ElementInformation.LineNumber); + } + base.BaseAdd(element); + } + + protected override Object GetElementKey(ConfigurationElement element) + { + BrokerElement be = (BrokerElement) element; + return be.Key; + } + + protected override void PostDeserialize() + { + base.PostDeserialize(); + if (this.Count == 0) + { + throw new ArgumentException("Brokers collection requires at least one broker"); + } + if (this.Count > 1) + { + Console.WriteLine("Warning: multiple brokers not supported, selecting first instance"); + } + BrokerElement be = (BrokerElement)this.BaseGet(0); + } + + protected override string ElementName + { + get + { + return "broker"; + } + } + + public override ConfigurationElementCollectionType CollectionType + { + get + { + return ConfigurationElementCollectionType.BasicMap; + } + } + } + + public class BrokerElement : ConfigurationElement + { + string key; + + public BrokerElement() + { + Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.BrokerHost, + typeof(string), AmqpDefaults.BrokerHost, null, null, ConfigurationPropertyOptions.None)); + Properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.BrokerPort, + typeof(int), AmqpDefaults.BrokerPort, null, null, ConfigurationPropertyOptions.None)); + + } + + [ConfigurationProperty(AmqpConfigurationStrings.BrokerHost, DefaultValue = AmqpDefaults.BrokerHost)] + public string Host + { + get { return (string)base[AmqpConfigurationStrings.BrokerHost]; } + set { base[AmqpConfigurationStrings.BrokerHost] = value; } + } + + [ConfigurationProperty(AmqpConfigurationStrings.BrokerPort, DefaultValue = AmqpDefaults.BrokerPort)] + public int Port + { + get { return (int)base[AmqpConfigurationStrings.BrokerPort]; } + set { base[AmqpConfigurationStrings.BrokerPort] = value; } + } + + public string Key + { + get + { + if (this.key == null) + { + this.key = this.Host + ':' + this.Port; + } + return this.key; + } + } + + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs new file mode 100644 index 0000000000..b8e2811527 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs @@ -0,0 +1,98 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.Collections.Generic; + using System.Collections.ObjectModel; + + class AmqpChannelFactory<TChannel> : ChannelFactoryBase<TChannel> + { + MessageEncoderFactory messageEncoderFactory; + AmqpTransportBindingElement bindingElement; + AmqpChannelProperties channelProperties; + long maxBufferPoolSize; + bool shared; + + internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context) + : base(context.Binding) + { + this.bindingElement = bindingElement; + this.channelProperties = bindingElement.ChannelProperties.Clone(); + this.shared = bindingElement.Shared; + this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize; + Collection<MessageEncodingBindingElement> messageEncoderBindingElements + = context.BindingParameters.FindAll<MessageEncodingBindingElement>(); + + if(messageEncoderBindingElements.Count > 1) + { + throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext"); + } + else if (messageEncoderBindingElements.Count == 1) + { + this.messageEncoderFactory = messageEncoderBindingElements[0].CreateMessageEncoderFactory(); + } + else + { + this.messageEncoderFactory = new TextMessageEncodingBindingElement().CreateMessageEncoderFactory(); + } + } + + + public override T GetProperty<T>() + { + T mep = messageEncoderFactory.Encoder.GetProperty<T>(); + if (mep != null) + { + return mep; + } + + if (typeof(T) == typeof(MessageVersion)) + { + return (T)(object)messageEncoderFactory.Encoder.MessageVersion; + } + + return base.GetProperty<T>(); + } + + protected override void OnOpen(TimeSpan timeout) + { + } + + protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) + { + throw new NotImplementedException("AmqpChannelFactory OnBeginOpen"); + //// return null; + } + + protected override void OnEndOpen(IAsyncResult result) + { + throw new NotImplementedException("AmqpChannelFactory OnEndOpen"); + } + + protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via) + { + return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared); + } + + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs new file mode 100644 index 0000000000..f1de30406a --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs @@ -0,0 +1,142 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.Net; + using System.Net.Sockets; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.Globalization; + + using Apache.Qpid.AmqpTypes; + + /// <summary> + /// Collection of constants used by the Amqp Channel classes + /// </summary> + static class AmqpConstants + { + internal const string Scheme = "amqp"; + internal const string AmqpBindingSectionName = "system.serviceModel/bindings/amqpBinding"; + internal const string AmqpBinaryBindingSectionName = "system.serviceModel/bindings/amqpBinaryBinding"; + internal const string AmqpTransportSectionName = "amqpTransport"; + } + + static class AmqpConfigurationStrings + { + public const string BrokerHost = "host"; + public const string BrokerPort = "port"; + public const string TransferMode = "transferMode"; + public const string Brokers = "brokers"; + public const string Shared = "shared"; + public const string MaxBufferPoolSize = "maxBufferPoolSize"; + public const string MaxReceivedMessageSize = "maxReceivedMessageSize"; + } + + static class AmqpDefaults + { + internal const string BrokerHost = "localhost"; + internal const int BrokerPort = 5672; + internal const TransferMode TransferMode = System.ServiceModel.TransferMode.Buffered; + internal const byte Priority = 4; + internal const long MaxBufferPoolSize = 64 * 1024; + internal const int MaxReceivedMessageSize = 5 * 1024 * 1024; //64 * 1024; + } + + // parking spot for properties that may be shared by separate channels on a single AMQP connection + internal class AmqpChannelProperties + { + string brokerHost; + int brokerPort; + TransferMode transferMode; + AmqpProperties defaultMessageProperties; + + long maxBufferPoolSize; + int maxReceivedMessageSize; + + internal AmqpChannelProperties() + { + this.brokerHost = AmqpDefaults.BrokerHost; + this.brokerPort = AmqpDefaults.BrokerPort; + this.transferMode = AmqpDefaults.TransferMode; + this.defaultMessageProperties = null; + this.maxBufferPoolSize = AmqpDefaults.MaxBufferPoolSize; + this.maxReceivedMessageSize = AmqpDefaults.MaxReceivedMessageSize; + } + + public AmqpChannelProperties Clone() + { + AmqpChannelProperties props = (AmqpChannelProperties) this.MemberwiseClone(); + if (this.defaultMessageProperties != null) + { + props.defaultMessageProperties = this.defaultMessageProperties.Clone(); + } + + return props; + } + + internal string BrokerHost + { + get { return this.brokerHost; } + set { this.brokerHost = value; } + } + + internal int BrokerPort + { + get { return this.brokerPort; } + set { this.brokerPort = value; } + } + + internal TransferMode TransferMode + { + get { return this.transferMode; } + set { this.transferMode = value; } + } + + internal AmqpProperties DefaultMessageProperties + { + get { return this.defaultMessageProperties; } + set { this.defaultMessageProperties = value; } + } + + internal long MaxBufferPoolSize + { + get { return this.maxBufferPoolSize; } + set { this.maxBufferPoolSize = value; } + } + + internal int MaxReceivedMessageSize + { + get { return this.maxReceivedMessageSize; } + set { this.maxReceivedMessageSize = value; } + } + } + + static class AmqpChannelHelpers + { + internal static void ValidateTimeout(TimeSpan timeout) + { + if (timeout < TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException("timeout", timeout, "Timeout must be greater than or equal to TimeSpan.Zero. To disable timeout, specify TimeSpan.MaxValue."); + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs new file mode 100644 index 0000000000..44fecdaf62 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs @@ -0,0 +1,174 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.Threading; + using System.Collections.Generic; + using System.Collections.ObjectModel; + + class AmqpChannelListener : ChannelListenerBase<IInputChannel> + { + MessageEncoderFactory messageEncoderFactory; + AmqpTransportBindingElement bindingElement; + AmqpChannelProperties channelProperties; + bool shared; + long maxBufferPoolSize; + Uri uri; + AmqpTransportChannel amqpTransportChannel; + delegate IInputChannel AsyncOnAcceptCaller (TimeSpan timeout); + AsyncOnAcceptCaller asyncOnAcceptCaller; + ManualResetEvent acceptWaitEvent; + + internal AmqpChannelListener(AmqpTransportBindingElement bindingElement, BindingContext context) + : base(context.Binding) + { + this.bindingElement = bindingElement; + this.channelProperties = bindingElement.ChannelProperties.Clone(); + this.shared = bindingElement.Shared; + + this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize; + + // TODO: review this. Should be unique hostname based + this.uri = context.ListenUriBaseAddress; + this.asyncOnAcceptCaller = new AsyncOnAcceptCaller(this.OnAcceptChannel); + this.acceptWaitEvent = new ManualResetEvent(false); + + Collection<MessageEncodingBindingElement> messageEncoderBindingElements + = context.BindingParameters.FindAll<MessageEncodingBindingElement>(); + + if(messageEncoderBindingElements.Count > 1) + { + throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext"); + } + else if (messageEncoderBindingElements.Count == 1) + { + this.messageEncoderFactory = messageEncoderBindingElements[0].CreateMessageEncoderFactory(); + } + else + { + this.messageEncoderFactory = new TextMessageEncodingBindingElement().CreateMessageEncoderFactory(); + } + } + + public override Uri Uri + { + get + { + return this.uri; + } + } + + + + public override T GetProperty<T>() + { + T mep = messageEncoderFactory.Encoder.GetProperty<T>(); + if (mep != null) + { + return mep; + } + + if (typeof(T) == typeof(MessageVersion)) + { + return (T)(object)messageEncoderFactory.Encoder.MessageVersion; + } + + return base.GetProperty<T>(); + } + + protected override void OnOpen(TimeSpan timeout) + { + } + + protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) + { + throw new NotImplementedException("AmqpChannelListener OnBeginOpen"); + //// return null; + } + + protected override void OnEndOpen(IAsyncResult result) + { + throw new NotImplementedException("AmqpChannelListener OnEndOpen"); + } + + protected override bool OnWaitForChannel(TimeSpan timeout) + { + throw new NotImplementedException("AmqpChannelListener OnWaitForChannel"); + } + + protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state) + { + throw new NotImplementedException("AmqpChannelListener OnBeginWaitForChannel"); + } + + protected override bool OnEndWaitForChannel(IAsyncResult result) + { + throw new NotImplementedException("AmqpChannelListener OnEndWaitForChannel"); + } + + protected override IInputChannel OnAcceptChannel(TimeSpan timeout) + { + if (amqpTransportChannel == null) + { + amqpTransportChannel = new AmqpTransportChannel(this, this.channelProperties, + new EndpointAddress(uri), messageEncoderFactory.Encoder, + maxBufferPoolSize, this.shared); + return (IInputChannel)(object) amqpTransportChannel; + } + + // TODO: remove "max one channel" restriction, add timeout processing + acceptWaitEvent.WaitOne(); + return null; + } + + protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state) + { + return asyncOnAcceptCaller.BeginInvoke(timeout, callback, state); + } + + protected override IInputChannel OnEndAcceptChannel(IAsyncResult result) + { + return asyncOnAcceptCaller.EndInvoke(result); + } + + protected override void OnClose(TimeSpan timeout) + { + // TODO: (+ OnAbort) + } + + protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) + { + throw new NotImplementedException("AmqpChannelListener OnBeginClose"); + } + + protected override void OnEndClose(IAsyncResult result) + { + throw new NotImplementedException("AmqpChannelListener OnEndClose"); + } + + protected override void OnAbort() + { + // TODO: + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs new file mode 100644 index 0000000000..f23b8072e9 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs @@ -0,0 +1,145 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.ServiceModel.Description; + using Apache.Qpid.AmqpTypes; + + public class AmqpTransportBindingElement : TransportBindingElement + { + AmqpChannelProperties channelProperties; + bool shared; + + public AmqpTransportBindingElement() + { + // start with default properties + channelProperties = new AmqpChannelProperties(); + } + + protected AmqpTransportBindingElement(AmqpTransportBindingElement other) + : base(other) + { + this.channelProperties = other.channelProperties.Clone(); + this.shared = other.shared; + } + + public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context) + { + if (context == null) + { + throw new ArgumentNullException("context"); + } + + return (IChannelFactory<TChannel>)(object)new AmqpChannelFactory<TChannel>(this, context); + } + + public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context) + { + if (context == null) + { + throw new ArgumentNullException("context"); + } + + return (IChannelListener<TChannel>)(object)new AmqpChannelListener(this, context); + } + + + + public override bool CanBuildChannelFactory<TChannel>(BindingContext context) + { + return ((typeof(TChannel) == typeof(IOutputChannel)) || + (typeof(TChannel) == typeof(IInputChannel))); + } + + public override bool CanBuildChannelListener<TChannel>(BindingContext context) + { + return ((typeof(TChannel) == typeof(IInputChannel))); + } + + public override BindingElement Clone() + { + return new AmqpTransportBindingElement(this); + } + + internal AmqpChannelProperties ChannelProperties + { + get { return channelProperties; } + } + + public string BrokerHost + { + get { return this.channelProperties.BrokerHost; } + set { this.channelProperties.BrokerHost = value; } + } + + public int BrokerPort + { + get { return this.channelProperties.BrokerPort; } + set { this.channelProperties.BrokerPort = value; } + } + + public bool Shared + { + get { return this.shared; } + set { this.shared = value; } + } + + public TransferMode TransferMode + { + get { return this.channelProperties.TransferMode; } + set { this.channelProperties.TransferMode = value; } + } + + public AmqpProperties DefaultMessageProperties + { + get { return this.channelProperties.DefaultMessageProperties; } + + set { this.channelProperties.DefaultMessageProperties = value; } + } + + public override T GetProperty<T>(BindingContext context) + { + if (context == null) + { + throw new ArgumentNullException("context"); + } + + if (typeof(T) == typeof(MessageVersion)) + { + return (T)(object)MessageVersion.Default; + } + + + return context.GetInnerProperty<T>(); + } + + public override string Scheme + { + get + { + return AmqpConstants.Scheme; + } + } + + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs new file mode 100644 index 0000000000..ca9c10be69 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs @@ -0,0 +1,592 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +// TODO: flow control +// timeout handling +// transactions +// check if should split into separate input and output classes (little overlap) + +namespace Apache.Qpid.Channel +{ + using System; + using System.Collections; + using System.Collections.Generic; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.Text; + using System.Threading; + using System.Globalization; + using System.Xml; + + // the thin interop layer that provides access to the Qpid AMQP client libraries + using Apache.Qpid.Interop; + using Apache.Qpid.AmqpTypes; + + /// <summary> + /// WCF client transport channel for accessing AMQP brokers using the Qpid C++ library + /// </summary> + public class AmqpTransportChannel : ChannelBase, IOutputChannel, IInputChannel + { + private static readonly EndpointAddress AnonymousAddress = + new EndpointAddress("http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous"); + + private EndpointAddress remoteAddress; + private MessageEncoder encoder; + private AmqpChannelProperties factoryChannelProperties; + private bool shared; + private string encoderContentType; + + // input = 0-10 queue, output = 0-10 exchange + private string queueName; + + private String routingKey; + private BufferManager bufferManager; + private AmqpProperties outputMessageProperties; + + private InputLink inputLink; + private OutputLink outputLink; + + private bool isInputChannel; + private bool streamed; + + private AsyncTimeSpanCaller asyncOpenCaller; + private AsyncTimeSpanCaller asyncCloseCaller; + + internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection) + : base(factory) + { + this.isInputChannel = (factory is ChannelListenerBase) || (factory is AmqpChannelFactory<IInputChannel>); + + if (remoteAddress == null) + { + throw new ArgumentException("Null Endpoint Address"); + } + + this.factoryChannelProperties = channelProperties; + this.shared = sharedConnection; + this.remoteAddress = remoteAddress; + + // pull out host, port, queue, and connection arguments + this.ParseAmqpUri(remoteAddress.Uri); + + this.encoder = msgEncoder; + string ct = String.Empty; + if (this.encoder != null) + { + ct = this.encoder.ContentType; + if (ct != null) + { + int pos = ct.IndexOf(';'); + if (pos != -1) + { + ct = ct.Substring(0, pos).Trim(); + } + } + else + { + ct = "application/octet-stream"; + } + } + + this.encoderContentType = ct; + + if (this.factoryChannelProperties.TransferMode == TransferMode.Streamed) + { + this.streamed = true; + } + else + { + if (!(this.factoryChannelProperties.TransferMode == TransferMode.Buffered)) + { + throw new ArgumentException("TransferMode mode must be \"Streamed\" or \"Buffered\""); + } + + this.streamed = false; + } + + this.bufferManager = BufferManager.CreateBufferManager(maxBufferPoolSize, int.MaxValue); + + this.asyncOpenCaller = new AsyncTimeSpanCaller(this.OnOpen); + this.asyncCloseCaller = new AsyncTimeSpanCaller(this.OnClose); + + if (this.isInputChannel) + { + this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, this.queueName); + } + else + { + this.outputLink = ConnectionManager.GetOutputLink(this.factoryChannelProperties, shared, false, this.queueName); + } + } + + private delegate bool AsyncTryReceiveCaller(TimeSpan timeout, out Message message); + + private delegate void AsyncTimeSpanCaller(TimeSpan timeout); + + EndpointAddress IOutputChannel.RemoteAddress + { + get + { + return this.remoteAddress; + } + } + + // i.e what you would insert into a ReplyTo header to reach + // here. Presumably should be exchange/link and routing info, + // rather than the actual input queue name. + EndpointAddress IInputChannel.LocalAddress + { + get + { + // TODO: something better + return AnonymousAddress; + } + } + + AmqpProperties OutputMessageProperties + { + get + { + if (this.outputMessageProperties == null) + { + this.outputMessageProperties = this.factoryChannelProperties.DefaultMessageProperties; + if (this.outputMessageProperties == null) + { + this.outputMessageProperties = new AmqpProperties(); + } + } + + return this.outputMessageProperties; + } + } + + Uri IOutputChannel.Via + { + get + { + return this.remoteAddress.Uri; + } + } + + public override T GetProperty<T>() + { + if (typeof(T) == typeof(IInputChannel)) + { + if (this.isInputChannel) + { + return (T)(object)this; + } + } + else if (typeof(T) == typeof(IOutputChannel)) + { + if (!this.isInputChannel) + { + return (T)(object)this; + } + } + + return base.GetProperty<T>(); + } + + public void Send(Message message, TimeSpan timeout) + { + this.ThrowIfDisposedOrNotOpen(); + AmqpChannelHelpers.ValidateTimeout(timeout); + + try + { + using (AmqpMessage amqpMessage = this.WcfToQpid(message)) + { + this.outputLink.Send(amqpMessage, timeout); + } + } + finally + { + message.Close(); + } + } + + public void Send(Message message) + { + this.Send(message, this.DefaultSendTimeout); + } + + public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state) + { + this.ThrowIfDisposedOrNotOpen(); + AmqpChannelHelpers.ValidateTimeout(timeout); + + try + { + using (AmqpMessage amqpMessage = this.WcfToQpid(message)) + { + return this.outputLink.BeginSend(amqpMessage, timeout, callback, state); + } + } + finally + { + message.Close(); + } + } + + public IAsyncResult BeginSend(Message message, AsyncCallback callback, object state) + { + return this.BeginSend(message, this.DefaultSendTimeout, callback, state); + } + + public void EndSend(IAsyncResult result) + { + this.outputLink.EndSend(result); + } + + public Message Receive(TimeSpan timeout) + { + Message message; + if (this.TryReceive(timeout, out message)) + { + return message; + } + else + { + throw new TimeoutException("Receive"); + } + } + + public Message Receive() + { + return this.Receive(this.DefaultReceiveTimeout); + } + + public bool TryReceive(TimeSpan timeout, out Message message) + { + this.ThrowIfDisposedOrNotOpen(); + AmqpMessage amqpMessage; + message = null; + + if (this.inputLink.TryReceive(timeout, out amqpMessage)) + { + message = this.QpidToWcf(amqpMessage); + return true; + } + + return false; + } + + public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state) + { + return this.inputLink.BeginTryReceive(timeout, callback, state); + } + + public bool EndTryReceive(IAsyncResult result, out Message message) + { + AmqpMessage amqpMessage = null; + if (!this.inputLink.EndTryReceive(result, out amqpMessage)) + { + message = null; + return false; + } + message = QpidToWcf(amqpMessage); + return true; + } + + public bool WaitForMessage(TimeSpan timeout) + { + return this.inputLink.WaitForMessage(timeout); + } + + public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state) + { + return this.inputLink.BeginTryReceive(timeout, callback, state); + } + + public IAsyncResult BeginReceive(AsyncCallback callback, object state) + { + return this.BeginReceive(this.DefaultReceiveTimeout, callback, state); + } + + public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state) + { + return this.inputLink.BeginWaitForMessage(timeout, callback, state); + } + + public Message EndReceive(IAsyncResult result) + { + Message message; + if (this.EndTryReceive(result, out message)) + { + return message; + } + else + { + throw new TimeoutException("EndReceive"); + } + } + + public bool EndWaitForMessage(IAsyncResult result) + { + return this.inputLink.EndWaitForMessage(result); + } + + public void CloseEndPoint() + { + if (this.inputLink != null) + { + this.inputLink.Close(); + } + if (this.outputLink != null) + { + this.outputLink.Close(); + } + } + + /// <summary> + /// Open connection to Broker + /// </summary> + protected override void OnOpen(TimeSpan timeout) + { + // TODO: move open logic to here from constructor + } + + protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) + { + return this.asyncOpenCaller.BeginInvoke(timeout, callback, state); + } + + protected override void OnEndOpen(IAsyncResult result) + { + this.asyncOpenCaller.EndInvoke(result); + } + + protected override void OnAbort() + { + //// TODO: check for network-less qpid teardown or launch special thread + this.Cleanup(); + } + + /// <summary> + /// Shutdown gracefully + /// </summary> + protected override void OnClose(TimeSpan timeout) + { + this.CloseEndPoint(); + this.Cleanup(); + } + + protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) + { + return this.asyncCloseCaller.BeginInvoke(timeout, callback, state); + } + + protected override void OnEndClose(IAsyncResult result) + { + this.asyncCloseCaller.EndInvoke(result); + } + + private AmqpMessage WcfToQpid(Message wcfMessage) + { + object obj; + AmqpProperties applicationProperties = null; + bool success = false; + AmqpMessage amqpMessage = null; + + if (wcfMessage.Properties.TryGetValue("AmqpProperties", out obj)) + { + applicationProperties = obj as AmqpProperties; + } + + try + { + AmqpProperties outgoingProperties = new AmqpProperties(); + + // Start with AMQP properties from the binding and the URI + if (this.factoryChannelProperties.DefaultMessageProperties != null) + { + outgoingProperties.MergeFrom(this.factoryChannelProperties.DefaultMessageProperties); + } + + if (this.routingKey != null) + { + outgoingProperties.RoutingKey = this.routingKey; + } + + // Add the Properties set by the application on this particular message. + // Application properties trump channel properties + if (applicationProperties != null) + { + outgoingProperties.MergeFrom(applicationProperties); + } + + amqpMessage = this.outputLink.CreateMessage(); + amqpMessage.Properties = outgoingProperties; + + // copy the WCF message body to the AMQP message body + if (this.streamed) + { + this.encoder.WriteMessage(wcfMessage, amqpMessage.BodyStream); + } + else + { + ArraySegment<byte> encodedBody = this.encoder.WriteMessage(wcfMessage, int.MaxValue, this.bufferManager); + try + { + amqpMessage.BodyStream.Write(encodedBody.Array, encodedBody.Offset, encodedBody.Count); + } + finally + { + this.bufferManager.ReturnBuffer(encodedBody.Array); + } + } + + success = true; + } + finally + { + if (!success && (amqpMessage != null)) + { + amqpMessage.Dispose(); + } + } + return amqpMessage; + } + + + private Message QpidToWcf(AmqpMessage amqpMessage) + { + if (amqpMessage == null) + { + return null; + } + + Message wcfMessage = null; + byte[] managedBuffer = null; + + try + { + if (this.streamed) + { + wcfMessage = this.encoder.ReadMessage(amqpMessage.BodyStream, int.MaxValue); + } + else + { + int count = (int)amqpMessage.BodyStream.Length; + managedBuffer = this.bufferManager.TakeBuffer(count); + int nr = amqpMessage.BodyStream.Read(managedBuffer, 0, count); + ArraySegment<byte> bufseg = new ArraySegment<byte>(managedBuffer, 0, count); + + wcfMessage = this.encoder.ReadMessage(bufseg, this.bufferManager); + + // set to null for finally{} block, since the encoder is now responsible for + // returning the BufferManager memory + managedBuffer = null; + } + + // This message will be discarded unless the "To" header matches + // the WCF endpoint dispatcher's address filter (or the service is + // AddressFilterMode=AddressFilterMode.Any). + + this.remoteAddress.ApplyTo(wcfMessage); + + if (amqpMessage.Properties != null) + { + wcfMessage.Properties.Add("AmqpProperties", amqpMessage.Properties); + } + } + catch (XmlException xmlException) + { + throw new ProtocolException( + "There is a problem with the XML that was received from the network. See inner exception for more details.", + xmlException); + } + catch (Exception e) + { + // TODO: logging + Console.WriteLine("TX channel encoder exception " + e); + } + finally + { + // close the amqpMessage unless the body will be read at a later time. + if (!this.streamed || wcfMessage == null) + { + amqpMessage.Close(); + } + + // the handoff to the encoder failed + if (managedBuffer != null) + { + this.bufferManager.ReturnBuffer(managedBuffer); + } + } + + return wcfMessage; + } + + private void Cleanup() + { + this.bufferManager.Clear(); + } + + // "amqp:queue1" | "amqp:stocks@broker1.com" | "amqp:queue3?routingkey=key" + private void ParseAmqpUri(Uri uri) + { + if (uri.Scheme != AmqpConstants.Scheme) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "The scheme {0} specified in address is not supported.", uri.Scheme), "uri"); + } + + this.queueName = uri.LocalPath; + + if ((this.queueName.IndexOf('@') != -1) && this.isInputChannel) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "Invalid input queue name: \"{0}\" specified.", this.queueName), "uri"); + } + + // search out session parameters in the query portion of the URI + + string routingParseKey = "routingkey="; + char[] charSeparators = new char[] { '?', ';' }; + string[] args = uri.Query.Split(charSeparators, StringSplitOptions.RemoveEmptyEntries); + foreach (string s in args) + { + if (s.StartsWith(routingParseKey)) + { + this.routingKey = s.Substring(routingParseKey.Length); + } + } + + if (this.queueName == String.Empty) + { + if (this.isInputChannel) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "Empty queue target specifier not allowed."), "uri"); + } + else + { + if (this.routingKey == null) + { + throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, + "No target queue or routing key specified."), "uri"); + } + } + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj new file mode 100644 index 0000000000..0b04eba986 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/Channel.csproj @@ -0,0 +1,102 @@ +<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.21022</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{8AABAB30-7D1E-4539-B7D1-05450262BAD2}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Apache.Qpid.Channel</RootNamespace>
+ <AssemblyName>Apache.Qpid.Channel</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ <StartupObject>
+ </StartupObject>
+ <SignAssembly>false</SignAssembly>
+ <AssemblyOriginatorKeyFile>
+ </AssemblyOriginatorKeyFile>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+ <ItemGroup>
+ <Compile Include="AmqpBinaryBinding.cs" />
+ <Compile Include="AmqpBinaryBindingCollectionElement.cs" />
+ <Compile Include="AmqpBinaryBindingConfigurationElement.cs" />
+ <Compile Include="AmqpChannelFactory.cs" />
+ <Compile Include="AmqpChannelHelpers.cs" />
+ <Compile Include="AmqpChannelListener.cs" />
+ <Compile Include="AmqpBinding.cs" />
+ <Compile Include="AmqpBindingCollectionElement.cs" />
+ <Compile Include="AmqpBindingConfigurationElement.cs" />
+ <Compile Include="AmqpTransportBindingElement.cs" />
+ <Compile Include="AmqpTransportChannel.cs" />
+ <Compile Include="ConnectionManager.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="RawMessage.cs" />
+ <Compile Include="RawMessageEncoder.cs" />
+ <Compile Include="RawMessageEncoderFactory.cs" />
+ <Compile Include="RawMessageEncodingBindingElement.cs" />
+ <Compile Include="RawXmlReader.cs" />
+ <Compile Include="RawXmlWriter.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <Reference Include="System" />
+ <Reference Include="System.configuration" />
+ <Reference Include="System.Runtime.Serialization">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.XML" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\Interop\Interop.vcproj">
+ <Project>{C9B6AC75-6332-47A4-B82B-0C20E0AF2D34}</Project>
+ <Name>Interop</Name>
+ </ProjectReference>
+ </ItemGroup>
+</Project>
\ No newline at end of file diff --git a/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs b/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs new file mode 100644 index 0000000000..a63e5333f4 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/ConnectionManager.cs @@ -0,0 +1,266 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.Collections; + using System.Collections.Generic; + using System.Threading; + + using Apache.Qpid.Interop; + + // The ConnectionManager looks after a shareable pool of AmqpConnection and AmqpSession + // objects. If two connection requests could be shared (see MakeKey() properties), and + // are designated as shareable, then they will be paired up. Each shared connection is + // a separate instance of a ManagedConnection. All unshared connections use a single + // instance of ManagedConnection with locking turned off. The ManagedConnection object + // registers for notifictation when a connection goes idle (all grandchild InputLink and + // OutputLink objects have been closed), and closes the connection. + + // TODO: the session sharing is roughed-in via comments but needs completing. + + internal sealed class ConnectionManager + { + // A side effect of creating InputLinks and OutputLinks is that counters + // in the respective AmqpSession and AmqpConnection are updated, so care + // must be taken to hold the lock across acquiring a session and opening + // a link on it. + + // one for each shared connection + private static Dictionary<string, ManagedConnection> sharedInstances; + + // this one creates and releases connections that are not shared. No locking required. + private static ManagedConnection unsharedInstance; + + // lock for finding or creating ManagedConnection instances + private static Object connectionLock; + + static ConnectionManager() + { + unsharedInstance = null; + sharedInstances = new Dictionary<string, ManagedConnection>(); + connectionLock = new Object(); + } + + private static string MakeKey(AmqpChannelProperties props) + { + return props.BrokerHost + ':' + props.BrokerPort + ':' + props.TransferMode; + } + + private static ManagedConnection GetManagedConnection(AmqpChannelProperties channelProperties, bool connectionSharing) + { + if (connectionSharing) + { + string key = MakeKey(channelProperties); + lock (connectionLock) + { + ManagedConnection mc = null; + if (!sharedInstances.TryGetValue(key, out mc)) + { + mc = new ManagedConnection(true); + sharedInstances.Add(key, mc); + } + return mc; + } + } + else + { + lock (connectionLock) + { + if (unsharedInstance == null) + { + unsharedInstance = new ManagedConnection(false); + } + return unsharedInstance; + } + } + } + + public static OutputLink GetOutputLink(AmqpChannelProperties channelProperties, bool connectionSharing, bool sessionSharing, string qname) + { + ManagedConnection mc = GetManagedConnection(channelProperties, connectionSharing); + return (OutputLink)mc.GetLink(channelProperties, sessionSharing, null, qname); + } + + public static InputLink GetInputLink(AmqpChannelProperties channelProperties, bool connectionSharing, bool sessionSharing, string qname) + { + ManagedConnection mc = GetManagedConnection(channelProperties, connectionSharing); + return (InputLink)mc.GetLink(channelProperties, sessionSharing, qname, null); + } + + + + class ManagedConnection + { + private Boolean shared; + private AmqpConnection sharedConnection; + //private Dictionary<string, AmqpSession> sharedSessions; + + public ManagedConnection(bool shared) + { + this.shared = shared; + } + + + public object GetLink(AmqpChannelProperties channelProperties, bool sessionSharing, string inputQueue, string outputQueue) + { + AmqpConnection connection = null; + AmqpSession session = null; + Object link = null; + bool newConnection = false; + //bool newSession = false; + bool success = false; + + // when called in the non-shared case, only stack variables should be used for holding connections/sessions/links + + if (this.shared) + { + Monitor.Enter(this); // lock + } + + try + { + if (this.shared) + { + // TODO: check shared connection not closed (i.e. network drop) and refresh this instance if needed + if (sessionSharing) + { + throw new NotImplementedException("shared session"); + /* * ... once we have a defined shared session config parameter: + + // lazilly create + if (this.sharedSessions == null) + { + this.sharedSessions = new Dictionary<string, AmqpSession>(); + } + + alreadydeclaredstring sessionKey = channelProperties.name_of_key_goes_here; + this.sharedSessions.TryGetValue(sessionKey, out session); + + * */ + } + + if (this.sharedConnection != null) + { + connection = this.sharedConnection; + } + } + + if (connection == null) + { + connection = new AmqpConnection(channelProperties.BrokerHost, channelProperties.BrokerPort); + newConnection = true; + if (this.shared) + { + connection.OnConnectionIdle += new ConnectionIdleEventHandler(this.IdleConnectionHandler); + } + else + { + connection.OnConnectionIdle += new ConnectionIdleEventHandler(UnsharedIdleConnectionHandler); + } + } + + if (session == null) + { + session = connection.CreateSession(); + //newSession = true; + } + + if (inputQueue != null) + { + link = session.CreateInputLink(inputQueue); + } + else + { + link = session.CreateOutputLink(outputQueue); + } + + if (this.shared) + { + if (newConnection) + { + this.sharedConnection = connection; + } + /* + if (newSession) + { + sharedSessions.Add(foo, session); + } + * */ + } + + success = true; + } + finally + { + if (this.shared) + { + Monitor.Exit(this); + } + if (!success) + { + /* + if (newSession) + { + session.Close(); + } + */ + if (newConnection) + { + connection.Close(); + } + } + } + + return link; + } + + + static void UnsharedIdleConnectionHandler(Object sender, EventArgs empty) + { + if (sender is AmqpConnection) + { + AmqpConnection connection = (AmqpConnection)sender; + connection.Close(); + } + } + + void IdleConnectionHandler(Object sender, EventArgs empty) + { + lock (this) + { + if (sharedConnection != sender || sharedConnection == null) + { + return; + } + if (!sharedConnection.IsIdle) + { + // Another thread made the connection busy again. + // That's OK. Another idle event will come along later. + return; + } + sharedConnection.Close(); // also closes all child sessions + sharedConnection = null; + //sharedSessions = null; + } + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs b/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..bc047d59b3 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/Properties/AssemblyInfo.cs @@ -0,0 +1,52 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Apache.Qpid.Channel")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("")] +[assembly: AssemblyCopyright("")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("ac02bbb0-2c19-43fb-a36c-b1b0a50eaf1a")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs new file mode 100644 index 0000000000..5925fa47dc --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs @@ -0,0 +1,374 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.IO; + using System.ServiceModel.Channels; + using System.Xml; + + // This incoming Message is backed either by a Stream (bodyStream) or a byte array (bodyBytes). + // If bodyBytes belongs to a BufferManager, we must return it when done. + // The pay-off is OnGetReaderAtBodyContents(). + // Most of the complexity is dealing with the OnCreateBufferedCopy() machinery. + internal class RawMessage : Message + { + private MessageHeaders headers; + private MessageProperties properties; + private XmlDictionaryReaderQuotas readerQuotas; + private Stream bodyStream; + private byte[] bodyBytes; + private int index; + private int count; + private BufferManager bufferManager; + + public RawMessage(byte[] buffer, int index, int count, BufferManager bufferManager, XmlDictionaryReaderQuotas quotas) + { + // this constructor supports MessageEncoder.ReadMessage(ArraySegment<byte> b, BufferManager mgr, string contentType) + if (quotas == null) + { + quotas = new XmlDictionaryReaderQuotas(); + } + + this.headers = new MessageHeaders(MessageVersion.None); + this.properties = new MessageProperties(); + this.readerQuotas = quotas; + this.bodyBytes = buffer; + this.index = index; + this.count = count; + this.bufferManager = bufferManager; + } + + public RawMessage(Stream stream, XmlDictionaryReaderQuotas quotas) + { + // this constructor supports MessageEncoder.ReadMessage(System.IO.Stream s, int max, string contentType) + if (quotas == null) + { + quotas = new XmlDictionaryReaderQuotas(); + } + + this.headers = new MessageHeaders(MessageVersion.None); + this.properties = new MessageProperties(); + this.bodyStream = stream; + } + + public RawMessage(MessageHeaders headers, MessageProperties properties, byte[] bytes, int index, int count, XmlDictionaryReaderQuotas quotas) + { + // this constructor supports internal needs for CreateBufferedCopy().CreateMessage() + this.headers = new MessageHeaders(headers); + this.properties = new MessageProperties(properties); + this.bodyBytes = bytes; + this.index = index; + this.count = count; + this.readerQuotas = quotas; + } + + public override MessageHeaders Headers + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return this.headers; + } + } + + public override bool IsEmpty + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return false; + } + } + + public override bool IsFault + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return false; + } + } + + public override MessageProperties Properties + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return this.properties; + } + } + + public override MessageVersion Version + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return MessageVersion.None; + } + } + + protected override void OnBodyToString(XmlDictionaryWriter writer) + { + if (this.bodyStream != null) + { + writer.WriteString("Stream"); + } + else + { + writer.WriteStartElement(RawMessageEncoder.StreamElementName, string.Empty); + writer.WriteBase64(this.bodyBytes, this.index, this.count); + writer.WriteEndElement(); + } + } + + protected override void OnClose() + { + Exception deferEx = null; + try + { + base.OnClose(); + } + catch (Exception e) + { + deferEx = e; + } + + try + { + if (this.properties != null) + { + this.properties.Dispose(); + } + } + catch (Exception e) + { + if (deferEx == null) + { + deferEx = e; + } + } + + try + { + if (this.bufferManager != null) + { + this.bufferManager.ReturnBuffer(this.bodyBytes); + this.bufferManager = null; + } + } + catch (Exception e) + { + if (deferEx == null) + { + deferEx = e; + } + } + + if (deferEx != null) + { + throw deferEx; + } + } + + protected override MessageBuffer OnCreateBufferedCopy(int maxBufferSize) + { + if (this.bodyStream != null) + { + int len = (int)this.bodyStream.Length; + byte[] buf = new byte[len]; + this.bodyStream.Read(buf, 0, len); + this.bodyStream = null; + this.bodyBytes = buf; + this.count = len; + this.index = 0; + } + else + { + if (this.bufferManager != null) + { + // we could take steps to share the buffer among copies and release the memory + // after the last user finishes by a reference count or such, but we are already + // far from the intended optimized use. Make one GC managed memory copy that is + // shared by all. + byte[] buf = new byte[this.count]; + + Buffer.BlockCopy(this.bodyBytes, this.index, buf, 0, this.count); + this.bufferManager.ReturnBuffer(this.bodyBytes); + this.bufferManager = null; + this.bodyBytes = buf; + this.index = 0; + } + } + + return new RawMessageBuffer(this.headers, this.properties, this.bodyBytes, this.index, this.count, this.readerQuotas); + } + + protected override XmlDictionaryReader OnGetReaderAtBodyContents() + { + Stream readerStream = null; + bool ownsStream; + + if (this.bodyStream != null) + { + readerStream = this.bodyStream; + ownsStream = false; + } + else + { + // create stream for duration of XmlReader. + ownsStream = true; + if (this.bufferManager != null) + { + readerStream = new RawMemoryStream(this.bodyBytes, this.index, this.count, this.bufferManager); + this.bufferManager = null; + } + else + { + readerStream = new MemoryStream(this.bodyBytes, this.index, this.count, false); + } + } + + return new RawXmlReader(readerStream, this.readerQuotas, ownsStream); + } + + protected override void OnWriteBodyContents(XmlDictionaryWriter writer) + { + writer.WriteStartElement(RawMessageEncoder.StreamElementName, string.Empty); + if (this.bodyStream != null) + { + int len = (int)this.bodyStream.Length; + byte[] buf = new byte[len]; + this.bodyStream.Read(buf, 0, len); + writer.WriteBase64(buf, 0, len); + } + else + { + writer.WriteBase64(this.bodyBytes, this.index, this.count); + } + + writer.WriteEndElement(); + } + + private class RawMemoryStream : MemoryStream + { + private BufferManager bufferManager; + private byte[] buffer; + + public RawMemoryStream(byte[] bytes, int index, int count, BufferManager mgr) + : base(bytes, index, count, false) + { + this.bufferManager = mgr; + this.buffer = bytes; + } + + protected override void Dispose(bool disposing) + { + if (this.bufferManager != null) + { + try + { + this.bufferManager.ReturnBuffer(this.buffer); + } + finally + { + this.bufferManager = null; + base.Dispose(disposing); + } + } + } + } + + private class RawMessageBuffer : MessageBuffer + { + private bool closed; + private MessageHeaders headers; + private MessageProperties properties; + private byte[] bodyBytes; + private int index; + private int count; + private XmlDictionaryReaderQuotas readerQuotas; + + public RawMessageBuffer(MessageHeaders headers, MessageProperties properties, byte[] bytes, int index, int count, XmlDictionaryReaderQuotas quotas) + : base() + { + this.headers = new MessageHeaders(headers); + this.properties = new MessageProperties(properties); + this.bodyBytes = bytes; + this.index = index; + this.count = count; + this.readerQuotas = new XmlDictionaryReaderQuotas(); + quotas.CopyTo(this.readerQuotas); + } + + public override int BufferSize + { + get { return this.count; } + } + + public override void Close() + { + if (!this.closed) + { + this.closed = true; + this.headers = null; + if (this.properties != null) + { + this.properties.Dispose(); + this.properties = null; + } + + this.bodyBytes = null; + this.readerQuotas = null; + } + } + + public override Message CreateMessage() + { + if (this.closed) + { + throw new ObjectDisposedException("message"); + } + + return new RawMessage(this.headers, this.properties, this.bodyBytes, this.index, this.count, this.readerQuotas); + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs new file mode 100644 index 0000000000..76dae6f6c7 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoder.cs @@ -0,0 +1,113 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.IO; + using System.ServiceModel.Channels; + using System.ServiceModel; + using System.Xml; + + + class RawMessageEncoder : MessageEncoder + { + public const string StreamElementName = "Binary"; + + XmlDictionaryReaderQuotas readerQuotas; + + public RawMessageEncoder(XmlDictionaryReaderQuotas quotas) + { + this.readerQuotas = new XmlDictionaryReaderQuotas(); + if (quotas != null) + { + quotas.CopyTo(this.readerQuotas); + } + } + + public override string ContentType + { + get { return null; } + } + + public override bool IsContentTypeSupported(string contentType) + { + return true; + } + + public override string MediaType + { + get { return null; } + } + + public override MessageVersion MessageVersion + { + get { return MessageVersion.None; } + } + + public override Message ReadMessage(ArraySegment<byte> buffer, BufferManager bufferManager, string contentType) + { + RawMessage message = new RawMessage(buffer.Array, buffer.Offset, buffer.Count, bufferManager, readerQuotas); + message.Properties.Encoder = this; + return message; + } + + public override Message ReadMessage(Stream stream, int maxSizeOfHeaders, string contentType) + { + RawMessage message = new RawMessage(stream, readerQuotas); + message.Properties.Encoder = this; + return message; + } + + private void CheckType(XmlDictionaryReader reader, XmlNodeType type) + { + if (reader.NodeType != type) + { + throw new System.IO.InvalidDataException(String.Format("RawMessageEncoder xml check {0} type should be {1}", type, reader.NodeType)); + } + } + + public override ArraySegment<byte> WriteMessage(Message message, int maxMessageSize, BufferManager bufferManager, int messageOffset) + { + MemoryStream tempStream = new MemoryStream(); + this.WriteMessage(message, tempStream); + int len = messageOffset + (int)tempStream.Length; + byte[] buf = bufferManager.TakeBuffer(len); + MemoryStream targetStream = new MemoryStream(buf); + if (messageOffset > 0) + { + targetStream.Seek(messageOffset, SeekOrigin.Begin); + } + + tempStream.WriteTo(targetStream); + targetStream.Close(); + + return new ArraySegment<byte>(buf, messageOffset, len - messageOffset); + } + + public override void WriteMessage(Message message, Stream stream) + { + using (XmlWriter writer = new RawXmlWriter(stream)) + { + message.WriteMessage(writer); + writer.Flush(); + } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs new file mode 100644 index 0000000000..5c015f9a1b --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncoderFactory.cs @@ -0,0 +1,45 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.Xml; + using System.ServiceModel.Channels; + + internal class RawMessageEncoderFactory : MessageEncoderFactory + { + RawMessageEncoder encoder; + + public RawMessageEncoderFactory(XmlDictionaryReaderQuotas quotas) + { + this.encoder = new RawMessageEncoder(quotas); + } + + public override MessageEncoder Encoder + { + get { return this.encoder; } + } + + public override MessageVersion MessageVersion + { + get { return encoder.MessageVersion; } + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs new file mode 100644 index 0000000000..5ec10a976d --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/RawMessageEncodingBindingElement.cs @@ -0,0 +1,102 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.ServiceModel.Channels; + + public class RawMessageEncodingBindingElement : MessageEncodingBindingElement + { + + public RawMessageEncodingBindingElement() + : base() + { + } + + RawMessageEncodingBindingElement(RawMessageEncodingBindingElement originalBindingElement) + { + } + + public override MessageEncoderFactory CreateMessageEncoderFactory() + { + return new RawMessageEncoderFactory(null); + } + + + public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context) + { + if (context == null) + throw new ArgumentNullException("context"); + + context.BindingParameters.Add(this); + return context.BuildInnerChannelFactory<TChannel>(); + } + + public override bool CanBuildChannelFactory<TChannel>(BindingContext context) + { + if (context == null) + throw new ArgumentNullException("context"); + + return context.CanBuildInnerChannelFactory<TChannel>(); + } + + public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context) + { + if (context == null) + throw new ArgumentNullException("context"); + + context.BindingParameters.Add(this); + return context.BuildInnerChannelListener<TChannel>(); + } + + public override bool CanBuildChannelListener<TChannel>(BindingContext context) + { + if (context == null) + throw new ArgumentNullException("context"); + + context.BindingParameters.Add(this); + return context.CanBuildInnerChannelListener<TChannel>(); + } + + + public override BindingElement Clone() + { + return new RawMessageEncodingBindingElement(this); + } + + + + public override MessageVersion MessageVersion + { + get + { + return MessageVersion.None; + } + + set + { + if (value != MessageVersion.None) + throw new ArgumentException("Unsupported message version"); + } + } + + + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs new file mode 100644 index 0000000000..8fadfce441 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/RawXmlReader.cs @@ -0,0 +1,353 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.IO; + using System.Xml; + + internal class RawXmlReader : XmlDictionaryReader + { + ////this class presents a hardcoded XML InfoSet: "<rawtag>X</rawtag>" where X is the entire stream content + + private Stream stream; + private bool closed; + private bool streamOwner; + private ReaderPosition position; + private string contentAsBase64; + private XmlNameTable xmlNameTable; + private XmlDictionaryReaderQuotas readerQuotas; + + public RawXmlReader(Stream stream, XmlDictionaryReaderQuotas quotas, bool streamOwner) + { + this.stream = stream; + this.streamOwner = streamOwner; + if (quotas == null) + { + this.readerQuotas = new XmlDictionaryReaderQuotas(); + } + else + { + this.readerQuotas = quotas; + } + } + + private enum ReaderPosition + { + None, + StartElement, + Content, + EndElement, + EOF + } + + public override int AttributeCount + { + get { return 0; } + } + + public override string BaseURI + { + get { return string.Empty; } + } + + public override int Depth + { + get { return (this.position == ReaderPosition.Content) ? 1 : 0; } + } + + public override bool EOF + { + get { return this.position == ReaderPosition.EOF; } + } + + public override bool HasAttributes + { + get { return false; } + } + + public override bool HasValue + { + get { return this.position == ReaderPosition.Content; } + } + + public override bool IsEmptyElement + { + get { return false; } + } + + public override string LocalName + { + get + { + if (this.position == ReaderPosition.StartElement) + { + return RawMessageEncoder.StreamElementName; + } + + return null; + } + } + + public override string NamespaceURI + { + get { return string.Empty; } + } + + public override XmlNameTable NameTable + { + get + { + if (this.xmlNameTable == null) + { + this.xmlNameTable = new NameTable(); + this.xmlNameTable.Add(RawMessageEncoder.StreamElementName); + } + + return this.xmlNameTable; + } + } + + public override XmlNodeType NodeType + { + get + { + switch (this.position) + { + case ReaderPosition.StartElement: + return XmlNodeType.Element; + case ReaderPosition.Content: + return XmlNodeType.Text; + case ReaderPosition.EndElement: + return XmlNodeType.EndElement; + default: + // and StreamPosition.EOF + return XmlNodeType.None; + } + } + } + + public override string Prefix + { + get { return string.Empty; } + } + + public override ReadState ReadState + { + get + { + switch (this.position) + { + case ReaderPosition.None: + return ReadState.Initial; + case ReaderPosition.StartElement: + case ReaderPosition.Content: + case ReaderPosition.EndElement: + return ReadState.Interactive; + case ReaderPosition.EOF: + return ReadState.Closed; + default: + return ReadState.Error; + } + } + } + + public override string Value + { + get + { + switch (this.position) + { + case ReaderPosition.Content: + if (this.contentAsBase64 == null) + { + this.contentAsBase64 = Convert.ToBase64String(this.ReadContentAsBase64()); + } + + return this.contentAsBase64; + + default: + return string.Empty; + } + } + } + + public override void Close() + { + if (!this.closed) + { + this.closed = true; + this.position = ReaderPosition.EOF; + this.readerQuotas = null; + if (this.streamOwner) + { + this.stream.Close(); + } + } + } + + public override string GetAttribute(int i) + { + throw new ArgumentOutOfRangeException("i", i, "Argument not in set of valid values"); + } + + public override string GetAttribute(string name, string namespaceURI) + { + return null; + } + + public override string GetAttribute(string name) + { + return null; + } + + public override string LookupNamespace(string prefix) + { + if (prefix == string.Empty) + { + return string.Empty; + } + else if (prefix == "xml") + { + return "http://www.w3.org/XML/1998/namespace"; + } + else if (prefix == "xmlns") + { + return "http://www.w3.org/2000/xmlns/"; + } + else + { + return null; + } + } + + public override bool MoveToAttribute(string name, string ns) + { + return false; + } + + public override bool MoveToAttribute(string name) + { + return false; + } + + public override bool MoveToElement() + { + if (this.position == ReaderPosition.None) + { + this.position = ReaderPosition.StartElement; + return true; + } + + return false; + } + + public override bool MoveToFirstAttribute() + { + return false; + } + + public override bool MoveToNextAttribute() + { + return false; + } + + public override bool Read() + { + switch (this.position) + { + case ReaderPosition.None: + this.position = ReaderPosition.StartElement; + return true; + case ReaderPosition.StartElement: + this.position = ReaderPosition.Content; + return true; + case ReaderPosition.Content: + this.position = ReaderPosition.EndElement; + return true; + case ReaderPosition.EndElement: + this.position = ReaderPosition.EOF; + return false; + case ReaderPosition.EOF: + return false; + default: + return false; + } + } + + public override bool ReadAttributeValue() + { + return false; + } + + public override int ReadContentAsBase64(byte[] buffer, int index, int count) + { + if (buffer == null) + { + throw new ArgumentNullException("buffer"); + } + + if (this.position != ReaderPosition.Content) + { + throw new InvalidOperationException("XML reader not in Element"); + } + + if (count == 0) + { + return 0; + } + + int readCount = this.stream.Read(buffer, index, count); + if (readCount == 0) + { + this.position = ReaderPosition.EndElement; + } + + return readCount; + } + + public override int ReadContentAsBinHex(byte[] buffer, int index, int count) + { + throw new NotSupportedException(); + } + + public override void ResolveEntity() + { + throw new NotSupportedException(); + } + + public override bool TryGetBase64ContentLength(out int length) + { + // The whole stream is this one element + if (!this.closed && this.stream.CanSeek) + { + long streamLength = this.stream.Length; + if (streamLength <= int.MaxValue) + { + length = (int)streamLength; + return true; + } + } + + length = -1; + return false; + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs new file mode 100644 index 0000000000..7d05b70807 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/RawXmlWriter.cs @@ -0,0 +1,221 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.IO; + using System.Xml; + + internal sealed class RawXmlWriter : XmlDictionaryWriter + { + + WriteState state; + Stream stream; + bool closed; + bool rawWritingEnabled; + + public RawXmlWriter(Stream stream) + { + if (stream == null) + { + throw new ArgumentNullException("Stream"); + } + + this.stream = stream; + this.state = WriteState.Start; + } + + public override WriteState WriteState + { + get + { + return this.state; + } + } + + public override void Close() + { + if (!this.closed) + { + this.closed = true; + this.state = WriteState.Closed; + this.rawWritingEnabled = false; + } + } + + public override void Flush() + { + this.ThrowIfClosed(); + this.stream.Flush(); + } + + public override string LookupPrefix(string ns) + { + return null; + } + + public override void WriteBase64(byte[] buffer, int index, int count) + { + if (buffer == null) + { + throw new ArgumentNullException("buffer"); + } + + ThrowIfClosed(); + + if (!this.rawWritingEnabled) + { + throw new InvalidOperationException("XmlWriter not in Element"); + } + + this.stream.Write(buffer, index, count); + this.state = WriteState.Content; + } + + public override void WriteStartElement(string prefix, string localName, string ns) + { + ThrowIfClosed(); + if (this.state != WriteState.Start) + { + throw new InvalidOperationException("Start Element Already Called"); + } + + if (!string.IsNullOrEmpty(prefix) || !string.IsNullOrEmpty(ns) || localName != RawMessageEncoder.StreamElementName) + { + throw new XmlException("Wrong XML Start Element Name"); + } + this.state = WriteState.Element; + this.rawWritingEnabled = true; + } + + public override void WriteEndElement() + { + ThrowIfClosed(); + if (!this.rawWritingEnabled) + { + throw new InvalidOperationException("Unexpected End Element"); + } + this.rawWritingEnabled = false; + } + + public override void WriteFullEndElement() + { + this.WriteEndElement(); + } + + public override void WriteEndDocument() + { + this.rawWritingEnabled = false; + this.ThrowIfClosed(); + } + + public override void WriteStartDocument() + { + this.rawWritingEnabled = false; + this.ThrowIfClosed(); + } + + public override void WriteStartDocument(bool standalone) + { + this.rawWritingEnabled = false; + this.ThrowIfClosed(); + } + + private void ThrowIfClosed() + { + if (this.closed) + { + throw new InvalidOperationException("XML Writer closed"); + } + } + + + public override void WriteString(string text) + { + throw new NotSupportedException(); + } + + public override void WriteCData(string text) + { + throw new NotSupportedException(); + } + + public override void WriteCharEntity(char ch) + { + throw new NotSupportedException(); + } + + public override void WriteChars(char[] buffer, int index, int count) + { + throw new NotSupportedException(); + } + + public override void WriteComment(string text) + { + throw new NotSupportedException(); + } + + public override void WriteDocType(string name, string pubid, string sysid, string subset) + { + throw new NotSupportedException(); + } + + public override void WriteEndAttribute() + { + throw new NotSupportedException(); + } + + public override void WriteEntityRef(string name) + { + throw new NotSupportedException(); + } + + + public override void WriteProcessingInstruction(string name, string text) + { + throw new NotSupportedException(); + } + + public override void WriteRaw(string data) + { + throw new NotSupportedException(); + } + + public override void WriteRaw(char[] buffer, int index, int count) + { + throw new NotSupportedException(); + } + + public override void WriteStartAttribute(string prefix, string localName, string ns) + { + throw new NotSupportedException(); + } + + public override void WriteSurrogateCharEntity(char lowChar, char highChar) + { + throw new NotSupportedException(); + } + + public override void WriteWhitespace(string ws) + { + throw new NotSupportedException(); + } + } +} diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp new file mode 100644 index 0000000000..02d6c7ab18 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.cpp @@ -0,0 +1,165 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/framing/FrameSet.h" + +#include "AmqpConnection.h" +#include "AmqpSession.h" +#include "QpidMarshal.h" +#include "QpidException.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace msclr; + +using namespace qpid::client; +using namespace std; + + +// Note on locks: Use "this" for fast counting and idle/busy +// notifications. Use the "sessions" list to serialize session +// creation/reaping and overall tear down. +// TODO: switch "this" lock to separate non-visible Object. + + +AmqpConnection::AmqpConnection(String^ server, int port) : + connectionp(NULL), + busyCount(0), + disposed(false) +{ + bool success = false; + System::Exception^ openException = nullptr; + sessions = gcnew Collections::Generic::List<AmqpSession^>(); + + try { + connectionp = new Connection; + connectionp->open (QpidMarshal::ToNative(server), port); + // TODO: registerFailureCallback for failover + success = true; + const ConnectionSettings& settings = connectionp->getNegotiatedSettings(); + this->maxFrameSize = settings.maxFrameSize; + } catch (const qpid::Exception& error) { + String^ errmsg = gcnew String(error.what()); + openException = gcnew QpidException(errmsg); + } finally { + if (!success) { + Cleanup(); + if (openException == nullptr) { + openException = gcnew QpidException ("unknown connection failure"); + } + throw openException; + } + } +} + +void AmqpConnection::Cleanup() +{ + { + lock l(sessions); + if (disposed) + return; + disposed = true; + } + + try { + // let the child sessions clean up + for each(AmqpSession^ s in sessions) { + s->ConnectionClosed(); + } + } + finally + { + if (connectionp != NULL) { + connectionp->close(); + delete connectionp; + connectionp = NULL; + } + } +} + +AmqpConnection::~AmqpConnection() +{ + Cleanup(); +} + +AmqpConnection::!AmqpConnection() +{ + Cleanup(); +} + +void AmqpConnection::Close() +{ + // Simulate Dispose()... + Cleanup(); + GC::SuppressFinalize(this); +} + +AmqpSession^ AmqpConnection::CreateSession() +{ + lock l(sessions); + if (disposed) { + throw gcnew ObjectDisposedException("AmqpConnection"); + } + AmqpSession^ session = gcnew AmqpSession(this, connectionp); + sessions->Add(session); + return session; +} + +// called whenever a child session becomes newly busy (a first reader or writer since last idle) + +void AmqpConnection::NotifyBusy() +{ + bool changed = false; + { + lock l(this); + if (busyCount++ == 0) + changed = true; + } +} + +// called whenever a child session becomes newly idle (a last reader or writer has closed) +// The connection is idle when none of its child sessions are busy + +void AmqpConnection::NotifyIdle() +{ + bool connectionIdle = false; + { + lock l(this); + if (--busyCount == 0) + connectionIdle = true; + } + if (connectionIdle) { + OnConnectionIdle(this, System::EventArgs::Empty); + } +} + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h new file mode 100644 index 0000000000..2641391e82 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpConnection.h @@ -0,0 +1,71 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace std; +using namespace qpid::client; + +ref class AmqpSession; + +public delegate void ConnectionIdleEventHandler(Object^ sender, EventArgs^ eventArgs); + +public ref class AmqpConnection +{ +private: + Connection* connectionp; + void Cleanup(); + bool disposed; + Collections::Generic::List<AmqpSession^>^ sessions; + bool isOpen; + int busyCount; + int maxFrameSize; + + internal: + void NotifyBusy(); + void NotifyIdle(); + + property int MaxFrameSize { + int get () { return maxFrameSize; } + } + +public: + AmqpConnection(System::String^ server, int port); + ~AmqpConnection(); + !AmqpConnection(); + void Close(); + AmqpSession^ CreateSession(); + event ConnectionIdleEventHandler^ OnConnectionIdle; + + property bool IsOpen { + bool get() { return isOpen; } + }; + + property bool IsIdle { + bool get() { return (busyCount == 0); } + } +}; + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp new file mode 100644 index 0000000000..5c333aff60 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.cpp @@ -0,0 +1,76 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/AMQFrame.h" + +#include "MessageBodyStream.h" +#include "AmqpMessage.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; +using namespace msclr; + +using namespace Apache::Qpid::AmqpTypes; + +AmqpMessage::AmqpMessage(MessageBodyStream ^mbs) : + messageBodyStream(mbs), + disposed(false) +{ +} + +void AmqpMessage::Cleanup() +{ + { + lock l(this); + if (disposed) + return; + + disposed = true; + } + + messageBodyStream->Close(); +} + +AmqpMessage::~AmqpMessage() +{ + Cleanup(); +} + +AmqpMessage::!AmqpMessage() +{ + Cleanup(); +} + +void AmqpMessage::Close() +{ + // Simulate Dispose()... + Cleanup(); + GC::SuppressFinalize(this); +} + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h new file mode 100644 index 0000000000..f0801d30dc --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpMessage.h @@ -0,0 +1,61 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; + +using namespace qpid::client; +using namespace std; + + + +public ref class AmqpMessage +{ +private: + MessageBodyStream^ messageBodyStream; + AmqpTypes::AmqpProperties^ amqpProperties; + bool disposed; + void Cleanup(); + +internal: + AmqpMessage(MessageBodyStream ^bstream); + +public: + ~AmqpMessage(); + !AmqpMessage(); + void Close(); + + property AmqpTypes::AmqpProperties^ Properties { + AmqpTypes::AmqpProperties^ get () { return amqpProperties; } + void set(AmqpTypes::AmqpProperties^ p) { amqpProperties = p; } + } + + property System::IO::Stream^ BodyStream { + System::IO::Stream^ get() { return messageBodyStream; } + } +}; + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp new file mode 100644 index 0000000000..bab73da74e --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.cpp @@ -0,0 +1,287 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/Message.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/client/Future.h" + +#include "AmqpConnection.h" +#include "AmqpSession.h" +#include "AmqpMessage.h" +#include "MessageBodyStream.h" +#include "InputLink.h" +#include "OutputLink.h" +#include "QpidMarshal.h" +#include "QpidException.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace msclr; + +using namespace qpid::client; +using namespace std; + + +AmqpSession::AmqpSession(AmqpConnection^ conn, qpid::client::Connection* qpidConnectionp) : + connection(conn), + sessionp(NULL), + sessionImplp(NULL), + subs_mgrp(NULL), + openCount(0) +{ + bool success = false; + + try { + sessionp = new qpid::client::AsyncSession; + *sessionp = qpidConnectionp->newSession(); + subs_mgrp = new SubscriptionManager (*sessionp); + success = true; + waiters = gcnew Collections::Generic::List<CompletionWaiter^>(); + } finally { + if (!success) { + Cleanup(); + throw gcnew QpidException ("session creation failure"); + } + } +} + + +void AmqpSession::Cleanup() +{ + if (subscriptionp != NULL) { + subscriptionp->cancel(); + delete subscriptionp; + subscriptionp=NULL; + } + + if (subs_mgrp != NULL) { + subs_mgrp->stop(); + delete subs_mgrp; + subs_mgrp = NULL; + } + + if (localQueuep != NULL) { + delete localQueuep; + localQueuep = NULL; + } + + if (sessionp != NULL) { + sessionp->close(); + delete sessionp; + sessionp = NULL; + sessionImplp = NULL; + } + + if (connectionp != NULL) { + connectionp->close(); + delete connectionp; + connectionp = NULL; + } +} + + +// Called by the parent AmqpConnection + +void AmqpSession::ConnectionClosed() +{ + Cleanup(); +} + +InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue) +{ + return CreateInputLink(sourceQueue, true, false, nullptr, nullptr); +} + +InputLink^ AmqpSession::CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary, + System::String^ filterKey, System::String^ exchange) +{ + InputLink^ link = gcnew InputLink (this, sourceQueue, sessionp, subs_mgrp, exclusive, temporary, filterKey, exchange); + { + lock l(waiters); + if (openCount == 0) { + connection->NotifyBusy(); + } + openCount++; + } + return link; +} + +OutputLink^ AmqpSession::CreateOutputLink(System::String^ targetQueue) +{ + OutputLink^ link = gcnew OutputLink (this, targetQueue); + + lock l(waiters); + + if (sessionImplp == NULL) { + // not needed unless sending messages + SessionBase_0_10Access sa(*sessionp); + boost::shared_ptr<SessionImpl> sip = sa.get(); + sessionImplp = sip.get(); + } + + if (openCount == 0) { + connection->NotifyBusy(); + } + openCount++; + + return link; +} + + +// called whenever a child InputLink or OutputLink is closed or finalized +void AmqpSession::NotifyClosed() +{ + lock l(waiters); + openCount--; + if (openCount == 0) { + connection->NotifyIdle(); + } +} + + +CompletionWaiter^ AmqpSession::SendMessage (System::String^ queue, MessageBodyStream ^mbody, TimeSpan timeout, bool async, AsyncCallback^ callback, Object^ state) +{ + lock l(waiters); + if (sessionp == NULL) + throw gcnew ObjectDisposedException("Send"); + + // create an AMQP message.transfer command to use with the partial frameset from the MessageBodyStream + + std::string exname = QpidMarshal::ToNative(queue); + FrameSet *framesetp = (FrameSet *) mbody->GetFrameSet().ToPointer(); + uint8_t acceptMode=1; + uint8_t acquireMode=0; + MessageTransferBody mtcmd(ProtocolVersion(0,10), exname, acceptMode, acquireMode); + // ask for a command completion + mtcmd.setSync(true); + + //send it + + Future *futurep = NULL; + try { + futurep = new Future(sessionImplp->send(mtcmd, *framesetp)); + + CompletionWaiter^ waiter = nullptr; + if (async || (timeout != TimeSpan::MaxValue)) { + waiter = gcnew CompletionWaiter(this, timeout, (IntPtr) futurep, callback, state); + // waiter is responsible for releasing the Future native resource + futurep = NULL; + addWaiter(waiter); + } + + l.release(); + + if (waiter != nullptr) + return waiter; + + // synchronous send with no timeout: no need to involve the asyncHelper thread + + internalWaitForCompletion((IntPtr) futurep); + } + finally { + if (futurep != NULL) + delete (futurep); + } + return nullptr; +} + +void AmqpSession::Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey) +{ + sessionp->exchangeBind(arg::queue=QpidMarshal::ToNative(queue), + arg::exchange=QpidMarshal::ToNative(exchange), + arg::bindingKey=QpidMarshal::ToNative(filterKey)); + +} + + +void AmqpSession::internalWaitForCompletion(IntPtr fp) +{ + lock l(waiters); + if (sessionp == NULL) + throw gcnew ObjectDisposedException("AmqpSession"); + + // increment the smart pointer count to sessionImplp to guard agains async close + Session sessionCopy(*sessionp); + + l.release(); + // Qpid native lib call to wait for the command completion + ((Future *)fp.ToPointer())->wait(*sessionImplp); +} + +// call with lock held +void AmqpSession::addWaiter(CompletionWaiter^ waiter) +{ + waiters->Add(waiter); + if (!helperRunning) { + helperRunning = true; + ThreadPool::QueueUserWorkItem(gcnew WaitCallback(this, &AmqpSession::asyncHelper)); + } +} + + +void AmqpSession::removeWaiter(CompletionWaiter^ waiter) +{ + // a waiter can be removed from anywhere in the list if timed out + + lock l(waiters); + int idx = waiters->IndexOf(waiter); + if (idx == -1) { + // TODO: assert or log + } + else { + waiters->RemoveAt(idx); + } +} + + +// process CompletionWaiter list one at a time. + +void AmqpSession::asyncHelper(Object ^unused) +{ + lock l(waiters); + + while (true) { + if (waiters->Count == 0) { + helperRunning = false; + return; + } + + CompletionWaiter^ waiter = waiters[0]; + l.release(); + // can block, but for short time + // the waiter removes itself from the list, possibly as the timer thread on timeout + waiter->Run(); + l.acquire(); + } +} + + +}}} // namespace Apache::Qpid::Cli diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h new file mode 100644 index 0000000000..b959a4123a --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/AmqpSession.h @@ -0,0 +1,80 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +#include "AmqpConnection.h" +#include "MessageBodyStream.h" +#include "CompletionWaiter.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; + +using namespace qpid::client; +using namespace std; + +ref class InputLink; +ref class OutputLink; + +public ref class AmqpSession +{ +private: + AmqpConnection^ connection; + Connection* connectionp; + AsyncSession* sessionp; + SessionImpl* sessionImplp; + SubscriptionManager* subs_mgrp; + Subscription* subscriptionp; + LocalQueue* localQueuep; + Collections::Generic::List<CompletionWaiter^>^ waiters; + bool helperRunning; + int openCount; + + void Cleanup(); + void asyncHelper(Object ^); + void addWaiter(CompletionWaiter^ waiter); + +public: + OutputLink^ CreateOutputLink(System::String^ targetQueue); + InputLink^ CreateInputLink(System::String^ sourceQueue); + + // 0-10 specific support + InputLink^ CreateInputLink(System::String^ sourceQueue, bool exclusive, bool temporary, System::String^ filterKey, System::String^ exchange); + void Bind(System::String^ queue, System::String^ exchange, System::String^ filterKey); + +internal: + AmqpSession(AmqpConnection^ connection, qpid::client::Connection* qpidConnection); + void NotifyClosed(); + CompletionWaiter^ SendMessage (System::String^ queue, MessageBodyStream ^mbody, TimeSpan timeout, bool async, AsyncCallback^ callback, Object^ state); + void ConnectionClosed(); + void internalWaitForCompletion(IntPtr Future); + void removeWaiter(CompletionWaiter^ waiter); + + property AmqpConnection^ Connection { + AmqpConnection^ get () { return connection; } + } + + +}; + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp b/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp new file mode 100644 index 0000000000..91c23ae30a --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/AssemblyInfo.cpp @@ -0,0 +1,57 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +using namespace System; +using namespace System::Reflection; +using namespace System::Runtime::CompilerServices; +using namespace System::Runtime::InteropServices; +using namespace System::Security::Permissions; + +// +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +// +[assembly:AssemblyTitleAttribute("Apache.Qpid.Interop")]; +[assembly:AssemblyDescriptionAttribute("")]; +[assembly:AssemblyConfigurationAttribute("")]; +[assembly:AssemblyCompanyAttribute("")]; +[assembly:AssemblyProductAttribute("")]; +[assembly:AssemblyCopyrightAttribute("")]; +[assembly:AssemblyTrademarkAttribute("")]; +[assembly:AssemblyCultureAttribute("")]; + +// +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the value or you can default the Revision and Build Numbers +// by using the '*' as shown below: + +[assembly:AssemblyVersionAttribute("1.0.*")]; + +[assembly:ComVisible(false)]; + +[assembly:CLSCompliantAttribute(true)]; + +[assembly:SecurityPermission(SecurityAction::RequestMinimum, UnmanagedCode = true)]; diff --git a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp new file mode 100644 index 0000000000..4f6746828d --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.cpp @@ -0,0 +1,143 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/Demux.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" + +#include "MessageBodyStream.h" +#include "AmqpMessage.h" +#include "AmqpSession.h" +#include "InputLink.h" +#include "CompletionWaiter.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; +using namespace msclr; + +// A class to provide IAsyncResult semantics for a qpid AsyncSession command (i.e. 0-10 messageTransfer) +// when the client session receives a "Completion" notification from the Broker. + + +CompletionWaiter::CompletionWaiter(AmqpSession^ parent, TimeSpan timeSpan, IntPtr future, AsyncCallback^ callback, Object^ state) +{ + this->qpidFuture = future; + this->asyncCallback = callback; + this->state = state; + if (timeSpan != TimeSpan::MaxValue) { + this->timer = gcnew Timer(timeoutCallback, this, timeSpan, TimeSpan::FromMilliseconds(-1)); + } + this->parent = parent; + this->thisLock = gcnew Object(); +} + + +void CompletionWaiter::WaitForCompletion() +{ + if (isCompleted) + return; + + lock l(thisLock); + while (!isCompleted) { + Monitor::Wait(thisLock); + } +} + +void CompletionWaiter::Run() +{ + // no locks required in this method + if (isCompleted) + return; + + try { + // Wait for the arrival of the "AMQP Completion" indication from the Broker + parent->internalWaitForCompletion(qpidFuture); + } + catch (System::Exception^ e) { + runException = e; + } + finally { + delete(qpidFuture.ToPointer()); + qpidFuture = (IntPtr) NULL; + } + + if (timer != nullptr) { + timer->~Timer(); + timer = nullptr; + } + + Complete(false); +} + + +// "Complete" here means complete the AsyncResult, which may precede broker "command completion" if timed out + +void CompletionWaiter::Complete(bool isTimerThread) +{ + lock l(thisLock); + if (isCompleted) + return; + + isCompleted = true; + if (isTimerThread) + timedOut = true; + + Monitor::PulseAll(thisLock); + + // do this check and signal while locked + if (asyncWaitHandle != nullptr) + asyncWaitHandle->Set(); + + l.release(); + + parent->removeWaiter(this); + + if (asyncCallback != nullptr) { + // guard against application callback exception + try { + asyncCallback(this); + } + catch (System::Exception^) { + // log it? + } + } +} + + +void CompletionWaiter::TimeoutCallback(Object^ state) +{ + CompletionWaiter^ waiter = (CompletionWaiter^) state; + waiter->Complete(true); +} + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h new file mode 100644 index 0000000000..197ac632b0 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/CompletionWaiter.h @@ -0,0 +1,99 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; + +public ref class CompletionWaiter : IAsyncResult +{ +private: + bool timedOut; + // has an owner thread + bool assigned; + // can Run (i.e. earlier CompletionWaiters in the queue have completed) + System::Exception^ runException; + AsyncCallback^ asyncCallback; + Threading::Timer ^timer; + bool isCompleted; + Object^ state; + Object^ thisLock; + ManualResetEvent^ asyncWaitHandle; + AmqpSession^ parent; + IntPtr qpidFuture; + void Complete(bool isTimerThread); + static void TimeoutCallback(Object^ state); + static TimerCallback^ timeoutCallback = gcnew TimerCallback(CompletionWaiter::TimeoutCallback); + + internal: + CompletionWaiter(AmqpSession^ parent, TimeSpan timeSpan, IntPtr future, AsyncCallback ^callback, Object^ state); + + void Run(); + void WaitForCompletion(); + + property bool Assigned { + bool get () { return assigned; } + } + + property bool TimedOut { + bool get () { return timedOut; } + } + + + public: + + virtual property bool IsCompleted { + bool get () { return isCompleted; } + } + + virtual property bool CompletedSynchronously { + bool get () { return false; } + } + + virtual property WaitHandle^ AsyncWaitHandle { + WaitHandle^ get () { + if (asyncWaitHandle != nullptr) { + return asyncWaitHandle; + } + + msclr::lock l(thisLock); + if (asyncWaitHandle == nullptr) { + asyncWaitHandle = gcnew ManualResetEvent(isCompleted); + } + return asyncWaitHandle; + } + } + + + virtual property Object^ AsyncState { + Object^ get () { return state; } + } + + + + +}; + +}}} // namespace Apache::Qpid::Interop + diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp new file mode 100644 index 0000000000..cee394b05d --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.cpp @@ -0,0 +1,685 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/Demux.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" + +#include "MessageBodyStream.h" +#include "AmqpMessage.h" +#include "AmqpSession.h" +#include "InputLink.h" +#include "QpidMarshal.h" +#include "QpidException.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace System::Threading; +using namespace msclr; + +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + +using namespace Apache::Qpid::AmqpTypes; + +// Scalability note: When using async methods, an async helper thread is created +// to block on the Demux BlockingQueue. This design should be revised in line +// with proposed changes to the native library to reduce the number of servicing +// threads for large numbers of subscriptions. + + +// The folowing def must match the "Frames" private typedef. +// TODO, make Qpid-cpp "Frames" definition visible. +typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames; + +InputLink::InputLink(AmqpSession^ session, System::String^ sourceQueue, + qpid::client::AsyncSession *qpidSessionp, qpid::client::SubscriptionManager *qpidSubsMgrp, + bool exclusive, + bool temporary, System::String^ filterKey, System::String^ exchange) : + amqpSession(session), + subscriptionp(NULL), + localQueuep(NULL), + queuePtrp(NULL), + dequeuedFrameSetpp(NULL), + disposed(false), + finalizing(false) +{ + bool success = false; + System::Exception^ linkException = nullptr; + + waiters = gcnew Collections::Generic::List<MessageWaiter^>(); + + try { + std::string qname = QpidMarshal::ToNative(sourceQueue); + + if (temporary) { + qpidSessionp->queueDeclare(arg::queue=qname, arg::durable=false, arg::autoDelete=true, arg::exclusive=true); + qpidSessionp->exchangeBind(arg::exchange=QpidMarshal::ToNative(exchange), + arg::queue=qname, arg::bindingKey=QpidMarshal::ToNative(filterKey)); + qpidSessionp->sync(); + } + + localQueuep = new LocalQueue; + SubscriptionSettings settings; + settings.flowControl = FlowControl::messageCredit(0); + Subscription sub = qpidSubsMgrp->subscribe(*localQueuep, qname, settings); + subscriptionp = new Subscription (sub); // copy smart pointer for later IDisposable cleanup + + // the roundabout way to obtain localQueuep->queue + SessionBase_0_10Access sa(*qpidSessionp); + boost::shared_ptr<SessionImpl> simpl = sa.get(); + queuePtrp = new Demux::QueuePtr(simpl->getDemux().get(sub.getName())); + + success = true; + } finally { + if (!success) { + Cleanup(); + linkException = gcnew QpidException ("InputLink creation failure"); + throw linkException; + } + } +} + +void InputLink::ReleaseNative() +{ + // involves talking to the Broker unless the connection is broken + if (subscriptionp != NULL) { + try { + subscriptionp->cancel(); + } + catch (const std::exception& error) { + // TODO: log this properly + std::cout << "shutdown error " << error.what() << std::endl; + } + } + + // free native mem (or smart pointers) that we own + if (subscriptionp != NULL) + delete subscriptionp; + if (queuePtrp != NULL) + delete queuePtrp; + if (localQueuep != NULL) + delete localQueuep; + if (dequeuedFrameSetpp != NULL) + delete dequeuedFrameSetpp; +} + +void InputLink::Cleanup() +{ + { + lock l(waiters); + if (disposed) + return; + + disposed = true; + + // if the asyncHelper exists and is idle, unblock it + if (asyncHelperWaitHandle != nullptr) { + asyncHelperWaitHandle->Set(); + } + + // wakeup anyone waiting for messages + if (queuePtrp != NULL) + (*queuePtrp)->close(); + + try {} + finally + { + ReleaseNative(); + } + + } + amqpSession->NotifyClosed(); +} + +InputLink::~InputLink() +{ + Cleanup(); +} + +InputLink::!InputLink() +{ + Cleanup(); +} + +void InputLink::Close() +{ + // Simulate Dispose()... + Cleanup(); + GC::SuppressFinalize(this); +} + +// call with lock held +bool InputLink::haveMessage() +{ + if (dequeuedFrameSetpp != NULL) + return true; + + if (queuePtrp != NULL) { + if ((*queuePtrp)->size() > 0) + return true; + } + return false; +} + +IntPtr InputLink::nextLocalMessage() +{ + lock l(waiters); + if (disposed) + return (IntPtr) NULL; + + // A message already pulled off BlockingQueue? + if (dequeuedFrameSetpp != NULL) { + QpidFrameSetPtr* rv = dequeuedFrameSetpp; + dequeuedFrameSetpp = NULL; + return (IntPtr) rv; + } + + if ((*queuePtrp)->empty()) + return (IntPtr) NULL; + + bool received = false; + QpidFrameSetPtr* frameSetpp = new QpidFrameSetPtr; + + try { + received = (*queuePtrp)->pop(*frameSetpp, qpid::sys::TIME_INFINITE); + if (received) { + QpidFrameSetPtr* rv = frameSetpp; + // no need to free native in finally block + frameSetpp = NULL; + return (IntPtr) rv; + } + } catch(const std::exception& error) { + // should be no async tampering with queue since we hold the lock and have a + // smart pointer ref to the native LocalQueue, even if the network connection fails... + cout << "unknown exception in InputLink.nextLocalMessage() " << error.what() <<endl; + // TODO: log this + } + finally { + if (frameSetpp != NULL) { + delete frameSetpp; + } + } + + return (IntPtr) NULL; +} + + + +void InputLink::unblockWaiter() +{ + // to be followed by resetQueue() below + lock l(waiters); + if (disposed) + return; + (*queuePtrp)->close(); +} + + + +// Set things right after unblockWaiter(). Closing and opening a Qpid BlockingQueue unsticks +// a blocking thread without interefering with queue contents or the ability to push +// new incoming messages. + +void InputLink::resetQueue() +{ + lock l(waiters); + if (disposed) + return; + if ((*queuePtrp)->isClosed()) { + (*queuePtrp)->open(); + } +} + + +// returns true if there is a message to consume, i.e. nextLocalMessage() won't block + +bool InputLink::internalWaitForMessage() +{ + Demux::QueuePtr demuxQueuePtr; + + bool received = false; + QpidFrameSetPtr* frameSetpp = NULL; + try { + lock l(waiters); + if (disposed) + return false; + if (haveMessage()) + return true; + + // TODO: prefetch window of messages, compatible with both 0-10 and 1.0. + subscriptionp->grantMessageCredit(1); + + // get a scoped smart ptr ref to guard against async close or hangup + demuxQueuePtr = *queuePtrp; + frameSetpp = new QpidFrameSetPtr; + + l.release(); + // Async cleanup is now possible. Only use demuxQueuePtr until lock reacquired. + received = demuxQueuePtr->pop(*frameSetpp, qpid::sys::TIME_INFINITE); + l.acquire(); + + if (received) { + dequeuedFrameSetpp = frameSetpp; + frameSetpp = NULL; // native will eventually be freed in Cleanup or MessageBodyStream + } + + return true; + } catch(const std::exception& ) { + // timeout or connection closed + return false; + } + finally { + if (frameSetpp != NULL) { + delete frameSetpp; + } + } + + return false; +} + + +// call with lock held +void InputLink::addWaiter(MessageWaiter^ waiter) +{ + waiters->Add(waiter); + if (waiters->Count == 1) { + // mark this waiter as ready to run + // Only the waiter at the head of the queue is active. + waiter->Activate(); + } + + if (waiter->Assigned) + return; + + if (asyncHelperWaitHandle == nullptr) { + asyncHelperWaitHandle = gcnew ManualResetEvent(false); + ThreadStart^ threadDelegate = gcnew ThreadStart(this, &InputLink::asyncHelper); + (gcnew Thread(threadDelegate))->Start(); + } + + if (waiters->Count == 1) { + // wake up the asyncHelper + asyncHelperWaitHandle->Set(); + } +} + + +void InputLink::removeWaiter(MessageWaiter^ waiter) { + // a waiter can be removed from anywhere in the list if timed out + + lock l(waiters); + int idx = waiters->IndexOf(waiter); + if (idx == -1) { + // TODO: assert or log + if (asyncHelperWaitHandle != nullptr) { + // just in case. + asyncHelperWaitHandle->Set(); + } + return; + } + waiters->RemoveAt(idx); + + // let the next waiter know it's his turn. + if (waiters->Count > 0) { + MessageWaiter^ nextWaiter = waiters[0]; + + // wakeup the asyncHelper thread to help out if necessary. + if (!nextWaiter->Assigned) { + asyncHelperWaitHandle->Set(); + } + + l.release(); + nextWaiter->Activate(); + return; + } + else { + if (disposed && (asyncHelperWaitHandle != nullptr)) { + asyncHelperWaitHandle->Set(); + } + } +} + + +void InputLink::asyncHelper() +{ + lock l(waiters); + + while (true) { + if (disposed && (waiters->Count == 0)) { + asyncHelperWaitHandle = nullptr; + return; + } + + if (waiters->Count > 0) { + MessageWaiter^ waiter = waiters[0]; + + l.release(); + if (waiter->AcceptForWork()) { + waiter->Run(); + } + l.acquire(); + } + + // sleep if more work may be coming or it is currently someone else's turn + if (((waiters->Count == 0) && !disposed) || ((waiters->Count != 0) && waiters[0]->Assigned)) { + // wait for something to do + asyncHelperWaitHandle->Reset(); + l.release(); + asyncHelperWaitHandle->WaitOne(); + l.acquire(); + } + } +} + +void InputLink::sync() +{ + // for the timeout thread + lock l(waiters); +} + + +AmqpMessage^ InputLink::createAmqpMessage(IntPtr msgp) +{ + QpidFrameSetPtr* fspp = (QpidFrameSetPtr*) msgp.ToPointer(); + bool ownFrameSet = true; + bool haveProperties = false; + + try { + MessageBodyStream^ mstream = gcnew MessageBodyStream(fspp); + ownFrameSet = false; // stream releases on close/dispose + + AmqpMessage^ amqpMessage = gcnew AmqpMessage(mstream); + + AMQHeaderBody* headerBodyp = (*fspp)->getHeaders(); + uint64_t contentSize = (*fspp)->getContentSize(); + SequenceSet frameSetID((*fspp)->getId()); + + // target managed representation + AmqpProperties^ amqpProperties = gcnew AmqpProperties(); + + // source native representation + const DeliveryProperties* deliveryProperties = headerBodyp->get<DeliveryProperties>(); + const qpid::framing::MessageProperties* messageProperties = headerBodyp->get<qpid::framing::MessageProperties>(); + + if (deliveryProperties) { + if (deliveryProperties->hasRoutingKey()) { + haveProperties = true; + + amqpProperties->RoutingKey = gcnew String(deliveryProperties->getRoutingKey().c_str()); + } + + if (deliveryProperties->hasDeliveryMode()) { + if (deliveryProperties->getDeliveryMode() == qpid::framing::PERSISTENT) + amqpProperties->Durable = true; + } + + if (deliveryProperties->hasTtl()) { + long long ticks = deliveryProperties->getTtl() * TimeSpan::TicksPerMillisecond; + amqpProperties->TimeToLive = Nullable<TimeSpan>(TimeSpan::FromTicks(ticks)); + } + } + + if (messageProperties) { + + if (messageProperties->hasReplyTo()) { + haveProperties = true; + const ReplyTo& rpto = messageProperties->getReplyTo(); + String^ rk = nullptr; + String^ ex = nullptr; + if (rpto.hasRoutingKey()) { + rk = gcnew String(rpto.getRoutingKey().c_str()); + } + if (rpto.hasExchange()) { + ex = gcnew String(rpto.getExchange().c_str()); + } + amqpProperties->SetReplyTo(ex,rk); + } + + if (messageProperties->hasContentType()) { + haveProperties = true; + amqpProperties->ContentType = gcnew String(messageProperties->getContentType().c_str()); + + if (messageProperties->hasContentEncoding()) { + String^ enc = gcnew String(messageProperties->getContentEncoding().c_str()); + if (!String::IsNullOrEmpty(enc)) { + // TODO: properly assemble 1.0 style to 0-10 for all cases + amqpProperties->ContentType += "; charset=" + enc; + } + } + } + + if (messageProperties->hasCorrelationId()) { + haveProperties = true; + const std::string& ncid = messageProperties->getCorrelationId(); + int len = ncid.size(); + array<unsigned char>^ mcid = gcnew array<unsigned char>(len); + Marshal::Copy ((IntPtr) (void *) ncid.data(), mcid, 0, len); + amqpProperties->CorrelationId = mcid; + } + + if (messageProperties->hasUserId()) { + haveProperties = true; + const std::string& nuid = messageProperties->getUserId(); + int len = nuid.size(); + array<unsigned char>^ muid = gcnew array<unsigned char>(len); + Marshal::Copy ((IntPtr) (void *) nuid.data(), muid, 0, len); + amqpProperties->UserId = muid; + } + + if (messageProperties->hasApplicationHeaders()) { + haveProperties = true; + const qpid::framing::FieldTable& fieldTable = messageProperties->getApplicationHeaders(); + int count = fieldTable.count(); + + if (count > 0) { + haveProperties = true; + Collections::Generic::Dictionary<System::String^, AmqpType^>^ mmap = + gcnew Collections::Generic::Dictionary<System::String^, AmqpType^>(count); + + for(qpid::framing::FieldTable::ValueMap::const_iterator i = fieldTable.begin(); i != fieldTable.end(); i++) { + + qpid::framing::FieldValue::Data &data = i->second->getData(); + + // TODO: replace these generic int/string conversions with handler for each AMQP specific type: + // uint8_t dataType = i->second->getType(); + // switch (dataType) { case TYPE_CODE_STR8: ... } + + if (data.convertsToInt()) { + mmap->Add (gcnew String(i->first.data()), gcnew AmqpInt((int) i->second->getData().getInt())); + } + if (data.convertsToString()) { + std::string ns = data.getString(); + String^ ms = gcnew String(ns.data(), 0, ns.size()); + mmap->Add (gcnew String(i->first.data()), gcnew AmqpString(ms)); + } + } + + amqpProperties->PropertyMap = mmap; + } + + } + } + + if (haveProperties) { + amqpMessage->Properties = amqpProperties; + } + + // We have a message we can return to the caller. + // Tell the broker we got it. + subscriptionp->accept(frameSetID); + return amqpMessage; + } + finally { + if (ownFrameSet) + delete (fspp); + } +} + + // As for IInputChannel: + // if success, return true + amqpMessage + // elseif timeout, return false + // elseif closed/EOF, return true and amqpMessage = null + // else throw an Exception + +bool InputLink::TryReceive(TimeSpan timeout, [Out] AmqpMessage^% amqpMessage) +{ + lock l(waiters); + + if (waiters->Count == 0) { + // see if there is a message already available without blocking + IntPtr fspp = nextLocalMessage(); + if (fspp.ToPointer() != NULL) { + amqpMessage = createAmqpMessage(fspp); + return true; + } + } + + MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, false, nullptr, nullptr); + addWaiter(waiter); + + l.release(); + waiter->Run(); + l.acquire(); + + if (waiter->TimedOut) { + return false; + } + + IntPtr waiterMsg = waiter->Message; + if (waiterMsg.ToPointer() == NULL) { + if (disposed) { + // indicate normal EOF on channel + amqpMessage = nullptr; + return true; + } + } + + amqpMessage = createAmqpMessage(waiterMsg); + return true; +} + +IAsyncResult^ InputLink::BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state) +{ + + //TODO: if haveMessage() complete synchronously + + lock l(waiters); + MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, true, true, callback, state); + addWaiter(waiter); + return waiter; +} + +bool InputLink::EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage) +{ + + // TODO: validate result + + MessageWaiter^ waiter = (MessageWaiter ^) result; + + waiter->WaitForCompletion(); + + if (waiter->RunException != nullptr) + throw waiter->RunException; + + if (waiter->TimedOut) { + amqpMessage = nullptr; + return false; + } + + IntPtr waiterMsg = waiter->Message; + if (waiterMsg.ToPointer() == NULL) { + if (disposed) { + // indicate normal EOF on channel + amqpMessage = nullptr; + return true; + } + } + + amqpMessage = createAmqpMessage(waiterMsg); + return true; +} + + +bool InputLink::WaitForMessage(TimeSpan timeout) +{ + lock l(waiters); + + if (waiters->Count == 0) { + // see if there is a message already available without blocking + if (haveMessage()) + return true; + } + + // Same as for TryReceive, except consuming = false + MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, false, nullptr, nullptr); + addWaiter(waiter); + + l.release(); + waiter->Run(); + l.acquire(); + + if (waiter->TimedOut) { + return false; + } + + return true; +} + +IAsyncResult^ InputLink::BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state) +{ + lock l(waiters); + + // Same as for BeginTryReceive, except consuming = false + MessageWaiter^ waiter = gcnew MessageWaiter(this, timeout, false, true, callback, state); + addWaiter(waiter); + return waiter; +} + +bool InputLink::EndWaitForMessage(IAsyncResult^ result) +{ + MessageWaiter^ waiter = (MessageWaiter ^) result; + + waiter->WaitForCompletion(); + + if (waiter->TimedOut) { + return false; + } + + return true; +} + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h new file mode 100644 index 0000000000..366780c137 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/InputLink.h @@ -0,0 +1,85 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +#include "MessageWaiter.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; +using namespace System::Runtime::InteropServices; + +using namespace qpid::client; +using namespace std; + +// smart pointer to the low level AMQP 0-10 frames of the message +typedef qpid::framing::FrameSet::shared_ptr QpidFrameSetPtr; + +public ref class InputLink +{ +private: + AmqpSession^ amqpSession; + Subscription* subscriptionp; + LocalQueue* localQueuep; + Demux::QueuePtr* queuePtrp; + Collections::Generic::List<MessageWaiter^>^ waiters; + bool disposed; + bool finalizing; + QpidFrameSetPtr* dequeuedFrameSetpp; + ManualResetEvent^ asyncHelperWaitHandle; + + void Cleanup(); + void ReleaseNative(); + bool haveMessage(); + void addWaiter(MessageWaiter^ waiter); + void asyncHelper(); + AmqpMessage^ createAmqpMessage(IntPtr msgp); + +internal: + InputLink(AmqpSession^ session, System::String^ sourceQueue, qpid::client::AsyncSession *qpidSessionp, + qpid::client::SubscriptionManager *qpidSubsMgrp, bool exclusive, bool temporary, System::String^ filterKey, + System::String^ exchange); + + bool internalWaitForMessage(); + void unblockWaiter(); + void resetQueue(); + IntPtr nextLocalMessage(); + void removeWaiter(MessageWaiter^ waiter); + void sync(); + +public: + ~InputLink(); + !InputLink(); + void Close(); + + bool TryReceive(TimeSpan timeout, [Out] AmqpMessage ^% amqpMessage); + IAsyncResult^ BeginTryReceive(TimeSpan timeout, AsyncCallback^ callback, Object^ state); + bool EndTryReceive(IAsyncResult^ result, [Out] AmqpMessage^% amqpMessage); + + bool WaitForMessage(TimeSpan timeout); + IAsyncResult^ BeginWaitForMessage(TimeSpan timeout, AsyncCallback^ callback, Object^ state); + bool EndWaitForMessage(IAsyncResult^ result); + +}; + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp new file mode 100644 index 0000000000..f2cb5740d3 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.cpp @@ -0,0 +1,337 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/framing/AMQFrame.h" + +#include "MessageBodyStream.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace System::Threading; +using namespace msclr; + +using namespace qpid::client; +using namespace qpid::framing; + +// Thefolowing def must match "Frames" private typedef. +// TODO: make "Frames" publicly visible. +typedef qpid::InlineVector<AMQFrame, 4> FrameSetFrames; + +using namespace std; + +static void ThrowIfBadArgs (array<unsigned char>^ buffer, int offset, int count) +{ + if (buffer == nullptr) + throw gcnew ArgumentNullException("buffer"); + + if (offset < 0) + throw gcnew ArgumentOutOfRangeException("offset"); + + if (count < 0) + throw gcnew ArgumentOutOfRangeException("count"); + + if ((offset + count) > buffer->Length) + throw gcnew ArgumentException("offset + count"); +} + + +// Input stream constructor + +MessageBodyStream::MessageBodyStream(FrameSet::shared_ptr *fspp) +{ + isInputStream = true; + frameSetpp = fspp; + fragmentCount = 0; + length = 0; + position = 0; + currentFramep = NULL; + + const std::string *datap; // pointer to the fragment's string variable that holds the content + + for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) { + if (i->getBody()->type() == CONTENT_BODY) { + fragmentCount++; + datap = &(i->castBody<AMQContentBody>()->getData()); + length += datap->size(); + } + } + + // fragmentCount can be zero for an empty message + + fragmentIndex = 0; + fragmentPosition = 0; + + if (fragmentCount == 0) { + currentFragment = NULL; + fragmentLength = 0; + } + else if (fragmentCount == 1) { + currentFragment = datap->data(); + fragmentLength = (int) length; + } + else { + fragments = gcnew array<IntPtr>(fragmentCount); + fragmentIndex = 0; + for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) { + if (i->getBody()->type() == CONTENT_BODY) { + datap = &(i->castBody<AMQContentBody>()->getData()); + fragments[fragmentIndex++] = (IntPtr) (void *) datap; + } + } + fragmentIndex = 0; + datap = (const std::string *) fragments[0].ToPointer(); + currentFragment = datap->data(); + fragmentLength = datap->size(); + } +} + + +int MessageBodyStream::Read(array<unsigned char>^ buffer, int offset, int count) +{ + if (!isInputStream) + throw gcnew NotSupportedException(); + if (disposed) + throw gcnew ObjectDisposedException("Stream"); + if (count == 0) + return 0; + ThrowIfBadArgs(buffer, offset, count); + + int nRead = 0; + int remaining = count; + + while (nRead < count) { + int fragAvail = fragmentLength - fragmentPosition; + int copyCount = min (fragAvail, remaining); + if (copyCount == 0) { + // no more to read + return nRead; + } + + // copy from native space + IntPtr nativep = (IntPtr) (void *) (currentFragment + fragmentPosition); + Marshal::Copy (nativep, buffer, offset, copyCount); + nRead += copyCount; + remaining -= copyCount; + fragmentPosition += copyCount; + offset += copyCount; + + // advance to next fragment? + if (fragmentPosition == fragmentLength) { + if (++fragmentIndex < fragmentCount) { + const std::string *datap = (const std::string *) fragments[fragmentIndex].ToPointer(); + currentFragment = datap->data(); + fragmentLength = datap->size(); + fragmentPosition = 0; + } + } + } + + return nRead; +} + + +void MessageBodyStream::pushCurrentFrame(bool lastFrame) +{ + // set flags as in SessionImpl::sendContent. + if (currentFramep->getBody()->type() == CONTENT_BODY) { + + if ((fragmentCount == 1) && lastFrame) { + // only one content frame + currentFramep->setFirstSegment(false); + } + else { + currentFramep->setFirstSegment(false); + currentFramep->setLastSegment(true); + if (fragmentCount != 1) { + currentFramep->setFirstFrame(false); + } + if (!lastFrame) { + currentFramep->setLastFrame(false); + } + } + } + else { + // the header frame + currentFramep->setFirstSegment(false); + if (!lastFrame) { + // there will be at least one content frame + currentFramep->setLastSegment(false); + } + } + + // add to frame set. This makes a copy and ref counts the body + (*frameSetpp)->append(*currentFramep); + + delete currentFramep; + + currentFramep = NULL; +} + + +IntPtr MessageBodyStream::GetFrameSet() +{ + if (currentFramep != NULL) { + // No more content. Tidy up the pending (possibly single header) frame. + pushCurrentFrame(true); + } + + if (frameSetpp == NULL) { + return (IntPtr) NULL; + } + + // shared_ptr.get() + return (IntPtr) (void *) (*frameSetpp).get(); +} + +IntPtr MessageBodyStream::GetHeader() +{ + return (IntPtr) headerBodyp; +} + + +// Ouput stream constructor + +MessageBodyStream::MessageBodyStream(int maxFrameSize) +{ + isInputStream = false; + + maxFrameContentSize = maxFrameSize - AMQFrame::frameOverhead(); + SequenceNumber unused; // only meaningful on incoming frames + frameSetpp = new FrameSet::shared_ptr(new FrameSet(unused)); + fragmentCount = 0; + length = 0; + position = 0; + + // header goes first in the outgoing frameset + + boost::intrusive_ptr<AMQBody> headerBody(new AMQHeaderBody); + currentFramep = new AMQFrame(headerBody); + headerBodyp = static_cast<AMQHeaderBody*>(headerBody.get()); + + // mark this header frame as "full" to force the first write to create a new content frame + fragmentPosition = maxFrameContentSize; +} + +void MessageBodyStream::Write(array<unsigned char>^ buffer, int offset, int count) +{ + if (isInputStream) + throw gcnew NotSupportedException(); + if (disposed) + throw gcnew ObjectDisposedException("Stream"); + if (count == 0) + return; + ThrowIfBadArgs(buffer, offset, count); + + if (currentFramep == NULL) { + // GetFrameSet() has been called and we no longer exclusively own the underlying frames. + throw gcnew InvalidOperationException ("Mesage Body output already completed"); + } + + if (count <= 0) + return; + + // keep GC memory movement at bay while copying to native space + pin_ptr<unsigned char> pinnedBuf = &buffer[0]; + + string *datap; + + int remaining = count; + while (remaining > 0) { + if (fragmentPosition == maxFrameContentSize) { + // move to a new frame, but not until ready to add new content. + // zero content is valid, or the final write may exactly fill to maxFrameContentSize + + pushCurrentFrame(false); + + currentFramep = new AMQFrame(AMQContentBody()); + fragmentPosition = 0; + fragmentCount++; + } + + int copyCount = min (remaining, (maxFrameContentSize - fragmentPosition)); + datap = &(currentFramep->castBody<AMQContentBody>()->getData()); + + char *outp = (char *) pinnedBuf + offset; + if (fragmentPosition == 0) { + datap->assign(outp, copyCount); + } + else { + datap->append(outp, copyCount); + } + + position += copyCount; + fragmentPosition += copyCount; + remaining -= copyCount; + offset += copyCount; + } +} + + +void MessageBodyStream::Cleanup() +{ + { + lock l(this); + if (disposed) + return; + + disposed = true; + } + + try {} + finally + { + if (frameSetpp != NULL) { + delete frameSetpp; + frameSetpp = NULL; + } + if (currentFramep != NULL) { + delete currentFramep; + currentFramep = NULL; + } + } +} + +MessageBodyStream::~MessageBodyStream() +{ + Cleanup(); +} + +MessageBodyStream::!MessageBodyStream() +{ + Cleanup(); +} + +void MessageBodyStream::Close() +{ + // Simulate Dispose()... + Cleanup(); + GC::SuppressFinalize(this); +} + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h new file mode 100644 index 0000000000..fa8e3f6bde --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageBodyStream.h @@ -0,0 +1,131 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; + +using namespace qpid::client; +using namespace qpid::framing; +using namespace std; + + +// This class provides memory streaming of the message body contents +// between native and managed space. To avoid additional memory copies +// in native space, it reads and writes directly to the low level Qpid +// frames. + +public ref class MessageBodyStream : System::IO::Stream +{ +private: + bool isInputStream; + long long length; + long long position; + + // the boost smart pointer that keeps the message body frames in memory + FrameSet::shared_ptr *frameSetpp; + + int fragmentCount; + int fragmentIndex; + const char* currentFragment; + int fragmentPosition; + int fragmentLength; + array<IntPtr>^ fragments; + + int maxFrameContentSize; + AMQFrame* currentFramep; + void* headerBodyp; + bool disposed; + bool finalizing; + void Cleanup(); + +internal: + // incoming message + MessageBodyStream(FrameSet::shared_ptr *fspp); + // outgoing message + MessageBodyStream(int maxFrameSize); + void pushCurrentFrame(bool last); +public: + ~MessageBodyStream(); + !MessageBodyStream(); + virtual void Close() override; + virtual int Read( + [InAttribute] [OutAttribute] array<unsigned char>^ buffer, + int offset, + int count) override; + + virtual void Write( + array<unsigned char>^ buffer, + int offset, + int count) override; + + + IntPtr GetFrameSet(); + IntPtr GetHeader(); + + virtual void Flush() override {} // noop + + + // TODO: see CanSeek below. + virtual long long Seek( + long long offset, + System::IO::SeekOrigin origin) override {throw gcnew System::NotSupportedException(); } + + // TODO: see CanSeek below. + virtual void SetLength( + long long value) override {throw gcnew System::NotSupportedException(); } + + virtual property long long Length { + long long get() override { return length; } + }; + + virtual property long long Position { + long long get() override { return position; } + void set(long long p) override { throw gcnew System::NotSupportedException(); } + }; + + + virtual property bool CanRead { + bool get () override { return isInputStream; } + } + + virtual property bool CanWrite { + bool get () override { return !isInputStream; } + } + + // Note: this class must return true to signal that the Length property works. + // Required by the raw message encoder. + // "If a class derived from Stream does not support seeking, calls to Length, + // SetLength, Position, and Seek throw a NotSupportedException". + + virtual property bool CanSeek { + bool get () override { return true; } + } + + virtual property bool CanTimeout { + bool get () override { return isInputStream; } + } +}; + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp new file mode 100644 index 0000000000..4868b9efce --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.cpp @@ -0,0 +1,248 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/Demux.h" +#include "qpid/client/SessionImpl.h" +#include "qpid/client/SessionBase_0_10Access.h" + +#include "MessageBodyStream.h" +#include "AmqpMessage.h" +#include "AmqpSession.h" +#include "InputLink.h" +#include "MessageWaiter.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; +using namespace msclr; + + +MessageWaiter::MessageWaiter(InputLink^ parent, TimeSpan timeSpan, bool consuming, bool async, AsyncCallback ^callback, Object^ state) +{ + this->consuming = consuming; + if (!consuming) { + GC::SuppressFinalize(this); + } + + if (async) { + this->async = true; + this->asyncCallback = callback; + this->state = state; + } + else { + this->assigned = true; + } + if (timeSpan != TimeSpan::MaxValue) { + this->timer = gcnew Timer(timeoutCallback, this, timeSpan, TimeSpan::FromMilliseconds(-1)); + } + this->parent = parent; + this->thisLock = gcnew Object(); +} + +MessageWaiter::~MessageWaiter() +{ + if (message != IntPtr::Zero) { + try{} + finally { + delete message.ToPointer(); + message = IntPtr::Zero; + } + } +} + +MessageWaiter::!MessageWaiter() +{ + this->~MessageWaiter(); +} + + +void MessageWaiter::WaitForCompletion() +{ + if (isCompleted) + return; + + lock l(thisLock); + while (!isCompleted) { + Monitor::Wait(thisLock); + } +} + +void MessageWaiter::Activate() +{ + if (activated) + return; + + lock l(thisLock); + if (!activated) { + activated = true; + Monitor::PulseAll(thisLock); + } +} + + +void MessageWaiter::Run() +{ + lock l(thisLock); + + // wait until Activate(), i.e. our turn in the waiter list or a timeout + while (!activated) { + Monitor::Wait(thisLock); + } + bool haveMessage = false; + bool mustReset = false; + + if (!timedOut) + blocking = true; + + if (blocking) { + l.release(); + + try { + haveMessage = parent->internalWaitForMessage(); + } + catch (System::Exception^ e) { + runException = e; + } + + l.acquire(); + blocking = false; + if (timedOut) { + // TimeoutCallback() called parent->unblockWaiter() + mustReset = true; + // let the timer thread move past critical region + while (processingTimeout) { + Monitor::Wait(thisLock); + } + } + } + + if (timer != nullptr) { + timer->~Timer(); + timer = nullptr; + } + + if (haveMessage) { + timedOut = false; // for the case timeout and message arrival are essentially tied + if (!consuming) { + // just waiting + haveMessage = false; + } + } + + if (haveMessage || mustReset) { + l.release(); + if (haveMessage) { + // hang on to it for when the async caller gets around to retrieving + message = parent->nextLocalMessage(); + } + if (mustReset) { + parent->resetQueue(); + } + l.acquire(); + } + + isCompleted = true; + Monitor::PulseAll(thisLock); + + // do this check and signal while locked + if (asyncWaitHandle != nullptr) + asyncWaitHandle->Set(); + + l.release(); + parent->removeWaiter(this); + + + if (asyncCallback != nullptr) { + // guard against application callback exception + try { + asyncCallback(this); + } + catch (System::Exception^) { + // log it? + } + } + +} + +bool MessageWaiter::AcceptForWork() +{ + lock l(thisLock); + if (!assigned) { + assigned = true; + return true; + } + return false; +} + +void MessageWaiter::TimeoutCallback(Object^ state) +{ + MessageWaiter^ waiter = (MessageWaiter^) state; + if (waiter->isCompleted) + return; + + // make sure parent has finished initializing us before we get going + waiter->parent->sync(); + + lock l(waiter->thisLock); + if (waiter->timer == nullptr) { + // the waiter is in the clean up phase and doesn't need a wakeup + return; + } + + // timedOut, blocking and processingTimeout work as a unit + waiter->timedOut = true; + if (waiter->blocking) { + // let the waiter know that we are busy with an upcoming unblock operation + waiter->processingTimeout = true; + } + + waiter->Activate(); + + if (waiter->processingTimeout) { + // call with lock off + l.release(); + waiter->parent->unblockWaiter(); + + // synchronize with blocked thread + l.acquire(); + waiter->processingTimeout = false; + Monitor::PulseAll(waiter->thisLock); + } + + l.release(); + + // If waiter has no associated thread, we must move it to completion + if (waiter->AcceptForWork()) { + waiter->Run(); // does not block since timedOut == true + } +} + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h new file mode 100644 index 0000000000..3eb4919646 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/MessageWaiter.h @@ -0,0 +1,127 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Threading; + +public ref class MessageWaiter : IAsyncResult +{ +private: + // Receive() or WaitForMessage() + bool consuming; + bool consumed; + bool timedOut; + bool async; + // has an owner thread + bool assigned; + // can Run (i.e. earlier MessageWaiters in the queue have completed) + bool activated; + // is making a call to internalWaitForMessage() which (usually) blocks + bool blocking; + // the timeout timer thread is lurking + bool processingTimeout; + // the saved exception from within Run() for async delivery + System::Exception^ runException; + AsyncCallback^ asyncCallback; + Threading::Timer ^timer; + bool isCompleted; + bool completedSynchronously; + Object^ state; + Object^ thisLock; + ManualResetEvent^ asyncWaitHandle; + InputLink^ parent; + static void TimeoutCallback(Object^ state); + static TimerCallback^ timeoutCallback = gcnew TimerCallback(MessageWaiter::TimeoutCallback); + IntPtr message; + !MessageWaiter(); + ~MessageWaiter(); + + internal: + MessageWaiter(InputLink^ parent, TimeSpan timeSpan, bool consuming, bool async, AsyncCallback ^callback, Object^ state); + + void Run(); + bool AcceptForWork(); + void Activate(); + void WaitForCompletion(); + +// inline void SetCompletedSynchronously (bool v) { completedSynchronously = v; } + + property IntPtr Message { + IntPtr get () { + if (!consuming || consumed) + throw gcnew InvalidOperationException("Message property"); + consumed = true; + IntPtr v = message; + message = IntPtr::Zero; + GC::SuppressFinalize(this); + return v; + } + // void set (IntPtr v) { message = v; } + } + + property bool Assigned { + bool get () { return assigned; } + } + + property bool TimedOut { + bool get () { return timedOut; } + } + + + property System::Exception^ RunException { + System::Exception^ get() { return runException; } + } + + public: + + virtual property bool IsCompleted { + bool get () { return isCompleted; } + } + + virtual property bool CompletedSynchronously { + bool get () { return completedSynchronously; } + } + + virtual property WaitHandle^ AsyncWaitHandle { + WaitHandle^ get () { + if (asyncWaitHandle != nullptr) { + return asyncWaitHandle; + } + + msclr::lock l(thisLock); + if (asyncWaitHandle == nullptr) { + asyncWaitHandle = gcnew ManualResetEvent(isCompleted); + } + return asyncWaitHandle; + } + } + + virtual property Object^ AsyncState { + Object^ get () { return state; } + } +}; + +}}} // namespace Apache::Qpid::Interop + diff --git a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp new file mode 100644 index 0000000000..27725b8207 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.cpp @@ -0,0 +1,251 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include <windows.h> +#include <msclr\lock.h> + +#include "qpid/client/AsyncSession.h" +#include "qpid/framing/FrameSet.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Connection.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" + + +#include "AmqpSession.h" +#include "AmqpMessage.h" +#include "OutputLink.h" +#include "QpidMarshal.h" + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; +using namespace System::Threading; +using namespace msclr; + +using namespace qpid::client; +using namespace std; + +using namespace Apache::Qpid::AmqpTypes; + + +OutputLink::OutputLink(AmqpSession^ session, String^ defaultQueue) : + amqpSession(session), + queue(defaultQueue), + disposed(false), + maxFrameSize(session->Connection->MaxFrameSize), + finalizing(false) +{ +} + +void OutputLink::Cleanup() +{ + { + lock l(this); + if (disposed) + return; + + disposed = true; + } + + amqpSession->NotifyClosed(); +} + +OutputLink::~OutputLink() +{ + Cleanup(); +} + +OutputLink::!OutputLink() +{ + Cleanup(); +} + +void OutputLink::Close() +{ + // Simulate Dispose()... + Cleanup(); + GC::SuppressFinalize(this); +} + + +AmqpMessage^ OutputLink::CreateMessage() +{ + MessageBodyStream ^mbody = gcnew MessageBodyStream(maxFrameSize); + AmqpMessage ^amqpm = gcnew AmqpMessage(mbody); + return amqpm; +} + + +void OutputLink::ManagedToNative(AmqpMessage^ m) +{ + MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) m->BodyStream; + + AmqpProperties^ mprops = m->Properties; + + if (mprops != nullptr) { + AMQHeaderBody* bodyp = (AMQHeaderBody*) messageBodyStream->GetHeader().ToPointer(); + + if (mprops->HasDeliveryProperties) { + DeliveryProperties* deliveryPropertiesp = bodyp->get<DeliveryProperties>(true); + + if (mprops->RoutingKey != nullptr) { + deliveryPropertiesp->setRoutingKey(QpidMarshal::ToNative(mprops->RoutingKey)); + } + + if (mprops->Durable) { + deliveryPropertiesp->setDeliveryMode(qpid::framing::PERSISTENT); + } + + if (mprops->TimeToLive.HasValue) { + long long ttl = mprops->TimeToLive.Value.Ticks; + bool was_positive = (ttl > 0); + if (ttl < 0) + ttl = 0; + ttl = ttl / TimeSpan::TicksPerMillisecond; + if ((ttl == 0) && was_positive) + ttl = 1; + deliveryPropertiesp->setTtl(ttl); + } + } + + if (mprops->HasMessageProperties) { + qpid::framing::MessageProperties* messagePropertiesp = + bodyp->get<qpid::framing::MessageProperties>(true); + + String^ replyToExchange = mprops->ReplyToExchange; + String^ replyToRoutingKey = mprops->ReplyToRoutingKey; + if ((replyToExchange != nullptr) || (replyToRoutingKey != nullptr)) { + qpid::framing::ReplyTo nReplyTo; + if (replyToExchange != nullptr) { + nReplyTo.setExchange(QpidMarshal::ToNative(replyToExchange)); + } + if (replyToRoutingKey != nullptr) { + nReplyTo.setRoutingKey(QpidMarshal::ToNative(replyToRoutingKey)); + } + messagePropertiesp->setReplyTo(nReplyTo); + } + + // TODO: properly split 1.0 style to 0-10 content type + encoding + + String^ contentType = mprops->ContentType; + if (contentType != nullptr) { + String^ type = nullptr; + String^ enc = nullptr; + int idx = contentType->IndexOf(';'); + if (idx == -1) { + type = contentType; + } + else { + type = contentType->Substring(0, idx); + contentType = contentType->Substring(idx + 1); + idx = contentType->IndexOf('='); + if (idx != -1) { + enc = contentType->Substring(idx + 1); + enc = enc->Trim(); + } + } + if (!String::IsNullOrEmpty(type)) { + messagePropertiesp->setContentType(QpidMarshal::ToNative(type)); + } + if (!String::IsNullOrEmpty(enc)) { + messagePropertiesp->setContentEncoding(QpidMarshal::ToNative(enc)); + } + } + + + array<unsigned char>^ mbytes = mprops->CorrelationId; + if (mbytes != nullptr) { + pin_ptr<unsigned char> pinnedBuf = &mbytes[0]; + std::string s((char *) pinnedBuf, mbytes->Length); + messagePropertiesp->setCorrelationId(s); + } + + mbytes = mprops->UserId; + if (mbytes != nullptr) { + pin_ptr<unsigned char> pinnedBuf = &mbytes[0]; + std::string s((char *) pinnedBuf, mbytes->Length); + messagePropertiesp->setUserId(s); + } + + if (mprops->HasMappedProperties) { + qpid::framing::FieldTable fieldTable; + // TODO: add support for abitrary AMQP types + for each (Collections::Generic::KeyValuePair<System::String^, AmqpType^> kvp in mprops->PropertyMap) { + Type^ type = kvp.Value->GetType(); + if (type == AmqpInt::typeid) { + fieldTable.setInt(QpidMarshal::ToNative(kvp.Key), + ((AmqpInt ^) kvp.Value)->Value); + } + else if (type == AmqpString::typeid) { + AmqpString^ str = (AmqpString ^) kvp.Value; + // For now, FieldTable supports a single string type + fieldTable.setString(QpidMarshal::ToNative(kvp.Key), QpidMarshal::ToNative(str->Value)); + } + } + + messagePropertiesp->setApplicationHeaders(fieldTable); + } + } + } +} + + + +void OutputLink::Send(AmqpMessage^ amqpMessage, TimeSpan timeout) +{ + // copy properties from managed space to the native counterparts + ManagedToNative(amqpMessage); + + MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream; + CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, false, nullptr, nullptr); + + if (waiter != nullptr) { + waiter->WaitForCompletion(); + if (waiter->TimedOut) { + throw gcnew TimeoutException("Receive"); + } + } + // else: SendMessage() has already waited for the Completion + +} + +IAsyncResult^ OutputLink::BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout, AsyncCallback^ callback, Object^ state) +{ + ManagedToNative(amqpMessage); + + MessageBodyStream^ messageBodyStream = (MessageBodyStream^ ) amqpMessage->BodyStream; + CompletionWaiter^ waiter = amqpSession->SendMessage(queue, messageBodyStream, timeout, true, callback, state); + return waiter; +} + +void OutputLink::EndSend(IAsyncResult^ result) +{ + CompletionWaiter^ waiter = (CompletionWaiter ^) result; + waiter->WaitForCompletion(); + if (waiter->TimedOut) { + throw gcnew TimeoutException("Receive"); + } +} + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h new file mode 100644 index 0000000000..1f049a7412 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/OutputLink.h @@ -0,0 +1,64 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Runtime::InteropServices; + +using namespace qpid::client; +using namespace std; + + +public ref class OutputLink +{ +private: + AmqpSession^ amqpSession; + String^ queue; + bool disposed; + bool finalizing; + void Cleanup(); + AmqpTypes::AmqpProperties^ defaultProperties; + void ManagedToNative(AmqpMessage^ m); + int maxFrameSize; + +internal: + OutputLink(AmqpSession^ session, String^ defaultQueue); + +public: + ~OutputLink(); + !OutputLink(); + void Close(); + AmqpMessage^ CreateMessage(); + void Send(AmqpMessage^ m, TimeSpan timeout); + IAsyncResult^ BeginSend(AmqpMessage^ amqpMessage, TimeSpan timeout, AsyncCallback^ callback, Object^ state); + void EndSend(IAsyncResult^ result); + + property AmqpTypes::AmqpProperties^ DefaultProperties { + AmqpTypes::AmqpProperties^ get () { return defaultProperties; } + void set(AmqpTypes::AmqpProperties^ p) { defaultProperties = p; } + } +}; + + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h b/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h new file mode 100644 index 0000000000..91677a5e73 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/QpidException.h @@ -0,0 +1,37 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; + +public ref class QpidException : System::Exception +{ + public: + + QpidException() : System::Exception() {} + QpidException(String^ estring) : System::Exception(estring) {} + +}; + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h b/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h new file mode 100644 index 0000000000..3e22af7b39 --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Interop/QpidMarshal.h @@ -0,0 +1,53 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +namespace Apache { +namespace Qpid { +namespace Interop { + +using namespace System; +using namespace System::Text; + + +// Helper functions for marshaling. + +private ref class QpidMarshal +{ + public: + + // marshal_as<T> not available in all Visual Studio editions. + + static std::string ToNative (System::String^ managed) { + if (managed->Length == 0) { + return std::string(); + } + array<unsigned char>^ mbytes = Encoding::UTF8->GetBytes(managed); + if (mbytes->Length == 0) { + return std::string(); + } + + pin_ptr<unsigned char> pinnedBuf = &mbytes[0]; + std::string native((char *) pinnedBuf, mbytes->Length); + return native; + } +}; + +}}} // namespace Apache::Qpid::Interop diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs new file mode 100644 index 0000000000..bf20b5083d --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/AsyncTest.cs @@ -0,0 +1,190 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.Functional +{ + using System; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.Threading; + using NUnit.Framework; + + [TestFixture] + public class AsyncTest + { + private const int MessageCount = 20; + private const string Queue = "amqp:amq.direct?routingkey=routing_key"; + private Uri endpoint = new Uri("amqp:message_queue"); + private TimeSpan standardTimeout = TimeSpan.FromSeconds(10.0); // seconds + + [Test] + public void NonTryReceives() + { + this.SendMessages(this.standardTimeout, this.standardTimeout); + this.ReceiveNonTryMessages(this.standardTimeout, this.standardTimeout); + } + + [Test] + public void TryReceives() + { + this.SendMessages(this.standardTimeout, this.standardTimeout); + this.ReceiveTryMessages(this.standardTimeout, this.standardTimeout); + } + + [Test] + public void SmallTimeout() + { + // This code is commented out due to a bug in asynchronous channel open. + ////IChannelListener parentListener; + ////try + ////{ + //// this.RetrieveAsyncChannel(new Uri("amqp:fake_queue_do_not_create"), TimeSpan.FromMilliseconds(10.0), out parentListener); + //// parentListener.Close(); + //// Assert.Fail("Accepting the channel did not time out."); + ////} + ////catch (TimeoutException) + ////{ + //// // Intended exception. + ////} + + try + { + this.ReceiveNonTryMessages(this.standardTimeout, TimeSpan.FromMilliseconds(10.0)); + Assert.Fail("Receiving a message did not time out."); + } + catch (TimeoutException) + { + // Intended exception. + } + } + + private void SendMessages(TimeSpan channelTimeout, TimeSpan messageSendTimeout) + { + ChannelFactory<IOutputChannel> channelFactory = + new ChannelFactory<IOutputChannel>(Util.GetBinding(), Queue); + IOutputChannel proxy = channelFactory.CreateChannel(); + IAsyncResult[] resultArray = new IAsyncResult[MessageCount]; + + for (int i = 0; i < MessageCount; i++) + { + Message toSend = Message.CreateMessage(MessageVersion.Default, string.Empty, i); + resultArray[i] = proxy.BeginSend(toSend, messageSendTimeout, null, null); + } + + for (int j = 0; j < MessageCount; j++) + { + proxy.EndSend(resultArray[j]); + } + + IAsyncResult iocCloseResult = proxy.BeginClose(channelTimeout, null, null); + Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work + proxy.EndClose(iocCloseResult); + + IAsyncResult chanFactCloseResult = channelFactory.BeginClose(channelTimeout, null, null); + Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work + channelFactory.EndClose(chanFactCloseResult); + } + + private void ReceiveNonTryMessages(TimeSpan channelTimeout, TimeSpan messageTimeout) + { + IChannelListener inputChannelParentListener; + IInputChannel inputChannel = this.RetrieveAsyncChannel(this.endpoint, channelTimeout, out inputChannelParentListener); + + inputChannel.Open(); + + IAsyncResult[] resultArray = new IAsyncResult[MessageCount]; + try + { + for (int i = 0; i < MessageCount; i++) + { + resultArray[i] = inputChannel.BeginReceive(messageTimeout, null, null); + } + + for (int j = 0; j < MessageCount; j++) + { + inputChannel.EndReceive(resultArray[j]); + } + } + finally + { + IAsyncResult channelCloseResult = inputChannel.BeginClose(channelTimeout, null, null); + Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work + inputChannel.EndClose(channelCloseResult); + + // Asynchronous listener close has not been implemented. + ////IAsyncResult listenerCloseResult = inputChannelParentListener.BeginClose(channelTimeout, null, null); + ////Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); // Dummy work + ////inputChannelParentListener.EndClose(listenerCloseResult); + + inputChannelParentListener.Close(); + } + } + + private void ReceiveTryMessages(TimeSpan channelAcceptTimeout, TimeSpan messageReceiveTimeout) + { + IChannelListener<IInputChannel> listener = Util.GetBinding().BuildChannelListener<IInputChannel>(this.endpoint, new BindingParameterCollection()); + listener.Open(); + IInputChannel inputChannel = listener.AcceptChannel(channelAcceptTimeout); + IAsyncResult channelResult = inputChannel.BeginOpen(channelAcceptTimeout, null, null); + Thread.Sleep(TimeSpan.FromMilliseconds(50.0)); + inputChannel.EndOpen(channelResult); + + IAsyncResult[] resultArray = new IAsyncResult[MessageCount]; + + for (int i = 0; i < MessageCount; i++) + { + resultArray[i] = inputChannel.BeginTryReceive(messageReceiveTimeout, null, null); + } + + for (int j = 0; j < MessageCount; j++) + { + Message tempMessage; + Assert.True(inputChannel.EndTryReceive(resultArray[j], out tempMessage), "Did not successfully receive message #{0}", j); + } + + inputChannel.Close(); + listener.Close(); + } + + private IInputChannel RetrieveAsyncChannel(Uri queue, TimeSpan timeout, out IChannelListener parentListener) + { + IChannelListener<IInputChannel> listener = + Util.GetBinding().BuildChannelListener<IInputChannel>(queue, new BindingParameterCollection()); + listener.Open(); + IInputChannel inputChannel; + + try + { + IAsyncResult acceptResult = listener.BeginAcceptChannel(timeout, null, null); + Thread.Sleep(TimeSpan.FromMilliseconds(300.0)); // Dummy work + inputChannel = listener.EndAcceptChannel(acceptResult); + } + catch (TimeoutException e) + { + listener.Close(); + throw e; + } + finally + { + parentListener = listener; + } + return inputChannel; + } + } +} diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/CustomAmqpBindingTest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/CustomAmqpBindingTest.cs new file mode 100644 index 0000000000..45a926ce4d --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/CustomAmqpBindingTest.cs @@ -0,0 +1,77 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.Functional +{ + using System; + using System.Collections.Generic; + using System.Reflection; + using System.ServiceModel; + using System.Threading; + using NUnit.Framework; + + [TestFixture] + public class CustomAmqpBindingTest + { + private MessageClient client; + + [SetUp] + public void Setup() + { + // Create client + this.client = new MessageClient(); + this.client.NumberOfMessages = 3; + this.client.NumberOfIterations = 3; + + // Setup and start service + MessageService.EndpointAddress = "amqp:message_queue"; + MessageService.ContractTypes = new List<Type>(); + MessageService.ContractTypes.Add(typeof(IInteropService)); + MessageService.CompletionHandle = new EventWaitHandle(false, EventResetMode.AutoReset); + MessageService.IntendedInvocationCount = this.client.NumberOfIterations * this.client.NumberOfMessages * MessageService.ContractTypes.Count; + MessageService.StartService(Util.GetCustomBinding()); + } + + [Test] + public void Run() + { + // Create the WCF AMQP channel and send messages + MethodInfo runClientMethod = this.client.GetType().GetMethod("RunInteropClient"); + EndpointAddress address = new EndpointAddress("amqp:amq.direct?routingkey=routing_key"); + foreach (Type contractType in MessageService.ContractTypes) + { + MethodInfo runClientT = runClientMethod.MakeGenericMethod(contractType); + runClientT.Invoke(this.client, new object[] { address }); + } + + // Allow the WCF service to process all the messages before validation + MessageService.CompletionHandle.WaitOne(TimeSpan.FromSeconds(10.0), false); + + // Validation + int expectedMethodCallCount = this.client.NumberOfIterations * this.client.NumberOfMessages * MessageService.ContractTypes.Count; + Assert.AreEqual(expectedMethodCallCount, MessageService.TotalMethodCallCount); + } + + [TearDown] + public void Cleanup() + { + MessageService.StopService(); + } + } +} diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj new file mode 100644 index 0000000000..b7f4ed5d0a --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/FunctionalTests.csproj @@ -0,0 +1,110 @@ +<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>9.0.30729</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{E2D8C779-E417-40BA-BEE1-EE034268482F}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>Apache.Qpid.Test.Channel.Functional</RootNamespace>
+ <AssemblyName>Apache.Qpid.Test.Channel.Functional</AssemblyName>
+ <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>bin\Debug\</OutputPath>
+ <DefineConstants>DEBUG;TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>bin\Release\</OutputPath>
+ <DefineConstants>TRACE</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="nunit.framework, Version=2.5.2.9222, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77, processorArchitecture=MSIL" />
+ <Reference Include="System" />
+ <Reference Include="System.Core">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Runtime.Serialization">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.ServiceModel">
+ <RequiredTargetFramework>3.0</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Xml.Linq">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data.DataSetExtensions">
+ <RequiredTargetFramework>3.5</RequiredTargetFramework>
+ </Reference>
+ <Reference Include="System.Data" />
+ <Reference Include="System.Xml" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="AsyncTest.cs" />
+ <Compile Include="CustomAmqpBindingTest.cs" />
+ <Compile Include="IGenericObjectService.cs" />
+ <Compile Include="IInteropService.cs" />
+ <Compile Include="IQueuedDatagramService1.cs" />
+ <Compile Include="IQueuedDatagramService2.cs" />
+ <Compile Include="IQueuedDatagramService3.cs" />
+ <Compile Include="MessageBodyTest.cs" />
+ <Compile Include="MessagePropertiesTest.cs" />
+ <Compile Include="MultipleEndpointsSameQueueTest.cs" />
+ <Compile Include="MessageClient.cs" />
+ <Compile Include="MessageService.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="Util.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\..\..\..\..\..\src\Apache\Qpid\Channel\Channel.csproj">
+ <Project>{8AABAB30-7D1E-4539-B7D1-05450262BAD2}</Project>
+ <Name>Channel</Name>
+ </ProjectReference>
+ <ProjectReference Include="..\..\..\..\..\..\src\Apache\Qpid\Interop\Interop.vcproj">
+ <Project>{C9B6AC75-6332-47A4-B82B-0C20E0AF2D34}</Project>
+ <Name>Interop</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+ <PropertyGroup>
+ <PostBuildEvent>
+ </PostBuildEvent>
+ </PropertyGroup>
+</Project>
\ No newline at end of file diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IGenericObjectService.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IGenericObjectService.cs new file mode 100644 index 0000000000..a1ffac50b3 --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IGenericObjectService.cs @@ -0,0 +1,30 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.Functional +{ + using System.ServiceModel; + + [ServiceContract(SessionMode = SessionMode.NotAllowed)] + public interface IGenericObjectService + { + [OperationContract(IsOneWay = true)] + void SendObject(object message); + } +} diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IInteropService.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IInteropService.cs new file mode 100644 index 0000000000..25f7010a89 --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IInteropService.cs @@ -0,0 +1,31 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.Functional +{ + using System.ServiceModel; + using System.ServiceModel.Channels; + + [ServiceContract] + public interface IInteropService + { + [OperationContract(IsOneWay = true, Action = "*")] + void Hello(Message message); + } +}
\ No newline at end of file diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs new file mode 100644 index 0000000000..bdbbb82f7e --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService1.cs @@ -0,0 +1,34 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.Functional +{ + using System.ServiceModel; + using System.ServiceModel.Channels; + + [ServiceContract(SessionMode = SessionMode.NotAllowed)] + public interface IQueuedDatagramService1 + { + [OperationContract(IsOneWay = true)] + void Hello(string message); + + [OperationContract(IsOneWay = true)] + void Goodbye(); + } +}
\ No newline at end of file diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs new file mode 100644 index 0000000000..565f7aa27b --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService2.cs @@ -0,0 +1,34 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.Functional +{ + using System.ServiceModel; + using System.ServiceModel.Channels; + + [ServiceContract(SessionMode = SessionMode.NotAllowed)] + public interface IQueuedDatagramService2 + { + [OperationContract(IsOneWay = true)] + void Hello(string message); + + [OperationContract(IsOneWay = true)] + void Goodbye(); + } +}
\ No newline at end of file diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService3.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService3.cs new file mode 100644 index 0000000000..3ff2085557 --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/IQueuedDatagramService3.cs @@ -0,0 +1,33 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.Functional +{ + using System.ServiceModel; + + [ServiceContract(SessionMode = SessionMode.NotAllowed)] + public interface IQueuedDatagramService3 + { + [OperationContract(IsOneWay = true)] + void Hello(string message); + + [OperationContract(IsOneWay = true)] + void Goodbye(); + } +}
\ No newline at end of file diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs new file mode 100644 index 0000000000..3fad6b336d --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageBodyTest.cs @@ -0,0 +1,135 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.Functional +{ + using System; + using System.Collections.Generic; + using System.Runtime.Serialization; + using System.ServiceModel; + using System.ServiceModel.Channels; + using NUnit.Framework; + + [TestFixture] + public class MessageBodyTest + { + private const string Queue = "amqp:amq.direct?routingkey=routing_key"; + + [Test] + public void DateVariation() + { + DateTime rightNow = DateTime.UtcNow; + this.SendMessage(rightNow); + this.ReceiveMessage<DateTime>(rightNow); + } + + [Test] + public void EmptyStringVariation() + { + const string TestString = ""; + this.SendMessage(TestString); + this.ReceiveMessage<string>(TestString); + } + + [Test] + public void IntPrimitiveVariation() + { + const int TheAnswer = 42; + this.SendMessage(TheAnswer); + this.ReceiveMessage<int>(TheAnswer); + } + + [Test] + public void MultipleIntVariation() + { + const int NumberOfMessages = 20; + int[] listOfNumbers = new int[NumberOfMessages]; + + for (int i = 0; i < NumberOfMessages; i++) + { + this.SendMessage(i); + listOfNumbers[i] = i; + } + + Assert.True(listOfNumbers[NumberOfMessages - 1] != 0, "Not all messages were sent."); + + for (int j = 0; j < NumberOfMessages; j++) + { + int receivedNumber = this.ReceiveMessage<int>(); + Assert.True(listOfNumbers[j].Equals(receivedNumber), "Received {0} - this number is unknown or has been received more than once.", receivedNumber); + } + } + + [Test] + public void StringVariation() + { + const string TestString = "The darkest of dim, dreary days dost draw deathly deeds. どーも"; + this.SendMessage(TestString); + this.ReceiveMessage<string>(TestString); + } + + private void SendMessage(object objectToSend) + { + IChannelFactory<IOutputChannel> channelFactory = + Util.GetBinding().BuildChannelFactory<IOutputChannel>(); + channelFactory.Open(); + IOutputChannel proxy = channelFactory.CreateChannel(new EndpointAddress(Queue)); + proxy.Open(); + Message toSend = Message.CreateMessage(MessageVersion.Default, string.Empty, objectToSend); + proxy.Send(toSend); + toSend.Close(); + channelFactory.Close(); + } + + private TObjectType ReceiveMessage<TObjectType>() + { + Uri endpoint = new Uri("amqp:message_queue"); + IChannelListener<IInputChannel> listener = Util.GetBinding().BuildChannelListener<IInputChannel>(endpoint, new BindingParameterCollection()); + listener.Open(); + IInputChannel service = listener.AcceptChannel(TimeSpan.FromSeconds(10)); + service.Open(); + Message receivedMessage = service.Receive(TimeSpan.FromSeconds(10)); + Assert.NotNull(receivedMessage, "Message was not received"); + try + { + TObjectType receivedObject = receivedMessage.GetBody<TObjectType>(); + return receivedObject; + } + catch (SerializationException) + { + Assert.Fail("Deserialized object not of correct type"); + } + finally + { + receivedMessage.Close(); + service.Close(); + listener.Close(); + } + + return default(TObjectType); + } + + private TObjectType ReceiveMessage<TObjectType>(TObjectType objectToMatch) + { + TObjectType receivedObject = this.ReceiveMessage<TObjectType>(); + Assert.True(objectToMatch.Equals(receivedObject), "Original and deserialized objects do not match"); + return receivedObject; + } + } +} diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs new file mode 100644 index 0000000000..8f867551b1 --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageClient.cs @@ -0,0 +1,101 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.Functional +{ + using System; + using System.Reflection; + using System.ServiceModel; + using System.ServiceModel.Channels; + + public class MessageClient + { + public int NumberOfMessages + { + get; + set; + } + + public int NumberOfIterations + { + get; + set; + } + + public void RunClient<TServiceContract>(EndpointAddress address) + { + Binding amqpBinding = Util.GetBinding(); + Type proxyType = typeof(TServiceContract); + MethodInfo helloMethod = proxyType.GetMethod("Hello"); + MethodInfo goodbyeMethod = proxyType.GetMethod("Goodbye"); + + string[] messages = new string[this.NumberOfMessages]; + for (int i = 0; i < this.NumberOfMessages; ++i) + { + messages[i] = "Message " + i; + } + + for (int i = 0; i < this.NumberOfIterations; ++i) + { + this.CreateChannelAndSendMessages<TServiceContract>(address, amqpBinding, helloMethod, goodbyeMethod, messages); + } + } + + public void RunInteropClient<TServiceContract>(EndpointAddress address) + { + Binding amqpBinding = Util.GetBinding(); + Type proxyType = typeof(TServiceContract); + MethodInfo helloMethod = proxyType.GetMethod("Hello"); + + Message[] messages = new Message[this.NumberOfMessages]; + + for (int i = 0; i < this.NumberOfIterations; ++i) + { + this.CreateInteropChannelAndSendMessages<TServiceContract>(address, amqpBinding, helloMethod, this.NumberOfMessages); + } + } + + private void CreateChannelAndSendMessages<TServiceContract>(EndpointAddress address, Binding amqpBinding, MethodInfo helloMethod, MethodInfo goodbyeMethod, object[] messages) + { + ChannelFactory<TServiceContract> channelFactory = new ChannelFactory<TServiceContract>(amqpBinding, address); + TServiceContract proxy = channelFactory.CreateChannel(); + + foreach (object message in messages) + { + helloMethod.Invoke(proxy, new object[] { message }); + } + + goodbyeMethod.Invoke(proxy, new object[0]); + channelFactory.Close(); + } + + private void CreateInteropChannelAndSendMessages<TServiceContract>(EndpointAddress address, Binding amqpBinding, MethodInfo helloMethod, int messageCount) + { + ChannelFactory<TServiceContract> channelFactory = new ChannelFactory<TServiceContract>(amqpBinding, address); + TServiceContract proxy = channelFactory.CreateChannel(); + + for (int i = 0; i < messageCount; i++) + { + helloMethod.Invoke(proxy, new object[] { Message.CreateMessage(MessageVersion.Soap12WSAddressing10, "*") }); + } + + channelFactory.Close(); + } + } +} diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageProperties.txt b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageProperties.txt new file mode 100644 index 0000000000..bd6459ccb9 --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageProperties.txt @@ -0,0 +1,22 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +ContentType=Text +Durable=true +RoutingKey=routing_key +TimeToLive=00:00:10
\ No newline at end of file diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessagePropertiesTest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessagePropertiesTest.cs new file mode 100644 index 0000000000..8e192e90f1 --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessagePropertiesTest.cs @@ -0,0 +1,131 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.Functional +{ + using System; + using System.Collections.Generic; + using System.Reflection; + using System.Runtime.Serialization; + using System.ServiceModel; + using System.ServiceModel.Channels; + using Apache.Qpid.AmqpTypes; + using NUnit.Framework; + + [TestFixture] + public class MessagePropertiesTest + { + private const string RoutingKey = "routing_key"; + private const string SendToUri = "amqp:amq.direct?routingkey=" + RoutingKey; + + [Test] + public void DefaultAmqpProperties() + { + const string TestString = "Test Message"; + AmqpProperties messageProperties = new AmqpProperties(); + + this.SendMessage(TestString, messageProperties); + this.ReceiveMessage<string>(TestString, messageProperties); + } + + [Test] + public void NonDefaultAmqpProperties() + { + const string TestString = "Test Message"; + AmqpProperties messageProperties = this.CreateMessageProperties(); + + this.SendMessage(TestString, messageProperties); + this.ReceiveMessage<string>(TestString, messageProperties); + } + + private AmqpProperties CreateMessageProperties() + { + Dictionary<string, string> messageProperties = Util.GetProperties("..\\..\\MessageProperties.txt"); + + AmqpProperties amqpProperties = new AmqpProperties(); + amqpProperties.ContentType = (string)messageProperties["ContentType"]; + amqpProperties.Durable = Convert.ToBoolean((string)messageProperties["Durable"]); + amqpProperties.RoutingKey = (string)messageProperties["RoutingKey"]; + amqpProperties.TimeToLive = TimeSpan.Parse((string)messageProperties["TimeToLive"]); + + return amqpProperties; + } + + private void SendMessage(object objectToSend, AmqpProperties propertiesToSend) + { + ChannelFactory<IOutputChannel> channelFactory = + new ChannelFactory<IOutputChannel>(Util.GetBinding(), SendToUri); + IOutputChannel proxy = channelFactory.CreateChannel(); + proxy.Open(); + + Message toSend = Message.CreateMessage(MessageVersion.Default, string.Empty, objectToSend); + toSend.Properties["AmqpProperties"] = propertiesToSend; + proxy.Send(toSend); + + toSend.Close(); + proxy.Close(); + channelFactory.Close(); + } + + private void ReceiveMessage<TObjectType>(TObjectType objectToMatch, AmqpProperties expectedProperties) + { + Uri receiveFromUri = new Uri("amqp:message_queue"); + IChannelListener<IInputChannel> listener = Util.GetBinding().BuildChannelListener<IInputChannel>(receiveFromUri, new BindingParameterCollection()); + listener.Open(); + IInputChannel service = listener.AcceptChannel(TimeSpan.FromSeconds(10)); + service.Open(); + Message receivedMessage = service.Receive(TimeSpan.FromSeconds(10)); + try + { + TObjectType receivedObject = receivedMessage.GetBody<TObjectType>(); + Assert.True(receivedObject.Equals(objectToMatch), "Original and deserialized objects do not match"); + + AmqpProperties receivedProperties = (AmqpProperties)receivedMessage.Properties["AmqpProperties"]; + PropertyInfo[] propInfo = typeof(AmqpProperties).GetProperties(); + + for (int i = 0; i < propInfo.Length; i++) + { + string propertyName = propInfo[i].Name; + if (propertyName.Equals("RoutingKey", StringComparison.InvariantCultureIgnoreCase)) + { + Assert.AreEqual(RoutingKey, Convert.ToString(propInfo[i].GetValue(receivedProperties, null))); + } + else + { + Assert.AreEqual(Convert.ToString(propInfo[i].GetValue(expectedProperties, null)), Convert.ToString(propInfo[i].GetValue(receivedProperties, null))); + } + } + } + catch (NullReferenceException) + { + Assert.Fail("Message not received"); + } + catch (SerializationException) + { + Assert.Fail("Deserialized object not of correct type"); + } + finally + { + receivedMessage.Close(); + service.Close(); + listener.Close(); + } + } + } +} diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs new file mode 100644 index 0000000000..a473cad986 --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MessageService.cs @@ -0,0 +1,158 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.Functional +{ + using System; + using System.Collections.Generic; + using System.ServiceModel; + using System.ServiceModel.Channels; + using System.Threading; + + public class MessageService : IQueuedDatagramService1, IQueuedDatagramService2, IQueuedDatagramService3, IInteropService + { + private static Dictionary<string, int> methodCallCount = new Dictionary<string, int>(); + private static ServiceHost serviceHost; + + public static EventWaitHandle CompletionHandle + { + get; + set; + } + + public static int IntendedInvocationCount + { + get; + set; + } + + public static int TotalMethodCallCount + { + get; + set; + } + + // The test must set these paramters + public static List<Type> ContractTypes + { + get; + set; + } + + public static string EndpointAddress + { + get; + set; + } + + public static void DisplayCounts() + { + Console.WriteLine("Method calls:"); + foreach (string key in methodCallCount.Keys) + { + Console.WriteLine(" {0}: {1}", key, methodCallCount[key]); + } + + Console.WriteLine("Total: {0}", TotalMethodCallCount); + } + + public static void StartService(Binding amqpBinding) + { + MessageService.methodCallCount.Clear(); + MessageService.TotalMethodCallCount = 0; + + serviceHost = new ServiceHost(typeof(MessageService)); + + foreach (Type contractType in ContractTypes) + { + serviceHost.AddServiceEndpoint(contractType, amqpBinding, EndpointAddress); + } + + serviceHost.Open(); + } + + public static void StopService() + { + if (serviceHost.State != CommunicationState.Faulted) + { + serviceHost.Close(); + } + } + + public void UpdateCounts(string method) + { + lock (methodCallCount) + { + if (!methodCallCount.ContainsKey(method)) + { + methodCallCount[method] = 0; + } + + ++methodCallCount[method]; + ++TotalMethodCallCount; + if (TotalMethodCallCount >= IntendedInvocationCount && CompletionHandle != null) + { + CompletionHandle.Set(); + } + } + } + + [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)] + void IQueuedDatagramService1.Hello(string message) + { + this.UpdateCounts("IQueuedDatagramService1.Hello"); + } + + [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)] + void IQueuedDatagramService1.Goodbye() + { + this.UpdateCounts("IQueuedDatagramService1.Goodbye"); + } + + [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)] + void IQueuedDatagramService2.Hello(string message) + { + this.UpdateCounts("IQueuedDatagramService2.Hello"); + } + + [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)] + void IQueuedDatagramService2.Goodbye() + { + this.UpdateCounts("IQueuedDatagramService2.Goodbye"); + } + + [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)] + void IQueuedDatagramService3.Hello(string message) + { + this.UpdateCounts("IQueuedDatagramService3.Hello"); + } + + [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)] + void IQueuedDatagramService3.Goodbye() + { + this.UpdateCounts("IQueuedDatagramService3.Goodbye"); + } + + [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)] + void IInteropService.Hello(Message message) + { + this.UpdateCounts("IInteropService.Hello"); + } + } +} diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MultipleEndpointsSameQueueTest.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MultipleEndpointsSameQueueTest.cs new file mode 100644 index 0000000000..d09832757a --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/MultipleEndpointsSameQueueTest.cs @@ -0,0 +1,83 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.Functional +{ + using System; + using System.Collections.Generic; + using System.Reflection; + using System.ServiceModel; + using System.Threading; + using NUnit.Framework; + + [TestFixture] + public class MultipleEndpointsSameQueueTest + { + private MessageClient client; + + [SetUp] + public void Setup() + { + // Create client + this.client = new MessageClient(); + this.client.NumberOfMessages = 3; + this.client.NumberOfIterations = 5; + + // Setup and start service + MessageService.EndpointAddress = "amqp:message_queue"; + + MessageService.ContractTypes = new List<Type>(); + MessageService.ContractTypes.Add(typeof(IQueuedDatagramService1)); + MessageService.ContractTypes.Add(typeof(IQueuedDatagramService2)); + MessageService.ContractTypes.Add(typeof(IQueuedDatagramService3)); + MessageService.CompletionHandle = new EventWaitHandle(false, EventResetMode.AutoReset); + MessageService.IntendedInvocationCount = this.client.NumberOfIterations * (this.client.NumberOfMessages + 1) * MessageService.ContractTypes.Count; + + MessageService.StartService(Util.GetBinding()); + } + + [Test] + public void Run() + { + // Create wcf amqpchannel and send messages + MethodInfo runClientMethod = this.client.GetType().GetMethod("RunClient"); + EndpointAddress address = new EndpointAddress("amqp:amq.direct?routingkey=routing_key"); + + foreach (Type contractType in MessageService.ContractTypes) + { + MethodInfo runClientT = runClientMethod.MakeGenericMethod(contractType); + runClientT.Invoke(this.client, new object[] { address }); + } + + // Allow the wcf service to process all the messages before validation + MessageService.CompletionHandle.WaitOne(TimeSpan.FromSeconds(10.0), false); + + // Validation + int expectedMethodCallCount = this.client.NumberOfIterations * (this.client.NumberOfMessages + 1) * MessageService.ContractTypes.Count; + + Assert.AreEqual(expectedMethodCallCount, MessageService.TotalMethodCallCount); + } + + [TearDown] + public void Cleanup() + { + MessageService.StopService(); + } + } +} diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/Properties/AssemblyInfo.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..b47a25494f --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/Properties/AssemblyInfo.cs @@ -0,0 +1,55 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Apache.Qpid.Test.Channel.Functional")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("")] +[assembly: AssemblyCopyright("")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("552dca74-b5a3-4ad3-a718-4a1dd03db039")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat new file mode 100755 index 0000000000..4b83993257 --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/RunTests.bat @@ -0,0 +1,34 @@ +@echo OFF + +REM Licensed to the Apache Software Foundation (ASF) under one +REM or more contributor license agreements. See the NOTICE file +REM distributed with this work for additional information +REM regarding copyright ownership. The ASF licenses this file +REM to you under the Apache License, Version 2.0 (the +REM "License"); you may not use this file except in compliance +REM with the License. You may obtain a copy of the License at +REM +REM http://www.apache.org/licenses/LICENSE-2.0 +REM +REM Unless required by applicable law or agreed to in writing, +REM software distributed under the License is distributed on an +REM "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +REM KIND, either express or implied. See the License for the +REM specific language governing permissions and limitations +REM under the License. + + +set nunit_exe=%programfiles%\NUnit 2.5.1\bin\net-2.0\nunit-console.exe +set qpid_dll_location=..\..\..\..\..\..\..\cpp\build\src\Debug +set configuration_name=bin\Debug +set qcreate_location=..\..\..\..\..\..\tools\QCreate\Debug + +copy %qpid_dll_location%\qpidclient.dll %configuration_name% +copy %qpid_dll_location%\qpidcommon.dll %configuration_name% + +copy %qpid_dll_location%\qpidclient.dll %qcreate_location% +copy %qpid_dll_location%\qpidcommon.dll %qcreate_location% + +%qcreate_location%\QCreate.exe amq.direct routing_key message_queue + +"%nunit_exe%" %configuration_name%\Apache.Qpid.Test.Channel.Functional.dll diff --git a/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs new file mode 100644 index 0000000000..97be1fb925 --- /dev/null +++ b/qpid/wcf/test/Apache/Qpid/Test/Channel/Functional/Util.cs @@ -0,0 +1,74 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Test.Channel.Functional +{ + using System.Collections.Generic; + using System.IO; + using System.ServiceModel; + using System.ServiceModel.Channels; + using Apache.Qpid.Channel; + + internal class Util + { + public static Dictionary<string, string> GetProperties(string path) + { + string fileData = string.Empty; + using (StreamReader sr = new StreamReader(path)) + { + fileData = sr.ReadToEnd().Replace("\r", string.Empty); + } + + Dictionary<string, string> properties = new Dictionary<string, string>(); + string[] kvp; + string[] records = fileData.Split("\n".ToCharArray()); + foreach (string record in records) + { + if (record[0] == '/' || record[0] == '*') + { + continue; + } + + kvp = record.Split("=".ToCharArray()); + properties.Add(kvp[0], kvp[1]); + } + + return properties; + } + + public static Binding GetBinding() + { + return new AmqpBinding(); + } + + public static Binding GetCustomBinding() + { + AmqpTransportBindingElement transportElement = new AmqpTransportBindingElement(); + RawMessageEncodingBindingElement encodingElement = new RawMessageEncodingBindingElement(); + transportElement.BrokerHost = "127.0.0.1"; + transportElement.TransferMode = TransferMode.Streamed; + + CustomBinding brokerBinding = new CustomBinding(); + brokerBinding.Elements.Add(encodingElement); + brokerBinding.Elements.Add(transportElement); + + return brokerBinding; + } + } +} diff --git a/qpid/wcf/tools/QCreate/QCreate.cpp b/qpid/wcf/tools/QCreate/QCreate.cpp new file mode 100644 index 0000000000..7b0231f339 --- /dev/null +++ b/qpid/wcf/tools/QCreate/QCreate.cpp @@ -0,0 +1,65 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#include "stdafx.h" + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> + +#include <cstdlib> +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; + + +int main(int argc, char** argv) { + + std::string exchange = argc>1 ? argv[1] : "amq.direct"; + std::string bindingKey = argc>2 ? argv[2] : "routing_key"; + std::string queue = argc>3 ? argv[3] : "message_queue"; + + const char* host = "127.0.0.1"; + int port = 5672; + Connection connection; + + try { + connection.open(host, port); + Session session = connection.newSession(); + + + //--------- Main body of program -------------------------------------------- + + // Create a queue and route all messages whose + // routing key is "routing_key" to this newly created queue. + + session.queueDeclare(arg::queue=queue); + session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=bindingKey); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; + +} + diff --git a/qpid/wcf/tools/QCreate/ReadMe.txt b/qpid/wcf/tools/QCreate/ReadMe.txt new file mode 100644 index 0000000000..b3efb84503 --- /dev/null +++ b/qpid/wcf/tools/QCreate/ReadMe.txt @@ -0,0 +1,52 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +======================================================================== + CONSOLE APPLICATION : QCreate Project Overview +======================================================================== + +AppWizard has created this QCreate application for you. + +This file contains a summary of what you will find in each of the files that +make up your QCreate application. + + +QCreate.vcproj + This is the main project file for VC++ projects generated using an Application Wizard. + It contains information about the version of Visual C++ that generated the file, and + information about the platforms, configurations, and project features selected with the + Application Wizard. + +QCreate.cpp + This is the main application source file. + +///////////////////////////////////////////////////////////////////////////// +Other standard files: + +StdAfx.h, StdAfx.cpp + These files are used to build a precompiled header (PCH) file + named QCreate.pch and a precompiled types file named StdAfx.obj. + +///////////////////////////////////////////////////////////////////////////// +Other notes: + +AppWizard uses "TODO:" comments to indicate parts of the source code you +should add to or customize. + +///////////////////////////////////////////////////////////////////////////// diff --git a/qpid/wcf/tools/QCreate/stdafx.cpp b/qpid/wcf/tools/QCreate/stdafx.cpp new file mode 100644 index 0000000000..568cd3b7d6 --- /dev/null +++ b/qpid/wcf/tools/QCreate/stdafx.cpp @@ -0,0 +1,27 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +// stdafx.cpp : source file that includes just the standard includes +// QCreate.pch will be the pre-compiled header +// stdafx.obj will contain the pre-compiled type information + +#include "stdafx.h" + +// TODO: reference any additional headers you need in STDAFX.H +// and not in this file diff --git a/qpid/wcf/tools/QCreate/stdafx.h b/qpid/wcf/tools/QCreate/stdafx.h new file mode 100644 index 0000000000..a516e19a10 --- /dev/null +++ b/qpid/wcf/tools/QCreate/stdafx.h @@ -0,0 +1,34 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +// stdafx.h : include file for standard system include files, +// or project specific include files that are used frequently, but +// are changed infrequently +// + +#pragma once + +#include "targetver.h" + +#include <stdio.h> +#include <tchar.h> + + + +// TODO: reference additional headers your program requires here diff --git a/qpid/wcf/tools/QCreate/targetver.h b/qpid/wcf/tools/QCreate/targetver.h new file mode 100644 index 0000000000..9cfb78641b --- /dev/null +++ b/qpid/wcf/tools/QCreate/targetver.h @@ -0,0 +1,32 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +#pragma once + +// The following macros define the minimum required platform. The minimum required platform +// is the earliest version of Windows, Internet Explorer etc. that has the necessary features to run +// your application. The macros work by enabling all features available on platform versions up to and +// including the version specified. + +// Modify the following defines if you have to target a platform prior to the ones specified below. +// Refer to MSDN for the latest info on corresponding values for different platforms. +#ifndef _WIN32_WINNT // Specifies that the minimum required platform is Windows Vista. +#define _WIN32_WINNT 0x0600 // Change this to the appropriate value to target other versions of Windows. +#endif + |