summaryrefslogtreecommitdiff
path: root/contrib/zeromq/csharp/TZmqClient.cs
blob: e9ab5166a57c1eadbd34ce0cae5bed33ee2a6015 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
using System;
using ZMQ;
using System.IO;
using Thrift.Transport;

namespace ZmqClient
{
	public class TZmqClient : TTransport
	{
		Socket _sock;
		String _endpoint;
		MemoryStream _wbuf = new MemoryStream ();
		MemoryStream _rbuf = new MemoryStream ();

		void debug (string msg)
		{
			//Uncomment to enable debug
//			Console.WriteLine (msg);
		}

		public TZmqClient (Context ctx, String endpoint, SocketType sockType)
		{
			_sock = ctx.Socket (sockType);
			_endpoint = endpoint;
		}

		public override void Open ()
		{
			_sock.Connect (_endpoint);
		}
		
		public override void Close ()
		{
			throw new NotImplementedException ();
		}

		public override bool IsOpen {
			get {
				throw new NotImplementedException ();
			}
		}

		public override int Read (byte[] buf, int off, int len)
		{
			debug ("Client_Read");
			if (off != 0 || len != buf.Length)
				throw new NotImplementedException ();

			if (_rbuf.Length == 0) {
				//Fill the Buffer with the complete ZMQ Message which needs to be(?!) the complete Thrift response
				debug ("Client_Read Filling buffer..");
				byte[] tmpBuf = _sock.Recv ();
				debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length));
				_rbuf.Write (tmpBuf, 0, tmpBuf.Length);
				_rbuf.Position = 0;	//For reading
			}
			int ret = _rbuf.Read (buf, 0, len);
			if (_rbuf.Length == _rbuf.Position)	//Finished reading
				_rbuf.SetLength (0);
			debug (string.Format ("Client_Read return {0}b, remaining  {1}b", ret, _rbuf.Length - _rbuf.Position));
			return ret;
		}

		public override void Write (byte[] buf, int off, int len)
		{
			debug ("Client_Write");
			_wbuf.Write (buf, off, len);
		}

		public override void Flush ()
		{
			debug ("Client_Flush");
			_sock.Send (_wbuf.GetBuffer ());
			_wbuf = new MemoryStream ();
		}
	}
}