すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:E-MapReduce データ移行ソリューション

最終更新日:Apr 11, 2025

このページでは、自己構築クラスターから E-MapReduce (EMR) クラスターにデータを移行する方法について説明します。

適用可能な移行シナリオは以下のとおりです。
  • オフラインの Hadoop クラスターからE- MapReduce へのデータの移行
  • ECS 上の自己構築 Hadoop クラスターから E-MapReduce へのデータの移行
サポートされているデータソースは以下のとおりです。
  • RDS 増分データおよび Flume などの HDFS 増分アップストリームデータソース

新旧クラスター間のネットワーク相互接続

  • オフライン IDC 上の自己構築 Hadoop クラスター

    自己構築 Hadoop クラスターは、OSS を介して、または Alibaba Cloud Express Connect を使用して E-MapReduce に移行し、オフライン IDC と E-MapReduce クラスターが配置されている VPC との間の接続を確立できます。

  • ECS インスタンス上の自己構築 Hadoop クラスター
    VPC ネットワークは論理的に隔離されているため、相互接続を確立するには VPC-Connected E-MapReduce サービスを使用するよう推奨します。 相互接続に関連する特定のネットワークタイプに応じて、以下の操作を実行する必要があります。
    • 従来のネットワークと VPC ネットワークとの間の相互接続

      ECS インスタンスで構築された Hadoop クラスターの場合は、ECS ClassicLink の手法を使用してクラシックネットワークと VPC ネットワークを相互接続する必要があります。 詳細は、「 ClassicLink 接続の構築 (Build a ClassicLink connection)」をご参照ください。

    • VPC ネットワーク間の相互接続

      VPC ネットワーク間の最適な接続を確保するには、古いクラスターと同じリージョンおよびゾーンに新しいクラスターを作成するよう推奨します。

HDFS データ移行

  • DistCp とのデータの同期化

    詳細は、「Hadoop DistCp」をご参照ください。

    DistCp ツールを使用して、完全 HDFS データと増分 HDFS データを移行できます。 現在のクラスターリソースへの負荷を軽減するため、新旧のクラスータネットワークが相互接続された後に distcp コマンドを実行するよう推奨します。

    • 完全データ同期
      hadoop distcp -pbugpcax -m 1000 -bandwidth 30 hdfs://oldclusterip:8020/user/hive/warehouse /user/hive/warehouse
    • 増分データ同期
      hadoop distcp -pbugpcax -m 1000 -bandwidth 30  -update –delete hdfs://oldclusterip:8020/user/hive/warehouse /user/hive/warehouse
    パラメーターの説明 :
    • hdfs://oldclusterip:8020 は古いクラスターのネームノード IP を示します。 複数のネームノードがある場合は、アクティブ状態 niar にある u ネームノードを入力します。
    • デフォルトのレプリカの数は 3 です。 元のレプリカ数を保持する場合は、-prbugpcax のように -p の後に r を追加します。 権限と ACL の同期化が必要ない場合は、-p の後の p と a を削除します。
    • -m は、データ量に対応するマップの量とクラスターのサイズを示します。 たとえば、クラスターの CPU が 2000 コアの場合、2000 個のマップを指定できます。
    • -bandwidth は、単一のマップの同期速度の推定値を示しており、レプリカのコピー速度を制御することによって実装されます。
    • -update。ソースファイルとターゲットファイルのチェックサムとファイルサイズを検証します。 比較されたファイルサイズが異なる場合、ソースファイルはターゲットクラスターデータを更新します。 新旧のクラスターの同期中にデータの書き込みがある場合は、増分データ同期に -update を使用できます。
    • -delete。古いクラスターのデータがもう存在しない場合、新しいクラスターのデータは削除されます。
    説明
    • 移行の全体的な速度は、クラスターの帯域幅とサイズによって変動します。 ファイル数が多いほど、チェックサムの処理に時間がかかります。 移行するデータが大量な場合は、複数のディレクトリを同期して全体の時間を評価してみます。 指定した期間内に同期が実行される場合は、ディレクトリをいくつかの小さなディレクトリに分割して一度に 1 つずつ同期できます。
    • 二重書き込みおよび二重カウントを可能にし、サービスを新しいクラスターに直接切り替えるには、完全データ同期に短いサービス停止が必要です。
  • HDFS 権限の設定

    HDFS は権限の設定を提供しています。 HDFS データを移行する前に、古いクラスターに ACL ルールがあるかどうか確認し、ルールを同期する場合は、新旧のクラスターで同じ dfs.permissions.enabled および dfs.namenode.acls.enabled が設定されているかどうかチェックします。 設定はすぐに有効になります。

    同期する ACL ルールがある場合は、permission パラメーターを同期するため distcp パラメーターを -p に追加する必要があります。 distcp 操作でクラスターが ACL をサポートしていないことが 表示される場合、対応するクラスターに ACL ルールが設定されていないことを意味します。 新しいクラスターが ACL ルールによって設定されていない場合は、ルールを追加して NM を再起動できます。 古いクラスターが ACL ルールをサポートしていない場合、ACL ルールを設定または同期する必要はありません。

