时间轮在Kafka的实践_滴滴技术的博客-CSDN博客

导读:

时间轮技术其实出来很久了,在kafka、zookeeper等技术中都有时间轮使用的方式。

时间轮是一种高效利用线程资源进行批量化调度的一种调度模型。把大批量的调度任务全部绑定到同一个调度器上,使用这一个调度器来进行所有任务的管理、触发、以及运行。所以时间轮的模型能够高效管理各种延时任务、周期任务、通知任务

  • 在Kafka中应用了大量的延迟操作但在Kafka中,并没用使用JDK自带的Timer或是DelayQueue用于延迟操作,而是使用自己开发的DelayedOperationPurgatory组件用于管理延迟操作。
  • Kafka这类分布式框架有大量延迟操作并且对性能要求及其高,而java.util.Timerjava.uti.concurrent.DelayQueue的插入和删除时间复杂度都为对数阶0(log n)并不能满足Kafka性能要求。
  • Kafka实现了基于时间轮的定时任务组件,该时间轮定时任务实现的插入与删除(开始定时器与暂停定时器)的时间复杂度都为常数阶O(1)。
  • 时间轮的应用并不少见,在Netty、Akka、Quarz、Zookeeper、Redis等高性能组件中都存在时间轮定时器的踪影。
  • Redis的定时调度是基于时间轮实现的。

本文主要介绍时间轮在kafka的应用和实战,从核心源码和设计的角度对时间轮进行深入的讲解 。

一、引子

从2个面试题说起,第一个问题:如果一台机器上有10w个定时任务,如何做到高效触发?

具体场景是:

有一个APP实时消息通道系统,对每个用户会维护一个APP到服务器的TCP连接,用来实时收发消息,对这个TCP连接,有这样一个需求:“如果连续30s没有请求包(例如登录,消息,keepalive包),服务端就要将这个用户的状态置为离线”。

 其中,单机TCP同时在线量约在10w级别,keepalive请求包较分散大概30s一次,吞吐量约在3000qps

方案一:

常用方案使用time定时任务,每秒扫描一次所有连接的集合Map<uid, last_packet_time>,把连接时间(每次有新的请求更新对应连接的连接时间)比当前时间的差值大30s的连接找出来处理。

方案二,使用环形队列法:

三个重要的数据结构:

  1. 30s超时,就创建一个index从0到30的环形队列(本质是个数组)

  2. 环上每一个slot是一个Set<uid>,任务集合

  3. 同时还有一个Map<uid, index>,记录uid落在环上的哪个slot里

这样当有某用户uid有请求包到达时:

  1. Map结构中,查找出这个uid存储在哪一个slot

  2. 从这个slotSet结构中,删除这个uid

  3. uid重新加入到新的slot中,具体是哪一个slot呢 => Current Index指针所指向的上一个slot,因为这个slot,会被timer在30s之后扫描到

  4. 更新Map,这个uid对应slot的index值

哪些元素会被超时掉呢?

Current Index每秒种移动一个slot,超过index为29的slot对应的Set<uid>中所有uid都应该被集体超时!如果最近30s有请求包来到,一定被放到Current Index的前一个slot了,Current Index所在的slot对应Set中所有元素,都是最近30s没有请求包来到的。

所以,当没有超时时,Current Index扫到的每一个slot的Set中应该都没有元素。

两种方案对比:

  1. 方案一每次都要轮询所有数据,
  2. 方案二使用环形队列只需要轮询这一刻需要过期的数据,如果没有数据过期则没有数据要处理,并且是批量超时,并且由于是环形结构更加节约空间,这很适合高性能场景。

第二个问题:在开发过程中有延迟一定时间的任务要执行,怎么做?

如果不重复造轮子的话,我们的选择当然是延迟队列或者Timer

延迟队列和在Timer中增加延时任务采用数组表示的最小堆的数据结构实现,每次放入新元素和移除队首元素时间复杂度为O(nlog(n))。

