我们使用KafkaTemplate.send(String data)这个方法发送消息到Kafka中,显然这个方法并不能满足我们系统的需求,那我们需要查看一下KafkaTemplate所实现的接口,看看还提供了什么方法。

当我们发送消息到Kafka后,我们又怎么去确认消息是否发送成功呢?这就涉及到KafkaTemplate的发送回调方法了。

接下来我们开始正式讲解

查看发送接口

首先我们Ctrl+鼠标左键进入KafkaTemplate的源代码中查看一下,可以看到有关发送的接口如下。

这里的参数还是比较简单的,值得一提的事,方法中有个Long类型的时间戳(timestamp)参数,这是Kafka0.10版本提供的新功能,主要用来使用时间索引进行查询数据以及日志切分清除策略。

还有一个ProducerRecord参数,这个类其实就是整合了topic、partition、data等数据的消费实体类。

topic:这里填写的是Topic的名字

partition:这里填写的是分区的id,其实也是就第几个分区,id从0开始。表示指定发送到该分区中

timestamp:时间戳,一般默认当前时间戳

key:消息的键

data:消息的数据

ProducerRecord:消息对应的封装类,包含上述字段

Message>:Spring自带的Message封装类,包含消息及消息头

ListenableFuture>sendDefault(V data);

ListenableFuture>sendDefault(K key, V data);

ListenableFuture>sendDefault(Integer partition, K key, V data);

ListenableFuture>sendDefault(Integer partition, Long timestamp, K key, V data);

ListenableFuture>send(String topic, V data);

ListenableFuture>send(String topic, K key, V data);

ListenableFuture>send(String topic, Integer partition, K key, V data);

ListenableFuture>send(String topic, Integer partition, Long timestamp, K key, V data);

ListenableFuture> send(ProducerRecordrecord);

ListenableFuture> send(Message> message);

View Code

使用sendDefault发送消息

首先在KafkaConfiguration编写一个带有默认Topic参数的KafkaTemplate,同时为另外一个KafkaTemplate加上@Primary注解,

@Primary注解的意思是在拥有多个同类型的Bean时优先使用该Bean,到时候方便我们使用@Autowired注解自动注入。

//这个是我们之前编写的KafkaTemplate代码,加入@Primary注解

@Beanpublic KafkaTemplatekafkaTemplate() {

KafkaTemplate template= new KafkaTemplate(producerFactory());returntemplate;

}

@Bean("defaultKafkaTemplate")

@Primarypublic KafkaTemplatedefaultKafkaTemplate() {

KafkaTemplate template= new KafkaTemplate(producerFactory());

template.setDefaultTopic("topic.quick.default");returntemplate;

}

View Code

接着编写测试方法,可以看到我们这里调用的是sendDefault方法,而且并没有在方法参数上添加topicName,

这是因为我们在声明defaultKafkaTemplate这个Bean的时候添加了这行代码 template.setDefaultTopic("topic.quick.default"),

只要调用sendDefault方法,kafkaTemplate会自动把消息发送到名为"topic.quick.default"的Topic中。

@ResourceprivateKafkaTemplate defaultKafkaTemplate;

@Testpublic voidtestDefaultKafkaTemplate() {

defaultKafkaTemplate.sendDefault("I`m send msg to default topic");

}

View Code

这里也顺便测试一下其他几个吧。

@Testpublic voidtestTemplateSend() {//发送带有时间戳的消息

kafkaTemplate.send("topic.quick.demo", 0, System.currentTimeMillis(), 0, "send message with timestamp");//使用ProducerRecord发送消息

ProducerRecord record = new ProducerRecord("topic.quick.demo", "use ProducerRecord to send message");

kafkaTemplate.send(record);//使用Message发送消息

Map map = newHashMap();

map.put(KafkaHeaders.TOPIC,"topic.quick.demo");

map.put(KafkaHeaders.PARTITION_ID,0);

map.put(KafkaHeaders.MESSAGE_KEY,0);

GenericMessage message= new GenericMessage("use Message to send message",newMessageHeaders(map));

kafkaTemplate.send(message);

}

View Code

KafkaTemplate异步发送消息

发送时间较长的时候会导致进程提前关闭导致无法调用回调时间。主要是因为KafkaTemplate发送消息是采取异步方式发送的,我们可以看下KafkaTemplate的源代码

这是我们刚才调用的发送消息方法,可以看到KafkaTemplate会使用ProducerRecord把我们传递进来的参数再一次封装,最后调用doSend方法发送消息到Kafka中

send(String topic, V data)

public ListenableFuture>send(String topic, V data) {

ProducerRecord producerRecord = newProducerRecord(topic, data);return this.doSend(producerRecord);

}

View Code

ListenableFuture> doSend(final ProducerRecord producerRecord)

doSend方法先是检测是否开启事务,紧接着使用SettableListenableFuture发送消息,然后判断是否启动自动冲洗数据到Kafka中,我们再接着看看SettableListenableFuture实现了什么接口

