- 浏览: 980894 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
netty 通道接口定义:http://donald-draper.iteye.com/blog/2392740
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
netty 抽象nio字节通道:http://donald-draper.iteye.com/blog/2393323
netty 抽象nio消息通道:http://donald-draper.iteye.com/blog/2393364
netty NioServerSocketChannel解析:http://donald-draper.iteye.com/blog/2393443
netty 通道配置接口定义:http://donald-draper.iteye.com/blog/2393484
netty 默认通道配置初始化:http://donald-draper.iteye.com/blog/2393504
引言
前一篇文章我们看了默认通道配置初始化,先来回顾一下:
默认通道配置内部关联一个通道,一个消息大小估算器,默认为DefaultMessageSizeEstimator,,尝试写自旋次数默认为6,写操作失败,默认自动关闭通道,连接超时默认为30000ms,同时拥有一个字节buf 分配器和一个接收字节buf 分配器。通道配置构造,主要是初始化配置关联通道和接收字节buf分配器。如果系统属性io.netty.allocator.type,配置为unpooled,则默认的字节buf分配器为UnpooledByteBufAllocator,否则为PooledByteBufAllocator,对于Android平台,默认为UnpooledByteBufAllocator。默认接收字节buf分配器为AdaptiveRecvByteBufAllocator。接收字节buf分配器,主要是控制下一次接收字节buf的容量,如果当前读取字节数大于消息上一次读取的字节buf容量,则减少下一次接收buf的容量,否则增加下一次接收buf的容量。
今天我们来看默认通道的其他方法:
现将默认通道配置的定义贴出来,以便理解:
来看一下写buf的掩码属性的设置:
在看其他属性的相关方法,都是set与get方法没有什么好说的,简单过一下:
从上面来看,默认通道配置内部主要是配置消息大小估算器,字节buf分配器,接收字节buf分配器等属性。
再来看默认通道的两个分支ServerSocket和Socket通道配置的默认实现:
先来看ServerSocket通道配置,
我们来看默认的最大连接数backlog:
从上面来看与ServerSocket相关的配置委托给ServerSocket的相关方法,其他委托给父类默认通道配置。
再来看Socket通道配置:
从上面来看,与Socket相关的配置委托给Socket的相关方法,其他委托给父类默认通道配置。
总结:
默认通道配置内部主要是配置消息大小估算器,字节buf分配器,接收字节buf分配器等属性。默认ServerSocket通道配置,与ServerSocket相关的配置委托给ServerSocket的相关方法,其他委托给父类默认通道配置。默认Socket通道配置,与Socket相关的配置委托给Socket的相关方法,其他委托给父类默认通道配置。
netty 抽象通道初始化:http://donald-draper.iteye.com/blog/2392801
netty 抽象Unsafe定义:http://donald-draper.iteye.com/blog/2393053
netty 通道Outbound缓冲区:http://donald-draper.iteye.com/blog/2393098
netty 抽象通道后续:http://donald-draper.iteye.com/blog/2393166
netty 抽象nio通道:http://donald-draper.iteye.com/blog/2393269
netty 抽象nio字节通道:http://donald-draper.iteye.com/blog/2393323
netty 抽象nio消息通道:http://donald-draper.iteye.com/blog/2393364
netty NioServerSocketChannel解析:http://donald-draper.iteye.com/blog/2393443
netty 通道配置接口定义:http://donald-draper.iteye.com/blog/2393484
netty 默认通道配置初始化:http://donald-draper.iteye.com/blog/2393504
引言
前一篇文章我们看了默认通道配置初始化,先来回顾一下:
默认通道配置内部关联一个通道,一个消息大小估算器,默认为DefaultMessageSizeEstimator,,尝试写自旋次数默认为6,写操作失败,默认自动关闭通道,连接超时默认为30000ms,同时拥有一个字节buf 分配器和一个接收字节buf 分配器。通道配置构造,主要是初始化配置关联通道和接收字节buf分配器。如果系统属性io.netty.allocator.type,配置为unpooled,则默认的字节buf分配器为UnpooledByteBufAllocator,否则为PooledByteBufAllocator,对于Android平台,默认为UnpooledByteBufAllocator。默认接收字节buf分配器为AdaptiveRecvByteBufAllocator。接收字节buf分配器,主要是控制下一次接收字节buf的容量,如果当前读取字节数大于消息上一次读取的字节buf容量,则减少下一次接收buf的容量,否则增加下一次接收buf的容量。
今天我们来看默认通道的其他方法:
现将默认通道配置的定义贴出来,以便理解:
/** * The default {@link ChannelConfig} implementation. */ public class DefaultChannelConfig implements ChannelConfig { //消息大小估计器 private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT; private static final int DEFAULT_CONNECT_TIMEOUT = 30000; //默认连接超时时间 //通道是否自动读取属性 private static final AtomicIntegerFieldUpdater<DefaultChannelConfig> AUTOREAD_UPDATER = AtomicIntegerFieldUpdater.newUpdater(DefaultChannelConfig.class, "autoRead"); //通道写buf掩码 private static final AtomicReferenceFieldUpdater<DefaultChannelConfig, WriteBufferWaterMark> WATERMARK_UPDATER = AtomicReferenceFieldUpdater.newUpdater( DefaultChannelConfig.class, WriteBufferWaterMark.class, "writeBufferWaterMark"); protected final Channel channel;//关联通道 //字节buf分配器 private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; //接收字节buf非配器 private volatile RecvByteBufAllocator rcvBufAllocator; //消息大小估计器 private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR; //连接超时时间 private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT; private volatile int writeSpinCount = 16;//尝试写自旋次数 @SuppressWarnings("FieldMayBeFinal") private volatile int autoRead = 1;//是否自动读取, private volatile boolean autoClose = true;//写操作失败,是否自动关闭通道 //写buf掩码 private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT; private volatile boolean pinEventExecutor = true;//是否每个事件分组一个单线程的事件执行器 //构造默认通道配置 public DefaultChannelConfig(Channel channel) { this(channel, new AdaptiveRecvByteBufAllocator()); } protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) { setRecvByteBufAllocator(allocator, channel.metadata()); this.channel = channel; } }
来看一下写buf的掩码属性的设置:
@Override public int getWriteBufferHighWaterMark() { return writeBufferWaterMark.high(); } @Override public ChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { if (writeBufferHighWaterMark < 0) { throw new IllegalArgumentException( "writeBufferHighWaterMark must be >= 0"); } for (;;) { WriteBufferWaterMark waterMark = writeBufferWaterMark; if (writeBufferHighWaterMark < waterMark.low()) { throw new IllegalArgumentException( "writeBufferHighWaterMark cannot be less than " + "writeBufferLowWaterMark (" + waterMark.low() + "): " + writeBufferHighWaterMark); } //更新写buf高位掩码 if (WATERMARK_UPDATER.compareAndSet(this, waterMark, new WriteBufferWaterMark(waterMark.low(), writeBufferHighWaterMark, false))) { return this; } } } @Override public int getWriteBufferLowWaterMark() { return writeBufferWaterMark.low(); } @Override public ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { if (writeBufferLowWaterMark < 0) { throw new IllegalArgumentException( "writeBufferLowWaterMark must be >= 0"); } for (;;) { WriteBufferWaterMark waterMark = writeBufferWaterMark; if (writeBufferLowWaterMark > waterMark.high()) { throw new IllegalArgumentException( "writeBufferLowWaterMark cannot be greater than " + "writeBufferHighWaterMark (" + waterMark.high() + "): " + writeBufferLowWaterMark); } //更新写buf低位掩码 if (WATERMARK_UPDATER.compareAndSet(this, waterMark, new WriteBufferWaterMark(writeBufferLowWaterMark, waterMark.high(), false))) { return this; } } } @Override public ChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) { this.writeBufferWaterMark = checkNotNull(writeBufferWaterMark, "writeBufferWaterMark"); return this; } @Override public WriteBufferWaterMark getWriteBufferWaterMark() { return writeBufferWaterMark; }
在看其他属性的相关方法,都是set与get方法没有什么好说的,简单过一下:
@Override @SuppressWarnings("deprecation") public Map<ChannelOption<?>, Object> getOptions() { return getOptions( null, CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ, WRITE_SPIN_COUNT, ALLOCATOR, AUTO_READ, AUTO_CLOSE, RCVBUF_ALLOCATOR, WRITE_BUFFER_HIGH_WATER_MARK, WRITE_BUFFER_LOW_WATER_MARK, WRITE_BUFFER_WATER_MARK, MESSAGE_SIZE_ESTIMATOR, SINGLE_EVENTEXECUTOR_PER_GROUP); } protected Map<ChannelOption<?>, Object> getOptions( Map<ChannelOption<?>, Object> result, ChannelOption<?>... options) { if (result == null) { result = new IdentityHashMap<ChannelOption<?>, Object>(); } for (ChannelOption<?> o: options) { result.put(o, getOption(o)); } return result; } @SuppressWarnings("unchecked") @Override public boolean setOptions(Map<ChannelOption<?>, ?> options) { if (options == null) { throw new NullPointerException("options"); } boolean setAllOptions = true; for (Entry<ChannelOption<?>, ?> e: options.entrySet()) { if (!setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { setAllOptions = false; } } return setAllOptions; } @Override @SuppressWarnings({ "unchecked", "deprecation" }) public <T> T getOption(ChannelOption<T> option) { if (option == null) { throw new NullPointerException("option"); } if (option == CONNECT_TIMEOUT_MILLIS) { return (T) Integer.valueOf(getConnectTimeoutMillis()); } if (option == MAX_MESSAGES_PER_READ) { return (T) Integer.valueOf(getMaxMessagesPerRead()); } if (option == WRITE_SPIN_COUNT) { return (T) Integer.valueOf(getWriteSpinCount()); } if (option == ALLOCATOR) { return (T) getAllocator(); } if (option == RCVBUF_ALLOCATOR) { return (T) getRecvByteBufAllocator(); } if (option == AUTO_READ) { return (T) Boolean.valueOf(isAutoRead()); } if (option == AUTO_CLOSE) { return (T) Boolean.valueOf(isAutoClose()); } if (option == WRITE_BUFFER_HIGH_WATER_MARK) { return (T) Integer.valueOf(getWriteBufferHighWaterMark()); } if (option == WRITE_BUFFER_LOW_WATER_MARK) { return (T) Integer.valueOf(getWriteBufferLowWaterMark()); } if (option == WRITE_BUFFER_WATER_MARK) { return (T) getWriteBufferWaterMark(); } if (option == MESSAGE_SIZE_ESTIMATOR) { return (T) getMessageSizeEstimator(); } if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) { return (T) Boolean.valueOf(getPinEventExecutorPerGroup()); } return null; } @Override @SuppressWarnings("deprecation") public <T> boolean setOption(ChannelOption<T> option, T value) { validate(option, value); if (option == CONNECT_TIMEOUT_MILLIS) { setConnectTimeoutMillis((Integer) value); } else if (option == MAX_MESSAGES_PER_READ) { setMaxMessagesPerRead((Integer) value); } else if (option == WRITE_SPIN_COUNT) { setWriteSpinCount((Integer) value); } else if (option == ALLOCATOR) { setAllocator((ByteBufAllocator) value); } else if (option == RCVBUF_ALLOCATOR) { setRecvByteBufAllocator((RecvByteBufAllocator) value); } else if (option == AUTO_READ) { setAutoRead((Boolean) value); } else if (option == AUTO_CLOSE) { setAutoClose((Boolean) value); } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) { setWriteBufferHighWaterMark((Integer) value); } else if (option == WRITE_BUFFER_LOW_WATER_MARK) { setWriteBufferLowWaterMark((Integer) value); } else if (option == WRITE_BUFFER_WATER_MARK) { setWriteBufferWaterMark((WriteBufferWaterMark) value); } else if (option == MESSAGE_SIZE_ESTIMATOR) { setMessageSizeEstimator((MessageSizeEstimator) value); } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) { setPinEventExecutorPerGroup((Boolean) value); } else { return false; } return true; } protected <T> void validate(ChannelOption<T> option, T value) { if (option == null) { throw new NullPointerException("option"); } option.validate(value); } @Override public int getConnectTimeoutMillis() { return connectTimeoutMillis; } @Override public ChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { if (connectTimeoutMillis < 0) { throw new IllegalArgumentException(String.format( "connectTimeoutMillis: %d (expected: >= 0)", connectTimeoutMillis)); } this.connectTimeoutMillis = connectTimeoutMillis; return this; } /** * {@inheritDoc} * <p> * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type * {@link MaxMessagesRecvByteBufAllocator}. */ @Override @Deprecated public int getMaxMessagesPerRead() { try { MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator(); return allocator.maxMessagesPerRead(); } catch (ClassCastException e) { throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " + "MaxMessagesRecvByteBufAllocator", e); } } /** * {@inheritDoc} * <p> * @throws IllegalStateException if {@link #getRecvByteBufAllocator()} does not return an object of type * {@link MaxMessagesRecvByteBufAllocator}. */ @Override @Deprecated public ChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { try { MaxMessagesRecvByteBufAllocator allocator = getRecvByteBufAllocator(); allocator.maxMessagesPerRead(maxMessagesPerRead); return this; } catch (ClassCastException e) { throw new IllegalStateException("getRecvByteBufAllocator() must return an object of type " + "MaxMessagesRecvByteBufAllocator", e); } } @Override public int getWriteSpinCount() { return writeSpinCount; } @Override public ChannelConfig setWriteSpinCount(int writeSpinCount) { if (writeSpinCount <= 0) { throw new IllegalArgumentException( "writeSpinCount must be a positive integer."); } this.writeSpinCount = writeSpinCount; return this; } @Override public ByteBufAllocator getAllocator() { return allocator; } @Override public ChannelConfig setAllocator(ByteBufAllocator allocator) { if (allocator == null) { throw new NullPointerException("allocator"); } this.allocator = allocator; return this; } @SuppressWarnings("unchecked") @Override public <T extends RecvByteBufAllocator> T getRecvByteBufAllocator() { return (T) rcvBufAllocator; } @Override public ChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { rcvBufAllocator = checkNotNull(allocator, "allocator"); return this; } /** * Set the {@link RecvByteBufAllocator} which is used for the channel to allocate receive buffers. * @param allocator the allocator to set. * @param metadata Used to set the {@link ChannelMetadata#defaultMaxMessagesPerRead()} if {@code allocator} * is of type {@link MaxMessagesRecvByteBufAllocator}. */ private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) { if (allocator instanceof MaxMessagesRecvByteBufAllocator) { ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead()); } else if (allocator == null) { throw new NullPointerException("allocator"); } setRecvByteBufAllocator(allocator); } @Override public MessageSizeEstimator getMessageSizeEstimator() { return msgSizeEstimator; } @Override public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { if (estimator == null) { throw new NullPointerException("estimator"); } msgSizeEstimator = estimator; return this; } private ChannelConfig setPinEventExecutorPerGroup(boolean pinEventExecutor) { this.pinEventExecutor = pinEventExecutor; return this; } private boolean getPinEventExecutorPerGroup() { return pinEventExecutor; } @Override public boolean isAutoRead() { return autoRead == 1; } @Override public ChannelConfig setAutoRead(boolean autoRead) { boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1; if (autoRead && !oldAutoRead) { //如果自动读取,则触发通道读操作 channel.read(); } else if (!autoRead && oldAutoRead) { //关闭通道读取,则清除通道自动读配置 autoReadCleared(); } return this; } /** * Is called once {@link #setAutoRead(boolean)} is called with {@code false} and {@link #isAutoRead()} was * {@code true} before. 待子类扩展 */ protected void autoReadCleared() { } @Override public boolean isAutoClose() { return autoClose; } @Override public ChannelConfig setAutoClose(boolean autoClose) { this.autoClose = autoClose; return this; }
从上面来看,默认通道配置内部主要是配置消息大小估算器,字节buf分配器,接收字节buf分配器等属性。
再来看默认通道的两个分支ServerSocket和Socket通道配置的默认实现:
先来看ServerSocket通道配置,
package io.netty.channel.socket; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelException; import io.netty.channel.ChannelOption; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.WriteBufferWaterMark; import io.netty.util.NetUtil; import java.net.ServerSocket; import java.net.SocketException; import java.util.Map; import static io.netty.channel.ChannelOption.SO_BACKLOG; import static io.netty.channel.ChannelOption.SO_RCVBUF; import static io.netty.channel.ChannelOption.SO_REUSEADDR; /** * The default {@link ServerSocketChannelConfig} implementation. */ public class DefaultServerSocketChannelConfig extends DefaultChannelConfig implements ServerSocketChannelConfig { protected final ServerSocket javaSocket;//关联ServerSocket private volatile int backlog = NetUtil.SOMAXCONN;//接收的最大连接数 /** * Creates a new instance. */ public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) { super(channel); if (javaSocket == null) { throw new NullPointerException("javaSocket"); } this.javaSocket = javaSocket; } @Override public Map<ChannelOption<?>, Object> getOptions() { return getOptions(super.getOptions(), SO_RCVBUF, SO_REUSEADDR, SO_BACKLOG); } @SuppressWarnings("unchecked") @Override public <T> T getOption(ChannelOption<T> option) { if (option == SO_RCVBUF) { return (T) Integer.valueOf(getReceiveBufferSize()); } if (option == SO_REUSEADDR) { return (T) Boolean.valueOf(isReuseAddress()); } if (option == SO_BACKLOG) { return (T) Integer.valueOf(getBacklog()); } return super.getOption(option); } @Override public <T> boolean setOption(ChannelOption<T> option, T value) { validate(option, value); if (option == SO_RCVBUF) { setReceiveBufferSize((Integer) value); } else if (option == SO_REUSEADDR) { setReuseAddress((Boolean) value); } else if (option == SO_BACKLOG) { setBacklog((Integer) value); } else { return super.setOption(option, value); } return true; } @Override public boolean isReuseAddress() { try { return javaSocket.getReuseAddress(); } catch (SocketException e) { throw new ChannelException(e); } } @Override public ServerSocketChannelConfig setReuseAddress(boolean reuseAddress) { try { javaSocket.setReuseAddress(reuseAddress); } catch (SocketException e) { throw new ChannelException(e); } return this; } @Override public int getReceiveBufferSize() { try { return javaSocket.getReceiveBufferSize(); } catch (SocketException e) { throw new ChannelException(e); } } @Override public ServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { try { javaSocket.setReceiveBufferSize(receiveBufferSize); } catch (SocketException e) { throw new ChannelException(e); } return this; } @Override public ServerSocketChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) { javaSocket.setPerformancePreferences(connectionTime, latency, bandwidth); return this; } @Override public int getBacklog() { return backlog; } @Override public ServerSocketChannelConfig setBacklog(int backlog) { if (backlog < 0) { throw new IllegalArgumentException("backlog: " + backlog); } this.backlog = backlog; return this; } @Override public ServerSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { super.setConnectTimeoutMillis(connectTimeoutMillis); return this; } @Override @Deprecated public ServerSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; } @Override public ServerSocketChannelConfig setWriteSpinCount(int writeSpinCount) { super.setWriteSpinCount(writeSpinCount); return this; } @Override public ServerSocketChannelConfig setAllocator(ByteBufAllocator allocator) { super.setAllocator(allocator); return this; } @Override public ServerSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { super.setRecvByteBufAllocator(allocator); return this; } @Override public ServerSocketChannelConfig setAutoRead(boolean autoRead) { super.setAutoRead(autoRead); return this; } @Override public ServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); return this; } @Override public ServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); return this; } @Override public ServerSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) { super.setWriteBufferWaterMark(writeBufferWaterMark); return this; } @Override public ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { super.setMessageSizeEstimator(estimator); return this; } }
我们来看默认的最大连接数backlog:
public final class NetUtil { ... /** * The SOMAXCONN value of the current machine. If failed to get the value, {@code 200} is used as a * default value for Windows or {@code 128} for others. */ public static final int SOMAXCONN; static { // As a SecurityManager may prevent reading the somaxconn file we wrap this in a privileged block. // // See https://github.com/netty/netty/issues/3680 SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction<Integer>() { @Override public Integer run() { // Determine the default somaxconn (server socket backlog) value of the platform. // The known defaults: // - Windows NT Server 4.0+: 200 // - Linux and Mac OS X: 128 int somaxconn = PlatformDependent.isWindows() ? 200 : 128; File file = new File("/proc/sys/net/core/somaxconn"); BufferedReader in = null; try { // file.exists() may throw a SecurityException if a SecurityManager is used, so execute it in the // try / catch block. // See https://github.com/netty/netty/issues/4936 if (file.exists()) { in = new BufferedReader(new FileReader(file)); somaxconn = Integer.parseInt(in.readLine()); if (logger.isDebugEnabled()) { logger.debug("{}: {}", file, somaxconn); } } else { // Try to get from sysctl Integer tmp = null; if (SystemPropertyUtil.getBoolean("io.netty.net.somaxconn.trySysctl", false)) { tmp = sysctlGetInt("kern.ipc.somaxconn"); if (tmp == null) { tmp = sysctlGetInt("kern.ipc.soacceptqueue"); if (tmp != null) { somaxconn = tmp; } } else { somaxconn = tmp; } } ... }); } }
从上面来看与ServerSocket相关的配置委托给ServerSocket的相关方法,其他委托给父类默认通道配置。
再来看Socket通道配置:
package io.netty.channel.socket; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelException; import io.netty.channel.ChannelOption; import io.netty.channel.DefaultChannelConfig; import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.WriteBufferWaterMark; import io.netty.util.internal.PlatformDependent; import java.net.Socket; import java.net.SocketException; import java.util.Map; import static io.netty.channel.ChannelOption.*; /** * The default {@link SocketChannelConfig} implementation. */ public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { protected final Socket javaSocket;//关联socket private volatile boolean allowHalfClosure;//写失败时,是否关闭通道 /** * Creates a new instance. */ public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) { super(channel); if (javaSocket == null) { throw new NullPointerException("javaSocket"); } this.javaSocket = javaSocket; // Enable TCP_NODELAY by default if possible. if (PlatformDependent.canEnableTcpNoDelayByDefault()) { try { setTcpNoDelay(true); } catch (Exception e) { // Ignore. } } } @Override public Map<ChannelOption<?>, Object> getOptions() { return getOptions( super.getOptions(), SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER, IP_TOS, ALLOW_HALF_CLOSURE); } @SuppressWarnings("unchecked") @Override public <T> T getOption(ChannelOption<T> option) { if (option == SO_RCVBUF) { return (T) Integer.valueOf(getReceiveBufferSize()); } if (option == SO_SNDBUF) { return (T) Integer.valueOf(getSendBufferSize()); } if (option == TCP_NODELAY) { return (T) Boolean.valueOf(isTcpNoDelay()); } if (option == SO_KEEPALIVE) { return (T) Boolean.valueOf(isKeepAlive()); } if (option == SO_REUSEADDR) { return (T) Boolean.valueOf(isReuseAddress()); } if (option == SO_LINGER) { return (T) Integer.valueOf(getSoLinger()); } if (option == IP_TOS) { return (T) Integer.valueOf(getTrafficClass()); } if (option == ALLOW_HALF_CLOSURE) { return (T) Boolean.valueOf(isAllowHalfClosure()); } return super.getOption(option); } @Override public <T> boolean setOption(ChannelOption<T> option, T value) { validate(option, value); if (option == SO_RCVBUF) { setReceiveBufferSize((Integer) value); } else if (option == SO_SNDBUF) { setSendBufferSize((Integer) value); } else if (option == TCP_NODELAY) { setTcpNoDelay((Boolean) value); } else if (option == SO_KEEPALIVE) { setKeepAlive((Boolean) value); } else if (option == SO_REUSEADDR) { setReuseAddress((Boolean) value); } else if (option == SO_LINGER) { setSoLinger((Integer) value); } else if (option == IP_TOS) { setTrafficClass((Integer) value); } else if (option == ALLOW_HALF_CLOSURE) { setAllowHalfClosure((Boolean) value); } else { return super.setOption(option, value); } return true; } @Override public int getReceiveBufferSize() { try { return javaSocket.getReceiveBufferSize(); } catch (SocketException e) { throw new ChannelException(e); } } @Override public int getSendBufferSize() { try { return javaSocket.getSendBufferSize(); } catch (SocketException e) { throw new ChannelException(e); } } @Override public int getSoLinger() { try { return javaSocket.getSoLinger(); } catch (SocketException e) { throw new ChannelException(e); } } @Override public int getTrafficClass() { try { return javaSocket.getTrafficClass(); } catch (SocketException e) { throw new ChannelException(e); } } @Override public boolean isKeepAlive() { try { return javaSocket.getKeepAlive(); } catch (SocketException e) { throw new ChannelException(e); } } @Override public boolean isReuseAddress() { try { return javaSocket.getReuseAddress(); } catch (SocketException e) { throw new ChannelException(e); } } @Override public boolean isTcpNoDelay() { try { return javaSocket.getTcpNoDelay(); } catch (SocketException e) { throw new ChannelException(e); } } @Override public SocketChannelConfig setKeepAlive(boolean keepAlive) { try { javaSocket.setKeepAlive(keepAlive); } catch (SocketException e) { throw new ChannelException(e); } return this; } @Override public SocketChannelConfig setPerformancePreferences( int connectionTime, int latency, int bandwidth) { javaSocket.setPerformancePreferences(connectionTime, latency, bandwidth); return this; } @Override public SocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { try { javaSocket.setReceiveBufferSize(receiveBufferSize); } catch (SocketException e) { throw new ChannelException(e); } return this; } @Override public SocketChannelConfig setReuseAddress(boolean reuseAddress) { try { javaSocket.setReuseAddress(reuseAddress); } catch (SocketException e) { throw new ChannelException(e); } return this; } @Override public SocketChannelConfig setSendBufferSize(int sendBufferSize) { try { javaSocket.setSendBufferSize(sendBufferSize); } catch (SocketException e) { throw new ChannelException(e); } return this; } @Override public SocketChannelConfig setSoLinger(int soLinger) { try { if (soLinger < 0) { javaSocket.setSoLinger(false, 0); } else { javaSocket.setSoLinger(true, soLinger); } } catch (SocketException e) { throw new ChannelException(e); } return this; } @Override public SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) { try { javaSocket.setTcpNoDelay(tcpNoDelay); } catch (SocketException e) { throw new ChannelException(e); } return this; } @Override public SocketChannelConfig setTrafficClass(int trafficClass) { try { javaSocket.setTrafficClass(trafficClass); } catch (SocketException e) { throw new ChannelException(e); } return this; } @Override public boolean isAllowHalfClosure() { return allowHalfClosure; } @Override public SocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure) { this.allowHalfClosure = allowHalfClosure; return this; } @Override public SocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { super.setConnectTimeoutMillis(connectTimeoutMillis); return this; } @Override @Deprecated public SocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { super.setMaxMessagesPerRead(maxMessagesPerRead); return this; } @Override public SocketChannelConfig setWriteSpinCount(int writeSpinCount) { super.setWriteSpinCount(writeSpinCount); return this; } @Override public SocketChannelConfig setAllocator(ByteBufAllocator allocator) { super.setAllocator(allocator); return this; } @Override public SocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { super.setRecvByteBufAllocator(allocator); return this; } @Override public SocketChannelConfig setAutoRead(boolean autoRead) { super.setAutoRead(autoRead); return this; } @Override public SocketChannelConfig setAutoClose(boolean autoClose) { super.setAutoClose(autoClose); return this; } @Override public SocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); return this; } @Override public SocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); return this; } @Override public SocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) { super.setWriteBufferWaterMark(writeBufferWaterMark); return this; } @Override public SocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { super.setMessageSizeEstimator(estimator); return this; } }
从上面来看,与Socket相关的配置委托给Socket的相关方法,其他委托给父类默认通道配置。
总结:
默认通道配置内部主要是配置消息大小估算器,字节buf分配器,接收字节buf分配器等属性。默认ServerSocket通道配置,与ServerSocket相关的配置委托给ServerSocket的相关方法,其他委托给父类默认通道配置。默认Socket通道配置,与Socket相关的配置委托给Socket的相关方法,其他委托给父类默认通道配置。
发表评论
-
netty NioSocketChannel解析
2017-09-29 12:50 1320netty 抽象BootStrap定义:http://dona ... -
netty Pooled字节buf分配器
2017-09-28 13:00 2057netty 字节buf定义:http://donald-dra ... -
netty Unpooled字节buf分配器
2017-09-26 22:01 2443netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf分配器
2017-09-26 08:43 1316netty 字节buf定义:http:// ... -
netty 复合buf概念
2017-09-25 22:31 1310netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf引用计数器
2017-09-22 12:48 1593netty 字节buf定义:http://donald-dra ... -
netty 抽象字节buf解析
2017-09-22 09:00 1842netty 通道接口定义:http://donald-drap ... -
netty 资源泄漏探测器
2017-09-21 09:37 1397netty 通道接口定义:http://donald-drap ... -
netty 字节buf定义
2017-09-20 08:31 2833netty 通道接口定义:http://donald-drap ... -
netty 默认通道配置初始化
2017-09-17 22:51 2036netty 通道接口定义:http://donald-drap ... -
netty 通道配置接口定义
2017-09-17 14:51 1077netty 通道接口定义:http://donald-drap ... -
netty NioServerSocketChannel解析
2017-09-16 13:01 1876netty ServerBootStrap解析:http:// ... -
netty 抽象nio消息通道
2017-09-15 15:30 1217netty 通道接口定义:http:/ ... -
netty 抽象nio字节通道
2017-09-14 22:39 1201netty 通道接口定义:http:/ ... -
netty 抽象nio通道解析
2017-09-14 17:23 957netty 通道接口定义:http://donald-drap ... -
netty 抽象通道后续
2017-09-13 22:40 1309netty Inboudn/Outbound通道Inv ... -
netty 通道Outbound缓冲区
2017-09-13 14:31 2189netty 通道接口定义:http:/ ... -
netty 抽象Unsafe定义
2017-09-12 21:24 1075netty 通道接口定义:http:/ ... -
netty 抽象通道初始化
2017-09-11 12:56 1854netty 管道线定义-ChannelPipeline:htt ... -
netty 通道接口定义
2017-09-10 15:36 1876netty Inboudn/Outbound通道Invoker ...
相关推荐
这个“netty5完整配置实例”显然旨在帮助开发者理解和使用Netty 5版本,考虑到Netty的不同版本间的确存在显著差异,这个实例应该包含了Netty 5的关键配置和示例代码。 首先,Netty 5可能已经不被广泛使用,因为最新...
Netty 的核心组件包括:Bootstrap(启动器)、ServerBootstrap(服务器启动器)、Channel(通道)、Handler(处理器)和EventLoopGroup(事件循环组)。Bootstrap 和 ServerBootstrap 用于创建客户端和服务端连接,...
在本文中,我们将深入探讨 Netty 3 的配置实例,这对于理解 Netty 的基本工作原理以及如何在实际项目中使用它是至关重要的。 首先,Netty 3 版本相较于更现代的版本,例如 Netty 4,存在一些显著的差异。例如,...
1. 如何配置和启动一个基于Netty的HTTP服务器,处理HTTP请求和响应。 2. 实现WebSocket服务器和客户端,包括握手过程和数据帧的发送与接收。 3. 使用Netty的ChannelHandlerContext发送心跳包,并设置相应的超时检查...
5. **线程模型**: Netty 使用 NIO 或 EPOLL 等非阻塞 I/O 模型,而 Spring 默认使用线程池。集成时需注意线程模型的兼容性,确保 Netty 的工作线程和 Spring 的任务调度器协同工作,避免资源冲突。 6. **Spring MVC...
在标题"netty-all-4.1.5.Final完整pom.xml文件配置"中,我们关注的是Netty的特定版本——4.1.5.Final。这个版本包含了Netty的所有模块,提供了完整的功能集。`pom.xml`文件是Maven项目对象模型(Project Object ...
4. **配置WebSocketServerProtocolHandler**:Netty提供了`WebSocketServerProtocolHandler`,它是处理WebSocket升级请求的关键。你需要配置这个处理器,指定WebSocket的路径和其他参数。 5. **心跳机制**:...
创建一个Spring配置文件,例如`applicationContext.xml`,在这里定义Bean,包括Netty服务器、MyBatis的数据源、SqlSessionFactory等。使用Spring的注解或者XML配置来声明Bean,例如: ```xml <!-- 数据库连接配置...
Netty Server端通常会包含一个`ServerBootstrap`,用于设置服务器的各种属性,如事件循环组、通道处理器等。Client端则会使用`Bootstrap`来连接到服务器,同样也需要配置相应的处理器链。 在Server端,我们可以使用...
这个“netty-all-5.0.0.Alpha3 完整pom.xml配置”是针对Netty 5.0.0 Alpha3版本的集成配置,包含了该版本所需的全部依赖项,便于开发者在基于Maven的项目中直接引用,避免了手动管理多个jar文件的繁琐过程。...
Pipeline(管道)则是一系列处理通道事件的处理器链,每个处理器可以对数据进行解码、编码或者执行业务逻辑。 3. **ByteBuf**:Netty提供了自己的ByteBuf类,作为缓冲区,它比Java的ByteBuffer更易用且高效,支持...
《Netty实战》这本书是针对Java网络编程框架Netty的一本深入实践教程,旨在帮助读者掌握Netty的核心特性和实际应用。Netty是一款高性能、异步事件驱动的网络应用程序框架,广泛应用于各种分布式系统、微服务架构以及...
此外,书中还会探讨Netty的安全性,包括SSL/TLS加密通信的实现,以及如何配置和管理证书,保障网络通信的安全。 最后,实战部分会介绍如何将Netty应用于实际项目,如构建聊天服务器、游戏服务器,甚至是微服务间的...
- Netty提供了许多性能优化选项,如自定义ByteBuf分配器、心跳机制、通道空闲检测等。 10. **错误处理与异常安全**: - Netty提供了一套完善的异常处理机制,确保即使在出错时也能优雅地关闭资源。 综上所述,...
总的来说,Netty数据转发工具的实现涉及到对Netty框架的深入理解和灵活运用,包括服务器的启动、通道的创建、事件的处理以及异常的管理等多个方面。掌握这些技能,不仅有助于构建高效的数据转发服务,也能为更复杂的...
在这个入门案例中,你可能会看到这些步骤的实现,包括如何组织项目结构、编写 `.proto` 文件、配置 Netty 通道处理器,以及如何实际运行和测试这个简单的通信系统。这将帮助你理解如何在实际项目中利用 Netty 的异步...
- 使用 Netty 创建 WebSocket 服务器,首先需要定义一个 ChannelInitializer,配置处理 WebSocket 数据的 ChannelHandler。例如,WebSocketFrameDecoder 和 WebSocketFrameEncoder 用于解码和编码 WebSocket 帧。 ...
在Netty中,EventLoop(事件循环)负责处理I/O事件,而Channel(通道)则代表网络连接,它们是进行数据传输的基础。ChannelHandler(通道处理器)是业务逻辑的载体,通过ChannelPipeline(通道管道)组织成处理链,...
在Netty中,通道(Channel)是连接的抽象,它代表了与远程实体的一个连接,可以进行读写操作。而通道处理器链(ChannelPipeline)则允许用户自定义数据在网络中传输的处理流程,这提供了高度的灵活性和可扩展性。 ...
在Netty中,可以通过ChannelFuture的sync()方法将异步操作转换为同步,确保操作完成后再继续执行后续代码。 在这个“netty同步传输demo”中,客户端可能使用了自定义的Handler或ChannelFutureListener来实现同步...