diff options
Diffstat (limited to 'qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs')
-rw-r--r-- | qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs | 374 |
1 files changed, 374 insertions, 0 deletions
diff --git a/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs b/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs new file mode 100644 index 0000000000..5925fa47dc --- /dev/null +++ b/qpid/wcf/src/Apache/Qpid/Channel/RawMessage.cs @@ -0,0 +1,374 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +namespace Apache.Qpid.Channel +{ + using System; + using System.IO; + using System.ServiceModel.Channels; + using System.Xml; + + // This incoming Message is backed either by a Stream (bodyStream) or a byte array (bodyBytes). + // If bodyBytes belongs to a BufferManager, we must return it when done. + // The pay-off is OnGetReaderAtBodyContents(). + // Most of the complexity is dealing with the OnCreateBufferedCopy() machinery. + internal class RawMessage : Message + { + private MessageHeaders headers; + private MessageProperties properties; + private XmlDictionaryReaderQuotas readerQuotas; + private Stream bodyStream; + private byte[] bodyBytes; + private int index; + private int count; + private BufferManager bufferManager; + + public RawMessage(byte[] buffer, int index, int count, BufferManager bufferManager, XmlDictionaryReaderQuotas quotas) + { + // this constructor supports MessageEncoder.ReadMessage(ArraySegment<byte> b, BufferManager mgr, string contentType) + if (quotas == null) + { + quotas = new XmlDictionaryReaderQuotas(); + } + + this.headers = new MessageHeaders(MessageVersion.None); + this.properties = new MessageProperties(); + this.readerQuotas = quotas; + this.bodyBytes = buffer; + this.index = index; + this.count = count; + this.bufferManager = bufferManager; + } + + public RawMessage(Stream stream, XmlDictionaryReaderQuotas quotas) + { + // this constructor supports MessageEncoder.ReadMessage(System.IO.Stream s, int max, string contentType) + if (quotas == null) + { + quotas = new XmlDictionaryReaderQuotas(); + } + + this.headers = new MessageHeaders(MessageVersion.None); + this.properties = new MessageProperties(); + this.bodyStream = stream; + } + + public RawMessage(MessageHeaders headers, MessageProperties properties, byte[] bytes, int index, int count, XmlDictionaryReaderQuotas quotas) + { + // this constructor supports internal needs for CreateBufferedCopy().CreateMessage() + this.headers = new MessageHeaders(headers); + this.properties = new MessageProperties(properties); + this.bodyBytes = bytes; + this.index = index; + this.count = count; + this.readerQuotas = quotas; + } + + public override MessageHeaders Headers + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return this.headers; + } + } + + public override bool IsEmpty + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return false; + } + } + + public override bool IsFault + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return false; + } + } + + public override MessageProperties Properties + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return this.properties; + } + } + + public override MessageVersion Version + { + get + { + if (this.IsDisposed) + { + throw new ObjectDisposedException("message"); + } + + return MessageVersion.None; + } + } + + protected override void OnBodyToString(XmlDictionaryWriter writer) + { + if (this.bodyStream != null) + { + writer.WriteString("Stream"); + } + else + { + writer.WriteStartElement(RawMessageEncoder.StreamElementName, string.Empty); + writer.WriteBase64(this.bodyBytes, this.index, this.count); + writer.WriteEndElement(); + } + } + + protected override void OnClose() + { + Exception deferEx = null; + try + { + base.OnClose(); + } + catch (Exception e) + { + deferEx = e; + } + + try + { + if (this.properties != null) + { + this.properties.Dispose(); + } + } + catch (Exception e) + { + if (deferEx == null) + { + deferEx = e; + } + } + + try + { + if (this.bufferManager != null) + { + this.bufferManager.ReturnBuffer(this.bodyBytes); + this.bufferManager = null; + } + } + catch (Exception e) + { + if (deferEx == null) + { + deferEx = e; + } + } + + if (deferEx != null) + { + throw deferEx; + } + } + + protected override MessageBuffer OnCreateBufferedCopy(int maxBufferSize) + { + if (this.bodyStream != null) + { + int len = (int)this.bodyStream.Length; + byte[] buf = new byte[len]; + this.bodyStream.Read(buf, 0, len); + this.bodyStream = null; + this.bodyBytes = buf; + this.count = len; + this.index = 0; + } + else + { + if (this.bufferManager != null) + { + // we could take steps to share the buffer among copies and release the memory + // after the last user finishes by a reference count or such, but we are already + // far from the intended optimized use. Make one GC managed memory copy that is + // shared by all. + byte[] buf = new byte[this.count]; + + Buffer.BlockCopy(this.bodyBytes, this.index, buf, 0, this.count); + this.bufferManager.ReturnBuffer(this.bodyBytes); + this.bufferManager = null; + this.bodyBytes = buf; + this.index = 0; + } + } + + return new RawMessageBuffer(this.headers, this.properties, this.bodyBytes, this.index, this.count, this.readerQuotas); + } + + protected override XmlDictionaryReader OnGetReaderAtBodyContents() + { + Stream readerStream = null; + bool ownsStream; + + if (this.bodyStream != null) + { + readerStream = this.bodyStream; + ownsStream = false; + } + else + { + // create stream for duration of XmlReader. + ownsStream = true; + if (this.bufferManager != null) + { + readerStream = new RawMemoryStream(this.bodyBytes, this.index, this.count, this.bufferManager); + this.bufferManager = null; + } + else + { + readerStream = new MemoryStream(this.bodyBytes, this.index, this.count, false); + } + } + + return new RawXmlReader(readerStream, this.readerQuotas, ownsStream); + } + + protected override void OnWriteBodyContents(XmlDictionaryWriter writer) + { + writer.WriteStartElement(RawMessageEncoder.StreamElementName, string.Empty); + if (this.bodyStream != null) + { + int len = (int)this.bodyStream.Length; + byte[] buf = new byte[len]; + this.bodyStream.Read(buf, 0, len); + writer.WriteBase64(buf, 0, len); + } + else + { + writer.WriteBase64(this.bodyBytes, this.index, this.count); + } + + writer.WriteEndElement(); + } + + private class RawMemoryStream : MemoryStream + { + private BufferManager bufferManager; + private byte[] buffer; + + public RawMemoryStream(byte[] bytes, int index, int count, BufferManager mgr) + : base(bytes, index, count, false) + { + this.bufferManager = mgr; + this.buffer = bytes; + } + + protected override void Dispose(bool disposing) + { + if (this.bufferManager != null) + { + try + { + this.bufferManager.ReturnBuffer(this.buffer); + } + finally + { + this.bufferManager = null; + base.Dispose(disposing); + } + } + } + } + + private class RawMessageBuffer : MessageBuffer + { + private bool closed; + private MessageHeaders headers; + private MessageProperties properties; + private byte[] bodyBytes; + private int index; + private int count; + private XmlDictionaryReaderQuotas readerQuotas; + + public RawMessageBuffer(MessageHeaders headers, MessageProperties properties, byte[] bytes, int index, int count, XmlDictionaryReaderQuotas quotas) + : base() + { + this.headers = new MessageHeaders(headers); + this.properties = new MessageProperties(properties); + this.bodyBytes = bytes; + this.index = index; + this.count = count; + this.readerQuotas = new XmlDictionaryReaderQuotas(); + quotas.CopyTo(this.readerQuotas); + } + + public override int BufferSize + { + get { return this.count; } + } + + public override void Close() + { + if (!this.closed) + { + this.closed = true; + this.headers = null; + if (this.properties != null) + { + this.properties.Dispose(); + this.properties = null; + } + + this.bodyBytes = null; + this.readerQuotas = null; + } + } + + public override Message CreateMessage() + { + if (this.closed) + { + throw new ObjectDisposedException("message"); + } + + return new RawMessage(this.headers, this.properties, this.bodyBytes, this.index, this.count, this.readerQuotas); + } + } + } +} |