summaryrefslogtreecommitdiff
path: root/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs')
-rw-r--r--trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs694
1 files changed, 0 insertions, 694 deletions
diff --git a/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs b/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
deleted file mode 100644
index 34b47137e5..0000000000
--- a/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
+++ /dev/null
@@ -1,694 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.Text;
-using log4net;
-using Apache.Qpid.Framing;
-using Apache.Qpid.Messaging;
-using Apache.Qpid.Buffer;
-
-namespace Apache.Qpid.Client.Message
-{
- public abstract class AbstractQmsMessage : AMQMessage, IMessage
- {
- private static ILog log = LogManager.GetLogger(typeof(AbstractQmsMessage));
-
- protected bool _redelivered;
-
- protected ByteBuffer _data;
- protected bool _readableMessage = false;
- private QpidHeaders _headers;
-
- protected AbstractQmsMessage(ByteBuffer data)
- : base(new BasicContentHeaderProperties())
- {
- Init(data);
- }
-
- protected AbstractQmsMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, ByteBuffer data)
- : this(contentHeader, deliveryTag)
- {
- Init(data);
- }
-
- protected AbstractQmsMessage(BasicContentHeaderProperties contentHeader, long deliveryTag) : base(contentHeader, deliveryTag)
- {
- Init(null);
- }
-
- private void Init(ByteBuffer data)
- {
- _data = data;
- if ( _data != null )
- {
- _data.Acquire();
- }
- _readableMessage = (data != null);
- if ( ContentHeaderProperties.Headers == null )
- ContentHeaderProperties.Headers = new FieldTable();
- _headers = new QpidHeaders(ContentHeaderProperties.Headers);
- }
-
- //
- // Properties
- //
-
- /// <summary>
- /// The application message identifier
- /// </summary>
- public string MessageId
- {
- get
- {
- if (ContentHeaderProperties.MessageId == null)
- {
- ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
- }
- return ContentHeaderProperties.MessageId;
- }
- set { ContentHeaderProperties.MessageId = value; }
- }
-
- /// <summary>
- /// The message timestamp
- /// </summary>
- public long Timestamp
- {
- get
- {
- // TODO: look at ulong/long choice
- return (long) ContentHeaderProperties.Timestamp;
- }
- set
- {
- ContentHeaderProperties.Timestamp = (ulong) value;
- }
- }
-
- /// <summary>
- /// The <see cref="CorrelationId"/> as a byte array.
- /// </summary>
- public byte[] CorrelationIdAsBytes
- {
- get { return Encoding.Default.GetBytes(ContentHeaderProperties.CorrelationId); }
- set { ContentHeaderProperties.CorrelationId = Encoding.Default.GetString(value); }
- }
-
- /// <summary>
- /// The application correlation identifier
- /// </summary>
- public string CorrelationId
- {
- get { return ContentHeaderProperties.CorrelationId; }
- set { ContentHeaderProperties.CorrelationId = value; }
- }
-
- struct Dest
- {
- public string ExchangeName;
- public string RoutingKey;
-
- public Dest(string exchangeName, string routingKey)
- {
- ExchangeName = exchangeName;
- RoutingKey = routingKey;
- }
- }
-
- /// <summary>
- /// Exchange name of the reply-to address
- /// </summary>
- public string ReplyToExchangeName
- {
- get
- {
- return ReadReplyToHeader().ExchangeName;
- }
- set
- {
- BindingURL dest = ReadReplyToHeader();
- dest.ExchangeName = value;
- WriteReplyToHeader(dest);
- }
- }
-
- /// <summary>
- /// Routing key of the reply-to address
- /// </summary>
- public string ReplyToRoutingKey
- {
- get
- {
- return ReadReplyToHeader().RoutingKey;
- }
- set
- {
- BindingURL dest = ReadReplyToHeader();
- dest.RoutingKey = value;
- WriteReplyToHeader(dest);
- }
- }
-
- /// <summary>
- /// Non-persistent (1) or persistent (2)
- /// </summary>
- public DeliveryMode DeliveryMode
- {
- get
- {
- byte b = ContentHeaderProperties.DeliveryMode;
- switch (b)
- {
- case 1:
- return DeliveryMode.NonPersistent;
- case 2:
- return DeliveryMode.Persistent;
- default:
- throw new QpidException("Illegal value for delivery mode in content header properties");
- }
- }
- set
- {
- ContentHeaderProperties.DeliveryMode = (byte)(value==DeliveryMode.NonPersistent?1:2);
- }
- }
-
- /// <summary>
- /// True, if this is a redelivered message
- /// </summary>
- public bool Redelivered
- {
- get { return _redelivered; }
- set { _redelivered = value; }
- }
-
- /// <summary>
- /// The message type name
- /// </summary>
- public string Type
- {
- get { return ContentHeaderProperties.Type; }
- set { ContentHeaderProperties.Type = value; }
- }
-
- /// <summary>
- /// Message expiration specification
- /// </summary>
- public long Expiration
- {
- get { return ContentHeaderProperties.Expiration; }
- set { ContentHeaderProperties.Expiration = value; }
- }
-
- /// <summary>
- /// The message priority, 0 to 9
- /// </summary>
- public byte Priority
- {
- get { return ContentHeaderProperties.Priority; }
- set { ContentHeaderProperties.Priority = (byte) value; }
- }
-
- /// <summary>
- /// The MIME Content Type
- /// </summary>
- public string ContentType
- {
- get { return ContentHeaderProperties.ContentType; }
- set { ContentHeaderProperties.ContentType = value; }
- }
-
- /// <summary>
- /// The MIME Content Encoding
- /// </summary>
- public string ContentEncoding
- {
- get { return ContentHeaderProperties.Encoding; }
- set { ContentHeaderProperties.Encoding = value; }
- }
-
- /// <summary>
- /// Headers of this message
- /// </summary>
- public IHeaders Headers
- {
- get { return _headers; }
- }
-
- /// <summary>
- /// The creating user id
- /// </summary>
- public string UserId
- {
- get { return ContentHeaderProperties.UserId; }
- set { ContentHeaderProperties.UserId = value; }
- }
-
- /// <summary>
- /// The creating application id
- /// </summary>
- public string AppId
- {
- get { return ContentHeaderProperties.AppId; }
- set { ContentHeaderProperties.AppId = value; }
- }
-
- /// <summary>
- /// Intra-cluster routing identifier
- /// </summary>
- public string ClusterId
- {
- get { return ContentHeaderProperties.ClusterId; }
- set { ContentHeaderProperties.ClusterId = value; }
- }
-
- /// <summary>
- /// Return the raw byte array that is used to populate the frame when sending
- /// the message.
- /// </summary>
- /// <value>a byte array of message data</value>
- public ByteBuffer Data
- {
- get
- {
- if (_data != null)
- {
- if (!_readableMessage)
- {
- _data.Flip();
- }
- else
- {
- // Make sure we rewind the data just in case any method has moved the
- // position beyond the start.
- _data.Rewind();
- }
- }
- return _data;
- }
-
- set
- {
- _data = value;
- }
- }
-
- public void Acknowledge()
- {
- // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
- // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
- if (_channel != null)
- {
- // we set multiple to true here since acknowledgement implies acknowledge of all count messages
- // received on the session
- _channel.AcknowledgeMessage((ulong)DeliveryTag, true);
- }
-
- }
-
- public abstract void ClearBodyImpl();
-
- public void ClearBody()
- {
- ClearBodyImpl();
- _readableMessage = false;
- }
-
- /// <summary>
- /// Get a String representation of the body of the message. Used in the
- /// toString() method which outputs this before message properties.
- /// </summary>
- /// <exception cref="QpidException"></exception>
- public abstract string ToBodyString();
-
- public override string ToString()
- {
- try
- {
- StringBuilder buf = new StringBuilder("Body:\n");
- buf.Append(ToBodyString());
- buf.Append("\nQmsTimestamp: ").Append(Timestamp);
- buf.Append("\nQmsExpiration: ").Append(Expiration);
- buf.Append("\nQmsPriority: ").Append(Priority);
- buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
- buf.Append("\nReplyToExchangeName: ").Append(ReplyToExchangeName);
- buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
- buf.Append("\nAMQ message number: ").Append(DeliveryTag);
- buf.Append("\nProperties:");
- if (ContentHeaderProperties.Headers == null)
- {
- buf.Append("<NONE>");
- }
- else
- {
- buf.Append(Headers.ToString());
- }
- return buf.ToString();
- }
- catch (Exception e)
- {
- return e.ToString();
- }
- }
-
- public FieldTable PopulateHeadersFromMessageProperties()
- {
- if (ContentHeaderProperties.Headers == null)
- {
- return null;
- }
- else
- {
- //
- // We need to convert every property into a String representation
- // Note that type information is preserved in the property name
- //
- FieldTable table = new FieldTable();
- foreach (DictionaryEntry entry in ContentHeaderProperties.Headers)
- {
- string propertyName = (string) entry.Key;
- if (propertyName == null)
- {
- continue;
- }
- else
- {
- table[propertyName] = entry.Value.ToString();
- }
- }
- return table;
- }
- }
-
- public BasicContentHeaderProperties ContentHeaderProperties
- {
- get
- {
- return (BasicContentHeaderProperties) _contentHeaderProperties;
- }
- }
-
- protected virtual void Reset()
- {
- _readableMessage = true;
- }
-
- public bool IsReadable
- {
- get { return _readableMessage; }
- }
-
- public bool isWritable
- {
- get { return !_readableMessage; }
- }
-
- protected void CheckReadable()
- {
- if ( !_readableMessage )
- {
- throw new MessageNotReadableException("You need to call reset() to make the message readable");
- }
- }
-
- /// <summary>
- /// Decodes the replyto field if one is set.
- ///
- /// Splits a replyto field containing an exchange name followed by a ':', followed by a routing key into the exchange name and
- /// routing key seperately. The exchange name may be empty in which case the empty string is returned. If the exchange name is
- /// empty the replyto field is expected to being with ':'.
- ///
- /// Anyhting other than a two part replyto field sperated with a ':' will result in an exception.
- /// </summary>
- ///
- /// <returns>A destination initialized to the replyto location if a replyto field was set, or an empty destination otherwise.</returns>
- private BindingURL ReadReplyToHeader()
- {
- string replyToEncoding = ContentHeaderProperties.ReplyTo;
- //log.Debug("replyToEncoding = " + replyToEncoding);
-
- BindingURL bindingUrl = new BindingURL(replyToEncoding);
- //log.Debug("bindingUrl = " + bindingUrl.ToString());
-
- return bindingUrl;
-
- //log.Info("replyToEncoding = " + replyToEncoding);
-
-// if ( replyToEncoding == null )
-// {
-// return new Dest();
-// } else
-// {
-// // Split the replyto field on a ':'
-// string[] split = replyToEncoding.Split(':');
-
-// // Ensure that the replyto field argument only consisted of two parts.
-// if ( split.Length != 2 )
-// {
-// throw new QpidException("Illegal value in ReplyTo property: " + replyToEncoding);
-// }
-
-// // Extract the exchange name and routing key from the split replyto field.
-// string exchangeName = split[0];
-
-// string[] split2 = split[1].Split('/');
-// string routingKey = split2[3];
-
-// return new Dest(exchangeName, routingKey);
-// }
- }
-
- private void WriteReplyToHeader(BindingURL dest)
- {
- string encodedDestination = string.Format("{0}:{1}", dest.ExchangeName, dest.RoutingKey);
- ContentHeaderProperties.ReplyTo = encodedDestination;
- }
- }
-
- public class BindingURL
- {
- public readonly static string OPTION_EXCLUSIVE = "exclusive";
- public readonly static string OPTION_AUTODELETE = "autodelete";
- public readonly static string OPTION_DURABLE = "durable";
- public readonly static string OPTION_CLIENTID = "clientid";
- public readonly static string OPTION_SUBSCRIPTION = "subscription";
- public readonly static string OPTION_ROUTING_KEY = "routingkey";
-
- /// <summary> Holds the undecoded URL </summary>
- string url;
-
- /// <summary> Holds the decoded options. </summary>
- IDictionary options = new Hashtable();
-
- /// <summary> Holds the decoded exchange class. </summary>
- string exchangeClass;
-
- /// <summary> Holds the decoded exchange name. </summary>
- string exchangeName;
-
- /// <summary> Holds the destination name. </summary>
- string destination;
-
- /// <summary> Holds the decoded queue name. </summary>
- string queueName;
-
- /// <summary>
- /// The binding URL has the format:
- /// <exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- /// </summary>
- public BindingURL(string url)
- {
- this.url = url;
- Parse();
- }
-
- public string Url { get { return url; } }
-
- public string ExchangeClass
- {
- get { return exchangeClass; }
- set { exchangeClass = value; }
- }
-
- public string ExchangeName
- {
- get { return exchangeName; }
- set { exchangeName = value; }
- }
-
- public string QueueName
- {
- get { return queueName; }
- set { queueName = value; }
- }
-
- public string DestinationName
- {
- get { return destination; }
- set { destination = value; }
- }
-
- public string RoutingKey {
- get { return (string)options[OPTION_ROUTING_KEY]; }
- set { options[OPTION_ROUTING_KEY] = value; }
- }
-
- public bool ContainsOption(string key) { return options.Contains(key); }
-
- public string ToString()
- {
- return "BindingURL: [ ExchangeClass = " + ExchangeClass + ", ExchangeName = " + ExchangeName + ", QueueName = " + QueueName +
- ", DestinationName = " + DestinationName + ", RoutingKey = " + RoutingKey + " ] ";
- }
-
- private void Parse()
- {
- Uri binding = new Uri(url);
-
- // Extract the URI scheme, this contains the exchange class. It is defaulted to the direct exchange if not specified.
- string exchangeClass = binding.Scheme;
-
- if (exchangeClass == null)
- {
- url = ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS + "://" + ExchangeNameDefaults.DIRECT + "//" + url;
- Parse();
-
- return;
- }
- else
- {
- this.exchangeClass = exchangeClass;
- }
-
- // Extract the host name, this contains the exchange name. It is defaulted to the default direct exchange if not specified.
- string exchangeName = binding.Host;
-
- if (exchangeName == null)
- {
- if (exchangeClass.Equals(ExchangeNameDefaults.DIRECT_EXCHANGE_CLASS))
- {
- this.exchangeName = "";
- }
- }
- else
- {
- this.exchangeName = exchangeName;
- }
-
- // Extract the destination and queue name.
- if ((binding.AbsolutePath == null) || binding.AbsolutePath.Equals(""))
- {
- throw new UriFormatException("Destination or Queue required");
- }
- else
- {
- int slashOffset = binding.AbsolutePath.IndexOf("/", 1);
- if (slashOffset == -1)
- {
- throw new UriFormatException("Destination required");
- }
- else
- {
- String path = binding.AbsolutePath;
-
- this.destination = path.Substring(1, slashOffset - 1);
- this.queueName = path.Substring(slashOffset + 1);
- }
- }
-
- ParseOptions(options, binding.Query);
-
- // If the routing key is not set as an option, set it to the destination name.
- if (!ContainsOption(OPTION_ROUTING_KEY))
- {
- options[OPTION_ROUTING_KEY] = destination;
- }
- }
-
- /// <summary>
- /// options looks like this
- /// brokerlist='tcp://host:port?option='value',option='value';vm://:3/virtualpath?option='value'',failover='method?option='value',option='value'
- /// </summary>
- public static void ParseOptions(IDictionary optionMap, string options)
- {
- // Check that there really are some options to parse.
- if ((options == null) || (options.IndexOf('=') == -1))
- {
- return;
- }
-
- int optionIndex = options.IndexOf('=');
- string option = options.Substring(0, optionIndex);
- int length = options.Length;
- int nestedQuotes = 0;
-
- // Holds the index of the final "'".
- int valueIndex = optionIndex;
-
- // Loop over all the options.Dest
- while ((nestedQuotes > 0) || (valueIndex < length))
- {
- valueIndex++;
-
- if (valueIndex >= length)
- {
- break;
- }
-
- if (options[valueIndex] == '\'')
- {
- if ((valueIndex + 1) < options.Length)
- {
- if ((options[valueIndex + 1] == '&') ||
- (options[valueIndex + 1] == ',') ||
- (options[valueIndex + 1] == ';') ||
- (options[valueIndex + 1] == '\''))
- {
- nestedQuotes--;
-
- if (nestedQuotes == 0)
- {
- // We've found the value of an option
- break;
- }
- }
- else
- {
- nestedQuotes++;
- }
- }
- else
- {
- // We are at the end of the string
- // Check to see if we are corectly closing quotes
- if (options[valueIndex] == '\'')
- {
- nestedQuotes--;
- }
-
- break;
- }
- }
- }
- }
- }
-}