Playing with Tim's Erlang Exercise is so much fun.
I've been coding in Erlang about 6 months as a newbie, in most cases, I do parsing on string (or list what ever) with no need of regular expressions, since Erlang's pattern match can usaully solve most problems straightforward.
Tim's log file is also a good example for applying pattern match in Erlang way. It's a continuous stream of dataset, after splitting it to line-bounded chunks for parallellization purpose, we can truely match whole {GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) } directly on chunk with no need to split to lines any more.
This come out my third solution, which matchs whole
{GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) }
likeness using the pattern:
"GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest]
and then fetchs
[Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/] ++ match_until_space_newline(Rest, [])
as the matched key, with no need to split the chunk to lines.
But yes, we still need to split each chunk on the lastest newline to get parallelized result exactly accurate.
On my 2-core 2 GHz MacBook, the best time I’ve got is 4.483 sec
# smp enabled: $ erlc -smp tbray3.erl $ time erl -smp +P 60000 -noshell -run tbray3 start o1000k.ap -s erlang halt 8900 : <<"2006/09/29/Dynamic-IDE">> 2000 : <<"2006/07/28/Open-Data">> 1300 : <<"2003/07/25/NotGaming">> 800 : <<"2003/10/16/Debbie">> 800 : <<"2003/09/18/NXML">> 800 : <<"2006/01/31/Data-Protection">> 700 : <<"2003/06/23/SamsPie">> 600 : <<"2006/09/11/Making-Markup">> 600 : <<"2003/02/04/Construction">> 600 : <<"2005/11/03/Cars-and-Office-Suites">> Time: 4142.83 ms real 0m4.483s user 0m5.804s sys 0m0.615s # no-smp: $ erlc tbray3.erl $ time erl -noshell -run tbray_list_no_line start o1000k.ap -s erlang halt real 0m7.050s user 0m6.183s sys 0m0.644s
The smp enable result speedup about 57%
On the 2.80GHz 4-cpu xeon debian box that I mentioned before in previous blog, the best result is:
real 0m8.420s user 0m11.637s sys 0m0.452s
And I've noticed, adjusting the BUFFER_SIZE can balance the time consumered by parallelized parts and un-parallelized parts. That is, if the number of core is increased, we can also increase the BUFFER_SIZE a bit, so the number of chunks decreased (less un-parallelized split_on_last_new_line/1 and file:pread/3) but with more heavy work for parallelized binary_to_list/1 and scan_chunk/1 on longer list.
The best BUFFER_SIZE on my computer is 4096 * 5 bytes, which causes un-parallized split_on_last_newline/1 took about only 0.226s in the case.
The code:
-module(tbray3). -compile([native]). -export([start/1]). %% The best Bin Buffer Size is 4096 * 1 - 4096 * 5 -define(BUFFER_SIZE, (4096 * 5)). start(FileName) -> Start = now(), Main = self(), Collector = spawn(fun () -> collect_loop(Main) end), {ok, File} = file:open(FileName, [raw, binary]), read_file(File, Collector), %% don't terminate, wait here, until all tasks done. receive stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000]) end. read_file(File, Collector) -> read_file_1(File, [], 0, Collector). read_file_1(File, PrevTail, I, Collector) -> case file:read(File, ?BUFFER_SIZE) of eof -> Collector ! {chunk_num, I}, file:close(File); {ok, Bin} -> {Chunk, NextTail} = split_on_last_newline(PrevTail ++ binary_to_list(Bin)), spawn(fun () -> Collector ! {dict, scan_chunk(Chunk)} end), read_file_1(File, NextTail, I + 1, Collector) end. split_on_last_newline(List) -> split_on_last_newline_1(lists:reverse(List), []). split_on_last_newline_1(List, Tail) -> case List of [] -> {lists:reverse(List), []}; [$\n|Rest] -> {lists:reverse(Rest), Tail}; [C|Rest] -> split_on_last_newline_1(Rest, [C | Tail]) end. collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0). collect_loop_1(Main, Dict, ChunkNum, ChunkNum) -> print_result(Dict), Main ! stop; collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) -> receive {chunk_num, ChunkNumX} -> collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum); {dict, DictX} -> Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX), collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1) end. print_result(Dict) -> SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))), [io:format("~b\t: ~p~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)]. scan_chunk(List) -> scan_chunk_1(List, dict:new()). scan_chunk_1(List, Dict) -> case List of [] -> Dict; "GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest] -> case match_until_space_newline(Rest, []) of {Rest1, []} -> scan_chunk_1(Rest1, Dict); {Rest1, Word} -> Key = list_to_binary([Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word]), scan_chunk_1(Rest1, dict:update_counter(Key, 1, Dict)) end; [_|Rest] -> scan_chunk_1(Rest, Dict) end. match_until_space_newline(List, Word) -> case List of [] -> {[], []}; [10|_] -> {List, []}; [$.|_] -> {List, []}; [$ |_] -> {List, lists:reverse(Word)}; [C|Rest] -> match_until_space_newline(Rest, [C | Word]) end.
I also wrote another corresponding binary version, which is 2-3 times slower than above list version on my machine, but the result may vary depending on your compiled Erlang/OTP on various operation system. I will test it again when Erlang/OTP R12B is released, which is claimed to have been optimized for binary match performance.
The code:
-module(tbray3_bin). -compile([native]). -export([start/1]). -define(BUFFER_SIZE, (4096 * 10000)). start(FileName) -> Start = now(), Main = self(), Collector = spawn(fun () -> collect_loop(Main) end), {ok, File} = file:open(FileName, [raw, binary]), read_file(File, Collector), %% don't terminate, wait here, until all tasks done. receive stop -> io:format("Time: ~p ms~n", [timer:now_diff(now(), Start) / 1000]) end. collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0). collect_loop_1(Main, Dict, ChunkNum, ChunkNum) -> print_result(Dict), Main ! stop; collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) -> receive {chunk_num, ChunkNumX} -> collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum); {dict, DictX} -> Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX), collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1) end. print_result(Dict) -> SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))), [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)]. read_file(File, Collector) -> read_file(File, <<>>, 0, Collector). read_file(File, PrevTail, I, Collector) -> case file:read(File, ?BUFFER_SIZE) of eof -> file:close(File), Collector ! {chunk_num, I}; {ok, Bin} -> {Data, NextTail} = split_on_last_newline(Bin), spawn(fun () -> Collector ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end), read_file(File, NextTail, I + 1, Collector) end. split_on_last_newline(Bin) -> split_on_last_newline(Bin, size(Bin)). split_on_last_newline(Bin, Offset) -> case Bin of <<Data:Offset/binary,$\n,Tail/binary>> -> {Data, Tail}; _ when Offset =< 0 -> {Bin, <<>>}; _ -> split_on_last_newline(Bin, Offset - 1) end. scan_chunk(Bin) -> scan_chunk_1(Bin, 0, dict:new()). scan_chunk_1(Bin, Offset, Dict) when Offset < size(Bin) - 34 -> case Bin of <<_:Offset/binary,"GET /ongoing/When/",_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,Rest/binary>> -> case match_until_space_newline(Rest, 0) of {Rest1, <<>>} -> scan_chunk_1(Rest1, 0, Dict); {Rest1, Word} -> Key = <<Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word/binary>>, scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict)) end; _ -> scan_chunk_1(Bin, Offset + 1, Dict) end; scan_chunk_1(_, _, Dict) -> Dict. match_until_space_newline(Bin, Offset) -> case Bin of <<Word:Offset/binary,$ ,Rest/binary>> -> {Rest, Word}; <<_:Offset/binary,$.,Rest/binary>> -> {Rest, <<>>}; <<_:Offset/binary,10,Rest/binary>> -> {Rest, <<>>}; <<_:Offset/binary,_,_/binary>> -> match_until_space_newline(Bin, Offset + 1); _ -> {<<>>, <<>>} end.
