Kafka 的 Lag 计算误区及正确实现

摘要: 原创出处 https://blog.csdn.net/u013256816/article/details/79955578 「朱小厮」欢迎转载,保留摘要,谢谢!

前言

消息堆积是消息中间件的一大特色,消息中间件的流量削峰、冗余存储等功能正是得益于消息中间件的消息堆积能力。然而消息堆积其实是一把亦正亦邪的双刃剑,如果应用场合不恰当反而会对上下游的业务造成不必要的麻烦,比如消息堆积势必会影响上下游整个调用链的时效性,有些中间件如RabbitMQ在发生消息堆积时在某些情况下还会影响自身的性能。对于Kafka而言,虽然消息堆积不会对其自身性能带来多大的困扰,但难免不会影响上下游的业务,堆积过多有可能会造成磁盘爆满,或者触发日志清除策略而造成消息丢失的情况。如何利用好消息堆积这把双刃剑,监控是最为关键的一步。

lag定义

消息堆积是消费滞后(Lag)的一种表现形式,消息中间件服务端中所留存的消息与消费掉的消息之间的差值即为消息堆积量,也称之为消费滞后(Lag)量。对于Kafka而言,消息被发送至Topic中,而Topic又分成了多个分区(Partition),每一个Partition都有一个预写式的日志文件,虽然Partition可以继续细分为若干个段文件(Segment),但是对于上层应用来说可以将Partition看成最小的存储单元(一个由多个Segment文件拼接的“巨型文件”)。每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中。我们来看下图,其就是Partition的一个真实写照:img

上图中有四个概念:

  1. LogStartOffset:表示一个Partition的起始位移,初始为0,虽然消息的增加以及日志清除策略的影响,这个值会阶段性的增大。
  2. ConsumerOffset:消费位移,表示Partition的某个消费者消费到的位移位置。
  3. HighWatermark:简称HW,代表消费端所能“观察”到的Partition的最高日志位移,HW大于等于ConsumerOffset的值。
  4. LogEndOffset:简称LEO, 代表Partition的最高日志位移,其值对消费者不可见。比如在ISR(In-Sync-Replicas)副本数等于3的情况下(如下图所示),消息发送到Leader A之后会更新LEO的值,Follower B和Follower C也会实时拉取Leader A中的消息来更新自己,HW就表示A、B、C三者同时达到的日志位移,也就是A、B、C三者中LEO最小的那个值。由于B、C拉取A消息之间延时问题,所以HW必然不会一直与Leader的LEO相等,即LEO>=HW。img

lag计算

要计算Kafka中某个消费者的滞后量很简单,首先看看其消费了几个Topic,然后针对每个Topic来计算其中每个Partition的Lag,每个Partition的Lag计算就显得非常的简单了,参考下图:

由图可知消费Lag=HW - ConsumerOffset。对于这里大家有可能有个误区,就是认为Lag应该是LEO与ConsumerOffset之间的差值,笔者在这之前也犯过这样的错误认知,详细可以参考《如何使用JMX监控Kafka》。LEO是对消费者不可见的,既然不可见何来消费滞后一说。

那么这里就引入了一个新的问题,HW和ConsumerOffset的值如何获取呢?img

首先来说说ConsumerOffset,Kafka中有两处可以存储,一个是Zookeeper,而另一个是”consumer_offsets这个内部topic中,前者是0.8.x版本中的使用方式,但是随着版本的迭代更新,现在越来越趋向于后者。就拿1.0.0版本来说,虽然默认是存储在”consumer_offsets”中,但是保不齐用于就将其存储在了Zookeeper中了。这个问题倒也不难解决,针对两种方式都去拉取,然后哪个有值的取哪个。不过这里还有一个问题,对于消费位移来说,其一般不会实时的更新,而更多的是定时更新,这样可以提高整体的性能。那么这个定时的时间间隔就是ConsumerOffset的误差区间之一。

再来说说HW,其也是Kafka中Partition的一个状态。有可能你会察觉到在Kafka的JMX中可以看到“kafka.log:type=Log,name=LogEndOffset,topic=[topic_name],partition=[partition_num]”这样一个属性,但是这个值不是LEO而是HW。

那么怎样正确的计算消费的Lag呢?对Kafka熟悉的同学可能会想到Kafka中自带的kafka-consumer_groups.sh脚本中就有Lag的信息,示例如下:

