开发学院

您的位置:首页>教程>正文

教程正文

Apache Kafka:与Storm集成(一)

  在本章中,我们将学习如何将Kafka与Apache Storm相结合。

Storm

  Storm最初是由内森·马尔斯和巴克类型的团队创造的。在很短的时间内,Apache Storm成为分布式实时处理系统的标准,允许您处理大量数据。Storm速度非常快,一个基准测试显示它每秒处理超过一百万个元组。Apache Storm持续运行,消耗来自配置源(喷口)的数据,并将数据传递到处理管道(螺栓)。喷口和螺栓构成一个拓扑。

集成Storm

  Kafka和Storm自然是相辅相成的,他们强有力的合作使得实时流分析能够用于快速移动的大数据。Kafka和Storm集成使开发者更容易从Storm拓扑中获取和发布数据流。

概念流程

  喷口是数据流的源头。例如,喷口可以读取Kafka主题中的元组,并以流的形式发出它们。螺栓消耗输入流、过程,并可能发出新的流。螺栓可以做任何事情,包括运行函数、过滤元组、进行流聚合、流连接、与数据库对话等等。Storm拓扑中的每个节点并行执行。拓扑无限期运行,直到您终止它。Storm将自动重新分配任何失败的任务。此外,Storm保证不会丢失数据,即使机器停机和消息丢失。

  让我们详细了解Kafka-Storm集成API。Kafka和Storm有三个主要的结合点。它们如下:

BrokerHosts - ZkHosts & StaticHosts

  BrokerHosts是一个接口,ZkHosts和StaticHosts是它的两个主要实现。ZkHosts用于通过维护ZooKeeper中的细节来动态跟踪Kafka经纪人,而StaticHosts用于手动/静态设置Kafka经纪人及其细节。ZkHosts是访问Kafka经纪人的简单快捷的方式。

  ZkHosts的构造函数如下:

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

  其中brokerZkStr是ZooKeeper主机,brokerZkPath是维护 Kafka代理细节的ZooKeeper路径。

KafkaConfig API

  该API用于定义Kafka集群的配置设置。KafkaConfig的构造函数定义如下

public KafkaConfig(BrokerHosts hosts, string topic)

  Hosts:可以是ZkHosts / StaticHosts.

  Topic:主题名字

SpoutConfig API

  Spoutconfig是KafkaConfig的扩展,支持额外的ZooKeeper信息。

public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)

  Hosts:BrokerHosts可以是任何BrokerHosts接口的实现。

  Topic:主题名字

  zkRoot : ZooKeeper 跟路径.

  id :喷口存储了它在ZooKeeper中消耗的补偿的状态。id唯一地标识您的喷口。

SchemeAsMultiScheme

  SchemeAsMultiScheme是一个接口,它规定了Kafka使用的字节缓冲区如何转换成Storm元组。它源自MultiScheme,接受方案类的实现。Scheme类有很多实现,其中一个就是StringScheme,它将字节解析为一个简单的字符串。它还控制输出字段的命名。签名定义如下。

public SchemeAsMultiScheme(Scheme scheme)

Scheme:kafka消耗的字节缓冲区。

KafkaSpout API

  KafkaSpout是我们的喷口实现,它将与Storm集成。它从Kafka主题中提取信息,并以元组的形式将其释放到Storm生态系统中。KafkaSpout从SpoutConfig获取其配置细节。

  下面是创建一个简单Kafka喷口的示例代码。

// ZooKeeper connection string
BrokerHosts hosts = new ZkHosts(zkConnString);

//Creating SpoutConfig Object
SpoutConfig spoutConfig = new SpoutConfig(hosts, 
   topicName, "/" + topicName UUID.randomUUID().toString());

//convert the ByteBuffer to String.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

//Assign SpoutConfig to KafkaSpout.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);