summaryrefslogtreecommitdiff
path: root/qpid/wcf/src/Apache/Qpid/Channel
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Channel')
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs6
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs11
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs4
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs2
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs4
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs8
-rw-r--r--qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs11
7 files changed, 39 insertions, 7 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs
index b952faf9e5..b0b71c87f3 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBinding.cs
@@ -64,6 +64,12 @@ namespace Apache.Qpid.Channel
set { transport.BrokerPort = value; }
}
+ public int PrefetchLimit
+ {
+ get { return transport.PrefetchLimit; }
+ set { transport.PrefetchLimit = value; }
+ }
+
public bool Shared
{
get { return transport.Shared; }
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs
index 3ec62e809d..554824cea9 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpBindingConfigurationElement.cs
@@ -63,6 +63,13 @@ namespace Apache.Qpid.Channel
set { brokerPort = value; }
}
+ [ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit, DefaultValue = false)]
+ public int PrefetchLimit
+ {
+ get { return (int)base[AmqpConfigurationStrings.PrefetchLimit]; }
+ set { base[AmqpConfigurationStrings.PrefetchLimit] = value; }
+ }
+
[ConfigurationProperty(AmqpConfigurationStrings.Shared, DefaultValue = false)]
public bool Shared
{
@@ -95,6 +102,8 @@ namespace Apache.Qpid.Channel
get
{
ConfigurationPropertyCollection properties = base.Properties;
+ properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.PrefetchLimit,
+ typeof(int), 0, null, null, ConfigurationPropertyOptions.None));
properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.Shared,
typeof(bool), false, null, null, ConfigurationPropertyOptions.None));
properties.Add(new ConfigurationProperty(AmqpConfigurationStrings.TransferMode,
@@ -112,6 +121,7 @@ namespace Apache.Qpid.Channel
this.BrokerPort = amqpBinding.BrokerPort;
this.TransferMode = amqpBinding.TransferMode;
this.Shared = amqpBinding.Shared;
+ this.PrefetchLimit = amqpBinding.PrefetchLimit;
AmqpProperties props = amqpBinding.DefaultMessageProperties;
}
@@ -133,6 +143,7 @@ namespace Apache.Qpid.Channel
amqpBinding.BrokerPort = this.BrokerPort;
amqpBinding.TransferMode = this.TransferMode;
amqpBinding.Shared = this.Shared;
+ amqpBinding.PrefetchLimit = this.PrefetchLimit;
}
protected override void PostDeserialize()
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
index b8e2811527..542f1a00a8 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
@@ -32,6 +32,7 @@ namespace Apache.Qpid.Channel
AmqpChannelProperties channelProperties;
long maxBufferPoolSize;
bool shared;
+ int prefetchLimit;
internal AmqpChannelFactory(AmqpTransportBindingElement bindingElement, BindingContext context)
: base(context.Binding)
@@ -39,6 +40,7 @@ namespace Apache.Qpid.Channel
this.bindingElement = bindingElement;
this.channelProperties = bindingElement.ChannelProperties.Clone();
this.shared = bindingElement.Shared;
+ this.prefetchLimit = bindingElement.PrefetchLimit;
this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize;
Collection<MessageEncodingBindingElement> messageEncoderBindingElements
= context.BindingParameters.FindAll<MessageEncodingBindingElement>();
@@ -91,7 +93,7 @@ namespace Apache.Qpid.Channel
protected override TChannel OnCreateChannel(EndpointAddress remoteAddress, Uri via)
{
- return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared);
+ return (TChannel)(object) new AmqpTransportChannel(this, this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, this.maxBufferPoolSize, this.shared, this.prefetchLimit);
}
}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs
index f1de30406a..0853b3d6f3 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelHelpers.cs
@@ -46,6 +46,7 @@ namespace Apache.Qpid.Channel
public const string TransferMode = "transferMode";
public const string Brokers = "brokers";
public const string Shared = "shared";
+ public const string PrefetchLimit = "prefetchLimit";
public const string MaxBufferPoolSize = "maxBufferPoolSize";
public const string MaxReceivedMessageSize = "maxReceivedMessageSize";
}
@@ -55,7 +56,6 @@ namespace Apache.Qpid.Channel
internal const string BrokerHost = "localhost";
internal const int BrokerPort = 5672;
internal const TransferMode TransferMode = System.ServiceModel.TransferMode.Buffered;
- internal const byte Priority = 4;
internal const long MaxBufferPoolSize = 64 * 1024;
internal const int MaxReceivedMessageSize = 5 * 1024 * 1024; //64 * 1024;
}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
index 44fecdaf62..8894b68584 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
@@ -32,6 +32,7 @@ namespace Apache.Qpid.Channel
AmqpTransportBindingElement bindingElement;
AmqpChannelProperties channelProperties;
bool shared;
+ int prefetchLimit;
long maxBufferPoolSize;
Uri uri;
AmqpTransportChannel amqpTransportChannel;
@@ -45,6 +46,7 @@ namespace Apache.Qpid.Channel
this.bindingElement = bindingElement;
this.channelProperties = bindingElement.ChannelProperties.Clone();
this.shared = bindingElement.Shared;
+ this.prefetchLimit = bindingElement.PrefetchLimit;
this.maxBufferPoolSize = bindingElement.MaxBufferPoolSize;
@@ -132,7 +134,7 @@ namespace Apache.Qpid.Channel
{
amqpTransportChannel = new AmqpTransportChannel(this, this.channelProperties,
new EndpointAddress(uri), messageEncoderFactory.Encoder,
- maxBufferPoolSize, this.shared);
+ maxBufferPoolSize, this.shared, this.prefetchLimit);
return (IInputChannel)(object) amqpTransportChannel;
}
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
index f23b8072e9..08c565af18 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportBindingElement.cs
@@ -29,6 +29,7 @@ namespace Apache.Qpid.Channel
{
AmqpChannelProperties channelProperties;
bool shared;
+ int prefetchLimit;
public AmqpTransportBindingElement()
{
@@ -41,6 +42,7 @@ namespace Apache.Qpid.Channel
{
this.channelProperties = other.channelProperties.Clone();
this.shared = other.shared;
+ this.prefetchLimit = other.prefetchLimit;
}
public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
@@ -98,6 +100,12 @@ namespace Apache.Qpid.Channel
set { this.channelProperties.BrokerPort = value; }
}
+ public int PrefetchLimit
+ {
+ get { return this.prefetchLimit; }
+ set { this.prefetchLimit = value; }
+ }
+
public bool Shared
{
get { return this.shared; }
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
index ca9c10be69..5924142046 100644
--- a/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
+++ b/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
@@ -50,6 +50,7 @@ namespace Apache.Qpid.Channel
private MessageEncoder encoder;
private AmqpChannelProperties factoryChannelProperties;
private bool shared;
+ private int prefetchLimit;
private string encoderContentType;
// input = 0-10 queue, output = 0-10 exchange
@@ -68,7 +69,7 @@ namespace Apache.Qpid.Channel
private AsyncTimeSpanCaller asyncOpenCaller;
private AsyncTimeSpanCaller asyncCloseCaller;
- internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection)
+ internal AmqpTransportChannel(ChannelManagerBase factory, AmqpChannelProperties channelProperties, EndpointAddress remoteAddress, MessageEncoder msgEncoder, long maxBufferPoolSize, bool sharedConnection, int prefetchLimit)
: base(factory)
{
this.isInputChannel = (factory is ChannelListenerBase) || (factory is AmqpChannelFactory<IInputChannel>);
@@ -80,6 +81,7 @@ namespace Apache.Qpid.Channel
this.factoryChannelProperties = channelProperties;
this.shared = sharedConnection;
+ this.prefetchLimit = prefetchLimit;
this.remoteAddress = remoteAddress;
// pull out host, port, queue, and connection arguments
@@ -128,6 +130,7 @@ namespace Apache.Qpid.Channel
if (this.isInputChannel)
{
this.inputLink = ConnectionManager.GetInputLink(this.factoryChannelProperties, shared, false, this.queueName);
+ this.inputLink.PrefetchLimit = this.prefetchLimit;
}
else
{
@@ -287,7 +290,7 @@ namespace Apache.Qpid.Channel
return false;
}
-
+
public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
{
return this.inputLink.BeginTryReceive(timeout, callback, state);
@@ -464,7 +467,7 @@ namespace Apache.Qpid.Channel
}
return amqpMessage;
}
-
+
private Message QpidToWcf(AmqpMessage amqpMessage)
{
@@ -531,7 +534,7 @@ namespace Apache.Qpid.Channel
{
this.bufferManager.ReturnBuffer(managedBuffer);
}
- }
+ }
return wcfMessage;
}