データのバックフィリング
ClickHouseに新しく関わっている場合でも、既存のデプロイを担当している場合でも、ユーザーは必然的にテーブルに履歴データをバックフィルする必要があります。場合によっては、これは比較的簡単ですが、マテリアライズドビューをポピュレートする必要がある場合は、より複雑になることがあります。このガイドでは、このタスクをユーザーが自身のユースケースに適用できるプロセスをドキュメント化しています。
このガイドでは、ユーザーがインクリメンタルマテリアライズドビューおよびs3やgcsなどのテーブル関数を用いたデータロードの概念について既に理解していることを前提としています。また、ユーザーに対して、オブジェクトストレージからの挿入パフォーマンスの最適化に関するガイドを読むことをお勧めします。このアドバイスは、本ガイド全体の挿入に適用できます。
例データセット
このガイド全体で、PyPIデータセットを使用します。このデータセットの各行は、pip
のようなツールを使用したPythonパッケージのダウンロードを表しています。
例えば、サブセットは1日分、すなわち2024-12-17
をカバーし、https://datasets-documentation.s3.eu-west-3.amazonaws.com/pypi/2024-12-17/
で公開されています。ユーザーは次のようにクエリできます。
このバケットの完全なデータセットは、320 GBを超えるparquetファイルを含んでいます。以下の例では、意図的にグロブパターンを使用してサブセットをターゲットにしています。
ユーザーは、この日以降のデータをKafkaやオブジェクトストレージからのストリームとして消費すると仮定します。このデータのスキーマは以下に示されています。
1兆行を超える完全なPyPIデータセットは、私たちの公開デモ環境clickpy.clickhouse.comで利用可能です。このデータセットの詳細、デモがマテリアライズドビューをどのように活用してパフォーマンスを向上させているか、またデータがどのように毎日ポピュレートされるかについては、こちらを参照してください。
バックフィルのシナリオ
バックフィリングは、通常、特定の時点からデータストリームが消費されるときに必要です。このデータは、インクリメンタルマテリアライズドビューとともにClickHouseテーブルに挿入され、挿入されるブロックに対してトリガーされます。これらのビューは、挿入前にデータを変換したり、集約を計算し、下流アプリケーションで使用するためにターゲットテーブルに結果を送信したりする場合があります。
以下のシナリオをカバーすることを試みます。
- 既存のデータ取り込みによるデータのバックフィル - 新しいデータが読み込まれ、履歴データをバックフィルする必要があります。この履歴データは特定されています。
- 既存テーブルへのマテリアライズドビューの追加 - 履歴データがポピュレートされ、データが既にストリーミングされているセットアップに新しいマテリアライズドビューを追加する必要があります。
データはオブジェクトストレージからバックフィルされると仮定します。すべてのケースで、データ挿入の一時停止を避けることを目指します。
オブジェクトストレージからの履歴データのバックフィルを推奨します。データは、最適な読み取りパフォーマンスと圧縮(ネットワーク転送の減少)のために可能であればParquetにエクスポートする必要があります。ファイルサイズは約150MBが一般的に好まれますが、ClickHouseは70以上のファイル形式をサポートしており、あらゆるサイズのファイルを処理できます。
重複テーブルとビューの使用
すべてのシナリオで、「重複テーブルとビュー」の概念に依存します。これらのテーブルとビューは、ライブストリーミングデータに使用されるもののコピーを表し、バックフィルを隔離して実行することができ、障害が発生した場合に簡単に回復できる手段を提供します。たとえば、次の主なpypi
テーブルとマテリアライズドビューがあり、Pythonプロジェクトごとのダウンロード数を計算します。
主テーブルと関連ビューをデータのサブセットでポピュレートします。
別のサブセット{101..200}
を読み込みたいとします。pypi
に直接挿入することもできますが、重複テーブルを作成することでこのバックフィルを隔離して行うことができます。
バックフィルが失敗した場合、主テーブルには影響を与えず、単にトランケート重複テーブルを実行し、繰り返すことができます。
これらのビューの新しいコピーを作成するには、CREATE TABLE AS
句を使用し、サフィックス_v2
を付けます。
これを約同じサイズの2番目のサブセットでポピュレートし、成功したロードを確認します。
もしこの2回目のロード中に何らかの失敗があった場合は、単にトランケートを実行し、pypi_v2
とpypi_downloads_v2
を繰り返しデータをロードすることができます。
データのロードが完了したので、ALTER TABLE MOVE PARTITION
句を使用して、重複テーブルから主テーブルにデータを移動できます。
上記のMOVE PARTITION
呼び出しは、パーティション名()
を使用しています。これはこのテーブル(パーティション化されていないテーブル)の単一のパーティションを表しています。パーティション化されたテーブルの場合、ユーザーは複数のMOVE PARTITION
呼び出しを行う必要があります。現在のパーティション名は、system.parts
テーブルから確認できます。例:SELECT DISTINCT partition FROM system.parts WHERE (table = 'pypi_v2')
。
これで、pypi
およびpypi_downloads
に完全なデータが含まれていることを確認できます。pypi_downloads_v2
とpypi_v2
は安全に削除できます。
重要なのは、MOVE PARTITION
操作が軽量で(ハードリンクを利用)原子的であり、すなわち失敗するか成功するかのどちらかであり、間の状態が存在しないことです。
以下のバックフィリングシナリオでは、このプロセスを大いに利用します。
このプロセスがユーザーに挿入操作のサイズを選択させることに注意してください。
より大きな挿入、すなわちより多くの行は、より少ないMOVE PARTITION
操作を必要とします。しかし、これは、ネットワーク障害などによる挿入失敗のリスクに対してバランスを取る必要があります。ユーザーは、このプロセスを補完するために、ファイルをバッチ処理してリスクを減らすことができます。これは、例えば、範囲クエリWHERE timestamp BETWEEN 2024-12-17 09:00:00 AND 2024-12-17 10:00:00
やグロブパターンで実行できます。例えば、
ClickPipesはオブジェクトストレージからデータをロードする際にこのアプローチを使用し、ターゲットテーブルとそのマテリアライズドビューの複製を自動的に作成し、ユーザーが上記の手順を実行する必要を避けています。また、複数のワーカースレッドを使用して、異なるサブセットを処理し(グロブパターンを介して)、それぞれに重複テーブルを持つことで、データを迅速に、正確に1回だけのセマンティクスでロードできます。関心のある方は、このブログでさらに詳細をご覧いただけます。
シナリオ1: 既存のデータ取り込みによるデータのバックフィル
このシナリオでは、バックフィルに必要なデータが孤立したバケットにないと仮定し、フィルタリングが必要です。データはすでに挿入されており、履歴データをバックフィルするために特定できるタイムスタンプまたは単調増加カラムがあります。
このプロセスは以下の手順に従います。
- チェックポイントを特定します - 履歴データを復元する必要があるタイムスタンプまたはカラムの値。
- 主テーブルとマテリアライズドビューのターゲットテーブルの重複を作成します。
- ステップ(2)で作成したターゲットテーブルを指すマテリアライズドビューのコピーを作成します。
- ステップ(2)で作成した重複の主テーブルに挿入します。
- 重複テーブルから元のバージョンにすべてのパーティションを移動します。重複テーブルをドロップします。
例えば、PyPIデータでデータが読み込まれているとします。最小のタイムスタンプを特定でき、そのため「チェックポイント」が特定されます。
上記から、2024-12-17 09:00:00
より前のデータを読み込む必要があることがわかります。前述のプロセスを使用して、重複テーブルとビューを作成し、タイムスタンプに基づいてサブセットを読み込みます。
Parquetのタイムスタンプカラムでのフィルタリングは非常に効率的である可能性があります。ClickHouseは、読み込むべき完全なデータ範囲を特定するためにタイムスタンプカラムのみを読み込み、ネットワークトラフィックを最小限に抑えます。Parquetインデックス(最小-最大など)もClickHouseクエリエンジンによって活用されることがあります。
この挿入が完了したら、関連するパーティションを移動できます。
もし履歴データが孤立したバケットの場合、上記の時間フィルタは必要ありません。時間または単調増加カラムが利用できない場合は、履歴データを分離してください。
ClickHouse Cloudユーザーは、データが自分のバケットに隔離できる場合(フィルタリングが不要)、履歴バックアップの復元にはClickPipesを使用する必要があります。これにより、複数のワーカーでロードを並列化し、ロード時間を短縮し、ClickPipesは上記のプロセスを自動化します - 主テーブルとマテリアライズドビューのための重複テーブルを作成します。
シナリオ2: 既存テーブルへのマテリアライズドビューの追加
既に大きなデータがポピュレートされていて、データが挿入されているセットアップに新しいマテリアライズドビューを追加する必要があることは珍しくありません。ここでは、ストリーム内のポイントを特定するために使用できるタイムスタンプまたは単調増加カラムが役立ち、データ取り込みの一時停止を回避します。以下の例では、両方のケースを仮定し、取り込みを一時停止しないアプローチを優先します。
小さいデータセットで挿入が一時停止される場合を除き、マテリアライズドビューのバックフィルにPOPULATE
コマンドの使用は推奨しません。このオペレーターは、ポピュレートハッシュが完了した後にそのソーステーブルに挿入された行を見逃す可能性があります。さらに、このポピュレートはすべてのデータに対して実行され、大規模データセットでの中断やメモリ制限に脆弱です。
タイムスタンプまたは単調増加カラムが利用可能
この場合、新しいマテリアライズドビューには、今後の任意のデータより大きい行のみに制限するフィルターが含まれていることをお勧めします。マテリアライズドビューは、その後、主テーブルの履歴データを使用してこの日からバックフィルできます。バックフィリングアプローチは、データサイズおよび関連するクエリの複雑性に依存します。
最も単純なアプローチは、次の手順を含みます。
- 任意の近い将来の時間より大きい行のみを考慮するフィルターを持つマテリアライズドビューを作成します。
- マテリアライズドビューのターゲットテーブルに挿入する
INSERT INTO SELECT
クエリを実行し、ビューの集約クエリでソーステーブルから読み込みます。
これは、ステップ(2)でデータのサブセットをターゲットにしたり、マテリアライズドビューのための重複ターゲットテーブルを使用することでさらに強化できます(挿入が完了したら元のにパーティションを添付)。
たとえば、以下のマテリアライズドビューは、時間ごとの最も人気のあるプロジェクトを計算します。
ターゲットテーブルを追加できますが、マテリアライズドビューを追加する前に、そのSELECT
句を変更し、近い将来の任意の時間より大きい行のみを考慮するフィルターを含めます - この場合、2024-12-17 09:00:00
が将来の数分であると仮定します。
このビューが追加されると、このデータ以前のすべてのデータをマテリアライズドビューのためにバックフィルすることができます。
これを行う最も簡単な方法は、最近追加されたデータを無視するフィルターを用いた主テーブルからのマテリアライズドビューのクエリを実行し、結果をマテリアライズドビューのターゲットテーブルにINSERT INTO SELECT
で挿入することです。たとえば、上記のビューの場合:
上記の例では、ターゲットテーブルはSummingMergeTreeです。この場合、元の集約クエリを単純に使用できます。 AggregatingMergeTreeのようなより複雑なユースケースでは、集約のために-State
関数を使用することになります。この例はこちらで見られます。
この場合、比較的軽量な集約が3秒未満で完了し、600MiB未満のメモリを使用しました。より複雑なまたは長時間実行される集約の場合、ユーザーは、前述の重複テーブルアプローチを使用することで、このプロセスをより耐障害性のあるものにできます。例えば、シャドウターゲットテーブル、すなわちpypi_downloads_per_day_v2
を作成し、ここに挿入し、結果のパーティションをpypi_downloads_per_day
に添付することです。
しばしば、マテリアライズドビューのクエリはより複雑で(そうでなければユーザーはビューを使用しないでしょう!)、リソースを消費します。よりまれなケースでは、クエリに必要なリソースがサーバーのそれを超えることがあります。これは、ClickHouseのマテリアライズドビューの利点の1つを強調します - それらはインクリメンタルであり、全データセットを一度に処理しません!
この場合、ユーザーにはいくつかのオプションがあります。
- クエリを修正して、バックフィル範囲を指定します。例:
WHERE timestamp BETWEEN 2024-12-17 08:00:00 AND 2024-12-17 09:00:00
、WHERE timestamp BETWEEN 2024-12-17 07:00:00 AND 2024-12-17 08:00:00
など。 - Nullテーブルエンジンを使用してマテリアライズドビューを充填します。これにより、マテリアライズドビューの典型的なインクリメンタルポピュレーションを複製し、設定可能なサイズのデータブロックでクエリを実行します。
(1)は、最も単純なアプローチで、しばしば十分です。簡略化のために例を含めていません。
以下で(2)をさらに探求します。
マテリアライズドビューを充填するためのNullテーブルエンジンの使用
Nullテーブルエンジンは、データを永続化しないストレージエンジンを提供します(テーブルエンジンの世界では/dev/null
と考えてください)。これは矛盾しているように思えるかもしれませんが、マテリアライズドビューはこのテーブルエンジンに挿入されたデータで実行されます。これにより、元のデータを永続化いせずにマテリアライズドビューを構築でき、I/Oや関連するストレージを回避します。
重要な点は、このテーブルエンジンに付随するマテリアライズドビューは、挿入されるデータのブロックで実行され、結果をターゲットテーブルに送信します。これらのブロックは設定可能なサイズです。より大きなブロックは、効率的に処理できる可能性が高く(処理が速い)、より多くのリソース(主にメモリ)を消費します。このテーブルエンジンを使用することで、マテリアライズドビューをインクリメンタルに構築できるすなわち、一度に1ブロックずつ、メモリに全体の集約を保持する必要を回避できます。
次の例を考えてみましょう。
ここでは、マテリアライズドビューを構築するために使用される行を受信するためにNullテーブルpypi_v2
を作成します。必要なカラムのみをスキーマに制限していることに注意してください。私たちのマテリアライズドビューは、このテーブルに挿入された行に対して集約を行い(1ブロックずつ)、結果をターゲットテーブルpypi_downloads_per_day
に送信します。
ここでターゲットテーブルとしてpypi_downloads_per_day
を使用しています。追加の耐障害性のために、ユーザーは重複テーブルpypi_downloads_per_day_v2
を作成し、ビューのターゲットテーブルとしてこれを使用できます。挿入が完了すると、pypi_downloads_per_day_v2
のパーティションをpypi_downloads_per_day
に移動することができます。これにより、メモリの問題やサーバーの中断により挿入が失敗した場合の回復が可能になります。すなわち、pypi_downloads_per_day_v2
をトランケートし、設定を調整し、再試行するだけです。
このマテリアライズドビューをポピュレートするために、単にpypi
からpypi_v2
にバックフィルする関連データを挿入します。
ここでのメモリ使用量は639.47 MiB
です。
パフォーマンスとリソースの調整
上記のシナリオでのパフォーマンスとリソース使用量は、いくつかの要因によって決まります。調整を試みる前に、読者が最適化のためのS3挿入および読み取りパフォーマンスガイドの使用スレッドのリファレンスセクションで詳細に記載されている挿入メカニズムを理解することをお勧めします。要約すると:
- 読み取りの並列性 - 読み取りに使用されるスレッドの数。
max_threads
を通じて制御されます。ClickHouse Cloudでは、インスタンスのサイズによって決定され、デフォルトではvCPUの数になっています。この値を増やすと、メモリ使用量は多くなりますが、読み取りパフォーマンスが向上する可能性があります。 - 挿入の並列性 - 挿入に使用されるスレッドの数。
max_insert_threads
を通じて制御されます。ClickHouse Cloudでは、インスタンスのサイズ(2〜4の範囲)が決定し、OSSでは1に設定されています。この値を増やすと、メモリ使用量を増やしながらパフォーマンスが向上する可能性があります。 - 挿入ブロックサイズ - データはループで処理され、プルされ、解析され、パーティショニングキーに基づいてメモリ内の挿入ブロックに形成されます。これらのブロックは、ソート、最適化、圧縮され、新しいdata partsとしてストレージに書き込まれます。挿入ブロックのサイズは、設定
min_insert_block_size_rows
およびmin_insert_block_size_bytes
(非圧縮)を通じて制御され、メモリ使用量およびディスクI/Oに影響を与えます。より大きなブロックは、より多くのメモリを使用しますが、部品の数が減り、I/Oおよびバックグラウンドマージが減少します。これらの設定は最小しきい値を表し(最初に到達したほうがフラッシュをトリガーします)。 - マテリアライズドビューのブロックサイズ - 主な挿入に関する上記メカニズムに加え、マテリアライズドビューへの挿入前に、より効率的な処理のためにブロックが圧縮されます。これらのブロックのサイズは、設定
min_insert_block_size_bytes_for_materialized_views
およびmin_insert_block_size_rows_for_materialized_views
によって決定されます。より大きなブロックは、より効率的な処理を可能にしますが、メモリ使用量が増加します。デフォルトでは、これらの設定は、ソーステーブル設定min_insert_block_size_rows
およびmin_insert_block_size_bytes
の値に戻ります。
パフォーマンスを向上させるために、ユーザーは挿入のためのスレッドとブロックサイズの調整セクションで概説されたガイドラインに従うことができます。通常、パフォーマンス向上のためにmin_insert_block_size_bytes_for_materialized_views
およびmin_insert_block_size_rows_for_materialized_views
を変更する必要はありません。これらを変更する場合は、min_insert_block_size_rows
およびmin_insert_block_size_bytes
に関して考えたとおりのベストプラクティスを使用してください。
メモリを最小限に抑えるために、ユーザーはこれらの設定を実験することを検討する場合があります。これは、パフォーマンスを低下させることになります。前述のクエリを使用して、以下に例を示します。
max_insert_threads
を1に設定すると、メモリオーバーヘッドが減少します。
max_threads
設定を1に減少させることで、さらにメモリを減少させることができます。
最後に、min_insert_block_size_rows
を0に設定して(ブロックサイズの決定要因として無効にする)、min_insert_block_size_bytes
を10485760(10MiB)に設定することでさらにメモリを減少できます。
最後に、ブロックサイズを小さくすると部品の数が増え、マージ圧力が高まることに注意してください。これらの設定は慎重に変更する必要があります。こちらで詳しく説明しています。
タイムスタンプまたは単調増加カラムがない場合
上記のプロセスは、ユーザーがタイムスタンプまたは単調増加カラムを持っていることを前提としています。場合によってはこれは単に利用できないことがあります。この場合、以下のプロセスを推奨します。これは、以前に概説された多くの手順を活用しますが、ユーザーは取り込みを一時停止する必要があります。
- 主テーブルへの挿入を一時停止します。
CREATE AS
文を使用して主ターゲットテーブルの複製を作成します。ALTER TABLE ATTACH
を使用して、元のターゲットテーブルから重複テーブルにパーティションを添付します。 注意: このアタッチ操作は、以前に使用した移動とは異なります。ハードリンクに依存してはいますが、元のテーブル内のデータは保存されます。- 新しいマテリアライズドビューを作成します。
- 挿入を再開します。 注意: 挿入はターゲットテーブルのみを更新し、重複テーブルは元のデータのみを参照します。
- マテリアライズドビューをバックフィルし、上記のタイムスタンプでデータに対して使用したプロセスを適用します。重複テーブルをソースとして使用します。
次の例では、PyPIと以前の新しいマテリアライズドビューpypi_downloads_per_day
を使用し(タイムスタンプを使用できないと仮定)、考えてみましょう。
最後から2番目のステップでは、先に説明したシンプルなINSERT INTO SELECT
アプローチを利用してpypi_downloads_per_day
をバックフィルします。このプロセスは、上で示したNullテーブルアプローチを使用して強化することもでき、重複テーブルを使用して耐障害性を高めます。
この操作は挿入を一時停止する必要がありますが、中間操作は通常迅速に完了することができ、データの中断を最小限に抑えることができます。