redis 使用Lettuce 当redis挂掉重启之后 网络是怎么重新连接
监控连接的生命周期,当连接丢失时触发重连。管理重连过程,通过延迟和重连尝试次数来合理安排重连。确保连接恢复,并在重连失败时通过事件总线通知其他组件。因此,只是连接建立时的一个简单事件,而是负责监控连接丢失后自动重连的核心组件。##看门狗源码/**//***//***/} else {});
Lettuce是一个高性能的Java Redis客户端,支持同步、异步和反应式编程模式
Lettuce的核心功能包括:
- 高性能:通过使用Netty作为底层网络通信框架,实现了非阻塞IO,提高了性能。
- 丰富的API:提供了丰富的Redis命令API,支持多种Redis数据类型和操作。
- 高级特性:支持命令批处理、事务、发布订阅等功能,并且可以适应不同的Redis数据类型和应用场景。
- 灵活性:支持多种Redis序列化器和编解码器,方便在不同场景下使用。
Lettuce的这些特性使得它成为了一个受欢迎的Redis客户端,广泛应用于各种需要高性能Redis交互的场景中。
Lettuce使用了Connection Watchdog(连接看门狗),用于管理和监控与远程服务器的连接。在网络通信中,Channel 代表了与远程服务的连接,当连接丢失或关闭时,Connection Watchdog 会自动尝试重新连接。它并不直接依赖于 channelActive() 来实现自动重连,而是负责在连接丢失时主动检测并安排重新连接的任务。
主要作用
-
监控连接状态:
ConnectionWatchdog继承自ChannelInboundHandlerAdapter,实现了 Netty 的ChannelHandler接口。它在channelActive()和channelInactive()事件中插入了额外的逻辑。- 当连接激活时(
channelActive()),它会初始化连接,清除之前的重连状态。 - 当连接关闭时(
channelInactive()),它会检测连接是否已经关闭,并尝试重新连接。
-
自动重连:
- 自动调度重连:如果连接丢失,
ConnectionWatchdog会在适当的时间间隔后安排一个新的重连尝试。 - 重连机制:它会基于预定义的重连延迟、尝试次数和其他条件,调度新的重连任务。重连操作是异步执行的,使用
reconnectWorkers线程池来处理。 - 延迟处理:重连延迟使用
Delay和StatefulDelay管理,以确保每次重连尝试之间有适当的间隔,防止过于频繁的重连尝试。
- 自动调度重连:如果连接丢失,
-
连接恢复:
- 如果
channelInactive()事件触发(即连接丢失),ConnectionWatchdog会在重连条件满足时重新启动连接。它通过reconnectionHandler.reconnect()来尝试重新建立连接。 - 重连失败事件:如果重连尝试失败,它会触发
ReconnectFailedEvent事件,将失败信息发布到事件总线eventBus,供其他组件处理。
- 如果
-
支持可配置的重连逻辑:
ConnectionWatchdog提供了多种配置项来控制重连行为,如:- 重连延迟:使用
Delay来管理每次重连之间的延迟。 - 重连调度:在连接丢失时,自动触发重连,且支持延迟、间隔等参数。
- 重连暂停:通过
setReconnectSuspended(true)方法可以暂停重连尝试,避免在某些情况下自动重连。
- 重连延迟:使用
关键方法
-
channelActive(ChannelHandlerContext ctx):- 这个方法在
Channel激活时调用。当连接建立成功时,会被触发。它会初始化一些内部状态,并清除之前的重连调度。
- 这个方法在
-
channelInactive(ChannelHandlerContext ctx):- 当
Channel关闭时调用。如果连接丢失,ConnectionWatchdog会根据当前配置来判断是否需要进行重连。 - 如果启用了重连监听(
listenOnChannelInactive为true),并且重连没有被暂停,它会调用scheduleReconnect()来触发重连。
- 当
-
scheduleReconnect():- 用于调度下一次的重连尝试。它会检查当前连接是否有效,如果没有有效的连接(即连接丢失),则会安排在适当的延迟后尝试重新连接。
- 这个方法会使用
reconnectDelay来计算每次重连之间的延迟时间。 - 重连尝试会通过
reconnectionHandler.reconnect()来实际执行重连逻辑。
-
run(int attempt):- 这是执行实际重连的代码。如果
scheduleReconnect()被调用,run()会尝试重新建立连接。 - 如果重连成功,它会停止重连操作;如果失败,则发布
ReconnectFailedEvent事件,并根据情况决定是否继续重连。
- 这是执行实际重连的代码。如果
为什么 channelActive() 不会自动重连?
在 Netty 中,channelActive() 只是一个通道激活事件。当一个连接成功建立时,channelActive() 会被触发,通常表示连接已经准备好进行数据传输。然而,channelActive() 事件本身并不处理连接丢失后的自动重连。
ConnectionWatchdog的作用就是在连接丢失或关闭时自动安排重连任务。- 自动重连的原因:因为一旦连接丢失,
channelInactive()会被触发。ConnectionWatchdog会在channelInactive()中判断是否启用重连逻辑,然后调度一个新的重连任务,确保在连接失败后能够尝试重新连接。 channelActive()只关心通道的初始化,不能保证在通道关闭或掉线后自动恢复连接。ConnectionWatchdog负责在通道断开后,通过一定的重连策略来确保连接恢复。
总结
ConnectionWatchdog 的作用是:
- 监控连接的生命周期,当连接丢失时触发重连。
- 管理重连过程,通过延迟和重连尝试次数来合理安排重连。
- 确保连接恢复,并在重连失败时通过事件总线通知其他组件。
因此,channelActive() 只是连接建立时的一个简单事件,而 ConnectionWatchdog 是负责监控连接丢失后自动重连的核心组件。
##看门狗源码
/*
* Copyright 2011-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.lettuce.core.protocol;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionEvents;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.connection.ReconnectFailedEvent;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.resource.Delay;
import io.lettuce.core.resource.Delay.StatefulDelay;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
/**
* A netty {@link ChannelHandler} responsible for monitoring the channel and reconnecting when the connection is lost.
*
* @author Will Glozer
* @author Mark Paluch
* @author Koji Lin
*/
@ChannelHandler.Sharable
public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {
private static final long LOGGING_QUIET_TIME_MS = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ConnectionWatchdog.class);
private final Delay reconnectDelay;
private final Bootstrap bootstrap;
private final EventExecutorGroup reconnectWorkers;
private final ReconnectionHandler reconnectionHandler;
private final ReconnectionListener reconnectionListener;
private final Timer timer;
private final EventBus eventBus;
private Channel channel;
private SocketAddress remoteAddress;
private long lastReconnectionLogging = -1;
private String logPrefix;
private final AtomicBoolean reconnectSchedulerSync;
private volatile int attempts;
private volatile boolean armed;
private volatile boolean listenOnChannelInactive;
private volatile Timeout reconnectScheduleTimeout;
/**
* Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup} and establishes a new
* {@link Channel} when disconnected, while reconnect is true. The socketAddressSupplier can supply the reconnect address.
*
* @param reconnectDelay reconnect delay, must not be {@literal null}
* @param clientOptions client options for the current connection, must not be {@literal null}
* @param bootstrap Configuration for new channels, must not be {@literal null}
* @param timer Timer used for delayed reconnect, must not be {@literal null}
* @param reconnectWorkers executor group for reconnect tasks, must not be {@literal null}
* @param socketAddressSupplier the socket address supplier to obtain an address for reconnection, may be {@literal null}
* @param reconnectionListener the reconnection listener, must not be {@literal null}
* @param connectionFacade the connection facade, must not be {@literal null}
* @param eventBus Event bus to emit reconnect events.
*/
public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Bootstrap bootstrap, Timer timer,
EventExecutorGroup reconnectWorkers, Mono<SocketAddress> socketAddressSupplier,
ReconnectionListener reconnectionListener, ConnectionFacade connectionFacade, EventBus eventBus) {
LettuceAssert.notNull(reconnectDelay, "Delay must not be null");
LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");
LettuceAssert.notNull(bootstrap, "Bootstrap must not be null");
LettuceAssert.notNull(timer, "Timer must not be null");
LettuceAssert.notNull(reconnectWorkers, "ReconnectWorkers must not be null");
LettuceAssert.notNull(socketAddressSupplier, "SocketAddressSupplier must not be null");
LettuceAssert.notNull(reconnectionListener, "ReconnectionListener must not be null");
LettuceAssert.notNull(connectionFacade, "ConnectionFacade must not be null");
LettuceAssert.notNull(eventBus, "EventBus must not be null");
this.reconnectDelay = reconnectDelay;
this.bootstrap = bootstrap;
this.timer = timer;
this.reconnectWorkers = reconnectWorkers;
this.reconnectionListener = reconnectionListener;
this.reconnectSchedulerSync = new AtomicBoolean(false);
this.eventBus = eventBus;
Mono<SocketAddress> wrappedSocketAddressSupplier = socketAddressSupplier.doOnNext(addr -> remoteAddress = addr)
.onErrorResume(t -> {
if (logger.isDebugEnabled()) {
logger.warn("Cannot retrieve current address from socketAddressSupplier: " + t.toString()
+ ", reusing cached address " + remoteAddress, t);
} else {
logger.warn("Cannot retrieve current address from socketAddressSupplier: " + t.toString()
+ ", reusing cached address " + remoteAddress);
}
return Mono.just(remoteAddress);
});
this.reconnectionHandler = new ReconnectionHandler(clientOptions, bootstrap, wrappedSocketAddressSupplier, timer,
reconnectWorkers, connectionFacade);
resetReconnectDelay();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
logger.debug("{} userEventTriggered(ctx, {})", logPrefix(), evt);
if (evt instanceof ConnectionEvents.Activated) {
attempts = 0;
resetReconnectDelay();
}
super.userEventTriggered(ctx, evt);
}
void prepareClose() {
setListenOnChannelInactive(false);
setReconnectSuspended(true);
Timeout reconnectScheduleTimeout = this.reconnectScheduleTimeout;
if (reconnectScheduleTimeout != null && !reconnectScheduleTimeout.isCancelled()) {
reconnectScheduleTimeout.cancel();
}
reconnectionHandler.prepareClose();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
reconnectSchedulerSync.set(false);
channel = ctx.channel();
reconnectScheduleTimeout = null;
logPrefix = null;
remoteAddress = channel.remoteAddress();
logPrefix = null;
logger.debug("{} channelActive()", logPrefix());
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.debug("{} channelInactive()", logPrefix());
if (!armed) {
logger.debug("{} ConnectionWatchdog not armed", logPrefix());
return;
}
channel = null;
if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {
scheduleReconnect();
} else {
logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);
}
super.channelInactive(ctx);
}
/**
* Enable {@link ConnectionWatchdog} to listen for disconnected events.
*/
void arm() {
this.armed = true;
setListenOnChannelInactive(true);
}
/**
* Schedule reconnect if channel is not available/not active.
*/
public void scheduleReconnect() {
logger.debug("{} scheduleReconnect()", logPrefix());
if (!isEventLoopGroupActive()) {
logger.debug("isEventLoopGroupActive() == false");
return;
}
if (!isListenOnChannelInactive()) {
logger.debug("Skip reconnect scheduling, listener disabled");
return;
}
if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {
attempts++;
final int attempt = attempts;
int timeout = (int) reconnectDelay.createDelay(attempt).toMillis();
logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);
this.reconnectScheduleTimeout = timer.newTimeout(it -> {
reconnectScheduleTimeout = null;
if (!isEventLoopGroupActive()) {
logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");
return;
}
reconnectWorkers.submit(() -> {
ConnectionWatchdog.this.run(attempt);
return null;
});
}, timeout, TimeUnit.MILLISECONDS);
// Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.
if (!reconnectSchedulerSync.get()) {
reconnectScheduleTimeout = null;
}
} else {
logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());
}
}
/**
* Reconnect to the remote address that the closed channel was connected to. This creates a new {@link ChannelPipeline} with
* the same handler instances contained in the old channel's pipeline.
*
* @param attempt attempt counter
*
* @throws Exception when reconnection fails.
*/
public void run(int attempt) throws Exception {
reconnectSchedulerSync.set(false);
reconnectScheduleTimeout = null;
if (!isEventLoopGroupActive()) {
logger.debug("isEventLoopGroupActive() == false");
return;
}
if (!isListenOnChannelInactive()) {
logger.debug("Skip reconnect scheduling, listener disabled");
return;
}
if (isReconnectSuspended()) {
logger.debug("Skip reconnect scheduling, reconnect is suspended");
return;
}
boolean shouldLog = shouldLog();
InternalLogLevel infoLevel = InternalLogLevel.INFO;
InternalLogLevel warnLevel = InternalLogLevel.WARN;
if (shouldLog) {
lastReconnectionLogging = System.currentTimeMillis();
} else {
warnLevel = InternalLogLevel.DEBUG;
infoLevel = InternalLogLevel.DEBUG;
}
InternalLogLevel warnLevelToUse = warnLevel;
try {
reconnectionListener.onReconnectAttempt(new ConnectionEvents.Reconnect(attempt));
logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress);
Tuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> tuple = reconnectionHandler.reconnect();
CompletableFuture<Channel> future = tuple.getT1();
future.whenComplete((c, t) -> {
if (c != null && t == null) {
return;
}
CompletableFuture<SocketAddress> remoteAddressFuture = tuple.getT2();
SocketAddress remote = remoteAddress;
if (remoteAddressFuture.isDone() && !remoteAddressFuture.isCompletedExceptionally()
&& !remoteAddressFuture.isCancelled()) {
remote = remoteAddressFuture.join();
}
String message = String.format("Cannot reconnect to [%s]: %s", remote,
t.getMessage() != null ? t.getMessage() : t.toString());
if (ReconnectionHandler.isExecutionException(t)) {
if (logger.isDebugEnabled()) {
logger.debug(message, t);
} else {
logger.log(warnLevelToUse, message);
}
} else {
logger.log(warnLevelToUse, message, t);
}
eventBus.publish(new ReconnectFailedEvent(LocalAddress.ANY, remote, t, attempt));
if (!isReconnectSuspended()) {
scheduleReconnect();
}
});
} catch (Exception e) {
logger.log(warnLevel, "Cannot reconnect: {}", e.toString());
eventBus.publish(new ReconnectFailedEvent(LocalAddress.ANY, remoteAddress, e, attempt));
}
}
private boolean isEventLoopGroupActive() {
if (!isEventLoopGroupActive(bootstrap.group()) || !isEventLoopGroupActive(reconnectWorkers)) {
return false;
}
return true;
}
private static boolean isEventLoopGroupActive(EventExecutorGroup executorService) {
return !(executorService.isShuttingDown());
}
private boolean shouldLog() {
long quietUntil = lastReconnectionLogging + LOGGING_QUIET_TIME_MS;
return quietUntil <= System.currentTimeMillis();
}
/**
* Enable event listener for disconnected events.
*
* @param listenOnChannelInactive {@literal true} to listen for disconnected events.
*/
public void setListenOnChannelInactive(boolean listenOnChannelInactive) {
this.listenOnChannelInactive = listenOnChannelInactive;
}
public boolean isListenOnChannelInactive() {
return listenOnChannelInactive;
}
/**
* Suspend reconnection temporarily. Reconnect suspension will interrupt reconnection attempts.
*
* @param reconnectSuspended {@literal true} to suspend reconnection
*/
public void setReconnectSuspended(boolean reconnectSuspended) {
reconnectionHandler.setReconnectSuspended(reconnectSuspended);
}
public boolean isReconnectSuspended() {
return reconnectionHandler.isReconnectSuspended();
}
ReconnectionHandler getReconnectionHandler() {
return reconnectionHandler;
}
private void resetReconnectDelay() {
if (reconnectDelay instanceof StatefulDelay) {
((StatefulDelay) reconnectDelay).reset();
}
}
private String logPrefix() {
if (logPrefix != null) {
return logPrefix;
}
String buffer = "[" + ChannelLogDescriptor.logDescriptor(channel) + ", last known addr=" + remoteAddress + ']';
return logPrefix = buffer;
}
}
##redis lettuce重新连接代码
-
重连处理 (
reconnect和reconnect0):reconnect()方法会尝试重新连接 Redis 服务器。当远程地址发生变化时,它会尝试通过socketAddressSupplier获取新的地址并发起连接。reconnect0()方法执行实际的重连逻辑。它通过 Netty 的bootstrap.connect(remoteAddress)发起连接,并通过ChannelFuture来管理连接的异步状态。- 如果连接失败,会通过
ChannelFutureListener监听连接结果,执行相关的失败处理(如关闭通道、记录异常等)。
-
连接逻辑(Netty 的
bootstrap.connect()):- 您的代码中也展示了与 Netty 的连接过程密切相关的部分,特别是
ChannelFuture和ChannelPromise的使用,这些都是 Netty 中用于异步连接、处理连接结果的关键工具。 bootstrap.connect(remoteAddress)是用来发起连接的核心方法。它返回一个ChannelFuture,通过这个ChannelFuture可以监听连接成功与否的结果。
- 您的代码中也展示了与 Netty 的连接过程密切相关的部分,特别是
-
连接超时处理:
- 在重连的过程中,代码实现了一个超时机制(
TimeoutException)。如果重连操作超时,它会取消连接操作,并触发异常。 - 通过
eventLoop().schedule()来设定连接超时。
- 在重连的过程中,代码实现了一个超时机制(
-
Channel的初始化与配置:- 在连接成功之后,会通过
RedisChannelInitializer初始化通道的处理流水线(ChannelPipeline)。如果初始化失败,会进行相应的失败处理,包括重置连接、关闭连接等。 - 如果连接成功,则会执行一些调试输出和状态更新。
- 在连接成功之后,会通过
-
错误处理与异常捕获:
- 对于连接过程中的各种异常(如 DNS 解析失败、连接失败等),代码进行了详细的异常捕获和处理。在
reconnect0中,如果连接失败或初始化失败,都会通过completeExceptionally()完成CompletableFuture,确保连接错误能够被外部捕获。
- 对于连接过程中的各种异常(如 DNS 解析失败、连接失败等),代码进行了详细的异常捕获和处理。在
-
使用
CompletableFuture:- 重连操作通过
CompletableFuture来管理异步结果。CompletableFuture<Channel>用来表示连接是否成功,CompletableFuture<SocketAddress>用来表示地址解析的结果。
- 重连操作通过
关键部分的 Netty 连接代码:
-
连接过程中的异步操作:
bootstrap.connect(remoteAddress)返回一个ChannelFuture,表示异步连接操作,addListener()用来监听连接的结果。
-
异常处理与重试机制:
- 如果连接失败,代码会尝试关闭连接并报告异常。如果连接成功,则会初始化相关的
ChannelPipeline,并进行后续的操作。
- 如果连接失败,代码会尝试关闭连接并报告异常。如果连接成功,则会初始化相关的
-
超时处理:
TimeoutException用来在连接超时后进行错误处理。
##网络连接源码如下
Tuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> tuple = reconnectionHandler.reconnect();
protected Tuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> reconnect() {
CompletableFuture<Channel> future = new CompletableFuture<>();
CompletableFuture<SocketAddress> address = new CompletableFuture<>();
socketAddressSupplier.subscribe(remoteAddress -> {
address.complete(remoteAddress);
if (future.isCancelled()) {
return;
}
reconnect0(future, remoteAddress);
}, ex -> {
if (!address.isDone()) {
address.completeExceptionally(ex);
}
future.completeExceptionally(ex);
});
this.currentFuture = future;
return Tuples.of(future, address);
}
##reconnect0重连
private void reconnect0(CompletableFuture<Channel> result, SocketAddress remoteAddress) {
ChannelFuture connectFuture = bootstrap.connect(remoteAddress);
ChannelPromise initFuture = connectFuture.channel().newPromise();
logger.debug("Reconnecting to Redis at {}", remoteAddress);
result.whenComplete((c, t) -> {
if (t instanceof CancellationException) {
connectFuture.cancel(true);
initFuture.cancel(true);
}
});
initFuture.addListener((ChannelFuture it) -> {
if (it.cause() != null) {
connectFuture.cancel(true);
close(it.channel());
result.completeExceptionally(it.cause());
} else {
result.complete(connectFuture.channel());
}
});
connectFuture.addListener((ChannelFuture it) -> {
if (it.cause() != null) {
initFuture.tryFailure(it.cause());
return;
}
ChannelPipeline pipeline = it.channel().pipeline();
RedisChannelInitializer channelInitializer = pipeline.get(RedisChannelInitializer.class);
if (channelInitializer == null) {
initFuture.tryFailure(new IllegalStateException(
"Reconnection attempt without a RedisChannelInitializer in the channel pipeline"));
return;
}
channelInitializer.channelInitialized().whenComplete(
(state, throwable) -> {
if (throwable != null) {
if (isExecutionException(throwable)) {
initFuture.tryFailure(throwable);
return;
}
if (clientOptions.isCancelCommandsOnReconnectFailure()) {
connectionFacade.reset();
}
if (clientOptions.isSuspendReconnectOnProtocolFailure()) {
logger.error("Disabling autoReconnect due to initialization failure", throwable);
setReconnectSuspended(true);
}
initFuture.tryFailure(throwable);
return;
}
if (logger.isDebugEnabled()) {
logger.info("Reconnected to {}, Channel {}", remoteAddress,
ChannelLogDescriptor.logDescriptor(it.channel()));
} else {
logger.info("Reconnected to {}", remoteAddress);
}
initFuture.trySuccess();
});
});
Runnable timeoutAction = () -> {
initFuture.tryFailure(new TimeoutException(String.format("Reconnection attempt exceeded timeout of %d %s ",
timeout, timeoutUnit)));
};
Timeout timeoutHandle = timer.newTimeout(it -> {
if (connectFuture.isDone() && initFuture.isDone()) {
return;
}
if (reconnectWorkers.isShutdown()) {
timeoutAction.run();
return;
}
reconnectWorkers.submit(timeoutAction);
}, this.timeout, timeoutUnit);
initFuture.addListener(it -> timeoutHandle.cancel());
}
##netty Bootstrap网络连接
public ChannelFuture connect(SocketAddress remoteAddress) {
ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
validate();
return doResolveAndConnect(remoteAddress, config.localAddress());
}
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
// failure.
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
##
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
try {
final EventLoop eventLoop = channel.eventLoop();
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
// Resolver has no idea about what to do with the specified remote address or it's resolved already.
doConnect(remoteAddress, localAddress, promise);
return promise;
}
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
if (resolveFuture.isDone()) {
final Throwable resolveFailureCause = resolveFuture.cause();
if (resolveFailureCause != null) {
// Failed to resolve immediately
channel.close();
promise.setFailure(resolveFailureCause);
} else {
// Succeeded to resolve immediately; cached? (or did a blocking lookup)
doConnect(resolveFuture.getNow(), localAddress, promise);
}
return promise;
}
// Wait until the name resolution is finished.
resolveFuture.addListener(new FutureListener<SocketAddress>() {
@Override
public void operationComplete(Future<SocketAddress> future) throws Exception {
if (future.cause() != null) {
channel.close();
promise.setFailure(future.cause());
} else {
doConnect(future.getNow(), localAddress, promise);
}
}
});
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
##
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
##
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
##
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
##
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
##
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) {
unsafe.connect(remoteAddress, localAddress, promise);
}
##
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
##socket连接工具
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
##socketChannel连接远程地址 socketChannel.connect(remoteAddress)
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws IOException {
return socketChannel.connect(remoteAddress);
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)