Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:23
erlang
4251-mnesia-Improve-dirty-writes-consistency.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 4251-mnesia-Improve-dirty-writes-consistency.patch of Package erlang
From 1731534c5e97731af691eb1228081b0ca3af3c28 Mon Sep 17 00:00:00 2001 From: Dan Gudmundsson <dgud@erlang.org> Date: Tue, 24 Jan 2023 15:08:22 +0100 Subject: [PATCH 1/4] mnesia: Improve dirty writes consistency Improve table copying timing, where dirty writes can cause consistency issues during mnesia:add_table_copy/3 call. I.e. don't set where_to_read to early. Keep subscribers longer (sync with mnesia_tm for writes in the msg queue), and let reciever wait until copier is done. --- lib/mnesia/src/mnesia_loader.erl | 36 +++++---- lib/mnesia/src/mnesia_schema.erl | 1 + lib/mnesia/src/mnesia_tm.erl | 88 ++++++++++++++------- lib/mnesia/test/mnesia_consistency_test.erl | 16 ++-- lib/mnesia/test/mt.erl | 22 +++--- 5 files changed, 104 insertions(+), 59 deletions(-) diff --git a/lib/mnesia/src/mnesia_loader.erl b/lib/mnesia/src/mnesia_loader.erl index 4dea366964..ed5dba3a16 100644 --- a/lib/mnesia/src/mnesia_loader.erl +++ b/lib/mnesia/src/mnesia_loader.erl @@ -222,7 +222,6 @@ do_get_network_copy(Tab, Reason, Ns, Storage, Cs) -> ok -> set({Tab, load_node}, Node), set({Tab, load_reason}, Reason), - mnesia_controller:i_have_tab(Tab, Cs), dbg_out("Table ~tp copied from ~p to ~p~n", [Tab, Node, node()]), {loaded, ok}; Err = {error, _} when element(1, Reason) == dumper -> @@ -291,7 +290,11 @@ init_receiver(Node, Tab,Storage,Cs,Reason) -> {atomic, {error,Result}} -> fatal("Cannot create table ~tp: ~tp~n", [[Tab, Storage], Result]); - {atomic, Result} -> Result; + {atomic, ok} -> + mnesia_controller:i_have_tab(Tab, Cs), + ok; + {atomic, Result} -> + Result; {aborted, nomore} -> restart; {aborted, _Reas} -> verbose("Receiver failed on ~tp from ~p:~nReason: ~tp~n", @@ -555,14 +558,17 @@ init_table(Tab, _, Fun, _DetsInfo,_) -> finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec) -> TabRef = {Storage, Tab}, - subscr_postprocess(TabRef, Cs#cstruct.record_name), case handle_last(TabRef, Cs#cstruct.type, DatBin) of ok -> - mnesia_index:init_index(Tab, Storage), - snmpify(Tab, Storage), + subscr_postprocess(TabRef, Cs#cstruct.record_name), %% OrigTabRec must not be the spawned tab-receiver %% due to old protocol. SenderPid ! {OrigTabRec, no_more}, + Ref = monitor(process, SenderPid), + %% and all remaining events + subscr_receiver(TabRef, Cs#cstruct.record_name, Ref), + mnesia_index:init_index(Tab, Storage), + snmpify(Tab, Storage), mnesia_tm:unblock_tab(Tab), ok; {error, Reason} -> @@ -582,22 +588,21 @@ subscr_postprocess(TabRef, RecName) -> handle_subscr_event(Event, TabRef, RecName) end, ok, SubscrCache), ets:delete(SubscrCache) - end, - % and all remaining events - subscr_receiver(TabRef, RecName). + end. -subscr_receiver(TabRef = {_, Tab}, RecName) -> +subscr_receiver(TabRef = {_, Tab}, RecName, Ref) -> receive {mnesia_table_event, {_Op, Val, _Tid}} = Event when element(1, Val) =:= Tab; element(1, Val) =:= schema -> handle_subscr_event(Event, TabRef, RecName), - subscr_receiver(TabRef, RecName); + subscr_receiver(TabRef, RecName, Ref); {'EXIT', Pid, Reason} -> handle_exit(Pid, Reason), - subscr_receiver(TabRef, RecName) - after 0 -> - ok + subscr_receiver(TabRef, RecName, Ref); + + {'DOWN', Ref, process, _, _} -> + ok end. handle_subscr_event(Event, TabRef = {_, Tab}, RecName) -> @@ -1014,12 +1019,15 @@ finish_copy(Pid, Tab, Storage, RemoteS, NeedLock) -> mnesia_checkpoint:tm_add_copy(Tab, RecNode), DatBin = dat2bin(Tab, ?catch_val({Tab, storage_type}), RemoteS), Pid ! {self(), {no_more, DatBin}}, - cleanup_tab_copier(Pid, Storage, Tab), receive {Pid, no_more} -> % Dont bother about the spurious 'more' message + %% Sync mnesia_tm (before unsubscribing) + mnesia_tm:sync(), + cleanup_tab_copier(Pid, Storage, Tab), no_more; {copier_done, Node} -> verbose("Tab receiver ~tp crashed (more): ~p~n", [Tab, Node]), + cleanup_tab_copier(Pid, Storage, Tab), receiver_died end end diff --git a/lib/mnesia/src/mnesia_schema.erl b/lib/mnesia/src/mnesia_schema.erl index 42c717425e..ea77e5ec6a 100644 --- a/lib/mnesia/src/mnesia_schema.erl +++ b/lib/mnesia/src/mnesia_schema.erl @@ -2472,6 +2472,7 @@ prepare_op(Tid, {op, add_table_copy, Storage, Node, TabDef}, _WaitFor) -> {loaded, ok} -> %% Tables are created by mnesia_loader get_network code insert_cstruct(Tid, Cs, true), + mnesia_controller:i_have_tab(Tab, Cs), {true, optional}; {not_loaded, {not_active, schema, Node}} -> mnesia:abort({node_not_running, Node}); diff --git a/lib/mnesia/src/mnesia_tm.erl b/lib/mnesia/src/mnesia_tm.erl index 847cd0074b..3a563db0be 100644 --- a/lib/mnesia/src/mnesia_tm.erl +++ b/lib/mnesia/src/mnesia_tm.erl @@ -42,6 +42,7 @@ put_activity_id/2, block_tab/1, unblock_tab/1, + sync/0, fixtable/3, new_cr_format/1 ]). @@ -205,6 +206,17 @@ block_tab(Tab) -> unblock_tab(Tab) -> req({unblock_tab, Tab}). +fixtable(Tab, Lock, Me) -> + case req({fixtable, [Tab,Lock,Me]}) of + error -> + exit({no_exists, Tab}); + Else -> + Else + end. + +sync() -> + req(sync). + doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) -> receive {_From, {async_dirty, Tid, Commit, Tab}} -> @@ -250,25 +262,30 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor= [{tid, Tid}, {prot, Protocol}]), mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs), Commit = new_cr_format(Commit0), - Pid = - if - node(Tid#tid.pid) =:= node() -> - error({internal_error, local_node}); - Protocol =:= asym_trans orelse Protocol =:= sync_asym_trans -> - Args = [Protocol, tmpid(From), Tid, Commit, DiscNs, RamNs], - spawn_link(?MODULE, commit_participant, Args); - true -> %% *_sym_trans - reply(From, {vote_yes, Tid}), - nopid - end, - P = #participant{tid = Tid, - pid = Pid, - commit = Commit, - disc_nodes = DiscNs, - ram_nodes = RamNs, - protocol = Protocol}, - State2 = State#state{participants = gb_trees:insert(Tid,P,Participants)}, - doit_loop(State2); + case is_blocked(State#state.blocked_tabs, Commit) of + false -> + Pid = + if + node(Tid#tid.pid) =:= node() -> + error({internal_error, local_node}); + Protocol =:= asym_trans orelse Protocol =:= sync_asym_trans -> + Args = [Protocol, tmpid(From), Tid, Commit, DiscNs, RamNs], + spawn_link(?MODULE, commit_participant, Args); + true -> %% *_sym_trans + reply(From, {vote_yes, Tid}), + nopid + end, + P = #participant{tid = Tid, + pid = Pid, + commit = Commit, + disc_nodes = DiscNs, + ram_nodes = RamNs, + protocol = Protocol}, + State2 = State#state{participants = gb_trees:insert(Tid,P,Participants)}, + doit_loop(State2); + true -> + reply(From, {vote_no, Tid, {bad_commit, node()}}, State) + end; {Tid, do_commit} -> case gb_trees:lookup(Tid, Participants) of @@ -449,6 +466,9 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor= reply(From, ok, State2) end; + {From, sync} -> + reply(From, ok, State); + {From, {prepare_checkpoint, Cp}} -> Res = mnesia_checkpoint:tm_prepare(Cp), case Res of @@ -478,6 +498,28 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor= doit_loop(State) end. +is_blocked([], _Commit) -> + false; +is_blocked([Tab|Tabs], #commit{ram_copies=RCs, disc_copies=DCs, + disc_only_copies=DOs, ext=Exts} = Commit) -> + is_blocked_tab(RCs, Tab) orelse + is_blocked_tab(DCs, Tab) orelse + is_blocked_tab(DOs, Tab) orelse + is_blocked_ext_tab(Exts, Tab) orelse + is_blocked(Tabs, Commit). + +is_blocked_tab([{{Tab,_},_,_}|_Ops], Tab) -> true; +is_blocked_tab([_|Ops], Tab) -> is_blocked_tab(Ops, Tab); +is_blocked_tab([],_) -> false. + +is_blocked_ext_tab([], _Tab) -> + false; +is_blocked_ext_tab(Exts, Tab) -> + case lists:keyfind(ext_copies, 1, Exts) of + false -> false; + {_, ExtOps} -> is_blocked_tab([Op || {_, Op} <- ExtOps], Tab) + end. + do_sync_dirty(From, Tid, Commit, _Tab) -> ?eval_debug_fun({?MODULE, sync_dirty, pre}, [{tid, Tid}]), Res = do_dirty(Tid, Commit), @@ -2326,14 +2368,6 @@ do_stop(#state{coordinators = Coordinators}) -> mnesia_log:stop(), exit(shutdown). -fixtable(Tab, Lock, Me) -> - case req({fixtable, [Tab,Lock,Me]}) of - error -> - exit({no_exists, Tab}); - Else -> - Else - end. - %%%%%%%%%%%%%%%%%%%%%%%%%%% %% System upgrade diff --git a/lib/mnesia/test/mnesia_consistency_test.erl b/lib/mnesia/test/mnesia_consistency_test.erl index 2be17e69c2..fa729aa714 100644 --- a/lib/mnesia/test/mnesia_consistency_test.erl +++ b/lib/mnesia/test/mnesia_consistency_test.erl @@ -381,7 +381,7 @@ consistency_after_restart(ReplicaType, NodeConfig, Config) -> TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes, [Node1]), mnesia_tpcb:init(TpcbConfig), A ! fun () -> mnesia_tpcb:run(TpcbConfig) end, - timer:sleep(timer:seconds(10)), + timer:sleep(timer:seconds(3)), mnesia_test_lib:kill_mnesia([Node1]), %% Start and wait for tables to be loaded on all nodes timer:sleep(timer:seconds(3)), @@ -408,7 +408,7 @@ consistency_after_dump_tables(ReplicaType, NodeConfig, Config) -> TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes, []), mnesia_tpcb:init(TpcbConfig), A ! fun() -> mnesia_tpcb:run(TpcbConfig) end, - timer:sleep(timer:seconds(10)), + timer:sleep(timer:seconds(3)), ?match({atomic, ok}, rpc:call(Node1, mnesia, dump_tables, [[branch, teller, account, history]])), mnesia_tpcb:stop(), @@ -459,7 +459,7 @@ consistency_after_add_replica(ReplicaType, NodeConfig, Config) -> TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes, []), mnesia_tpcb:init(TpcbConfig), A ! fun () -> mnesia_tpcb:run(TpcbConfig) end, - timer:sleep(timer:seconds(10)), + timer:sleep(timer:seconds(2)), ?match({atomic, ok}, mnesia:add_table_copy(account, AddNode, ReplicaType)), mnesia_tpcb:stop(), ?match(ok, mnesia_tpcb:verify_tabs()), @@ -501,7 +501,7 @@ consistency_after_del_replica(ReplicaType, NodeConfig, Config) -> TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes, []), mnesia_tpcb:init(TpcbConfig), A ! fun () -> mnesia_tpcb:run(TpcbConfig) end, - timer:sleep(timer:seconds(10)), + timer:sleep(timer:seconds(3)), ?match({atomic, ok}, mnesia:del_table_copy(account, Node2)), mnesia_tpcb:stop(), ?match(ok, mnesia_tpcb:verify_tabs()), @@ -543,7 +543,7 @@ consistency_after_move_replica(ReplicaType, NodeConfig, Config) -> TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes -- [Node2], []), mnesia_tpcb:init(TpcbConfig), A ! fun () -> mnesia_tpcb:run(TpcbConfig) end, - timer:sleep(timer:seconds(10)), + timer:sleep(timer:seconds(3)), ?match({atomic, ok}, mnesia:move_table_copy(account, Node1, Node2)), ?log("First move completed from node ~p to ~p ~n", [Node1, Node2]), ?match({atomic, ok}, mnesia:move_table_copy(account, Node2, Node1)), @@ -638,7 +638,7 @@ consistency_after_fallback_3_disc_only(Config) when is_list(Config) -> consistency_after_fallback(ReplicaType, NodeConfig, Config) -> put(mnesia_test_verbose, true), %%?verbose("Starting consistency_after_fallback2 at ~p~n", [self()]), - Delay = 5, + Delay = 3, Nodes = ?acquire_nodes(NodeConfig, [{tc_timeout, timer:minutes(10)} | Config]), Node1 = hd(Nodes), %%?verbose("Mnesia info: ~p~n", [mnesia:info()]), @@ -867,7 +867,7 @@ updates_during_checkpoint_activation_3_disc_only(Config) when is_list(Config) -> updates_during_checkpoint_activation(ReplicaType,NodeConfig,Config) -> %%?verbose("updates_during_checkpoint_activation2 at ~p~n", [self()]), - Delay = 5, + Delay = 2, Nodes = ?acquire_nodes(NodeConfig, Config), Node1 = hd(Nodes), %%?verbose("Mnesia info: ~p~n", [mnesia:info()]), @@ -922,7 +922,7 @@ updates_during_checkpoint_iteration_2_disc_only(Config) when is_list(Config) -> updates_during_checkpoint_iteration(ReplicaType,NodeConfig,Config) -> %?verbose("updates_during_checkpoint_iteration2 at ~p~n", [self()]), - Delay = 5, + Delay = 2, Nodes = ?acquire_nodes(NodeConfig, Config), Node1 = hd(Nodes), %?verbose("Mnesia info: ~p~n", [mnesia:info()]), diff --git a/lib/mnesia/test/mt.erl b/lib/mnesia/test/mt.erl index c1859bef3f..0bb6aadc4e 100644 --- a/lib/mnesia/test/mt.erl +++ b/lib/mnesia/test/mt.erl @@ -245,30 +245,32 @@ start_nodes() -> %% loop one testcase /suite until it fails loop(Case) -> - loop_1(Case,-1,read_config()). + loop_1(Case,1,infinity,read_config()). loop(M,F) when is_atom(F) -> - loop_1({M,F},-1,read_config()); + loop_1({M,F},1, infinity, read_config()); loop(Case,N) when is_integer(N) -> - loop_1(Case, N,read_config()). + loop_1(Case, 1, N,read_config()). loop(M,F,N) when is_integer(N) -> - loop_1({M,F},N,read_config()). + loop_1({M,F},1, N,read_config()). + +loop_1(Case,N,Max,Config) when N < Max -> + io:format("~nLoop test ~p ~n", [abs(N)]), -loop_1(Case,N,Config) when N /= 0 -> - io:format("Loop test ~p ~n", [abs(N)]), case ok_result(Res = t(Case,Config)) of true -> - loop_1(Case,N-1,Config); + loop_1(Case,N+1,Max,Config); error -> + io:format("Failed after ~p~n", [N]), Res end; -loop_1(_,_,_) -> +loop_1(_,_,_,_) -> ok. - + ok_result([{_T,{ok,_,_}}|R]) -> ok_result(R); -ok_result([{_T,{TC,List}}|R]) when is_tuple(TC), is_list(List) -> +ok_result([{_T,{TC,List}}|R]) when is_tuple(TC), is_list(List) -> ok_result(List) andalso ok_result(R); ok_result([]) -> true; ok_result(_) -> error. -- 2.35.3
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor