开发学院

您的位置:首页>教程>正文

教程正文

Apache Kafka:集成Spark

  在本章中,我们将讨论如何将Apache Kafka与Spark Streaming API集成在一起。

关于Spark

  Spark Streaming API支持实时数据流的可扩展、高吞吐量、容错流处理。数据可以从Kafka, Flume, Twitter等许多来源获取,可以使用复杂的算法进行处理,例如映射、缩减、连接和窗口等高级函数。最后,处理后的数据可以被推出到文件系统、数据库和实时仪表板。弹性分布式数据集(RDD)是斯巴克的基本数据结构。它是一个不可变的分布式对象集合。RDD的每个数据集被分成逻辑分区,这些分区可以在集群的不同节点上计算。

集成Spark

  Kafka是Spark流潜在的信息传递和集成平台。卡夫卡是实时数据流的中心枢纽,在Spark流中使用复杂的算法进行处理。一旦数据被处理,Spark流可能会将结果发布到另一个Kafka主题或存储在HDFS、数据库或仪表板中。下图描述了概念流程。

integration_spark.jpg

  现在,让我们详细看看Kafka-Spark API。

SparkConf API

  它代表Spark应用的配置。用于将各种Spark数设置为键值对。

  SparkConf类有以下方法:

  set(string key, string value):设置配置变量。

  remove(string key):从配置中移除密钥。

  setAppName(string name):为您的应用程序设置应用程序名称。

  get(string key) :获取键

StreamingContext API

  这是Spark功能的主要切入点。SparkContext表示与Spark群集的连接,可用于在群集上创建RDD、累加器和广播变量。构造函数定义如下。

public StreamingContext(String master, String appName, Duration batchDuration,

   String sparkHome, scala.collection.Seq<String> jars, 

   scala.collection.Map<String,String> environment)

  master:要连接到的集群URL (例如mesos://host:port, spark://host:port).

  appName:要显示在群集网络用户界面上的作业名称

  batchDuration:流数据将被分成批次的时间间隔

public StreamingContext(SparkConf conf, Duration batchDuration)

  通过new SparkContext所需的配置来创建StreamingContext。

  conf:Spark参数

  batchDuration:流数据将被分成批次的时间间隔

KafkaUtils API

  KafkaUtils API 用于将Kafka集群连接到Spark流。该API具有如下定义的有效方法创建流。

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

  上面显示的方法用于创建一个从Kafka经纪人那里获取消息的输入流。

  ssc:StreamingContext对象

  zkQuorum:Zookeeper配额数

  groupId:此消费者的组id。

  topics:返回要消费的主题地图。

  storageLevel:用于存储接收到的对象的存储级别。

  KafkaUtils API还有另一种方法createDirectStream,用于创建一个输入流,直接从Kafka经纪人那里提取消息,而不使用任何接收者。这个流可以保证Kafka的每一条信息在转换中只包含一次。

  示例应用程序是在Scala中完成的。要编译应用程序,请下载并安装sbt、scala构建工具(类似于maven)。主要应用程序代码如下所示。

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

构建脚本

  spark-kafka的融合依赖于spark、spark streaming和spark Kafka的集成jar。创建一个新的文件build.sbt,并指定应用程序的详细信息及其依赖关系。sbt将在编译和打包应用程序时下载必要的jar。

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

编译/打包

  运行以下命令编译并打包应用程序的jar文件。我们需要将jar文件提交到spark控制台来运行应用程序。

sbt package

提交到Spark

  启动Kafka生产者命令行界面(上一章已经解释过),创建一个名为“my-first-topic”的新主题,并提供一些示例消息,如下所示。

另一个spark测试消息

  运行以下命令将应用程序提交到spark控制台。

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

  该应用程序的输出示例如下所示。

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..