kafka consumer api连接kafka server

背景


最近在公司老大要做一个kafka consumer接收kafka server的message,其实很简单的一个功能,用kafka都有些大材小用了,一般的JMS都可以了。

之前没有用过kafka,所以去简单了解了一下。

kafka是什么东西?


Apache Kafka® is a distributed streaming platform

kafka是一个分布式的流媒体平台

kafka的主要功能


  1. 发布和订阅消息(流),在这方面,它类似于一个消息队列或企业消息系统,所以kafka被归于消息队列框架。
  2. 容错的方式存储消息(流)。
  3. 可以在消息发布的时候进行处理。

kafka核心API


  • 生产者Producer API
  • 消费者Consumer API
  • 流处理Streams API
  • Connector API

基本术语


Topic

Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic).

Producer

发布消息的对象称之为主题生产者(Kafka topic producer)

Consumer

订阅消息并处理发布的消息的种子的对象称之为主题消费者(consumers)

Broker

已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

详细介绍


Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下图展示了kafka的基本机制。

  1. kafka消息传输流程

如上图所示,producer在向kafka cluster发送message之前,会先对message进行分类,也就是分成不同的topic,consumer在消费message的时候可以对指定的topic进行消费,只关心指定的topic。

consumer与kafka cluster建立长连接,不断的从集群中拉去消息并进行处理。


  1. kafka server消息存储策略

    ​ 谈到kafka的存储,就不得不提到分区,即partitions,创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。


  1. 生产者

producer在向kafka集群发送消息的时候有三种方式选择分区

  • 明确指定分区
  • 指定均衡策略
  • 不指定,则会采用随机的均衡策略


  1. 消费者

如上图所示,kafka中不同的消费者可以指定不同的group ID来同时消费同一个Topic下的消息。

特别提醒:

​ 对于一个group而言,消费者的数量不应该多余分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费

​ 因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息。

  1. 相较于其他消息队列系统的优势
    • 高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;
    • 完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;
    • 支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。
    • 快速持久化,可以在O(1)的系统开销下进行消息持久化;

版本


之前做的时候老大说用high level consumer API或者low level conxumer API来做,但是我在做的过程中发现对接的kafka(网管团队维护)版本是0.11的,在网上查阅了解到kafka0.9之后新增了一个Java消费者用于替换现在基于zookeeper(一个分布式的应用程序协调服务,如果有做过dubbo项目的同学应该很熟悉)的high or low level consumer API。

依赖包


1
2
3
4
5
6
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka-clients</artifactId>
> <version>0.10.1.0</version>
> </dependency>
>

常用命令


查看当前kafka所有的topic

./kafka-topics.sh –list –zookeeper localhost:2181/nokia/nsp/kafka

producer发送message

./kafka-console-producer.sh –broker-list localhost:9092 –topic ns-eg-155c683f-f87e-4fc6-850d-dbc47f7ff3f0

consumer接受message

./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic ns-eg-fefb2353-b213-41b6-aa21-186c0a3734ea –from-beginning

查看版本(kafka没有类似于java -version这样的命令)

find ./libs/ -name *kafka_* | head -1 | grep -o ‘\kafka[^\n]*’

这个命令查找到类似于kafka_2.9.2-0.8.1.1.jar.asc 这样的jar包,然后这个jar包的名称前面的数据2.9.2是scala(kafka就是用scala开发的)的版本,后面的0.8.1.1就是所用的kafka的版本。

使用


因为我只使用到了kafka consumer API,所以这里列出consumer的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

以上是自动提交偏移量(offset)的consumer client,消费者TCP长连接到Broker拉取message。关于偏移量的概念很重要,可参考下面提供的参考文章有详细叙述。

参考文章


kafka入门介绍

kafka消费者客户端

kafka官网

查看kafka版本

kafka实战

zhangxingrui wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!