Hive メタデータ同期

  • 概要
    Hive メタデータは通常 MySQL に格納されます。 MySQL のデータ同期と比較して以下の点にご注意ください。
    • 保存位置を変更します。
    • Hive バージョンを調整する必要があります。
    E-MapReduce は以下を含む Hive メタベースをサポートします。
    • EMR が RDS を管理し、各ユーザーにスキーマがあるユニファイドメタベース
    • 自己構築 RDS
    • ECS インスタンス上の自己構築 MySQL

    古いクラスターと新しいクラスターとの間の移行後もデータの一貫性を保つため、移行中にメタストアサービスを停止し、移行後に古いクラスターでメタストアサービスをオープンし、新しいクラスターでジョブをサブミットするよう推奨します。

  • 操作手順 :
    1. 新しいクラスターのメタベースを削除し、drop database xxx と入力します。
    2. mysqldump コマンドを実行して古いクラスターのメタベースのテーブル構造とデータをエクスポートします。
    3. 保存位置を移動します。 Hive メタデータ内のすべてのテーブルとパーティションの位置情報は、hdfs://mycluster:8020/ など dfs nameservices プレフィックス内にあります。 ただし、E-MapReduce クラスターの nameservices プレフィックスは emr-cluster で、保存位置情報を置き換える必要があります。
      位置情報を置き換えるには、以下のとおりデータをエクスポートします。
      mysqldump --databases hivemeta --single-transaction -u root –p > hive_databases.sql
      sed を使用して hdfs://oldcluster:8020/hdfs://emr-cluster/ に置き変えてから新しいデータベースにデータをインポートします。
      mysql hivemeta -p < hive_databases.sql
    4. 新しいクラスターのインターフェイスで、hivemetastore サービスを停止します。
    5. 新しいメタベースにログインしてデータベースを作成します。
    6. location フィールドが置き換えられたら、古いメタベースからエクスポートされたすべてのデータを新しいメタベースにインポートします。
    7. 現在、E-MapReduce Hive バージョンは最新の安定バージョンです。 ただし、自己構築クラスターの Hive バージョンがそれより前の場合、インポートされたデータは直接使用できない場合があります。 この問題を解決するには、アップグレードした Hive スクリプトを実行する必要があります (テーブルとフィールドの問題は無視)。 詳細は、「Hive アップグレードスクリプト (Hive upgrade scripts)」をご参照ください。 たとえば Hive 1.2 を 2.3.0 にアップグレードするには、upgrade-1.2.0-to-2.0.0.mysql.sqlupgrade-2.0.0-to-2.1.0.mysql.sqlupgrade-2.1.0-to-2.2.0.mysql.sql、および upgrade-2.2.0-to-2.3.0.mysql.sql を連続して実行します。 このスクリプトは主にテーブルのビルド、フィールドの追加、コンテンツの変更に使用します。
    8. テーブルとフィールドがすでに存在するという例外は無視できます。 すべてのメタデータを改訂後、MetaServer を起動し、コマンドラインで hive コマンドを実行してデータベースとテーブルを照会し、情報が正しいことを検証します。

Flume データ移行

  • 2 クラスター設定で Flume 同時書き込み

    新しいクラスターで Flume サービスを有効にし、古いクラスターと同じルールに従って新しいクラスターにデータを書き込みます。

  • Flume パーティションテーブルの書き込み

    Flume データの二重書き込みを実行する場合は、Flume が新しい時刻パーティションを開始する時点で新しいクラスターが確実に同期化されるよう開始タイミングを制御する必要があります。 Flume が 1 時間ごとにすべてのテーブルを同期する場合は、次の同期の前に Flume 同期サービスを有効にする必要があります。 これにより、次の新しい時間に Flume が書き込むデータは確実に正しく複製されます。 DistCp との完全データ同期が実行されると、不完全な古いデータが同期化されます。 同時書き込み時間が有効になった後に生成された新しいデータは同期されません。

    説明 データを分割する場合は、新しく書き込まれたデータをデータ同期ディレクトリに入れないでください。

ジョブ同期

Hadoop、Hive、Spark、および MapReduce のバージョンアップが大規模な場合は、オンデマンドでジョブを再構築できる場合があります。

一般的な問題と対応するソリューションは以下のとおりです。
  • ゲートウェイ OOM

    /etc/ecm/hive-conf/hive-env.sh を変更します。

    export HADOOP_HEAPSIZE=512 は 1024 に変更されます。

  • ジョブ実行メモリ不足

    mapreduce.map.java.opts は、JVM 仮想マシンの起動時に仮想マシンに渡される起動パラメーターを調整します。 デフォルト値 -Xmx200m は、この Java プログラムが使用するヒープメモリの最大量を示します。 この量を超えると、JVM はメモリ不足例外を表示し、

    set mapreduce.map.java.opts=-Xmx3072m プロセスを終了します。

    mapreduce.map.memory.mb で、NodeManager によって読み取られ制御されるコンテナのメモリ制限を設定します。 コンテナのメモリサイズがこのパラメーター値を超えると、NodeManager はコンテナを強制終了します。

    set mapreduce.map.memory.mb=3840

データ検証

データはお客様の自己生成レポートを介して検証されます。

Presto クラスターの移行

Presto クラスターをデータクエリに使用する場合は、Hive 設定ファイルを変更する必要があります。 詳細は、「Presto ドキュメント (Presto documentation)」をご参照ください。

変更が必要な Hive プロパティは以下のとおりです。
  • connector.name=hive-hadoop2
  • hive.metastore.uri=thrift://emr-header-1.cluster-500148414:9083
  • hive.config.resources=/etc/ecm/hadoop-conf/core-site.xml, /etc/ecm/hadoop-conf/hdfs-site.xml
  • hive.allow-drop-table=true
  • hive.allow-rename-table=true
  • hive.recursive-directories=true
