Sunday Feb 24, 2008

Erlang for NetBeans (ErlyBird) Recent Updates

Erlang for NetBeans (ErlyBird) has been put on the NetBeans' mercurial trunk. I added a changelog page on http://wiki.netbeans.org/ErlangChangelog, where you can learn the latest progressing.

Here is a summary of recent updates:

20080223

  • Instant rename, or refactoring for local vars and functions (put caret on var or function name, press CTRL+R)
  • Fixed: syntax broken for packaged import attribute
  • Fixed: syntax broken for wild attribute
  • Completion suggestions will search on project's own paths only
  • Track GSF changes: reindex performance was improved a lot; Can live with other GSF based language support now (Ruby, Groovy etc)

ErlyBird 0.16.0 will be available soon.

Saturday Jan 19, 2008

On Travel

I'm in Vancouver Airport right now, using the new free Wi-Fi service here. After four-day trip to San Francisco, I'll fly to China for the traditional Spring Festival.

I met friends in San Francisco, we are developing something using Erlang as I mentioned before. What we' are building is somehow a "switch" for content, we've successfully got a lot of different content sources to be "switched" to standard Atom/Rss etc, with a new designed template language (based in Erlang). Actually It's a pleasure to write these switching code in Erlang (Parse, Map, Mashup) with the pattern match syntax and xmerl lib.

Tuesday Nov 13, 2007

Regexp Syntax in Pattern Match?

Pattern match in Erlang is very useful, but it has some limits, for example, to match something like:

\d\d\dx/\d\d\d\d/\d\d/\d\d/

I have to write the code as:

<<_:S/binary, C1,C2,C3,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,_/binary>> when
     C1 > 47, C1 < 58, C2 > 47, C2 < 58, C3 > 47, C3 < 58,
     Y1 > 47, Y1 < 58, Y2 > 47, Y2 < 58, Y3 > 47, Y3 < 58, Y4 > 47, Y4 < 58,
     M1 > 47, M1 < 58, M2 > 47, M2 < 58, D1 > 47, D2 < 58, D2 > 47, D2 < 58

But life can be simple, by using parse_transform, we can write above code to:

<<_:S/binary,d,d,d,$x,$/,d,d,d,d,$/,d,d,$/,d,d,$/,_/binary>>

Where d is digital. The parse_trasnform can process the AST tree to make the true code.

And more, since current erl_parse supports any atom in binary skeleton, we can write pattern match as:

<<_:S/binary,'[a-z]','[^abc]','var[0-9]',$x,$/,d,d,d,d,$/,d,d,$/,d,d,$/,_/binary>>

Will somebody write such a parse_transform?

Monday Nov 05, 2007

Tim Bray's Erlang Exercise "WideFinder" - After Find Wide

===>>> Updated Nov 7:
A new version tbray9a.erl which uses ets instead of dict, with 116 LoC, took about 3.8 sec on 5 million lines log file, 0.93 sec on 1 million lines file on my 4-core linux box.

* Results on T5120, an 8-core 1.4 GHz machine with 2 integer instruction threads per core and support for 8 thread contexts per core. Solaris thinks it sees 64 CPUs:

Schedulers#Elapsed(s)User(s)System(s)(User+System)/Elapsed
137.5835.517.821.15
220.1435.318.282.16
411.8135.378.253.69
87.6335.288.335.72
165.6036.088.277.92
325.2936.648.118.46
645.4536.798.238.26
1285.2636.758.398.58

When schedulers was 16, (User+System)/Elapsed was 7.92, and the elapsed time reached 5.60 sec (near the best), so, the 8-core computing ability and 2 x 8 =16 integer instruction threads ability almost reached the maxima. The 8 x 8 thread contexts seemed to do less help on gaining more performance improvement.

It seems that T5120 is a box with 8-core parallel computing ability and 16-thread for scheduler?

On my 4-core linux box, the slowest time (1 scheduler) vs the fastest time (128 schedulers) was 6.627/3.763 = 1.76. On this T5120, was 37.58/5.26 = 7.14. So, with the 8-core and 16-integer-instruction-thread combination, the T5120 is pretty good on parallel computing.

An interesting point is, when schedulers increased, Elpased time dropped along with User time and System time keeping almost constancy. This may because I separated the parallelized / sequential part completely in my code.

* Results on 2.80Ghz 4-core Intel xeon linux box (5 million lines log file):

Schedulers#Elapsed(s)User(s)System(s)(User+System)/Elapsed
16.6275.3564.2481.45
24.4866.1763.9362.25
44.2998.9894.1563.06
83.9609.6293.6443.35
163.8269.1013.6963.34
323.8589.0293.8403.34
643.7638.8013.8203.35
1283.9209.1373.9803.35


========

I'm a widefinder these days, and after weeks found wide, I worte another concise and fast widefinder tbray9.erl, which is based on Steve, Anders and my previous code, with Boyer-Moore searching (It seems Python's findall uses this algorithm) and parallelized file reading. It's in 120 LoC (a bit shorter than Fredrik Lundh's wf-6.py), took about 1 sec for 1 million lines log file, and 5.2 sec for 5 million lines on my 4-core linux box. Got 5.29 sec on T5120 per Tim's testing.

To evaluate:

erlc -smp tbray9.erl
erl -smp +A 1024 +h 10240 -noshell -run tbray9 start o1000k.ap

BTW since I use parallelized io heavily, by adding flag +A 1024, the code can get 4.3 sec for 5 million lines log file on my 4-core linux box.

Binary efficiency in Erlang is an interesting topic, except some tips I talked about in previouse blog, it seems also depending on binary size, memory size etc., The best buffer size for my code seems to be around 20000k to 80000k, which is the best range on my 4-core linux box and T5120, but it may vary for different code.

Note: There is a maximum element size limit of 2^27 - 1 (about 131072k) for binary pattern matching in current Erlang, this would be consistent with using a 32-bit word to store the size value (with 4 of those bits used for a type identifier and 1 bit for a sign indicator) (For this topic, please see Philip Robinson's blog). So, the buffer size can not be great than 131072k.

I. Boyer-Moore searching

Thanks to Steve and Anders, they've given out a concise Boyer-Moore searching algorithm in Erlang, I can modify it a bit to get a general BM searching module for ASCII encoded binary:

%% Boyer-Moore searching on ASCII encoded binary
-module(bmsearch).
-export([compile/1, match/3]).

-record(bmCtx, {pat, len, tab}).

compile(Str) ->
    Len = length(Str),
    Default = dict:from_list([{C, Len} || C <- lists:seq(1, 255)]),
    Dict = set_shifts(Str, Len, 1, Default),
    Tab = list_to_tuple([Pos || {_, Pos} <- lists:sort(dict:to_list(Dict))]),
    #bmCtx{pat = lists:reverse(Str), len = Len, tab = Tab}.

set_shifts([], _, _, Dict) -> Dict;
set_shifts([C|T], StrLen, Pos, Dict) ->
    set_shifts(T, StrLen, Pos + 1, dict:store(C, StrLen - Pos, Dict)).

%% @spec match(Bin, Start, #bmCtx) -> {true, Len} | {false, SkipLen}
match(Bin, S, #bmCtx{pat=Pat, len=Len, tab=Tab}) -> 
    match_1(Bin, S + Len - 1, Pat, Len, Tab, 0).
match_1(Bin, S, [C|T], Len, Tab, Count) ->
    <<_:S/binary, C1, _/binary>> = Bin,
    case C1 of
        C -> 
            match_1(Bin, S - 1, T, Len, Tab, Count + 1);
        _ ->    
            case element(C1, Tab) of
                Len -> {false, Len};
                Shift when Shift =< Count -> {false, 1};
                Shift -> {false, Shift - Count}
            end
    end;
match_1(_, _, [], Len, _, _) -> {true, Len}.

Usage:

> Pattern = bmsearch:compile("is a").
{bmCtx,"a si",4,{4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,4,...}}
> Bin = <<"this is a test">>.
<<"this is a test">>
> bmsearch:match(Bin, 1, Pattern).
{false,1}
> bmsearch:match(Bin, 5, Pattern).
{true, 4}
> bmsearch:match(Bin, 7, Pattern).
{false, 4}

II. Reading file in parallel and scan

To read file in parallel, we should open a new file handle for each process. To resolve the line break bound, we just split each chunk to first line (Head), line-bounded Data, and last line (Tail), and mark Head, Tail with the serial number as {I * 10, Head}, {I * 10 + 1 Tail}, so we can join all pending segments (Head and Tail of each chunk) later in proper order.

scan_file({FileName, Size, I, BmCtx}) ->
    {ok, File} = file:open(FileName, [raw, binary]),
    {ok, Bin} = file:pread(File, Size * I, Size),
    file:close(File),
    HeadL = split_on_first_newline(Bin),
    TailS = split_on_last_newline(Bin),
    DataL = TailS - HeadL,
    <<Head:HeadL/binary, Data:DataL/binary, Tail/binary>> = Bin,
    {scan_chunk({Data, BmCtx}), {I * 10, Head}, {I * 10 + 1, Tail}}.

III. Spawn workers

Luke's spawn_worker are two small functions, they are very useful, stable and good abstract on processing workers:

spawn_worker(Parent, F, A) ->
    erlang:spawn_monitor(fun() -> Parent ! {self(), F(A)} end).

wait_result({Pid, Ref}) ->
    receive
        {'DOWN', Ref, _, _, normal} -> receive {Pid, Result} -> Result end;
        {'DOWN', Ref, _, _, Reason} -> exit(Reason)
    end.

So, I can start a series of workers simply by:

read_file(FileName, Size, ProcN, BmCtx) ->
    [spawn_worker(self(), fun scan_file/1, {FileName, Size, I, BmCtx})        
     || I <- lists:seq(0, ProcN - 1)].

And then collect the results by:

    Results = [wait_result(Worker) || Worker <- read_file(FileName, ?BUFFER_SIZE, ProcN1, BmCtx)],

In this case, returning Results is a list of {Dict, {Seq1, Head}, {Seq2, Tail}}

IV. Concat segments to a binary and scan it

Each chunk is slipt to Head (first line), line-bounded Data, and Tail (last line). The Head and Tail segments are pending for further processing. After all workers finished scanning Data (got a Dict), we can finally sort these pending segments by SeqNum, concat and scan them in main process.

Unzip the results to Dicts and Segs, sort Segs by SeqNum:

    {Dicts, Segs} = lists:foldl(fun ({Dict, Head, Tail}, {Dicts, Segs}) ->
                                        {[Dict | Dicts], [Head, Tail | Segs]}
                                end, {[], []}, Results),
    Segs1 = [Seg || {_, Seg} <- lists:keysort(1, Segs)],    

The sorted Segments is a list of binary, list_to_binary/1 can concat them to one binary efficently, you do not need to care about if it's a deep list, they will be flatten automatically:

    Dict = scan_chunk({list_to_binary(Segs1), BmCtx}),

Conclution

Thinking in parallel is fairly simple in Erlang, right? Most of the code can run in one process, and if you want to spawn them for parallel, you do not need to modify the code too much. In this case, scan_file/4 is sequential, which just return all you want, you can spawn a lot of workers which do scan_file/4 work, then collect the results later. That's all.

Wednesday Oct 24, 2007

Learning Coding Binary (Was Tim's Erlang Exercise - Round VI)

>>> Updated Nov 1:
Tim tested tbray5.erl on T5120, for his 971,538,252 bytes of data in 4,625,236 lines log file, got:

real    0m20.74s
user    3m51.33s
sys     0m8.00s

The result was what I guessed, since the elapsed time of my code was 3 times of Anders' on my machine. I'm glad that Erlang performs linearly on different machines/os.

My code not the fastest. I did not apply Boyer-Moore searching, thus scan_chunk_1/4 has to test/skip binary 1byte by 1byte when not exactly matched. Anyway, this code shows how to code binary efficiently, and demos the performance of traversing binary byte by byte (the performance is not so bad now, right?). And also, it's what I want: a balance between simple, readable and speed.

Another approach for lazy man is something binary pattern match hacking, we can modify scan_chunk_1/4 to:

scan_chunk_1(Bin, DataL, S, Dict) when S < DataL - 34 ->
    Offset = 
      case Bin of
          <<_:S/binary,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> -> 
              34;
          <<_:S/binary,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              35;
          <<_:S/binary,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              36;
          <<_:S/binary,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              37;
          <<_:S/binary,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              38;
          <<_:S/binary,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              39;
          <<_:S/binary,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              40;
          <<_:S/binary,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              41;
          <<_:S/binary,_,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              42;
          <<_:S/binary,_,_,_,_,_,_,_,_,_,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
              43;
          _ -> undefined
      end,
    case Offset of
        undefined -> scan_chunk_1(Bin, DataL, S + 10, Dict);
        _ ->
            case match_until_space_newline(Bin, S + Offset) of
                {true, E} ->
                    Skip = S + Offset - 12, L = E - Skip,
                    <<_:Skip/binary,Key:L/binary,_/binary>> = Bin,
                    scan_chunk_1(Bin, DataL, E + 1, dict:update_counter(Key, 1, Dict));
                {false, E} -> 
                    scan_chunk_1(Bin, DataL, E + 1, Dict)
            end
    end;
scan_chunk_1(_, _, _, Dict) -> Dict.

The elapsed time dropped to 1.424 sec immediatley vs 2.792 sec before, speedup about 100%, on my 4-CPU linux box.

If you are patient, you can copy-paste 100... such lines :-) (in this case, I'd rather to pick Boyer-Moore), and the elapsed time will drop a little bit more, but not much after 10 lines or so.
========

>>> Updated Oct 29:
Pihis updated his WideFinder, applied guildline II, and beat Ruby on his one-core box. And he also modified Anders's code by removing all un-necessary remaining binary bindings (which cause un-necessay sub-binary splitting), then, Steve tested the refined code, and got 0.567s on his famous 8-CPU linux box. Now, we may reach the real Disk/IO bound, should we try parallelized file reading? but, I've tired of more widefinders. BTW, May we have regexped pattern match in syntax level? The End.
========

>>> Updated Oct 26:
Code cleanup
========

Binary usaully is more efficent than List in Erlang.

The memory size of Binary is 3 to 6 words plus Data itself, the Data can be allocated / deallocated in global heap, so the Data can be shared over function calls, shared over processes when do message passing (on the same node), without copying. That's why heap size affects a lot on binary.

The memory size of List is 1 word per element + the size of each element, and List is always copying between function calls, on message passing.

In my previous blogs about Tim's exercise, I suspected the performance of Binary traverse. But, with more advices, experience, it seems binary can work very efficient as an ideal Dataset processing struct.

But, there are some guidelines for efficient binary in Erlang, I'll try to give out here, which I learned from the exercise and experts.

I. Don't split a binary unless the split binaries are what you exactly want

Splitting/combining binaries is expensive, so when you want to get values from a binary at some offsets:

Do

<<_:Offset/binary, C, _/binary>> = Bin,
io:format("Char at Offset: ~p", [C]).

Do Not *

<<_:Offset/binary, C/binary, _/binary>> = Bin,
io:format("Char at Offset: ~p", [C]).

* This may be good in R12B

And when you want to split a binary to get Head or Tail only:

Do

<<Head:Offset/binary,_/binary>> = Bin.

Do Not

{Head, _} = split_binary(Bin, Offset).

II. Calculate the final offsets first, then split it when you've got the exactly offsets

When you traverse binary to test the bytes/bits, calculate and collect the final offsets first, don't split binary (bind named Var to sub-binary) at that time. When you've got all the exactly offsets, split what you want finally:

Do

get_nth_word(Bin, N) ->
    Offsets = calc_word_offsets(Bin, 0, [0]),
    S = element(N, Offsets),
    E = element(N + 1, Offsets),
    L = E - S,
    <<_:S/binary,Word:L/binary,_/binary>> = Bin,
    io:format("nth Word: ~p", [Word]).

calc_word_offsets(Bin, Offset, Acc) when Offset < size(Bin) ->
    case Bin of
        <<_:Offset/binary,$ ,_/binary>> ->
            calc_word_offsets(Bin, Offset + 1, [Offset + 1 | Acc]);
        _ ->
            calc_word_offsets(Bin, Offset + 1, Acc)
    end;
calc_word_offsets(_, _, Acc) -> list_to_tuple(lists:reverse(Acc)).

Bin = <<"This is a binary test">>,
get_nth_word(Bin, 4). %  <<"binary ">>

Do Not

get_nth_word_bad(Bin, N) ->
    Words = split_words(Bin, 0, []),
    Word = element(N, Words),
    io:format("nth Word: ~p", [Word]).

split_words(Bin, Offset, Acc) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> ->
            split_words(Rest, 0, [Word | Acc]);
        <<_:Offset/binary,_,_/binary>> ->
            split_words(Bin, Offset + 1, Acc);
        _ -> list_to_tuple(lists:reverse([Bin | Acc]))
    end.

Bin = <<"This is a binary test">>,
get_nth_word_bad(Bin, 4). %  <<"binary">>

III. Use "+h Size" option or [{min_heap_size, Size}] with spawn_opt

This is very important for binary performance. It's somehow a Key Number for binary performance. With this option set properly, the binary performs very well, otherwise, worse.

IV. Others

  • Don't forget to compile to native by adding "-compile([native])." in your code.
  • Maybe "+A Size" to set the number of threads in async thread pool also helps a bit when do IO.

Practice

Steve and Anders have pushed widefinder in Erlang to 1.1 sec on 8-CPU linux box. Their code took about 1.9 sec on my 4-CPU box. Then, how about a concise version?

According to above guide, based on my previous code and Per's dict code, Luke's spawn_worker, I rewrote a concise and straightforward tbray5.erl (less than 80 LOC), without any extra c-drived modules for Tim's exercise , and got about 2.972 sec for 1 milli lines log file, and 15.695 sec for 5 milli lines, vs no-parallelized Ruby's 4.161 sec and 20.768 sec on my 2.80Ghz 4-CPU Intel Xeon linux box:

BTW, using ets instead of dict is almost the same.

$ erlc -smp tbray5.erl
$ time erl +h 8192 -smp -noshell -run tbray5 start o1000k.ap -s erlang halt

real    0m2.972s
user    0m9.685s
sys     0m0.748s

$ time erl +h 8192 -smp -noshell -run tbray5 start o5000k.ap -s erlang halt

real    0m15.695s
user    0m53.551s
sys     0m4.268s

On 2.0GHz 2-core MacBook (Ruby code took 2.447 sec):

$ time erl +h 8192 -smp -noshell -run tbray5 start o1000k.ap -s erlang halt

real    0m3.034s
user    0m4.853s
sys     0m0.872s

The Code: tbray5.erl

-module(tbray5).
-compile([native]).
-export([start/1]).

-define(BUFFER_SIZE, (1024 * 10000)).

start(FileName) ->
    Dicts = [wait_result(Worker) || Worker <- read_file(FileName)],
    print_result(merge_dicts(Dicts)).
              
read_file(FileName) ->
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, 0, []).            
read_file_1(File, Offset, Workers) ->
    case file:pread(File, Offset, ?BUFFER_SIZE) of
        eof ->
            file:close(File),
            Workers;
        {ok, Bin} ->
            DataL = split_on_last_newline(Bin),
            Worker = spawn_worker(self(), fun scan_chunk/1, {Bin, DataL}),
            read_file_1(File, Offset + DataL + 1, [Worker | Workers])
    end.

split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, size(Bin)).   
split_on_last_newline_1(Bin, S) when S > 0 ->
    case Bin of
        <<_:S/binary,$\n,_/binary>> -> S;
        _ -> split_on_last_newline_1(Bin, S - 1)
    end;
split_on_last_newline_1(_, S) -> S.

scan_chunk({Bin, DataL}) -> scan_chunk_1(Bin, DataL, 0, dict:new()).
scan_chunk_1(Bin, DataL, S, Dict) when S < DataL - 34 ->
    case Bin of
        <<_:S/binary,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
            case match_until_space_newline(Bin, S + 34) of
                {true, E} ->
                    Skip = S + 23, L = E - Skip,
                    <<_:Skip/binary,Key:L/binary,_/binary>> = Bin,
                    scan_chunk_1(Bin, DataL, E + 1, dict:update_counter(Key, 1, Dict));
                {false, E} ->
                    scan_chunk_1(Bin, DataL, E + 1, Dict)
            end;
        _ -> scan_chunk_1(Bin, DataL, S + 1, Dict)
    end;
scan_chunk_1(_, _, _, Dict) -> Dict.

match_until_space_newline(Bin, S) when S < size(Bin) ->
    case Bin of
        <<_:S/binary,10,_/binary>> -> {false, S};
        <<_:S/binary,$.,_/binary>> -> {false, S};
        <<_:S/binary,_,$ ,_/binary>> -> {true, S + 1};
        _ -> match_until_space_newline(Bin, S + 1)
    end;
match_until_space_newline(_, S) -> {false, S}.
    
spawn_worker(Parent, F, A) ->
    erlang:spawn_monitor(fun() -> Parent ! {self(), F(A)} end).

wait_result({Pid, Ref}) ->
    receive
        {'DOWN', Ref, _, _, normal} -> receive {Pid, Result} -> Result end;
        {'DOWN', Ref, _, _, Reason} -> exit(Reason)
    end.
    
merge_dicts([D1,D2|Rest]) ->
    merge_dicts([dict:merge(fun(_, V1, V2) -> V1 + V2 end, D1, D2) | Rest]);
merge_dicts([D]) -> D.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~p~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

Sunday Oct 21, 2007

Tim's Erlang Exercise - Summary

>>> Updated Nov 1:
Tim tested my last attempt tbray5.erl, which was described on Learning Coding Binary (Was Tim's Erlang Exercise - Round VI), got for his 971,538,252 bytes of data in 4,625,236 lines log file:

real    0m20.74s
user    3m51.33s
sys     0m8.00s

It's not the fastest, since I did not apply Boyer-Moore searching. But it's what I want: a balance between simple, readable and speed.
========

>>> Updated Oct 24:
The Erlang code can be faster than un-parallelized Ruby, a new version run 2.97 sec on the 4-CPU box: Learning Coding Binary (Was Tim's Erlang Exercise - Round VI)
========

>>> Updated Oct 22:
As Bjorn's suggestion, I added "+h 4096" option for 'erl', which means "sets the default heap size of processes to the size 4096", the elapsed time dropped from 7.7s to 5.5s immediately:

time erl +h 4096 -smp -noshell -run tbray4 start o1000k.ap 10 -s erlang halt

The +h option seems to affect on binary version a lot, but few on list version. This may be caused by that list is always copied and binary may be left in process' heap and passed by point?

The default heap size for each process is set to 233 Word, this number may be suitable for a lot of concurrent processes to avoid too much memory exhaust. But for some parallelization tasks, with less processes, or with enough memory, the heap size can be adjusted to a bit large.

Anyway, I think Erlang/OTP has been very good there for Concurrency, but there may be still room to optimize for Parallelization.

BTW, with +h option, and some tips for efficient binary, the most concise binary version tbray5.erl can run into 3 sec now.
========

This is a performance summary on Tim's Erlang exercise on large dataset processing, I only compare the results on a 4-CPU Intel Xeon 2.80G linux box:

Log FileTimeErlang(1 Proc)Erlang(Many Proc)Erlang(Many Proc)
+h 4096
Ruby
1 milli linesreal22.088s7.700s5.475s4.161s
user21.161s25.750s18.785s3.592s
sys0.924s3.552s1.352s0.568s
5 milli linesreal195.570s37.669s27.911s20.768s
user192.496s126.296s98.162s19.009s
sys3.480s17.789s7.344s3.116s

Notice:

  • The Erlang code is tbray4.erl, which can be found in previous blog, the Ruby code is from Tim's blog.
  • Erlang code is parallelized, Ruby code not.
  • Erlang code is with tons of code, but, parallelization is not free lunch.
  • With an 8-CPU box, Erlang's version should exceed or near to non-parallelized Ruby version*.
  • Although we are talking about multiple-core era, but I'm not sure if disk/io is also ready.

* Per Steve's testing.

Friday Oct 19, 2007

CN Erlounge II

It was last weekend, in Zhuhai, China, a two day CN Erlounge II, discussed topics of Erlang and FP:

  • Why I choose Erlang? - by xushiwei
  • Erlang emulator implementation - by mryufeng
  • Port & driver - by codeplayer
  • Py 2 Erl  - by Zoom.Quiet
  • STM: Lock free concurrent Overview - by Albert Lee
  • mnesia - by mryufeng

And a new logo of "ERL.CHINA" was born as:


(Picture source - http://www.haokanbu.com/story/1104/)

More information about CN Erlounge II can be found here and some pictures.

I did not schedule for this meeting, maybe next time.

Monday Oct 15, 2007

Learning Coding Parallelization (Was Tim's Erlang Exercise - Round V)

>>> Updated Oct 20:
After cleaned up my 4-CPU linux box, the result for the 1M records file is about 7.7 sec, for the 5M records file is about 38 sec.
========

>>> Updated Oct 16:
After testing my code on different machines, I found that disk/io performed varyingly, for some very large files, reading file in parallel may cause longer elapsed time (typically on non-server machine, which is not equipped for fast disk/io). So, I added another version tbray4b.erl, in this version, only reading file is not parallalized, all other code is the same. If you'd like to have a test on your machine, please try both.
========

Well, I think I've learned a lot from doing Tim's exercise, not only the List vs Binary in Erlang, but also computing in parallel. Coding Concurrency is farely easy in Erlang, but coding Parallelization is not only about the Languages, it's also a real question.

I wrote tbray3.erl in The Erlang Way (Was Tim Bray's Erlang Exercise - Round IV) and got a fairly good result by far on my 2-core MacBook. But things always are a bit complex. As Steve pointed in the comment, when he tried tbray3.erl on his 8-core linux box:

"I ran it in a loop 10 times, and the best time I saw was 13.872 sec, and user/CPU time was only 16.150 sec, so it’s apparently not using the multiple cores very well."

I also encoutered this issue on my 4-CPU Intel Xeon CPU 2.80GHz debian box, it runs even worse (8.420s) than my 2-core MacBook (4.483s).

I thought about my code a while, and found that my code seems spawning too many processes for scan_chunk, as the scan_chunk's performance has been improved a lot, each process will finish its task very quickly, too quick to the file reading, the inceasing CPUs have no much chance to play the game, the cycled 'reading'-'spawning scan process' is actually almost sequential now, there has been very few simultaneously alive scanning processes. I think I finally meet the file reading bound.

But wait, as I claimed before, that reading file to memory is very fast in Erlang, for a 200M log file, it takes less than 800ms. The time elapsed for tbray3.erl is about 4900ms, far away from 800ms, why I say the file reading is the bound now?

The problem here is: since I suspect the performance of traversing binary byte by byte, I choose to convert binary to list to scan the world. Per my testing results, list is better than binary when is not too longer, in many cases, not longer than several KBytes. And, to make the code clear and readable, I also choose splitting big binary when read file in the meanwhile, so, I have to read file in pieces of no longer than n KBytes. For a very big file, the reading procedure is broken to several ten-thousands steps, which finally cause the whole file reading time elapsed is bit long. That's bad.

So, I decide to write another version, which will read file in parallel (Round III), and split each chunk on lastest new-line (Round II), scan the words using pattern match (Round IV), and yes, I'll use binary instead of list this time, try to solve the worse performance of binary-traverse by parallel, on multiple cores.

The result is interesting, it's the first time I achieved around 10 sec in my 2-core MacBook when use binary match only, and it's also the first time, on my dummy 4-CPU Intel Xeon CPU 2.80GHz debian box, I got better result (7.700 sec) than my MacBook.

>>> Updated Oct 15:
Steve run the code on his 8-core 2.33 GHz Intel Xeon Linux box, with the best time was 4.920 sec:

"the best time I saw for your newest version was 4.920 sec on my 8-core Linux box. Fast! However, user time was only 14.751 sec, so I’m not sure it’s using all the cores that well. Perhaps you’re getting down to where I/O is becoming a more significant factor."

Please see Steve's One More Erlang Wide Finder and his widefinder attempts.
========

Result on 2.0GHz 2-core MacBook:

$ time erl -smp -noshell -run tbray4_bin start o1000k.ap 20 -s erlang halt
8900    : 2006/09/29/Dynamic-IDE
2000    : 2006/07/28/Open-Data
1300    : 2003/07/25/NotGaming
800     : 2003/10/16/Debbie
800     : 2003/09/18/NXML
800     : 2006/01/31/Data-Protection
700     : 2003/06/23/SamsPie
600     : 2006/09/11/Making-Markup
600     : 2003/02/04/Construction
600     : 2005/11/03/Cars-and-Office-Suites
Time:   10527.50 ms

real    0m10.910s
user    0m13.927s
sys     0m6.413s

Result on 4-CPU Intel Xeon CPU 2.80GHz debian box,:

# When process number is set to 20:
time erl -smp -noshell -run tbray4_bin start o1000k.ap 20 -s erlang halt 

real    0m7.700s
user    0m25.750s
sys     0m3.552s

# When process number is set to 1:
$ time erl -smp -noshell -run tbray4_bin start o1000k.ap 1 -s erlang halt

real    0m22.035s
user    0m21.525s
sys     0m0.512s

# On a 940M 5 million lines log file:
time erl -smp -noshell -run tbray4_bin start o5000k.ap 100 -s erlang halt
44500   : 2006/09/29/Dynamic-IDE
10000   : 2006/07/28/Open-Data
6500    : 2003/07/25/NotGaming
4000    : 2003/10/16/Debbie
4000    : 2003/09/18/NXML
4000    : 2006/01/31/Data-Protection
3500    : 2003/06/23/SamsPie
3000    : 2006/09/11/Making-Markup
3000    : 2003/02/04/Construction
3000    : 2005/11/03/Cars-and-Office-Suites
Time:   37512.76 ms

real    0m37.669s
user    2m6.296s
sys     0m17.789s

On the 4-CPU linux box, comparing the elapsed time between ProcNum = 20 and ProcNum = 1, the elapsed time of parallelized one was only 35% of un-parallelized one, speedup about 185%. The ratio was almost the same as my pread_file.erl testing on the same machine.

It's actually a combination of code in my four previous blogs. Although the performance is not so good as tbray3.erl on my MacBook, but I'm happy that this version is a fully parallelized one, from reading file, scanning words etc. it should scale better than all my previous versions.

The code: tbray4.erl

-module(tbray4).

-compile([native]).

-export([start/1,
         start/2]).

-include_lib("kernel/include/file.hrl").

start([FileName, ProcNum]) when is_list(ProcNum) -> 
    start(FileName, list_to_integer(ProcNum)).
start(FileName, ProcNum) ->
    Start = now(),

    Main = self(),
    Counter = spawn(fun () -> count_loop(Main) end),
    Collector = spawn(fun () -> collect_loop(Counter) end),

    pread_file(FileName, ProcNum, Collector),

    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])       
    end.

pread_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    pread_file_1(FileName, ChunkSize, ProcNum, Collector).       
pread_file_1(FileName, ChunkSize, ProcNum, Collector) ->
    [spawn(fun () ->
                   Length = if  I == ProcNum - 1 -> ChunkSize * 2; %% lastest chuck
                                true -> ChunkSize end,
                   {ok, File} = file:open(FileName, [raw, binary]),
                   {ok, Bin} = file:pread(File, ChunkSize * I, Length),
                   file:close(File),
                   {Data, Tail} = split_on_last_newline(Bin),
                   Collector ! {seq, I, Data, Tail}
           end) || I <- lists:seq(0, ProcNum - 1)],
    Collector ! {chunk_num, ProcNum}.

collect_loop(Counter) -> collect_loop_1([], <<>>, -1, Counter).
collect_loop_1(Chunks, PrevTail, LastSeq, Counter) ->
    receive
        {chunk_num, ChunkNum} ->
            Counter ! {chunk_num, ChunkNum},
            collect_loop_1(Chunks, PrevTail, LastSeq, Counter);
        {seq, I, Data, Tail} ->
            SortedChunks = lists:keysort(1, [{I, Data, Tail} | Chunks]),
            {Chunks1, PrevTail1, LastSeq1} = 
                process_chunks(SortedChunks, [], PrevTail, LastSeq, Counter),
            collect_loop_1(Chunks1, PrevTail1, LastSeq1, Counter)
    end.
    
count_loop(Main) -> count_loop_1(Main, dict:new(), undefined, 0).
count_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
count_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> 
            count_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            count_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

process_chunks([], ChunkBuf, PrevTail, LastSeq, _) -> {ChunkBuf, PrevTail, LastSeq};
process_chunks([{I, Data, Tail}=Chunk|T], ChunkBuf, PrevTail, LastSeq, Counter) ->
    case LastSeq + 1 of
        I ->
            spawn(fun () -> Counter ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
            process_chunks(T, ChunkBuf, Tail, I, Counter);
        _ ->
            process_chunks(T, [Chunk | ChunkBuf], PrevTail, LastSeq, Counter)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size=Size}} = file:read_file_info(FileName),
    Size div ProcNum.

split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, size(Bin)).   
split_on_last_newline_1(Bin, Offset) when Offset > 0 ->
    case Bin of
        <<Data:Offset/binary,$\n,Tail/binary>> ->
            {Data, Tail};
        _ -> 
            split_on_last_newline_1(Bin, Offset - 1)
    end;
split_on_last_newline_1(Bin, _) -> {Bin, <<>>}.
    
scan_chunk(Bin) -> scan_chunk_1(Bin, 0, dict:new()).    
scan_chunk_1(Bin, Offset, Dict) when Offset =< size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary,"GET /ongoing/When/",_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,Rest/binary>> ->            
            case match_until_space_newline(Rest, 0) of
                {Rest1, <<>>} -> 
                    scan_chunk_1(Rest1, 0, Dict);
                {Rest1, Word} -> 
                    Key = <<Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word/binary>>,
                    scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
            end;
        _ -> scan_chunk_1(Bin, Offset + 1, Dict)
    end;
scan_chunk_1(_, _, Dict) -> Dict.

match_until_space_newline(Bin, Offset) when Offset < size(Bin) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> ->
            {Rest, Word};
        <<_:Offset/binary,$.,Rest/binary>> ->
            {Rest, <<>>};
        <<_:Offset/binary,10,Rest/binary>> ->
            {Rest, <<>>};
        _ ->
            match_until_space_newline(Bin, Offset + 1)
    end;
match_until_space_newline(_, _) -> {<<>>, <<>>}.

>>> Updated Oct 16:
After testing my code on different machines, I found that disk/io performed varyingly, for some very large files, reading file in parallel may cause longer elapsed time (typically on non-server machine, which is not equipped for fast disk/io). So, I wrote another version: tbray4b.erl, in this version, only reading file is not parallalized, all other code is the same. Here's a result for this version on a 940M file with 5 million lines, with ProcNum set to 200 and 400)

# On 2-core MacBook:
$ time erl -smp -noshell -run tbray4b start o5000k.ap 200 -s erlang halt

real    0m50.498s
user    0m49.746s
sys     0m11.979s

# On 4-cpu linux box:
$ time erl -smp -noshell -run tbray4b start o5000k.ap 400 -s erlang halt

real    1m2.136s
user    1m59.907s
sys     0m7.960s

The code: tbray4b.erl

-module(tbray4b).

-compile([native]).

-export([start/1,
         start/2]).

-include_lib("kernel/include/file.hrl").

start([FileName, ProcNum]) when is_list(ProcNum) -> 
    start(FileName, list_to_integer(ProcNum)).
start(FileName, ProcNum) ->
    Start = now(),

    Main = self(),
    Counter = spawn(fun () -> count_loop(Main) end),
    Collector = spawn(fun () -> collect_loop(Counter) end),

    read_file(FileName, ProcNum, Collector),

    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])       
    end.

read_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, ChunkSize, 0, Collector).    
read_file_1(File, ChunkSize, I, Collector) ->
    case file:read(File, ChunkSize) of
        eof ->
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, Bin} -> 
            spawn(fun () ->
                          {Data, Tail} = split_on_last_newline(Bin),
                          Collector ! {seq, I, Data, Tail}
                  end),
            read_file_1(File, ChunkSize, I + 1, Collector)
    end.

