root/widefinder/src/tbray8.erl

Revision 91:b08cb886bf4e, 4.5 KB (checked in by dcaoyuan, 8 months ago)

Imported widefinder code.

Line 
1%% Parallelized file reading, don't care about tail, with binary traversing tips
2-module(tbray8).
3
4-compile([native]).
5
6-export([start/1,
7         start/2]).
8
9%% erl -smp
10%% timer:tc(tbray, start, ["o1000k.ap", 10]).
11start([FileName, ProcNum]) when is_list(ProcNum) -> 
12    start(FileName, list_to_integer(ProcNum)).
13start(FileName, ProcNum) ->
14    Start = now(),
15
16    Main = self(),
17    Counter = spawn(fun () -> count_loop(Main) end),
18    Collector = spawn(fun () -> collect_loop(Counter) end),
19
20    pread_file(FileName, ProcNum, Collector),
21
22    %% don't terminate, wait here, until all tasks done.
23    receive
24        stop -> io:format("Time: ~10.2f ms~n", [timer:now_diff(now(), Start) / 1000])       
25    end.
26
27pread_file(FileName, ProcNum, Collector) ->
28    ChunkSize = filelib:file_size(FileName) div ProcNum,
29    pread_file_1(FileName, ChunkSize, ProcNum, Collector).
30pread_file_1(FileName, ChunkSize, ProcNum, Collector) ->
31    [spawn(fun () ->
32                   Length = if  I == ProcNum - 1 -> ChunkSize * 2; %% latest chuck
33                                true -> ChunkSize end,
34                   {ok, File} = file:open(FileName, [read, binary]),
35                   {ok, Bin} = file:pread(File, ChunkSize * I, Length),
36                   file:close(File),
37                   DataL = split_on_last_newline(Bin),
38                   Collector ! {seq, I, Bin, DataL}
39           end) || I <- lists:seq(0, ProcNum - 1)],
40    Collector ! {chunk_num, ProcNum}.
41
42collect_loop(Counter) -> collect_loop_1([], <<>>, -1, Counter).
43collect_loop_1(Chunks, PrevTail, LastSeq, Counter) ->
44    receive
45        {chunk_num, ChunkNum} ->
46            Counter ! {chunk_num, ChunkNum},
47            collect_loop_1(Chunks, PrevTail, LastSeq, Counter);
48        {seq, I, Data, Tail} ->
49            SortedChunks = lists:keysort(1, [{I, Data, Tail} | Chunks]),
50            {Chunks1, PrevTail1, LastSeq1} = 
51                process_chunks(SortedChunks, [], PrevTail, LastSeq, Counter),
52            collect_loop_1(Chunks1, PrevTail1, LastSeq1, Counter)
53    end.
54   
55count_loop(Main) -> count_loop_1(Main, dict:new(), undefined, 0).
56count_loop_1(Main, Dict, ChunkNum, ChunkNum) ->
57    print_result(Dict),
58    Main ! stop;
59count_loop_1(Main, Dict, ChunkNum, ProcessedNum) ->
60    receive
61        {chunk_num, ChunkNumX} -> 
62            count_loop_1(Main, Dict, ChunkNumX, ProcessedNum);
63        {dict, DictX} ->
64            Dict1 = dict:merge(fun (_, V1, V2) -> V1 + V2 end, Dict, DictX),
65            count_loop_1(Main, Dict1, ChunkNum, ProcessedNum + 1)
66    end.
67
68process_chunks([], ChunkBuf, PrevTail, LastSeq, _) -> {ChunkBuf, PrevTail, LastSeq};
69process_chunks([{I, Data, Tail}=Chunk|T], ChunkBuf, PrevTail, LastSeq, Counter) ->
70    case LastSeq + 1 of
71        I ->
72            spawn_opt(fun () -> Counter ! {dict, scan_chunk({Data, PrevTail})} end, [{min_heap_size, 8196}]),
73            process_chunks(T, ChunkBuf, Tail, I, Counter);
74        _ ->
75            process_chunks(T, [Chunk | ChunkBuf], PrevTail, LastSeq, Counter)
76    end.
77
78print_result(Dict) ->
79    SortedList = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
80    [io:format("~b\t: ~s~n", [V, K]) || {K, V} <- lists:sublist(SortedList, 10)].
81
82split_on_last_newline(Bin) -> split_on_last_newline_1(Bin, size(Bin)).   
83split_on_last_newline_1(Bin, S) when S > 0 ->
84    case Bin of
85        <<_:S/binary,$\n,_/binary>> -> S;
86        _ -> split_on_last_newline_1(Bin, S - 1)
87    end;
88split_on_last_newline_1(_, S) -> S.
89
90scan_chunk({Bin, DataL}) -> scan_chunk_1(Bin, DataL, 0, dict:new()).
91scan_chunk_1(Bin, DataL, S, Dict) when S < DataL - 34 ->
92    case Bin of
93        <<_:S/binary,"GET /ongoing/When/",_,_,_,$x,$/,_,_,_,_,$/,_,_,$/,_,_,$/,_/binary>> ->
94            case match_until_space_newline(Bin, S + 34) of
95                {true, E} ->
96                    Skip = S + 23, L = E - Skip,
97                    <<_:Skip/binary,Key:L/binary,_/binary>> = Bin,
98                    scan_chunk_1(Bin, DataL, E + 1, dict:update_counter(Key, 1, Dict));
99                {false, E} -> 
100                    scan_chunk_1(Bin, DataL, E + 1, Dict)
101            end;
102        _ -> scan_chunk_1(Bin, DataL, S + 1, Dict) %% here I have to skip Bin 1 byte by 1 byte
103    end;
104scan_chunk_1(_, _, _, Dict) -> Dict.
105   
106match_until_space_newline(Bin, S) when S < size(Bin) ->
107    case Bin of
108        <<_:S/binary,10,_/binary>> -> {false, S};
109        <<_:S/binary,$.,_/binary>> -> {false, S};
110        <<_:S/binary,_,$ ,_/binary>> -> {true, S + 1};
111        _ -> match_until_space_newline(Bin, S + 1)
112    end;
113match_until_space_newline(_, S) -> {false, S}.
114
115
116
Note: See TracBrowser for help on using the browser.