springboot对kafka的client很好的实现了集成,使用非常方便,本文也实现了一个在springboot中实现操作kafka的demo。

1.POM配置

只需要在dependencies中增加 spring-kafka的配置即可。完整效果如下:

org.springframework.boot

spring-boot-starter-parent

1.5.4.RELEASE

1.8

1.2.2.RELEASE

UTF-8

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter

org.springframework.boot

spring-boot-starter-test

test

org.springframework.boot

spring-boot-starter-aop

org.springframework.kafka

spring-kafka

${spring-kafka.version}

org.springframework.kafka

spring-kafka-test

${spring-kafka.version}

test

2.生产者

参数配置类,其参数卸载yml文件中,通过@Value注入

package com.dhb.kafka.producer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.common.serialization.StringSerializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;

import java.util.Map;

@Configuration

public class SenderConfig {

@Value("${kafka.bootstrap-servers}")

private String bootstrapServers;

@Bean

public Map producerConfigs() {

Map props = new HashMap<>();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,this.bootstrapServers);

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);

props.put(ProducerConfig.ACKS_CONFIG,"0");

return props;

}

@Bean

public ProducerFactory producerFactory() {

return new DefaultKafkaProducerFactory<>(producerConfigs());

}

@Bean

public KafkaTemplate kafkaTemplate() {

return new KafkaTemplate(producerFactory());

}

@Bean

public Sender sender() {

return new Sender();

}

}

消息发送类

package com.dhb.kafka.producer;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

@Slf4j

public class Sender {

@Autowired

private KafkaTemplate kafkaTemplate;

public void send(String topic,String payload) {

log.info("sending payload='{}' to topic='{}'",payload,topic);

this.kafkaTemplate.send(topic,payload);

}

}

3.消费者

参数配置类

package com.dhb.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;

import java.util.Map;

@Configuration

@EnableKafka

public class ReceiverConfig {

@Value("${kafka.bootstrap-servers}")

private String bootstrapServers;

public Map consumerConfigs() {

Map props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

props.put(ConsumerConfig.GROUP_ID_CONFIG,"helloword");

return props;

}

@Bean

public ConsumerFactory consumerFactory() {

return new DefaultKafkaConsumerFactory<>(consumerConfigs());

}

@Bean

public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory factory =

new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory());

return factory;

}

@Bean

public Receiver receiver() {

return new Receiver();

}

}

消息接受类

package com.dhb.kafka.consumer;

import lombok.extern.slf4j.Slf4j;

import org.springframework.kafka.annotation.KafkaListener;

import java.util.concurrent.CountDownLatch;

@Slf4j

public class Receiver {

private CountDownLatch latch = new CountDownLatch(1);

public CountDownLatch getLatch() {

return latch;

}

@KafkaListener(topics = "${kafka.topic.helloworld}")

public void receive(String payload) {

log.info("received payload='{}'",payload);

latch.countDown();

}

}

3.web测试类

定义了一个基于http的web测试接口

package com.dhb.kafka.web;

import com.dhb.kafka.producer.Sender;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RequestMethod;

import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;

import javax.servlet.http.HttpServletResponse;

import java.io.IOException;

@RestController

@Slf4j

public class KafkaProducer {

@Autowired

Sender sender;

@RequestMapping(value = "/sender.action", method = RequestMethod.POST)

public void exec(HttpServletRequest request, HttpServletResponse response,String data) throws IOException{

this.sender.send("testtopic",data);

response.setCharacterEncoding("UTF-8");

response.setContentType("text/json");

response.getWriter().write("success");

response.getWriter().flush();

response.getWriter().close();

}

}

4.启动类及配置

package com.dhb.kafka;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication

public class KafkaApplication {

public static void main(String[] args) {

SpringApplication.run(KafkaApplication.class,args);

}

}

application.yml

kafka:

bootstrap-servers: 192.168.162.239:9092

topic:

helloworld: testtopic

程序结构:

6a3d55773ef363f813928c94e4fbfd6c.png

包结构

5.读写测试

通过执行KafkaApplication的main方法启动程序。然后打开postman进行测试:

eabac4c65c0a340fde585dd0d1c9a0d3.png

运行后返回success

17cabc941db1b53cd4f7f328cc7c9446.png

生产者日志:

479047feec033c53d2b4c364c3eb3175.png

消费者日志:

3267bc4bd3799b41b790d8f85dc87340.png

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