Hive バージョン 1.2 から 2.3 へのアップグレード例:
source /usr/lib/hive-current/scripts/metastore/upgrade/mysql/upgrade-1.2.0-to-2.0.0.mysql.sql
CREATE TABLE COMPACTION_QUEUE (
  CQ_ID bigint PRIMARY KEY,
  CQ_DATABASE varchar(128) NOT NULL,
  CQ_TABLE varchar(128) NOT NULL,
  CQ_PARTITION varchar(767),
  CQ_STATE char(1) NOT NULL,
  CQ_TYPE char(1) NOT NULL,
  CQ_WORKER_ID varchar(128),
  Cq_start bigint,
  CQ_RUN_AS varchar(128),
  CQ_HIGHEST_TXN_ID bigint,
  CQ_META_INFO varbinary(2048),
  CQ_HADOOP_JOB_ID varchar(32)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE TXNS (
  TXN_ID bigint PRIMARY KEY,
  TXN_STATE char(1) NOT NULL,
  TXN_STARTED bigint NOT NULL,
  TXN_LAST_HEARTBEAT bigint NOT NULL,
  TXN_USER varchar(128) NOT NULL,
  TXN_HOST varchar(128) NOT NULL,
  TXN_AGENT_INFO varchar(128),
  TXN_META_INFO varchar(128),
  TXN_HEARTBEAT_COUNT int
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE HIVE_LOCKS (
  HL_LOCK_EXT_ID bigint NOT NULL,
  HL_LOCK_INT_ID bigint NOT NULL,
  HL_TXNID bigint,
  name varchar(128)NOT NULL、
  HL_TABLE varchar(128),
  HL_PARTITION varchar(767),
  HL_LOCK_STATE char(1) not null,
  HL_LOCK_TYPE char(1) not null,
  HL_LAST_HEARTBEAT bigint NOT NULL,
  HL_ACQUIRED_AT bigint,
  name varchar(128)NOT NULL、
  name varchar(128)NOT NULL、
  HL_HEARTBEAT_COUNT int,
  HL_AGENT_INFO varchar(128),
  HL_BLOCKEDBY_EXT_ID bigint,
  HL_BLOCKEDBY_INT_ID bigint,
  PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID),
  KEY HIVE_LOCK_TXNID_INDEX (HL_TXNID)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE INDEX HL_TXNID_IDX ON HIVE_LOCKS (HL_TXNID);
source /usr/lib/hive-current/scripts/metastore/upgrade/mysql/upgrade-1.2.0-to-2.0.0.mysql.sql
source /usr/lib/hive-current/scripts/metastore/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql

CREATE TABLE TXN_COMPONENTS (
  TC_TXNID bigint,
  TC_DATABASE varchar(128) NOT NULL,
  TC_TABLE varchar(128),
  TC_PARTITION varchar(767),
  FOREIGN KEY (TC_TXNID) REFERENCES TXNS (TXN_ID)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
source /usr/lib/hive-current/scripts/metastore/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql
source /usr/lib/hive-current/scripts/metastore/upgrade/mysql/upgrade-2.1.0-to-2.2.0.mysql.sql
CREATE TABLE IF NOT EXISTS `NOTIFICATION_LOG`
(
    `NL_ID` BIGINT(20) NOT NULL,
    `EVENT_ID` BIGINT(20) NOT NULL,
    `EVENT_TIME` INT(11) NOT NULL,
    `EVENT_TYPE` varchar(32) NOT NULL,
    `DB_NAME` varchar(128),
    `TBL_NAME` varchar(128),
    `MESSAGE` mediumtext,
    PRIMARY KEY (`NL_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE IF NOT EXISTS `PARTITION_EVENTS` (
  `PART_NAME_ID` bigint(20) NOT NULL,
  `DB_NAME` varchar(128) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
  `EVENT_TIME` bigint(20) NOT NULL,
  `EVENT_TYPE` int(11) NOT NULL,
  `PARTITION_NAME` varchar(767) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
  `TBL_NAME` varchar(128) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
  PRIMARY KEY (`PART_NAME_ID`),
  KEY `PARTITIONEVENTINDEX` (`PARTITION_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

CREATE TABLE COMPLETED_TXN_COMPONENTS (
  CTC_TXNID bigint NOT NULL,
  CTC_DATABASE varchar(128) NOT NULL,
  CTC_TABLE varchar(128),
  CTC_PARTITION varchar(767)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 source /usr/lib/hive-current/scripts/metastore/upgrade/mysql/upgrade-2.1.0-to-2.2.0.mysql.sql
 source /usr/lib/hive-current/scripts/metastore/upgrade/mysql/upgrade-2.2.0-to-2.3.0.mysql.sql
  CREATE TABLE NEXT_TXN_ID (
  NTXN_NEXT bigint NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
INSERT INTO NEXT_TXN_ID VALUES(1);
CREATE TABLE NEXT_LOCK_ID (
  NL_NEXT bigint NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
INSERT INTO NEXT_LOCK_ID VALUES(1);