このトピックでは、Printコネクタの使用方法について説明します。
背景情報
Printコネクタは、すべての行をシステム出力またはシステムエラーストリームに書き込むために使用されます。デバッグ用に設計されています。中間結果またはFlinkデプロイメントの出力を表示するには、印刷結果テーブルを追加します。その後、指示に従ってTaskManagerログで結果データを表示できます。
Printコネクタを使用すると、他の結果テーブルに送信されたメッセージが期待どおりであるかどうかを確認できます。
次の表は、Printコネクタでサポートされている機能を示しています。
項目 | 説明 |
テーブルタイプ | 結果テーブルとデータインジェストシンク |
実行モード | バッチモードとストリーミングモード |
データ形式 | 該当なし |
メトリック | 該当なし |
APIタイプ | SQL APIとデータインジェストYAML API |
シンクテーブルでのデータの更新または削除 | サポートされています |
前提条件
Print結果テーブルの出力を表示する場合は、ログレベルがINFOに設定されていることを確認してください。
Taskmanager.outには最大 2,000 件のログエントリが表示できます。ダーティデータまたは特定のデータを確認する場合は、WHERE句で条件を指定して印刷操作を実行することをお勧めします。印刷操作により、表示されるデータレコード数が制限されている場合でも、必要なデータを効果的に確認できます。
SQL
構文
CREATE TABLE print_table (
a INT,
b varchar
) WITH (
'connector'='print',
'logger'='true'
);既存のテーブルスキーマに基づいて、LIKE 句を使用してPrintテーブルを作成することもできます。ステートメントの例:
CREATE TABLE print_table WITH ('connector' = 'print')
LIKE table_source (EXCLUDING ALL)WITH句のパラメーター
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
connector | テーブルのタイプ。 | String | はい | デフォルト値なし | 値を print に設定します。 |
logger | データ結果をコンソールに表示するかどうかを指定します。 | Boolean | いいえ | false | 有効な値:
|
print-identifier | データ結果の識別子。 | String | いいえ | デフォルト値なし | データ結果の識別子を使用してログ情報を取得します。 |
sink.parallelism | 結果テーブルの並列度。 | Int | いいえ | アップストリームの並列度と同じ値 | 該当なし。 |
データインジェスト
データインジェストのYAMLドラフトでは、値コネクタを使用してデータをファイルまたはログに出力できます。
構文
source:
type: xxx
sink:
type: values
name: Values Sink
print.enabled: trueWITH句のパラメーター
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
type | シンクのコネクタタイプ | STRING | はい | デフォルト値なし | 値は values に固定されています。 |
name | シンクのコネクタ名 | STRING | いいえ | デフォルト値なし | 該当なし。 |
print.enabled | コネクタをPrintコネクタとして使用するかどうか | BOOLEAN | はい | デフォルト値なし | 値は true に固定されています。 |
materialized.in.memory | バイナリログイベントをメモリに保持するかどうか。 | BOOLEAN | いいえ | false | 該当なし。 |
sink.print.standard-error | フォーマットをシステム出力ではなくシステムエラーに出力するかどうか。 | BOOLEAN | いいえ | false | デフォルトでは、フォーマットはシステム出力に出力されます。 |
sink.print.logger | 結果データをコンソールに表示するかどうか。 | BOOLEAN | いいえ | false | 該当なし。 |
sink.print.limit | 出力するデータレコードの最大数。 | LONG | いいえ | 2000 | 該当なし。 |
error.on.schema.change | スキーマの変更が発生したときにエラーを報告するかどうか。 | BOOLEAN | いいえ | false | 該当なし。 |
サンプルコード
結果テーブルのサンプルコード
CREATE TEMPORARY TABLE table_source( name VARCHAR, score BIGINT ) WITH ( ... ); CREATE TEMPORARY TABLE print_sink( name VARCHAR, score BIGINT ) WITH ( 'connector' = 'print' ); INSERT INTO print_sink SELECT * from table_source;データインジェストシンク
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 7601-7604 sink: type: values name: Values Sink print.enabled: true