Realtime Compute for Apache Flink:Manage SLS catalogs

Last Updated: Mar 26, 2026

An SLS Catalog maps Simple Log Service (SLS) Logstores to Flink SQL tables by automatically inferring their schemas from log data. Once configured, Flink SQL jobs can query SLS Logstores directly without manually declaring table schemas.

An SLS Catalog is a read-only metadata view of SLS. DDL operations that create, modify, or delete databases or tables are not supported.

How it works

An SLS Catalog translates SLS metadata into queryable Flink tables. The following table shows how SLS concepts map to the Flink catalog hierarchy:

| SLS concept | Flink concept | Notes |
| --- | --- | --- |
| SLS Catalog | Catalog | Maps to a Flink catalog |
| Project | Database | Maps to a Flink database |
| Logstore | Table | Each Logstore becomes a queryable Flink table |

When the catalog infers a Logstore schema, it consumes one log message to parse field names and types. Because SLS stores all log data as strings, all inferred fields are of the String type.

Limitations

  • All fields are inferred as the String type. Use Flink SQL type casting to convert to other data types.

  • Only Flink jobs running on Ververica Runtime (VVR) 6.0.7 or later support SLS Catalogs.

  • Existing SLS Catalogs cannot be modified with DDL statements.

  • You can only query data tables. Creating, modifying, or deleting databases and tables is not supported.

  • Tables from an SLS Catalog can be used as source tables and sink tables in Flink SQL jobs, but not as lookup dimension tables.
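Because every inferred column is of the String type, values usually need explicit casts before numeric or temporal operations. The following sketch uses placeholder catalog, project, Logstore, and field names:

```sql
-- All catalog-inferred columns are strings; cast them before arithmetic
-- or time-based logic. All names below are placeholders.
SELECT
  CAST(`status` AS INT) AS status_code,
  TO_TIMESTAMP(`request_time`, 'yyyy-MM-dd HH:mm:ss') AS request_ts
FROM `my_sls_catalog`.`my_project`.`my_logstore`
WHERE CAST(`status` AS INT) >= 500;
```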

Schema consistency risk

An SLS Catalog infers schemas from sample log data. For Logstores with inconsistent data formats, the catalog retains all columns by default to return the widest possible schema.

If the data format of a Logstore changes over time, the inferred schema can differ between reads. This creates a risk when a job restarts from a savepoint: the job may retrieve a schema that differs from the one used before the restart. The saved execution plan references the previous schema, which can cause mismatches in downstream operations such as filter conditions or field value retrieval.

To avoid this, define a fixed schema with CREATE TEMPORARY TABLE in your Flink SQL job instead of relying on the catalog-inferred schema.
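For example, a fixed-schema source declaration might look like the following sketch. The column names are placeholders for your actual log fields; the connection parameters are the same ones the catalog would otherwise supply (see Default table configuration):

```sql
-- A fixed schema shields the job from changes in the inferred schema.
-- Column names here are placeholders, not real log fields.
CREATE TEMPORARY TABLE sls_source (
  `level`   VARCHAR,
  `message` VARCHAR
) WITH (
  'connector' = 'sls',
  'endpoint'  = '<endpoint>',
  'project'   = '<project>',
  'logstore'  = '<logstore>',
  'accessId'  = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);
```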

Prerequisites

Before you begin, make sure you have:

  • An SLS project with at least one Logstore, with indexing enabled on each Logstore you plan to use as a source or sink

  • Your Alibaba Cloud AccessKey ID and AccessKey secret. Store them as project variables rather than hardcoding them in configuration. For details, see Project variables

  • A Flink workspace running VVR 6.0.7 or later

Create an SLS Catalog

You can create an SLS Catalog from the console UI or with a SQL command.

Important

After you create a catalog, its settings cannot be changed. To change any setting, delete the catalog and create a new one.

Use the console UI (recommended)

  1. Log on to the Realtime Compute for Apache Flink console. In the Actions column for the target workspace, click Console.

  2. Click Data Management.

  3. Click Create Catalog, select SLS, and then click Next.

  4. Configure the following parameters:

    | Parameter | Type | Required | Description |
    | --- | --- | --- | --- |
    | catalog name | String | Yes | A custom name for the catalog. Use English characters only. |
    | endpoint | String | Yes | The SLS service endpoint. See Endpoints. |
    | project | String | Yes | The name of the SLS project. |
    | accessId | String | Yes | Your Alibaba Cloud AccessKey ID. See How do I view the AccessKey ID and AccessKey secret? To protect your AccessKey information, use variables to specify the AccessKey value. See Project variables. |
    | accessKey | String | Yes | Your Alibaba Cloud AccessKey secret. See How do I view the AccessKey ID and AccessKey secret? |

    SLS Catalog configuration page

  5. Click Confirm.

The created catalog appears in the Metadata panel on the left.

Use a SQL command

  1. Open the Data Query editor and enter the following command:

    CREATE CATALOG <catalogName> WITH(
      'type'='sls',
      'endpoint'='<endpoint>',
      'project'='<project>',
      'accessId'='${secret_values.ak_id}',
      'accessKey'='${secret_values.ak_secret}'
    );
    | Parameter | Type | Required | Description |
    | --- | --- | --- | --- |
    | catalogName | String | Yes | A custom name for the catalog. Use English characters only. |
    | type | String | Yes | Fixed value: sls. |
    | endpoint | String | Yes | The SLS service endpoint. See Endpoints. |
    | project | String | Yes | The name of the SLS project. |
    | accessId | String | Yes | Your Alibaba Cloud AccessKey ID. Use secrets management to avoid hardcoding credentials. See Manage variables. |
    | accessKey | String | Yes | Your Alibaba Cloud AccessKey secret. Use secrets management to avoid hardcoding credentials. See Manage variables. |
  2. Select the CREATE CATALOG statement and click Run next to the line numbers.

  3. Verify that the catalog appears in the Metadata panel on the left.

View SLS Catalog metadata

View all catalogs

  1. Log on to the Realtime Compute for Apache Flink console. In the Actions column for the target workspace, click Console.

  2. Click Data Management.

  3. On the Catalog List page, review the Name and Type columns. To view the Logstores in a catalog, click View next to the catalog name.

View a Logstore schema

  1. Open the Data Query editor and enter the following command:

    DESCRIBE `${catalog_name}`.`${project_name}`.`${logstore_name}`;
    | Parameter | Description |
    | --- | --- |
    | ${catalog_name} | The name of the SLS Catalog. |
    | ${project_name} | The name of the SLS project. |
    | ${logstore_name} | The name of the SLS Logstore. |
  2. Select the DESCRIBE statement and click Run next to the line numbers. The table schema appears in the results panel.

    Table schema results

Use an SLS Catalog in Flink SQL jobs

Before reading from or writing to a Logstore, enable indexing on that Logstore. For sink tables, the catalog compares the schema of the data to be written against the schema of the destination SLS Logstore to verify that they match. For details on enabling indexing, see Create an index.

Read from a Logstore (source table)

INSERT INTO ${other_sink_table}
SELECT ...
FROM `${catalog_name}`.`${project_name}`.`${logstore_name}`/*+OPTIONS('startTime'='2023-06-01 00:00:00')*/;

To pass additional WITH parameters without modifying the catalog definition, use SQL Hints. The example above uses a SQL Hint to set the data consumption start time. For all available parameters, see the Simple Log Service connector.
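A hint can be combined with casts in the same query, since all catalog-inferred columns are strings. A sketch with placeholder names:

```sql
-- Placeholder names; the 'startTime' hint sets where consumption begins.
INSERT INTO ${other_sink_table}
SELECT CAST(`latency_ms` AS BIGINT) AS latency_ms
FROM `${catalog_name}`.`${project_name}`.`${logstore_name}`
/*+ OPTIONS('startTime' = '2023-06-01 00:00:00') */;
```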

Write to a Logstore (sink table)

INSERT INTO `${catalog_name}`.`${project_name}`.`${logstore_name}`
SELECT ...
FROM ${other_source_table};

Delete an SLS Catalog

Warning

Deleting a catalog does not interrupt running jobs. However, any job that references a table from the deleted catalog will fail when published or restarted.

Use the console UI (recommended)

  1. Log on to the Realtime Compute for Apache Flink console. In the Actions column for the target workspace, click Console.

  2. Click Data Management.

  3. On the Catalog List page, click Delete in the Actions column next to the target catalog.

  4. In the confirmation dialog, click Delete.

  5. Verify that the catalog no longer appears in the Metadata panel on the left.

Use a SQL command

  1. Open the Data Query editor and enter the following command, replacing ${catalog_name} with the name of the catalog to delete:

    DROP CATALOG ${catalog_name};
  2. Right-click the statement and select Run.

  3. Verify that the catalog no longer appears in the Metadata panel on the left.

Default table configuration

When the SLS Catalog maps a Logstore to a Flink table, it automatically adds the following connection parameters. These values come from the catalog configuration and cannot be overridden per table without using SQL Hints.

| Parameter | Description |
| --- | --- |
| connector | Fixed to sls. |
| endpoint | The SLS service endpoint. |
| project | The SLS project name. |
| logstore | The Logstore or Metricstore name. |
| accessId | The AccessKey ID used by the catalog. To protect your AccessKey information, use secrets management to specify the AccessKey ID value. See Manage variables. |
| accessKey | The AccessKey secret used by the catalog. To protect your AccessKey information, use secrets management to specify the AccessKey secret value. See Manage variables. |

What's next