collect_loop(Counter) -> collect_loop_1([], <<>>, -1, Counter).
collect_loop_1(Chunks, PrevTail, LastSeq, Counter) ->
    receive
        {chunk_num, ChunkNum} ->
            Counter ! {chunk_num, ChunkNum},
            collect_loop_1(Chunks, PrevTail, LastSeq, Counter);
        {seq, I, Data, Tail} ->
            SortedChunks = lists:keysort(1, [{I, Data, Tail} | Chunks]),
            {Chunks1, PrevTail1, LastSeq1} = 
                process_chunks(SortedChunks, [], PrevTail, LastSeq, Counter),
            collect_loop_1(Chunks1, PrevTail1, LastSeq1, Counter)
    end.
    
count_loop(Main) -> count_loop_1(Main, dict:new(), undefined, 0).
count_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
count_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> 
            count_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            count_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

process_chunks([], ChunkBuf, PrevTail, LastSeq, _) -> {ChunkBuf, PrevTail, LastSeq};
process_chunks([{I, Data, Tail}=Chunk|T], ChunkBuf, PrevTail, LastSeq, Counter) ->
    case LastSeq + 1 of
        I ->
            spawn(fun () -> Counter ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
            process_chunks(T, ChunkBuf, Tail, I, Counter);
        _ ->
            process_chunks(T, [Chunk | ChunkBuf], PrevTail, LastSeq, Counter)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size=Size}} = file:read_file_info(FileName),
    Size div ProcNum.

