MaxCompute カタログを設定すると、フルマネージド Flink でのジョブ開発中に、テーブルスキーマを定義することなく、MaxCompute に保存されているテーブルに直接アクセスできます。このトピックでは、フルマネージド Flink で MaxCompute カタログを作成、表示、使用、削除する方法について説明します。
背景情報
MaxCompute カタログは、MaxCompute サービスにクエリを実行して、MaxCompute に保存されている物理テーブルのスキーマ情報を取得します。これにより、Flink SQL で MaxCompute テーブルのスキーマを宣言することなく、特定のフィールド情報を取得できます。MaxCompute カタログには、次の特徴があります。
MaxCompute カタログのデータベース名は、MaxCompute プロジェクト名に対応します。データベースを切り替えることで、異なる MaxCompute プロジェクトのテーブルを使用できます。
MaxCompute カタログのテーブル名は、MaxCompute に保存されている物理テーブルの名前に対応します。データ型は自動的にマッピングされます。データ定義言語 (DDL) 文を使用して MaxCompute テーブルを手動で登録する必要はありません。これにより、開発効率と精度が向上します。
カタログが提供するテーブルは、Flink SQL ジョブのソーステーブル、ディメンションテーブル、結果テーブルとして直接使用できます。
MaxCompute カタログでテーブルを作成すると、対応する物理テーブルが MaxCompute サービスに自動的に作成されます。データ型も自動的にマッピングされます。これにより、開発効率が向上します。
このトピックでは、以下の方法で MaxCompute カタログを管理する方法について説明します。
制限事項
Ververica Runtime (VVR) 6.0.7 以降を使用する Flink コンピュートエンジンのみが MaxCompute カタログ構成をサポートします。
MaxCompute カタログでは、データベース (MaxCompute のプロジェクト) を作成することはできません。
MaxCompute カタログでは、テーブルスキーマを変更することはできません。
MaxCompute カタログは、CREATE TABLE AS (CTAS) 文をサポートしていません。
MaxCompute カタログの作成
MaxCompute カタログは、UI または SQL コマンドを使用して設定できます。UI を使用する方法を推奨します。
UI の使用 (推奨)
[Data Management] ページに移動します。
Realtime Compute for Apache Flink コンソールにログインし、対象のワークスペースの [操作] 列で [コンソール] をクリックします。
[Data Management] をクリックします。
[カタログの作成] をクリックし、[ODPS] を選択してから [次へ] をクリックします。
パラメーター設定を入力します。
重要カタログ作成後は、以下のパラメーターを変更できません。変更するには、既存のカタログを削除して新しいカタログを作成する必要があります。

パラメーター
説明
タイプ
必須
備考
catalog name
MaxCompute カタログの名前。
String
はい
任意の英語名を入力します。
endpoint
MaxCompute サービス接続用のエンドポイント。
String
はい
サイトの詳細については、「エンドポイント」をご参照ください。
accessId
MaxCompute サービスへのアクセスに使用する Alibaba Cloud アカウントの AccessKey ID。
String
はい
このアカウントには、カタログがアクセスするプロジェクトに対する管理者権限が必要です。
accessKey
MaxCompute サービスへのアクセスに使用する Alibaba Cloud アカウントの AccessKey Secret。
String
はい
なし。
project
カタログのデフォルトデータベースとして使用する MaxCompute プロジェクトの名前。
String
いいえ
このパラメーターを設定しない場合、デフォルトのプロジェクトは `default` です。
説明カタログ作成後、メタデータには入力したプロジェクトと、指定した Alibaba Cloud アカウントによって作成されたプロジェクトが表示されます。
catalog.schema.enabled
Flink カタログの概念におけるデータベースをスキーマレベルにマッピングするかどうかを指定します。
ブール値
いいえ
スキーマは、プロジェクト内のテーブル、リソース、およびユーザー定義関数 (UDF) を分類するために使用されるメカニズムです。1 つのプロジェクトに複数のスキーマを含めることができます。詳細については、「スキーマ操作」をご参照ください。
このパラメーターは、次の値に設定できます。
false (デフォルト):Flink カタログの概念におけるデータベースは、MaxCompute プロジェクトにマッピングされます。これは、スキーマ機能が無効になっている MaxCompute サービス用です。
true:Flink カタログの概念におけるデータベースは、MaxCompute スキーマにマッピングされます。これは、スキーマ機能が有効になっている MaxCompute サービス用です。
[OK] をクリックします。
作成されたカタログは [メタデータ] の下に表示されます。
重要カタログの作成に使用した AccessKey ID と AccessKey Secret にプロジェクトの権限がない場合、プロジェクト情報はメタデータに表示されません。これにより、カタログからの読み取りや書き込みの機能が影響を受けることはありません。
SQL コマンドの使用
[データクエリ] ページのテキストエディターで、MaxCompute カタログを設定するコマンドを入力します。
CREATE CATALOG `<catalogName>` WITH ( 'type' = 'odps', 'endpoint' = '<odpsEndpoint>', 'accessId' = '<aliyunAccountAccessId>', 'accessKey' = '<aliyunAccountAccessKey>', 'project' = '<defaultProject>', 'userAccount' = '<RAMUserAccount>' );次の表にパラメーターを説明します。
パラメーター
説明
タイプ
必須
備考
catalogName
MaxCompute カタログの名前。
String
はい
任意の英語名を入力します。
type
カタログのタイプ。
String
はい
値は `odps` に固定されています。
endpoint
MaxCompute サービス接続用のエンドポイント。
String
はい
サイトの詳細については、「エンドポイント」をご参照ください。
accessId
MaxCompute サービスへのアクセスに使用する Alibaba Cloud アカウントの AccessKey ID。
String
はい
このアカウントには、カタログがアクセスするプロジェクトに対する管理者権限が必要です。
accessKey
MaxCompute サービスへのアクセスに使用する Alibaba Cloud アカウントの AccessKey Secret。
String
はい
なし。
project
カタログのデフォルトデータベースとして使用する MaxCompute プロジェクトの名前。
String
いいえ
このパラメーターを設定しない場合、デフォルトのプロジェクトは `default` です。
userAccount
Alibaba Cloud アカウントまたは Resource Access Management (RAM) ユーザーの名前。
String
いいえ
AccessKey が Alibaba Cloud アカウントではなく RAM ユーザーに属しており、その RAM ユーザーが Alibaba Cloud アカウント配下の一部のプロジェクトに対してのみ管理者権限を持つ場合、このパラメーターをアカウント名に設定します。例:
RAM$[<account_name>:]<RAM_name>。これにより、MaxCompute カタログには、そのアカウントが権限を持つプロジェクトのリストのみが表示されます。MaxCompute のユーザー権限管理の詳細については、「ユーザー計画と管理」をご参照ください。
カタログを作成するコードを選択し、行番号の左側にある [実行] をクリックします。

