[go: up one dir, main page]

メインコンテンツまでスキップ
メインコンテンツまでスキップ

Amazon GlueとClickHouseおよびSparkの統合

Amazon Glue は、Amazon Web Services (AWS) が提供する完全管理型のサーバーレスデータ統合サービスです。分析、機械学習、アプリケーション開発のためのデータを発見、準備、変換するプロセスを簡素化します。

インストール

GlueコードをClickHouseと統合するには、次のいずれかを介して公式のSparkコネクタをGlueに使用できます。

  • AWS MarketplaceからClickHouse Glueコネクタをインストールする(推奨)。
  • SparkコネクタのJARファイルをGlueジョブに手動で追加する。
  1. コネクタにサブスクライブする

    アカウント内でコネクタにアクセスするには、AWS MarketplaceからClickHouse AWS Glue Connectorにサブスクライブしてください。

  2. 必要な権限を付与する

    GlueジョブのIAMロールに必要な権限があることを確認してください。これは、最小権限のガイドに記載されています。

  3. コネクタをアクティブ化し、接続を作成する

    コネクタをアクティブ化し、接続を作成するには、こちらのリンクをクリックすると、主要項目が事前に入力されたGlue接続作成ページが開きます。接続に名前を付け、作成を押します(この段階でClickHouse接続の詳細を提供する必要はありません)。

  4. Glueジョブで使用する

    Glueジョブ内で、Job details タブを選択し、Advanced properties ウィンドウを展開します。Connections セクションで、先ほど作成した接続を選択します。コネクタは必要なJARをジョブの実行時に自動的に注入します。

Glue Notebook connections config
注記

Glueコネクタで使用されるJARは、Spark 3.3Scala 2、および Python 3 に対してビルドされています。Glueジョブの設定時にこれらのバージョンを選択してください。

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.clickhouseScala.Native.NativeSparkRead.spark
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object ClickHouseGlueExample {
  def main(sysArgs: Array[String]) {
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)

    val sparkSession: SparkSession = SparkSession.builder
      .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog")
      .config("spark.sql.catalog.clickhouse.host", "<your-clickhouse-host>")
      .config("spark.sql.catalog.clickhouse.protocol", "https")
      .config("spark.sql.catalog.clickhouse.http_port", "<your-clickhouse-port>")
      .config("spark.sql.catalog.clickhouse.user", "default")
      .config("spark.sql.catalog.clickhouse.password", "<your-password>")
      .config("spark.sql.catalog.clickhouse.database", "default")
      // for ClickHouse cloud
      .config("spark.sql.catalog.clickhouse.option.ssl", "true")
      .config("spark.sql.catalog.clickhouse.option.ssl_mode", "NONE")
      .getOrCreate

    val glueContext = new GlueContext(sparkSession.sparkContext)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    import sparkSession.implicits._

    val url = "s3://{path_to_cell_tower_data}/cell_towers.csv.gz"

    val schema = StructType(Seq(
      StructField("radio", StringType, nullable = false),
      StructField("mcc", IntegerType, nullable = false),
      StructField("net", IntegerType, nullable = false),
      StructField("area", IntegerType, nullable = false),
      StructField("cell", LongType, nullable = false),
      StructField("unit", IntegerType, nullable = false),
      StructField("lon", DoubleType, nullable = false),
      StructField("lat", DoubleType, nullable = false),
      StructField("range", IntegerType, nullable = false),
      StructField("samples", IntegerType, nullable = false),
      StructField("changeable", IntegerType, nullable = false),
      StructField("created", TimestampType, nullable = false),
      StructField("updated", TimestampType, nullable = false),
      StructField("averageSignal", IntegerType, nullable = false)
    ))

    val df = sparkSession.read
      .option("header", "true")
      .schema(schema)
      .csv(url)

    // Write to ClickHouse
    df.writeTo("clickhouse.default.cell_towers").append()


    // Read from ClickHouse
    val dfRead = spark.sql("select * from clickhouse.default.cell_towers")
    Job.commit()
  }
}

詳細については、Sparkドキュメントをご覧ください。