split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, size(Bin)).   
split_on_last_newline_1(Bin, Offset) when Offset > 0 ->
    case Bin of
        <<Data:Offset/binary,$\n,Tail/binary>> ->
            {Data, Tail};
        _ -> 
            split_on_last_newline_1(Bin, Offset - 1)
    end;
split_on_last_newline_1(Bin, _) -> {Bin, <<>>}.
    
scan_chunk(Bin) -> scan_chunk_1(Bin, 0, dict:new()).    
scan_chunk_1(Bin, Offset, Dict) when Offset =< size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary,"GET /ongoing/When/",_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,Rest/binary>> ->            
            case match_until_space_newline(Rest, 0) of
                {Rest1, <<>>} -> 
                    scan_chunk_1(Rest1, 0, Dict);
                {Rest1, Word} -> 
                    Key = <<Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word/binary>>,
                    scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
            end;
        _ -> scan_chunk_1(Bin, Offset + 1, Dict)
    end;
scan_chunk_1(_, _, Dict) -> Dict.

match_until_space_newline(Bin, Offset) when Offset < size(Bin) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> ->
            {Rest, Word};
        <<_:Offset/binary,$.,Rest/binary>> ->
            {Rest, <<>>};
        <<_:Offset/binary,10,Rest/binary>> ->
            {Rest, <<>>};
        _ ->
            match_until_space_newline(Bin, Offset + 1)
    end;
