开发学院

您的位置:首页>教程>正文

教程正文

Apache Kafka:简单的生产者例子(一)

  让我们创建一个使用Java客户端发布和消费消息的应用程序。Kafka生产者客户端由以API组成。

Kafka生产者API

  让我们理解这一节中最重要的一组Kafka生产者API。KafkaProducer API的中心部分是KafkaProducer类。KafkaProducer类提供了一个选项,可以通过以下方法在其构造函数中连接Kafka代理。

  KafkaProducer类提供发送方法来异步发送消息到主题。send()的用法如下

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback);

  ProducerRecord:生产者管理一个等待发送的记录缓冲区。

  Callback:用户提供的回调,当记录被服务器确认时执行(null表示没有回调)。

  KafkaProducer类提供了一种刷新方法,以确保所有以前发送的消息都已实际完成。刷新方法的语法如下

public void flush()

  KafkaProducer类提供partitionFor方法,这有助于获取给定主题的分区元数据。这可以用于自定义分区。该方法的语法如下

public Map metrics()

  它返回由生产者维护的内部度量图。

public void close()

  KafkaProducer类提供关闭方法块,直到所有先前发送的请求完成。

生产者API

  生产者API的核心部分是Producer类。Producer类提供了一个选项,通过以下方法在构造函数中连接Kafka代理。

Producer类

  Producer类提供send方法,使用以下方式向单个或多个主题发送消息。

public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

  有两种类型的生产者:同步和异步。

  相同的API配置也适用于同步生成器。它们之间的区别是同步producer直接发送消息,但在后台发送消息。当您想要更高的吞吐量时,异步producer是首选。在像0.8这样的早期版本中,异步生成器没有send()的回调来注册错误处理程序。这仅在0.9的当前版本中可用。

public void close()

  Producer类提供了关闭连接到所有Kafka生产者池的方法。

配置选项

  下面列出了Producer API的主要配置设置,以便更好地理解

  client.id:标识生产者应用程序

  producer.type:同步或异步

  acks:ack配置控制生产者请求下的标准被认为是完整的

  retries:如果生产者请求失败,则使用特定值自动重试。

  bootstrap.servers:经纪人列表。

  linger.ms:如果您想减少请求的数量,可以将linger.ms设置为大于某个值的值。

  key.serializer:序列化程序接口的key。

  value.serializer:序列化程序接口的值。

  batch.size:缓冲大小

  buffer.memory:控制生产者可用于缓冲的内存总量。

ProducerRecord API

    ProducerRecord是发送到Kafka集群的键/值对。ProducerRecord类构造函数,用于使用以下方式创建带有分区、键和值对的记录。

public ProducerRecord (string topic, int partition, k key, v value)

  Topic: 将附加到记录中的用户定义的主题名称。

  Partition :分区数

  Key:将包含在记录中的key。

  Value : 记录正文

public ProducerRecord (string topic, k key, v value)

  ProducerRecord类构造函数用于创建带有键、值对且不带分区的记录。

  Topic: 将附加到记录中的用户定义的主题名称。

  Key:将包含在记录中的key。

  Value : 记录正文

public ProducerRecord (string topic, v value)

  ProducerRecord类创建一个没有分区和键的记录。

  Topic: 将附加到记录中的用户定义的主题名称。

  Value : 记录正文

  下面列出了ProducerRecord类的方法

  public string topic():将附加到记录中的主题。

  public K key():将包含在记录中的密钥。如果没有这样的键,空值将在这里被重新翻转。

  public V value():记录内容。

  partition():记录的分区计数