2012年8月31日金曜日

Processing small data by Hadoop

ビッグデータ()が流行ってる昨今ですが、あえてスモールデータを処理する話題でも。

ビッグデータってさぞかしデカいデータでしょうって話がよく聞かれるわけですよ。
(最近はあんまり聞かないのでもうみなさんあきてきましたか?)
◯TB、◯PB、えぇ、確かにデカいです。こんなん処理するのにどのぐらい掛かるんですか?って話もあります。

んじゃ、逆を言えばこれらからみると数百MBぐらいのデータって小さいですね。
中身も100万件ほどしかなかったり。
単純に集計やる程度だったらHadoopとか必要ないですね。ってか、誰がそんなめんどかいことやるのかと。

じゃあ、ちょっと考えてみる。
100万件のデータがあります。全部1件ずつそれぞれ処理をしなければいけません。1件処理するのに大体0.1秒掛かったとします。

1,000,000(件) * 0.1(秒) / 60 / 60 = 27.77...時間

これ、スモールデータですか?1日じゃ処理しきれませんよ...
これをマルチスレッドで並行してやったとしても

約28時間 / 8(4コア X 2の想定) = 3.5時間

それでも3.5時間掛かるわけです。

まぁ、こんな話をしようと思ってたわけじゃないのでこの辺りでやめておいて本題。


上記のような処理をHadoopでやらせようと思った場合、単純にMapがネックになる。例えば、上記のものが300MBだったとする。HDFSのデフォルトのブロックサイズは64MB。Mapはブロック単位で処理されるので

300 / 64 = 5(約4.6)

ってことは、Mapは5個しか起動されないわけです。20ノードとか並べてもまったく意味がないわけですね。。。これを回避する方法としてInput部分を自分でアレするってのがありますが、正直そんなのめんどくせぇよってなるわけです。ってことでお手軽な方法として。

FileInputFormat.setMinInputSplitSize(job, 1048576);
FileInputFormat.setMaxInputSplitSize(job, 1048576);

とかやって処理するサイズをちっちゃくしてあげましょう。


次に数GB程度のファイルを成形する処理を考えてみましょう。
先に書いちゃうとローカルモードのHadoop速いです。速いです...
Hadoopのイニシャルコストって結構掛かるのですぐ起動するし速いです。

んでは、やり方。

普通(分散)のHadoop同様Mapがブロック単位で処理されます。ここでネックになってくるのがMapが1つずつ起動するってことです。ようするに一つ目のMapが終了すると次のMapが起動する。タスクの起動コストが高くとても効率が悪いです。。。

上記とは逆でsplitサイズをデカい値にしちゃってMapを一つしか起動しないようにしちゃいます。

FileInputFormat.setMinInputSplitSize(job, 2147483648);
FileInputFormat.setMaxInputSplitSize(job, 2147483648);

次に、これだとシングルスレッドになってしまうのでマルチスレッド化しちゃいましょう。

job.setMapperClass(MultithreadedMapper.class);
MultithreadedMapper.setMapperClass(job, Mapper.class);
MultithreadedMapper.setNumberOfThreads(job, 4);

MultithreadedMapperはMapperをマルチスレッド化してくれるので自分でマルチスレッドの処理を書かなくてすむのでとても楽ですね。
ただ、障害が起きると即死亡なのでそこは気をつけましょう。

ローカルで処理しきれないぐらいでかくなってきたらクラスタにしちゃえばいいので一粒で二度美味しい的な感じがあります(まさにビッグデータ、スモールスタート!)。

本当はここだけをメモ代わりに書いとこうと思ったんだけど、余計なことまで書いちゃったのはもういいや。

ということで、上記のファイルの処理だけやるヤツを公開しました。

Huahin Tools

まだ、ファイルを成形(デフォルトではデフォルトのApacheログを成形)する機能だけしかないですが、成形してからDBなんかにぶち込む感じで。
現在はローカルモードとAmazon Elastic MapReduceで動くようになってます。

注意としては、他のものとリリースをあわせようと思ってるので、まだ正式版ではないってこと。