match_until_space_newline(_, _) -> {<<>>, <<>>}.


=======

Sunday Oct 14, 2007

The Erlang Way (Was Tim Bray's Erlang Exercise - Round IV)

Playing with Tim's Erlang Exercise is so much fun.

I've been coding in Erlang about 6 months as a newbie, in most cases, I do parsing on string (or list what ever) with no need of regular expressions, since Erlang's pattern match can usaully solve most problems straightforward.

Tim's log file is also a good example for applying pattern match in Erlang way. It's a continuous stream of dataset, after splitting it to line-bounded chunks for parallellization purpose, we can truely match whole {GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) } directly on chunk with no need to split to lines any more.

This come out my third solution, which matchs whole

{GET /ongoing/When/\d\d\dx/(\d\d\d\d/\d\d/\d\d/[^ .]+) } 

likeness using the pattern:

"GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest]

and then fetchs

[Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/] ++ match_until_space_newline(Rest, [])

as the matched key, with no need to split the chunk to lines.

But yes, we still need to split each chunk on the lastest newline to get parallelized result exactly accurate.

On my 2-core 2 GHz MacBook, the best time I’ve got is 4.483 sec

# smp enabled:
$ erlc -smp tbray3.erl
$ time erl -smp +P 60000 -noshell -run tbray3 start o1000k.ap -s erlang halt
8900    : <<"2006/09/29/Dynamic-IDE">>
2000    : <<"2006/07/28/Open-Data">>
1300    : <<"2003/07/25/NotGaming">>
800     : <<"2003/10/16/Debbie">>
800     : <<"2003/09/18/NXML">>
800     : <<"2006/01/31/Data-Protection">>
700     : <<"2003/06/23/SamsPie">>
600     : <<"2006/09/11/Making-Markup">>
600     : <<"2003/02/04/Construction">>
600     : <<"2005/11/03/Cars-and-Office-Suites">>
Time:    4142.83 ms

