Windows+docker下简单kafka测试联调
拉取成功后通过docker images命令或者docker desktop Images中看到此镜像。使用docker ps或者在docker desktop的containers中查看刚才的实例。1.安装完Windows desktop后,控制台拉取官方镜像(写时最新为4.1.1)网站上初始化一个Spring Boot项目,添加web+kafka依赖。
环境:
1. Windows desktop
2. OpenJDK 17
3. Idea community
4. offset explorer(kafka GUI tool, optional)
一. 简单环境安装
1.安装完Windows desktop后,控制台拉取官方镜像(写时最新为4.1.1)
docker pull apache/kafka:4.1.1
拉取成功后通过docker images命令或者docker desktop Images中看到此镜像
2.运行实例,将端口映射到本机9092上:
docker run -d --name kafka -p 9092:9092 apche/kafka:4.1.1
使用docker ps或者在docker desktop的containers中查看刚才的实例
3.进入到实例里面, 用docker exec命令或者docker destop直接查看:
docker exec -it kafka bash
# or
docker exec -it [your kafka instance id] bash
kafka在/opt/kafka目录下,查看目录结构:

查看版本验证:
bin/kafka-topics.sh --version

查看本地已有主题
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
手动创建topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic your-topic-name
手动发送/接受消息(开两个terminal,分别执行;交互式命令行,生产者输入字符串消息,消费者收到消息):
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning


删除topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic your-topic-name
二.使用Spring Boot/Java程序简单测试Kafka
1.去spring initializer网站上初始化一个Spring Boot项目,添加web+kafka依赖

2.用idea打开并添加如下代码:

application.yarml:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
server:
port: 8080
KafkaController.java:
package com.example.demo.controller;
import com.example.demo.service.KafkaProducer;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/kafka")
public class KafkaController {
private final KafkaProducer kafkaProducer;
public KafkaController(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
@PostMapping("/send")
public String sendMessage(@RequestParam String msg,
@RequestParam(defaultValue = "test-topic") String topic) {
kafkaProducer.sendMessage(topic, msg);
return "Message sent to topic '" + topic + "': " + msg;
}
}
KafkaConsumer.java:
package com.example.demo.service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "my-group")
public void listen(ConsumerRecord<String, String> record) {
System.out.printf("Received message: topic=%s, partition=%d, offset=%d, value=%s%n",
record.topic(), record.partition(), record.offset(), record.value());
}
}
KafkaProducer.java:
package com.example.demo.service;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
System.out.println("Sent message: " + message + " to topic: " + topic);
}
}
运行DemoApplication的main方法,启动spring boot,控制台最后打印:

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)