实例
Kafka有以下四个核心API:
- Producer
- Consumer
- Streamer
- Connector
这里讲解比较基础的Producer和Consumer
生产者
一个简单的Producer的实例:
public class ProducerDemo { public static void main(String[] args){ Properties properties = new Properties(); properties.put("bootstrap.servers", "127.0.0.1:9092"); properties.put("acks", "all"); properties.put("retries", 0); properties.put("batch.size", 16384); properties.put("linger.ms", 1); properties.put("buffer.memory", 33554432); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = null; try { producer = new KafkaProducer<String, String>(properties); for (int i = 0; i < 100; i++) { String msg = "Message " + i; producer.send(new ProducerRecord<String, String>("HelloWorld", msg)); System.out.println("Sent:" + msg); } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } }
使用 KafkaProducer 类的实例来创建一个生产者,KafkaProducer类的参数是一系列属性值:
- bootstrap.servers : Kafka 集群的IP地址 ,如果Broker数量超过1个,则使用逗号分隔,如 192.168.1.110:9092,192.168.1.110:9092
- key.serializer & value.serializer : 消息序列化类型
- 消息是以 键值对 的形式发送到Kafka集群的,其中Key是可选的,Value可以是任意类型
- 在Message被发送到Kafka集群之前,Producer需要把 不同类型的消息序列化为二进制类型
for (int i = 0; i < 100; i++) { String msg = "Message " + i; producer.send(new ProducerRecord<String, String>("HelloWorld", msg)); System.out.println("Sent:" + msg); }
生产者使用 send 方法来发送100个消息到 HelloWorld 这个主题
消费者
一个简单的Consumer的实例:
public class ConsumerDemo { public static void main(String[] args){ Properties properties = new Properties(); properties.put("bootstrap.servers", "127.0.0.1:9092"); properties.put("group.id", "group-1"); properties.put("enable.auto.commit", "true"); properties.put("auto.commit.interval.ms", "1000"); properties.put("auto.offset.reset", "earliest"); properties.put("session.timeout.ms", "30000"); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); kafkaConsumer.subscribe(Arrays.asList("HelloWorld")); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, value = %s", record.offset(), record.value()); System.out.println(); } } } }
使用 KafkaConsumer 类的实例来创建一个消费者,和KafkaProucer类似,KafkaConsumer类的参数也是一系列属性值:
- bootstrap.servers
- key.serializer & value.serializer
- group.id : Consumer分组ID
kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
消费者订阅了 HelloWorld 这个主题
while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(200); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, value = %s", record.offset(), record.value()); System.out.println(); } }
消费者使用 poll 方法来轮询获得消息:
- 等待直到获得所有的消息
- 参数 200 :超时时间