Actionとは

GreatExpectations(GX)で得たValidationResult(検証結果)を受け取った後に行う処理をする機能。

Actionの種類

例えば次のようなActionがある。

class名できること
SlackNotificationActionSlackへの通知
PagerdutyAlertActionPagerDutyへのEvent通知
MicrosoftTeamsNotificationActionTeamsへの通知
OpsgenieAlertActionOpsgenieへのAlert通知
EmailActionEmailの送付
StoreValidationResultActionValidationStoreへ検証結果の保存
StoreEvaluationParametersActionValidationStoreへ評価値の保存(Expectation SuiteのValidationResultsにあるEvaluationParameter)
StoreMetricsActionMetricStoreへ検証結果のMetricの保存
UpdateDataDocsActionDataDocの更新
SNSNotificationActionAmazonSNSへの通知

また、ValidationActionClassを継承し_run メソッドを上書きすればカスタマイズした独自のActionを追加することもできる。

使い方について

複数のBatchRequestを検証するCheckPointにActionは構成される。そのため、使用する際はdata_contextへadd_checkpointを用いてCheckPointを追加するとき、もしくはdata_contextへrun_checkpointを用いてCheckPointを実際に実行するときに、Actionを定義する。

例えば「data_contextへadd_checkpointを用いてCheckPointを追加するとき」のActionの定義の仕方は次のように行う。

yaml_config = """
・・・
validations:
  - expectation_suite_name: users.warning 
  - expectation_suite_name: users.error  
    action_list:
    - name: quarantine_failed_data
      action:
          class_name: CreateQuarantineData
    - name: advance_passed_data
      action:
          class_name: CreatePassedData
action_list:
    - name: store_validation_result
      action:
        class_name: StoreValidationResultAction
    - name: store_evaluation_params
      action:
        class_name: StoreEvaluationParametersAction
    - name: update_data_docs
      action:
        class_name: UpdateDataDocsAction
・・・ 
"""
context.add_checkpoint(**yaml.load(yaml_config))

「.validations.expectation_suite_name.action_list」に書かれている各Actionは、紐づかれているexpectation_suite(ここではuser.error)の検証結果に特化したActionの定義である。 「.action_list」に書かれている各Actionは、全expectation_suiteの検証結果に対するActionの定義である。

「data_contextへrun_checkpointを用いてCheckPointを実際に実行するとき」のActionの定義の仕方は次のように行う。

checkpoint_run_result: CheckpointResult = data_context.run_checkpoint(
    ・・・
    action_list=[
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"}
        },
        {
            "name": "store_evaluation_params",
            "action": {"class_name": "StoreEvaluationParametersAction"}
        }
    ]
    ・・・
)

StoreMetricsActionを実際に試してみる

Actionのうち、「StoreMetricsAction」を試し、実際にMetricStoreへ検証結果のMetricが保存されることを確認する。 公式のHow toを参考に試してみる。

1.事前準備

2.MetricStoreの追加

GreatExpectationsへMetricStoreの情報を追加する。参考と同じく、今回はMetricStoreをPostgresとする。

great_expectations/uncommitted/config_variables.ymlへPostgresの認証情報の追加

great_expectations.ymlへcredentialsを記載するための認証情報をconfig_variables.ymlへ書く。

my_postgres_db_yaml_creds:
  drivername: postgresql
  host: ${MY_DB_HOST}
  port: 5432
  username: postgres
  password: ${MY_DB_PW}
  database: metric_store_sample

MY_DB_HOSTMY_DB_PWのexport

上記config_variables.ymlで記載してstring.Templateがloadできるようにexportする。

export MY_DB_HOST=''
export MY_DB_PW=''

great_expectations/great_expectations.ymlへMetricStoreの追加

Store名「sample_metric_store」として追加する。 ちなみに.stores.metric_store.store_backendについては、現在「DatabaseStoreBackend」のsubclassのみ対応されているようだ。

stores:
・・・・
  sample_metric_store:
    class_name: MetricStore
    store_backend:
      class_name: DatabaseStoreBackend
      credentials: ${my_postgres_db_yaml_creds}

