`
Donald_Draper
  • 浏览: 984396 次
社区版块
存档分类
最新评论

netty 默认通道配置后续

阅读更多
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
netty 抽象nio字节通道:http://donald-draper.iteye.com/blog/2393323
netty 抽象nio消息通道:http://donald-draper.iteye.com/blog/2393364
netty NioServerSocketChannel解析:http://donald-draper.iteye.com/blog/2393443
netty 通道配置接口定义:http://donald-draper.iteye.com/blog/2393484
netty 默认通道配置初始化:http://donald-draper.iteye.com/blog/2393504
引言
前一篇文章我们看了默认通道配置初始化,先来回顾一下:
默认通道配置内部关联一个通道,一个消息大小估算器,默认为DefaultMessageSizeEstimator,,尝试写自旋次数默认为6,写操作失败,默认自动关闭通道,连接超时默认为30000ms,同时拥有一个字节buf 分配器和一个接收字节buf 分配器。通道配置构造,主要是初始化配置关联通道和接收字节buf分配器。如果系统属性io.netty.allocator.type,配置为unpooled,则默认的字节buf分配器为UnpooledByteBufAllocator,否则为PooledByteBufAllocator,对于Android平台,默认为UnpooledByteBufAllocator。默认接收字节buf分配器为AdaptiveRecvByteBufAllocator。接收字节buf分配器,主要是控制下一次接收字节buf的容量,如果当前读取字节数大于消息上一次读取的字节buf容量,则减少下一次接收buf的容量,否则增加下一次接收buf的容量。

今天我们来看默认通道的其他方法:

现将默认通道配置的定义贴出来,以便理解:

/**
 * The default {@link ChannelConfig} implementation.
 */
public class DefaultChannelConfig implements ChannelConfig {
    //消息大小估计器
    private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT;

    private static final int DEFAULT_CONNECT_TIMEOUT = 30000; //默认连接超时时间
    //通道是否自动读取属性
    private static final AtomicIntegerFieldUpdater<DefaultChannelConfig> AUTOREAD_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(DefaultChannelConfig.class, "autoRead");
   //通道写buf掩码
    private static final AtomicReferenceFieldUpdater<DefaultChannelConfig, WriteBufferWaterMark> WATERMARK_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                    DefaultChannelConfig.class, WriteBufferWaterMark.class, "writeBufferWaterMark");

    protected final Channel channel;//关联通道
   //字节buf分配器
    private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
    //接收字节buf非配器
    private volatile RecvByteBufAllocator rcvBufAllocator;
    //消息大小估计器
    private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
    //连接超时时间
    private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
    private volatile int writeSpinCount = 16;//尝试写自旋次数
    @SuppressWarnings("FieldMayBeFinal")
    private volatile int autoRead = 1;//是否自动读取,
    private volatile boolean autoClose = true;//写操作失败,是否自动关闭通道
    //写buf掩码
    private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
    private volatile boolean pinEventExecutor = true;//是否每个事件分组一个单线程的事件执行器
    
    //构造默认通道配置
    public DefaultChannelConfig(Channel channel) {
        this(channel, new AdaptiveRecvByteBufAllocator());
    }

    protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
        setRecvByteBufAllocator(allocator, channel.metadata());
        this.channel = channel;
    }
}

来看一下写buf的掩码属性的设置:
@Override
public int getWriteBufferHighWaterMark() {
    return writeBufferWaterMark.high();
}

@Override
public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
    if (writeBufferHighWaterMark < 0) {
        throw new IllegalArgumentException(
                "writeBufferHighWaterMark must be >= 0");
    }
    for (;;) {
        WriteBufferWaterMark waterMark = writeBufferWaterMark;
        if (writeBufferHighWaterMark < waterMark.low()) {
            throw new IllegalArgumentException(
                    "writeBufferHighWaterMark cannot be less than " +
                            "writeBufferLowWaterMark (" + waterMark.low() + "): " +
                            writeBufferHighWaterMark);
        }
	//更新写buf高位掩码
        if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
                new WriteBufferWaterMark(waterMark.low(), writeBufferHighWaterMark, false))) {
            return this;
        }
    }
}

@Override
public int getWriteBufferLowWaterMark() {
    return writeBufferWaterMark.low();
}

@Override
public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
    if (writeBufferLowWaterMark < 0) {
        throw new IllegalArgumentException(
                "writeBufferLowWaterMark must be >= 0");
    }
    for (;;) {
        WriteBufferWaterMark waterMark = writeBufferWaterMark;
        if (writeBufferLowWaterMark > waterMark.high()) {
            throw new IllegalArgumentException(
                    "writeBufferLowWaterMark cannot be greater than " +
                            "writeBufferHighWaterMark (" + waterMark.high() + "): " +
                            writeBufferLowWaterMark);
        }
	//更新写buf低位掩码
        if (WATERMARK_UPDATER.compareAndSet(this, waterMark,
                new WriteBufferWaterMark(writeBufferLowWaterMark, waterMark.high(), false))) {
            return this;
        }
    }
}

@Override
public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
    this.writeBufferWaterMark = checkNotNull(writeBufferWaterMark, "writeBufferWaterMark");
    return this;
}

@Override
public WriteBufferWaterMark getWriteBufferWaterMark() {
    return writeBufferWaterMark;
}

在看其他属性的相关方法,都是set与get方法没有什么好说的,简单过一下:
 @Override
 @SuppressWarnings("deprecation")
 public Map<ChannelOption<?>, Object> getOptions() {
     return getOptions(
             null,
             CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT,
             ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK,
             WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR,
             SINGLE_EVENTEXECUTOR_PER_GROUP);
 }

 protected Map<ChannelOption<?>, Object> getOptions(
         Map<ChannelOption<?>, Object> result, ChannelOption<?>... options) {
     if (result == null) {
         result = new IdentityHashMap<ChannelOption<?>, Object>();
     }
     for (ChannelOption<?> o: options) {
         result.put(o, getOption(o));
     }
     return result;
 }

 @SuppressWarnings("unchecked")
 @Override
 public boolean setOptions(Map<ChannelOption<?>, ?> options) {
     if (options == null) {
         throw new NullPointerException("options");
     }

     boolean setAllOptions = true;
     for (Entry<ChannelOption<?>, ?> e: options.entrySet()) {
         if (!setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
             setAllOptions = false;
         }
     }

     return setAllOptions;
 }

 @Override
 @SuppressWarnings({ "unchecked", "deprecation" })
 public <T> T getOption(ChannelOption<T> option) {
     if (option == null) {
         throw new NullPointerException("option");
     }

     if (option == CONNECT_TIMEOUT_MILLIS) {
         return (T) Integer.valueOf(getConnectTimeoutMillis());
     }
     if (option == MAX_MESSAGES_PER_READ) {
         return (T) Integer.valueOf(getMaxMessagesPerRead());
     }
     if (option == WRITE_SPIN_COUNT) {
         return (T) Integer.valueOf(getWriteSpinCount());
     }
     if (option == ALLOCATOR) {
         return (T) getAllocator();
     }
     if (option == RCVBUF_ALLOCATOR) {
         return (T) getRecvByteBufAllocator();
     }
     if (option == AUTO_READ) {
         return (T) Boolean.valueOf(isAutoRead());
     }
     if (option == AUTO_CLOSE) {
         return (T) Boolean.valueOf(isAutoClose());
     }
     if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
         return (T) Integer.valueOf(getWriteBufferHighWaterMark());
     }
     if (option == WRITE_BUFFER_LOW_WATER_MARK) {
         return (T) Integer.valueOf(getWriteBufferLowWaterMark());
     }
     if (option == WRITE_BUFFER_WATER_MARK) {
         return (T) getWriteBufferWaterMark();
     }
     if (option == MESSAGE_SIZE_ESTIMATOR) {
         return (T) getMessageSizeEstimator();
     }
     if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
         return (T) Boolean.valueOf(getPinEventExecutorPerGroup());
     }
     return null;
 }

 @Override
 @SuppressWarnings("deprecation")
 public <T> boolean setOption(ChannelOption<T> option, T value) {
     validate(option, value);

     if (option == CONNECT_TIMEOUT_MILLIS) {
         setConnectTimeoutMillis((Integer) value);
     } else if (option == MAX_MESSAGES_PER_READ) {
         setMaxMessagesPerRead((Integer) value);
     } else if (option == WRITE_SPIN_COUNT) {
         setWriteSpinCount((Integer) value);
     } else if (option == ALLOCATOR) {
         setAllocator((ByteBufAllocator) value);
     } else if (option == RCVBUF_ALLOCATOR) {
         setRecvByteBufAllocator((RecvByteBufAllocator) value);
     } else if (option == AUTO_READ) {
         setAutoRead((Boolean) value);
     } else if (option == AUTO_CLOSE) {
         setAutoClose((Boolean) value);
     } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
         setWriteBufferHighWaterMark((Integer) value);
     } else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
         setWriteBufferLowWaterMark((Integer) value);
     } else if (option == WRITE_BUFFER_WATER_MARK) {
         setWriteBufferWaterMark((WriteBufferWaterMark) value);
     } else if (option == MESSAGE_SIZE_ESTIMATOR) {
         setMessageSizeEstimator((MessageSizeEstimator) value);
     } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
         setPinEventExecutorPerGroup((Boolean) value);
     } else {
         return false;
     }

     return true;
 }

 protected <T> void validate(ChannelOption<T> option, T value) {
     if (option == null) {
         throw new NullPointerException("option");
     }
     option.validate(value);
 }

 @Override
 public int getConnectTimeoutMillis() {
     return connectTimeoutMillis;
 }

 @Override
 public ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
     if (connectTimeoutMillis < 0) {
         throw new IllegalArgumentException(String.format(
                 "connectTimeoutMillis: %d (expected: >= 0)", connectTimeoutMillis));
     }
     this.connectTimeoutMillis = connectTimeoutMillis;
     return this;
 }

 /**
  * {@inheritDoc}
  * <p>
  * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type
  * {@link MaxMessagesRecvByteBufAllocator}.
  */
 @Override
 @Deprecated
 public int getMaxMessagesPerRead() {
     try {
         MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator();
         return allocator.maxMessagesPerRead();
     } catch (ClassCastException e) {
         throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
                 "MaxMessagesRecvByteBufAllocator", e);
     }
 }

 /**
  * {@inheritDoc}
  * <p>
  * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type
  * {@link MaxMessagesRecvByteBufAllocator}.
  */
 @Override
 @Deprecated
 public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
     try {
         MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator();
         allocator.maxMessagesPerRead(maxMessagesPerRead);
         return this;
     } catch (ClassCastException e) {
         throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " +
                 "MaxMessagesRecvByteBufAllocator", e);
     }
 }

 @Override
 public int getWriteSpinCount() {
     return writeSpinCount;
 }

 @Override
 public ChannelConfig setWriteSpinCount(int writeSpinCount) {
     if (writeSpinCount <= 0) {
         throw new IllegalArgumentException(
                 "writeSpinCount must be a positive integer.");
     }
     this.writeSpinCount = writeSpinCount;
     return this;
 }

 @Override
 public ByteBufAllocator getAllocator() {
     return allocator;
 }

 @Override
 public ChannelConfig setAllocator(ByteBufAllocator allocator) {
     if (allocator == null) {
         throw new NullPointerException("allocator");
     }
     this.allocator = allocator;
     return this;
 }

 @SuppressWarnings("unchecked")
 @Override
 public <T extends RecvByteBufAllocator> T getRecvByteBufAllocator() {
     return (T) rcvBufAllocator;
 }

 @Override
 public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
     rcvBufAllocator = checkNotNull(allocator, "allocator");
     return this;
 }

 /**
  * Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers.
  * @param allocator the allocator to set.
  * @param metadata Used to set the {@link ChannelMetadata#defaultMaxMessagesPerRead()} if {@code allocator}
  * is of type {@link MaxMessagesRecvByteBufAllocator}.
  */
 private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
     if (allocator instanceof MaxMessagesRecvByteBufAllocator) {
         ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
     } else if (allocator == null) {
         throw new NullPointerException("allocator");
     }
     setRecvByteBufAllocator(allocator);
 }
@Override
public MessageSizeEstimator getMessageSizeEstimator() {
    return msgSizeEstimator;
}

@Override
public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
    if (estimator == null) {
        throw new NullPointerException("estimator");
    }
    msgSizeEstimator = estimator;
    return this;
}

private ChannelConfig setPinEventExecutorPerGroup(boolean pinEventExecutor) {
    this.pinEventExecutor = pinEventExecutor;
    return this;
}

private boolean getPinEventExecutorPerGroup() {
    return pinEventExecutor;
}
 @Override
 public boolean isAutoRead() {
     return autoRead == 1;
 }

 @Override
 public ChannelConfig setAutoRead(boolean autoRead) {
     boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
     if (autoRead && !oldAutoRead) {
         //如果自动读取,则触发通道读操作
         channel.read();
     } else if (!autoRead && oldAutoRead) {
         //关闭通道读取,则清除通道自动读配置
         autoReadCleared();
     }
     return this;
 }
 /**
  * Is called once {@link #setAutoRead(boolean)} is called with {@code false} and {@link #isAutoRead()} was
  * {@code true} before.
  待子类扩展
  */
 protected void autoReadCleared() { }

 @Override
 public boolean isAutoClose() {
     return autoClose;
 }

 @Override
 public ChannelConfig setAutoClose(boolean autoClose) {
     this.autoClose = autoClose;
     return this;
 }

从上面来看,默认通道配置内部主要是配置消息大小估算器,字节buf分配器,接收字节buf分配器等属性。

再来看默认通道的两个分支ServerSocket和Socket通道配置的默认实现:
先来看ServerSocket通道配置,

package io.netty.channel.socket;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.util.NetUtil;

import java.net.ServerSocket;
import java.net.SocketException;
import java.util.Map;

import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_RCVBUF;
import static io.netty.channel.ChannelOption.SO_REUSEADDR;

/**
 * The default {@link ServerSocketChannelConfig} implementation.
 */
public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
                                              implements ServerSocketChannelConfig {

    protected final ServerSocket javaSocket;//关联ServerSocket
    private volatile int backlog = NetUtil.SOMAXCONN;//接收的最大连接数

    /**
     * Creates a new instance.
     */
    public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) {
        super(channel);
        if (javaSocket == null) {
            throw new NullPointerException("javaSocket");
        }
        this.javaSocket = javaSocket;
    }

    @Override
    public Map<ChannelOption<?>, Object> getOptions() {
        return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <T> T getOption(ChannelOption<T> option) {
        if (option == SO_RCVBUF) {
            return (T) Integer.valueOf(getReceiveBufferSize());
        }
        if (option == SO_REUSEADDR) {
            return (T) Boolean.valueOf(isReuseAddress());
        }
        if (option == SO_BACKLOG) {
            return (T) Integer.valueOf(getBacklog());
        }

        return super.getOption(option);
    }

    @Override
    public <T> boolean setOption(ChannelOption<T> option, T value) {
        validate(option, value);

        if (option == SO_RCVBUF) {
            setReceiveBufferSize((Integer) value);
        } else if (option == SO_REUSEADDR) {
            setReuseAddress((Boolean) value);
        } else if (option == SO_BACKLOG) {
            setBacklog((Integer) value);
        } else {
            return super.setOption(option, value);
        }

        return true;
    }

    @Override
    public boolean isReuseAddress() {
        try {
            return javaSocket.getReuseAddress();
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
    }

    @Override
    public ServerSocketChannelConfig setReuseAddress(boolean reuseAddress) {
        try {
            javaSocket.setReuseAddress(reuseAddress);
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }

    @Override
    public int getReceiveBufferSize() {
        try {
            return javaSocket.getReceiveBufferSize();
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
    }

    @Override
    public ServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
        try {
            javaSocket.setReceiveBufferSize(receiveBufferSize);
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }

    @Override
    public ServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
        javaSocket.setPerformancePreferences(connectionTime, latency, bandwidth);
        return this;
    }

    @Override
    public int getBacklog() {
        return backlog;
    }

    @Override
    public ServerSocketChannelConfig setBacklog(int backlog) {
        if (backlog < 0) {
            throw new IllegalArgumentException("backlog: " + backlog);
        }
        this.backlog = backlog;
        return this;
    }

    @Override
    public ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
        super.setConnectTimeoutMillis(connectTimeoutMillis);
        return this;
    }

    @Override
    @Deprecated
    public ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
        super.setMaxMessagesPerRead(maxMessagesPerRead);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
        super.setWriteSpinCount(writeSpinCount);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
        super.setAllocator(allocator);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
        super.setRecvByteBufAllocator(allocator);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setAutoRead(boolean autoRead) {
        super.setAutoRead(autoRead);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
        super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
        super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
        super.setWriteBufferWaterMark(writeBufferWaterMark);
        return this;
    }

    @Override
    public ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
        super.setMessageSizeEstimator(estimator);
        return this;
    }
}

我们来看默认的最大连接数backlog:
public final class NetUtil {
	...
	/**
	 * The SOMAXCONN value of the current machine.  If failed to get the value,  {@code 200}  is used as a
	 * default value for Windows or {@code 128} for others.
	 */
	public static final int SOMAXCONN;
	static {
		// As a SecurityManager may prevent reading the somaxconn file we wrap this in a privileged block.
		//
		// See https://github.com/netty/netty/issues/3680
		SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction<Integer>() {
		    @Override
		    public Integer run() {
			// Determine the default somaxconn (server socket backlog) value of the platform.
			// The known defaults:
			// - Windows NT Server 4.0+: 200
			// - Linux and Mac OS X: 128
			int somaxconn = PlatformDependent.isWindows() ? 200 : 128;
			File file = new File("/proc/sys/net/core/somaxconn");
			BufferedReader in = null;
			try {
			    // file.exists() may throw a SecurityException if a SecurityManager is used, so execute it in the
			    // try / catch block.
			    // See https://github.com/netty/netty/issues/4936
			    if (file.exists()) {
				in = new BufferedReader(new FileReader(file));
				somaxconn = Integer.parseInt(in.readLine());
				if (logger.isDebugEnabled()) {
				    logger.debug("{}: {}", file, somaxconn);
				}
			    } else {
				// Try to get from sysctl
				Integer tmp = null;
				if (SystemPropertyUtil.getBoolean("io.netty.net.somaxconn.trySysctl", false)) {
				    tmp = sysctlGetInt("kern.ipc.somaxconn");
				    if (tmp == null) {
					tmp = sysctlGetInt("kern.ipc.soacceptqueue");
					if (tmp != null) {
					    somaxconn = tmp;
					}
				    } else {
					somaxconn = tmp;
				    }
				}
			   ...
			     
		});
	    }
}


从上面来看与ServerSocket相关的配置委托给ServerSocket的相关方法,其他委托给父类默认通道配置。

再来看Socket通道配置:
package io.netty.channel.socket;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.util.internal.PlatformDependent;

import java.net.Socket;
import java.net.SocketException;
import java.util.Map;

import static io.netty.channel.ChannelOption.*;

/**
 * The default {@link SocketChannelConfig} implementation.
 */
public class DefaultSocketChannelConfig extends DefaultChannelConfig
                                        implements SocketChannelConfig {

    protected final Socket javaSocket;//关联socket
    private volatile boolean allowHalfClosure;//写失败时,是否关闭通道

    /**
     * Creates a new instance.
     */
    public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
        super(channel);
        if (javaSocket == null) {
            throw new NullPointerException("javaSocket");
        }
        this.javaSocket = javaSocket;

        // Enable TCP_NODELAY by default if possible.
        if (PlatformDependent.canEnableTcpNoDelayByDefault()) {
            try {
                setTcpNoDelay(true);
            } catch (Exception e) {
                // Ignore.
            }
        }
    }

    @Override
    public Map<ChannelOption<?>, Object> getOptions() {
        return getOptions(
                super.getOptions(),
                SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS,
                ALLOW_HALF_CLOSURE);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <T> T getOption(ChannelOption<T> option) {
        if (option == SO_RCVBUF) {
            return (T) Integer.valueOf(getReceiveBufferSize());
        }
        if (option == SO_SNDBUF) {
            return (T) Integer.valueOf(getSendBufferSize());
        }
        if (option == TCP_NODELAY) {
            return (T) Boolean.valueOf(isTcpNoDelay());
        }
        if (option == SO_KEEPALIVE) {
            return (T) Boolean.valueOf(isKeepAlive());
        }
        if (option == SO_REUSEADDR) {
            return (T) Boolean.valueOf(isReuseAddress());
        }
        if (option == SO_LINGER) {
            return (T) Integer.valueOf(getSoLinger());
        }
        if (option == IP_TOS) {
            return (T) Integer.valueOf(getTrafficClass());
        }
        if (option == ALLOW_HALF_CLOSURE) {
            return (T) Boolean.valueOf(isAllowHalfClosure());
        }

        return super.getOption(option);
    }

    @Override
    public <T> boolean setOption(ChannelOption<T> option, T value) {
        validate(option, value);

        if (option == SO_RCVBUF) {
            setReceiveBufferSize((Integer) value);
        } else if (option == SO_SNDBUF) {
            setSendBufferSize((Integer) value);
        } else if (option == TCP_NODELAY) {
            setTcpNoDelay((Boolean) value);
        } else if (option == SO_KEEPALIVE) {
            setKeepAlive((Boolean) value);
        } else if (option == SO_REUSEADDR) {
            setReuseAddress((Boolean) value);
        } else if (option == SO_LINGER) {
            setSoLinger((Integer) value);
        } else if (option == IP_TOS) {
            setTrafficClass((Integer) value);
        } else if (option == ALLOW_HALF_CLOSURE) {
            setAllowHalfClosure((Boolean) value);
        } else {
            return super.setOption(option, value);
        }

        return true;
    }

    @Override
    public int getReceiveBufferSize() {
        try {
            return javaSocket.getReceiveBufferSize();
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
    }

    @Override
    public int getSendBufferSize() {
        try {
            return javaSocket.getSendBufferSize();
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
    }

    @Override
    public int getSoLinger() {
        try {
            return javaSocket.getSoLinger();
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
    }

    @Override
    public int getTrafficClass() {
        try {
            return javaSocket.getTrafficClass();
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
    }

    @Override
    public boolean isKeepAlive() {
        try {
            return javaSocket.getKeepAlive();
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
    }

    @Override
    public boolean isReuseAddress() {
        try {
            return javaSocket.getReuseAddress();
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
    }

    @Override
    public boolean isTcpNoDelay() {
        try {
            return javaSocket.getTcpNoDelay();
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
    }

    @Override
    public SocketChannelConfig setKeepAlive(boolean keepAlive) {
        try {
            javaSocket.setKeepAlive(keepAlive);
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }

    @Override
    public SocketChannelConfig setPerformancePreferences(
            int connectionTime, int latency, int bandwidth) {
        javaSocket.setPerformancePreferences(connectionTime, latency, bandwidth);
        return this;
    }

    @Override
    public SocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
        try {
            javaSocket.setReceiveBufferSize(receiveBufferSize);
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }

    @Override
    public SocketChannelConfig setReuseAddress(boolean reuseAddress) {
        try {
            javaSocket.setReuseAddress(reuseAddress);
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }

    @Override
    public SocketChannelConfig setSendBufferSize(int sendBufferSize) {
        try {
            javaSocket.setSendBufferSize(sendBufferSize);
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }

    @Override
    public SocketChannelConfig setSoLinger(int soLinger) {
        try {
            if (soLinger < 0) {
                javaSocket.setSoLinger(false, 0);
            } else {
                javaSocket.setSoLinger(true, soLinger);
            }
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }

    @Override
    public SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {
        try {
            javaSocket.setTcpNoDelay(tcpNoDelay);
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }

    @Override
    public SocketChannelConfig setTrafficClass(int trafficClass) {
        try {
            javaSocket.setTrafficClass(trafficClass);
        } catch (SocketException e) {
            throw new ChannelException(e);
        }
        return this;
    }

    @Override
    public boolean isAllowHalfClosure() {
        return allowHalfClosure;
    }

    @Override
    public SocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure) {
        this.allowHalfClosure = allowHalfClosure;
        return this;
    }

    @Override
    public SocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
        super.setConnectTimeoutMillis(connectTimeoutMillis);
        return this;
    }

    @Override
    @Deprecated
    public SocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
        super.setMaxMessagesPerRead(maxMessagesPerRead);
        return this;
    }

    @Override
    public SocketChannelConfig setWriteSpinCount(int writeSpinCount) {
        super.setWriteSpinCount(writeSpinCount);
        return this;
    }

    @Override
    public SocketChannelConfig setAllocator(ByteBufAllocator allocator) {
        super.setAllocator(allocator);
        return this;
    }

    @Override
    public SocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
        super.setRecvByteBufAllocator(allocator);
        return this;
    }

    @Override
    public SocketChannelConfig setAutoRead(boolean autoRead) {
         super.setAutoRead(autoRead);
        return this;
    }

    @Override
    public SocketChannelConfig setAutoClose(boolean autoClose) {
        super.setAutoClose(autoClose);
        return this;
    }

    @Override
    public SocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
        super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
        return this;
    }

    @Override
    public SocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
        super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
        return this;
    }

    @Override
    public SocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
        super.setWriteBufferWaterMark(writeBufferWaterMark);
        return this;
    }

    @Override
    public SocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
        super.setMessageSizeEstimator(estimator);
        return this;
    }
}


从上面来看,与Socket相关的配置委托给Socket的相关方法,其他委托给父类默认通道配置。
总结:
默认通道配置内部主要是配置消息大小估算器,字节buf分配器,接收字节buf分配器等属性。默认ServerSocket通道配置,与ServerSocket相关的配置委托给ServerSocket的相关方法,其他委托给父类默认通道配置。默认Socket通道配置,与Socket相关的配置委托给Socket的相关方法,其他委托给父类默认通道配置。
0
1
分享到:
评论

相关推荐

    netty5完整配置实例

    这个“netty5完整配置实例”显然旨在帮助开发者理解和使用Netty 5版本,考虑到Netty的不同版本间的确存在显著差异,这个实例应该包含了Netty 5的关键配置和示例代码。 首先,Netty 5可能已经不被广泛使用,因为最新...

    netty4完整配置及实例

    Netty 的核心组件包括:Bootstrap(启动器)、ServerBootstrap(服务器启动器)、Channel(通道)、Handler(处理器)和EventLoopGroup(事件循环组)。Bootstrap 和 ServerBootstrap 用于创建客户端和服务端连接,...

    netty3的完整配置实例

    在本文中,我们将深入探讨 Netty 3 的配置实例,这对于理解 Netty 的基本工作原理以及如何在实际项目中使用它是至关重要的。 首先,Netty 3 版本相较于更现代的版本,例如 Netty 4,存在一些显著的差异。例如,...

    netty基于http socket websocke及心跳包机制t的demo

    1. 如何配置和启动一个基于Netty的HTTP服务器,处理HTTP请求和响应。 2. 实现WebSocket服务器和客户端,包括握手过程和数据帧的发送与接收。 3. 使用Netty的ChannelHandlerContext发送心跳包,并设置相应的超时检查...

    netty4与spring集成

    5. **线程模型**: Netty 使用 NIO 或 EPOLL 等非阻塞 I/O 模型,而 Spring 默认使用线程池。集成时需注意线程模型的兼容性,确保 Netty 的工作线程和 Spring 的任务调度器协同工作,避免资源冲突。 6. **Spring MVC...

    netty-all-4.1.5.Final完整pom.xml文件配置

    在标题"netty-all-4.1.5.Final完整pom.xml文件配置"中,我们关注的是Netty的特定版本——4.1.5.Final。这个版本包含了Netty的所有模块,提供了完整的功能集。`pom.xml`文件是Maven项目对象模型(Project Object ...

    netty做服务端支持ssl协议实现websocket的wss协议(java)

    4. **配置WebSocketServerProtocolHandler**:Netty提供了`WebSocketServerProtocolHandler`,它是处理WebSocket升级请求的关键。你需要配置这个处理器,指定WebSocket的路径和其他参数。 5. **心跳机制**:...

    spring+netty+mybatis整合实例

    创建一个Spring配置文件,例如`applicationContext.xml`,在这里定义Bean,包括Netty服务器、MyBatis的数据源、SqlSessionFactory等。使用Spring的注解或者XML配置来声明Bean,例如: ```xml &lt;!-- 数据库连接配置...

    springboot整合netty的demo

    Netty Server端通常会包含一个`ServerBootstrap`,用于设置服务器的各种属性,如事件循环组、通道处理器等。Client端则会使用`Bootstrap`来连接到服务器,同样也需要配置相应的处理器链。 在Server端,我们可以使用...

    netty-all-5.0.0.Alpha3 完整pom.xml配置

    这个“netty-all-5.0.0.Alpha3 完整pom.xml配置”是针对Netty 5.0.0 Alpha3版本的集成配置,包含了该版本所需的全部依赖项,便于开发者在基于Maven的项目中直接引用,避免了手动管理多个jar文件的繁琐过程。...

    Netty实战 电子版.pdf_java_netty_服务器_

    Pipeline(管道)则是一系列处理通道事件的处理器链,每个处理器可以对数据进行解码、编码或者执行业务逻辑。 3. **ByteBuf**:Netty提供了自己的ByteBuf类,作为缓冲区,它比Java的ByteBuffer更易用且高效,支持...

    Netty实战.epub_netty实战epub_netty实战epub_netty_

    《Netty实战》这本书是针对Java网络编程框架Netty的一本深入实践教程,旨在帮助读者掌握Netty的核心特性和实际应用。Netty是一款高性能、异步事件驱动的网络应用程序框架,广泛应用于各种分布式系统、微服务架构以及...

    Netty进阶之路-跟着案例学Netty

    此外,书中还会探讨Netty的安全性,包括SSL/TLS加密通信的实现,以及如何配置和管理证书,保障网络通信的安全。 最后,实战部分会介绍如何将Netty应用于实际项目,如构建聊天服务器、游戏服务器,甚至是微服务间的...

    netty所需要得jar包

    - Netty提供了许多性能优化选项,如自定义ByteBuf分配器、心跳机制、通道空闲检测等。 10. **错误处理与异常安全**: - Netty提供了一套完善的异常处理机制,确保即使在出错时也能优雅地关闭资源。 综上所述,...

    Netty数据转发工具

    总的来说,Netty数据转发工具的实现涉及到对Netty框架的深入理解和灵活运用,包括服务器的启动、通道的创建、事件的处理以及异常的管理等多个方面。掌握这些技能,不仅有助于构建高效的数据转发服务,也能为更复杂的...

    netty框架 jar包

    在Netty中,通道(Channel)是连接的抽象,它代表了与远程实体的一个连接,可以进行读写操作。而通道处理器链(ChannelPipeline)则允许用户自定义数据在网络中传输的处理流程,这提供了高度的灵活性和可扩展性。 ...

    整合netty实时通讯

    - 使用 Netty 创建 WebSocket 服务器,首先需要定义一个 ChannelInitializer,配置处理 WebSocket 数据的 ChannelHandler。例如,WebSocketFrameDecoder 和 WebSocketFrameEncoder 用于解码和编码 WebSocket 帧。 ...

    netty+protobuf入门案例

    在这个入门案例中,你可能会看到这些步骤的实现,包括如何组织项目结构、编写 `.proto` 文件、配置 Netty 通道处理器,以及如何实际运行和测试这个简单的通信系统。这将帮助你理解如何在实际项目中利用 Netty 的异步...

    netty-netty-4.1.19.Final.zip_netty_netty学习_rocketmq

    在Netty中,EventLoop(事件循环)负责处理I/O事件,而Channel(通道)则代表网络连接,它们是进行数据传输的基础。ChannelHandler(通道处理器)是业务逻辑的载体,通过ChannelPipeline(通道管道)组织成处理链,...

    netty同步传输demo

    在Netty中,可以通过ChannelFuture的sync()方法将异步操作转换为同步,确保操作完成后再继续执行后续代码。 在这个“netty同步传输demo”中,客户端可能使用了自定义的Handler或ChannelFutureListener来实现同步...

Global site tag (gtag.js) - Google Analytics