diff options
Diffstat (limited to 'dotnet/client-010/wcf/model/QpidInputChannel.cs')
-rw-r--r-- | dotnet/client-010/wcf/model/QpidInputChannel.cs | 218 |
1 files changed, 0 insertions, 218 deletions
diff --git a/dotnet/client-010/wcf/model/QpidInputChannel.cs b/dotnet/client-010/wcf/model/QpidInputChannel.cs deleted file mode 100644 index 7a05153df9..0000000000 --- a/dotnet/client-010/wcf/model/QpidInputChannel.cs +++ /dev/null @@ -1,218 +0,0 @@ -/* -* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -* -*/ - -using System; -using System.Collections.Generic; -using System.IO; -using System.ServiceModel; -using System.ServiceModel.Channels; -using System.Text; -using System.Threading; -using org.apache.qpid.client; -using org.apache.qpid.transport; -using org.apache.qpid.transport.util; - -namespace org.apache.qpid.wcf.model -{ - internal sealed class QpidInputChannel : QpidInputChannelBase - { - private static readonly Logger _log = Logger.get(typeof (QpidInputChannel)); - - private readonly QpidTransportBindingElement _bindingElement; - private readonly MessageEncoder _encoder; - private readonly ClientSession _session; - private readonly string _queueName; - private BlockingQueue _queue; - private bool _closed = false; - - public QpidInputChannel(BindingContext context, ClientSession session, EndpointAddress address) - : base(context, address) - { - _bindingElement = context.Binding.Elements.Find<QpidTransportBindingElement>(); - var encoderElem = context.BindingParameters.Find<MessageEncodingBindingElement>(); - if (encoderElem != null) - { - _encoder = encoderElem.CreateMessageEncoderFactory().Encoder; - } - _session = session; - _queueName = address.Uri.ToString(); - _queue = new BlockingQueue(); - } - - - public override System.ServiceModel.Channels.Message Receive(TimeSpan timeout) - { - _session.messageFlow("myDest", MessageCreditUnit.MESSAGE, 1); - _session.sync(); - IMessage m = _queue.Dequeue(); - System.ServiceModel.Channels.Message result = null; - if (m != null) - { - var reader = new BinaryReader(m.Body, Encoding.UTF8); - var body = new byte[m.Body.Length - m.Body.Position]; - reader.Read(body, 0, body.Length); - try - { - result = _encoder.ReadMessage(new MemoryStream(body), - (int) _bindingElement.MaxReceivedMessageSize); - } - catch(Exception e) - { - Console.WriteLine(e.StackTrace); - } - result.Headers.To = LocalAddress.Uri; - - var ack = new RangeSet(); - // ack this message - ack.add(m.Id); - _session.messageAccept(ack); - _session.sync(); - } - else - { - if(! _closed ) - { - return Receive(timeout); - } - } - return result; - } - - public override bool TryReceive(TimeSpan timeout, out System.ServiceModel.Channels.Message message) - { - message = Receive(timeout); - return message != null; - } - - public override bool WaitForMessage(TimeSpan timeout) - { - throw new NotImplementedException(); - } - - public override void Close(TimeSpan timeout) - { - _closed = true; - _queue = null; - } - - public override void Open(TimeSpan timeout) - { - if (State != CommunicationState.Created && State != CommunicationState.Closed) - throw new InvalidOperationException(string.Format("Cannot open the channel from the {0} state.", State)); - - OnOpening(); - - var qr = (QueueQueryResult) _session.queueQuery(_queueName).Result; - if (qr.getQueue() == null) - { - // create the queue - _session.queueDeclare(_queueName, null, null); - } - // bind the queue - _session.exchangeBind(_queueName, "amq.direct", _queueName, null); - var myListener = new WCFListener(_queue); - _session.attachMessageListener(myListener, "myDest"); - _session.messageSubscribe(_queueName, "myDest", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED, - null, - 0, null); - // issue credits - _session.messageSetFlowMode("myDest", MessageFlowMode.WINDOW); - _session.messageFlow("myDest", MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES); - _session.sync(); - - OnOpened(); - } - } - - internal class WCFListener : IMessageListener - { - private static readonly Logger _log = Logger.get(typeof (WCFListener)); - private readonly BlockingQueue _q; - - public WCFListener(BlockingQueue q) - { - _q = q; - } - - public void messageTransfer(IMessage m) - { - _log.debug("message received by listener"); - _q.Enqueue(m); - } - } - - internal class BlockingQueue - { - private int _count; - private readonly Queue<IMessage> _queue = new Queue<IMessage>(); - - public IMessage Dequeue(TimeSpan timeout) - { - lock (_queue) - { - DateTime start = DateTime.Now; - long elapsed = 0; - while (_count <= 0 && elapsed < timeout.Milliseconds) - { - Monitor.Wait(_queue, new TimeSpan(timeout.Milliseconds - elapsed)); - elapsed = DateTime.Now.Subtract(start).Milliseconds; - } - if (_count > 0) - { - _count--; - return _queue.Dequeue(); - } - return null; - } - } - - public IMessage Dequeue() - { - lock (_queue) - { - while (_count <= 0) - { - Monitor.Wait(_queue); - } - if (_count > 0) - { - _count--; - return _queue.Dequeue(); - } - return null; - } - } - - public void Enqueue(IMessage data) - { - if (data != null) - { - lock (_queue) - { - _queue.Enqueue(data); - _count++; - Monitor.Pulse(_queue); - } - } - } - } -} - |