`
wqtn22
  • 浏览: 101065 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

mnesia的普通transaction写过程(五)事务提交

 
阅读更多

上一篇博文介绍了mnesia的事务提交准备过程,为每个事务参与结点构造了其提交结构commit,下面将进入到提交过程中,此后将继续分析。

 

mnesia_tm.erl

 

multi_commit(sym_trans, _Maj = [], Tid, CR, Store) ->

    {DiscNs, RamNs} = commit_nodes(CR, [], []),

    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),

    ?ets_insert(Store, Pending),

 

    {WaitFor, Local} = ask_commit(sym_trans, Tid, CR, DiscNs, RamNs),

    {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),

    ?eval_debug_fun({?MODULE, multi_commit_sym},

   [{tid, Tid}, {outcome, Outcome}]),

    rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),

    rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),

    case Outcome of

do_commit ->

   mnesia_recover:note_decision(Tid, committed),

   do_dirty(Tid, Local),

   mnesia_locker:release_tid(Tid),

   ?MODULE ! {delete_transaction, Tid};

{do_abort, _Reason} ->

   mnesia_recover:note_decision(Tid, aborted)

    end,

    ?eval_debug_fun({?MODULE, multi_commit_sym, post},

   [{tid, Tid}, {outcome, Outcome}]),

Outcome;

commit_nodes([C | Tail], AccD, AccR)

        when C#commit.disc_copies == [],

             C#commit.disc_only_copies  == [],

             C#commit.schema_ops == [] ->

    commit_nodes(Tail, AccD, [C#commit.node | AccR]);

commit_nodes([C | Tail], AccD, AccR) ->

    commit_nodes(Tail, [C#commit.node | AccD], AccR);

commit_nodes([], AccD, AccR) ->

    {AccD, AccR}.

取出所有参与事务的结点,这实质上等同于where_to_commit属性记录的所有结点。

ask_commit(Protocol, Tid, CR, DiscNs, RamNs) ->
    ask_commit(Protocol, Tid, CR, DiscNs, RamNs, [], no_local).
ask_commit(Protocol, Tid, [Head | Tail], DiscNs, RamNs, WaitFor, Local) ->
    Node = Head#commit.node,
    if
Node == node() ->
   ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, WaitFor, Head);
true ->
   Bin = opt_term_to_binary(Protocol, Head, DiscNs++RamNs),
   Msg = {ask_commit, Protocol, Tid, Bin, DiscNs, RamNs},
   {?MODULE, Node} ! {self(), Msg},
   ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, [Node | WaitFor], Local)
    end;
ask_commit(_Protocol, _Tid, [], _DiscNs, _RamNs, WaitFor, Local) ->
    {WaitFor, Local}.
第一阶段提交,这次是同步过程,由ask_commit和rec_all组成,向每个结点的事务管理器通知事务属性,包括事务tid和事务协议等,及该结点的commit结构,此处使用的事务协议是sym_trans,为异步同构协议,异步表示该事务一旦在所有参与结点完成提交,则立即完成,不必等待每个结点都将事务日志落盘;同构表示所有事务参与结点的表结构都相同。
同时观察各个结点的事务管理器的执行过程:
doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
{From, {ask_commit, Protocol, Tid, Commit, DiscNs, RamNs}} ->
   ?eval_debug_fun({?MODULE, doit_ask_commit},
   [{tid, Tid}, {prot, Protocol}]),
   mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
   Pid =
case Protocol of
   asym_trans when node(Tid#tid.pid) /= node() ->
Args = [tmpid(From), Tid, Commit, DiscNs, RamNs],
spawn_link(?MODULE, commit_participant, Args);
   _ when node(Tid#tid.pid) /= node() -> %% *_sym_trans
reply(From, {vote_yes, Tid}),
nopid
end,
   P = #participant{tid = Tid,
    pid = Pid,
    commit = Commit,
    disc_nodes = DiscNs,
    ram_nodes = RamNs,
    protocol = Protocol},
   State2 = State#state{participants = gb_trees:insert(Tid,P,Participants)},
   doit_loop(State2);
每个结点的事务管理器接收第一阶段的提交,记录提交的内容,并表示参与此次提交。
事务发起进程等待各个结点的事务管理器的第一阶段提交结果:
rec_all([Node | Tail], Tid, Res, Pids) ->
    receive
{?MODULE, Node, {vote_yes, Tid}} ->
   rec_all(Tail, Tid, Res, Pids);
{?MODULE, Node, {vote_yes, Tid, Pid}} ->
   rec_all(Tail, Tid, Res, [Pid | Pids]);
{?MODULE, Node, {vote_no, Tid, Reason}} ->
   rec_all(Tail, Tid, {do_abort, Reason}, Pids);
{?MODULE, Node, {committed, Tid}} ->
   rec_all(Tail, Tid, Res, Pids);
{?MODULE, Node, {aborted, Tid}} ->
   rec_all(Tail, Tid, Res, Pids);
{mnesia_down, Node} ->
   Abort = {do_abort, {bad_commit, Node}},
   catch {?MODULE, Node} ! {Tid, Abort},
   rec_all(Tail, Tid, Abort, Pids)
    end;
rec_all([], _Tid, Res, Pids) ->
    {Res, Pids}.
发起事务的结点等待事务提交决议,待所有结点都返回结果后,向上层通知决议结果。
第一阶段提交完成后,multi_commit将进行第二阶段提交:
multi_commit(sym_trans, _Maj = [], Tid, CR, Store) ->
    {DiscNs, RamNs} = commit_nodes(CR, [], []),
    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
    ?ets_insert(Store, Pending),

    {WaitFor, Local} = ask_commit(sym_trans, Tid, CR, DiscNs, RamNs),
    {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),
    ?eval_debug_fun({?MODULE, multi_commit_sym},
   [{tid, Tid}, {outcome, Outcome}]),
    rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),
    rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),
    case Outcome of
do_commit ->
   mnesia_recover:note_decision(Tid, committed),
   do_dirty(Tid, Local),
   mnesia_locker:release_tid(Tid),
   ?MODULE ! {delete_transaction, Tid};
{do_abort, _Reason} ->
   mnesia_recover:note_decision(Tid, aborted)
    end,
    ?eval_debug_fun({?MODULE, multi_commit_sym, post},
   [{tid, Tid}, {outcome, Outcome}]),
Outcome;
第二阶段提交,这次是异步过程,通知每个结点的事务管理器写入日志和数据。
各个结点的事务管理器继续参与第二阶段提交:
doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
receive
{Tid, do_commit} ->
   case gb_trees:lookup(Tid, Participants) of
none ->
   verbose("Tried to commit a non participant transaction ~p~n",[Tid]),
   doit_loop(State);
{value, P} ->
   ?eval_debug_fun({?MODULE,do_commit,pre},[{tid,Tid},{participant,P}]),
   case P#participant.pid of
nopid ->
   Commit = P#participant.commit,
   Member = lists:member(node(), P#participant.disc_nodes),
   if Member == false ->
   ignore;
      P#participant.protocol == sym_trans ->
   mnesia_log:log(Commit);
      P#participant.protocol == sync_sym_trans ->
   mnesia_log:slog(Commit)
   end,
   mnesia_recover:note_decision(Tid, committed),
   do_commit(Tid, Commit),
   if
P#participant.protocol == sync_sym_trans ->
   Tid#tid.pid ! {?MODULE, node(), {committed, Tid}};
true ->
   ignore
   end,
   mnesia_locker:release_tid(Tid),
   transaction_terminated(Tid),
   ?eval_debug_fun({?MODULE,do_commit,post},[{tid,Tid},{pid,nopid}]),
   doit_loop(State#state{participants=gb_trees:delete(Tid,Participants)});
Pid when is_pid(Pid) ->
   Pid ! {Tid, committed},
   ?eval_debug_fun({?MODULE, do_commit, post}, [{tid, Tid}, {pid, Pid}]),
   doit_loop(State)
   end
   end;
参与结点的事务管理器首先取回事务第一阶段提交的commit结构,然后进行日志记录和写入数据,并结束事务,后续过程由do_commit完成。
对于事务发起结点,其过程由do_dirty完成:
do_dirty(Tid, Commit) when Commit#commit.schema_ops == [] ->
    mnesia_log:log(Commit),
    do_commit(Tid, Commit).
事务发起结点的过程也是如此,记录日志,然后由do_commit完成后续过程,mnesia的日志由disk_log实现。
do_commit(Tid, Bin) when is_binary(Bin) ->
    do_commit(Tid, binary_to_term(Bin));
do_commit(Tid, C) ->
    do_commit(Tid, C, optional).
do_commit(Tid, Bin, DumperMode) when is_binary(Bin) ->
    do_commit(Tid, binary_to_term(Bin), DumperMode);
do_commit(Tid, C, DumperMode) ->
    mnesia_dumper:update(Tid, C#commit.schema_ops, DumperMode),
    R  = do_snmp(Tid, C#commit.snmp),
    R2 = do_update(Tid, ram_copies, C#commit.ram_copies, R),
    R3 = do_update(Tid, disc_copies, C#commit.disc_copies, R2),
    R4 = do_update(Tid, disc_only_copies, C#commit.disc_only_copies, R3),
    mnesia_subscr:report_activity(Tid),
R4.
do_commit实际完成后续的表操作,将数据真正写入表中。
do_update(Tid, Storage, [Op | Ops], OldRes) ->
    case catch do_update_op(Tid, Storage, Op) of
ok ->
   do_update(Tid, Storage, Ops, OldRes);
{'EXIT', Reason} ->
   verbose("do_update in ~w failed: ~p -> {'EXIT', ~p}~n",
   [Tid, Op, Reason]),
   do_update(Tid, Storage, Ops, OldRes);
NewRes ->
   do_update(Tid, Storage, Ops, NewRes)
    end;
do_update(_Tid, _Storage, [], Res) ->
Res.
do_update_op(Tid, Storage, {{Tab, K}, Obj, write}) ->
    commit_write(?catch_val({Tab, commit_work}), Tid, Tab, K, Obj, undefined),
    mnesia_lib:db_put(Storage, Tab, Obj);
mnesia_lib:db_put是真正完成数据表的ets表(非临时表)操作的地方。
commit_write([], _, _, _, _, _) -> ok;
commit_write([{checkpoints, CpList}|R], Tid, Tab, K, Obj, Old) ->
    mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList),
    commit_write(R, Tid, Tab, K, Obj, Old);
commit_write([H|R], Tid, Tab, K, Obj, Old)
  when element(1, H) == subscribers ->
    mnesia_subscr:report_table_event(H, Tab, Tid, Obj, write, Old),
    commit_write(R, Tid, Tab, K, Obj, Old);
commit_write([H|R], Tid, Tab, K, Obj, Old)
  when element(1, H) == index ->
    mnesia_index:add_index(H, Tab, K, Obj, Old),
commit_write(R, Tid, Tab, K, Obj, Old).
commit_write的操作主要用于辅助工作,如检查点,写事件通报,索引添加等
mnesia_lib.erl
db_put(Tab, Val) ->
    db_put(val({Tab, storage_type}), Tab, Val).
db_put(ram_copies, Tab, Val) -> ?ets_insert(Tab, Val), ok;
db_put(disc_copies, Tab, Val) -> ?ets_insert(Tab, Val), ok;
db_put(disc_only_copies, Tab, Val) -> dets:insert(Tab, Val).
将数据真正写入ets表。
重新回到multi_commit中,此后将清除事务。
multi_commit(sym_trans, _Maj = [], Tid, CR, Store) ->
    {DiscNs, RamNs} = commit_nodes(CR, [], []),
    Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
    ?ets_insert(Store, Pending),

    {WaitFor, Local} = ask_commit(sym_trans, Tid, CR, DiscNs, RamNs),
    {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),
    ?eval_debug_fun({?MODULE, multi_commit_sym},
   [{tid, Tid}, {outcome, Outcome}]),
    rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),
    rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),
    case Outcome of
do_commit ->
   mnesia_recover:note_decision(Tid, committed),
   do_dirty(Tid, Local),
   mnesia_locker:release_tid(Tid),
   ?MODULE ! {delete_transaction, Tid};
{do_abort, _Reason} ->
   mnesia_recover:note_decision(Tid, aborted)
    end,
    ?eval_debug_fun({?MODULE, multi_commit_sym, post},
   [{tid, Tid}, {outcome, Outcome}]),
    Outcome;
事务发起结点通知本地事务管理器清除事务:
doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
receive
{delete_transaction, Tid} ->
   case gb_trees:is_defined(Tid, Participants) of
false ->
   case gb_trees:lookup(Tid, Coordinators) of
none ->
   verbose("** ERROR ** Tried to delete a non transaction ~p~n", [Tid]),
   doit_loop(State);
{value, Etabs} ->
   clear_fixtable(Etabs),
   erase_ets_tabs(Etabs),
   transaction_terminated(Tid),
   doit_loop(State#state{coordinators = gb_trees:delete(Tid,Coordinators)})
   end;
true ->
   transaction_terminated(Tid),
   State2 = State#state{participants=gb_trees:delete(Tid,Participants)},
   doit_loop(State2)
   end;
与事务参与结点不同,事务发起结点清除的是coordinator,而事务参与结点清除的是participants域。
之后的清除过程由transaction_terminated完成,该函数将做一些检查点相关的工作并递增事务序列号,以供下次事务使用。
至此,mnesia的普通事务写就完成了,总结一下,transaction上下文中,一次mnesia:write写行记录至少涉及:
一次行锁,一次同步提交,一次异步提交,一次临时ets表写,一次正式写,一次异步日志。

 

分享到:
评论

相关推荐

    Mnesia User's Guide

    session, specify a Mnesia database directory, initialize a database schema, start Mnesia, and create tables. Initial prototyping of record definitions is also discussed. • Build a Mnesia Database ...

    Mnesia table fragmentation 过程及算法分析

    在分析 Mnesia 表分片的过程和算法之前,首先要理解分片的必要性和它所解决的问题。随着业务的发展,数据量会迅速增加,读写请求的次数也会大幅增长。为了保证服务的高可用性,系统需要能够在短时间内处理大量的读写...

    amnesia-开源

    AMNESIA还提供了事务支持,确保了数据的一致性和完整性,这对于在分布式环境中运行的应用程序至关重要。 AMNESIA的另一个核心特性是其强大的查询功能。虽然它不直接使用SQL,但提供了类似查询构建器的工具,允许...

    amnesia:失忆备忘录

    B站视频地址: 做了文字校验,已经成功上线,有兴趣的小伙伴可以扫码体验:可以微信搜索:失忆备忘录一、失忆的由来之所以开发这款软件,是因为在那段时间事情很多,但是经常忘记。虽然市面上类似的功能很多,我之前...

    Api-Social-Amnesia.zip

    Api-Social-Amnesia.zip,忘记过去。社交健忘症确保你的社交媒体帐户只显示你最近的历史,而不是5年前“那个阶段”的帖子。,一个api可以被认为是多个软件设备之间通信的指导手册。例如,api可用于web应用程序之间的...

    Chrome Amnesia-crx插件

    语言:English (United States) 遗忘的延伸 Chrome失忆症是一个Chrome扩展程序,可让您有选择地不记得自己的任何浏览历史记录。...有关更多信息,请访问https://github.com/DanielBok/chrome-amnesia。

    Mnesia用户手册.zip

    《Mnesia用户手册》是专为理解和操作Erlang编程语言中的Mnesia数据库管理系统而编写的详尽指南。Mnesia是Erlang OTP (Open Telephony Platform) 库中的一个核心组件,它是一个强大的分布式数据库系统,特别适用于...

    erlang——Mnesia用户手册.pdf

    8.附录.A:Mnesia.错误信息 8.1.Mnesia.中的错误 9.附录.B:备份回调函数接口 9.1.Mnesia.备份回调行为 10.附录.C:作业存取回调接口 10.1.Mnnesia.存取回调行为 11.附录.D:分片表哈希回调接口 11.1....

    Amnesia-开源

    失忆症是一种提醒,允许您定义警报,贴纸(贴子)以提醒您一些重要的内容以及有关所需内容的注释。 可以将警报编程为在给定时间显示,可以在桌面上放置贴纸以随时查看。

    Mnesia 用户手册中文版 pdf

    在事务和上下文访问方面,Mnesia提供了事务属性定义、锁机制、脏操作、记录名与表名映射、作业概念、嵌套事务、模式匹配、迭代等高级功能,支持复杂的数据库操作。 Mnesia的系统信息部分涵盖了数据库配置数据、内核...

    mnesia数据库文档

    4. **事务处理**:Mnesia支持原子性、一致性、隔离性和持久性(ACID)事务,确保了数据操作的可靠性和一致性。 5. **模式定义**:Mnesia允许用户定义数据库模式,包括表、索引和属性等,提供了一种结构化的方式来...

    Mnesia用户手册(docx版)

    - **事务属性**:Mnesia 提供事务处理,确保数据的原子性和一致性。事务可以包含一系列操作,这些操作作为一个单元执行,要么全部成功,要么全部回滚。 - **锁**:为了管理并发访问,Mnesia 使用锁机制来控制对...

    Mnesia用户手册 4.4.10版.rar

    8 附录 A : Mnesia 错误信息 . . .. . . 75 8.1 Mnesia 中的错误 . . . . .. . 75 9 附录 B :备份回调函数接口 . . .. . .. . . .. . 76 9.1 Mnesia 备份回调行为 . . .. . . . .. . 76 10 附录 C :作业存取...

    Erlang Mnesia

    ### Erlang Mnesia:分布式数据库管理系统 #### 一、引言与概述 Mnesia是一款由爱立信(Ericsson)开发并维护的分布式数据库管理系统(DBMS),它被设计用于支持电信应用以及其他需要持续运行和具备软实时特性的...

    Amnesia

    "Amnesia"是一个可能与计算机安全或数据丢失相关的主题,暗示了系统或用户可能遭遇了某种形式的记忆丧失,即数据无法访问或丢失的情况。在IT领域,这种情况通常涉及到磁盘故障、病毒攻击、误操作或者软件错误。"Post...

Global site tag (gtag.js) - Google Analytics