You can use Realtime Compute for Apache Flink to create a Simple Log Service source table to consume log data in Simple Log Service. This topic describes how to use Realtime Compute for Apache Flink to create a Simple Log Service source table and how to extract the attribute fields involved in the creation process.
Background information
The following table describes the settings that you must configure for Realtime Compute for Apache Flink to consume log data.
Category | Description |
Supported type | You can configure a source table and a result table. |
Running mode | Only the streaming mode is supported. |
Metric | Metrics are not supported. |
Data format | None. |
API type | SQL statements are supported. |
Whether log data can be updated or deleted in a result table | You cannot update or delete log data in a result table. You can only insert log data into a result table. |
For more information about how to use Realtime Compute for Apache Flink to consume log data, see Getting started with a Flink SQL deployment.
Prerequisites
If you want to use a Resource Access Management (RAM) user or a RAM role to consume log data,make sure that the RAM user or RAM role has the required permissions on the Realtime Compute for Apache Flink console. For more information, see Permissions.
A Realtime Compute for Apache Flink workspace is created. For more information, see Create a workspace.
A project and a Logstore are created. For more information, see Create a project and a Logstore.
Limits
Only Ververica Runtime (VVR) 11.1 or later supports using SLS as a data ingestion source.
The SLS connector supports only at-least-once semantics.
Avoid setting the source parallelism higher than the number of shards. Doing so wastes resources. In VVR 8.0.5 or earlier, if the shard count changes after you set a high parallelism, automatic failover may fail. This can leave some shards unconsumed.
Create a Simple Log Service source table and a result table
You must develop a complete SQL draft before you use Realtime Compute for Apache Flink to consume log data in Simple Log Service. A complete SQL draft contains a source table and a result table. After log data in the source table is processed, the results are inserted into the result table by using the INSERT INTO statement.
For more information about how to develop an SQL draft in Realtime Compute for Apache Flink, see Job development overview.
Simple Log Service stores log data in real time. Realtime Compute for Apache Flink can read the data in streaming mode as input data. The following code provides an example of a log:
__source__: 11.85.*.199
__tag__:__receive_time__: 1562125591
__topic__: test-topic
request_method: GET
status: 200Sample code
The following code provides an example of an SQL draft that you can develop in Realtime Compute for Apache Flink to consume log data in Simple Log Service.
If the names of tables, columns, and reserved fields in an SQL draft conflict with each other, you must enclose the names in backticks (`).
CREATE TEMPORARY TABLE sls_input(
request_method STRING,
status BIGINT,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` BIGINT METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE sls_sink(
request_method STRING,
status BIGINT,
`__topic__` STRING,
`__source__` STRING,
`__timestamp__` BIGINT ,
receive_time BIGINT
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'project' ='sls-test',
'logstore' ='sls-output'
);
INSERT INTO sls_sink
SELECT
request_method,
status,
`__topic__` ,
`__source__` ,
`__timestamp__` ,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; WITH parameters
General
Parameters
Description
Data type
Required?
Default value
Remarks
connector
Table type.
String
Yes
None
Set it to sls.
endPoint
The endpoint address.
String
Yes
None
Enter the VPC endpoint of SLS. For more information, see Endpoints.
NoteBy default, Realtime Compute for Apache Flink cannot access the Internet. However, Alibaba Cloud provides NAT gateways to enable communication between VPCs and the Internet. For more information, see How do I access the Internet?.
Avoid accessing SLS over the Internet. If you must do so, use HTTPS and enable transfer acceleration for SLS.
project
The name of the SLS project.
String
Yes
None
None.
logStore
The name of an SLS Logstore or Metricstore.
String
Yes
None
Data in a Logstore is consumed in the same way as in a Metricstore.
accessId
The AccessKey ID of your Alibaba Cloud account.
String
Yes
No default value
For more information, see How do I view the AccessKey ID and AccessKey secret?.
ImportantTo protect your AccessKey pair, use variables to configure your AccessKey.
accessKey
The AccessKey secret of your Alibaba Cloud account.
String
Yes
No default value
Source-specific
Parameters
Description
Data type
Required?
Default value
Remarks
enableNewSource
Specifies whether to use the new source interface that implements FLIP-27.
Boolean
No
false
The new source adapts automatically to shard changes and distributes shards evenly across all source subtasks.
ImportantThis option is supported only in VVR 8.0.9 or later. Starting from VVR 11.1, this option defaults to true.
If you change this option, your job cannot resume from a saved state. To work around this, first start your job with the consumerGroup option to record the current consumer offset. Then, set consumeFromCheckpoint to true and restart your job without states.
If SLS contains read-only shards, some Flink subtasks may finish reading from those shards and then request other unread shards. This can cause uneven shard distribution across subtasks, reducing overall consumption efficiency and system performance. To reduce this imbalance, adjust the source parallelism, optimize task scheduling, or merge small shards.
shardDiscoveryIntervalMs
The interval at which shard changes are detected dynamically. Unit: milliseconds.
Long
No
60000
Set this option to a negative value to disable dynamic detection.
NoteThis option must be at least 1 minute (60,000 milliseconds).
This option takes effect only if enableNewSource is set to true.
This option is supported only in VVR 8.0.9 or later.
startupMode
The startup mode of the source table.
String
No
timestamp
timestamp(default): Consume logs starting from the specified time.latest: Consume logs starting from the latest offset.earliest: Consume logs starting from the earliest offset.consumer_group: Consume logs starting from the offset recorded in the consumer group. If no offset is recorded for a shard, consume logs starting from the earliest offset.
ImportantIn VVR versions earlier than 11.1, the consumer_group value is not supported. To consume logs from the offset recorded by the specified consumer group, set
consumeFromCheckpointtotrue. In this case, this startup mode will not take effect.
startTime
The time to start consuming logs.
String
No
Current time
Format:
yyyy-MM-dd hh:mm:ss.This option takes effect only if
startupModeis set totimestamp.NoteThe startTime and stopTime options are based on the __receive_time__ field in SLS, not the __timestamp__ field.
stopTime
The end time of the consumption log.
String
No
None
Format:
yyyy-MM-dd hh:mm:ss.NoteUse this option only to consume historical logs. Set it to a past time point. If you set it to a future time, consumption may stop unexpectedly if no new logs are written. This appears as a broken data stream with no error messages.
To exit the Flink program after log consumption finishes, also set exitAfterFinish to true.
consumerGroup
The name of the consumer group.
String
No
None
A consumer group records consumption progress. You can specify any custom name.
NoteYou cannot share a consumer group across multiple jobs for collaborative consumption. Use different consumer groups for different jobs. If you use the same consumer group for different jobs, each job consumes all data. When Flink consumes data from SLS, it does not assign shards through the SLS consumer group. So each job independently consumes all messages, even if they share the same consumer group.
consumeFromCheckpoint
Specifies whether to consume logs from the checkpoint saved in the specified consumer group.
String
No
false
true: If you set this parameter to true, you must also specify a consumer group. Flink consumes logs from the checkpoint that is stored in the consumer group. If no checkpoint exists in the consumer group, Flink consumes logs from the time specified by the startTime parameter.false(default): Flink does not consume logs from the checkpoint saved in the specified consumer group.
ImportantThis option is not supported in VVR 11.1 or later. For VVR 11.1 or later, set
startupModetoconsumer_group.maxRetries
The number of retries after reading from SLS fails.
String
No
3
None.
batchGetSize
The number of log groups to read per request.
String
No
100
Set
batchGetSizeto a value less than 1000. Otherwise, an error occurs.exitAfterFinish
Specifies whether the Flink program exits after data consumption finishes.
String
No
false
true: The Flink program exits after data consumption finishes.false(default): The Flink program does not exit after data consumption finishes.
query
ImportantThis option was deprecated in VVR 11.3 but remains compatible in later versions.
The query statement used to preprocess data before consuming SLS data.
String
No
No default value
Use the query option to filter SLS data before consumption. This avoids loading all data into Flink, reducing costs and improving processing speed.
For example,
'query' = '*| where request_method = ''GET'''filters logs where the request_method field equals GET before Flink reads them.NoteWrite queries using SPL syntax.
ImportantThis option is supported only in VVR 8.0.1 or later.
This feature incurs SLS fees. For details, see Billing.
processor
The SLS consumer processor. If both query and processor are set, query takes precedence.
String
No
None
Use the processor option to filter SLS data before consumption. This avoids loading all data into Flink, reducing costs and improving processing speed. We recommend using processor instead of query.
For example,
'processor' = 'test-filter-processor'applies the SLS consumer processor to filter data before Flink reads it.NoteWrite processors using SPL syntax. For details about creating and updating SLS consumer processors, see Manage consumer processors.
ImportantThis option is supported only in VVR 11.3 or later.
This feature incurs SLS fees. For details, see Billing.
Sink-specific
parameter
Description
Data type
Required?
Default value
Remarks
topicField
The name of a field whose value overrides the __topic__ field. This indicates the log topic.
String
No
None
This parameter specifies an existing field in the table.
timeField
The name of a field whose value overrides the __timestamp__ field. This indicates the log write time.
String
No
Current time
This field must exist in the table and its type must be INT. If not specified, the current time is used.
sourceField
The name of a field whose value overrides the __source__ field. This indicates the log source, such as the IP address of the machine that generated the log.
String
No
None
This field must exist in the table.
partitionField
The name of a field. A hash value is calculated from this field's value when writing data. Data with the same hash value is written to the same shard.
String
No
No default value
If not specified, each data entry is written randomly to an available shard.
buckets
The number of buckets to regroup by hash value when partitionField is specified.
String
No
64
Valid values: [1, 256]. The value must be a power of 2. The number of buckets must be greater than or equal to the number of shards. Otherwise, some shards receive no data.
flushIntervalMs
The interval at which data writes are triggered.
String
No
2000
Unit: milliseconds.
writeNullProperties
Specifies whether to write null values as empty strings to SLS.
Boolean
No
true
true(default): Write null values as empty strings.false: Do not write fields whose computed value is null.
NoteThis option is supported only in VVR 8.0.6 or later.
Extract attribute fields
Realtime Compute for Apache Flink can extract log fields, custom fields, and the following attribute fields.
Field | Type | Description |
__source__ | STRING METADATA VIRTUAL | The message source. |
__topic__ | STRING METADATA VIRTUAL | The message topic. |
__timestamp__ | BIGINT METADATA VIRTUAL | The log time. |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | The message tag. For the |
To extract attribute fields, you must define headers in an SQL statement. Example:
create table sls_stream(
__timestamp__ bigint HEADER,
__receive_time__ bigint HEADER
b int,
c varchar
) with (
'connector' = 'sls',
'endpoint' ='cn-hangzhou.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);References
For more information about how to use the DataStream API of Realtime Compute for Apache Flink to consume log data, see DataStream API.