Hadoop周りのプロジェクトをさらっと学習するためにアジャイルデータサイエンスを読んでいます。
3章の前半、GmailからメールをIMAPで取り込みAvroフォーマットでシリアライズして、それをPigで読み込んでメールアドレスごとのカウントを取るところまでをUbuntu 14.0(Trusty Tahr)で試してみたメモになります。
この本では、半構造化データのシリアライズにApache Avroを利用しています。
データシリアライズのパッケージとしては他にGoogleのProtocol Buffersがよく使われてそうですが、Pigに標準でローダーが含まれていることもありAvroが利用されていると思われます(protobuf-java-2.4.0a.jarがlibに含まれているのでProtocol Buffersもそのまま使えるかも?)。
Pigのインストール
[bash gutter=”false”]
$ wget http://ftp.riken.jp/net/apache/pig/latest/pig-0.13.0.tar.gz
$ tar xvfz pig-0.13.0.tar.gz
[/bash]
アジャイルデータサイエンスではantでビルドしていますが、libに必要なjarがあるのでそれを利用します。
コマンドの実行は、パスは通さずに bin/pig を直接実行します。
Python Avroのインストール
easy_installでAvroをインストールしました。
[bash]
$ sudo apt-get install python-setuptools
$ sudo easy_install avro
[/bash]
Avroファイルの読み書きテスト
Agile Data Scienceのテスト用コードを実行してみます。
[bash gutter=”false”]
$ cd Agile_Data_Code/ch03/python
$ python test_avro.py
{u’message_id’: 11, u’topic’: u’Hello galaxy’, u’user_id’: 1}
{u’message_id’: 12, u’topic’: u’Jim is silly!’, u’user_id’: 1}
{u’message_id’: 23, u’topic’: u’I like apples.’, u’user_id’: 2}
$ cat /tmp/messages.avro
Objavro.schema?{"fields": [{"type": "int", "name": "message_id"}, {"type": "string", "name": "topic"}, {"type": "int", "name": "user_id"}], "type": "record", "name": "Message"}avro.codenullVUж?X5k??
??@?`Hello galaxyJim is silly!.I like apples.VUж?X5k??
??@?vagrant@precise64:~/Agile_Data_C
[/bash]
次に、書き出したAvroファイルをPigで読んでみます。
[text]
/* Set Home Directory – where we install software */
%default PIG_HOME `echo \$HOME/pig-0.13.0`
REGISTER $PIG_HOME/lib/avro-1.7.5.jar
REGISTER $PIG_HOME/lib/json-simple-1.1.jar
REGISTER $PIG_HOME/lib/piggybank.jar
DEFINE AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();
rmf /tmp/sent_counts.txt
/* Load the emails in avro format (edit the path to match where you saved them) using the AvroStorage UDF from Piggybank */
messages = LOAD ‘/tmp/messages.avro’ USING AvroStorage();
STORE messages INTO ‘/tmp/messages_out’;
[/text]
[bash gutter=”false”]
$ JAVA_HOME=/usr pig-0.13.0/bin/pig -x local test_avro.pig
…
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
1.0.4 0.13.0 vagrant 2014-08-17 22:38:42 2014-08-17 22:38:46 UNKNOWN
Success!
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs
job_local_0001 1 0 n/a n/a n/a n/a 0 0 0 0 messages MAP_ONLY /tmp/messages_out,
Input(s):
Successfully read 3 records from: "/tmp/messages.avro"
Output(s):
Successfully stored 3 records in: "/tmp/messages_out"
Counters:
Total records written : 3
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG:
job_local_0001
2014-08-17 22:38:46,222 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Success!
$ cat /tmp/messages_out/part-m-00000
11 Hello galaxy 1
12 Jim is silly! 1
23 I like apples. 2
[/bash]
Gmailからメールを取得して、アドレス毎のメール数をカウントする
PythonでEmailを処理するために以下のライブラリを追加しておきます。
[bash gutter=”false”]
$ sudo easy_install lepl
[/bash]
Gmailを取得するサンプルを実行してみます。
“[Gmail]/All Mail”は時間がかかるのでここではゴミ箱/Trashを指定してます。
[bash]
$ cd Agile_Data_Code/ch03/gmail
$ ./gmail.py -m automatic -u ‘hrendoh@gmail.com’ -p xxxxxx -s ./email.avro.schema -f ‘[Gmail]/Trash’ -o /tmp/gmail
Folder ‘[Gmail]/Trash’ SELECT status: OK
Folder ‘[Gmail]/Trash has 9’ emails…
Connected to folder [Gmail]/Trash and downloading 9 emails…
CHARSET: utf-8
utf-8
8 utf-8 1111112222233333
…
CHARSET: iso-2022-jp
iso-2022-jp
1 iso-2022-jp 1111112222233333
[/bash]
Avroフォーマットで書き出したメッセージをPigで読み込んでメールアドレス毎にカウントを実行します。
実行しやすいようにサンプルを書き換えました。
[text title=”sent_counts.pig”]
/* Set Home Directory – where we install software */
%default PIG_HOME `echo \$HOME/pig-0.13.0`
REGISTER $PIG_HOME/lib/avro-1.7.5.jar
REGISTER $PIG_HOME/lib/json-simple-1.1.jar
REGISTER $PIG_HOME/lib/piggybank.jar
DEFINE AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();
rmf /tmp/sent_counts.txt
/* Load the emails in avro format (edit the path to match where you saved them) using the AvroStorage UDF from Piggybank */
messages = LOAD ‘/tmp/gmail/part-1.avro’ USING AvroStorage();
/* Filter nulls, they won’t help */
messages = FILTER messages BY (from IS NOT NULL) AND (tos IS NOT NULL);
/* Emails can be ‘to’ more than one person. FLATTEN() will project our from with each ‘to’ that exists. */
addresses = FOREACH messages GENERATE from.address AS from, FLATTEN(tos.(address)) AS to;
/* Lowercase the email addresses, so we don’t count MiXed case of the same address as multiple addresses */
lowers = FOREACH addresses GENERATE LOWER(from) AS from, LOWER(to) AS to;
/* GROUP BY each from/to pair into a bag (array), then count the bag’s contents ($1 means the 2nd field) to get a total.
Same as SQL: SELECT from, to, COUNT(*) FROM lowers GROUP BY (from, to);
Note: COUNT_STAR differs from COUNT in that it counts nulls. */
by_from_to = GROUP lowers BY (from, to);
sent_counts = FOREACH by_from_to GENERATE FLATTEN(group) AS (from, to), COUNT_STAR(lowers) AS total;
/* Sort the data, highest sent count first */
sent_counts = ORDER sent_counts BY total DESC;
STORE sent_counts INTO ‘/tmp/sent_counts.txt’;
[/text]
Pigを実行します。
[bash gutter=”false”]
$ JAVA_HOME=/usr pig-0.13.0/bin/pig -x local sent_counts.pig
…
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
1.0.4 0.13.0 vagrant 2014-08-17 23:11:35 2014-08-17 23:11:56 GROUP_BY,ORDER_BY,FILTER
Success!
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs
job_local_0001 1 1 n/a n/a n/a n/a n/a n/a n/a n/a addresses,by_from_to,lowers,messages,sent_counts GROUP_BY,COMBINER
job_local_0002 1 1 n/a n/a n/a n/a n/a n/a n/a n/a sent_counts SAMPLER
job_local_0003 1 1 n/a n/a n/a n/a n/a n/a n/a n/a sent_counts ORDER_BY /tmp/sent_counts.txt,
Input(s):
Successfully read 8 records from: "/tmp/gmail/part-1.avro"
Output(s):
Successfully stored 3 records in: "/tmp/sent_counts.txt"
Counters:
Total records written : 3
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG:
job_local_0001 -> job_local_0002,
job_local_0002 -> job_local_0003,
job_local_0003
2014-08-17 23:11:56,141 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Success!
[/bash]
出力を確認してみます。
[bash gutter=”false”]
$ cat /tmp/sent_counts.txt/part-*
notification@hoge.com hrendoh@gmail.com 5
special@foo.jp hrendoh@gmail.com 2
news@example.jp hrendoh@gmail.com 1
[/bash]
参考
Protocol Buffers VS Thrift VS Avro: http://www.slideshare.net/IgorAnishchenko/pb-vs-thrift-vs-avro
PigでのJSONの扱い: How to Read and Write JSON-formatted Data With Apache Pig
補足
Ubuntu 14.0でのPigビルド手順
[bash gutter=”false”]
$ sudo apt-get install ant
$ sudo apt-get install default-jdk
$ export JAVA_HOME=/usr
$ wget http://ftp.riken.jp/net/apache/pig/latest/pig-0.13.0.tar.gz
$ tar xvfz pig-0.13.0.tar.gz
$ cd pig-0.13.0
$ ant
[/bash]
ビルドしたjarファイルを指定したsent_counts.pig
[text title=”sent_counts.pig”]
/* Set Home Directory – where we install software */
%default HOME `echo \$HOME/pig-0.13.0`
REGISTER $HOME/build/ivy/lib/Pig/avro-1.7.5.jar
REGISTER $HOME/build/ivy/lib/Pig/json-simple-1.1.jar
REGISTER $HOME/contrib/piggybank/java/piggybank.jar
…
[/text]