[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID
TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                   CLIENT-ID
topic-test1          0          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          1          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          2          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          3          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID

我们深究一下kafka-consumer_groups.sh脚本,发现只有一句代码:

exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"

其含义就是执行kafka.admin.ConsumerGroupCommand而已。进一步深究,在ConsumerGroupCommand内部抓住了2句关键代码:

val consumerGroupService = new KafkaConsumerGroupService(opts)
val (state, assignments) = consumerGroupService.describeGroup()

代码详解:consumerGroupService的类型是ConsumerGroupServicesealed trait类型),而KafkaConsumerGroupService只是ConsumerGroupService的一种实现,还有一种实现是ZkConsumerGroupService,分别对应新版的消费方式(消费位移存储在__consumer_offsets中)和旧版的消费方式(消费位移存储在zk中),详细计算步骤参考下一段落的内容。opt参数是指“ –describe –bootstrap-server localhost:9092 –group CONSUMER_GROUP_ID”等参数。第2句代码是调用describeGroup()方法来获取具体的信息,即二元组中的assignments,这个assignments中保存了上面打印信息中的所有内容。

Scala小知识: 在Scala中trait(特征)相当于Java的接口,实际上它比接口更大强大。与Java中的接口不同的是,它还可以定义属性和方法的实现(JDK8起的接口默认方法)。一般情况下Scala中的类只能继承单一父类,但是如果是trait的话就可以继承多个,从结果来看是实现了多重继承。被sealed声明的trait仅能被同一文件的类继承。

ZkConsumerGroupService中计算消费lag的步骤如下:

  1. 通过zk获取一些基本信息,对应上面打印信息中的:TOPIC、PARTITION、CONSUMER-ID等,不过不会有HOST和CLIENT-ID。
  2. 通过OffsetFetchRequest请求获取消费位移(offset),如果获取失败则在通过zk获取。
  3. 通过OffsetReuqest请求获取分区的LogEndOffset(简称为LEO,可见的LEO)。
  4. 计算LogEndOffset与消费位移的差值来获取lag。

KafkaConsumerGroupService中计算消费lag的步骤如下:

  1. 通过DescibeGroupsRequest请求获取一些基本信息,不仅包括TOPIC、PARTITION、CONSUMER-ID,还有HOST和CLIENT-ID。其实还有通过 FindCoordinatorRequest请求来获取coordinator信息,如果不了解coordinator在这里也没影响。
  2. 通过OffsetFetchRequest请求获取消费位移。
  3. 通过OffsetReuqest请求获取分区的LogEndOffset(简称为LEO)。
  4. 计算LogEndOffset与消费位移的差值来获取lag。

可以看到KafkaConsumerGroupService与ZkConsumerGroupService的计算Lag的方式都差不多,但是KafkaConsumerGroupService能获取更多消费详情,并且ZkConsumerGroupService也被标注为@Deprecated的了,后面内容都针对KafkaConsumerGroupService来做说明。既然Kafka已经为我们提供了线程的方法来获取Lag,那么我们有何必再重复造轮子,这里笔者写了一个调用的KafkaConsumerGroupService的示例(KafkaConsumerGroupService是使用Scala语言编写的,在Java的程序里使用类似scala.collection.Seq这样的全名称以防止混淆):

String[] agrs = {"--describe", "--bootstrap-server", brokers, "--group", groupId};
ConsumerGroupCommand.ConsumerGroupCommandOptions opts =
        new ConsumerGroupCommand.ConsumerGroupCommandOptions(agrs);
ConsumerGroupCommand.KafkaConsumerGroupService kafkaConsumerGroupService =
        new ConsumerGroupCommand.KafkaConsumerGroupService(opts);
scala.Tuple2<scala.Option<String>, scala.Option<scala.collection.Seq<ConsumerGroupCommand
        .PartitionAssignmentState>>> res = kafkaConsumerGroupService.describeGroup();
scala.collection.Seq<ConsumerGroupCommand.PartitionAssignmentState> pasSeq = res._2.get();
scala.collection.Iterator<ConsumerGroupCommand.PartitionAssignmentState> iterable = pasSeq.iterator();
while (iterable.hasNext()) {
    ConsumerGroupCommand.PartitionAssignmentState pas = iterable.next();
    System.out.println(String.format("\n%-30s %-10s %-15s %-15s %-10s %-50s%-30s %s",
            pas.topic().get(), pas.partition().get(), pas.offset().get(),
            pas.logEndOffset().get(), pas.lag().get(), pas.consumerId().get(),
            pas.host().get(), pas.clientId().get()));
}

在使用时,你可以封装一下这段代码然后返回一个类似List的东西给上层业务代码做进一步的使用。ConsumerGroupCommand.PartitionAssignmentState的代码如下:

case class PartitionAssignmentState(
  group: String, coordinator: Option[Node], topic: Option[String],
  partition: Option[Int], offset: Option[Long], lag: Option[Long],
  consumerId: Option[String], host: Option[String],
  clientId: Option[String], logEndOffset: Option[Long])

