ワークフロー管理についてメモ

WEB+DB PRESS plusの「ビッグデータを支える技術」という本の180ページから書かれているワークフロー管理について ワークフロー周りを開発している身としては忘れないようにしたい内容だと思ったので、備忘録として雑に残します。 ワークフロー管理 ワークフロー管理ツール 主な役割としては「定期的にタスクを実行すること」と「異常を検知してその解決を手助けすること」。 例えば以下のようなものがある 名称 種類 開発元 Airflow スクリプト型 Airbnb Azkaban 宣言型 Linkedin Digdag 宣言型 TreasureData Oozie 宣言型 The Apache Software Foundation Luigi スクリプト型 Spotify Prefect スクリプト型 Prefect Technologies Metaflow スクリプト型 Netflix Dagster スクリプト型 Dagster 基本機能とビッグデータで求められる機能 タスクを定期的なスケジュールで実行し、その結果を通知する タスク間の依存関係を定めて、決められた順にもれなく実行する タスクの実行結果を保持し、エラー発生時には再実行できるようにする 宣言型とスクリプト型 宣言型のメリット 最小限の記述でタスクを定義できる スクリプト型のメリット 柔軟性 ETLのプロセスにはスクリプト型のツール、SQLの実行には宣言型のツールをなどと使い分けるのも一つの方法。...

January 8, 2024 · Me

Rayメモ

Rayとは Rayとは機械学習を使ったpythonアプリケーションをスケーリングするためOSSフレームワーク。 以下の機能を提供する データ前処理、分散トレーニング、調整、強化学習、モデル提供など一般的な機械学習タスクに必要なこと Pythonアプリケーションの分散処理 Kubernetesや各種クラウド製品を使ったスケーリング Rayのフレームワーク 以下の3つのレイヤで構成されている RayAIRuntime:機械学習アプリケーションにて使うツールキット。 次の5つの分野がサポート揃っている。 Data:データの読み込み・変換・チューニング・予測 Train:学習 Tune:チューニング Serve:モデルの提供 RLib:他のtensorflow,torchなどのRayAIRuntime RayCore:分散処理できるようにする部分 RayCluster:スケールする部分 試してみる 「RayCluster」と「RayCore」を試してみる 「RayCluster」 前述通りPythonアプリケーションをスケールする部分。 ray.initを呼び出せばRayClusterを組まなくてもRayは動作するが、スケールしたい場合はClusterを組む必要がある。 どこで組めるのか RayClusterはAWS,GCP,Azureなどクラウド製品からKubernetes上でも組めるようにサポートされている。 RayClusterの構成 RayClusterは一つのHeadnodeといくつかのWorkernodeにて構成されている。 公式画像引用:https://docs.ray.io/en/latest/cluster/key-concepts.html Headnodeは受け取ったワークロードリソース要求がクラスタの現在の容量を超えてるとWorkernodeを自動で増やし調整する。逆にWorkernodeがidle状態になるとWorkernodeを自動で削除する。 Kubernetesを利用したRayClusterの構成 Kubernetesを利用してRayClusterを構築しジョブを実行するには以下の3つの提供されているCRDを構成する必要がある。 RayCluster: Cluster本体 RayJob: Clusterジョブ RayService: Clusterサービス kind:RayJobを使わずRayServiceの指定のPortにray job submitをして、ジョブを実行することもできる。前者の場合kind:RayJobをDeployすると都度RayClusterが作られるが、後者はユーザ側でRayClusterを作り直さない限り一つのRayClusterを使い続けることになる。なので使い分けとしては常にRayJobと1:1でRayClusterをDeployしたい場合はkind:RayJobを使い、そうではない場合はRayServiceの指定のPortに都度ray job submitを使うという形で良さそう。 Kubernetesを利用してRayClusterを構築してみる 各種Operatorのインストールをする。 $ kubectl apply -k "github.com/ray-project/kuberay/ray-operator/config/default" namespaceray-system上で正常に動いていることを確認できる。 $ kubectl get all -n ray-system NAME READY STATUS RESTARTS AGE pod/kuberay-operator-79dd8d67db-skm94 1/1 Running 0 13s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kuberay-operator ClusterIP 10....

October 9, 2023 · Me

デザイン変えました

デザインというか配色を変えて最近本を読むのでブクログのリンクをヘッダーに追加したのみ。 このままのペースだと1年に一回デザインを変えることになるので他のPostも書いていきたいですね。 よく使用するカラーコード #3AA6B9 #164B60 うさぎ コスモスパイス、可愛い曲なので聴いてみてください。

August 12, 2023 · Me

GreatExpectationsのActionについて

