[go: up one dir, main page]

メインコンテンツまでスキップ

Lakeflow 宣言型パイプラインを使用したデータの読み込み

DatabricksのApache Sparkでサポートされている任意のデータソースから、Lakeflow宣言型パイプラインを使用してデータをロードできます。Lakeflow宣言型パイプラインでは、 Sparkデータフレームを返す任意のクエリに対してデータセット (テーブルとビュー) を定義できます。これには、ストリーミングデータフレーム やPandas for Sparkデータフレームが含まれます。データ取り込み タスクの場合、 Databricks はほとんどのユースケースでストリーミングテーブルを使用することをお勧めします。 ストリーミングテーブルは、 Auto Loader を使用してクラウドオブジェクトストレージからデータを取り込む場合や、 Kafkaなどのメッセージバスからデータを取り込むのに適しています。

注記
  • すべてのデータソースが取り込み用のSQLをサポートしているわけではありません。 LakeFlow宣言型パイプラインでSQLとPythonソースを混合して、必要な場合にはPythonを使用し、同じパイプライン内の他の操作にはSQL使用できます。
  • デフォルトによって 宣言型パイプライン にパッケージ化されていないライブラリの操作の詳細についてはLakeflowLakeflow宣言型パイプラインの Python依存関係を管理する を参照してください。
  • Databricksでのインジェストに関する一般的な情報については、「Lakeflowコネクトの標準コネクタ」を参照してください。

次の例は、いくつかの一般的なパターンを示しています。

既存のテーブルから読み込む

Databricks 内の既存のテーブルからデータを読み込みます。クエリを使用してデータを変換したり、パイプラインでさらに処理するためにテーブルをロードしたりできます。

次の例では、既存のテーブルからデータを読み取ります。

Python
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)

クラウドオブジェクトストレージからファイルを読み込む

Databricksでは クラウド オブジェクト ストレージまたはUnity Catalog ボリューム内のファイルからのデータ取り込みタスクのほとんどにおいて、Auto LoaderをLakeflow宣言型パイプラインと共に使用することをお勧めします。Auto Loader と Lakeflow 宣言型パイプラインは、増え続けるデータがクラウド ストレージに到着するときに、増分的かつべき等に読み込むように設計されています。

Auto Loaderとはおよびオブジェクトストレージからのデータのロードを参照してください。

次の例では、Auto Loader を使用してクラウド ストレージからデータを読み取ります。

Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("s3://mybucket/analysis/*/*/*.json")
)

次の例では、Auto Loader を使用して、Unity Catalog ボリューム内の CSVファイルからデータセットを作成します。

Python
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
注記
  • ファイル通知でAuto Loaderを使用し、パイプラインまたはストリーミング テーブルの完全な更新を実行する場合は、リソースを手動でクリーンアップする必要があります。 ノートブック内のCloudFilesResourceManagerを使用してクリーンアップを実行できます。
  • Auto LoaderUnity Catalog対応パイプラインで を含むファイルをロードするには、外部ロケーション を使用する必要があります。宣言型パイプラインで を使用する方法の詳細については、「Unity CatalogLakeflow宣言型パイプラインで Unity Catalogを使用するLakeflow 」を参照してください。

メッセージバスからデータを読み込む

Lakeflow宣言型パイプラインをメッセージバスからデータを取り込むように構成できます。 Databricks では、連続実行と拡張オートスケールを備えたストリーミングテーブルを使用して、メッセージバスからの低レイテンシロードに最も効率的なインジェストを提供することをお勧めします。 「オートスケールを使用した Lakeflow 宣言型パイプラインのクラスタリング使用率の最適化」を参照してください。

たとえば、次のコードは、 read_kafka関数を使用してKafkaからデータを取り込むようにストリーミング テーブルを構成します。

Python
from pyspark import pipelines as dp

@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)

他のメッセージ バス ソースから取り込むには、以下を参照してください。

Azure Event Hubs からデータを読み込む

Azure Event Hubs は、Apache Kafka と互換性のあるインターフェイスを提供するデータ ストリーミング サービスです。Lakeflow宣言型パイプライン ランタイムに含まれる構造化ストリーミングKafka コネクタを使用して、Azure Event Hubs からメッセージを読み込むことができます。Azure Event Hubs からのメッセージの読み込みと処理の詳細については、「Azure Event Hubs を Lakeflow宣言型パイプライン データソースとして使用する」を参照してください。

外部システムからデータを読み込む

