搭建服务端

启动类

在 Spring Boot 中使用 Netty 替换 Tomcat：关闭内嵌的 Tomcat 与启动 banner，并添加组件扫描注解。

 
import org.springframework.boot.Banner;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
 
/**
 * Entry point of the Netty-based server application.
 *
 * <p>The Spring context is started without a web environment and with the startup
 * banner switched off. The component scan additionally includes types annotated
 * with {@link EventProcess}.
 */
@SpringBootApplication
@ComponentScan(includeFilters = @ComponentScan.Filter(EventProcess.class))
public class NettyServerApplication {

	/**
	 * Builds and runs the Spring application.
	 *
	 * @param args command-line arguments forwarded to Spring Boot
	 */
	public static void main(String[] args) {
		new SpringApplicationBuilder(NettyServerApplication.class)
				.web(WebApplicationType.NONE)
				.bannerMode(Banner.Mode.OFF)
				.run(args);
	}
}

在控制类中写的注解

 
 
import java.lang.annotation.*;
 
/**
 * Marks a type as an event-processing handler.
 *
 * <p>The annotation is retained at runtime so that annotated types can be looked up
 * reflectively.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface EventProcess {

    /**
     * Name under which the annotated handler is registered.
     *
     * @return the handler name; the empty string when not specified
     */
    String name() default "";
}

配置文件

# TCP port the Netty server binds to (read by NettyServer via @Value).
netty.server.port=8080
# Host / interface address the Netty server binds to (read by NettyServer via @Value).
netty.server.host=127.0.0.1
# Name of the Spring application.
spring.application.name=netty-server

netty服务搭建

 
 
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
 
/**
 * Starts the Netty TCP server once the Spring application has started.
 *
 * <p>The bind address is taken from the {@code netty.server.host} and
 * {@code netty.server.port} properties. {@link #onApplicationEvent} blocks the calling
 * thread until the server channel is closed, then shuts both event loop groups down.
 */
@Configuration
public class NettyServer implements ApplicationListener<ApplicationStartedEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
    @Value("${netty.server.port}")
    private int port;
    @Value("${netty.server.host}")
    private String host;
    @Autowired
    private ServerInializer serverInializer;

    /**
     * Configures, binds and runs the server, returning only after the server channel closes
     * or the thread is interrupted.
     *
     * @param applicationStartedEvent the Spring event that triggers the server start (unused)
     */
    @Override
    public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
            // NOTE(review): SO_REUSEADDR is commonly configured on the listening socket via
            // option(...); confirm that setting it on accepted child channels is intended.
            bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, false);
            bootstrap.childOption(ChannelOption.SO_RCVBUF, 2048);
            bootstrap.childOption(ChannelOption.SO_SNDBUF, 2048);
            bootstrap.handler(new LoggingHandler(LogLevel.INFO));
            bootstrap.childHandler(serverInializer);
            ChannelFuture channelFuture = bootstrap.bind(host, port).sync();
            String logBanner = "\n\n" +
                    "* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n" +
                    "                   Netty Server started on port {}.                                  \n" +
                    "* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\n";
            LOGGER.info(logBanner, port);
            // Block here until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            LOGGER.error("Netty Server was interrupted while binding or running", e);
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
        } finally {
            LOGGER.info("Netty Server Start Shutdown ............");
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

添加服务器端初始化器

 
 
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
 
/**
 * Server-side channel initializer.
 *
 * <p>For every accepted channel it installs, in order, an {@link ObjectEncoder}, an
 * {@link ObjectDecoder} and the injected {@link ServerHandler}.
 *
 * <p>NOTE(review): {@link ObjectDecoder} relies on Java-native object deserialization of
 * inbound bytes; confirm that only trusted peers can connect to this server.
 *
 * @author hanhuide
 * @date 2021/04/14
 */
@Component
public class ServerInializer extends ChannelInitializer<SocketChannel> {
    @Autowired
    private ServerHandler serverHandler;

    /**
     * Populates the pipeline of a newly accepted channel.
     *
     * @param ch the channel being initialized
     * @throws Exception declared by the overridden method
     */
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ObjectEncoder encoder = new ObjectEncoder();
        ObjectDecoder decoder = new ObjectDecoder(ClassResolvers.cacheDisabled(null));
        ch.pipeline()
                .addLast(encoder)   // outbound: object -> bytes
                .addLast(decoder)   // inbound: bytes -> object
                .addLast(serverHandler);
    }
}