real    0m4.483s
user    0m5.804s
sys     0m0.615s

# no-smp:
$ erlc tbray3.erl
$ time erl -noshell -run tbray_list_no_line start o1000k.ap -s erlang halt

real    0m7.050s
user    0m6.183s
sys     0m0.644s

The smp enable result speedup about 57%

On the 2.80GHz 4-cpu xeon debian box that I mentioned before in previous blog, the best result is:

real    0m8.420s
user    0m11.637s
sys     0m0.452s

And I've noticed, adjusting the BUFFER_SIZE can balance the time consumered by parallelized parts and un-parallelized parts. That is, if the number of core is increased, we can also increase the BUFFER_SIZE a bit, so the number of chunks decreased (less un-parallelized split_on_last_new_line/1 and file:pread/3) but with more heavy work for parallelized binary_to_list/1 and scan_chunk/1 on longer list.

The best BUFFER_SIZE on my computer is 4096 * 5 bytes, which causes un-parallized split_on_last_newline/1 took about only 0.226s in the case.

The code:

-module(tbray3).

-compile([native]).

-export([start/1]).
  
%% The best Bin Buffer Size is 4096 * 1 - 4096 * 5
-define(BUFFER_SIZE, (4096 * 5)). 

start(FileName) ->
    Start = now(),

    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),
 
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file(File, Collector),
    
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])
    end.

