GreatExpectationsのActionについて

Actionとは GreatExpectations(GX)で得たValidationResult(検証結果)を受け取った後に行う処理をする機能。 Actionの種類 例えば次のようなActionがある。 class名 できること SlackNotificationAction Slackへの通知 PagerdutyAlertAction PagerDutyへのEvent通知 MicrosoftTeamsNotificationAction Teamsへの通知 OpsgenieAlertAction OpsgenieへのAlert通知 EmailAction Emailの送付 StoreValidationResultAction ValidationStoreへ検証結果の保存 StoreEvaluationParametersAction ValidationStoreへ評価値の保存(Expectation SuiteのValidationResultsにあるEvaluationParameter) StoreMetricsAction MetricStoreへ検証結果のMetricの保存 UpdateDataDocsAction DataDocの更新 SNSNotificationAction AmazonSNSへの通知 また、ValidationActionClassを継承し_run メソッドを上書きすればカスタマイズした独自のActionを追加することもできる。 使い方について 複数のBatchRequestを検証するCheckPointにActionは構成される。そのため、使用する際はdata_contextへadd_checkpointを用いてCheckPointを追加するとき、もしくはdata_contextへrun_checkpointを用いてCheckPointを実際に実行するときに、Actionを定義する。 例えば「data_contextへadd_checkpointを用いてCheckPointを追加するとき」のActionの定義の仕方は次のように行う。 yaml_config = """ ・・・ validations: - expectation_suite_name: users.warning - expectation_suite_name: users....

May 7, 2023 · Me

AirflowのPluginsについて

AirflowPluginsとは Airflowをカスタマイズするために追加できるPlugin。 どんな時にAirflowPluginsを使うのか 例えば以下のようなときにAirflowPluginsを使う。 AirflowのメタデータのDBのコンテンツを表示するViewを作成したいとき。 指定された時間内において任意数のDAGが失敗時にAirflowの機能を監視しカスタムアラートを送信するアプリケーションを作成したいとき。 外部データツールのファイルやログに動的にリンクする詳細ビューをUIに表示したいとき。 新規Plugin作成方法 「plugins_folder」へPythonファイルを用意し、Pythonファイル内でairflow.plugins_manager.AirflowPluginを拡張するクラスを定義すれば新規Pluginを作成できる。 Pythonファイルの例 定義したAirflowPluginクラスには以下を実装する。 name:propertyにPlugin名 on_load:functionにPluginのロード時に実行してほしい処理 そのほかPluginに追加したいComponent(Pluginの機能)について列挙 from airflow.plugins_manager import AirflowPlugin class MyAirflowPlugin(AirflowPlugin): name = "empty" def on_load(cls, *args, **kwargs): pass # そのほかPluginに追加したいComponentについて列挙していく。 上記場合、Plugin名が「empty」であるPluginを作成している。 「plugins_folder」の確認の仕方 ちなみに「plugins_folder」はどこで確認できるのかというのだが、「コマンドを利用する」か「cfgファイルを見る」の2通りで確認できる。 基本的にデフォルトでは$AIRFLOW_HOME/pluginsになっているはず。 # コマンドを利用する $ airflow info | grep plugins_folder plugins_folder | xxx # cfgファイルを見る $ cat airflow.cfg | grep plugins_folder plugins_folder = xxxx Plugin確認方法 現在追加されているPluginは何か、Pluginに定義されてあるComponentは何かを確認する方法だが、「UIから確認する方法」と「CLIを利用して確認する方法」がある。WebServerを再起動しなくてもすぐに反映されるのは後者の方法。...

January 4, 2023 · Me

AirflowのTaskFlowAPIについて

TaskFlowAPIとは Airflow上でのWorkflowの書き方の一つ。従来Workflowの書き方は「DAGclassを定義し、Operator同士を繋げる書き方(Operatorを使用した書き方)」しかなかったが、Airflow2.0から新しい書き方が導入された。 基本的な書き方 DAGの書き方 @dagでWorkflowを定義する関数をdecorateする。グローバル変数にDAGを登録する必要がなくなった。 @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) def tutorial_taskflow_api(): """ DAG Docs Example """ tutorial_taskflow_apiがdag_id 関数配下に書いてある__doc__がDAGのDoc Taskの書き方(PythonOperatorの例) TaskFlowDecoratorでTaskの関数をdecorateする。 @task() def extract(): """ extract doc_md Example """ data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' order_data_dict = json.loads(data_string) return order_data_dict extractがtask_id 関数配下に書いてある__doc__がTaskのDoc 対応しているOperator 上記PythonOperator以外にも以下のOperatorがTaskFlowDecoratorに対応している。主に独自の実行環境でPythonを実行する必要があるようなOperatorが現在は対応しているイメージ。 Operator TaskFlowDecorator PythonOperator @task(@task.python) PythonVirtualenvOperator @task.virtualenv BranchPythonOperator @task....

January 1, 2023 · Me

Great Expectationsについて調べてみた

この記事はQiita x Code Polaris共催!女性ITエンジニアが作るアドベントカレンダー Advent Calendar 2022の15日目の記事です。Qiitaのアドベントカレンダーに参加するの初めてです(wktk) 概要 Great Expectations(NOT V2)についてドキュメントを読みながらわかったこと、試したことをまとめる。 Great Expectationsとは データのバリデーション(検証)、ドキュメント化、プロファイリング(概要定義)をしてくれるPythonのライブラリ。Superconductive社が開発しているOSS。GXと略されるらしい。 できること データの期待の定義(Expectation) oO(xxxのカラムはNULLがないはずだ!) oO(yyyのカラムはAAもしくはBBしか値が入らないはずだ!) といったような期待の定義。 データの自動的なプロファイリング(Profiling) 「xxxのカラムはNULLがないです」 「yyyのカラムはAAもしくはBBしか値が入らないです」 みたいなデータの概要の自動的な定義 データのバリデーション(Validation) 定義したExpectationが期待通りかの検証。 データのドキュメント化 定義したExpectationをValidationした結果の確認。 さまざまなデータソースやデータストアからのロード PandasやSparkなどのDataframeや、SQLDatabase及びファイルからのデータロード 何が嬉しいのか データの品質管理ができる -> バグ,過去分データの修正の早期対応。 似ているもの dbtのテスト機能。 kubeflowのValidation機能。 できないこと Pipelineの実行 あくまでPythonのライブラリ。 例えばデータ検証をしたいとなった時、GXの機能を活かしてデータ検証のPipeline(流れ)を作っていくわけだが、作ったPipelineのステップ毎の実行はできるが、全体を通した実行は単体でできない。なので、Airflow,dbt,Prefect,Dagster,KedroのようなDAGを実行するツールと統合すると良い。  データのバージョン管理 データのバージョン管理するならDVCやQuikltなどを使うと良い。 Pythonの環境以外で最適な状態で動くこと Pythonでできているので 違う言語やエコシステムから呼び出すとき、CLIから呼び出しなんとかするということもできるが、言語やエコシステムに沿ったものを選択する方が良い、(R->assertR, TensorFlow->TFDV) データ検証作成の流れ では、GreatExpectationsを使用して例えばデータ検証を作成したいとなったら何をしたら良いの、となるのだが以下の画像が参考になる。 https://docs....

December 11, 2022 · Me