GeoMesa is a suite of open source tools for processing geographic big data, provided by LocationTech. This topic describes how to use DLA Ganos to query HBase and Cassandra databases that are managed by GeoMesa.

GeoMesa allows you to perform large-scale geospatial queries and analysis on NoSQL distributed storage systems. GeoMesa supports NoSQL databases such as Accumulo, HBase, Google Bigtable, and Cassandra. For more information about GeoMesa, see https://www.geomesa.org/documentation/stable/user/index.html.

Note If you want to use DLA Ganos to query other databases managed by GeoMesa, modify the relevant connection parameters by following the instructions in the GeoMesa documentation. For the complete code, see the DLA Ganos sample library on GitHub.
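
For example, a connection to a GeoMesa Accumulo data store differs only in its parameter map. The following is a minimal sketch: the parameter names follow the GeoMesa Accumulo data store documentation, and all values are placeholders that you must replace with your own; whether your DLA Ganos version supports this data store should be confirmed against the documentation above.

    // A minimal sketch, assuming the GeoMesa Accumulo data store parameters.
    // All values are placeholders.
    val accumuloParams = Map(
        "accumulo.instance.id" -> "The Accumulo instance name",
        "accumulo.zookeepers" -> "The ZooKeeper address that is used to connect to Accumulo",
        "accumulo.user" -> "The Accumulo username",
        "accumulo.password" -> "The Accumulo password",
        "accumulo.catalog" -> "The catalog table name"
    )
    
    val accumuloFrame = spark.read
        .format("ganos-geometry")
        .options(accumuloParams)
        .option("ganos.feature", "testpoints")
        .load()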

Lindorm (HBase)

  1. Initialize a Spark session.
    val spark = SparkSession.builder
        .appName("Simple Application")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.crossJoin.enabled", "true")
        .config("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName)
        .getOrCreate()
    import spark.implicits._
    
    // Load the JTS package for the Spark session to process spatio-temporal data.
    spark.withJTS 
    val sc = spark.sparkContext
  2. Configure HBase connection parameters and load data by using Spark SQL.
    
    // Specify HBase connection parameters. The catalog name is set to AIS.
    val params = Map(
          "hbase.catalog" -> "AIS",
          "hbase.zookeepers" -> "The ZooKeeper address that is used to connect to HBase"
    )
    
    // Load the AIS dataset.
    val dataFrame = spark.read
          .format("ganos-geometry")
          .options(params)
          .option("ganos.feature", "testpoints")
          .load()
    
     dataFrame.createOrReplaceTempView("testpoints")
     // Create an SQL query job.
     val points = spark.sql("select * from testpoints where st_contains(st_makeBox2d(st_point(38,48), st_point(52,62)),geom)")
     
     // Print the schema and the query results.
     points.printSchema
     points.show
  3. View the results.
    root
     |-- __fid__: string (nullable = false)
     |-- name: string (nullable = true)
     |-- attr: string (nullable = true)
     |-- dtg: timestamp (nullable = true)
     |-- geom: point (nullable = true)
     
    +-------+-----+-----+-------------------+-------------+
    |__fid__| name| attr|                dtg|         geom|
    +-------+-----+-----+-------------------+-------------+
    |      2|name2|name2|2014-01-03 08:00:01|POINT (42 52)|
    |      3|name3|name3|2014-01-04 08:00:01|POINT (43 53)|
    |      4|name4|name4|2014-01-05 08:00:01|POINT (44 54)|
    |      5|name5|name5|2014-01-06 08:00:01|POINT (45 55)|
    |      6|name6|name6|2014-01-07 08:00:01|POINT (46 56)|
    |      7|name7|name7|2014-01-08 08:00:01|POINT (47 57)|
    |      8|name8|name8|2014-01-09 08:00:01|POINT (48 58)|
    |      9|name9|name9|2014-01-10 08:00:01|POINT (49 59)|
    +-------+-----+-----+-------------------+-------------+
Notice DLA Ganos can also write DataFrame objects back to the database. Before you do so, you must create the table into which the objects are written. Otherwise, the write fails.
points.write.format("ganos-geometry").options(params).option("ganos.feature", "testpoints").save()
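
The target feature type can be created in advance through the GeoTools DataStore API. The following is a minimal sketch, assuming the standard GeoMesa HBase data store and a feature type that matches the schema shown in step 3; adjust the attribute definitions to match your own data:

    import org.geotools.data.DataStoreFinder
    import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
    import scala.collection.JavaConverters._
    
    // Create the "testpoints" feature type before writing. The attribute
    // definitions mirror the schema printed in step 3.
    val ds = DataStoreFinder.getDataStore(params.asJava)
    val sft = SimpleFeatureTypes.createType(
        "testpoints",
        "name:String,attr:String,dtg:Date,*geom:Point:srid=4326")
    ds.createSchema(sft)
    ds.dispose()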

Cassandra

  1. Initialize a Spark session.
    val spark = SparkSession.builder
        .appName("Simple Application")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.crossJoin.enabled", "true")
        .config("spark.kryo.registrator", classOf[GanosSparkKryoRegistrator].getName)
        .getOrCreate()
    import spark.implicits._
    
    // Load the JTS package for the Spark session to process spatio-temporal data.
    spark.withJTS 
    val sc = spark.sparkContext
  2. Configure connection parameters.
    val dsParams = Map(
        "cassandra.contact.point" -> "Connection address of Cassandra:9042",
        "cassandra.keyspace" -> "ganos",
        "cassandra.catalog"->"test_sft"
      )
    
    // Create a DataFrame that corresponds to a Cassandra table.
      val dataFrame = spark.read
        .format("ganos-geometry")
        .options(dsParams)
        .option("ganos.feature", "testpoints")
        .load()
      
     dataFrame.createOrReplaceTempView("testpoints")
     // Create an SQL query job.
     val points = spark.sql("select * from testpoints where st_contains(st_makeBox2d(st_point(38,48), st_point(52,62)),geom)")
     
     // Print the schema and the query results.
     points.printSchema
     points.show
  3. View the results.
    root
     |-- __fid__: string (nullable = false)
     |-- name: string (nullable = true)
     |-- attr: string (nullable = true)
     |-- dtg: timestamp (nullable = true)
     |-- geom: point (nullable = true)
     
    +-------+-----+-----+-------------------+-------------+
    |__fid__| name| attr|                dtg|         geom|
    +-------+-----+-----+-------------------+-------------+
    |      2|name2|name2|2014-01-03 08:00:01|POINT (42 52)|
    |      3|name3|name3|2014-01-04 08:00:01|POINT (43 53)|
    |      4|name4|name4|2014-01-05 08:00:01|POINT (44 54)|
    |      5|name5|name5|2014-01-06 08:00:01|POINT (45 55)|
    |      6|name6|name6|2014-01-07 08:00:01|POINT (46 56)|
    |      7|name7|name7|2014-01-08 08:00:01|POINT (47 57)|
    |      8|name8|name8|2014-01-09 08:00:01|POINT (48 58)|
    |      9|name9|name9|2014-01-10 08:00:01|POINT (49 59)|
    +-------+-----+-----+-------------------+-------------+
Notice DLA Ganos can also write DataFrame objects back to the database. Before you do so, you must create the table into which the objects are written. Otherwise, the write fails.
points.write.format("ganos-geometry").options(dsParams).option("ganos.feature", "testpoints").save()
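
Spatial predicates can also be combined with ordinary SQL filters on the other columns. The following is a minimal sketch against the temporary view registered above; the time bound is illustrative:

    // Combine the spatial predicate with a temporal range filter.
    val recent = spark.sql(
        """select __fid__, name, dtg, geom from testpoints
          |where st_contains(st_makeBox2d(st_point(38,48), st_point(52,62)), geom)
          |and dtg >= cast('2014-01-05 00:00:00' as timestamp)""".stripMargin)
    recent.show()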