mnesia作为一个天生分布式的数据库,为保证各个节点数据的一致性,会对于每个非读操作进行节点间的数据同步。
dirty_write也是要同步数据的,他和普通写操作的区别在于:
1.对操作不加锁。
2.同步数据请求不要求回执。
这样就会大大的增强操作的执行效率。在调用dirty_write后,mnesia会给除自己以外的所有联通节点发送消息,告诉他们应该执行的操作,然后自己再执行mnesia_tm:do_dirty。其他节点收到同步数据的请求后,也会调用mnesia_tm:do_dirty。
然后我做了一下测试分别是在不同节点数的情况下,执行100000个并发进程去dirty_write同一个mnesia表,结果如下:
1.当单独一个节点时,并发100000个进程去脏写mnesia,平均每次的耗时是18微秒左右。
2.但是2个节点的话,并发100000个进程去脏写mnesia,平均每次的耗时是54微秒左右。
3.而3个节点的话,并发100000个进程去脏写mnesia,平均每次的耗时是82微秒左右。
也就是说,发送同步消息的消耗会随着节点数的上升而提高。如果我们的项目节点书在10以上的时候,整个一个do_dirty操作会高到不能忍受的地步,这该怎么办呢?
要想知道解决办法我们还要在分析一下dirty_write的流程来找到哪里是整个操作的瓶颈。
1.最开始猜测是发送消息多个节点多了20-30微秒,可是实际经过我的测试,一个节点向另一个节点发消息,10万并发,平均2微秒左右。那就不对了,多余的10多微秒都干嘛去了呢?为了确认是否是发送消息影响了整个流程,我改了mnesia_tm这个module的代码,把发送消息的代码注释掉了,然后重复上面的测试,3个节点的情况下,平均耗时缩小到了25微秒左右。也就是说确实是发送消息占用了多余的时间。那为什么单独测试消耗的时间很小呢,我猜测应该是单独测并发多少万,如果操作简单的话,实际上同时运行的进程数很少,因为有可能第1000个进程还没启动第一个进程就运行完毕了呢,这样的话同时并发根本到不了1000以上,所以测试并发消息发送的那个测试并不准确。而mnesia的测试,由于整个流程的占用时间很长,所以同时并发的进程数可能达到了几万,那么同时消息发送可能达到几万,那么占用时间就相应的增加了。结论就是erlang的send操作,如果并发量越大,那么每个send操作占用的时间越长。所以发送消息这边没法提高速度,现有情况已经是最优的了。
2.那么剩下来能优化的地方就是mnesia_tm:do_dirty这方面了,因为所有的节点包括本地节点都要用do_dirty来操作ets表更新数据,这方面可以通过同时操作多个ets表来避免阻塞。也就是分表。原来读写一张表变成读写多个同样表结构的表。通过实验测试:
1.还是3个节点,并且分3张表,并发100000个进程去脏写mnesia,平均每次的耗时是54微秒左右。
2.还是3个节点,并且分4张表,并发100000个进程去脏写mnesia,平均每次的耗时是28微秒左右。
效率有明显的提升。说明这个方法有效。
下面是测试的代码:
测试分表性能和多节点性能:
-module (test_mnesia_split). -export ([start/0, dw/0, tdw/0, stop/0, dw1/0, tdw1/0]). -record (test_mnesia, {userid, pid}). -record (test_mnesia1, {userid, pid}). -record (test_mnesia2, {userid, pid}). -record (test_mnesia3, {userid, pid}). start() -> NodeList = node_list(), rpc_call(mnesia, stop, [], stopped), mnesia:delete_schema(NodeList), mnesia:create_schema(NodeList), rpc_call(mnesia, start, [], ok), {atomic,ok} = mnesia:create_table(test_mnesia, [{ram_copies, NodeList}, {attributes, record_info(fields, test_mnesia)}]), {atomic,ok} = mnesia:create_table(test_mnesia1, [{ram_copies, NodeList}, {attributes, record_info(fields, test_mnesia1)}]), {atomic,ok} = mnesia:create_table(test_mnesia2, [{ram_copies, NodeList}, {attributes, record_info(fields, test_mnesia2)}]), {atomic,ok} = mnesia:create_table(test_mnesia3, [{ram_copies, NodeList}, {attributes, record_info(fields, test_mnesia3)}]), {atomic,ok} = mnesia:add_table_index(test_mnesia, pid). % rpc_call(mnesia, stop, [], stopped). stop() -> NodeList = node_list(), rpc_call(mnesia, stop, [], stopped), mnesia:delete_schema(NodeList). dw() -> <<A:32, B:32, C:32>> = crypto:strong_rand_bytes(12), random:seed (A, B, C), Random = random:uniform(3), UserId = <<"user_1@android">>, mnesia:dirty_write(#test_mnesia{userid = UserId, pid = UserId}). tdw() -> tc:ct(?MODULE, dw, [], 100000). dw1() -> <<A:32, B:32, C:32>> = crypto:strong_rand_bytes(12), random:seed (A, B, C), Random = random:uniform(3), UserId = <<"user_1@android">>, case Random of 1 -> mnesia:dirty_write(#test_mnesia1{userid = UserId, pid = UserId}); 2 -> mnesia:dirty_write(#test_mnesia2{userid = UserId, pid = UserId}); 3 -> mnesia:dirty_write(#test_mnesia3{userid = UserId, pid = UserId}); _ -> mnesia:dirty_write(#test_mnesia{userid = UserId, pid = UserId}) end. tdw1() -> tc:ct(?MODULE, dw1, [], 100000). %% =================================================================== %% Internal functions %% =================================================================== node_list() -> [node() | nodes()]. rpc_call(Module, Function, Args, Result) -> rpc_call(node_list(), Module, Function, Args, Result). rpc_call([H|T], Module, Function, Args, Result) -> Result = rpc:call(H, Module, Function, Args), rpc_call(T, Module, Function, Args, Result); rpc_call([], _, _, _, _) -> ok.
测试消息发送的性能:
-module (ts). -export ([start_rec/0, send/0, start/1]). start_rec() -> Pid = spawn(fun() -> rec() end), case whereis(rec) of undefined -> register(rec, Pid); _ -> ok end. rec() -> receive stop -> stop; _ -> rec() end. send() -> send(1000). send(N) when N > 1 -> {rec, 's2@192.168.1.137'} ! a, send(N - 1); send(1) -> {Microsecond, _} = timer:tc(erlang, send, [{rec, 's2@192.168.1.137'},a]), collector ! {result, Microsecond}, ok. collector(Max, Min, Sum, N, I, List) when N >= I -> receive {result, Microsecond} -> NewSum = Sum + Microsecond, if Max == 0 -> NewMax = NewMin = Microsecond; Max < Microsecond -> NewMax = Microsecond, NewMin = Min; Min > Microsecond -> NewMax = Max, NewMin = Microsecond; true -> NewMax = Max, NewMin = Min end, collector(NewMax, NewMin, NewSum, N, I + 1, [Microsecond|List]) end; collector(Max, Min, Sum, N, _, List) -> Aver = Sum / N, {Less5, Less10, Less20, Less50, Less100, Less500, Other} = distribution(List, Aver), {Max, Min, Sum, Aver, Less5, Less10, Less20, Less50, Less100, Less500, Other}. distribution(List, Aver) -> distribution(List, Aver, 0, 0, 0, 0, 0, 0, 0). distribution([H|T], Aver, Less5, Less10, Less20, Less50, Less100, Less500, Other) -> if H < 5 -> distribution(T, Aver, Less5 + 1, Less10, Less20, Less50, Less100, Less500, Other); H < 10 -> distribution(T, Aver, Less5, Less10 + 1, Less20, Less50, Less100, Less500, Other); H < 20 -> distribution(T, Aver, Less5, Less10, Less20 + 1, Less50, Less100, Less500, Other); H < 50 -> distribution(T, Aver, Less5, Less10, Less20, Less50 + 1, Less100, Less500, Other); H < 100 -> distribution(T, Aver, Less5, Less10, Less20, Less50, Less100 + 1, Less500, Other); H < 500 -> distribution(T, Aver, Less5, Less10, Less20, Less50, Less100, Less500 + 1, Other); true -> distribution(T, Aver, Less5, Less10, Less20, Less50, Less100, Less500, Other + 1) end; distribution([], _Aver, Less5, Less10, Less20, Less50, Less100, Less500, Other) -> {Less5, Less10, Less20, Less50, Less100, Less500, Other}. start(N) -> case whereis(collector) of undefined -> register(collector, self()); _ -> ok end, loop(N), {Max, Min, Sum, Aver, Less5, Less10, Less20, Less50, Less100, Less500, Other} = collector(0, 0, 0, N, 1, []), io:format ("=====================~n"), io:format ("spawn [~p] processes of erlang:send/3:~n", [N]), io:format ("Maximum: ~p(μs)\t~p(s)~n", [Max, Max / 1000000]), io:format ("Minimum: ~p(μs)\t~p(s)~n", [Min, Min / 1000000]), io:format ("Sum: ~p(μs)\t~p(s)~n", [Sum, Sum / 1000000]), io:format ("Average: ~p(μs)\t~p(s)~n", [Aver, Aver / 1000000]), io:format ("0 <= x < 5: ~p~n", [Less5]), io:format ("5 <= x < 10: ~p~n", [Less10]), io:format ("10 <= x < 20: ~p~n", [Less20]), io:format ("20 <= x < 50: ~p~n", [Less50]), io:format ("50 <= x < 100: ~p~n", [Less100]), io:format ("100 <= x < 500: ~p~n", [Less500]), io:format ("x => 500: ~p~n", [Other]), io:format ("=====================~n"). loop(N) when N > 0 -> spawn(fun ts:send/0), loop(N - 1); loop(0) -> ok.
相关推荐
erlang提供了binary_to_term 函数,用于把二进制数据转为原始的erlang数据。这个函数都是c实现的,这里用erlang语言实现了,很有参考价值,其他语言可以参考这个解析erlang二进制协议数据。配套文章地址...
esl-erlang_23.0和rabbitmq-3.8.4windows版本 直接下载安装就行,可以直接下载就可安装,非常的方便 ,欢迎大家下载 注意事项: 1. Erlang版本和RabbitMQ版本要配套 (Erlang23.0, RabbitMQ3.8.4) 2. amd芯片请乖乖...
8.附录.A:Mnesia.错误信息 8.1.Mnesia.中的错误 9.附录.B:备份回调函数接口 9.1.Mnesia.备份回调行为 10.附录.C:作业存取回调接口 10.1.Mnnesia.存取回调行为 11.附录.D:分片表哈希回调接口 11.1....
Erlang OTP 20.3 是一个针对Linux CentOS系统的软件包,主要为开发者提供了一个强大的并发和分布式计算环境。Erlang是一种静态类型的、函数式的编程语言,它以其在处理高并发、容错和实时系统方面的优秀表现而闻名。...
Erlang_x64_20.1是Erlang OTP (Open Telephony Platform)的一个特定版本,专为64位Windows操作系统设计。 Erlang OTP是一个包含Erlang编译器、运行时系统、库和工具的完整平台。OTP提供了许多关键特性,如进程间...
这个erlang23.0版本,根据rabbitMQ官网的介绍,可以和下面这几个版本的rabbitMQ配合使用: 3.8.9 3.8.8 3.8.7 3.8.6 3.8.5 3.8.4 其他版本的rabbit,请移步其他资源下载
1. **Mnesia**:Mnesia 是 OTP 提供的一个分布式数据库管理系统,支持事务和实时查询,适合在分布式环境中存储和管理数据。 2. **GenServer**:GenServer 是一个行为模块,提供了一种模式来实现服务器进程,它能够...
Erlang OTP (Open Telephony Platform) 是一个用于构建高度并发、分布式和容错系统的软件框架,由瑞典的Ericsson公司开发。OTP提供了一个强大的编程环境,特别适合于实时通信系统和大规模网络应用。在本场景中,"otp...
dirty_scheduler 不要忘记使用“ --enable-dirty-schedulers”... 从Erlang 17.03开始,enif_schedule_dirty_nif,enif_schedule_dirty_nif_finalizer和enif_dirty_nif_finalizer被删除(erl_nif.h)。 更多信息: :
Erlang是一种强大的编程语言,尤其在分布式计算、并发处理和实时系统中有着广泛的应用。OTP(Open Telecom Platform)是Erlang的核心组件,提供了一系列的库和设计原则,用于构建可靠、可扩展的系统。在Linux CentOS...
标题中的"Erlang_win64_24.1.rar"是指Erlang的64位Windows版,版本号为24.1.7。这个压缩包文件是专为在Windows操作系统上安装和运行Erlang而准备的,包含了所有必要的二进制文件和库,使得开发者可以在Windows平台上...
在"erlang_otp_win64_25.0"这个标题中,我们可以提取出几个关键点: 1. **Erlang**:这是一种函数式编程语言,以其在处理并发性和容错性方面的强大能力而闻名。Erlang的设计理念是让程序员能够轻松地构建能够并行...
Erlang 安装包(otp_win32_R16B03-1.part2)
Erlang是一种高级编程语言,特别为并发、分布式和实时计算系统设计,广泛应用于电信、银行、互联网服务和软件开发。Erlang以其强大的错误恢复能力和容错性著称,其虚拟机(BEAM)允许程序在不停止服务的情况下进行热...
在这个压缩包中,我们有一个名为"Erlang_B_model.pdf"的文件,它很可能是这样的手册的一部分,提供了关于Erlang B模型的理论、公式以及查表方法。 **Erlang B公式**表达了一个服务系统(如电话交换机)在固定数量的...
Erlang B 和 Erlang C 是在电信领域中广泛使用的两个数学公式,用于预测和管理电话交换系统的呼叫处理能力。这两个公式由丹麦工程师 Agner Krarup Erlang 在20世纪初开发,对于理解通信系统中的呼叫占用率、阻塞率和...
标题中的"erlang_win64_22.0"指的是Erlang OTP(Open Telecom Platform)的第22.0版本,这个版本是为64位Windows操作系统设计的。OTP是Erlang的核心库,包含了大量用于构建可靠系统和网络服务的模块。 描述中提到,...
RabbitMQ为了搭建运行环境,erlang的官网下载实在太慢,当前最新版本提交到这里提供给大家和自己下载,Erlang_win64_22.2
是erlang_otp_20.3 的win64安装包,内为exe文件,一路next即可安装完成 是适用于多线程、分布式开发的语言,也是如rabbitmq等重要工具的必须品 使用前需要配置环境变量:1.变量名为ERLANG_HOME,变量值为安装Erlang...
标题“Erlang_win64_24.2 64位 rabbitmq 必要环境”暗示了我们讨论的是Erlang的特定版本——24.2,用于64位Windows操作系统,并且是运行RabbitMQ所必需的环境。RabbitMQ是一款开源的消息代理和队列服务器,它使用...