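I previously published an article, a translation of the Apache Kafka documentation, to help readers understand Kafka better. The address is:

Apache Kafka® is a distributed streaming platform that is well suited to handling highly concurrent data. I have used Kafka for more than two years, and it has been very stable.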

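1. Download Kafka

My development and test machine runs Windows, so I use the Windows scripts that ship with Kafka (I will skip the JDK setup here). The version used in this test is kafka_2.12-1.0.1.

Kafka depends on ZooKeeper at runtime, so ZooKeeper must be installed before Kafka is started. The version used here is zookeeper-3.4.11.

2. Configure Kafka

Running the download without any configuration leads to a number of problems.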

\kafka_2.12-1.0.1\config\server.properties

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from

# java.net.InetAddress.getCanonicalHostName() if not configured.

# FORMAT:

# listeners = listener_name://host_name:port

# EXAMPLE:

# listeners = PLAINTEXT://your.host.name:9092

#listeners=PLAINTEXT://:9092

port=9092

host.name=127.0.0.1

############################# Log Basics #############################

# A comma seperated list of directories under which to store log files

#log.dirs=/tmp/kafka-logs

log.dirs=D:\\kafka\\logs

The sample configuration above is abridged; in practice these are the only settings I changed:

port=9092

host.name=127.0.0.1

log.dirs=D:\\kafka\\logs

\zookeeper-3.4.11\conf\zoo.cfg

dataDir=D:\\kafka\\logs

\kafka_2.12-1.0.1\config\zookeeper.properties

dataDir=D:\\kafka\\logs

At this point everything is basically ready to run. First, start the ZooKeeper server:

zkServer  // simple and direct

./zookeeper-3.4.11/bin/zkServer.cmd  // or possibly like this

./zkServer.sh start  // on Linux, probably like this

Then start the Kafka server:

.\kafka_2.12-1.0.1\bin\windows\kafka-server-start.bat .\kafka_2.12-1.0.1\config\server.properties

./kafka-server-start.sh ../config/server.properties  // Linux: start in the foreground

./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &  // Linux: start in the background

nohup ./kafka-server-start.sh ../config/server.properties &  // Linux: start in the background

With the servers running, it is time to write the code and configure Kafka.

3. Integrate Kafka

Maven

<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

<version>1.3.0.RELEASE</version>

</dependency>

Configure the producer

private final Map<String, Object> producerArg = new HashMap<>();

private DefaultKafkaProducerFactory kafkaProducerFactory;

private KafkaTemplate kafkaTemplate;

private void initArg() {

producerArg.put("bootstrap.servers", "127.0.0.1:9092");

producerArg.put("group.id", "100"); // note: group.id is a consumer setting; the producer logs a warning and ignores it

producerArg.put("compression.type", "gzip");

producerArg.put("reconnect.backoff.ms", 20000);

producerArg.put("retry.backoff.ms", 20000);

producerArg.put("retries", 30);

producerArg.put("batch.size", "16384");

producerArg.put("linger.ms", "50");

producerArg.put("acks", "all");

producerArg.put("buffer.memory", "33554432");

producerArg.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

producerArg.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

}

/**

* Creates the producer factory (lazily, on first use).

* @return

*/

public DefaultKafkaProducerFactory getKafkaProducerFactory() {

if (kafkaProducerFactory == null) {

kafkaProducerFactory = new DefaultKafkaProducerFactory(producerArg);

}

return kafkaProducerFactory;

}

/**

* Creates the message template.

* @param topic the default topic

* @param listener producer listener; pass null if none is needed

* @return KafkaTemplate

*/

@Override

public KafkaTemplate createKafkaTemplate(String topic, ProducerListener listener) {

if (kafkaTemplate == null) {

kafkaTemplate = new KafkaTemplate(this.getKafkaProducerFactory());

kafkaTemplate.setDefaultTopic(topic);

kafkaTemplate.setProducerListener(listener);

}

return kafkaTemplate;

}

/**

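* Publishes a message.

* @param topic topic name

* @param message message string, usually a JSON string

* @param isUsePartition whether to send to an explicitly chosen partition

* @param partitionNum total number of partitions

* @param role prefix used to distinguish message keys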

* @return

*/

@Override

public boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum, String role) {

if (role == null) {

role = ROLE_DEFAULT;

}

String key = role + "_" + message.hashCode();

ListenableFuture<SendResult<String, String>> result;

if (isUsePartition) {

int index = getPartitionIndex(key, partitionNum);

result = kafkaTemplate.send(topic, index, key, message);

} else {

result = kafkaTemplate.send(topic, key, message);

}

return checkResult(result);

}
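
The send method above can pick a partition from the key's hash. One thing to watch when deriving an index that way: Math.abs(Integer.MIN_VALUE) is still negative, so an abs-then-modulo calculation can yield a negative index. Here is a minimal sketch of a safer mapping based on Math.floorMod. The class and method names (PartitionIndexSketch, indexForKey, indexForHash) are made up for illustration and are not part of KafkaMQService.

```java
final class PartitionIndexSketch {

    /** Maps a raw hash to a partition in [0, partitionCount). */
    static int indexForHash(int hash, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // floorMod returns a non-negative result whenever the divisor is positive.
        return Math.floorMod(hash, partitionCount);
    }

    /** Maps a message key to a partition through its hashCode(). */
    static int indexForKey(String key, int partitionCount) {
        return indexForHash(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // Math.abs cannot represent +2^31 as an int, so the value stays negative.
        System.out.println(Math.abs(Integer.MIN_VALUE) % 3);     // prints -2
        System.out.println(indexForHash(Integer.MIN_VALUE, 3));  // prints 1
    }
}
```

A negative index only shows up for the rare key whose hash is exactly Integer.MIN_VALUE, which is why this kind of bug tends to survive testing.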

Configure the consumer

private final Map<String, Object> consumerArg = new HashMap<>();

private DefaultKafkaConsumerFactory kafkaConsumerFactory;

/**

* Initializes the parameters.

*/

private void initArg() {

String groupId = "20190504";

consumerArg.put("bootstrap.servers", "127.0.0.1:9092");

consumerArg.put("group.id", groupId); // consumer group; to have every consumer receive every message, give each consumer a different group id

consumerArg.put("enable.auto.commit", "false");

consumerArg.put("auto.commit.interval.ms", "1000");

consumerArg.put("auto.offset.reset", "latest");

consumerArg.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

consumerArg.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

}

/**

* Creates the consumer factory (lazily, on first use).

* @return

*/

public DefaultKafkaConsumerFactory getKafkaConsumerFactory() {

if (kafkaConsumerFactory == null) {

kafkaConsumerFactory = new DefaultKafkaConsumerFactory(consumerArg);

}

return kafkaConsumerFactory;

}

/**

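* Adds a consumer listener.

* @param listener the listener

* @param groupId consumer group id; to have every consumer receive every message, give each one a different group id

* @param topic names of the topics to listen to

* @return the KafkaMessageListenerContainer, which can be stopped or started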

*/

@Override

public KafkaMessageListenerContainer addListener(MessageListener listener, String groupId, String... topic) {

ContainerProperties properties = new ContainerProperties(topic);

properties.setMessageListener(listener);

properties.setGroupId(groupId);

KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(getKafkaConsumerFactory(), properties);

container.start();

return container;

}
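
The groupId parameter decides who gets a message: every group receives each record, but inside one group a partition is read by only one member. The following is a toy model of a range-style split, written only to make that rule concrete. GroupAssignmentSketch and assign are hypothetical names, and the model is a simplification: in a real cluster the members are ordered by broker-assigned member ids and the assignment strategy is configurable, so which consumer ends up with a single partition is not decided by the id values used here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupAssignmentSketch {

    /**
     * Splits the partitions of one topic across the members of ONE consumer group.
     * Each member gets partitionCount / members.size() partitions and the first
     * partitionCount % members.size() members get one extra.
     */
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return result;
        }
        int base = partitionCount / members.size();
        int extra = partitionCount % members.size();
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(members.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // Two consumers in one group, one partition: the second member stays idle.
        System.out.println(assign(Arrays.asList("100", "101"), 1)); // prints {100=[0], 101=[]}
        // A consumer alone in its own group always owns the partition.
        System.out.println(assign(Arrays.asList("102"), 1));        // prints {102=[0]}
        // After "100" leaves, the group rebalances and "101" takes over.
        System.out.println(assign(Arrays.asList("101"), 1));        // prints {101=[0]}
    }
}
```

With a single partition, adding more consumers to the same group only adds standbys; more partitions are needed before they can share the load.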

That is all there is to it. With the configuration done, here is the test code:

@RestController

@RequestMapping(value = "/test")

public class TestKafkaMQController {

private static final Logger logger = Logger.getLogger("kafkaMQ>");

@Autowired

private IKafkaMQService kafkaMQService;

private final Map<String, KafkaMessageListenerContainer> listenerContainerMap = new ConcurrentHashMap<>();

/**

* http://localhost:8180/test/kafka/pub

*

* @param request

* @return

*/

@RequestMapping(value = "/kafka/pub")

public ResultResp kafkaPub(HttpServletRequest request) {

ResultResp resp = new ResultResp<>();

String msg = "test kafka " + DateTimeUtils.getTime();

try {

kafkaMQService.send(KafkaMQService.TOPIC_DEFAULT, msg);

resp.setInfo(msg);

} catch (Exception e) {

e.printStackTrace();

resp.setInfo(e.getMessage());

}

return resp;

}

/**

* http://localhost:8180/test/kafka/sub?group=20190504&id=100

* http://localhost:8180/test/kafka/sub?group=20190504&id=101

* http://localhost:8180/test/kafka/sub?group=20190503&id=102

*

* @param request

* @return

*/

@RequestMapping(value = "/kafka/sub")

public ResultResp kafkaSub(HttpServletRequest request) {

ResultResp resp = new ResultResp<>();

String id = request.getParameter("id");

String group = request.getParameter("group");

try {

KafkaMessageListenerContainer container = kafkaMQService.addListener(new MessageListener() {

@Override

public void onMessage(ConsumerRecord record) {

String log = "%s#{topic:%s, key:%s, value:%s, offset:%s, partition:%s, timestamp:%s }";

logger.info(String.format(log, id, record.topic(), record.key(), record.value(), record.offset(), record.partition(), record.timestamp()));

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}, group, KafkaMQService.TOPIC_DEFAULT);

listenerContainerMap.put(id, container);

resp.setInfo(id);

} catch (Exception e) {

e.printStackTrace();

resp.setInfo(e.getMessage());

}

return resp;

}

/**

* http://localhost:8180/test/kafka/cancel?id=100

*

* @param request

* @return

*/

@RequestMapping(value = "/kafka/cancel")

public ResultResp kafkaCancel(HttpServletRequest request) {

ResultResp resp = new ResultResp<>();

String id = request.getParameter("id");

if (listenerContainerMap.containsKey(id)) {

listenerContainerMap.get(id).stop();

listenerContainerMap.remove(id);

}

return resp;

}

}
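
The controller tracks running listeners in a ConcurrentHashMap keyed by the id request parameter. As a hedged sketch of the same bookkeeping, the class below removes and stops a listener in one step and guards against a missing id (ConcurrentHashMap rejects null keys). ListenerRegistrySketch and Stoppable are hypothetical names; Stoppable only stands in for the stop() method of KafkaMessageListenerContainer so that the example runs without Kafka. Stopping the previous listener when an id is reused is my own assumption about the desired behavior.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ListenerRegistrySketch {

    /** Hypothetical stand-in for a listener container; only stop() matters here. */
    interface Stoppable {
        void stop();
    }

    private final Map<String, Stoppable> containers = new ConcurrentHashMap<>();

    /** Registers a container; a container already stored under the same id is stopped. */
    void register(String id, Stoppable container) {
        Stoppable previous = containers.put(id, container);
        if (previous != null) {
            previous.stop();
        }
    }

    /** Removes and stops the container for id; returns false if there was none. */
    boolean cancel(String id) {
        if (id == null) {
            return false; // ConcurrentHashMap would throw on a null key
        }
        // remove() is atomic, so two concurrent cancels cannot both stop the same container.
        Stoppable removed = containers.remove(id);
        if (removed == null) {
            return false;
        }
        removed.stop();
        return true;
    }

    int size() {
        return containers.size();
    }

    public static void main(String[] args) {
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        registry.register("100", () -> System.out.println("container 100 stopped"));
        System.out.println(registry.cancel("100")); // prints "container 100 stopped", then true
        System.out.println(registry.cancel("100")); // prints false
    }
}
```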

The complete KafkaService class

The IKafkaMQService interface

package com.lanxinbase.system.service.resource;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.listener.KafkaMessageListenerContainer;

import org.springframework.kafka.listener.MessageListener;

import org.springframework.kafka.listener.MessageListenerContainer;

import org.springframework.kafka.support.ProducerListener;

import java.util.Map;

/**

* Created by alan on 2019/5/4.

*/

public interface IKafkaMQService {

Map getProducerFactoryArg();

KafkaTemplate getKafkaTemplate();

KafkaTemplate getKafkaTemplate(String topic);

KafkaTemplate getKafkaTemplate(String topic, ProducerListener listener);

KafkaTemplate createKafkaTemplate(String topic, ProducerListener listener);

boolean send(String topic, String message);

boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum);

boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum, String role);

int getPartitionIndex(String hashCode, int partitionNum);

Map getConsumerFactoryArg();

Map setConsumerFactoryArg(String key, Object val);

KafkaMessageListenerContainer addListener(MessageListener listener, String topic);

KafkaMessageListenerContainer addListener(MessageListener listener, String groupId, String... topic);

}

The KafkaMQService implementation

package com.lanxinbase.system.service;

import com.lanxinbase.system.basic.CompactService;

import com.lanxinbase.system.service.resource.IKafkaMQService;

import com.lanxinbase.system.utils.NumberUtils;

import org.apache.kafka.clients.producer.RecordMetadata;

import org.springframework.beans.factory.DisposableBean;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.listener.KafkaMessageListenerContainer;

import org.springframework.kafka.listener.MessageListener;

import org.springframework.kafka.listener.config.ContainerProperties;

import org.springframework.kafka.support.ProducerListener;

import org.springframework.kafka.support.SendResult;

import org.springframework.stereotype.Service;

import org.springframework.util.concurrent.ListenableFuture;

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.ExecutionException;

/**

* Created by alan on 2019/5/4.

*

* 0. Download Kafka; the version I used is kafka_2.12-1.0.1.

* 1. Configure Kafka (mainly the log path and the Socket Server Settings: {

* port=9092

* host.name=127.0.0.1

* })

* 2. Start ZooKeeper: zkServer

* 3. Start Kafka: .\bin\windows\kafka-server-start.bat .\config\server.properties

*

* Topic Test:

*

* Send a message:

* http://localhost:8180/test/kafka/pub

*

* Id=100 & 101 listen:

* http://localhost:8180/test/kafka/sub?group=20190504&id=100

* http://localhost:8180/test/kafka/sub?group=20190504&id=101

*

* Test log:

* 04-May-2019 16:13:00.647 .onMessage 100#{topic:lan_topic, key:app_-1937508585, value:test kafka 1556957580589, offset:113, partition:0, timestamp:1556957580589 }

*

* Id=102 listens:

* http://localhost:8180/test/kafka/sub?group=20190503&id=102

*

* Test log:

* 04-May-2019 16:13:06.892 .onMessage 102#{topic:lan_topic, key:app_-1937508585, value:test kafka 1556957580589, offset:113, partition:0, timestamp:1556957580589 }

* Note: 102 listens to the same topic as Id=100 but uses a different group.id, so 102 also receives the previous message; compare the timestamps to confirm.

*

* ------------------------------------------------------------------------------------------------------------------

* Send a message:

* http://localhost:8180/test/kafka/pub

*

* Test log:

* 04-May-2019 16:13:11.292 .onMessage 102#{topic:lan_topic, key:app_-1936558289, value:test kafka 1556957591240, offset:114, partition:0, timestamp:1556957591240 }

* 04-May-2019 16:13:11.293 .onMessage 100#{topic:lan_topic, key:app_-1936558289, value:test kafka 1556957591240, offset:114, partition:0, timestamp:1556957591240 }

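* Note: 100 and 102 have different group.id values, so both received the message. Why did 101 receive nothing? It shares a group with 100, the log shows only partition 0, and a partition is read by a single member of each group, here 100, which is still healthy. Now let's cancel the listener for 100.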

*

* ------------------------------------------------------------------------------------------------------------------

* Cancel the listener:

* http://localhost:8180/test/kafka/cancel?id=100

* KafkaMessageListenerContainer.stop();

*

* Send a message:

* http://localhost:8180/test/kafka/pub

*

* Test log:

* 04-May-2019 16:13:23.147 .onMessage 101#{topic:lan_topic, key:app_-1916183009, value:test kafka 1556957603093, offset:115, partition:0, timestamp:1556957603093 }

* 04-May-2019 16:13:23.147 .onMessage 102#{topic:lan_topic, key:app_-1916183009, value:test kafka 1556957603093, offset:115, partition:0, timestamp:1556957603093 }

* Note: now only 101 and 102 receive the message.

*

* @see TestKafkaMQController

*/

@Service

public class KafkaMQService extends CompactService implements IKafkaMQService, InitializingBean, DisposableBean {

private static final String uri = "127.0.0.1:9092";

public static final String TOPIC_DEFAULT = "lan_topic";

public static final String ROLE_DEFAULT = "app";

private final Map<String, Object> producerArg = new HashMap<>();

private final Map<String, Object> consumerArg = new HashMap<>();

private DefaultKafkaProducerFactory kafkaProducerFactory;

private DefaultKafkaConsumerFactory kafkaConsumerFactory;

private KafkaTemplate kafkaTemplate;

public KafkaMQService() {

}

/**

* Runs at startup, after the bean's properties have been set.

* @throws Exception

*/

@Override

public void afterPropertiesSet() throws Exception {

this.initArg();

this.getKafkaProducerFactory();

// this.getKafkaConsumerFactory();

kafkaTemplate = this.createKafkaTemplate(TOPIC_DEFAULT, this.getProducerListener());

}

/**

* Producer listener.

* @return

*/

private ProducerListener getProducerListener() {

return new ProducerListener() {

@Override

public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {

StringBuffer sb = new StringBuffer();

sb.append("success{")

.append("topic:" + topic)

.append(",partition:" + partition)

.append(",key:" + key)

.append(",value:" + value)

.append("}");

logger(sb.toString());

}

@Override

public void onError(String topic, Integer partition, String key, String value, Exception exception) {

StringBuffer sb = new StringBuffer();

sb.append("error{")

.append("topic:" + topic)

.append(",partition:" + partition)

.append(",key:" + key)

.append(",value:" + value)

.append("}");

logger(sb.toString());

}

@Override

public boolean isInterestedInSuccess() {

return false;

}

};

}

/**

* Initializes the parameters.

*/

private void initArg() {

producerArg.put("bootstrap.servers", uri);

producerArg.put("group.id", "100"); // note: group.id is a consumer setting; the producer logs a warning and ignores it

producerArg.put("compression.type", "gzip");

producerArg.put("reconnect.backoff.ms", 20000);

producerArg.put("retry.backoff.ms", 20000);

producerArg.put("retries", 30);

producerArg.put("batch.size", "16384");

producerArg.put("linger.ms", "50");

producerArg.put("acks", "all");

producerArg.put("buffer.memory", "33554432");

producerArg.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

producerArg.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

String groupId = "20190504";

consumerArg.put("bootstrap.servers", uri);

consumerArg.put("group.id", groupId); // consumer group; to have every consumer receive every message, give each consumer a different group id

consumerArg.put("enable.auto.commit", "false");

consumerArg.put("auto.commit.interval.ms", "1000");

consumerArg.put("auto.offset.reset", "latest");

consumerArg.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

consumerArg.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

}

@Override

public Map getProducerFactoryArg() {

return producerArg;

}

@Override

public KafkaTemplate getKafkaTemplate() {

return this.getKafkaTemplate(TOPIC_DEFAULT);

}

@Override

public KafkaTemplate getKafkaTemplate(String topic) {

return this.getKafkaTemplate(topic, null);

}

@Override

public KafkaTemplate getKafkaTemplate(String topic, ProducerListener listener) {

return this.createKafkaTemplate(topic, listener);

}

/**

* Creates the message template.

* @param topic the default topic

* @param listener producer listener; pass null if none is needed

* @return KafkaTemplate

*/

@Override

public KafkaTemplate createKafkaTemplate(String topic, ProducerListener listener) {

if (kafkaTemplate == null) {

kafkaTemplate = new KafkaTemplate(this.getKafkaProducerFactory());

kafkaTemplate.setDefaultTopic(topic);

kafkaTemplate.setProducerListener(listener);

}

return kafkaTemplate;

}

/**

* Publishes a message.

* @param topic topic name

* @param message message string, usually a JSON string

* @return

*/

@Override

public boolean send(String topic, String message) {

return this.send(topic, message, false, 0);

}

/**

* Publishes a message.

* @param topic topic name

* @param message message string, usually a JSON string

* @param isUsePartition whether to send to an explicitly chosen partition

* @param partitionNum total number of partitions

* @return

*/

@Override

public boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum) {

return this.send(topic, message, isUsePartition, partitionNum, ROLE_DEFAULT);

}

/**

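* Publishes a message.

* @param topic topic name

* @param message message string, usually a JSON string

* @param isUsePartition whether to send to an explicitly chosen partition

* @param partitionNum total number of partitions

* @param role prefix used to distinguish message keys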

* @return

*/

@Override

public boolean send(String topic, String message, boolean isUsePartition, Integer partitionNum, String role) {

if (role == null) {

role = ROLE_DEFAULT;

}

String key = role + "_" + message.hashCode();

ListenableFuture<SendResult<String, String>> result;

if (isUsePartition) {

int index = getPartitionIndex(key, partitionNum);

result = kafkaTemplate.send(topic, index, key, message);

} else {

result = kafkaTemplate.send(topic, key, message);

}

return checkResult(result);

}

/**

* Checks whether the send succeeded.

* @param result the ListenableFuture returned by KafkaTemplate.send

* @return

*/

private boolean checkResult(ListenableFuture<SendResult<String, String>> result) {

if (result != null) {

try {

long offset = result.get().getRecordMetadata().offset();

if (offset >= 0) {

return true;

}

logger("unknown offset.");

} catch (InterruptedException e) {

Thread.currentThread().interrupt(); // keep the interrupt flag for the caller

logger("send interrupted.", e.getMessage());

} catch (ExecutionException e) {

e.printStackTrace();

logger("send failed.", e.getMessage());

}

}

return false;

}

/**

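* Gets the partition index, derived from the key.

* @param hashCode the message key; its hashCode() selects the partition (a null key gets a random partition)

* @param partitionNum total number of partitions

* @return the partition index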

*/

@Override

public int getPartitionIndex(String hashCode, int partitionNum) {

if (hashCode == null) {

return NumberUtils.nextInt(partitionNum);

}

return Math.floorMod(hashCode.hashCode(), partitionNum); // Math.abs(Integer.MIN_VALUE) would stay negative

}

@Override

public Map getConsumerFactoryArg() {

return consumerArg;

}

@Override

public Map setConsumerFactoryArg(String key, Object val) {

consumerArg.put(key, val);

return consumerArg;

}

/**

* Adds a consumer listener that uses the default group id.

* @param listener the listener

* @param topic name of the topic to listen to

* @return the KafkaMessageListenerContainer, which can be stopped or started

*/

@Override

public KafkaMessageListenerContainer addListener(MessageListener listener, String topic) {

return this.addListener(listener, getConsumerFactoryArg().get("group.id").toString(), topic);

}

/**

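* Adds a consumer listener.

* @param listener the listener

* @param groupId consumer group id; to have every consumer receive every message, give each one a different group id

* @param topic names of the topics to listen to

* @return the KafkaMessageListenerContainer, which can be stopped or started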

*/

@Override

public KafkaMessageListenerContainer addListener(MessageListener listener, String groupId, String... topic) {

ContainerProperties properties = new ContainerProperties(topic);

properties.setMessageListener(listener);

properties.setGroupId(groupId);

KafkaMessageListenerContainer container = new KafkaMessageListenerContainer(getKafkaConsumerFactory(), properties);

container.start();

return container;

}

/**

* Creates the producer factory (lazily, on first use).

* @return

*/

public DefaultKafkaProducerFactory getKafkaProducerFactory() {

if (kafkaProducerFactory == null) {

kafkaProducerFactory = new DefaultKafkaProducerFactory(producerArg);

}

return kafkaProducerFactory;

}

/**

* Creates the consumer factory (lazily, on first use).

* @return

*/

public DefaultKafkaConsumerFactory getKafkaConsumerFactory() {

if (kafkaConsumerFactory == null) {

kafkaConsumerFactory = new DefaultKafkaConsumerFactory(consumerArg);

}

return kafkaConsumerFactory;

}

@Override

public void destroy() throws Exception {

}

}
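
checkResult waits on result.get() with no upper bound, so a send that is never acknowledged keeps the caller blocked. Because the future returned by KafkaTemplate.send is a java.util.concurrent.Future, the timed get(timeout, unit) overload is available. Below is a minimal sketch of a bounded wait under stated assumptions: BoundedSendWaitSketch and confirmed are hypothetical names, and a Future<Long> carrying the offset stands in for the real send result so the example runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedSendWaitSketch {

    /**
     * Waits at most timeoutMs for the offset of a sent record.
     * Returns true only if the future completed normally with a non-negative offset.
     */
    static boolean confirmed(Future<Long> offsetFuture, long timeoutMs) {
        try {
            return offsetFuture.get(timeoutMs, TimeUnit.MILLISECONDS) >= 0;
        } catch (TimeoutException e) {
            return false; // no acknowledgement within the deadline
        } catch (ExecutionException e) {
            return false; // the send itself failed; e.getCause() holds the reason
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        // Already acknowledged at offset 115.
        System.out.println(confirmed(CompletableFuture.completedFuture(115L), 100)); // prints true

        // Never completes: gives up after roughly 100 ms.
        System.out.println(confirmed(new CompletableFuture<Long>(), 100)); // prints false

        // Completed with an error.
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(confirmed(failed, 100)); // prints false
    }
}
```

Blocking on every send also gives up the batching that linger.ms and batch.size are meant to provide, so a callback on the future may be the better fit when throughput matters.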

I will not attach screenshots here; the test log is included in the Javadoc of the complete implementation class.
