`
ZacMa
  • 浏览: 38901 次
  • 来自: 深圳
社区版块
存档分类
最新评论

<7>pg2 分析

pg2 
阅读更多
网上看到erlang的pg2模块似乎没人推荐使用,但是还是有不少使用者,自己也感觉使用跨节点通信时候,使用它来管理各个近点的进程也是不错的选择, 使用过程中也有不少疑问,也感觉接口不够丰富,简单分析下:

pg2为什么允许一个进程加入两次呢?,并且也要退出两次,使用时候可以自己加些判断,
join(Pid, Group) ->         
      ok =  case lists:member(Pid, pg2:get_members(Group)) of
            true -> ok;
               _ -> pg2:join(Group, Pid)
             end;

加接口,
获取某个节点的所有进程
获取某个节点的某个进程,进程id随机
get_node_members(Node, Group) -> [pid()] | {error, {no_such_group, Group}}
get_node_pid(Node, Group) -> pid() | {error, Reason}

get_node_members(Node, Group) ->
    case pg2:get_members(Group) of
        {error, Reason} ->
            {error, Reason};
        AllMembers ->
            Fun = fun(X) ->
                node(X) =:= Node end,
            lists:filter(Fun, AllMembers)
    end.

get_node_pid(Node, Group) ->
    case get_node_members(Node, Group) of
        [Pid] ->
            Pid;
        [] ->
            [];
        Members when is_list(Members) ->
            {_,_,X} = erlang:now(),
            lists:nth((X rem length(Members))+1, Members);
        Else ->
            Else
    end.

网上还看到一种获取最优的pid
get_best_pid(Group) ->
  Members = pg2:get_members(Group),
  Members1 = lists:map(fun(Pid) ->
      [{message_queue_len, Messages}] = erlang:process_info(Pid, [message_queue_len]),
                                                                 {Pid, Messages}
                       end, Members),
case lists:keysort(2, Members1) of
  [{Pid, _} | _] -> Pid;
  [] -> {error, empty_process_group}
end.
不过只能获得本地进程的信息,不能跨节点获取,
Failure: badarg if Pid is not a local process, or if Item is not a valid Item.
所以这种方法只适用于本地进程


那么pg2 是怎么实现节点信息存取的?
其实就是各个节点注册了一个相同名称的pg2, 通过{pg2, Node} ! Msg来实现
pg2.erl

start() ->
    ensure_started().
ensure_started() ->
    case whereis(?MODULE) of
        undefined ->
            C = {pg2, {?MODULE, start_link, []}, permanent,
                 1000, worker, [?MODULE]},
            supervisor:start_child(kernel_safe_sup, C);
        Pg2Pid ->
            {ok, Pg2Pid}
    end.

初始化的时候,给其他可见的node发送消息{new_pg2, node()}, 并且给自己发送{nodeup, Node},   并创建ets表
init([]) ->
    Ns = nodes(),
    net_kernel:monitor_nodes(true),
    lists:foreach(fun(N) ->
                          {?MODULE, N} ! {new_pg2, node()},
                          self() ! {nodeup, N}
                  end, Ns),
    pg2_table = ets:new(pg2_table, [ordered_set, protected, named_table]),
    {ok, #state{}}.

当自己收到nodeup, 向Node发送信息
当收到nodeup或者new_pg2都会返回信息{exchange, node(), all_members()}
all_members 就是所有的group和pid信息

handle_info({nodeup, Node}, S) ->
    gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
    {noreply, S};

handle_info({new_pg2, Node}, S) ->
    gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
    {noreply, S};

all_members() ->
    [[G, group_members(G)] || G <- all_groups()].

group_members(Name) ->
    [P ||
        [P, N] <- ets:match(pg2_table, {{member, Name, '$1'},'$2'}),
        _ <- lists:seq(1, N)].

当收到exchange信息,就会进行存储
handle_cast({exchange, _Node, List}, S) ->
    store(List),
    {noreply, S};

store(List) ->
    _ = [(assure_group(Name)
          andalso
          [join_group(Name, P) || P <- Members -- group_members(Name)]) ||
            [Name, Members] <- List],
    ok.
all_members() ->
    [[G, group_members(G)] || G <- all_groups()].

group_members(Name) ->
    [P || 
        [P, N] <- ets:match(pg2_table, {{member, Name, '$1'},'$2'}),
        _ <- lists:seq(1, N)].

存储就是将其他节点的进程,全部加入本节点,如果本几点没有该进程的话


那么在join时候会发生什么呢?
join(Name, Pid) when is_pid(Pid) ->
    ensure_started(),
    case ets:member(pg2_table, {group, Name}) of
        false ->
            {error, {no_such_group, Name}};
        true ->
            global:trans({{?MODULE, Name}, self()},
                         fun() ->
                                 gen_server:multi_call(?MODULE,
                                                       {join, Name, Pid})
                         end),
            ok 
    end.
就是告诉所有的节点,有一个进程要加入,
在告诉其他节点的时候,会对这个节点加锁 ,利用函数 global:trans/2
Sets a lock on Id (using set_lock/3). If this succeeds, Fun() is evaluated and the result Res is returned. Returns aborted if the lock attempt failed. If Retries is set to infinity, the transaction will not abort.
infinity is the default setting and will be used if no value is given for Retries.
就是说默认是是无限次加锁,如果加锁失败就会重新尝试,直到成功,
再看set_lock/3,
Sets a lock on the specified nodes (or on all nodes if none are specified) on ResourceId for LockRequesterId. If a lock already exists on ResourceId for another requester than LockRequesterId, and Retries is not equal to 0, the process sleeps for a while and will try to execute the action later. When Retries attempts have been made, false is returned, otherwise true. If Retries is infinity, true is eventually returned
This function is completely synchronous.
This function does not address the problem of a deadlock. A deadlock can never occur as long as processes only lock one resource at a time. But if some processes try to lock two or more resources, a deadlock may occur. It is up to the application to detect and rectify a deadlock.
pg2/leave/2和join做了相同的操作,
加锁过程完全同步,直到加锁成功,并且有可能会造成死锁,
如果是global注册的就只有一个锁,而pg2是本地注册的,
,并且有可能造成死锁的危险,看来pg2的代价真的是不小,而如果只采用注册本地进程来发消息,就不用这么多资源的使用, 或许很多人不用pg2也是有原因的吧,但是只要不是频繁的调用函数leave, join还是可以使用, 使用时候,最好用异步来调用leave/2 和join/2函数,
至于pg2可以实现进程挂掉主动从pg2移除,也是用leave实现的
分享到:
评论

相关推荐

    ap6212a0_bb16v3_sina33验证通过BT的功能_wifi部分有问题_20170626_1148没有外层目录.7z

    [ 4.040272] VFP support v0.3: implementor 41 architecture 2 part 30 variant 7 rev 5 [ 4.048780] ThumbEE CPU extension supported. [ 4.053550] Registering SWP/SWPB emulation handler [ 4.059269] [rfkill]...

    !!!!ap6212a0_a33_sc3817r_验证通过_修正wifi的配置文件为nvram_ap6212.txt

    [ 4.040272] VFP support v0.3: implementor 41 architecture 2 part 30 variant 7 rev 5 [ 4.048780] ThumbEE CPU extension supported. [ 4.053550] Registering SWP/SWPB emulation handler [ 4.059269] [rfkill]...

    ap6212a0_a33_sc3817r_服务器验证通过_bt已经通了_wifi需要修改配置_需要再次验证_20170626_1549.7z

    [ 4.040272] VFP support v0.3: implementor 41 architecture 2 part 30 variant 7 rev 5 [ 4.048780] ThumbEE CPU extension supported. [ 4.053550] Registering SWP/SWPB emulation handler [ 4.059269] [rfkill]...

    ap6212a0_a33_sc3817r_神舟验证版本_借用nvram_ap6210这个配置文件_20170626_1834没有外层目录.7z

    [ 4.040272] VFP support v0.3: implementor 41 architecture 2 part 30 variant 7 rev 5 [ 4.048780] ThumbEE CPU extension supported. [ 4.053550] Registering SWP/SWPB emulation handler [ 4.059269] [rfkill]...

    mybatis中操作日期实例分析

    例如,可以使用`&lt;if&gt;`、`&lt;choose&gt;`、`&lt;when&gt;`、`&lt;otherwise&gt;`等标签来判断日期是否为空,从而决定是否包含某个日期条件。 5. **结果映射与日期** 在处理查询结果时,MyBatis的`&lt;resultMap&gt;`标签可以帮助我们定义...

    PG2L100H-6IFBG484与竞品的差异化分析报告

    ### PG2L100H-6IFBG484与竞品的差异化分析报告 #### 基本概述 本报告旨在详细分析PG2L100H-6IFBG484(以下简称“PG2L100H”)与竞品XC7A100T-FGG484-2(以下简称“A7”)之间的关键差异点。通过对比两者在基本供电...

    FX2N-10PG用户手册.rar

    6. **实例分析**:通过具体的工程案例,展示FX2N-10PG在实际应用中的操作步骤和问题解决策略,帮助用户快速掌握使用技巧。 7. **故障诊断与维护**:提供常见故障的排除方法,以及日常维护和保养的建议,确保设备的...

    PG2000PG20

    2. **程序读取与解析**:PG2000应具备读取S5程序的能力,将二进制代码转换为人类可读的形式,如梯形图。 3. **编程与调试**:除了阅读,可能还允许用户编辑和调试程序,包括添加、修改和删除逻辑块。 4. **模拟与...

    pg141-dds-compiler_Xilinx_pg141-dds_pg141-dds-compiler_

    标题中的“pg141-dds-compiler_Xilinx_pg141-dds_pg141-dds-compiler_”暗示了这是一个与Xilinx公司的DDS(Direct Digital Synthesis,直接数字频率合成)编译器相关的资源,版本号可能是14.1。DDS是一种广泛应用于...

    Serial RapidIO Gen2 Endpoint v4.0-PG007阅读笔记中文版

    - **&lt;component_name&gt;_support**: 包含 &lt;component_name&gt;_block、时钟模块、重置模块以及 GTCOMMON 模块(针对 Xilinx 7 系列 FPGA)。 - **&lt;component_name&gt;**: 集成了完整的 SRIO Gen2 Endpoint 功能。 ### 逻辑...

    pg055axibridgepcie_055PG.com_055pg.com_www.055PG.COM_https//:055

    2. PCIe协议简介:包括PCIe的版本、数据速率、通道数量以及PCIe层次结构。 3. AXI到PCIe桥接原理:描述如何将AXI总线信号转换为PCIe事务,并处理PCIe的中断和配置请求。 4. 设计考虑:如何在硬件设计中集成AXI-PCIe...

    PGaE69.rar_pg&e69_simulink的PG&E69_ⅹⅹ×69美国_支路潮流_美国PG&E69系统

    7. **电力系统安全与经济性**:对PG&E69系统进行潮流计算,有助于确保电力供应的稳定性和安全性,同时通过优化发电和输电,降低运营成本,提升整体经济效益。 综上所述,PG&E69系统的研究涉及电力系统基础理论和...

    centos7下postgresql13 + postgis + timescaledb 时序库完整纯离线安装包

    sudo mv timescaledb-&lt;version&gt;-pg13.&lt;architecture&gt;.rpm /var/cache/yum/x86_64/7/epel/packages/ sudo yum localinstall /var/cache/yum/x86_64/7/epel/packages/timescaledb-&lt;version&gt;-pg13.&lt;architecture&gt;.rpm...

    pg054-7series-pcie_pciexilinx_pg054_xilinx手册_XILINXPCIE_pcie_源码.

    标题中的“pg054-7series-pcie_pciexilinx_pg054_xilinx手册_XILINXPCIE_pcie_源码”指的是Xilinx公司为7系列FPGA设计的一份关于PCI Express(PCIe)的参考指南或设计示例。这份资料可能包含了Xilinx PCIe IP核的...

    pgadmin3客户端

    2. **查询工具**:内置的SQL编辑器支持编写、执行和保存SQL语句,还提供了代码自动完成和语法高亮等功能,以提高开发效率。 3. **备份与恢复**:pgAdmin3支持数据库的备份和恢复操作,用户可以方便地创建数据库的...

    gcc中文手册.rar

    GCC的`-pg`选项可以在程序运行后生成性能数据文件,用于`gprof`工具进行性能分析。`-fprofile-arcs -ftest-coverage`则可以生成覆盖率报告,帮助评估测试的充分性。 7. **多语言支持** GCC不仅支持C和C++,还可以...

    linux中pg11的包

    - 使用`pg_dump`进行数据库备份,如`pg_dump mydatabase &gt; backup.sql`。 - 使用`pg_restore`恢复备份,如`pg_restore -d mydatabase backup.sql`。 8. **网络与连接**: - 修改`postgresql.conf`中的`listen_...

    GP pg_catalog查询.docx

    2. **pg_compression**: 此表列出可用的压缩方法。在数据库中,数据压缩可以节省存储空间,提高I/O性能。执行`SELECT * FROM pg_catalog.pg_compression;`来查看支持的压缩算法。 3. **pg_class**: 这是最重要的...

    pgAdmin II.zip

    - **视图**: 视图是基于SQL查询的结果集,pgAdmin III允许您创建、修改和删除视图,方便数据查询和分析。 - **索引**: 为提高查询性能,可以为表的列创建索引。pgAdmin III支持创建B树、GiST、SP-GiST等多种类型的...

Global site tag (gtag.js) - Google Analytics