| 1 | %% Parallelized file reading, don't care about tail, with binary traversing tips |
|---|
| 2 | -module(tbray8). |
|---|
| 3 | |
|---|
| 4 | -compile([native]). |
|---|
| 5 | |
|---|
| 6 | -export([start/1, |
|---|
| 7 | start/2]). |
|---|
| 8 | |
|---|
| 9 | %% erl -smp |
|---|
| 10 | %% timer:tc(tbray8, start, ["o1000k.ap", 10]). |
|---|
%% Entry point taking a single two-element list [FileName, ProcNum], where
%% ProcNum is a string holding the number of reader processes.  The string is
%% converted to an integer and the work is delegated to start/2.
start([FileName, ProcNum]) when is_list(ProcNum) ->
    Procs = list_to_integer(ProcNum),
    start(FileName, Procs).
|---|
%% Process FileName with ProcNum parallel chunk readers and print the elapsed
%% time in milliseconds once all chunks have been counted.
%%
%% Three kinds of helper processes are involved:
%%   * a counter (count_loop/1) that merges per-chunk dicts, prints the result
%%     and sends `stop' back to this process when every chunk is merged;
%%   * a collector (collect_loop/1) that receives the raw chunks and hands
%%     them on for scanning;
%%   * the readers spawned by pread_file/3.
%%
%% Returns the result of io:format/2 (ok).
start(FileName, ProcNum) ->
    Start = now(),

    Main = self(),
    Counter = spawn(fun () -> count_loop(Main) end),
    Collector = spawn(fun () -> collect_loop(Counter) end),

    pread_file(FileName, ProcNum, Collector),

    %% Don't terminate; wait here until all tasks are done.
    receive
        stop ->
            Elapsed = timer:now_diff(now(), Start) / 1000,
            %% The collector loop has no terminating clause, so it would stay
            %% blocked in receive forever and leak one process per call.
            %% `stop' is only sent after every chunk's dict has been merged,
            %% so the collector has nothing left to do and can be killed.
            exit(Collector, kill),
            io:format("Time: ~10.2f ms~n", [Elapsed])
    end.
|---|
| 26 | |
|---|
%% Derive the per-process chunk size (file size integer-divided by ProcNum)
%% and start the parallel read via pread_file_1/4.
pread_file(FileName, ProcNum, Collector) ->
    FileSize = filelib:file_size(FileName),
    pread_file_1(FileName, FileSize div ProcNum, ProcNum, Collector).
|---|
%% Spawn one reader process per chunk index 0..ProcNum-1.  Each reader opens
%% FileName itself, reads its chunk with file:pread/3 at offset
%% ChunkSize * Index, computes the last-newline offset of what it read and
%% sends {seq, Index, Bin, Offset} to Collector.  After spawning the readers,
%% the total number of chunks is announced as {chunk_num, ProcNum}.
pread_file_1(FileName, ChunkSize, ProcNum, Collector) ->
    LastIndex = ProcNum - 1,
    ReadChunk =
        fun (Index) ->
            %% The last chunk asks for twice the chunk size -- presumably so
            %% the bytes left over by the integer division are included too.
            Length = case Index == LastIndex of
                         true -> ChunkSize * 2;
                         false -> ChunkSize
                     end,
            {ok, File} = file:open(FileName, [read, binary]),
            {ok, Bin} = file:pread(File, ChunkSize * Index, Length),
            file:close(File),
            Collector ! {seq, Index, Bin, split_on_last_newline(Bin)}
        end,
    lists:foreach(fun (Index) -> spawn(fun () -> ReadChunk(Index) end) end,
                  lists:seq(0, LastIndex)),
    Collector ! {chunk_num, ProcNum}.
|---|
| 41 | |
|---|
%% Body of the collector process.  Starts with no buffered chunks, an empty
%% binary as the previous tail and -1 as the last handled sequence number.
collect_loop(Counter) ->
    collect_loop_1([], <<>>, -1, Counter).

%% Receive loop of the collector:
%%   * {seq, Seq, Data, Tail}  - add the chunk to the buffered ones, sort them
%%     by sequence number and let process_chunks/5 handle whatever is next in
%%     line; continue with the state it returns.
%%   * {chunk_num, N}          - forward the message unchanged to Counter.
collect_loop_1(Pending, PrevTail, LastSeq, Counter) ->
    receive
        {seq, Seq, Data, Tail} ->
            Sorted = lists:keysort(1, [{Seq, Data, Tail} | Pending]),
            {StillPending, NewTail, NewLastSeq} =
                process_chunks(Sorted, [], PrevTail, LastSeq, Counter),
            collect_loop_1(StillPending, NewTail, NewLastSeq, Counter);
        {chunk_num, _Total} = Announcement ->
            Counter ! Announcement,
            collect_loop_1(Pending, PrevTail, LastSeq, Counter)
    end.
|---|
| 54 | |
|---|
%% Body of the counter process.  Starts with an empty dict, an unknown
%% (undefined) number of expected chunks and zero merged chunks.
count_loop(Main) ->
    count_loop_1(Main, dict:new(), undefined, 0).

%% Once the number of merged dicts equals the announced number of chunks,
%% print the totals and tell Main to stop.  Until then:
%%   * {dict, D}       - merge D into the totals by adding the values of equal
%%                       keys and count one more merged chunk;
%%   * {chunk_num, N}  - remember N as the number of chunks to expect.
count_loop_1(Main, Totals, Expected, Expected) ->
    print_result(Totals),
    Main ! stop;
count_loop_1(Main, Totals, Expected, Merged) ->
    receive
        {dict, Partial} ->
            Sum = fun (_Key, A, B) -> A + B end,
            count_loop_1(Main, dict:merge(Sum, Totals, Partial), Expected, Merged + 1);
        {chunk_num, Announced} ->
            count_loop_1(Main, Totals, Announced, Merged)
    end.
