読者です 読者をやめる 読者になる 読者になる

Elasticsearchのコードを読んでみる③ Bulk編

プログラミング

ほとんど更新と一緒だけど、運用する時はBulkのがよく使うと思うので簡単に。


curl -XPOST localhost:9200/_bulk -d '
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }
'
な感じのリクエスト。

Bulk処理

RestBulkAction : bulkリクエストを処理するクラス
- client#bulk()を実行。めぐりめぐってNodeClient#execute()が実行される。

NodeClient
- TransportBulkAction#execute()を実行。

TransportBulkAction
- Bulkリクエストに含まれるインデックス名を抜き出し、インデックス生成が必要な場合はインデックスを作成。(createIndexAction#executeをインデックス分実行。この辺りは更新編を参照。)
- インデックス生成が終わったらexecuteBulk()でBulk処理を行う。
- 対象シャードごとにbulkリクエストリストを作成し、そこにbulkのリクエストを一行ずつ分つめていく。
- 対象シャードが空ならここでレスポンスを返す。
- シャードごとにTransportShardBulkAction#execute()を実行して処理していく。

TransportShardBulkAction
- TransportShardBulkAction#execute()からTransportShardReplicationOperationAction.AsyncShardOperationAction#start()が実行される。
- 対象シャードがローカルであればbulkのThreadPool上で以降の処理を行う。他ノードであれば対象ノードにリクエストを送信する。
- shardOperationOnPrimary()にてプライマリシャードを対象にBulkリクエストを一つずつ順次処理。
- IndexShard#index()やIndexShard#delete()などで更新していく。
- プライマリの後はshardOperationOnReplicaでレプリカシャードに対して処理。
- 全てのシャードで処理が終わったらレスポンスを返却する。