Systems that support streaming ingestion mostly follow the same idea: the stream is written in small batches as small files to the storage system, and those files are then merged periodically. Hive and Delta Lake are examples. Kudu also supports streaming ingestion, but its storage engine is custom-built rather than a solution layered on top of a big data storage system. This article uses a Kafka data source as the example; for other data sources, follow the prompts in the console.
Evolution of streaming ingestion
Stage | Details
---|---
Before | Streaming ingestion used to be a do-it-yourself job: the fact table was partitioned by time at a fine granularity, for example one partition every five minutes, and whenever a partition was complete, an INSERT OVERWRITE was triggered to merge that partition's files and rewrite the partition (see the sketch after this table). This approach has several problems. Hive added transaction support in version 0.13 and, starting with version 2.0, the Hive Streaming feature for streaming ingestion, yet real-world use of Hive Streaming remains rare.
Now | With Delta, the streaming-ingestion scenario becomes easy to handle; only four actions are needed.
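To make the "Before" workflow concrete, here is a minimal sketch of the hand-rolled compaction step, assuming a hypothetical fact table `fact_events` partitioned by day (`ds`) and five-minute bucket (`ts`):

```sql
-- Hypothetical Hive-era compaction: once a five-minute partition is
-- complete, rewrite it onto itself so its many small files are merged.
-- Hive stages the output in a scratch directory before replacing the
-- partition, so reading and overwriting the same partition is safe.
INSERT OVERWRITE TABLE fact_events PARTITION (ds = '2019-11-11', ts = '1200')
SELECT id, `date`, name, sales
FROM fact_events
WHERE ds = '2019-11-11' AND ts = '1200';
```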
Delta example
Read data from an upstream Kafka topic and write it into a Delta table. On the Kafka side, prepare a Python script that continuously sends data into Kafka.
```python
#!/usr/bin/env python3
import json
import time

from kafka import KafkaProducer
from kafka.errors import KafkaError

bootstrap = ['emr-header-1:9092']
topic = 'delta_stream_sample'


def generator():
    # Yield records that differ only in the id field.
    id = 0
    line = {}
    while True:
        line['id'] = id
        line['date'] = '2019-11-11'
        line['name'] = 'Robert'
        line['sales'] = 123
        yield line
        id = id + 1


def sendToKafka():
    producer = KafkaProducer(bootstrap_servers=bootstrap)
    for line in generator():
        data = json.dumps(line).encode('utf-8')
        # Asynchronous by default
        future = producer.send(topic, data)
        # Block for 'synchronous' sends
        try:
            record_metadata = future.get(timeout=10)
        except KafkaError as e:
            # Decide what to do if the produce request failed
            pass
        time.sleep(0.1)


sendToKafka()
```
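The producer relies on the kafka-python package, which provides `KafkaProducer`. With the 0.1-second sleep between sends it emits roughly ten records per second; adjust `bootstrap` to point at your own Kafka broker before running it.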
For simplicity, the records differ only in the `id` field:

```
{"id": 0, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 1, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 2, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 3, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 4, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 5, "date": "2019-11-11", "name": "Robert", "sales": 123}
```
Start a Spark Streaming job that reads from Kafka and writes into the Delta table. You can use either Scala or SQL.
- Scala

Launch a spark-shell:

```bash
spark-shell --master local --use-emr-datasource
```

Then run:

```scala
import org.apache.spark.sql.{functions, SparkSession}
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructField

val targetDir = "/tmp/delta_table"
val checkpointLocation = "/tmp/delta_table_checkpoint"
val bootstrapServers = "192.168.XX.XX:9092"
val topic = "delta_stream_sample"

// Schema of the JSON records produced by the Python script.
val schema = DataTypes.createStructType(Array[StructField](
  DataTypes.createStructField("id", DataTypes.LongType, false),
  DataTypes.createStructField("date", DataTypes.DateType, false),
  DataTypes.createStructField("name", DataTypes.StringType, false),
  DataTypes.createStructField("sales", DataTypes.StringType, false)))

// Read the Kafka topic as a stream and parse the JSON payload.
val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topic)
  .option("maxOffsetsPerTrigger", 1000)
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", value = false)
  .load()
  .select(functions.from_json(functions.col("value").cast("string"), schema).as("json"))
  .select("json.*")

// Append the parsed records to the Delta table.
val query = lines.writeStream
  .outputMode("append")
  .format("delta")
  .option("checkpointLocation", checkpointLocation)
  .start(targetDir)

query.awaitTermination()
```
- SQL

Launch streaming-sql:

```bash
streaming-sql --master local --use-emr-datasource
```

Then run:

```sql
CREATE TABLE IF NOT EXISTS kafka_table
USING kafka
OPTIONS (
  kafka.bootstrap.servers = '192.168.XX.XX:9092',
  subscribe = 'delta_stream_sample'
);

CREATE TABLE IF NOT EXISTS delta_table (id LONG, `date` DATE, name STRING, sales STRING)
USING delta
LOCATION '/tmp/delta_table';

CREATE SCAN stream_kafka_table ON kafka_table
USING STREAM
OPTIONS (
  maxOffsetsPerTrigger = '1000',
  startingOffsets = 'earliest',
  failOnDataLoss = false
);

CREATE STREAM job
OPTIONS (
  checkpointLocation = '/tmp/delta_table_checkpoint'
)
INSERT INTO delta_table
SELECT
  content.id AS id,
  content.date AS date,
  content.name AS name,
  content.sales AS sales
FROM (
  SELECT from_json(CAST(value AS STRING), 'id LONG, `date` DATE, name STRING, sales STRING') AS content
  FROM stream_kafka_table
);
```
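In both variants, `maxOffsetsPerTrigger` caps each micro-batch at 1000 Kafka records, and `checkpointLocation` is where Structured Streaming persists its offsets; the exactly-once test below relies on reusing this directory across restarts. Note that `CREATE SCAN` and `CREATE STREAM` come from EMR's streaming SQL support rather than standard Spark SQL.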
In a separate spark-shell, confirm that the data has been ingested.
- Scala

```scala
val df = spark.read.format("delta").load("/tmp/delta_table")
df.select("*").orderBy("id").show(10000)
```

- SQL

```sql
SELECT * FROM delta_table ORDER BY id LIMIT 10000;
```
At this point 2285 rows have been written:
```
...
|2275|2019-11-11|Robert| 123|
|2276|2019-11-11|Robert| 123|
|2277|2019-11-11|Robert| 123|
|2278|2019-11-11|Robert| 123|
|2279|2019-11-11|Robert| 123|
|2280|2019-11-11|Robert| 123|
|2281|2019-11-11|Robert| 123|
|2282|2019-11-11|Robert| 123|
|2283|2019-11-11|Robert| 123|
|2284|2019-11-11|Robert| 123|
|2285|2019-11-11|Robert| 123|
+----+----------+------+-----+
```
Exactly-once test
Stop the Spark Streaming job, then start it again and read the table once more. If all is well, the data picks up exactly where it left off: Structured Streaming restores its progress from the checkpoint directory, and the Delta sink records each batch id in the transaction log, so a replayed micro-batch is never applied twice.
- Scala

```scala
df.select("*").orderBy("id").show(10000)
```

- SQL

```sql
SELECT * FROM delta_table ORDER BY id LIMIT 10000;
```
```
...
|2878|2019-11-11|Robert| 123|
|2879|2019-11-11|Robert| 123|
|2880|2019-11-11|Robert| 123|
|2881|2019-11-11|Robert| 123|
|2882|2019-11-11|Robert| 123|
|2883|2019-11-11|Robert| 123|
|2884|2019-11-11|Robert| 123|
|2885|2019-11-11|Robert| 123|
|2886|2019-11-11|Robert| 123|
|2887|2019-11-11|Robert| 123|
|2888|2019-11-11|Robert| 123|
|2889|2019-11-11|Robert| 123|
|2890|2019-11-11|Robert| 123|
|2891|2019-11-11|Robert| 123|
```
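Eyeballing the output shows the ids are contiguous, but exactly-once can also be checked mechanically: no `id` may occur twice. A minimal check against the `delta_table` defined above:

```sql
-- Under exactly-once semantics every id appears exactly once,
-- so this query must return zero rows.
SELECT id, COUNT(*) AS cnt
FROM delta_table
GROUP BY id
HAVING COUNT(*) > 1;
```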