Apache Pigで処理したデータをElasticsearchにロードする – アジャイルデータサイエンス読書メモ その3


アジャイルデータサイエンス読書メモその3です。
3章の後半、Pigで集計した結果をElasticsearchにロードする箇所をUbuntu 14.0(Trusty Tahr)で試した際のメモになります。

アジャイルデータサイエンスでは、ElasticsearchへのデータのロードはWonderdog(https://github.com/infochimps-labs/wonderdog)を利用していますが、2014年8月時点ではElasticsearchプロジェクト本体でPigをサポートしています。
Apache Pig support
Elasticsearch.org Hadoop | Elasticsearch

そこで、WonderdogではなくElasticsearch for Apache Hadoopを利用してApache PigからElasticsearchへデータをロードしてみます。


Elasticsearchのインストール

Elasticsearchはダウンロードして解凍するだけですぐに利用できます。
ダウンロードサイト: Elasticsearch.org Download ELK | Elasticsearch
[bash gutter=”false”]
$ wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.3.2.tar.gz
$ tar xvfz elasticsearch-1.3.2.tar.gz
[/bash]
起動コマンド
[bash]
$ bin/elasticsearch
[/bash]

Elasticsearchをパッケージインストールする手順は、以前に記事を書いていますのでこちらを参照ください。
elasticsearch 1.1をubuntu12.04にセットアップ

Elasticsearch for Apache Hadoopのセットアップ
こちらもダウンロードして解凍するだけです。
ダウンロードサイト: Elasticsearch.org Download | Elasticsearch
[bash gutter=”false”]
$ wget http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.0.1.zip
$ unzip elasticsearch-hadoop-2.0.1.zip
[/bash]
Pigから利用するelasticsearch-hadoop-2.0.1.jarはelasticsearch-hadoop-2.0.1/distに含まれています。

PigからElasticsearchにデータをロードする
PigでAvroフォーマットデータを読み出し、Elasticsearchにロード可能なJSONフォーマットに変換し、Elasticsearchにロードする処理は以下のようになります。
こちらは、元のファイル elasticsearch.pig を修正したものになります。
[text language=”title="elasticsearch.pig”]
/* Set Home Directory – where we install software */
%default PIG_HOME `echo \$HOME/pig-0.13.0`

REGISTER $PIG_HOME/lib/avro-1.7.5.jar
REGISTER $PIG_HOME/lib/json-simple-1.1.jar
REGISTER $PIG_HOME/lib/piggybank.jar

DEFINE AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();

%default HOME `echo \$HOME`

/* Register wonderdog – elasticsearch integration */
REGISTER $HOME/elasticsearch-hadoop-2.0.1/dist/elasticsearch-hadoop-2.0.1.jar

/* Remove the old json */
rmf /tmp/sent_count_json

/* Nuke the elasticsearch sent_counts index, as we are about to replace it. */
sh curl -XDELETE ‘http://localhost:9200/inbox/sentcounts’

/* Load Avros, and store as JSON */
sent_counts = LOAD ‘/tmp/sent_counts.txt’ AS (from:chararray, to:chararray, total:long);
STORE sent_counts INTO ‘/tmp/sent_count_json’ USING JsonStorage();

/* Now load the JSON as a single chararray field, and index it into ElasticSearch with Wonderdog from InfoChimps */
sent_count_json = LOAD ‘/tmp/sent_count_json’ AS (sent_counts:chararray);
STORE sent_count_json INTO ‘inbox/sentcounts’ USING org.elasticsearch.hadoop.pig.EsStorage();

/* Search for Hadoop to make sure we get a hit in our sent_count index */
sh curl -XGET ‘http://localhost:9200/inbox/sentcounts/_search’
[/text]

Pigを実行します。
[bash gutter=”false”]
$ JAVA_HOME=/usr ~/pig-0.12.0/bin/pig -x local -v -w elasticsearch.pig

HadoopVersion PigVersion UserId StartedAt FinishedAt Features
1.0.0 0.12.0 vagrant 2014-08-23 02:09:58 2014-08-23 02:10:06 UNKNOWN

Success!

Job Stats (time in seconds):
JobId Alias Feature Outputs
job_local_0001 sent_counts MAP_ONLY /tmp/sent_count_json,
job_local_0002 sent_count_json MAP_ONLY inbox/sentcounts,

Input(s):
Successfully read records from: "/tmp/sent_counts.txt"
Successfully read records from: "/tmp/sent_count_json"

Output(s):
Successfully stored records in: "/tmp/sent_count_json"
Successfully stored records in: "inbox/sentcounts"

Job DAG:
job_local_0001 -> job_local_0002,
job_local_0002

2014-08-23 02:10:06,658 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Success!
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed

0 0 0 0 0 0 0 0 –:–:– –:–:– –:–:– 0
100 712 100 712 0 0 54000 0 –:–:– –:–:– –:–:– 54769
{"took":4,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":3,"max_score":1.0,"hits":[{"_index":"inbox","_type":"sentcounts","_id":"IZUpITzeR_SiUmY3x3U-YA","_score":1.0,"_source":{"sent_counts":"{\"from\":\"special@foo.jp\",\"to\":\"hrendoh@gmail.com\",\"total\":2}"}},{"_index":"inbox","_type":"sentcounts","_id":"riphFl8YSBSaEMFMID1Rxw","_score":1.0,"_source":{"sent_counts":"{\"from\":\"notification@hoge.com\",\"to\":\"hrendoh@gmail.com\",\"total\":5}"}},{"_index":"inbox","_type":"sentcounts","_id":"ISOpG4jQQimhH8DSLAK66Q","_score":1.0,"_source":{"sent_counts":"{\"from\":\"news@example.jp\",\"to\":\"hrendoh@gmail.com\",\"total\":1}"}}]}}
[/bash]
PigでElasticSearchにロードされたデータを出力されるところまで成功しました。

,