read_file(File, Collector) -> read_file_1(File, [], 0, Collector).
read_file_1(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof ->
            Collector ! {chunk_num, I},
            file:close(File);
        {ok, Bin} -> 
            {Chunk, NextTail} = split_on_last_newline(PrevTail ++ binary_to_list(Bin)),
            spawn(fun () -> Collector ! {dict, scan_chunk(Chunk)} end),
            read_file_1(File, NextTail, I + 1, Collector)
    end.

split_on_last_newline(List) -> split_on_last_newline_1(lists:reverse(List), []).
split_on_last_newline_1(List, Tail) ->
    case List of
        []         -> {lists:reverse(List), []};
        [$\n|Rest] -> {lists:reverse(Rest), Tail};
        [C|Rest]   -> split_on_last_newline_1(Rest, [C | Tail])
    end.

collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> 
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} -> 
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.
    
print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~p~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].

scan_chunk(List) -> scan_chunk_1(List, dict:new()).
scan_chunk_1(List, Dict) ->
    case List of
        [] -> Dict;
        "GET /ongoing/When/"++[_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/|Rest] ->
            case match_until_space_newline(Rest, []) of
                {Rest1, []} -> 
                    scan_chunk_1(Rest1, Dict);
                {Rest1, Word} -> 
                    Key = list_to_binary([Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word]),
                    scan_chunk_1(Rest1, dict:update_counter(Key, 1, Dict))
            end;
        [_|Rest] -> scan_chunk_1(Rest, Dict)
    end.
    
match_until_space_newline(List, Word) ->
    case List of
        []     -> {[],   []};
        [10|_] -> {List, []};
        [$.|_] -> {List, []};
        [$ |_] -> {List, lists:reverse(Word)};
        [C|Rest] -> match_until_space_newline(Rest, [C | Word])
    end.

I also wrote another corresponding binary version, which is 2-3 times slower than above list version on my machine, but the result may vary depending on your compiled Erlang/OTP on various operation system. I will test it again when Erlang/OTP R12B is released, which is claimed to have been optimized for binary match performance.

The code:

-module(tbray3_bin).

-compile([native]).

-export([start/1]).

-define(BUFFER_SIZE, (4096 * 10000)).

start(FileName) ->
    Start = now(),

    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),

    {ok, File} = file:open(FileName, [raw, binary]),    
    read_file(File, Collector),

    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("Time: ~p ms~n", [timer:now_diff(now(), Start) / 1000])       
    end.
    
collect_loop(Main) -> collect_loop_1(Main, dict:new(), undefined, 0).
collect_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
    print_result(Dict),
    Main ! stop;
collect_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} -> 
            collect_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
        {dict, DictX} ->
            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
            collect_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
    end.

print_result(Dict) ->
    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].
          
read_file(File, Collector) -> read_file_1(File, <<>>, 0, Collector).            
read_file_1(File, PrevTail, I, Collector) ->
    case file:read(File, ?BUFFER_SIZE) of
        eof -> 
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, Bin} -> 
            {Data, NextTail} = split_on_last_newline(Bin),
            spawn(fun () -> Collector ! {dict, scan_chunk(<<PrevTail/binary, Data/binary>>)} end),
            read_file_1(File, NextTail, I + 1, Collector)
    end.

split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, size(Bin)).   
split_on_last_newline_1(Bin, Offset) when Offset > 0 ->
    case Bin of
        <<Data:Offset/binary,$\n,Tail/binary>> ->
            {Data, Tail};
        _ -> 
            split_on_last_newline_1(Bin, Offset - 1)
    end;
split_on_last_newline_1(Bin, _) -> {Bin, <<>>}.
    
scan_chunk(Bin) -> scan_chunk_1(Bin, 0, dict:new()).    
scan_chunk_1(Bin, Offset, Dict) when Offset < size(Bin) - 34 ->
    case Bin of
        <<_:Offset/binary,"GET /ongoing/When/",_,_,_,$x,$/,Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/,Rest/binary>> ->            
            case match_until_space_newline(Rest, 0) of
                {Rest1, <<>>} -> 
                    scan_chunk_1(Rest1, 0, Dict);
                {Rest1, Word} -> 
                    Key = <<Y1,Y2,Y3,Y4,$/,M1,M2,$/,D1,D2,$/, Word/binary>>,
                    scan_chunk_1(Rest1, 0, dict:update_counter(Key, 1, Dict))
            end;
        _ -> scan_chunk_1(Bin, Offset + 1, Dict)
    end;
scan_chunk_1(_, _, Dict) -> Dict.

match_until_space_newline(Bin, Offset) when Offset < size(Bin) ->
    case Bin of
        <<Word:Offset/binary,$ ,Rest/binary>> ->
            {Rest, Word};
        <<_:Offset/binary,$.,Rest/binary>> ->
            {Rest, <<>>};
        <<_:Offset/binary,10,Rest/binary>> ->
            {Rest, <<>>};
        _ ->
            match_until_space_newline(Bin, Offset + 1)
    end;
match_until_space_newline(_, _) -> {<<>>, <<>>}.

