2020.01.10
GCPのCloud Composerでバッチ処理基盤を作る

こんにちは! エス・エム・エスキャリア開発者の石塚です。
今回社内で利用するバッチ処理基盤として、Google Cloud Platformのプロダクト、Cloud Composerを導入したので、
その事例について紹介します。
なぜGoogle Cloud Platformなのか
Google Cloud Platform(以下GCP)と他のクラウドサービスを比較して、一番の特徴は、BigQueryの存在だと考えてます。
Salesforce、Webのログ、その他様々な場所にあるデータをBigQueryに蓄積し、データ分析基盤として利用するためにGCPを使っています。
今回導入したCloud Composerも、主にGCPを中心としたデータ連携基盤として運用しています。
Cloud Composerとは
Cloud Composerとは、GCP上でApache Airflowというワークフローエンジンを動かすことができるプロダクトです。Kubernetes Engine上で動くため、オートスケールが可能、細かいインフラの設定も不要、構築もボタン一つででき、導入もしやすいプロダクトでした。
一方でGCPがAirflowをラッピングして提供しているため、一部の機能がロックされていたり、GCPの他のプロダクトと比較して、利用料金が割高なところは目をつむる必要がありました。
Airflowができること
AirflowはPythonのコードを書いて、DAG(有向非巡回グラフ)というグラフでワークフローを定義します。1個のDAGの中に複数のタスクを定義し、タスクとタスクの依存関係を定義できます。作成したDAGは、毎日〇時、平日1時間毎など、スケジュールを設定して自動実行しています。
作成したDAGはAirflowのWebUI上で確認、実行ができます。
Airflow DAGの例
上記のDAGの場合、「init_1」と「init_2」が先に実行され、矢印の方向に向かってタスクが実行されていきます。
サンプルソースはこちら。
"""
DAGのサンプルソース
"""
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
default_dag_args = {
'start_date': datetime(2019, 12, 19, 10, 0, 0),
'owner': 'Sample',
}
# DAGの定義
with DAG(
dag_id='sample_dag',
schedule_interval=None,
default_args=default_dag_args) as dag:
def echo():
print('hello')
# タスク作成
init_1 = PythonOperator(task_id='init_1', python_callable=echo)
prepare_task_1 = PythonOperator(task_id='prepare_task_1', python_callable=echo)
prepare_task_2 = PythonOperator(task_id='prepare_task_2', python_callable=echo)
prepare_task_3 = PythonOperator(task_id='prepare_task_3', python_callable=echo)
main1_task = PythonOperator(task_id='main1_task', python_callable=echo)
init_2 = PythonOperator(task_id='init_2', python_callable=echo)
main2_task = PythonOperator(task_id='main2_task', python_callable=echo)
end_task = PythonOperator(task_id='end_task', python_callable=echo)
# タスクの依存関係
init_1 >> prepare_task_1 >> main1_task
init_1 >> prepare_task_2 >> main1_task
init_1 >> prepare_task_3 >> main1_task
main1_task >> end_task
init_2 >> main2_task >> end_task
今回のサンプルソースではPythonのスクリプトを実行し、”hello”と標準出力するのみです。
タスクの生成は、Airflowが用意しているOperatorクラスのインスタンスを生成することで行っています。Pythonのスクリプトを実行するPythonOperatorや、BigQueryにクエリを実行するBigQueryOperatorなどのクラスが用意されているので、用途によって使い分けたり、自作でOperatorクラスを作成したりして利用しています。
DAGの矢印となるタスクの依存関係はサンプルソースの一番下で定義しています。
Airflowようなワークフローエンジンは他にもありますし、似たようなことはKubernete EngineのCronjobでジョブを定期実行することでも実現ができますが、次に上げるメリットが大きかったため、導入に至りました。
導入してみてよかったこと
①Pythonでワークフローの定義ができる
GUIでD&Dするだけで、ワークフローを作成することができるETLツールもありますが、レビューがしづらかったり、変更箇所がわかりにくかったり、痒い所に手が届かなかったりするデメリットもあります。ワークフローの定義からビジネスロジックまでをPythonでコーディングできるため、柔軟に扱えるのがメリットです。
②Web UIがリッチ
WebUIが他のオープンソースのワークフローツールと比較して非常に多機能です。ワークフローを可視化してくれたり、ボタンでワークフローを実行で来たり、ログもWebUI上で見れたりします。圧倒的に運用が楽です。
③設定ファイル、モジュールの使い回しがしやすい
ビジネスロジックも基本的にPythonで書いてます。設定ファイルや自分で書いたPythonモジュールをCloud Storage上に置いておくことで、importやファイルの共有ができます。環境変数として扱う値も、Airflow上に設定できるので、Compute Engineのインスタンスを複数作成し、それぞれで処理を行う…といった構成よりも管理がしやすいです。
④異常終了時の処理を書きやすい
APIリクエストを実行したり、DBにアクセスしたりするとき、当然リトライ制御が必要になってきますが、AirflowはOperatorクラスのパラメータにリトライ回数、リトライ間隔(秒)を指定するだけで、異常終了時もタスク単位でリトライ実行してくれます。
また、リトライ回数実行し、タスクが異常終了したとき、エラー内容を通知する仕組みが必要なりますが、DAGを作成するときのパラメータ、on_failure_callbackに、異常終了したときに呼び出すPythonの関数を設定できます。
私たちの場合、通知先をSlackに設定したかったので、Slackに通知する共通処理をPythonで実装し、全DAGで利用できるようにしました。
どのDAGのどのタスクが失敗したのか、Owner(担当チーム)、エラーメッセージ、詳細画面へのURLを通知しています。
もちろんメールでの通知も設定可能です(Sendgrid APIならノンコーディングで設定可能)。
例)Slackへのエラー通知
まとめ
以上、GCPのCloud Composerを導入し、バッチ処理基盤を構築した話でした。
まだ導入して間もないですが、社内でのデータ連携処理で利用し安定稼働・運用しています。
次回は、数か月運用して変化したこと・感じたこと、について書こうと思います。
参考
Cloud Composer – マネージド ワークフロー オーケストレーション | Google Cloud Composer | Google Cloud
https://cloud.google.com/composer/?hl=ja
Apache Airflow
https://airflow.apache.org/