Kafka3.0源码学习
Kafka3.0源码学习
一、生产者源码
1、初始化
生产者 main 线程初始化,点击 main()方法中的 KafkaProducer()
1 | KafkaProducer(ProducerConfig config, |
生产者 sender 线程初始化,KafkaProducer.java中点击 newSender()方法,查看发送线程初始化
1 | Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) { |
Sender 对象被放到了一个线程中启动,所有需要点击 newSender()方法中的 Sender,并找到 sender 对象中的 run()方法
1 |
|
2、发送数据到缓冲区
2.1 发送总体流程
从send()方法进入
1 | public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) { |
2.2 分区选择
1 | private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { |
2.3 发送消息大小校验
1 | private void ensureValidRecordSize(int size) { |
2.4 内存池
1 | public RecordAppendResult append(TopicPartition tp, |
3、sender 线程发送数据
1 | void runOnce() { |
二、消费者源码
1、初始化
点击 main()方法中的 KafkaConsumer ()
1 | KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) { |
2、消费者订阅主题
点击自己编写的 CustomConsumer.java 中的 subscribe ()方法
1 | public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { |
3、消费者拉取和处理数据
3.1 消费总体流程
点击自己编写的 CustomConsumer.java 中的 poll ()方法
1 | private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) { |
3.2 消费者/消费者组初始化
1 | public boolean poll(Timer timer, boolean waitForJoinGroup) { |
3.3 拉取数据
1 | private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) { |
3.4 消费者 Offset 提交
三、服务端源码
生产者消费者源码使用java编写,而服务端源码使用scala编写
程序入口在core→src→main→scala→Kafka→kafka.scala
1 | def main(args: Array[String]): Unit = { |