UP | HOME

实例

Table of Contents

Kafka有以下四个核心API:

这里讲解比较基础的Producer和Consumer

生产者

一个简单的Producer的实例:

public class ProducerDemo {
    public static void main(String[] args){
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < 100; i++) {
                String msg = "Message " + i;
                producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

使用 KafkaProducer 类的实例来创建一个生产者,KafkaProducer类的参数是一系列属性值:

  • bootstrap.servers : Kafka 集群的IP地址 ,如果Broker数量超过1个,则使用逗号分隔,如 192.168.1.110:9092,192.168.1.110:9092
  • key.serializer & value.serializer : 消息序列化类型
    • 消息是以 键值对 的形式发送到Kafka集群的,其中Key是可选的,Value可以是任意类型
    • 在Message被发送到Kafka集群之前,Producer需要把 不同类型的消息序列化为二进制类型
for (int i = 0; i < 100; i++) {
    String msg = "Message " + i;
    producer.send(new ProducerRecord<String, String>("HelloWorld", msg));
    System.out.println("Sent:" + msg);
}

生产者使用 send 方法来发送100个消息到 HelloWorld 这个主题

消费者

一个简单的Consumer的实例:

public class ConsumerDemo {
    public static void main(String[] args){
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "127.0.0.1:9092");
    properties.put("group.id", "group-1");
    properties.put("enable.auto.commit", "true");
    properties.put("auto.commit.interval.ms", "1000");
    properties.put("auto.offset.reset", "earliest");
    properties.put("session.timeout.ms", "30000");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, value = %s", record.offset(), record.value());
        System.out.println();
        }
    }
    }
}

使用 KafkaConsumer 类的实例来创建一个消费者,和KafkaProucer类似,KafkaConsumer类的参数也是一系列属性值:

  • bootstrap.servers
  • key.serializer & value.serializer
  • group.id : Consumer分组ID
kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));

消费者订阅了 HelloWorld 这个主题

while (true) {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(200);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, value = %s", record.offset(), record.value());
        System.out.println();
    }
}

消费者使用 poll 方法来轮询获得消息:

  • 等待直到获得所有的消息
  • 参数 200 :超时时间

Previous: 操作

Home: 目录