Hadoop周りのプロジェクトをさらっと学習するためにアジャイルデータサイエンスを読んでいます。

3章の前半、GmailからメールをIMAPで取り込みAvroフォーマットでシリアライズして、それをPigで読み込んでメールアドレスごとのカウントを取るところまでをUbuntu 14.0(Trusty Tahr)で試してみたメモになります。

この本では、半構造化データのシリアライズにApache Avroを利用しています。
データシリアライズのパッケージとしては他にGoogleのProtocol Buffersがよく使われてそうですが、Pigに標準でローダーが含まれていることもありAvroが利用されていると思われます(protobuf-java-2.4.0a.jarがlibに含まれているのでProtocol Buffersもそのまま使えるかも?)。

Pigのインストール

$ wget http://ftp.riken.jp/net/apache/pig/latest/pig-0.13.0.tar.gz
$ tar xvfz pig-0.13.0.tar.gz

アジャイルデータサイエンスではantでビルドしていますが、libに必要なjarがあるのでそれを利用します。
コマンドの実行は、パスは通さずに bin/pig を直接実行します。

Python Avroのインストール
easy_installでAvroをインストールしました。

$ sudo apt-get install python-setuptools
$ sudo easy_install avro

Avroファイルの読み書きテスト

Agile Data Scienceのテスト用コードを実行してみます。

