背景
最近在公司老大要做一个kafka consumer接收kafka server的message,其实很简单的一个功能,用kafka都有些大材小用了,一般的JMS都可以了。
之前没有用过kafka,所以去简单了解了一下。
kafka是什么东西?
Apache Kafka® is a distributed streaming platform
kafka是一个分布式的流媒体平台
kafka的主要功能
- 发布和订阅消息(流),在这方面,它类似于一个消息队列或企业消息系统,所以kafka被归于消息队列框架。
- 以
容错
的方式存储消息(流)。 - 可以在消息发布的时候进行处理。
kafka核心API
- 生产者Producer API
- 消费者Consumer API
- 流处理Streams API
- Connector API
基本术语
Topic
Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic).
Producer
发布消息的对象称之为主题生产者(Kafka topic producer)
Consumer
订阅消息并处理发布的消息的种子的对象称之为主题消费者(consumers)
Broker
已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker). 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
详细介绍
Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下图展示了kafka的基本机制。
kafka消息传输流程
如上图所示,producer在向kafka cluster发送message之前,会先对message进行分类,也就是分成不同的topic,consumer在消费message的时候可以对指定的topic进行消费,只关心指定的topic。
consumer与kafka cluster建立长连接,不断的从集群中拉去消息并进行处理。
kafka server消息存储策略
谈到kafka的存储,就不得不提到分区,即partitions,创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。
生产者
producer在向kafka集群发送消息的时候有三种方式选择分区
- 明确指定分区
- 指定均衡策略
- 不指定,则会采用随机的均衡策略
消费者
如上图所示,kafka中不同的消费者可以指定不同的group ID来同时消费同一个Topic下的消息。
特别提醒:
对于一个group而言,消费者的数量不应该多余分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费
因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息。
- 相较于其他消息队列系统的优势
- 高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;
- 完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;
- 支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。
- 快速持久化,可以在O(1)的系统开销下进行消息持久化;
版本
之前做的时候老大说用high level consumer API或者low level conxumer API来做,但是我在做的过程中发现对接的kafka(网管团队维护)版本是0.11的,在网上查阅了解到kafka0.9之后新增了一个Java消费者用于替换现在基于zookeeper(一个分布式的应用程序协调服务,如果有做过dubbo项目的同学应该很熟悉)的high or low level consumer API。
依赖包
1
2
3
4
5
6 > <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka-clients</artifactId>
> <version>0.10.1.0</version>
> </dependency>
>
常用命令
查看当前kafka所有的topic
./kafka-topics.sh –list –zookeeper localhost:2181/nokia/nsp/kafka
producer发送message
./kafka-console-producer.sh –broker-list localhost:9092 –topic ns-eg-155c683f-f87e-4fc6-850d-dbc47f7ff3f0
consumer接受message
./kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic ns-eg-fefb2353-b213-41b6-aa21-186c0a3734ea –from-beginning
查看版本(kafka没有类似于java -version这样的命令)
find ./libs/ -name *kafka_* | head -1 | grep -o ‘\kafka[^\n]*’
这个命令查找到类似于kafka_2.9.2-0.8.1.1.jar.asc 这样的jar包,然后这个jar包的名称前面的数据2.9.2是scala(kafka就是用scala开发的)的版本,后面的0.8.1.1就是所用的kafka的版本。
使用
因为我只使用到了kafka consumer API,所以这里列出consumer的代码
1 | Properties props = new Properties(); |
以上是自动提交偏移量(offset)的consumer client,消费者TCP长连接到Broker拉取message。关于偏移量的概念很重要,可参考下面提供的参考文章有详细叙述。