Hadoop周辺の設定メモ

hive関連 https://cwiki.apache.org/confluence/display/hive/configuration+properties参考 hive*から始まるプロパティはHiveのシステムプロパティとみなされる。なお「hive.conf.validation」でいじれる。 key default value about hive.tez.container.size -1 tezのコンテナを使うメモリサイズ hive.tez.auto.reducer.parallelism false reducerの並列有効化 hive.exec.reducers.bytes.per.reducer 256,000,000 1つのreducer辺りの処理サイズ hive.exec.dynamic.partition.mode stric dynamic partitionを使用するときのモード。stricの場合partitionをselectで明示的に指定しなければいけない。 hive.exec.compress.output false queryの最終結果を圧縮するかどうかを決める。圧縮方式は「mapred.output.compress*」から取得 mapreduce関連 https://software.fujitsu.com/jp/manual/manualfiles/m150005/j2ul1563/04z200/j1563-03-17-05-01.html mapred*はMRv1(org.apache.hadoop.mapred) mapreduceはMRv2(org.apache.hadoop.mapreduce) key default value about mapred.reduce.tasks -1 Hadoopジョブで使用するReduceタスク数 mapred.output.compression.codec Hadoopジョブの主力するファイルを圧縮するときのCodecのClass mapreduce....

February 11, 2023 · Me

AirflowのPluginsについて

AirflowPluginsとは Airflowをカスタマイズするために追加できるPlugin。 どんな時にAirflowPluginsを使うのか 例えば以下のようなときにAirflowPluginsを使う。 AirflowのメタデータのDBのコンテンツを表示するViewを作成したいとき。 指定された時間内において任意数のDAGが失敗時にAirflowの機能を監視しカスタムアラートを送信するアプリケーションを作成したいとき。 外部データツールのファイルやログに動的にリンクする詳細ビューをUIに表示したいとき。 新規Plugin作成方法 「plugins_folder」へPythonファイルを用意し、Pythonファイル内でairflow.plugins_manager.AirflowPluginを拡張するクラスを定義すれば新規Pluginを作成できる。 Pythonファイルの例 定義したAirflowPluginクラスには以下を実装する。 name:propertyにPlugin名 on_load:functionにPluginのロード時に実行してほしい処理 そのほかPluginに追加したいComponent(Pluginの機能)について列挙 from airflow.plugins_manager import AirflowPlugin class MyAirflowPlugin(AirflowPlugin): name = "empty" def on_load(cls, *args, **kwargs): pass # そのほかPluginに追加したいComponentについて列挙していく。 上記場合、Plugin名が「empty」であるPluginを作成している。 「plugins_folder」の確認の仕方 ちなみに「plugins_folder」はどこで確認できるのかというのだが、「コマンドを利用する」か「cfgファイルを見る」の2通りで確認できる。 基本的にデフォルトでは$AIRFLOW_HOME/pluginsになっているはず。 # コマンドを利用する $ airflow info | grep plugins_folder plugins_folder | xxx # cfgファイルを見る $ cat airflow.cfg | grep plugins_folder plugins_folder = xxxx Plugin確認方法 現在追加されているPluginは何か、Pluginに定義されてあるComponentは何かを確認する方法だが、「UIから確認する方法」と「CLIを利用して確認する方法」がある。WebServerを再起動しなくてもすぐに反映されるのは後者の方法。...

January 4, 2023 · Me

AirflowのTaskFlowAPIについて

TaskFlowAPIとは Airflow上でのWorkflowの書き方の一つ。従来Workflowの書き方は「DAGclassを定義し、Operator同士を繋げる書き方(Operatorを使用した書き方)」しかなかったが、Airflow2.0から新しい書き方が導入された。 基本的な書き方 DAGの書き方 @dagでWorkflowを定義する関数をdecorateする。グローバル変数にDAGを登録する必要がなくなった。 @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) def tutorial_taskflow_api(): """ DAG Docs Example """ tutorial_taskflow_apiがdag_id 関数配下に書いてある__doc__がDAGのDoc Taskの書き方(PythonOperatorの例) TaskFlowDecoratorでTaskの関数をdecorateする。 @task() def extract(): """ extract doc_md Example """ data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' order_data_dict = json.loads(data_string) return order_data_dict extractがtask_id 関数配下に書いてある__doc__がTaskのDoc 対応しているOperator 上記PythonOperator以外にも以下のOperatorがTaskFlowDecoratorに対応している。主に独自の実行環境でPythonを実行する必要があるようなOperatorが現在は対応しているイメージ。 Operator TaskFlowDecorator PythonOperator @task(@task.python) PythonVirtualenvOperator @task.virtualenv BranchPythonOperator @task....

January 1, 2023 · Me

Great Expectationsについて調べてみた

