Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:24
erlang
4211-Introduce-receive-optimisation-to-dets-cal...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 4211-Introduce-receive-optimisation-to-dets-calls.patch of Package erlang
From c1bf961a6e42b71317271d46cdd34985ff25b81f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Smyczy=C5=84ski?= <s.smyczynski@simplito.com> Date: Thu, 2 Jun 2022 19:04:55 +0200 Subject: [PATCH] Introduce receive optimisation to dets calls --- lib/stdlib/src/dets.erl | 98 +++++++++++++++++++--------------- lib/stdlib/test/dets_SUITE.erl | 32 ++++++++++- 2 files changed, 84 insertions(+), 46 deletions(-) diff --git a/lib/stdlib/src/dets.erl b/lib/stdlib/src/dets.erl index 640ad7a81c..133d209ae1 100644 --- a/lib/stdlib/src/dets.erl +++ b/lib/stdlib/src/dets.erl @@ -93,7 +93,8 @@ tab_name/0]). -compile({inline, [{einval,2},{badarg,2},{undefined,1}, - {badarg_exit,2},{lookup_reply,2}]}). + {badarg_exit,2},{lookup_reply,2}, + {pidof,1},{resp,2}]}). -include_lib("kernel/include/file.hrl"). @@ -1237,15 +1238,24 @@ treq(Tab, R) -> req(Proc, R) -> Ref = erlang:monitor(process, Proc), - Proc ! ?DETS_CALL(self(), R), + Proc ! ?DETS_CALL({self(), Ref}, R), receive {'DOWN', Ref, process, Proc, _Info} -> badarg; - {Proc, Reply} -> + {Ref, Reply} -> erlang:demonitor(Ref, [flush]), Reply end. +%% Inlined. +pidof({Pid, _Tag}) -> + Pid. + +%% Inlined. +resp({Pid, Tag} = _From, Message) -> + Pid ! {Tag, Message}, + ok. + %% Inlined. einval({error, {file_error, _, einval}}, A) -> erlang:error(badarg, A); @@ -1398,7 +1408,7 @@ apply_op(Op, From, Head, N) -> true -> err({error, incompatible_arguments}) end, - From ! {self(), Res}, + resp(From, Res), ok; auto_save -> case Head#head.update_mode of @@ -1419,7 +1429,7 @@ apply_op(Op, From, Head, N) -> {0, Head} end; close -> - From ! {self(), fclose(Head)}, + resp(From, fclose(Head)), _NewHead = unlink_fixing_procs(Head), ?PROFILE(ep:done()), exit(normal); @@ -1427,25 +1437,25 @@ apply_op(Op, From, Head, N) -> %% Used from dets_server when Pid has closed the table, %% but the table is still opened by some process. NewHead = remove_fix(Head, Pid, close), - From ! {self(), status(NewHead)}, + resp(From, status(NewHead)), NewHead; {corrupt, Reason} -> {H2, Error} = dets_utils:corrupt_reason(Head, Reason), - From ! {self(), Error}, + resp(From, Error), H2; {delayed_write, WrTime} -> delayed_write(Head, WrTime); info -> {H2, Res} = finfo(Head), - From ! {self(), Res}, + resp(From, Res), H2; {info, Tag} -> {H2, Res} = finfo(Head, Tag), - From ! {self(), Res}, + resp(From, Res), H2; {is_compatible_bchunk_format, Term} -> Res = test_bchunk_format(Head, Term), - From ! {self(), Res}, + resp(From, Res), ok; {internal_open, Ref, Args} -> do_internal_open(Head#head.parent, Head#head.server, From, @@ -1462,27 +1472,27 @@ apply_op(Op, From, Head, N) -> end; {set_verbose, What} -> set_verbose(What), - From ! {self(), ok}, + resp(From, ok), ok; {where, Object} -> {H2, Res} = where_is_object(Head, Object), - From ! {self(), Res}, + resp(From, Res), H2; _Message when element(1, Head#head.update_mode) =:= error -> - From ! {self(), status(Head)}, + resp(From, status(Head)), ok; %% The following messages assume that the status of the table is OK. {bchunk_init, Tab} -> {H2, Res} = do_bchunk_init(Head, Tab), - From ! {self(), Res}, + resp(From, Res), H2; {bchunk, State} -> {H2, Res} = do_bchunk(Head, State), - From ! {self(), Res}, + resp(From, Res), H2; delete_all_objects -> {H2, Res} = fdelete_all_objects(Head), - From ! {self(), Res}, + resp(From, Res), erlang:garbage_collect(), {0, H2}; {delete_key, _Keys} when Head#head.update_mode =:= dirty -> @@ -1492,16 +1502,16 @@ apply_op(Op, From, Head, N) -> true -> stream_op(Op, From, [], Head, N); false -> - From ! {self(), badarg}, + resp(From, badarg), ok end; first -> {H2, Res} = ffirst(Head), - From ! {self(), Res}, + resp(From, Res), H2; {initialize, InitFun, Format, MinNoSlots} -> {H2, Res} = finit(Head, InitFun, Format, MinNoSlots), - From ! {self(), Res}, + resp(From, Res), erlang:garbage_collect(), H2; {insert, Objs} when Head#head.update_mode =:= dirty -> @@ -1509,12 +1519,12 @@ apply_op(Op, From, Head, N) -> true -> stream_op(Op, From, [], Head, N); false -> - From ! {self(), badarg}, + resp(From, badarg), ok end; {insert_new, Objs} when Head#head.update_mode =:= dirty -> {H2, Res} = finsert_new(Head, Objs), - From ! {self(), Res}, + resp(From, Res), {N + 1, H2}; {lookup_keys, _Keys} -> stream_op(Op, From, [], Head, N); @@ -1523,48 +1533,48 @@ apply_op(Op, From, Head, N) -> H2 = case Res of {cont,_} -> H1; _ when Safe =:= no_safe-> H1; - _ when Safe =:= safe -> do_safe_fixtable(H1, From, false) + _ when Safe =:= safe -> do_safe_fixtable(H1, pidof(From), false) end, - From ! {self(), Res}, + resp(From, Res), H2; {match, MP, Spec, NObjs, Safe} -> {H2, Res} = fmatch(Head, MP, Spec, NObjs, Safe, From), - From ! {self(), Res}, + resp(From, Res), H2; {member, _Key} = Op -> stream_op(Op, From, [], Head, N); {next, Key} -> {H2, Res} = fnext(Head, Key), - From ! {self(), Res}, + resp(From, Res), H2; {match_delete, State} when Head#head.update_mode =:= dirty -> {H1, Res} = fmatch_delete(Head, State), H2 = case Res of {cont,_S,_N} -> H1; - _ -> do_safe_fixtable(H1, From, false) + _ -> do_safe_fixtable(H1, pidof(From), false) end, - From ! {self(), Res}, + resp(From, Res), {N + 1, H2}; {match_delete_init, MP, Spec} when Head#head.update_mode =:= dirty -> {H2, Res} = fmatch_delete_init(Head, MP, Spec, From), - From ! {self(), Res}, + resp(From, Res), {N + 1, H2}; {safe_fixtable, Bool} -> - NewHead = do_safe_fixtable(Head, From, Bool), - From ! {self(), ok}, + NewHead = do_safe_fixtable(Head, pidof(From), Bool), + resp(From, ok), NewHead; {slot, Slot} -> {H2, Res} = fslot(Head, Slot), - From ! {self(), Res}, + resp(From, Res), H2; sync -> {NewHead, Res} = perform_save(Head, true), - From ! {self(), Res}, + resp(From, Res), erlang:garbage_collect(), {0, NewHead}; {update_counter, Key, Incr} when Head#head.update_mode =:= dirty -> {NewHead, Res} = do_update_counter(Head, Key, Incr), - From ! {self(), Res}, + resp(From, Res), {N + 1, NewHead}; WriteOp when Head#head.update_mode =:= new_dirty -> H2 = Head#head{update_mode = dirty}, @@ -1577,12 +1587,12 @@ apply_op(Op, From, Head, N) -> H2 = Head#head{update_mode = dirty}, apply_op(WriteOp, From, H2, 0); {NewHead, Error} when is_record(NewHead, head) -> - From ! {self(), Error}, + resp(From, Error), NewHead end; WriteOp when is_tuple(WriteOp), Head#head.access =:= read -> Reason = {access_mode, Head#head.filename}, - From ! {self(), err({error, Reason})}, + resp(From, err({error, Reason})), ok end. @@ -1603,7 +1613,7 @@ bug_found(Name, Op, Bad, Stacktrace, From) -> end, if From =/= self() -> - From ! {self(), {error, {dets_bug, Name, Op, Bad}}}, + resp(From, {error, {dets_bug, Name, Op, Bad}}), ok; true -> % auto_save | may_grow | {delayed_write, _} ok @@ -1613,10 +1623,10 @@ do_internal_open(Parent, Server, From, Ref, Args) -> ?PROFILE(ep:do()), case do_open_file(Args, Parent, Server, Ref) of {ok, Head} -> - From ! {self(), ok}, + resp(From, ok), Head; Error -> - From ! {self(), Error}, + resp(From, Error), exit(normal) end. @@ -1698,7 +1708,7 @@ stream_end1(Pids, Next, N, C, Head, PwriteList) -> stream_end2(Pids, Pids, Next, N, C, Head1, PR). stream_end2([Pid | Pids], Ps, Next, N, C, Head, Reply) -> - Pid ! {self(), Reply}, + resp(Pid, Reply), stream_end2(Pids, Ps, Next, N+1, C, Head, Reply); stream_end2([], Ps, no_more, N, C, Head, _Reply) -> penalty(Head, Ps, C), @@ -1710,7 +1720,7 @@ penalty(H, _Ps, _C) when H#head.fixed =:= false -> ok; penalty(_H, _Ps, [{{lookup,_Pids},_Keys}]) -> ok; -penalty(#head{fixed = {_,[{Pid,_}]}}, [Pid], _C) -> +penalty(#head{fixed = {_,[{Pid, _}]}}, [{Pid, _Tag} = _From], _C) -> ok; penalty(_H, _Ps, _C) -> timer:sleep(1). @@ -1729,9 +1739,9 @@ lookup_replies(P, O, [{P2,O2} | L]) -> %% If a list of Pid then op was {member, Key}. Inlined. lookup_reply([P], O) -> - P ! {self(), O =/= []}; + resp(P, O =/= []); lookup_reply(P, O) -> - P ! {self(), O}. + resp(P, O). %%----------------------------------------------------------------- %% Callback functions for system messages handling. @@ -2253,7 +2263,7 @@ fmatch(Head, MP, Spec, N, Safe, From) -> {Head1, []} -> NewHead = case Safe of - safe -> do_safe_fixtable(Head1, From, true); + safe -> do_safe_fixtable(Head1, pidof(From), true); no_safe -> Head1 end, C0 = init_scan(NewHead, N), @@ -2370,7 +2380,7 @@ do_fmatch_delete_var_keys(Head, _MP, ?PATTERN_TO_TRUE_MATCH_SPEC('_'), _From) Reply end; do_fmatch_delete_var_keys(Head, MP, _Spec, From) -> - Head1 = do_safe_fixtable(Head, From, true), + Head1 = do_safe_fixtable(Head, pidof(From), true), {NewHead, []} = write_cache(Head1), C0 = init_scan(NewHead, default), {NewHead, {cont, C0#dets_cont{match_program = MP}, 0}}. diff --git a/lib/stdlib/test/dets_SUITE.erl b/lib/stdlib/test/dets_SUITE.erl index c50a2d5baf..8fa877f5a5 100644 --- a/lib/stdlib/test/dets_SUITE.erl +++ b/lib/stdlib/test/dets_SUITE.erl @@ -46,7 +46,7 @@ otp_5487/1, otp_6206/1, otp_6359/1, otp_4738/1, otp_7146/1, otp_8070/1, otp_8856/1, otp_8898/1, otp_8899/1, otp_8903/1, otp_8923/1, otp_9282/1, otp_11245/1, otp_11709/1, otp_13229/1, - otp_13260/1, otp_13830/1]). + otp_13260/1, otp_13830/1, receive_optimisation/1]). -export([dets_dirty_loop/0]). @@ -94,7 +94,7 @@ all() -> insert_new, repair_continuation, otp_5487, otp_6206, otp_6359, otp_4738, otp_7146, otp_8070, otp_8856, otp_8898, otp_8899, otp_8903, otp_8923, otp_9282, otp_11245, otp_11709, - otp_13229, otp_13260, otp_13830 + otp_13229, otp_13260, otp_13830, receive_optimisation ]. groups() -> @@ -3492,6 +3492,34 @@ otp_13830(Config) -> {ok, Tab} = dets:open_file(Tab, [{file, File}, {version, default}]), ok = dets:close(Tab). +receive_optimisation(Config) -> + Tab = dets_receive_optimisation_test, + FName = filename(Tab, Config), + + % Spam message box + lists:foreach(fun(_) -> self() ! {spam, it} end, lists:seq(1, 1_000_000)), + + {ok, _} = dets:open_file(Tab,[{file, FName}]), + ok = dets:insert(Tab,{one, record}), + + StartTime = os:system_time(millisecond), + + % We expect one thousand of simple lookups to finish in one second + Lookups = 1000, + Timeout = 1000, + Loop = fun Loop(N) when N =< 0 -> ok; + Loop(N) -> + Now = os:system_time(millisecond), + (Now - StartTime > Timeout) andalso throw({timeout_after, Lookups - N}), + [{one, record}] = dets:lookup(Tab, one), + Loop(N-1) + end, + + ok = Loop(Lookups), + + ok = dets:close(Tab), + ok = file:delete(FName). + %% %% Parts common to several test cases %% -- 2.35.3
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor