すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Print connector

最終更新日:Jan 08, 2025

このトピックでは、Printコネクタの使用方法について説明します。

背景情報

Printコネクタは、すべての行をシステム出力またはシステムエラーストリームに書き込むために使用されます。デバッグ用に設計されています。中間結果またはFlinkデプロイメントの出力を表示するには、印刷結果テーブルを追加します。その後、指示に従ってTaskManagerログで結果データを表示できます。

Printコネクタを使用すると、他の結果テーブルに送信されたメッセージが期待どおりであるかどうかを確認できます。

次の表は、Printコネクタでサポートされている機能を示しています。

項目

説明

テーブルタイプ

結果テーブルとデータインジェストシンク

実行モード

バッチモードとストリーミングモード

データ形式

該当なし

メトリック

該当なし

APIタイプ

SQL APIとデータインジェストYAML API

シンクテーブルでのデータの更新または削除

サポートされています

前提条件

  • Print結果テーブルの出力を表示する場合は、ログレベルがINFOに設定されていることを確認してください。

  • Taskmanager.outには最大 2,000 件のログエントリが表示できます。ダーティデータまたは特定のデータを確認する場合は、WHERE句で条件を指定して印刷操作を実行することをお勧めします。印刷操作により、表示されるデータレコード数が制限されている場合でも、必要なデータを効果的に確認できます。

SQL

構文

CREATE TABLE print_table (
  a INT,
  b varchar
) WITH (
  'connector'='print',
  'logger'='true'
);

既存のテーブルスキーマに基づいて、LIKE 句を使用してPrintテーブルを作成することもできます。ステートメントの例:

CREATE TABLE print_table WITH ('connector' = 'print')
LIKE table_source (EXCLUDING ALL)

WITH句のパラメーター

パラメーター

説明

データ型

必須

デフォルト値

備考

connector

テーブルのタイプ。

String

はい

デフォルト値なし

値を print に設定します。

logger

データ結果をコンソールに表示するかどうかを指定します。

Boolean

いいえ

false

有効な値:

  • false: データ結果はコンソールに表示されません。これはデフォルト値です。

  • true: データ結果はコンソールに表示されます。

print-identifier

データ結果の識別子。

String

いいえ

デフォルト値なし

データ結果の識別子を使用してログ情報を取得します。

sink.parallelism

結果テーブルの並列度。

Int

いいえ

アップストリームの並列度と同じ値

該当なし。

データインジェスト

データインジェストのYAMLドラフトでは、値コネクタを使用してデータをファイルまたはログに出力できます。

構文

source:
  type: xxx

sink:
  type: values
  name: Values Sink
  print.enabled: true

WITH句のパラメーター

パラメーター

説明

データ型

必須

デフォルト値

備考

type

シンクのコネクタタイプ

STRING

はい

デフォルト値なし

値は values に固定されています。

name

シンクのコネクタ名

STRING

いいえ

デフォルト値なし

該当なし。

print.enabled

コネクタをPrintコネクタとして使用するかどうか

BOOLEAN

はい

デフォルト値なし

値は true に固定されています。

materialized.in.memory

バイナリログイベントをメモリに保持するかどうか。

BOOLEAN

いいえ

false

該当なし。

sink.print.standard-error

フォーマットをシステム出力ではなくシステムエラーに出力するかどうか。

BOOLEAN

いいえ

false

デフォルトでは、フォーマットはシステム出力に出力されます。

sink.print.logger

結果データをコンソールに表示するかどうか。

BOOLEAN

いいえ

false

該当なし。

sink.print.limit

出力するデータレコードの最大数。

LONG

いいえ

2000

該当なし。

error.on.schema.change

スキーマの変更が発生したときにエラーを報告するかどうか。

BOOLEAN

いいえ

false

該当なし。

サンプルコード

  • 結果テーブルのサンプルコード

    CREATE TEMPORARY TABLE table_source(
      name VARCHAR,
      score BIGINT
    ) WITH (
      ...
    );
    
    CREATE TEMPORARY TABLE print_sink(
      name VARCHAR,
      score BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO print_sink SELECT * from table_source;
  • データインジェストシンク

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true