StarRocks データソースを使用すると、StarRocks のデータの読み取りと書き込みができます。このトピックでは、DataWorks がサポートする StarRocks のデータ同期機能について説明します。
サポートされるバージョン
EMR Serverless StarRocks のすべてのバージョンがサポートされています。
E-MapReduce on ECS:StarRocks 2.1。
StarRocks Community Edition がサポートされています。
説明DataWorks は内部ネットワーク経由でのみ StarRocks に接続します。したがって、StarRocks の Community Edition は E-MapReduce on ECS にデプロイする必要があります。
このデータソースで互換性の問題が発生した場合は、チケットを送信してください。
制限事項
MySQL から StarRocks へのデータベース全体のリアルタイム同期では、ターゲットの StarRocks テーブルはプライマリキーモデルを使用する必要があります。
MySQL から StarRocks へデータベース全体のリアルタイム同期を実行する場合、TRUNCATE 以外の DDL 操作はサポートされません。これらの DDL 操作を無視するか、タスクがエラーを報告するように設定することができます。
サポートされるデータ型
数値、文字列、日付のデータ型のみがサポートされています。
ネットワーク接続
EMR Serverless StarRocks
ネットワーク接続を確保するには、DataWorks リソースグループの IP アドレスを EMR Serverless StarRocks インスタンスの IP アドレス許可リストに追加する必要があります。
DataWorks リソースグループの IP アドレスについては、「一般的な設定:許可リストに IP アドレスを追加」をご参照ください。
EMR コンソールで、EMR Serverless StarRocks インスタンスの許可リストに IP アドレスを追加できます。

自己管理型 StarRocks
DataWorks リソースグループが、ご利用の StarRocks インスタンスのクエリポート、FE ポート、および BE ポートにアクセスできることを確認してください。デフォルトのポートは 9030、8030、8040 です。
データソースの追加
DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールのパラメーターの説明を参照して、各パラメーターの意味を理解することができます。
ネットワーク環境に基づいて StarRocks の接続モードを選択します:
ユースケース 1:内部ネットワーク接続 (推奨)
内部ネットワーク接続は、パブリックネットワークアクセスを必要とせず、低レイテンシーと高セキュリティを提供します。
ユースケース:ご利用の StarRocks インスタンスと Serverless リソースグループが同じ VPC 内にある場合。
Alibaba Cloud インスタンスモードと接続文字列モードの両方がサポートされています:
ApsaraDB for RDS を選択:同じ VPC 内の StarRocks インスタンスを直接選択します。システムが自動的に接続情報を取得します。
User-created Data Store with Public IP Addresses を選択:インスタンスの内部アドレスまたは IP アドレス、ポート、および Load URL を手動で入力します。
ユースケース 2:パブリックネットワーク接続
パブリックネットワーク経由のデータ転送にはセキュリティリスクが伴います。IP 許可リストや IP ベースの認証などのセキュリティ制御を使用してください。
ユースケース:リージョン間アクセスやオンプレミス環境からなど、パブリックネットワーク経由で StarRocks インスタンスにアクセスする必要がある場合。
接続文字列モードのみがサポートされています。ご利用の StarRocks インスタンスでパブリックネットワークアクセスが有効になっていることを確認してください:
User-created Data Store with Public IP Addresses を選択:インスタンスのパブリックアドレスまたは IP アドレス、ポート、および Load URL を手動で入力します。
デフォルトでは、Serverless リソースグループはパブリックネットワークにアクセスできません。パブリックエンドポイントを使用して StarRocks インスタンスに接続するには、NAT Gateway と Elastic IP アドレス (EIP) をバインドされた VPC に設定して、パブリックネットワークアクセスを有効にする必要があります。また、Serverless リソースグループが、ご利用の StarRocks インスタンスのクエリポート、FE ポート、および BE ポートにアクセスできることを確認する必要があります。デフォルトのポートは 9030、8030、8040 です。
Alibaba Cloud EMR StarRocks Serverless を使用している場合は、ホストアドレス/IP を Internal Endpoint または Public network address に設定し、ポートを [クエリポート] に設定します。
FE:インスタンス詳細ページで FE 情報を取得できます。

