Doris データソースを使用すると、Doris データベースからデータを読み取ったり、Doris データベースにデータを書き込んだりして、大規模なデータ処理を行うことができます。このトピックでは、DataWorks を使用して Doris とデータを同期する方法について説明します。
サポートされるデータの型
Doris はバージョンによって、サポートされるデータ型と集約モデルが異なります。各 Doris バージョンでサポートされているすべてのデータ型の詳細については、「Doris の公式ドキュメント」をご参照ください。次の表では、主にサポートされているデータ型について説明します。
データの型 | サポートされるモデル | Doris バージョン |
SMALLINT | Aggregate、Unique、Duplicate | 0.x.x、1.1.x、1.2.x、2.x |
INT | Aggregate、Unique、Duplicate | 0.x.x、1.1.x、1.2.x、2.x |
BIGINT | Aggregate、Unique、Duplicate | 0.x.x、1.1.x、1.2.x、2.x |
LARGEINT | Aggregate、Unique、Duplicate | 0.x.x、1.1.x、1.2.x、2.x |
FLOAT | Aggregate、Unique、Duplicate | 0.x.x、1.1.x、1.2.x、2.x |
DOUBLE | Aggregate、Unique、Duplicate | 0.x.x、1.1.x、1.2.x、2.x |
DECIMAL | Aggregate、Unique、Duplicate | 0.x.x、1.1.x、1.2.x、2.x |
DECIMALV3 | Aggregate、Unique、Duplicate | 1.2.1 以降のバージョン、2.x |
DATE | Aggregate、Unique、Duplicate | 0.x.x、1.1.x、1.2.x、2.x |
DATETIME | Aggregate、Unique、Duplicate | 0.x.x、1.1.x、1.2.x、2.x |
DATEV2 | Aggregate、Unique、Duplicate | 1.2.x、2.x |
DATATIMEV2 | Aggregate、Unique、Duplicate | 1.2.x、2.x |
CHAR | Aggregate、Unique、Duplicate | 0.x.x、1.1.x、1.2.x、2.x |
VARCHAR | Aggregate、Unique、Duplicate | 0.x.x、1.1.x、1.2.x、2.x |
STRING | Aggregate、Unique、Duplicate | 0.x.x、1.1.x、1.2.x、2.x |
VARCHAR | Aggregate、Unique、Duplicate | 1.1.x、1.2.x、2.x |
ARRAY | Duplicate | 1.2.x、2.x |
JSONB | Aggregate、Unique、Duplicate | 1.2.x、2.x |
HLL | Aggregate | 0.x.x、1.1.x、1.2.x、2.x |
BITMAP | Aggregate | 0.x.x、1.1.x、1.2.x、2.x |
QUANTILE_STATE | Aggregate | 1.2.x、2.x |
データ同期の前に ApsaraDB for OceanBase 環境を準備する
DataWorks を使用して Doris データソースにデータを同期する前に、Doris 環境を準備する必要があります。これにより、データ同期タスクを期待どおりに設定し、Doris データソースにデータを同期できるようになります。以下では、Doris データソースへのデータ同期のために Doris 環境を準備する方法について説明します。
アカウントの作成と権限の付与
後続の操作のために Doris データベースにログインするためのアカウントを作成する必要があります。Doris データベースへの後続の接続のために、アカウントのパスワードを指定する必要があります。Doris のデフォルトの root ユーザーを使用して Doris データベースにログインする場合は、root ユーザーのパスワードを指定する必要があります。デフォルトでは、root ユーザーにはパスワードがありません。Doris で SQL ステートメントを実行してパスワードを指定できます:
SET PASSWORD FOR 'root' = PASSWORD('Password')Doris のネットワーク接続の設定
StreamLoad メソッドを使用してデータを書き込むには、FE ノードのプライベート IP アドレスにアクセスする必要があります。FE ノードのパブリック IP アドレスにアクセスすると、BE ノードのプライベート IP アドレスにリダイレクトされます。リダイレクトの詳細については、「データ操作の問題」をご参照ください。この場合、リソースグループが内部ネットワーク経由でデータソースにアクセスできるようにするには、データソースと サーバーレスリソースグループ またはデータ統合専用リソースグループとの間にネットワーク接続を確立する必要があります。Doris データベースとリソースグループの間にネットワーク接続を確立する方法の詳細については、「ネットワーク接続ソリューション」をご参照ください。
データソースの追加
DataWorks で同期タスクを開発する前に、データソース管理の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターの説明を表示して、各パラメーターの意味を理解できます。
Doris データソースの以下の設定項目に関する設定要件に注意してください:
JdbcUrl:JDBC 接続文字列を入力します。これには、IP アドレス、ポート番号、データベース、および接続パラメーターが含まれます。パブリック IP アドレスとプライベート IP アドレスの両方がサポートされています。パブリック IP アドレスを使用する場合は、Data Integration リソースグループが Doris インスタンスが配置されているホストにアクセスできることを確認してください。
FE endpoint:FE ノードの IP アドレスとポートを入力します。クラスターに複数の FE ノードがある場合は、
ip1:port1,ip2:port2のように、カンマで区切って複数のエンドポイントを入力できます。接続をテストする際、DataWorks は指定されたすべての FE エンドポイントへの接続性をテストします。Username:Doris データベースへのアクセスに使用するユーザー名を入力します。
Password:ユーザー名に対応するパスワードを入力します。
データ同期タスクの開発
同期タスクを設定するためのエントリーポイントと手順については、以下の設定ガイドをご参照ください。
設定手順の詳細については、コードレス UI を使用したバッチ同期タスクの設定およびコードエディタを使用したバッチ同期タスクの設定をご参照ください。
コードエディタを使用してバッチ同期タスクを設定する際に設定されるすべてのパラメーターと実行されるコードについては、付録:コードとパラメーターをご参照ください。
付録:コードとパラメーター
コードエディタを使用したバッチ同期タスクの設定
コードエディタを使用してバッチ同期タスクを設定する場合、統一されたスクリプトフォーマットの要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、コードエディタの使用をご参照ください。以下では、コードエディタを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。
Reader スクリプトのデモ
{
"type": "job",
"version": "2.0",// バージョン番号。
"steps": [
{
"stepType": "doris",// プラグイン名。
"parameter": {
"column": [// 列名。
"id"
],
"connection": [
{
"querySql": [
"select a,b from join1 c join join2 d on c.id = d.id;"
],
"datasource": ""// データソース名。
}
],
"where": "",// WHERE 句。
"splitPk": "",// シャードキー。
"encoding": "UTF-8"// エンコード形式。
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"// 許容されるダーティデータレコードの最大数。
},
"speed": {
"throttle": true,// 速度制限を有効にするかどうかを指定します。false は速度制限が無効であることを示し、true は速度制限が有効であることを示します。mbps パラメーターは、throttle パラメーターが true に設定されている場合にのみ有効になります。
"concurrent": 1,// 最大並列スレッド数。
"mbps": "12"// 最大転送レート。単位:MB/s。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}Reader スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。追加したデータソースの名前と同じである必要があります。コードエディタを使用してデータソースを追加できます。 | はい | デフォルト値なし |
table | データを読み取るテーブルの名前。各同期タスクは、1 つのテーブルからのみデータを同期するために使用できます。 シャードテーブルの場合、table パラメーターを使用して、データを読み取るパーティションを指定できます。例:
説明 Doris Reader は、table パラメーターで指定されたパーティション内の column パラメーターで指定された列からデータを読み取ります。指定されたパーティションまたは列が存在しない場合、同期タスクは失敗します。 | はい | デフォルト値なし |
column | 同期する列。列は JSON 配列で記述します。デフォルトでは、すべての列が同期されます。たとえば、
| はい | デフォルト値なし |
splitPk | 読み取りパフォーマンスを向上させるために、splitPk パラメーターを使用してシャードキーを指定できます。Data Integration はこのキーを使用してデータをパーティション分割し、同時実行タスクを実行します。
| いいえ | デフォルト値なし |
where | フィルター条件。多くのビジネスシナリオでは、現在の日付のデータのみを同期したい場合があります。これを行うには、where 条件を
| いいえ | デフォルト値なし |
querySql (高度なパラメーター、コードエディタでのみ利用可能) | 一部のビジネスシナリオでは、where パラメーターだけでは目的のフィルター条件を記述するのに不十分な場合があります。このパラメーターを使用して、フィルタリング用のカスタム SQL クエリを指定できます。このパラメーターを設定すると、Data Integration は table、column、where、および splitPk パラメーターを無視し、カスタムクエリを使用してデータを取得します。たとえば、同期前に複数のテーブルを結合するには、 説明 querySql パラメーターの名前は、大文字と小文字を区別します。たとえば、querysql は有効になりません。 | いいえ | デフォルト値なし |
Writer スクリプトのデモ
{
"stepType": "doris",// プラグイン名。
"parameter":
{
"postSql":// 同期タスクの実行後に実行する SQL ステートメント。
[],
"preSql":
[],// 同期タスクの実行前に実行する SQL ステートメント。
"datasource":"doris_datasource",// データソース名。
"table": "doris_table_name",// テーブル名。
"column":
[
"id",
"table_id",
"table_no",
"table_name",
"table_status"
],
"loadProps":{
"column_separator": "\\x01",// CSV 形式のデータの列区切り文字。
"line_delimiter": "\\x02"// CSV 形式のデータの行区切り文字。
}
},
"name": "Writer",
"category": "writer"
}Writer スクリプトのパラメーター
パラメーター | 説明 | 必須 | デフォルト値 |
datasource | データソースの名前。追加したデータソースの名前と同じである必要があります。コードエディタを使用してデータソースを追加できます。 | はい | デフォルト値なし |
table | データを読み取るテーブルの名前。 | はい | デフォルト値なし |
column | データを書き込む宛先列。列名を配列で指定します。例: | はい | デフォルト値なし |
preSql | データ同期タスクが開始される前に実行する SQL ステートメント。コードレス UI では、1 つの SQL ステートメントしか実行できません。コードエディタでは、複数の SQL ステートメントを実行できます。たとえば、これらのステートメントを使用して、テーブルから既存のデータをクリアできます。 | いいえ | デフォルト値なし |
postSql | 同期タスクの実行後に実行する SQL ステートメント。たとえば、このパラメーターをタイムスタンプを追加するために使用される SQL ステートメントに設定できます。コードレス UI では 1 つの SQL ステートメントしか実行できず、コードエディタでは複数の SQL ステートメントを実行できます。 | いいえ | デフォルト値なし |
maxBatchRows | 一度に宛先テーブルに書き込むことができる最大行数。このパラメーターと batchSize パラメーターの両方で、一度に宛先テーブルに書き込むことができるデータレコードの数が決まります。キャッシュされたデータがいずれかのパラメーターの値に達するたびに、ライターは宛先テーブルへのデータの書き込みを開始します。 | いいえ | 500000 |
batchSize | 一度に宛先テーブルに書き込むことができるデータの最大量。このパラメーターと maxBatchRows パラメーターの両方で、一度に宛先テーブルに書き込むことができるデータレコードの数が決まります。キャッシュされたデータがいずれかのパラメーターの値に達するたびに、ライターは宛先テーブルへのデータの書き込みを開始します。 | いいえ | 104857600 |
maxRetries | 一度に複数のデータレコードを宛先テーブルに書き込むのに失敗した後に許可される最大再試行回数。 | いいえ | 3 |
labelPrefix | アップロードされる各ファイルバッチのラベルプレフィックス。最終的なラベルは | いいえ | datax_doris_writer_ |
loadProps | StreamLoad のリクエストパラメーターで、主にインポートデータ形式を設定するために使用されます。デフォルトでは、データは CSV 形式でインポートされます。loadProps パラメーターが設定されていない場合、デフォルトの CSV 形式が使用され、列区切り文字として JSON 形式でデータを書き込む場合は、次の設定を使用します: | いいえ | デフォルト値なし |
集約型のデータの書き込み
Doris Writer を使用して、特定の集約型の列にデータを書き込むことができます。Doris Writer を使用して特定の集約型の列にデータを書き込む場合は、追加のパラメーターを設定する必要があります。
たとえば、次の Doris テーブルでは、uuid は bitmap 型 (集約型) であり、sex は HLL 型 (集約型) です。
CREATE TABLE `example_table_1` (
`user_id` int(11) NULL,
`date` varchar(10) NULL DEFAULT "10.5",
`city` varchar(10) NULL,
`uuid` bitmap BITMAP_UNION NULL, -- 集約型
`sex` HLL HLL_UNION -- 集約型
) ENGINE=OLAP AGGREGATE KEY(`user_id`, `date`,`city`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`user_id`) BUCKETS 32テーブルに生データを挿入します:
user_id,date,city,uuid,sex
0,T0S4Pb,abc,43,'54'
1,T0S4Pd,fsd,34,'54'
2,T0S4Pb,fa3,53,'64'
4,T0S4Pb,fwe,87,'64'
5,T0S4Pb,gbr,90,'56'
2,iY3GiHkLF,234,100,'54'Doris Writer を使用して集約型の列にデータを書き込む場合、writer.parameter.column で列を指定し、writer.parameter.loadProps.columns で集計関数を設定する必要があります。たとえば、uuid 列には集計関数 bitmap_hash を使用し、sex 列には集計関数 hll_hash を使用する必要があります。
サンプルコード:
{
"stepType": "doris",// プラグイン名。
"writer":
{
"parameter":
{
"column":
[
"user_id",
"date",
"city",
"uuid",// 集約型は bitmap です。
"sex"// 集約型は HLL です。
],
"loadProps":
{
"format": "csv",
"column_separator": "\\x01",
"line_delimiter": "\\x02",
"columns": "user_id,date,city,k1,uuid=bitmap_hash(k1),k2,sex=hll_hash(k2)"// 集計関数を指定する必要があります。
},
"postSql":
[
"select count(1) from example_tbl_3"
],
"preSql":
[],
"datasource":"doris_datasource",// データソース名。
"table": "doris_table_name",// テーブル名。
}
"name": "Writer",
"category": "writer"
}
}