添加服务端处理程序

 
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
 
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
/**
 * Server-side inbound handler for {@link Event} messages.
 *
 * <p>On startup it collects every bean annotated with {@link EventProcess} into a map keyed
 * by the annotation's {@code name}. Each received event is handed to a cached thread pool,
 * processed by the handler registered as {@code "hello"}, and the resulting events are written
 * to a channel picked by {@link ChannelMap#randomChannel()}.
 *
 * @author hanhuide
 * @date 2021/04/14
 */
@ChannelHandler.Sharable
@Component
public class ServerHandler extends SimpleChannelInboundHandler<Event> implements ApplicationContextAware {
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerHandler.class);
    private final ConcurrentHashMap<String, IFunctionHandler> functionHandlerMap = new ConcurrentHashMap<>();
    private final ExecutorService executor = Executors.newCachedThreadPool(runnable -> {
        Thread thread = Executors.defaultThreadFactory().newThread(runnable);
        thread.setName("NettyServerHandler-" + thread.getName());
        return thread;
    });

    /**
     * Hands the received event over to the worker pool so the event loop is not blocked.
     *
     * @param ctx   context of the channel the event arrived on
     * @param event the decoded event
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Event event) throws Exception {
        executor.execute(() -> handlingEvents(ctx, event));
    }

    /**
     * Processes one event on a worker thread and writes the results to a randomly chosen channel.
     *
     * <p>Missing handler / missing channel and processing failures are logged instead of
     * killing the worker thread with an uncaught exception.
     *
     * @param ctx   context of the channel the event arrived on (currently unused)
     * @param event the event to process
     */
    private void handlingEvents(ChannelHandlerContext ctx, Event event) {
        try {
            LOGGER.info("{}", event);
            IFunctionHandler handler = functionHandlerMap.get("hello");
            if (handler == null) {
                LOGGER.warn("No IFunctionHandler registered under name 'hello'; event dropped: {}", event);
                return;
            }
            // randomChannel() returns null when no channel is registered.
            SocketChannel ch = ChannelMap.randomChannel();
            if (ch == null) {
                LOGGER.warn("No active channel available; event dropped: {}", event);
                return;
            }
            for (Event result : handler.execute(event, null)) {
                ch.writeAndFlush(result);
            }
        } catch (RuntimeException e) {
            LOGGER.error("Failed to process event {}", event, e);
        } finally {
            ReferenceCountUtil.release(event);
        }
    }

    /**
     * Channel closed: removes the channel from {@link ChannelMap}.
     *
     * @param ctx ctx
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // The channel is no longer usable, so drop it from the registry.
        ChannelMap.remove((SocketChannel) ctx.channel());
        LOGGER.info("{}:客户端下线", ctx.channel().remoteAddress());
    }

    /**
     * Channel active: registers the channel in {@link ChannelMap}.
     *
     * @param ctx ctx
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ChannelMap.add((SocketChannel) ctx.channel());
        LOGGER.info("{}:客户端上线", ctx.channel().remoteAddress());
    }

    /**
     * Logs the failure and closes the affected channel.
     *
     * @param ctx   context of the channel on which the exception occurred
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        LOGGER.error("Exception caught on channel {}, closing it", ctx.channel().remoteAddress(), cause);
        ctx.close();
    }

    /**
     * Registers every bean annotated with {@link EventProcess} under the annotation's name.
     *
     * <p>The annotation is resolved through the application context rather than
     * {@code getClass()}, so it is also found when the bean instance is a proxy subclass.
     * A duplicated name is treated as fatal and terminates the JVM with a non-zero status.
     *
     * @param applicationContext the context this bean runs in
     * @throws BeansException if a bean lookup fails
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, Object> handlers = applicationContext.getBeansWithAnnotation(EventProcess.class);
        for (Map.Entry<String, Object> entry : handlers.entrySet()) {
            Object handler = entry.getValue();
            EventProcess annotation = applicationContext.findAnnotationOnBean(entry.getKey(), EventProcess.class);
            if (annotation == null) {
                throw new IllegalStateException("@EventProcess not resolvable on bean '" + entry.getKey() + "'");
            }
            String name = annotation.name();
            if (functionHandlerMap.containsKey(name)) {
                LOGGER.error("IFunctionHandler has duplicated :{}", name, new IllegalPathDuplicatedException());
                // Non-zero status: this is an abnormal termination.
                System.exit(1);
            }
            functionHandlerMap.put(name, (IFunctionHandler) handler);
        }
    }
}

netty中传递参数的基类为Event

 
import lombok.Data;
 
import java.io.Serializable;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * Represents a simulation event.
 *
 * <p>NOTE(review): the class is {@link Serializable}; field names and modifiers are left
 * untouched (including the capitalized {@code Type}) because changing them would alter the
 * serialized form and the generated accessors.
 *
 * @author: Yu Wang
 * @date: 2021-03-29 14:46
 **/
