アジャイルデータサイエンス読書メモその3です。
3章の後半、Pigで集計した結果をElasticsearchにロードする箇所をUbuntu 14.0(Trusty Tahr)で試した際のメモになります。

アジャイルデータサイエンスでは、ElasticsearchへのデータのロードはWonderdog(https://github.com/infochimps-labs/wonderdog)を利用していますが、2014年8月時点ではElasticsearchプロジェクト本体でPigをサポートしています。
Apache Pig support
Elasticsearch.org Hadoop | Elasticsearch

そこで、WonderdogではなくElasticsearch for Apache Hadoopを利用してApache PigからElasticsearchへデータをロードしてみます。


Elasticsearchのインストール

Elasticsearchはダウンロードして解凍するだけですぐに利用できます。
ダウンロードサイト: Elasticsearch.org Download ELK | Elasticsearch

$ wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.3.2.tar.gz
$ tar xvfz elasticsearch-1.3.2.tar.gz

起動コマンド

$ bin/elasticsearch

Elasticsearchをパッケージインストールする手順は、以前に記事を書いていますのでこちらを参照ください。
elasticsearch 1.1をubuntu12.04にセットアップ

Elasticsearch for Apache Hadoopのセットアップ
こちらもダウンロードして解凍するだけです。
ダウンロードサイト: Elasticsearch.org Download | Elasticsearch

$ wget http://download.elasticsearch.org/hadoop/elasticsearch-hadoop-2.0.1.zip
$ unzip elasticsearch-hadoop-2.0.1.zip

Pigから利用するelasticsearch-hadoop-2.0.1.jarはelasticsearch-hadoop-2.0.1/distに含まれています。

PigからElasticsearchにデータをロードする
PigでAvroフォーマットデータを読み出し、Elasticsearchにロード可能なJSONフォーマットに変換し、Elasticsearchにロードする処理は以下のようになります。
こちらは、元のファイル elasticsearch.pig を修正したものになります。

/* Set Home Directory - where we install software */
%default PIG_HOME `echo \$HOME/pig-0.13.0`
 
REGISTER $PIG_HOME/lib/avro-1.7.5.jar
REGISTER $PIG_HOME/lib/json-simple-1.1.jar
REGISTER $PIG_HOME/lib/piggybank.jar

DEFINE AvroStorage org.apache.pig.piggybank.storage.avro.AvroStorage();

%default HOME `echo \$HOME`

/* Register wonderdog - elasticsearch integration */
REGISTER $HOME/elasticsearch-hadoop-2.0.1/dist/elasticsearch-hadoop-2.0.1.jar

/* Remove the old json */
rmf /tmp/sent_count_json

/* Nuke the elasticsearch sent_counts index, as we are about to replace it. */
sh curl -XDELETE 'http://localhost:9200/inbox/sentcounts'

/* Load Avros, and store as JSON */
sent_counts = LOAD '/tmp/sent_counts.txt' AS (from:chararray, to:chararray, total:long);
STORE sent_counts INTO '/tmp/sent_count_json' USING JsonStorage();

/* Now load the JSON as a single chararray field, and index it into ElasticSearch with Wonderdog from InfoChimps */
sent_count_json = LOAD '/tmp/sent_count_json' AS (sent_counts:chararray);
STORE sent_count_json INTO 'inbox/sentcounts' USING org.elasticsearch.hadoop.pig.EsStorage();

/* Search for Hadoop to make sure we get a hit in our sent_count index */
sh curl -XGET 'http://localhost:9200/inbox/sentcounts/_search'

Pigを実行します。

$ JAVA_HOME=/usr ~/pig-0.12.0/bin/pig -x local -v -w elasticsearch.pig 
...

HadoopVersion	PigVersion	UserId	StartedAt	FinishedAt	Features
1.0.0	0.12.0	vagrant	2014-08-23 02:09:58	2014-08-23 02:10:06	UNKNOWN

Success!

Job Stats (time in seconds):
JobId	Alias	Feature	Outputs
job_local_0001	sent_counts	MAP_ONLY	/tmp/sent_count_json,
job_local_0002	sent_count_json	MAP_ONLY	inbox/sentcounts,

Input(s):
Successfully read records from: "/tmp/sent_counts.txt"
Successfully read records from: "/tmp/sent_count_json"

Output(s):
Successfully stored records in: "/tmp/sent_count_json"
Successfully stored records in: "inbox/sentcounts"

Job DAG:
job_local_0001	->	job_local_0002,
job_local_0002


2014-08-23 02:10:06,658 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100   712  100   712    0     0  54000      0 --:--:-- --:--:-- --:--:-- 54769
{"took":4,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":3,"max_score":1.0,"hits":[{"_index":"inbox","_type":"sentcounts","_id":"IZUpITzeR_SiUmY3x3U-YA","_score":1.0,"_source":{"sent_counts":"{\"from\":\"special@foo.jp\",\"to\":\"hrendoh@gmail.com\",\"total\":2}"}},{"_index":"inbox","_type":"sentcounts","_id":"riphFl8YSBSaEMFMID1Rxw","_score":1.0,"_source":{"sent_counts":"{\"from\":\"notification@hoge.com\",\"to\":\"hrendoh@gmail.com\",\"total\":5}"}},{"_index":"inbox","_type":"sentcounts","_id":"ISOpG4jQQimhH8DSLAK66Q","_score":1.0,"_source":{"sent_counts":"{\"from\":\"news@example.jp\",\"to\":\"hrendoh@gmail.com\",\"total\":1}"}}]}}

PigでElasticSearchにロードされたデータを出力されるところまで成功しました。