AirflowのPluginsについて

AirflowPluginsとは Airflowをカスタマイズするために追加できるPlugin。 どんな時にAirflowPluginsを使うのか 例えば以下のようなときにAirflowPluginsを使う。 AirflowのメタデータのDBのコンテンツを表示するViewを作成したいとき。 指定された時間内において任意数のDAGが失敗時にAirflowの機能を監視しカスタムアラートを送信するアプリケーションを作成したいとき。 外部データツールのファイルやログに動的にリンクする詳細ビューをUIに表示したいとき。 新規Plugin作成方法 「plugins_folder」へPythonファイルを用意し、Pythonファイル内でairflow.plugins_manager.AirflowPluginを拡張するクラスを定義すれば新規Pluginを作成できる。 Pythonファイルの例 定義したAirflowPluginクラスには以下を実装する。 name:propertyにPlugin名 on_load:functionにPluginのロード時に実行してほしい処理 そのほかPluginに追加したいComponent(Pluginの機能)について列挙 from airflow.plugins_manager import AirflowPlugin class MyAirflowPlugin(AirflowPlugin): name = "empty" def on_load(cls, *args, **kwargs): pass # そのほかPluginに追加したいComponentについて列挙していく。 上記場合、Plugin名が「empty」であるPluginを作成している。 「plugins_folder」の確認の仕方 ちなみに「plugins_folder」はどこで確認できるのかというのだが、「コマンドを利用する」か「cfgファイルを見る」の2通りで確認できる。 基本的にデフォルトでは$AIRFLOW_HOME/pluginsになっているはず。 # コマンドを利用する $ airflow info | grep plugins_folder plugins_folder | xxx # cfgファイルを見る $ cat airflow.cfg | grep plugins_folder plugins_folder = xxxx Plugin確認方法 現在追加されているPluginは何か、Pluginに定義されてあるComponentは何かを確認する方法だが、「UIから確認する方法」と「CLIを利用して確認する方法」がある。WebServerを再起動しなくてもすぐに反映されるのは後者の方法。...

January 4, 2023 · Me

AirflowのTaskFlowAPIについて

TaskFlowAPIとは Airflow上でのWorkflowの書き方の一つ。従来Workflowの書き方は「DAGclassを定義し、Operator同士を繋げる書き方(Operatorを使用した書き方)」しかなかったが、Airflow2.0から新しい書き方が導入された。 基本的な書き方 DAGの書き方 @dagでWorkflowを定義する関数をdecorateする。グローバル変数にDAGを登録する必要がなくなった。 @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) def tutorial_taskflow_api(): """ DAG Docs Example """ tutorial_taskflow_apiがdag_id 関数配下に書いてある__doc__がDAGのDoc Taskの書き方(PythonOperatorの例) TaskFlowDecoratorでTaskの関数をdecorateする。 @task() def extract(): """ extract doc_md Example """ data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' order_data_dict = json.loads(data_string) return order_data_dict extractがtask_id 関数配下に書いてある__doc__がTaskのDoc 対応しているOperator 上記PythonOperator以外にも以下のOperatorがTaskFlowDecoratorに対応している。主に独自の実行環境でPythonを実行する必要があるようなOperatorが現在は対応しているイメージ。 Operator TaskFlowDecorator PythonOperator @task(@task.python) PythonVirtualenvOperator @task.virtualenv BranchPythonOperator @task....

January 1, 2023 · Me

Airflowのschedule関連について

概要 Airflowのscheule関連についてしっかりと理解できていなかったので、DataPipelines with ApacheAirflowのCAPTER3「Scheduling in Airflow」を参考にしつつ雑にまとめてみた。 TL;DR DAGは「schedule_interval」に設定した間隔で実行される。 「schedule_interval」はcronとtimedeltaインスタンスで書くことができる。 「interval is started」「the end of the interval」 「execution date」は実際の実行時間ではない。 JinjaTemplateを使用して動的に設定することによってデータを段階的に処理できる。 DAGはbackfillを使うと過去分実行ができる。 schedule_intervalについて DAGの定期的な実行間隔は airflow.dagの「scheudle_interval」引数に指定する。 「None」の場合、UIもしくはAPIからの実行のみがトリガーとなる。 dag = DAG( dag_id="unscheduled", start_date=dt.datetime(2019, 1, 1), schedule_interval=None ) 「Cron」を使用することができる。 0 * * * * 0 0 * * MON-FRI 値をrangeで渡すこともできる(例は平日の0時に実行) 0, 0,12 * * * 値をリストで渡すこともできる(例は0,12時に実行) dag = DAG( dag_id="use_cron", start_date=dt....

January 1, 2022 · Me