1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
using System;
using ZMQ;
using System.IO;
using Thrift.Transport;
namespace ZmqClient
{
public class TZmqClient : TTransport
{
Socket _sock;
String _endpoint;
MemoryStream _wbuf = new MemoryStream ();
MemoryStream _rbuf = new MemoryStream ();
void debug (string msg)
{
//Uncomment to enable debug
// Console.WriteLine (msg);
}
public TZmqClient (Context ctx, String endpoint, SocketType sockType)
{
_sock = ctx.Socket (sockType);
_endpoint = endpoint;
}
public override void Open ()
{
_sock.Connect (_endpoint);
}
public override void Close ()
{
throw new NotImplementedException ();
}
public override bool IsOpen {
get {
throw new NotImplementedException ();
}
}
public override int Read (byte[] buf, int off, int len)
{
debug ("Client_Read");
if (off != 0 || len != buf.Length)
throw new NotImplementedException ();
if (_rbuf.Length == 0) {
//Fill the Buffer with the complete ZMQ Message which needs to be(?!) the complete Thrift response
debug ("Client_Read Filling buffer..");
byte[] tmpBuf = _sock.Recv ();
debug (string.Format("Client_Read filled with {0}b",tmpBuf.Length));
_rbuf.Write (tmpBuf, 0, tmpBuf.Length);
_rbuf.Position = 0; //For reading
}
int ret = _rbuf.Read (buf, 0, len);
if (_rbuf.Length == _rbuf.Position) //Finished reading
_rbuf.SetLength (0);
debug (string.Format ("Client_Read return {0}b, remaining {1}b", ret, _rbuf.Length - _rbuf.Position));
return ret;
}
public override void Write (byte[] buf, int off, int len)
{
debug ("Client_Write");
_wbuf.Write (buf, off, len);
}
public override void Flush ()
{
debug ("Client_Flush");
_sock.Send (_wbuf.GetBuffer ());
_wbuf = new MemoryStream ();
}
}
}
|