1. Server environment

Two servers on the same subnet (both with their firewalls enabled to simulate a cross-network setup), referred to as server A and server B:

Server A (172.**.**.219): runs Nginx (the stream module requires Nginx 1.9.0 or later); port 9092 is open to all networks
Server B (172.**.**.220): runs Kafka; port 9092 is open to server A only
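
The port openings above can be sketched with firewalld rich rules (an assumption — adjust to whatever firewall the hosts actually run; the elided IP stays elided here):

```shell
# Server A: open 9092 to all networks
firewall-cmd --permanent --add-port=9092/tcp
firewall-cmd --reload

# Server B: open 9092 to server A (172.**.**.219) only
firewall-cmd --permanent --add-rich-rule='rule family="ipv4" source address="172.**.**.219" port port="9092" protocol="tcp" accept'
firewall-cmd --reload
```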

2. Setting up the services

2.1 Setting up server B

  1. Set the hostname
    vim /etc/hostname
    kafka
    
  2. Add the hostname mappings
    vim /etc/hosts
    172.**.**.219 nginx
    172.**.**.220 kafka
    
  3. Install Kafka
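
The install step itself is not spelled out in the original; a minimal sketch, assuming a Kafka 2.x binary tarball that still ships with ZooKeeper (the version number is an assumption):

```shell
tar zxvf kafka_2.12-2.8.0.tgz
cd kafka_2.12-2.8.0
# set listeners=PLAINTEXT://kafka:9092 in config/server.properties first
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
```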

Note: with this configuration the broker can only be reached by hostname. For access by IP, change
listeners=PLAINTEXT://hostname:9092
to
advertised.listeners=PLAINTEXT://x.x.x.x:9092
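
Both settings the note refers to live in config/server.properties; a sketch (the broker IP is elided in the original and stays elided here):

```properties
# reachable by hostname only — clients must resolve "kafka"
listeners=PLAINTEXT://kafka:9092

# or: advertise a fixed IP so clients can connect without a hosts entry
#advertised.listeners=PLAINTEXT://x.x.x.x:9092
```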

2.2 Setting up server A

  1. Set the hostname
    vim /etc/hostname
    nginx
    
  2. Add the hostname mappings
    vim /etc/hosts
    172.**.**.219 nginx
    172.**.**.220 kafka
    
  3. Install the Nginx build dependencies
    yum install gcc gcc-c++ pcre-devel zlib-devel openssl-devel -y
    
  4. Upload the Nginx source package to the server and build it
    tar zxvf nginx-1.14.0.tar.gz
    
    cd nginx-1.14.0
    
    ./configure --prefix=/usr/local/nginx --with-stream
    
    make && make install
    
  5. Configure nginx.conf
    stream {
        server {
            listen 9092;
            proxy_pass kafka;
        }
    
        upstream kafka {
            server kafka:9092 weight=1;
        }
    }
    

The stream block sits at the same level as the http block, at the top level of nginx.conf — not inside it.
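
For placement, a minimal nginx.conf skeleton (a sketch; only the stream block itself is taken from the steps above):

```nginx
worker_processes 1;

events {
    worker_connections 1024;
}

http {
    # ordinary HTTP server blocks, if any
}

# stream {} is a sibling of http {}
stream {
    upstream kafka {
        server kafka:9092 weight=1;
    }

    server {
        listen 9092;
        proxy_pass kafka;
    }
}
```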

  6. Start nginx
    /usr/local/nginx/sbin/nginx
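
After starting, it is worth confirming that the binary was actually built with the stream module and that the proxy is listening (a sketch; `ss` may be `netstat -lntp` on older systems):

```shell
/usr/local/nginx/sbin/nginx -V 2>&1 | grep -o with-stream   # should print "with-stream"
/usr/local/nginx/sbin/nginx -t                              # config syntax check
ss -lntp | grep 9092                                        # nginx should be listening on 9092
```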
    

3. Testing connectivity

Write producer and consumer test code on a local Windows machine. Since the code connects via the hostname nginx, the Windows hosts file (C:\Windows\System32\drivers\etc\hosts) also needs the entry 172.**.**.219 nginx, unless the IP form of bootstrap.servers is used.

  • Producer
package com.msk.demo01;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.DecimalFormat;
import java.util.Properties;

public class KafkaProducerDemo {
    public static void main(String[] args) {
        //1. Configure the producer connection properties
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"nginx:9092");
        // or
        //props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"172.**.**.219:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2. Create the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        //3. Build ProducerRecords
        for (int i=0;i<10;i++){
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("test_topic", i + "", "value" + i);
            //4. Send the message
            producer.send(record);
        }
        //5. Flush the buffer
        producer.flush();
        //6. Close the producer
        producer.close();
    }
}
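
The producer above sends fire-and-forget, so a proxy misconfiguration only surfaces as a silent timeout. A hedged variant of the send loop (a drop-in replacement for steps 3–4 above, using the same classes; nothing is assumed beyond the original code) attaches a callback so delivery errors become visible:

```java
        //3.-4. Build and send records, logging the result of each send
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("test_topic", i + "", "value" + i);
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    // e.g. a TimeoutException when the proxy or broker is unreachable
                    exception.printStackTrace();
                } else {
                    System.out.println("sent to partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                }
            });
        }
```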
  • Consumer
package com.msk.demo01;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerDemo {
    public static void main(String[] args) {
        //1. Configure the consumer connection properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nginx:9092");
        // or
        // props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"172.**.**.219:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"group1");


        //2. Create the Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        //3. Subscribe to topics
        consumer.subscribe(Arrays.asList("test_topic"));
        //4. Poll for messages in an endless loop
        while(true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            if(records!=null && !records.isEmpty()){
                for (ConsumerRecord<String, String> record : records) {
                    int partition = record.partition();
                    long offset = record.offset();
                    long timestamp = record.timestamp();
                    String key = record.key();
                    String value = record.value();
                    System.out.println(partition+"\t"+offset+"\t"+timestamp+"\t"+key+"\t"+value);
                }
            }
        }
    }
}

The final test passed: through Nginx, the client can produce messages to and consume messages from the Kafka server normally.
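
The same connectivity can also be checked without writing any code, using the console tools shipped with Kafka, from any machine that resolves the nginx hostname (a sketch, assuming Kafka 2.2+ where kafka-topics.sh accepts --bootstrap-server):

```shell
# create the topic through the proxy
bin/kafka-topics.sh --bootstrap-server nginx:9092 --create --topic test_topic --partitions 1 --replication-factor 1
# produce a few lines interactively
bin/kafka-console-producer.sh --bootstrap-server nginx:9092 --topic test_topic
# consume them back
bin/kafka-console-consumer.sh --bootstrap-server nginx:9092 --topic test_topic --from-beginning
```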
