diff options
Diffstat (limited to 'qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs')
-rw-r--r-- | qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs | 694 |
1 files changed, 694 insertions, 0 deletions
diff --git a/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs new file mode 100644 index 0000000000..34b47137e5 --- /dev/null +++ b/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs @@ -0,0 +1,694 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +using System; +using System.Collections; +using System.Text; +using log4net; +using Apache.Qpid.Framing; +using Apache.Qpid.Messaging; +using Apache.Qpid.Buffer; + +namespace Apache.Qpid.Client.Message +{ + public abstract class AbstractQmsMessage : AMQMessage, IMessage + { + private static ILog log = LogManager.GetLogger(typeof(AbstractQmsMessage)); + + protected bool _redelivered; + + protected ByteBuffer _data; + protected bool _readableMessage = false; + private QpidHeaders _headers; + + protected AbstractQmsMessage(ByteBuffer data) + : base(new BasicContentHeaderProperties()) + { + Init(data); + } + + protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data) + : this(contentHeader, deliveryTag) + { + Init(data); + } + + protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag) + { + Init(null); + } + + private void Init(ByteBuffer data) + { + _data = data; + if ( _data != null ) + { + _data.Acquire(); + } + _readableMessage = (data != null); + if ( ContentHeaderProperties.Headers == null ) + ContentHeaderProperties.Headers = new FieldTable(); + _headers = new QpidHeaders(ContentHeaderProperties.Headers); + } + + // + // Properties + // + + /// <summary> + /// The application message identifier + /// </summary> + public string MessageId + { + get + { + if (ContentHeaderProperties.MessageId == null) + { + ContentHeaderProperties.MessageId = "ID:" + DeliveryTag; + } + return ContentHeaderProperties.MessageId; + } + set { ContentHeaderProperties.MessageId = value; } + } + + /// <summary> + /// The message timestamp + /// </summary> + public long Timestamp + { + get + { + // TODO: look at ulong/long choice + return (long) ContentHeaderProperties.Timestamp; + } + set + { + ContentHeaderProperties.Timestamp = (ulong) value; + } + } + + /// <summary> + /// The <see cref="CorrelationId"/> as a byte array. + /// </summary> + public byte[] CorrelationIdAsBytes + { + get { return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId); } + set { ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value); } + } + + /// <summary> + /// The application correlation identifier + /// </summary> + public string CorrelationId + { + get { return ContentHeaderProperties.CorrelationId; } + set { ContentHeaderProperties.CorrelationId = value; } + } + + struct Dest + { + public string ExchangeName; + public string RoutingKey; + + public Dest(string exchangeName, string routingKey) + { + ExchangeName = exchangeName; + RoutingKey = routingKey; + } + } + + /// <summary> + /// Exchange name of the reply-to address + /// </summary> + public string ReplyToExchangeName + { + get + { + return ReadReplyToHeader().ExchangeName; + } + set + { + BindingURL dest = ReadReplyToHeader(); + dest.ExchangeName = value; + WriteReplyToHeader(dest); + } + } + + /// <summary> + /// Routing key of the reply-to address + /// </summary> + public string ReplyToRoutingKey + { + get + { + return ReadReplyToHeader().RoutingKey; + } + set + { + BindingURL dest = ReadReplyToHeader(); + dest.RoutingKey = value; + WriteReplyToHeader(dest); + } + } + + /// <summary> + /// Non-persistent (1) or persistent (2) + /// </summary> + public DeliveryMode DeliveryMode + { + get + { + byte b = ContentHeaderProperties.DeliveryMode; + switch (b) + { + case 1: + return DeliveryMode.NonPersistent; + case 2: + return DeliveryMode.Persistent; + default: + throw new QpidException("Illegal value for delivery mode in content header properties"); + } + } + set + { + ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2); + } + } + + /// <summary> + /// True, if this is a redelivered message + /// </summary> + public bool Redelivered + { + get { return _redelivered; } + set { _redelivered = value; } + } + + /// <summary> + /// The message type name + /// </summary> + public string Type + { + get { return ContentHeaderProperties.Type; } + set { ContentHeaderProperties.Type = value; } + } + + /// <summary> + /// Message expiration specification + /// </summary> + public long Expiration + { + get { return ContentHeaderProperties.Expiration; } + set { ContentHeaderProperties.Expiration = value; } + } + + /// <summary> + /// The message priority, 0 to 9 + /// </summary> + public byte Priority + { + get { return ContentHeaderProperties.Priority; } + set { ContentHeaderProperties.Priority = (byte) value; } + } + + /// <summary> + /// The MIME Content Type + /// </summary> + public string ContentType + { + get { return ContentHeaderProperties.ContentType; } + set { ContentHeaderProperties.ContentType = value; } + } + + /// <summary> + /// The MIME Content Encoding + /// </summary> + public string ContentEncoding + { + get { return ContentHeaderProperties.Encoding; } + set { ContentHeaderProperties.Encoding = value; } + } + + /// <summary> + /// Headers of this message + /// </summary> + public IHeaders Headers + { + get { return _headers; } + } + + /// <summary> + /// The creating user id + /// </summary> + public string UserId + { + get { return ContentHeaderProperties.UserId; } + set { ContentHeaderProperties.UserId = value; } + } + + /// <summary> + /// The creating application id + /// </summary> + public string AppId + { + get { return ContentHeaderProperties.AppId; } + set { ContentHeaderProperties.AppId = value; } + } + + /// <summary> + /// Intra-cluster routing identifier + /// </summary> + public string ClusterId + { + get { return ContentHeaderProperties.ClusterId; } + set { ContentHeaderProperties.ClusterId = value; } + } + + /// <summary> + /// Return the raw byte array that is used to populate the frame when sending + /// the message. + /// </summary> + /// <value>a byte array of message data</value> + public ByteBuffer Data + { + get + { + if (_data != null) + { + if (!_readableMessage) + { + _data.Flip(); + } + else + { + // Make sure we rewind the data just in case any method has moved the + // position beyond the start. + _data.Rewind(); + } + } + return _data; + } + + set + { + _data = value; + } + } + + public void Acknowledge() + { + // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge + // is not specified. In our case, we only set the session field where client acknowledge mode is specified. + if (_channel != null) + { + // we set multiple to true here since acknowledgement implies acknowledge of all count messages + // received on the session + _channel.AcknowledgeMessage((ulong)DeliveryTag, true); + } + + } + + public abstract void ClearBodyImpl(); + + public void ClearBody() + { + ClearBodyImpl(); + _readableMessage = false; + } + + /// <summary> + /// Get a String representation of the body of the message. Used in the + /// toString() method which outputs this before message properties. + /// </summary> + /// <exception cref="QpidException"></exception> + public abstract string ToBodyString(); + + public override string ToString() + { + try + { + StringBuilder buf = new StringBuilder("Body:\n"); + buf.Append(ToBodyString()); + buf.Append("\nQmsTimestamp: ").Append(Timestamp); + buf.Append("\nQmsExpiration: ").Append(Expiration); + buf.Append("\nQmsPriority: ").Append(Priority); + buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode); + buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName); + buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey); + buf.Append("\nAMQ message number: ").Append(DeliveryTag); + buf.Append("\nProperties:"); + if (ContentHeaderProperties.Headers == null) + { + buf.Append("<NONE>"); + } + else + { + buf.Append(Headers.ToString()); + } + return buf.ToString(); + } + catch (Exception e) + { + return e.ToString(); + } + } + + public FieldTable PopulateHeadersFromMessageProperties() + { + if (ContentHeaderProperties.Headers == null) + { + return null; + } + else + { + // + // We need to convert every property into a String representation + // Note that type information is preserved in the property name + // + FieldTable table = new FieldTable(); + foreach (DictionaryEntry entry in ContentHeaderProperties.Headers) + { + string propertyName = (string) entry.Key; + if (propertyName == null) + { + continue; + } + else + { + table[propertyName] = entry.Value.ToString(); + } + } + return table; + } + } + + public BasicContentHeaderProperties ContentHeaderProperties + { + get + { + return (BasicContentHeaderProperties) _contentHeaderProperties; + } + } + + protected virtual void Reset() + { + _readableMessage = true; + } + + public bool IsReadable + { + get { return _readableMessage; } + } + + public bool isWritable + { + get { return !_readableMessage; } + } + + protected void CheckReadable() + { + if ( !_readableMessage ) + { + throw new MessageNotReadableException("You need to call reset() to make the message readable"); + } + } + + /// <summary> + /// Decodes the replyto field if one is set. + /// + /// Splits a replyto field containing an exchange name followed by a ':', followed by a routing key into the exchange name and + /// routing key seperately. The exchange name may be empty in which case the empty string is returned. If the exchange name is + /// empty the replyto field is expected to being with ':'. + /// + /// Anyhting other than a two part replyto field sperated with a ':' will result in an exception. + /// </summary> + /// + /// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns> + private BindingURL ReadReplyToHeader() + { + string replyToEncoding = ContentHeaderProperties.ReplyTo; + //log.Debug("replyToEncoding = " + replyToEncoding); + + BindingURL bindingUrl = new BindingURL(replyToEncoding); + //log.Debug("bindingUrl = " + bindingUrl.ToString()); + + return bindingUrl; + + //log.Info("replyToEncoding = " + replyToEncoding); + +// if ( replyToEncoding == null ) +// { +// return new Dest(); +// } else +// { +// // Split the replyto field on a ':' +// string[] split = replyToEncoding.Split(':'); + +// // Ensure that the replyto field argument only consisted of two parts. +// if ( split.Length != 2 ) +// { +// throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding); +// } + +// // Extract the exchange name and routing key from the split replyto field. +// string exchangeName = split[0]; + +// string[] split2 = split[1].Split('/'); +// string routingKey = split2[3]; + +// return new Dest(exchangeName, routingKey); +// } + } + + private void WriteReplyToHeader(BindingURL dest) + { + string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey); + ContentHeaderProperties.ReplyTo = encodedDestination; + } + } + + public class BindingURL + { + public readonly static string OPTION_EXCLUSIVE = "exclusive"; + public readonly static string OPTION_AUTODELETE = "autodelete"; + public readonly static string OPTION_DURABLE = "durable"; + public readonly static string OPTION_CLIENTID = "clientid"; + public readonly static string OPTION_SUBSCRIPTION = "subscription"; + public readonly static string OPTION_ROUTING_KEY = "routingkey"; + + /// <summary> Holds the undecoded URL </summary> + string url; + + /// <summary> Holds the decoded options. </summary> + IDictionary options = new Hashtable(); + + /// <summary> Holds the decoded exchange class. </summary> + string exchangeClass; + + /// <summary> Holds the decoded exchange name. </summary> + string exchangeName; + + /// <summary> Holds the destination name. </summary> + string destination; + + /// <summary> Holds the decoded queue name. </summary> + string queueName; + + /// <summary> + /// The binding URL has the format: + /// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']* + /// </summary> + public BindingURL(string url) + { + this.url = url; + Parse(); + } + + public string Url { get { return url; } } + + public string ExchangeClass + { + get { return exchangeClass; } + set { exchangeClass = value; } + } + + public string ExchangeName + { + get { return exchangeName; } + set { exchangeName = value; } + } + + public string QueueName + { + get { return queueName; } + set { queueName = value; } + } + + public string DestinationName + { + get { return destination; } + set { destination = value; } + } + + public string RoutingKey { + get { return (string)options[OPTION_ROUTING_KEY]; } + set { options[OPTION_ROUTING_KEY] = value; } + } + + public bool ContainsOption(string key) { return options.Contains(key); } + + public string ToString() + { + return "BindingURL: [ ExchangeClass = " + ExchangeClass + ", ExchangeName = " + ExchangeName + ", QueueName = " + QueueName + + ", DestinationName = " + DestinationName + ", RoutingKey = " + RoutingKey + " ] "; + } + + private void Parse() + { + Uri binding = new Uri(url); + + // Extract the URI scheme, this contains the exchange class. It is defaulted to the direct exchange if not specified. + string exchangeClass = binding.Scheme; + + if (exchangeClass == null) + { + url = ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS + "://" + ExchangeNameDefaults.DIRECT + "//" + url; + Parse(); + + return; + } + else + { + this.exchangeClass = exchangeClass; + } + + // Extract the host name, this contains the exchange name. It is defaulted to the default direct exchange if not specified. + string exchangeName = binding.Host; + + if (exchangeName == null) + { + if (exchangeClass.Equals(ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS)) + { + this.exchangeName = ""; + } + } + else + { + this.exchangeName = exchangeName; + } + + // Extract the destination and queue name. + if ((binding.AbsolutePath == null) || binding.AbsolutePath.Equals("")) + { + throw new UriFormatException("Destination or Queue required"); + } + else + { + int slashOffset = binding.AbsolutePath.IndexOf("/", 1); + if (slashOffset == -1) + { + throw new UriFormatException("Destination required"); + } + else + { + String path = binding.AbsolutePath; + + this.destination = path.Substring(1, slashOffset - 1); + this.queueName = path.Substring(slashOffset + 1); + } + } + + ParseOptions(options, binding.Query); + + // If the routing key is not set as an option, set it to the destination name. + if (!ContainsOption(OPTION_ROUTING_KEY)) + { + options[OPTION_ROUTING_KEY] = destination; + } + } + + /// <summary> + /// options looks like this + /// brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value' + /// </summary> + public static void ParseOptions(IDictionary optionMap, string options) + { + // Check that there really are some options to parse. + if ((options == null) || (options.IndexOf('=') == -1)) + { + return; + } + + int optionIndex = options.IndexOf('='); + string option = options.Substring(0, optionIndex); + int length = options.Length; + int nestedQuotes = 0; + + // Holds the index of the final "'". + int valueIndex = optionIndex; + + // Loop over all the options.Dest + while ((nestedQuotes > 0) || (valueIndex < length)) + { + valueIndex++; + + if (valueIndex >= length) + { + break; + } + + if (options[valueIndex] == '\'') + { + if ((valueIndex + 1) < options.Length) + { + if ((options[valueIndex + 1] == '&') || + (options[valueIndex + 1] == ',') || + (options[valueIndex + 1] == ';') || + (options[valueIndex + 1] == '\'')) + { + nestedQuotes--; + + if (nestedQuotes == 0) + { + // We've found the value of an option + break; + } + } + else + { + nestedQuotes++; + } + } + else + { + // We are at the end of the string + // Check to see if we are corectly closing quotes + if (options[valueIndex] == '\'') + { + nestedQuotes--; + } + + break; + } + } + } + } + } +} |