二、 时间轮

  • 方案二所采用的环形队列,就是时间轮的底层数据结构,它能够让需要处理的数据(任务的抽象)集中,在Kafka中存在大量的延迟操作,比如延迟生产、延迟拉取以及延迟删除等。Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定时器(SystemTimer)。
  • JDK的Timer和DelayQueue插入和删除操作的平均时间复杂度为O(nlog(n)),并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、Zookeeper等组件中都存在时间轮的踪影。

2.1 时间轮的数据结构

参考下图,Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务TimerTask。在Kafka源码中对这个TimeTaskList是用一个名称为buckets的数组表示的,所以后面介绍中可能TimerTaskList也会被称为bucket

图二

针对上图的几个名词简单解释下

  • tickMs:时间轮由多个时间格组成,每个时间格就是tickMs,它代表当前时间轮的基本时间跨度。

  • wheelSize:代表每一层时间轮的格数

  • interval:当前时间轮的总体时间跨度,interval=tickMs × wheelSize

  • startMs:构造当层时间轮时候的当前时间,第一层的时间轮的startMsTimeUnit.NANOSECONDS.toMillis(nanoseconds()),上层时间轮的startMs为下层时间轮的currentTime

  • currentTime:表示时间轮当前所处的时间,currentTime是tickMs整数倍(通过currentTime=startMs - (startMs % tickMs)来保正currentTime一定是tickMs的整数倍),这个运算类比钟表中分钟里65秒分钟指针指向的还是1分钟)。currentTime可以将整个时间轮划分为到期部分未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList的所有任务。

  • tickMs:滴答一次的时长,类似于手表的例子中向前推进一格的时间。对于秒针而言,tickMs 就是 1 秒。同理,分针是 1 分,时针是 1 小时。在 Kafka 中,第 1 层时间轮的 tickMs 被固定为 1 毫秒,也就是说,向前推进一格 Bucket 的时长是 1 毫秒。

  • wheelSize:每一层时间轮上的 Bucket 数量。第 1 层的 Bucket 数量是 20。

  • startMs:时间轮对象被创建时的起始时间戳。

  • taskCounter:这一层时间轮上的总定时任务数。

  • queue:将所有 Bucket 按照过期时间排序的延迟队列。随着时间不断向前推进,Kafka 需要依靠这个队列获取那些已过期的 Bucket,并清除它们。

  • interval:这层时间轮总时长,等于滴答时长乘以 wheelSize。以第 1 层为例,interval 就是 20 毫秒。由于下一层时间轮的滴答时长就是上一层的总时长,因此,第 2 层的滴答时长就是 20 毫秒,总时长是 400 毫秒,以此类推。

  • buckets:时间轮下的所有 Bucket 对象,也就是所有 TimerTaskList 对象。

  • currentTime:当前时间戳,只是源码对它进行了一些微调整,将它设置成小于当前时间的最大滴答时长的整数倍。举个例子,假设滴答时长是 20 毫秒,当前时间戳是 123 毫秒,那么,currentTime 会被调整为 120 毫秒。

  • overflowWheel:Kafka 是按需创建上层时间轮的。这也就是说,当有新的定时任务到达时,会尝试将其放入第 1 层时间轮。如果第 1 层的 interval 无法容纳定时任务的超时时间,就现场创建并配置好第 2 层时间轮,并再次尝试放入,如果依然无法容纳,那么,就再创建和配置第 3 层时间轮,以此类推,直到找到适合容纳该定时任务的第 N 层时间轮。

2.2 时间轮中的任务存放

若时间轮的tickMs=1ms,wheelSize=20,那么可以计算得出interval20ms。初始情况下表盘指针currentTime指向时间格0,此时有一个定时为2ms的任务插入进来会存放到时间格为2的TimerTaskList中。随着时间的不断推移,指针currentTime不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2所对应的TimeTaskList中的任务做相应的到期操作。此时若又有一个定时为8ms的任务插入进来,则会存放到时间格10中,currentTime再过8ms后会指向时间格10。如果同时有一个定时为19ms的任务插入进来怎么办?新来的TimerTaskEntry会复用原来的TimerTaskList,所以它会插入到原本已经到期的时间格1中。总之,整个时间轮的总体跨度是不变的,随着指针currentTime的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围在currentTimecurrentTime+interval之间。

2.3 时间轮的升降级

