This topic describes how to use Data Lake Analytics (DLA) Ganos to analyze data of spatio-temporal geometries.

Spatio-temporal geometries in DLA Ganos involve the following items:

(1) Spatio-temporal geometric objects.
  • Vector data, such as point, line, and polygon features
  • Spatio-temporal data (spatio-temporal trajectory data), which consists of vector data and temporal attributes

(2) Operations on spatio-temporal geometric objects. For example, you can determine the spatial relationships between objects, such as intersection, adjacency, and containment.

DLA Ganos uses the Simple Features model defined in the Open Geospatial Consortium (OGC) standards as its vector data model. Supported geometry types include Point, LineString, Polygon, MultiPoint, MultiLineString, and MultiPolygon. The serverless Spark engine loads geometries and uses them in computations as user-defined types (UDTs).
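
The geometry types and spatial relationships described above can be tried out with plain Java Topology Suite (JTS) classes, which the Spark session loads later in this topic. The following is a minimal sketch and not part of the DLA Ganos SDK: it assumes that the org.locationtech.jts:jts-core library is on the classpath, and the GeometryRelations object and its sample geometries are hypothetical.

```scala
import org.locationtech.jts.geom.Geometry
import org.locationtech.jts.io.WKTReader

// Hypothetical helper object; only JTS classes are used, no Ganos API.
object GeometryRelations {
  private val reader = new WKTReader()

  // Parse Well-Known Text (WKT) into a JTS geometry.
  def parse(wkt: String): Geometry = reader.read(wkt)

  val area: Geometry     = parse("POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0))")
  val inside: Geometry   = parse("POINT (5 5)")
  val route: Geometry    = parse("LINESTRING (-5 5, 5 5)")
  val neighbor: Geometry = parse("POLYGON ((10 0, 20 0, 20 10, 10 10, 10 0))")

  def main(args: Array[String]): Unit = {
    println(s"containment:  ${area.contains(inside)}")   // point lies in the polygon interior
    println(s"intersection: ${area.intersects(route)}")  // line crosses the polygon boundary
    println(s"adjacency:    ${area.touches(neighbor)}")  // polygons share an edge only
    println(s"type:         ${inside.getGeometryType}")  // OGC type name of the geometry
  }
}
```

All three predicates return true for these sample geometries, because the point is in the interior of the polygon, the line crosses its boundary, and the two polygons share only an edge.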

DLA Ganos supports the following vector data sources:
  • Hive
  • PolarDB
  • Lindorm (HBase)
  • GeoMesa
  • PostGIS

Procedure

  1. Obtain the SDK.

    Submit a ticket or contact the Alibaba Cloud expert team to obtain the DLA Ganos SDK for compilation and debugging on your on-premises machine. For more information, see Expert service.

  2. Build a development environment and prepare test data.

    After you obtain the SDK, you can build a development environment and create a Ganos Spark job. First, prepare Automatic Identification System (AIS) sample data in the following format:

    sid;date;longitude;latitude;course;speed;sourceid;heading;rot;messageid;status
    205003001;2018-09-03 09:15:50;72.96675666666667;11.982751666666667;130.0;13.1;17;129.0;0.0;1;Sailing
    205003001;2018-09-03 11:10:17;73.27778666666667;11.70024;133.0;13.1;17;131.0;0.0;1;Sailing
    205003001;2018-09-03 11:12:20;73.28349333333334;11.695146666666666;133.0;13.1;17;131.0;0.0;1;Sailing
    205003001;2018-09-03 12:45:25;73.54346666666666;11.469733333333334;134.0;13.2;17;133.0;0.0;1;Sailing
    205003001;2018-09-03 14:21:37;73.75408;11.182;144.8;14.0;17;145.0;0.0;1;Sailing
    205003001;2018-09-03 14:21:45;73.75434666666666;11.1816;144.8;14.0;17;145.0;0.0;1;Sailing
    205003001;2018-09-03 16:49:06;74.06773333333334;10.7524;146.0;12.8;17;143.0;0.0;1;Sailing
    205003001;2018-09-03 17:55:34;74.20368;10.555546666666666;146.0;12.8;17;143.0;0.0;1;Sailing
    205003001;2018-09-03 22:29:56;74.76798666666667;9.754333333333333;142.0;12.6;17;141.0;0.0;1;Sailing
    ...
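
    Each line of the sample data is one AIS report: a position (longitude and latitude) plus a timestamp and attributes, separated by semicolons. The following plain Scala sketch shows how the columns line up with the header. The AisRecord type and the AisCsv object are hypothetical names used for illustration only. The job in this topic reads the file with the CSV reader of Spark instead.

    ```scala
    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatter

    // Hypothetical record type: a position plus a timestamp and a status.
    final case class AisRecord(sid: Long, date: LocalDateTime,
                               longitude: Double, latitude: Double, status: String)

    object AisCsv {
      private val fmt = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss")

      // Split one data line on ';' and pick columns by their header position:
      // 0 = sid, 1 = date, 2 = longitude, 3 = latitude, 10 = status.
      def parseLine(line: String): AisRecord = {
        val f = line.split(";", -1)
        require(f.length == 11, s"expected 11 fields, got ${f.length}")
        AisRecord(f(0).toLong, LocalDateTime.parse(f(1), fmt),
          f(2).toDouble, f(3).toDouble, f(10))
      }

      def main(args: Array[String]): Unit = {
        val first = "205003001;2018-09-03 09:15:50;72.96675666666667;" +
          "11.982751666666667;130.0;13.1;17;129.0;0.0;1;Sailing"
        println(parseLine(first))
      }
    }
    ```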

    After you obtain the test data, upload it to an Object Storage Service (OSS) directory or to another server so that it can be accessed through a Uniform Resource Identifier (URI), for example, https://<bucket_name>.oss-cn-beijing.aliyuncs.com/xxx/data.csv.

    Create a Maven project named dla-ganos-quickstart in your integrated development environment (IDE), and add the following dependencies to the <dependencies> element in the pom.xml file:

    <dependency>
         <groupId>com.aliyun.ganos</groupId>
         <artifactId>ganos-spark-sdk</artifactId>
         <version>1.0</version>
         <scope>system</scope>
         <systemPath>URI that is used to obtain the package of the DLA Ganos SDK</systemPath>
    </dependency>
    
    <!-- GeoTools deps -->
    <dependency>
         <groupId>org.geotools</groupId>
         <artifactId>gt-referencing</artifactId>
         <version>22.0</version>
    </dependency>
    <dependency>
         <groupId>org.geotools</groupId>
         <artifactId>gt-epsg-hsql</artifactId>
         <version>22.0</version>
    </dependency>
    
    <!-- provided deps -->
    <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-core_2.11</artifactId>
         <version>2.4.3</version>
    </dependency>
    <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-sql_2.11</artifactId>
         <version>2.4.3</version>
    </dependency>
    <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-client</artifactId>
         <version>4.3.0</version>
    </dependency>
    <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-framework</artifactId>
         <version>4.3.0</version>
    </dependency>
    <dependency>
         <groupId>org.apache.curator</groupId>
         <artifactId>curator-recipes</artifactId>
         <version>4.3.0</version>
    </dependency>

    The development environment is built.

  3. Load and process spatio-temporal data.

    Create the QuickStart.scala file and import the following packages:

    import org.apache.log4j.{Level, Logger}
    import com.aliyun.ganos.spark.GanosSparkKryoRegistrator
    import com.aliyun.ganos.spark.jts._
    import com.aliyun.ganos.spark.SparkUtils._
    import com.aliyun.ganos.spark.utils._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
    import org.apache.spark.sql.functions.col
    import org.locationtech.geomesa.utils.io.WithStore
    import org.geotools.util.factory.Hints
    import org.geotools.data.DataStore
    import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}

    Initialize the SparkSession object and register GanosSparkKryoRegistrator as the Kryo registrator:

    val spark = SparkSession.builder
        .appName("Simple Application")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.crossJoin.enabled", "true")
        .config("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName)
        .getOrCreate()
    import spark.implicits._

    // SparkSession loads the Java Topology Suite (JTS) package to process spatio-temporal data.
    spark.withJTS
    val sc = spark.sparkContext

    Load the CSV file by using the CSV reader of Spark, and call the ST_MAKEPOINT function to construct a spatio-temporal object from the longitude and latitude columns. In the following example, the sc.addFile method of SparkContext downloads the CSV file to your cluster, and the file is then read from its local path.

    sc.addFile("https://<bucket_name>.oss-cn-beijing.aliyuncs.com/xxx/data.csv")
    val result = spark.read.format("csv")
        .option("header", "true")
        .option("delimiter", ";")
        .option("inferSchema", "true")
        .load(org.apache.spark.SparkFiles.get("data.csv"))
    
    // Call the ST_MAKEPOINT function to construct a spatio-temporal object.
    val ais = result.withColumn("geom", st_makePoint(col("longitude"), col("latitude")))
    ais.show           // Display the content of the DataFrame.
    ais.printSchema    // Display the schema of the DataFrame.

    The following results are returned:

    +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+-------+--------------------+
    |      sid|               date|        longitude|          latitude|course|speed|sourceid|heading|rot|messageid| status|                geom|
    +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+-------+--------------------+
    |205003001|2018-09-03 09:15:50|72.96675666666667|11.982751666666667| 130.0| 13.1|      17|  129.0|0.0|        1|Sailing|POINT (72.9667566...|
    |205003001|2018-09-03 11:10:17|73.27778666666667|          11.70024| 133.0| 13.1|      17|  131.0|0.0|        1|Sailing|POINT (73.2777866...|
    |205003001|2018-09-03 11:12:20|73.28349333333334|11.695146666666666| 133.0| 13.1|      17|  131.0|0.0|        1|Sailing|POINT (73.2834933...|
    |205003001|2018-09-03 12:45:25|73.54346666666666|11.469733333333334| 134.0| 13.2|      17|  133.0|0.0|        1|Sailing|POINT (73.5434666...|
    |205003001|2018-09-03 14:21:37|         73.75408|            11.182| 144.8| 14.0|      17|  145.0|0.0|        1|Sailing|POINT (73.75408 1...|
    |205003001|2018-09-03 14:21:45|73.75434666666666|           11.1816| 144.8| 14.0|      17|  145.0|0.0|        1|Sailing|POINT (73.7543466...|
    |205003001|2018-09-03 16:49:06|74.06773333333334|           10.7524| 146.0| 12.8|      17|  143.0|0.0|        1|Sailing|POINT (74.0677333...|
    |205003001|2018-09-03 17:55:34|         74.20368|10.555546666666666| 146.0| 12.8|      17|  143.0|0.0|        1|Sailing|POINT (74.20368 1...|
    |205003001|2018-09-03 22:29:56|74.76798666666667| 9.754333333333333| 142.0| 12.6|      17|  141.0|0.0|        1|Sailing|POINT (74.7679866...|
    |205003001|2018-09-04 00:07:58|74.96557333333334|           9.46616| 145.0| 12.8|      17|  145.0|0.0|        1|Sailing|POINT (74.9655733...|
    |205003001|2018-09-04 00:35:11|75.02181333333333|           9.38648| 145.0| 12.9|      17|  145.0|0.0|        1|Sailing|POINT (75.0218133...|
    |205003001|2018-09-04 00:41:32|75.03477333333333|           9.36856| 145.2| 14.0|      17|  145.0|0.0|        1|Sailing|POINT (75.0347733...|
    |205003001|2018-09-04 00:41:51|75.03549333333333| 9.367653333333333| 145.2| 14.0|      17|  145.0|0.0|        1|Sailing|POINT (75.0354933...|
    |205003001|2018-09-04 00:42:02|75.03581333333334| 9.367146666666667| 145.0| 12.8|      17|  145.0|0.0|        1|Sailing|POINT (75.0358133...|
    |205003001|2018-09-04 00:49:42|75.05202666666666| 9.344533333333333| 144.0| 12.8|      17|  145.0|0.0|        1|Sailing|POINT (75.0520266...|
    |205003001|2018-09-04 02:30:35|75.25965333333333| 9.049493333333333| 146.0| 13.2|      17|  147.0|0.0|        1|Sailing|POINT (75.2596533...|
    |205003001|2018-09-04 02:30:45|            75.26| 9.048986666666666| 146.0| 13.2|      17|  147.0|0.0|        1|Sailing|POINT (75.26 9.04...|
    |205003001|2018-09-04 10:54:37|76.46938666666667| 7.623226666666667| 141.0| 13.7|      17|  139.0|0.0|        1|Sailing|POINT (76.4693866...|
    |205003001|2018-09-04 10:54:59|         76.47024|           7.62216| 141.0| 13.7|      17|  139.0|0.0|        1|Sailing|POINT (76.47024 7...|
    |205003001|2018-09-04 10:55:59|76.47261333333333|            7.6192| 142.0| 13.7|      17|  139.0|0.0|        1|Sailing|POINT (76.4726133...|
    +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+-------+--------------------+
    only showing top 20 rows
    
    root
     |-- sid: integer (nullable = true)
     |-- date: timestamp (nullable = true)
     |-- longitude: double (nullable = true)
     |-- latitude: double (nullable = true)
     |-- course: double (nullable = true)
     |-- speed: double (nullable = true)
     |-- sourceid: integer (nullable = true)
     |-- heading: double (nullable = true)
     |-- rot: double (nullable = true)
     |-- messageid: integer (nullable = true)
     |-- status: string (nullable = true)
     |-- geom: point (nullable = true)

    The output shows that a geom column of the point type is added to the schema. This column holds the spatio-temporal objects that are constructed by using the ST_MAKEPOINT function. You can query and analyze the spatio-temporal data in the geom column. For example, you can execute SQL statements in the serverless Spark engine of DLA to filter the data by space and time:

    ais.createOrReplaceTempView("ais")
    val query = spark.sql(
        "SELECT * FROM ais WHERE " +
          "st_contains(st_geomfromtext('POLYGON((73.0 8.5,75 8.5,75 13.5,73.0 13.5,73.0 8.5))'), geom) AND " +
          "date>=to_timestamp('2018-09-03') AND date <=to_timestamp('2018-10-05') order by date")
    query.show()
    println(s"count=${query.count()}")

    The following results are returned:

    +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+-------+--------------------+
    |      sid|               date|        longitude|          latitude|course|speed|sourceid|heading|rot|messageid| status|                geom|
    +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+-------+--------------------+
    |205003001|2018-09-03 11:10:17|73.27778666666667|          11.70024| 133.0| 13.1|      17|  131.0|0.0|        1|Sailing|POINT (73.2777866...|
    |205003001|2018-09-03 11:12:20|73.28349333333334|11.695146666666666| 133.0| 13.1|      17|  131.0|0.0|        1|Sailing|POINT (73.2834933...|
    |205003001|2018-09-03 12:45:25|73.54346666666666|11.469733333333334| 134.0| 13.2|      17|  133.0|0.0|        1|Sailing|POINT (73.5434666...|
    |205003001|2018-09-03 14:21:37|         73.75408|            11.182| 144.8| 14.0|      17|  145.0|0.0|        1|Sailing|POINT (73.75408 1...|
    |205003001|2018-09-03 14:21:45|73.75434666666666|           11.1816| 144.8| 14.0|      17|  145.0|0.0|        1|Sailing|POINT (73.7543466...|
    |205003001|2018-09-03 16:49:06|74.06773333333334|           10.7524| 146.0| 12.8|      17|  143.0|0.0|        1|Sailing|POINT (74.0677333...|
    |205003001|2018-09-03 17:55:34|         74.20368|10.555546666666666| 146.0| 12.8|      17|  143.0|0.0|        1|Sailing|POINT (74.20368 1...|
    |205003001|2018-09-03 22:29:56|74.76798666666667| 9.754333333333333| 142.0| 12.6|      17|  141.0|0.0|        1|Sailing|POINT (74.7679866...|
    |205003001|2018-09-04 00:07:58|74.96557333333334|           9.46616| 145.0| 12.8|      17|  145.0|0.0|        1|Sailing|POINT (74.9655733...|
    +---------+-------------------+-----------------+------------------+------+-----+--------+-------+---+---------+-------+--------------------+
    
    count=9

    The output shows that nine data records that meet the filter conditions are obtained after the ST_CONTAINS function is used to filter spatio-temporal data.
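
    To see why a given record passes or fails the filter, the conditions of the SQL statement can be re-checked by hand. The polygon in the statement is an axis-aligned rectangle, so for this particular case containment reduces to a coordinate range check. The following plain Scala sketch applies that shortcut to four rows taken from the first result table. The FilterCheck object is a hypothetical name, and the shortcut does not replace ST_CONTAINS for arbitrary polygons.

    ```scala
    import java.time.LocalDateTime

    // Hypothetical helper that mirrors the WHERE clause of the SQL statement.
    object FilterCheck {
      // Rectangle corners and time window copied from the SQL statement.
      val minLon = 73.0; val maxLon = 75.0
      val minLat = 8.5;  val maxLat = 13.5
      val from: LocalDateTime = LocalDateTime.of(2018, 9, 3, 0, 0)
      val to: LocalDateTime   = LocalDateTime.of(2018, 10, 5, 0, 0)

      // Strict inequalities: a point on the polygon boundary is not contained.
      def inRectangle(lon: Double, lat: Double): Boolean =
        lon > minLon && lon < maxLon && lat > minLat && lat < maxLat

      // Inclusive bounds, matching date >= ... AND date <= ...
      def inWindow(t: LocalDateTime): Boolean = !t.isBefore(from) && !t.isAfter(to)

      def matches(t: LocalDateTime, lon: Double, lat: Double): Boolean =
        inRectangle(lon, lat) && inWindow(t)

      // Four rows from the first result table: (date, longitude, latitude).
      val rows: Seq[(LocalDateTime, Double, Double)] = Seq(
        (LocalDateTime.of(2018, 9, 3, 9, 15, 50), 72.96675666666667, 11.982751666666667), // west of the rectangle
        (LocalDateTime.of(2018, 9, 3, 11, 10, 17), 73.27778666666667, 11.70024),          // inside
        (LocalDateTime.of(2018, 9, 4, 0, 7, 58), 74.96557333333334, 9.46616),             // inside
        (LocalDateTime.of(2018, 9, 4, 0, 35, 11), 75.02181333333333, 9.38648)             // east of the rectangle
      )

      def main(args: Array[String]): Unit =
        rows.foreach { case (t, lon, lat) =>
          println(s"$t ($lon, $lat) -> ${matches(t, lon, lat)}")
        }
    }
    ```

    The record at 09:15:50 is dropped because its longitude is less than 73.0, and the record at 00:35:11 is dropped because its longitude is greater than 75. This matches the first and last rows of the query result.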

  4. Submit a job to the serverless Spark engine of DLA.

    Compile and package the project.

    mvn clean package

    Log on to the DLA console to submit a Spark job. For more information, see Create and run Spark jobs.

  5. Perform other operations.

    You can perform the following operations on spatio-temporal objects:

    (1) Use spatio-temporal geometry functions. For more information, see Spatio-temporal geometry functions.

    (2) Import data into ApsaraDB for HBase Enhanced Edition (Lindorm). For more information, see Spatio-temporal geometry functions.