path: root/lib/ssl/test/dist_cryptcookie.erl
diff options
Diffstat (limited to 'lib/ssl/test/dist_cryptcookie.erl')
1 files changed, 595 insertions, 0 deletions
diff --git a/lib/ssl/test/dist_cryptcookie.erl b/lib/ssl/test/dist_cryptcookie.erl
new file mode 100644
index 0000000000..703d828606
--- /dev/null
+++ b/lib/ssl/test/dist_cryptcookie.erl
@@ -0,0 +1,595 @@
+%% %CopyrightBegin%
+%% Copyright Ericsson AB 2022-2023. All Rights Reserved.
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%% %CopyrightEnd%
+%% -------------------------------------------------------------------------
+%% Module for encrypted Erlang protocol - a minimal encrypted
+%% distribution protocol based on only a shared secret
+%% and the crypto application, over a Stream
+-feature(maybe_expr, enable).
+%% inet_epmd_* API
+ start_dist_ctrl/1,
+ controlling_process/2,
+ hs_data/2]).
+%% ------------------------------------------------------------
+protocol() ->
+ cryptcookie:start_keypair_server().
+%% ------------------------------------------------------------
+start_dist_ctrl({Stream, ChunkSize, CipherState}) ->
+ ControllingProcess = self(),
+ Tag = make_ref(),
+ Pid =
+ spawn_opt(
+ fun () ->
+ receive
+ {Tag, Alias, {start, Stream_2}} ->
+ reply(Alias, trace(started)),
+ handshake(
+ Stream_2, Tag, ControllingProcess, ChunkSize,
+ CipherState)
+ end
+ end,
+ [link,
+ {priority, max},
+ {message_queue_data, off_heap},
+ {fullsweep_after, 0}]),
+ ControllingProcessFun = element(3, Stream),
+ Stream_1 = ControllingProcessFun(Stream, Pid),
+ DistCtrlHandle = {Pid, Tag},
+ started = call(DistCtrlHandle, {start, Stream_1}),
+ DistCtrlHandle.
+call({Pid, Tag}, Request) ->
+ Ref = Alias = monitor(process, Pid, [{alias, reply_demonitor}]),
+ Pid ! {Tag, Alias, Request},
+ receive
+ {Alias, Response} ->
+ Response;
+ {'DOWN', Ref, process, Pid, Reason} ->
+ error({dist_ctrl, Reason})
+ end.
+reply(Alias, Response) when is_reference(Alias) ->
+ Alias ! {Alias, Response},
+ ok.
+%% ------------------------------------------------------------
+controlling_process(DistCtrlHandle, Pid) ->
+ call(DistCtrlHandle, {controlling_process, Pid}),
+ DistCtrlHandle.
+%% ------------------------------------------------------------
+hs_data(NetAddress, {DistCtrl, _} = DistCtrlHandle) ->
+ #hs_data{
+ socket = DistCtrlHandle,
+ f_send =
+ fun (S, Packet) when S =:= DistCtrlHandle ->
+ call(S, {send, Packet})
+ end,
+ f_recv =
+ fun (S, 0, infinity) when S =:= DistCtrlHandle ->
+ call(S, recv)
+ end,
+ f_setopts_pre_nodeup = f_ok(DistCtrlHandle),
+ f_setopts_post_nodeup = f_ok(DistCtrlHandle),
+ f_address =
+ fun (S, Node) when S =:= DistCtrlHandle ->
+ inet_epmd_dist:f_address(NetAddress, Node)
+ end,
+ f_getll =
+ fun (S) when S =:= DistCtrlHandle ->
+ {ok, DistCtrl}
+ end,
+ f_handshake_complete =
+ fun (S, _Node, DistHandle) when S =:= DistCtrlHandle ->
+ call(S, {handshake_complete, DistHandle})
+ end,
+ %%
+ %%
+ mf_tick =
+ fun (S) when S =:= DistCtrlHandle ->
+ DistCtrl ! dist_tick
+ end }.
+f_ok(DistCtrlHandle) ->
+ fun (S) when S =:= DistCtrlHandle -> ok end.
+%% -------------------------------------------------------------------------
+%% net_kernel distribution handshake in progress
+ Stream, Tag, ControllingProcess, ChunkSize,
+ {DecryptState, EncryptState} = _CipherState) ->
+ handshake(
+ Stream, Tag, ControllingProcess,
+ ChunkSize, DecryptState, EncryptState).
+ Stream = {InStream, OutStream, ControllingProcessFun},
+ Tag, ControllingProcess, ChunkSize, DecryptState, EncryptState) ->
+ receive
+ {Tag, From, {controlling_process, NewControllingProcess}} ->
+ link(NewControllingProcess),
+ unlink(ControllingProcess),
+ reply(From, ok),
+ handshake(
+ Stream, Tag, NewControllingProcess,
+ ChunkSize, DecryptState, EncryptState);
+ {Tag, From, {handshake_complete, DistHandle}} ->
+ InputHandler =
+ spawn_opt(
+ fun () ->
+ link(ControllingProcess),
+ receive
+ {Tag, dist_handle, DistHandle, InStream_2} ->
+ input_handler_start(
+ InStream_2, DecryptState, DistHandle)
+ end
+ end,
+ [link,
+ {priority, normal},
+ {message_queue_data, off_heap},
+ {fullsweep_after, 0}]),
+ _ = monitor(process, InputHandler), % For the benchmark test
+ {InStream_1, OutStream_1, _} =
+ ControllingProcessFun(Stream, InputHandler),
+ false = erlang:dist_ctrl_set_opt(DistHandle, get_size, true),
+ ok = erlang:dist_ctrl_input_handler(DistHandle, InputHandler),
+ InputHandler ! {Tag, dist_handle, DistHandle, InStream_1},
+ reply(From, ok),
+ process_flag(priority, normal),
+ output_handler_start(
+ OutStream_1, EncryptState, ChunkSize, DistHandle);
+ %%
+ {Tag, From, {send, Data}} ->
+ {OutStream_1, EncryptState_1} =
+ cryptcookie:encrypt_and_send_chunk(
+ OutStream, EncryptState, Data, iolist_size(Data)),
+ if
+ hd(OutStream_1) =:= closed ->
+ reply(From, {error, closed}),
+ death_row({send, trace(closed)});
+ true ->
+ reply(From, ok),
+ handshake(
+ setelement(2, Stream, OutStream_1),
+ Tag, ControllingProcess,
+ ChunkSize, DecryptState, EncryptState_1)
+ end;
+ {Tag, From, recv} ->
+ {[Data | InStream_1], DecryptState_1} =
+ cryptcookie:recv_and_decrypt_chunk(InStream, DecryptState),
+ if
+ Data =:= closed ->
+ reply(From, {error, Data}),
+ death_row({recv, trace(Data)});
+ true ->
+ reply(From, {ok, binary_to_list(Data)}),
+ handshake(
+ setelement(1, Stream, InStream_1),
+ Tag, ControllingProcess,
+ ChunkSize, DecryptState_1, EncryptState)
+ end;
+ %%
+ Alien ->
+ _ = trace(Alien),
+ handshake(
+ Stream, Tag, ControllingProcess,
+ ChunkSize, DecryptState, EncryptState)
+ end.
+%% -------------------------------------------------------------------------
+%% Output handler process
+%% Await an event about what to do; fetch dist data from the VM,
+%% or send a dist tick.
+%% In case we are overloaded we could get many accumulated
+%% dist_tick messages; make sure to flush all of them
+%% before proceeding with what to do. But, do not use selective
+%% receive since that does not perform well when there are
+%% many messages in the process mailbox.
+%% Entry function
+output_handler_start(OutStream, EncryptState, ChunkSize, DistHandle) ->
+ try
+ erlang:dist_ctrl_get_data_notification(DistHandle),
+ output_handler(OutStream, EncryptState, [ChunkSize|DistHandle])
+ catch
+ Class : Reason : Stacktrace when Class =:= error ->
+ error_report(
+ [output_handler_exception,
+ {class, Class},
+ {reason, Reason},
+ {stacktrace, Stacktrace}]),
+ erlang:raise(Class, Reason, Stacktrace)
+ end.
+%% Loop top
+%% Awaiting outbound data or tick
+output_handler(OutStream, EncryptState, CS_DH) ->
+ receive
+ Msg ->
+ case Msg of
+ dist_data ->
+ output_handler_data(
+ OutStream, EncryptState, CS_DH);
+ dist_tick ->
+ output_handler_tick(
+ OutStream, EncryptState, CS_DH);
+ _ ->
+ %% Ignore
+ _ = trace(Msg),
+ output_handler(OutStream, EncryptState, CS_DH)
+ end
+ end.
+%% We have received at least one dist_tick but no dist_data message
+output_handler_tick(OutStream, EncryptState, CS_DH) ->
+ receive
+ Msg ->
+ case Msg of
+ dist_data ->
+ output_handler_data(
+ OutStream, EncryptState, CS_DH);
+ dist_tick ->
+ %% Consume all dist_tick messages
+ %%
+ output_handler_tick(OutStream, EncryptState, CS_DH);
+ _ ->
+ %% Ignore
+ _ = trace(Msg),
+ output_handler_tick(OutStream, EncryptState, CS_DH)
+ end
+ after 0 ->
+ {OutStream_1, EncryptState_1} =
+ cryptcookie:encrypt_and_send_chunk(
+ OutStream, EncryptState, <<>>, 0),
+ if
+ hd(OutStream_1) =:= closed ->
+ death_row({send_tick, trace(closed)});
+ true ->
+ output_handler(OutStream_1, EncryptState_1, CS_DH)
+ end
+ end.
+%% We have received a dist_data notification
+output_handler_data(OutStream, EncryptState, CS_DH) ->
+ {OutStream_1, EncryptState_1} =
+ output_handler_xfer(OutStream, EncryptState, CS_DH, [], 0, []),
+ erlang:dist_ctrl_get_data_notification(tl(CS_DH)),
+ output_handler(OutStream_1, EncryptState_1, CS_DH).
+%% Get outbound data from VM; encrypt and send,
+%% until the VM has no more
+%% Front,Size,Rear is an Okasaki queue of binaries with total byte Size
+ OutStream, EncryptState, CS_DH, Front, Size, Rear)
+ when hd(CS_DH) =< Size ->
+ %%
+ %% We have a full chunk or more
+ %% -> collect one chunk or less and send
+ output_handler_collect(
+ OutStream, EncryptState, CS_DH, Front, Size, Rear);
+ OutStream, EncryptState, CS_DH, Front, Size, Rear) ->
+ %% when Size < hd(CS_DH) ->
+ %%
+ %% We do not have a full chunk -> try to fetch more from VM
+ case erlang:dist_ctrl_get_data(tl(CS_DH)) of
+ none ->
+ if
+ Size =:= 0 ->
+ %% No more data from VM, nothing buffered
+ %% -> done, for now
+ {OutStream, EncryptState};
+ true ->
+ %% The VM had no more -> send what we have
+ output_handler_collect(
+ OutStream, EncryptState, CS_DH, Front, Size, Rear)
+ end;
+ {Len,Iov} ->
+ output_handler_enq(
+ OutStream, EncryptState, CS_DH,
+ Front, Size + 4 + Len, [<<Len:32>>|Rear],
+ Iov)
+ end.
+%% Enqueue VM data while splitting large binaries into
+%% chunk size; hd(CS_DH)
+ OutStream, EncryptState, CS_DH, Front, Size, Rear, []) ->
+ output_handler_xfer(
+ OutStream, EncryptState, CS_DH, Front, Size, Rear);
+ OutStream, EncryptState, CS_DH, Front, Size, Rear, [Bin|Iov]) ->
+ output_handler_enq(
+ OutStream, EncryptState, CS_DH, Front, Size, Rear, Iov, Bin).
+ OutStream, EncryptState, CS_DH, Front, Size, Rear, Iov, Bin) ->
+ BinSize = byte_size(Bin),
+ ChunkSize = hd(CS_DH),
+ if
+ BinSize =< ChunkSize ->
+ output_handler_enq(
+ OutStream, EncryptState, CS_DH, Front, Size, [Bin|Rear],
+ Iov);
+ true ->
+ <<Bin1:ChunkSize/binary, Bin2/binary>> = Bin,
+ output_handler_enq(
+ OutStream, EncryptState, CS_DH, Front, Size, [Bin1|Rear],
+ Iov, Bin2)
+ end.
+%% Collect small binaries into chunks of at most
+%% chunk size; hd(CS_DH)
+output_handler_collect(OutStream, EncryptState, CS_DH, [], Zero, []) ->
+ 0 = Zero, % ASSERT
+ %% No more enqueued -> try to get more form VM
+ output_handler_xfer(OutStream, EncryptState, CS_DH, [], Zero, []);
+output_handler_collect(OutStream, EncryptState, CS_DH, Front, Size, Rear) ->
+ output_handler_collect(
+ OutStream, EncryptState, CS_DH, Front, Size, Rear, [], 0).
+ OutStream, EncryptState, CS_DH, [], Zero, [], Acc, DataSize) ->
+ 0 = Zero, % ASSERT
+ output_handler_chunk(
+ OutStream, EncryptState, CS_DH, [], Zero, [], Acc, DataSize);
+ OutStream, EncryptState, CS_DH, [], Size, Rear, Acc, DataSize) ->
+ %% Okasaki queue transfer Rear -> Front
+ output_handler_collect(
+ OutStream, EncryptState, CS_DH, lists:reverse(Rear), Size, [],
+ Acc, DataSize);
+ OutStream, EncryptState, CS_DH, [Bin|Iov] = Front, Size, Rear,
+ Acc, DataSize) ->
+ ChunkSize = hd(CS_DH),
+ BinSize = byte_size(Bin),
+ DataSize_1 = DataSize + BinSize,
+ if
+ ChunkSize < DataSize_1 ->
+ %% Bin does not fit in chunk -> send Acc
+ output_handler_chunk(
+ OutStream, EncryptState, CS_DH, Front, Size, Rear,
+ Acc, DataSize);
+ DataSize_1 < ChunkSize ->
+ %% Chunk not full yet -> try to accumulate more
+ output_handler_collect(
+ OutStream, EncryptState, CS_DH, Iov, Size - BinSize, Rear,
+ [Bin|Acc], DataSize_1);
+ true -> % DataSize_1 == ChunkSize ->
+ %% Optimize one iteration; Bin fits exactly
+ %% -> accumulate and send
+ output_handler_chunk(
+ OutStream, EncryptState, CS_DH, Iov, Size - BinSize, Rear,
+ [Bin|Acc], DataSize_1)
+ end.
+%% Encrypt and send a chunk
+ OutStream, EncryptState, CS_DH, Front, Size, Rear, Acc, DataSize) ->
+ Data = lists:reverse(Acc),
+ {OutStream_1, EncryptState_1} =
+ cryptcookie:encrypt_and_send_chunk(
+ OutStream, EncryptState, Data, DataSize),
+ if
+ hd(OutStream_1) =:= closed ->
+ death_row({send_chunk, trace(closed)});
+ true ->
+ output_handler_collect(
+ OutStream_1, EncryptState_1, CS_DH, Front, Size, Rear)
+ end.
+%% -------------------------------------------------------------------------
+%% Input handler process
+%% Entry function
+input_handler_start(InStream, DecryptState, DistHandle) ->
+ try
+ input_handler(InStream, DecryptState, DistHandle)
+ catch
+ Class : Reason : Stacktrace when Class =:= error ->
+ error_report(
+ [input_handler_exception,
+ {class, Class},
+ {reason, Reason},
+ {stacktrace, Stacktrace}]),
+ erlang:raise(Class, Reason, Stacktrace)
+ end.
+%% Loop top
+input_handler(InStream, DecryptState, DistHandle) ->
+ %% Shortcut into the loop
+ {InStream_1, DecryptState_1, Chunk} =
+ input_chunk(InStream, DecryptState),
+ input_handler(
+ InStream_1, DecryptState_1, DistHandle, Chunk, [], byte_size(Chunk)).
+input_handler(InStream, DecryptState, DistHandle, First, Buffer, Size) ->
+ %% Size is size of First + Buffer
+ case First of
+ <<Packet1Size:32, Packet1:Packet1Size/binary,
+ Packet2Size:32, Packet2:Packet2Size/binary, Rest/binary>> ->
+ erlang:dist_ctrl_put_data(DistHandle, Packet1),
+ erlang:dist_ctrl_put_data(DistHandle, Packet2),
+ input_handler(
+ InStream, DecryptState, DistHandle,
+ Rest, Buffer, Size - (8 + Packet1Size + Packet2Size));
+ <<PacketSize:32, Packet:PacketSize/binary, Rest/binary>> ->
+ erlang:dist_ctrl_put_data(DistHandle, Packet),
+ input_handler(
+ InStream, DecryptState, DistHandle,
+ Rest, Buffer, Size - (4 + PacketSize));
+ <<PacketSize:32, PacketStart/binary>> ->
+ %% Partial packet in First
+ input_handler(
+ InStream, DecryptState, DistHandle,
+ PacketStart, Buffer, Size - 4, PacketSize);
+ Tick = <<>> ->
+ erlang:dist_ctrl_put_data(DistHandle, Tick),
+ if
+ Buffer =:= [] ->
+ Size = 0, % ASSERT
+ input_handler(InStream, DecryptState, DistHandle);
+ true ->
+ [First_1 | Buffer_1] = lists:reverse(Buffer),
+ input_handler(
+ InStream, DecryptState, DistHandle,
+ First_1, Buffer_1, Size)
+ end;
+ <<Bin/binary>> ->
+ %% Partial header in First
+ if
+ 4 =< Size ->
+ %% Complete header in First + Buffer
+ {First_1, Buffer_1, PacketSize} =
+ input_get_packet_size(Bin, lists:reverse(Buffer)),
+ input_handler(
+ InStream, DecryptState, DistHandle,
+ First_1, Buffer_1, Size - 4, PacketSize);
+ true ->
+ %% Incomplete header received so far
+ {InStream_1, DecryptState_1, Chunk} =
+ input_chunk(InStream, DecryptState),
+ input_handler(
+ InStream_1, DecryptState_1, DistHandle,
+ Bin, [Chunk|Buffer], Size + byte_size(Chunk))
+ end
+ end.
+ InStream, DecryptState, DistHandle,
+ PacketStart, Buffer, Size, PacketSize) ->
+ %%
+ %% Size is size of PacketStart + Buffer
+ RestSize = Size - PacketSize,
+ if
+ RestSize < 0 ->
+ %% Incomplete packet received so far
+ {InStream_1, DecryptState_1, Chunk} =
+ input_chunk(InStream, DecryptState),
+ input_handler(
+ InStream_1, DecryptState_1, DistHandle,
+ PacketStart, [Chunk|Buffer], Size + byte_size(Chunk),
+ PacketSize);
+ 0 < RestSize, Buffer =:= [] ->
+ %% Rest data in PacketStart
+ <<Packet:PacketSize/binary, Rest/binary>> = PacketStart,
+ erlang:dist_ctrl_put_data(DistHandle, Packet),
+ input_handler(
+ InStream, DecryptState, DistHandle, Rest, [], RestSize);
+ Buffer =:= [] -> % RestSize == 0, Size == 0
+ %% No rest data
+ erlang:dist_ctrl_put_data(DistHandle, PacketStart),
+ input_handler(InStream, DecryptState, DistHandle);
+ true ->
+ %% Split packet from rest data
+ LastBin = hd(Buffer),
+ <<PacketLast:(byte_size(LastBin) - RestSize)/binary,
+ Rest/binary>> = LastBin,
+ Packet = [PacketStart|lists:reverse(tl(Buffer), PacketLast)],
+ erlang:dist_ctrl_put_data(DistHandle, Packet),
+ input_handler(
+ InStream, DecryptState, DistHandle, Rest, [], RestSize)
+ end.
+input_get_packet_size(First, [Bin|Buffer]) ->
+ MissingSize = 4 - byte_size(First),
+ if
+ MissingSize =< byte_size(Bin) ->
+ <<Last:MissingSize/binary, Rest/binary>> = Bin,
+ <<PacketSize:32>> = <<First/binary, Last/binary>>,
+ {Rest, lists:reverse(Buffer), PacketSize};
+ true ->
+ input_get_packet_size(<<First/binary, Bin/binary>>, Buffer)
+ end.
+input_chunk(InStream, DecryptState) ->
+ {[Chunk | InStream_1], DecryptState_1} =
+ cryptcookie:recv_and_decrypt_chunk(InStream, DecryptState),
+ if
+ is_atom(Chunk) ->
+ _ = Chunk =:= closed
+ orelse
+ error_report(
+ {reason, Chunk}]),
+ _ = trace({?FUNCTION_NAME, Chunk}),
+ exit(connection_closed);
+ is_binary(Chunk) ->
+ {InStream_1, DecryptState_1, Chunk}
+ end.
+%% -------------------------------------------------------------------------
+%% Wait for getting killed by process link,
+%% and if that does not happen - drop dead
+death_row(Reason) ->
+ receive
+ after 5000 ->
+ death_row_timeout(Reason)
+ end.
+death_row_timeout(Reason) ->
+ error_report(
+ {reason, Reason},
+ {pid, self()}]),
+ exit(Reason).
+%% -------------------------------------------------------------------------
+error_report(Report) ->
+ error_logger:error_report(Report).
+info_report(Report) ->
+ error_logger:info_report(Report).
+%% Trace point
+trace(Term) -> Term.