2020.01.24
GCPのCloud Composerを運用して感じたこと

こんにちは! エス・エム・エスキャリア開発者の石塚です。
前回に引き続き、GCPのCloud Composerを導入した事例について、
今回は、数か月運用して感じたことについて書いていこうと思います!
感じたこと
Airflow CLI使わないと開発の効率が悪かった
Cloud ComposerはソースをGCSの指定フォルダ内にアップロードすることで、
Airflow WebUIからDAGを見ることができるのですが、この手順が煩雑でした。
また、WebUIからDAGの手動実行はできるけど、タスク単体の実行ができなかったので、
途中のタスクの変更の動作確認のみをしたい場合も、全て実行しなければならないのが手間でした。
Airflow CLIを使うことで、コマンドでデプロイやテストができます。
ローカル環境(VSCodeでPythonのコードを書いています)からCLIを使って開発することで開発効率を上げました。
Cloud ComposerではAirflow CLIをgcloudコマンドで実行できます。
(長くて覚えられないので、エイリアスを設定してコマンドを簡略化して利用しています)
- GCSのdagsフォルダにファイルをアップロードする
gcloud composer environments storage dags import \ --environment <Composer環境名> \ --location <Composerロケーション> \ --source <ローカルのファイルパス>
- DAGの中のタスクを単体実行する
gcloud composer environments run \ <Composer環境名> --location <Composerロケーション> \ test -- $(date --date "9 hours ago" +"%Y-%m-%dT%H:%M:%S+00:00")
似たようなタスクの追加がしやすい
同じようなタスクでもPythonのリストで定義して、ループでタスクを作成すれば、
後からの追加が1行で済み、差分のチェックやレビューが楽なのがメリットです。
タスクの依存関係もPythonのコードで定義できるので、
6つのタスクのグループを、3並列で直列に動かす、等の処理も定義がしやすいです。
サンプルソース
""" DAGのサンプルソース """ from datetime import datetime from airflow.models import DAG from airflow.operators.python_operator import PythonOperator default_dag_args = { 'start_date': datetime(2020, 1, 24, 0, 0, 0), 'owner': 'Sample', } # 並列数 concurrency = 3 # タスクの一覧 object_list = [ {'id': 'a001', 'name': 'Anna'}, {'id': 'a002', 'name': 'Bella'}, {'id': 'a003', 'name': 'Caroline'}, {'id': 'a004', 'name': 'Dorothy'}, {'id': 'a005', 'name': 'Emily'}, {'id': 'a006', 'name': 'Fiona'}, ] # DAGの定義 with DAG( dag_id='sample_task_list', schedule_interval=None, default_args=default_dag_args) as dag: # タスク内で実行する関数 def hello(name): print('hello {}'.format(name)) def bye(name): print('bye {}'.format(name)) # タスクの一覧 hello_list = [] bye_list = [] # object_listに従い、タスクを作成 for i, v in enumerate(object_list): hello_list.append(PythonOperator( task_id='hello_{}'.format(v['id']), python_callable=hello, op_kwargs={'name', v['name']} )) bye_list.append(PythonOperator( task_id='bye_{}'.format(v['id']), python_callable=hello, op_kwargs={'name', v['name']} )) # タスク依存関係 hello_list[i] >> bye_list[i] if i >= concurrency: bye_list[i - concurrency] >> hello_list[i]
GUIでみた結果
KubernetesPodOperatorが便利だった
SalesforceのデータをDataloaderというツールを使ってGCPにデータを連携してみたのですが、
DataloaderはJavaで動くため、Pythonやbashくらいしか使えないAirflowの環境ではそのまま使えません。
どうしてもJavaが動作する環境で実行する必要があったのですが、KubernetesPodOperatorというOperatorクラスを使えば解決しました。
Cloud ComposerはKubernetes Engine上でAirflowを動かしているため、
クラスタを新規で作成することなく、そのまま利用できます。
Java + Dataloaderが動作するDockerイメージを作成し、
GCPのContainer Registryという、非公開のコンテナ イメージ レジストリにpushしておけば、
KubernetesPodOperatorのパラメータでイメージを指定するだけで、
Podの実行、削除、Airflow側へのログ出力、エラー通知までをしてくれます。便利!
まとめ
以上、2回にわたって、GCPのCloud Composerを利用してみた話を、
サンプルソースと共に書きました。
まだ導入して間もないですが、Cloud Composer、Airflowの今後の発展を期待し、引き続き運用を進めていきます。
参考
Airflow コマンドライン インターフェース | Cloud Composer | Google Cloud https://cloud.google.com/composer/docs/how-to/accessing/airflow-cli?hl=ja
Container Registry の概要 | Container Registry のドキュメント | Google Cloud
https://cloud.google.com/container-registry/docs/overview?hl=ja