Lakeflow 宣言型パイプラインは、 Databricksでサポートされている任意のデータソースからのデータの読み込みをサポートしています。 データソースと外部サービスへの接続を参照してください。サポートされているデータソースのレイクハウスフェデレーションを使用して外部データをロードすることもできます。レイクハウスフェデレーションには Databricks Runtime 13.3 LTS 以上が必要なため、レイクハウスフェデレーションを使用するには、 プレビューチャンネルを使用するようにパイプラインを構成する必要があります。

一部のデータソースには、 SQLでの同等のサポートがありません。 これらのデータ ソースのいずれかでレイクハウスフェデレーションを使用できない場合は、 Pythonを使用してソースからデータを取り込むことができます。 同じパイプラインに Python ソース ファイルと SQL ソース ファイルを追加できます。次の例では、リモートPostgreSQLテーブル内のデータの現在の状態にアクセスするためのマテリアライズドビューを宣言します。

Python
import dp

@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)

クラウド オブジェクト ストレージから小規模または静的なデータセットを読み込む

Apache Spark のロード構文を使用して、小さなデータセットまたは静的なデータセットをロードできます。Lakeflow 宣言型パイプラインは、 Apache Spark on の Databricksでサポートされているすべてのファイル形式をサポートしています。 完全なリストについては、「 データ形式のオプション」を参照してください。

次の例は、Lakeflow宣言型パイプラインテーブルを作成するためのJSONのロード を示しています。

Python
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
注記

read_files SQL 関数は、Databricks 上のすべての SQL 環境に共通です。これは、 SQL と 宣言型パイプラインを使用して直接ファイル アクセスする場合 Lakeflow 推奨されるパターンです。 詳細については、「 オプション」を参照してください。

Pythonカスタムデータソースからデータをロードする

Pythonカスタム データソースを使用すると、カスタム形式でデータをロードできます。 特定の外部データソースに対して読み取りや書き込みを行うコードを記述したり、既存のシステム内の既存のPythonコードを活用して独自の内部システムからデータを読み取ったりすることができます。 Pythonデータソースの開発の詳細については、 PySparkカスタム データソース」を参照してください。

Pythonカスタム データソースを使用してLakeFlow宣言型パイプラインにデータをロードするには、それをmy_custom_datasourceなどの形式名で登録し、そこから読み取ります。

Python
from pyspark import pipelines as dp

# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.

# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()

# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()

ソースストリーミングテーブルの変更を無視するようにストリーミングテーブルを構成する

注記
  • skipChangeCommitsフラグは、option()関数を使用するspark.readStreamでのみ機能します。このフラグは、dp.read_stream()関数では使用できません。
  • ソース ストリーミング テーブルがcreate_auto_cdc_flow()関数のターゲットとして定義されている場合は、 skipChangeCommitsフラグを使用できません。

とりあえず、ストリーミングテーブルには追加専用のソースが必要です。 ストリーミング テーブルが別のストリーミング テーブルをソースとして使用し、そのソース ストリーミング テーブルがGDPR 「忘れられる権利」処理などの更新または削除を必要とする場合、ソース ストリーミング テーブルの読み取り時にskipChangeCommitsフラグを設定して、それらの変更を無視することができます。 このフラグの詳細については、 「更新と削除を無視する」を参照してください。

Python
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")

パイプライン内のシークレットを使用してストレージ資格情報に安全にアクセスする

Databricks シークレット を使用して、アクセス キーやパスワードなどの資格情報を格納できます。パイプラインでシークレットを構成するには、パイプライン設定のクラスター構成で Spark プロパティを使用します。 「 Lakeflow宣言型パイプラインにおけるクラシックコンピュートの設定」を参照してください。

次の例では、シークレットを使用して、Azure Data Lake Storage (ADLS)のストレージアカウント から入力データを読み取るために必要なアクセス キー Auto Loaderを使用します。これと同じ方法を使用して、パイプラインで必要なシークレット (S3にアクセスするためのAWSキーやApache Hive metastoreのパスワードなど)を構成できます。

Azure Data Lake Storage の操作の詳細については、「Azure Data Lake Storage と Blob Storage に接続する」を参照してください。

注記

シークレット値を設定するspark_conf構成キーにspark.hadoop.プレフィックスを追加する必要があります。

JSON
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}

以下の通り置き換えます。

  • <storage-account-name> ADLS ストレージ アカウント名を使用します。
  • <scope-name> をDatabricksシークレットスコープ名に置き換えます。
  • <secret-name> をAzureストレージアカウントのアクセスキーを含むキーの名前に置き換えます。
Python
from pyspark import pipelines as dp

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)

以下の通り置き換えます。

  • <container-name> 入力データを保存する Azure ストレージ アカウント コンテナーの名前。
  • <storage-account-name> ADLS ストレージ アカウント名を使用します。
  • <path-to-input-dataset> 入力データセットへのパス。