DataHub データソースは、DataHub との間のデータ読み取りおよび書き込みのための双方向チャネルを提供します。このドキュメントでは、DataWorks でこのデータソースのデータ同期を構成する方法について説明します。
対応バージョン
DataHub Reader は、DataHub Java SDK を使用してデータを読み取ります。以下の SDK バージョンを使用します。
<dependency> <groupId>com.aliyun.DataHub</groupId> <artifactId>aliyun-sdk-DataHub</artifactId> <version>2.9.1</version> </dependency>DataHub Writer は、DataHub Java SDK を使用してデータを書き込みます。以下の SDK バージョンを使用します。
<dependency> <groupId>com.aliyun.datahub</groupId> <artifactId>aliyun-sdk-datahub</artifactId> <version>2.5.1</version> </dependency>
制限事項
バッチデータ読み書き
STRING データの型は UTF-8 エンコーディングのみをサポートしています。単一の STRING フィールドのサイズは最大 1 MB です。
リアルタイムデータ読み書き
リアルタイム同期タスクは、サーバーレスリソースグループをサポートしています。
システムは、同じハッシュ値を持つデータを同じシャードに同期します。
リアルタイム全データベース書き込み
タスクの開始時に、バッチプロセスが完全データを DataHub に書き込みます。完全データ同期が完了すると、リアルタイムプロセスがソースから宛先への増分データの同期を開始します。
データは DataHub の TUPLE 型の Topic にのみ書き込むことができます。TUPLE データの型の詳細については、「データの型」をご参照ください。
DataHub にデータをリアルタイムで同期する場合、ソーステーブルフィールドに 5 つの追加フィールドが追加されます。タスクを構成する際に、カスタムフィールドを追加することもできます。DataHub に送信されるメッセージフォーマットの詳細については、「付録: メッセージフォーマット」をご参照ください。
サポートされているフィールドの型
DataHub とデータを同期すると、フィールドは対応するデータの型にマップされます。DataHub は、次のデータの型のみをサポートします: BIGINT、STRING、BOOLEAN、DOUBLE、TIMESTAMP、および DECIMAL。
データソースの追加
DataWorks で同期タスクを開発する前に、「データソース管理」の手順に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターの意味を理解するために、パラメーターの説明を表示できます。
データ同期タスクの開発
同期タスクの構成のエントリポイントとプロシージャについては、以下の構成ガイドをご参照ください。
単一テーブルバッチ同期
プロシージャについては、「コードレス UI の使用」および「コードエディタの使用」をご参照ください。
パラメーターの完全なリストとコードエディタのサンプルスクリプトについては、「付録: コードとパラメーター」をご参照ください。
単一テーブルリアルタイム同期
プロシージャについては、「DataStudio でリアルタイム同期タスクを構成する」をご参照ください。
DataHub の異なるデータの型、シャーディングポリシー、データフォーマット、およびサンプルメッセージでサポートされている操作の詳細については、「付録: メッセージフォーマット」をご参照ください。
全データベースリアルタイム同期
プロシージャについては、「リアルタイム全データベース同期タスクの構成」をご参照ください。
よくある質問
単一リクエストのデータサイズが制限を超えたために DataHub への書き込み操作が失敗した場合、どうすればよいですか?
付録: コードとパラメーター
コードエディタを使用したバッチ同期タスクの構成
コードエディタを使用してバッチ同期タスクを構成する場合は、統一されたスクリプトフォーマット要件に基づいてスクリプト内の関連パラメーターを構成する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下の情報は、コードエディタを使用してバッチ同期タスクを構成する際に、データソースに対して構成する必要があるパラメーターについて説明しています。
Reader スクリプト例
{
"type":"job",
"version":"2.0",// バージョン番号。
"steps":[
{
"job": {
"content": [
{
"reader": {
"name": "DataHubreader",
"parameter": {
"endpoint": "xxx", // DataHub エンドポイント。
"accessId": "xxx", // DataHub にアクセスするために使用される AccessKey ID。
"accessKey": "xxx", // DataHub にアクセスするために使用される AccessKey Secret。
"project": "xxx", // 宛先 DataHub プロジェクトの名前。
"topic": "xxx", // 宛先 DataHub Topic の名前。
"batchSize": 1000, // 一度に読み取るレコード数。
"beginDateTime": "20180910111214", // データ消費の開始時刻。
"endDateTime": "20180910111614", // データ消費の終了時刻。
"column": [
"col0",
"col1",
"col2",
"col3",
"col4"
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false
}
}
}
]
}
}
],
"setting":{
"errorLimit":{
"record":"0"// 許可される最大エラー数。
},
"speed":{
"throttle":true,// throttle が false に設定されている場合、mbps パラメーターは無視され、スロットルは適用されません。throttle が true に設定されている場合、スロットルが有効になります。
"concurrent":1,// 同時スレッド数。
"mbps":"12"// 最大データ転送レート。1 mbps = 1 MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}Reader パラメーター
パラメーター | 説明 | 必須 |
endpoint | DataHub の エンドポイント。 | はい |
accessId | DataHub にアクセスするために使用される AccessKey ID。 | はい |
accessKey | DataHub にアクセスするために使用される AccessKey Secret。 | はい |
project | ターゲット DataHub 内のプロジェクトの名前。プロジェクトは、リソースの隔離とコントロールに使用される DataHub のリソース管理単位です。 | はい |
topic | 宛先 DataHub Topic の名前。 | はい |
batchSize | 一度に読み取るレコード数。デフォルト値: 1,024。 | いいえ |
beginDateTime | yyyyMMddHHmmss フォーマットでのデータ消費の開始時刻。時間範囲は開始時刻を含み、終了時刻を含みません。データを増分同期するには、このパラメーターを DataWorks の スケジューリングパラメーターと共に使用できます。たとえば、パラメーター名を bizdate に、値を $[yyyymmdd-1] に設定できます。次に、beginDateTime を ${bizdate}000000 に設定します。これにより、開始時刻が前日の 00:00:00 に構成されます。 説明 beginDateTime と endDateTime パラメーターは一緒に使用する必要があります。 | はい |
endDateTime | yyyyMMddHHmmss フォーマットでのデータ消費の終了時刻。このタイムスタンプは、時間範囲の排他的境界です。データを増分同期するには、このパラメーターを DataWorks の スケジューリングパラメーターと共に使用できます。たとえば、パラメーター名を bizdate に、値を $[yyyymmdd-1] に設定できます。次に、endDateTime を ${bizdate}235959 に設定します。これにより、終了時刻が前日の 23:59:59 に構成されます。 説明 beginDateTime と endDateTime パラメーターは一緒に使用する必要があります。 | はい |
Writer スクリプト例
{
"type": "job",
"version": "2.0",// バージョン番号。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "datahub",// プラグイン名。
"parameter": {
"datasource": "",// データソース。
"topic": "",// Topic は、DataHub におけるサブスクリプションと公開の基本単位です。Topic を使用して、特定のタイプまたはカテゴリのストリーミングデータを表すことができます。
"maxRetryCount": 500,// タスクが失敗した場合の最大再試行回数。
"maxCommitSize": 1048576// バッファーがこのサイズ (バイト単位) に達すると、データはバッチでコミットされます。DataHub のリクエストあたりのレコード制限は 10,000 です。エラーを防止するために、この値が (平均レコードサイズ * 10,000) 未満であることを確認してください。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""// 許可される最大エラー数。
},
"speed": {
"throttle":true,// throttle が false に設定されている場合、mbps パラメーターは無視され、スロットルは適用されません。throttle が true に設定されている場合、スロットルが有効になります。
"concurrent":20, // 同時スレッド数。
"mbps":"12"// 最大データ転送レート。1 mbps = 1 MB/s。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}Writer パラメーター
パラメーター | 説明 | 必須 | デフォルト |
accessId | DataHub の AccessKey ID。 | はい | なし |
accessKey | DataHub の AccessKey Secret。 | はい | なし |
endPoint | リソースが配置されている DataHub サービスのエンドポイント。 | はい | なし |
maxRetryCount | タスクが失敗した場合の最大再試行回数。 | いいえ | なし |
mode | STRING データの型の値の書き込みモード。 | はい | なし |
parseContent | コンテンツを解析するかどうかを指定します。 | はい | なし |
project | プロジェクトは、DataHub のデータに対する基本的な組織単位です。プロジェクトには複数の Topic が含まれます。 説明 DataHub プロジェクトは MaxCompute プロジェクトとは独立しています。MaxCompute プロジェクトを DataHub で再利用することはできません。 | はい | なし |
topic | DataHub では、Topic はデータ公開とサブスクリプションの基本単位であり、ストリーミングデータのカテゴリを表します。 | はい | なし |
maxCommitSize | 書き込みパフォーマンスを向上させるために、DataX はデータをバッファーし、maxCommitSize (バイト単位) で指定されたサイズにバッファーが達すると、バッチでデータを書き込みます。DataHub はリクエストあたり最大 10,000 レコードを許可します。エラーを防止するために、このパラメーターを平均レコードサイズに基づいて 10,000 レコードの合計サイズよりも小さい値に設定してください。 | いいえ | 1 MB |