`
dcaoyuan
  • 浏览: 307327 次
社区版块
存档分类
最新评论

The Erlang Way (Was Tim Bray's Erlang Exercise - Round IV)

阅读更多

Playing with Tim's Erlang Exercise is so much fun.

I've been coding in Erlang about 6 months as a newbie, in most cases, I do parsing on string (or list what ever) with no need of regular expressions, since Erlang's pattern match can usaully solve most problems straightforward.

Tim's log file is also a good example for applying pattern match in Erlang way. It's a continuous stream of dataset, after splitting it to line-bounded chunks for parallellization purpose, we can truely match whole {GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) } directly on chunk with no need to split to lines any more.

This come out my third solution, which matchs whole

{GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) } 

likeness using the pattern:

"GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest]

and then fetchs

[Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/] ++ match_until_space_newline(Rest, [])

as the matched key, with no need to split the chunk to lines.

But yes, we still need to split each chunk on the lastest newline to get parallelized result exactly accurate.

On my 2-core 2 GHz MacBook, the best time I’ve got is 4.483 sec

# smp enabled:
$ erlc -smp tbray3.erl
$ time erl -smp +P 60000 -noshell -run tbray3 start o1000k.ap -s erlang halt
8900    : <<"2006/09/29/Dynamic-IDE">>
2000    : <<"2006/07/28/Open-Data">>
1300    : <<"2003/07/25/NotGaming">>
800     : <<"2003/10/16/Debbie">>
800     : <<"2003/09/18/NXML">>
800     : <<"2006/01/31/Data-Protection">>
700     : <<"2003/06/23/SamsPie">>
600     : <<"2006/09/11/Making-Markup">>
600     : <<"2003/02/04/Construction">>
600     : <<"2005/11/03/Cars-and-Office-Suites">>
Time:    4142.83 ms

real    0m4.483s
user    0m5.804s
sys     0m0.615s

# no-smp:
$ erlc tbray3.erl
$ time erl -noshell -run tbray_list_no_line start o1000k.ap -s erlang halt

real    0m7.050s
user    0m6.183s
sys     0m0.644s

The smp enable result speedup about 57%

On the 2.80GHz 4-cpu xeon debian box that I mentioned before in previous blog, the best result is:

real    0m8.420s
user    0m11.637s
sys     0m0.452s

And I've noticed, adjusting the BUFFER_SIZE can balance the time consumered by parallelized parts and un-parallelized parts. That is, if the number of core is increased, we can also increase the BUFFER_SIZE a bit, so the number of chunks decreased (less un-parallelized split_on_last_new_line/1 and file:pread/3) but with more heavy work for parallelized binary_to_list/1 and scan_chunk/1 on longer list.

The best BUFFER_SIZE on my computer is 4096 * 5 bytes, which causes un-parallized split_on_last_newline/1 took about only 0.226s in the case.

The code:

-module(tbray3).

-compile([native]).

-export([start/1]).
  
%% The best Bin Buffer Size is 4096 * 1 - 4096 * 5
-define(BUFFER_SIZE, (4096 * 5)). 

start(FileName) ->
    Start = now(),

    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),
 
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file(File, Collector),
    
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

read_file(File, Collector) -> read_file_1(File, [], 0, Collector).
read_file_1(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof ->
            Collector ! {chunk_num, I},
            file:close(File);
        {ok, Bin} -> 
            {Chunk, NextTail} = split_on_last_newline(PrevTail ++ binary_to_list(Bin)),
            spawn(fun () -> Collector ! {dict, scan_chunk(Chunk)} end),
            read_file_1(File, NextTail, I + 1, Collector)
    end.

split_on_last_newline(List) -> split_on_last_newline_1(lists:reverse(List), []).
split_on_last_newline_1(List, Tail) ->
    case List of
        []         -> {lists:reverse(List), []};
        [$\n|Rest] -> {lists:reverse(Rest), Tail};
        [C|Rest]   -> split_on_last_newline_1(Rest, [C | Tail])
    end.

collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> 
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} -> 
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.
    
print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~p~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

scan_chunk(List) -> scan_chunk_1(List, dict:new()).
scan_chunk_1(List, Dict) ->
    case List of
        [] -> Dict;
        "GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest] ->
            case match_until_space_newline(Rest, []) of
                {Rest1, []} -> 
                    scan_chunk_1(Rest1, Dict);
                {Rest1, Word} -> 
                    Key = list_to_binary([Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word]),
                    scan_chunk_1(Rest1, dict:update_counter(Key, 1, Dict))
            end;
        [_|Rest] -> scan_chunk_1(Rest, Dict)
    end.
    
match_until_space_newline(List, Word) ->
    case List of
        []     -> {[],   []};
        [10|_] -> {List, []};
        [$.|_] -> {List, []};
        [$ |_] -> {List, lists:reverse(Word)};
        [C|Rest] -> match_until_space_newline(Rest, [C | Word])
    end.

I also wrote another corresponding binary version, which is 2-3 times slower than above list version on my machine, but the result may vary depending on your compiled Erlang/OTP on various operation system. I will test it again when Erlang/OTP R12B is released, which is claimed to have been optimized for binary match performance.

The code:

-module(tbray3_bin).

-compile([native]).

-export([start/1]).

-define(BUFFER_SIZE, (4096 * 10000)).

start(FileName) ->
    Start = now(),

    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),

    {ok, File} = file:open(FileName, [raw, binary]),    
    read_file(File, Collector),

    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~p ms~n", [timer:now_diff(now(), Start) / 1000])       
    end.
    
collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> 
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].
          
read_file(File, Collector) -> read_file(File, <<>>, 0, Collector).            
read_file(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof -> 
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, Bin} -> 
            {Data, NextTail} = split_on_last_newline(Bin),
            spawn(fun () -> Collector ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
            read_file(File, NextTail, I + 1, Collector)
    end.

split_on_last_newline(Bin) -> split_on_last_newline(Bin, size(Bin)).   
split_on_last_newline(Bin, Offset) ->
    case Bin of
        <<Data:Offset/binary,$\n,Tail/binary>> ->
            {Data, Tail};
        _ when Offset =< 0 -> 
            {Bin, <<>>};
        _ -> 
            split_on_last_newline(Bin, Offset - 1)
    end.
    
scan_chunk(Bin) -> scan_chunk_1(Bin, 0, dict:new()).    
scan_chunk_1(Bin, Offset, Dict) when Offset < size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary,"GET /ongoing/When/",_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,Rest/binary>> ->            
            case match_until_space_newline(Rest, 0) of
                {Rest1, <<>>} -> 
                    scan_chunk_1(Rest1, 0, Dict);
                {Rest1, Word} -> 
                    Key = <<Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word/binary>>,
                    scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
            end;
        _ -> scan_chunk_1(Bin, Offset + 1, Dict)
    end;
scan_chunk_1(_, _, Dict) -> Dict.

match_until_space_newline(Bin, Offset) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> ->
            {Rest, Word};
        <<_:Offset/binary,$.,Rest/binary>> ->
            {Rest, <<>>};
        <<_:Offset/binary,10,Rest/binary>> ->
            {Rest, <<>>};
        <<_:Offset/binary,_,_/binary>> ->
            match_until_space_newline(Bin, Offset + 1);
        _ -> 
            {<<>>, <<>>}
    end. 

分享到:
评论

相关推荐

    erlang-sd_notify-1.0-2.el7.x86_64.rpm

    erlang-sd_notify-1.0-2.el7.x86_64.rpm

    esl-erlang-26.0.2x86-64.zip

    error: /tmp/esl-erlang_26.0.2-1~alinux~3_x86_64.rpm : not an rpm package (or packape manifest): arning: /tmp/rabbitmg-erver 3.12.4-1 all.rpm: Header V4 RSA/SHA512 Signature, key ID 6026dfca: NOKEY ...

    erlang-23.2.3-1.el7.x86_64.rpm和erlang-23.2.1-1.el7.x86_64.rpm.rar

    标题中的"erlang-23.2.3-1.el7.x86_64.rpm"和"erlang-23.2.1-1.el7.x86_64.rpm.rar"代表了两个不同的Erlang版本。`.rpm`文件是用于Red Hat Enterprise Linux (RHEL) 和其衍生发行版如CentOS的软件包管理格式,而`....

    erlang-22.3-1.el7.x86_64.rpm

    在标题"erlang-22.3-1.el7.x86_64.rpm"中,我们可以解读出几个关键信息: 1. **版本号**:22.3-1表示这是Erlang的一个特定版本,22.3是主版本号和次版本号,而-1可能代表修订或更新次数。 2. **操作系统兼容性**:...

    最新版erlang-23.3.4.3-1.el7.x86_64.rpm(CentOS7)

    - 使用yum安装Erlang:`sudo yum install erlang-23.3.4.3-1.el7.x86_64.rpm` 2. **安装RabbitMQ**: - 下载RabbitMQ RPM包:`wget https://your_download_link/rabbitmq-server-3.8.17-1.el7.noarch.rpm` - ...

    erlang-erts-19.3.6.4-1.el7.x86_64.rpm

    erlang-erts-19.3.6.4-1.el7.x86_64.rpm

    esl-erlang-24.2.1-1-centos-7-amd64.rpm

    esl-erlang_24.2.1-1_centos_7_amd64.rpm

    erlang-xmerl-22.3-1.el7.x86_64.rpm 镜像下载

    erlang-xmerl-22.3-1.el7.x86_64.rpm,rabbitMQ安装需要依赖此环境。Erlang 是一种多用途编程语言,主要用于开发并发和分布式系统。它最初是一种专有的编程语言,Ericsson 使用它来开发电话和通信应用程序。

    esl-erlang_16.b.3-2~centos~6_amd64.rpm

    esl-erlang_16.b.3-2~centos~6_amd64.rpm

    erlang-19.3.6.4-1.el6.x86_64.rpm

    erlang-19.3.6.4-1.el

    esl-erlang-25.0.3-1-centos-7-amd64.rpm erlang-25.0.3-1 rpm

    esl-erlang_25.0.3-1_centos_7_amd64.rpm erlang_25.0.3-1 rpm用于rabitmq基础环境安装

    最新最全rabbitmq与erlang版本匹配-2020-04-23.docx

    **RabbitMQ与Erlang版本匹配指南** RabbitMQ是一种广泛使用的开源消息代理和队列服务器,它基于Erlang编程语言构建。Erlang以其并发能力、容错性和分布式特性而闻名,是实现RabbitMQ的理想选择。正确地匹配RabbitMQ...

    erlang-rpm-21.3.4.zip

    "erlang-rpm-21.3.4.zip"是一个包含Erlang版本21.3.4的RPM(Red Hat Package Manager)包的压缩文件,适用于Linux操作系统,尤其是基于RPM包管理系统的发行版,如CentOS、Fedora和RHEL等。 Erlang的主要特点包括: ...

    erlang-18.3-1.el7.centos.x86_64.zip

    在标题中的"erlang-18.3-1.el7.centos.x86_64.zip",我们看到的是Erlang的一个特定版本,18.3,针对64位的CentOS 7操作系统(el7)的安装包。这个版本的Erlang包含了必要的组件和服务,使得它能够在CentOS 7上稳定...

    erlang-xmerl-22.2-1.el7.x86_64.rpm

    erlang-xmerl-22.2-1.el7.x86_64.rpm,rabbitMQ安装需要依赖此环境。Erlang 是一种多用途编程语言,主要用于开发并发和分布式系统。它最初是一种专有的编程语言,Ericsson 使用它来开发电话和通信应用程序。

    erlang-23.2.1-1.el7.x86-64.rpm

    Erlang:RabbitMQ 是用 Erlang 编写的,因此需要 Erlang 运行时。确保安装了兼容的 Erlang 版本;Erlang:RabbitMQ 是用 Erlang 编写的,因此需要 Erlang 运行时。确保安装了兼容的 Erlang 版本;Erlang:RabbitMQ ...

    erlang-xmerl-23.0.2-2.el7.x86_64.rpm

    erlang-xmerl-23.0.2-2.el7.x86_64.rpm,rabbitMQ安装需要依赖此环境。Erlang 是一种多用途编程语言,主要用于开发并发和分布式系统。它最初是一种专有的编程语言,Ericsson 使用它来开发电话和通信应用程序。

    ErlangB和ErlangC计算工具(exe可执行文件+excel两个)

    Erlang B和Erlang C是电信领域中两种重要的流量模型,用于预测和分析通信系统中的呼叫处理能力和拥塞情况。这两个模型由丹麦工程师Agner Krarup Erlang在20世纪初提出,至今仍广泛应用于现代通信网络的设计与优化。 ...

    最新最全rabbitmq与erlang版本匹配-2020-04-23.ods

    最新最全rabbitmq与erlang版本匹配-2020-04-23

    erlang_版本24.3.4.4

    Erlang是一种面向并发的、函数式编程语言,由瑞典电信设备制造商Ericsson开发,主要用于构建高可用性、分布式和实时系统。版本24.3.4.4是Erlang的一个更新版本,包含了对先前版本的改进和修复。Erlang以其强大的错误...

Global site tag (gtag.js) - Google Analytics