AirflowのPluginsについて

AirflowPluginsとは Airflowをカスタマイズするために追加できるPlugin。 どんな時にAirflowPluginsを使うのか 例えば以下のようなときにAirflowPluginsを使う。 AirflowのメタデータのDBのコンテンツを表示するViewを作成したいとき。 指定された時間内において任意数のDAGが失敗時にAirflowの機能を監視しカスタムアラートを送信するアプリケーションを作成したいとき。 外部データツールのファイルやログに動的にリンクする詳細ビューをUIに表示したいとき。 新規Plugin作成方法 「plugins_folder」へPythonファイルを用意し、Pythonファイル内でairflow.plugins_manager.AirflowPluginを拡張するクラスを定義すれば新規Pluginを作成できる。 Pythonファイルの例 定義したAirflowPluginクラスには以下を実装する。 name:propertyにPlugin名 on_load:functionにPluginのロード時に実行してほしい処理 そのほかPluginに追加したいComponent(Pluginの機能)について列挙 from airflow.plugins_manager import AirflowPlugin class MyAirflowPlugin(AirflowPlugin): name = "empty" def on_load(cls, *args, **kwargs): pass # そのほかPluginに追加したいComponentについて列挙していく。 上記場合、Plugin名が「empty」であるPluginを作成している。 「plugins_folder」の確認の仕方 ちなみに「plugins_folder」はどこで確認できるのかというのだが、「コマンドを利用する」か「cfgファイルを見る」の2通りで確認できる。 基本的にデフォルトでは$AIRFLOW_HOME/pluginsになっているはず。 # コマンドを利用する $ airflow info | grep plugins_folder plugins_folder | xxx # cfgファイルを見る $ cat airflow.cfg | grep plugins_folder plugins_folder = xxxx Plugin確認方法 現在追加されているPluginは何か、Pluginに定義されてあるComponentは何かを確認する方法だが、「UIから確認する方法」と「CLIを利用して確認する方法」がある。WebServerを再起動しなくてもすぐに反映されるのは後者の方法。...

January 4, 2023 · Me

AirflowのTaskFlowAPIについて

TaskFlowAPIとは Airflow上でのWorkflowの書き方の一つ。従来Workflowの書き方は「DAGclassを定義し、Operator同士を繋げる書き方(Operatorを使用した書き方)」しかなかったが、Airflow2.0から新しい書き方が導入された。 基本的な書き方 DAGの書き方 @dagでWorkflowを定義する関数をdecorateする。グローバル変数にDAGを登録する必要がなくなった。 @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) def tutorial_taskflow_api(): """ DAG Docs Example """ tutorial_taskflow_apiがdag_id 関数配下に書いてある__doc__がDAGのDoc Taskの書き方(PythonOperatorの例) TaskFlowDecoratorでTaskの関数をdecorateする。 @task() def extract(): """ extract doc_md Example """ data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' order_data_dict = json.loads(data_string) return order_data_dict extractがtask_id 関数配下に書いてある__doc__がTaskのDoc 対応しているOperator 上記PythonOperator以外にも以下のOperatorがTaskFlowDecoratorに対応している。主に独自の実行環境でPythonを実行する必要があるようなOperatorが現在は対応しているイメージ。 Operator TaskFlowDecorator PythonOperator @task(@task.python) PythonVirtualenvOperator @task.virtualenv BranchPythonOperator @task....

January 1, 2023 · Me

Great Expectationsについて調べてみた

この記事はQiita x Code Polaris共催!女性ITエンジニアが作るアドベントカレンダー Advent Calendar 2022の15日目の記事です。Qiitaのアドベントカレンダーに参加するの初めてです(wktk) 概要 Great Expectations(NOT V2)についてドキュメントを読みながらわかったこと、試したことをまとめる。 Great Expectationsとは データのバリデーション(検証)、ドキュメント化、プロファイリング(概要定義)をしてくれるPythonのライブラリ。Superconductive社が開発しているOSS。GXと略されるらしい。 できること データの期待の定義(Expectation) oO(xxxのカラムはNULLがないはずだ!) oO(yyyのカラムはAAもしくはBBしか値が入らないはずだ!) といったような期待の定義。 データの自動的なプロファイリング(Profiling) 「xxxのカラムはNULLがないです」 「yyyのカラムはAAもしくはBBしか値が入らないです」 みたいなデータの概要の自動的な定義 データのバリデーション(Validation) 定義したExpectationが期待通りかの検証。 データのドキュメント化 定義したExpectationをValidationした結果の確認。 さまざまなデータソースやデータストアからのロード PandasやSparkなどのDataframeや、SQLDatabase及びファイルからのデータロード 何が嬉しいのか データの品質管理ができる -> バグ,過去分データの修正の早期対応。 似ているもの dbtのテスト機能。 kubeflowのValidation機能。 できないこと Pipelineの実行 あくまでPythonのライブラリ。 例えばデータ検証をしたいとなった時、GXの機能を活かしてデータ検証のPipeline(流れ)を作っていくわけだが、作ったPipelineのステップ毎の実行はできるが、全体を通した実行は単体でできない。なので、Airflow,dbt,Prefect,Dagster,KedroのようなDAGを実行するツールと統合すると良い。  データのバージョン管理 データのバージョン管理するならDVCやQuikltなどを使うと良い。 Pythonの環境以外で最適な状態で動くこと Pythonでできているので 違う言語やエコシステムから呼び出すとき、CLIから呼び出しなんとかするということもできるが、言語やエコシステムに沿ったものを選択する方が良い、(R->assertR, TensorFlow->TFDV) データ検証作成の流れ では、GreatExpectationsを使用して例えばデータ検証を作成したいとなったら何をしたら良いの、となるのだが以下の画像が参考になる。 https://docs....

December 11, 2022 · Me