protected ListenableFuture> doSend(final ProducerRecordproducerRecord) {if (this.transactional) {

Assert.state(this.inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");

}final Producer producer = this.getTheProducer();if (this.logger.isTraceEnabled()) {this.logger.trace("Sending: " +producerRecord);

}final SettableListenableFuture> future = newSettableListenableFuture();

producer.send(producerRecord,newCallback() {public voidonCompletion(RecordMetadata metadata, Exception exception) {try{if (exception == null) {

future.set(newSendResult(producerRecord, metadata));if (KafkaTemplate.this.producerListener != null) {

KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata);

}if (KafkaTemplate.this.logger.isTraceEnabled()) {

KafkaTemplate.this.logger.trace("Sent ok: " + producerRecord + ", metadata: " +metadata);

}

}else{

future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));if (KafkaTemplate.this.producerListener != null) {

KafkaTemplate.this.producerListener.onError(producerRecord, exception);

}if (KafkaTemplate.this.logger.isDebugEnabled()) {

KafkaTemplate.this.logger.debug("Failed to send: " +producerRecord, exception);

}

}

}finally{if (!KafkaTemplate.this.transactional) {

KafkaTemplate.this.closeProducer(producer, false);

}

}

}

});if (this.autoFlush) {this.flush();

}if (this.logger.isTraceEnabled()) {this.logger.trace("Sent: " +producerRecord);

}returnfuture;

}

View Code

可以看到SettableListenableFuture实现了ListenableFuture接口,ListenableFuture则实现了Future接口,

Future是Java自带的实现异步编程的接口,支持返回值的异步,而我们使用Thread或者Runnable都是不带返回值的。

public class SettableListenableFuture implements ListenableFuture

public interface ListenableFuture extends Future

View Code

KafkaTemplate同步发送消息

KafkaTemplate异步发送消息大大的提升了生产者的并发能力,但某些场景下我们并不需要异步发送消息,这个时候我们可以采取同步发送方式,实现也是非常简单的,

我们只需要在send方法后面调用get方法即可。

Future模式中,我们采取异步执行事件,等到需要返回值得时候我们再调用get方法获取future的返回值

@Testpublic void testSyncSend() throwsExecutionException, InterruptedException {

kafkaTemplate.send("topic.quick.demo", "test sync send message").get();

}

View Code

get方法还有一个比较有意思的重载方法,get(long timeout, TimeUnit unit),当send方法耗时大于get方法所设定的参数时会抛出一个超时异常,

但需要注意,这里仅抛出异常,消息还是会发送成功的。

这里的测试方法设置send耗时必须小于 一微秒(那必须得失败呀,嘿嘿嘿),运行后我们可以看到抛出的异常,但也发现消息能发送成功并被监听器接收了。

那这功能有什么作用呢,如果还没有接触过SQL慢查询可以去了解一下,使用该方法作为SQL慢查询记录的条件。

@Testpublic void testTimeOut() throwsExecutionException, InterruptedException, TimeoutException {

kafkaTemplate.send("topic.quick.demo", "test send message timeout").get(1,TimeUnit.MICROSECONDS);

}

View Code

2018-09-08 16:36:09.110 INFO 7724 --- [ demo-0-C-1] com.viu.kafka.listen.DemoListener : demo receive : test send message timeout

java.util.concurrent.TimeoutException

View Code

消息结果回调

一般来说我们都会去获取KafkaTemplate发送消息的结果去判断消息是否发送成功,如果消息发送失败,则会重新发送或者执行对应的业务逻辑。所以这里我们去实现这个功能。

KafkaSendResultHandler

第一步还是编写一个消息结果回调类KafkaSendResultHandler。

当我们使用KafkaTemplate发送消息成功的时候回调用OnSuccess方法,发送失败则会调用onError方法。

@Componentpublic class KafkaSendResultHandler implementsProducerListener {private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);

@Overridepublic voidonSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {

log.info("Message send success : " +producerRecord.toString());

}

@Overridepublic voidonError(ProducerRecord producerRecord, Exception exception) {

log.info("Message send error : " +producerRecord.toString());

}

}

View Code

接下来就使用KafkaSendResultHandler实现消息发送结果回调,这里需要休眠,发送时间较长的时候会导致进程提前关闭导致无法调用回调时间。

主要是因为KafkaTemplate发送消息是采取异步方式发送的

@AutowiredprivateKafkaSendResultHandler producerListener;

@Testpublic void testProducerListen() throwsInterruptedException {

kafkaTemplate.setProducerListener(producerListener);

kafkaTemplate.send("topic.quick.demo", "test producer listen");

Thread.sleep(1000);

}

View Code

运行测试方法,我们可以看到控制台输出的日志如下

2018-09-08 15:51:39.975 INFO 10268 --- [ad | producer-1] c.v.k.handler.KafkaSendResultHandler : Message send success : ProducerRecord(topic=topic.quick.demo, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=test producer listen, timestamp=null)

View Code

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