开发学院

您的位置:首页>教程>正文

教程正文

Apache Kafka:简单的生产者例子(二)

生产者的简单例子

  在创建应用程序之前,首先启动ZooKeeper和Kafka代理,然后使用create topic命令在Kafka代理中创建主题。然后创建一个名为Sim-pleProducer.java的java类,并键入以下代码。

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name”);
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serializa-tion.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

  可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

  使用以下命令执行应用程序。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

输出

  Message sent successfully

  要检查上述输出,请打开新终端,并键入消费者命令行界面命令来接收消息。

>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

消费者的简单例子

  到目前为止,我们已经创建了一个生产者向Kafka集群发送消息。现在让我们创建一个消费者来消费Kafka集群中的信息。KafkaConsumer API 用于消费Kafka集群中的消息。KafkaConsumer类构造函数定义如下。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

  configs:返回消费者配置的映射。

  KafkaConsumer类具有以下重要方法。

  public java.util.Set<TopicPar-tition> assignment():Get the set of partitions currently assigned by the con-sumer.

  public string subscription():Subscribe to the given list of topics to get dynamically as-signed partitions.

  public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener):订阅指定的主题列表,以动态获得签名分区。

  public void unsubscribe():取消订阅指定分区列表中的主题。

  public void sub-scribe(java.util.List<java.lang.String> topics):订阅指定的主题列表,如果给定的主题列表为空,它将被视为取消订阅()。

  public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener):参数模式是指正则表达式格式的订阅模式,侦听器参数从订阅模式中获取通知。

  public void as-sign(java.util.List<TopicParti-tion> partitions):手动为客户分配分区列表。

  poll():获取使用订阅/分配API之一指定的主题或分区的数据。如果在轮询数据之前没有订阅主题,这将返回错误。

  public void commitSync():提交上次poll()返回的所有主题和分区的描述列表的偏移量。相同的操作也适用于commitAsyn()。

  public void seek(TopicPartition partition, long offset):获取消费者将在下一个轮询()方法中使用的当前偏移值。

  public void resume():恢复暂停的分区。

  public void wakeup():唤醒消费者。