summaryrefslogtreecommitdiff
path: root/components/dlink
diff options
context:
space:
mode:
authorUlf Wiger <ulf@feuerlabs.com>2016-01-12 08:17:30 -0800
committerUlf Wiger <ulf@feuerlabs.com>2016-01-12 08:22:36 -0800
commit37dd6cef3e8abdee0829aabf121a2ca5dd35b14e (patch)
treeb7174d0d66f2db6f9a0d724213ad19e427e22847 /components/dlink
parent7922125aba23033945e3b55a4bf78ef8e84521d0 (diff)
downloadrvi_core-37dd6cef3e8abdee0829aabf121a2ca5dd35b14e.tar.gz
fragmentation tests
Diffstat (limited to 'components/dlink')
-rw-r--r--components/dlink/src/dlink_data.erl93
-rw-r--r--components/dlink/src/dlink_data_json.erl2
-rw-r--r--components/dlink/src/dlink_data_msgpack.erl7
3 files changed, 98 insertions, 4 deletions
diff --git a/components/dlink/src/dlink_data.erl b/components/dlink/src/dlink_data.erl
new file mode 100644
index 0000000..d16d923
--- /dev/null
+++ b/components/dlink/src/dlink_data.erl
@@ -0,0 +1,93 @@
+-module(dlink_data).
+
+-export([decode/5,
+ encode/3]).
+
+-include_lib("lager/include/log.hrl").
+
+decode(Data, F, St, Mod, FragOpts) when is_function(F,1) ->
+ DecodeRes = case St of
+ <<>> when Data == <<>> ->
+ {ok, <<>>};
+ <<>> -> do_decode(Data);
+ Rest when is_binary(Rest) ->
+ do_decode(<<Rest/binary, Data/binary>>);
+ Cont when is_function(Cont, 1) ->
+ Cont(Data)
+ end,
+ case DecodeRes of
+ Cont1 when is_function(Cont1, 1) ->
+ {ok, Cont1};
+ {ok, Rest1} ->
+ {ok, Rest1};
+ {ok, Decoded, Rest1} ->
+ decoded(Decoded, Rest1, F, Mod, FragOpts);
+ {error, _} = Err ->
+ Err
+ end.
+
+encode(Msg, PMod, PSt) ->
+ PMod:encode(Msg, PSt).
+
+do_decode(Data) ->
+ case Data of
+ <<8:4,_:4,_/binary>> ->
+ %% msgpack map
+ ?debug("detected msgpack map", []),
+ msgpack_decode(Data);
+ <<H, _/binary>> when H==16#de; H==16#df ->
+ %% msgpack map 16 or map 32
+ ?debug("detected msgpack map 16 or map 32", []),
+ msgpack_decode(Data);
+ _ ->
+ ?debug("assuming json", []),
+ jsx_decode(Data)
+ end.
+
+decoded(Decoded, Rest, F, Mod, FragOpts) ->
+ case rvi_frag:maybe_fragment(Decoded, Mod, FragOpts) of
+ true ->
+ {ok, Rest};
+ {true, Msg} ->
+ case do_decode(Msg) of
+ {ok, DecMsg, <<>>} ->
+ F(DecMsg),
+ decode(Rest, F, <<>>, Mod, FragOpts);
+ {error, _} = Err1 ->
+ Err1
+ end;
+ false ->
+ F(Decoded),
+ decode(Rest, F, <<>>, Mod, FragOpts)
+ end.
+
+msgpack_decode(Data) ->
+ case msgpack:unpack_stream(Data, [jsx]) of
+ {error, incomplete} ->
+ fun(NewData) ->
+ msgpack_decode(
+ <<Data/binary, NewData/binary>>)
+ end;
+ {error, E} ->
+ {error, E};
+ {Decoded, Rest} when is_binary(Rest) ->
+ {ok, Decoded, Rest}
+ end.
+
+jsx_decode(Data) ->
+ try jsx_decode_res(jsx:decode(Data, [stream, return_tail]))
+ catch
+ error:E ->
+ ?error("jsx decode failed: ~p", [E]),
+ {error, E}
+ end.
+
+jsx_decode_res(Res) ->
+ case Res of
+ {incomplete, Cont} ->
+ fun(NewData) ->
+ jsx_decode_res(Cont(NewData))
+ end;
+ {with_tail, Decoded, Rest} ->
+ {ok, Decoded, Rest}
+ end.
diff --git a/components/dlink/src/dlink_data_json.erl b/components/dlink/src/dlink_data_json.erl
index 6a68e48..2442b1c 100644
--- a/components/dlink/src/dlink_data_json.erl
+++ b/components/dlink/src/dlink_data_json.erl
@@ -6,7 +6,7 @@
port_options/0]).
-init(_Opts) ->
+init(_) ->
[].
port_options() ->
diff --git a/components/dlink/src/dlink_data_msgpack.erl b/components/dlink/src/dlink_data_msgpack.erl
index 253da55..14139bc 100644
--- a/components/dlink/src/dlink_data_msgpack.erl
+++ b/components/dlink/src/dlink_data_msgpack.erl
@@ -1,20 +1,21 @@
-module(dlink_data_msgpack).
--export([init/1,
+-export([init/0, init/1,
decode/3,
encode/2]).
-export([port_options/0]).
-record(st, {opts = [{allow_atom, pack},
- {enable_str, true},
jsx],
buf = <<>>}).
port_options() ->
[binary, {packet, 0}].
-init(_CS) ->
+init(_) -> init().
+
+init() ->
#st{}.
decode(Msg0, F, #st{buf = Prev, opts = Opts} = St) when is_function(F, 1) ->