AvroフォーマットデータをApache Pigで読み込み処理する – アジャイルデータサイエンス読書メモ


Hadoop周りのプロジェクトをさらっと学習するためにアジャイルデータサイエンスを読んでいます。

3章の前半、GmailからメールをIMAPで取り込みAvroフォーマットでシリアライズして、それをPigで読み込んでメールアドレスごとのカウントを取るところまでをUbuntu 14.0(Trusty Tahr)で試してみたメモになります。

この本では、半構造化データのシリアライズにApache Avroを利用しています。
データシリアライズのパッケージとしては他にGoogleのProtocol Buffersがよく使われてそうですが、Pigに標準でローダーが含まれていることもありAvroが利用されていると思われます(protobuf-java-2.4.0a.jarがlibに含まれているのでProtocol Buffersもそのまま使えるかも?)。

Pigのインストール
[bash gutter=”false”]
$ wget http://ftp.riken.jp/net/apache/pig/latest/pig-0.13.0.tar.gz
$ tar xvfz pig-0.13.0.tar.gz
[/bash]
アジャイルデータサイエンスではantでビルドしていますが、libに必要なjarがあるのでそれを利用します。
コマンドの実行は、パスは通さずに bin/pig を直接実行します。

Python Avroのインストール
easy_installでAvroをインストールしました。
[bash]
$ sudo apt-get install python-setuptools
$ sudo easy_install avro
[/bash]

Avroファイルの読み書きテスト

