Apache Kafka:简单的生产者例子(一)
让我们创建一个使用Java客户端发布和消费消息的应用程序。Kafka生产者客户端由以API组成。
Kafka生产者API
让我们理解这一节中最重要的一组Kafka生产者API。KafkaProducer API的中心部分是KafkaProducer类。KafkaProducer类提供了一个选项,可以通过以下方法在其构造函数中连接Kafka代理。
KafkaProducer类提供发送方法来异步发送消息到主题。send()的用法如下
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback);
ProducerRecord:生产者管理一个等待发送的记录缓冲区。
Callback:用户提供的回调,当记录被服务器确认时执行(null表示没有回调)。
KafkaProducer类提供了一种刷新方法,以确保所有以前发送的消息都已实际完成。刷新方法的语法如下
public void flush()
KafkaProducer类提供partitionFor方法,这有助于获取给定主题的分区元数据。这可以用于自定义分区。该方法的语法如下
public Map metrics()
它返回由生产者维护的内部度量图。
public void close()
KafkaProducer类提供关闭方法块,直到所有先前发送的请求完成。
生产者API
生产者API的核心部分是Producer类。Producer类提供了一个选项,通过以下方法在构造函数中连接Kafka代理。
Producer类
Producer类提供send方法,使用以下方式向单个或多个主题发送消息。
public void send(KeyedMessaget<k,v> message) - sends the data to a single topic,par-titioned by key using either sync or async producer. public void send(List<KeyedMessage<k,v>>messages) - sends data to multiple topics. Properties prop = new Properties(); prop.put(producer.type,”async”) ProducerConfig config = new ProducerConfig(prop);
有两种类型的生产者:同步和异步。
相同的API配置也适用于同步生成器。它们之间的区别是同步producer直接发送消息,但在后台发送消息。当您想要更高的吞吐量时,异步producer是首选。在像0.8这样的早期版本中,异步生成器没有send()的回调来注册错误处理程序。这仅在0.9的当前版本中可用。
public void close()
Producer类提供了关闭连接到所有Kafka生产者池的方法。
配置选项
下面列出了Producer API的主要配置设置,以便更好地理解
client.id:标识生产者应用程序
producer.type:同步或异步
acks:ack配置控制生产者请求下的标准被认为是完整的
retries:如果生产者请求失败,则使用特定值自动重试。
bootstrap.servers:经纪人列表。
linger.ms:如果您想减少请求的数量,可以将linger.ms设置为大于某个值的值。
key.serializer:序列化程序接口的key。
value.serializer:序列化程序接口的值。
batch.size:缓冲大小
buffer.memory:控制生产者可用于缓冲的内存总量。
ProducerRecord API
ProducerRecord是发送到Kafka集群的键/值对。ProducerRecord类构造函数,用于使用以下方式创建带有分区、键和值对的记录。
public ProducerRecord (string topic, int partition, k key, v value)
Topic: 将附加到记录中的用户定义的主题名称。
Partition :分区数
Key:将包含在记录中的key。
Value : 记录正文
public ProducerRecord (string topic, k key, v value)
ProducerRecord类构造函数用于创建带有键、值对且不带分区的记录。
Topic: 将附加到记录中的用户定义的主题名称。
Key:将包含在记录中的key。
Value : 记录正文
public ProducerRecord (string topic, v value)
ProducerRecord类创建一个没有分区和键的记录。
Topic: 将附加到记录中的用户定义的主题名称。
Value : 记录正文
下面列出了ProducerRecord类的方法
public string topic():将附加到记录中的主题。
public K key():将包含在记录中的密钥。如果没有这样的键,空值将在这里被重新翻转。
public V value():记录内容。
partition():记录的分区计数