diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/Message')
12 files changed, 1936 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs new file mode 100644 index 0000000000..e58de2ab96 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Message +{ + public class AMQMessage + { + protected IContentHeaderProperties _contentHeaderProperties; + + /// <summary> + /// If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required + /// </summary> + protected AmqChannel _channel; + + private long _deliveryTag; + + public AMQMessage(IContentHeaderProperties properties, long deliveryTag) + { + _contentHeaderProperties = properties; + _deliveryTag = deliveryTag; + } + + public AMQMessage(IContentHeaderProperties properties) + : this(properties, -1) + { + } + + public long DeliveryTag + { + get { return _deliveryTag; } + } + + public AmqChannel Channel + { + get { return _channel; } + set { _channel = value; } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs new file mode 100644 index 0000000000..f352d62c11 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; +using Apache.Qpid.Framing; +using log4net; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Message +{ + public abstract class AbstractQmsMessageFactory : IMessageFactory + { + public abstract AbstractQmsMessage CreateMessage(string mimeType); + + private static readonly ILog _logger = LogManager.GetLogger(typeof (AbstractQmsMessageFactory)); + + protected abstract AbstractQmsMessage CreateMessage(long messageNbr, ByteBuffer data, ContentHeaderBody contentHeader); + + protected AbstractQmsMessage CreateMessageWithBody(long messageNbr, + ContentHeaderBody contentHeader, + IList bodies) + { + ByteBuffer data; + + // we optimise the non-fragmented case to avoid copying + if (bodies != null && bodies.Count == 1) + { + _logger.Debug("Non-fragmented message body (bodySize=" + contentHeader.BodySize +")"); + data = ((ContentBody)bodies[0]).Payload; + } + else + { + _logger.Debug("Fragmented message body (" + bodies.Count + " frames, bodySize=" + contentHeader.BodySize + ")"); + data = ByteBuffer.Allocate((int)contentHeader.BodySize); // XXX: Is cast a problem? + foreach (ContentBody body in bodies) { + data.Put(body.Payload); + //body.Payload.Release(); + } + + data.Flip(); + } + _logger.Debug("Creating message from buffer with position=" + data.Position + " and remaining=" + data.Remaining); + + return CreateMessage(messageNbr, data, contentHeader); + } + + public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered, + ContentHeaderBody contentHeader, + IList bodies) + { + AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, contentHeader, bodies); + msg.Redelivered = redelivered; + return msg; + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs new file mode 100644 index 0000000000..34b47137e5 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs @@ -0,0 +1,694 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text; +using log4net; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Message +{ + public abstract class AbstractQmsMessage : AMQMessage, IMessage + { + private static ILog log = LogManager.GetLogger(typeof(AbstractQmsMessage)); + + protected bool _redelivered; + + protected ByteBuffer _data; + protected bool _readableMessage = false; + private QpidHeaders _headers; + + protected AbstractQmsMessage(ByteBuffer data) + : base(new BasicContentHeaderProperties()) + { + Init(data); + } + + protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) + : this(contentHeader, deliveryTag) + { + Init(data); + } + + protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag) + { + Init(null); + } + + private void Init(ByteBuffer data) + { + _data = data; + if ( _data != null ) + { + _data.Acquire(); + } + _readableMessage = (data != null); + if ( ContentHeaderProperties.Headers == null ) + ContentHeaderProperties.Headers = new FieldTable(); + _headers = new QpidHeaders(ContentHeaderProperties.Headers); + } + + // + // Properties + // + + /// <summary> + /// The application message identifier + /// </summary> + public string MessageId + { + get + { + if (ContentHeaderProperties.MessageId == null) + { + ContentHeaderProperties.MessageId = "ID:" + DeliveryTag; + } + return ContentHeaderProperties.MessageId; + } + set { ContentHeaderProperties.MessageId = value; } + } + + /// <summary> + /// The message timestamp + /// </summary> + public long Timestamp + { + get + { + // TODO: look at ulong/long choice + return (long) ContentHeaderProperties.Timestamp; + } + set + { + ContentHeaderProperties.Timestamp = (ulong) value; + } + } + + /// <summary> + /// The <see cref="CorrelationId"/> as a byte array. + /// </summary> + public byte[] CorrelationIdAsBytes + { + get { return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId); } + set { ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value); } + } + + /// <summary> + /// The application correlation identifier + /// </summary> + public string CorrelationId + { + get { return ContentHeaderProperties.CorrelationId; } + set { ContentHeaderProperties.CorrelationId = value; } + } + + struct Dest + { + public string ExchangeName; + public string RoutingKey; + + public Dest(string exchangeName, string routingKey) + { + ExchangeName = exchangeName; + RoutingKey = routingKey; + } + } + + /// <summary> + /// Exchange name of the reply-to address + /// </summary> + public string ReplyToExchangeName + { + get + { + return ReadReplyToHeader().ExchangeName; + } + set + { + BindingURL dest = ReadReplyToHeader(); + dest.ExchangeName = value; + WriteReplyToHeader(dest); + } + } + + /// <summary> + /// Routing key of the reply-to address + /// </summary> + public string ReplyToRoutingKey + { + get + { + return ReadReplyToHeader().RoutingKey; + } + set + { + BindingURL dest = ReadReplyToHeader(); + dest.RoutingKey = value; + WriteReplyToHeader(dest); + } + } + + /// <summary> + /// Non-persistent (1) or persistent (2) + /// </summary> + public DeliveryMode DeliveryMode + { + get + { + byte b = ContentHeaderProperties.DeliveryMode; + switch (b) + { + case 1: + return DeliveryMode.NonPersistent; + case 2: + return DeliveryMode.Persistent; + default: + throw new QpidException("Illegal value for delivery mode in content header properties"); + } + } + set + { + ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2); + } + } + + /// <summary> + /// True, if this is a redelivered message + /// </summary> + public bool Redelivered + { + get { return _redelivered; } + set { _redelivered = value; } + } + + /// <summary> + /// The message type name + /// </summary> + public string Type + { + get { return ContentHeaderProperties.Type; } + set { ContentHeaderProperties.Type = value; } + } + + /// <summary> + /// Message expiration specification + /// </summary> + public long Expiration + { + get { return ContentHeaderProperties.Expiration; } + set { ContentHeaderProperties.Expiration = value; } + } + + /// <summary> + /// The message priority, 0 to 9 + /// </summary> + public byte Priority + { + get { return ContentHeaderProperties.Priority; } + set { ContentHeaderProperties.Priority = (byte) value; } + } + + /// <summary> + /// The MIME Content Type + /// </summary> + public string ContentType + { + get { return ContentHeaderProperties.ContentType; } + set { ContentHeaderProperties.ContentType = value; } + } + + /// <summary> + /// The MIME Content Encoding + /// </summary> + public string ContentEncoding + { + get { return ContentHeaderProperties.Encoding; } + set { ContentHeaderProperties.Encoding = value; } + } + + /// <summary> + /// Headers of this message + /// </summary> + public IHeaders Headers + { + get { return _headers; } + } + + /// <summary> + /// The creating user id + /// </summary> + public string UserId + { + get { return ContentHeaderProperties.UserId; } + set { ContentHeaderProperties.UserId = value; } + } + + /// <summary> + /// The creating application id + /// </summary> + public string AppId + { + get { return ContentHeaderProperties.AppId; } + set { ContentHeaderProperties.AppId = value; } + } + + /// <summary> + /// Intra-cluster routing identifier + /// </summary> + public string ClusterId + { + get { return ContentHeaderProperties.ClusterId; } + set { ContentHeaderProperties.ClusterId = value; } + } + + /// <summary> + /// Return the raw byte array that is used to populate the frame when sending + /// the message. + /// </summary> + /// <value>a byte array of message data</value> + public ByteBuffer Data + { + get + { + if (_data != null) + { + if (!_readableMessage) + { + _data.Flip(); + } + else + { + // Make sure we rewind the data just in case any method has moved the + // position beyond the start. + _data.Rewind(); + } + } + return _data; + } + + set + { + _data = value; + } + } + + public void Acknowledge() + { + // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge + // is not specified. In our case, we only set the session field where client acknowledge mode is specified. + if (_channel != null) + { + // we set multiple to true here since acknowledgement implies acknowledge of all count messages + // received on the session + _channel.AcknowledgeMessage((ulong)DeliveryTag, true); + } + + } + + public abstract void ClearBodyImpl(); + + public void ClearBody() + { + ClearBodyImpl(); + _readableMessage = false; + } + + /// <summary> + /// Get a String representation of the body of the message. Used in the + /// toString() method which outputs this before message properties. + /// </summary> + /// <exception cref="QpidException"></exception> + public abstract string ToBodyString(); + + public override string ToString() + { + try + { + StringBuilder buf = new StringBuilder("Body:\n"); + buf.Append(ToBodyString()); + buf.Append("\nQmsTimestamp: ").Append(Timestamp); + buf.Append("\nQmsExpiration: ").Append(Expiration); + buf.Append("\nQmsPriority: ").Append(Priority); + buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode); + buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName); + buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey); + buf.Append("\nAMQ message number: ").Append(DeliveryTag); + buf.Append("\nProperties:"); + if (ContentHeaderProperties.Headers == null) + { + buf.Append("<NONE>"); + } + else + { + buf.Append(Headers.ToString()); + } + return buf.ToString(); + } + catch (Exception e) + { + return e.ToString(); + } + } + + public FieldTable PopulateHeadersFromMessageProperties() + { + if (ContentHeaderProperties.Headers == null) + { + return null; + } + else + { + // + // We need to convert every property into a String representation + // Note that type information is preserved in the property name + // + FieldTable table = new FieldTable(); + foreach (DictionaryEntry entry in ContentHeaderProperties.Headers) + { + string propertyName = (string) entry.Key; + if (propertyName == null) + { + continue; + } + else + { + table[propertyName] = entry.Value.ToString(); + } + } + return table; + } + } + + public BasicContentHeaderProperties ContentHeaderProperties + { + get + { + return (BasicContentHeaderProperties) _contentHeaderProperties; + } + } + + protected virtual void Reset() + { + _readableMessage = true; + } + + public bool IsReadable + { + get { return _readableMessage; } + } + + public bool isWritable + { + get { return !_readableMessage; } + } + + protected void CheckReadable() + { + if ( !_readableMessage ) + { + throw new MessageNotReadableException("You need to call reset() to make the message readable"); + } + } + + /// <summary> + /// Decodes the replyto field if one is set. + /// + /// Splits a replyto field containing an exchange name followed by a ':', followed by a routing key into the exchange name and + /// routing key seperately. The exchange name may be empty in which case the empty string is returned. If the exchange name is + /// empty the replyto field is expected to being with ':'. + /// + /// Anyhting other than a two part replyto field sperated with a ':' will result in an exception. + /// </summary> + /// + /// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns> + private BindingURL ReadReplyToHeader() + { + string replyToEncoding = ContentHeaderProperties.ReplyTo; + //log.Debug("replyToEncoding = " + replyToEncoding); + + BindingURL bindingUrl = new BindingURL(replyToEncoding); + //log.Debug("bindingUrl = " + bindingUrl.ToString()); + + return bindingUrl; + + //log.Info("replyToEncoding = " + replyToEncoding); + +// if ( replyToEncoding == null ) +// { +// return new Dest(); +// } else +// { +// // Split the replyto field on a ':' +// string[] split = replyToEncoding.Split(':'); + +// // Ensure that the replyto field argument only consisted of two parts. +// if ( split.Length != 2 ) +// { +// throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding); +// } + +// // Extract the exchange name and routing key from the split replyto field. +// string exchangeName = split[0]; + +// string[] split2 = split[1].Split('/'); +// string routingKey = split2[3]; + +// return new Dest(exchangeName, routingKey); +// } + } + + private void WriteReplyToHeader(BindingURL dest) + { + string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey); + ContentHeaderProperties.ReplyTo = encodedDestination; + } + } + + public class BindingURL + { + public readonly static string OPTION_EXCLUSIVE = "exclusive"; + public readonly static string OPTION_AUTODELETE = "autodelete"; + public readonly static string OPTION_DURABLE = "durable"; + public readonly static string OPTION_CLIENTID = "clientid"; + public readonly static string OPTION_SUBSCRIPTION = "subscription"; + public readonly static string OPTION_ROUTING_KEY = "routingkey"; + + /// <summary> Holds the undecoded URL </summary> + string url; + + /// <summary> Holds the decoded options. </summary> + IDictionary options = new Hashtable(); + + /// <summary> Holds the decoded exchange class. </summary> + string exchangeClass; + + /// <summary> Holds the decoded exchange name. </summary> + string exchangeName; + + /// <summary> Holds the destination name. </summary> + string destination; + + /// <summary> Holds the decoded queue name. </summary> + string queueName; + + /// <summary> + /// The binding URL has the format: + /// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + /// </summary> + public BindingURL(string url) + { + this.url = url; + Parse(); + } + + public string Url { get { return url; } } + + public string ExchangeClass + { + get { return exchangeClass; } + set { exchangeClass = value; } + } + + public string ExchangeName + { + get { return exchangeName; } + set { exchangeName = value; } + } + + public string QueueName + { + get { return queueName; } + set { queueName = value; } + } + + public string DestinationName + { + get { return destination; } + set { destination = value; } + } + + public string RoutingKey { + get { return (string)options[OPTION_ROUTING_KEY]; } + set { options[OPTION_ROUTING_KEY] = value; } + } + + public bool ContainsOption(string key) { return options.Contains(key); } + + public string ToString() + { + return "BindingURL: [ ExchangeClass = " + ExchangeClass + ", ExchangeName = " + ExchangeName + ", QueueName = " + QueueName + + ", DestinationName = " + DestinationName + ", RoutingKey = " + RoutingKey + " ] "; + } + + private void Parse() + { + Uri binding = new Uri(url); + + // Extract the URI scheme, this contains the exchange class. It is defaulted to the direct exchange if not specified. + string exchangeClass = binding.Scheme; + + if (exchangeClass == null) + { + url = ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS + "://" + ExchangeNameDefaults.DIRECT + "//" + url; + Parse(); + + return; + } + else + { + this.exchangeClass = exchangeClass; + } + + // Extract the host name, this contains the exchange name. It is defaulted to the default direct exchange if not specified. + string exchangeName = binding.Host; + + if (exchangeName == null) + { + if (exchangeClass.Equals(ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS)) + { + this.exchangeName = ""; + } + } + else + { + this.exchangeName = exchangeName; + } + + // Extract the destination and queue name. + if ((binding.AbsolutePath == null) || binding.AbsolutePath.Equals("")) + { + throw new UriFormatException("Destination or Queue required"); + } + else + { + int slashOffset = binding.AbsolutePath.IndexOf("/", 1); + if (slashOffset == -1) + { + throw new UriFormatException("Destination required"); + } + else + { + String path = binding.AbsolutePath; + + this.destination = path.Substring(1, slashOffset - 1); + this.queueName = path.Substring(slashOffset + 1); + } + } + + ParseOptions(options, binding.Query); + + // If the routing key is not set as an option, set it to the destination name. + if (!ContainsOption(OPTION_ROUTING_KEY)) + { + options[OPTION_ROUTING_KEY] = destination; + } + } + + /// <summary> + /// options looks like this + /// brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value' + /// </summary> + public static void ParseOptions(IDictionary optionMap, string options) + { + // Check that there really are some options to parse. + if ((options == null) || (options.IndexOf('=') == -1)) + { + return; + } + + int optionIndex = options.IndexOf('='); + string option = options.Substring(0, optionIndex); + int length = options.Length; + int nestedQuotes = 0; + + // Holds the index of the final "'". + int valueIndex = optionIndex; + + // Loop over all the options.Dest + while ((nestedQuotes > 0) || (valueIndex < length)) + { + valueIndex++; + + if (valueIndex >= length) + { + break; + } + + if (options[valueIndex] == '\'') + { + if ((valueIndex + 1) < options.Length) + { + if ((options[valueIndex + 1] == '&') || + (options[valueIndex + 1] == ',') || + (options[valueIndex + 1] == ';') || + (options[valueIndex + 1] == '\'')) + { + nestedQuotes--; + + if (nestedQuotes == 0) + { + // We've found the value of an option + break; + } + } + else + { + nestedQuotes++; + } + } + else + { + // We are at the end of the string + // Check to see if we are corectly closing quotes + if (options[valueIndex] == '\'') + { + nestedQuotes--; + } + + break; + } + } + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs new file mode 100644 index 0000000000..bed379290f --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Message +{ + public interface IMessageFactory + { + /// <summary> + /// Create a message + /// </summary> + /// <param name="deliverTag">Delivery Tag</param> + /// <param name="messageNbr">Message Sequence Number</param> + /// <param name="redelivered">True if this is a redelivered message</param> + /// <param name="contentHeader">Content headers</param> + /// <param name="bodies">Message bodies</param> + /// <returns>The new message</returns> + /// <exception cref="QpidMessagingException">if the message cannot be created</exception> + AbstractQmsMessage CreateMessage(long deliverTag, bool redelivered, + ContentHeaderBody contentHeader, + IList bodies); + + /// <summary> + /// Creates the message. + /// </summary> + /// <param name="mimeType">Mime type to associate the new message with</param> + /// <returns>The new message</returns> + /// <exception cref="QpidMessagingException">if the message cannot be created</exception> + AbstractQmsMessage CreateMessage(string mimeType); + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs b/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs new file mode 100644 index 0000000000..fdb5e14aa6 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Client.Message +{ + public class MessageFactoryRegistry + { + private readonly Hashtable _mimeToFactoryMap = new Hashtable(); + private IMessageFactory _defaultFactory; + + /// <summary> + /// Default factory to use for unknown message types + /// </summary> + public IMessageFactory DefaultFactory + { + get { return _defaultFactory; } + set { _defaultFactory = value; } + } + + /// <summary> + /// Register a new message factory for a MIME type + /// </summary> + /// <param name="mimeType">Mime type to register</param> + /// <param name="mf"></param> + public void RegisterFactory(string mimeType, IMessageFactory mf) + { + if ( mf == null ) + throw new ArgumentNullException("mf"); + if ( mimeType == null || mimeType.Length == 0 ) + throw new ArgumentNullException("mimeType"); + + _mimeToFactoryMap[mimeType] = mf; + } + + /// <summary> + /// Remove a message factory + /// </summary> + /// <param name="mimeType">MIME type to unregister</param> + public void DeregisterFactory(string mimeType) + { + _mimeToFactoryMap.Remove(mimeType); + } + + /// <summary> + /// Create a message. This looks up the MIME type from the content header and instantiates the appropriate + /// concrete message type. + /// </summary> + /// <param name="messageNbr">the AMQ message id</param> + /// <param name="redelivered">true if redelivered</param> + /// <param name="contentHeader">the content header that was received</param> + /// <param name="bodies">a list of ContentBody instances</param> + /// <returns>the message.</returns> + /// <exception cref="AMQException"/> + /// <exception cref="QpidException"/> + public AbstractQmsMessage CreateMessage(long messageNbr, bool redelivered, + ContentHeaderBody contentHeader, + IList bodies) + { + BasicContentHeaderProperties properties = (BasicContentHeaderProperties)contentHeader.Properties; + + if ( properties.ContentType == null ) + { + properties.ContentType = ""; + } + + IMessageFactory mf = GetFactory(properties.ContentType); + return mf.CreateMessage(messageNbr, redelivered, contentHeader, bodies); + } + + /// <summary> + /// Create a new message of the specified type + /// </summary> + /// <param name="mimeType">The Mime type</param> + /// <returns>The new message</returns> + public AbstractQmsMessage CreateMessage(string mimeType) + { + if ( mimeType == null || mimeType.Length == 0 ) + throw new ArgumentNullException("mimeType"); + + IMessageFactory mf = GetFactory(mimeType); + return mf.CreateMessage(mimeType); + } + + /// <summary> + /// Construct a new registry with the default message factories registered + /// </summary> + /// <returns>a message factory registry</returns> + public static MessageFactoryRegistry NewDefaultRegistry() + { + MessageFactoryRegistry mf = new MessageFactoryRegistry(); + mf.RegisterFactory("text/plain", new QpidTextMessageFactory()); + mf.RegisterFactory("text/xml", new QpidTextMessageFactory()); + mf.RegisterFactory("application/octet-stream", new QpidBytesMessageFactory()); + + mf.DefaultFactory = new QpidBytesMessageFactory(); + return mf; + } + + private IMessageFactory GetFactory(string mimeType) + { + IMessageFactory mf = (IMessageFactory)_mimeToFactoryMap[mimeType]; + return mf != null ? mf : _defaultFactory; + } + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs new file mode 100644 index 0000000000..fb3efb1b0f --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs @@ -0,0 +1,353 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.IO; +using System.Runtime.Serialization; +using System.Text; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Message +{ + [Serializable] + class MessageEOFException : QpidException + { + public MessageEOFException(string message) : base(message) + { + } + + protected MessageEOFException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + } + } + + public class QpidBytesMessage : AbstractQmsMessage, IBytesMessage + { + private const int DEFAULT_BUFFER_INITIAL_SIZE = 1024; + + public QpidBytesMessage() : this(null) + { + } + + /// <summary> + /// Construct a bytes message with existing data. + /// </summary> + /// <param name="data">if data is not null, the message is immediately in read only mode. if data is null, it is in + /// write-only mode</param> + QpidBytesMessage(ByteBuffer data) : base(data) + { + // superclass constructor has instantiated a content header at this point + if (data == null) + { + _data = ByteBuffer.Allocate(DEFAULT_BUFFER_INITIAL_SIZE); + _data.IsAutoExpand = true; + } + } + + internal QpidBytesMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data) + // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea + : base(messageNbr, (BasicContentHeaderProperties)contentHeader.Properties, data) + { + } + + public override void ClearBodyImpl() + { + _data.Clear(); + } + + public override string ToBodyString() + { + CheckReadable(); + try + { + return GetText(); + } + catch (IOException e) + { + throw new QpidException(e.ToString()); + } + } + + private String GetText() + { + // this will use the default platform encoding + if (_data == null) + { + return null; + } + int pos = _data.Position; + _data.Rewind(); + // one byte left is for the end of frame marker + if (_data.Remaining == 0) + { + // this is really redundant since pos must be zero + _data.Position = pos; + return null; + } + else + { + byte[] data = new byte[_data.Remaining]; + _data.GetBytes(data); + return Encoding.UTF8.GetString(data); + } + } + + public long BodyLength + { + get + { + CheckReadable(); + return _data.Limit; + } + } + + private void CheckWritable() + { + if (_readableMessage) + { + throw new MessageNotWriteableException("You need to call clearBody() to make the message writable"); + } + } + + public bool ReadBoolean() + { + CheckReadable(); + CheckAvailable(1); + return _data.GetByte() != 0; + } + + public byte ReadByte() + { + CheckReadable(); + CheckAvailable(1); + return _data.GetByte(); + } + + public short ReadSignedByte() + { + CheckReadable(); + CheckAvailable(1); + return _data.GetSByte(); + } + + public short ReadShort() + { + CheckReadable(); + CheckAvailable(2); + return _data.GetInt16(); + } + + public char ReadChar() + { + CheckReadable(); + CheckAvailable(2); + return _data.GetChar(); + } + + public int ReadInt() + { + CheckReadable(); + CheckAvailable(4); + return _data.GetInt32(); + } + + public long ReadLong() + { + CheckReadable(); + CheckAvailable(8); + return _data.GetInt64(); + } + + public float ReadFloat() + { + CheckReadable(); + CheckAvailable(4); + return _data.GetFloat(); + } + + public double ReadDouble() + { + CheckReadable(); + CheckAvailable(8); + return _data.GetDouble(); + } + + public string ReadUTF() + { + CheckReadable(); + // we check only for one byte since theoretically the string could be only a + // single byte when using UTF-8 encoding + CheckAvailable(1); + try + { + byte[] data = new byte[_data.Remaining]; + _data.GetBytes(data); + return Encoding.UTF8.GetString(data); + } + catch (IOException e) + { + throw new QpidException(e.ToString(), e); + } + } + + public int ReadBytes(byte[] bytes) + { + if (bytes == null) + { + throw new ArgumentNullException("bytes"); + } + CheckReadable(); + int count = (_data.Remaining >= bytes.Length ? bytes.Length : _data.Remaining); + if (count == 0) + { + return -1; + } + else + { + _data.GetBytes(bytes, 0, count); + return count; + } + } + + public int ReadBytes(byte[] bytes, int maxLength) + { + if (bytes == null) + { + throw new ArgumentNullException("bytes"); + } + if (maxLength > bytes.Length) + { + throw new ArgumentOutOfRangeException("maxLength must be >= 0"); + } + CheckReadable(); + int count = (_data.Remaining >= maxLength ? maxLength : _data.Remaining); + if (count == 0) + { + return -1; + } + else + { + _data.GetBytes(bytes, 0, count); + return count; + } + } + + public void WriteBoolean(bool b) + { + CheckWritable(); + _data.Put(b ? (byte)1 : (byte)0); + } + + public void WriteByte(byte b) + { + CheckWritable(); + _data.Put(b); + } + + public void WriteShort(short i) + { + CheckWritable(); + _data.Put(i); + } + + public void WriteChar(char c) + { + CheckWritable(); + _data.Put(c); + } + + public void WriteSignedByte(short value) + { + CheckWritable(); + _data.Put(value); + } + + public void WriteDouble(double value) + { + CheckWritable(); + _data.Put(value); + } + + public void WriteFloat(float value) + { + CheckWritable(); + _data.Put(value); + } + + public void WriteInt(int value) + { + CheckWritable(); + _data.Put(value); + } + + public void WriteLong(long value) + { + CheckWritable(); + _data.Put(value); + } + + public void WriteUTF(string value) + { + CheckWritable(); + byte[] encodedData = Encoding.UTF8.GetBytes(value); + _data.Put(encodedData); + } + + public void WriteBytes(byte[] bytes) + { + CheckWritable(); + _data.Put(bytes); + } + + public void WriteBytes(byte[] bytes, int offset, int length) + { + CheckWritable(); + _data.Put(bytes, offset, length); + } + + protected override void Reset() + { + base.Reset(); + _data.Flip(); + } + + void IBytesMessage.Reset() + { + Reset(); + } + + /** + * Check that there is at least a certain number of bytes available to read + * + * @param len the number of bytes + * @throws MessageEOFException if there are less than len bytes available to read + */ + private void CheckAvailable(int len) + { + if (_data.Remaining < len) + { + throw new MessageEOFException("Unable to read " + len + " bytes"); + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs new file mode 100644 index 0000000000..3cc96cbddc --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using Apache.Qpid.Framing; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Message +{ + public class QpidBytesMessageFactory : AbstractQmsMessageFactory + { + //protected override AbstractQmsMessage CreateMessageWithBody(long messageNbr, + // ContentHeaderBody contentHeader, + // IList bodies) + //{ + // byte[] data; + + // // we optimise the non-fragmented case to avoid copying + // if (bodies != null && bodies.Count == 1) + // { + // data = ((ContentBody)bodies[0]).Payload; + // } + // else + // { + // data = new byte[(long)contentHeader.BodySize]; + // int currentPosition = 0; + // foreach (ContentBody cb in bodies) + // { + // Array.Copy(cb.Payload, 0, data, currentPosition, cb.Payload.Length); + // currentPosition += cb.Payload.Length; + // } + // } + + // return new QpidBytesMessage(messageNbr, data, contentHeader); + //} + + //public override AbstractQmsMessage CreateMessage() + //{ + // return new QpidBytesMessage(); + //} + + protected override AbstractQmsMessage CreateMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) + { + return new QpidBytesMessage(deliveryTag, contentHeader, data); + } + + public override AbstractQmsMessage CreateMessage(string mimeType) + { + QpidBytesMessage msg = new QpidBytesMessage(); + msg.ContentType = mimeType; + return msg; + } + + } +} + + diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs new file mode 100644 index 0000000000..9ad1c26867 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidHeaders.cs @@ -0,0 +1,233 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; + +namespace Apache.Qpid.Client.Message +{ + internal class QpidHeaders : IHeaders + { + private FieldTable _headers; + + public QpidHeaders(FieldTable headers) + { + if ( headers == null ) + throw new ArgumentNullException("headers"); + _headers = headers; + } + + public bool Contains(string name) + { + CheckPropertyName(name); + return _headers.Contains(name); + } + + public void Clear() + { + _headers.Clear(); + } + + public object this[string name] + { + get { return GetObject(name); } + set { SetObject(name, value); } + } + + public bool GetBoolean(string name) + { + CheckPropertyName(name); + if ( Contains(name) ) + return _headers.GetBoolean(name); + return false; + } + + public void SetBoolean(string name, bool b) + { + CheckPropertyName(name); + _headers.SetBoolean(name, b); + } + + public byte GetByte(string propertyName) + { + CheckPropertyName(propertyName); + if ( Contains(propertyName) ) + return _headers.GetByte(propertyName); + return 0; + } + + public void SetByte(string propertyName, byte b) + { + CheckPropertyName(propertyName); + _headers.SetByte(propertyName, b); + } + + // we have sbyte overloads to interoperate with java + // because the Java client/server uses signed bytes + // by default, while C#'s is unsigned + public sbyte GetSByte(string propertyName) + { + CheckPropertyName(propertyName); + if ( Contains(propertyName) ) + return _headers.GetSByte(propertyName); + return 0; + } + + public void SetSByte(string propertyName, sbyte b) + { + CheckPropertyName(propertyName); + _headers.SetSByte(propertyName, b); + } + + public short GetShort(string propertyName) + { + CheckPropertyName(propertyName); + if ( Contains(propertyName) ) + return _headers.GetInt16(propertyName); + return 0; + } + + public void SetShort(string propertyName, short i) + { + CheckPropertyName(propertyName); + _headers.SetInt16(propertyName, i); + } + + public int GetInt(string propertyName) + { + CheckPropertyName(propertyName); + if ( Contains(propertyName) ) + return _headers.GetInt32(propertyName); + return 0; + } + + public void SetInt(string propertyName, int i) + { + CheckPropertyName(propertyName); + _headers.SetInt32(propertyName, i); + } + + public long GetLong(string propertyName) + { + CheckPropertyName(propertyName); + if ( Contains(propertyName) ) + return _headers.GetInt64(propertyName); + return 0; + } + + public void SetLong(string propertyName, long l) + { + CheckPropertyName(propertyName); + _headers.SetInt64(propertyName, l); + } + + public float GetFloat(String propertyName) + { + CheckPropertyName(propertyName); + if ( Contains(propertyName) ) + return _headers.GetFloat(propertyName); + return 0f; + } + + public void SetFloat(string propertyName, float f) + { + CheckPropertyName(propertyName); + _headers.SetFloat(propertyName, f); + } + + public double GetDouble(string propertyName) + { + CheckPropertyName(propertyName); + if ( Contains(propertyName) ) + return _headers.GetDouble(propertyName); + return 0; + } + + public void SetDouble(string propertyName, double v) + { + CheckPropertyName(propertyName); + _headers.SetDouble(propertyName, v); + } + + public string GetString(string propertyName) + { + CheckPropertyName(propertyName); + return _headers.GetString(propertyName); + } + + public void SetString(string propertyName, string value) + { + CheckPropertyName(propertyName); + _headers.SetString(propertyName, value); + } + + public object GetObject(string propertyName) + { + CheckPropertyName(propertyName); + return _headers[propertyName]; + } + + public void SetObject(string propertyName, object value) + { + CheckPropertyName(propertyName); + _headers[propertyName] = value; + } + + private static void CheckPropertyName(string propertyName) + { + if ( propertyName == null ) + { + throw new ArgumentException("Property name must not be null"); + } else if ( "".Equals(propertyName) ) + { + throw new ArgumentException("Property name must not be the empty string"); + } + } + + public override string ToString() + { + StringBuilder buf = new StringBuilder("{"); + int i = 0; + foreach ( DictionaryEntry entry in _headers ) + { + ++i; + if ( i > 1 ) + { + buf.Append(", "); + } + string propertyName = (string)entry.Key; + if ( propertyName == null ) + { + buf.Append("\nInternal error: Property with NULL key defined"); + } else + { + buf.Append(propertyName); + buf.Append(" = ").Append(entry.Value); + } + } + buf.Append("}"); + return buf.ToString(); + } + + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs new file mode 100644 index 0000000000..24aef92aa5 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs @@ -0,0 +1,115 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Text; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Message +{ + public class QpidTextMessage : AbstractQmsMessage, ITextMessage + { + private string _decodedValue = null; + private static Encoding DEFAULT_ENCODING = Encoding.UTF8; + + internal QpidTextMessage() : this(null, null) + { + ContentEncoding = DEFAULT_ENCODING.BodyName; + } + + internal QpidTextMessage(ByteBuffer data, String encoding) : base(data) + { + ContentEncoding = encoding; + } + + internal QpidTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) + :base(deliveryTag, contentHeader, data) + { + } + + public override void ClearBodyImpl() + { + if (_data != null) + { + _data.Release(); + } + _data = null; + _decodedValue = null; + } + + public override string ToBodyString() + { + return Text; + } + + public string Text + { + get + { + if (_data == null && _decodedValue == null) + { + return null; + } + else if (_decodedValue != null) + { + return _decodedValue; + } + else + { + _data.Rewind(); + + // Read remaining bytes. + byte[] bytes = new byte[_data.Remaining]; + _data.GetBytes(bytes); + + // Convert to string based on encoding. + if (ContentHeaderProperties.Encoding != null) + { + // throw ArgumentException if the encoding is not supported + _decodedValue = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(bytes); + } + else + { + _decodedValue = DEFAULT_ENCODING.GetString(bytes); + } + return _decodedValue; + } + } + + set + { + byte[] bytes; + if (ContentHeaderProperties.Encoding == null) + { + bytes = Encoding.Default.GetBytes(value); + } + else + { + // throw ArgumentException if the encoding is not supported + bytes = Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value); + } + _data = ByteBuffer.Wrap(bytes); + _decodedValue = value; + } + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs new file mode 100644 index 0000000000..79871e85ca --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using Apache.Qpid.Buffer; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Message +{ + public class QpidTextMessageFactory : AbstractQmsMessageFactory + { + public override AbstractQmsMessage CreateMessage(string mimeType) + { + QpidTextMessage msg = new QpidTextMessage(); + msg.ContentType = mimeType; + return msg; + } + + protected override AbstractQmsMessage CreateMessage(long deliveryTag, ByteBuffer data, ContentHeaderBody contentHeader) + { + return new QpidTextMessage(deliveryTag, (BasicContentHeaderProperties) contentHeader.Properties, data); + } + } +} diff --git a/qpid/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs b/qpid/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs new file mode 100644 index 0000000000..4317ef3474 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/UnexpectedBodyReceivedException.cs @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Runtime.Serialization; +using log4net; + +namespace Apache.Qpid.Client.Message +{ + /// <summary> + /// Raised when a message body is received unexpectedly by the client. This typically occurs when the + /// length of bodies received does not match with the declared length in the content header. + /// </summary> + [Serializable] + public class UnexpectedBodyReceivedException : AMQException + { + public UnexpectedBodyReceivedException(ILog logger, string msg, Exception t) + : base(logger, msg, t) + { + } + + public UnexpectedBodyReceivedException(ILog logger, string msg) + : base(logger, msg) + { + } + + public UnexpectedBodyReceivedException(ILog logger, int errorCode, string msg) + : base(logger, errorCode, msg) + { + } + + protected UnexpectedBodyReceivedException(SerializationInfo info, StreamingContext ctxt) + : base(info, ctxt) + { + } + } +} + + + diff --git a/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs new file mode 100644 index 0000000000..d329712334 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/UnprocessedMessage.cs @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System.Collections; +using Apache.Qpid.Framing; + +namespace Apache.Qpid.Client.Message +{ + public class UnprocessedMessage + { + private ulong _bytesReceived = 0; + + public BasicDeliverBody DeliverBody; + public BasicReturnBody BounceBody; + public ushort ChannelId; + public ContentHeaderBody ContentHeader; + + /// <summary> + /// List of ContentBody instances. Due to fragmentation you don't know how big this will be in general + /// </summary> + /// TODO: write and use linked list class + public IList Bodies = new ArrayList(); + + public void ReceiveBody(ContentBody body) + { + Bodies.Add(body); + if (body.Payload != null) + { + _bytesReceived += (uint)body.Payload.Remaining; + } + } + + public bool IsAllBodyDataReceived() + { + return _bytesReceived == ContentHeader.BodySize; + } + } +} + + |