例如电商平台,秒杀活动。
一般流程会分为:
通过消息系统将秒杀活动业务拆分开,将不急需处理的业务放在后面慢慢处理;
流程改为:
3.1 网关在接受到请求后,就把请求放入到消息队列里面
3.2 后端的服务从消息队列里面获取到请求,完成后续的秒杀处理流程。然后再给用户返回结果。
Kafka的数据是由消费者自己去拉去Kafka里面的数据
默认一个topic有一个分区(partition),自己可设置多个分区(分区分散存储在服务器不同节点上)
解决了一个海量数据如何存储的问题
例如:有2T的数据,一台服务器有1T,一个topic可以分多个区,分别存储在多台服务器上,解决海量数据存储问题
Kafka集群中,一个kafka服务器就是一个broker,Topic只是逻辑上的概念,partition在磁盘上就体现为一个目录。
Consumer Group:消费组,消费数据的时候,都必须指定一个group id,指定一个组的id
假定程序A和程序B指定的group id号一样,那么两个程序就属于同一个消费组
特殊:
不同消费组之间没有影响。消费组需自定义,消费者名称程序自动生成(独一无二)。
Controller:Kafka节点里面的一个主节点。借助zookeeper
kafka写数据:
顺序写,往磁盘上写数据时,就是追加数据,没有随机写的操作。
经验:
如果一个服务器磁盘达到一定的个数,磁盘也达到一定转数,往磁盘里面顺序写(追加写)数据的速度和写内存的速度差不多。
生产者生产消息,经过kafka服务先写到os cache 内存中,然后经过sync顺序写到磁盘上
消费者读取数据流程:
kafka linux sendfile技术 — 零拷贝
Kafka中一个主题,一般会设置分区;比如创建了一个topic_a,然后创建的时候指定了这个主题有三个分区。
其实在三台服务器上,会创建三个目录。
服务器1(kafka1):
一个分区下面默认有n多个日志文件(分段存储),一个日志文件默认1G
服务器2(kafka2):
服务器3(kafka3):
Kafka里面每一条消息,都有自己的offset(相对偏移量),存在物理磁盘上面,在position
Position:物理位置(磁盘上面那个地方)
也就是说一条消息就有两个位置:
稀疏索引:
其中会采用二分查找
网络设计部分是kafka中设计最好的一个部分,这也是保证Kafka高并发、高性能的原因
对kafka进行调优,就得对kafka原理比较了解,尤其是网络设计部分
Reactor网络设计模式1:
Reactor网络设计模式2:
Reactor网络设计模式3:
Kafka超高并发网络设计:
在kafka里面分区是有副本的,注:0.8以前是没有副本机制的。创建主题时,可以指定分区,也可以指定副本个数。副本是有角色的:
leader partition:
生产者发送来一个消息,消息首先要写入到leader partition中
写完了以后,还要把消息写入到ISR列表里面的其它分区,写完后才算这个消息提交
follower partition:从leader partition同步数据。
Kafka — 高并发、高可用、高性能
写数据:
读数据:
电商平台,需要每天10亿请求都要发送到Kafka集群上面。二八反正,一般评估出来问题都不大。
10亿请求 -> 24 过来的,一般情况下,每天的12:00 到早上8:00 这段时间其实是没有多大的数据量的。80%的请求是用的另外16小时的处理的。16个小时处理 -> 8亿的请求。16 * 0.2 = 3个小时 处理了8亿请求的80%的数据
也就是说6亿的数据是靠3个小时处理完的。我们简单的算一下高峰期时候的qps
6亿/3小时 =5.5万/s qps=5.5万
10亿请求 * 50kb = 46T
每天需要存储46T的数据
一般情况下,我们都会设置两个副本 46T * 2 = 92T
,Kafka里面的数据是有保留的时间周期,保留最近3天的数据。
92T * 3天 = 276T
我这儿说的是50kb不是说一条消息就是50kb不是(把日志合并了,多条日志合并在一起),通常情况下,一条消息就几b,也有可能就是几百字节。
1)首先分析一下是需要虚拟机还是物理机
像Kafka mysql hadoop这些集群搭建的时候,我们生产里面都是使用物理机。
2)高峰期需要处理的请求总的请求每秒5.5万个,其实一两台物理机绝对是可以抗住的。一般情况下,我们评估机器的时候,是按照高峰期的4倍的去评估。
如果是4倍的话,大概我们集群的能力要准备到 20万qps。这样子的集群才是比较安全的集群。大概就需要5台物理机。每台承受4万请求。
场景总结:
搞定10亿请求,高峰期5.5万的qps,276T的数据,需要5台物理机。
1)SSD固态硬盘,还是需要普通的机械硬盘
SSD硬盘性能比较好,指的是它随机读写的性能比较好。适合MySQL这样集群。
但是其实他的顺序写的性能跟SAS盘差不多。
kafka的理解:就是用的顺序写。所以我们就用普通的【机械硬盘】就可以了。
2)需要我们评估每台服务器需要多少块磁盘
5台服务器,一共需要276T ,大约每台服务器 需要存储60T的数据。我们公司里面服务器的配置用的是 11块硬盘,每个硬盘 7T。11 * 7T = 77T
77T * 5 台服务器 = 385T
场景总结:
11.4 内存评估 搞定10亿请求,需要5台物理机,11(SAS) * 7T
我们发现kafka读写数据的流程 都是基于os cache,换句话说假设咱们的os cashe无限大那么整个kafka是不是相当于就是基于内存去操作,如果是基于内存去操作,性能肯定很好。内存是有限的。
Kafka的设计,没有把很多数据结构都放在jvm里面。所以我们的这个jvm不需要太大的内存。根据经验,给个10G就可以了。
NameNode:jvm里面还放了元数据(几十G),JVM一定要给得很大。比如给个100G。
假设我们这个10请求的这个项目,一共会有100个topic。100 topic * 5 partition * 2 = 1000 partition
一个partition其实就是物理机上面的一个目录,这个目录下面会有很多个.log的文件。
我们如果要保证 1000个partition 的最新的.log 文件的数据 如果都在内存里面,这个时候性能就是最好。1000 * 1G = 1000G
内存.
我们只需要把当前最新的这个log 保证里面的25%的最新的数据在内存里面。250M * 1000 = 0.25 G* 1000 =250G
的内存。
250内存 / 5 = 50G
内存50G+10G = 60G
内存64G的内存,另外的4G,操作系统本生是不是也需要内存。其实Kafka的jvm也可以不用给到10G这么多。评估出来64G是可以的。当然如果能给到128G的内存的服务器,那就最好。
我刚刚评估的时候用的都是一个topic是5个partition,但是如果是数据量比较大的topic,可能会有10个partition。
总结:
11(SAS) * 7T
,需要64G的内存(128G更好)评估一下每台服务器需要多少cpu core(资源很有限)
我们评估需要多少个cpu ,依据就是看我们的服务里面有多少线程去跑。线程就是依托cpu 去运行的。如果我们的线程比较多,但是cpu core比较少,这样的话,我们的机器负载就会很高,性能不就不好。
评估一下,kafka的一台服务器 启动以后会有多少线程?
所以大概一个Kafka的服务启动起来以后,会有一百多个线程。
如果我们的线程是100多个,或者差不多200个,那么8 个 cpu core是搞不定的。
另外,关注Java知音公众号,回复“后端面试”,送你一份面试题宝典!
所以我们这儿建议:
结论:
2cpu * 8 =16
cpu core4cpu * 8 = 32
cpu core总结:
11(SAS) * 7T
,需要64G的内存(128G更好),需要16个cpu core(32个更好)评估我们需要什么样网卡?
一般要么是千兆的网卡(1G/s),还有的就是万兆的网卡(10G/s)
高峰期的时候 每秒会有5.5万的请求涌入,5.5/5 = 大约是每台服务器会有1万个请求涌入。我们之前说的,10000 * 50kb = 488M
也就是每条服务器,每秒要接受488M的数据。数据还要有副本,副本之间的同步,也是走的网络的请求。488 * 2 = 976m/s
说明一下:
就是告诉大家,以后要是公司里面有什么需求,进行资源的评估,服务器的评估,大家按照我的思路去评估。
一条消息的大小 50kb -> 1kb 500byte 1M
ip 主机名
主机的规划:kafka集群架构的时候:主从式的架构:
zookeeper集群
kafka集群
KafkaManager — 页面管理工具
场景一:topic数据量太大,要增加topic数
一开始创建主题的时候,数据量不大,给的分区数不多。
kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic test6
kafka-topics.sh --alter --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --partitions 3 --topic test6
broker id:
假设一个partition有三个副本:partition0:
a,b,c
ISR:{a,b,c}
如果一个follower分区 超过10秒 没有向leader partition去拉取数据,那么这个分区就从ISR列表里面移除。
场景二:核心topic增加副本因子
如果对核心业务数据需要增加副本因子
vim test.json脚本,将下面一行json脚本保存
{“version”:1,“partitions”:[{“topic”:“test6”,“partition”:0,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:1,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:2,“replicas”:[0,1,2]}]}
执行上面json脚本:
kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --reassignment-json-file test.json --execute
场景三:负载不均衡的topic,手动迁移
vi topics-to-move.json
{“topics”: [{“topic”: “test01”}, {“topic”: “test02”}], “version”: 1}
// 把你所有的topic都写在这里
kafka-reassgin-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topics-to-move-json-file topics-to-move.json --broker-list “5,6” --generate
// 把你所有的包括新加入的broker机器都写在这里,就会说是把所有的partition均匀的分散在各个broker上,包括新进来的broker
此时会生成一个迁移方案,可以保存到一个文件里去:expand-cluster-reassignment.json
kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
这种数据迁移操作一定要在晚上低峰的时候来做,因为他会在机器之间迁移数据,非常的占用带宽资源
场景四:如果某个broker leader partition过多
正常情况下,我们的leader partition在服务器之间是负载均衡。
现在各个业务方可以自行申请创建topic,分区数量都是自动分配和后续动态调整的,kafka本身会自动把leader partition均匀分散在各个机器上,这样可以保证每台机器的读写吞吐量都是均匀的。
但是也有例外,那就是如果某些broker宕机,会导致leader partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的broker重启之后都是folloer partition,读写请求很低。
造成集群负载不均衡有一个参数,auto.leader.rebalance.enable
,默认是true,每隔300秒(leader.imbalance.check.interval.seconds
)检查leader负载是否平衡
如果一台broker上的不均衡的leader超过了10%,leader.imbalance.per.broker.percentage
,就会对这个broker进行选举。
配置参数:
auto.leader.rebalance.enable
默认是trueleader.imbalance.per.broker.percentage
: 每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。这个值表示百分比。10%leader.imbalance.check.interval.seconds
:默认值300秒如何提升吞吐量:参数一:buffer.memory:
设置发送消息的缓冲区,默认值是33554432,就是32MB
参数二:compression.type:
默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销
参数三:batch.size:
1、LeaderNotAvailableException:
这个就是如果某台机器挂了,此时leader副本不可用,会导致你写入失败,要等待其他follower副本切换为leader副本之后,才能继续写入,此时可以重试发送即可;如果说你平时重启kafka的broker进程,肯定会导致leader切换,一定会导致你写入报错,是LeaderNotAvailableException
。
2、NotControllerException:
这个也是同理,如果说Controller所在Broker挂了,那么此时会有问题,需要等待Controller重新选举,此时也是一样就是重试即可。
3、NetworkException:网络异常 timeout
重试会带来一些问题:
消息重复
有的时候一些leader切换之类的问题,需要进行重试,设置retries即可,但是消息重试会导致,重复发送的问题,比如说网络抖动一下导致他以为没成功,就重试了,其实人家都成功了.
消息乱序消息重试是可能导致消息的乱序的,因为可能排在你后面的消息都发送出去了。所以可以使用" max.in.flight.requests.per.connection
"参数设置为1,这样可以保证producer同一时间只能发送一条消息。
两次重试的间隔默认是100毫秒,用"retry.backoff.ms
"来进行设置,基本上在开发过程中,靠重试机制基本就可以搞定95%的异常问题。
producer端
request.required.acks=0;
request.required.acks=1;
request.required.acks=-1;
kafka服务端:
min.insync.replicas:1
如果我们不设置的话,默认这个值是1,一个leader partition会维护一个ISR列表,这个值就是限制ISR列表里面,至少得有几个副本,比如这个值是2,那么当ISR列表里面只有一个副本的时候。往这个分区插入数据的时候会报错。
设计一个不丢数据的方案:
还有可能就是发送有异常:[对异常进行处理]
分区:
我们的消息就会被轮训的发送到不同的分区。
kafka自带的分区器,会根据key计算出来一个hash值,这个hash值会对应某一个分区。
如果key相同的,那么hash值必然相同,key相同的值,必然是会被发送到同一个分区。
但是有些比较特殊的时候,我们就需要自定义分区
public class HotDataPartitioner implements Partitioner {
private Random random;
@Override
public void configure(Map<String, ?> configs) {
random = new Random();
}
@Override
public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
String key = (String)keyObj;
List partitionInfoList = cluster.availablePartitionsForTopic(topic);
//获取到分区的个数 0,1,2
int partitionCount = partitionInfoList.size();
//最后一个分区
int hotDataPartition = partitionCount - 1;
return !key.contains(“hot_data”) ? random.nextInt(partitionCount - 1) : hotDataPartition;
}
}
如何使用:
配置上这个类即可:props.put(”partitioner.class”, “com.zhss.HotDataPartitioner”);
需求分析:
电商背景 -》 二手的电商平台
【欢乐送】的项目,用户购买了东西以后会有【星星】,用星星去换物品。一块钱一个星星。
订单系统(消息的生产),发送一条消息(支付订单,取消订单) -> Kafka <- 会员系统,从kafak里面去消费数据,找到对应用户消费的金额,然后给该用户更新星星的数量。
分析一下:
发送消息的时候,可以指定key,也可以不指定key。
1)如果不指定key
2)如果指定key,key -> hash(数字) -> 对应分区号 -> 发送到对应的分区里面。
这个项目需要指定key,把用户的id指定为key.
groupid相同就属于同一个消费组
1)每个consumer都要属于一个consumer.group,就是一个消费组,topic的一个分区只会分配给一个消费组下的一个consumer来处理,每个consumer可能会分配多个分区,也有可能某个consumer没有分配到任何分区。
2)如果想要实现一个广播的效果,那只需要使用不同的group id去消费就可以。
topicA:
groupA:
groupB:
3)如果consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如果他又重启了,那么又会把一些分区重新交还给他
每个consumer内存里数据结构保存对每个topic的每个分区的消费offset,定期会提交offset,老版本是写入zk,但是那样高并发请求zk是不合理的架构设计,zk是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储。
现在新的版本提交offset发送给kafka内部topic:__consumer_offsets
,提交过去的时候, key是group.id+topic+分区号
,value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact(合并),也就是每个group.id+topic+分区号就保留最新数据。
__consumer_offsets
可能会接收高并发的请求,所以默认分区50个(leader partitiron -> 50 kafka),这样如果你的kafka部署了一个大的集群,比如有50台机器,就可以用50台机器来抗offset提交的请求压力。
web页面管理的一个管理软件(kafka Manager)
JMX_PORT=9988
另一个软件:主要监控的consumer的偏移量。
就是一个jar包java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar
com.quantifind.kafka.offsetapp.OffsetGetterWeb
写了一段程序 ,消费kafka里面的数据(consumer,处理数据 -> 业务代码) -> Kafka 如何去判断你的这段代码真的是实时的去消费的呢?
推荐:[310期面试题]
延迟几亿条数据 -> 阈值(20万条的时候 发送一个告警。)
heartbeat.interval.ms
:
session.timeout.ms
:
max.poll.interval.ms
:
fetch.max.bytes
:
获取一条消息最大的字节数,一般建议设置大一些,默认是1M 其实我们在之前多个地方都见到过这个类似的参数,意思就是说一条信息最大能多大?
max.poll.records
:
一次poll返回消息的最大条数,默认是500条
connection.max.idle.ms
:
consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要重新建立socket连接,这个建议设置为-1,不要去回收
enable.auto.commit
:
开启自动提交偏移量
auto.commit.interval.ms
:
每隔多久提交一次偏移量,默认值5000毫秒
auto.offset.reset
:
引入案例:二手电商平台(欢乐送),根据用户消费的金额,对用户星星进行累计。
面试题:消费者是如何实现rebalance的?— 根据coordinator实现
每个consumer group都会选择一个broker作为自己的coordinator,他是负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance的
首先对groupId进行hash(数字),接着对__consumer_offsets
的分区数量取模,默认是50,_consumer_offsets
的分区数可以通过offsets.topic.num.partitions
来设置,找到分区以后,这个分区所在的broker机器就是coordinator机器。
比如说:groupId,“myconsumer_group” -> hash值(数字)-> 对50取模 -> 8__consumer_offsets
这个主题的8号分区在哪台broker上面,那一台就是coordinator 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,
运行流程
leader broker开始进行socket连接以及消费消息
consumer group靠coordinator实现了Rebalance
这里有三种rebalance的策略:range、round-robin、sticky
比如我们消费的一个主题有12个分区:
p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11
假设我们的消费者组里面有三个消费者
range策略
round-robin策略
但是前面的这两个方案有个问题:12 -> 2 每个消费者会消费6个分区
假设consuemr1挂了:p0-5分配给consumer2,p6-11分配给consumer3,这样的话,原本在consumer2上的的p6,p7分区就被分配到了 consumer3上。
sticky策略
最新的一个sticky策略,就是说尽可能保证在rebalance的时候,让原本属于这个consumer的分区还是属于他们,然后把多余的分区再均匀分配过去,这样尽可能维持原来的分区分配的策略
假设consumer3挂了
LEO:是跟offset偏移量有关系。
LEO:
在kafka里面,无论leader partition还是follower partition统一都称作副本(replica)。
每次partition接收到一条消息,都会更新自己的LEO,也就是log end offset,LEO其实就是最新的offset + 1
HW:高水位
LEO有一个很重要的功能就是更新HW,如果follower和leader的LEO同步了,此时HW就可以更新
HW之前的数据对消费者是可见,消息属于commit状态。HW之后的消息消费者消费不到。
15.3 hw更新
1: 竞争controller的
/controller/id
2:controller服务监听的目录:
/broker/ids/
用来感知 broker上下线/broker/topics/
创建主题,我们当时创建主题命令,提供的参数,ZK地址。/admin/reassign_partitions
分区重分配kafka的延迟调度机制(扩展知识)
我们先看一下kafka里面哪些地方需要有任务要进行延迟调度。
第一类延时的任务:
比如说producer的acks=-1,必须等待leader和follower都写完才能返回响应。
有一个超时时间,默认是30秒(request.timeout.ms)。
所以需要在写入一条数据到leader磁盘之后,就必须有一个延时任务,到期时间是30秒延时任务 放到DelayedOperationPurgatory(延时管理器)中。
假如在30秒之前如果所有follower都写入副本到本地磁盘了,那么这个任务就会被自动触发苏醒,就可以返回响应结果给客户端了,否则的话,这个延时任务自己指定了最多是30秒到期,如果到了超时时间都没等到,就直接超时返回异常。
第二类延时的任务:
follower往leader拉取消息的时候,如果发现是空的,此时会创建一个延时拉取任务
延时时间到了之后(比如到了100ms),就给follower返回一个空的数据,然后follower再次发送请求读取消息,但是如果延时的过程中(还没到100ms),leader写入了消息,这个任务就会自动苏醒,自动执行拉取任务。
海量的延时任务,需要去调度。
1.什么会有要设计时间轮?
Kafka内部有很多延时任务,没有基于JDK Timer来实现,那个插入和删除任务的时间复杂度是O(nlogn),而是基于了自己写的时间轮来实现的,时间复杂度是O(1),依靠时间轮机制,延时任务插入和删除,O(1)
2.时间轮是什么?
其实时间轮说白其实就是一个数组。
3.多层级的时间轮
比如:要插入一个110毫秒以后运行的任务。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8