Apache Kafka:与Storm集成(一)
在本章中,我们将学习如何将Kafka与Apache Storm相结合。
Storm
Storm最初是由内森·马尔斯和巴克类型的团队创造的。在很短的时间内,Apache Storm成为分布式实时处理系统的标准,允许您处理大量数据。Storm速度非常快,一个基准测试显示它每秒处理超过一百万个元组。Apache Storm持续运行,消耗来自配置源(喷口)的数据,并将数据传递到处理管道(螺栓)。喷口和螺栓构成一个拓扑。
集成Storm
Kafka和Storm自然是相辅相成的,他们强有力的合作使得实时流分析能够用于快速移动的大数据。Kafka和Storm集成使开发者更容易从Storm拓扑中获取和发布数据流。
概念流程
喷口是数据流的源头。例如,喷口可以读取Kafka主题中的元组,并以流的形式发出它们。螺栓消耗输入流、过程,并可能发出新的流。螺栓可以做任何事情,包括运行函数、过滤元组、进行流聚合、流连接、与数据库对话等等。Storm拓扑中的每个节点并行执行。拓扑无限期运行,直到您终止它。Storm将自动重新分配任何失败的任务。此外,Storm保证不会丢失数据,即使机器停机和消息丢失。
让我们详细了解Kafka-Storm集成API。Kafka和Storm有三个主要的结合点。它们如下:
BrokerHosts - ZkHosts & StaticHosts
BrokerHosts是一个接口,ZkHosts和StaticHosts是它的两个主要实现。ZkHosts用于通过维护ZooKeeper中的细节来动态跟踪Kafka经纪人,而StaticHosts用于手动/静态设置Kafka经纪人及其细节。ZkHosts是访问Kafka经纪人的简单快捷的方式。
ZkHosts的构造函数如下:
public ZkHosts(String brokerZkStr, String brokerZkPath) public ZkHosts(String brokerZkStr)
其中brokerZkStr是ZooKeeper主机,brokerZkPath是维护 Kafka代理细节的ZooKeeper路径。
KafkaConfig API
该API用于定义Kafka集群的配置设置。KafkaConfig的构造函数定义如下
public KafkaConfig(BrokerHosts hosts, string topic)
Hosts:可以是ZkHosts / StaticHosts.
Topic:主题名字
SpoutConfig API
Spoutconfig是KafkaConfig的扩展,支持额外的ZooKeeper信息。
public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
Hosts:BrokerHosts可以是任何BrokerHosts接口的实现。
Topic:主题名字
zkRoot : ZooKeeper 跟路径.
id :喷口存储了它在ZooKeeper中消耗的补偿的状态。id唯一地标识您的喷口。
SchemeAsMultiScheme
SchemeAsMultiScheme是一个接口,它规定了Kafka使用的字节缓冲区如何转换成Storm元组。它源自MultiScheme,接受方案类的实现。Scheme类有很多实现,其中一个就是StringScheme,它将字节解析为一个简单的字符串。它还控制输出字段的命名。签名定义如下。
public SchemeAsMultiScheme(Scheme scheme)
Scheme:kafka消耗的字节缓冲区。
KafkaSpout API
KafkaSpout是我们的喷口实现,它将与Storm集成。它从Kafka主题中提取信息,并以元组的形式将其释放到Storm生态系统中。KafkaSpout从SpoutConfig获取其配置细节。
下面是创建一个简单Kafka喷口的示例代码。
// ZooKeeper connection string BrokerHosts hosts = new ZkHosts(zkConnString); //Creating SpoutConfig Object SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName UUID.randomUUID().toString()); //convert the ByteBuffer to String. spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); //Assign SpoutConfig to KafkaSpout. KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);