让我详细解释 __consumer_offsets topic 的数据结构。这个特殊的 topic 存储了非常重要的元数据信息,它的设计非常巧妙。

__consumer_offsets Topic 的数据结构

基本结构

__consumer_offsets topic 中的每条消息都是键值对(key-value)结构,但根据不同的用途,有几种不同的消息格式:

  1. 消费者 Offset 提交消息
// 键的结构
class OffsetCommitKey {
    String group;           // 消费者组名称
    String topic;           // 主题名称
    int partition;         // 分区号
    
    public byte[] serialize() {
        // 序列化格式:version + group + topic + partition
        ByteBuffer buffer = ByteBuffer.allocate(calculateSize());
        buffer.putShort(version);    // 版本号,用于支持格式演进
        buffer.putString(group);     // 消费者组
        buffer.putString(topic);     // 主题
        buffer.putInt(partition);    // 分区
        return buffer.array();
    }
}

// 值的结构
class OffsetCommitValue {
    long offset;           // 提交的偏移量
    String metadata;       // 可选的元数据(比如提交者的信息)
    long timestamp;        // 提交时间
    
    public byte[] serialize() {
        ByteBuffer buffer = ByteBuffer.allocate(calculateSize());
        buffer.putLong(offset);
        buffer.putString(metadata);
        buffer.putLong(timestamp);
        return buffer.array();
    }
}
  1. 消费者组元数据消息
// 键的结构
class GroupMetadataKey {
    String group;         // 消费者组名称
    
    public byte[] serialize() {
        ByteBuffer buffer = ByteBuffer.allocate(calculateSize());
        buffer.putShort(version);
        buffer.putString(group);
        return buffer.array();
    }
}

// 值的结构
class GroupMetadataValue {
    String protocolType;           // 协议类型
    String protocol;               // 分区分配协议
    List<MemberMetadata> members;  // 组成员信息
    
    class MemberMetadata {
        String memberId;           // 成员ID
        String clientId;           // 客户端ID
        String clientHost;         // 客户端主机名
        byte[] subscription;       // 订阅信息
        byte[] assignment;         // 分配信息
    }
}

实际应用示例

让我们看看这些结构是如何使用的:

public class OffsetManager {
    // 提交消费位移
    public void commitOffset(String group, String topic, int partition, long offset) {
        // 1. 构造键
        OffsetCommitKey key = new OffsetCommitKey(group, topic, partition);
        
        // 2. 构造值
        OffsetCommitValue value = new OffsetCommitValue(
            offset,
            "consumer-1",  // 元数据
            System.currentTimeMillis()
        );
        
        // 3. 写入到 __consumer_offsets topic
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
            "__consumer_offsets",
            key.serialize(),
            value.serialize()
        );
        producer.send(record);
    }
    
    // 读取已提交的位移
    public long readCommittedOffset(String group, String topic, int partition) {
        // 1. 构造查询键
        OffsetCommitKey key = new OffsetCommitKey(group, topic, partition);
        
        // 2. 从 __consumer_offsets 中查找对应的记录
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<byte[], byte[]> record : records) {
            if (Arrays.equals(record.key(), key.serialize())) {
                // 3. 反序列化值
                OffsetCommitValue value = OffsetCommitValue.deserialize(record.value());
                return value.offset;
            }
        }
        return -1; // 未找到已提交的位移
    }
}

存储优化

Kafka 对 __consumer_offsets topic 做了一些特殊的优化:

  1. 压缩
// broker 配置
Properties props = new Properties();
// 使用日志压缩
props.put("cleanup.policy", "compact");
// 确保相同 key 的旧值被新值覆盖
props.put("delete.retention.ms", "0");
  1. 分区管理
// 计算 offset 提交消息应该发送到哪个分区
public class OffsetCommitPartitioner {
    public int partition(String group, String topic, int partition) {
        // 使用 group 名称的哈希值来确定分区
        return Math.abs(group.hashCode()) % numPartitions;
    }
}

监控和管理

这种结构设计使得我们可以方便地监控消费进度:

public class ConsumerGroupMonitor {
    public Map<TopicPartition, Long> getLag(String group) {
        Map<TopicPartition, Long> result = new HashMap<>();
        
        // 1. 读取提交的 offset
        Map<TopicPartition, OffsetAndMetadata> committed = 
            consumer.committed(consumer.assignment());
            
        // 2. 获取每个分区的最新 offset
        for (TopicPartition partition : committed.keySet()) {
            long committedOffset = committed.get(partition).offset();
            long latestOffset = consumer.endOffsets(
                Collections.singleton(partition)).get(partition);
            
            // 3. 计算延迟
            result.put(partition, latestOffset - committedOffset);
        }
        
        return result;
    }
}

理解这个数据结构对于以下场景特别重要:

  • 调试消费者问题
  • 监控消费延迟
  • 实现自定义的消费者组管理逻辑
  • 灾难恢复
Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