GCPのCloud Composerを運用して感じたこと

こんにちは! エス・エム・エスキャリア開発者の石塚です。
前回に引き続き、GCPのCloud Composerを導入した事例について、
今回は、数か月運用して感じたことについて書いていこうと思います!

感じたこと

Airflow CLI使わないと開発の効率が悪かった

Cloud ComposerはソースをGCSの指定フォルダ内にアップロードすることで、
Airflow WebUIからDAGを見ることができるのですが、この手順が煩雑でした。
また、WebUIからDAGの手動実行はできるけど、タスク単体の実行ができなかったので、
途中のタスクの変更の動作確認のみをしたい場合も、全て実行しなければならないのが手間でした。

Airflow CLIを使うことで、コマンドでデプロイやテストができます。
ローカル環境(VSCodeでPythonのコードを書いています)からCLIを使って開発することで開発効率を上げました。
Cloud ComposerではAirflow CLIをgcloudコマンドで実行できます。
(長くて覚えられないので、エイリアスを設定してコマンドを簡略化して利用しています)

  • GCSのdagsフォルダにファイルをアップロードする
gcloud composer environments storage dags import \
--environment <Composer環境名> \
--location <Composerロケーション> \
--source <ローカルのファイルパス>
  • DAGの中のタスクを単体実行する
gcloud composer environments run \
<Composer環境名> --location <Composerロケーション> \
test -- $(date --date "9 hours ago" +"%Y-%m-%dT%H:%M:%S+00:00")

 

似たようなタスクの追加がしやすい

同じようなタスクでもPythonのリストで定義して、ループでタスクを作成すれば、
後からの追加が1行で済み、差分のチェックやレビューが楽なのがメリットです。
タスクの依存関係もPythonのコードで定義できるので、
6つのタスクのグループを、3並列で直列に動かす、等の処理も定義がしやすいです。

サンプルソース

"""
DAGのサンプルソース
"""

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

default_dag_args = {
    'start_date': datetime(2020, 1, 24, 0, 0, 0),
    'owner': 'Sample',
}

# 並列数
concurrency = 3

# タスクの一覧
object_list = [
    {'id': 'a001', 'name': 'Anna'},
    {'id': 'a002', 'name': 'Bella'},
    {'id': 'a003', 'name': 'Caroline'},
    {'id': 'a004', 'name': 'Dorothy'},
    {'id': 'a005', 'name': 'Emily'},
    {'id': 'a006', 'name': 'Fiona'},
]

# DAGの定義
with DAG(
    dag_id='sample_task_list',
    schedule_interval=None,
    default_args=default_dag_args) as dag:

    # タスク内で実行する関数
    def hello(name):
        print('hello {}'.format(name))

    def bye(name):
        print('bye {}'.format(name))

    # タスクの一覧
    hello_list = []
    bye_list = []

    # object_listに従い、タスクを作成
    for i, v in enumerate(object_list):
        hello_list.append(PythonOperator(
            task_id='hello_{}'.format(v['id']),
            python_callable=hello,
            op_kwargs={'name', v['name']}
        ))

        bye_list.append(PythonOperator(
            task_id='bye_{}'.format(v['id']),
            python_callable=hello,
            op_kwargs={'name', v['name']}
        ))

        # タスク依存関係
    hello_list[i] >> bye_list[i]

    if i >= concurrency:
        bye_list[i - concurrency] >> hello_list[i]

 

GUIでみた結果

KubernetesPodOperatorが便利だった

SalesforceのデータをDataloaderというツールを使ってGCPにデータを連携してみたのですが、
DataloaderはJavaで動くため、Pythonやbashくらいしか使えないAirflowの環境ではそのまま使えません。
どうしてもJavaが動作する環境で実行する必要があったのですが、KubernetesPodOperatorというOperatorクラスを使えば解決しました。

Cloud ComposerはKubernetes Engine上でAirflowを動かしているため、
クラスタを新規で作成することなく、そのまま利用できます。
Java + Dataloaderが動作するDockerイメージを作成し、
GCPのContainer Registryという、非公開のコンテナ イメージ レジストリにpushしておけば、
KubernetesPodOperatorのパラメータでイメージを指定するだけで、
Podの実行、削除、Airflow側へのログ出力、エラー通知までをしてくれます。便利!

まとめ

以上、2回にわたって、GCPのCloud Composerを利用してみた話を、
サンプルソースと共に書きました。
まだ導入して間もないですが、Cloud Composer、Airflowの今後の発展を期待し、引き続き運用を進めていきます。

参考

Airflow コマンドライン インターフェース  |  Cloud Composer  |  Google Cloud https://cloud.google.com/composer/docs/how-to/accessing/airflow-cli?hl=ja

Container Registry の概要  |  Container Registry のドキュメント  |  Google Cloud
https://cloud.google.com/container-registry/docs/overview?hl=ja

Pocket
LINEで送る