如果此时有个定时为350ms的任务该如何处理?直接扩充wheelSize的大小么?Kafka中不乏几万甚至几十万毫秒的定时任务,这个wheelSize的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如100万毫秒,那么这个wheelSize100万毫秒的时间轮不仅占用很大的内存空间,而且效率也会拉低。Kafka为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。

TimingWheelkafka时间轮的实现,内部包含了一个TimerTaskList数组,每个数组包含了一些链表组成的TimerTaskEntry事件,每个TimerTaskList表示时间轮的某一格,这一格的时间跨度为tickMs,同-个TimerTaskList中的事件都是相差在-个tickMs跨度内的,整个时间轮的时间跨度为interval = tickMs * wheelSize该时间轮能处理的时间范围在cuurentTimecurrentTime + interval之间的事件
当添加一个时间他的超时时间大于整个时间的度时,exoiration = currentTime + interval,会将该事件向上级传递,上级的tickMs 是下级的interval,传递直到某一个时间满足expiration  currentTime + interval,然后计算对应位于哪一格,然后将事件放进去,重新设置超时时间,然后放进jdk延迟队列。

图三

参考上图,复用之前的案例,

  • 第一层的时间轮tickMs=1ms, wheelSize=20, interval=20ms。
  • 第二层的时间轮的tickMs为第一层时间轮的interval,即为20ms
  • 每一层时间轮的wheelSize是固定的,都是20,那么第二层的时间轮的总体时间跨度interval为400ms
  • 以此类推,400ms也是第三层的tickMs的大小,第三层的时间轮的总体时间跨度为8000ms

计算公式:buckets[(expiration / tickMs) % wheelSize] 

其中,

  • expiration=delayMs+startMs=2ms
  • startMs = currentTime
  • 刚才提到的350ms的任务,不会插入到第一层时间轮,会插入到interval=20*20的第二层时间轮中,具体插入到时间轮的哪个bucket呢?假设currentTime=0ms, 先用(350)/tickMs(20)=virtualId(17),然后virtualId(17) %wheelSize (20) = 17,所以350会放在第16(17 - 1 = 16, 起始为0)个bucket
  • 如果此时有一个450ms后执行的任务,将会放在第三层时间轮中,按照刚才的计算公式,仍然假设currentTime=0ms, (450ms / tickMs(400)),会放在第1bucket。第0bucket里会包含[0, 400)ms的任务, 第1bucket里会包含[400, 800)ms的任务。
  • 随着时间流逝,当时间过去了400ms,那么450ms后就要执行的任务还剩下50ms的时间才能执行,此时有一个时间轮降级的操作,将50ms任务重新提交到二层级时间轮中。
  • 此时50ms的任务根据公式(50ms/tickMs(20))会放入第二个时间轮的第2个的bucket中,此bucket的时间范围为[40, 60)ms,然后再经过40ms,这个50ms的任务又会被监控到。
  • 此时距离任务执行还有10ms,同样将10ms的任务提交到一层级时间轮,此时会加入到第一层时间轮的第10bucket,所以再经过10ms后,此任务到期,最终执行。

整个时间轮的升级降级操作是不是很类似于我们的时钟? 

  • 第一层时间轮tickMs=1s, wheelSize=60,interval=1min,此为秒钟;
  • 第二层tickMs=1min,wheelSize=60,interval=1hour,此为分钟;
  • 第三层tickMs=1hour,wheelSize为12,interval为12hours

此为时钟。而钟表的指针就对应程序中的currentTime,这个后面分析代码时候会讲到(对这个的理解也是时间轮理解的重点和难点)。

2.4 任务添加和驱动时间轮滚动核心流程图

图四

▍2总结

kafka的延迟队列使用时间轮实现,能够支持大量任务的高效触发,但是在kafka延迟队列实现方案里还是看到了delayQueue的影子,使用delayQueue是对时间轮里面的bucket放入延迟队列,以此来推动时间轮滚动,但是基于将插入和删除操作则放入时间轮中,将这些操作的时间复杂度都降为O(1),提升效率。Kafka对性能的极致追求让它把最合适的组件放在最适合的位置。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