在開發過程中我們通常會碰到需要遷移資料的情境,本文介紹如何將自建叢集資料移轉到E-MapReduce叢集中。
背景資訊
- 線下Hadoop到E-MapReduce遷移。
- 線上ECS自建Hadoop到E-MapReduce遷移。
遷移情境:HDFS增量上遊資料來源包括RDS增量資料和Flume。
新舊叢集網路打通
- 線下IDC自建Hadoop
自建Hadoop遷移到E-MapReduce可以通過OSS進行過渡,或者使用阿里雲Express Connect產品建立線下IDC和線上E-MapReduce所在VPC網路的連通。
- 利用ECS自建Hadoop 由於VPC實現使用者專用網路之間的邏輯隔離,E-MapReduce建議使用VPC網路。
- 傳統網路與VPC網路打通
如果ECS自建Hadoop,需要通過ECS的classiclink的方式將傳統網路和VPC網路打通,詳情請參見建立ClassicLink串連。
- VPC網路之間連通
資料移轉一般需要較高的網路頻寬連通,建議新舊叢集盡量處在同一個地區的同一個可用性區域內。
- 傳統網路與VPC網路打通
HDFS資料移轉
- Distcp工具同步資料
HDFS資料移轉可以通過Hadoop社區標準的DistCp工具遷移,可以實現全量和增量的資料移轉。為減輕現有叢集資源壓力,建議在新舊叢集網路連通後在新叢集執行distcp命令。
- 全量資料同步
hadoop distcp -pbugpcax -m 1000 -bandwidth 30 hdfs://oldclusterip:8020/user/hive/warehouse /user/hive/warehouse - 增量資料同步
hadoop distcp -pbugpcax -m 1000 -bandwidth 30 -update -delete hdfs://oldclusterip:8020/user/hive/warehouse /user/hive/warehouse
參數說明:oldclusterip:填寫舊叢集namenode ip,多個namenode情況填寫目前狀態為active的。-p:預設副本數為3,如想保留原有副本數,-p後加r如-prbugpcax。如果不同步許可權和ACL,-p後去掉p和a。-m:指定map數,和叢集規模、資料量有關。例如叢集有2000核CPU,就可以指定2000個map。-bandwidth:指定單個map的同步速度,是靠控制副本複製速度實現的,是大概值。-update:校正源檔案和目標檔案的checksum和檔案大小,如果不一致源檔案會更新掉目的地組群資料,新舊叢集同步期間還有資料寫入,可以通過-update做增量資料同步。-delete:如果源叢集資料不存在,新叢集的資料也會被刪掉。
說明- 遷移整體速度受叢集間頻寬、叢集規模影響。同時檔案越多,checksum需要的時間越長。如果遷移資料量大,可以先試著同步幾個目錄評估一下整體時間。如果只能在指定時間段內同步,可以將目錄切為幾個小目錄,依次同步。
- 一般全量資料同步,需要有個短暫的業務停寫,以啟用雙寫雙算或直接將業務切換到新叢集上。
- 全量資料同步
- HDFS許可權配置
HDFS有使用權限設定,確定舊叢集是否有ACL規則,是否要同步,檢查新舊叢集dfs.permissions.enabled和dfs.namenode.acls.enabled的配置是否一致,按照實際需要修改。
如果有ACL規則要同步,distcp參數後要加
-p同步許可權參數。如果distcp操作提示xx叢集不支援 ACL,說明對應叢集沒配置ACL規則。新叢集沒配置ACL規則可以修改配置並重啟namenode。舊叢集不支援,說明舊叢集根本就沒有ACL方面的設定,也不需要同步。
Hive中繼資料同步
- 概述 Hive中繼資料,一般存在MySQL裡,與一般MySQL同步資料相比,要注意兩點:
- Location變化
- Hive版本對齊
E-MapReduce支援Hive Meta DB:- 統一中繼資料庫,E-MapReduce管控RDS,每個使用者一個Schema
- 使用者自建RDS
- 使用者ECS自建MySQL
為了保證遷移之後新舊資料完全一致,最好是在遷移的時候將老的metastore服務停掉,等遷移過去之後,再把舊叢集上的 metastore 服務開啟,然後新叢集開始提交業務作業。
- 操作步驟:
- 將新叢集的中繼資料庫刪除,直接輸出命令
drop database xxx。 - 將舊叢集的中繼資料庫的表結構和資料通過
mysqldump命令全部匯出。 - 替換location、Hive中繼資料中分區等資訊均帶有location資訊的,帶dfs nameservices首碼的表,如hdfs://mycluster:8020/,而E-MapReduce叢集的nameservices首碼是統一的E-MapReduce-cluster,所以需要訂正。 訂正的最佳方式是先匯出資料 。
mysqldump --databases hivemeta --single-transaction -u root -p > hive_databases.sql用sed替換hdfs://oldcluster:8020/為hdfs://E-MapReduce-cluster/ ,再匯入新db中。mysql hivemeta -p < hive_databases.sql - 在新叢集的介面上,停止掉hivemetastore服務。
- 登入新的中繼資料庫,
create database建立資料庫。 - 在新的中繼資料庫中,匯入替換location欄位之後的老中繼資料庫匯出來的所有資料。
- 版本對齊,E-MapReduce的Hive版本一般是當前社區最新的穩定版,自建叢集Hive版本可能會更老,所以匯入的舊版本資料可能不能直接使用。需要執行 Hive的升級指令碼(期間會有表、欄位已存在的問題可以忽略),請參見Hive升級指令碼。例如Hive從1.2 升級到2.3.0,需要依次執行upgrade-1.2.0-to-2.0.0.mysql.sql、upgrade-2.0.0-to-2.1.0.mysql.sql、upgrade-2.1.0-to-2.2.0.mysql.sql、upgrade-2.2.0-to-2.3.0.mysql.sql。指令碼主要是建表,加欄位,改內容,如有表已存在,欄位已存在的異常可以忽略。
- Meta資料全部訂正後,就可以重啟metaserver了。命令列輸入
hive,查詢庫和表、查詢資料、驗證資料的正確性。
- 將新叢集的中繼資料庫刪除,直接輸出命令
Flume資料移轉
- Flume雙寫配置
在新叢集上也開啟flume服務,並且將資料按照和老叢集完全一致的規則寫入到新叢集中。
- Flume分區表寫入
Flume資料雙寫,雙寫時需控制開始的時機,要保證flume在開始一個新的時間分區的時候來進行新叢集的同步。 如flume每小時整點會同步所有的表,那就要整點之前,開啟flume同步服務,這樣flume在一個新的小時內寫入的資料,在舊叢集和新叢集上是完全一致的。而不完整的舊資料在distcp的時候,全量的同步會覆蓋它。而開啟雙寫時間點後的新資料,在資料同步的時候不進行同步。 這個新的寫入的資料,我們在劃分資料階段,記得不要放到資料同步的目錄裡。
作業同步
Hadoop、Hive、Spark或MR等如果有較大的版本升級,可能涉及作業改造,要視具體情況而定。
- Gateway OOM
修改 /etc/ecm/hive-conf/hive-env.sh。
export HADOOP_HEAPSIZE=512改成1024。
- 作業執行記憶體不足
set mapreduce.map.java.opts=-Xmx3072mmapreduce.map.java.opts調整的是啟動JVM虛擬機器時,傳遞給虛擬機器的啟動參數,而預設值-Xmx3072m表示這個Java程式可以使用的最大堆記憶體數,一旦超過這個大小,JVM就會拋出Out of Memory異常,並終止進程。set mapreduce.map.memory.mb=3840mapreduce.map.memory.mb設定的是Container的記憶體上限,這個參數由NodeManager讀取並進行控制,當Container的記憶體大小超過了這個參數值,NodeManager會負責終止Container。
資料校正
由客戶自行抽檢報表完成。
Presto叢集遷移
如果有單獨的Presto叢集僅僅用來做資料查詢,需要修改 Hive 中設定檔,請參見Presto文檔。
- connector.name=hive-hadoop2
- hive.metastore.uri=thrift://E-MapReduce-header-1.cluster-500148414:9083
- hive.config.resources=/etc/ecm/hadoop-conf/core-site.xml, /etc/ecm/hadoop-conf/hdfs-site.xml
- hive.allow-drop-table=true
- hive.allow-rename-table=true
- hive.recursive-directories=true
附錄
source /usr/lib/hive-current/scripts/metastore/upgrade/mysql/upgrade-1.2.0-to-2.0.0.mysql.sql
CREATE TABLE COMPACTION_QUEUE (
CQ_ID bigint PRIMARY KEY,
CQ_DATABASE varchar(128) NOT NULL,
CQ_TABLE varchar(128) NOT NULL,
CQ_PARTITION varchar(767),
CQ_STATE char(1) NOT NULL,
CQ_TYPE char(1) NOT NULL,
CQ_WORKER_ID varchar(128),
CQ_START bigint,
CQ_RUN_AS varchar(128),
CQ_HIGHEST_TXN_ID bigint,
CQ_META_INFO varbinary(2048),
CQ_HADOOP_JOB_ID varchar(32)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE TXNS (
TXN_ID bigint PRIMARY KEY,
TXN_STATE char(1) NOT NULL,
TXN_STARTED bigint NOT NULL,
TXN_LAST_HEARTBEAT bigint NOT NULL,
TXN_USER varchar(128) NOT NULL,
TXN_HOST varchar(128) NOT NULL,
TXN_AGENT_INFO varchar(128),
TXN_META_INFO varchar(128),
TXN_HEARTBEAT_COUNT int
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE HIVE_LOCKS (
HL_LOCK_EXT_ID bigint NOT NULL,
HL_LOCK_INT_ID bigint NOT NULL,
HL_TXNID bigint,
HL_DB varchar(128) NOT NULL,
HL_TABLE varchar(128),
HL_PARTITION varchar(767),
HL_LOCK_STATE char(1) not null,
HL_LOCK_TYPE char(1) not null,
HL_LAST_HEARTBEAT bigint NOT NULL,
HL_ACQUIRED_AT bigint,
HL_USER varchar(128) NOT NULL,
HL_HOST varchar(128) NOT NULL,
HL_HEARTBEAT_COUNT int,
HL_AGENT_INFO varchar(128),
HL_BLOCKEDBY_EXT_ID bigint,
HL_BLOCKEDBY_INT_ID bigint,
PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID),
KEY HIVE_LOCK_TXNID_INDEX (HL_TXNID)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE INDEX HL_TXNID_IDX ON HIVE_LOCKS (HL_TXNID);
source /usr/lib/hive-current/scripts/metastore/upgrade/mysql/upgrade-1.2.0-to-2.0.0.mysql.sql
source /usr/lib/hive-current/scripts/metastore/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql
CREATE TABLE TXN_COMPONENTS (
TC_TXNID bigint,
TC_DATABASE varchar(128) NOT NULL,
TC_TABLE varchar(128),
TC_PARTITION varchar(767),
FOREIGN KEY (TC_TXNID) REFERENCES TXNS (TXN_ID)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
source /usr/lib/hive-current/scripts/metastore/upgrade/mysql/upgrade-2.0.0-to-2.1.0.mysql.sql
source /usr/lib/hive-current/scripts/metastore/upgrade/mysql/upgrade-2.1.0-to-2.2.0.mysql.sql
CREATE TABLE IF NOT EXISTS `NOTIFICATION_LOG`
(
`NL_ID` BIGINT(20) NOT NULL,
`EVENT_ID` BIGINT(20) NOT NULL,
`EVENT_TIME` INT(11) NOT NULL,
`EVENT_TYPE` varchar(32) NOT NULL,
`DB_NAME` varchar(128),
`TBL_NAME` varchar(128),
`MESSAGE` mediumtext,
PRIMARY KEY (`NL_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE IF NOT EXISTS `PARTITION_EVENTS` (
`PART_NAME_ID` bigint(20) NOT NULL,
`DB_NAME` varchar(128) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
`EVENT_TIME` bigint(20) NOT NULL,
`EVENT_TYPE` int(11) NOT NULL,
`PARTITION_NAME` varchar(767) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
`TBL_NAME` varchar(128) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,
PRIMARY KEY (`PART_NAME_ID`),
KEY `PARTITIONEVENTINDEX` (`PARTITION_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE COMPLETED_TXN_COMPONENTS (
CTC_TXNID bigint NOT NULL,
CTC_DATABASE varchar(128) NOT NULL,
CTC_TABLE varchar(128),
CTC_PARTITION varchar(767)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
source /usr/lib/hive-current/scripts/metastore/upgrade/mysql/upgrade-2.1.0-to-2.2.0.mysql.sql
source /usr/lib/hive-current/scripts/metastore/upgrade/mysql/upgrade-2.2.0-to-2.3.0.mysql.sql
CREATE TABLE NEXT_TXN_ID (
NTXN_NEXT bigint NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
INSERT INTO NEXT_TXN_ID VALUES(1);
CREATE TABLE NEXT_LOCK_ID (
NL_NEXT bigint NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
INSERT INTO NEXT_LOCK_ID VALUES(1);