E-MapReduce:Use EMR to transmit MySQL binary logs in near real time

Last Updated: Sep 14, 2023

This topic describes how to use plug-ins in Alibaba Cloud Simple Log Service and an E-MapReduce (EMR) cluster to transmit MySQL binary logs in near real time.

Prerequisites

  • An EMR Hadoop cluster is created. For more information, see Create a cluster.

  • A MySQL database, such as an ApsaraDB RDS for MySQL database or a PolarDB for Xscale (PolarDB-X) database, is created. The binary logging feature must be enabled for the MySQL database, and the binary log format must be set to ROW.

    An ApsaraDB RDS for MySQL instance is used as an example in this topic. For information about how to create an ApsaraDB RDS for MySQL instance, see Create an ApsaraDB RDS for MySQL instance.

    Note

    By default, the binary logging feature is enabled for the ApsaraDB RDS for MySQL instance.

Procedure

  1. Connect to the ApsaraDB RDS for MySQL instance and add user permissions.

    1. Run a command to connect to the ApsaraDB RDS for MySQL instance. For more information, see Use a database client or the CLI to connect to an ApsaraDB RDS for MySQL instance.

    2. Run the following commands to add user permissions:

      CREATE USER canal IDENTIFIED BY 'canal';
      GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
      FLUSH PRIVILEGES;
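      In the commands above, REPLICATION SLAVE lets the account read the binlog stream the way a replica would, REPLICATION CLIENT allows status commands such as SHOW MASTER STATUS, and SELECT is needed to read table data. If you use a different account name or password, the same statements can be assembled programmatically; a minimal sketch (the helper and its parameters are illustrative, not part of the sample project):

```scala
// Assembles the grant statements from this step for an arbitrary account.
// REPLICATION SLAVE: read the binlog stream like a replica.
// REPLICATION CLIENT: run status commands such as SHOW MASTER STATUS.
// SELECT: read table data.
def grantStatements(user: String, password: String): Seq[String] = Seq(
  s"CREATE USER $user IDENTIFIED BY '$password';",
  s"GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '$user'@'%';",
  "FLUSH PRIVILEGES;"
)
```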
  2. Add a configuration file for Simple Log Service. For more information, see Collect MySQL binary logs.

    Note

    In this example, a project named canaltest and a Logstore named canal are created.

    Check whether log data is uploaded in the Simple Log Service console. If log data fails to be uploaded, use the log collection feature of Simple Log Service to troubleshoot issues.

  3. Compile a JAR package and upload it to an Object Storage Service (OSS) bucket.

    1. Open Git Bash on your on-premises machine and run the following command to clone the sample code:

      git clone https://github.com/aliyun/aliyun-emapreduce-demo.git
    2. Modify the sample code.

      The sample code contains the LoghubSample class, which collects data from Simple Log Service and displays it. The following code shows the class after the modification:

      package com.aliyun.emr.example

      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
      import org.apache.spark.streaming.{Milliseconds, StreamingContext}

      object LoghubSample {
        def main(args: Array[String]): Unit = {
          if (args.length < 7) {
            System.err.println(
              """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar
                |  <project> <logstore> <consumer group> <endpoint>
                |  <access key id> <access key secret> <batch interval seconds>
              """.stripMargin)
            System.exit(1)
          }
          val loghubProject = args(0)
          val logStore = args(1)
          val loghubGroupName = args(2)
          val endpoint = args(3)
          val accessKeyId = args(4)
          val accessKeySecret = args(5)
          val batchInterval = Milliseconds(args(6).toInt * 1000)
          val conf = new SparkConf().setAppName("Mysql Sync")
          // conf.setMaster("local[4]")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            loghubProject,
            logStore,
            loghubGroupName,
            endpoint,
            1,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
          loghubStream.foreachRDD(rdd =>
            rdd.saveAsTextFile("/mysqlbinlog")
          )
          ssc.start()
          ssc.awaitTermination()
        }
      }

      In the preceding sample code, loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile("/mysqlbinlog")) is changed to loghubStream.foreachRDD(rdd => rdd.saveAsTextFile("/mysqlbinlog")). This way, the data that the Spark Streaming job consumes in EMR is stored as plain text in HDFS on the EMR cluster.
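      One caveat worth noting: rdd.saveAsTextFile writes to a fixed HDFS directory, and on a default configuration Hadoop refuses to write into a directory that already exists, so the second micro-batch would fail with a FileAlreadyExistsException. A common variant (an assumption, not part of the original sample) derives a separate output directory from each batch time:

```scala
// Derives a unique HDFS output directory per micro-batch, because
// saveAsTextFile fails if its target directory already exists.
def batchOutputPath(base: String, batchTimeMs: Long): String =
  s"$base/batch-$batchTimeMs"

// Inside the streaming job this could be wired up as (sketch only):
//   loghubStream.foreachRDD { (rdd, time) =>
//     rdd.saveAsTextFile(batchOutputPath("/mysqlbinlog", time.milliseconds))
//   }
```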

    3. Debug the code on your on-premises machine and run the following command to package the code:

      mvn clean install
    4. Upload the JAR package to an OSS bucket.

      Create an OSS bucket and upload the JAR package to the OSS bucket. For more information, see Create buckets and Upload objects.

      Note

      In this example, an OSS bucket named EMR-test is created. The examples-1.1-shaded.jar package is uploaded to the EMR-test/jar directory.

  4. Submit and run a Spark job.

    spark-submit --master yarn --deploy-mode client \
      --driver-memory 4g --executor-memory 2g --executor-cores 2 \
      --class com.aliyun.emr.example.LoghubSample \
      oss://EMR-test/jar/examples-1.1-shaded.jar \
      canaltest canal sparkstreaming cn-hangzhou-intranet.log.aliyuncs.com \
      $ALIBABA_CLOUD_ACCESS_KEY_ID $ALIBABA_CLOUD_ACCESS_KEY_SECRET 1
    Note

    You must configure environment variables before you can run the sample code. For more information about how to configure environment variables, see the Configure environment variables section in this topic.
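    The seven positional arguments that follow the JAR path in the spark-submit command are consumed in order by args(0) through args(6) in LoghubSample: project, Logstore, consumer group, endpoint, AccessKey pair, and batch interval in seconds. That mapping can be sketched as follows (the case class is illustrative, not part of the sample project):

```scala
// Illustrative holder for the seven positional arguments passed after the
// JAR path; LoghubSample reads them from args(0) through args(6).
case class LoghubJobConfig(
  project: String,          // Simple Log Service project, e.g. canaltest
  logstore: String,         // Logstore name, e.g. canal
  consumerGroup: String,    // Loghub consumer group, e.g. sparkstreaming
  endpoint: String,         // Simple Log Service endpoint
  accessKeyId: String,      // AccessKey ID
  accessKeySecret: String,  // AccessKey secret
  batchIntervalMs: Long     // batch interval, seconds converted to milliseconds
)

object LoghubJobConfig {
  // Mirrors the parsing in the sample's main method.
  def fromArgs(args: Array[String]): LoghubJobConfig = {
    require(args.length >= 7, "expected 7 arguments")
    LoghubJobConfig(args(0), args(1), args(2), args(3), args(4), args(5),
      args(6).toInt * 1000L)
  }
}
```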

  5. Run the following command to view the files in the /mysqlbinlog directory:

    hadoop fs -ls /mysqlbinlog

    You can also run the hadoop fs -cat /mysqlbinlog/part-00000 command to view the file content.