Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:23
erlang
5741-pg-refactor-internals-for-readability.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 5741-pg-refactor-internals-for-readability.patch of Package erlang
From 1dde3119e070472709f6d28296546b2d15b3c4a3 Mon Sep 17 00:00:00 2001 From: Maxim Fedorov <maximfca@gmail.com> Date: Tue, 7 Jun 2022 09:47:09 -0700 Subject: [PATCH 1/4] [pg] refactor internals for readability Original implementation has a number of inconsistencies that makes it harder to read. This patch: - renames 'monitors' to 'local' (processes running on the local node) - renames 'nodes' to 'remote' for consnstency with 'local' - makes naming for join/leave updating server state and ETS table consistent --- lib/kernel/src/pg.erl | 184 ++++++++++++++++++++---------------------- 1 file changed, 89 insertions(+), 95 deletions(-) diff --git a/lib/kernel/src/pg.erl b/lib/kernel/src/pg.erl index 573d2e6953..8d0f6124f1 100644 --- a/lib/kernel/src/pg.erl +++ b/lib/kernel/src/pg.erl @@ -204,9 +204,9 @@ which_local_groups(Scope) when is_atom(Scope) -> %% ETS table name, and also the registered process name (self()) scope :: atom(), %% monitored local processes and groups they joined - monitors = #{} :: #{pid() => {MRef :: reference(), Groups :: [group()]}}, + local = #{} :: #{pid() => {MRef :: reference(), Groups :: [group()]}}, %% remote node: scope process monitor and map of groups to pids for fast sync routine - nodes = #{} :: #{pid() => {reference(), #{group() => [pid()]}}} + remote = #{} :: #{pid() => {reference(), #{group() => [pid()]}}} }). -type state() :: #state{}. @@ -214,44 +214,44 @@ which_local_groups(Scope) when is_atom(Scope) -> -spec init([Scope :: atom()]) -> {ok, state()}. init([Scope]) -> ok = net_kernel:monitor_nodes(true), - %% discover all nodes in the cluster + %% discover all nodes running this scope in the cluster broadcast([{Scope, Node} || Node <- nodes()], {discover, self()}), Scope = ets:new(Scope, [set, protected, named_table, {read_concurrency, true}]), {ok, #state{scope = Scope}}. -spec handle_call(Call :: {join_local, Group :: group(), Pid :: pid()} | {leave_local, Group :: group(), Pid :: pid()}, - From :: {pid(),Tag :: any()}, + From :: {pid(), Tag :: any()}, State :: state()) -> {reply, ok | not_joined, state()}. -handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) -> - NewMons = join_monitors(PidOrPids, Group, Monitors), - join_local_group(Scope, Group, PidOrPids), - broadcast(maps:keys(Nodes), {join, self(), Group, PidOrPids}), - {reply, ok, State#state{monitors = NewMons}}; +handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local, remote = Remote} = State) -> + NewLocal = join_local(PidOrPids, Group, Local), + join_local_update_ets(Scope, Group, PidOrPids), + broadcast(maps:keys(Remote), {join, self(), Group, PidOrPids}), + {reply, ok, State#state{local = NewLocal}}; -handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) -> - case leave_monitors(PidOrPids, Group, Monitors) of - Monitors -> +handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local, remote = Remote} = State) -> + case leave_local(PidOrPids, Group, Local) of + Local -> {reply, not_joined, State}; - NewMons -> - leave_local_group(Scope, Group, PidOrPids), - broadcast(maps:keys(Nodes), {leave, self(), PidOrPids, [Group]}), - {reply, ok, State#state{monitors = NewMons}} + NewLocal -> + leave_local_update_ets(Scope, Group, PidOrPids), + broadcast(maps:keys(Remote), {leave, self(), PidOrPids, [Group]}), + {reply, ok, State#state{local = NewLocal}} end; handle_call(_Request, _From, _S) -> - error(badarg). + erlang:error(badarg). -spec handle_cast( {sync, Peer :: pid(), Groups :: [{group(), [pid()]}]}, State :: state()) -> {noreply, state()}. -handle_cast({sync, Peer, Groups}, #state{scope = Scope, nodes = Nodes} = State) -> - {noreply, State#state{nodes = handle_sync(Scope, Peer, Nodes, Groups)}}; +handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote} = State) -> + {noreply, State#state{remote = handle_sync(Scope, Peer, Remote, Groups)}}; handle_cast(_, _State) -> - error(badarg). + erlang:error(badarg). -spec handle_info( {discover, Peer :: pid()} | @@ -261,13 +261,13 @@ handle_cast(_, _State) -> {nodedown, node()} | {nodeup, node()}, State :: state()) -> {noreply, state()}. %% remote pid or several pids joining the group -handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, nodes = Nodes} = State) -> - case maps:get(Peer, Nodes, []) of +handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote} = State) -> + case maps:get(Peer, Remote, []) of {MRef, RemoteGroups} -> - join_remote(Scope, Group, PidOrPids), + join_remote_update_ets(Scope, Group, PidOrPids), %% store remote group => pids map for fast sync operation - NewRemoteGroups = join_remote_map(Group, PidOrPids, RemoteGroups), - {noreply, State#state{nodes = Nodes#{Peer => {MRef, NewRemoteGroups}}}}; + NewRemoteGroups = join_remote(Group, PidOrPids, RemoteGroups), + {noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteGroups}}}}; [] -> %% handle possible race condition, when remote node is flickering up/down, %% and remote join can happen after the node left overlay network @@ -277,12 +277,12 @@ handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, nodes = Nodes} end; %% remote pid leaving (multiple groups at once) -handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, nodes = Nodes} = State) -> - case maps:get(Peer, Nodes, []) of +handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote} = State) -> + case maps:get(Peer, Remote, []) of {MRef, RemoteMap} -> - _ = leave_remote(Scope, PidOrPids, Groups), - NewRemoteMap = leave_update_remote_map(PidOrPids, RemoteMap, Groups), - {noreply, State#state{nodes = Nodes#{Peer => {MRef, NewRemoteMap}}}}; + _ = leave_remote_update_ets(Scope, PidOrPids, Groups), + NewRemoteMap = leave_remote(PidOrPids, RemoteMap, Groups), + {noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteMap}}}}; [] -> %% Handle race condition: remote node disconnected, but scope process %% of the remote node was just about to send 'leave' message. In this @@ -294,36 +294,36 @@ handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, nodes = Node end; %% we're being discovered, let's exchange! -handle_info({discover, Peer}, #state{nodes = Nodes, monitors = Monitors} = State) -> - gen_server:cast(Peer, {sync, self(), all_local_pids(Monitors)}), +handle_info({discover, Peer}, #state{remote = Remote, local = Local} = State) -> + gen_server:cast(Peer, {sync, self(), all_local_pids(Local)}), %% do we know who is looking for us? - case maps:is_key(Peer, Nodes) of + case maps:is_key(Peer, Remote) of true -> {noreply, State}; false -> - MRef = monitor(process, Peer), + MRef = erlang:monitor(process, Peer), erlang:send(Peer, {discover, self()}, [noconnect]), - {noreply, State#state{nodes = Nodes#{Peer => {MRef, #{}}}}} + {noreply, State#state{remote = Remote#{Peer => {MRef, #{}}}}} end; %% handle local process exit -handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) when node(Pid) =:= node() -> - case maps:take(Pid, Monitors) of +handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local, remote = Remote} = State) when node(Pid) =:= node() -> + case maps:take(Pid, Local) of error -> %% this can only happen when leave request and 'DOWN' are in pg queue {noreply, State}; - {{MRef, Groups}, NewMons} -> - [leave_local_group(Scope, Group, Pid) || Group <- Groups], - %% send update to all nodes - broadcast(maps:keys(Nodes), {leave, self(), Pid, Groups}), - {noreply, State#state{monitors = NewMons}} + {{MRef, Groups}, NewLocal} -> + [leave_local_update_ets(Scope, Group, Pid) || Group <- Groups], + %% send update to all scope processes on remote nodes + broadcast(maps:keys(Remote), {leave, self(), Pid, Groups}), + {noreply, State#state{local = NewLocal}} end; %% handle remote node down or leaving overlay network -handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, nodes = Nodes} = State) -> - {{MRef, RemoteMap}, NewNodes} = maps:take(Pid, Nodes), - maps:foreach(fun (Group, Pids) -> leave_remote(Scope, Pids, [Group]) end, RemoteMap), - {noreply, State#state{nodes = NewNodes}}; +handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote} = State) -> + {{MRef, RemoteMap}, NewRemote} = maps:take(Pid, Remote), + maps:foreach(fun (Group, Pids) -> leave_remote_update_ets(Scope, Pids, [Group]) end, RemoteMap), + {noreply, State#state{remote = NewRemote}}; %% nodedown: ignore, and wait for 'DOWN' signal for monitored process handle_info({nodedown, _Node}, State) -> @@ -337,7 +337,7 @@ handle_info({nodeup, Node}, #state{scope = Scope} = State) -> {noreply, State}; handle_info(_Info, _State) -> - error(badarg). + erlang:error(badarg). -spec terminate(Reason :: any(), State :: state()) -> true. terminate(_Reason, #state{scope = Scope}) -> @@ -355,31 +355,31 @@ ensure_local(Pids) when is_list(Pids) -> (Pid) when is_pid(Pid), node(Pid) =:= node() -> ok; (Bad) -> - error({nolocal, Bad}) + erlang:error({nolocal, Bad}) end, Pids); ensure_local(Bad) -> - error({nolocal, Bad}). + erlang:error({nolocal, Bad}). %% Override all knowledge of the remote node with information it sends %% to local node. Current implementation must do the full table scan %% to remove stale pids (just as for 'nodedown'). -handle_sync(Scope, Peer, Nodes, Groups) -> +handle_sync(Scope, Peer, Remote, Groups) -> %% can't use maps:get() because it evaluates 'default' value first, %% and in this case monitor() call has side effect. {MRef, RemoteGroups} = - case maps:find(Peer, Nodes) of + case maps:find(Peer, Remote) of error -> - {monitor(process, Peer), #{}}; + {erlang:monitor(process, Peer), #{}}; {ok, MRef0} -> MRef0 end, %% sync RemoteMap and transform ETS table _ = sync_groups(Scope, RemoteGroups, Groups), - Nodes#{Peer => {MRef, maps:from_list(Groups)}}. + Remote#{Peer => {MRef, maps:from_list(Groups)}}. sync_groups(Scope, RemoteGroups, []) -> %% leave all missing groups - [leave_remote(Scope, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)]; + [leave_remote_update_ets(Scope, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)]; sync_groups(Scope, RemoteGroups, [{Group, Pids} | Tail]) -> case maps:take(Group, RemoteGroups) of {Pids, NewRemoteGroups} -> @@ -391,31 +391,31 @@ sync_groups(Scope, RemoteGroups, [{Group, Pids} | Tail]) -> true = ets:insert(Scope, {Group, AllNewPids, LocalPids}), sync_groups(Scope, NewRemoteGroups, Tail); error -> - join_remote(Scope, Group, Pids), + join_remote_update_ets(Scope, Group, Pids), sync_groups(Scope, RemoteGroups, Tail) end. -join_monitors(Pid, Group, Monitors) when is_pid(Pid) -> - case maps:find(Pid, Monitors) of +join_local(Pid, Group, Local) when is_pid(Pid) -> + case maps:find(Pid, Local) of {ok, {MRef, Groups}} -> - maps:put(Pid, {MRef, [Group | Groups]}, Monitors); + maps:put(Pid, {MRef, [Group | Groups]}, Local); error -> MRef = erlang:monitor(process, Pid), - Monitors#{Pid => {MRef, [Group]}} + Local#{Pid => {MRef, [Group]}} end; -join_monitors([], _Group, Monitors) -> - Monitors; -join_monitors([Pid | Tail], Group, Monitors) -> - join_monitors(Tail, Group, join_monitors(Pid, Group, Monitors)). +join_local([], _Group, Local) -> + Local; +join_local([Pid | Tail], Group, Local) -> + join_local(Tail, Group, join_local(Pid, Group, Local)). -join_local_group(Scope, Group, Pid) when is_pid(Pid) -> +join_local_update_ets(Scope, Group, Pid) when is_pid(Pid) -> case ets:lookup(Scope, Group) of [{Group, All, Local}] -> ets:insert(Scope, {Group, [Pid | All], [Pid | Local]}); [] -> ets:insert(Scope, {Group, [Pid], [Pid]}) end; -join_local_group(Scope, Group, Pids) -> +join_local_update_ets(Scope, Group, Pids) -> case ets:lookup(Scope, Group) of [{Group, All, Local}] -> ets:insert(Scope, {Group, Pids ++ All, Pids ++ Local}); @@ -423,14 +423,14 @@ join_local_group(Scope, Group, Pids) -> ets:insert(Scope, {Group, Pids, Pids}) end. -join_remote(Scope, Group, Pid) when is_pid(Pid) -> +join_remote_update_ets(Scope, Group, Pid) when is_pid(Pid) -> case ets:lookup(Scope, Group) of [{Group, All, Local}] -> ets:insert(Scope, {Group, [Pid | All], Local}); [] -> ets:insert(Scope, {Group, [Pid], []}) end; -join_remote(Scope, Group, Pids) -> +join_remote_update_ets(Scope, Group, Pids) -> case ets:lookup(Scope, Group) of [{Group, All, Local}] -> ets:insert(Scope, {Group, Pids ++ All, Local}); @@ -438,32 +438,32 @@ join_remote(Scope, Group, Pids) -> ets:insert(Scope, {Group, Pids, []}) end. -join_remote_map(Group, Pid, RemoteGroups) when is_pid(Pid) -> +join_remote(Group, Pid, RemoteGroups) when is_pid(Pid) -> maps:update_with(Group, fun (List) -> [Pid | List] end, [Pid], RemoteGroups); -join_remote_map(Group, Pids, RemoteGroups) -> +join_remote(Group, Pids, RemoteGroups) -> maps:update_with(Group, fun (List) -> Pids ++ List end, Pids, RemoteGroups). -leave_monitors(Pid, Group, Monitors) when is_pid(Pid) -> - case maps:find(Pid, Monitors) of +leave_local(Pid, Group, Local) when is_pid(Pid) -> + case maps:find(Pid, Local) of {ok, {MRef, [Group]}} -> erlang:demonitor(MRef), - maps:remove(Pid, Monitors); + maps:remove(Pid, Local); {ok, {MRef, Groups}} -> case lists:member(Group, Groups) of true -> - maps:put(Pid, {MRef, lists:delete(Group, Groups)}, Monitors); + maps:put(Pid, {MRef, lists:delete(Group, Groups)}, Local); false -> - Monitors + Local end; _ -> - Monitors + Local end; -leave_monitors([], _Group, Monitors) -> - Monitors; -leave_monitors([Pid | Tail], Group, Monitors) -> - leave_monitors(Tail, Group, leave_monitors(Pid, Group, Monitors)). +leave_local([], _Group, Local) -> + Local; +leave_local([Pid | Tail], Group, Local) -> + leave_local(Tail, Group, leave_local(Pid, Group, Local)). -leave_local_group(Scope, Group, Pid) when is_pid(Pid) -> +leave_local_update_ets(Scope, Group, Pid) when is_pid(Pid) -> case ets:lookup(Scope, Group) of [{Group, [Pid], [Pid]}] -> ets:delete(Scope, Group); @@ -473,7 +473,7 @@ leave_local_group(Scope, Group, Pid) when is_pid(Pid) -> %% rare race condition when 'DOWN' from monitor stays in msg queue while process is leave-ing. true end; -leave_local_group(Scope, Group, Pids) -> +leave_local_update_ets(Scope, Group, Pids) -> case ets:lookup(Scope, Group) of [{Group, All, Local}] -> case All -- Pids of @@ -486,7 +486,7 @@ leave_local_group(Scope, Group, Pids) -> true end. -leave_remote(Scope, Pid, Groups) when is_pid(Pid) -> +leave_remote_update_ets(Scope, Pid, Groups) when is_pid(Pid) -> _ = [ case ets:lookup(Scope, Group) of [{Group, [Pid], []}] -> @@ -497,7 +497,7 @@ leave_remote(Scope, Pid, Groups) when is_pid(Pid) -> true end || Group <- Groups]; -leave_remote(Scope, Pids, Groups) -> +leave_remote_update_ets(Scope, Pids, Groups) -> _ = [ case ets:lookup(Scope, Group) of [{Group, All, Local}] -> @@ -512,9 +512,9 @@ leave_remote(Scope, Pids, Groups) -> end || Group <- Groups]. -leave_update_remote_map(Pid, RemoteMap, Groups) when is_pid(Pid) -> - leave_update_remote_map([Pid], RemoteMap, Groups); -leave_update_remote_map(Pids, RemoteMap, Groups) -> +leave_remote(Pid, RemoteMap, Groups) when is_pid(Pid) -> + leave_remote([Pid], RemoteMap, Groups); +leave_remote(Pids, RemoteMap, Groups) -> lists:foldl( fun (Group, Acc) -> case maps:get(Group, Acc) -- Pids of @@ -525,20 +525,14 @@ leave_update_remote_map(Pids, RemoteMap, Groups) -> end end, RemoteMap, Groups). -all_local_pids(Monitors) -> +all_local_pids(Local) -> maps:to_list(maps:fold( fun(Pid, {_Ref, Groups}, Acc) -> lists:foldl( fun(Group, Acc1) -> Acc1#{Group => [Pid | maps:get(Group, Acc1, [])]} - end, - Acc, - Groups - ) - end, - #{}, - Monitors - )). + end, Acc, Groups) + end, #{}, Local)). %% Works as gen_server:abcast(), but accepts a list of processes %% instead of nodes list. -- 2.35.3
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor