All Products
Search
Document Center

PolarDB:Build a Data-Agent using an external data source

Last Updated:Mar 30, 2026

PolarDB for AI lets you connect external databases—PostgreSQL, Hive, Elasticsearch, and others—to Data-Agent so that users can query them using plain language. Register the schema metadata of your external tables, and the built-in large language model (LLM) translates natural language questions like "find the customer with the most orders" into SQL or DSL queries that run against your actual data source.

How it works

PolarDB for AI stores metadata from your external data sources—table schemas, column descriptions, and sample values—in a dedicated table. The built-in _polar4ai_text2vec model converts that metadata into vectors and stores them in a searchable schema_index table.

When a user asks a question, the NL2SQL (natural language to SQL) feature:

  1. Converts the question into a vector.

  2. Searches schema_index to find the most relevant tables and columns.

  3. Generates an executable SQL or DSL query using the retrieved metadata.

Choose a build mode

PolarDB for AI provides two modes for registering external metadata. Choose based on how much control you need and whether your source database can export DDL.

schema_info mode schema_meta mode
How it works You define each table, column, data type, and sample value manually The system parses DDL or Elasticsearch index definitions automatically
Best for Fine-grained control: adding business descriptions, sample values, and foreign key relationships to improve NL2SQL accuracy Quickly onboarding many tables from databases that can export standard DDL
Supported sources Any data source (all metadata is entered manually) Hive, MySQL, PostgreSQL, and Elasticsearch
Trade-off More setup work; richer metadata quality Less setup work; accuracy depends on what is already in the DDL

Start with schema_info mode when query accuracy matters most or when your source database cannot export DDL. Use schema_meta mode when you need to onboard many tables quickly and the DDL contains adequate column descriptions.

Think of the metadata you provide as an onboarding package for the LLM. The more context you give—clear table descriptions, realistic sample values, explicit foreign key relationships—the more accurately it can translate questions into correct queries. A sparse metadata set produces acceptable results for simple questions but will struggle with joins and business-specific filters.

Prerequisites

Before you begin, ensure that you have:

If you added an AI node when you purchased the cluster, you can skip the node setup and go straight to configuring the database account. The account must have Read and Write permissions on the target database.
Important

When connecting from the command line, include the -c flag. If you use Data Management Service (DMS), it connects to the primary endpoint by default, which routes queries away from the AI node. Switch to the cluster endpoint by using the database selector at the top of the DMS page to search for and select your target database.

Build a Data-Agent with manual metadata (schema_info mode)

Use this mode when you want precise control over the metadata the LLM sees. Each row in the schema_info table describes one column from an external table.

Step 1: Create the schema_info table

CREATE TABLE schema_info (
    id INT AUTO_INCREMENT PRIMARY KEY COMMENT 'Auto-increment primary key',
    table_name VARCHAR(255) NOT NULL COMMENT 'Table name',
    table_comment TEXT COMMENT 'Table comment',
    column_name VARCHAR(255) NOT NULL COMMENT 'Column name',
    column_comment TEXT COMMENT 'Column comment',
    data_type VARCHAR(255) COMMENT 'Column data type (optional)',
    sample_values TEXT COMMENT 'Sample values',
    is_primary INT COMMENT 'Whether it is a primary key (1/0)',
    is_foreign INT COMMENT 'Whether it is a foreign key (1/0)',
    ext TEXT COMMENT 'Foreign key information',
    db_type VARCHAR(128) COMMENT 'Relational database language type (such as MySQL, PostgreSQL)'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Database metadata information table';

Step 2: Insert metadata

Insert one row per column. Follow these constraints:

  • db_type: Set to the source database type, such as MySQL or PostgreSQL. All columns from the same logical database must share the same db_type.

  • sample_values: Separate multiple values with commas. Keep the total field length under 100 characters.

  • is_foreign: Set to 1 if the column is a foreign key.

  • ext: Required when is_foreign is 1. Format: <database_name>.<table_name>.<column_name>.

The following example inserts metadata for a customers table and an orders table. Note that orders.customer_id is a foreign key pointing to customers.id—this relationship lets the LLM generate correct JOIN queries.

-- Metadata for the customers table
INSERT INTO schema_info (table_name, table_comment, column_name, column_comment, data_type, sample_values, is_primary, is_foreign, ext, db_type)
VALUES
('customers', 'Customer information table', 'id', 'Unique customer ID', 'INT', '1,2,3', 1, 0, NULL, 'PostgreSQL'),
('customers', 'Customer information table', 'name', 'Customer name', 'VARCHAR(100)', 'Zhang San,Li Si,Wang Wu', 0, 0, NULL, 'PostgreSQL'),
('customers', 'Customer information table', 'email', 'Customer email', 'VARCHAR(100) UNIQUE', 'zhangsan@example.com', 0, 0, NULL, 'PostgreSQL');

-- Metadata for the orders table
INSERT INTO schema_info (table_name, table_comment, column_name, column_comment, data_type, sample_values, is_primary, is_foreign, ext, db_type)
VALUES
('orders', 'Order information table', 'order_id', 'Unique order ID', 'INT', '1001,1005,1003', 1, 0, NULL, 'PostgreSQL'),
('orders', 'Order information table', 'order_date', 'Order date', 'DATE', '2023-01-01,2023-05-06', 0, 0, NULL, 'PostgreSQL'),
('orders', 'Order information table', 'customer_id', 'Associated customer ID', 'INT', '1,2,3', 0, 1, 'dbname.customers.id', 'PostgreSQL'),
('orders', 'Order information table', 'total_amount', 'Total order amount', 'DECIMAL(10,2)', '99.99,101.81', 0, 0, NULL, 'PostgreSQL');

Step 3: Build the search index

The following commands create the schema_index table and start an asynchronous task that vectorizes all metadata in schema_info.

-- Create the vector index table
/*polar4ai*/CREATE TABLE schema_index(
  id integer,
  table_name varchar,
  table_comment text_ik_max_word,
  table_ddl text_ik_max_word,
  column_names text_ik_max_word,
  column_comments text_ik_max_word,
  sample_values text_ik_max_word,
  vecs vector_768,
  ext text_ik_max_word,
  database_service text_ik_max_word,
  PRIMARY key (id)
);

-- Start the asynchronous indexing task
/*polar4ai*/SELECT * FROM PREDICT (
MODEL _polar4ai_text2vec, SELECT ''
) WITH (mode='async', resource='schema_info', to_sample = 1)
INTO schema_index;
to_sample = 1 tells the system to vectorize the sample values in schema_info.sample_values, which improves accuracy for value-based queries such as "orders with status 'shipped'". Set it to 0 to skip this step.

After the command runs, it returns a task ID such as b3b8fd1e-b886-11f0-9f89-97664bebacb7. Save this ID to check the task status in the next step.

Step 4: Check the task status

Run the following command with your task ID. When the status changes to finish, the index is ready.

/*polar4ai*/SHOW TASK `<task_id>`;

Replace <task_id> with the ID returned in the previous step.

Step 5: Query data using Data-Agent

With the index built, use the NL2SQL feature to query your external data in plain language.

/*polar4ai*/SELECT * FROM PREDICT (MODEL _polar4ai_nl2sql, select 'the 10 customers with the most orders') WITH (basic_index_name='schema_index');

Build a Data-Agent by parsing DDL (schema_meta mode)

Use this mode to onboard many tables at once by importing their DDL or Elasticsearch index definitions. You do not need actual tables in the database—the LLM parses the creation statements and builds the search index directly.

Step 1: Create the schema_meta table

CREATE TABLE IF NOT EXISTS schema_meta (
    db_name VARCHAR(255) COMMENT 'Database name',
    create_table_statement TEXT COMMENT 'Statement to create a table or index'
) COMMENT='Stores DDL or index definitions from external databases';

Step 2: Import DDL or index definitions

Insert the creation statements from your external data source into schema_meta.

  • For relational databases (Hive, MySQL, PostgreSQL): each statement must start with CREATE.

  • For Elasticsearch: each statement must start with PUT.

-- Relational database DDL example (Hive)
INSERT INTO schema_meta (db_name, create_table_statement) VALUES
('my_hive_db', "CREATE TABLE dwd.dwd_col_df_court_case_detail (
    case_id BIGINT COMMENT 'Case ID',
    created_date STRING COMMENT 'Creation time (yyyy-mm-dd)'
)
COMMENT 'Case order table'
PARTITIONED BY (dt STRING)
STORED AS ORC;");

-- Elasticsearch index definition example
INSERT INTO schema_meta (db_name, create_table_statement) VALUES
('my_es_cluster', 'PUT /product_info
{
  "settings": {"number_of_shards": 5, "number_of_replicas": 1},
  "mappings" : {
    "properties": {
      "productName": {"type": "text", "analyzer": "ik_smart"},
      "describe": {"type": "text", "analyzer": "ik_smart"}
    }
  }
}');

Batch import using Python scripts

To import DDL for many tables at once, use the following Python scripts.

Batch import from a relational database

This example uses SelectDB as the source database. The script retrieves DDL for all tables and inserts them into the schema_meta table in PolarDB for AI.

  1. Install the dependency: pip install pymysql.

  2. Save the following files in your working directory.

    selectdb_data_transfer.py

    from mysql_connector import MysqlConnector
    
    import json
    
    def parse_ddl_from_json(file_name: str) -> list[str]:
        ddl_statements = []
        with open(file_name, 'r', encoding='utf-8') as f:
            # Parse the JSON file into a Python object
            data = json.load(f)
    
            # Extract the DDL statement from each table entry
            for table_info in data:
                create_table_sql = table_info.get('Create Table')
                if create_table_sql:
                    ddl_statements.append(create_table_sql)
    
        return ddl_statements
    
    def insert_into_schema_meta(db_name, ddls: list[str], connector: MysqlConnector):
        """
        Insert DDL statements into the schema_meta table in PolarDB.
        """
        insert_sql = """insert into schema_meta (db_name, create_table_statement) values (%s, %s)"""
        for i, ddl in enumerate(ddls):
            rows = connector.execute_update(insert_sql, (db_name, ddl))
            if rows == 0:
                print('insert failed: ', i)
                break
    
    def show_index(connector: MysqlConnector, db_name):
        table_names = connector.execute_query(
            f"select table_name FROM INFORMATION_SCHEMA.TABLES WHERE table_schema = '{db_name}'")
        table_ddls = []
        for result in table_names:
            # Adjust the column name based on the source database type.
            # For PolarDB, use TABLE_NAME.
            table_name = result['table_name']
            table_ddl = connector.execute_query(f"show create table {table_name}")[0]
            table_ddls.append(table_ddl)
        json.dump(table_ddls, open('table_ddls.json', 'w'), indent=4)
    
    
    if __name__ == '__main__':
        selectdb_host = "<SelectDB endpoint>"
        selectdb_user = "<SelectDB username>"
        selectdb_password = "<SelectDB password>"
        selectdb_database = "<SelectDB database>"
        selectdb_port = <SelectDB port>
    
        polar_host = "<PolarDB endpoint>"
        polar_user = "<PolarDB username>"
        polar_password = "<PolarDB password>"
        polar_database = "<PolarDB database>"
        polar_port = <PolarDB port>
    
        # Step 1: Retrieve DDL for all tables from SelectDB
        # selectdb_connector = MysqlConnector(selectdb_host, selectdb_port, selectdb_user, selectdb_password, selectdb_database)
        # show_index(selectdb_connector, selectdb_database)
    
        # Step 2: Parse the exported JSON file to get DDL statements
        # results = parse_ddl_from_json('table_ddls.json')
        #
        # Step 3: Insert DDL into PolarDB schema_meta
        # polar_mysql_connector = MysqlConnector(polar_host, polar_port, polar_user, polar_password, polar_database)
        # insert_into_schema_meta(polar_database, results, polar_mysql_connector)

    mysql_connector.py

    import pymysql
    from pymysql import Error
    from pymysql.cursors import DictCursor
    
    class MysqlConnector:
        def __init__(self, host, port, user, password, database, charset='utf8mb4'):
            """
            Initialize the database connection.
            :param host: MySQL host address
            :param user: Username
            :param password: Password
            :param database: Database name
            :param port: Port number, default is 3306
            :param charset: Character set, default is utf8mb4
            """
            self.connection = pymysql.connect(
                host=host,
                user=user,
                password=password,
                database=database,
                port=port,
                charset=charset,
                cursorclass=DictCursor  # Return results as dictionaries
            )
    
        def execute_query(self, sql, params=None):
            """
            Execute a SELECT query.
            :param sql: SQL statement
            :param params: Parameter dictionary or tuple
            :return: List of query results (each element is a dictionary)
            """
            try:
                with self.connection.cursor() as cursor:
                    cursor.execute(sql, params)
                    result = cursor.fetchall()
                    return result
            except Error as e:
                print(f"[ERROR] Query execution failed: {e}")
                return None
    
        def execute_update(self, sql, params=None):
            """
            Execute UPDATE, INSERT, or DELETE statements.
            :param sql: SQL statement
            :param params: Parameter dictionary or tuple
            :return: Number of affected rows
            """
            try:
                with self.connection.cursor() as cursor:
                    cursor.execute(sql, params)
                self.connection.commit()
                return cursor.rowcount
            except Error as e:
                print(f"[ERROR] Update execution failed: {e}")
                self.connection.rollback()
                return 0
    
        def execute_update_multirows(self, sql, params_list=None):
            """
            Execute batch INSERT, UPDATE, or DELETE statements.
            :param sql: SQL statement
            :param params_list: List of parameter tuples
            :return: Number of affected rows
            """
            try:
                with self.connection.cursor() as cursor:
                    cursor.executemany(sql, params_list)
                self.connection.commit()
                return cursor.rowcount
            except Error as e:
                print(f"[ERROR] Update execution failed: {e}")
                self.connection.rollback()
                return 0
    
        def begin_transaction(self):
            """ Start a transaction. """
            try:
                self.connection.begin()
            except Error as e:
                print(f"[ERROR] Failed to start transaction: {e}")
    
        def commit_transaction(self):
            """ Commit the transaction. """
            try:
                self.connection.commit()
            except Error as e:
                print(f"[ERROR] Failed to commit transaction: {e}")
    
        def rollback_transaction(self):
            """ Roll back the transaction. """
            try:
                self.connection.rollback()
            except Error as e:
                print(f"[ERROR] Failed to roll back transaction: {e}")
    
        def close(self):
            """ Close the database connection. """
            if self.connection:
                self.connection.close()
    
        def __enter__(self):
            return self
    
        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()
  3. Update the connection parameters for SelectDB and PolarDB in selectdb_data_transfer.py.

  4. Run the script. First retrieve and review the DDL for all tables to confirm correctness, then run the batch import.

Batch import from Elasticsearch

  1. Save the following code as es_data_transfer.py.

  2. import base64
    import http.client
    import json
    
    # Cluster connection settings
    ClusterHost = "<elasticsearch-host>:<elasticsearch-port>"
    ClusterUserName = "<elasticsearch-username>"
    ClusterPassword = "<elasticsearch-password>"
    DEFAULT_REPLICAS = 0
    
    def httpRequest(method, host, endpoint, params="", username="", password=""):
        conn = http.client.HTTPConnection(host)
        headers = {}
        if username:
            auth_str = f"{username}:{password}".encode('utf-8')
            base64string = base64.b64encode(auth_str).decode('utf-8')
            headers["Authorization"] = "Basic {}".format(base64string)
        if method == "GET":
            headers["Content-Type"] = "application/x-www-form-urlencoded"
            conn.request(method=method, url=endpoint, headers=headers)
        else:
            headers["Content-Type"] = "application/json"
            if params:
                conn.request(method=method, url=endpoint, body=params.encode('utf-8'), headers=headers)
            else:
                conn.request(method=method, url=endpoint, headers=headers)
        response = conn.getresponse()
        res = response.read().decode('utf-8')
        return res
    
    def httpGet(host, endpoint, username="", password=""):
        return httpRequest("GET", host, endpoint, "", username, password)
    def httpPost(host, endpoint, params, username="", password=""):
        return httpRequest("POST", host, endpoint, params, username, password)
    def httpPut(host, endpoint, params, username="", password=""):
        return httpRequest("PUT", host, endpoint, params, username, password)
    def getIndices(host, username="", password=""):
        endpoint = "/_cat/indices"
        indicesResult = httpGet(ClusterHost, endpoint, ClusterUserName, ClusterPassword)
        indicesList = indicesResult.split("\n")
        indexList = []
        for indices in indicesList:
            if (indices.find("open") > 0):
                indexList.append(indices.split()[2])
        return indexList
    def getSettings(index, host, username="", password=""):
        endpoint = "/" + index + "/_settings"
        indexSettings = httpGet(host, endpoint, username, password)
        settingsDict = json.loads(indexSettings)
        # Shard count matches the source cluster index
        number_of_shards = settingsDict[index]["settings"]["index"]["number_of_shards"]
        # Replica count defaults to 0
        number_of_replicas = DEFAULT_REPLICAS
        newSetting = "\"settings\": {\"number_of_shards\": %s, \"number_of_replicas\": %s}" % (number_of_shards, number_of_replicas)
        return newSetting
    def getMapping(index, host, username="", password=""):
        endpoint = "/" + index + "/_mapping"
        indexMapping = httpGet(host, endpoint, username, password)
        mappingDict = json.loads(indexMapping)
        mappings = json.dumps(mappingDict[index]["mappings"])
        newMapping = "\"mappings\" : " + mappings
        return newMapping
    def createIndexStatement(IndexName):
        settingStr = getSettings(IndexName, ClusterHost, ClusterUserName, ClusterPassword)
        mappingStr = getMapping(IndexName, ClusterHost, ClusterUserName, ClusterPassword)
        createstatement = "{\n" + str(settingStr) + ",\n" + str(mappingStr) + "\n}"
        return createstatement
    def createIndex(IndexName, newIndexName=""):
        if (newIndexName == "") :
            newIndexName = IndexName
        createstatement = createIndexStatement(IndexName)
        print ("PUT /" + newIndexName + "\n" + createstatement)
    
    indexList = getIndices(ClusterHost, ClusterUserName, ClusterPassword)
    systemIndex = []
    for index in indexList:
        if (index.startswith(".")):
            systemIndex.append(index)
        else :
            createIndex(index, index)
    # Uncomment to review system indexes that are automatically skipped
    # if (len(systemIndex) > 0) :
    #     for index in systemIndex:
    #         print (index + " might be a system index and will not be recreated. Handle it separately if needed.")

  3. Update the Elasticsearch connection parameters in the script.

  4. Run the script to export index creation statements. The output looks like this:

PUT /product_info
{
"settings": {"number_of_shards": 5, "number_of_replicas": 0},
"mappings" : {"properties": {"annual_rate": {"type": "keyword"}, "describe": {"type": "text", "analyzer": "ik_smart"}, "productName": {"type": "text", "analyzer": "ik_smart"}}}
}
PUT /user_info
{
"settings": {"number_of_shards": 5, "number_of_replicas": 0},
"mappings" : {"properties": {"address": {"type": "text", "analyzer": "ik_smart"}, "describe": {"type": "text", "analyzer": "ik_smart"}, "userName": {"type": "text", "analyzer": "ik_smart"}}}
}
  1. Insert the exported statements into the schema_meta table in PolarDB for AI.

Step 3: Build the search index

Important

The following operation drops the schema_index table. Any previously built index will be lost. If you already have a working index, back it up or proceed with caution. This operation is for demonstration purposes only.

-- 1. Drop the existing index table
/*polar4ai*/DROP TABLE schema_index;

-- 2. Create a new vector index table
/*polar4ai*/CREATE TABLE schema_index(
  id integer,
  table_name varchar,
  table_comment text_ik_max_word,
  table_ddl text_ik_max_word,
  column_names text_ik_max_word,
  column_comments text_ik_max_word,
  sample_values text_ik_max_word,
  vecs vector_768,
  ext text_ik_max_word,
  database_service text_ik_max_word,
  PRIMARY key (id)
);

-- 3. Start the asynchronous indexing task from schema_meta
/*polar4ai*/SELECT * FROM PREDICT (
MODEL _polar4ai_text2vec, SELECT ''
) WITH (mode='async', resource='schema_meta', data_service = 'ES')
INTO schema_index;
You must explicitly specify data_service = 'ES' when your schema_meta table contains Elasticsearch index definitions.

After the command runs, save the returned task ID to check progress in the next step.

Step 4: Check the task status

/*polar4ai*/SHOW TASK `<task_id>`;

When the status changes to finish, the index is ready.

Step 5: Query data using Data-Agent

/*polar4ai*/SELECT * FROM PREDICT (MODEL _polar4ai_nl2sql, select 'short-term high-yield products') WITH (basic_index_name='schema_index');

FAQ

The `SHOW TASK` command returns a status of `fail`. What should I check?

Check permissions first: the database account running PREDICT must have read permissions on schema_info or schema_meta and write permissions on schema_index. If permissions are correct, the issue is usually malformed metadata.

  • In schema_info mode: verify that the ext field uses the format <database_name>.<table_name>.<column_name> and that db_type is consistent across all rows for the same logical database.

  • In schema_meta mode: verify that each DDL starts with CREATE and each Elasticsearch definition starts with PUT.

NL2SQL is returning incorrect results. How do I improve accuracy?

The quality of results depends directly on the metadata you provided. Use the following symptom-to-fix guide to diagnose and resolve specific issues:

Symptom Root cause Fix
Query references the wrong table table_comment is too vague or missing Add a description that explains the business purpose of the table, not just its technical structure
JOIN results are wrong or missing Foreign key relationships are not defined Check that is_foreign and ext are set correctly for all foreign key columns. The LLM cannot infer join relationships that are not explicitly defined
Filter conditions do not match actual values sample_values does not cover the values users query by Add representative values, especially for status fields, categories, and enumeration columns