kafkatemplate无法注入_spring - Spring Kafka @SendTo引发异常:需要KafkaTemplate来支持回复 - 堆栈内存溢出...
根据Spring kafka doc的说法,我正在尝试获取消费者结果。基于这个stackoverflow问题 ,应该只能通过使用@SendTo注释来执行此操作,因为spring boot“如果上下文中还没有一个模板,也可以自动配置kafka模板。”但我无法使它正常工作,我仍然可以java.lang.IllegalStateException: a KafkaTemplate is required
根据Spring kafka doc的说法,我正在尝试获取消费者结果。
基于这个stackoverflow问题 ,应该只能通过使用@SendTo注释来执行此操作,因为spring boot“如果上下文中还没有一个模板,也可以自动配置kafka模板。”
但我无法使它正常工作,我仍然可以
java.lang.IllegalStateException: a KafkaTemplate is required to support replies
at org.springframework.util.Assert.state(Assert.java:73) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListener(MethodKafkaListenerEndpoint.java:156)
...
这是我的监听方法
@KafkaListener(topics = "t_invoice")
@SendTo("t_ledger")
public List consume(Invoice invoice) throws IOException {
// do some processing
var ledgerCredit = new LedgerEntry(invoice.getAmount(), "Credit side", 0, "");
var ledgerDebit = new LedgerEntry(0, "", invoice.getAmount(), "Debit side");
return List.of(ledgerCredit, ledgerDebit);
}
我错过了什么?
这是我在使用者上拥有的唯一@Configuration文件。 消费者与生产者是分开的系统(例如,支付系统产生到kafka的发票,我的程序是获取数据并创建分类帐分录的会计系统)
@Configuration
public class KafkaConfig {
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public ConsumerFactory consumerFactory() {
var properties = kafkaProperties.buildConsumerProperties();
properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "600000");
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
aplication.yml
spring:
kafka:
consumer:
group-id: default-spring-consumer
auto-offset-reset: earliest
试用错误1
如果我禁用KafkaConfig或在运行期间启用调试,则存在此错误:
org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.accounting.kafkaconsumer.entity.LedgerEntry to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: class com.accounting.kafkaconsumer.entity.LedgerEntry cannot be cast to class java.lang.String (com.accounting.kafkaconsumer.entity.LedgerEntry is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.1.jar:na]
at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.1.jar:na]
...
试错2
如果我禁用KafkaConfig并使用此签名(返回String),则可以使用。 但这是无法预期的,因为我的配置是在KafkaConfig
@KafkaListener(topics = "t_invoice")
@SendTo("t_ledger")
public String consume(Invoice invoice) throws IOException {
// do some processing
var listLedger = List.of(ledgerCredit, ledgerDebit);
return objectMapper.writeValueAsString(listLedger);
}
我认为问题出在这里( KafkaConfig ),因为我创建了KafkaListenerContainerFactory新实例, replyTemplate为null。 如何设置我的KafkaConfig的正确方法?
@Bean
public KafkaListenerContainerFactory> kafkaListenerContainerFactory() {
var factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)