この記事はQiita x Code Polaris共催!女性ITエンジニアが作るアドベントカレンダー Advent Calendar 2022の15日目の記事です。Qiitaのアドベントカレンダーに参加するの初めてです(wktk) 概要 Great Expectations(NOT V2)についてドキュメントを読みながらわかったこと、試したことをまとめる。 Great Expectationsとは データのバリデーション(検証)、ドキュメント化、プロファイリング(概要定義)をしてくれるPythonのライブラリ。Superconductive社が開発しているOSS。GXと略されるらしい。 できること データの期待の定義(Expectation) oO(xxxのカラムはNULLがないはずだ!) oO(yyyのカラムはAAもしくはBBしか値が入らないはずだ!) といったような期待の定義。 データの自動的なプロファイリング(Profiling) 「xxxのカラムはNULLがないです」 「yyyのカラムはAAもしくはBBしか値が入らないです」 みたいなデータの概要の自動的な定義 データのバリデーション(Validation) 定義したExpectationが期待通りかの検証。 データのドキュメント化 定義したExpectationをValidationした結果の確認。 さまざまなデータソースやデータストアからのロード PandasやSparkなどのDataframeや、SQLDatabase及びファイルからのデータロード 何が嬉しいのか データの品質管理ができる -> バグ,過去分データの修正の早期対応。 似ているもの dbtのテスト機能。 kubeflowのValidation機能。 できないこと Pipelineの実行 あくまでPythonのライブラリ。 例えばデータ検証をしたいとなった時、GXの機能を活かしてデータ検証のPipeline(流れ)を作っていくわけだが、作ったPipelineのステップ毎の実行はできるが、全体を通した実行は単体でできない。なので、Airflow,dbt,Prefect,Dagster,KedroのようなDAGを実行するツールと統合すると良い。  データのバージョン管理 データのバージョン管理するならDVCやQuikltなどを使うと良い。 Pythonの環境以外で最適な状態で動くこと Pythonでできているので 違う言語やエコシステムから呼び出すとき、CLIから呼び出しなんとかするということもできるが、言語やエコシステムに沿ったものを選択する方が良い、(R->assertR, TensorFlow->TFDV) データ検証作成の流れ では、GreatExpectationsを使用して例えばデータ検証を作成したいとなったら何をしたら良いの、となるのだが以下の画像が参考になる。 https://docs....

December 11, 2022 · Me

デザイン変えました

よく使用するカラーコード #00dbde #fc00ff #FEC763 #28CF75 #00c2BA #037A90 うさぎ集

September 19, 2022 · Me

Airflowのschedule関連について

概要 Airflowのscheule関連についてしっかりと理解できていなかったので、DataPipelines with ApacheAirflowのCAPTER3「Scheduling in Airflow」を参考にしつつ雑にまとめてみた。 TL;DR DAGは「schedule_interval」に設定した間隔で実行される。 「schedule_interval」はcronとtimedeltaインスタンスで書くことができる。 「interval is started」「the end of the interval」 「execution date」は実際の実行時間ではない。 JinjaTemplateを使用して動的に設定することによってデータを段階的に処理できる。 DAGはbackfillを使うと過去分実行ができる。 schedule_intervalについて DAGの定期的な実行間隔は airflow.dagの「scheudle_interval」引数に指定する。 「None」の場合、UIもしくはAPIからの実行のみがトリガーとなる。 dag = DAG( dag_id="unscheduled", start_date=dt.datetime(2019, 1, 1), schedule_interval=None ) 「Cron」を使用することができる。 0 * * * * 0 0 * * MON-FRI 値をrangeで渡すこともできる(例は平日の0時に実行) 0, 0,12 * * * 値をリストで渡すこともできる(例は0,12時に実行) dag = DAG( dag_id="use_cron", start_date=dt....

January 1, 2022 · Me

Flaskを活用して自作exporter作成。Prometheus->Grafanaで可視化

概要 Flaskを活用して自作exporterを作成し、Prometheusでmetrics取得->Grafanaでmetrics可視化を行う手順の備忘録 手順 構成 . ├── docker-compose.yml ├── flask │ ├── Dockerfile │ └── app │ └── app.py ├── grafana │ ├── Dockerfile │ └── datasource.yml └── prometheus.yml flask/Dockerfile FROMubuntu:latestRUN apt-get updateRUN apt-get install python3 python3-pip -yRUN pip3 install flask prometheus-clientRUN mkdir /appflask/app.py pythonのPrometheusClientライブラリを使ってexporter化。curl http://localhost:3000/hogeでGauge型のmetricsが増減するexporterです。 from flask import Flaskimport json import queue from werkzeug.middleware.dispatcher import DispatcherMiddleware from prometheus_client import make_wsgi_app,Gauge app = Flask(__name__) G1 = Gauge('Gauge1','Gauge test') G2 = Gauge('Gauge2','Gauge test') @app....

January 1, 2020 · Me

GrafanaでのDashboard作成を自動化する

概要 Flaskを活用して自作exporter作成。Prometheus->Grafanaで可視化の続き。現状のままだとコンテナRestart都度GrafanaへDashboardの作成をしなくてはならないので、起動時に自動でDashboardが作成できるようにする備忘録 手順 構成 ├── docker-compose.yml ├── flask │ ├── Dockerfile │ └── app │ └── app.py ├── grafana │ ├── Dockerfile # 変更 │ ├── dashboard.yml # 作成 │ ├── datasource.yml │ └── prometheus │ └── <hoge>.json # 作成 └── prometheus.yml Dashboard自動作成設定ファイル「dashboard.yml」、自動作成するDashboardのファイル「.json」を作成。 追加ファイルをコンテナへ置くためにGrafanaの「Dockerfile」を変更。 grafana/prometheus/.json http://localhost:3000でGrafanaへ接続 「Create」→「Dashboard」で自動作成するDashboardを作成する。 「Share dashboard」→「Export」→「Save to file」でJSONファイルを出力。「./grafana/prometheus」配下にファイルを置く。 grafana/dashboard.yml https://grafana.com/docs/grafana/latest/administration/provisioning/#datasources apiVersion: 1 providers: - name: 'prometheus metrics' orgId: 1 folder: '' folderUid: '' type: file disableDeletion: false updateIntervalSeconds: 10 allowUiUpdates: false options: path: /etc/grafana/provisioning/dashboards/prometheus foldersFromFileStructure: true grafana/Dockerfile FROMgrafana/grafana:masterCOPY ....

January 1, 2020 · Me