Apache PigはHadoop関連プロジェクトの一つでMapReduceをスクリプト(DSL)で手軽に記述できるようにするフレームワークです。
Pig Latinという独自のスクリプト言語でパイプラインを1ステップ毎記述していきます。
ここではApache Pigをセットアップし、公式チュートリアルのサンプルをローカル実行して基本的な動作を確認、Hadoopクラスタ上で実行させるまでを体験してみます。
Hadoopクラスタは、手軽にためすならAWS Elastic MapReduceを利用するのが簡単です。
ここでは、Apache Hadoop 2.5.0で構築したクラスターを利用します。
OSはUbuntu 14.04、各パッケージのバージョンは、このブログ記事を書いている時点のものを記載しています。
Apache Pigのセットアップ
Java 1.6以上が実行環境として必要です。
Ubuntu 14.04 標準の OpenJDK 1.7.x をインストールします。
[bash gutter=”false”]
$ sudo apt-get update
$ sudo apt-get install -y default-jdk
$ java -version
java version "1.7.0_65"
OpenJDK Runtime Environment (IcedTea 2.5.1) (7u65-2.5.1-4ubuntu1~0.14.04.2)
OpenJDK 64-Bit Server VM (build 24.65-b04, mixed mode)
[/bash]
Pigは実行時JAVA_HOMEを参照するので、JAVA_HOMEを/etc/profileに追加しておきます。
[bash gutter=”false”]
$ sudo vi /etc/profile
…
JAVA_HOME=/usr/lib/jvm/default-java/
export JAVA_HOME
$ source /etc/profile
[/bash]
Apache Pigをダウンロード
適当なミラーサイトからpig-0.13.9.tar.gzを落とします。
[bash gutter=”false”]
$ wget http://apache.petsads.us/pig/pig-0.13.0/pig-0.13.0.tar.gz
[/bash]
展開して、pigコマンドをパスに通します。
[bash gutter=”false”]
$ tar xvfz pig-0.13.0.tar.gz
$ sudo vi /etc/profile
…
export PATH=/<展開したディレクトリ>/pig-0.13.0/bin:$PATH
$ source /etc/profile
[/bash]
確認
[bash gutter=”false”]
$ pig -h
Apache Pig version 0.13.0 (r1606446)
compiled Jun 29 2014, 02:29:34
…
[/bash]
セットアップは以上で完了です。
チュートリアルのビルド
チュートリアルは実行するにはビルドが必要です。
ビルドはAntを使うのでAntをインストール後、チュートリアルディレクトリに移動してantコマンドを実行します。
[bash gutter=”false”]
$ sudo apt-get install -y ant
$ cd pig-0.13.0/tutorial/
$ ant
[/bash]
pigtutorial.tar.gzが作成されるので、作業ディレクトリにコピーして展開します。
[bash gutter=”false”]
$ cd
$ cp pig-0.13.0/tutorial/pigtutorial.tar.gz .
$ tar xvfz pigtutorial.tar.gz
[/bash]
チュートリアルはpigtmpという名前のディレクトリで展開されます。
チュートリアルのディレクトリ移動して、ローカル実行で確認してみます。
[bash]
$ cd pigtmp
$ pig -x local script1-local.pig
…
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
1.0.4 0.13.0 vagrant 2014-09-06 04:53:30 2014-09-06 04:54:07 GROUP_BY,ORDER_BY,DISTINCT,FILTER
Success!
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias FeatureOutputs
job_local_0001 1 1 n/a n/a n/a n/a n/a n/a n/a n/a clean1,clean2,houred,ngramed1,raw DISTINCT
job_local_0002 1 1 n/a n/a n/a n/a n/a n/a n/a n/a hour_frequency1,hour_frequency2 GROUP_BY,COMBINER
job_local_0003 1 1 n/a n/a n/a n/a n/a n/a n/a n/a filtered_uniq_frequency,uniq_frequency1,uniq_frequency2,uniq_frequency3 GROUP_BY
job_local_0004 1 1 n/a n/a n/a n/a n/a n/a n/a n/a ordered_uniq_frequency SAMPLER
job_local_0005 1 1 n/a n/a n/a n/a n/a n/a n/a n/a ordered_uniq_frequency ORDER_BY file:///home/vagrant/pigtmp/script1-local-results.txt,
Input(s):
Successfully read 4501 records from: "file:///home/vagrant/pigtmp/excite-small.log"
Output(s):
Successfully stored 18 records in: "file:///home/vagrant/pigtmp/script1-local-results.txt"
Counters:
Total records written : 18
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG:
job_local_0001 -> job_local_0002,
job_local_0002 -> job_local_0003,
job_local_0003 -> job_local_0004,
job_local_0004 -> job_local_0005,
job_local_0005
[/bash]
実行結果は、script1-local-results.txtディレクトリに出力さます。
[bash gutter=”false”]
$ cat script1-local-results.txt/part-r-*
07 new 2.4494897427831788 2 1.1428571428571426
08 pictures 2.04939015319192 3 1.4999999999999998
08 computer 2.4494897427831788 2 1.1428571428571426
08 s 2.545584412271571 3 1.3636363636363635
10 free 2.2657896674010605 4 1.923076923076923
10 to 2.6457513110645903 2 1.125
10 pics 2.794002794004192 3 1.3076923076923075
10 school 2.828427124746189 2 1.1111111111111114
11 pictures 2.04939015319192 3 1.4999999999999998
11 in 2.1572774865200244 3 1.4285714285714284
13 the 3.1309398305840723 6 1.9375
14 music 2.1105794120443453 4 1.6666666666666667
14 city 2.2360679774997902 2 1.1666666666666665
14 university 2.412090756622109 3 1.4000000000000001
15 adult 2.8284271247461903 2 1.1111111111111112
17 chat 2.9104275004359965 3 1.2857142857142854
19 in 2.1572774865200244 3 1.4285714285714284
19 car 2.23606797749979 3 1.3333333333333333
[/bash]
Hadoopクラスタ上でチュートリアルを実行する
Mapreduce ModeモードでPigを実行し、Hadoopクラスタ上でPigスクリプトを実行します。
Hadoopのインストール
hadoopコマンドを使ってHDFSにチュートリアル用のファイルをアップするのと、PigスクリプトをHadoop 2.x系に対して実行する場合、hadoopコマンドを利用してpigを実行しないと正しく動作しなそうなので(pigのコマンドの中身を見てみたところ)、クライアント用途としてHadoopをインストールしておく必要があります。
Apache Download Mirrorsよりhadoop-2.5.0.tar.gzをダウンロードします。
[bash gutter=”false”]
$ wget http://apache.petsads.us/hadoop/common/hadoop-2.5.0/hadoop-2.5.0.tar.gz
[/bash]
ダウンロードしたアーカイブをインストール先のディレクトリに解凍します。
[bash gutter=”false”]
$ tar xvfz hadoop-2.5.0.tar.gz
[/bash]
hadoopコマンドをパスに通してます。
[bash gutter=”false”]
$ sudo vi /etc/profile
…
HADOOP_HOME=/<展開したディレクトリ>/hadoop-2.5.0
export PATH=$HADOOP_HOME/bin:$PATH
$ source /etc/profile
[/bash]
Hadoopの設定ファイルのパスは、pigコマンドの中でhadoopコマンドのパスから抽出されます。
参照: Q: How can I pass a specific hadoop configuration parameter to Pig?
Hadoopの設定
HDFSのNodeName、YARNのResourceManagerが動作しているノードのホスト名またはIPアドレスを各設定ファイルに指定して行きます。
core-size.xmlに、hadoopコマンドでアクセスするHDFS NameNodeが起動しているホストを設定します。
[xml title=”etc/hadoop/core-site.xml”]
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://hostname:9000</value>
</property>
</configuration>
[/xml]
mapred-site.xmlを作成して、MapReduceジョブ設定にyarnを指定します。
[xml title=”etc/hadoop/mapred-site.xml”]
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
[/xml]
yarn-site.xmlに、YARNのResourceManagerデーモンが起動しているホストを指定します。
[xml title=”etc/hadoop/yarn-site.xml”]
<?xml version="1.0"?>
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>hostname</value>
</property>
</configuration>
[/xml]
ResourceMangerのサーバーでJobHistoryServerを実行
Apache PigをYARN上で実行する場合、JobHistoryServerを起動して、mapred-site.xmlにJobHistoryServerのサーバーとポートを指定する必要があります。
ResourceManagerと同じサーバー上でJobHistoryServerを実行します。
[bash]
$ sbin/mr-jobhistory-daemon.sh start historyserver
[/bash]
起動を確認
[bash gutter=”false”]
$ jps
7113 Jps
5806 NameNode
7021 JobHistoryServer
6204 ResourceManager
6025 SecondaryNameNode
[/bash]
mapred-site.xmlのmapreduce.jobhistory.addressプロパティにJobHistoryServerの設定を追加します。
このmapred-siet.xmlはPigを実行するマシン側のHadoop設定です。
[xml title=”etc/hadoop/mapred-site.xml” highlight=”8,9,10,11″]
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>hostname:10020</value>
</property>
</configuration>
[/xml]
JobHistoryServerを実行し、設定しておかない場合、以下のメッセージが出力され続けます。
[bash gutter=”false”]
[JobControl] INFO org.apache.hadoop.ipc.Client – Retrying connect to server: 0.0.0.0/0.0.0.0:10020.
[/bash]
http://stackoverflow.com/questions/17930644/connection-error-in-apache-pig
チュートリアルの実行
入力ファイルをHDFSにアップロードします。
[bash gutter=”false”]
$ hadoop fs -mkdir /user
$ hadoop fs -mkdir /user/<username>
$ cd pigtmp
$ hadoop fs -copyFromLocal excite.log.bz2 .
$ hadoop fs -ls .
Found 1 items
-rw-r–r– 1 vagrant supergroup 10408717 2014-08-23 11:57 /user/excite.log.bz2
[/bash]
実行
[bash gutter=”false”]
$ pig script1-hadoop.pig
…
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
2.5.0 0.13.0 vagrant 2014-09-07 09:58:39 2014-09-07 10:00:19 GROUP_BY,ORDER_BY,DISTINCT,FILTER
Success!
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias FeaturOutputs
job_local1316645786_0003 1 1 n/a n/a n/a n/a n/a n/a n/a n/a filtered_uniq_frequency,uniq_frequency1,uniq_frequency2,uniq_frequency3 GROUP_BY
job_local1391820848_0001 1 1 n/a n/a n/a n/a n/a n/a n/a n/a clean1,clean2,houred,ngramed1,raw DISTINCT
job_local184372047_0002 1 1 n/a n/a n/a n/a n/a n/a n/a n/a hour_frequency1,hour_frequency2 GROUP_BY,COMBINER
job_local307241090_0004 1 1 n/a n/a n/a n/a n/a n/a n/a n/a ordered_uniq_frequency SAMPLER
job_local66847514_0005 1 1 n/a n/a n/a n/a n/a n/a n/a n/a ordered_uniq_frequency ORDER_BY hdfs://localhost:9000/user/vagrant/script1-hadoop-results,
Input(s):
Successfully read 944954 records (20838874 bytes) from: "hdfs://localhost:9000/user/vagrant/excite.log.bz2"
Output(s):
Successfully stored 13530 records (153971104 bytes) in: "hdfs://localhost:9000/user/vagrant/script1-hadoop-results"
Counters:
Total records written : 13530
Total bytes written : 153971104
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG:
job_local1391820848_0001 -> job_local184372047_0002,
job_local184372047_0002 -> job_local1316645786_0003,
job_local1316645786_0003 -> job_local307241090_0004,
job_local307241090_0004 -> job_local66847514_0005,
job_local66847514_0005
2014-09-07 10:00:19,511 [main] WARN org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Encountered Warning SKIP_UDF_CALL_FOR_NULL 98895 time(s).
2014-09-07 10:00:19,511 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Success!
[/bash]
結果を確認
[bash gutter=”false”]
$ hadoop fs -ls script1-hadoop-results
Found 2 items
-rw-r–r– 1 vagrant supergroup 0 2014-09-07 10:00 script1-hadoop-results/_SUCCESS
-rw-r–r– 1 vagrant supergroup 659954 2014-09-07 10:00 script1-hadoop-results/part-r-00000
$ hadoop fs -cat ‘script1-hadoop-results/*’ | head
00 and shareware 2.112885636821291 3 1.5294117647058825
00 vcd 2.1213203435596424 3 1.5
00 bluebird 2.1555530241167826 4 1.9285714285714282
00 cute 2.1650635094610955 4 1.8571428571428574
00 chested 2.182820625326997 4 1.75
00 diablo cheats 2.197401062294143 3 1.4705882352941178
00 psygnosis 2.23606797749979 2 1.1666666666666667
00 gif s 2.2360679774997902 2 1.1666666666666665
00 vacancy 2.2360679774997902 2 1.1666666666666665
00 free mpegs 2.2360679774997902 2 1.1666666666666665
cat: Unable to write to output stream.
[/bash]