3.CheckPointの作成

今回はこちらの検証結果をもとに以下のようなCheckPointを作成してみる。

## 事前準備
batch_request:BatchRequest
expectation_suite_name:str

## DataContextは初期化
import great_expectations as gx
context: AbstractDataContext = gx.get_context()

# checkpointをdata_contexthe追加
checkpoint_name = "my_checkpoint"
context.add_checkpoint(
    name=checkpoint_name,
    config_version=1,
    class_name="Checkpoint",
    validations=[
        {
            "expectation_suite_name": suite_name,
        }
    ],
    batch_request=batch_request,
    action_list=[
        {
            "name": "store_metric", # 任意のAction名
            "action": {
                "class_name": "StoreMetricsAction",
                "target_store_name": "sample_metric_store", # great_expectations.ymlへ追加したStore名
                "requested_metrics":{ # 保存するMetricsについて列挙
                    "*": [ # 全ExpectationSuite対象
                        "statistics.successful_expectations", # A
                        "statistics.unsuccessful_expectations" # B
                    ],
                    suite_name: [ # 任意のExpectationSuite対象
                        {"column": {
                            "int_column": ["expect_column_values_to_be_between.result.unexpected_count"] # C
                        }}
                    ]
                }
            }
        }
    ]
)

こちらのCheckPointを実行すれば次の3つのMetricがMetricStoreへ保存されるはずだ。

  • A・・・全ExpectationSuiteの成功したExpectation数
  • B・・・全ExpectationSuiteの成功しなかったExpectation数
  • C・・・任意のExpectationSuiteのExpectation:expect_column_values_to_be_betweenのint_columnが期待通りではなかった数

そのほか「statistics」や各Expectationの「result」から以下のようなMetircの取得を指定できる。(ExpectationのTypeによっては取得できないものもあるかもしれません。)

Metric内容
statistic.evaluated_expectations評価したExpectationの数
statistic.successful_expectations成功したExpectationの数
statistic.unsuccessful_expectations成功しなかったExpectationの数
statistic.success_percent成功のパーセント表記
*.result.element_count期待通りだったカラムの数
*.result.unexpected_count期待通りではなかったカラムの数
*.result.unexpected_percent期待通りではなかったパーセント表記
*.result.partial_unexpected_list期待通りではなかったカラムの値のリスト

4.CheckPointを実行してみる

context.run_checkpoint(checkpoint_name=checkpoint_name)

をしてCheckPointを実行する。

実行すると「Calculating Metrics: 100%」というログとともに、MetricStore上にge_metricsというTBLが自動的に作られ、取得を指定したA〜CまでのMetricが挿入されていることを確認できる。

# select * from ge_metrics;
 run_name |        run_time         |  data_asset_name   | expectation_suite_identifier |                        metric_name                         | metric_kwargs_id  |      value       
----------+-------------------------+--------------------+------------------------------+------------------------------------------------------------+-------------------+------------------
 __none__ | 20230507T162308.788052Z | public.sample_poyo | ex_suite                     | statistics.successful_expectations                         | __                | {"value": 3}
 __none__ | 20230507T162308.788052Z | public.sample_poyo | ex_suite                     | statistics.unsuccessful_expectations                       | __                | {"value": 1}
 __none__ | 20230507T162308.788052Z | public.sample_poyo | ex_suite                     | expect_column_values_to_be_between.result.unexpected_count | column=int_column | {"value": 2}
  • 全ExpectationSuiteのうち、成功したExpectation数は3つ
  • 全ExpectationSuiteのうち、成功しなかったExpectation数は1つ
  • Expectation:expect_column_values_to_be_betweenのint_columnが期待通りではなかった数は2つ

という意味だ。

終わり

GreatExpectationsのActionの概要、およびActionのうちStoreMetricsActionについて簡単にですが紹介しました。Actionを自作できることや、StoreMetricsActionの場合保存したいMetricを柔軟に指定できる部分が良いなと思いました。