Actionとは GreatExpectations(GX)で得たValidationResult(検証結果)を受け取った後に行う処理をする機能。 Actionの種類 例えば次のようなActionがある。 class名 できること SlackNotificationAction Slackへの通知 PagerdutyAlertAction PagerDutyへのEvent通知 MicrosoftTeamsNotificationAction Teamsへの通知 OpsgenieAlertAction OpsgenieへのAlert通知 EmailAction Emailの送付 StoreValidationResultAction ValidationStoreへ検証結果の保存 StoreEvaluationParametersAction ValidationStoreへ評価値の保存(Expectation SuiteのValidationResultsにあるEvaluationParameter) StoreMetricsAction MetricStoreへ検証結果のMetricの保存 UpdateDataDocsAction DataDocの更新 SNSNotificationAction AmazonSNSへの通知 また、ValidationActionClassを継承し_run メソッドを上書きすればカスタマイズした独自のActionを追加することもできる。 使い方について 複数のBatchRequestを検証するCheckPointにActionは構成される。そのため、使用する際はdata_contextへadd_checkpointを用いてCheckPointを追加するとき、もしくはdata_contextへrun_checkpointを用いてCheckPointを実際に実行するときに、Actionを定義する。 例えば「data_contextへadd_checkpointを用いてCheckPointを追加するとき」のActionの定義の仕方は次のように行う。 yaml_config = """ ・・・ validations: - expectation_suite_name: users.warning - expectation_suite_name: users....

May 7, 2023 · Me

キーボード自作

概要 初めて自作キーボードをしたのでその記録 なんで自作にしたの? 使っているキーボードのキーキャップを掃除していたらDeleteキーのはめ口が壊れてしまったのと、分割キーボードを使ったら辛い肩こりが少しは軽減されるのでは。。?と思ったのがきっかけです。ただネットで分割キーボードを探していたのですがいまいち自分好みのものがなく。。いっそのこと0->1で作ってしまおうと思い自作に至りました。 どうやって作ったの? 資材の準備->組み立て 資材の準備 デザイン&&配列を重視して揃えました。 デザインは、「ピンク×ゴールド×ブラック」で配色を整えました。キースイッチを黒色にする・ネジをゴールドにするなどもっとこだわってもよかったかもしれません。キー配列は、自分が慣れているのというのもありHHKBライクです。 あとは自分ははんだごて・はんだなどの組み立てる工具を持っていなかったので合わせて買いました。 以下は揃えた資材の画像です(事後画像もあり) 分類 内容 値段 基盤 Choco60 rev.2(ゴールド) 19,100 ケーブル TRRSケーブル 330 キースイッチ Gateronレッドメカニカルキースイッチ(赤軸)20ケセット 1,499×4 キーキャップ Akko Black&Pink 158-Key ASA Profile PBT 中古 4,000 はんだごて 白光(HAKKO) BLUE SET 電気器具/電気部品用はんだこてセット 815 はんだ吸取機 白光(HAKKO) はんだ吸取線 484 はんだ 白光(HAKKO) HEXSOL 巻はんだ 1,515 合計 32,240 キーキャップ、本当は同じ種類の155キーの方(B09L183QDM)をキーキャップのサイズが基盤とピッタリになりそうという理由から狙っていたのですが、調べていたら158キーの方の中古が安く売っていたのと、155キーの方は左側のキーに配色が黒と灰色に偏りそうだなあ(ピンク色が少なくなりそうだなあ)と思ったので、158キーの方にしました。...

April 9, 2023 · Me

Hadoop周辺の設定メモ

hive関連 https://cwiki.apache.org/confluence/display/hive/configuration+properties参考 hive*から始まるプロパティはHiveのシステムプロパティとみなされる。なお「hive.conf.validation」でいじれる。 key default value about hive.tez.container.size -1 tezのコンテナを使うメモリサイズ hive.tez.auto.reducer.parallelism false reducerの並列有効化 hive.exec.reducers.bytes.per.reducer 256,000,000 1つのreducer辺りの処理サイズ hive.exec.dynamic.partition.mode stric dynamic partitionを使用するときのモード。stricの場合partitionをselectで明示的に指定しなければいけない。 hive.exec.compress.output false queryの最終結果を圧縮するかどうかを決める。圧縮方式は「mapred.output.compress*」から取得 mapreduce関連 https://software.fujitsu.com/jp/manual/manualfiles/m150005/j2ul1563/04z200/j1563-03-17-05-01.html mapred*はMRv1(org.apache.hadoop.mapred) mapreduceはMRv2(org.apache.hadoop.mapreduce) key default value about mapred.reduce.tasks -1 Hadoopジョブで使用するReduceタスク数 mapred.output.compression.codec Hadoopジョブの主力するファイルを圧縮するときのCodecのClass mapreduce....

February 11, 2023 · Me

AirflowのPluginsについて

AirflowPluginsとは Airflowをカスタマイズするために追加できるPlugin。 どんな時にAirflowPluginsを使うのか 例えば以下のようなときにAirflowPluginsを使う。 AirflowのメタデータのDBのコンテンツを表示するViewを作成したいとき。 指定された時間内において任意数のDAGが失敗時にAirflowの機能を監視しカスタムアラートを送信するアプリケーションを作成したいとき。 外部データツールのファイルやログに動的にリンクする詳細ビューをUIに表示したいとき。 新規Plugin作成方法 「plugins_folder」へPythonファイルを用意し、Pythonファイル内でairflow.plugins_manager.AirflowPluginを拡張するクラスを定義すれば新規Pluginを作成できる。 Pythonファイルの例 定義したAirflowPluginクラスには以下を実装する。 name:propertyにPlugin名 on_load:functionにPluginのロード時に実行してほしい処理 そのほかPluginに追加したいComponent(Pluginの機能)について列挙 from airflow.plugins_manager import AirflowPlugin class MyAirflowPlugin(AirflowPlugin): name = "empty" def on_load(cls, *args, **kwargs): pass # そのほかPluginに追加したいComponentについて列挙していく。 上記場合、Plugin名が「empty」であるPluginを作成している。 「plugins_folder」の確認の仕方 ちなみに「plugins_folder」はどこで確認できるのかというのだが、「コマンドを利用する」か「cfgファイルを見る」の2通りで確認できる。 基本的にデフォルトでは$AIRFLOW_HOME/pluginsになっているはず。 # コマンドを利用する $ airflow info | grep plugins_folder plugins_folder | xxx # cfgファイルを見る $ cat airflow.cfg | grep plugins_folder plugins_folder = xxxx Plugin確認方法 現在追加されているPluginは何か、Pluginに定義されてあるComponentは何かを確認する方法だが、「UIから確認する方法」と「CLIを利用して確認する方法」がある。WebServerを再起動しなくてもすぐに反映されるのは後者の方法。...

January 4, 2023 · Me

AirflowのTaskFlowAPIについて

TaskFlowAPIとは Airflow上でのWorkflowの書き方の一つ。従来Workflowの書き方は「DAGclassを定義し、Operator同士を繋げる書き方(Operatorを使用した書き方)」しかなかったが、Airflow2.0から新しい書き方が導入された。 基本的な書き方 DAGの書き方 @dagでWorkflowを定義する関数をdecorateする。グローバル変数にDAGを登録する必要がなくなった。 @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) def tutorial_taskflow_api(): """ DAG Docs Example """ tutorial_taskflow_apiがdag_id 関数配下に書いてある__doc__がDAGのDoc Taskの書き方(PythonOperatorの例) TaskFlowDecoratorでTaskの関数をdecorateする。 @task() def extract(): """ extract doc_md Example """ data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' order_data_dict = json.loads(data_string) return order_data_dict extractがtask_id 関数配下に書いてある__doc__がTaskのDoc 対応しているOperator 上記PythonOperator以外にも以下のOperatorがTaskFlowDecoratorに対応している。主に独自の実行環境でPythonを実行する必要があるようなOperatorが現在は対応しているイメージ。 Operator TaskFlowDecorator PythonOperator @task(@task.python) PythonVirtualenvOperator @task.virtualenv BranchPythonOperator @task....

January 1, 2023 · Me

Great Expectationsについて調べてみた

この記事はQiita x Code Polaris共催!女性ITエンジニアが作るアドベントカレンダー Advent Calendar 2022の15日目の記事です。Qiitaのアドベントカレンダーに参加するの初めてです(wktk) 概要 Great Expectations(NOT V2)についてドキュメントを読みながらわかったこと、試したことをまとめる。 Great Expectationsとは データのバリデーション(検証)、ドキュメント化、プロファイリング(概要定義)をしてくれるPythonのライブラリ。Superconductive社が開発しているOSS。GXと略されるらしい。 できること データの期待の定義(Expectation) oO(xxxのカラムはNULLがないはずだ!) oO(yyyのカラムはAAもしくはBBしか値が入らないはずだ!) といったような期待の定義。 データの自動的なプロファイリング(Profiling) 「xxxのカラムはNULLがないです」 「yyyのカラムはAAもしくはBBしか値が入らないです」 みたいなデータの概要の自動的な定義 データのバリデーション(Validation) 定義したExpectationが期待通りかの検証。 データのドキュメント化 定義したExpectationをValidationした結果の確認。 さまざまなデータソースやデータストアからのロード PandasやSparkなどのDataframeや、SQLDatabase及びファイルからのデータロード 何が嬉しいのか データの品質管理ができる -> バグ,過去分データの修正の早期対応。 似ているもの dbtのテスト機能。 kubeflowのValidation機能。 できないこと Pipelineの実行 あくまでPythonのライブラリ。 例えばデータ検証をしたいとなった時、GXの機能を活かしてデータ検証のPipeline(流れ)を作っていくわけだが、作ったPipelineのステップ毎の実行はできるが、全体を通した実行は単体でできない。なので、Airflow,dbt,Prefect,Dagster,KedroのようなDAGを実行するツールと統合すると良い。  データのバージョン管理 データのバージョン管理するならDVCやQuikltなどを使うと良い。 Pythonの環境以外で最適な状態で動くこと Pythonでできているので 違う言語やエコシステムから呼び出すとき、CLIから呼び出しなんとかするということもできるが、言語やエコシステムに沿ったものを選択する方が良い、(R->assertR, TensorFlow->TFDV) データ検証作成の流れ では、GreatExpectationsを使用して例えばデータ検証を作成したいとなったら何をしたら良いの、となるのだが以下の画像が参考になる。 https://docs....

December 11, 2022 · Me

デザイン変えました

よく使用するカラーコード #00dbde #fc00ff #FEC763 #28CF75 #00c2BA #037A90 うさぎ集

September 19, 2022 · Me