|---|
| 67 | |
|---|
%% Walk a list of {Seq, Data, Tail} chunks (sorted by Seq by the caller) and
%% dispatch every chunk whose Seq directly follows LastSeq.  Chunks that are
%% not next in line are pushed onto ChunkBuf for a later round.
%%
%% A dispatched chunk is scanned in a freshly spawned process which sends
%% {dict, Dict} to Counter.  The chunk is scanned together with its OWN Tail,
%% i.e. the last-newline offset the reader computed for that same binary.
%% Passing PrevTail here instead was a bug: for the first chunk PrevTail is
%% the initial <<>>, which makes the arithmetic guard in scan_chunk_1/4 fail,
%% so the whole first chunk produced an empty dict; every later chunk was
%% bounded by the offset of a different chunk.
%%
%% Returns {BufferedChunks, PrevTail, LastSeq}, where PrevTail and LastSeq are
%% the Tail and Seq of the most recently dispatched chunk (or the values
%% passed in when nothing was dispatched).
process_chunks([], ChunkBuf, PrevTail, LastSeq, _Counter) ->
    {ChunkBuf, PrevTail, LastSeq};
process_chunks([{I, Data, Tail} = Chunk | T], ChunkBuf, PrevTail, LastSeq, Counter) ->
    case LastSeq + 1 of
        I ->
            spawn_opt(fun () -> Counter ! {dict, scan_chunk({Data, Tail})} end,
                      [{min_heap_size, 8196}]),
            process_chunks(T, ChunkBuf, Tail, I, Counter);
        _ ->
            process_chunks(T, [Chunk | ChunkBuf], PrevTail, LastSeq, Counter)
    end.
|---|
| 77 | |
|---|
%% Print at most ten {Key, Count} entries of Dict, highest count first, one
%% per line as "Count<TAB>: Key".  Returns the list of io:format/2 results.
print_result(Dict) ->
    Descending = lists:reverse(lists:keysort(2, dict:to_list(Dict))),
    Top = lists:sublist(Descending, 10),
    lists:map(fun ({Key, Count}) -> io:format("~b\t: ~s~n", [Count, Key]) end, Top).
|---|
| 81 | |
|---|
%% Return the byte offset of the last newline in Bin.  Offset 0 is never
%% tested, so the result is 0 both when Bin contains no newline and when its
%% only newline is the very first byte.
split_on_last_newline(Bin) ->
    split_on_last_newline_1(Bin, size(Bin)).

%% Step Offset backwards from the end of Bin until the byte at Offset is a
%% newline or Offset is no longer positive.
split_on_last_newline_1(_Bin, Offset) when Offset =< 0 ->
    Offset;
split_on_last_newline_1(Bin, Offset) ->
    case Bin of
        <<_:Offset/binary, $\n, _/binary>> ->
            Offset;
        _ ->
            split_on_last_newline_1(Bin, Offset - 1)
    end.
|---|
| 89 | |
|---|
%% Scan Bin for request lines of the shape
%%   "GET /ongoing/When/???x/????/??/??/<name> "
%% (each ? being one arbitrary byte) and return a dict that maps every key
%% "????/??/??/<name>" to the number of times it was found.  Only start
%% positions below DataL - 34 are tried.
scan_chunk({Bin, DataL}) ->
    scan_chunk_1(Bin, DataL, 0, dict:new()).

scan_chunk_1(Bin, DataL, Pos, Counts) when Pos < DataL - 34 ->
    case Bin of
        <<_:Pos/binary, "GET /ongoing/When/", _, _, _, $x, $/,
          _, _, _, _, $/, _, _, $/, _, _, $/, _/binary>> ->
            {Next, Counts1} = scan_hit(Bin, Pos, Counts),
            scan_chunk_1(Bin, DataL, Next, Counts1);
        _ ->
            %% No match here: the binary has to be walked one byte at a time.
            scan_chunk_1(Bin, DataL, Pos + 1, Counts)
    end;
scan_chunk_1(_Bin, _DataL, _Pos, Counts) ->
    Counts.

%% The 34-byte fixed prefix matched at Pos.  The key starts 23 bytes after Pos
%% (right behind "GET /ongoing/When/???x/") and runs up to, but not including,
%% the space reported by match_until_space_newline/2.  Returns the position
%% at which scanning continues and the (possibly updated) counts.
scan_hit(Bin, Pos, Counts) ->
    KeyStart = Pos + 23,
    case match_until_space_newline(Bin, Pos + 34) of
        {true, End} ->
            KeyLen = End - KeyStart,
            <<_:KeyStart/binary, Key:KeyLen/binary, _/binary>> = Bin,
            {End + 1, dict:update_counter(Key, 1, Counts)};
        {false, End} ->
            {End + 1, Counts}
    end.
|---|
| 105 | |
|---|
%% Walk forward through Bin starting at Pos:
%%   * a newline or a '.' at the current position  -> {false, Position};
%%   * a space right after the current position    -> {true, PositionOfSpace};
%%   * running off the end of Bin                  -> {false, Position}.
match_until_space_newline(Bin, Pos) when Pos < size(Bin) ->
    case Bin of
        <<_:Pos/binary, Stop, _/binary>> when Stop =:= $\n; Stop =:= $. ->
            {false, Pos};
        <<_:Pos/binary, _, $\s, _/binary>> ->
            {true, Pos + 1};
        _ ->
            match_until_space_newline(Bin, Pos + 1)
    end;
match_until_space_newline(_Bin, Pos) ->
    {false, Pos}.
|---|
| 114 | |
|---|
| 115 | |
|---|
| 116 | |
|---|