E-MapReduce (EMR) は、スケーラブルなコンピューティングクラスター、多様なソースからの異種データ統合およびガバナンス、およびバッチ処理とストリーム処理を統一的に実行する機能を提供します。EMR は、金融リスク管理、EC 精密マーケティング、IoT 時系列データ処理など、データ集約型ワークロードに最適です。本ドキュメントでは、EMR の主な適用範囲として、データレイク、データ分析、リアルタイムデータストリーミング、およびデータ提供の 4 つのシナリオについて説明します。
データレイクのシナリオ
EMR DataLake クラスターは、OSS-HDFS 上に構築されたエンタープライズ向けデータレイクのストレージ、ガバナンス、分析を一元化するチーム向けに設計されています。3 つのコア機能が連携し、生データの取り込みから下流アプリケーションへの活用まで、フルライフサイクルをカバーします。
| コア機能 | コンポーネント | 説明 |
|---|---|---|
| 統一ストレージレイヤー | OSS-HDFS | Hadoop 分散ファイルシステム (HDFS) プロトコルと互換性のあるオブジェクトストレージレイヤーです。OSS-HDFS はオンプレミスの HDFS を置き換え、コンピュートとストレージをデカップリングし、コンピュートノードを独立してスケールできます。 |
| レイクメタデータガバナンス | Data Lake Formation (DLF) | Object Storage Service (OSS)、データベース、ファイルシステムを横断した統一メタデータカタログです。DLF は自動メタデータディスカバリー、細かい粒度での権限管理、およびデータリネージ追跡を提供します。 |
| フルスタック分析エンジン | Spark、Hive、Presto/Trino | Spark または Hive を用いたオフラインの抽出・変換・ロード (ETL)、および Presto または Trino を用いたインタラクティブクエリをサポートします。DataWorks および Quick BI との統合も可能です。 |
以下の図は、EMR DataLake シナリオにおけるエンドツーエンドのデータフローを示しています。
ステップ 1:複数のソースからのデータ取り込み
異なるソースシステムからのデータは、元のフォーマットのまま OSS-HDFS に格納されます。
-
リレーショナルデータベース (MySQL、Oracle):定期的なスケジュールで Sqoop または DataX を使用して全量または増分データを抽出し、ビジネステーブルスキーマに基づいて OSS-HDFS に書き込みます。
-
ノンリレーショナルデータベース (MongoDB、Redis):カスタムスクリプトまたは Spark コネクタを使用して JSON またはバイナリ形式のデータを OSS-HDFS にエクスポートします。
-
ログデータ:Logstash または Flume を使用して、ユーザー行動ログやシステムログなどの増分ログを収集し、分単位の遅延で OSS-HDFS に書き込みます。
-
ファイルデータ:JindoSDK と HDFS API を使用して CSV、Parquet などのファイルを一括で OSS-HDFS にアップロードするか、OSS コンソールから直接ファイルをアップロードします。
ステップ 2:データの処理および分析
OSS-HDFS 内の生データを、クエリ可能なビジネスメトリクスへと精製します。
-
バッチ処理:Spark および Hive ジョブを実行して、生ログおよびビジネスデータのクレンジング、関連付け、集計を行います。代表的な出力例として、デイリーアクティブユーザー数、30 日間ユーザー保持率、SKU (Stock Keeping Unit) 別新規注文数があります。
-
インタラクティブクエリ:標準 SQL を用いた Trino または Presto を使用して、サブ秒レベルの応答時間でビッグデータをクエリし、運用チームによるアドホック分析を支援します。
ステップ 3:下流システムへのデータ適用
処理済みデータは、さまざまなビジネアプリケーションを駆動します。
-
データサイエンス:API サービス経由で処理済みデータをリスク管理エンジンやレコメンデーションシステムなどの下流システムに公開します。
-
ビジネスインテリジェンス:Java Database Connectivity (JDBC) API を介して Quick BI などのツールに接続し、インタラクティブなレポートを作成します。
-
予測分析:処理結果および特徴量データを機械学習プラットフォームに送信してモデルをトレーニング(例:SKU 別売上予測モデル)し、結果をデータレイクに再格納します。
-
データ可視化:JDBC API を介して DataV などの可視化ツールに接続し、ダッシュボード上で複雑なデータを表示します。
データ分析のシナリオ
EMR Online Analytical Processing (OLAP) クラスターは、高性能なビッグデータ分析を実行するチーム向けに設計されています。StarRocks、Doris、ClickHouse といった OLAP エンジンと統合されており、これらはいずれも効率的なデータ圧縮、列指向ストレージ、パラレルクエリ実行という 3 つのパフォーマンス特性を備えています。これらの特性により、OLAP クラスターはユーザー行動プロファイリング、受信者選定、およびビジネスインテリジェンスなどのワークロードに最適です。
以下の図は、StarRocks 分析エンジンを用いた EMR OLAP シナリオにおけるエンドツーエンドのデータフローを示しています。
ステップ 1:データの収集
データは、リアルタイムおよびオフラインの両方のソースから StarRocks に到着します。
-
リアルタイム:Flume がログデータをキャプチャし、Message Queue for Apache Kafka が高スループットかつ低遅延でデータストリームをバッファーすることで、リアルタイム取り込みを安定化します。
-
オフライン:Sqoop または DataX を使用して、MySQL や Oracle などのリレーショナルデータベースから定期的にデータを抽出し、StarRocks にロードします。
ステップ 2:StarRocks 内でのデータレイヤリング
StarRocks は、現代のデータレイクハウスで採用されるメダリオンアーキテクチャと同様の階層構造で、データを 4 つの階層に整理します。各レイヤーは前のレイヤーを基盤とし、独立して再利用可能です。
-
DIM レイヤー(ディメンションレイヤー):ユーザー属性や商品カテゴリなどのディメンションデータを格納し、多粒度の分析をサポートします。
-
ODS レイヤー(Operational Data Store レイヤー):生データを元の状態で保持し、バックトラッキング分析を可能にします。
-
DWD レイヤー(Data Warehouse Detail レイヤー):データクレンジング、フォーマット標準化、基本的な関連付けを適用して、詳細なデータセットを生成します。
-
DWS レイヤー(Data Warehouse Summary レイヤー):ユーザー行動や注文コンバージョンなどのビジネス主題ごとにメトリクスを事前集計し、クエリ遅延を低減します。
ステップ 3:データの適用
レイヤードされたデータは、以下の 3 つの主要なビジネスアプリケーションを駆動します。
-
ユーザー行動プロファイリング:DIM レイヤーの属性タグと DWS レイヤーの行動データを組み合わせて、精密マーケティング向けのユーザー行動プロファイルを構築します。
-
受信者選定:過去 30 日間に非常にアクティブであったが購入履歴がないユーザーなど、複数の条件でユーザーをフィルタリングします。
-
ビジネスインテリジェンス:JDBC API を介して Quick BI に接続し、日次レポート、週次レポート、リアルタイムダッシュボードを生成します。
リアルタイムデータストリーミングのシナリオ
EMR Dataflow クラスターは、リアルタイムリスク管理やリアルタイムダッシュボードなど、継続的かつ低遅延のデータ取り込みおよび分析を必要とするワークロード向けに設計されています。このクラスターは、ストレージ、ストリーム処理、増分データ管理の 3 つのコアコンポーネントを統合しています。
-
OSS-HDFS:HDFS プロトコルと互換性のあるスケーラブルなストレージレイヤーです。ペタバイト規模のリアルタイムデータの永続ストレージ、ミリ秒単位の書き込み、およびコスト効率の高いコールド/ホットデータ階層化をサポートします。
-
Flink:データストリームに対する ETL(ログ解析、ディメンション関連付け)、ウィンドウ集計(分単位の GMV(総取扱高)測定)、複雑なイベント処理(リスク管理ルール評価)を実行します。
-
Paimon:ストリーミングデータレイク内でリアルタイム増分データおよび履歴スナップショットを管理します。変更データキャプチャ (CDC) 同期、ACID 特性(原子性、一貫性、独立性、永続性)を備えたトランザクション、およびタイムトラベルクエリをサポートします。
以下の図は、Flink、Paimon、OSS-HDFS が協調してリアルタイムダッシュボードを支えるストリーミングデータレイクハウスを構築する仕組みを示しています。
ステップ 1:複数のソースからのリアルタイムデータ取り込み
Flink コネクタを使用して、複数の上流システムからデータベースの変更、ログ、イベントトラッキングデータを同時に収集します。
ステップ 2:ストリーミングデータレイクハウスの構築
Flink および Paimon が、構造化されクエリ可能なレイクハウス形式でデータを処理・保存します。
-
Flink:統合型のバッチおよびストリームデータ計算エンジンとして、データストリームをリアルタイムで消費し、データクレンジング(ノイズ除去、フォーマット標準化)、変換(ログ解析、イベントトラッキングポイントの標準化)、ディメンション関連付けを実行します。
-
Paimon:処理結果をストリーミングデータレイクに格納する際に、以下の 2 つの主要な機構を活用します:
-
変更履歴: データの挿入、更新、削除を記録し、ACID トランザクションの整合性とリアルタイム増分同期を保証します。
-
階層モデリング:Paimon を ODS、DWD、DWS レイヤーと組み合わせることで、増分データの蓄積および再利用を目的とした階層化データアーキテクチャを構築します。
-
-
OSS-HDFS:生ログ、Paimon 増分スナップショット、履歴アーカイブデータを永続的に格納します。
ステップ 3:ビジネスインサイトの提供
処理済みのレイクハウスデータは、リアルタイムレポートおよび意思決定ツールに供給されます。
-
StarRocks を使用して、GMV モニタリングやユーザー保持率分析などのリアルタイムビジネスレポートを生成します。
-
Quick BI に接続して、T+0 意思決定を可能にするダッシュボードを構築します。
データ提供のシナリオ
EMR DataServing クラスターは、ミリ秒単位の低遅延で大規模データセットを格納およびクエリするワークロード向けに設計されています。このクラスターは、OSS-HDFS、HBase、Phoenix を統合しており、生データの格納から複雑なクエリ実行までの完全なパスをカバーし、ユーザー行動分析や精密マーケティングなどのユースケースをサポートします。
-
HBase:分散型・列指向データベースで、大規模データセットに対する高スループットのリアルタイム読み取り/書き込みを提供します。注文ステータスやユーザー行動記録などに対して、ミリ秒単位のポイントクエリを実現します。HBase は HFiles を OSS-HDFS に永続化することで、ストレージとコンピュートをデカップリングし、クラスターの迅速な再作成を可能にします。
-
Phoenix:HBase 向けの SQL クエリエンジンです。Phoenix は NoSQL データを標準的なリレーショナルテーブルにマッピングし、数百億件のレコード規模でもサブ秒レベルの応答時間で、マルチテーブル関連付けや集計計算などの複雑な SQL 分析をサポートします。セカンダリインデックスおよびクエリプッシュダウンにより、タグベースの選択やユーザーグループ化が高速化され、ビジネスチームの開発負荷が軽減されます。
以下の図は、EMR DataServing クラスターが HBase + OSS-HDFS ストレージアーキテクチャおよび Phoenix クエリエンジンを活用してユーザー行動分析を実現する仕組みを示しています。
ステップ 1:データの処理
ストリーム処理とバッチ処理の 2 つのパスから、データが HBase クラスターに供給されます。
-
ストリーム処理 (Flink):ログデータストリームをリアルタイムで消費し、データクレンジング(ノイズ除去、フォーマット標準化)、ウィンドウ集計(リアルタイムユニーク訪問者 (UV) 数算出)、イベントアラート(異常トラフィック検出)を実行した後、HBase API 経由で結果を HBase に書き込みます。
-
バッチ処理 (Spark):リレーショナルデータベースのデータに対して定期的なバッチジョブを実行し、ユーザー属性タグ算出やデータ重複排除などの ETL 処理を行い、出力を HBase に書き込みます。
ステップ 2:大規模データの格納
2 つのストレージレイヤーが、異なるアクセスパターンに対応します。
-
OSS-HDFS:生ログおよび HBase HFiles を永続的に格納します。JindoCache により OSS-HDFS の読み取り/書き込み遅延が低減されます。
-
HBase クラスター:リアルタイム書き込み(ユーザー行動記録)および高頻度ポイントクエリ(注文ステータス照会)を処理します。
ステップ 3:ユーザー行動のクエリ
ビジネスチームは、HBase に対して Phoenix SQL クエリを実行し、精密マーケティングに必要な行動洞察を抽出します。例えば、「過去 7 日間に特定の製品カテゴリを購入し、関連広告をクリックしたユーザー」を特定できます。