Tim Bray's Erlang Exercise on Large Dataset Processing


Updated Oct 10: pread_file/5 should open a new File process each cycle
Updated Oct 05: Wrote a new version, which is more parallelized.
Updated Sep 27: Per Gustafsson gave a better solution; it took 5.155 seconds on my computer with all record matching done, and getting all lines ready without record matching took only 1.58 seconds.
Updated Sep 27: The best result on my computer is 5.188 seconds after adding -compile([native]), with: tbray:start(20, "o1000k.ap").

Tim's Wide Finder Project tried Erlang on a large log file (around 200MB):

There, the Erlang code took more than 30 seconds to fetch the matched records.

I'm very interested in Erlang's efficiency at processing large datasets, so I ran several tests and drew some conclusions.

First, file I/O itself in Erlang is reasonably fast: reading a 200MB file into memory with file:read_file/1 took less than 800ms on my computer.
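
You can check this yourself in the Erlang shell; timer:tc/3 returns {Microseconds, Result}:

> timer:tc(file, read_file, ["o1000k.ap"]).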

But to process the dataset, you cannot avoid traversing the whole thing: finding newline marks, matching the expression, and so on.

I wrote a piece of code that traverses the whole dataset in binary form, doing nothing more than counting it byte by byte. It took about 35 seconds for a 200MB file, so traversing a binary byte by byte seems to be the major cost.

Thomas gave some hints on how to traverse a binary efficiently here.

And Björn from the OTP team gave some hints too.

Yes, the most efficient data structure in Erlang for traversing a dataset byte by byte is the list.

Let's run some tests:

travel_bin(Bin) -> travel_bin(Bin, 0). 
travel_bin(<<>>, ByteCount) -> ByteCount;
travel_bin(<<$\n, Rest/binary>>, ByteCount) ->
    travel_bin(Rest, ByteCount + 1); 
travel_bin(<<_C, Rest/binary>>, ByteCount) ->
    travel_bin(Rest, ByteCount + 1). 

travel_list(List) -> travel_list(List, 0). 
travel_list([], CharCount) -> CharCount;
travel_list([$\n|Rest], CharCount) -> 
    travel_list(Rest, CharCount + 1);
travel_list([_C|Rest], CharCount) ->
    travel_list(Rest, CharCount + 1). 

Applied to a 20MB file, we got:

> {ok, Bin} = file:read_file("o100k.ap").
{ok,<<"host-24-225-218-245.patmedia.net - - [01/Oct/2006:06:33:45 -0700] \"GET /ongoing/ongoing.atom HTTP/1.1\" 304 -"...>>}
> timer:tc(tbray, travel_bin, [Bin]).
{2787402,20099550}
> timer:tc(tbray, travel_list, [binary_to_list(Bin)]).
{370906,20099550}

(Updated Oct 7: The statements about travel_list below are not quite accurate; the travel_list test actually did not include the time taken by binary_to_list. The story of "binary vs. list in Erlang" is a bit complex, and the results vary a lot with data size. I'll post another article about it.)

Here travel_bin took about 2787.402ms, and travel_list about 370.906ms (including the time spent applying binary_to_list).

A pretty good result for travel_list, which cost only about 13% of travel_bin's time.

But a list eats far more memory than a binary: each list cell costs two machine words per character, versus one byte per character in a binary. When you apply the above code to a 200MB file, the scene changes a lot:

> f(Bin).                                             
ok
> {ok, Bin} = file:read_file("o1000k.ap").
{ok,<<"host-24-225-218-245.patmedia.net - - [01/Oct/2006:06:33:45 -0700] \"GET /ongoing/ongoing.atom HTTP/1.1\" 304 -"...>>}
> timer:tc(tbray, travel_bin, [Bin]).                 
{35414374,200995500}
> timer:tc(tbray, travel_list, [binary_to_list(Bin)]).
beam.smp(25965,0x1806400) malloc: *** vm_allocate(size=1782579200) failed (error code=3) ...

Here the size of o1000k.ap is about 200MB: travel_bin took 35 seconds, and travel_list crashed. The failed allocation of about 1.7GB is roughly what you would expect, since on a 32-bit VM a list cell costs two 4-byte words per character, turning a 200MB file into roughly 1.6GB of list.

How about splitting the large binary into pieces, converting them to lists, and traversing those?

I tried it, and it's a bit tricky. The trick is the buffer size. At first I split the binary into pieces of 1024 * 1024 bytes, and the performance was even worse; I almost gave up. But I kept trying, and when I adjusted the buffer size to 4096 bytes, this solution shone.
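
As an illustration of the idea, here is a minimal sketch (a hypothetical travel_chunked helper, not the final code), reusing travel_list from above:

%% Traverse a large binary in 4096-byte chunks, converting each chunk
%% to a list before walking it, so the whole list never exists at once.
travel_chunked(Bin) -> travel_chunked(Bin, 0).

travel_chunked(<<Buf:4096/binary, Rest/binary>>, Count) ->
    travel_chunked(Rest, Count + travel_list(binary_to_list(Buf)));
travel_chunked(Bin, Count) ->
    Count + travel_list(binary_to_list(Bin)).

Note that a chunk boundary can fall in the middle of a line; for byte counting that is harmless, and the full solution below carries the partial line across chunks in its lineBuf.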

And finally, with a parallel file reader, I got an answer to Tim's exercise, plus simple expression matching: for a 1-million-line file (200MB), it takes 8.311 seconds with -smp enable, and 10.206 seconds with smp disabled.

My computer is a 2.0GHz 2-core MacBook; I'd like to see a result on a machine with more cores :-)

The code:

-module(tbray).

-compile([native]).

-export([start/2,
         collect_loop/2,
         buffered_read/3]).

-include_lib("kernel/include/file.hrl").

%% The best BUFFER_SIZE is 4096
-define(BUFFER_SIZE, 4096).

-record(context, {lineBuf = [], 
                  matched = 0,
                  total = 0,
                  lastProcessedSeq = 0,
                  dataBuf = [],
                  processNum}).

%% erl -smp enable +P 60000
%% timer:tc(tbray, start, [1000, "o1000k.ap"]).
start(ProcessNum, FileName) ->
    statistics(wall_clock),  
    {ok, FileInfo} = file:read_file_info(FileName), 
    Size = FileInfo#file_info.size,
    
    Collect = spawn(?MODULE, collect_loop, [self(), #context{processNum = ProcessNum}]),

    psplit_read_file(Collect, FileName, Size div ProcessNum, ProcessNum, 1),
    
    {Matched, Total} =
      receive
          #context{matched=MatchedX, total=TotalX} -> {MatchedX, TotalX}
      end,

    {_, Duration2} = statistics(wall_clock),
    io:format("scan lines:\t ~pms~nMatched: ~B, Total: ~B~n", [Duration2, Matched, Total]). 

psplit_read_file(_Collector, _FileName, _ChunkSize, ProcessNum, I) when I > ProcessNum -> done;
psplit_read_file(Collector, FileName, ChunkSize, ProcessNum, I) ->
    spawn(
      fun () ->
              Offset = ChunkSize * (I - 1),
              %% if it's the last chunk, read all bytes left, which will not exceed ChunkSize * 2
              Length = if  I == ProcessNum -> ChunkSize * 2;
                           true -> ChunkSize
                       end,
              {ok, File} = file:open(FileName, [read, binary]),
              {ok, Data} = file:pread(File, Offset, Length),
              file:close(File),
              Collector ! {I, Data}
      end),
    psplit_read_file(Collector, FileName, ChunkSize, ProcessNum, I + 1).

collect_loop(Pid, #context{lastProcessedSeq= ProcessNum,
                           processNum=ProcessNum}=Context) -> Pid ! Context;
collect_loop(Pid, #context{dataBuf=DataBuf}=Context) ->
    receive
        {Seq, Data} ->
            SortedDatas = lists:keysort(1, [{Seq, Data} | DataBuf]),
            Context1 = process_arrived_datas(SortedDatas, Context#context{dataBuf = []}),
            %io:format("Last processed Seq: ~B~n", [Context1#context.lastProcessedSeq]),
            collect_loop(Pid, Context1)
    end.

process_arrived_datas([], Context) -> Context;
process_arrived_datas([{Seq, Data}|T], #context{lineBuf=LineBuf,
                                                matched=Matched,
                                                total=Total,
                                                lastProcessedSeq=LastProcessedSeq, 
                                                dataBuf=DataBuf}=Context) ->
    if  Seq == LastProcessedSeq + 1 ->
            {LineBuf1, Matched1, Total1} = buffered_read(
              fun (Buffer, {LineBufX, MatchedX, TotalX}) ->
                      scan_line(binary_to_list(Buffer), LineBufX, MatchedX, TotalX)
              end, {LineBuf, Matched, Total}, Data),
            process_arrived_datas(T, Context#context{lineBuf = LineBuf1,
                                                     matched = Matched1, 
                                                     total = Total1,
                                                     lastProcessedSeq = Seq});
        true -> 
            process_arrived_datas(T, Context#context{dataBuf = [{Seq, Data} | DataBuf]})
    end.

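%% Fold Fun over Bin in ?BUFFER_SIZE-byte chunks; the final remainder
%% (shorter than ?BUFFER_SIZE) is passed to Fun as-is.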
buffered_read(Fun, Acc, Bin) ->
    case Bin of
        <<Buf:?BUFFER_SIZE/binary, Rest/binary>> ->
            Acc1 = Fun(Buf, Acc),
            buffered_read(Fun, Acc1, Rest);
        _ ->
            Fun(Bin, Acc)
    end.
    
scan_line([], LineBuf, Matched, Total) -> {LineBuf, Matched, Total};
scan_line([$\n|Rest], LineBuf, Matched, Total) -> 
    Line1 = lists:reverse(LineBuf),
    %io:format("~n~s~n", [Line1]),
    Matched1 = Matched + process_match(Line1),
    scan_line(Rest, [], Matched1, Total + 1);
scan_line([C|Rest], LineBuf, Matched, Total) ->
    scan_line(Rest, [C | LineBuf], Matched, Total).

%% Count a line as matched when it contains "GET /ongoing/When/"
%% with no '.' before the next space.
process_match([]) -> 0;
process_match("GET /ongoing/When/" ++ Rest) ->
    case match_until_space(Rest, false) of
        true  -> 0;
        false -> 1
    end;
process_match([_H|Rest]) -> 
    process_match(Rest).
    
%% Returns true if a '.' occurs before the next space.
match_until_space([], Bool) -> Bool;
match_until_space([$\040|_Rest], Bool) -> Bool;
match_until_space([$.|_Rest], _Bool) -> true;
match_until_space([_H|Rest], Bool) ->
    match_until_space(Rest, Bool).

Some hints:

The solution spawns many processes to read the file into binaries in parallel, and sends the chunks to collect_loop. When the chunks arrive in order, collect_loop runs buffered_read on each one; buffered_read converts the binary into small lists (4096 bytes here), scan_line merges those into lines, and process_match is run on each line.
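
To try the whole thing from a shell (the flags and arguments are the ones used above):

$ erl -smp enable +P 60000
> c(tbray).
> timer:tc(tbray, start, [20, "o1000k.ap"]).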

As I mentioned before, handling a short string line in Erlang is fast, so I do not fork process_match out into separate processes.

The code can handle very large files.

The matching code may not be completely correct, and it does not finish all the tasks Tim wants.
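
For reference, a matcher closer to Tim's actual pattern (GET /ongoing/When/\d\d\dx/...) might look like the following hypothetical sketch, reusing match_until_space; it is untested against the full task:

%% Hypothetical: also require the \d\d\dx/ path component after
%% "GET /ongoing/When/", as Tim's regex does.
process_match2([]) -> 0;
process_match2("GET /ongoing/When/" ++ Rest) ->
    case Rest of
        [D1,D2,D3,$x,$/|Tail] when D1 >= $0, D1 =< $9,
                                   D2 >= $0, D2 =< $9,
                                   D3 >= $0, D3 =< $9 ->
            case match_until_space(Tail, false) of
                true  -> 0;   %% a '.' before the next space: not an article
                false -> 1
            end;
        _ -> process_match2(Rest)
    end;
process_match2([_H|Rest]) ->
    process_match2(Rest).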
