- 浏览: 1000041 次
-
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
ThreadLocal解析 :http://donald-draper.iteye.com/blog/2368159
Java NIO ByteBuffer详解:http://donald-draper.iteye.com/blog/2357084
DirectByteBuffer简介:http://donald-draper.iteye.com/blog/2372351
SelectorProvider定义:http://donald-draper.iteye.com/blog/2369615
ServerSocketChannelImpl解析:http://donald-draper.iteye.com/blog/2370912
SocketChannel接口定义:http://donald-draper.iteye.com/blog/2371218
引言:
在SocketChannel接口定义这篇文章中,我们看了socket的连接,完成连接,是否正在建立连接,读缓冲区写到通道,聚集写,读通道写缓冲区,分散读等方法。在NIO包中TCP发送接受字节序列通过SocketChannel。今天我们来看一下SocketChannel的具体
实现。我们从SocketChannel的open方法开始。
//SocketChannel
SelectorProvider.provider()这个过程我们就不详说了实际是加载系统默认的SelectorProvider
实例,则个我们在SelectorProvider定义有提过,简单看一下:
//SelectorProviderImpl
从上面可以看出,SocketChannel的默认实现为SocketChannelImpl。再来看SocketChannelImpl的变量
声明和相关方法的实现。
SocketChannelImpl的构造方法有三种分别如下
1.
2.
3.
我们需要关注的是这两点,
//Net
//IOUtil
这个我们在ServerSocketChannelImpl解析这篇文章接触过Net和IOUtil,这里不具体的解释了
,看一下即可,很容易理解。
//Net
从上面可以看出SocketChannelImpl构造主要是初始化读写及状态锁和通道socket文件描述。
来看SocketChannelImpl的其他方法
//连接socket地址
connect连接方法有几点要看:
1.
2.
3.尝试连接socket地址
4.检查连接结果
下面分别来看这四点:
1.
2.
3.尝试连接socket地址
//Net
4.检查连接结果
这一点我们需要关注的是IOStatus.check(i)这句:
//IOStatus
从上面可以看出,connect连接方法首先同步读锁和写锁,确保socket通道打开,并没有连接;
然后检查socket地址的正确性与合法性,然后检查当前线程是否有Connect方法的访问控制权限,
最后尝试连接socket地址。
再来看地址绑定方法bind
下面来看SocketChannelImpl的几个读写方法
先来看从缓冲区读取数据,写到通道
写操作需要关注一下几点,
1.
2.
3.
下面分别来看这三点
1.
2.
//IOUtil
这一步我们有几点要关注:
a.
想要理解这点,先看一下Util的定义
//Util
//IOUtil,变量IOV_MAX
再来看Util的缓冲区的定义BufferCache
//Util
从上面可以看出BufferCache用一个ByteBuffer数组buffers存放写到缓冲区的字节流序列,每次写字节流对应一个ByteBuffer,用count记录当前缓冲区中的有数据或可用的ByteBuffer数量,start记录当前缓冲区buffers的头部;offerFirst方法向缓冲区的头部添加一个字节序列bytebuffer,即写字节序列到缓存区;offerLast与offerFirst恰好相反,写字节序列到缓冲区的尾部(索引start + count);next方法为向后移动缓冲区buffers索引;get(int i)方法为从缓冲区获取可以存放i个字节序列的ByteBuffer,并rewind字节缓冲区ByteBuffer,
限制孔勇空间为ByteBuffer。removeFirst为移除缓冲区头部的bytebuffer,并返回。
看过Util的BufferCache的定义,我们再回到
//Util
获取临时DirectByteBuffer有两点要看
a.1
//Util
//DirectBuffer
a.2
b.
//nativedispatcher参数实际为SocketDispatcher
来看两种方式的写
b.1
//Nativedispatcher
b.2
//SocketDispatcher
从缓冲读取字节序列,写到通道中,实际是通过SocketDispatcher完成实际的写工作,当前默认的写方法为write(FileDescriptor filedescriptor, long l, int i)。
c.
3.
//清除写线程
从以上分析可以看出,从缓冲区读取字节序列写到通道,首先确保通道打开,且输出流没有关闭,然后委托给IOUtil写字节序列;IOUtil写字节流过程为首先通过Util从当前线程的缓冲区获取可以容下字节序列的临时缓冲区(DirectByteBuffer),如果没有则创建一个DirectByteBuffer,将字节序列写到临时的DirectByteBuffer中,然后将写操作委托给nativedispatcher(SocketDispatcher),将DirectByteBuffer添加到当前线程的缓冲区,
以便重用,因为DirectByteBuffer实际上是存在物理内存中,频繁的分配将会消耗更多的资源。
总结:
SocketChannelImpl构造主要是初始化读写及状态锁和通道socket文件描述。
connect连接方法首先同步读锁和写锁,确保socket通道打开,并没有连接;然后检查socket地址的正确性与合法性,然后检查当前线程是否有Connect方法的访问控制权限,最后尝试连接socket地址。从缓冲区读取字节序列写到通道write(ByteBuffer),首先确保通道打开,且输出流没有关闭,然后委托给IOUtil写字节序列;IOUtil写字节流过程为首先通过Util从当前线程的缓冲区获取可以容下字节序列的临时缓冲区(DirectByteBuffer),如果没有则创建一个DirectByteBuffer,将字节序列写到临时的DirectByteBuffer中,然后将写操作委托给nativedispatcher(SocketDispatcher),将DirectByteBuffer添加到当前线程的缓冲区,
以便重用,因为DirectByteBuffer实际上是存在物理内存中,频繁的分配将会消耗更多的资源。
SocketChannelImpl 解析二(发送数据后续):http://donald-draper.iteye.com/blog/2372548
附:
权限检查:SecurityManager为系统的默认安全检查管理器,主要用于检查当前线程是否拥有
某个权限的访问控制权限,比如socket连接,监听,获取类加载等。
//SecurityManager
//SecurityConstants,安全权限常量
Java NIO ByteBuffer详解:http://donald-draper.iteye.com/blog/2357084
DirectByteBuffer简介:http://donald-draper.iteye.com/blog/2372351
SelectorProvider定义:http://donald-draper.iteye.com/blog/2369615
ServerSocketChannelImpl解析:http://donald-draper.iteye.com/blog/2370912
SocketChannel接口定义:http://donald-draper.iteye.com/blog/2371218
引言:
在SocketChannel接口定义这篇文章中,我们看了socket的连接,完成连接,是否正在建立连接,读缓冲区写到通道,聚集写,读通道写缓冲区,分散读等方法。在NIO包中TCP发送接受字节序列通过SocketChannel。今天我们来看一下SocketChannel的具体
实现。我们从SocketChannel的open方法开始。
//SocketChannel
public static SocketChannel open() throws IOException { return SelectorProvider.provider().openSocketChannel(); }
SelectorProvider.provider()这个过程我们就不详说了实际是加载系统默认的SelectorProvider
实例,则个我们在SelectorProvider定义有提过,简单看一下:
//SelectorProviderImpl
public abstract class SelectorProviderImpl extends SelectorProvider { public SocketChannel openSocketChannel() throws IOException { return new SocketChannelImpl(this); } }
从上面可以看出,SocketChannel的默认实现为SocketChannelImpl。再来看SocketChannelImpl的变量
声明和相关方法的实现。
class SocketChannelImpl extends SocketChannel implements SelChImpl { private static NativeDispatcher nd = new SocketDispatcher();//socket的分发器 private final FileDescriptor fd;//文件描述 private final int fdVal;//文件描述id private volatile long readerThread;//读线程 private volatile long writerThread;//写线程 private final Object readLock;//读锁 private final Object writeLock;//写锁 private final Object stateLock;//状态锁 private static final int ST_UNINITIALIZED = -1;//未初始化 private static final int ST_UNCONNECTED = 0;//未连接 private static final int ST_PENDING = 1;//正在连接 private static final int ST_CONNECTED = 2;//已连接 private static final int ST_KILLPENDING = 3;//正在关闭 private static final int ST_KILLED = 4;//关闭 private int state;//通道状态 private SocketAddress localAddress;//socket本地地址 private SocketAddress remoteAddress;//socket远端地址 private boolean isInputOpen;//输入流是否打开 private boolean isOutputOpen;//输出流是否打开 private boolean readyToConnect;//是否正在准备连接 private Socket socket;//通道套接字 static final boolean $assertionsDisabled = !sun/nio/ch/SocketChannelImpl.desiredAssertionStatus(); static { //加载nio,net资源库 Util.load(); } }
SocketChannelImpl的构造方法有三种分别如下
1.
SocketChannelImpl(SelectorProvider selectorprovider) throws IOException { super(selectorprovider); readerThread = 0L; writerThread = 0L; //初始化读写及状态锁 readLock = new Object(); writeLock = new Object(); stateLock = new Object(); state = -1;//状态默认为未初始化 isInputOpen = true; isOutputOpen = true; readyToConnect = false; fd = Net.socket(true);//初始化文件描述符 fdVal = IOUtil.fdVal(fd);//获取文件描述的值 state = 0;//已初始化,未连接 }
2.
SocketChannelImpl(SelectorProvider selectorprovider, FileDescriptor filedescriptor, boolean flag) throws IOException { super(selectorprovider); readerThread = 0L; writerThread = 0L; readLock = new Object(); writeLock = new Object(); stateLock = new Object(); state = -1; isInputOpen = true; isOutputOpen = true; readyToConnect = false; fd = filedescriptor; fdVal = IOUtil.fdVal(filedescriptor); state = 0;//已初始化,未连接 if(flag) //初始化本地地址 localAddress = Net.localAddress(filedescriptor); }
3.
SocketChannelImpl(SelectorProvider selectorprovider, FileDescriptor filedescriptor, InetSocketAddress inetsocketaddress) throws IOException { super(selectorprovider); readerThread = 0L; writerThread = 0L; readLock = new Object(); writeLock = new Object(); stateLock = new Object(); state = -1; isInputOpen = true; isOutputOpen = true; readyToConnect = false; fd = filedescriptor; fdVal = IOUtil.fdVal(filedescriptor); state = 2;//已连接 localAddress = Net.localAddress(filedescriptor); remoteAddress = inetsocketaddress; }
我们需要关注的是这两点,
a.fd = Net.socket(true);//初始化文件描述符
//Net
static FileDescriptor socket(boolean flag) throws IOException { return socket(UNSPEC, flag); } static FileDescriptor socket(ProtocolFamily protocolfamily, boolean flag) throws IOException { boolean flag1 = isIPv6Available() && protocolfamily != StandardProtocolFamily.INET; return IOUtil.newFD(socket0(flag1, flag, false)); } private static native int socket0(boolean flag, boolean flag1, boolean flag2);
//IOUtil
static FileDescriptor newFD(int i) { FileDescriptor filedescriptor = new FileDescriptor(); setfdVal(filedescriptor, i); return filedescriptor; }
这个我们在ServerSocketChannelImpl解析这篇文章接触过Net和IOUtil,这里不具体的解释了
,看一下即可,很容易理解。
b.localAddress = Net.localAddress(filedescriptor);
//Net
static InetSocketAddress localAddress(FileDescriptor filedescriptor) throws IOException { return new InetSocketAddress(localInetAddress(filedescriptor), localPort(filedescriptor)); } private static native int localPort(FileDescriptor filedescriptor) throws IOException; private static native InetAddress localInetAddress(FileDescriptor filedescriptor) throws IOException;
从上面可以看出SocketChannelImpl构造主要是初始化读写及状态锁和通道socket文件描述。
来看SocketChannelImpl的其他方法
//连接socket地址
public boolean connect(SocketAddress socketaddress) throws IOException { boolean flag = false; Object obj = readLock;//同步读锁 JVM INSTR monitorenter ;//try Object obj1 = writeLock;//同步写锁 JVM INSTR monitorenter ; InetSocketAddress inetsocketaddress; //确保socket通道处于打开状态,没有连接 ensureOpenAndUnconnected(); //检查socketAddress正确与合法性 inetsocketaddress = Net.checkAddress(socketaddress); SecurityManager securitymanager = System.getSecurityManager(); if(securitymanager != null) //检查当前线程是否有Connect方法的访问控制权限 securitymanager.checkConnect(inetsocketaddress.getAddress().getHostAddress(), inetsocketaddress.getPort()); //同步regLock锁,Lock for registration and configureBlocking operations //这个在AbstractSelectableChannel中定义 Object obj2 = blockingLock(); JVM INSTR monitorenter ; int i = 0; //Marks the begin/end of an I/O operation that might block indefinitely. begin();//与end协调使用,用于可能阻塞IO操作 boolean flag1; //同步状态锁 synchronized(stateLock) { if(isOpen()) break MISSING_BLOCK_LABEL_149; flag1 = false; } //清除Reader线程 readerCleanup(); end(i > 0 || i == -2); //断言连接结果大于-2,则连接失败,抛出断言异常 if(!$assertionsDisabled && !IOStatus.check(i)) throw new AssertionError(); return flag1; if(localAddress == null) //beforeTcpConnect为静态空方法体,这个我们在ServerSocketChannelImpl中有说 NetHooks.beforeTcpConnect(fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort()); //初始化读线程 readerThread = NativeThread.current(); obj3; JVM INSTR monitorexit ; do { InetAddress inetaddress = inetsocketaddress.getAddress(); if(inetaddress.isAnyLocalAddress()) inetaddress = InetAddress.getLocalHost(); //尝试连接socket地址 i = Net.connect(fd, inetaddress, inetsocketaddress.getPort()); } while(i == -3 && isOpen()); readerCleanup(); end(i > 0 || i == -2); if(!$assertionsDisabled && !IOStatus.check(i)) throw new AssertionError(); break MISSING_BLOCK_LABEL_358; Exception exception1; exception1; readerCleanup(); end(i > 0 || i == -2); if(!$assertionsDisabled && !IOStatus.check(i)) throw new AssertionError(); else throw exception1; IOException ioexception; ioexception; //出现IO异常,则关闭通道 close(); throw ioexception; Object obj4 = stateLock; JVM INSTR monitorenter ; remoteAddress = inetsocketaddress; if(i <= 0) goto _L2; else goto _L1 _L1: state = 2; if(isOpen()) localAddress = Net.localAddress(fd); true; obj2; JVM INSTR monitorexit ;//退出同步 obj1; JVM INSTR monitorexit ; obj; JVM INSTR monitorexit ; return; _L2: if(!isBlocking()) state = 1; else if(!$assertionsDisabled) throw new AssertionError(); obj4; JVM INSTR monitorexit ; goto _L3 Exception exception2; exception2; obj4; JVM INSTR monitorexit ; throw exception2; _L3: obj2; JVM INSTR monitorexit ; goto _L4 Exception exception3; exception3; obj2; JVM INSTR monitorexit ; throw exception3; _L4: false; obj1; JVM INSTR monitorexit ; obj; JVM INSTR monitorexit ; return; Exception exception4; exception4; throw exception4; Exception exception5; exception5; throw exception5; }
connect连接方法有几点要看:
1.
//确保socket通道处于打开状态,没有连接 ensureOpenAndUnconnected();
2.
//清除Reader线程 readerCleanup();
3.尝试连接socket地址
do { InetAddress inetaddress = inetsocketaddress.getAddress(); if(inetaddress.isAnyLocalAddress()) inetaddress = InetAddress.getLocalHost(); //尝试连接socket地址 i = Net.connect(fd, inetaddress, inetsocketaddress.getPort()); } while(i == -3 && isOpen());
4.检查连接结果
if(!$assertionsDisabled && !IOStatus.check(i)) throw new AssertionError(); else throw exception1; IOException ioexception; ioexception; //出现IO异常,则关闭通道 close();
下面分别来看这四点:
1.
//确保socket通道处于打开状态,没有连接 ensureOpenAndUnconnected();
void ensureOpenAndUnconnected() throws IOException { synchronized(stateLock) { if(!isOpen())//通道关闭 throw new ClosedChannelException(); if(state == 2)//已经连接 throw new AlreadyConnectedException(); if(state == 1)//正在来接 throw new ConnectionPendingException(); } }
2.
//清除Reader线程 readerCleanup();
private void readerCleanup() throws IOException { synchronized(stateLock) { readerThread = 0L; //连接正在关闭,则调用kill完成实际关闭工作 if(state == 3) kill(); } }
3.尝试连接socket地址
do { InetAddress inetaddress = inetsocketaddress.getAddress(); if(inetaddress.isAnyLocalAddress()) inetaddress = InetAddress.getLocalHost(); //尝试连接socket地址,这里为什么是循序,因为连接操作有可能被中断,及i为-3, //当中断位消除时,继续尝试连接 i = Net.connect(fd, inetaddress, inetsocketaddress.getPort()); } while(i == -3 && isOpen());
//Net
static int connect(FileDescriptor filedescriptor, InetAddress inetaddress, int i) throws IOException { return connect(UNSPEC, filedescriptor, inetaddress, i); } static int connect(ProtocolFamily protocolfamily, FileDescriptor filedescriptor, InetAddress inetaddress, int i) throws IOException { boolean flag = isIPv6Available() && protocolfamily != StandardProtocolFamily.INET; return connect0(flag, filedescriptor, inetaddress, i); } private static native int connect0(boolean flag, FileDescriptor filedescriptor, InetAddress inetaddress, int i) throws IOException;
4.检查连接结果
if(!$assertionsDisabled && !IOStatus.check(i)) throw new AssertionError(); else throw exception1; IOException ioexception; ioexception; //出现IO异常,则关闭通道 close();
这一点我们需要关注的是IOStatus.check(i)这句:
//IOStatus
package sun.nio.ch; final class IOStatus { static final int EOF = -1;//结束 static final int UNAVAILABLE = -2;//不可用 static final int INTERRUPTED = -3;//操作中断 static final int UNSUPPORTED = -4;//不支持 static final int THROWN = -5;//异常 static final int UNSUPPORTED_CASE = -6; private IOStatus() { } static int normalize(int i) { if(i == -2) return 0; else return i; } //连接结果i大于等于-2,即连接失败 static boolean check(int i) { return i >= -2; } static long normalize(long l) { if(l == -2L) return 0L; else return l; } static boolean check(long l) { return l >= -2L; } static boolean checkAll(long l) { return l > -1L || l < -6L; } }
从上面可以看出,connect连接方法首先同步读锁和写锁,确保socket通道打开,并没有连接;
然后检查socket地址的正确性与合法性,然后检查当前线程是否有Connect方法的访问控制权限,
最后尝试连接socket地址。
再来看地址绑定方法bind
public SocketChannel bind(SocketAddress socketaddress) throws IOException { //同步读锁,写锁,状态锁 synchronized(readLock) { synchronized(writeLock) { synchronized(stateLock) { if(!isOpen())//通道关闭 throw new ClosedChannelException(); if(state == 1)//正在连接 throw new ConnectionPendingException(); if(localAddress != null) throw new AlreadyBoundException(); //检查地址 InetSocketAddress inetsocketaddress = socketaddress != null ? Net.checkAddress(socketaddress) : new InetSocketAddress(0); NetHooks.beforeTcpBind(fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort()); //绑定地址,这个在ServerSocketChannelImpl篇,一看过不在重复。 Net.bind(fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort()); //初始化localAddress localAddress = Net.localAddress(fd); } } } return this; }
下面来看SocketChannelImpl的几个读写方法
先来看从缓冲区读取数据,写到通道
public int write(ByteBuffer bytebuffer) throws IOException { if(bytebuffer == null) throw new NullPointerException(); Object obj = writeLock;//同步写锁 JVM INSTR monitorenter ;//进入同步 int i; //确保没有关闭输出流 ensureWriteOpen(); i = 0; begin();//end, int k; synchronized(stateLock) { if(isOpen()) break MISSING_BLOCK_LABEL_140; k = 0; } //清除写线程 writerCleanup(); end(i > 0 || i == -2); //同步状态锁,如果通道输出流关闭或写异常,则抛出AsynchronousCloseException synchronized(stateLock) { if(i <= 0 && !isOutputOpen) throw new AsynchronousCloseException(); } //断言,检查写结果 if(!$assertionsDisabled && !IOStatus.check(i)) throw new AssertionError(); return k; //初始化线程 writerThread = NativeThread.current(); obj1; JVM INSTR monitorexit ; int j; do //写字节流,为什么是循环写,如果字节序列太多,发送缓冲区一次写不完,需要分多次写 i = IOUtil.write(fd, bytebuffer, -1L, nd, writeLock); while(i == -3 && isOpen()); j = IOStatus.normalize(i); writerCleanup(); end(i > 0 || i == -2); synchronized(stateLock) { if(i <= 0 && !isOutputOpen) throw new AsynchronousCloseException(); } if(!$assertionsDisabled && !IOStatus.check(i)) throw new AssertionError(); obj; JVM INSTR monitorexit ; return j; Exception exception3; exception3; writerCleanup(); end(i > 0 || i == -2); synchronized(stateLock) { if(i <= 0 && !isOutputOpen) throw new AsynchronousCloseException(); } if(!$assertionsDisabled && !IOStatus.check(i)) throw new AssertionError(); else throw exception3; Exception exception5; exception5; throw exception5; }
写操作需要关注一下几点,
1.
//确保没有关闭输出流 ensureWriteOpen();
2.
//写字节流 do //写字节流 i = IOUtil.write(fd, bytebuffer, -1L, nd, writeLock); while(i == -3 && isOpen())
3.
//清除写线程 writerCleanup();
下面分别来看这三点
1.
//确保没有关闭输出流 ensureWriteOpen(); private void ensureWriteOpen() throws ClosedChannelException { synchronized(stateLock) { if(!isOpen())//通道关闭 throw new ClosedChannelException(); if(!isOutputOpen)//输出流关闭 throw new ClosedChannelException(); if(!isConnected())//还没连接 throw new NotYetConnectedException(); } }
2.
//写字节流 do //写字节流,为什么是循环写,如果字节序列太多,发送缓冲区一次写不完,需要分多次写 i = IOUtil.write(fd, bytebuffer, -1L, nd, writeLock); while(i == -3 && isOpen())
//IOUtil
static int write(FileDescriptor filedescriptor, ByteBuffer bytebuffer, long l, NativeDispatcher nativedispatcher, Object obj) throws IOException { int i; ByteBuffer bytebuffer1; //如果ByteBffer为DirectBuffer,则调用writeFromNativeBuffer if(bytebuffer instanceof DirectBuffer) return writeFromNativeBuffer(filedescriptor, bytebuffer, l, nativedispatcher, obj); //获取缓冲区的当前位置 i = bytebuffer.position(); //获取缓冲区limit位置 int j = bytebuffer.limit(); //断言position是否大于limit,是抛出AssertionError if(!$assertionsDisabled && i > j) throw new AssertionError(); int k = i > j ? 0 : j - i;//需要些的字节数 //获取k个字节的临时DirectBuffer bytebuffer1 = Util.getTemporaryDirectBuffer(k); int j1; 写缓冲区到临时内存缓冲区DirectBuffer-bytebuffer1 bytebuffer1.put(bytebuffer); //转换bytebuffer1写模式,为读模式 bytebuffer1.flip(); bytebuffer.position(i);//重新定位bytebuffer的position位置 //从本地缓冲空间写字节流,i1为已写的字节数 int i1 = writeFromNativeBuffer(filedescriptor, bytebuffer1, l, nativedispatcher, obj); if(i1 > 0) //重新定位bytebuffer的position位置 //为什么重新定位bytebuffer的position位, //如果字节序列太多,发送缓冲区一次写不完,需要分多次写 //将position向前移动i1位置,避免重复写即已写过的字节序列。 bytebuffer.position(i + i1); j1 = i1; //将byteBuffer内存写到当前线程的缓存区 Util.offerFirstTemporaryDirectBuffer(bytebuffer1); return j1; Exception exception; exception; Util.offerFirstTemporaryDirectBuffer(bytebuffer1); throw exception; }
这一步我们有几点要关注:
a.
//获取k个字节的临时DirectBuffer bytebuffer1 = Util.getTemporaryDirectBuffer(k);
想要理解这点,先看一下Util的定义
//Util
class Util { private static final int TEMP_BUF_POOL_SIZE;//临时缓冲区大小 private static ThreadLocal localSelector = new ThreadLocal(); private static ThreadLocal localSelectorWrapper = new ThreadLocal(); private static Unsafe unsafe = Unsafe.getUnsafe(); private static int pageSize = -1; private static volatile Constructor directByteBufferConstructor = null; private static volatile Constructor directByteBufferRConstructor = null; private static volatile String bugLevel = null; private static boolean loaded = false; static final boolean $assertionsDisabled = !sun/nio/ch/Util.desiredAssertionStatus(); static { //初始化临时缓冲区大小,为IOUtil的IOV_MAX,及系统默认最大IO缓冲区大小 //static final int IOV_MAX = iovMax(); //static native int iovMax(); TEMP_BUF_POOL_SIZE = IOUtil.IOV_MAX; } //线程本地缓存区 private static ThreadLocal bufferCache = new ThreadLocal() { protected BufferCache initialValue() { return new BufferCache(); } protected volatile Object initialValue() { return initialValue(); } }; }
//IOUtil,变量IOV_MAX
static native int iovMax(); static final int IOV_MAX = iovMax();
再来看Util的缓冲区的定义BufferCache
//Util
private static class BufferCache { //存放字节序列的缓存数组,可以这么理解buffers为 //当前缓冲区存放的字节序列ByteBuffer //buffers的size,即为当前缓冲区可以接受写多少个字节序列ByteBuffer private ByteBuffer buffers[]; private int count;//当前缓冲区中,有数据的字节序列ByteBuffer的个数,即buffers计数器 private int start;//缓冲区buffers的开始索引,即头部 static final boolean $assertionsDisabled = !sun/nio/ch/Util.desiredAssertionStatus(); BufferCache() { //初始化缓冲区 buffers = new ByteBuffer[Util.TEMP_BUF_POOL_SIZE]; } //向缓冲区的头部添加一个字节序列bytebuffer,即写字节序列到缓存区 boolean offerFirst(ByteBuffer bytebuffer) { if(count >= Util.TEMP_BUF_POOL_SIZE) { //如果当前缓冲区已满,则返回false,即当前不能写字节序列到缓存区 return false; } else { //获取缓冲区byteBuffers的当前头部索引start的前一个索引 start = ((start + Util.TEMP_BUF_POOL_SIZE) - 1) % Util.TEMP_BUF_POOL_SIZE; //写字节序列到缓存区的索引start对应的ByteBuffer buffers[start] = bytebuffer; count++;//缓冲区bytebuffer计数器+1 return true;//写字节序列到缓存区成功 } } //这个与offerFirst恰好相反,写字节序列到缓冲区的尾部(索引start + count) boolean offerLast(ByteBuffer bytebuffer) { if(count >= Util.TEMP_BUF_POOL_SIZE) { return false; } else { int i = (start + count) % Util.TEMP_BUF_POOL_SIZE; buffers[i] = bytebuffer; count++; return true; } } //缓冲区buffers,索引向后移动 private int next(int i) { return (i + 1) % Util.TEMP_BUF_POOL_SIZE; } //注意这个i不是索引的意思,是需要写的字节序列的字节个数, //这个在IOUtil的write方法中调用,如下面两行代码 //获取k个字节的临时DirectBuffer //bytebuffer1 = Util.getTemporaryDirectBuffer(k); ByteBuffer get(int i) { //如果缓存区当前可用的可用的ByteBuffer,返回null if(count == 0) return null; ByteBuffer abytebuffer[] = buffers; ByteBuffer bytebuffer = abytebuffer[start]; //如果当前缓冲区start索引对应的bytebuffer,不够用,即容量不够存放要写的字节序列 //则遍历当前buffers,找到可以存放的bytebuffer if(bytebuffer.capacity() < i) { bytebuffer = null; int j = start; do { if((j = next(j)) == start) //只有一个bytebuffer,break break; ByteBuffer bytebuffer1 = abytebuffer[j]; if(bytebuffer1 == null) //下一个bytebuffer为null,break break; if(bytebuffer1.capacity() < i) //容量不够用,continue continue; //找到可以存放i个字节序列的bytebuffer bytebuffer = bytebuffer1; break; } while(true); if(bytebuffer == null) return null; abytebuffer[j] = abytebuffer[start]; } //清空 abytebuffer[start] = null; start = next(start); count--;//缓冲区bytebuffer计数器-1 //调用rewind,为了从开始位置写字节流 bytebuffer.rewind(); bytebuffer.limit(i);//限制bytebuffer的可用空间limit return bytebuffer; } //缓冲区是否为空 boolean isEmpty() { return count == 0; } //移除缓冲区头部的bytebuffer ByteBuffer removeFirst() { //如果断言开启, 缓冲区为空,抛出断言异常 if(!$assertionsDisabled && count <= 0) { throw new AssertionError(); } else { //有了上面几个方法,下面应该很好理解,就不说了 ByteBuffer bytebuffer = buffers[start]; buffers[start] = null; start = next(start); count--; return bytebuffer; } } }
从上面可以看出BufferCache用一个ByteBuffer数组buffers存放写到缓冲区的字节流序列,每次写字节流对应一个ByteBuffer,用count记录当前缓冲区中的有数据或可用的ByteBuffer数量,start记录当前缓冲区buffers的头部;offerFirst方法向缓冲区的头部添加一个字节序列bytebuffer,即写字节序列到缓存区;offerLast与offerFirst恰好相反,写字节序列到缓冲区的尾部(索引start + count);next方法为向后移动缓冲区buffers索引;get(int i)方法为从缓冲区获取可以存放i个字节序列的ByteBuffer,并rewind字节缓冲区ByteBuffer,
限制孔勇空间为ByteBuffer。removeFirst为移除缓冲区头部的bytebuffer,并返回。
看过Util的BufferCache的定义,我们再回到
//获取k个字节的临时DirectBuffer bytebuffer1 = Util.getTemporaryDirectBuffer(k);
//Util
static ByteBuffer getTemporaryDirectBuffer(int i) { //获取当前线程的缓冲区(ThreadLocal-bufferCache) BufferCache buffercache = (BufferCache)bufferCache.get(); //从缓冲区获取容量第一个大于i的ByteBuffer ByteBuffer bytebuffer = buffercache.get(i); //如果缓冲区存在容量大于i个字节的bytebuffer,直接返回 if(bytebuffer != null) return bytebuffer; //如果缓冲区中不存在容量大于i的bytebuffer,且不为空; //则移除缓冲区头部的bytebuffer if(!buffercache.isEmpty()) { ByteBuffer bytebuffer1 = buffercache.removeFirst(); //释放bytebuffer1 free(bytebuffer1); } //ByteBuffer直接分配一个DirectByteBuffer,存放字节序列 return ByteBuffer.allocateDirect(i); }
获取临时DirectByteBuffer有两点要看
a.1
//释放bytebuffer1 free(bytebuffer1);
//Util
private static void free(ByteBuffer bytebuffer) { //实际委托给DirectBuffer的clean,这个我们在DirectByteBuffer有说, //即释放分配的实际物理内存 ((DirectBuffer)bytebuffer).cleaner().clean(); }
//DirectBuffer
package sun.nio.ch; import sun.misc.Cleaner; public interface DirectBuffer { public abstract long address(); public abstract Object attachment(); public abstract Cleaner cleaner(); }
a.2
public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); }
b.
//从本地缓冲空间写字节流,i1为已写的字节数 int i1 = writeFromNativeBuffer(filedescriptor, bytebuffer1, l, nativedispatcher, obj);
//nativedispatcher参数实际为SocketDispatcher
private static int writeFromNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, long l, NativeDispatcher nativedispatcher, Object obj) throws IOException { int i = bytebuffer.position(); int j = bytebuffer.limit(); if(!$assertionsDisabled && i > j) throw new AssertionError(); int k = i > j ? 0 : j - i; int i1 = 0; if(k == 0) return 0; if(l != -1L) //这个方法在Nativedispatcher定义,在SocketDispatcher并没有实现,obj为writeLock i1 = nativedispatcher.pwrite(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, l, obj); else //默认的写操作 i1 = nativedispatcher.write(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k); if(i1 > 0) //将position向前移动i1位置,避免重复写即已写过的字节序列 bytebuffer.position(i + i1); return i1; }
来看两种方式的写
b.1
if(l != -1L) //这个在Nativedispatcher,在SocketDispatcher并没有实现 i1 = nativedispatcher.pwrite(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, l, obj);
//Nativedispatcher
int pwrite(FileDescriptor filedescriptor, long l, int i, long l1, Object obj) throws IOException { //操作当前JDK,不支持,留待以后扩展用吧,我的JDK为1.7.0.17 throw new IOException("Operation Unsupported"); }
b.2
else //默认的写操作 i1 = nativedispatcher.write(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k);
//SocketDispatcher
int write(FileDescriptor filedescriptor, long l, int i) throws IOException { return write0(filedescriptor, l, i); } static native int write0(FileDescriptor filedescriptor, long l, int i) throws IOException;
从缓冲读取字节序列,写到通道中,实际是通过SocketDispatcher完成实际的写工作,当前默认的写方法为write(FileDescriptor filedescriptor, long l, int i)。
c.
//添加bytebuffer到线程当前缓冲区 Util.offerFirstTemporaryDirectBuffer(bytebuffer1);
static void offerFirstTemporaryDirectBuffer(ByteBuffer bytebuffer) { if(!$assertionsDisabled && bytebuffer == null) throw new AssertionError(); //获取当前线程缓冲区 BufferCache buffercache = (BufferCache)bufferCache.get(); //将bytebuffer添加到缓冲区 if(!buffercache.offerFirst(bytebuffer)) free(bytebuffer); }
3.
//清除写线程
writerCleanup(); private void writerCleanup() throws IOException { synchronized(stateLock) { writerThread = 0L; if(state == 3) //这个kill操作,我们会在后面再讲 kill(); } }
从以上分析可以看出,从缓冲区读取字节序列写到通道,首先确保通道打开,且输出流没有关闭,然后委托给IOUtil写字节序列;IOUtil写字节流过程为首先通过Util从当前线程的缓冲区获取可以容下字节序列的临时缓冲区(DirectByteBuffer),如果没有则创建一个DirectByteBuffer,将字节序列写到临时的DirectByteBuffer中,然后将写操作委托给nativedispatcher(SocketDispatcher),将DirectByteBuffer添加到当前线程的缓冲区,
以便重用,因为DirectByteBuffer实际上是存在物理内存中,频繁的分配将会消耗更多的资源。
总结:
SocketChannelImpl构造主要是初始化读写及状态锁和通道socket文件描述。
connect连接方法首先同步读锁和写锁,确保socket通道打开,并没有连接;然后检查socket地址的正确性与合法性,然后检查当前线程是否有Connect方法的访问控制权限,最后尝试连接socket地址。从缓冲区读取字节序列写到通道write(ByteBuffer),首先确保通道打开,且输出流没有关闭,然后委托给IOUtil写字节序列;IOUtil写字节流过程为首先通过Util从当前线程的缓冲区获取可以容下字节序列的临时缓冲区(DirectByteBuffer),如果没有则创建一个DirectByteBuffer,将字节序列写到临时的DirectByteBuffer中,然后将写操作委托给nativedispatcher(SocketDispatcher),将DirectByteBuffer添加到当前线程的缓冲区,
以便重用,因为DirectByteBuffer实际上是存在物理内存中,频繁的分配将会消耗更多的资源。
SocketChannelImpl 解析二(发送数据后续):http://donald-draper.iteye.com/blog/2372548
附:
权限检查:SecurityManager为系统的默认安全检查管理器,主要用于检查当前线程是否拥有
某个权限的访问控制权限,比如socket连接,监听,获取类加载等。
//SecurityManager
//检查socket连接权限 public void checkConnect(String host, int port) { if (host == null) { throw new NullPointerException("host can't be null"); } if (!host.startsWith("[") && host.indexOf(':') != -1) { host = "[" + host + "]"; } if (port == -1) { checkPermission(new SocketPermission(host, SecurityConstants.SOCKET_RESOLVE_ACTION)); } else { //检查是否socket连接访问控制权限 checkPermission(new SocketPermission(host+":"+port, SecurityConstants.SOCKET_CONNECT_ACTION)); } } public void checkPermission(Permission perm) { //检查是否perm的访问控制权限 java.security.AccessController.checkPermission(perm); }
//SecurityConstants,安全权限常量
public final class SecurityConstants { //AWT为创建图形界面相关权限 public static class AWT { private static PermissionFactory permissionFactory() { Class class1; class1 = (Class)AccessController.doPrivileged(new PrivilegedAction() { public Class run() { return Class.forName("sun.awt.AWTPermissionFactory", true, null); ClassNotFoundException classnotfoundexception; classnotfoundexception; return null; } public volatile Object run() { return run(); } }); if(class1 == null) break MISSING_BLOCK_LABEL_52; return (PermissionFactory)class1.newInstance(); Object obj; obj; throw new InternalError(((InstantiationException) (obj)).getMessage()); obj; throw new InternalError(((IllegalAccessException) (obj)).getMessage()); return new FakeAWTPermissionFactory(); } private static Permission newAWTPermission(String s) { return factory.newPermission(s); } private static final String AWTFactory = "sun.awt.AWTPermissionFactory"; private static final PermissionFactory factory = permissionFactory(); public static final Permission TOPLEVEL_WINDOW_PERMISSION = newAWTPermission("showWindowWithoutWarningBanner"); public static final Permission ACCESS_CLIPBOARD_PERMISSION = newAWTPermission("accessClipboard");//访问粘贴板 public static final Permission CHECK_AWT_EVENTQUEUE_PERMISSION = newAWTPermission("accessEventQueue"); public static final Permission TOOLKIT_MODALITY_PERMISSION = newAWTPermission("toolkitModality"); public static final Permission READ_DISPLAY_PIXELS_PERMISSION = newAWTPermission("readDisplayPixels"); public static final Permission CREATE_ROBOT_PERMISSION = newAWTPermission("createRobot"); public static final Permission WATCH_MOUSE_PERMISSION = newAWTPermission("watchMousePointer"); public static final Permission SET_WINDOW_ALWAYS_ON_TOP_PERMISSION = newAWTPermission("setWindowAlwaysOnTop"); public static final Permission ALL_AWT_EVENTS_PERMISSION = newAWTPermission("listenToAllAWTEvents"); public static final Permission ACCESS_SYSTEM_TRAY_PERMISSION = newAWTPermission("accessSystemTray"); private AWT() { } } private static class FakeAWTPermission extends BasicPermission { public String toString() { return (new StringBuilder()).append("(\"java.awt.AWTPermission\" \"").append(getName()).append("\")").toString(); } private static final long serialVersionUID = -1L; public FakeAWTPermission(String s) { super(s); } } private static class FakeAWTPermissionFactory implements PermissionFactory { public FakeAWTPermission newPermission(String s) { return new FakeAWTPermission(s); } public volatile Permission newPermission(String s) { return newPermission(s); } private FakeAWTPermissionFactory() { } } private SecurityConstants() { } public static final String FILE_DELETE_ACTION = "delete";//文件删除 public static final String FILE_EXECUTE_ACTION = "execute";//文件执行 public static final String FILE_READ_ACTION = "read";//文件读 public static final String FILE_WRITE_ACTION = "write";//写文件 public static final String FILE_READLINK_ACTION = "readlink"; public static final String SOCKET_RESOLVE_ACTION = "resolve"; public static final String SOCKET_CONNECT_ACTION = "connect";//socket连接 public static final String SOCKET_LISTEN_ACTION = "listen";//socket监听 public static final String SOCKET_ACCEPT_ACTION = "accept";//socket接受连接 public static final String SOCKET_CONNECT_ACCEPT_ACTION = "connect,accept";//socket连接,接受连接 public static final String PROPERTY_RW_ACTION = "read,write";//读写属性 public static final String PROPERTY_READ_ACTION = "read";//读属性 public static final String PROPERTY_WRITE_ACTION = "write";//写属性 public static final AllPermission ALL_PERMISSION = new AllPermission(); public static final NetPermission SPECIFY_HANDLER_PERMISSION = new NetPermission("specifyStreamHandler"); public static final NetPermission SET_PROXYSELECTOR_PERMISSION = new NetPermission("setProxySelector"); public static final NetPermission GET_PROXYSELECTOR_PERMISSION = new NetPermission("getProxySelector"); public static final NetPermission SET_COOKIEHANDLER_PERMISSION = new NetPermission("setCookieHandler"); public static final NetPermission GET_COOKIEHANDLER_PERMISSION = new NetPermission("getCookieHandler"); public static final NetPermission SET_RESPONSECACHE_PERMISSION = new NetPermission("setResponseCache"); public static final NetPermission GET_RESPONSECACHE_PERMISSION = new NetPermission("getResponseCache"); //创建类加载器 public static final RuntimePermission CREATE_CLASSLOADER_PERMISSION = new RuntimePermission("createClassLoader"); public static final RuntimePermission CHECK_MEMBER_ACCESS_PERMISSION = new RuntimePermission("accessDeclaredMembers"); //修改线程 public static final RuntimePermission MODIFY_THREAD_PERMISSION = new RuntimePermission("modifyThread"); //修改线程分组信息 public static final RuntimePermission MODIFY_THREADGROUP_PERMISSION = new RuntimePermission("modifyThreadGroup"); public static final RuntimePermission GET_PD_PERMISSION = new RuntimePermission("getProtectionDomain"); //获取类加载器 public static final RuntimePermission GET_CLASSLOADER_PERMISSION = new RuntimePermission("getClassLoader"); public static final RuntimePermission STOP_THREAD_PERMISSION = new RuntimePermission("stopThread"); public static final RuntimePermission GET_STACK_TRACE_PERMISSION = new RuntimePermission("getStackTrace"); public static final SecurityPermission CREATE_ACC_PERMISSION = new SecurityPermission("createAccessControlContext"); public static final SecurityPermission GET_COMBINER_PERMISSION = new SecurityPermission("getDomainCombiner"); public static final SecurityPermission GET_POLICY_PERMISSION = new SecurityPermission("getPolicy"); public static final SocketPermission LOCAL_LISTEN_PERMISSION = new SocketPermission("localhost:1024-", "listen"); }
发表评论
-
文件通道解析二(文件锁,关闭通道)
2017-05-16 23:17 1108文件通道解析一(读写操作,通道数据传输等):http://do ... -
文件通道解析一(读写操作,通道数据传输等)
2017-05-16 10:04 1206Reference定义(PhantomRefere ... -
文件通道创建方式综述
2017-05-15 17:39 1109Reference定义(PhantomReference,Cl ... -
文件读写方式简单综述后续(文件,流构造)
2017-05-14 23:04 1525Java Socket通信实例:http://donald-d ... -
文件读写方式简单综述
2017-05-14 11:13 1169Java Socket通信实例:http://donald-d ... -
FileChanne定义
2017-05-12 23:28 991文件读写方式简单综述:http://donald-draper ... -
SeekableByteChannel接口定义
2017-05-11 08:43 1282ByteChannel,分散聚集通道接口的定义(SocketC ... -
FileChannel示例
2017-05-11 08:37 1027前面我们看过socket通道,datagram通道,以管道Pi ... -
PipeImpl解析
2017-05-11 08:41 974ServerSocketChannel定义:http://do ... -
Pipe定义
2017-05-10 09:07 958Channel接口定义:http://donald-drape ... -
NIO-Pipe示例
2017-05-10 08:47 951PipeImpl解析:http://donald-draper ... -
DatagramChannelImpl 解析四(地址绑定,关闭通道等)
2017-05-10 08:27 832DatagramChannelImpl 解析一(初始化):ht ... -
DatagramChannelImpl 解析三(多播)
2017-05-10 08:20 2002DatagramChannelImpl 解析一(初始化):ht ... -
NIO-UDP实例
2017-05-09 12:32 1628DatagramChannelImpl 解析一(初始化):ht ... -
DatagramChannelImpl 解析二(报文发送与接收)
2017-05-09 09:03 1453DatagramChannelImpl 解析一(初始化):ht ... -
DatagramChannelImpl 解析一(初始化)
2017-05-08 21:52 1471Channel接口定义:http://donald-drape ... -
MembershipKeyImpl 简介
2017-05-08 09:11 965MembershipKey定义:http://donald-d ... -
DatagramChannel定义
2017-05-07 23:13 1266Channel接口定义:http://donald-drape ... -
MulticastChanne接口定义
2017-05-07 13:45 1195NetworkChannel接口定义:ht ... -
MembershipKey定义
2017-05-06 16:20 966package java.nio.channels; i ...
相关推荐
此外,Stream API也是1.8的一大亮点,它提供了处理集合的新方式,可以进行数据流的过滤、映射、归约等操作,大大简化了数据处理的代码。 在"jdk-5b86f66575b7"这个压缩包中,很可能包含了JDK 1.8的完整源码,包括...
7 服务器端没有指定adapter的端口和ip,仅仅随便给了一个名字,并使用该名字从配置文件中读取信息: 启动服务器时没有问题正常,但是客户端无法连接 原因: 对象适配器无效 错误信息: 抛出异常: Ice::...
嵌入式八股文面试题库资料知识宝典-华为的面试试题.zip
训练导控系统设计.pdf
嵌入式八股文面试题库资料知识宝典-网络编程.zip
人脸转正GAN模型的高效压缩.pdf
少儿编程scratch项目源代码文件案例素材-几何冲刺 转瞬即逝.zip
少儿编程scratch项目源代码文件案例素材-鸡蛋.zip
嵌入式系统_USB设备枚举与HID通信_CH559单片机USB主机键盘鼠标复合设备控制_基于CH559单片机的USB主机模式设备枚举与键盘鼠标数据收发系统支持复合设备识别与HID
嵌入式八股文面试题库资料知识宝典-linux常见面试题.zip
面向智慧工地的压力机在线数据的预警应用开发.pdf
基于Unity3D的鱼类运动行为可视化研究.pdf
少儿编程scratch项目源代码文件案例素材-霍格沃茨魔法学校.zip
少儿编程scratch项目源代码文件案例素材-金币冲刺.zip
内容概要:本文深入探讨了HarmonyOS编译构建子系统的作用及其技术细节。作为鸿蒙操作系统背后的关键技术之一,编译构建子系统通过GN和Ninja工具实现了高效的源代码到机器代码的转换,确保了系统的稳定性和性能优化。该系统不仅支持多系统版本构建、芯片厂商定制,还具备强大的调试与维护能力。其高效编译速度、灵活性和可扩展性使其在华为设备和其他智能终端中发挥了重要作用。文章还比较了HarmonyOS编译构建子系统与安卓和iOS编译系统的异同,并展望了其未来的发展趋势和技术演进方向。; 适合人群:对操作系统底层技术感兴趣的开发者、工程师和技术爱好者。; 使用场景及目标:①了解HarmonyOS编译构建子系统的基本概念和工作原理;②掌握其在不同设备上的应用和优化策略;③对比HarmonyOS与安卓、iOS编译系统的差异;④探索其未来发展方向和技术演进路径。; 其他说明:本文详细介绍了HarmonyOS编译构建子系统的架构设计、核心功能和实际应用案例,强调了其在万物互联时代的重要性和潜力。阅读时建议重点关注编译构建子系统的独特优势及其对鸿蒙生态系统的深远影响。
嵌入式八股文面试题库资料知识宝典-奇虎360 2015校园招聘C++研发工程师笔试题.zip
嵌入式八股文面试题库资料知识宝典-腾讯2014校园招聘C语言笔试题(附答案).zip
双种群变异策略改进RWCE算法优化换热网络.pdf
内容概要:本文详细介绍了基于瞬时无功功率理论的三电平有源电力滤波器(APF)仿真研究。主要内容涵盖并联型APF的工作原理、三相三电平NPC结构、谐波检测方法(ipiq)、双闭环控制策略(电压外环+电流内环PI控制)以及SVPWM矢量调制技术。仿真结果显示,在APF投入前后,电网电流THD从21.9%降至3.77%,显著提高了电能质量。 适用人群:从事电力系统研究、电力电子技术开发的专业人士,尤其是对有源电力滤波器及其仿真感兴趣的工程师和技术人员。 使用场景及目标:适用于需要解决电力系统中谐波污染和无功补偿问题的研究项目。目标是通过仿真验证APF的有效性和可行性,优化电力系统的电能质量。 其他说明:文中提到的仿真模型涉及多个关键模块,如三相交流电压模块、非线性负载、信号采集模块、LC滤波器模块等,这些模块的设计和协同工作对于实现良好的谐波抑制和无功补偿至关重要。
内容概要:本文探讨了在工业自动化和物联网交汇背景下,构建OPC DA转MQTT网关软件的需求及其具体实现方法。文中详细介绍了如何利用Python编程语言及相关库(如OpenOPC用于读取OPC DA数据,paho-mqtt用于MQTT消息传递),完成从OPC DA数据解析、格式转换到最终通过MQTT协议发布数据的关键步骤。此外,还讨论了针对不良网络环境下数据传输优化措施以及后续测试验证过程。 适合人群:从事工业自动化系统集成、物联网项目开发的技术人员,特别是那些希望提升跨协议数据交换能力的专业人士。 使用场景及目标:适用于需要在不同通信协议间建立高效稳定的数据通道的应用场合,比如制造业生产线监控、远程设备管理等。主要目的是克服传统有线网络限制,实现在不稳定无线网络条件下仍能保持良好性能的数据传输。 其他说明:文中提供了具体的代码片段帮助理解整个流程,并强调了实际部署过程中可能遇到的问题及解决方案。