Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:23
erlang
5742-pg-Implement-group-membership-monitoring-f...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 5742-pg-Implement-group-membership-monitoring-for-the-sco.patch of Package erlang
From fadfcff7ac17538db6100350585aac6407d70305 Mon Sep 17 00:00:00 2001 From: Maxim Fedorov <maximfca@gmail.com> Date: Tue, 7 Jun 2022 13:47:52 -0700 Subject: [PATCH 2/4] [pg] Implement group membership monitoring for the scope This commit adds monitor_scope/0,1 that implements group membership monitoring. Caller gets the current map of groups to processes, and atomically subscribes to all changes to this map. --- lib/kernel/doc/src/pg.xml | 29 +++++- lib/kernel/src/pg.erl | 194 ++++++++++++++++++++++++++--------- lib/kernel/test/pg_SUITE.erl | 96 +++++++++++++++-- 3 files changed, 262 insertions(+), 57 deletions(-) diff --git a/lib/kernel/doc/src/pg.xml b/lib/kernel/doc/src/pg.xml index 9b6d20f818..85b1529f44 100644 --- a/lib/kernel/doc/src/pg.xml +++ b/lib/kernel/doc/src/pg.xml @@ -6,7 +6,7 @@ <erlref> <header> <copyright> - <year>2020</year><year>2020</year> + <year>2020</year><year>2022</year> <holder>Maxim Fedorov, WhatsApp Inc.</holder> </copyright> <legalnotice> @@ -149,6 +149,33 @@ </desc> </func> + <func> + <name name="monitor_scope" arity="0" since="OTP 25.1"/> + <name name="monitor_scope" arity="1" since="OTP 25.1"/> + <fsummary>Starts group membership monitoring for a scope.</fsummary> + <desc> + <p>Subscribes the caller to updates from the specified scope. Returns + content of the entire scope and a reference to match the upcoming + notifications.</p> + + <p>Whenever any group membership changes, an update message is sent + to the subscriber:</p> + <code type="none">{Ref, join, Group, [JoinPid1, JoinPid2]}</code> + <code type="none">{Ref, leave, Group, [LeavePid1]}</code> + </desc> + </func> + + <func> + <name name="demonitor" arity="1" since="OTP 25.1"/> + <name name="demonitor" arity="2" since="OTP 25.1"/> + <fsummary>Stops group membership monitoring.</fsummary> + <desc> + <p>Unsubscribes the caller from updates off the specified scope. + Flushes all outstanding updates that were already in the message + queue of the calling process.</p> + </desc> + </func> + <func> <name name="get_local_members" arity="1" since="OTP 23.0"/> <name name="get_local_members" arity="2" since="OTP 23.0"/> diff --git a/lib/kernel/src/pg.erl b/lib/kernel/src/pg.erl index 8d0f6124f1..845021c7fa 100644 --- a/lib/kernel/src/pg.erl +++ b/lib/kernel/src/pg.erl @@ -60,6 +60,10 @@ join/2, leave/2, + monitor_scope/0, + monitor_scope/1, + demonitor/1, + demonitor/2, get_members/1, get_local_members/1, which_groups/0, @@ -142,6 +146,32 @@ leave(Scope, Group, PidOrPids) when is_pid(PidOrPids); is_list(PidOrPids) -> ok = ensure_local(PidOrPids), gen_server:call(Scope, {leave_local, Group, PidOrPids}, infinity). +%%-------------------------------------------------------------------- +%% @doc +%% Returns currently known groups, and begins monitoring +%% all group changes. Calling process will receive {Ref, join, Group, Pids} +%% message when new Pids join the Group, and {Ref, leave, Group, Pids} when +%% Pids leave the group. +-spec monitor_scope() -> {reference(), #{group() => [pid()]}}. +monitor_scope() -> + monitor_scope(?DEFAULT_SCOPE). + +-spec monitor_scope(Scope :: atom()) -> {reference(), #{group() => [pid()]}}. +monitor_scope(Scope) -> + gen_server:call(Scope, monitor, infinity). + +%%-------------------------------------------------------------------- +%% @doc +%% Stops monitoring Scope for groups changes. Flushes all +%% {Ref, join|leave, Group, Pids} messages from the calling process queue. +-spec demonitor(Ref :: reference()) -> ok | false. +demonitor(Ref) -> + pg:demonitor(?DEFAULT_SCOPE, Ref). + +-spec demonitor(Scope :: atom(), Ref :: reference()) -> ok | false. +demonitor(Scope, Ref) -> + gen_server:call(Scope, {demonitor, Ref}, infinity) =:= ok andalso flush(Ref). + %%-------------------------------------------------------------------- %% @doc %% Returns all processes in a group @@ -206,7 +236,9 @@ which_local_groups(Scope) when is_atom(Scope) -> %% monitored local processes and groups they joined local = #{} :: #{pid() => {MRef :: reference(), Groups :: [group()]}}, %% remote node: scope process monitor and map of groups to pids for fast sync routine - remote = #{} :: #{pid() => {reference(), #{group() => [pid()]}}} + remote = #{} :: #{pid() => {reference(), #{group() => [pid()]}}}, + %% processes monitoring group membership + scope_monitors = #{} :: #{reference() => pid()} }). -type state() :: #state{}. @@ -220,26 +252,46 @@ init([Scope]) -> {ok, #state{scope = Scope}}. -spec handle_call(Call :: {join_local, Group :: group(), Pid :: pid()} - | {leave_local, Group :: group(), Pid :: pid()}, + | {leave_local, Group :: group(), Pid :: pid()} + | monitor + | {demonitor, Ref :: reference()}, From :: {pid(), Tag :: any()}, - State :: state()) -> {reply, ok | not_joined, state()}. + State :: state()) -> + {reply, ok | not_joined | {reference(), #{group() => [pid()]}} | false, state()}. -handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local, remote = Remote} = State) -> +handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local, + remote = Remote, scope_monitors = ScopeMon} = State) -> NewLocal = join_local(PidOrPids, Group, Local), - join_local_update_ets(Scope, Group, PidOrPids), + join_local_update_ets(Scope, ScopeMon, Group, PidOrPids), broadcast(maps:keys(Remote), {join, self(), Group, PidOrPids}), {reply, ok, State#state{local = NewLocal}}; -handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local, remote = Remote} = State) -> +handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local, + remote = Remote, scope_monitors = ScopeMon} = State) -> case leave_local(PidOrPids, Group, Local) of Local -> {reply, not_joined, State}; NewLocal -> - leave_local_update_ets(Scope, Group, PidOrPids), + leave_local_update_ets(Scope, ScopeMon, Group, PidOrPids), broadcast(maps:keys(Remote), {leave, self(), PidOrPids, [Group]}), {reply, ok, State#state{local = NewLocal}} end; +handle_call(monitor, {Pid, _Tag}, #state{scope = Scope, scope_monitors = ScopeMon} = State) -> + %% next line could also be done with iterating over process state, but it appears to be slower + Local = maps:from_list([{G,P} || [G,P] <- ets:match(Scope, {'$1', '_', '$2'})]), + MRef = erlang:monitor(process, Pid), %% monitor the monitor, to discard it upon termination, and generate MRef + {reply, {MRef, Local}, State#state{scope_monitors = ScopeMon#{MRef => Pid}}}; + +handle_call({demonitor, Ref}, _From, #state{scope_monitors = ScopeMon} = State) -> + case maps:take(Ref, ScopeMon) of + {_, NewMons} -> + erlang:demonitor(Ref), + {reply, ok, State#state{scope_monitors = NewMons}}; + error -> + {reply, false, State} + end; + handle_call(_Request, _From, _S) -> erlang:error(badarg). @@ -247,8 +299,8 @@ handle_call(_Request, _From, _S) -> {sync, Peer :: pid(), Groups :: [{group(), [pid()]}]}, State :: state()) -> {noreply, state()}. -handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote} = State) -> - {noreply, State#state{remote = handle_sync(Scope, Peer, Remote, Groups)}}; +handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) -> + {noreply, State#state{remote = handle_sync(Scope, ScopeMon, Peer, Remote, Groups)}}; handle_cast(_, _State) -> erlang:error(badarg). @@ -261,10 +313,10 @@ handle_cast(_, _State) -> {nodedown, node()} | {nodeup, node()}, State :: state()) -> {noreply, state()}. %% remote pid or several pids joining the group -handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote} = State) -> +handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) -> case maps:get(Peer, Remote, []) of {MRef, RemoteGroups} -> - join_remote_update_ets(Scope, Group, PidOrPids), + join_remote_update_ets(Scope, ScopeMon, Group, PidOrPids), %% store remote group => pids map for fast sync operation NewRemoteGroups = join_remote(Group, PidOrPids, RemoteGroups), {noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteGroups}}}}; @@ -277,10 +329,10 @@ handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remot end; %% remote pid leaving (multiple groups at once) -handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote} = State) -> +handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) -> case maps:get(Peer, Remote, []) of {MRef, RemoteMap} -> - _ = leave_remote_update_ets(Scope, PidOrPids, Groups), + _ = leave_remote_update_ets(Scope, ScopeMon, PidOrPids, Groups), NewRemoteMap = leave_remote(PidOrPids, RemoteMap, Groups), {noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteMap}}}}; [] -> @@ -307,23 +359,28 @@ handle_info({discover, Peer}, #state{remote = Remote, local = Local} = State) -> end; %% handle local process exit -handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local, remote = Remote} = State) when node(Pid) =:= node() -> +handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local, remote = Remote, + scope_monitors = ScopeMon} = State) when node(Pid) =:= node() -> case maps:take(Pid, Local) of error -> - %% this can only happen when leave request and 'DOWN' are in pg queue - {noreply, State}; + maybe_drop_monitor(MRef, State); {{MRef, Groups}, NewLocal} -> - [leave_local_update_ets(Scope, Group, Pid) || Group <- Groups], + [leave_local_update_ets(Scope, ScopeMon, Group, Pid) || Group <- Groups], %% send update to all scope processes on remote nodes broadcast(maps:keys(Remote), {leave, self(), Pid, Groups}), {noreply, State#state{local = NewLocal}} end; -%% handle remote node down or leaving overlay network -handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote} = State) -> - {{MRef, RemoteMap}, NewRemote} = maps:take(Pid, Remote), - maps:foreach(fun (Group, Pids) -> leave_remote_update_ets(Scope, Pids, [Group]) end, RemoteMap), - {noreply, State#state{remote = NewRemote}}; +%% handle remote node down or scope leaving overlay network, or a monitor from the remote node went down +handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote, + scope_monitors = ScopeMon} = State) -> + case maps:take(Pid, Remote) of + {{MRef, RemoteMap}, NewRemote} -> + maps:foreach(fun (Group, Pids) -> leave_remote_update_ets(Scope, ScopeMon, Pids, [Group]) end, RemoteMap), + {noreply, State#state{remote = NewRemote}}; + error -> + maybe_drop_monitor(MRef, State) + end; %% nodedown: ignore, and wait for 'DOWN' signal for monitored process handle_info({nodedown, _Node}, State) -> @@ -363,7 +420,7 @@ ensure_local(Bad) -> %% Override all knowledge of the remote node with information it sends %% to local node. Current implementation must do the full table scan %% to remove stale pids (just as for 'nodedown'). -handle_sync(Scope, Peer, Remote, Groups) -> +handle_sync(Scope, ScopeMon, Peer, Remote, Groups) -> %% can't use maps:get() because it evaluates 'default' value first, %% and in this case monitor() call has side effect. {MRef, RemoteGroups} = @@ -374,25 +431,25 @@ handle_sync(Scope, Peer, Remote, Groups) -> MRef0 end, %% sync RemoteMap and transform ETS table - _ = sync_groups(Scope, RemoteGroups, Groups), + _ = sync_groups(Scope, ScopeMon, RemoteGroups, Groups), Remote#{Peer => {MRef, maps:from_list(Groups)}}. -sync_groups(Scope, RemoteGroups, []) -> +sync_groups(Scope, ScopeMon, RemoteGroups, []) -> %% leave all missing groups - [leave_remote_update_ets(Scope, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)]; -sync_groups(Scope, RemoteGroups, [{Group, Pids} | Tail]) -> + [leave_remote_update_ets(Scope, ScopeMon, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)]; +sync_groups(Scope, ScopeMon, RemoteGroups, [{Group, Pids} | Tail]) -> case maps:take(Group, RemoteGroups) of {Pids, NewRemoteGroups} -> - sync_groups(Scope, NewRemoteGroups, Tail); + sync_groups(Scope, ScopeMon, NewRemoteGroups, Tail); {OldPids, NewRemoteGroups} -> [{Group, AllOldPids, LocalPids}] = ets:lookup(Scope, Group), %% should be really rare... AllNewPids = Pids ++ AllOldPids -- OldPids, true = ets:insert(Scope, {Group, AllNewPids, LocalPids}), - sync_groups(Scope, NewRemoteGroups, Tail); + sync_groups(Scope, ScopeMon, NewRemoteGroups, Tail); error -> - join_remote_update_ets(Scope, Group, Pids), - sync_groups(Scope, RemoteGroups, Tail) + join_remote_update_ets(Scope, ScopeMon, Group, Pids), + sync_groups(Scope, ScopeMon, RemoteGroups, Tail) end. join_local(Pid, Group, Local) when is_pid(Pid) -> @@ -408,35 +465,39 @@ join_local([], _Group, Local) -> join_local([Pid | Tail], Group, Local) -> join_local(Tail, Group, join_local(Pid, Group, Local)). -join_local_update_ets(Scope, Group, Pid) when is_pid(Pid) -> +join_local_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) -> case ets:lookup(Scope, Group) of [{Group, All, Local}] -> ets:insert(Scope, {Group, [Pid | All], [Pid | Local]}); [] -> ets:insert(Scope, {Group, [Pid], [Pid]}) - end; -join_local_update_ets(Scope, Group, Pids) -> + end, + notify_group(ScopeMon, join, Group, [Pid]); +join_local_update_ets(Scope, ScopeMon, Group, Pids) -> case ets:lookup(Scope, Group) of [{Group, All, Local}] -> ets:insert(Scope, {Group, Pids ++ All, Pids ++ Local}); [] -> ets:insert(Scope, {Group, Pids, Pids}) - end. + end, + notify_group(ScopeMon, join, Group, Pids). -join_remote_update_ets(Scope, Group, Pid) when is_pid(Pid) -> +join_remote_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) -> case ets:lookup(Scope, Group) of [{Group, All, Local}] -> ets:insert(Scope, {Group, [Pid | All], Local}); [] -> ets:insert(Scope, {Group, [Pid], []}) - end; -join_remote_update_ets(Scope, Group, Pids) -> + end, + notify_group(ScopeMon, join, Group, [Pid]); +join_remote_update_ets(Scope, ScopeMon, Group, Pids) -> case ets:lookup(Scope, Group) of [{Group, All, Local}] -> ets:insert(Scope, {Group, Pids ++ All, Local}); [] -> ets:insert(Scope, {Group, Pids, []}) - end. + end, + notify_group(ScopeMon, join, Group, Pids). join_remote(Group, Pid, RemoteGroups) when is_pid(Pid) -> maps:update_with(Group, fun (List) -> [Pid | List] end, [Pid], RemoteGroups); @@ -463,17 +524,19 @@ leave_local([], _Group, Local) -> leave_local([Pid | Tail], Group, Local) -> leave_local(Tail, Group, leave_local(Pid, Group, Local)). -leave_local_update_ets(Scope, Group, Pid) when is_pid(Pid) -> +leave_local_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) -> case ets:lookup(Scope, Group) of [{Group, [Pid], [Pid]}] -> - ets:delete(Scope, Group); + ets:delete(Scope, Group), + notify_group(ScopeMon, leave, Group, [Pid]); [{Group, All, Local}] -> - ets:insert(Scope, {Group, lists:delete(Pid, All), lists:delete(Pid, Local)}); + ets:insert(Scope, {Group, lists:delete(Pid, All), lists:delete(Pid, Local)}), + notify_group(ScopeMon, leave, Group, [Pid]); [] -> %% rare race condition when 'DOWN' from monitor stays in msg queue while process is leave-ing. true end; -leave_local_update_ets(Scope, Group, Pids) -> +leave_local_update_ets(Scope, ScopeMon, Group, Pids) -> case ets:lookup(Scope, Group) of [{Group, All, Local}] -> case All -- Pids of @@ -481,23 +544,26 @@ leave_local_update_ets(Scope, Group, Pids) -> ets:delete(Scope, Group); NewAll -> ets:insert(Scope, {Group, NewAll, Local -- Pids}) - end; + end, + notify_group(ScopeMon, leave, Group, Pids); [] -> true end. -leave_remote_update_ets(Scope, Pid, Groups) when is_pid(Pid) -> +leave_remote_update_ets(Scope, ScopeMon, Pid, Groups) when is_pid(Pid) -> _ = [ case ets:lookup(Scope, Group) of [{Group, [Pid], []}] -> - ets:delete(Scope, Group); + ets:delete(Scope, Group), + notify_group(ScopeMon, leave, Group, [Pid]); [{Group, All, Local}] -> - ets:insert(Scope, {Group, lists:delete(Pid, All), Local}); + ets:insert(Scope, {Group, lists:delete(Pid, All), Local}), + notify_group(ScopeMon, leave, Group, [Pid]); [] -> true end || Group <- Groups]; -leave_remote_update_ets(Scope, Pids, Groups) -> +leave_remote_update_ets(Scope, ScopeMon, Pids, Groups) -> _ = [ case ets:lookup(Scope, Group) of [{Group, All, Local}] -> @@ -506,7 +572,8 @@ leave_remote_update_ets(Scope, Pids, Groups) -> ets:delete(Scope, Group); NewAll -> ets:insert(Scope, {Group, NewAll, Local}) - end; + end, + notify_group(ScopeMon, leave, Group, Pids); [] -> true end || @@ -543,3 +610,32 @@ broadcast([Dest | Tail], Msg) -> %% join/leave messages when dist buffer is full erlang:send(Dest, Msg, [noconnect]), broadcast(Tail, Msg). + + +%% drops a monitor if DOWN was received +maybe_drop_monitor(MRef, #state{scope_monitors = ScopeMon} = State) -> + %% could be a local monitor going DOWN. Since it's a rare event, check should + %% not stay in front of any other, more frequent events + case maps:take(MRef, ScopeMon) of + error -> + %% this can only happen when leave request and 'DOWN' are in pg queue + {noreply, State}; + {_Pid, NewScopeMon} -> + {noreply, State#state{scope_monitors = NewScopeMon}} + end. + +%% notify all scope monitors about an Action in Groups for Pids +notify_group(ScopeMon, Action, Group, Pids) -> + maps:foreach( + fun (Ref, Pid) -> + erlang:send(Pid, {Ref, Action, Group, Pids}, [noconnect]) + end, ScopeMon). + +%% remove all messages that were send to monitor groups +flush(Ref) -> + receive + {Ref, Verb, _Group, _Pids} when Verb =:= join; Verb =:= leave -> + flush(Ref) + after 0 -> + ok + end. diff --git a/lib/kernel/test/pg_SUITE.erl b/lib/kernel/test/pg_SUITE.erl index a6708dc419..5d56b751e0 100644 --- a/lib/kernel/test/pg_SUITE.erl +++ b/lib/kernel/test/pg_SUITE.erl @@ -54,7 +54,8 @@ missing_scope_join/1, disconnected_start/1, forced_sync/0, forced_sync/1, - group_leave/1 + group_leave/1, + monitor_scope/0, monitor_scope/1 ]). -export([ @@ -72,15 +73,16 @@ end_per_testcase(TestCase, _Config) -> ok. all() -> - [dyn_distribution, {group, basic}, {group, cluster}, {group, performance}]. + [dyn_distribution, {group, basic}, {group, cluster}, {group, performance}, {group, monitor}]. groups() -> [ {basic, [parallel], [errors, pg, leave_exit_race, single, overlay_missing]}, - {performance, [sequential], [thundering_herd]}, + {performance, [], [thundering_herd]}, {cluster, [parallel], [process_owner_check, two, initial, netsplit, trisplit, foursplit, exchange, nolocal, double, scope_restart, missing_scope_join, empty_group_by_remote_leave, - disconnected_start, forced_sync, group_leave]} + disconnected_start, forced_sync, group_leave]}, + {monitor, [parallel], [monitor_scope]} ]. %%-------------------------------------------------------------------- @@ -262,13 +264,13 @@ empty_group_by_remote_leave(Config) when is_list(Config) -> sync({?FUNCTION_NAME, TwoPeer}), ?assertEqual([RemotePid], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)), % inspecting internal state is not best practice, but there's no other way to check if the state is correct. - {state, _, _, #{RemoteNode := {_, RemoteMap}}} = sys:get_state(?FUNCTION_NAME), + {_, RemoteMap} = maps:get(RemoteNode, element(4, sys:get_state(?FUNCTION_NAME))), ?assertEqual(#{?FUNCTION_NAME => [RemotePid]}, RemoteMap), % remote leave ?assertEqual(ok, rpc:call(TwoPeer, pg, leave, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])), sync({?FUNCTION_NAME, TwoPeer}), ?assertEqual([], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)), - {state, _, _, #{RemoteNode := {_, NewRemoteMap}}} = sys:get_state(?FUNCTION_NAME), + {_, NewRemoteMap} = maps:get(RemoteNode, element(4, sys:get_state(?FUNCTION_NAME))), % empty group should be deleted. ?assertEqual(#{}, NewRemoteMap), @@ -281,7 +283,7 @@ empty_group_by_remote_leave(Config) when is_list(Config) -> ?assertEqual(ok, rpc:call(TwoPeer, pg, leave, [?FUNCTION_NAME, ?FUNCTION_NAME, [RemotePid2, RemotePid]])), sync({?FUNCTION_NAME, TwoPeer}), ?assertEqual([], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)), - {state, _, _, #{RemoteNode := {_, NewRemoteMap}}} = sys:get_state(?FUNCTION_NAME), + {_, NewRemoteMap} = maps:get(RemoteNode, element(4, sys:get_state(?FUNCTION_NAME))), stop_node(TwoPeer, Socket), ok. @@ -558,6 +560,85 @@ group_leave(Config) when is_list(Config) -> ?assertEqual([], pg:get_members(?FUNCTION_NAME, two)), ok. +monitor_scope() -> + [{doc, "Tests monitor_scope/1 and demonitor/2"}]. + +monitor_scope(Config) when is_list(Config) -> + Self = self(), + Scope = ?FUNCTION_NAME, + Group = ?FUNCTION_ARITY, + %% ensure that demonitoring returns 'false' when monitor is not installed + ?assertEqual(false, pg:demonitor(Scope, erlang:make_ref())), + %% start the actual test case + {Ref, #{}} = pg:monitor_scope(Scope), + %% local join + ?assertEqual(ok, pg:join(Scope, Group, Self)), + wait_message(Ref, join, Group, [Self], "Local"), + %% start second monitor (which has 1 local pid at the start) + SecondMonitor = spawn_link(fun() -> second_monitor(Scope, Group, Self) end), + Ref2 = receive {second, SecondRef} -> SecondRef end, + %% start a remote node, and a remote monitor + {Peer, Node} = spawn_node(Scope), + ScopePid = whereis(Scope), + %% do not care about the remote monitor, it is started only to check DOWN handling + _ThirdMonitor = spawn(Node, fun() -> second_monitor(ScopePid, Group, Self) end), + %% remote join + RemotePid = erlang:spawn(Node, forever()), + ?assertEqual(ok, rpc:call(Node, pg, join, [Scope, Group, [RemotePid, RemotePid]])), + wait_message(Ref, join, Group, [RemotePid, RemotePid], "Remote"), + %% verify leave event + ?assertEqual([Self], pg:get_local_members(Scope, Group)), + ?assertEqual(ok, pg:leave(Scope, Group, self())), + wait_message(Ref, leave, Group, [Self], "Local"), + %% remote leave + ?assertEqual(ok, rpc:call(Node, pg, leave, [Scope, Group, RemotePid])), + wait_message(Ref, leave, Group, [RemotePid], "Remote"), + %% drop the SecondMonitor - this keeps original and remote monitors + SecondMonMsgs = gen_server:call(SecondMonitor, flush), + %% inspect the queue, it should contain double remote join, then single local and single remove leave + ?assertEqual([ + {Ref2, join, Group, [RemotePid, RemotePid]}, + {Ref2, leave, Group, [Self]}, + {Ref2, leave, Group, [RemotePid]}], + SecondMonMsgs), + %% remote leave via stop (causes remote monitor to go DOWN) + wait_message(Ref, leave, Group, [RemotePid], "Remote stop"), + %% WHITE BOX: knowing pg state internals - only the original monitor should stay + {state, _, _, _, InternalMonitors} = sys:get_state(?FUNCTION_NAME), + ?assertEqual(#{Ref => Self}, InternalMonitors, "pg did not remove DOWNed monitor"), + %% demonitor + ?assertEqual(ok, pg:demonitor(Scope, Ref)), + ?assertEqual(false, pg:demonitor(Scope, Ref)), + %% ensure messages don't come + ?assertEqual(ok, pg:join(Scope, Group, Self)), + sync(Scope), + %% join should not be here + receive {Ref, Action, Group, [Self]} -> ?assert(false, lists:concat(["Unexpected ", Action, "event"])) + after 0 -> ok end. + +wait_message(Ref, Action, Group, Pids, Msg) -> + receive + {Ref, Action, Group, Pids} -> + ok + after 1000 -> + {messages, Msgs} = process_info(self(), messages), + ct:pal("Message queue: ~0p", [Msgs]), + ?assert(false, Msg ++ " " ++ atom_to_list(Action) ++ " event failed to occur") + end. + +second_monitor(Scope, Group, Control) -> + {Ref, #{Group := [Control]}} = pg:monitor_scope(Scope), + Control ! {second, Ref}, + second_monitor([]). + +second_monitor(Msgs) -> + receive + {'$gen_call', Reply, flush} -> + gen:reply(Reply, lists:reverse(Msgs)); + Msg -> + second_monitor([Msg | Msgs]) + end. + %%-------------------------------------------------------------------- %% Test Helpers - start/stop additional Erlang nodes -- 2.35.3
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor