このトピックでは、コールドデータの特性と使用シナリオについて説明します。また、TablestoreとDelta Lakeを使用してコールドデータとホットデータを分離する方法の例を示します。コールドデータとホットデータを分離することで、コンピューティングリソースとストレージリソースの使用率を最大化し、低コストで高パフォーマンスを実現できます。
背景情報
ビジネスとデータが継続的に成長するにつれて、パフォーマンスとコストのトレードオフは、ビッグデータシステムの設計にとって深刻な課題となっています。
Delta Lakeは新しいデータレイクソリューションです。データの取り込み、データ構造の管理、データのクエリ、データの流出など、一連の機能を提供します。また、データに対するACIDおよびCRUD操作もサポートしています。ACIDは、原子性、一貫性、独立性、耐久性の略です。CRUDは、作成、読み取り、更新、削除の略です。Delta Lakeとその上流および下流のコンポーネントを使用して、使いやすく安全なデータレイクアーキテクチャを構築できます。ハイブリッドトランザクション/分析処理(HTAP)テクノロジーを使用して、階層型ストレージコンポーネントとコンピューティングエンジンを選択できます。このテクノロジーは、大量のデータを分析し、トランザクションを高速に更新し、ホットデータとコールドデータの分離のコストを削減できます。
アクセス頻度に基づくデータ分類
データは、アクセス頻度に基づいて、ホットデータ、ウォームデータ、コールドデータに分類できます。コールドデータとは、データライフサイクル全体でアクセス頻度が低い、またはアクセスされないデータのことです。ほとんどの場合、コールドデータの量は膨大です。
- データの作成時刻:新しく書き込まれたデータは頻繁にアクセスされるため、ホットデータです。時間の経過とともにアクセス頻度は低下します。データにほとんどアクセスされない場合、またはまったくクエリされない場合は、コールドデータになります。
この方法は、トランザクションデータ、時系列メトリックの監視データ、インスタントメッセージング(IM)データなど、ほとんどのデータに適用されます。
- データアクセス頻度:アクセス頻度に基づいて、ビジネスデータに関連するタグを追加できます。システムは、アクセス頻度に基づいてホットデータとコールドデータを自動的に区別することもできます。たとえば、古いブログが突然頻繁にアクセスされるようになったとします。ブログはだいぶ前に作成されたものですが、ビジネスとデータの分散状況に基づいてホットデータとして分類されます。説明 このトピックでは、データの作成時刻に基づくホットデータとコールドデータの分離についてのみ説明します。
コールドデータの特徴
- 大量:ホットデータと比較して、コールドデータは長期間、または永続的に保存する必要があります。
- 低い管理コスト:コールドデータはアクセス頻度が低いです。したがって、ユーザーはコールドデータを低コストで管理することを期待しています。
- 低いパフォーマンス要件:テラバイト単位のデータに対する一般的なクエリとは異なり、コールドデータに対するクエリはミリ秒単位の応答を必要としません。コールドデータに対するクエリは、結果が返されるまでに数十秒以上かかる場合があります。非同期処理もサポートされています。
- 簡単な操作:ほとんどのシナリオでは、コールドデータはバッチ書き込みまたはバッチ削除され、更新されません。
コールドデータをクエリすると、システムはクエリ条件を満たすデータのみを読み取ります。クエリ条件は複雑ではありません。
シナリオ
- 時系列データ:このタイプのデータは、当然ながら時間属性を持っています。データ量は大きく、データに対して追加操作のみが実行されます。時系列データは、次のシナリオで使用されます。
- IM:ほとんどの場合、ユーザーは最近のメッセージのみをクエリします。履歴データは、特別な要件を満たすために、たまにクエリされます。例:DingTalk。
- 監視:ほとんどの場合、ユーザーは最近の監視データのみを表示します。履歴データは、ユーザーが問題を調査したり、レポートを作成したりする必要がある場合にのみクエリされます。例: CloudMonitor。
- 課金:ほとんどの場合、ユーザーは最近数日または最新の月に生成された請求書のみを表示します。 1 年前に生成された請求書はめったにクエリされません。例:Alipay。
- モノのインターネット(IoT):デバイスから最近報告されたデータは頻繁に分析されます。履歴データはあまり分析されません。
- アーカイブデータ:読み書きは簡単だがクエリが複雑なデータの場合、データを定期的にストレージコストが低いストレージコンポーネントまたは圧縮率の高いストレージメディアにアーカイブできます。これは、ストレージコストの削減に役立ちます。
例
この例では、Tablestore と Delta Lake を使用してコールドデータとホットデータを分離する方法を示します。
- ストリーミングデータをリアルタイムで同期します。
- データソーステーブルを作成します。この例では、データソーステーブルは OrderSource という名前の注文テーブルです。テーブルには 2 つのプライマリキー列(UserId と OrderId)と 2 つの属性列(price と timestamp)があります。Tablestore SDK for Java によって提供される BatchWriteRow 操作を呼び出して、注文データをテーブルに書き込みます。timestamp 列の時間範囲は過去 90 日間(2020-02-26 ~ 2020-05-26)です。合計 3,112,400 件のレコードがテーブルに書き込まれます。
この注文書き込み操作をシミュレートする場合は、タイムスタンプを Tablestore テーブルのバージョン番号属性列に書き込みます。テーブルの有効期限(TTL)属性を設定します。テーブル内のデータの保存期間が TTL を超えると、システムはバージョン番号に基づいてデータを自動的に削除します。
- Tablestore コンソールで増分同期用のトンネルを作成します。作成されたトンネルでサポートされている CDC テクノロジーを使用して、ソーステーブルの増分データを Delta Lake に同期します。トンネル ID は、後続の SQL ステートメント設定で使用されます。
- EMR クラスタのヘッダーノードで、
Streaming SQL
対話型コマンドラインを有効にします。streaming-sql --master yarn --use-emr-datasource --num-executors 16 --executor-memory 4g --executor-cores 4
次のコマンドを実行して、ソーステーブルとデスティネーションテーブルを作成します。// 1. Tablestore ソーステーブルを作成します。 DROP TABLE IF EXISTS order_source; CREATE TABLE order_source USING tablestore OPTIONS( endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com", access.key.id="", access.key.secret="", instance.name="vehicle-test", table.name="OrderSource", catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "OrderId": {"col": "OrderId", "type": "string"},"price": {"col": "price", "type": "double"}, "timestamp": {"col": "timestamp", "type": "long"}}}', ); // 2. デスティネーションテーブルとして delta_orders という名前の Delta Lake シンクを作成します。 DROP TABLE IF EXISTS delta_orders; CREATE TABLE delta_orders( UserId string, OrderId string, price double, timestamp long ) USING delta LOCATION '/delta/orders'; // 3. ソーステーブルの増分スキャンビューを作成します。 CREATE SCAN incremental_orders ON order_source USING STREAM OPTIONS( tunnel.id="324c6bee-b10d-4265-9858-b829a1b71b4b", maxoffsetsperchannel="10000"); // 4. CDC テクノロジーに基づいて、ソーステーブルからデスティネーションテーブルにデータを同期するストリーミングジョブを実行します。 CREATE STREAM orders_job OPTIONS ( checkpointLocation='/delta/orders_checkpoint', triggerIntervalMs='3000' ) MERGE INTO delta_orders USING incremental_orders AS delta_source ON delta_orders.UserId=delta_source.UserId AND delta_orders.OrderId=delta_source.OrderId WHEN MATCHED AND delta_source.__ots_record_type__='DELETE' THEN DELETE WHEN MATCHED AND delta_source.__ots_record_type__='UPDATE' THEN UPDATE SET UserId=delta_source.UserId, OrderId=delta_source.OrderId, price=delta_source.price, timestamp=delta_source.timestamp WHEN NOT MATCHED AND delta_source.__ots_record_type__='PUT' THEN INSERT (UserId, OrderId, price, timestamp) values (delta_source.UserId, delta_source.OrderId, delta_source.price, delta_source.timestamp);
次の表に、これらの操作の詳細を示します。操作 説明 Tablestore ソーステーブルを作成します。 order_source という名前のソーステーブルを作成します。 OPTIONS の
catalog
パラメーターは、テーブルスキーマを定義します。この例では、テーブルには UserId、OrderId、price、timestamp の各列が含まれています。Delta Lake シンクを作成します。 delta_orders という名前のデスティネーションテーブルを作成します。 LOCATION パラメーターは、Delta ファイルが格納される場所を指定します。
ソーステーブルの増分データスキャンビューを作成します。 incremental_orders という名前のストリーミングビューを作成します。 tunnel.id
:手順 1.b で作成したトンネルの ID。maxoffsetsperchannel
:トンネルを介して各パーティションに書き込むことができる最大データ量。
ストリーミングジョブを実行してデータをリアルタイムで同期します。 プライマリキー列(UserId と OrderId)に基づいてソーステーブルのデータを集計します。CDC ログの操作タイプ(PUT、UPDATE、DELETE)に基づいて集計データを変換し、変換されたデータを Delta Lake に同期します。 __ots_record_type__
:Tablestore のストリーミングソースによって提供される定義済みの列。行ベースの操作タイプです。
- データソーステーブルを作成します。
- ホットデータとコールドデータをクエリします。テラバイト単位のデータに対する効率的なクエリのためにホットデータを Tablestore テーブルに保存し、コールドデータまたはすべてのデータを Delta Lake シンクに保存することをお勧めします。この例では、Tablestore テーブルはソーステーブル order_source で、Delta Lake シンクはデスティネーションテーブル delta_orders です。ソーステーブルの TTL を設定します。このようにして、ホットデータの量を柔軟に制御できます。
- TTL を設定する前に、ソーステーブルとデスティネーションテーブルのデータエントリ数をクエリします。クエリ結果は一致します。
- Tablestore テーブルの TTL を過去 30 日間に設定します。ソーステーブルには、過去 30 日間に生成されたデータのみが含まれています。デスティネーションテーブルにはすべてのデータが含まれています。ホットデータとコールドデータが分離されています。
- ソーステーブルとデスティネーションテーブルのデータエントリ数を再度クエリします。ソーステーブルのデータエントリ数は 1,017,004 で、デスティネーションテーブルのデータエントリ数は依然として 3,112,400 です。
- TTL を設定する前に、ソーステーブルとデスティネーションテーブルのデータエントリ数をクエリします。