MaxCompute カタログの表示
UI の使用 (推奨)
[Data Management] ページに移動します。
対象のワークスペースの [操作] 列で [コンソール] をクリックします。
[Data Management] をクリックします。
[カタログリスト] ページで、[カタログ名] と [タイプ] を表示できます。
[表示] をクリックして、対象のカタログ内のデータベースとテーブルを表示します。
SQL コマンドの使用
[データクエリ] ページのテキストエディターで、次のコマンドを入力します。
DESCRIBE `<catalogName>`.`<projectName>`.`<tableName>`;パラメーター
説明
catalogName
MaxCompute カタログの名前。
projectName
MaxCompute のプロジェクト名。
tableName
MaxCompute に保存されている物理テーブルの名前。
カタログを表示するコードを選択し、行番号の左側にある [実行] をクリックします。
コマンドが正常に実行されると、エディターの下の結果セクションに、Flink の MaxCompute 物理テーブルのスキーマ情報が表示されます。
MaxCompute カタログの使用
カタログを使用した MaxCompute 物理テーブルの作成
Flink SQL DDL 文を使用して MaxCompute カタログにテーブルを作成すると、対応する MaxCompute プロジェクトに物理テーブルが自動的に作成されます。Flink のデータ型は、自動的に MaxCompute のデータ型に変換されます。非パーティションテーブルとパーティションテーブルを作成できます。
非パーティションテーブルの作成例:
CREATE TABLE `<catalogName>`.`<projectName>`.`<tableName>` (
f0 INT,
f1 BIGINT,
f2 DOUBLE,
f3 STRING
);文が実行された後、MaxCompute の対応するプロジェクトでテーブルを表示できます。指定された名前の非パーティションテーブルが作成され、その列名とデータ型は Flink DDL 文のものに対応します。
パーティションテーブルの作成例:
CREATE TABLE `<catalogName>`.`<projectName>`.`<tableName>` (
f0 INT,
f1 BIGINT,
f2 DOUBLE,
f3 STRING,
ds STRING
) PARTITIONED BY (ds);Flink DDL 文のスキーマの末尾にパーティションキー列を追加し、PARTITIONED BY 句でパーティションキー列名を宣言します。文が実行された後、対応する MaxCompute プロジェクトでテーブルを表示します。指定された名前のパーティションテーブルが作成されます。その通常の列は f0、f1、f2、f3 で、パーティションキー列は ds です。
MaxCompute の列名はすべて小文字ですが、Flink の列名は大文字と小文字を区別します。DDL 文の列名に大文字が含まれている場合、自動的に小文字に変換されます。DDL 文に小文字に変換された後、同じ名前の列が複数含まれている場合、エラーが報告されます。
MaxCompute カタログテーブルからのデータ読み取り
MaxCompute カタログは、MaxCompute サービスから物理テーブルのスキーマを取得できます。したがって、Flink で対応するスキーマを宣言することなく、直接データを読み取ることができます。例:
SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`;パラメーターが宣言されていない場合、デフォルトではすべてのパーティションが読み取られます。特定のパーティションを読み取るか、増分ソーステーブルモードを使用するには、SQL コメントでパラメーターを宣言できます。パラメーター設定の詳細については、「MaxCompute」をご参照ください。例:
特定のパーティションの読み取り:
SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds=230613') */;増分ソーステーブルモードの使用:
SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('startPartition' = 'ds=230613') */;ディメンションテーブルモードの使用:
SELECT * FROM `<anotherTable>` AS l LEFT JOIN
`<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'max_pt()', 'cache' = 'ALL') */
FOR SYSTEM_TIME AS OF l.proc_time AS r
ON l.id = r.id;MaxCompute でサポートされている他のソーステーブルおよびディメンションテーブルのパラメーターもこの方法で設定できます。ただし、MaxCompute カタログはウォーターマーク情報を保存しないことに注意してください。ソーステーブルからデータを読み取る際にウォーターマークを指定するには、CREATE TABLE ... LIKE ... 文を使用します。例:
CREATE TABLE `<newTable>` ( WATERMARK FOR ts AS ts )
LIKE `<catalogName>`.`<projectName>`.`<tableName>`;`ts` は、MaxCompute 物理テーブルの DATETIME 型の列です。この型は Flink でイベント時間として設定でき、ウォーターマークを追加できます。テーブルが作成された後、`newTable` から読み取られるすべてのデータにウォーターマークが付きます。
MaxCompute カタログテーブルへのデータ書き込み
MaxCompute カタログは、静的または動的パーティションモードでのデータ書き込みをサポートしています。詳細については、「MaxCompute」をご参照ください。たとえば、MaxCompute 物理テーブルにサブパーティション `ds` と `hh` がある場合、次の文を使用してデータを書き込むことができます。
-- 静的パーティションへの書き込み
INSERT INTO `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds=20231024,hh=09') */
SELECT <otherColumns>, '20231024', '09' FROM `<anotherTable>`;
-- 動的パーティションへの書き込み
INSERT INTO `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds,hh') */
SELECT <otherColumns>, ds, hh FROM `<anotherTable>`;SELECT 文では、パーティションキー列は、パーティションレベルの順序で他の通常の列の後に配置する必要があります。
MaxCompute カタログの削除
MaxCompute カタログを削除しても、実行中のジョブには影響しません。ただし、このカタログのテーブルを使用するジョブは、公開または再起動時に「テーブルが見つかりません」というエラーを報告します。操作には注意が必要です。
UI メソッド
[Data Management] ページに移動します。
対象のワークスペースの [操作] 列で [コンソール] をクリックします。
[Data Management] をクリックします。
[カタログリスト] ページで、削除するカタログを見つけ、[操作] 列の [削除] をクリックします。
表示されるダイアログボックスで、[削除] をクリックします。
説明削除が完了すると、対象のカタログは左側の [メタデータ] エリアから削除されます。
SQL コマンドの使用
[データクエリ] ページのテキストエディターで、次のコマンドを入力します。
DROP CATALOG `<catalogName>`;`<catalogName>` は、削除する対象の MaxCompute カタログの名前です。
警告MaxCompute カタログを削除しても、実行中のジョブには影響しません。ただし、この操作はまだオンラインになっていないジョブや、一時停止して再開する必要があるジョブに影響します。操作には注意が必要です。
削除カタログコマンドを選択し、右クリックして [実行] を選択します。
左側の [メタデータ] エリアで、対象のカタログが削除されたことを確認します。
MaxCompute と Flink 間のデータ型マッピング
MaxCompute がサポートするデータ型の詳細については、「データ型 2.0」をご参照ください。
MaxCompute から Flink へ
既存の MaxCompute 物理テーブルからデータを読み取る際、フィールドの MaxCompute データ型は、次の表に示すように Flink データ型にマッピングされます。
MaxCompute の型 | Flink の型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
Flink から MaxCompute へ
Flink DDL 文を使用してカタログ内に MaxCompute テーブルを作成する際、DDL 文内のフィールドの Flink データ型は、次の表に示すように MaxCompute データ型にマッピングされます。
Flink の型 | MaxCompute の型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR / STRING | STRING |
BINARY | BINARY |
VARBINARY / BYTES | BINARY |
DATE | DATE |
TIMESTAMP(n<=3) | DATETIME |
TIMESTAMP(3<n<=9) | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |