File 7043-Move-time-out-to-clients.patch of Package erlang (project home:Ledest:erlang:24)
From e517d17b06fafe5ab781ab6f3043ef5382443442 Mon Sep 17 00:00:00 2001
From: Raimo Niskanen <raimo@erlang.org>
Date: Thu, 19 Oct 2023 20:38:17 +0200
Subject: [PATCH 3/3] Move time-out to clients

Reduces the time-out message passing during high load, since the server
no longer acts as a proxy for client time-out messages.

There is still one timer per request towards the port driver, instead
of one per requesting client, and the clients use receive...after.
---
 lib/kernel/src/inet_gethost_native.erl | 129 ++++++++++++++++---------
 1 file changed, 86 insertions(+), 43 deletions(-)

diff --git a/lib/kernel/src/inet_gethost_native.erl b/lib/kernel/src/inet_gethost_native.erl
index 572785cea4..8299d83933 100644
--- a/lib/kernel/src/inet_gethost_native.erl
+++ b/lib/kernel/src/inet_gethost_native.erl
@@ -65,9 +65,6 @@
 -define(dbg(A,B), noop).
 -endif.
 
--define(SEND_AFTER(A,B,C),erlang:send_after(A,B,C)).
--define(CANCEL_TIMER(A),erlang:cancel_timer(A)).
-
 %% In erlang, IPV6 addresses are built as 8-tuples of 16bit values (not 16-tuples of octets).
 %% This macro, meant to be used in guards checks one such 16bit value in the 8-tuple.
 -define(VALID_V6(Part), is_integer(Part), Part < 65536).
@@ -87,6 +84,17 @@
           no_data = 0
          }).
 
+-record(request,
+        {rid,
+         req, % {Op,Proto,Data}
+         timer_ref,
+         %%
+         %% Timestamp in monotonic time native units,
+         %% for latest piggy backed client request, used to
+         %% restart server time-out matching latest client time-out
+         req_ts = undefined
+        }).
+
 % The main loopstate...
 -record(
    state,
@@ -152,6 +160,7 @@ terminate(_Reason, Pid) ->
 run_once() ->
     Port = do_open_port(get_poolsize(), get_extra_args()),
     Timeout = ?REQUEST_TIMEOUT,
+    persistent_term:put({?MODULE,timeout}, Timeout),
     RID = 1,
     {ClientHandle, Request} =
         receive
@@ -194,9 +203,11 @@ server_init(Starter, Ref) ->
     Poolsize = get_poolsize(),
     Port = do_open_port(Poolsize, get_extra_args()),
     Timeout = ?REQUEST_TIMEOUT,
+    persistent_term:put({?MODULE,timeout}, Timeout),
     put(rid,0),
     put(num_requests,0),
-    RequestTab = ets:new(ign_requests,[set,protected]),
+    RequestTab =
+        ets:new(ign_requests,[set,protected,{keypos,#request.rid}]),
     RequestIndex = ets:new(ign_req_index,[set,protected]),
     RequestClients = ets:new(ign_req_clients, [bag,protected]),
     State = #state{port = Port, timeout = Timeout,
@@ -249,17 +260,18 @@ handle_message({Port, {data, Data}}, State = #state{port = Port}) ->
           when Unit =:= ?UNIT_ERROR;
                Unit =:= ?UNIT_IPV4;
                Unit =:= ?UNIT_IPV6 ->
-            case ets:lookup(State#state.requests, RID) of
+            case ets:take(State#state.requests, RID) of
                 [] ->
                     %% We must have cancelled this request
                     State;
-                [{_,Req}] ->
+                [#request{timer_ref = TimerRef, req = Req}] ->
+                    _ = erlang:cancel_timer(
+                          TimerRef,
+                          [{async,true},{info,false}]),
                     %% Clean up the request and reply to clients
-                    ets:delete(State#state.requests, RID),
                     ets:delete(State#state.req_index, Req),
                     lists:foreach(
-                      fun ({_,ClientHandle,TimerRef}) ->
-                              _ = ?CANCEL_TIMER(TimerRef),
+                      fun ({_,ClientHandle}) ->
                               ClientHandle !
                                   {ClientHandle, {ok,BinReply}}
                       end,
@@ -289,33 +301,36 @@ handle_message({Port,eof}, State = #state{port = Port}) ->
     NewPort=restart_port(State),
     main_loop(State#state{port=NewPort});
 
-handle_message({timeout,RID,ClientHandle}, State) ->
-    ClientReqMS = {RID,ClientHandle,'_'},
-    case ets:match_object(State#state.req_clients, ClientReqMS) of
-        [ClientReq] ->
-            ets:delete_object(State#state.req_clients, ClientReq),
-            ClientHandle ! {ClientHandle, {error,timeout}},
-            case ets:member(State#state.req_clients, RID) of
-                true ->
-                    %% There are still waiting clients
-                    ok;
-                false ->
-                    %% The last client timed out - cancel the request
-                    case ets:lookup(State#state.requests, RID) of
-                        [{_,Req}] ->
-                            ets:delete(State#state.requests,RID),
-                            ets:delete(State#state.req_index,Req),
-                            put(num_requests,get(num_requests) - 1),
-                            %% Also cancel the request to the port program...
-                            _ = catch port_command(
-                                        State#state.port,
-                                        <<RID:32,?OP_CANCEL_REQUEST>>),
-                            ok;
-                        [] ->
-                            ok
-                    end
-            end;
+handle_message({timeout,TimerRef,RID}, State) ->
+    case ets:lookup(State#state.requests, RID) of
         [] ->
+            ok;
+        [#request{timer_ref = TimerRef, req = Req, req_ts = undefined}] ->
+            %% Cancel the request, let the clients do their own time-out
+            ets:delete(State#state.requests, RID),
+            ets:delete(State#state.req_index, Req),
+            ets:delete(State#state.req_clients, RID),
+            put(num_requests,get(num_requests) - 1),
+            %% Also cancel the request to the port program...
+            _ = catch port_command(
+                        State#state.port,
+                        <<RID:32,?OP_CANCEL_REQUEST>>),
+            ok;
+        [#request{timer_ref = TimerRef, req_ts = ReqTs}] ->
+            %% We may have more clients, restart the timer
+            TimeoutTime =
+                erlang:convert_time_unit(ReqTs, native, millisecond)
+                + State#state.timeout,
+            NewTimerRef =
+                erlang:start_timer(TimeoutTime, self(), RID, [{abs,true}]),
+            true =
+                ets:update_element(
+                  State#state.requests, RID,
+                  [{#request.timer_ref,NewTimerRef},
+                   {#request.req_ts,undefined}]),
+            ok;
+        [#request{}] ->
+            %% Stale timer_ref - ignore
             ok
     end,
     main_loop(State);
@@ -331,17 +346,22 @@ handle_message(_, State) -> % Stray messages from dying ports etc.
 do_handle_call(ClientHandle, Req, RData, State) ->
     case ets:lookup(State#state.req_index, Req) of
         [{_,RID}] ->
+            true =
+                ets:update_element(
+                  State#state.requests, RID,
+                  {#request.req_ts,erlang:monotonic_time()}),
             ok;
         [] ->
             RID = get_rid(),
             _ = catch port_command(State#state.port, [<<RID:32>>|RData]),
-            ets:insert(State#state.requests, {RID,Req}),
+            Timeout = State#state.timeout,
+            TimerRef = erlang:start_timer(Timeout, self(), RID),
+            ets:insert(
+              State#state.requests,
+              #request{rid = RID, req = Req, timer_ref = TimerRef}),
             ets:insert(State#state.req_index, {Req,RID})
     end,
-    TimerMsg = {timeout,RID,ClientHandle},
-    TimerRef = ?SEND_AFTER(State#state.timeout, self(), TimerMsg),
-    ClientReq = {RID,ClientHandle,TimerRef},
-    ets:insert(State#state.req_clients, ClientReq),
+    ets:insert(State#state.req_clients, {RID,ClientHandle}),
     ok.
 
@@ -367,7 +387,7 @@ restart_port(#state{port = Port, requests = Requests}) ->
     %%
     %% Redo all requests on the new port
     foreach(
-      fun ({RID,{Op,Proto,Rdata}}) ->
+      fun (#request{rid = RID, req = {Op,Proto,Rdata}}) ->
              case Op of
                  ?OP_GETHOSTBYNAME ->
                      port_command(
@@ -483,22 +503,45 @@ control(soft_restart) -> getit(restart_port, undefined);
 control(_) -> {error, formerr}.
 
+
 getit(Op, Proto, Data, DefaultName) ->
     getit({Op, Proto, Data}, DefaultName).
 
 getit(Req, DefaultName) ->
     Pid = ensure_started(),
+    DefaultTimeout = (#state{})#state.timeout,
+    Timeout = persistent_term:get({?MODULE,timeout}, DefaultTimeout),
+    case call(Pid, Req, Timeout) of
+        {ok, BinHostent} ->
+            parse_address(BinHostent, DefaultName);
+        ok ->
+            ok;
+        {error, _} = Result->
+            Result
+    end.
+
+call(Pid, Req, Timeout) ->
     ReqHandle = monitor(process, Pid, [{alias,reply_demonitor}]),
     Pid ! {ReqHandle, Req},
+    wait_reply(ReqHandle, Timeout).
+
+wait_reply(ReqHandle, Timeout) ->
     receive
-        {ReqHandle, {ok,BinHostent}} ->
-            parse_address(BinHostent, DefaultName);
         {ReqHandle, Result} ->
             Result;
         {'DOWN', ReqHandle, process, _, Reason} ->
             {error, Reason}
+    after Timeout ->
+            case unalias(ReqHandle) of
+                true ->
+                    erlang:demonitor(ReqHandle, [flush]),
+                    {error, timeout};
+                false ->
+                    wait_reply(ReqHandle, infinity)
+            end
     end.
 
+
 ensure_started() ->
     case whereis(?MODULE) of
         undefined ->
-- 
2.35.3
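
Note (not part of the patch): the commit message's "clients use receive...after" refers to the client-side wait introduced above in call/3 and wait_reply/2. The following is a minimal standalone sketch of that pattern, assuming only a server that answers with {ReqHandle, Reply} sent to the alias it received; the module and function names (timeout_sketch, request/3) are illustrative and not part of inet_gethost_native.

%% Illustrative sketch only -- mirrors the client-side time-out idea,
%% not the actual inet_gethost_native API.
-module(timeout_sketch).
-export([request/3]).

request(ServerPid, Request, Timeout) ->
    %% The monitor reference doubles as a reply alias; with
    %% reply_demonitor the alias is deactivated and the monitor removed
    %% automatically when the first reply arrives through the alias.
    ReqHandle = monitor(process, ServerPid, [{alias, reply_demonitor}]),
    ServerPid ! {ReqHandle, Request},
    wait_reply(ReqHandle, Timeout).

wait_reply(ReqHandle, Timeout) ->
    receive
        {ReqHandle, Reply} ->
            Reply;
        {'DOWN', ReqHandle, process, _, Reason} ->
            {error, Reason}
    after Timeout ->
            %% If unalias/1 returns false the alias was already used,
            %% so a reply (or 'DOWN') is already in the mailbox; keep
            %% waiting for it instead of reporting a time-out.
            case unalias(ReqHandle) of
                true ->
                    erlang:demonitor(ReqHandle, [flush]),
                    {error, timeout};
                false ->
                    wait_reply(ReqHandle, infinity)
            end
    end.

The unalias/1 check closes the race where a reply arrives just as the client times out: once the alias has been deactivated by the client, a later send from the server is simply dropped, so the client can report {error, timeout} without leaving a stray reply in its mailbox.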