Apache Kafka:简单的生产者例子(二)
生产者的简单例子
在创建应用程序之前,首先启动ZooKeeper和Kafka代理,然后使用create topic命令在Kafka代理中创建主题。然后创建一个名为Sim-pleProducer.java的java类,并键入以下代码。
//import util.properties packages import java.util.Properties; //import simple producer packages import org.apache.kafka.clients.producer.Producer; //import KafkaProducer packages import org.apache.kafka.clients.producer.KafkaProducer; //import ProducerRecord packages import org.apache.kafka.clients.producer.ProducerRecord; //Create java class named “SimpleProducer” public class SimpleProducer { public static void main(String[] args) throws Exception{ // Check arguments length value if(args.length == 0){ System.out.println("Enter topic name”); return; } //Assign topicName to string variable String topicName = args[0].toString(); // create instance for properties to access producer configs Properties props = new Properties(); //Assign localhost id props.put("bootstrap.servers", “localhost:9092"); //Set acknowledgements for producer requests. props.put("acks", “all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer"); Producer<String, String> producer = new KafkaProducer <String, String>(props); for(int i = 0; i < 10; i++) producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); System.out.println(“Message sent successfully”); producer.close(); } }
可以使用以下命令编译应用程序。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
使用以下命令执行应用程序。
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>
输出
Message sent successfully
要检查上述输出,请打开新终端,并键入消费者命令行界面命令来接收消息。
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning 1 2 3 4 5 6 7 8 9 10
消费者的简单例子
到目前为止,我们已经创建了一个生产者向Kafka集群发送消息。现在让我们创建一个消费者来消费Kafka集群中的信息。KafkaConsumer API 用于消费Kafka集群中的消息。KafkaConsumer类构造函数定义如下。
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs:返回消费者配置的映射。
KafkaConsumer类具有以下重要方法。
public java.util.Set<TopicPar-tition> assignment():Get the set of partitions currently assigned by the con-sumer.
public string subscription():Subscribe to the given list of topics to get dynamically as-signed partitions.
public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener):订阅指定的主题列表,以动态获得签名分区。
public void unsubscribe():取消订阅指定分区列表中的主题。
public void sub-scribe(java.util.List<java.lang.String> topics):订阅指定的主题列表,如果给定的主题列表为空,它将被视为取消订阅()。
public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener):参数模式是指正则表达式格式的订阅模式,侦听器参数从订阅模式中获取通知。
public void as-sign(java.util.List<TopicParti-tion> partitions):手动为客户分配分区列表。
poll():获取使用订阅/分配API之一指定的主题或分区的数据。如果在轮询数据之前没有订阅主题,这将返回错误。
public void commitSync():提交上次poll()返回的所有主题和分区的描述列表的偏移量。相同的操作也适用于commitAsyn()。
public void seek(TopicPartition partition, long offset):获取消费者将在下一个轮询()方法中使用的当前偏移值。
public void resume():恢复暂停的分区。
public void wakeup():唤醒消费者。