The CSV format reads and writes CSV data by automatically inferring the CSV structure from the table schema. It is supported by the following connectors: Apache Kafka connector, Upsert Kafka connector, ApsaraMQ for RocketMQ connector, StarRocks connector, and Object Storage Service (OSS) connector.
Create a table with the CSV format
The following example creates a table using the Apache Kafka connector with the CSV format. It enables comment-line skipping and ignores parse errors.
```sql
CREATE TABLE user_behavior (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'csv.ignore-parse-errors' = 'true',
  'csv.allow-comments' = 'true'
);
```
Parameters
| Parameter | Required | Default | Type | Description |
|---|---|---|---|---|
| format | Required | (none) | STRING | The format to use. Set to csv. |
| csv.field-delimiter | Optional | , | STRING | The field delimiter. Only single-character delimiters are supported. Use \t for a tab character, or a Unicode escape such as U&'\0001' for the 0x01 character. |
| csv.disable-quote-character | Optional | false | BOOLEAN | Whether to disable enclosing field values in quotation marks. When set to true, csv.quote-character cannot be configured. |
| csv.quote-character | Optional | " | STRING | The character used to enclose field values. Default: double quotation mark ("). |
| csv.allow-comments | Optional | false | BOOLEAN | Whether to skip comment lines that start with #. If enabled, also enable csv.ignore-parse-errors so that the empty rows produced by skipped comment lines are tolerated. |
| csv.ignore-parse-errors | Optional | false | BOOLEAN | Whether to skip fields that fail to parse. When set to true, failed fields are set to null. When set to false, a parse error causes the deployment to fail. |
| csv.array-element-delimiter | Optional | ; | STRING | The delimiter for array and row elements. Default: semicolon (;). |
| csv.escape-character | Optional | (none) | STRING | The escape character. By default, no escape character is used. |
| csv.null-literal | Optional | (none) | STRING | The string that represents a null value. In input data, this string is converted to null; in output data, null is converted back to this string. |
| csv.write-bigdecimal-in-scientific-notation | Optional | true | BOOLEAN | Whether to write BigDecimal values in scientific notation. When set to true, 100000 is written as 1E+5; when set to false, it is written as 100000. |
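As an illustration, several of these parameters can be combined in a single WITH clause. The following sketch assumes a Kafka source; the table name, topic, and broker address are placeholders:

```sql
-- Hypothetical source table; topic and broker address are placeholders.
CREATE TABLE csv_source (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'example_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'csv',
  'csv.field-delimiter' = ';',    -- semicolon-delimited fields instead of comma
  'csv.null-literal' = 'n/a',     -- "n/a" in the input is read as NULL
  'csv.ignore-parse-errors' = 'true'
);
```

Note that if the data contains arrays, a semicolon field delimiter would collide with the default csv.array-element-delimiter, so one of the two should be changed.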
Data type mappings
The CSV format uses the jackson-databind API to parse and generate CSV strings. The following table shows how Flink SQL data types map to CSV data types.
| Flink SQL data type | CSV data type |
|---|---|
| CHAR, VARCHAR, STRING | STRING |
| BOOLEAN | BOOLEAN |
| BINARY, VARBINARY | STRING with encoding: base64 |
| DECIMAL | NUMBER |
| TINYINT | NUMBER |
| SMALLINT | NUMBER |
| INT | NUMBER |
| BIGINT | NUMBER |
| FLOAT | NUMBER |
| DOUBLE | NUMBER |
| DATE | STRING with format: date |
| TIME | STRING with format: time |
| TIMESTAMP | STRING with format: date-time |
| INTERVAL | NUMBER |
| ARRAY | ARRAY |
| ROW | OBJECT |
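As an illustration of these mappings, consider a hypothetical sink table (the topic and broker address are placeholders). An ARRAY column is serialized as its elements joined by csv.array-element-delimiter, and a TIMESTAMP column is serialized as a date-time string:

```sql
-- Hypothetical sink table; topic and broker address are placeholders.
CREATE TABLE user_events_sink (
  tags ARRAY<STRING>,   -- e.g. the array ['a', 'b'] is written as a;b with the default delimiter
  ts TIMESTAMP(3)       -- written as a date-time string
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'csv'
);
```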
Known limitations
Writing data to OSS in CSV format is not supported. For details, see FLINK-30635.