- 浏览: 38901 次
- 来自: 深圳
文章分类
最新评论
-
ZacMa:
哈哈,突然感觉里面没怎么介绍,全是贴代码了
<8>redis及erl-redis阅读 -
惊涛翻案:
马博士,给我开课吧
<8>redis及erl-redis阅读
网上看到erlang的pg2模块似乎没人推荐使用,但是还是有不少使用者,自己也感觉使用跨节点通信时候,使用它来管理各个近点的进程也是不错的选择, 使用过程中也有不少疑问,也感觉接口不够丰富,简单分析下:
pg2为什么允许一个进程加入两次呢?,并且也要退出两次,使用时候可以自己加些判断,
join(Pid, Group) ->
ok = case lists:member(Pid, pg2:get_members(Group)) of
true -> ok;
_ -> pg2:join(Group, Pid)
end;
加接口,
获取某个节点的所有进程
获取某个节点的某个进程,进程id随机
get_node_members(Node, Group) -> [pid()] | {error, {no_such_group, Group}}
get_node_pid(Node, Group) -> pid() | {error, Reason}
get_node_members(Node, Group) ->
case pg2:get_members(Group) of
{error, Reason} ->
{error, Reason};
AllMembers ->
Fun = fun(X) ->
node(X) =:= Node end,
lists:filter(Fun, AllMembers)
end.
get_node_pid(Node, Group) ->
case get_node_members(Node, Group) of
[Pid] ->
Pid;
[] ->
[];
Members when is_list(Members) ->
{_,_,X} = erlang:now(),
lists:nth((X rem length(Members))+1, Members);
Else ->
Else
end.
网上还看到一种获取最优的pid
get_best_pid(Group) ->
Members = pg2:get_members(Group),
Members1 = lists:map(fun(Pid) ->
[{message_queue_len, Messages}] = erlang:process_info(Pid, [message_queue_len]),
{Pid, Messages}
end, Members),
case lists:keysort(2, Members1) of
[{Pid, _} | _] -> Pid;
[] -> {error, empty_process_group}
end.
不过只能获得本地进程的信息,不能跨节点获取,
Failure: badarg if Pid is not a local process, or if Item is not a valid Item.
所以这种方法只适用于本地进程
那么pg2 是怎么实现节点信息存取的?
其实就是各个节点注册了一个相同名称的pg2, 通过{pg2, Node} ! Msg来实现
pg2.erl
start() ->
ensure_started().
ensure_started() ->
case whereis(?MODULE) of
undefined ->
C = {pg2, {?MODULE, start_link, []}, permanent,
1000, worker, [?MODULE]},
supervisor:start_child(kernel_safe_sup, C);
Pg2Pid ->
{ok, Pg2Pid}
end.
初始化的时候,给其他可见的node发送消息{new_pg2, node()}, 并且给自己发送{nodeup, Node}, 并创建ets表
init([]) ->
Ns = nodes(),
net_kernel:monitor_nodes(true),
lists:foreach(fun(N) ->
{?MODULE, N} ! {new_pg2, node()},
self() ! {nodeup, N}
end, Ns),
pg2_table = ets:new(pg2_table, [ordered_set, protected, named_table]),
{ok, #state{}}.
当自己收到nodeup, 向Node发送信息
当收到nodeup或者new_pg2都会返回信息{exchange, node(), all_members()}
all_members 就是所有的group和pid信息
handle_info({nodeup, Node}, S) ->
gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
{noreply, S};
handle_info({new_pg2, Node}, S) ->
gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
{noreply, S};
all_members() ->
[[G, group_members(G)] || G <- all_groups()].
group_members(Name) ->
[P ||
[P, N] <- ets:match(pg2_table, {{member, Name, '$1'},'$2'}),
_ <- lists:seq(1, N)].
当收到exchange信息,就会进行存储
handle_cast({exchange, _Node, List}, S) ->
store(List),
{noreply, S};
store(List) ->
_ = [(assure_group(Name)
andalso
[join_group(Name, P) || P <- Members -- group_members(Name)]) ||
[Name, Members] <- List],
ok.
all_members() ->
[[G, group_members(G)] || G <- all_groups()].
group_members(Name) ->
[P ||
[P, N] <- ets:match(pg2_table, {{member, Name, '$1'},'$2'}),
_ <- lists:seq(1, N)].
存储就是将其他节点的进程,全部加入本节点,如果本几点没有该进程的话
那么在join时候会发生什么呢?
join(Name, Pid) when is_pid(Pid) ->
ensure_started(),
case ets:member(pg2_table, {group, Name}) of
false ->
{error, {no_such_group, Name}};
true ->
global:trans({{?MODULE, Name}, self()},
fun() ->
gen_server:multi_call(?MODULE,
{join, Name, Pid})
end),
ok
end.
就是告诉所有的节点,有一个进程要加入,
在告诉其他节点的时候,会对这个节点加锁 ,利用函数 global:trans/2
Sets a lock on Id (using set_lock/3). If this succeeds, Fun() is evaluated and the result Res is returned. Returns aborted if the lock attempt failed. If Retries is set to infinity, the transaction will not abort.
infinity is the default setting and will be used if no value is given for Retries.
就是说默认是是无限次加锁,如果加锁失败就会重新尝试,直到成功,
再看set_lock/3,
Sets a lock on the specified nodes (or on all nodes if none are specified) on ResourceId for LockRequesterId. If a lock already exists on ResourceId for another requester than LockRequesterId, and Retries is not equal to 0, the process sleeps for a while and will try to execute the action later. When Retries attempts have been made, false is returned, otherwise true. If Retries is infinity, true is eventually returned
This function is completely synchronous.
This function does not address the problem of a deadlock. A deadlock can never occur as long as processes only lock one resource at a time. But if some processes try to lock two or more resources, a deadlock may occur. It is up to the application to detect and rectify a deadlock.
pg2/leave/2和join做了相同的操作,
加锁过程完全同步,直到加锁成功,并且有可能会造成死锁,
如果是global注册的就只有一个锁,而pg2是本地注册的,
,并且有可能造成死锁的危险,看来pg2的代价真的是不小,而如果只采用注册本地进程来发消息,就不用这么多资源的使用, 或许很多人不用pg2也是有原因的吧,但是只要不是频繁的调用函数leave, join还是可以使用, 使用时候,最好用异步来调用leave/2 和join/2函数,
至于pg2可以实现进程挂掉主动从pg2移除,也是用leave实现的
pg2为什么允许一个进程加入两次呢?,并且也要退出两次,使用时候可以自己加些判断,
join(Pid, Group) ->
ok = case lists:member(Pid, pg2:get_members(Group)) of
true -> ok;
_ -> pg2:join(Group, Pid)
end;
加接口,
获取某个节点的所有进程
获取某个节点的某个进程,进程id随机
get_node_members(Node, Group) -> [pid()] | {error, {no_such_group, Group}}
get_node_pid(Node, Group) -> pid() | {error, Reason}
get_node_members(Node, Group) ->
case pg2:get_members(Group) of
{error, Reason} ->
{error, Reason};
AllMembers ->
Fun = fun(X) ->
node(X) =:= Node end,
lists:filter(Fun, AllMembers)
end.
get_node_pid(Node, Group) ->
case get_node_members(Node, Group) of
[Pid] ->
Pid;
[] ->
[];
Members when is_list(Members) ->
{_,_,X} = erlang:now(),
lists:nth((X rem length(Members))+1, Members);
Else ->
Else
end.
网上还看到一种获取最优的pid
get_best_pid(Group) ->
Members = pg2:get_members(Group),
Members1 = lists:map(fun(Pid) ->
[{message_queue_len, Messages}] = erlang:process_info(Pid, [message_queue_len]),
{Pid, Messages}
end, Members),
case lists:keysort(2, Members1) of
[{Pid, _} | _] -> Pid;
[] -> {error, empty_process_group}
end.
不过只能获得本地进程的信息,不能跨节点获取,
Failure: badarg if Pid is not a local process, or if Item is not a valid Item.
所以这种方法只适用于本地进程
那么pg2 是怎么实现节点信息存取的?
其实就是各个节点注册了一个相同名称的pg2, 通过{pg2, Node} ! Msg来实现
pg2.erl
start() ->
ensure_started().
ensure_started() ->
case whereis(?MODULE) of
undefined ->
C = {pg2, {?MODULE, start_link, []}, permanent,
1000, worker, [?MODULE]},
supervisor:start_child(kernel_safe_sup, C);
Pg2Pid ->
{ok, Pg2Pid}
end.
初始化的时候,给其他可见的node发送消息{new_pg2, node()}, 并且给自己发送{nodeup, Node}, 并创建ets表
init([]) ->
Ns = nodes(),
net_kernel:monitor_nodes(true),
lists:foreach(fun(N) ->
{?MODULE, N} ! {new_pg2, node()},
self() ! {nodeup, N}
end, Ns),
pg2_table = ets:new(pg2_table, [ordered_set, protected, named_table]),
{ok, #state{}}.
当自己收到nodeup, 向Node发送信息
当收到nodeup或者new_pg2都会返回信息{exchange, node(), all_members()}
all_members 就是所有的group和pid信息
handle_info({nodeup, Node}, S) ->
gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
{noreply, S};
handle_info({new_pg2, Node}, S) ->
gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
{noreply, S};
all_members() ->
[[G, group_members(G)] || G <- all_groups()].
group_members(Name) ->
[P ||
[P, N] <- ets:match(pg2_table, {{member, Name, '$1'},'$2'}),
_ <- lists:seq(1, N)].
当收到exchange信息,就会进行存储
handle_cast({exchange, _Node, List}, S) ->
store(List),
{noreply, S};
store(List) ->
_ = [(assure_group(Name)
andalso
[join_group(Name, P) || P <- Members -- group_members(Name)]) ||
[Name, Members] <- List],
ok.
all_members() ->
[[G, group_members(G)] || G <- all_groups()].
group_members(Name) ->
[P ||
[P, N] <- ets:match(pg2_table, {{member, Name, '$1'},'$2'}),
_ <- lists:seq(1, N)].
存储就是将其他节点的进程,全部加入本节点,如果本几点没有该进程的话
那么在join时候会发生什么呢?
join(Name, Pid) when is_pid(Pid) ->
ensure_started(),
case ets:member(pg2_table, {group, Name}) of
false ->
{error, {no_such_group, Name}};
true ->
global:trans({{?MODULE, Name}, self()},
fun() ->
gen_server:multi_call(?MODULE,
{join, Name, Pid})
end),
ok
end.
就是告诉所有的节点,有一个进程要加入,
在告诉其他节点的时候,会对这个节点加锁 ,利用函数 global:trans/2
Sets a lock on Id (using set_lock/3). If this succeeds, Fun() is evaluated and the result Res is returned. Returns aborted if the lock attempt failed. If Retries is set to infinity, the transaction will not abort.
infinity is the default setting and will be used if no value is given for Retries.
就是说默认是是无限次加锁,如果加锁失败就会重新尝试,直到成功,
再看set_lock/3,
Sets a lock on the specified nodes (or on all nodes if none are specified) on ResourceId for LockRequesterId. If a lock already exists on ResourceId for another requester than LockRequesterId, and Retries is not equal to 0, the process sleeps for a while and will try to execute the action later. When Retries attempts have been made, false is returned, otherwise true. If Retries is infinity, true is eventually returned
This function is completely synchronous.
This function does not address the problem of a deadlock. A deadlock can never occur as long as processes only lock one resource at a time. But if some processes try to lock two or more resources, a deadlock may occur. It is up to the application to detect and rectify a deadlock.
pg2/leave/2和join做了相同的操作,
加锁过程完全同步,直到加锁成功,并且有可能会造成死锁,
如果是global注册的就只有一个锁,而pg2是本地注册的,
,并且有可能造成死锁的危险,看来pg2的代价真的是不小,而如果只采用注册本地进程来发消息,就不用这么多资源的使用, 或许很多人不用pg2也是有原因的吧,但是只要不是频繁的调用函数leave, join还是可以使用, 使用时候,最好用异步来调用leave/2 和join/2函数,
至于pg2可以实现进程挂掉主动从pg2移除,也是用leave实现的
发表评论
-
erlang版本安装相关问题 <32>
2014-05-10 15:54 623<1> erlang R1603安装后,crytp ... -
关于iolist<30>
2014-01-15 10:42 624iolist是比较常用的数据结构. iolist的 ... -
erlang 字符编码 <29>
2014-01-14 16:31 1260用mochiweb通过网页发送中文到服务器,结果服务器显示乱码 ... -
<27>erlang record
2013-11-19 11:19 773平时总是忘记record的某些使用方法,每次使用都要翻文档, ... -
<26>io:format io_lib:format
2013-11-14 11:07 1313使用io_lib时候要注意参数,尤其是封装json串的时候,否 ... -
<24>用error_logger间隔记录日志
2013-10-22 16:09 650执行下面的代码 test:start(). test.erl ... -
<23>erlang 数据存储
2013-10-15 22:15 1660做为后端开发者,经常 ... -
<22> erlang中的数学计算函数相关
2013-10-10 10:34 16321. 幂函数 match:pow(m,n) 表示m的n次幂 ... -
<20>erlang中的类型和函数说明
2013-09-15 11:25 976erlang是一种动态类型的语言(运行时才决定数据类型),可以 ... -
<19>erlang中的时间,日期
2013-09-06 11:21 1194时间函数涉及的数据类型: DATA TYPES datetim ... -
<18>Efficient guide 之List handling
2013-08-31 18:45 6771 Deep and flat lists lists:fl ... -
<17>Efficiency Guide之Function
2013-08-27 22:30 5801. 函数模式匹配 模式匹配,在函数头,case和receiv ... -
<16>Efficiency Guide之Common Caveats
2013-08-11 11:07 809(1) ++ 如果做一个list的反转,不要这样, naiv ... -
<15> lists模块补充
2013-08-05 20:12 828%% 对list模块经常用到的进行补充 %% 1 对所有元素进 ... -
<15> lists模块解析和补充
2013-07-24 17:57 12%% 对list模块经常用到的进行补充 %% 1 对所有元素 ... -
<12>简述erlang的几种错误
2013-04-14 23:31 11821) badarg Bad argument. The ar ... -
<11>erlang中方便使用的模块和命令(2)
2013-04-06 22:33 797(1) 进程字典到底用不用,很多人推荐使用 http:// ... -
<9>rabbitmq网络层
2013-01-31 00:20 791抽离出了网络层, 逻辑层待以后研究 https://gith ... -
<8>redis及erl-redis阅读
2013-01-16 10:14 8491 redis的功能相当的强大,里面的发布订阅pub/su ... -
<6>error_logger 使用
2012-12-02 16:24 1451erlang中日志管理主要有error_loggger 模块, ...
相关推荐
[ 4.040272] VFP support v0.3: implementor 41 architecture 2 part 30 variant 7 rev 5 [ 4.048780] ThumbEE CPU extension supported. [ 4.053550] Registering SWP/SWPB emulation handler [ 4.059269] [rfkill]...
[ 4.040272] VFP support v0.3: implementor 41 architecture 2 part 30 variant 7 rev 5 [ 4.048780] ThumbEE CPU extension supported. [ 4.053550] Registering SWP/SWPB emulation handler [ 4.059269] [rfkill]...
[ 4.040272] VFP support v0.3: implementor 41 architecture 2 part 30 variant 7 rev 5 [ 4.048780] ThumbEE CPU extension supported. [ 4.053550] Registering SWP/SWPB emulation handler [ 4.059269] [rfkill]...
[ 4.040272] VFP support v0.3: implementor 41 architecture 2 part 30 variant 7 rev 5 [ 4.048780] ThumbEE CPU extension supported. [ 4.053550] Registering SWP/SWPB emulation handler [ 4.059269] [rfkill]...
例如,可以使用`<if>`、`<choose>`、`<when>`、`<otherwise>`等标签来判断日期是否为空,从而决定是否包含某个日期条件。 5. **结果映射与日期** 在处理查询结果时,MyBatis的`<resultMap>`标签可以帮助我们定义...
### PG2L100H-6IFBG484与竞品的差异化分析报告 #### 基本概述 本报告旨在详细分析PG2L100H-6IFBG484(以下简称“PG2L100H”)与竞品XC7A100T-FGG484-2(以下简称“A7”)之间的关键差异点。通过对比两者在基本供电...
6. **实例分析**:通过具体的工程案例,展示FX2N-10PG在实际应用中的操作步骤和问题解决策略,帮助用户快速掌握使用技巧。 7. **故障诊断与维护**:提供常见故障的排除方法,以及日常维护和保养的建议,确保设备的...
2. **程序读取与解析**:PG2000应具备读取S5程序的能力,将二进制代码转换为人类可读的形式,如梯形图。 3. **编程与调试**:除了阅读,可能还允许用户编辑和调试程序,包括添加、修改和删除逻辑块。 4. **模拟与...
标题中的“pg141-dds-compiler_Xilinx_pg141-dds_pg141-dds-compiler_”暗示了这是一个与Xilinx公司的DDS(Direct Digital Synthesis,直接数字频率合成)编译器相关的资源,版本号可能是14.1。DDS是一种广泛应用于...
- **<component_name>_support**: 包含 <component_name>_block、时钟模块、重置模块以及 GTCOMMON 模块(针对 Xilinx 7 系列 FPGA)。 - **<component_name>**: 集成了完整的 SRIO Gen2 Endpoint 功能。 ### 逻辑...
2. PCIe协议简介:包括PCIe的版本、数据速率、通道数量以及PCIe层次结构。 3. AXI到PCIe桥接原理:描述如何将AXI总线信号转换为PCIe事务,并处理PCIe的中断和配置请求。 4. 设计考虑:如何在硬件设计中集成AXI-PCIe...
7. **电力系统安全与经济性**:对PG&E69系统进行潮流计算,有助于确保电力供应的稳定性和安全性,同时通过优化发电和输电,降低运营成本,提升整体经济效益。 综上所述,PG&E69系统的研究涉及电力系统基础理论和...
sudo mv timescaledb-<version>-pg13.<architecture>.rpm /var/cache/yum/x86_64/7/epel/packages/ sudo yum localinstall /var/cache/yum/x86_64/7/epel/packages/timescaledb-<version>-pg13.<architecture>.rpm...
标题中的“pg054-7series-pcie_pciexilinx_pg054_xilinx手册_XILINXPCIE_pcie_源码”指的是Xilinx公司为7系列FPGA设计的一份关于PCI Express(PCIe)的参考指南或设计示例。这份资料可能包含了Xilinx PCIe IP核的...
2. **查询工具**:内置的SQL编辑器支持编写、执行和保存SQL语句,还提供了代码自动完成和语法高亮等功能,以提高开发效率。 3. **备份与恢复**:pgAdmin3支持数据库的备份和恢复操作,用户可以方便地创建数据库的...
GCC的`-pg`选项可以在程序运行后生成性能数据文件,用于`gprof`工具进行性能分析。`-fprofile-arcs -ftest-coverage`则可以生成覆盖率报告,帮助评估测试的充分性。 7. **多语言支持** GCC不仅支持C和C++,还可以...
- 使用`pg_dump`进行数据库备份,如`pg_dump mydatabase > backup.sql`。 - 使用`pg_restore`恢复备份,如`pg_restore -d mydatabase backup.sql`。 8. **网络与连接**: - 修改`postgresql.conf`中的`listen_...
2. **pg_compression**: 此表列出可用的压缩方法。在数据库中,数据压缩可以节省存储空间,提高I/O性能。执行`SELECT * FROM pg_catalog.pg_compression;`来查看支持的压缩算法。 3. **pg_class**: 这是最重要的...
- **视图**: 视图是基于SQL查询的结果集,pgAdmin III允许您创建、修改和删除视图,方便数据查询和分析。 - **索引**: 为提高查询性能,可以为表的列创建索引。pgAdmin III支持创建B树、GiST、SP-GiST等多种类型的...