Apache PigはHadoop関連プロジェクトの一つでMapReduceをスクリプト(DSL)で手軽に記述できるようにするフレームワークです。

Pig Latinという独自のスクリプト言語でパイプラインを1ステップ毎記述していきます。

ここではApache Pigをセットアップし、公式チュートリアルのサンプルをローカル実行して基本的な動作を確認、Hadoopクラスタ上で実行させるまでを体験してみます。

Hadoopクラスタは、手軽にためすならAWS Elastic MapReduceを利用するのが簡単です。
ここでは、Apache Hadoop 2.5.0で構築したクラスターを利用します。

OSはUbuntu 14.04、各パッケージのバージョンは、このブログ記事を書いている時点のものを記載しています。

Apache Pigのセットアップ

Java 1.6以上が実行環境として必要です。
Ubuntu 14.04 標準の OpenJDK 1.7.x をインストールします。

$ sudo apt-get update
$ sudo apt-get install -y default-jdk
$ java -version
java version "1.7.0_65"
OpenJDK Runtime Environment (IcedTea 2.5.1) (7u65-2.5.1-4ubuntu1~0.14.04.2)
OpenJDK 64-Bit Server VM (build 24.65-b04, mixed mode)

Pigは実行時JAVA_HOMEを参照するので、JAVA_HOMEを/etc/profileに追加しておきます。

$ sudo vi /etc/profile
...
JAVA_HOME=/usr/lib/jvm/default-java/
export JAVA_HOME
$ source /etc/profile

Apache Pigをダウンロード
適当なミラーサイトからpig-0.13.9.tar.gzを落とします。

$ wget http://apache.petsads.us/pig/pig-0.13.0/pig-0.13.0.tar.gz

展開して、pigコマンドをパスに通します。

$ tar xvfz pig-0.13.0.tar.gz
$ sudo vi /etc/profile
...
export PATH=/<展開したディレクトリ>/pig-0.13.0/bin:$PATH
$ source /etc/profile

確認

$ pig -h

Apache Pig version 0.13.0 (r1606446) 
compiled Jun 29 2014, 02:29:34
...

セットアップは以上で完了です。

チュートリアルのビルド

チュートリアルは実行するにはビルドが必要です。
ビルドはAntを使うのでAntをインストール後、チュートリアルディレクトリに移動してantコマンドを実行します。

$ sudo apt-get install -y ant
$ cd pig-0.13.0/tutorial/
$ ant

pigtutorial.tar.gzが作成されるので、作業ディレクトリにコピーして展開します。

$ cd
$ cp pig-0.13.0/tutorial/pigtutorial.tar.gz .
$ tar xvfz pigtutorial.tar.gz

チュートリアルはpigtmpという名前のディレクトリで展開されます。
チュートリアルのディレクトリ移動して、ローカル実行で確認してみます。

$ cd pigtmp
$ pig -x local script1-local.pig
...

HadoopVersion	PigVersion	UserId	StartedAt	FinishedAt	Features
1.0.4	0.13.0	vagrant	2014-09-06 04:53:30	2014-09-06 04:54:07	GROUP_BY,ORDER_BY,DISTINCT,FILTER

Success!

Job Stats (time in seconds):
JobId	Maps	Reduces	MaxMapTime	MinMapTIme	AvgMapTime	MedianMapTime	MaxReduceTime	MinReduceTime	AvgReduceTime	MedianReducetime	Alias	FeatureOutputs
job_local_0001	1	1	n/a	n/a	n/a	n/a	n/a	n/a	n/a	n/a	clean1,clean2,houred,ngramed1,raw	DISTINCT	
job_local_0002	1	1	n/a	n/a	n/a	n/a	n/a	n/a	n/a	n/a	hour_frequency1,hour_frequency2	GROUP_BY,COMBINER	
job_local_0003	1	1	n/a	n/a	n/a	n/a	n/a	n/a	n/a	n/a	filtered_uniq_frequency,uniq_frequency1,uniq_frequency2,uniq_frequency3	GROUP_BY
job_local_0004	1	1	n/a	n/a	n/a	n/a	n/a	n/a	n/a	n/a	ordered_uniq_frequency	SAMPLER	
job_local_0005	1	1	n/a	n/a	n/a	n/a	n/a	n/a	n/a	n/a	ordered_uniq_frequency	ORDER_BY	file:///home/vagrant/pigtmp/script1-local-results.txt,

