Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:24
erlang
4146-Clean-up-output_handler.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 4146-Clean-up-output_handler.patch of Package erlang
From a626e320710c1a1572126e7aa5e2285a42cb747e Mon Sep 17 00:00:00 2001 From: Raimo Niskanen <raimo@erlang.org> Date: Mon, 24 Oct 2022 17:11:18 +0200 Subject: [PATCH 26/27] Clean up output_handler --- lib/ssl/test/inet_crypto_dist.erl | 240 +++++++++++++++++------------- 1 file changed, 139 insertions(+), 101 deletions(-) diff --git a/lib/ssl/test/inet_crypto_dist.erl b/lib/ssl/test/inet_crypto_dist.erl index b9a7e5ba1f..f28da210f7 100644 --- a/lib/ssl/test/inet_crypto_dist.erl +++ b/lib/ssl/test/inet_crypto_dist.erl @@ -1093,26 +1093,8 @@ handshake( link(Controller), receive DistHandle -> - ok = - inet:setopts( - Socket, - [{active, ?TCP_ACTIVE}, - inet_tcp_dist:nodelay()]), - try - input_handler( - RecvParams#params{ - dist_handle = DistHandle}, - RecvSeq) - catch - Class : Reason : Stacktrace -> - error_logger:info_report( - [input_handler_exception, - {class, Class}, - {reason, Reason}, - {stacktrace, Stacktrace}]), - erlang:raise( - Class, Reason, Stacktrace) - end + input_handler( + RecvParams, RecvSeq, DistHandle) end end, [link, @@ -1137,29 +1119,12 @@ handshake( {fullsweep_after, 0}])), _ = monitor(process, InputHandler), % For the benchmark test ok = gen_tcp:controlling_process(Socket, InputHandler), + false = erlang:dist_ctrl_set_opt(DistHandle, get_size, true), ok = erlang:dist_ctrl_input_handler(DistHandle, InputHandler), InputHandler ! DistHandle, - crypto:rand_seed_alg(crypto_cache), reply(From, ok), process_flag(priority, normal), - erlang:dist_ctrl_get_data_notification(DistHandle), - try - output_handler( - SendParams#params{ - dist_handle = DistHandle, - rekey_msg = - start_rekey_timer(SendParams#params.rekey_time)}, - SendSeq) - catch - Class : Reason : Stacktrace -> - error_logger:info_report( - [output_handler_exception, - {class, Class}, - {reason, Reason}, - {stacktrace, Stacktrace}]), - erlang:raise( - Class, Reason, Stacktrace) - end; + output_handler(SendParams, SendSeq, DistHandle); %% {?MODULE, From, {send, Data}} -> {SendParams_1, SendSeq_1, Result} = @@ -1223,10 +1188,38 @@ recv_and_decrypt_chunk(#params{socket = Socket} = RecvParams, RecvSeq) -> %% ------------------------------------------------------------------------- %% Output handler process %% -%% The game here is to flush all dist_data and dist_tick messages, -%% prioritize dist_data over dist_tick, and to not use selective receive -%% because that would hurt performance during overload +%% Await an event about what to do; fetch dist data from the VM, +%% send a dist tick, or rekey outbound encryption parameters. +%% +%% In case we are overloaded and could get many accumulated +%% dist_data or dist_tick messages; make sure to flush all of them +%% before proceeding with what to do. But, do not use selective +%% receive since that does not perform well when there are +%% many messages in the process mailbox. + +%% Entry function +output_handler(Params, Seq, DistHandle) -> + try + _ = crypto:rand_seed_alg(crypto_cache), + erlang:dist_ctrl_get_data_notification(DistHandle), + output_handler( + Params#params{ + dist_handle = DistHandle, + rekey_msg = start_rekey_timer(Params#params.rekey_time)}, + Seq) + catch + Class : Reason : Stacktrace -> + error_logger:info_report( + [output_handler_exception, + {class, Class}, + {reason, Reason}, + {stacktrace, Stacktrace}]), + erlang:raise(Class, Reason, Stacktrace) + end. +%% Loop top +%% +%% State: lurking until any interesting message output_handler(Params, Seq) -> receive Msg -> @@ -1305,64 +1298,130 @@ output_handler_rekey(Params, Seq) -> end. - +%% Get outbound data from VM; encrypt and send, +%% until the VM has no more +%% output_handler_xfer(Params, Seq) -> output_handler_xfer(Params, Seq, [], 0, []). %% -output_handler_xfer(Params, Seq, {Front, Size, Rear}) -> - output_handler_xfer(Params, Seq, Front, Size, Rear). +%% Front,Size,Rear is an Okasaki queue of binaries with total byte Size %% output_handler_xfer(Params, Seq, Front, Size, Rear) when ?CHUNK_SIZE =< Size -> - {Data, Q} = deq_iovec(?CHUNK_SIZE, Front, Size, Rear), - {Params_1, Seq_1, Result} = - encrypt_and_send_chunk( - Params, Seq, [?DATA_CHUNK, Data], 1 + ?CHUNK_SIZE), - if - Result =:= ok -> - output_handler_xfer(Params_1, Seq_1, Q); - true -> - death_row({send_chunk, trace(Result)}) - end; + %% + %% We have a full chunk or more + %% -> collect one chunk or less and send + output_handler_collect(Params, Seq, Front, Size, Rear); output_handler_xfer(Params, Seq, Front, Size, Rear) -> + %% when Size < ?CHUNK_SIZE -> + %% + %% We do not have a full chunk -> try to fetch more from VM case erlang:dist_ctrl_get_data(Params#params.dist_handle) of none -> if Size =:= 0 -> + %% No more data from VM, nothing buffered + %% -> go back to lurking {Params, Seq}; true -> - Data = Front ++ lists:reverse(Rear), - {Params_1, Seq_1, Result} = - encrypt_and_send_chunk( - Params, Seq, [?DATA_CHUNK, Data], 1 + Size), - if - Result =:= ok -> - {Params_1, Seq_1}; - true -> - death_row({send_chunk, trace(Result)}) - end + %% The VM had no more -> send what we have + output_handler_collect(Params, Seq, Front, Size, Rear) end; - Bin when is_binary(Bin) -> - Len = byte_size(Bin), - output_handler_xfer( - Params, Seq, Front, - Size + 4 + Len, [Bin, <<Len:32>>|Rear]); - [Bin1, Bin2] -> - Len = byte_size(Bin1) + byte_size(Bin2), - output_handler_xfer( - Params, Seq, Front, - Size + 4 + Len, [Bin2, Bin1, <<Len:32>>|Rear]); - Iovec -> - Len = iolist_size(Iovec), - output_handler_xfer( - Params, Seq, Front, - Size + 4 + Len, lists:reverse(Iovec, [<<Len:32>>|Rear])) + {Len,Iov} -> + output_handler_enq( + Params, Seq, Front, Size + 4 + Len, [<<Len:32>>|Rear], Iov) + end. + +%% Enqueue VM data while splitting large binaries into ?CHUNK_SIZE +%% +output_handler_enq(Params, Seq, Front, Size, Rear, []) -> + output_handler_xfer(Params, Seq, Front, Size, Rear); +output_handler_enq(Params, Seq, Front, Size, Rear, [Bin|Iov]) -> + output_handler_enq(Params, Seq, Front, Size, Rear, Iov, Bin). +%% +output_handler_enq(Params, Seq, Front, Size, Rear, Iov, Bin) -> + BinSize = byte_size(Bin), + if + BinSize =< ?CHUNK_SIZE -> + output_handler_enq( + Params, Seq, Front, Size, [Bin|Rear], Iov); + true -> + <<Bin1:?CHUNK_SIZE/binary, Bin2/binary>> = Bin, + output_handler_enq( + Params, Seq, Front, Size, [Bin1|Rear], Iov, Bin2) + end. + +%% Collect small binaries into chunks of at most ?CHUNK_SIZE +%% +output_handler_collect(Params, Seq, [], Zero, []) -> + 0 = Zero, % Assert + %% No more enqueued -> try to get more form VM + output_handler_xfer(Params, Seq); +output_handler_collect(Params, Seq, Front, Size, Rear) -> + output_handler_collect(Params, Seq, Front, Size, Rear, [], 0). +%% +output_handler_collect(Params, Seq, [], Zero, [], Acc, DataSize) -> + 0 = Zero, % Assert + output_handler_chunk(Params, Seq, [], Zero, [], Acc, DataSize); +output_handler_collect(Params, Seq, [], Size, Rear, Acc, DataSize) -> + %% Okasaki queue transfer Rear -> Front + output_handler_collect( + Params, Seq, lists:reverse(Rear), Size, [], Acc, DataSize); +output_handler_collect( + Params, Seq, [Bin|Iov] = Front, Size, Rear, Acc, DataSize) -> + BinSize = byte_size(Bin), + DataSize_1 = DataSize + BinSize, + if + ?CHUNK_SIZE < DataSize_1 -> + %% Bin does not fit in chunk -> send Acc + output_handler_chunk( + Params, Seq, Front, Size, Rear, Acc, DataSize); + DataSize_1 < ?CHUNK_SIZE -> + %% Chunk not full yet -> try to accumulate more + output_handler_collect( + Params, Seq, Iov, Size - BinSize, Rear, [Bin|Acc], DataSize_1); + true -> % DataSize_1 == ?CHUNK_SIZE -> + %% Optimize one iteration; Bin fits exactly -> accumulate and send + output_handler_chunk( + Params, Seq, Iov, Size - BinSize, Rear, [Bin|Acc], DataSize_1) + end. + +%% Encrypt and send a chunk +%% +output_handler_chunk(Params, Seq, Front, Size, Rear, Acc, DataSize) -> + Data = lists:reverse(Acc), + {Params_1, Seq_1, Result} = + encrypt_and_send_chunk(Params, Seq, [?DATA_CHUNK|Data], 1 + DataSize), + if + Result =:= ok -> + %% Try to collect another chunk + output_handler_collect(Params_1, Seq_1, Front, Size, Rear); + true -> + death_row({send_chunk, trace(Result)}) end. %% ------------------------------------------------------------------------- %% Input handler process %% +%% Entry function +input_handler(#params{socket = Socket} = Params, Seq, DistHandle) -> + try + ok = inet:setopts(Socket, [{active, ?TCP_ACTIVE}, nodelay()]), + input_handler( + Params#params{dist_handle = DistHandle}, + Seq) + catch + Class : Reason : Stacktrace -> + error_logger:info_report( + [input_handler_exception, + {class, Class}, + {reason, Reason}, + {stacktrace, Stacktrace}]), + erlang:raise(Class, Reason, Stacktrace) + end. + +%% Loop top input_handler(Params, Seq) -> %% Shortcut into the loop {Params_1, Seq_1, Data} = input_data(Params, Seq), @@ -1626,27 +1685,6 @@ block_decrypt( error end. -%% ------------------------------------------------------------------------- -%% Queue of binaries i.e an iovec queue - -deq_iovec(GetSize, Front, Size, Rear) -> - deq_iovec(GetSize, Front, Size, Rear, []). -%% -deq_iovec(GetSize, [], Size, Rear, Acc) -> - deq_iovec(GetSize, lists:reverse(Rear), Size, [], Acc); -deq_iovec(GetSize, [Bin|Front], Size, Rear, Acc) -> - BinSize = byte_size(Bin), - if - BinSize < GetSize -> - deq_iovec( - GetSize - BinSize, Front, Size - BinSize, Rear, [Bin|Acc]); - GetSize < BinSize -> - {Bin1,Bin2} = erlang:split_binary(Bin, GetSize), - {lists:reverse(Acc, [Bin1]), {[Bin2|Front], Size - GetSize, Rear}}; - true -> - {lists:reverse(Acc, [Bin]), {Front, Size - BinSize, Rear}} - end. - %% ------------------------------------------------------------------------- death_row(Reason) -> -- 2.35.3
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor