面试分享(五).相关工具使用

相关工具使用

Redis

Redis 中有几种类型 & 各自底层怎么实现的 & 项目中哪个地方用了什么类型,怎么使用的?

redis底层原理

  • 使用 ziplist 存储链表,ziplist是一种压缩链表,它的好处是更能节省内存空间,因为它所存储的内容都是在连续的内存区域当中的。
  • 使用 skiplist(跳跃表)来存储有序集合对象、查找上先从高Level查起、时间复杂度和红黑树相当,实现容易,无锁、并发性好。

1 单线程模型

Redis客户端对服务端的每次调用都经历了发送命令,执行命令,返回结果三个过程。其中执行命令阶段,由于Redis是单线程来处理命令的,所有每一条到达服务端的命令不会立刻执行,所有的命令都会进入一个队列中,然后逐个被执行。并且多个客户端发送的命令的执行顺序是不确定的。但是可以确定的是不会有两条命令被同时执行,不会产生并发问题,这就是Redis的单线程基本模型。

2 单线程模型每秒万级别处理能力的原因

(1)纯内存访问。数据存放在内存中,内存的响应时间大约是100纳秒,这是Redis每秒万亿级别访问的重要基础。

(2)非阻塞I/O,Redis采用epoll做为I/O多路复用技术的实现,再加上Redis自身的事件处理模型将epoll中的连接,读写,关闭都转换为了时间,不在I/O上浪费过多的时间。

(3)单线程避免了线程切换和竞态产生的消耗。

(4)Redis采用单线程模型,每条命令执行如果占用大量时间,会造成其他线程阻塞,对于Redis这种高性能服务是致命的,所以Redis是面向高速执行的数据库。

字符串string

字符串类型是Redis中最为基础的数据存储类型,是一个由字节组成的序列,他在Redis中是二进制安全的,这便意味着该类型可以接受任何格式的数据,如JPEG图像数据货Json对象描述信息等,是标准的key-value,一般来存字符串,整数和浮点数。Value最多可以容纳的数据长度为512MB
应用场景:很常见的场景用于统计网站访问数量,当前在线人数等。incr命令(++操作)

image

列表list

Redis的列表允许用户从序列的两端推入或者弹出元素,列表由多个字符串值组成的有序可重复的序列,是链表结构,所以向列表两端添加元素的时间复杂度为0(1),获取越接近两端的元素速度就越快。这意味着即使是一个有几千万个元素的列表,获取头部或尾部的10条记录也是极快的。List中可以包含的最大元素数量是4294967295。
应用场景:1.最新消息排行榜。2.消息队列,以完成多程序之间的消息交换。可以用push操作将任务存在list中(生产者),然后线程在用pop操作将任务取出进行执行。(消费者)

image

散列hash

Redis中的散列可以看成具有String key和String value的map容器,可以将多个key-value存储到一个key中。每一个Hash可以存储4294967295个键值对。
应用场景:例如存储、读取、修改用户属性(name,age,pwd等)

image

集合set

Redis的集合是无序不可重复的,和列表一样,在执行插入和删除和判断是否存在某元素时,效率是很高的。集合最大的优势在于可以进行交集并集差集操作。Set可包含的最大元素数量是4294967295。
应用场景:1.利用交集求共同好友。2.利用唯一性,可以统计访问网站的所有独立IP。3.好友推荐的时候根据tag求交集,大于某个threshold(临界值的)就可以推荐。
image

有序集合sorted set

和set很像,都是字符串的集合,都不允许重复的成员出现在一个set中。他们之间差别在于有序集合中每一个成员都会有一个分数(score)与之关联,Redis正是通过分数来为集合中的成员进行从小到大的排序。尽管有序集合中的成员必须是卫衣的,但是分数(score)却可以重复。
应用场景:可以用于一个大型在线游戏的积分排行榜,每当玩家的分数发生变化时,可以执行zadd更新玩家分数(score),此后在通过zrange获取几分top ten的用户信息。
image

key的通用操作

所有的数据类型都可以使用的
image

Redis如何实现分布式锁,zk如何实现分布式锁,两者的区别。如果service还没执行完,分布式锁在Redis中已经过期了,怎么解决这种问题?

解决redis分布式锁过期时间到了业务没执行完问题

很多同学在用分布式锁时,都是直接百度搜索找一个Redis分布式锁工具类就直接用了,其实Redis分布式锁比较正确的姿势是采用redisson这个客户端工具

image

默认情况下,加锁的时间是30秒.如果加锁的业务没有执行完,那么到 30-10 = 20秒的时候,就会进行一次续期,把锁重置成30秒.那这个时候可能又有同学问了,那业务的机器万一宕机了呢?宕机了定时任务跑不了,就续不了期,那自然30秒之后锁就解开了呗。

Redisson分布式锁的底层原理

image

1)加锁机制

咱们来看上面那张图,现在某个客户端要加锁。如果该客户端面对的是一个redis cluster集群,他首先会根据hash节点选择一台机器。

这里注意,仅仅只是选择一台机器!这点很关键!

紧接着,就会发送一段lua脚本到redis上,那段lua脚本如下所示:
image

为啥要用lua脚本呢?

因为一大坨复杂的业务逻辑,可以通过封装在lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性。

那么,这段lua脚本是什么意思呢?

KEYS[1]代表的是你加锁的那个key,比如说:

1
RLock lock = redisson.getLock("myLock");

这里你自己设置了加锁的那个锁key就是“myLock”。

ARGV[1]代表的就是锁key的默认生存时间,默认30秒。

ARGV[2]代表的是加锁的客户端的ID,类似于下面这样:

8743c9c0-0795-4907-87fd-6c719a6b4586:1

给大家解释一下,第一段if判断语句,就是用“exists myLock”命令判断一下,如果你要加锁的那个锁key不存在的话,你就进行加锁。

如何加锁呢?很简单,用下面的命令:

1
2
hset myLock 
8743c9c0-0795-4907-87fd-6c719a6b4586:1 1

通过这个命令设置一个hash数据结构,这行命令执行后,会出现一个类似下面的数据结构:

image

上述就代表“8743c9c0-0795-4907-87fd-6c719a6b4586:1”这个客户端对“myLock”这个锁key完成了加锁。

接着会执行“pexpire myLock 30000”命令,设置myLock这个锁key的生存时间是30秒。

好了,到此为止,ok,加锁完成了。

(2)锁互斥机制

那么在这个时候,如果客户端2来尝试加锁,执行了同样的一段lua脚本,会咋样呢?

很简单,第一个if判断会执行“exists myLock”,发现myLock这个锁key已经存在了。

接着第二个if判断,判断一下,myLock锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。

所以,客户端2会获取到pttl myLock返回的一个数字,这个数字代表了myLock这个锁key的剩余生存时间。比如还剩15000毫秒的生存时间。

此时客户端2会进入一个while循环,不停的尝试加锁。

(3)watch dog自动延期机制

客户端1加锁的锁key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?

简单!只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。

(4)可重入加锁机制

那如果客户端1都已经持有了这把锁了,结果可重入的加锁会怎么样呢?

比如下面这种代码:

image

这时我们来分析一下上面那段lua脚本。

第一个if判断肯定不成立,“exists myLock”会显示锁key已经存在了。

第二个if判断会成立,因为myLock的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是8743c9c0-0795-4907-87fd-6c719a6b4586:1

此时就会执行可重入加锁的逻辑,他会用:

1
2
3
incrby myLock 

8743c9c0-0795-4907-87fd-6c71a6b4586:1 1

通过这个命令,对客户端1的加锁次数,累加1。

此时myLock数据结构变为下面这样:
image
(5)释放锁机制

如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。

其实说白了,就是每次都对myLock数据结构中的那个加锁次数减1。

如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:

“del myLock”命令,从redis里删除这个key。

然后呢,另外的客户端2就可以尝试完成加锁了。

这就是所谓的分布式锁的开源Redisson框架的实现机制。

一般我们在生产系统中,可以用Redisson框架提供的这个类库来基于redis进行分布式锁的加锁与释放锁。

(6)上述Redis分布式锁的缺点

其实上面那种方案最大的问题,就是如果你对某个redis master实例,写入了myLock这种锁key的value,此时会异步复制给对应的master slave实例。

但是这个过程中一旦发生redis master宕机,主备切换,redis slave变为了redis master。

接着就会导致,客户端2来尝试加锁的时候,在新的redis master上完成了加锁,而客户端1也以为自己成功加了锁。

此时就会导致多个客户端对一个分布式锁完成了加锁。

这时系统在业务语义上一定会出现问题,导致各种脏数据的产生。

所以这个就是redis cluster,或者是redis master-slave架构的主从异步复制导致的redis分布式锁的最大缺陷:在redis master实例宕机的时候,可能导致多个客户端同时完成加锁。

Ehcache支持哪些缓存?

Ehcache

EhCache 是一个纯Java的进程内缓存框架,具有快速、精干等特点,是Hibernate中默认的CacheProvider。Ehcache是一种广泛使用的开源Java分布式缓存。

优点:

  • 快速
  • 简单
  • 缓存数据有两级:内存和磁盘,因此无需担心容量问题
  • 缓存数据会在虚拟机重启的过程中写入磁盘
  • 可以通过RMI、可插入API等方式进行分布式缓存
  • 具有缓存和缓存管理器的侦听接口
  • 支持多缓存管理器实例,以及一个实例的多个缓存区域
  • 提供Hibernate的缓存实现
  • 多种缓存策略,Ehcache提供了对大数据的内存和硬盘的存储,最近版本允许多实例、保存对象高灵活性、提供LRU、LFU、FIFO淘汰算法,基础属性支持热配置、支持的插件多

缺点:

  • 使用磁盘Cache的时候非常占用磁盘空间;
  • 不能保证数据的安全

memcache

memcache 是一种高性能、分布式对象缓存系统,最初设计于缓解动态网站数据库加载数据的延迟性,你可以把它想象成一个大的内存HashTable,就是一个key-value键值缓存。

memcache C语言所编写,依赖于最近版本的GCC和libevent。多线程支持

优点:

一.部分容灾

假设只用一台memcache,如果这台memcache服务器挂掉了,那么请求将不断的冲击数据库,这样有可能搞死数据库,从而引发”雪崩“。如果使用多台memcache服务器,由于memcache使用一致性哈希算法,万一其中一台挂掉了,部分请求还是可以在memcache中命中,为修复系统赢得一些时间。

二.容量问题

一台memcache服务器的容量毕竟有限,可以使用多台memcache服务器,增加缓存容量。

三.均衡请求

使用多台memcache服务器,可以均衡请求,避免所有请求都冲进一台memcache服务器,导致服务器挂掉。

四.利用memcache分布式特性

使用一台memcache服务器,并没有利用memcache的数据分布式特性。

缺点:

  • 不能持久化存储
  • 存储数据有限制:1M 【大于1M,认为就行分割】(内存碎片)
  • mm存储数据只能key-value
  • 集群数据没有复制和同步机制 【崩溃不会影响程序,会从数据库中取数据】
  • 内存回收不能及时 LRU(算法):未使用内存》过期内存》最近最少使用内存 这是惰性删除

redis

单线程、读写性能优异、支持数据持久化,支持AOF和RDB两种持久化方式、支持主从复制,主机会自动将数据同步到从机,可以进行读写分离;数据结构丰富:除了支持string类型的value外还支持string、hash、set、sortedset、list等数据结构。

缺点:

1 Redis不具备自动容错和恢复功能,主机从机的宕机都会导致前端部分读写请求失败,需要等待机器重启或者手动切换前端的IP才能恢复。

2 主机宕机,宕机前有部分数据未能及时同步到从机,切换IP后还会引入数据不一致的问题,降低了系统的可用性。

3 Redis的主从复制采用全量复制,复制过程中主机会fork出一个子进程对内存做一份快照,并将子进程的内存快照保存为文件发送给从机,这一过程需要确保主机有足够多的空余内存。若快照文件较大,对集群的服务能力会产生较大的影响,而且复制过程是在从机新加入集群或者从机和主机网络断开重连时都会进行,也就是网络波动都会造成主机和从机间的一次全量的数据复制,这对实际的系统运营造成了不小的麻烦。

4 Redis较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。为避免这一问题,运维人员在系统上线时必须确保有足够的空间,这对资源造成了很大的浪费。

ehcache直接在jvm虚拟机中缓存,速度快,效率高;但是缓存共享麻烦,集群分布式应用不方便。
redis是通过socket访问到缓存服务,效率比ecache低,比数据库要快很多,处理集群和分布式缓存方便,有成熟的方案。
如果是单个应用或者对缓存访问要求很高的应用,用ehcache。
如果是大型系统,存在缓存共享、分布式部署、缓存内容很大的,建议用redis。

Redis是单线程的还是多线程的,为什么这么快?

Redis是一个开源的内存中的数据结构存储系统,它可以用作:数据库、缓存和消息中间件。

它支持多种类型的数据结构,如字符串(String),散列(Hash),列表(List),集合(Set),有序集合(Sorted Set或者是ZSet)与范围查询,Bitmaps,Hyperloglogs 和地理空间(Geospatial)索引半径查询。其中常见的数据结构类型有:String、List、Set、Hash、ZSet这5种。

Redis 内置了复制(Replication),LUA脚本(Lua scripting), LRU驱动事件(LRU eviction),事务(Transactions) 和不同级别的磁盘持久化(Persistence),并通过 Redis哨兵(Sentinel)和自动分区(Cluster)提供高可用性(High Availability)。

Redis也提供了持久化的选项,这些选项可以让用户将自己的数据保存到磁盘上面进行存储。根据实际情况,可以每隔一定时间将数据集导出到磁盘(快照),或者追加到命令日志中(AOF只追加文件),他会在执行写命令时,将被执行的写命令复制到硬盘里面。您也可以关闭持久化功能,将Redis作为一个高效的网络的缓存数据功能使用。

Redis不使用表,他的数据库不会预定义或者强制去要求用户对Redis存储的不同数据进行关联。

数据库的工作模式按存储方式可分为:硬盘数据库和内存数据库。Redis 将数据储存在内存里面,读写数据的时候都不会受到硬盘 I/O 速度的限制,所以速度极快。

(1)硬盘数据库的工作模式:
image
(2)内存数据库的工作模式:
image

Redis为什么这么快

1、完全基于内存,绝大部分请求是纯粹的内存操作,非常快速。数据存在内存中,类似于HashMap,HashMap的优势就是查找和操作的时间复杂度都是O(1);

2、数据结构简单,对数据操作也简单,Redis中的数据结构是专门进行设计的;

3、采用单线程,避免了不必要的上下文切换和竞争条件,也不存在多进程或者多线程导致的切换而消耗 CPU,不用去考虑各种锁的问题,不存在加锁释放锁操作,没有因为可能出现死锁而导致的性能消耗;

4、使用多路I/O复用模型,非阻塞IO;

5、使用底层模型不同,它们之间底层实现方式以及与客户端之间通信的应用协议不一样,Redis直接自己构建了VM 机制 ,因为一般的系统调用系统函数的话,会浪费一定的时间去移动和请求;

以上几点都比较好理解,下边我们针对多路 I/O 复用模型进行简单的探讨:

(1)多路 I/O 复用模型

多路I/O复用模型是利用 select、poll、epoll 可以同时监察多个流的 I/O 事件的能力,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有 I/O 事件时,就从阻塞态中唤醒,于是程序就会轮询一遍所有的流(epoll 是只轮询那些真正发出了事件的流),并且只依次顺序的处理就绪的流,这种做法就避免了大量的无用操作。

这里“多路”指的是多个网络连接,“复用”指的是复用同一个线程。采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络 IO 的时间消耗),且 Redis 在内存中操作数据的速度非常快,也就是说内存内的操作不会成为影响Redis性能的瓶颈,主要由以上几点造就了 Redis 具有很高的吞吐量。

那么为什么Redis是单线程的

我们首先要明白,上边的种种分析,都是为了营造一个Redis很快的氛围!官方FAQ表示,因为Redis是基于内存的操作,CPU不是Redis的瓶颈,Redis的瓶颈最有可能是机器内存的大小或者网络带宽。既然单线程容易实现,而且CPU不会成为瓶颈,那就顺理成章地采用单线程的方案了(毕竟采用多线程会有很多麻烦!)。

看到这里,你可能会气哭!本以为会有什么重大的技术要点才使得Redis使用单线程就可以这么快,没想到就是一句官方看似糊弄我们的回答!但是,我们已经可以很清楚的解释了为什么Redis这么快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了!

但是,我们使用单线程的方式是无法发挥多核CPU 性能,不过我们可以通过在单机开多个Redis 实例来完善!

警告1:这里我们一直在强调的单线程,只是在处理我们的网络请求的时候只有一个线程来处理,一个正式的Redis Server运行的时候肯定是不止一个线程的,这里需要大家明确的注意一下!例如Redis进行持久化的时候会以子进程或者子线程的方式执行(具体是子线程还是子进程待读者深入研究);例如我在测试服务器上查看Redis进程,然后找到该进程下的线程:

Redis Hash中某个key过大,变为String类型的大key,怎么处理,使用中如何避免出现这种问题?

由于redis是单线程运行的,如果一次操作的value很大会对整个redis的响应时间造成负面影响,所以,业务上能拆则拆,下面举几个典型的分拆方案。

单个简单的key存储的value很大

1.1、 改对象需要每次都整存整取
可以尝试将对象分拆成几个key-value, 使用multiGet获取值,这样分拆的意义在于分拆单次操作的压力,将操作压力平摊到多个redis实例中,降低对单个redis的IO影响;

1.2、该对象每次只需要存取部分数据
可以像第一种做法一样,分拆成几个key-value, 也可以将这个存储在一个hash中,每个field代表一个具体的属性,使用hget,hmget来获取部分的value,使用hset,hmset来更新部分属性

hash, set,zset,list 中存储过多的元素

类似于场景一种的第一个做法,可以将这些元素分拆。

以hash为例,原先的正常存取流程是 hget(hashKey, field) ; hset(hashKey, field, value)
现在,固定一个桶的数量,比如 10000, 每次存取的时候,先在本地计算field的hash值,模除 10000, 确定了该field落在哪个key上。

1
2
3
newHashKey  =  hashKey + (*hash*(field) % 10000;   
hset (newHashKey, field, value) ;
hget(newHashKey, field)

set, zset, list 也可以类似上述做法.

但有些不适合的场景,比如,要保证 lpop 的数据的确是最早push到list中去的,这个就需要一些附加的属性,或者是在 key的拼接上做一些工作(比如list按照时间来分拆)。

哨兵机制、Redis两种备份方式的区别,项目中用的哪种,为什么?

持久化机制

 为了解决一旦断电或者宕机,内存数据库中的数据将会全部丢失这个缺点,Redis提供了将内存数据持久化到硬盘,以及用持久化文件来恢复数据的功能。Redis 支持两种形式的持久化,一种是RDB快照(snapshotting),另外一种是AOF(append-only-file)。
 
 (1)RDB是把当前内存中的数据集快照写入磁盘,也就是 Snapshot 快照(数据库中所有键值对数据)。恢复时是将快照文件直接读到内存里,RDB持久化有两种触发机制,分别是自动触发和手动触发。在redis.windows.conf(linux就是redis.conf)文件的SNAPSHOTTING 下有个自动触发rdb持久化的策略:
 
 image
 其中分别表示900s内,如果至少有一个key值变化,则保存到rdb;300s内,至少有10个key变化就保存;60s内至少有10000个key变化就保存。如果只需要redis的缓存功能那么就可以关掉rdb持久化,使用空串停用,如:save “”“”。

在这个配置下面还有几个关于rdb持久化的配置:

  • stop-writes-on-bgsave-error yes:默认yes,表示在后台通过rdb保存数据失败之后是否停止向redis写入数据(接收数据)。这样可以让用户意识到后台持久化失败了,避免后面数据不能持久化。
  • rdbcompression yes:默认yes,表示存储的rdb快照是否进行压缩存储,如果关闭则快照会比较大。
  • rdbchecksum yes:默认yes,存储之后是否对数据进行校验,若希望提升redis性能可以关闭。
  • dbfilename dump.rdb:存储的rdb快照文件名。
  • dir ./:设置快照存放路径,必须是目录,默认和当前配置文件在同一目录。

手动触发rdb快照保存可以使用save和bgsave命令。save会阻塞当前redis,redis不能处理其他命令直到rdb过程完成,而bgsave会在后台异步进行保存(redis会执行fork操作创建一个子进程),阻塞只会在fork短时间内,redis内部rdb自动保存都是采用bgsave命令。

将备份文件dump.rdb放到配置文件指定的目录(默认是和redis配置文件同一目录)下,启动redis就会自动将数据加载到内存中。

(2)AOF 持久化是通过保存Redis服务器所执行的写命令来记录数据库状态。在redis的配置文件中APPEND ONLY MODE的下有关于AOF持久化的相关配置。

image

  • appendonly:表示是否开启AOF持久化,因为redis默认的是rdb方式,打开aof需要改为yes。
  • appendfilename:aof文件名。
  • appendsync:aof持久化策略配置。no表示不执行fsync,有系统保证数据同步到磁盘,速度最快但不安全;always表示每次写入都要执行fsync,以保证数据完全同步到磁盘,效率低;everysec则是每秒保存一次,可能会丢失者1s数据,兼顾安全和效率。
  • no-appendfsync-on-rewrite:设置为yes表示rewrite期间对新写操作不fsync,暂时存在内存中,等rewrite完成后再写入,默认为no,建议yes。Linux的默认fsync策略是30秒。可能丢失30秒数据。默认值为no。
  • auto-aof-rewrite-percentage:默认值为100。aof自动重写配置,当目前aof文件大小超过上一次重写的aof文件大小的百分之多少进行重写,即当aof文件增长到一定大小的时候,Redis能够调用bgrewriteaof对日志文件进行重写。
  • auto-aof-rewrite-min-size:64mb。设置允许重写的最小aof文件大小,避免了达到约定百分比但尺寸仍然很小的情况还要重写。
  • aof-load-truncated:aof文件可能在尾部是不完整的(宕机或者断电等),如果选择的是yes,当截断的aof文件被导入的时候,会自动发布一个log给客户端然后load。如果是no,用户必须手动redis-check-aof修复AOF文件才可以。默认值为 yes

AOF文件不完整需要恢复可以使用命令:redis-check-aof –fix 进行修复。aof文件过大的时候也需要重写,和rdb的bgsave模式相似,都是创建子进程,设置重写缓冲区,在重写完成之后再将缓冲区文件写入aof文件。

总结

AOF 持久化的方法提供了多种的同步频率,即使使用默认的同步频率每秒同步一次,Redis 最多也就丢失 1 秒的数据而已。

AOF 文件使用 Redis 命令追加的形式来构造,因此,即使 Redis 只能向 AOF 文件写入命令的片断,使用 redis-check-aof 工具也很容易修正 AOF 文件。

AOF 文件的格式可读性较强,这也为使用者提供了更灵活的处理方式。例如,如果我们不小心错用了 FLUSHALL 命令,在重写还没进行时,我们可以手工将最后的 FLUSHALL 命令去掉,然后再使用 AOF 来恢复数据。

对于具有相同数据的的 Redis,AOF 文件通常会比 RDB文件体积更大。

虽然 AOF 提供了多种同步的频率,默认情况下,每秒同步一次的频率也具有较高的性能。但在 Redis 的负载较高时,RDB 比 AOF 具好更好的性能保证。

RDB 使用快照的形式来持久化整个 Redis 数据,而 AOF 只是将每次执行的命令追加到 AOF 文件中,因此从理论上说,RDB 比 AOF 方式更健壮。官方文档也指出,AOF 的确也存在一些 BUG,这些 BUG 在 RDB 没有存在。

主从复制

复制三份配置文件,分别更改端口号,并且配置文件的名字以端口号区分。
image
之后再更改每份配置文件:rdb文件名(dbfilename)、log日志文件名(logfile),之后使用下面三个命令启动三个redis服务端。

1
2
3
redis-server.exe redis.windows.conf
redis-server.exe redis.windows-10087.conf
redis-server.exe redis.windows-10088.conf

通过info replication查看各个节点信息。
image
此时都是master节点,接着设置slave节点。使用slaveof命令把10087和10088设为slave。
image
此时设为slave节点之后再使用info replication命令查看就发现role已经变为了slave了。现再在master节点写,在slave读。
image
可以看见现在主从关系已经成功建立了。注:

(1)如果master以前还存在一些key,那么slave节点也是会有的,因为master会全量复制到slave。

(2)默认从节点是不能够执行写命令的,配置文件中slave-read-only默认是yes。

(3)主节点down掉之后,另外两个slave角色依然不变,并且在master恢复之后,仍然是master并且有两个slave节点。

哨兵模式

哨兵模式就是不时地监控redis是否按照预期良好地运行(至少是保证主节点是存在的),若一台主机出现问题时,哨兵会自动将该主机下的某一个从机设置为新的主机,并让其他从机和新主机建立主从关系(如果监控主机发生故障,就根据投票数自动将从库转化为主库)。首先启动多个redis,并配置主从关系,下面再配置哨兵监控master。

(1)在配置文件目录下面使用touch命令新建sentinel.conf文件,然后配置内容:

sentinel monitor 被监控主机名(自己起) ip地址 端口 得票数 如:sentinel monitor my6379 127.0.0.1 6379 1。1表示当主机挂掉之后得票数>=1便成为主机。

(2)启动哨兵监控

使用命令:./redis-sentinel /mrliu_project/redis/sentinel.conf 注:/mrliu_project/redis/sentinel.conf 是配置文件所在目录

(3)exit退出master主机,之后查看哨兵控制台打印的日志,会发现在重新选择master。

在redis稳定版之后,挂掉的主机在选举之后重新连接上,会设置为新选举的msater的从节点。

启动三台带有redis的服务器133,132,130
分别更改redis.conf文件,并指定配置启动redis-server和redis-sentinel

进入redis的src目录:./redis-server ../redis.config ./redis-sentinel ../sentinel.conf(哨兵配置需要自己touch)

133启动一个master和哨兵的服务器,

132启动两个redis服务(两份配置文件),分别设为133的slave

130启动一个redis服务,设为133的slave

redis-config更改了一些目录和端口等,我大概改了如下内容:

1
2
3
4
5
daemonize yes
logfile
port
#bind.1 注释掉了
protected-mode no //关闭了

注意(出现设置了slaveof之后发现主机一直未down可能是如下原因):

(1)每个启动的redis服务需要注释掉bind(代表可以访问的主机),不然其他redis服务器不能感知到该redis服务的存在

(2)每个redis服务需要更改protected-mode 为no,

(3)每台linux服务器需要打开端口,不然直接关闭firewall或者是iptables也行,命令systemctl stop firewalld和service iptables stop。 永久关闭防火墙使用disable。

(4)建议redis-server都配置为守护进程,daemon设为yes即可。

哨兵需要自建配置文件,在redis的src目录下使用 ./redis-sentinel ../sentinel_liu.conf 启动,其中后面的是自己的哨兵配置文件,简易配置如下:

1
2
3
4
5
port 26379

sentinel monitor liu_master 192.168.15.130 6379 1//监控的主机,后面代表超过机票就成为leader
sentinel down-after-milliseconds liu_master 10000//可选,多久没心跳就认为down
sentinel failover-timeout liu_master 10000//可选,代表每次选举间隔

几台服务器如下:
image
之后进入133使用info确认从节点有三个,

之后查看哨兵启动日志
image
注:sdown代表哨兵主观任务下线,(后面是master断开之后的日志)odown是客观下线,当出现odown的时候,哨兵将不会监控该服务,任务确实下线了,然后开始选举。

断开133的master,观察哨兵日志
可以进入133的客户端,使用shutdown关掉,也可以在redis的src目录下使用./redis-cli shutdown关,也可以杀redis进程关。

哨兵日志如下(我这儿选了这么多次原因是其他机器防火墙开着,所以关了就选举成功了):
image

出现failover-end即是选举成功了,switch代表切换master到了130服务器节点,随后发现 +fix-slave-config日志,代表将132的6379redis服务和132的6380redis服务设为了130的slave节点,这时候130成为master,132的两个redis服务是slave节点。

重新上线之前断开的133master,查看哨兵日志

发现新上线的133设为了130redis服务的slave,下面去130服务器登上redis客户端输入info验证

image

发现回归后的master变为了新选举的master的slave节点。

哨兵机制、选举算法

Redis Sentinel是一个分布式系统,为Redis提供高可用性解决方案。可以在一个架构中运行多个 Sentinel 进程(progress), 这些进程使用流言协议(gossip protocols)来 接收关于主服务器是否下线的信息, 并使用投票协议(agreement protocols)来决定是否执行自动故 障迁移, 以及选择哪个从服务器作为新的主服务器。

Redis 的 Sentinel 系统用于管理多个 Redis 服务器(instance) 该系统执行以下三个任务:

  • 监控(Monitoring): Sentinel 会不断地定期检查你的主服务器和从服务器是否运作正常。
  • 提醒(Notification): 当被监控的某个 Redis 服务器出现问题时, Sentinel 可以通过 API 向管理员或者其他应用程序发送通知。
  • 自动故障迁移(Automaticfailover): 当一个主服务器不能正常工作时, Sentinel 会开始一次自动故障迁移操作, 它会将失效主服务器的其中 一个从服务器升级为新的主服务器, 并让失效主服务器的其他从服务器改为复制新的主服务器; 当客 户端试图连接失效的主服务器时, 集群也会向客户端返回新主服务器的地址, 使得集群可以使用新主 服务器代替失效服务器。

Sentinel 工作原理分析

(1)哨兵文件详解

配置一:sentinel monitor <master-name> <ip> <port> <quorum>

这个配置表达的是 哨兵节点定期监控 名字叫做 <master-name> 并且 IP 为 <ip> 端口号为 <port> 的主节点。<quorum> 表示的是哨兵判断主节点是否发生故障的票数。也就是说如果我们将<quorum>设置为2就代表至少要有两个哨兵认为主节点故障了,才算这个主节点是客观下线的了,一般是设置为sentinel节点数的一半加一。

配置二:sentinel down-after-milliseconds <master-name> <times>

每个哨兵节点会定期发送ping命令来判断Redis节点和其余的哨兵节点是否是可达的,如果超过了配置的<times>时间没有收到pong回复,就主观判断节点是不可达的,<times>的单位为毫秒。

配置三:sentinel parallel-syncs <master-name> <nums>

当哨兵节点都认为主节点故障时,哨兵投票选出的leader会进行故障转移,选出新的主节点,原来的从节点们会向新的主节点发起复制,这个配置就是控制在故障转移之后,每次可以向新的主节点发起复制的节点的个数,最多为<nums>个,因为如果不加控制会对主节点的网络和磁盘IO资源很大的开销。

配置四:sentinel failover-timeout <master-name> <times>

这个代表哨兵进行故障转移时如果超过了配置的<times>时间就表示故障转移超时失败。

配置五: sentinel auth-pass <master-name> <password>

如果主节点设置了密码,则需要这个配置,否则哨兵无法对主节点进行监控。

(2)为什么要用到哨兵

哨兵(Sentinel)主要是为了解决在主从复制架构中出现宕机的情况,主要分为两种情况:

1).从Redis宕机

这个相对而言比较简单,在Redis中从库重新启动后会自动加入到主从架构中,自动完成同步数据。在Redis2.8版本后,主从断线后恢复
的情况下实现增量复制。

2).主Redis宕机

这个相对而言就会复杂一些,需要以下2步才能完成

a. 在从数据库中执行SLAVEOF NO ONE命令,断开主从关系并且提升为主库继续服务

b. 第二步,将主库重新启动后,执行SLAVEOF命令,将其设置为其他库的从库,这时数据就能更新回来

由于这个手动完成恢复的过程其实是比较麻烦的并且容易出错,所以Redis提供的哨兵(sentinel)的功能来解决

(3)哨兵机制(sentinel)的高可用
Sentinel(哨兵)是Redis 的高可用性解决方案:由一个或多个Sentinel 实例 组成的Sentinel 系统可以监视任意多个主服务器,以及这些主服务器属下的所有从服务器,并在被监视的主服务器进入下线状态时,自动将下线主服务器属下的某个从服务器升级为新的主服务器。
image

在Server1 掉线后:

image

升级Server2 为新的主服务器:
image

(4)哨兵的定时监控

任务1:每个哨兵节点每10秒会向主节点和从节点发送info命令获取最拓扑结构图,哨兵配置时只要配置对主节点的监控即可,通过向主节点发送info,获取从节点的信息,并当有新的从节点加入时可以马上感知到
image

任务2:每个哨兵节点每隔2秒会向redis数据节点的指定频道上发送该哨兵节点对于主节点的判断以及当前哨兵节点的信息,同时每个哨兵节点也会订阅该频道,来了解其它哨兵节点的信息及对主节点的判断,其实就是通过消息publish和subscribe来完成的

image

任务3:每隔1秒每个哨兵会向主节点、从节点及其余哨兵节点发送一次ping命令做一次心跳检测,这个也是哨兵用来判断节点是否正常的重要依据

image

主观下线:所谓主观下线,就是单个sentinel认为某个服务下线(有可能是接收不到订阅,之间的网络不通等等原因)。

sentinel会以每秒一次的频率向所有与其建立了命令连接的实例(master,从服务,其他sentinel)发ping命令,通过判断ping回复是有效回复,还是无效回复来判断实例时候在线(对该sentinel来说是“主观在线”)。

sentinel配置文件中的down-after-milliseconds设置了判断主观下线的时间长度,如果实例在down-after-milliseconds毫秒内,返回的都是无效回复,那么sentinel回认为该实例已(主观)下线,修改其flags状态为SRI_S_DOWN。如果多个sentinel监视一个服务,有可能存在多个sentinel的down-after-milliseconds配置不同,这个在实际生产中要注意。

客观下线:当主观下线的节点是主节点时,此时该哨兵3节点会通过指令sentinel is-masterdown-by-addr寻求其它哨兵节点对主节点的判断,如果其他的哨兵也认为主节点主观线下了,则当认为主观下线的票数超过了quorum(选举)个数,此时哨兵节点则认为该主节点确实有问题,这样就客观下线了,大部分哨兵节点都同意下线操作,也就说是客观下线
image

(5)哨兵lerder选举流程

如果主节点被判定为客观下线之后,就要选取一个哨兵节点来完成后面的故障转移工作,选举出一个leader的流程如下:

a)每个在线的哨兵节点都可以成为领导者,当它确认(比如哨兵3)主节点下线时,会向其它哨兵发is-master-down-by-addr命令,征求判断并要求将自己设置为领导者,由领导者处理故障转移;

b)当其它哨兵收到此命令时,可以同意或者拒绝它成为领导者;

c)如果哨兵3发现自己在选举的票数大于等于num(sentinels)/2+1时,将成为领导者,如果没有超过,继续选举…………

image

(6)自动故障转移机制

在从节点中选择新的主节点

sentinel状态数据结构中保存了主服务的所有从服务信息,领头sentinel按照如下的规则从从服务列表中挑选出新的主服务

  • 过滤掉主观下线的节点
  • 选择slave-priority最高的节点,如果由则返回没有就继续选择
  • 选择出复制偏移量最大的系节点,因为复制便宜量越大则数据复制的越完整,如果由就返回了,没有就继续
  • 选择run_id最小的节点
    image

更新主从状态

通过slaveof no one命令,让选出来的从节点成为主节点;并通过slaveof命令让其他节点成为其从节点。

将已下线的主节点设置成新的主节点的从节点,当其回复正常时,复制新的主节点,变成新的主节点的从节点
同理,当已下线的服务重新上线时,sentinel会向其发送slaveof命令,让其成为新主的从

Sentinel获取服务器信息

(1) Sentinel获取主服务器信息

    Sentinel默认会以每10秒一次的频率,通过命令连接向主服务器发送info命令,通过分析info命令的回复来获取主服务器的当前信息,就像在上篇讲到的复制功能,在客户端输入info replication 命令一样,Sentinel可以获取以下两方面的信息:

    (1) 关于主服务器本身的信息,包括服务器run_id,role的服务器角色。

    (2) 关于所有从服务器的信息,每个从服务器都由一个slave字符串开头的行记录,记录了从服务器IP和端口(主服务器中有从库的配置信息)。

(2)Sentinel获取从服务器信息

    当Sentinel发现主服务器有新的从服务器出现时,Sentinel除了会为这个新的从服务器创建相应的实例结构(sentinelRedisInstance)之外,Sentinel还会创建连接到从服务器的命令连接和订阅连接。Sentinel默认会以每10秒一次的频率通过命令连接从服务器发送info命令,通过分析info命令的回复来获取从服务器的当前信息。包括:从服务器运行run_ID、从服务器角色role、主服务器的ip及端口、主从服务器的连接状态master_link_status、从服务器的优先级slave_priority。

(3)Sentinel向主从服务器发送信息

    在默认情况下, Sentinel会以每2秒一次的频率,通过命令连接向所有被监视的主服务器和从服务器发送以下格式的命令:
image
 这条命令向服务器的sentinel:hello频道发送了一条信息,信息的内容由多个参数组成:

    (1) 以s_开头以参数记录的是sentinel本身的信息。

    (2) 而m_开头的参数记录的则是主服务器的信息,如果sentinel正在监视的是主服务器,那么这些参数就是主服务器的信息,如果sentinel正在监视的是从服务器,那么这些参数记录就是从服务器正在复制的主服务器的信息。          

参数 描述
S_ip Sentinel的ip地址
S_port Sentinel的端口号
S_runid Sentinel的运行ID
S_epoch Sentinel 的当前配置纪元
m_name 主服务器的名字
M_ip 主服务器的IP地址
M_port 主服务器的端口号
M_epoch 主服务器的当前配置纪元

以下是一条sentinel通过publish命令向主服务器发送的信息示例:
image

这个示例中sentinel的ip地址为172.0.0.1端口号为26379, 运行ID为后面一串,当前纪元为0。主服务器的名字为mymaster,ip地址为127.0.0.1,端口号为6379, 当前纪元为0。

4)sentinel接收来自主服务器和从服务器的频道信息

    当sentinel与一个主服务器或者从服务器建立起订阅连接之后,Sentinel就会通过订阅连接,向服务器发送以下命令:subscribe_sentinel_:hello 。对于每个与Sentinel连接的服务器,Sentinel既通过命令连向服务器的_sentinel_:hello频道发送信息,又通过订阅连接从服务器的_sentinel_:hello频道接收信息。

    当有三个sentinel,分别是sentinel1、sentinel2 、sentinel3。三个sentinel在监视同一个服务器,那么当sentinel1向服务器的_sentinel_:hello频道发送一条信息时,所有订阅了_sentinel_:hello频道的sentinel(包括sentinel1自己在内)都会收到这条信息。

    当一个sentinel从_sentinel_:hello频道收到一条信息时,sentinel会对这条信息进行分析,提取出信息中sentinel 的 ip 、port、runID等8个参数,并进行以下检查:

    (1) 如果信息中记录的sentinel运行ID和接收信息的sentinel运行ID相同,那么说明这条信息是sentinel自己发送的,sentinel将丢弃这条信息,不做进一步处理。

    (2) 相反地,如果信息中记录的sentinel运行ID和接收信息的sentinel运行ID不相同,那说明这条信息监视同一个服务器的其它sentinel发来的,接收信息的sentinel将根据信息中的参数,对相应主服务器的实例结构进行更新。

(5)sentinel更新自己的sentinels字典
    sentinel为主服务器创建实例结构中的sentinels字典,保存了sentinel本身,还监视这个主服务器的其他sentinel的资料。当一个sentinel接收到其他sentinels发来的信息时,接收的sentinel会从信息中分析并提取出两方面参数:

    (1)与sentinel有关的参数,包括sentinel的ip、port、runid、配置纪元。

    (2)与主服务器有关的参数, 包括监视主服务器的ip、port、runid、配置纪元。

    假设分别有三个sentinel: 127.0.0.1:26379、127.0.0.1:26380、127.0.0.1:26381。三个sentinel正在监视主服务器127.0.0.1:6379, 那么当127.0.0.1:26379这个sentinel接收到以下消息时:
    
image

这个sentinel将执行以下动作:

    (1) 第一条信息发送者为自己,信息忽略。

    (2) 第二条信息发送者为26381, sentinel会根据信息提取出内容,对sentinels字典中26381对应的实例结构进行更新。

    (3) 第三条信息发送者为23680,同样更新字典中的23680对应的实例结构。

    每个sentinel都有自己的一个sentinels字典, 对于26379的sentinel它的sentinels字典信息保存了26380和26381两个sentinel信息。其它sentinel也一样。
    
(6)sentinel创建连向其他sentinel的命令连接

    当sentinel通过频道信息发现一个新的sentinel时,不仅更新sentinels字典,还会创建一个连向sentinel命令连接,而新的sentinel也会创建连向这个sentinel的命令连接,最终监视同一个主服务器的多个sentinel将形成相互连接的网络。如下图所示:
image

4.Sentinel的工作原理总结

1):每个Sentinel以每秒钟一次的频率向它所知的Master,Slave以及其他 Sentinel 实例发送一个 PING 命令。

2):如果一个实例(instance)距离最后一次有效回复 PING 命令的时间超过 down-after-milliseconds 选项所指定的值, 则这个实例会被 Sentinel 标记为主观下线。

3):如果一个Master被标记为主观下线,则正在监视这个Master的所有 Sentinel 要以每秒一次的频率确认Master的确进入了主观下线状态。

4):当有足够数量的 Sentinel(大于等于配置文件指定的值)在指定的时间范围内确认Master的确进入了主观下线状态, 则Master会被标记为客观下线 。

5):在一般情况下, 每个 Sentinel 会以每 10 秒一次的频率向它已知的所有Master,Slave发送 INFO 命令 。

6):当Master被 Sentinel 标记为客观下线时,Sentinel 向下线的 Master 的所有 Slave 发送 INFO 命令的频率会从 10 秒一次改为每秒一次 。

7):若没有足够数量的 Sentinel 同意 Master 已经下线, Master 的客观下线状态就会被移除。

若 Master 重新向 Sentinel 的 PING 命令返回有效回复, Master 的主观下线状态就会被移除。

消息中间件

如何保证RocketMQ 消息的顺序性,如何解决重复消费问题。

分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。而谈到消息系统的设计,就回避不了两个问题:

  • 消息的顺序问题
  • 消息的重复问题

RocketMQ作为阿里开源的一款高性能、高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ有哪些关键特性?其实现原理是怎样的?

关键特性及其实现原理

顺序消息

消息有序指的是可以按照消息的发送顺序来消费。例如:一笔订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照顺序依次消费才有意义。与此同时多笔订单之间又是可以并行消费的。首先来看如下示例:

假如生产者产生了2条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?你脑中想到的可能是这样:

image

你可能会采用这种方式保证消息顺序

假定M1发送到S1,M2发送到S2,如果要保证M1先于M2被消费,那么需要M1到达消费端被消费后,通知S2,然后S2再将M2发送到消费端。

这个模型存在的问题是,如果M1和M2分别发送到两台Server上,就不能保证M1先达到MQ集群,也不能保证M1被先消费。换个角度看,如果M2先于M1达到MQ集群,甚至M2被消费后,M1才达到消费端,这时消息也就乱序了,说明以上模型是不能保证消息的顺序的。如何才能在MQ集群保证消息的顺序?一种简单的方式就是将M1、M2发送到同一个Server上:
image

保证消息顺序,你改进后的方法

这样可以保证M1先于M2到达MQServer(生产者等待M1发送成功后再发送M2),根据先达到先被消费的原则,M1会先于M2被消费,这样就保证了消息的顺序。

这个模型也仅仅是理论上可以保证消息的顺序,在实际场景中可能会遇到下面的问题:
image

网络延迟问题

只要将消息从一台服务器发往另一台服务器,就会存在网络延迟问题。如上图所示,如果发送M1耗时大于发送M2的耗时,那么M2就仍将被先消费,仍然不能保证消息的顺序。即使M1和M2同时到达消费端,由于不清楚消费端1和消费端2的负载情况,仍然有可能出现M2先于M1被消费的情况。

那如何解决这个问题?将M1和M2发往同一个消费者,且发送M1后,需要消费端响应成功后才能发送M2。

聪明的你可能已经想到另外的问题:如果M1被发送到消费端后,消费端1没有响应,那是继续发送M2呢,还是重新发送M1?一般为了保证消息一定被消费,肯定会选择重发M1到另外一个消费端2,就如下图所示。

image

保证消息顺序的正确姿势

这样的模型就严格保证消息的顺序,细心的你仍然会发现问题,消费端1没有响应Server时有两种情况,一种是M1确实没有到达(数据在网络传送中丢失),另外一种消费端已经消费M1且已经发送响应消息,只是MQ Server端没有收到。如果是第二种情况,重发M1,就会造成M1被重复消费。也就引入了我们要说的第二个问题,消息重复问题,这个后文会详细讲解。

回过头来看消息顺序问题,严格的顺序消息非常容易理解,也可以通过文中所描述的方式来简单处理。总结起来,要实现严格的顺序消息,简单且可行的办法就是:

保证生产者 - MQServer - 消费者是一对一对一的关系

这样的设计虽然简单易行,但也会存在一些很严重的问题,比如:

  • 并行度就会成为消息系统的瓶颈(吞吐量不够)
  • 更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。

但我们的最终目标是要集群的高容错性和高吞吐量。这似乎是一对不可调和的矛盾,那么阿里是如何解决的?

世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!

有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。如果硬要把时间花在解决问题本身,实际上不仅效率低下,而且也是一种浪费。从这个角度来看消息的顺序问题,我们可以得出两个结论:

  • 不关注乱序的应用实际大量存在
  • 队列无序并不意味着消息无序

所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是不是我们应该寻求的一种更合理的方式?

最后我们从源码角度分析RocketMQ怎么实现发送顺序消息的。

RocketMQ通过轮询所有队列的方式来确定消息被发送到哪一个队列(负载均衡策略)。比如下面的示例中,订单号相同的消息会被先后发送到同一个队列中:

image
在获取到路由信息以后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的肯定是同一个队列。
image

消息重复

上面在解决消息顺序问题时,引入了一个新的问题,就是消息重复。那么RocketMQ是怎样解决消息重复的问题呢?还是“恰好”不解决。

造成消息重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

消费端处理消息的业务逻辑保持幂等性

保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

第1条解决方案,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率其实很小,如果由消息系统来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。

那么如何解决消息重复投递的问题?

以我们支付宝转账到余额宝为例,如果相同的消息被重复投递两次,那么我们余额宝账户将会增加2万而不是1万了(上面讲顺序消费是讲过,这里再提一下)。

为什么相同的消息会被重复投递?比如余额宝处理完消息msg后,发送了处理成功的消息给支付宝,正常情况下支付宝应该要删除消息msg,但如果支付宝这时候悲剧的挂了,重启后一看消息msg还在,就会继续发送消息msg。

解决方法很简单,在余额宝这边增加消息应用状态表(message_apply)(这就是上文说的去重表吧),通俗来说就是个账本,用于记录消息的消费情况,每次来一个消息,在真正执行之前,先去消息应用状态表中查询一遍,如果找到说明是重复消息,丢弃即可,如果没找到才执行,同时插入到消息应用状态表(同一事务) 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for each msg in queue 

Begin transaction

select count(*) as cnt from message_apply where msg_id=msg.msg_id;

if cnt==0 then

update B set amount=amount+10000 where userId=1;

insert into message_apply(msg_id) values(msg.msg_id);

End transaction

commit;

事务消息

RocketMQ除了支持普通消息,顺序消息,另外还支持事务消息。首先讨论一下什么是事务消息以及支持事务消息的必要性。我们以一个转帐的场景为例来说明这个问题:Bob向Smith转账100块。

在单机环境下,执行事务的情况,大概是下面这个样子:
image

单机环境下转账事务示意图

当用户增长到一定程度,Bob和Smith的账户及余额信息已经不在同一台服务器上了,那么上面的流程就变成了这样:
image
集群环境下转账事务示意图

这时候你会发现,同样是一个转账的业务,在集群环境下,耗时居然成倍的增长,这显然是不能够接受的。那如何来规避这个问题?

大事务 = 小事务 + 异步

将大事务拆分成多个小事务异步执行。这样基本上能够将跨机事务的执行效率优化到与单机一致。转账的事务就可以分解成如下两个小事务:
image
小事务+异步消息

图中执行本地事务(Bob账户扣款)和发送异步消息应该保证同时成功或者同时失败,也就是扣款成功了,发送消息一定要成功,如果扣款失败了,就不能再发送消息。那问题是:我们是先扣款还是先发送消息呢?

首先看下先发送消息的情况,大致的示意图如下:
image

事务消息:先发送消息

存在的问题是:如果消息发送成功,但是扣款失败,消费端就会消费此消息,进而向Smith账户加钱。

先发消息不行,那就先扣款吧,大致的示意图如下:

image
事务消息-先扣款

存在的问题跟上面类似:如果扣款成功,发送消息失败,就会出现Bob扣钱了,但是Smith账户未加钱。

可能大家会有很多的方法来解决这个问题,比如:直接将发消息放到Bob扣款的事务中去,如果发送失败,抛出异常,事务回滚。这样的处理方式也符合“恰好”不需要解决的原则。在https://blog.csdn.net/yinni11/article/details/81122093中的非事务消息中间件就是采用的这种方法

这里需要说明一下:如果使用Spring来管理事物的话,大可以将发送消息的逻辑放到本地事物中去,发送消息失败抛出异常,Spring捕捉到异常后就会回滚此事物,以此来保证本地事物与发送消息的原子性。

RocketMQ支持事务消息,下面来看看RocketMQ是怎样来实现的。
image
RocketMQ实现发送事务消息

RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改消息的状态。

细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

那我们来看下RocketMQ源码,是如何处理事务消息的。客户端发送事务消息的部分(完整代码请查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):
image
接着查看sendMessageInTransaction方法的源码,总共分为3个阶段:发送Prepared消息、执行本地事务、发送确认消息。
image
endTransaction方法会将请求发往broker(mq server)去更新事务消息的最终状态:

  • 根据sendResult找到Prepared消息 ,sendResult包含事务消息的ID
  • 根据localTransaction更新消息的最终状态

如果endTransaction方法执行失败,数据没有发送到broker,导致事务消息的 状态更新失败,broker会有回查线程定时(默认1分钟)扫描每个存储事务状态的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是prepared状态则会向Producer发起CheckTransaction请求,Producer会调用DefaultMQProducerImpl.checkTransactionState()方法来处理broker的定时回调请求,而checkTransactionState会调用我们的事务设置的决断方法来决定是回滚事务还是继续执行,最后调用endTransactionOneway让broker来更新消息的最终状态。

再回到转账的例子,如果Bob的账户的余额已经减少,且消息已经发送成功,Smith端开始消费这条消息,这个时候就会出现消费失败和消费超时两个问题,解决超时问题的思路就是一直重试,直到消费端消费消息成功,整个过程中有可能会出现消息重复的问题,按照前面的思路解决即可。
image
消费事务消息

这样基本上可以解决消费端超时问题,但是如果消费失败怎么办?阿里提供给我们的解决方法是:人工解决。大家可以考虑一下,按照事务的流程,因为某种原因Smith加款失败,那么需要回滚整个流程。如果消息系统要实现这个回滚流程的话,系统复杂度将大大提升,且很容易出现Bug,估计出现Bug的概率会比消费失败的概率大很多。这也是RocketMQ目前暂时没有解决这个问题的原因,在设计实现消息系统时,我们需要衡量是否值得花这么大的代价来解决这样一个出现概率非常小的问题,这也是大家在解决疑难问题时需要多多思考的地方。

20160321补充:在3.2.6版本中移除了事务消息的实现,所以此版本不支持事务消息,具体情况请参考rocketmq的issues:

Producer如何发送消息

Producer轮询某topic下的所有队列的方式来实现发送方的负载均衡,如下图所示:

image

producer发送消息负载均衡

首先分析一下RocketMQ的客户端发送消息的源码:
image
在整个应用生命周期内,生产者需要调用一次start方法来初始化,初始化主要完成的任务有:

  1. 如果没有指定namesrv地址,将会自动寻址
  2. 启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向所有broker发送心跳…
  3. 启动负载均衡的服务

初始化完成后,开始发送消息,发送消息的主要代码如下:
image

代码中需要关注的两个方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面说过在producer初始化时,会启动定时任务获取路由信息并更新到本地缓存,所以tryToFindTopicPublishInfo会首先从缓存中获取topic路由信息,如果没有获取到,则会自己去namesrv获取路由信息。selectOneMessageQueue方法通过轮询的方式,返回一个队列,以达到负载均衡的目的。

如果Producer发送消息失败,会自动重试,重试的策略:

  1. 重试次数 < retryTimesWhenSendFailed(可配置)
  2. 总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)
  3. 同时满足上面两个条件后,Producer会选择另外一个队列发送消息

消息存储

RocketMQ的消息存储是由consume queue和commit log配合完成的。

Consume Queue

consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置。

我们可以在配置中指定consumequeue与commitlog存储的目录

每个topic下的每个queue都有一个对应的consumequeue文件,比如:

image

Consume Queue文件组织,如图所示:

image

Consume Queue文件组织示意图

  1. 根据topic和queueId来组织文件,图中TopicA有两个队列0,1,那么TopicA和QueueId=0组成一个ConsumeQueue,TopicA和QueueId=1组成另一个ConsumeQueue。
  2. 按照消费端的GroupName来分组重试队列,如果消费端消费失败,消息将被发往重试队列中,比如图中的%RETRY%ConsumerGroupA。
  3. 按照消费端的GroupName来分组死信队列,如果消费端消费失败,并重试指定次数后,仍然失败,则发往死信队列,比如图中的%DLQ%ConsumerGroupA。

死信队列(Dead Letter Queue)一般用于存放由于某种原因无法传递的消息,比如处理失败或者已经过期的消息。

Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,如下图所示:
image

consumequeue文件存储单元格式

  1. CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量
  2. Size存储中消息的大小
  3. Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)
Commit Log

CommitLog:消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。

文件的默认位置如下,仍然可通过配置文件修改:

${user.home} store${commitlog}${fileName}

CommitLog的消息存储单元长度不固定,文件顺序写,随机读。消息的存储结构如下表所示,按照编号顺序以及编号对应的内容依次存储。

image

Commit Log存储单元结构图

消息存储实现

消息存储实现,比较复杂,也值得大家深入了解,后面会单独成文来分析(目前正在收集素材),这小节只以代码说明一下具体的流程。
image

image

消息的索引文件

如果一个消息包含key值的话,会使用IndexFile存储消息索引,文件的内容结构如图:
image

消息索引

索引文件主要用于根据key来查询消息的,流程主要是:

  1. 根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目,例如图中所示 slotNum=5000000)
  2. 根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是指向最新的一个索引项)
  3. 遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)

消息订阅

RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到MQServer拉取。但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。

首先看下消费端的负载均衡:
image
消费端负载均衡

消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载:

  1. 遍历Consumer下的所有topic,然后根据topic订阅所有的消息
  2. 获取同一topic和Consumer Group下的所有Consumer
  3. 然后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等

如同上图所示:如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二 consumer 消费 2 个队列。这里采用的就是平均分配策略,它类似于分页的过程,TOPIC下面的所有queue就是记录,Consumer的个数就相当于总的页数,那么每页有多少条记录,就类似于某个Consumer会消费哪些队列。

通过这样的策略来达到大体上的平均消费,这样的设计也可以很方便的水平扩展Consumer来提高消费能力。

消费端的Push模式是通过长轮询的模式来实现的,就如同下图:

image

Push模式示意图

Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就立即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。

当然,Consumer端是通过一个线程将阻塞队列LinkedBlockingQueue<PullRequest>中的PullRequest发送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest时,如果发现没有消息,就会把PullRequest扔到ConcurrentHashMap中缓存起来。broker在启动时,会启动一个线程不停的从ConcurrentHashMap取出PullRequest检查,直到有数据返回。

RocketMQ的其他特性

前面的6个特性都是基本上都是点到为止,想要深入了解,还需要大家多多查看源码,多多在实际中运用。当然除了已经提到的特性外,RocketMQ还支持:

  • 定时消息
  • 消息的刷盘策略
  • 主动同步策略:同步双写、异步复制
  • 海量消息堆积能力
  • 高效通信
  • …….

其中涉及到的很多设计思路和解决方法都值得我们深入研究:

  • 消息的存储设计:既要满足海量消息的堆积能力,又要满足极快的查询效率,还要保证写入的效率。
  • 高效的通信组件设计:高吞吐量,毫秒级的消息投递能力都离不开高效的通信。
  • …….

RocketMQ最佳实践

一、Producer最佳实践

  • 一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。
  • 每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
  • 消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。
  • 对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。
  • 某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。

二、Consumer最佳实践

  • 消费过程要做到幂等(即消费端去重)
  • 尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量。
  • 优化每条消息消费过程

三、其他配置

线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false。

RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。后果就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。

所以基于目前RocketMQ的设计,建议关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。

RocketMQ设计相关

RocketMQ的设计假定:

  • 每台PC机器都可能宕机不可服务
  • 任意集群都有可能处理能力不足
  • 最坏的情况一定会发生
  • 内网环境需要低延迟来提供最佳用户体验

RocketMQ的关键设计:

  • 分布式集群化
  • 强数据安全
  • 海量数据堆积
  • 毫秒级投递延迟(推拉模式)

这是RocketMQ在设计时的假定前提以及需要到达的效果。我想这些假定适用于所有的系统设计。随着我们系统的服务的增多,每位开发者都要注意自己的程序是否存在单点故障,如果挂了应该怎么恢复、能不能很好的水平扩展、对外的接口是否足够高效、自己管理的数据是否足够安全…… 多多规范自己的设计,才能开发出高效健壮的程序。

Kafka 如何保证消息顺序消费、在consumer group 中新增一个consumer 会提高消费消息的速度吗、那如果我想提高消息消费的速度,我要怎么办?

Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,并保证即使对TB级以上数据也能保证常数时间的访问性能
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输
  • 支持Kafka Server间的消息分区,及分布式消息消费,同时保证每个partition内的消息顺序传输
  • 同时支持离线数据处理和实时数据处理

为什么要用Message Queue

  • 解耦
    • 在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
  • 冗余
    • 有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。
  • 扩展性
    • 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
  • 灵活性 & 峰值处理能力
    • 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。
  • 可恢复性
    • 当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。
  • 送达保证
    • 消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个”只送达一次”保证。无论有多少进程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是”预定”了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。
  • 顺序保证
    • 在许多情况下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。IronMO保证消息浆糊通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。
  • 缓冲
    • 在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行—写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。
  • 理解数据流
    • 在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。
  • 异步通信
    • 很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。

常用Message Queue对比

  • RabbitMQ
    • RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
  • Redis
    • Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。
  • ZeroMQ
    • ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演了这个服务角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。其中,Twitter的Storm中默认使用ZeroMQ作为数据流的传输。
  • ActiveMQ
    • ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。
  • Kafka/Jafka
    • Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制来统一了在线和离线的消息处理,这一点也是本课题所研究系统所看重的。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

Kafka解析

Terminology

  • Broker
    • Kafka集群包含一个或多个服务器,这种服务器被称为broker
  • Topic
    • 每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)
  • Partition
    • parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件
  • Producer
    • 负责发布消息到Kafka broker
  • Consumer
    • 消费消息。每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。

Kafka架构

image

如上图所示,一个典型的kafka集群中包含若干producer(可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干consumer group,以及一个Zookeeper 集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。

Push vs. Pull

作为一个messaging system,Kafka遵循了传统的方式,选择由producer向broker push消息并由consumer从broker pull消息。一些logging-centric system,比如Facebook的 Scribe 和Cloudera的 Flume ,采用非常不同的push模式。事实上,push模式和pull模式各有优劣。

push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

Topic & Partition

Topic在逻辑上可以被认为是一个在的queue,每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。
image

每个日志文件都是“log entries”序列,每一个 log entry 包含一个4字节整型数(值为N),其后跟N个字节的消息体。每条消息都有一个当前partition下唯一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消费格式如下:

1
2
3
4
5
6
7
message length4 bytes (value: 1+4+n)

“magic” value1 byte

crc : 4 bytes

payload : n bytes

这个“log entries”并非由一个文件构成,而是分成多个segment,每个segment名为该segment第一条消息的offset和“.kafka”组成。另外会有一个索引文件,它标明了每个segment下包含的 log entry 的offset范围,如下图所示。
image

因为每条消息都被append到该partition中,是顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
image

每一条消息被发送到broker时,会根据paritition规则选择被存储到哪一个partition。如果partition规则设置的合理,所有消息可以均匀分布到不同的partition里,这样就实现了水平扩展。(如果一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)。在创建topic时可以在$KAFKA_HOME/config/server.properties 中指定这个partition的数量(如下所示),当然也可以在topic创建之后去修改parition数量。

1
2
3
4
#The default number of log partitions per topic. More partitions allow greater
#parallelism for consumption, but this will also result in more files across
#the brokers.
num.partitions=3

在发送一条消息时,可以指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪个parition。paritition机制可以通过指定producer的paritition. class这一参数来指定,该class必须实现 kafka.producer.Partitioner 接口。本例中如果key可以被解析为整数则将对应的整数与partition总数取余,该消息会被发送到该数对应的partition。(每个parition都会有个序号)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class JasonPartitioner<T> implements Partitioner {
public JasonPartitioner(VerifiableProperties verifiableProperties) {}
@Override
public int partition(Object key, int numPartitions) {
try {
int partitionNum = Integer.parseInt((String) key);
return Math.abs(Integer.parseInt((String) key) % numPartitions);
} catch (Exception e) {
return Math.abs(key.hashCode() % numPartitions);
}
}
}

如果将上例中的class作为partition.class,并通过如下代码发送20条消息(key分别为0,1,2,3)至topic2(包含4个partition)。

1
2
3
4
5
6
7
8
9
10
public void sendMessage() throws InterruptedException{
  for(int i = 1; i <= 5; i++){
   List messageList = new ArrayList<KeyedMessage<String, String>>();
   for(int j = 0; j < 4; j++){
   messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
   }
   producer.send(messageList);
}
  producer.close();
}

则key相同的消息会被发送并存储到同一个partition里,而且key的序号正好和partition序号相同。(partition序号从0开始,本例中的key也正好从0开始)。如下图所示。
image

对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略去删除旧数据。一是基于时间,二是基于partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties ,让Kafka删除一周前的数据,也可通过配置让Kafka在partition文件超过1GB时删除旧数据,如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#############################Log Retention Policy#############################

#he following configurations control the disposal of log segments. The policy can
#be set to delete segments after a period of time, or after a given size has accumulated.
#A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
#from the end of the log.

#The minimum age of a log file to be eligible for deletion
log.retention.hours=168

#A size-based retention policy for logs. Segments are pruned from the log as long as the remaininsegments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

#The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

#The interval at which log segments are checked to see if they can be deleted according
#to the retention policies
log.retention.check.interval.ms=300000

#By default the log cleaner is disabled and the log retention policy will default to
#just delete segments after their retention expires.
#If log.cleaner.enable=true is set the cleaner will be enabled and individual logs
#can then be marked for log compaction.
log.cleaner.enable=false

这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除文件与Kafka性能无关,选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个consumer group保留一些metadata信息—当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些consumer过,不需要通过broker去保证同一个consumer group只有一个consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。

Replication & Leader election

Kafka从0.8开始提供partition级别的replication,replication的数量可在$KAFKA_HOME/config/server.properties中配置。

1
default.replication.factor = 1

该 Replication与leader election配合提供了自动的failover机制。replication对Kafka的吞吐率是有一定影响的,但极大的增强了可用性。默认情况下,Kafka的replication数量为1。 每个partition都有一个唯一的leader,所有的读写操作都在leader上完成,leader批量从leader上pull数据。一般情况下partition的数量大于等于broker的数量,并且所有partition的leader均匀分布在broker上。follower上的日志和其leader上的完全一样。

和大部分分布式系统一样,Kakfa处理失败需要明确定义一个broker是否alive。对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与Zookeeper的session(这个通过Zookeeper的heartbeat机制来实现)。二是follower必须能够及时将leader的writing复制过来,不能“落后太多”。

leader会track“in sync”的node list。如果一个follower宕机,或者落后太多,leader将把它从”in sync” list中移除。这里所描述的“落后太多”指follower复制的消息落后于leader后的条数超过预定值,该值可在 $KAFKA_HOME/config/server.properties 中配置

1
2
3
4
5
#If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead
replica.lag.max.messages=4000

#If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead
replica.lag.time.max.ms=10000

需要说明的是,Kafka只解决”fail/recover”,不处理“Byzantine”(“拜占庭”)问题。

一条消息只有被“in sync” list里的所有follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而造成数据丢失(consumer无法消费这些数据)。而对于producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks 来设置。这种机制确保了只要“in sync” list有一个或以上的flollower,一条被commit的消息就不会丢失。

这里的复制机制即不是同步复制,也不是单纯的异步复制。事实上,同步复制要求“活着的”follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follwer都落后于leader,而leader突然宕机,则会丢失数据。而Kafka的这种使用“in sync” list的方式则很好的均衡了确保数据不丢失以及吞吐率。follower可以批量的从leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了follower与leader的差距(前文有说到,只要follower落后leader不太远,则被认为在“in sync” list里)。

上文说明了Kafka是如何做replication的,另外一个很重要的问题是当leader宕机了,怎样在follower中选举出新的leader。因为follower可能落后许多或者crash了,所以必须确保选择“最新”的follower作为新的leader。一个基本的原则就是,如果leader不在了,新的leader必须拥有原来的leader commit的所有消息。这就需要作一个折衷,如果leader在标明一条消息被commit前等待更多的follower确认,那在它die之后就有更多的follower可以作为新的leader,但这也会造成吞吐率的下降。

一种非常常用的选举leader的方式是“majority 灵秀”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,如果我们有2f+1个replica(包含leader和follower),那在commit之前必须保证有f+1个replica复制完消息,为了保证正确选出新的leader,fail的replica不能超过f个。因为在剩下的任意f+1个replica里,至少有一个replica包含有最新的所有消息。这种方式有个很大的优势,系统的latency只取决于最快的几台server,也就是说,如果replication factor是3,那latency就取决于最快的那个follower而非最慢那个。majority vote也有一些劣势,为了保证leader election的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉,必须要有3个以上的replica,如果要容忍2个follower挂掉,必须要有5个以上的replica。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的replica,而大量的replica又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在 Zookeeper 这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如HDFS的HA feature是基于 majority-vote-based journal ,但是它的数据存储并没有使用这种expensive的方式。

实际上,leader election算法非常多,比如Zookeper的 Zab , Raft 和 Viewstamped Replication 。而Kafka所使用的leader election算法更像微软的 PacificA 算法。

Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas) set,这个set里的所有replica都跟上了leader,只有ISR里的成员才有被选为leader的可能。在这种模式下,对于f+1个replica,一个Kafka topic能在保证不丢失已经ommit的消息的前提下容忍f个replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个replica的失败,majority vote和ISR在commit前需要等待的replica数量是一样的,但是ISR需要的总的replica的个数几乎是majority vote的一半。

虽然majority vote与ISR相比有不需等待最慢的server这一优势,但是Kafka作者认为Kafka可以通过producer选择是否被commit阻塞来改善这一问题,并且节省下来的replica和磁盘使得ISR模式仍然值得。

上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某一个partition的所有replica都挂了,就无法保证数据不丢失了。这种情况下有两种可行的方案:

  • 等待ISR中的任一个replica“活”过来,并且选它作为leader
  • 选择第一个“活”过来的replica(不一定是ISR中的)作为leader

这就需要在可用性和一致性当中作出一个简单的平衡。如果一定要等待ISR中的replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有replica都无法“活”过来了,或者数据都丢失了,这个partition将永远不可用。选择第一个“活”过来的replica作为leader,而这个replica不是ISR中的replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为leader而作为consumer的数据源(前文有说明,所有读写都由leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。

上文说明了一个parition的replication过程,然尔Kafka集群需要管理成百上千个partition,Kafka通过round-robin的方式来平衡partition从而避免大量partition集中在了少数几个节点上。同时Kafka也需要平衡leader的分布,尽可能的让所有partition的leader均匀分布在不同broker上。另一方面,优化leadership election的过程也是很重要的,毕竟这段时间相应的partition处于不可用状态。一种简单的实现是暂停宕机的broker上的所有partition,并为之选举leader。实际上,Kafka选举一个broker作为controller,这个controller通过watch Zookeeper检测所有的broker failure,并负责为所有受影响的parition选举leader,再将相应的leader调整命令发送至受影响的broker,过程如下图所示。

image

这样做的好处是,可以批量的通知leadership的变化,从而使得选举过程成本更低,尤其对大量的partition而言。如果controller失败了,幸存的所有broker都会尝试在Zookeeper中创建/controller->{this broker id},如果创建成功(只可能有一个创建成功),则该broker会成为controller,若创建不成功,则该broker会等待新controller的命令。
image

Consumer group

(本节所有描述都是基于consumer hight level API而非low level API)。

每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费。(不同consumer group可以同时消费同一条消息)

image

很多传统的message queue都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证queue的长度比较少,提高效率。而如上文所将,Kafka并不删除已消费的消息,为了实现传统message queue消息只被消费一次的语义,Kafka保证保证同一个consumer group里只有一个consumer会消费一条消息。与传统message queue不同的是,Kafka还允许不同consumer group同时消费同一条消息,这一特性可以为消息的多元化处理提供了支持。实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的consumer在不同的consumer group即可。下图展示了Kafka在Linkedin的一种简化部署。

image

为了更清晰展示Kafka consumer group的特性,笔者作了一项测试。创建一个topic (名为topic1),创建一个属于group1的consumer实例,并创建三个属于group2的consumer实例,然后通过producer向topic1发送key分别为1,2,3r的消息。结果发现属于group1的consumer收到了所有的这三条消息,同时group2中的3个consumer分别收到了key为1,2,3的消息。如下图所示。

image

Consumer Rebalance

(本节所讲述内容均基于Kafka consumer high level API)

Kafka保证同一consumer group中只有一个consumer会消息某条消息,实际上,Kafka保证的是稳定状态下每一个consumer实例只会消费某一个或多个特定partition的数据,而某个partition的数据只会被某一个特定的consumer实例所消费。这样设计的劣势是无法让同一个consumer group里的consumer均匀消费数据,优势是每个consumer不用都跟大量的broker通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个partition里的数据是有序的,这种设计可以保证每个partition里的数据也是有序被消费。

如果某consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据,如果consumer的数量与partition数量相同,则正好一个consumer消费一个partition的数据,而如果consumer的数量多于partition的数量时,会有部分consumer无法消费该topic下任何一条消息。

如下例所示,如果topic1有0,1,2共三个partition,当group1只有一个consumer(名为consumer1)时,该 consumer可消费这3个partition的所有数据。

image

增加一个consumer(consumer2)后,其中一个consumer(consumer1)可消费2个partition的数据,另外一个consumer(consumer2)可消费另外一个partition的数据。

image

再增加一个consumer(consumer3)后,每个consumer可消费一个partition的数据。consumer1消费partition0,consumer2消费partition1,consumer3消费partition2
image

再增加一个consumer(consumer4)后,其中3个consumer可分别消费一个partition的数据,另外一个consumer(consumer4)不能消费topic1任何数据。
image

此时关闭consumer1,剩下的consumer可分别消费一个partition的数据。
image

接着关闭consumer2,剩下的consumer3可消费2个partition,consumer4可消费1个partition。
image

再关闭consumer3,剩下的consumer4可同时消费topic1的3个partition。
image

consumer rebalance算法如下:

  • Sort PT (all partitions in topic T)
  • Sort CG(all consumers in consumer group G)
  • Let i be the index position of Ci in CG and let N=size(PT)/size(CG)
  • Remove current entries owned by Ci from the partition owner registry
  • Assign partitions from i N to (i+1) N-1 to consumer Ci
  • Add newly assigned partitions to the partition owner registry

目前consumer rebalance的控制策略是由每一个consumer通过Zookeeper完成的。具体的控制方式如下:

  • Register itself in the consumer id registry under its group.
  • Register a watch on changes under the consumer id registry.
  • Register a watch on changes under the broker id registry.
  • If the consumer creates a message stream using a topic filter, it also registers a watch on changes under the broker topic registry.
  • Force itself to rebalance within in its consumer group.

在这种策略下,每一个consumer或者broker的增加或者减少都会触发consumer rebalance。因为每个consumer只负责调整自己所消费的partition,为了保证整个consumer group的一致性,所以当一个consumer触发了rebalance时,该consumer group内的其它所有consumer也应该同时触发rebalance。

目前(2015-01-19)最新版(0.8.2)Kafka采用的是上述方式。但该方式有不利的方面:

  • Herd effect
    • 任何broker或者consumer的增减都会触发所有的consumer的rebalance
  • Split Brain
    • 每个consumer分别单独通过Zookeeper判断哪些partition down了,那么不同consumer从Zookeeper“看”到的view就可能不一样,这就会造成错误的reblance尝试。而且有可能所有的consumer都认为rebalance已经完成了,但实际上可能并非如此。

根据Kafka官方文档,Kafka作者正在考虑在还未发布的 0.9.x版本中使用中心协调器(coordinator) 。大体思想是选举出一个broker作为coordinator,由它watch Zookeeper,从而判断是否有partition或者consumer的增减,然后生成rebalance命令,并检查是否这些rebalance在所有相关的consumer中被执行成功,如果不成功则重试,若成功则认为此次rebalance成功(这个过程跟replication controller非常类似,所以我很奇怪为什么当初设计replication controller时没有使用类似方式来解决consumer rebalance的问题)。

image

消息Deliver guarantee

通过上文介绍,想必读者已经明天了producer和consumer是如何工作的,以及Kafka是如何做replication的,接下来要讨论的是Kafka如何确保消息在producer和consumer之间传输。有这么几种可能的delivery guarantee:

  • At most once 消息可能会丢,但绝不会重复传输
  • At least one 消息绝不会丢,但可能会重复传输
  • Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。
  • Kafka的delivery guarantee semantic非常直接。当producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果producer发送数据给broker后,遇到的网络问题而造成通信中断,那producer就无法判断该条消息是否已经commit。这一点有点像向一个自动生成primary key的数据库表中插入数据。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以生成一种类似于primary key的东西,发生故障时幂等性的retry多次,这样就做到了Exactly one 。截止到目前(Kafka 0.8.2版本,2015-01-25),这一feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从producer和broker是确保了 At least once ,但可通过设置producer异步发送实现 At most once )。
  • 接下来讨论的是消息从broker到consumer的delivery guarantee semantic。(仅针对Kafka consumer high level API)。consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中存下该consumer在该partition下读取的消息的offset。该consumer下一次再读该partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将consumer设置为autocommit,即consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了 Exactly once 。但实际上实际使用中consumer并非读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
  • 读完消息先commit再处理消息。这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于 At most once
  • 读完消息先处理再commit。这种模式下,如果处理完了消息在commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于 At least once 。在很多情况使用场景下,消息都有一个primary key,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是 Exactly once 。(人个感觉这种说法有些牵强,毕竟它不是Kafka本身提供的机制,而且primary key本身不保证操作的幂等性。而且实际上我们说delivery guarantee semantic是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们的系统不应该把处理过程的特性—如是否幂等性,当成Kafka本身的feature)
  • 如果一定要做到 Exactly once ,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现 Exactly once 。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)
  • 总之,Kafka默认保证 At least once ,并且允许通过设置producer异步提交来实现 At most once 。而 Exactly once 要求与目标存储系统协作,幸运的是Kafka提供的offset可以使用这种方式非常直接非常容易。
Benchmark

纸上得来终觉浅,绝知些事要躬行。笔者希望能亲自测一下Kafka的性能,而非从网上找一些测试数据。所以笔者曾在0.8发布前两个月做过详细的Kafka0.8性能测试,不过很可惜测试报告不慎丢失。所幸在网上找到了Kafka的创始人之一的 Jay Kreps的bechmark 。以下描述皆基于该benchmark。(该benchmark基于Kafka0.8.1)

测试环境

producer吞吐率

该项测试只测producer的吞吐率,也就是数据只被持久化,没有consumer读数据。

1个producer线程,无replication

在这一测试中,创建了一个包含6个partition且没有replication的topic。然后通过一个线程尽可能快的生成50 million条比较短(payload100字节长)的消息。测试结果是821,557 records/second ( 78.3MB/second )。

之所以使用短消息,是因为对于消息系统来说这种使用场景更难。因为如果使用MB/second来表征吞吐率,那发送长消息无疑能使得测试结果更好。

整个测试中,都是用每秒钟delivery的消息的数量乘以payload的长度来计算MB/second的,没有把消息的元信息算在内,所以实际的网络使用量会比这个大。对于本测试来说,每次还需传输额外的22个字节,包括一个可选的key,消息长度描述,CRC等。另外,还包含一些请求相关的overhead,比如topic,partition,acknowledgement等。这就导致我们比较难判断是否已经达到网卡极限,但是把这些overhead都算在吞吐率里面应该更合理一些。因此,我们已经基本达到了网卡的极限。

初步观察此结果会认为它比人们所预期的要高很多,尤其当考虑到Kafka要把数据持久化到磁盘当中。实际上,如果使用随机访问数据系统,比如RDBMS,或者key-velue store,可预期的最高访问频率大概是5000到50000个请求每秒,这和一个好的RPC层所能接受的远程请求量差不多。而该测试中远超于此的原因有两个。

  • Kafka确保写磁盘的过程是线性磁盘I/O,测试中使用的6块廉价磁盘线性I/O的最大吞吐量是822MB/second,这已经远大于1Gb网卡所能带来的吞吐量了。许多消息系统把数据持久化到磁盘当成是一个开销很大的事情,这是因为他们对磁盘的操作都不是线性I/O。
  • 在每一个阶段,Kafka都尽量使用批量处理。如果想了解批处理在I/O操作中的重要性,可以参考David Patterson的” Latency Lags Bandwidth “
1个producer线程,3个异步replication

该项测试与上一测试基本一样,唯一的区别是每个partition有3个replica(所以网络传输的和写入磁盘的总的数据量增加了3倍)。每一个broker即要写作为leader的partition,也要读(从leader读数据)写(将数据写到磁盘)作为follower的partition。测试结果为 786,980 records/second ( 75.1MB/second )。

该项测试中replication是异步的,也就是说broker收到数据并写入本地磁盘后就acknowledge producer,而不必等所有replica都完成replication。也就是说,如果leader crash了,可能会丢掉一些最新的还未备份的数据。但这也会让message acknowledgement延迟更少,实时性更好。

这项测试说明,replication可以很快。整个集群的写能力可能会由于3倍的replication而只有原来的三分之一,但是对于每一个producer来说吞吐率依然足够好。

1个producer线程,3个同步replication

该项测试与上一测试的唯一区别是replication是同步的,每条消息只有在被 in sync 集合里的所有replica都复制过去后才会被置为committed(此时broker会向producer发送acknowledgement)。在这种模式下,Kafka可以保证即使leader crash了,也不会有数据丢失。测试结果为 421,823 records/second ( 40.2MB/second)。

Kafka同步复制与异步复制并没有本质的不同。leader会始终track follower replica从而监控它们是否还alive,只有所有 in sync 集合里的replica都acknowledge的消息才可能被consumer所消费。而对follower的等待影响了吞吐率。可以通过增大batch size来改善这种情况,但为了避免特定的优化而影响测试结果的可比性,本次测试并没有做这种调整。

3个producer,3个异步replication

该测试相当于把上文中的1个producer,复制到了3台不同的机器上(在1台机器上跑多个实例对吞吐率的增加不会有太大帮忙,因为网卡已经基本饱和了),这3个producer同时发送数据。整个集群的吞吐率为 2,024,032 records/second ( 193,0MB/second)。

Producer Throughput Vs. Stored Data

消息系统的一个潜在的危险是当数据能都存于内存时性能很好,但当数据量太大无法完全存于内存中时(然后很多消息系统都会删除已经被消费的数据,但当消费速度比生产速度慢时,仍会造成数据的堆积),数据会被转移到磁盘,从而使得吞吐率下降,这又反过来造成系统无法及时接收数据。这样就非常糟糕,而实际上很多情景下使用queue的目的就是解决数据消费速度和生产速度不一致的问题。

但Kafka不存在这一问题,因为Kafka始终以O(1)的时间复杂度将数据持久化到磁盘,所以其吞吐率不受磁盘上所存储的数据量的影响。为了验证这一特性,做了一个长时间的大数据量的测试,下图是吞吐率与数据量大小的关系图。
image

上图中有一些variance的存在,并可以明显看到,吞吐率并不受磁盘上所存数据量大小的影响。实际上从上图可以看到,当磁盘数据量达到1TB时,吞吐率和磁盘数据只有几百MB时没有明显区别。

这个variance是由Linux I/O管理造成的,它会把数据缓存起来再批量flush。上图的测试结果是在生产环境中对Kafka集群做了些tuning后得到的,这些tuning方法可参考 这里 。

consumer吞吐率

需要注意的是,replication factor并不会影响consumer的吞吐率测试,因为consumer只会从每个partition的leader读数据,而与replicaiton factor无关。同样,consumer吞吐率也与同步复制还是异步复制无关。

1个consumer

该测试从有6个partition,3个replication的topic消费50 million的消息。测试结果为940,521 records/second ( 89.7MB/second )。

可以看到,Kafkar的consumer是非常高效的。它直接从broker的文件系统里读取文件块。Kafka使用 sendfile API 来直接通过操作系统直接传输,而不用把数据拷贝到用户空间。该项测试实际上从log的起始处开始读数据,所以它做了真实的I/O。在生产环境下,consumer可以直接读取producer刚刚写下的数据(它可能还在缓存中)。实际上,如果在生产环境下跑 I/O stat ,你可以看到基本上没有物理“读”。也就是说生产环境下consumer的吞吐率会比该项测试中的要高。

3个consumer

将上面的consumer复制到3台不同的机器上,并且并行运行它们(从同一个topic上消费数据)。测试结果为 2,615,968 records/second ( 249.5MB/second )。

正如所预期的那样,consumer的吞吐率几乎线性增涨。

Producer and Consumer

上面的测试只是把producer和consumer分开测试,而该项测试同时运行producer和consumer,这更接近使用场景。实际上目前的replication系统中follower就相当于consumer在工作。

该项测试,在具有6个partition和3个replica的topic上同时使用1个producer和1个consumer,并且使用异步复制。测试结果为 795,064 records/second (75.8MB/second )。

可以看到,该项测试结果与单独测试1个producer时的结果几乎一致。所以说consumer非常轻量级。

消息长度对吞吐率的影响

上面的所有测试都基于短消息(payload 100字节),而正如上文所说,短消息对Kafka来说是更难处理的使用方式,可以预期,随着消息长度的增大,records/second会减小,但MB/second会有所提高。下图是records/second与消息长度的关系图。
image

正如我们所预期的那样,随着消息长度的增加,每秒钟所能发送的消息的数量逐渐减小。但是如果看每秒钟发送的消息的总大小,它会随着消息长度的增加而增加,如下图所示。
image

从上图可以看出,当消息长度为10字节时,因为要频繁入队,花了太多时间获取锁,CPU成了瓶颈,并不能充分利用带宽。但从100字节开始,我们可以看到带宽的使用逐渐趋于饱和(虽然MB/second还是会随着消息长度的增加而增加,但增加的幅度也越来越小)。

端到端的Latency

上文中讨论了吞吐率,那消息传输的latency如何呢?也就是说消息从producer到consumer需要多少时间呢?该项测试创建1个producer和1个consumer并反复计时。结果是, 2 ms (median), 3ms (99th percentile, 14ms (99.9th percentile) 。

(这里并没有说明topic有多少个partition,也没有说明有多少个replica,replication是同步还是异步。实际上这会极大影响producer发送的消息被commit的latency,而只有committed的消息才能被consumer所消费,所以它会最终影响端到端的latency)

Kafka 如何保证消息顺序消费

两种方案:

方案一,kafka topic 只设置一个partition分区

方案二,producer将消息发送到指定partition分区

解析:

方案一:kafka默认保证同一个partition分区内的消息是有序的,则可以设置topic只使用一个分区,这样消息就是全局有序,缺点是只能被consumer group里的一个消费者消费,降低了性能,不适用高并发的情况

方案二:既然kafka默认保证同一个partition分区内的消息是有序的,则producer可以在发送消息时可以指定需要保证顺序的几条消息发送到同一个分区,这样消费者消费时,消息就是有序。

producer发送消息时具体到topic的哪一个partition分区,提供了三种方式

1)指定分区

2)不指定分区,有指定key 则根据key的hash值与分区数进行运算后确定发送到哪个partition分区

3)不指定分区,不指定key,则轮询各分区发送

提高消费者速度

一般来说有几类

1.增加分区(题上不让)

2.关闭autocommit(偏移量手工提交可以按需减少分区偏移量的更新,有利于提升消费速度)

3.增加单次拉取消息的大小(大量消息的场景下可减少拉取消息的次数)

比较另类的:

1.如果不考虑数据一致性,可以将key值平均一下,这样每个分区的消息大小都差不多,有利于负载均衡

2.如果没有开启压缩,最好开启压缩(需要重启集群),可大大提高通信效率,有得消费速度提升

其他

  • JUC有研究没有,讲一讲?