spark读取kafka数据流

spark读取kafka数据流提供了两种方式createDstream和createDirectStream。

两者区别如下:

1、KafkaUtils.createDstream

构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] ) 
使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于所有的receivers接收到的数据将会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,该日志存储在HDFS上 
A、创建一个receiver来对kafka进行定时拉取数据,ssc的rdd分区和kafka的topic分区不是一个概念,故如果增加特定主体分区数仅仅是增加一个receiver中消费topic的线程数,并不增加spark的并行处理数据数量 
B、对于不同的group和topic可以使用多个receivers创建不同的DStream 
C、如果启用了WAL,需要设置存储级别,即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)

成都创新互联公司专注于乌鲁木齐网站建设服务及定制,我们拥有丰富的企业做网站经验。 热诚为您提供乌鲁木齐营销型网站建设,乌鲁木齐网站制作、乌鲁木齐网页设计、乌鲁木齐网站官网定制、小程序定制开发服务,打造乌鲁木齐网络公司原创品牌,更为您提供乌鲁木齐网站排名全网营销落地服务。

2.KafkaUtils.createDirectStream

区别Receiver接收数据,这种方式定期地从kafka的topic+partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,使用的是kafka的简单消费者api 
优点: 
A、 简化并行,不需要多个kafka输入流,该方法将会创建和kafka分区一样的rdd个数,而且会从kafka并行读取。 
B、高效,这种方式并不需要WAL,WAL模式需要对数据复制两次,第一次是被kafka复制,另一次是写到wal中 
C、恰好一次语义(Exactly-once-semantics),传统的读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,存在数据丢失的可能性是zookeeper中和ssc的偏移量不一致。EOS通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具

public void adclick(){

SparkConf conf = new SparkConf()

.setAppName("")

.setMaster("");

JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(10));

jssc.checkpoint("");

Map kafkaParams = new HashMap();

kafkaParams.put("metadata.broker.list", ConfigurationManager.getProperty("metadata.broker.list"));

String kafkaTopics = ConfigurationManager.getProperty("kafkaTopics");

String[] kafkaTopicsSplits = kafkaTopics.split(",");

Set tops = new HashSet();

for(String xx:kafkaTopicsSplits){

tops.add(xx);

}

JavaPairInputDStream adRealTimeDStream = KafkaUtils.

createDirectStream(

jssc, 

String.class, 

String.class, 

StringDecoder.class, 

StringDecoder.class, 

kafkaParams, 

tops);

jssc.start();

jssc.awaitTermination();

jssc.close();

}


本文名称:spark读取kafka数据流
当前地址:http://ybzwz.com/article/gjepge.html