通过Nginx代理Kafka实现跨网络消费
通过Nginx代理Kafka实现跨网络消费1. 服务器环境2. 服务搭建2.1 服务器B服务搭建2.2 服务器A服务搭建3. 测试连通性1. 服务器环境两台服务器同在一个网段(都开打开防火墙模拟跨网)分别为服务器A、服务器B服务器A(172.**.**.219):安装Nginx(1.8.0以上的Nginx才支持stream模块)服务,对所有网段打开9092端口服务器B(172.**.**.220)
·
通过Nginx代理Kafka实现跨网络消费
1. 服务器环境
两台服务器同在一个网段(都开
打开防火墙模拟跨网)分别为服务器A、服务器B
服务器A(172.**.**.219):安装Nginx(1.8.0以上的Nginx才支持stream模块)服务,对所有网段打开9092端口服务器B(172.**.**.220):安装Kafka服务,对服务器A打开9092端口
2. 服务搭建
2.1 服务器B服务搭建
- 修改主机名
vim /etc/hostnamekafka - 修改主机名映射
vim /etc/hosts172.**.**.219 nginx 172.**.**.220 kafka - kafka安装
注:此配置只能使用主机名访问 如需IP访问 将listeners=PLAINTEXT://hostname:9092
改为
advertised.listeners=PLAINTEXT://x.x.x.x:9092
2.2 服务器A服务搭建
- 修改主机名
vim /etc/hostnamenginx - 修改主机名映射
vim /etc/hosts172.**.**.219 nginx 172.**.**.220 kafka - 安装Nginx依赖包
yum install gcc gcc-c++ pcre-devel zlib-devel openssl-devel -y - 将Nginx安装包上传至服务器并安装Nginx
tar zxvf nginx-1.14.0.tar.gz cd nginx-1.14.0 ./configure --prefix=/usr/local/nginx --with-stream make && make install - 配置nginx.conf
stream { server { listen 9092; proxy_pass kafka; } upstream kafka { server kafka:9092 weight=1; } }
stream模块与https模块同级
- 启动nginx
/usr/local/nginx/sbin/nginx
3. 测试连通性
在本地Windows编写生产者消费者代码
- 生产者
package com.msk.demo01;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.text.DecimalFormat;
import java.util.Properties;
public class KafkaProducerDemo {
public static void main(String[] args) {
//1.配置生产者连接属性
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nginx:9092");
// 或者
//props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"172.**.**.219:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//2.创建Kafka生产者
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
//3.构建ProducerRecord
for (int i=0;i<10;i++){
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test_topic", i + "", "value" + i);
//4.发送消息
producer.send(record);
}
//5.清空缓冲区
producer.flush();
//6.关闭生产者
producer.close();
}
}
- 消费者
package com.msk.demo01;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerDemo {
public static void main(String[] args) {
//1.配置生产者了连接属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nginx:9092");
// 或者
// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"172.**.**.219:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");
//2.创建Kafka消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//3.订阅topics
consumer.subscribe(Arrays.asList("test_topic"));
//4.死循环读取消息
while(true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
if(records!=null && !records.isEmpty()){
for (ConsumerRecord<String, String> record : records) {
int partition = record.partition();
long offset = record.offset();
long timestamp = record.timestamp();
String key = record.key();
String value = record.value();
System.out.println(partition+"\t"+offset+"\t"+timestamp+"\t"+key+"\t"+value);
}
}
}
}
}
最终测试通过,客户端可以通过Nginx正常的向Kafka服务器生产消费数据
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)