Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:26
erlang
3352-Implement-read_ahead-option-for-gen_tcp_so...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 3352-Implement-read_ahead-option-for-gen_tcp_socket.patch of Package erlang
From a4104972575a8fb48e232fb9d745db42f82fdf94 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen <raimo@erlang.org> Date: Wed, 21 Aug 2024 16:53:01 +0200 Subject: [PATCH 2/8] Implement read_ahead option for gen_tcp_socket --- lib/kernel/src/gen_tcp_socket.erl | 187 +++++++++++++++++++----------- 1 file changed, 122 insertions(+), 65 deletions(-) diff --git a/lib/kernel/src/gen_tcp_socket.erl b/lib/kernel/src/gen_tcp_socket.erl index 26d266fcb9..b8abab3b45 100644 --- a/lib/kernel/src/gen_tcp_socket.erl +++ b/lib/kernel/src/gen_tcp_socket.erl @@ -2277,16 +2277,15 @@ handle_recv_raw(P, #{buffer := Buffer} = D, ActionsR, Length, CS) -> handle_recv_raw(P, D, ActionsR, Length, Buffer, CS). handle_recv_raw(P, D, ActionsR, Length, Buffer, recv) -> - Size = iolist_size(Buffer), + BufferSize = iolist_size(Buffer), if - 0 < Length, Length =< Size -> + 0 < Length, Length =< BufferSize -> %% We have more buffered than requested - %% {Data, NewBuffer} = split_binary(condense_buffer(Buffer), Length), handle_recv_deliver(P, D#{buffer := NewBuffer}, [], Data); - Length == 0, 0 < Size -> + Length == 0, 0 < BufferSize -> %% We have some buffered and "what's available" requested handle_recv_deliver( P, D#{buffer := <<>>}, ActionsR, condense_buffer(Buffer)); @@ -2295,36 +2294,37 @@ handle_recv_raw(P, D, ActionsR, Length, Buffer, recv) -> %% Less buffered than requested %% or empty buffer and "what's available" requested %% - %% i.e Length == 0, Size == 0; - %% 0 < Length, Size < Length + %% i.e Length == 0, BufferSize == 0; + %% 0 < Length, BufferSize < Length %% In both cases this works: - N = Length - Size, - case socket_recv(P#params.socket, N) of + N = Length - BufferSize, % How much to recv + case socket_recv(P#params.socket, read_size(D, N)) of {ok, <<Data/binary>>} -> - handle_recv_deliver( - P, D#{buffer := <<>>}, ActionsR, - condense_buffer(Data, Buffer)); + handle_recv_raw( + P, D, ActionsR, Length, buffer(Data, Buffer), recv); - {select, {?select_info(_) = SelectInfo, Data}} -> + {select, {?select_info(_) = SelectInfo, <<Data/binary>>}} -> if - 0 < Length -> - %% Need to wait for the rest of the data - {next_state, - #recv{info = SelectInfo}, - {P, D#{buffer := buffer(Data, Buffer)}}, - reverse(ActionsR)}; - true -> %% Length == 0, Size == 0 - %% We take what we just got - %% and cancel the async recv + Length =< BufferSize + byte_size(Data) -> + %% Enough data; cancel the async recv + %% and use what we have Socket = P#params.socket, case socket:cancel(Socket, SelectInfo) of ok -> - handle_recv_deliver( - P, D, ActionsR, Data); + handle_recv_raw( + P, D, ActionsR, Length, + buffer(Data, Buffer), recv); {error, Reason} -> - handle_recv_error( - P, D, ActionsR, Reason, Data) - end + handle_recv_raw_error_deliver( + P, D, ActionsR, Length, + Reason, condense_buffer(Data, Buffer)) + end; + true -> + %% Need to wait for the rest of the data + {next_state, + #recv{info = SelectInfo}, + {P, D#{buffer := buffer(Data, Buffer)}}, + reverse(ActionsR)} end; {select, ?select_info(_) = SelectInfo} -> @@ -2340,15 +2340,17 @@ handle_recv_raw(P, D, ActionsR, Length, Buffer, recv) -> {error, {Reason, <<Data/binary>>}} -> %% ?DBG({'recv error', Reason, byte_size(Data)}), if - 0 < Length -> - %% We didn't get all data we requested + Length < BufferSize + byte_size(Data) -> + %% Enough data + handle_recv_raw_error_deliver( + P, D, ActionsR, Length, + Reason, condense_buffer(Data, Buffer)); + true -> handle_recv_error( P, D#{buffer := buffer(Data, Buffer)}, - ActionsR, Reason); - true -> - %% Deliver what we got, then error - handle_recv_error(P, D, ActionsR, Reason, Data) + ActionsR, Reason) end; + {error, Reason} -> %% ?DBG({'recv error', Reason}), handle_recv_error(P, D, ActionsR, Reason) @@ -2368,54 +2370,90 @@ handle_recv_raw(P, D, ActionsR, Length, Buffer, CompletionStatus) -> ActionsR, Reason); true -> %% Deliver "what's available", then error - handle_recv_error(P, D, ActionsR, Reason, Data) + handle_recv_error( + P, recv_data_deliver(P, D, ActionsR, Data), Reason) end; {error, Reason} -> handle_recv_error(P, D#{buffer := Buffer}, [], Reason) end. +handle_recv_raw_error_deliver(P, D, ActionsR, Length, Reason, Data) -> + if + 0 < Length -> % Request length + {NewData, NewBuffer} = split_binary(Data, Length), + handle_recv_error( + P, + recv_data_deliver( + P, D#{buffer := NewBuffer}, ActionsR, NewData), + Reason); + Data =/= <<>> -> % Length == 0 + handle_recv_error( + P, recv_data_deliver(P, D#{buffer := <<>>}, ActionsR, Data), + Reason); + Data =:= <<>> -> % Length == 0 + handle_recv_error(P, D#{buffer := Data}, ActionsR, Reason) + end. + handle_recv_packet( P, #{recv_length := Length, buffer := Buffer} = D, ActionsR, recv) -> if 0 < Length -> %% We know how much we need for a packet - handle_recv_more(P, D, ActionsR, Length, Buffer); + handle_recv_packet_more(P, D, ActionsR, Length, Buffer); true -> - handle_recv_decode(P, D, ActionsR, condense_buffer(Buffer)) + handle_recv_packet_decode(P, D, ActionsR, condense_buffer(Buffer)) end; handle_recv_packet(P, #{buffer := Buffer} = D, ActionsR, CompletionStatus) -> case CompletionStatus of {ok, <<Data/binary>>} -> - handle_recv_decode( + handle_recv_packet_decode( P, D, ActionsR, condense_buffer(Data, Buffer)); {error, {Reason, <<Data/binary>>}} -> - handle_recv_error_decode( + handle_recv_packet_error_decode( P, D, ActionsR, Reason, condense_buffer(Data, Buffer)); {error, Reason} -> handle_recv_error(P, D, ActionsR, Reason) end. -handle_recv_more(P, D, ActionsR, Length, Buffer) -> - Size = iolist_size(Buffer), +handle_recv_packet_more(P, D, ActionsR, Length, Buffer) -> + BufferSize = iolist_size(Buffer), if - Length =< Size -> + Length =< BufferSize -> %% We have more buffered than we need %% - handle_recv_decode(P, D, ActionsR, condense_buffer(Buffer)); + handle_recv_packet_decode( + P, D, ActionsR, condense_buffer(Buffer)); true -> - N = Length - Size, - case socket_recv(P#params.socket, N) of + N = Length - BufferSize, + case socket_recv(P#params.socket, read_size(D, N)) of {ok, <<Data/binary>>} -> - handle_recv_decode( - P, D, ActionsR, condense_buffer(Data, Buffer)); + handle_recv_packet_more( + P, D, ActionsR, Length, buffer(Data, Buffer)); {select, {?select_info(_) = SelectInfo, Data}} -> - %% Need to wait for the rest of the data - {next_state, - #recv{info = SelectInfo}, - {P, D#{buffer := buffer(Data, Buffer)}}, - reverse(ActionsR)}; + if + Length =< BufferSize + byte_size(Data) -> + %% Enough data; cancel the async recv + %% and use what we have + Socket = P#params.socket, + case socket:cancel(Socket, SelectInfo) of + ok -> + handle_recv_packet_more( + P, D, ActionsR, Length, + buffer(Data, Buffer)); + {error, Reason} -> + handle_recv_packet_error_decode( + P, D, ActionsR, Reason, + condense_buffer(Data, Buffer)) + end; + true -> + %% Need to wait for the rest of the data + {next_state, + #recv{info = SelectInfo}, + {P, D#{buffer := buffer(Data, Buffer)}}, + reverse(ActionsR)} + end; {select, ?select_info(_) = SelectInfo} -> %% ?DBG(['recv select']), @@ -2433,31 +2471,41 @@ handle_recv_more(P, D, ActionsR, Length, Buffer) -> {error, {Reason, <<Data/binary>>}} -> %% ?DBG({'recv error', Reason, byte_size(Data)}), - handle_recv_error_decode( - P, D, ActionsR, Reason, condense_buffer(Data, Buffer)); + if + Length < BufferSize + byte_size(Data) -> + %% Enough data + handle_recv_packet_error_decode( + P, D, ActionsR, Reason, + condense_buffer(Data, Buffer)); + true -> + handle_recv_error( + P, D#{buffer := buffer(Data, Buffer)}, + ActionsR, Reason) + end; {error, Reason} -> %% ?DBG({'recv error', Reason}), handle_recv_error(P, D, ActionsR, Reason) end end. -handle_recv_decode(P, D, ActionsR, Data) -> +handle_recv_packet_decode(P, D, ActionsR, Data) -> %% ?DBG({}), case decode_packet(D, Data) of {D_1, ok, Decoded} -> handle_recv_deliver(P, D_1, ActionsR, Decoded); {D_1, more, Length} -> - handle_recv_more(P, D_1, ActionsR, Length, Data); + handle_recv_packet_more(P, D_1, ActionsR, Length, Data); {D_1, error, invalid} -> handle_recv_error(P, D_1, ActionsR, emsgsize); {D_1, error, Reason} -> handle_recv_error(P, D_1, ActionsR, Reason) end. -handle_recv_error_decode(P, D, ActionsR, Reason, Data) -> +handle_recv_packet_error_decode(P, D, ActionsR, Reason, Data) -> case decode_packet(D, Data) of {D_1, ok, Decoded} -> - handle_recv_error(P, D_1, ActionsR, Reason, Decoded); + handle_recv_error( + P, recv_data_deliver(P, D_1, ActionsR, Decoded), Reason); {D_1, error, invalid} -> handle_recv_error(P, D_1, ActionsR, emsgsize); {D_1, _, _} -> @@ -2497,18 +2545,10 @@ decode_packet( decode_packet(D, Data, PacketType, [{packet_size, PacketSize}]). decode_packet(D, Data, PacketType, Options) -> - case - erlang:decode_packet(PacketType, Data, Options) - of + case erlang:decode_packet(PacketType, Data, Options) of {ok, Decoded, Rest} -> %% ?DBG({ok, PacketType, byte_size(Decoded)}), {D#{buffer := Rest}, ok, Decoded}; - Other -> - decode_packet_common(D, Data, PacketType, Other) - end. - -decode_packet_common(D, Data, PacketType, Other) -> - case Other of {more, undefined} -> Length = packet_header_length(PacketType, Data), {D, more, Length}; @@ -2542,6 +2582,20 @@ packet_header_length(PacketType, Data) -> end. +%% How much to read given read_ahead, {otp,rcvbuf} and request size N +read_size(D, N) -> + case + N == 0 % "What's available" requested + orelse + (maps:get(read_ahead, D) % Read ahead configured + andalso + %% Request smaller than rcvbuf + N < maps:get({otp,rcvbuf}, D, ?RECV_BUFFER_SIZE_DEFAULT)) + of + true -> 0; + false -> N + end. + @@ -2704,6 +2758,9 @@ handle_recv_deliver(P, D, ActionsR, Data) -> handle_connected(P, recv_data_deliver(P, D, ActionsR, Data)). +handle_recv_error(P, {D, ActionsR}, Reason) -> + handle_recv_error(P, D, ActionsR, Reason). + handle_recv_error(P, D, ActionsR, Reason) -> handle_recv_error(P, D, ActionsR, Reason, undefined). %% -- 2.43.0
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor