Print コネクタは、各行を TaskManager の標準出力または標準エラー出力ストリームに書き込みます。開発中に中間結果を検査したり、本番 sink を構成せずに最終出力を検証したりするために使用します。
Print コネクタの出力は、Flink Web UI コンソールではなく、TaskManager ログに表示されます。出力を確認するには、TaskManager のログレベルを INFO に設定してください。
サポートされている機能
| 項目 | 説明 |
|---|---|
| テーブルタイプ | 結果テーブル、データインジェスト sink |
| 実行モード | バッチモード、ストリーミングモード |
| データ形式 | N/A |
| メトリック | N/A |
| API タイプ | SQL API、データインジェスト YAML API |
| 結果テーブルでのデータ更新または削除 | サポートされています |
ユースケース
-
開発テスト:既存の sink とともに Print 結果テーブルを追加し、本番システムに書き込む前に正しい行が流れていることを確認します。
SQL
構文
CREATE TABLE print_table (
a INT,
b VARCHAR
) WITH (
'connector' = 'print',
'logger' = 'true'
);
列を再定義せずに既存のテーブルスキーマをミラーリングするには、LIKE 句を使用します。
CREATE TABLE print_table WITH ('connector' = 'print')
LIKE table_source (EXCLUDING ALL)
パラメーター
| パラメーター | 説明 | データ型 | 必須 | デフォルト値 |
|---|---|---|---|---|
connector |
コネクタタイプ。print に設定します。 |
String | はい | — |
logger |
コンソールに出力を表示するかどうか。 | ブール値 | いいえ | false |
print-identifier |
各出力行の前に付加されるラベル。複数の Print sink がアクティブな場合に、結果を区別するために使用します。 | String | いいえ | — |
sink.parallelism |
Print sink の並列度。 | Int | いいえ | アップストリームと同じ |
データインジェスト
データインジェスト YAML ジョブの場合、values コネクタを print.enabled: true とともに使用して、出力をログまたは出力ファイルにルーティングします。
構文
source:
type: xxx
sink:
type: values
name: Values Sink
print.enabled: true
パラメーター
| パラメーター | 説明 | データ型 | 必須 | デフォルト値 |
|---|---|---|---|---|
type |
sink コネクタタイプ。values に設定します。 |
STRING | はい | — |
name |
sink の表示名。 | STRING | いいえ | — |
print.enabled |
print 動作を有効にします。true に設定します。 |
BOOLEAN | はい | — |
sink.print.standard-error |
出力を stdout の代わりに stderr に書き込むかどうか。 | BOOLEAN | いいえ | false |
sink.print.logger |
コンソールに出力を表示するかどうか。 | BOOLEAN | いいえ | false |
sink.print.limit |
プリントするレコードの最大数。 | LONG | いいえ | 2000 |
materialized.in.memory |
バイナリログイベントをメモリに永続化するかどうか。 | BOOLEAN | いいえ | false |
error.on.schema.change |
スキーマ変更が発生したときにエラーを発生させるかどうか。 | BOOLEAN | いいえ | false |
例
SQL 結果テーブル
次の例は、ソーステーブルから読み取り、すべての行を Print sink に書き込みます。出力を確認するには、TaskManager ログを確認してください。
CREATE TEMPORARY TABLE table_source (
name VARCHAR,
score BIGINT
) WITH (
...
);
CREATE TEMPORARY TABLE print_sink (
name VARCHAR,
score BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO print_sink SELECT * FROM table_source;
データインジェスト sink
次の例は、MySQL からデータをインジェストし、すべての行を出力ファイルにプリントします。
source:
type: mysql
name: MySQL Source
hostname: ${mysql.hostname}
port: ${mysql.port}
username: ${mysql.username}
password: ${mysql.password}
tables: ${mysql.source.table}
server-id: 7601-7604
sink:
type: values
name: Values Sink
print.enabled: true
制限事項
-
ログレベル:Print コネクタの出力を確認するには、TaskManager のログレベルを INFO に設定してください。出力は Flink Web UI コンソールではなく、TaskManager ログに書き込まれます。
-
レコード制限:TaskManager.out は最大 2,000 件のログエントリを表示します。データ量が大きい場合に特定の行を検査するには、Print sink に到達する前に行をフィルターする
WHERE句を追加します。