When Spark Streaming reads data from a Kafka message queue and the streaming job suddenly dies for reasons unrelated to the code, how can we make sure that no data in the Kafka queue is lost once the Spark cluster is restarted? The key is the Kafka offset: for each partition, the offset records how far the data has been read, much like a cursor. There are three ways to manage offsets:

1.Checkpoints
Use Spark's checkpointing mechanism to save state to HDFS. Its biggest drawback is that once the application code changes, the previous checkpoint becomes invalid (see the sketch after this list).
See http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing for details.

2.Kafka itself
See http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself for details.

3.Your own data store
Record the offset before processing and update it after processing. This article shows how to store offsets in MySQL and how to read and update them.
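
Before turning to the commit modes, here is a minimal sketch of approach 1 (checkpoint-based recovery), referenced above. The checkpoint directory, application name and batch interval are placeholder values, not taken from this article; the point is that the driver rebuilds the StreamingContext from the checkpoint directory if one exists, and that all DStream logic must be defined inside the creating function (which is why a code change invalidates old checkpoints).

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {
  // Hypothetical HDFS path; replace with a real checkpoint directory
  val checkpointDir = "hdfs://namenode:8020/spark/checkpoints/kafka-app"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointRecoverySketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)
    // Define the Kafka DStream and all transformations here, before returning the context;
    // the DStream graph is serialized into the checkpoint.
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint if it exists, otherwise build a fresh context
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}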

Automatic commit: set enable.auto.commit=true; the commit frequency is controlled by auto.commit.interval.ms. This mode is often described as "at most once": the offset can be committed as soon as a message is fetched, regardless of whether it is processed successfully.
Manual commit: set enable.auto.commit=false; this mode gives "at least once". After a message is fetched, consumer.commitSync() is called only once processing has finished; if processing fails, the offset is not updated and the message will be consumed again.
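
To illustrate the manual-commit ("at least once") mode with the new Kafka consumer API, here is a minimal sketch; the broker address, topic and group id reuse the values from the test setup later in this article, and the "processing" step is just a println:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

object ManualCommitSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.25.142:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group")
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") // manual commit
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("test_offset"))

    while (true) {
      val records = consumer.poll(1000L) // fetch a batch of messages
      records.asScala.foreach(r => println(s"key:${r.key} value:${r.value}"))
      // Commit only after the batch has been processed; if processing fails before this
      // line, the same messages are fetched again on the next poll (at least once).
      consumer.commitSync()
    }
  }
}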

Comparison of spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10

(image: comparison table of spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10)

Step 1: Create the database and table

mysql> create database offset_db;
Query OK, 1 row affected (0.01 sec)

mysql> use offset_db;
Database changed

mysql> create table offset_tb(
    -> topic varchar(32),        -- Kafka topic
    -> groupid varchar(50),      -- consumer group
    -> partitions int,           -- partition
    -> fromoffset bigint,        -- start offset
    -> untiloffset bigint,       -- end offset
    -> primary key(topic,groupid,partitions)  -- composite primary key
    -> );
Query OK, 0 rows affected (0.06 sec)

Step 2: POM dependencies

<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.3.1</spark.version>
    <scalikejdbc.version>2.5.0</scalikejdbc.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- kafka 0.8, scala 2.11, spark 2.3.1 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>
    <!-- scala jdbc -->
    <dependency>
        <groupId>org.scalikejdbc</groupId>
        <artifactId>scalikejdbc_2.11</artifactId>
        <version>${scalikejdbc.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scalikejdbc</groupId>
        <artifactId>scalikejdbc-config_2.11</artifactId>
        <version>${scalikejdbc.version}</version>
    </dependency>
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.5</version>
    </dependency>
</dependencies>

Step 3: Configuration management

Create application.conf under the resources directory:

broker.list = "192.168.25.142:9092"
# if no offset can be found, start reading from the beginning
auto.offset.reset = "smallest"
group.id = "test_group"
kafka.topics = "test_offset"
serializer.class = "kafka.serializer.StringEncoder"
request.required.acks = "1"

# JDBC settings
db.default.driver = "com.mysql.jdbc.Driver"
db.default.url = "jdbc:mysql://localhost:3306/offset_db"
db.default.user = "root"
db.default.password = "root"
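
Note that the db.default.* keys are not read by the utility class below; they are picked up by scalikejdbc-config when DBs.setup() is called, as the streaming job does later. As a quick sanity check, a hypothetical smoke test such as the following could verify the JDBC settings (the object name and the "select 1" query are illustrative only, not part of the project):

package com.sparkstreaming

import scalikejdbc._
import scalikejdbc.config._

// Hypothetical smoke test: loads db.default.* from application.conf via scalikejdbc-config
// and runs a trivial query to confirm the MySQL connection works.
object ConnectionSmokeTest {
  def main(args: Array[String]): Unit = {
    DBs.setup() // reads db.default.driver/url/user/password from application.conf
    val one: Option[Int] = DB.readOnly { implicit session =>
      SQL("select 1").map(_.int(1)).single().apply()
    }
    println(s"MySQL connection ok, select 1 returned: $one")
    DBs.closeAll()
  }
}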

A utility class reads the configuration:

package com.sparkstreaming

import com.typesafe.config.ConfigFactory

object GlobalConfigUtil {
  def config = ConfigFactory.load()

  def brokeLists = config.getString("broker.list")
  def autoOffsetReset = config.getString("auto.offset.reset")
  def groupId = config.getString("group.id")
  def kafkaTopics = config.getString("kafka.topics")
  def serializerClass = config.getString("serializer.class")
  def requestRrequiredAcks = config.getString("request.required.acks")
}

Step 4: Code implementation

Implementation approach:
1) When the direct DStream is initialized, supply a map of per-partition offsets so that it starts reading from those positions.
2) Read and process the messages.
3) Store the processing results.
Finally, persist the offsets in an external store such as HBase, Kafka, HDFS, ZooKeeper or MySQL.

package com.sparkstreaming

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scalikejdbc._
import scalikejdbc.config._

object SparkSteamingKafka {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("sparkSteaming").setMaster("local[2]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    // Build the Kafka parameters
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> GlobalConfigUtil.brokeLists,
      "auto.offset.reset" -> GlobalConfigUtil.autoOffsetReset,
      "group.id" -> GlobalConfigUtil.groupId
    )

    // Build the topic set
    val topics = GlobalConfigUtil.kafkaTopics.split(",").toSet

    // Read the stored offsets from MySQL
    DBs.setup()
    val fromOffset: Map[TopicAndPartition, Long] = DB.readOnly(implicit session => {
      val sql: SQL[Nothing, NoExtractor] = SQL("select * from offset_tb")
      val value: SQL[(TopicAndPartition, Long), HasExtractor] = sql.map(rs => {
        (TopicAndPartition(rs.string("topic"), rs.int("partitions")), rs.long("untiloffset"))
      })
      val tuples: List[(TopicAndPartition, Long)] = value.list().apply()
      tuples
    }).toMap

    // If the MySQL table holds no offsets, consume from the beginning;
    // otherwise resume from the stored offsets
    val messagesDstream = if (fromOffset.isEmpty) {
      println("Consuming from the beginning...")
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
    } else {
      println("Resuming from stored offsets...")
      val messageHandler = (mm: MessageAndMetadata[String, String]) => (mm.key(), mm.message())
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](streamingContext, kafkaParams, fromOffset, messageHandler)
    }

    messagesDstream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Print the number of records in this batch
        println("Records read in this batch: " + rdd.count())
        // Business logic: read the Kafka data and print it
        rdd.foreach(x => {
          println("key:" + x._1 + " value:" + x._2)
        })
        // Get fromOffset and untilOffset for each partition and write them back to MySQL
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetRanges.foreach(x => {
          // Print the topic, partition, start offset and end offset consumed in this batch
          println(s"---${x.topic},${x.partition},${x.fromOffset},${x.untilOffset}---")
          // Save the latest offsets to the MySQL table
          DB.autoCommit(implicit session => {
            SQL("replace into offset_tb(topic,groupid,partitions,fromoffset,untiloffset) values (?,?,?,?,?)")
              .bind(x.topic, GlobalConfigUtil.groupId, x.partition, x.fromOffset, x.untilOffset)
              .update().apply()
          })
        })
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

Step 5: Testing

5.1: Start Kafka and create the topic

# For easier testing the topic has a single partition; multiple partitions work the same way
./kafka-topics.sh --create --zookeeper 192.168.25.142:2181 --replication-factor 1 --partitions 1 --topic test_offset

5.2: Check the MySQL table; at this point it contains no data

(image: offset_tb is empty)

5.3: Produce 500 messages with a Kafka producer

package com.sparkstreaming

import java.util.{Date, Properties}
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object KafkaProducer {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.put("serializer.class", GlobalConfigUtil.serializerClass)
    properties.put("metadata.broker.list", GlobalConfigUtil.brokeLists)
    properties.put("request.required.acks", GlobalConfigUtil.requestRrequiredAcks)

    val producerConfig = new ProducerConfig(properties)
    val producer = new Producer[String, String](producerConfig)
    val topic = GlobalConfigUtil.kafkaTopics

    // Produce 500 messages per run
    for (i <- 1 to 500) {
      val runtimes = new Date().toString
      val messages = new KeyedMessage[String, String](topic, i + "", "hlw: " + runtimes)
      producer.send(messages)
    }
    println("All messages sent...")
  }
}

5.4: Start the Spark Streaming program; it consumes from the beginning (offset 0)

Console output:

Consuming from the beginning...
Records read in this batch: 500
---test_offset,0,0,500---

Check the MySQL table: the offsets have been recorded.

(image: offset_tb with the recorded offsets)

5.5: Stop and restart the streaming job, then test again

Stop the Spark Streaming program, produce another 100 messages with the Kafka producer, and start the Spark program again. If Spark resumes consuming from offset 500, it has successfully read the stored offset and achieves at-least-once semantics.

Console output:

Resuming from stored offsets...
Records read in this batch: 100
---test_offset,0,500,600---

Check the updated offsets in MySQL:

(image: offset_tb after the update)

Extension: storing offsets in Redis

InternalRedisClient, the Redis client:

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

object InternalRedisClient extends Serializable {

  @transient private var pool: JedisPool = null

  def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
               maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
    makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true, false, 10000)
  }

  def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
               maxTotal: Int, maxIdle: Int, minIdle: Int,
               testOnBorrow: Boolean, testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
    if (pool == null) {
      val poolConfig = new GenericObjectPoolConfig()
      poolConfig.setMaxTotal(maxTotal)
      poolConfig.setMaxIdle(maxIdle)
      poolConfig.setMinIdle(minIdle)
      poolConfig.setTestOnBorrow(testOnBorrow)
      poolConfig.setTestOnReturn(testOnReturn)
      poolConfig.setMaxWaitMillis(maxWaitMillis)

      pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout, "root_jinkun")

      // Destroy the pool on JVM shutdown
      val hook = new Thread {
        override def run = pool.destroy()
      }
      sys.addShutdownHook(hook.run)
    }
  }

  def getPool: JedisPool = {
    assert(pool != null)
    pool
  }
}

**KafkaRedisStreaming**

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.slf4j.LoggerFactory

object KafkaRedisStreaming {

  private val LOG = LoggerFactory.getLogger("KafkaRedisStreaming")

  def initRedisPool() = {
    // Redis configurations
    val maxTotal = 20
    val maxIdle = 10
    val minIdle = 1
    val redisHost = "127.0.0.1"
    val redisPort = 6379
    val redisTimeout = 30000
    InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
  }

  /**
   * Read the stored offsets for a topic from Redis.
   *
   * @param topicName
   * @param partitions
   * @return
   */
  def getLastCommittedOffsets(topicName: String, partitions: Int): Map[TopicPartition, Long] = {
    if (LOG.isInfoEnabled())
      LOG.info("||--Topic:{},getLastCommittedOffsets from Redis--||", topicName)

    // Fetch the offsets saved by the previous run from Redis
    val jedis = InternalRedisClient.getPool.getResource
    val fromOffsets = collection.mutable.HashMap.empty[TopicPartition, Long]
    for (partition <- 0 to partitions - 1) {
      val topic_partition_key = topicName + "_" + partition
      val lastSavedOffset = jedis.get(topic_partition_key)
      val lastOffset = if (lastSavedOffset == null) 0L else lastSavedOffset.toLong
      fromOffsets += (new TopicPartition(topicName, partition) -> lastOffset)
    }
    jedis.close()
    fromOffsets.toMap
  }

  def main(args: Array[String]): Unit = {
    // Initialize the Redis pool
    initRedisPool()

    val conf = new SparkConf()
      .setAppName("ScalaKafkaStream")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val ssc = new StreamingContext(sc, Seconds(3))

    val bootstrapServers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val groupId = "kafka-test-group"
    val topicName = "Test"
    val maxPoll = 20000

    val kafkaParams = Map(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPoll.toString,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // The total number of partitions of the topic is specified here
    val fromOffsets = getLastCommittedOffsets(topicName, 3)

    // Initialize the Kafka DStream
    val kafkaTopicDS = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets))

    kafkaTopicDS.foreachRDD(rdd => {
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Only act if the RDD has data
      if (!rdd.isEmpty()) {
        val jedis = InternalRedisClient.getPool.getResource
        val p = jedis.pipelined()
        p.multi() // open a transaction

        // Process the data
        rdd
          .map(_.value)
          .flatMap(_.split(" "))
          .map(x => (x, 1L))
          .reduceByKey(_ + _)
          .sortBy(_._2, false)
          .foreach(println)

        // Update the offsets
        offsetRanges.foreach { offsetRange =>
          println("partition : " + offsetRange.partition + " fromOffset: " + offsetRange.fromOffset + " untilOffset: " + offsetRange.untilOffset)
          val topic_partition_key = offsetRange.topic + "_" + offsetRange.partition
          p.set(topic_partition_key, offsetRange.untilOffset + "")
        }

        p.exec() // commit the transaction
        p.sync   // flush the pipeline
        jedis.close()
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
