Doris データソースは、Doris データベースからの読み取りと書き込みのための双方向チャネルを提供するデータハブです。これにより、大量のデータを迅速に処理できます。このトピックでは、DataWorks における Doris のデータ同期機能について説明します。
サポートされているフィールドタイプ
Doris のバージョンによって、サポートされるデータの型と集計モデルは異なります。各 Doris バージョンのフィールドタイプの完全なリストについては、Doris 公式ドキュメントをご参照ください。次の表に、主要な Doris フィールドタイプのサポート状況を示します。
タイプ | サポートされているモデル | Doris バージョン |
SMALLINT | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
INT | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
BIGINT | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
LARGEINT | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
FLOAT | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
DOUBLE | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
DECIMAL | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
DECIMALV3 | Aggregate,Unique,Duplicate | 1.2.1+, 2.x |
DATE | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
DATETIME | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
DATEV2 | Aggregate,Unique,Duplicate | 1.2.x, 2.x |
DATATIMEV2 | Aggregate,Unique,Duplicate | 1.2.x, 2.x |
CHAR | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
VARCHAR | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
STRING | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
VARCHAR | Aggregate,Unique,Duplicate | 1.1.x, 1.2.x, 2.x |
ARRAY | Duplicate | 1.2.x, 2.x |
JSONB | Aggregate,Unique,Duplicate | 1.2.x, 2.x |
HLL | Aggregate | 0.x.x, 1.1.x, 1.2.x, 2.x |
BITMAP | Aggregate | 0.x.x, 1.1.x, 1.2.x, 2.x |
QUANTILE_STATE | Aggregate | 1.2.x, 2.x |
前提条件
DataWorks でデータを同期する前に、このトピックで説明されているように Doris 環境を準備する必要があります。これにより、DataWorks で Doris データ同期タスクを正しく設定および実行できます。以下のセクションでは、必要な準備について説明します。
アカウントの作成と権限の設定
データウェアハウスに接続するには、ログインアカウントを作成し、パスワードを設定する必要があります。デフォルトの root ユーザーでログインする場合、このユーザーには初期パスワードが設定されていないため、パスワードを設定する必要があります。Doris で次の SQL 文を実行してパスワードを設定できます:
SET PASSWORD FOR 'root' = PASSWORD('your_password')Doris のネットワーク接続の設定
StreamLoad を使用してデータをインポートするには、フロントエンド (FE) ノードのプライベートエンドポイントにアクセスする必要があります。FE ノードのパブリックエンドポイントにアクセスすると、バックエンド (BE) ノードのプライベート IP アドレスにリダイレクトされます (データ操作に関するよくある質問)。したがって、Doris と、使用する サーバーレスリソースグループまたは Data Integration 専用リソースグループとの間にネットワーク接続を確立する必要があります。これにより、プライベートエンドポイント経由でのアクセスが可能になります。ネットワーク接続の確立方法の詳細については、「ネットワーク接続ソリューション」をご参照ください。
データソースの追加
DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターの説明を表示して、各パラメーターの意味を理解できます。
以下に、Doris データソースの設定パラメーターを説明します:
[JdbcUrl]:IP アドレス、ポート番号、データベース、接続パラメーターを含む JDBC 接続文字列を入力します。パブリック IP アドレスとプライベート IP アドレスの両方がサポートされています。パブリック IP アドレスを使用する場合は、Data Integration リソースグループが Doris が存在するホストにアクセスできることを確認してください。
[FE endpoint]:FE ノードの IP アドレスとポートを入力します。クラスターに複数の FE ノードがある場合は、各ノードの IP アドレスとポートを入力できます。コンマ (,) で区切ります。例:
ip1:port1,ip2:port2。接続性をテストすると、指定されたすべての FE endpoint がテストされます。[Username]:Doris データベースのユーザー名を入力します。
[Password]:ユーザー名に対応するパスワードを入力します。
データ同期タスクの開発
同期タスクの設定のエントリポイントと手順については、以下の設定ガイドをご参照ください。
手順の詳細については、「コードレス UI でのタスクの設定」および「コードエディタでのタスクの設定」をご参照ください。
コードエディタのパラメーターの完全なリストとスクリプトデモについては、「付録:スクリプトのデモとパラメーター」をご参照ください。
付録:スクリプトのデモとパラメーター
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディタでのタスクの設定」をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。
Reader スクリプトのデモ
{
"type": "job",
"version": "2.0",// バージョン番号。
"steps": [
{
"stepType": "doris",// プラグイン名。
"parameter": {
"column": [// 列名。
"id"
],
"connection": [
{
"querySql": [
"select a,b from join1 c join join2 d on c.id = d.id;"
],
"datasource": ""// データソース名。
}
],
"where": "",// フィルター条件。
"splitPk": "",// シャードキー。
"encoding": "UTF-8"// エンコード形式。
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"// エラーレコード数。
},
"speed": {
"throttle": true,// throttle を false に設定すると、mbps パラメーターは有効にならず、レート制限がないことを示します。throttle を true に設定すると、レート制限が適用されます。
"concurrent": 1,// 同時実行ジョブ数。
"mbps": "12"// レート制限。1 mbps は 1 MB/s に相当します。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}Reader スクリプトのパラメーター
スクリプトパラメーター名 | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加されたデータソースの名前と同じである必要があります。 | はい | なし |
table | データを同期するソーステーブルの名前。データ統合タスクは、1 つのテーブルからのみデータを読み取ることができます。 次の例は、table パラメーターを使用して範囲を設定する高度な使用法を示しています:
説明 タスクは、一致するすべてのテーブルからデータを読み取ります。具体的には、column パラメーターで指定された列からデータを読み取ります。テーブルまたは指定された列が存在しない場合、タスクは失敗します。 | はい | なし |
column | データを同期するソーステーブルの列。JSON 配列を使用してフィールド情報を記述します。デフォルトでは、すべての列が使用されます。例:
| はい | なし |
splitPk | Doris Reader がデータを抽出する際に、splitPk パラメーターを指定すると、指定されたフィールドに基づいてデータがシャーディングされます。これにより、同時データ同期タスクが可能になり、効率が向上します。
| いいえ | なし |
where | フィルター条件。ビジネスシナリオでは、当日のデータのみを同期したい場合があります。この場合、where 条件を
| いいえ | なし |
querySql (このパラメーターはコードエディタでのみ使用可能です。) | 一部のビジネスシナリオでは、where パラメーターだけではフィルター条件を記述するのに不十分な場合があります。このパラメーターを使用して、フィルター SQL 文をカスタマイズできます。このパラメーターを設定すると、データ同期システムはテーブル、列、および splitPk パラメーターを無視し、このパラメーターで指定された SQL 文を使用してデータをフィルターします。たとえば、データを同期する前に複数のテーブルを結合するには、 説明 querySql パラメーターは大文字と小文字を区別します。たとえば、querysql と入力した場合、パラメーターは有効になりません。 | いいえ | なし |
Writer スクリプトのデモ
{
"stepType": "doris",// プラグイン名。
"parameter":
{
"postSql":// データ同期タスクの実行後に実行される SQL 文。
[],
"preSql":
[],// データ同期タスクの実行前に実行される SQL 文。
"datasource":"doris_datasource",// データソース名。
"table": "doris_table_name",// テーブル名。
"column":
[
"id",
"table_id",
"table_no",
"table_name",
"table_status"
],
"loadProps":{
"column_separator": "\\x01",// CSV 形式の列区切り文字。
"line_delimiter": "\\x02"// CSV 形式の行区切り文字。
}
},
"name": "Writer",
"category": "writer"
}Writer スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。コードエディタでデータソースを追加できます。このパラメーターの値は、追加されたデータソースの名前と同じである必要があります。 | はい | なし |
table | データを同期する宛先テーブルの名前。 | はい | なし |
column | データを書き込む宛先テーブルの列。列名をコンマ (,) で区切ります。例: | はい | なし |
preSql | データ同期タスクの実行前に実行される SQL 文。コードレス UI では 1 つの SQL 文のみ実行できます。コードエディタでは複数の SQL 文を実行できます。たとえば、タスク実行前にテーブルから古いデータをクリアできます。 | いいえ | なし |
postSql | データ同期タスクの実行後に実行される SQL 文。コードレス UI では 1 つの SQL 文のみ実行できます。コードエディタでは複数の SQL 文を実行できます。たとえば、タスク実行後にタイムスタンプを追加できます。 | いいえ | なし |
maxBatchRows | インポートされるデータの各バッチの最大行数。このパラメーターと batchSize は、各バッチのインポートサイズを制御します。バッチ内のデータがいずれかのしきい値に達すると、そのバッチのインポートが開始されます。 | いいえ | 500000 |
batchSize | インポートされるデータの各バッチの最大データ量。このパラメーターと maxBatchRows は、各バッチのインポートサイズを制御します。バッチ内のデータがいずれかのしきい値に達すると、そのバッチのインポートが開始されます。 | いいえ | 104857600 |
maxRetries | バッチインポートが失敗した後の再試行回数。 | いいえ | 3 |
labelPrefix | アップロードされる各ファイルのラベルプレフィックス。最終的なラベルは、 | いいえ | datax_doris_writer_ |
loadProps | StreamLoad のリクエストパラメーター。このパラメーターは主に、インポートされるデータのフォーマットを設定するために使用されます。デフォルトでは、データは CSV 形式でインポートされます。loadProps パラメーターを設定しない場合、デフォルトの CSV 形式が使用されます。列区切り文字は JSON 形式でデータをインポートする場合は、次の設定を使用します。 | いいえ | なし |
集計タイプのスクリプト (Doris Writer による集計タイプへの書き込み)
Doris Writer は、集計タイプのデータの書き込みをサポートしています。ただし、コードエディタで追加の設定を行う必要があります。次の例は、必要な設定を示しています。
たとえば、次の Doris テーブルを考えます。uuid 列は bitmap タイプ (集計タイプ) で、sex 列は HLL タイプ (集計タイプ) です。
CREATE TABLE `example_table_1` (
`user_id` int(11) NULL,
`date` varchar(10) NULL DEFAULT "10.5",
`city` varchar(10) NULL,
`uuid` bitmap BITMAP_UNION NULL, -- 集計タイプ
`sex` HLL HLL_UNION -- 集計タイプ
) ENGINE=OLAP AGGREGATE KEY(`user_id`, `date`,`city`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`user_id`) BUCKETS 32テーブルに次の生データを挿入します:
user_id,date,city,uuid,sex
0,T0S4Pb,abc,43,'54'
1,T0S4Pd,fsd,34,'54'
2,T0S4Pb,fa3,53,'64'
4,T0S4Pb,fwe,87,'64'
5,T0S4Pb,gbr,90,'56'
2,iY3GiHkLF,234,100,'54'Doris Writer を使用して集計タイプのデータを書き込む場合、writer.parameter.column で列を指定し、writer.parameter.loadProps.columns で集計関数を設定する必要があります。たとえば、uuid 列には bitmap_hash 集計関数を、sex 列には hll_hash 集計関数を使用できます。
次のコードはスクリプトの例です:
{
"stepType": "doris",// プラグイン名。
"writer":
{
"parameter":
{
"column":
[
"user_id",
"date",
"city",
"uuid",// bitmap 集計タイプ
"sex"// HLL 集計タイプ
],
"loadProps":
{
"format": "csv",
"column_separator": "\\x01",
"line_delimiter": "\\x02",
"columns": "user_id,date,city,k1,uuid=bitmap_hash(k1),k2,sex=hll_hash(k2)"// 集計関数を指定する必要があります。
},
"postSql":
[
"select count(1) from example_tbl_3"
],
"preSql":
[],
"datasource":"doris_datasource",// データソース名。
"table": "doris_table_name",// テーブル名。
}
"name": "Writer",
"category": "writer"
}
}