summaryrefslogtreecommitdiff
path: root/qpid/dotnet/client-010/wcf/model/QpidInputChannel.cs
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2011-05-27 15:44:23 +0000
commit66765100f4257159622cefe57bed50125a5ad017 (patch)
treea88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/dotnet/client-010/wcf/model/QpidInputChannel.cs
parent1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff)
parent88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff)
downloadqpid-python-66765100f4257159622cefe57bed50125a5ad017.tar.gz
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/dotnet/client-010/wcf/model/QpidInputChannel.cs')
-rw-r--r--qpid/dotnet/client-010/wcf/model/QpidInputChannel.cs218
1 files changed, 218 insertions, 0 deletions
diff --git a/qpid/dotnet/client-010/wcf/model/QpidInputChannel.cs b/qpid/dotnet/client-010/wcf/model/QpidInputChannel.cs
new file mode 100644
index 0000000000..7a05153df9
--- /dev/null
+++ b/qpid/dotnet/client-010/wcf/model/QpidInputChannel.cs
@@ -0,0 +1,218 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+using System.Text;
+using System.Threading;
+using org.apache.qpid.client;
+using org.apache.qpid.transport;
+using org.apache.qpid.transport.util;
+
+namespace org.apache.qpid.wcf.model
+{
+ internal sealed class QpidInputChannel : QpidInputChannelBase
+ {
+ private static readonly Logger _log = Logger.get(typeof (QpidInputChannel));
+
+ private readonly QpidTransportBindingElement _bindingElement;
+ private readonly MessageEncoder _encoder;
+ private readonly ClientSession _session;
+ private readonly string _queueName;
+ private BlockingQueue _queue;
+ private bool _closed = false;
+
+ public QpidInputChannel(BindingContext context, ClientSession session, EndpointAddress address)
+ : base(context, address)
+ {
+ _bindingElement = context.Binding.Elements.Find<QpidTransportBindingElement>();
+ var encoderElem = context.BindingParameters.Find<MessageEncodingBindingElement>();
+ if (encoderElem != null)
+ {
+ _encoder = encoderElem.CreateMessageEncoderFactory().Encoder;
+ }
+ _session = session;
+ _queueName = address.Uri.ToString();
+ _queue = new BlockingQueue();
+ }
+
+
+ public override System.ServiceModel.Channels.Message Receive(TimeSpan timeout)
+ {
+ _session.messageFlow("myDest", MessageCreditUnit.MESSAGE, 1);
+ _session.sync();
+ IMessage m = _queue.Dequeue();
+ System.ServiceModel.Channels.Message result = null;
+ if (m != null)
+ {
+ var reader = new BinaryReader(m.Body, Encoding.UTF8);
+ var body = new byte[m.Body.Length - m.Body.Position];
+ reader.Read(body, 0, body.Length);
+ try
+ {
+ result = _encoder.ReadMessage(new MemoryStream(body),
+ (int) _bindingElement.MaxReceivedMessageSize);
+ }
+ catch(Exception e)
+ {
+ Console.WriteLine(e.StackTrace);
+ }
+ result.Headers.To = LocalAddress.Uri;
+
+ var ack = new RangeSet();
+ // ack this message
+ ack.add(m.Id);
+ _session.messageAccept(ack);
+ _session.sync();
+ }
+ else
+ {
+ if(! _closed )
+ {
+ return Receive(timeout);
+ }
+ }
+ return result;
+ }
+
+ public override bool TryReceive(TimeSpan timeout, out System.ServiceModel.Channels.Message message)
+ {
+ message = Receive(timeout);
+ return message != null;
+ }
+
+ public override bool WaitForMessage(TimeSpan timeout)
+ {
+ throw new NotImplementedException();
+ }
+
+ public override void Close(TimeSpan timeout)
+ {
+ _closed = true;
+ _queue = null;
+ }
+
+ public override void Open(TimeSpan timeout)
+ {
+ if (State != CommunicationState.Created && State != CommunicationState.Closed)
+ throw new InvalidOperationException(string.Format("Cannot open the channel from the {0} state.", State));
+
+ OnOpening();
+
+ var qr = (QueueQueryResult) _session.queueQuery(_queueName).Result;
+ if (qr.getQueue() == null)
+ {
+ // create the queue
+ _session.queueDeclare(_queueName, null, null);
+ }
+ // bind the queue
+ _session.exchangeBind(_queueName, "amq.direct", _queueName, null);
+ var myListener = new WCFListener(_queue);
+ _session.attachMessageListener(myListener, "myDest");
+ _session.messageSubscribe(_queueName, "myDest", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED,
+ null,
+ 0, null);
+ // issue credits
+ _session.messageSetFlowMode("myDest", MessageFlowMode.WINDOW);
+ _session.messageFlow("myDest", MessageCreditUnit.BYTE, ClientSession.MESSAGE_FLOW_MAX_BYTES);
+ _session.sync();
+
+ OnOpened();
+ }
+ }
+
+ internal class WCFListener : IMessageListener
+ {
+ private static readonly Logger _log = Logger.get(typeof (WCFListener));
+ private readonly BlockingQueue _q;
+
+ public WCFListener(BlockingQueue q)
+ {
+ _q = q;
+ }
+
+ public void messageTransfer(IMessage m)
+ {
+ _log.debug("message received by listener");
+ _q.Enqueue(m);
+ }
+ }
+
+ internal class BlockingQueue
+ {
+ private int _count;
+ private readonly Queue<IMessage> _queue = new Queue<IMessage>();
+
+ public IMessage Dequeue(TimeSpan timeout)
+ {
+ lock (_queue)
+ {
+ DateTime start = DateTime.Now;
+ long elapsed = 0;
+ while (_count <= 0 && elapsed < timeout.Milliseconds)
+ {
+ Monitor.Wait(_queue, new TimeSpan(timeout.Milliseconds - elapsed));
+ elapsed = DateTime.Now.Subtract(start).Milliseconds;
+ }
+ if (_count > 0)
+ {
+ _count--;
+ return _queue.Dequeue();
+ }
+ return null;
+ }
+ }
+
+ public IMessage Dequeue()
+ {
+ lock (_queue)
+ {
+ while (_count <= 0)
+ {
+ Monitor.Wait(_queue);
+ }
+ if (_count > 0)
+ {
+ _count--;
+ return _queue.Dequeue();
+ }
+ return null;
+ }
+ }
+
+ public void Enqueue(IMessage data)
+ {
+ if (data != null)
+ {
+ lock (_queue)
+ {
+ _queue.Enqueue(data);
+ _count++;
+ Monitor.Pulse(_queue);
+ }
+ }
+ }
+ }
+}
+