Scala小知识: 对于case class, 在这里你可以简单的把它看成是一个JavaBean,但是它远比JavaBean强大,比如它会自动生成equals、hashCode、toString、copy、伴生对象、apply、unapply等等东西。在 scala 中,对保护(Protected)成员的访问比 java 更严格一些。因为它只允许保护成员在定义了该成员的的类的子类中被访问。而在java中,用protected关键字修饰的成员,除了定义了该成员的类的子类可以访问,同一个包里的其他类也可以进行访问。Scala中,如果没有指定任何的修饰符,则默认为 public。这样的成员在任何地方都可以被访问。

如果你正在试着运行上面一段程序,你会发现编译失败,报错:cannot access ‘kafka.admin.ConsumerGroupCommand.PartitionAssignmentState’ in ‘kafka.admin.ConsumerGroupCommand‘。这时候需要将所引入的kafka.core包中的kafka.admin.ConsumerGroupCommand中的PartitionAssignmentState类前面的protected修饰符去掉才能编译通过。

现实情况下你可能并不想变更kafka-core的代码然后再重新打包,而是寻求直接能够调用的东西,至于到底怎么实现将会在下一篇文章中介绍,如果你比较猴急,可以先预览一下代码的实现,具体参见:https://github.com/hiddenzzh/kafka/blob/master/src/main/java/com/hidden/custom/kafka/admin/KafkaConsumerGroupCustomService.java。

kafka.admin.ConsumerGroupCommand.PartitionAssignmentState无法被外部访问,故要么将PartitionAssignmentState前的protected修饰符去掉,要么像《 如何获取Kafka的消费者详情》和《集群管理工具KafkaAdminClient——改造》这两篇这样来实现,但是真的需要这样子做么?

可以直接将describeGroup返回的结果转换成JSON然后传至监控页面(supported by YANGliiN oba)。代码如下:

String[] agrs = {"--describe", "--bootstrap-server", brokers, "--group", groupId};
ConsumerGroupCommand.ConsumerGroupCommandOptions options =
        new ConsumerGroupCommand.ConsumerGroupCommandOptions(agrs);
ConsumerGroupCommand.KafkaConsumerGroupService kafkaConsumerGroupService =
        new ConsumerGroupCommand.KafkaConsumerGroupService(options);
ObjectMapper mapper = new ObjectMapper();
//1. 使用jackson-module-scala_2.12
mapper.registerModule(new DefaultScalaModule());
//2. 反序列化时忽略对象不存在的属性
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
//3. 将Scala对象序列化成JSON字符串
String source = mapper.writeValueAsString(kafkaConsumerGroupService.describeGroup()._2.get());

这里需要采用的是jackson-module-scala的包实现,如果直接用普通的JSON序列化方式那么会达不到想要的效果,jackson以及jackson-module-scala对应的Maven库如下:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.9.4</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.12</artifactId>
    <version>2.9.5</version>
</dependency>

注意如果本地安装的Scala版本与所配置的jackson-module-scala版本不一致的话会报出一些异常。发散一下思维:既然可以序列化为JSON,那么完全可以通过JSON再反序列化会对象,只不过通过JSON作为中间媒介,将访问受限的Scala对象转变为Java对象,上面剩余代码如下:

//4. 将JSON字符串反序列化成Java对象
List<PartitionAssignmentState> target = mapper.readValue(source,
        getCollectionType(mapper,List.class,PartitionAssignmentState.class));
//5. 排序
target.sort((o1, o2) -> o1.getPartition() - o2.getPartition());
//6. 打印
printPasList(target);

如此就可以达到与前面几篇文章中关于获取消费者详情功能同样的效果。这里有两个注意要点:

  1. PartitionAssignmentState中的coordinator是Node类型,这个类型需要自定义,Kafka原生的会报错。
  2. 反序列化时Node会有一个empty的属性不识别,解决方案参考代码中的步骤2.

代码更多细节请参考:https://github.com/hiddenzzh/kafka/blob/master/src/main/java/com/hidden/custom/kafka/admin/KafkaConsumerGroupScalaService.java

通过JSON的序列化和反序列化操作实现了原本不能为之的事情,那么思维再发散一下,也可以序列化成字节流,比如通过ByteBuffer进行转换,只不过编程逻辑变得复杂了。

上面这段陈述有可能会让人觉得Scala与Java之间的互操作起来不容易,其实不然,上面这段陈述只是用来补充一下如何获取消费者详情的另一种方法,Scala与Java之间的互操作还是比较简单的,一般情况下都可以直接使用对方的类。对于集合而言,Scala中还有用于Scala与Java集合的互转的scala.collection.JavaConverters(scala2.8.1开始引入),与此雷同的scala.collection.JavaConversions已被标注为@Deprecated(since 2.12.0)。在scala代码中如果需要集合转换,首先引入scala.collection.JavaConverters._,进而显示调用asJava或者asScala方法完成转型。关于Scala与Java集合互转的介绍会在下一篇文章中呈现。

这些信息有用吗?
Do you have any suggestions for improvement?

Thanks for your feedback!