データベース:EMR StarRocks Manager を使用してインスタンスに接続した後、SQL Editor または メタデータ管理 で対応するデータベースを表示できます。
説明データベースを作成するには、SQL エディターで直接 SQL コマンドを実行します。
データ同期タスクの開発
同期タスクの設定のエントリーポイントと手順については、以下の設定ガイドをご参照ください。
単一テーブルのバッチ同期
サポートされるデータソース:データ統合モジュールでサポートされるすべてのデータソースタイプ。
手順:詳細については、「コードレス UI の使用」および「コードエディターの使用」をご参照ください。
コードエディターの全パラメーターリストとコード例については、「付録:コードとパラメーター」をご参照ください。
単一テーブルのリアルタイム同期
サポートされるデータソース:Kafka
ガイド:「データベース全体のリアルタイム同期タスクの設定」
データベース全体のバッチ同期
サポートされるデータソース:MySQL
ガイド:「データベース全体のリアルタイム同期タスクの設定」
データベース全体のリアルタイム同期
サポートされるデータソース:MySQL、Oracle、および PolarDB
ガイド:「データベース全体のリアルタイム同期タスクの設定」
付録:コードとパラメーター
コードエディターを使用したバッチ同期タスクの設定
コードエディターを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディターの使用」をご参照ください。以下では、コードエディターを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。
Reader コード例
{
"stepType": "starrocks",
"parameter": {
"selectedDatabase": "didb1",
"datasource": "starrocks_datasource",
"column": [
"id",
"name"
],
"where": "id>100",
"table": "table1",
"splitPk": "id"
},
"name": "Reader",
"category": "reader"
}Reader パラメーター
パラメーター | 説明 | 必須 | デフォルト |
datasource | StarRocks データソースの名前。 | はい | なし |
selectedDatabase | StarRocks データベースの名前。 | いいえ | StarRocks データソースを設定したときに指定したデータベース名。 |
column | 同期対象として設定されたテーブル内の列名のコレクション。StarRocks からデータを読み取る際に SET_VAR ヒントを追加したい場合は、column の最初の列名の前にヒントを追加できます。例えば、同期する列が id で、 | はい | なし |
where | 実際のビジネスシナリオでは、当日のデータを同期するための一般的なフィルターは、where 句を
| いいえ | なし |
table | ソーステーブル。 | はい | なし |
splitPk | StarRocks Reader がデータを抽出する際に、splitPk パラメーターを指定して、splitPk に指定されたフィールドに基づいてデータをシャーディングできます。これにより、同時データ同期タスクが開始され、効率が向上します。テーブルのプライマリキーは通常均等に分散されているため、結果として得られるシャードでのデータホットスポットを防ぐのに役立つため、分割キーとしてテーブルのプライマリキーを使用することを推奨します。 | いいえ | なし |
Writer コード例
{
"stepType": "starrocks",
"parameter": {
"selectedDatabase": "didb1",
"loadProps": {
"row_delimiter": "",
"column_separator": ""
},
"datasource": "starrocks_public",
"column": [
"id",
"name"
],
"loadUrl": [
"1.1.X.X:8030"
],
"table": "table1",
"preSql": [
"truncate table table1"
],
"postSql": [
],
"maxBatchRows": 500000,
"maxBatchSize": 5242880,
"strategyOnError": "exit"
},
"name": "Writer",
"category": "writer"
}Writer パラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | StarRocks データソースの名前。 | はい | なし |
selectedDatabase | StarRocks データベースの名前。 | いいえ | StarRocks データソースを設定したときに指定したデータベース名。 |
loadProps | StarRocks StreamLoad ジョブのリクエストパラメーター。CSV 形式でデータをインポートするためのインポートパラメーターを設定できます。特別な設定が不要な場合は、このパラメーターを
データに \t や \n が含まれている場合は、他の文字を区切り文字として指定する必要があります。特殊文字の使用例を以下に示します: StreamLoad は JSON 形式でのデータインポートもサポートしています。format パラメーターを json に設定できます: JSON 形式では以下のパラメーターを設定できます:
| はい | なし |
column | データを書き込む先の列。 | はい | なし |
loadUrl | StarRocks FrontEnd の IP と HTTP ポート (デフォルトは | はい | なし |
table | ターゲットテーブル。 | はい | なし |
preSql | 同期タスクが開始される前に実行する SQL 文。例えば、TRUNCATE TABLE tablename を使用して、テーブル内の既存のデータをクリアできます。 | いいえ | なし |
postSql | 同期タスクが終了した後に実行する SQL 文。 | いいえ | なし |
maxBatchRows | 書き込みバッチあたりの最大行数。 | いいえ | 500000 |
maxBatchSize | 書き込みバッチあたりの最大データサイズ (バイト単位)。 | いいえ | 5242880 |
strategyOnError | バッチ書き込み中の例外を処理するためのポリシー。 有効な値:
デフォルト値: | いいえ | exit |