@Data
public class Event implements Serializable {
    String name;
    String Type;
    String shareResource;
    Date occurTime;
    Date processTime;

    /**
     * Creates an event of the given type; all other fields start out as {@code null}.
     *
     * @param type the event type
     */
    public Event(String type) {
        this.Type = type;
    }
}

在服务端使用到的工具类

 
 
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.socket.SocketChannel;
 
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * Static registry of client channels, keyed by the long text form of the channel id.
 */
public class ChannelMap {
    private static final Map<String, SocketChannel> channelMap = new ConcurrentHashMap<String, SocketChannel>();

    /**
     * Registers a channel.
     *
     * @param socketChannel the channel to register
     * @return the key under which the channel was stored
     */
    public static String add(SocketChannel socketChannel) {
        String idStr = socketChannel.id().asLongText();
        channelMap.put(idStr, socketChannel);
        return idStr;
    }

    /**
     * Looks a channel up by its key.
     *
     * @param clientId key as returned by {@link #add(SocketChannel)}
     * @return the registered channel, or {@code null} if none is stored under that key
     */
    public static Channel get(String clientId) {
        return channelMap.get(clientId);
    }

    /**
     * Removes a channel from the registry.
     *
     * @param socketChannel the channel to remove
     */
    public static void remove(SocketChannel socketChannel) {
        channelMap.remove(socketChannel.id().asLongText());
    }

    /**
     * Picks one registered channel at random.
     *
     * <p>The choice is made from a snapshot of the registry, so a concurrent removal between
     * reading the size and iterating can no longer yield {@code null} while channels are
     * still registered.
     *
     * @return a randomly chosen channel, or {@code null} when the registry is empty
     */
    public static SocketChannel randomChannel() {
        SocketChannel[] snapshot = channelMap.values().toArray(new SocketChannel[0]);
        if (snapshot.length == 0) {
            return null;
        }
        int index = java.util.concurrent.ThreadLocalRandom.current().nextInt(snapshot.length);
        return snapshot[index];
    }

    /**
     * @return the number of registered channels
     */
    public static int getSize() {
        return channelMap.size();
    }

    /**
     * @return the key set view of the underlying map
     */
    public static Set<String> getKeys() {
        return channelMap.keySet();
    }

    /**
     * Ad-hoc demo: prints a random integer that is either 0 or 1.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int ran2 = (int) (Math.random() * (2));
        System.out.println(ran2);
    }
}

测试类接口

 
import io.netty.util.Attribute;
 
import javax.xml.ws.Response;
import java.util.List;
 
/**
 * Contract for event handlers that turn one incoming {@link Event} into a list of events.
 */
public interface IFunctionHandler {
    /**
     * Processes an event.
     *
     * @param event              the event to process
     * @param resourcesAttribute attribute holding simulation resources; may be {@code null}
     *                           (the server handler shown in this article passes {@code null})
     * @return the events produced by processing {@code event}
     */
    List<Event> execute(Event event, Attribute<SimResources> resourcesAttribute);
}
 
import io.netty.util.Attribute;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
 
/**
 * Sample handler registered under the name {@code "hello"}.
 *
 * <p>Events of type {@code "main"} are renamed and given a fixed process time; every event,
 * whatever its type, is returned as the single element of the result list.
 */
@EventProcess(name = "hello")
public class HelloController implements IFunctionHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(HelloController.class);

    /**
     * Processes one event.
     *
     * @param event     the event to process; modified in place when its type is {@code "main"}
     * @param attribute simulation resources attribute (unused here)
     * @return a new mutable list containing {@code event}
     */
    @Override
    public List<Event> execute(Event event, Attribute<SimResources> attribute) {
        if ("main".equals(event.getType())) {
            Calendar processAt = Calendar.getInstance();
            // Calendar months are zero-based: Calendar.MAY == 4, the value the original literal used.
            // NOTE(review): if April was intended, this should be Calendar.APRIL — confirm.
            processAt.set(2021, Calendar.MAY, 25, 6, 0, 0);
            event.setName("起床");
            event.setProcessTime(processAt.getTime());
        }
        List<Event> result = new ArrayList<>();
        result.add(event);
        return result;
    }
}
Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