GCPのCloud Composerでバッチ処理基盤を作る

こんにちは! エス・エム・エスキャリア開発者の石塚です。
今回社内で利用するバッチ処理基盤として、Google Cloud Platformのプロダクト、Cloud Composerを導入したので、
その事例について紹介します。

なぜGoogle Cloud Platformなのか

Google Cloud Platform(以下GCP)と他のクラウドサービスを比較して、一番の特徴は、BigQueryの存在だと考えてます。
Salesforce、Webのログ、その他様々な場所にあるデータをBigQueryに蓄積し、データ分析基盤として利用するためにGCPを使っています。
今回導入したCloud Composerも、主にGCPを中心としたデータ連携基盤として運用しています。

Cloud Composerとは

Cloud Composerとは、GCP上でApache Airflowというワークフローエンジンを動かすことができるプロダクトです。Kubernetes Engine上で動くため、オートスケールが可能、細かいインフラの設定も不要、構築もボタン一つででき、導入もしやすいプロダクトでした。
一方でGCPがAirflowをラッピングして提供しているため、一部の機能がロックされていたり、GCPの他のプロダクトと比較して、利用料金が割高なところは目をつむる必要がありました。

Airflowができること

AirflowはPythonのコードを書いて、DAG(有向非巡回グラフ)というグラフでワークフローを定義します。1個のDAGの中に複数のタスクを定義し、タスクとタスクの依存関係を定義できます。作成したDAGは、毎日〇時、平日1時間毎など、スケジュールを設定して自動実行しています。
作成したDAGはAirflowのWebUI上で確認、実行ができます。

Airflow DAGの例

上記のDAGの場合、「init_1」と「init_2」が先に実行され、矢印の方向に向かってタスクが実行されていきます。
サンプルソースはこちら。

"""
DAGのサンプルソース
"""

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator


default_dag_args = {
    'start_date': datetime(2019, 12, 19, 10, 0, 0),
    'owner': 'Sample',
}

# DAGの定義
with DAG(
        dag_id='sample_dag',
        schedule_interval=None,
        default_args=default_dag_args) as dag:

    def echo():
        print('hello')

    # タスク作成
    init_1 = PythonOperator(task_id='init_1', python_callable=echo)
    prepare_task_1 = PythonOperator(task_id='prepare_task_1', python_callable=echo)
    prepare_task_2 = PythonOperator(task_id='prepare_task_2', python_callable=echo)
    prepare_task_3 = PythonOperator(task_id='prepare_task_3', python_callable=echo)
    main1_task = PythonOperator(task_id='main1_task', python_callable=echo)
    init_2 = PythonOperator(task_id='init_2', python_callable=echo)
    main2_task = PythonOperator(task_id='main2_task', python_callable=echo)
    end_task = PythonOperator(task_id='end_task', python_callable=echo)

    # タスクの依存関係
    init_1 >> prepare_task_1 >> main1_task
    init_1 >> prepare_task_2 >> main1_task
    init_1 >> prepare_task_3 >> main1_task
    main1_task >> end_task
    init_2 >> main2_task >> end_task

今回のサンプルソースではPythonのスクリプトを実行し、”hello”と標準出力するのみです。
タスクの生成は、Airflowが用意しているOperatorクラスのインスタンスを生成することで行っています。Pythonのスクリプトを実行するPythonOperatorや、BigQueryにクエリを実行するBigQueryOperatorなどのクラスが用意されているので、用途によって使い分けたり、自作でOperatorクラスを作成したりして利用しています。
DAGの矢印となるタスクの依存関係はサンプルソースの一番下で定義しています。

Airflowようなワークフローエンジンは他にもありますし、似たようなことはKubernete EngineのCronjobでジョブを定期実行することでも実現ができますが、次に上げるメリットが大きかったため、導入に至りました。

導入してみてよかったこと

①Pythonでワークフローの定義ができる
GUIでD&Dするだけで、ワークフローを作成することができるETLツールもありますが、レビューがしづらかったり、変更箇所がわかりにくかったり、痒い所に手が届かなかったりするデメリットもあります。ワークフローの定義からビジネスロジックまでをPythonでコーディングできるため、柔軟に扱えるのがメリットです。

②Web UIがリッチ
WebUIが他のオープンソースのワークフローツールと比較して非常に多機能です。ワークフローを可視化してくれたり、ボタンでワークフローを実行で来たり、ログもWebUI上で見れたりします。圧倒的に運用が楽です。

③設定ファイル、モジュールの使い回しがしやすい
ビジネスロジックも基本的にPythonで書いてます。設定ファイルや自分で書いたPythonモジュールをCloud Storage上に置いておくことで、importやファイルの共有ができます。環境変数として扱う値も、Airflow上に設定できるので、Compute Engineのインスタンスを複数作成し、それぞれで処理を行う…といった構成よりも管理がしやすいです。

④異常終了時の処理を書きやすい
APIリクエストを実行したり、DBにアクセスしたりするとき、当然リトライ制御が必要になってきますが、AirflowはOperatorクラスのパラメータにリトライ回数、リトライ間隔(秒)を指定するだけで、異常終了時もタスク単位でリトライ実行してくれます。
また、リトライ回数実行し、タスクが異常終了したとき、エラー内容を通知する仕組みが必要なりますが、DAGを作成するときのパラメータ、on_failure_callbackに、異常終了したときに呼び出すPythonの関数を設定できます。
私たちの場合、通知先をSlackに設定したかったので、Slackに通知する共通処理をPythonで実装し、全DAGで利用できるようにしました。
どのDAGのどのタスクが失敗したのか、Owner(担当チーム)、エラーメッセージ、詳細画面へのURLを通知しています。
もちろんメールでの通知も設定可能です(Sendgrid APIならノンコーディングで設定可能)。

例)Slackへのエラー通知

まとめ

以上、GCPのCloud Composerを導入し、バッチ処理基盤を構築した話でした。
まだ導入して間もないですが、社内でのデータ連携処理で利用し安定稼働・運用しています。

次回は、数か月運用して変化したこと・感じたこと、について書こうと思います。

参考

Cloud Composer – マネージド ワークフロー オーケストレーション  |  Google Cloud Composer  |  Google Cloud
https://cloud.google.com/composer/?hl=ja

Apache Airflow
https://airflow.apache.org/

Pocket
LINEで送る