$ cd Agile_Data_Code/ch03/python
$ python test_avro.py 
{u'message_id': 11, u'topic': u'Hello galaxy', u'user_id': 1}
{u'message_id': 12, u'topic': u'Jim is silly!', u'user_id': 1}
{u'message_id': 23, u'topic': u'I like apples.', u'user_id': 2}
$ cat /tmp/messages.avro 
Objavro.schema?{"fields": [{"type": "int", "name": "message_id"}, {"type": "string", "name": "topic"}, {"type": "int", "name": "user_id"}], "type": "record", "name": "Message"}avro.codenullVUж?X5k??
      ??@?`Hello galaxyJim is silly!.I like apples.VUж?X5k??
                                                            ??@?vagrant@precise64:~/Agile_Data_C

次に、書き出したAvroファイルをPigで読んでみます。

/* Set Home Directory - where we install software */
%default PIG_HOME `echo \$HOME/pig-0.13.0`

REGISTER $PIG_HOME/lib/avro-1.7.5.jar
REGISTER $PIG_HOME/lib/json-simple-1.1.jar
REGISTER $PIG_HOME/lib/piggybank.jar

DEFINE AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();

rmf /tmp/sent_counts.txt

/* Load the emails in avro format (edit the path to match where you saved them) using the AvroStorage UDF from Piggybank */
messages = LOAD '/tmp/messages.avro' USING AvroStorage();

STORE messages INTO '/tmp/messages_out';
$ JAVA_HOME=/usr pig-0.13.0/bin/pig -x local test_avro.pig 
...
HadoopVersion	PigVersion	UserId	StartedAt	FinishedAt	Features
1.0.4	0.13.0	vagrant	2014-08-17 22:38:42	2014-08-17 22:38:46	UNKNOWN

Success!

Job Stats (time in seconds):
JobId	Maps	Reduces	MaxMapTime	MinMapTIme	AvgMapTime	MedianMapTime	MaxReduceTime	MinReduceTime	AvgReduceTime	MedianReducetime	Alias	Feature	Outputs
job_local_0001	1	0	n/a	n/a	n/a	n/a	0	0	0	0	messages	MAP_ONLY	/tmp/messages_out,

Input(s):
Successfully read 3 records from: "/tmp/messages.avro"

Output(s):
Successfully stored 3 records in: "/tmp/messages_out"

Counters:
Total records written : 3
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_local_0001


2014-08-17 22:38:46,222 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
$ cat /tmp/messages_out/part-m-00000 
11	Hello galaxy	1
12	Jim is silly!	1
23	I like apples.	2

Gmailからメールを取得して、アドレス毎のメール数をカウントする

PythonでEmailを処理するために以下のライブラリを追加しておきます。

$ sudo easy_install lepl

Gmailを取得するサンプルを実行してみます。
“[Gmail]/All Mail”は時間がかかるのでここではゴミ箱/Trashを指定してます。

$ cd Agile_Data_Code/ch03/gmail
$ ./gmail.py -m automatic -u 'hrendoh@gmail.com' -p xxxxxx -s ./email.avro.schema -f '[Gmail]/Trash' -o /tmp/gmail
Folder '[Gmail]/Trash' SELECT status: OK
Folder '[Gmail]/Trash has 9' emails...

Connected to folder [Gmail]/Trash and downloading 9 emails...

CHARSET: utf-8
utf-8
8 utf-8 1111112222233333

...

CHARSET: iso-2022-jp
iso-2022-jp
1 iso-2022-jp 1111112222233333

Avroフォーマットで書き出したメッセージをPigで読み込んでメールアドレス毎にカウントを実行します。
実行しやすいようにサンプルを書き換えました。

/* Set Home Directory - where we install software */
%default PIG_HOME `echo \$HOME/pig-0.13.0`

REGISTER $PIG_HOME/lib/avro-1.7.5.jar
REGISTER $PIG_HOME/lib/json-simple-1.1.jar
REGISTER $PIG_HOME/lib/piggybank.jar

DEFINE AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();

rmf /tmp/sent_counts.txt

/* Load the emails in avro format (edit the path to match where you saved them) using the AvroStorage UDF from Piggybank */
messages = LOAD '/tmp/gmail/part-1.avro' USING AvroStorage();

/* Filter nulls, they won't help */
messages = FILTER messages BY (from IS NOT NULL) AND (tos IS NOT NULL);

/* Emails can be 'to' more than one person. FLATTEN() will project our from with each 'to' that exists. */
addresses = FOREACH messages GENERATE from.address AS from, FLATTEN(tos.(address)) AS to;

/* Lowercase the email addresses, so we don't count MiXed case of the same address as multiple addresses */
lowers = FOREACH addresses GENERATE LOWER(from) AS from, LOWER(to) AS to;

/* GROUP BY each from/to pair into a bag (array), then count the bag's contents ($1 means the 2nd field) to get a total.
   Same as SQL: SELECT from, to, COUNT(*) FROM lowers GROUP BY (from, to);
   Note: COUNT_STAR differs from COUNT in that it counts nulls. */
by_from_to = GROUP lowers BY (from, to);
sent_counts = FOREACH by_from_to GENERATE FLATTEN(group) AS (from, to), COUNT_STAR(lowers) AS total;

/* Sort the data, highest sent count first */
sent_counts = ORDER sent_counts BY total DESC;
STORE sent_counts INTO '/tmp/sent_counts.txt';

Pigを実行します。

$ JAVA_HOME=/usr pig-0.13.0/bin/pig -x local sent_counts.pig
...
HadoopVersion	PigVersion	UserId	StartedAt	FinishedAt	Features
1.0.4	0.13.0	vagrant	2014-08-17 23:11:35	2014-08-17 23:11:56	GROUP_BY,ORDER_BY,FILTER

Success!

Job Stats (time in seconds):
JobId	Maps	Reduces	MaxMapTime	MinMapTIme	AvgMapTime	MedianMapTime	MaxReduceTime	MinReduceTime	AvgReduceTime	MedianReducetime	Alias	Feature	Outputs
job_local_0001	1	1	n/a	n/a	n/a	n/a	n/a	n/a	n/a	n/a	addresses,by_from_to,lowers,messages,sent_counts	GROUP_BY,COMBINER	
job_local_0002	1	1	n/a	n/a	n/a	n/a	n/a	n/a	n/a	n/a	sent_counts	SAMPLER	
job_local_0003	1	1	n/a	n/a	n/a	n/a	n/a	n/a	n/a	n/a	sent_counts	ORDER_BY	/tmp/sent_counts.txt,

Input(s):
Successfully read 8 records from: "/tmp/gmail/part-1.avro"

Output(s):
Successfully stored 3 records in: "/tmp/sent_counts.txt"

Counters:
Total records written : 3
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_local_0001	->	job_local_0002,
job_local_0002	->	job_local_0003,
job_local_0003


2014-08-17 23:11:56,141 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

出力を確認してみます。

$ cat /tmp/sent_counts.txt/part-* 
notification@hoge.com	hrendoh@gmail.com	5
special@foo.jp	hrendoh@gmail.com	2
news@example.jp	hrendoh@gmail.com	1

参考
Protocol Buffers VS Thrift VS Avro: http://www.slideshare.net/IgorAnishchenko/pb-vs-thrift-vs-avro
PigでのJSONの扱い: How to Read and Write JSON-formatted Data With Apache Pig

補足
Ubuntu 14.0でのPigビルド手順

$ sudo apt-get install ant
$ sudo apt-get install default-jdk
$ export JAVA_HOME=/usr
$ wget http://ftp.riken.jp/net/apache/pig/latest/pig-0.13.0.tar.gz
$ tar xvfz pig-0.13.0.tar.gz
$ cd pig-0.13.0
$ ant

ビルドしたjarファイルを指定したsent_counts.pig

/* Set Home Directory - where we install software */
%default HOME `echo \$HOME/pig-0.13.0`

REGISTER $HOME/build/ivy/lib/Pig/avro-1.7.5.jar
REGISTER $HOME/build/ivy/lib/Pig/json-simple-1.1.jar
REGISTER $HOME/contrib/piggybank/java/piggybank.jar

...