Input(s):
Successfully read 4501 records from: "file:///home/vagrant/pigtmp/excite-small.log"

Output(s):
Successfully stored 18 records in: "file:///home/vagrant/pigtmp/script1-local-results.txt"

Counters:
Total records written : 18
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_local_0001	->	job_local_0002,
job_local_0002	->	job_local_0003,
job_local_0003	->	job_local_0004,
job_local_0004	->	job_local_0005,
job_local_0005


実行結果は、script1-local-results.txtディレクトリに出力さます。

$ cat script1-local-results.txt/part-r-* 
07	new	2.4494897427831788	2	1.1428571428571426
08	pictures	2.04939015319192	3	1.4999999999999998
08	computer	2.4494897427831788	2	1.1428571428571426
08	s	2.545584412271571	3	1.3636363636363635
10	free	2.2657896674010605	4	1.923076923076923
10	to	2.6457513110645903	2	1.125
10	pics	2.794002794004192	3	1.3076923076923075
10	school	2.828427124746189	2	1.1111111111111114
11	pictures	2.04939015319192	3	1.4999999999999998
11	in	2.1572774865200244	3	1.4285714285714284
13	the	3.1309398305840723	6	1.9375
14	music	2.1105794120443453	4	1.6666666666666667
14	city	2.2360679774997902	2	1.1666666666666665
14	university	2.412090756622109	3	1.4000000000000001
15	adult	2.8284271247461903	2	1.1111111111111112
17	chat	2.9104275004359965	3	1.2857142857142854
19	in	2.1572774865200244	3	1.4285714285714284
19	car	2.23606797749979	3	1.3333333333333333

Hadoopクラスタ上でチュートリアルを実行する

Mapreduce ModeモードでPigを実行し、Hadoopクラスタ上でPigスクリプトを実行します。

Hadoopのインストール

hadoopコマンドを使ってHDFSにチュートリアル用のファイルをアップするのと、PigスクリプトをHadoop 2.x系に対して実行する場合、hadoopコマンドを利用してpigを実行しないと正しく動作しなそうなので(pigのコマンドの中身を見てみたところ)、クライアント用途としてHadoopをインストールしておく必要があります。
Apache Download Mirrorsよりhadoop-2.5.0.tar.gzをダウンロードします。

$ wget http://apache.petsads.us/hadoop/common/hadoop-2.5.0/hadoop-2.5.0.tar.gz

ダウンロードしたアーカイブをインストール先のディレクトリに解凍します。

$ tar xvfz hadoop-2.5.0.tar.gz

hadoopコマンドをパスに通してます。

$ sudo vi /etc/profile
...
HADOOP_HOME=/<展開したディレクトリ>/hadoop-2.5.0
export PATH=$HADOOP_HOME/bin:$PATH
$ source /etc/profile

Hadoopの設定ファイルのパスは、pigコマンドの中でhadoopコマンドのパスから抽出されます。
参照: Q: How can I pass a specific hadoop configuration parameter to Pig?

Hadoopの設定

HDFSのNodeName、YARNのResourceManagerが動作しているノードのホスト名またはIPアドレスを各設定ファイルに指定して行きます。

core-size.xmlに、hadoopコマンドでアクセスするHDFS NameNodeが起動しているホストを設定します。

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
   <property>
        <name>fs.default.name</name>
        <value>hdfs://hostname:9000</value>
    </property>
</configuration>

mapred-site.xmlを作成して、MapReduceジョブ設定にyarnを指定します。

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

yarn-site.xmlに、YARNのResourceManagerデーモンが起動しているホストを指定します。

<?xml version="1.0"?>
<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hostname</value>
    </property>
</configuration>

ResourceMangerのサーバーでJobHistoryServerを実行

Apache PigをYARN上で実行する場合、JobHistoryServerを起動して、mapred-site.xmlにJobHistoryServerのサーバーとポートを指定する必要があります。

ResourceManagerと同じサーバー上でJobHistoryServerを実行します。

$ sbin/mr-jobhistory-daemon.sh start historyserver

起動を確認

$ jps
7113 Jps
5806 NameNode
7021 JobHistoryServer
6204 ResourceManager
6025 SecondaryNameNode

mapred-site.xmlのmapreduce.jobhistory.addressプロパティにJobHistoryServerの設定を追加します。
このmapred-siet.xmlはPigを実行するマシン側のHadoop設定です。

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>hostname:10020</value>
    </property>
