diff options
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Channel')
7 files changed, 39 insertions, 7 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs index b952faf9e5..b0b71c87f3 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs @@ -64,6 +64,12 @@ namespace Apache.Qpid.Channel set { transport.BrokerPort = value; } } + public int PrefetchLimit + { + get { return transport.PrefetchLimit; } + set { transport.PrefetchLimit = value; } + } + public bool Shared { get { return transport.Shared; } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs index 3ec62e809d..554824cea9 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs @@ -63,6 +63,13 @@ namespace Apache.Qpid.Channel set { brokerPort = value; } } + [ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, DefaultValue = false)] + public int PrefetchLimit + { + get { return (int)base[AmqpConfigurationStrings.PrefetchLimit]; } + set { base[AmqpConfigurationStrings.PrefetchLimit] = value; } + } + [ConfigurationProperty(AmqpConfigurationStrings.Shared, DefaultValue = false)] public bool Shared { @@ -95,6 +102,8 @@ namespace Apache.Qpid.Channel get { ConfigurationPropertyCollection properties = base.Properties; + properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, + typeof(int), 0, null, null, ConfigurationPropertyOptions.None)); properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.Shared, typeof(bool), false, null, null, ConfigurationPropertyOptions.None)); properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.TransferMode, @@ -112,6 +121,7 @@ namespace Apache.Qpid.Channel this.BrokerPort = amqpBinding.BrokerPort; this.TransferMode = amqpBinding.TransferMode; this.Shared = amqpBinding.Shared; + this.PrefetchLimit = amqpBinding.PrefetchLimit; AmqpProperties props = amqpBinding.DefaultMessageProperties; } @@ -133,6 +143,7 @@ namespace Apache.Qpid.Channel amqpBinding.BrokerPort = this.BrokerPort; amqpBinding.TransferMode = this.TransferMode; amqpBinding.Shared = this.Shared; + amqpBinding.PrefetchLimit = this.PrefetchLimit; } protected override void PostDeserialize() diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs index b8e2811527..542f1a00a8 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs @@ -32,6 +32,7 @@ namespace Apache.Qpid.Channel AmqpChannelProperties channelProperties; long maxBufferPoolSize; bool shared; + int prefetchLimit; internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context) : base(context.Binding) @@ -39,6 +40,7 @@ namespace Apache.Qpid.Channel this.bindingElement = bindingElement; this.channelProperties = bindingElement.ChannelProperties.Clone(); this.shared = bindingElement.Shared; + this.prefetchLimit = bindingElement.PrefetchLimit; this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize; Collection<MessageEncodingBindingElement> messageEncoderBindingElements = context.BindingParameters.FindAll<MessageEncodingBindingElement>(); @@ -91,7 +93,7 @@ namespace Apache.Qpid.Channel protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via) { - return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared); + return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit); } } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs index f1de30406a..0853b3d6f3 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs @@ -46,6 +46,7 @@ namespace Apache.Qpid.Channel public const string TransferMode = "transferMode"; public const string Brokers = "brokers"; public const string Shared = "shared"; + public const string PrefetchLimit = "prefetchLimit"; public const string MaxBufferPoolSize = "maxBufferPoolSize"; public const string MaxReceivedMessageSize = "maxReceivedMessageSize"; } @@ -55,7 +56,6 @@ namespace Apache.Qpid.Channel internal const string BrokerHost = "localhost"; internal const int BrokerPort = 5672; internal const TransferMode TransferMode = System.ServiceModel.TransferMode.Buffered; - internal const byte Priority = 4; internal const long MaxBufferPoolSize = 64 * 1024; internal const int MaxReceivedMessageSize = 5 * 1024 * 1024; //64 * 1024; } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs index 44fecdaf62..8894b68584 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs @@ -32,6 +32,7 @@ namespace Apache.Qpid.Channel AmqpTransportBindingElement bindingElement; AmqpChannelProperties channelProperties; bool shared; + int prefetchLimit; long maxBufferPoolSize; Uri uri; AmqpTransportChannel amqpTransportChannel; @@ -45,6 +46,7 @@ namespace Apache.Qpid.Channel this.bindingElement = bindingElement; this.channelProperties = bindingElement.ChannelProperties.Clone(); this.shared = bindingElement.Shared; + this.prefetchLimit = bindingElement.PrefetchLimit; this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize; @@ -132,7 +134,7 @@ namespace Apache.Qpid.Channel { amqpTransportChannel = new AmqpTransportChannel(this, this.channelProperties, new EndpointAddress(uri), messageEncoderFactory.Encoder, - maxBufferPoolSize, this.shared); + maxBufferPoolSize, this.shared, this.prefetchLimit); return (IInputChannel)(object) amqpTransportChannel; } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs index f23b8072e9..08c565af18 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs @@ -29,6 +29,7 @@ namespace Apache.Qpid.Channel { AmqpChannelProperties channelProperties; bool shared; + int prefetchLimit; public AmqpTransportBindingElement() { @@ -41,6 +42,7 @@ namespace Apache.Qpid.Channel { this.channelProperties = other.channelProperties.Clone(); this.shared = other.shared; + this.prefetchLimit = other.prefetchLimit; } public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context) @@ -98,6 +100,12 @@ namespace Apache.Qpid.Channel set { this.channelProperties.BrokerPort = value; } } + public int PrefetchLimit + { + get { return this.prefetchLimit; } + set { this.prefetchLimit = value; } + } + public bool Shared { get { return this.shared; } diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs index ca9c10be69..5924142046 100644 --- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs +++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs @@ -50,6 +50,7 @@ namespace Apache.Qpid.Channel private MessageEncoder encoder; private AmqpChannelProperties factoryChannelProperties; private bool shared; + private int prefetchLimit; private string encoderContentType; // input = 0-10 queue, output = 0-10 exchange @@ -68,7 +69,7 @@ namespace Apache.Qpid.Channel private AsyncTimeSpanCaller asyncOpenCaller; private AsyncTimeSpanCaller asyncCloseCaller; - internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection) + internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection, int prefetchLimit) : base(factory) { this.isInputChannel = (factory is ChannelListenerBase) || (factory is AmqpChannelFactory<IInputChannel>); @@ -80,6 +81,7 @@ namespace Apache.Qpid.Channel this.factoryChannelProperties = channelProperties; this.shared = sharedConnection; + this.prefetchLimit = prefetchLimit; this.remoteAddress = remoteAddress; // pull out host, port, queue, and connection arguments @@ -128,6 +130,7 @@ namespace Apache.Qpid.Channel if (this.isInputChannel) { this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, this.queueName); + this.inputLink.PrefetchLimit = this.prefetchLimit; } else { @@ -287,7 +290,7 @@ namespace Apache.Qpid.Channel return false; } - + public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state) { return this.inputLink.BeginTryReceive(timeout, callback, state); @@ -464,7 +467,7 @@ namespace Apache.Qpid.Channel } return amqpMessage; } - + private Message QpidToWcf(AmqpMessage amqpMessage) { @@ -531,7 +534,7 @@ namespace Apache.Qpid.Channel { this.bufferManager.ReturnBuffer(managedBuffer); } - } + } return wcfMessage; } |