Thursday Oct 11, 2007

Reading File in Parallel in Erlang (Was Tim Bray's Erlang Exercise - Round III)

My first solution for Tim's exercise tried to read file in parallel, but I just realized by reading file module's source code, that file:open(FileName, Options) will return a process instead of IO device. Well, this means a lot:

  • It's a process, so, when you request more data on it, you actually send message to it. Since you only send 2 integer: the offset and length, sending message should be very fast. But then, this process (File) will wait for receiving data from disk/io. For one process, the receiving is sequential rather than parallelized.
  • If we look the processes in Erlang as ActiveObjects, which send/receive messages/data in async, since the receiving is sequential in one process, requesting/waiting around one process(or, object) is almost safe for parallelized programming, you usaully do not need to worry about lock/unlock etc. (except the outside world).
  • We can open a lot of File processes to read data in parallel, the bound is the disk/IO and the os' resources limit.

I wrote some code to test file reading in parallel, discardng the disk cache, on my 2-core MacBook, reading file with two processes can speedup near 200% to one process.

The code:


-module(file_pread).

-compile([native]).

-export([start/2]).

-include_lib("kernel/include/file.hrl").

start(FileName, ProcNum) ->
    [start(FileName, ProcNum, Fun) || Fun <- [fun read_file/3, fun pread_file/3]].


start(FileName, ProcNum, Fun) ->
    Start = now(),  

    Main = self(),
    Collector = spawn(fun () -> collect_loop(Main) end),

    Fun(FileName, ProcNum, Collector),
    
    %% don't terminate, wait here, until all tasks done.
    receive
        stop -> io:format("time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000]) 
    end.

collect_loop(Main) -> collect_loop_1(Main, undefined, 0).
collect_loop_1(Main, ChunkNum, ChunkNum) -> 
    Main ! stop;
collect_loop_1(Main, ChunkNum, ProcessedNum) ->
    receive
        {chunk_num, ChunkNumX} ->
            collect_loop_1(Main, ChunkNumX, ProcessedNum);
        {seq, _Seq} ->
            collect_loop_1(Main, ChunkNum, ProcessedNum + 1)
    end.

get_chunk_size(FileName, ProcNum) ->
    {ok, #file_info{size=Size}} = file:read_file_info(FileName),
    Size div ProcNum.

read_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    {ok, File} = file:open(FileName, [raw, binary]),
    read_file_1(File, ChunkSize, 0, Collector).
    
read_file_1(File, ChunkSize, I, Collector) ->
    case file:read(File, ChunkSize) of
        eof ->
            file:close(File),
            Collector ! {chunk_num, I};
        {ok, _Bin} -> 
            Collector ! {seq, I},
            read_file_1(File, ChunkSize, I + 1, Collector)
    end.


pread_file(FileName, ProcNum, Collector) ->
    ChunkSize = get_chunk_size(FileName, ProcNum),
    pread_file_1(FileName, ChunkSize, ProcNum, Collector).
       
pread_file_1(FileName, ChunkSize, ProcNum, Collector) ->
    [spawn(fun () ->
                   %% if it's the lastest chuck, read all bytes left, 
                   %% which will not exceed ChunkSize * 2
                   Length = if  I == ProcNum - 1 -> ChunkSize * 2;
                                true -> ChunkSize end,
                   {ok, File} = file:open(FileName, [raw, binary]),
                   {ok, _Bin} = file:pread(File, ChunkSize * I, Length),
                   Collector ! {seq, I},
                   file:close(File)
           end) || I <- lists:seq(0, ProcNum - 1)],
    Collector ! {chunk_num, ProcNum}.

The pread_file/3 is parallelized, it always opens new File process for each reading process instead of sharing one opened File process during all reading processes. The read_file/3 is non-parallelized.

To evaulate: (run at least two-time for each test to average disk/IO caches.)

$ erlc -smp file_pread.erl
$ erl -smp

1> file_pread:start("o100k.ap", 2).
time:     691.72 ms
time:      44.37 ms
[ok,ok]
2> file_pread:start("o100k.ap", 2).
time:      74.50 ms
time:      43.59 ms
[ok,ok]
3> file_pread:start("o1000k.ap", 2).
time:    1717.68 ms
time:     408.48 ms
[ok,ok]
4> file_pread:start("o1000k.ap", 2).
time:     766.00 ms
time:     393.71 ms
[ok,ok]
5> 

Let's compare the results for each file (we pick the second testing result of each), the speedup:

  • o100k.ap, 20M, 74.50 / 43.59 - 1= 70%
  • o1000k.ap, 200M, 766.00 / 393.71 - 1 = 95%

On another 4-CPU debian machine, with 4 processes, the best result I got:

4> file_pread:start("o1000k.ap", 4).
time:     768.59 ms
time:     258.57 ms
[ok, ok]
5>

The parallelized reading speedup 768.59 / 258.57 -1 = 197%

I've updated my first solution according to this testing, opening new File process for each reading process instead of sharing the same File process. Of cource, there are still issues that I pointed in Tim Bray's Erlang Exercise on Large Dataset Processing - Round II

Although the above result can also be achieved in other Languages, but I find that coding parallelization in Erlang is a pleasure.

Friday Oct 05, 2007

Tim Bray's Erlang Exercise on Large Dataset Processing - Round II

Updated Oct 09: Added more benchmark results under linux on other machines.
Updated Oct 07: More concise code.
Updated Oct 06: Fixed bugs: 1. Match "GET /ongoing/When/" instead of "/ongoing/When/"; 2. split_on_last_newline should not reverse Tail.

Backed from a short vacation, and sit down in front of my computer, I'm thinking about Tim Bray's exercise again.

As I realized, the most expensive procedure is splitting dataset to lines. To get the multiple-core benefit, we should parallelize this procedure instead of reading file to binary or macthing process only.

In my previous solution, there are at least two issues:

  • Since the file reading is fast in Erlang, then, parallelizing the file reading is not much helpful.
  • The buffered_read actually can be merged with the buffered file reading.

And, Per's solution parallelizes process_match procedure only, based on a really fast divide_to_lines, but with hacked binary matching syntax.

After a couple of hours working, I finially get the second version of tbray.erl (with some code from Per's solution).

  • Read file to small pieces of binary (about 4096 bytes each chunk), then convert to list.
  • Merge the previous tail for each chunk, search this chunk from tail, find the last new line mark, split this chunk to line-bounded data part, and tail part for next chunk.
  • The above steps are difficult to parallelize. If we try, there will be about 30 more LOC, and not so readable.
  • Spawn a new process at once to split line-bounded chunk to lines, process match and update dict.
  • Thus we can go on reading file with non-stop.
  • A collect_loop will receive dicts from each process, and merge them.

What I like of this version is, it scales on mutiple-core almost linearly! On my 2.0G 2-core MacBook, it took about 13.522 seconds with non-smp, 7.624 se