Data in a data warehouse or a data lake is synchronized with that in an upstream business database. When the data in the upstream business database changes, the downstream data warehouse or data lake immediately detects and synchronizes the data changes. In the database field, this scenario is called Change Data Capture (CDC).

Background

In the database field, many CDC schemes and tools are available. However, in the big data field, CDC practices are few, and not many standards or technical implementations are available. Typically, you need to select existing engines and use their capabilities to build your own CDC scheme. The common schemes fall into two categories:

  • Batch update: captures incremental updates in an upstream source table, records the updated data in a new table, and uses the MERGE or UPSERT syntax to merge that table with an existing table in the downstream warehouse. This method requires that tables have a primary key or a composite primary key, and that the source table have a special field that marks incrementally updated data. Its real-time performance is poor. If the downstream warehouse does not support the MERGE or UPSERT syntax, you need to find your own way to merge the updated data with the existing table. In addition, this method cannot synchronize data deletion. As a workaround, you can delete the existing table and write the full data to a new table on each synchronization, but the performance is low.
  • Real-time synchronization: captures the binary logs of an upstream source table and replays them in a downstream data warehouse. Here, "binary logs" is used in a general sense and is not limited to MySQL binary logs. This method requires that the downstream warehouse be capable of replaying the binary logs in real time. If the binary logs record only row changes, the downstream warehouse only needs to support the INSERT, UPDATE, and DELETE operations. Unlike batch update, this method can be combined with a streaming system. For example, you can ingest the binary logs to a Kafka topic in real time, and the downstream warehouse can subscribe to the topic to pull and replay the binary logs in real time.
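
The difference between the two schemes can be sketched in a few lines of plain Scala. This is a minimal illustration, not part of any engine: the names `Row`, `Change`, `upsert`, and `replay` are hypothetical, and an in-memory `Map` keyed by the primary key stands in for the downstream table.

```scala
object CdcSchemes {
  // Hypothetical row type; id plays the role of the primary key.
  final case class Row(id: Long, name: String, sales: Double)

  // Hypothetical row-level change records, as a binary log would carry them.
  sealed trait Change
  final case class Insert(row: Row) extends Change
  final case class Update(beforeId: Long, row: Row) extends Change
  final case class Delete(id: Long) extends Change

  // Batch update: merge a batch of changed rows into the target by primary key.
  // Rows deleted upstream never appear in `changed`, so they stay in the target.
  def upsert(target: Map[Long, Row], changed: Seq[Row]): Map[Long, Row] =
    changed.foldLeft(target)((acc, row) => acc.updated(row.id, row))

  // Real-time synchronization: replay row changes in log order.
  def replay(target: Map[Long, Row], log: Seq[Change]): Map[Long, Row] =
    log.foldLeft(target) {
      case (acc, Insert(row))           => acc.updated(row.id, row)
      case (acc, Update(beforeId, row)) => (acc - beforeId).updated(row.id, row)
      case (acc, Delete(id))            => acc - id
    }
}
```

In this sketch, `upsert` has no way to remove a row, whereas `replay` handles a `Delete` record like any other change. This mirrors why only the second scheme can synchronize data deletion.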

Batch update

This method is applicable to scenarios where no data deletion needs to be synchronized and data does not need to be synchronized in real time.

  1. Create a MySQL table and insert data to the table.
    CREATE TABLE sales(id LONG, date DATE, name VARCHAR(32), sales DOUBLE, modified DATETIME);
    
    INSERT INTO sales VALUES (1, '2019-11-11', 'Robert', 323.00, '2019-11-11 12:00:05'), (2, '2019-11-11', 'Lee', 500.00, '2019-11-11 16:11:46'), (3, '2019-11-12', 'Robert', 136.00, '2019-11-12 10:23:54'), (4, '2019-11-13', 'Lee', 211.00, '2019-11-13 11:33:27');
    
    SELECT * FROM sales;
    
    +------+------------+--------+-------+---------------------+
    | id   | date       | name   | sales | modified            |
    +------+------------+--------+-------+---------------------+
    | 1    | 2019-11-11 | Robert |   323 | 2019-11-11 12:00:05 |
    | 2    | 2019-11-11 | Lee    |   500 | 2019-11-11 16:11:46 |
    | 3    | 2019-11-12 | Robert |   136 | 2019-11-12 10:23:54 |
    | 4    | 2019-11-13 | Lee    |   211 | 2019-11-13 11:33:27 |
    +------+------------+--------+-------+---------------------+
    Note: The modified field is the special field that marks incrementally updated data.
  2. Use Sqoop to export all the data in the MySQL table to Hadoop Distributed File System (HDFS).
    sqoop import --connect jdbc:mysql://emr-header-1:3306/test --username root --password EMRroot1234 --table sales -m 1 --target-dir /tmp/cdc/staging_sales
    
    hdfs dfs -ls /tmp/cdc/staging_sales
    Found 2 items
    -rw-r-----   2 hadoop hadoop          0 2019-11-26 10:58 /tmp/cdc/staging_sales/_SUCCESS
    -rw-r-----   2 hadoop hadoop        186 2019-11-26 10:58 /tmp/cdc/staging_sales/part-m-00000
  3. Create a Delta table and import all the data in the MySQL table to the Delta table.
    -- The LOAD DATA INPATH syntax is not supported for Delta tables. Therefore, create a temporary external table first.
    CREATE TABLE staging_sales (id LONG, date STRING, name STRING, sales DOUBLE, modified STRING) USING csv LOCATION '/tmp/cdc/staging_sales/';
    
    CREATE TABLE sales USING delta LOCATION '/user/hive/warehouse/test.db/test' AS SELECT * FROM staging_sales;
    
    SELECT * FROM sales;
    
    1    2019-11-11    Robert    323.0    2019-11-11 12:00:05.0
    2    2019-11-11    Lee    500.0    2019-11-11 16:11:46.0
    3    2019-11-12    Robert    136.0    2019-11-12 10:23:54.0
    4    2019-11-13    Lee    211.0    2019-11-13 11:33:27.0
    
    -- Delete the temporary table.
    DROP TABLE staging_sales;

    Switch to the HDFS command-line interface (CLI) and delete the temporary directory.

    # Delete the temporary directory.
    hdfs dfs -rm -r -skipTrash /tmp/cdc/staging_sales/
  4. Update data in and insert data to the MySQL table.
    -- Note that Sqoop cannot export deleted data. Therefore, data deletion cannot be synchronized to the target table.
    -- DELETE FROM sales WHERE id = 1;
    UPDATE sales SET name='Robert',modified=now() WHERE id = 2;
    INSERT INTO sales VALUES (5, '2019-11-14', 'Lee', 500.00, now());
    
    SELECT * FROM sales;
    
    +------+------------+--------+-------+---------------------+
    | id   | date       | name   | sales | modified            |
    +------+------------+--------+-------+---------------------+
    | 1    | 2019-11-11 | Robert |   323 | 2019-11-11 12:00:05 |
    | 2    | 2019-11-11 | Robert |   500 | 2019-11-26 11:08:34 |
    | 3    | 2019-11-12 | Robert |   136 | 2019-11-12 10:23:54 |
    | 4    | 2019-11-13 | Lee    |   211 | 2019-11-13 11:33:27 |
    | 5    | 2019-11-14 | Lee    |   500 | 2019-11-26 11:08:38 |
    +------+------------+--------+-------+---------------------+
  5. Use Sqoop to export the updated data.
    sqoop import --connect jdbc:mysql://emr-header-1:3306/test --username root --password EMRroot1234 --table sales -m 1 --target-dir /tmp/cdc/staging_sales --incremental lastmodified --check-column modified --last-value "2019-11-20 00:00:00"
    
    hdfs dfs -ls /tmp/cdc/staging_sales/
    Found 2 items
    -rw-r-----   2 hadoop hadoop          0 2019-11-26 11:11 /tmp/cdc/staging_sales/_SUCCESS
    -rw-r-----   2 hadoop hadoop         93 2019-11-26 11:11 /tmp/cdc/staging_sales/part-m-00000
  6. Create a temporary table for storing the updated data and merge the temporary table with the target table.
    CREATE TABLE staging_sales (id LONG, date STRING, name STRING, sales DOUBLE, modified STRING) USING csv LOCATION '/tmp/cdc/staging_sales/';
    
    MERGE INTO sales AS target
    USING staging_sales AS source
    ON target.id = source.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *;
    
    SELECT * FROM sales;
    
    1    2019-11-11    Robert    323.0    2019-11-11 12:00:05.0
    3    2019-11-12    Robert    136.0    2019-11-12 10:23:54.0
    2    2019-11-11    Robert    500.0    2019-11-26 11:08:34.0
    5    2019-11-14    Lee    500.0    2019-11-26 11:08:38.0
    4    2019-11-13    Lee    211.0    2019-11-13 11:33:27.0
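
The role of the modified field and the --last-value option in steps 5 and 6 can be sketched as follows. This is a plain-Scala illustration of watermark-based incremental extraction under simplifying assumptions, not Sqoop's implementation: the names `SourceRow`, `changedSince`, and `nextWatermark` are hypothetical, and the strict "later than" comparison is an assumption made for the sketch.

```scala
import java.time.LocalDateTime

object IncrementalExtract {
  // Hypothetical view of a source row: primary key plus the modified marker field.
  final case class SourceRow(id: Long, modified: LocalDateTime)

  // Select only the rows modified after the watermark of the previous run.
  def changedSince(rows: Seq[SourceRow], lastValue: LocalDateTime): Seq[SourceRow] =
    rows.filter(_.modified.isAfter(lastValue))

  // The next watermark is the latest modified value seen so far.
  def nextWatermark(rows: Seq[SourceRow], lastValue: LocalDateTime): LocalDateTime =
    rows.map(_.modified).foldLeft(lastValue)((a, b) => if (b.isAfter(a)) b else a)
}
```

With the rows from step 4 and a watermark of 2019-11-20 00:00:00, only the rows whose id is 2 and 5 are selected, which matches the rows changed by the merge in step 6. A row that is deleted upstream simply stops appearing in the source, so no watermark comparison can select it. This is why the scheme cannot propagate deletions.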

Real-time synchronization

Compared with batch update, real-time synchronization has fewer limits on scenarios. For example, data deletion can be synchronized without modifying the business model or adding an extra field. However, real-time synchronization is more complex to implement. If the binary logs of the source database are not in the standard format, you may need to write a user-defined function (UDF) to process them. For example, the format of the binary logs provided by Log Service is different from that of the binary logs provided by ApsaraDB RDS for MySQL.

In this example, ApsaraDB RDS for MySQL is used as the source database, and Alibaba Cloud Data Transmission Service (DTS) is used to export the binary logs of the source database to a Kafka cluster in real time. You can also use open-source Maxwell or Canal to export the binary logs. Next, the binary logs are periodically read from the Kafka cluster and stored in Alibaba Cloud Object Storage Service (OSS) or HDFS. Then, Spark is used to read and parse the binary logs to obtain inserted, updated, and deleted data. Finally, the merge API of Delta is used to merge the changes in the source table to a Delta table, as shown in the following figure.

[Figure: cdc_delta]
  1. Activate ApsaraDB RDS for MySQL and configure users, databases, and permissions. For more information about how to use RDS, see the RDS documentation. Create a table and insert some data.
    -- You can easily create a table in the RDS console. The final table creation statement is as follows:
    CREATE TABLE `sales` (
      `id` bigint(20) NOT NULL,
      `date` date DEFAULT NULL,
      `name` varchar(32) DEFAULT NULL,
      `sales` double DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- Insert some data and confirm the result.
    INSERT INTO sales VALUES (1, '2019-11-11', 'Robert', 323.00), (2, '2019-11-11', 'Lee', 500.00), (3, '2019-11-12', 'Robert', 136.00), (4, '2019-11-13', 'Lee', 211.00);
    
    SELECT * FROM `sales` ;
    [Figure: add_data]
  2. Create an E-MapReduce Kafka cluster. You can also use an existing E-MapReduce Kafka cluster. Create a topic named sales in the Kafka cluster.

    Bash

    kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2:2181,emr-header-3:2181 --partitions 1 --replication-factor 1 --topic sales
  3. Activate DTS if it has not been activated. Create a synchronization job and set the source instance to the ApsaraDB RDS for MySQL database and the destination instance to the Kafka cluster.
  4. Configure the DTS synchronization channel to synchronize the binary logs of the sales table in the ApsaraDB RDS for MySQL database to the sales topic in the Kafka cluster. If the synchronization is successful, the binary logs can be viewed in the Kafka cluster.
  5. Create a Spark Streaming job to parse the binary logs in Kafka and use the merge API of Delta to replay the binary logs in the destination Delta table in real time. The following table shows the format of the binary logs synchronized by DTS to Kafka. For more information, see Appendix: Introduction to the binary log format in Kafka. Each record in the binary logs represents a change to the data in the database.
    |Field|Value|
    |:--|:--|
    |recordid|1|
    |source|{"sourceType": "MySQL", "version": "0.0.0.0"}|
    |dbtable|delta_cdc.sales|
    |recordtype|INIT|
    |recordtimestamp|1970-01-01 08:00:00|
    |extratags|{}|
    |fields|["id","date","name","sales"]|
    |beforeimages|{}|
    |afterimages|{"sales":"323.0","date":"2019-11-11","name":"Robert","id":"1"}|
    Note: Pay attention to the recordtype, beforeimages, and afterimages fields. The recordtype field indicates the action performed on a row in the database. Valid values: INIT, UPDATE, DELETE, and INSERT. The beforeimages field contains the row data before the action, and the afterimages field contains the row data after the action.
    • Scala
      • Bash
        spark-shell --master yarn --use-emr-datasource
      • Scala
        import io.delta.tables._
        import org.apache.spark.internal.Logging
        import org.apache.spark.sql.{AnalysisException, SparkSession}
        import org.apache.spark.sql.expressions.Window
        import org.apache.spark.sql.functions._
        import org.apache.spark.sql.types.{DataTypes, StructField}
        
        val schema = DataTypes.createStructType(Array[StructField](
          DataTypes.createStructField("id", DataTypes.StringType, false),
          DataTypes.createStructField("date", DataTypes.StringType, true),
          DataTypes.createStructField("name", DataTypes.StringType, true),
          DataTypes.createStructField("sales", DataTypes.StringType, true)
        ))
        
        // init the delta table with the 'INIT' records.
        def initDeltaTable(): Unit = {
          spark.read
              .format("kafka")
              .option("kafka.bootstrap.servers", "192.168.67.88:9092")
              .option("subscribe", "sales")
              .option("failOnDataLoss", value = false)
              .load()
              .createTempView("initData")
        
          // The data synchronized by DTS to Kafka is encoded in the Avro format. E-MapReduce provides the dts_binlog_parser UDF to parse such data.
          val dataBatch = spark.sql(
              """
                |SELECT dts_binlog_parser(value)
                |AS (recordID, source, dbTable, recordType, recordTimestamp, extraTags, fields, beforeImages, afterImages)
                |FROM initData
              """.stripMargin)
        
          // Select the data where recordType is INIT as the initial data.
          dataBatch.where("recordType = 'INIT'")
              .select(from_json(col("afterImages").cast("string"), schema).as("jsonData"))
              .select(
                col("jsonData.id").cast("long").as("id"),
                col("jsonData.date").as("date"),
                col("jsonData.name").as("name"),
                // The alias must be applied to the column, inside select(...).
                col("jsonData.sales").cast("decimal(7,2)").as("sales"))
              .write.format("delta").mode("append").save("/delta/sales")
        }
        
        try {
          DeltaTable.forPath("/delta/sales")
        } catch {
          case e: AnalysisException if e.getMessage().contains("is not a Delta table") =>
            initDeltaTable()
        }
        
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "192.168.67.88:9092")
            .option("subscribe", "sales")
            .option("startingOffsets", "earliest")
            .option("maxOffsetsPerTrigger", 1000)
            .option("failOnDataLoss", value = false)
            .load()
            .createTempView("incremental")
        
        // The data synchronized by DTS to Kafka is encoded in the Avro format. E-MapReduce provides the dts_binlog_parser UDF to parse such data.
        val dataStream = spark.sql(
          """
            |SELECT dts_binlog_parser(value)
            |AS (recordID, source, dbTable, recordType, recordTimestamp, extraTags, fields, beforeImages, afterImages)
            |FROM incremental
          """.stripMargin)
        
        val task = dataStream.writeStream
            .option("checkpointLocation", "/delta/sales_checkpoint")
            .foreachBatch(
              (ops, id) => {
                // The window function is used to obtain the latest modification to a record.
                val windowSpec = Window
                    .partitionBy(coalesce(col("before_id"), col("id")))
                    .orderBy(col("recordId").desc)
                // Obtain the recordType, beforeImages.id, afterImages.id, afterImages.date, afterImages.name, and afterImages.sales fields from binary logs.
                val mergeDf = ops
                    .select(
                      col("recordId"),
                      col("recordType"),
                      from_json(col("beforeImages").cast("string"), schema).as("before"),
                      from_json(col("afterImages").cast("string"), schema).as("after"))
                    .where("recordType != 'INIT'")
                    .select(
                      col("recordId"),
                      col("recordType"),
                      when(col("recordType") === "INSERT", col("after.id")).otherwise(col("before.id")).cast("long").as("before_id"),
                      when(col("recordType") === "DELETE", col("before.id")).otherwise(col("after.id")).cast("long").as("id"),
                      when(col("recordType") === "DELETE", col("before.date")).otherwise(col("after.date")).as("date"),
                      when(col("recordType") === "DELETE", col("before.name")).otherwise(col("after.name")).as("name"),
                      when(col("recordType") === "DELETE", col("before.sales")).otherwise(col("after.sales")).cast("decimal(7,2)").as("sales")
                    )
                    .select(
                      dense_rank().over(windowSpec).as("rk"),
                      col("recordType"),
                      col("before_id"),
                      col("id"),
                      col("date"),
                      col("name"),
                      col("sales")
                    )
                    .where("rk = 1")
        
                // The merge condition, which is used to merge incremental data to the Delta table.
                val mergeCond = "target.id = source.before_id"
        
                DeltaTable.forPath(spark, "/delta/sales").as("target")
                    .merge(mergeDf.as("source"), mergeCond)
                    .whenMatched("source.recordType='UPDATE'")
                    .updateExpr(Map(
                      "id" -> "source.id",
                      "date" -> "source.date",
                      "name" -> "source.name",
                      "sales" -> "source.sales"))
                    .whenMatched("source.recordType='DELETE'")
                    .delete()
                    .whenNotMatched("source.recordType='INSERT' OR source.recordType='UPDATE'")
                    .insertExpr(Map(
                      "id" -> "source.id",
                      "date" -> "source.date",
                      "name" -> "source.name",
                      "sales" -> "source.sales"))
                    .execute()
              }
            ).start()
        
        task.awaitTermination()
    • SQL
      • Bash
        streaming-sql --master yarn --use-emr-datasource
      • SQL
        CREATE TABLE kafka_sales
        USING kafka
        OPTIONS(
        kafka.bootstrap.servers='192.168.67.88:9092',
        subscribe='sales'
        );
        
        CREATE TABLE delta_sales(id long, date string,  name string, sales decimal(7, 2))
        USING delta
        LOCATION '/delta/sales';
        
        INSERT INTO delta_sales
        SELECT CAST(jsonData.id AS LONG), jsonData.date, jsonData.name, jsonData.sales
        FROM (
          SELECT from_json(CAST(afterImages as STRING), 'id STRING, date STRING, name STRING, sales STRING') as jsonData
          FROM (
            SELECT dts_binlog_parser(value) AS (recordID, source, dbTable, recordType, recordTimestamp, extraTags, fields, beforeImages, afterImages)
            FROM kafka_sales
          ) binlog WHERE recordType='INIT'
        ) binlog_wo_init;
        
        CREATE SCAN incremental on kafka_sales USING STREAM
        OPTIONS(
        startingOffsets='earliest',
        maxOffsetsPerTrigger='1000',
        failOnDataLoss=false
        );
        
        
        CREATE STREAM job
        OPTIONS(
        checkpointLocation='/delta/sales_checkpoint'
        )
        MERGE INTO delta_sales as target
        USING (
          SELECT
          recordId,
          recordType,
          before_id,
          id,
          date,
          name,
          sales
          FROM(
            SELECT
            recordId,
            recordType,
            CASE WHEN recordType = "INSERT" then after.id else before.id end as before_id,
            CASE WHEN recordType = "DELETE" then CAST(before.id as LONG) else CAST(after.id as LONG) end as id,
            CASE WHEN recordType = "DELETE" then before.date else after.date end as date,
            CASE WHEN recordType = "DELETE" then before.name else after.name end as name,
            CASE WHEN recordType = "DELETE" then CAST(before.sales as DECIMAL(7, 2)) else CAST(after.sales as DECIMAL(7, 2)) end as sales,
            dense_rank() OVER (PARTITION BY coalesce(before.id,after.id) ORDER BY recordId DESC) as rank
            FROM (
              SELECT
              recordId,
              recordType,
              from_json(CAST(beforeImages as STRING), 'id STRING, date STRING, name STRING, sales STRING') as before,
              from_json(CAST(afterImages as STRING), 'id STRING, date STRING, name STRING, sales STRING') as after
              FROM (
                select dts_binlog_parser(value) as (recordID, source, dbTable, recordType, recordTimestamp, extraTags, fields, beforeImages, afterImages) from incremental
              ) binlog WHERE recordType != 'INIT'
            ) binlog_wo_init
          ) binlog_extract WHERE rank=1
        ) as source
        ON target.id = source.before_id
        WHEN MATCHED AND source.recordType='UPDATE' THEN
        UPDATE SET id=source.id, date=source.date, name=source.name, sales=source.sales
        WHEN MATCHED AND source.recordType='DELETE' THEN
        DELETE
        WHEN NOT MATCHED AND (source.recordType='INSERT' OR source.recordType='UPDATE') THEN
        INSERT (id, date, name, sales) values (source.id, source.date, source.name, source.sales);
  6. Read the Delta table after the Spark Streaming job created in the previous step starts.

    Scala

    spark.read.format("delta").load("/delta/sales").show
    
    +---+----------+------+------+
    | id|      date|  name| sales|
    +---+----------+------+------+
    |  1|2019-11-11|Robert|323.00|
    |  2|2019-11-11|   Lee|500.00|
    |  3|2019-11-12|Robert|136.00|
    |  4|2019-11-13|   Lee|211.00|
    +---+----------+------+------+

    Run the following four statements in the RDS console and confirm the result. Note that the record whose id is 2 is updated twice, so the final result must reflect the latest update.

    DELETE FROM sales WHERE id = 1;
    UPDATE sales SET sales = 150 WHERE id = 2;
    UPDATE sales SET sales = 175 WHERE id = 2;
    INSERT INTO sales VALUES (5, '2019-11-14', 'Robert', 233);
    
    SELECT * FROM sales;
    [Figure: delta_data]
    Read the Delta table again. You can find that the data has been updated. In addition, the value of sales in the record whose id is 2 is the latest.
    spark.read.format("delta").load("/delta/sales").show
    
    +---+----------+------+------+
    | id|      date|  name| sales|
    +---+----------+------+------+
    |  5|2019-11-14|Robert|233.00|
    |  3|2019-11-12|Robert|136.00|
    |  4|2019-11-13|   Lee|211.00|
    |  2|2019-11-11|   Lee|175.00|
    +---+----------+------+------+
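
The deduplication and merge logic of the streaming job can be traced by hand with the four statements above. The following plain-Scala sketch mirrors the window function and the merge clauses on an in-memory `Map`. It is an illustration only: the names `Sale`, `Change`, `latestPerKey`, and `merge` are hypothetical, `beforeId` stands for the before_id column computed in the job, and any recordId values you pass in are made up for the example (DTS assigns the real ones).

```scala
object LatestChangeMerge {
  final case class Sale(id: Long, date: String, name: String, sales: BigDecimal)

  // beforeId is the key used to match the target row, as before_id is in the job.
  final case class Change(recordId: Long, recordType: String, beforeId: Long, row: Sale)

  // Keep only the latest change per key, like the rank = 1 filter over
  // a window partitioned by key and ordered by recordId in descending order.
  def latestPerKey(batch: Seq[Change]): Seq[Change] =
    batch.groupBy(_.beforeId).values.map(_.maxBy(_.recordId)).toSeq

  // Apply the deduplicated changes with the same clauses as the MERGE statement.
  def merge(target: Map[Long, Sale], batch: Seq[Change]): Map[Long, Sale] =
    latestPerKey(batch).foldLeft(target) { (acc, c) =>
      (c.recordType, acc.contains(c.beforeId)) match {
        case ("UPDATE", true)             => (acc - c.beforeId).updated(c.row.id, c.row)
        case ("DELETE", true)             => acc - c.beforeId
        case ("INSERT" | "UPDATE", false) => acc.updated(c.row.id, c.row)
        case _                            => acc // no clause applies
      }
    }
}
```

Applying a batch that contains the DELETE for id 1, the two UPDATE records for id 2, and the INSERT for id 5 to the initial four rows leaves the rows whose id is 2, 3, 4, and 5. Only the second UPDATE for id 2 survives deduplication, so its sales value is 175.00, in line with the table above.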

Best practices

When a stream of data is ingested in real time, the number of small files in Delta increases rapidly. The following approaches are available to address this issue:

  • Partition the table. Generally, data is written to the latest partition, and historical partitions are seldom modified. You can compact historical partitions, and the compaction operation is less likely to fail due to transaction conflicts. After you partition the table, you can query the table with partition predicates. Queries with partition predicates are much more efficient than queries without partition predicates.
  • Perform the compaction operation periodically during stream ingestion. For example, you can perform the compaction operation every 10 mini batches. In this case, the compaction operation will not fail due to transaction conflicts. However, subsequent mini batches of data may fail to be written in time due to the compaction operation. You must control the frequency of the compaction operation.
  • If data does not need to be synchronized in real time and you are concerned that the compaction operation may fail due to transaction conflicts, you can adopt the batch processing method instead. In this method, you need to periodically collect the binary logs to OSS. You can use DTS to export the binary logs to Kafka and then use kafka-connect-oss to collect the binary logs to OSS. You can also use other tools to collect the binary logs to OSS. Then, create a Spark batch job that reads the binary logs from OSS and merges all of them to Delta Lake at a time. The following figure shows the flowchart of synchronizing data in the batch processing mode. You can replace the scheme framed in dotted lines with other schemes.
    [Figure: cdc_delta]
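
Periodic compaction during stream ingestion can be sketched as follows. This is a minimal plain-Scala illustration of the scheduling idea only. The names `shouldCompact` and `processBatch` are hypothetical, the `write` and `compact` callbacks are placeholders for the real merge and compaction operations, and batch IDs are assumed to start at 0 and increase by 1.

```scala
object CompactionSchedule {
  // True for the last batch of every group of `every` mini batches.
  def shouldCompact(batchId: Long, every: Int): Boolean =
    every > 0 && (batchId + 1) % every == 0

  // Runs the write for one mini batch and, when due, the compaction right after it.
  // Both callbacks are hypothetical placeholders supplied by the caller.
  def processBatch(batchId: Long, every: Int)(write: () => Unit, compact: () => Unit): Unit = {
    write()
    if (shouldCompact(batchId, every)) compact()
  }
}
```

Because the compaction runs in sequence with the writes in this sketch, no other mini batch commits while it is in progress, which is why it does not fail on transaction conflicts. The cost is that the next mini batch waits until the compaction finishes, so the value of `every` controls the trade-off between file count and write latency.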

Appendix: Introduction to the binary log format in Kafka

The binary logs synchronized by DTS to Kafka are encoded in the Avro format. You can use the dts_binlog_parser UDF provided by E-MapReduce to decode Avro-encoded binary logs to the text format.

  • Scala
    • Bash
      spark-shell --master local --use-emr-datasource
    • Scala
      Run the following Scala code in the Spark shell:
      spark.read
          .format("kafka")
          .option("kafka.bootstrap.servers", "192.168.67.88:9092")
          .option("subscribe", "sales")
          .option("maxOffsetsPerTrigger", 1000)
          .load()
          .createTempView("kafkaData")
      
      val kafkaDF = spark.sql("SELECT dts_binlog_parser(value) FROM kafkaData")
      
      kafkaDF.show(false)
  • SQL
    • Bash
      streaming-sql --master local --use-emr-datasource
    • SQL
      CREATE TABLE kafkaData
      USING kafka
      OPTIONS(
      kafka.bootstrap.servers='192.168.67.88:9092',
      subscribe='sales'
      );
      
      SELECT dts_binlog_parser(value) FROM kafkaData;

The decoded logs are as follows:

+--------+---------------------------------------------+---------------+----------+-------------------+---------+----------------------------+------------+--------------------------------------------------------------+
|recordid|source                                       |dbtable        |recordtype|recordtimestamp    |extratags|fields                      |beforeimages|afterimages                                                   |
+--------+---------------------------------------------+---------------+----------+-------------------+---------+----------------------------+------------+--------------------------------------------------------------+
|1       |{"sourceType": "MySQL", "version": "0.0.0.0"}|delta_cdc.sales|INIT      |1970-01-01 08:00:00|{}       |["id","date","name","sales"]|{}          |{"sales":"323.0","date":"2019-11-11","name":"Robert","id":"1"}|
|2       |{"sourceType": "MySQL", "version": "0.0.0.0"}|delta_cdc.sales|INIT      |1970-01-01 08:00:00|{}       |["id","date","name","sales"]|{}          |{"sales":"500.0","date":"2019-11-11","name":"Lee","id":"2"}   |
|3       |{"sourceType": "MySQL", "version": "0.0.0.0"}|delta_cdc.sales|INIT      |1970-01-01 08:00:00|{}       |["id","date","name","sales"]|{}          |{"sales":"136.0","date":"2019-11-12","name":"Robert","id":"3"}|
|4       |{"sourceType": "MySQL", "version": "0.0.0.0"}|delta_cdc.sales|INIT      |1970-01-01 08:00:00|{}       |["id","date","name","sales"]|{}          |{"sales":"211.0","date":"2019-11-13","name":"Lee","id":"4"}   |
+--------+---------------------------------------------+---------------+----------+-------------------+---------+----------------------------+------------+--------------------------------------------------------------+