The Erlang Way (Was Tim Bray's Erlang Exercise - Round IV)
Playing with Tim's Erlang Exercise is so much fun.
I've been coding in Erlang for about 6 months, so I'm still a newbie. In most cases I parse strings (or lists, whatever) without any regular expressions, since Erlang's pattern matching can usually solve the problem in a straightforward way.
Tim's log file is also a good example of applying pattern matching the Erlang way. It's a continuous stream of data; after splitting it into line-bounded chunks for parallelization, we can match the whole {GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) } directly on each chunk, with no need to split it into lines any more.
This led to my third solution, which matches everything that looks like
{GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) }
using the pattern:
"GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest]
and then fetches
[Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/] ++ match_until_space_newline(Rest, [])
as the matched key, with no need to split the chunk into lines.
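To see the pattern at work on a single log line, here is a minimal sketch. The module name match_demo and the helpers extract_key/1 and take_word/2 are hypothetical names made up for this illustration; they are not part of tbray3. Note that, unlike \d in the regular expression, the _ and variable positions in the list pattern accept any character, not only digits.

```erlang
-module(match_demo).
-export([extract_key/1]).

%% Hypothetical helper for illustration only: returns {ok, Key} for the
%% first article fetch found in Line, or nomatch if there is none.
extract_key([]) ->
    nomatch;
extract_key("GET /ongoing/When/" ++
            [_, _, _, $x, $/, Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/ | Rest]) ->
    case take_word(Rest, []) of
        nomatch -> extract_key(Rest);
        Word -> {ok, [Y1, Y2, Y3, Y4, $/, M1, M2, $/, D1, D2, $/ | Word]}
    end;
extract_key([_ | Rest]) ->
    %% No match at this position: skip one character and try again.
    extract_key(Rest).

%% Collect characters up to the next space. A '.', a newline, the end of
%% the input or an empty word means this is not an article fetch.
take_word([$\s | _], Acc) when Acc =/= [] ->
    lists:reverse(Acc);
take_word([C | Rest], Acc) when C =/= $., C =/= $\n, C =/= $\s ->
    take_word(Rest, [C | Acc]);
take_word(_, _) ->
    nomatch.
```

Calling match_demo:extract_key/1 on a line that contains "GET /ongoing/When/200x/2006/09/29/Dynamic-IDE " yields {ok, "2006/09/29/Dynamic-IDE"}, while a fetch of a file such as pic.png under the same path yields nomatch because of the dot.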
But yes, we still need to split each chunk on its last newline so that the parallelized result is exactly accurate.
On my 2-core 2 GHz MacBook, the best time I’ve got is 4.483 sec
    # smp enabled:
    $ erlc -smp tbray3.erl
    $ time erl -smp +P 60000 -noshell -run tbray3 start o1000k.ap -s erlang halt
    8900 : <<"2006/09/29/Dynamic-IDE">>
    2000 : <<"2006/07/28/Open-Data">>
    1300 : <<"2003/07/25/NotGaming">>
    800 : <<"2003/10/16/Debbie">>
    800 : <<"2003/09/18/NXML">>
    800 : <<"2006/01/31/Data-Protection">>
    700 : <<"2003/06/23/SamsPie">>
    600 : <<"2006/09/11/Making-Markup">>
    600 : <<"2003/02/04/Construction">>
    600 : <<"2005/11/03/Cars-and-Office-Suites">>
    Time: 4142.83 ms

    real 0m4.483s
    user 0m5.804s
    sys 0m0.615s

    # no-smp:
    $ erlc tbray3.erl
    $ time erl -noshell -run tbray_list_no_line start o1000k.ap -s erlang halt

    real 0m7.050s
    user 0m6.183s
    sys 0m0.644s
With SMP enabled, the speedup is about 57%: (7.050 - 4.483) / 4.483 ≈ 0.57.
On the 2.80 GHz 4-CPU Xeon Debian box that I mentioned in my previous blog post, the best result is:

    real 0m8.420s
    user 0m11.637s
    sys 0m0.452s
I've also noticed that adjusting BUFFER_SIZE can balance the time consumed by the parallelized and un-parallelized parts. That is, as the number of cores increases, we can also increase BUFFER_SIZE a bit, so that the number of chunks decreases (less un-parallelized split_on_last_newline/1 and file:pread/3 work), at the cost of heavier work for the parallelized binary_to_list/1 and scan_chunk/1 on longer lists.
The best BUFFER_SIZE on my computer is 4096 * 5 bytes, with which the un-parallelized split_on_last_newline/1 took only about 0.226s in this case.
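The trade-off can be made concrete by counting chunks. The sketch below is only an illustration: the module chunk_demo and the function chunk_count/2 are hypothetical names, and the 200,000,000-byte file size used in the usage note is an assumed figure, not the real size of o1000k.ap.

```erlang
-module(chunk_demo).
-export([chunk_count/2]).

%% Number of reads, and therefore of spawned scan processes, needed to
%% cover FileSize bytes when each read asks for BufferSize bytes.
%% This is a ceiling division.
chunk_count(FileSize, BufferSize) when FileSize >= 0, BufferSize > 0 ->
    (FileSize + BufferSize - 1) div BufferSize.
```

For the assumed 200,000,000-byte file, chunk_demo:chunk_count(200000000, 4096) gives 48829 chunks, while chunk_demo:chunk_count(200000000, 4096 * 5) gives 9766. A five times larger buffer means roughly five times fewer sequential split and read steps, but each spawned process then scans a list five times longer.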
The code:
    -module(tbray3).
    -compile([native]).
    -export([start/1]).

    %% The best Bin Buffer Size is 4096 * 1 - 4096 * 5
    -define(BUFFER_SIZE, (4096 * 5)).

    start(FileName) ->
        Start = now(),
        Main = self(),
        Collector = spawn(fun () -> collect_loop(Main) end),
        {ok, File} = file:open(FileName, [raw, binary]),
        read_file(File, Collector),
        %% don't terminate, wait here, until all tasks done.
        receive
            stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
        end.

    read_file(File, Collector) -> read_file_1(File, [], 0, Collector).

    %% Read the file buffer by buffer; each line-bounded chunk is scanned
    %% in its own process, and the unfinished last line is carried over.
    read_file_1(File, PrevTail, I, Collector) ->
        case file:read(File, ?BUFFER_SIZE) of
            eof ->
                Collector ! {chunk_num, I},
                file:close(File);
            {ok, Bin} ->
                {Chunk, NextTail} = split_on_last_newline(PrevTail ++ binary_to_list(Bin)),
                spawn(fun () -> Collector ! {dict, scan_chunk(Chunk)} end),
                read_file_1(File, NextTail, I + 1, Collector)
        end.

    split_on_last_newline(List) -> split_on_last_newline_1(lists:reverse(List), []).

    split_on_last_newline_1(List, Tail) ->
        case List of
            %% No newline in this chunk: carry everything over as the tail
            %% (the original returned {[], []} here, which dropped the data).
            [] -> {[], Tail};
            [$\n|Rest] -> {lists:reverse(Rest), Tail};
            [C|Rest] -> split_on_last_newline_1(Rest, [C | Tail])
        end.

    collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).

    %% Done once the number of merged dicts equals the announced chunk count.
    collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
        print_result(Dict),
        Main ! stop;
    collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
        receive
            {chunk_num, ChunkNumX} ->
                collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
            {dict, DictX} ->
                Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
                collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
        end.

    %% Print the ten most frequently fetched keys.
    print_result(Dict) ->
        SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
        [io:format("~b\t: ~p~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

    scan_chunk(List) -> scan_chunk_1(List, dict:new()).

    scan_chunk_1(List, Dict) ->
        case List of
            [] -> Dict;
            "GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest] ->
                case match_until_space_newline(Rest, []) of
                    {Rest1, []} ->
                        scan_chunk_1(Rest1, Dict);
                    {Rest1, Word} ->
                        Key = list_to_binary([Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word]),
                        scan_chunk_1(Rest1, dict:update_counter(Key, 1, Dict))
                end;
            [_|Rest] -> scan_chunk_1(Rest, Dict)
        end.

    %% Collect the word up to the next space; a newline or a '.' means
    %% this request is not an article fetch, so the word is discarded.
    match_until_space_newline(List, Word) ->
        case List of
            [] -> {[], []};
            [10|_] -> {List, []};
            [$.|_] -> {List, []};
            [$ |_] -> {List, lists:reverse(Word)};
            [C|Rest] -> match_until_space_newline(Rest, [C | Word])
        end.
I also wrote a corresponding binary version, which is 2-3 times slower than the list version above on my machine, but the result may vary depending on how your Erlang/OTP was compiled and on your operating system. I will test it again when Erlang/OTP R12B is released, which is claimed to have been optimized for binary matching performance.
    -module(tbray3_bin).
    -compile([native]).
    -export([start/1]).

    -define(BUFFER_SIZE, (4096 * 10000)).

    start(FileName) ->
        Start = now(),
        Main = self(),
        Collector = spawn(fun () -> collect_loop(Main) end),
        {ok, File} = file:open(FileName, [raw, binary]),
        read_file(File, Collector),
        %% don't terminate, wait here, until all tasks done.
        receive
            stop -> io:format("Time: ~p ms~n", [timer:now_diff(now(), Start) / 1000])
        end.

    collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).

    collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
        print_result(Dict),
        Main ! stop;
    collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
        receive
            {chunk_num, ChunkNumX} ->
                collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
            {dict, DictX} ->
                Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
                collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
        end.

    print_result(Dict) ->
        SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
        [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

    read_file(File, Collector) -> read_file_1(File, <<>>, 0, Collector).

    read_file_1(File, PrevTail, I, Collector) ->
        case file:read(File, ?BUFFER_SIZE) of
            eof ->
                file:close(File),
                Collector ! {chunk_num, I};
            {ok, Bin} ->
                {Data, NextTail} = split_on_last_newline(Bin),
                spawn(fun () -> Collector ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
                read_file_1(File, NextTail, I + 1, Collector)
        end.

    split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, size(Bin)).

    split_on_last_newline_1(Bin, Offset) when Offset > 0 ->
        case Bin of
            <<Data:Offset/binary,$\n,Tail/binary>> ->
                {Data, Tail};
            _ ->
                split_on_last_newline_1(Bin, Offset - 1)
        end;
    split_on_last_newline_1(Bin, _) -> {Bin, <<>>}.

    scan_chunk(Bin) -> scan_chunk_1(Bin, 0, dict:new()).

    scan_chunk_1(Bin, Offset, Dict) when Offset < size(Bin) - 34 ->
        case Bin of
            <<_:Offset/binary,"GET /ongoing/When/",_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,Rest/binary>> ->
                case match_until_space_newline(Rest, 0) of
                    {Rest1, <<>>} ->
                        scan_chunk_1(Rest1, 0, Dict);
                    {Rest1, Word} ->
                        Key = <<Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word/binary>>,
                        scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
                end;
            _ -> scan_chunk_1(Bin, Offset + 1, Dict)
        end;
    scan_chunk_1(_, _, Dict) -> Dict.

    match_until_space_newline(Bin, Offset) when Offset < size(Bin) ->
        case Bin of
            <<Word:Offset/binary,$ ,Rest/binary>> ->
                {Rest, Word};
            <<_:Offset/binary,$.,Rest/binary>> ->
                {Rest, <<>>};
            <<_:Offset/binary,10,Rest/binary>> ->
                {Rest, <<>>};
            _ ->
                match_until_space_newline(Bin, Offset + 1)
        end;
    match_until_space_newline(_, _) -> {<<>>, <<>>}.
Comments
You should be aware that it's not a 158% speed increase between smp and non-smp. It's just 58%, or 0,57919930374238468233246301131419 to be more exact.
(7.258 - 4.596) / 4.596
I mean, how can you have a 158% speed increase by using another core? That would send some signals that Erlang has some severe flaws in its VM. An increase above 100% from enabling smp is basically not possible. You should get at most 90% on common operating systems. Of course you can use CPU affinity and such to dedicate processors, but still. Something above 100% would be weird.
Best regards
Viktor Müntzing
Hi Caoyuan, I posted this same comment over on my blog, in response to your comment there. I grabbed your code from above and indeed, it is fast on my Macbook Pro, where the best time I saw was 5.063 sec as measured by the bash time command. However, if I run your code on my 8-core Linux box, it actually gets slower. I ran it in a loop 10 times, and the best time I saw was 13.872 sec, and user/CPU time was only 16.150 sec, so it’s apparently not using the multiple cores very well. Maybe I’ll poke through it to find out why.
You are right, I've corrected it :-)
Steve,
I tried to resolve the many-CPU issue. It seems to be related to file reading, so I wrote a new version:
http://blogtrader.net/page/dcaoyuan?entry=learning_coding_parallelization_was_tim