</configuration>

JobHistoryServerを実行し、設定しておかない場合、以下のメッセージが出力され続けます。

[JobControl] INFO  org.apache.hadoop.ipc.Client - Retrying connect to server: 0.0.0.0/0.0.0.0:10020.

http://stackoverflow.com/questions/17930644/connection-error-in-apache-pig

チュートリアルの実行

入力ファイルをHDFSにアップロードします。

$ hadoop fs -mkdir /user
$ hadoop fs -mkdir /user/<username>
$ cd pigtmp
$ hadoop fs -copyFromLocal excite.log.bz2 .
$ hadoop fs -ls .
Found 1 items
-rw-r--r--   1 vagrant supergroup   10408717 2014-08-23 11:57 /user/excite.log.bz2

実行

$ pig script1-hadoop.pig
...

HadoopVersion	PigVersion	UserId	StartedAt	FinishedAt	Features
2.5.0	0.13.0	vagrant	2014-09-07 09:58:39	2014-09-07 10:00:19	GROUP_BY,ORDER_BY,DISTINCT,FILTER

Success!

Job Stats (time in seconds):
JobId	Maps	Reduces	MaxMapTime	MinMapTIme	AvgMapTime	MedianMapTime	MaxReduceTime	MinReduceTime	AvgReduceTime	MedianReducetime	Alias	FeaturOutputs
job_local1316645786_0003	1	1	n/a	n/a	n/a	n/a	n/a	n/a	n/a	n/a	filtered_uniq_frequency,uniq_frequency1,uniq_frequency2,uniq_frequency3	GROUP_BY	
job_local1391820848_0001	1	1	n/a	n/a	n/a	n/a	n/a	n/a	n/a	n/a	clean1,clean2,houred,ngramed1,raw	DISTINCT	
job_local184372047_0002	1	1	n/a	n/a	n/a	n/a	n/a	n/a	n/a	n/a	hour_frequency1,hour_frequency2	GROUP_BY,COMBINER	
job_local307241090_0004	1	1	n/a	n/a	n/a	n/a	n/a	n/a	n/a	n/a	ordered_uniq_frequency	SAMPLER	
job_local66847514_0005	1	1	n/a	n/a	n/a	n/a	n/a	n/a	n/a	n/a	ordered_uniq_frequency	ORDER_BY	hdfs://localhost:9000/user/vagrant/script1-hadoop-results,

Input(s):
Successfully read 944954 records (20838874 bytes) from: "hdfs://localhost:9000/user/vagrant/excite.log.bz2"

Output(s):
Successfully stored 13530 records (153971104 bytes) in: "hdfs://localhost:9000/user/vagrant/script1-hadoop-results"

Counters:
Total records written : 13530
Total bytes written : 153971104
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_local1391820848_0001	->	job_local184372047_0002,
job_local184372047_0002	->	job_local1316645786_0003,
job_local1316645786_0003	->	job_local307241090_0004,
job_local307241090_0004	->	job_local66847514_0005,
job_local66847514_0005


2014-09-07 10:00:19,511 [main] WARN  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning SKIP_UDF_CALL_FOR_NULL 98895 time(s).
2014-09-07 10:00:19,511 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

結果を確認

$ hadoop fs -ls script1-hadoop-results
Found 2 items
-rw-r--r--   1 vagrant supergroup          0 2014-09-07 10:00 script1-hadoop-results/_SUCCESS
-rw-r--r--   1 vagrant supergroup     659954 2014-09-07 10:00 script1-hadoop-results/part-r-00000
$ hadoop fs -cat 'script1-hadoop-results/*' | head
00	and shareware	2.112885636821291	3	1.5294117647058825
00	vcd	2.1213203435596424	3	1.5
00	bluebird	2.1555530241167826	4	1.9285714285714282
00	cute	2.1650635094610955	4	1.8571428571428574
00	chested	2.182820625326997	4	1.75
00	diablo cheats	2.197401062294143	3	1.4705882352941178
00	psygnosis	2.23606797749979	2	1.1666666666666667
00	gif s	2.2360679774997902	2	1.1666666666666665
00	vacancy	2.2360679774997902	2	1.1666666666666665
00	free mpegs	2.2360679774997902	2	1.1666666666666665
cat: Unable to write to output stream.