Agile Data Scienceのテスト用コードを実行してみます。
[bash gutter=”false”]
$ cd Agile_Data_Code/ch03/python
$ python test_avro.py
{u’message_id’: 11, u’topic’: u’Hello galaxy’, u’user_id’: 1}
{u’message_id’: 12, u’topic’: u’Jim is silly!’, u’user_id’: 1}
{u’message_id’: 23, u’topic’: u’I like apples.’, u’user_id’: 2}
$ cat /tmp/messages.avro
Objavro.schema?{"fields": [{"type": "int", "name": "message_id"}, {"type": "string", "name": "topic"}, {"type": "int", "name": "user_id"}], "type": "record", "name": "Message"}avro.codenullVUж?X5k??
??@?`Hello galaxyJim is silly!.I like apples.VUж?X5k??
??@?vagrant@precise64:~/Agile_Data_C
[/bash]

次に、書き出したAvroファイルをPigで読んでみます。
[text]
/* Set Home Directory – where we install software */
%default PIG_HOME `echo \$HOME/pig-0.13.0`

REGISTER $PIG_HOME/lib/avro-1.7.5.jar
REGISTER $PIG_HOME/lib/json-simple-1.1.jar
REGISTER $PIG_HOME/lib/piggybank.jar

DEFINE AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();

rmf /tmp/sent_counts.txt

/* Load the emails in avro format (edit the path to match where you saved them) using the AvroStorage UDF from Piggybank */
messages = LOAD ‘/tmp/messages.avro’ USING AvroStorage();

STORE messages INTO ‘/tmp/messages_out’;
[/text]

[bash gutter=”false”]
$ JAVA_HOME=/usr pig-0.13.0/bin/pig -x local test_avro.pig

HadoopVersion PigVersion UserId StartedAt FinishedAt Features
1.0.4 0.13.0 vagrant 2014-08-17 22:38:42 2014-08-17 22:38:46 UNKNOWN

Success!

Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs
job_local_0001 1 0 n/a n/a n/a n/a 0 0 0 0 messages MAP_ONLY /tmp/messages_out,

Input(s):
Successfully read 3 records from: "/tmp/messages.avro"

Output(s):
Successfully stored 3 records in: "/tmp/messages_out"

Counters:
Total records written : 3
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_local_0001

2014-08-17 22:38:46,222 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Success!
$ cat /tmp/messages_out/part-m-00000
11 Hello galaxy 1
12 Jim is silly! 1
23 I like apples. 2
[/bash]

Gmailからメールを取得して、アドレス毎のメール数をカウントする

PythonでEmailを処理するために以下のライブラリを追加しておきます。
[bash gutter=”false”]
$ sudo easy_install lepl
[/bash]

Gmailを取得するサンプルを実行してみます。
“[Gmail]/All Mail”は時間がかかるのでここではゴミ箱/Trashを指定してます。
[bash]
$ cd Agile_Data_Code/ch03/gmail
$ ./gmail.py -m automatic -u ‘hrendoh@gmail.com’ -p xxxxxx -s ./email.avro.schema -f ‘[Gmail]/Trash’ -o /tmp/gmail
Folder ‘[Gmail]/Trash’ SELECT status: OK
Folder ‘[Gmail]/Trash has 9’ emails…

Connected to folder [Gmail]/Trash and downloading 9 emails…

CHARSET: utf-8
utf-8
8 utf-8 1111112222233333

CHARSET: iso-2022-jp
iso-2022-jp
1 iso-2022-jp 1111112222233333
[/bash]

Avroフォーマットで書き出したメッセージをPigで読み込んでメールアドレス毎にカウントを実行します。
実行しやすいようにサンプルを書き換えました。
[text title=”sent_counts.pig”]
/* Set Home Directory – where we install software */
%default PIG_HOME `echo \$HOME/pig-0.13.0`

REGISTER $PIG_HOME/lib/avro-1.7.5.jar
REGISTER $PIG_HOME/lib/json-simple-1.1.jar
REGISTER $PIG_HOME/lib/piggybank.jar

DEFINE AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();

rmf /tmp/sent_counts.txt

/* Load the emails in avro format (edit the path to match where you saved them) using the AvroStorage UDF from Piggybank */
messages = LOAD ‘/tmp/gmail/part-1.avro’ USING AvroStorage();

/* Filter nulls, they won’t help */
messages = FILTER messages BY (from IS NOT NULL) AND (tos IS NOT NULL);

/* Emails can be ‘to’ more than one person. FLATTEN() will project our from with each ‘to’ that exists. */
addresses = FOREACH messages GENERATE from.address AS from, FLATTEN(tos.(address)) AS to;

/* Lowercase the email addresses, so we don’t count MiXed case of the same address as multiple addresses */
lowers = FOREACH addresses GENERATE LOWER(from) AS from, LOWER(to) AS to;

/* GROUP BY each from/to pair into a bag (array), then count the bag’s contents ($1 means the 2nd field) to get a total.
Same as SQL: SELECT from, to, COUNT(*) FROM lowers GROUP BY (from, to);
Note: COUNT_STAR differs from COUNT in that it counts nulls. */
by_from_to = GROUP lowers BY (from, to);
sent_counts = FOREACH by_from_to GENERATE FLATTEN(group) AS (from, to), COUNT_STAR(lowers) AS total;

/* Sort the data, highest sent count first */
sent_counts = ORDER sent_counts BY total DESC;
STORE sent_counts INTO ‘/tmp/sent_counts.txt’;
[/text]
Pigを実行します。
[bash gutter=”false”]
$ JAVA_HOME=/usr pig-0.13.0/bin/pig -x local sent_counts.pig

HadoopVersion PigVersion UserId StartedAt FinishedAt Features
1.0.4 0.13.0 vagrant 2014-08-17 23:11:35 2014-08-17 23:11:56 GROUP_BY,ORDER_BY,FILTER

Success!

Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs
job_local_0001 1 1 n/a n/a n/a n/a n/a n/a n/a n/a addresses,by_from_to,lowers,messages,sent_counts GROUP_BY,COMBINER
job_local_0002 1 1 n/a n/a n/a n/a n/a n/a n/a n/a sent_counts SAMPLER
job_local_0003 1 1 n/a n/a n/a n/a n/a n/a n/a n/a sent_counts ORDER_BY /tmp/sent_counts.txt,

Input(s):
Successfully read 8 records from: "/tmp/gmail/part-1.avro"

Output(s):
Successfully stored 3 records in: "/tmp/sent_counts.txt"

Counters:
Total records written : 3
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_local_0001 -> job_local_0002,
job_local_0002 -> job_local_0003,
job_local_0003

2014-08-17 23:11:56,141 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Success!
[/bash]
出力を確認してみます。
[bash gutter=”false”]
$ cat /tmp/sent_counts.txt/part-*
notification@hoge.com hrendoh@gmail.com 5
special@foo.jp hrendoh@gmail.com 2
news@example.jp hrendoh@gmail.com 1
[/bash]

参考
Protocol Buffers VS Thrift VS Avro: http://www.slideshare.net/IgorAnishchenko/pb-vs-thrift-vs-avro
PigでのJSONの扱い: How to Read and Write JSON-formatted Data With Apache Pig

補足
Ubuntu 14.0でのPigビルド手順
[bash gutter=”false”]
$ sudo apt-get install ant
$ sudo apt-get install default-jdk
$ export JAVA_HOME=/usr
$ wget http://ftp.riken.jp/net/apache/pig/latest/pig-0.13.0.tar.gz
$ tar xvfz pig-0.13.0.tar.gz
$ cd pig-0.13.0
$ ant
[/bash]
ビルドしたjarファイルを指定したsent_counts.pig
[text title=”sent_counts.pig”]
/* Set Home Directory – where we install software */
%default HOME `echo \$HOME/pig-0.13.0`

REGISTER $HOME/build/ivy/lib/Pig/avro-1.7.5.jar
REGISTER $HOME/build/ivy/lib/Pig/json-simple-1.1.jar
REGISTER $HOME/contrib/piggybank/java/piggybank.jar


[/text]