JDBC driver initialization (MySQL): http://donald-draper.iteye.com/blog/2342010
Obtaining a JDBC connection: http://donald-draper.iteye.com/blog/2342011
Obtaining a MySQL load-balanced connection: http://donald-draper.iteye.com/blog/2342089
Obtaining a MySQL master/slave replication (read/write splitting) connection: http://donald-draper.iteye.com/blog/2342108
ConnectionImpl creating MysqlIO: http://donald-draper.iteye.com/blog/2342959
MySQL precompiled SQL: http://donald-draper.iteye.com/blog/2342960
MySQL PreparedStatement queries: http://donald-draper.iteye.com/blog/2343083
MySQL ServerPreparedStatement queries: http://donald-draper.iteye.com/blog/2343124
The previous articles covered how a MySQL connection is obtained. The ConnectionImpl constructor calls a method named createNewIO; let's look at what this method does.
Creating MysqlIO
protected void createNewIO(boolean isForReconnect) throws SQLException {
    // the decompiler emitted "JVM INSTR monitorenter" here; it corresponds to a synchronized block on mutex
    synchronized (mutex) {
        Properties mergedProps = exposeAsProperties(props);
        long queriesIssuedFailedOverCopy = queriesIssuedFailedOver;
        queriesIssuedFailedOver = 0L;
        // standalone server case
        if (!getHighAvailability() && !failedOver) {
            boolean connectionGood = false;
            Exception connectionNotEstablishedBecause = null;
            int hostIndex = 0;
            if (getRoundRobinLoadBalance())
                hostIndex = getNextRoundRobinHostIndex(getURL(), hostList);
            while (hostIndex < hostListSize) {
                if (hostIndex == 0)
                    hasTriedMasterFlag = true;
                try {
                    String newHostPortPair = (String) hostList.get(hostIndex);
                    int newPort = 3306;
                    String hostPortPair[] = NonRegisteringDriver.parseHostPortPair(newHostPortPair);
                    String newHost = hostPortPair[0];
                    if (newHost == null || StringUtils.isEmptyOrWhitespaceOnly(newHost))
                        newHost = "localhost";
                    if (hostPortPair[1] != null)
                        try {
                            newPort = Integer.parseInt(hostPortPair[1]);
                        } catch (NumberFormatException nfe) {
                            throw SQLError.createSQLException("Illegal connection port value '" + hostPortPair[1] + "'", "01S00", getExceptionInterceptor());
                        }
                    // create the MysqlIO
                    io = new MysqlIO(newHost, newPort, mergedProps, getSocketFactoryClassName(), this, getSocketTimeout(), largeRowSizeThreshold.getValueAsInt());
                    // handshake
                    io.doHandshake(user, password, database);
                    // thread id reported by the server for this io
                    connectionId = io.getThreadId();
                    isClosed = false;
                    boolean oldAutoCommit = getAutoCommit();   // auto-commit
                    int oldIsolationLevel = isolationLevel;    // transaction isolation level
                    boolean oldReadOnly = isReadOnly();        // read-only
                    String oldCatalog = getCatalog();
                    initializePropsFromServer();
                    if (isForReconnect) {
                        setAutoCommit(oldAutoCommit);
                        if (hasIsolationLevels)
                            setTransactionIsolation(oldIsolationLevel);
                        setCatalog(oldCatalog);
                    }
                    if (hostIndex != 0) {
                        setFailedOverState();
                        queriesIssuedFailedOverCopy = 0L;
                    } else {
                        failedOver = false;
                        queriesIssuedFailedOverCopy = 0L;
                        if (hostListSize > 1)
                            setReadOnlyInternal(false);
                        else
                            setReadOnlyInternal(oldReadOnly);
                    }
                    connectionGood = true;
                    break;
                } catch (Exception EEE) {
                    if (io != null)
                        io.forceClose();
                    connectionNotEstablishedBecause = EEE;
                    connectionGood = false;
                    if (EEE instanceof SQLException) {
                        SQLException sqlEx = (SQLException) EEE;
                        String sqlState = sqlEx.getSQLState();
                        // only communication failures (08S01) move on to the next host
                        if (sqlState == null || !sqlState.equals("08S01"))
                            throw sqlEx;
                    }
                    if (getRoundRobinLoadBalance()) {
                        hostIndex = getNextRoundRobinHostIndex(getURL(), hostList) - 1;
                        continue;
                    }
                    if (hostListSize - 1 == hostIndex)
                        throw SQLError.createCommunicationsException(this, io == null ? 0L : io.getLastPacketSentTimeMs(), io == null ? 0L : io.getLastPacketReceivedTimeMs(), EEE, getExceptionInterceptor());
                    hostIndex++;
                }
            }
            if (!connectionGood) {
                SQLException chainedEx = SQLError.createSQLException(Messages.getString("Connection.UnableToConnect"), "08001", getExceptionInterceptor());
                chainedEx.initCause(connectionNotEstablishedBecause);
                throw chainedEx;
            }
        } else {
            // load-balanced cluster case (high availability enabled, or already failed over)
            double timeout = getInitialTimeout();
            boolean connectionGood = false;
            Exception connectionException = null;
            int hostIndex = 0;
            if (getRoundRobinLoadBalance())
                hostIndex = getNextRoundRobinHostIndex(getURL(), hostList);
            for (; hostIndex < hostListSize && !connectionGood; hostIndex++) {
                if (hostIndex == 0)
                    hasTriedMasterFlag = true;
                if (preferSlaveDuringFailover && hostIndex == 0)
                    hostIndex++;
                for (int attemptCount = 0; attemptCount < getMaxReconnects() && !connectionGood; attemptCount++) {
                    try {
                        if (io != null)
                            io.forceClose();
                        String newHostPortPair = (String) hostList.get(hostIndex);
                        int newPort = 3306;
                        String hostPortPair[] = NonRegisteringDriver.parseHostPortPair(newHostPortPair);
                        String newHost = hostPortPair[0];
                        if (newHost == null || StringUtils.isEmptyOrWhitespaceOnly(newHost))
                            newHost = "localhost";
                        if (hostPortPair[1] != null)
                            try {
                                newPort = Integer.parseInt(hostPortPair[1]);
                            } catch (NumberFormatException nfe) {
                                throw SQLError.createSQLException("Illegal connection port value '" + hostPortPair[1] + "'", "01S00", getExceptionInterceptor());
                            }
                        // create the MysqlIO
                        io = new MysqlIO(newHost, newPort, mergedProps, getSocketFactoryClassName(), this, getSocketTimeout(), largeRowSizeThreshold.getValueAsInt());
                        // handshake
                        io.doHandshake(user, password, database);
                        pingInternal(false, 0);
                        // thread id reported by the server for this io
                        connectionId = io.getThreadId();
                        isClosed = false;
                        boolean oldAutoCommit = getAutoCommit();   // auto-commit
                        int oldIsolationLevel = isolationLevel;    // transaction isolation level
                        boolean oldReadOnly = isReadOnly();        // read-only
                        String oldCatalog = getCatalog();
                        initializePropsFromServer();
                        if (isForReconnect) {
                            setAutoCommit(oldAutoCommit);
                            if (hasIsolationLevels)
                                setTransactionIsolation(oldIsolationLevel);
                            setCatalog(oldCatalog);
                        }
                        connectionGood = true;
                        if (hostIndex != 0) {
                            setFailedOverState();
                            queriesIssuedFailedOverCopy = 0L;
                            break;
                        }
                        failedOver = false;
                        queriesIssuedFailedOverCopy = 0L;
                        if (hostListSize > 1)
                            setReadOnlyInternal(false);
                        else
                            setReadOnlyInternal(oldReadOnly);
                        break;
                    } catch (Exception EEE) {
                        connectionException = EEE;
                    }
                    connectionGood = false;
                    if (getRoundRobinLoadBalance())
                        hostIndex = getNextRoundRobinHostIndex(getURL(), hostList) - 1;
                    if (connectionGood)
                        break;
                    if (attemptCount <= 0)
                        continue;
                    // wait before the next reconnect attempt
                    try {
                        Thread.sleep((long) timeout * 1000L);
                    } catch (InterruptedException IE) {
                    }
                }
            }
        }
    }
}
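Both branches of createNewIO begin by turning a "host:port" entry from the host list into a host name and a port, falling back to localhost and 3306. A minimal sketch of that step follows. The class HostPortSketch and its parse method are hypothetical names for illustration, not the driver's NonRegisteringDriver.parseHostPortPair, and the sketch assumes plain "host:port" entries (no IPv6 literals).

```java
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical helper, not part of Connector/J: mirrors the defaults used in createNewIO.
final class HostPortSketch {
    static final int DEFAULT_PORT = 3306;

    // Splits "host:port" and applies the same fallbacks as createNewIO.
    static Map.Entry<String, Integer> parse(String hostPortPair) {
        String host = hostPortPair;
        String portText = null;
        int colon = hostPortPair == null ? -1 : hostPortPair.lastIndexOf(':');
        if (colon >= 0) {
            host = hostPortPair.substring(0, colon);
            portText = hostPortPair.substring(colon + 1);
        }
        // empty or missing host falls back to localhost
        if (host == null || host.trim().isEmpty()) {
            host = "localhost";
        }
        // missing port falls back to 3306; a non-numeric port is rejected
        int port = DEFAULT_PORT;
        if (portText != null && !portText.isEmpty()) {
            try {
                port = Integer.parseInt(portText);
            } catch (NumberFormatException nfe) {
                throw new IllegalArgumentException("Illegal connection port value '" + portText + "'");
            }
        }
        return new AbstractMap.SimpleImmutableEntry<>(host, port);
    }
}
```

For example, HostPortSketch.parse("db1:3307") yields the pair (db1, 3307), while HostPortSketch.parse("db2") yields (db2, 3306).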
//MysqlIO
class MysqlIO {
    private static final int UTF8_CHARSET_INDEX = 33;
    private static final String CODE_PAGE_1252 = "Cp1252";
    protected static final int NULL_LENGTH = -1;
    protected static final int COMP_HEADER_LENGTH = 3;
    protected static final int MIN_COMPRESS_LEN = 50;
    protected static final int HEADER_LENGTH = 4;
    protected static final int AUTH_411_OVERHEAD = 33;
    private static int maxBufferSize = 65535;
    private static final int CLIENT_COMPRESS = 32;
    protected static final int CLIENT_CONNECT_WITH_DB = 8;
    private static final int CLIENT_FOUND_ROWS = 2;
    private static final int CLIENT_LOCAL_FILES = 128;
    private static final int CLIENT_LONG_FLAG = 4;
    private static final int CLIENT_LONG_PASSWORD = 1;
    private static final int CLIENT_PROTOCOL_41 = 512;
    private static final int CLIENT_INTERACTIVE = 1024;
    protected static final int CLIENT_SSL = 2048;
    private static final int CLIENT_TRANSACTIONS = 8192;
    protected static final int CLIENT_RESERVED = 16384;
    protected static final int CLIENT_SECURE_CONNECTION = 32768;
    private static final int CLIENT_MULTI_QUERIES = 65536;
    private static final int CLIENT_MULTI_RESULTS = 131072;
    private static final int SERVER_STATUS_IN_TRANS = 1;
    private static final int SERVER_STATUS_AUTOCOMMIT = 2;
    static final int SERVER_MORE_RESULTS_EXISTS = 8;
    private static final int SERVER_QUERY_NO_GOOD_INDEX_USED = 16;
    private static final int SERVER_QUERY_NO_INDEX_USED = 32;
    private static final int SERVER_QUERY_WAS_SLOW = 2048;
    private static final int SERVER_STATUS_CURSOR_EXISTS = 64;
    private static final String FALSE_SCRAMBLE = "xxxxxxxx";
    protected static final int MAX_QUERY_SIZE_TO_LOG = 1024;
    protected static final int MAX_QUERY_SIZE_TO_EXPLAIN = 1048576;
    protected static final int INITIAL_PACKET_SIZE = 1024;
    private static String jvmPlatformCharset = null;
    protected static final String ZERO_DATE_VALUE_MARKER = "0000-00-00";
    protected static final String ZERO_DATETIME_VALUE_MARKER = "0000-00-00 00:00:00";
    private static final int MAX_PACKET_DUMP_LENGTH = 1024;
    private boolean packetSequenceReset;
    protected int serverCharsetIndex;
    private Buffer reusablePacket;
    private Buffer sendPacket;
    private Buffer sharedSendPacket;
    protected BufferedOutputStream mysqlOutput;   // MySQL output stream
    protected ConnectionImpl connection;          // MySQL connection
    private Deflater deflater;
    protected InputStream mysqlInput;             // MySQL input stream
    private LinkedList packetDebugRingBuffer;
    private RowData streamingData;
    protected Socket mysqlConnection;             // MySQL socket
    private SocketFactory socketFactory;          // MySQL socket factory
    private SoftReference loadFileBufRef;
    private SoftReference splitBufRef;
    protected String host;                        // host
    protected String seed;
    private String serverVersion;
    private String socketFactoryClassName;
    private byte packetHeaderBuf[];
    private boolean colDecimalNeedsBump;
    private boolean hadWarnings;
    private boolean has41NewNewProt;
    private boolean hasLongColumnInfo;
    private boolean isInteractiveClient;
    private boolean logSlowQueries;
    private boolean platformDbCharsetMatches;
    private boolean profileSql;
    private boolean queryBadIndexUsed;
    private boolean queryNoIndexUsed;
    private boolean serverQueryWasSlow;
    private boolean use41Extensions;
    private boolean useCompression;
    private boolean useNewLargePackets;
    private boolean useNewUpdateCounts;
    private byte packetSequence;
    private byte readPacketSequence;
    private boolean checkPacketSequence;
    private byte protocolVersion;
    private int maxAllowedPacket;
    protected int maxThreeBytes;
    protected int port;
    protected int serverCapabilities;
    private int serverMajorVersion;
    private int serverMinorVersion;
    private int oldServerStatus;
    private int serverStatus;
    private int serverSubMinorVersion;
    private int warningCount;
    protected long clientParam;
    protected long lastPacketSentTimeMs;
    protected long lastPacketReceivedTimeMs;
    private boolean traceProtocol;
    private boolean enablePacketDebug;
    private Calendar sessionCalendar;
    private boolean useConnectWithDb;
    private boolean needToGrabQueryFromPacket;
    private boolean autoGenerateTestcaseScript;
    private long threadId;
    private boolean useNanosForElapsedTime;
    private long slowQueryThreshold;
    private String queryTimingUnits;
    private boolean useDirectRowUnpack;
    private int useBufferRowSizeThreshold;
    private int commandCount;
    private List statementInterceptors;
    private ExceptionInterceptor exceptionInterceptor;
    private int statementExecutionDepth;
    private boolean useAutoSlowLog;

    static {
        OutputStreamWriter outWriter = null;
        try {
            outWriter = new OutputStreamWriter(new ByteArrayOutputStream());
            jvmPlatformCharset = outWriter.getEncoding();
        } finally {
            try {
                if (outWriter != null)
                    outWriter.close();
            } catch (IOException ioEx) {
            }
        }
    }

    // MysqlIO constructor
    public MysqlIO(String host, int port, Properties props, String socketFactoryClassName, ConnectionImpl conn, int socketTimeout, int useBufferRowSizeThreshold) throws IOException, SQLException {
        packetSequenceReset = false;
        reusablePacket = null;
        sendPacket = null;
        sharedSendPacket = null;
        mysqlOutput = null;
        deflater = null;
        mysqlInput = null;
        packetDebugRingBuffer = null;
        streamingData = null;
        mysqlConnection = null;
        socketFactory = null;
        this.host = null;
        serverVersion = null;
        this.socketFactoryClassName = null;
        packetHeaderBuf = new byte[4];
        colDecimalNeedsBump = false;
        hadWarnings = false;
        has41NewNewProt = false;
        hasLongColumnInfo = false;
        isInteractiveClient = false;
        logSlowQueries = false;
        platformDbCharsetMatches = true;
        profileSql = false;
        queryBadIndexUsed = false;
        queryNoIndexUsed = false;
        serverQueryWasSlow = false;
        use41Extensions = false;
        useCompression = false;
        useNewLargePackets = false;
        useNewUpdateCounts = false;
        packetSequence = 0;
        readPacketSequence = -1;
        checkPacketSequence = false;
        protocolVersion = 0;
        maxAllowedPacket = 1048576;
        maxThreeBytes = 16581375;
        this.port = 3306;
        serverMajorVersion = 0;
        serverMinorVersion = 0;
        oldServerStatus = 0;
        serverStatus = 0;
        serverSubMinorVersion = 0;
        warningCount = 0;
        clientParam = 0L;
        lastPacketSentTimeMs = 0L;
        lastPacketReceivedTimeMs = 0L;
        traceProtocol = false;
        enablePacketDebug = false;
        useDirectRowUnpack = true;
        commandCount = 0;
        statementExecutionDepth = 0;
        connection = conn;
        if (connection.getEnablePacketDebug())
            packetDebugRingBuffer = new LinkedList();
        traceProtocol = connection.getTraceProtocol();
        useAutoSlowLog = connection.getAutoSlowLog();
        this.useBufferRowSizeThreshold = useBufferRowSizeThreshold;
        useDirectRowUnpack = connection.getUseDirectRowUnpack();
        logSlowQueries = connection.getLogSlowQueries();
        reusablePacket = new Buffer(1024);
        sendPacket = new Buffer(1024);
        this.port = port;
        this.host = host;
        this.socketFactoryClassName = socketFactoryClassName;
        // create the socket factory
        socketFactory = createSocketFactory();
        exceptionInterceptor = connection.getExceptionInterceptor();
        try {
            // obtain the connection (socket) from the socket factory
            mysqlConnection = socketFactory.connect(this.host, this.port, props);
            if (socketTimeout != 0)
                try {
                    mysqlConnection.setSoTimeout(socketTimeout);
                } catch (Exception ex) {
                }
            // before the handshake
            mysqlConnection = socketFactory.beforeHandshake();
            if (connection.getUseReadAheadInput())
                mysqlInput = new ReadAheadInputStream(mysqlConnection.getInputStream(), 16384, connection.getTraceProtocol(), connection.getLog());
            else if (connection.useUnbufferedInput())
                // take the input stream directly from mysqlConnection
                mysqlInput = mysqlConnection.getInputStream();
            else
                mysqlInput = new BufferedInputStream(mysqlConnection.getInputStream(), 16384);
            // initialize the mysqlOutput output stream
            mysqlOutput = new BufferedOutputStream(mysqlConnection.getOutputStream(), 16384);
            isInteractiveClient = connection.getInteractiveClient();
            profileSql = connection.getProfileSql();
            sessionCalendar = Calendar.getInstance();
            autoGenerateTestcaseScript = connection.getAutoGenerateTestcaseScript();
            needToGrabQueryFromPacket = profileSql || logSlowQueries || autoGenerateTestcaseScript;
            if (connection.getUseNanosForElapsedTime() && Util.nanoTimeAvailable()) {
                useNanosForElapsedTime = true;
                queryTimingUnits = Messages.getString("Nanoseconds");
            } else {
                queryTimingUnits = Messages.getString("Milliseconds");
            }
            if (connection.getLogSlowQueries())
                calculateSlowQueryThreshold();
        } catch (IOException ioEx) {
            throw SQLError.createCommunicationsException(connection, 0L, 0L, ioEx, getExceptionInterceptor());
        }
    }
}
// create the socket factory
socketFactory = createSocketFactory();
private SocketFactory createSocketFactory() throws SQLException {
    if (socketFactoryClassName == null)
        throw SQLError.createSQLException(Messages.getString("MysqlIO.75"), "08001", getExceptionInterceptor());
    // the decompiler lost the try/catch here; restored from the exception-handling tail it left behind
    try {
        // load the factory class by name and instantiate it through its no-arg constructor
        return (SocketFactory) Class.forName(socketFactoryClassName).newInstance();
    } catch (Exception ex) {
        SQLException sqlEx = SQLError.createSQLException(Messages.getString("MysqlIO.76") + socketFactoryClassName + Messages.getString("MysqlIO.77"), "08001", getExceptionInterceptor());
        sqlEx.initCause(ex);
        throw sqlEx;
    }
}
Where does socketFactoryClassName come from? It is defined in ConnectionPropertiesImpl.
//ConnectionPropertiesImpl
public class ConnectionPropertiesImpl implements Serializable, ConnectionProperties {
    public ConnectionPropertiesImpl() {
        // socket factory class; the default is StandardSocketFactory
        socketFactoryClassName = new StringConnectionProperty("socketFactory", (com.mysql.jdbc.StandardSocketFactory.class).getName(), Messages.getString("ConnectionProperties.socketFactory"), "3.0.3", CONNECTION_AND_AUTH_CATEGORY, 4);
    }
}
ConnectionPropertiesImpl shows that, unless the socketFactory property is overridden, the socket factory class is StandardSocketFactory.
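The pattern used by createSocketFactory, reading a class name from a property with a default and instantiating it reflectively, can be sketched on its own. Everything below is a stand-in for illustration: FactoryLoaderSketch, SketchFactory and DefaultFactory are hypothetical names, not Connector/J classes. Only the property key "socketFactory" is taken from the listing above.

```java
import java.util.Properties;

// Hypothetical illustration of "class name in a property, instantiated by reflection".
final class FactoryLoaderSketch {
    // Stand-in for the driver's SocketFactory interface.
    interface SketchFactory {
        String describe();
    }

    // Stand-in for StandardSocketFactory: used when the property is not set.
    static final class DefaultFactory implements SketchFactory {
        public String describe() {
            return "default";
        }
    }

    static SketchFactory create(Properties props) {
        // fall back to the default implementation's class name
        String className = props.getProperty("socketFactory", DefaultFactory.class.getName());
        try {
            // load by name and call the no-arg constructor
            return (SketchFactory) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException ex) {
            // wrap the cause, as createSocketFactory does with initCause
            throw new IllegalStateException("Could not instantiate socket factory '" + className + "'", ex);
        }
    }
}
```

The design choice this shows is that the factory is pluggable: a different implementation can be swapped in through configuration alone, as long as it has a no-arg constructor and implements the expected interface.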
// obtain the connection (socket) from the socket factory
mysqlConnection = socketFactory.connect(this.host, this.port, props);
//StandardSocketFactory
public class StandardSocketFactory implements SocketFactory {
    public static final String TCP_NO_DELAY_PROPERTY_NAME = "tcpNoDelay";
    public static final String TCP_KEEP_ALIVE_DEFAULT_VALUE = "true";
    public static final String TCP_KEEP_ALIVE_PROPERTY_NAME = "tcpKeepAlive";
    public static final String TCP_RCV_BUF_PROPERTY_NAME = "tcpRcvBuf";
    public static final String TCP_SND_BUF_PROPERTY_NAME = "tcpSndBuf";
    public static final String TCP_TRAFFIC_CLASS_PROPERTY_NAME = "tcpTrafficClass";
    public static final String TCP_RCV_BUF_DEFAULT_VALUE = "0";
    public static final String TCP_SND_BUF_DEFAULT_VALUE = "0";
    public static final String TCP_TRAFFIC_CLASS_DEFAULT_VALUE = "0";
    public static final String TCP_NO_DELAY_DEFAULT_VALUE = "true";
    private static Method setTraficClassMethod;
    protected String host;
    protected int port;
    protected Socket rawSocket;

    static {
        try {
            setTraficClassMethod = (java.net.Socket.class).getMethod("setTrafficClass", new Class[] { Integer.TYPE });
        } catch (SecurityException e) {
            setTraficClassMethod = null;
        } catch (NoSuchMethodException e) {
            setTraficClassMethod = null;
        }
    }

    // socket factory constructor
    public StandardSocketFactory() {
        host = null;
        port = 3306;
        rawSocket = null;
    }

    // obtain the socket
    public Socket connect(String hostname, int portNumber, Properties props) throws SocketException, IOException {
        if (props != null) {
            host = hostname;
            port = portNumber;
            Method connectWithTimeoutMethod = null;
            Method socketBindMethod = null;
            Class socketAddressClass = null;
            String localSocketHostname = props.getProperty("localSocketAddress");
            String connectTimeoutStr = props.getProperty("connectTimeout");
            int connectTimeout = 0;
            boolean wantsTimeout = connectTimeoutStr != null && connectTimeoutStr.length() > 0 && !connectTimeoutStr.equals("0");
            boolean wantsLocalBind = localSocketHostname != null && localSocketHostname.length() > 0;
            boolean needsConfigurationBeforeConnect = socketNeedsConfigurationBeforeConnect(props);
            if (wantsTimeout || wantsLocalBind || needsConfigurationBeforeConnect) {
                if (connectTimeoutStr != null)
                    try {
                        connectTimeout = Integer.parseInt(connectTimeoutStr);
                    } catch (NumberFormatException nfe) {
                        throw new SocketException("Illegal value '" + connectTimeoutStr + "' for connectTimeout");
                    }
                try {
                    socketAddressClass = Class.forName("java.net.SocketAddress");
                    // look up Socket.connect(SocketAddress, int)
                    connectWithTimeoutMethod = (java.net.Socket.class).getMethod("connect", new Class[] { socketAddressClass, Integer.TYPE });
                    // look up Socket.bind(SocketAddress)
                    socketBindMethod = (java.net.Socket.class).getMethod("bind", new Class[] { socketAddressClass });
                } catch (NoClassDefFoundError noClassDefFound) {
                } catch (NoSuchMethodException noSuchMethodEx) {
                } catch (Throwable catchAll) {
                }
                if (wantsLocalBind && socketBindMethod == null)
                    throw new SocketException("Can't specify \"localSocketAddress\" on JVMs older than 1.4");
                if (wantsTimeout && connectWithTimeoutMethod == null)
                    throw new SocketException("Can't specify \"connectTimeout\" on JVMs older than 1.4");
            }
            if (host != null) {
                // host is set and no local bind, timeout, or pre-connect configuration is needed
                if (!wantsLocalBind && !wantsTimeout && !needsConfigurationBeforeConnect) {
                    InetAddress possibleAddresses[] = InetAddress.getAllByName(host);
                    Throwable caughtWhileConnecting = null;
                    int i = 0;
                    do {
                        if (i >= possibleAddresses.length)
                            break;
                        try {
                            rawSocket = new Socket(possibleAddresses[i], port);
                            // configure the socket
                            configureSocket(rawSocket, props);
                            break;
                        } catch (Exception ex) {
                            caughtWhileConnecting = ex;
                            i++;
                        }
                    } while (true);
                    if (rawSocket == null)
                        unwrapExceptionToProperClassAndThrowIt(caughtWhileConnecting);
                } else {
                    // a local bind, a connect timeout, or pre-connect configuration is required
                    try {
                        InetAddress possibleAddresses[] = InetAddress.getAllByName(host);
                        Throwable caughtWhileConnecting = null;
                        Object localSockAddr = null;
                        Class inetSocketAddressClass = null;
                        Constructor addrConstructor = null;
                        try {
                            inetSocketAddressClass = Class.forName("java.net.InetSocketAddress");
                            // look up the InetSocketAddress(InetAddress, int) constructor
                            addrConstructor = inetSocketAddressClass.getConstructor(new Class[] { java.net.InetAddress.class, Integer.TYPE });
                            if (wantsLocalBind)
                                // a local bind was requested: build the local socket address (port 0 = any free port)
                                localSockAddr = addrConstructor.newInstance(new Object[] { InetAddress.getByName(localSocketHostname), new Integer(0) });
                        } catch (Throwable ex) {
                            unwrapExceptionToProperClassAndThrowIt(ex);
                        }
                        int i = 0;
                        do {
                            if (i >= possibleAddresses.length)
                                break;
                            try {
                                rawSocket = new Socket();
                                // configure the socket
                                configureSocket(rawSocket, props);
                                // build the remote InetSocketAddress
                                Object sockAddr = addrConstructor.newInstance(new Object[] { possibleAddresses[i], new Integer(port) });
                                // bind the socket to the local address
                                socketBindMethod.invoke(rawSocket, new Object[] { localSockAddr });
                                // connect to the server
                                connectWithTimeoutMethod.invoke(rawSocket, new Object[] { sockAddr, new Integer(connectTimeout) });
                                break;
                            } catch (Exception ex) {
                                rawSocket = null;
                                caughtWhileConnecting = ex;
                                i++;
                            }
                        } while (true);
                        if (rawSocket == null)
                            unwrapExceptionToProperClassAndThrowIt(caughtWhileConnecting);
                    } catch (Throwable t) {
                        unwrapExceptionToProperClassAndThrowIt(t);
                    }
                }
                return rawSocket;
            }
        }
        throw new SocketException("Unable to create socket");
    }

    // configure the socket
    private void configureSocket(Socket sock, Properties props) throws SocketException, IOException {
        try {
            // TCP_NODELAY
            sock.setTcpNoDelay(Boolean.valueOf(props.getProperty("tcpNoDelay", "true")).booleanValue());
            String keepAlive = props.getProperty("tcpKeepAlive", "true");
            if (keepAlive != null && keepAlive.length() > 0)
                // enable or disable TCP keep-alive
                sock.setKeepAlive(Boolean.valueOf(keepAlive).booleanValue());
            int receiveBufferSize = Integer.parseInt(props.getProperty("tcpRcvBuf", "0"));
            if (receiveBufferSize > 0)
                // receive buffer size
                sock.setReceiveBufferSize(receiveBufferSize);
            int sendBufferSize = Integer.parseInt(props.getProperty("tcpSndBuf", "0"));
            if (sendBufferSize > 0)
                // send buffer size
                sock.setSendBufferSize(sendBufferSize);
            int trafficClass = Integer.parseInt(props.getProperty("tcpTrafficClass", "0"));
            if (trafficClass > 0 && setTraficClassMethod != null)
                // traffic class, set through reflection
                setTraficClassMethod.invoke(sock, new Object[] { new Integer(trafficClass) });
        } catch (Throwable t) {
            unwrapExceptionToProperClassAndThrowIt(t);
        }
    }
}
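configureSocket reads its TCP options from the connection properties, with "true" as the default for tcpNoDelay and tcpKeepAlive and "0" (leave the OS default) for the buffer sizes. A minimal sketch of the same idea on a modern JDK follows, without the reflection that StandardSocketFactory needed for pre-1.4 JVMs. SocketOptionsSketch is a hypothetical class, and it simplifies the original by always applying keep-alive and by leaving out tcpTrafficClass.

```java
import java.io.IOException;
import java.net.Socket;
import java.util.Properties;

// Hypothetical value class: parses the TCP-related properties once, then applies them to a socket.
final class SocketOptionsSketch {
    final boolean tcpNoDelay;
    final boolean keepAlive;
    final int rcvBuf;
    final int sndBuf;

    SocketOptionsSketch(Properties props) {
        // same property names and defaults as in the StandardSocketFactory listing
        tcpNoDelay = Boolean.parseBoolean(props.getProperty("tcpNoDelay", "true"));
        keepAlive = Boolean.parseBoolean(props.getProperty("tcpKeepAlive", "true"));
        rcvBuf = Integer.parseInt(props.getProperty("tcpRcvBuf", "0"));
        sndBuf = Integer.parseInt(props.getProperty("tcpSndBuf", "0"));
    }

    // Applies the parsed options; buffer sizes of 0 keep the OS defaults.
    void applyTo(Socket sock) throws IOException {
        sock.setTcpNoDelay(tcpNoDelay);
        sock.setKeepAlive(keepAlive);
        if (rcvBuf > 0) {
            sock.setReceiveBufferSize(rcvBuf);
        }
        if (sndBuf > 0) {
            sock.setSendBufferSize(sndBuf);
        }
    }
}
```

Separating parsing from applying means a malformed value such as tcpRcvBuf=abc fails before any socket is created.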
// before the handshake
mysqlConnection = socketFactory.beforeHandshake();
//StandardSocketFactory
// returns the socket created by connect()
public Socket beforeHandshake() throws SocketException, IOException {
    return rawSocket;
}
// after the handshake
public Socket afterHandshake() throws SocketException, IOException {
    return rawSocket;
}
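From the above, the main job of createNewIO is to initialize MysqlIO. When MysqlIO is constructed, it first creates the socket factory (StandardSocketFactory by default), obtains the socket connection from that factory, and initializes MysqlIO's input and output streams. Later, when a Statement executes, MysqlIO is what sends the SQL command to the server.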
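Stripped of the session-state bookkeeping, the host loop in createNewIO tries each configured host in order and gives up only after the last one fails. A simplified sketch of that control flow follows. FailoverSketch and its Connector interface are hypothetical names, and unlike the driver it treats every exception as a reason to try the next host (the driver only continues on SQLState 08S01 communication failures).

```java
import java.util.List;

// Hypothetical illustration of "try each host until one connects".
final class FailoverSketch {
    // Stand-in for "new MysqlIO(host, ...) followed by doHandshake(...)".
    interface Connector<T> {
        T connect(String host) throws Exception;
    }

    static <T> T connectFirstAvailable(List<String> hosts, Connector<T> connector) {
        Exception lastFailure = null;
        for (String host : hosts) {
            try {
                return connector.connect(host);   // first success wins
            } catch (Exception ex) {
                lastFailure = ex;                 // remember why this host failed, then move on
            }
        }
        // no host worked: chain the last cause, as createNewIO does with initCause
        throw new IllegalStateException("Unable to connect to any host in " + hosts, lastFailure);
    }
}
```

With hosts ["a", "b"] and a connector that fails for "a", the call returns whatever the connector produces for "b".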
// MysqlIO handshake
void doHandshake(String user, String password, String database) throws SQLException {
    checkPacketSequence = false;
    readPacketSequence = 0;
    // read the server's initial handshake packet
    Buffer buf = readPacket();
    protocolVersion = buf.readByte();
    if (protocolVersion == -1) {
        try {
            mysqlConnection.close();
        } catch (Exception e) {
        }
        int errno = 2000;
        errno = buf.readInt();
        String serverErrorMessage = buf.readString("ASCII", getExceptionInterceptor());
        StringBuffer errorBuf = new StringBuffer(Messages.getString("MysqlIO.10"));
        errorBuf.append(serverErrorMessage);
        errorBuf.append("\"");
        String xOpen = SQLError.mysqlToSqlState(errno, connection.getUseSqlStateCodes());
        throw SQLError.createSQLException(SQLError.get(xOpen) + ", " + errorBuf.toString(), xOpen, errno, getExceptionInterceptor());
    }
    // server version information
    serverVersion = buf.readString("ASCII", getExceptionInterceptor());
    int point = serverVersion.indexOf('.');
    if (point != -1) {
        try {
            int n = Integer.parseInt(serverVersion.substring(0, point));
            serverMajorVersion = n;
        } catch (NumberFormatException NFE1) {
        }
        String remaining = serverVersion.substring(point + 1, serverVersion.length());
        point = remaining.indexOf('.');
        if (point != -1) {
            try {
                int n = Integer.parseInt(remaining.substring(0, point));
                serverMinorVersion = n;
            } catch (NumberFormatException nfe) {
            }
            remaining = remaining.substring(point + 1, remaining.length());
            int pos;
            for (pos = 0; pos < remaining.length() && remaining.charAt(pos) >= '0' && remaining.charAt(pos) <= '9'; pos++);
            try {
                int n = Integer.parseInt(remaining.substring(0, pos));
                serverSubMinorVersion = n;
            } catch (NumberFormatException nfe) {
            }
        }
    }
    if (versionMeetsMinimum(4, 0, 8)) {
        maxThreeBytes = 16777215;
        useNewLargePackets = true;
    } else {
        maxThreeBytes = 16581375;
        useNewLargePackets = false;
    }
    colDecimalNeedsBump = versionMeetsMinimum(3, 23, 0);
    colDecimalNeedsBump = !versionMeetsMinimum(3, 23, 15);
    useNewUpdateCounts = versionMeetsMinimum(3, 22, 5);
    threadId = buf.readLong();
    seed = buf.readString("ASCII", getExceptionInterceptor());
    serverCapabilities = 0;
    if (buf.getPosition() < buf.getBufLength())
        serverCapabilities = buf.readInt();
    if (versionMeetsMinimum(4, 1, 1)) {
        int position = buf.getPosition();
        serverCharsetIndex = buf.readByte() & 255;
        serverStatus = buf.readInt();
        checkTransactionState(0);
        buf.setPosition(position + 16);
        String seedPart2 = buf.readString("ASCII", getExceptionInterceptor());
        StringBuffer newSeed = new StringBuffer(20);
        newSeed.append(seed);
        newSeed.append(seedPart2);
        seed = newSeed.toString();
    }
    if ((serverCapabilities & 32) != 0 && connection.getUseCompression())
        clientParam |= 32L;                // CLIENT_COMPRESS
    // database information
    useConnectWithDb = database != null && database.length() > 0 && !connection.getCreateDatabaseIfNotExist();
    if (useConnectWithDb)
        clientParam |= 8L;                 // CLIENT_CONNECT_WITH_DB
    // server does not support SSL (CLIENT_SSL = 2048) but the client asked for it
    if ((serverCapabilities & 2048) == 0 && connection.getUseSSL()) {
        if (connection.getRequireSSL()) {
            connection.close();
            forceClose();
            throw SQLError.createSQLException(Messages.getString("MysqlIO.15"), "08001", getExceptionInterceptor());
        }
        connection.setUseSSL(false);
    }
    if ((serverCapabilities & 4) != 0) {
        clientParam |= 4L;                 // CLIENT_LONG_FLAG
        hasLongColumnInfo = true;
    }
    if (!connection.getUseAffectedRows())
        clientParam |= 2L;                 // CLIENT_FOUND_ROWS
    if (connection.getAllowLoadLocalInfile())
        clientParam |= 128L;               // CLIENT_LOCAL_FILES
    if (isInteractiveClient)
        clientParam |= 1024L;              // CLIENT_INTERACTIVE
    if (protocolVersion > 9)
        clientParam |= 1L;                 // CLIENT_LONG_PASSWORD
    else
        clientParam &= -2L;
    if (versionMeetsMinimum(4, 1, 0)) {
        if (versionMeetsMinimum(4, 1, 1)) {
            clientParam |= 512L;           // CLIENT_PROTOCOL_41
            has41NewNewProt = true;
            clientParam |= 8192L;          // CLIENT_TRANSACTIONS
            clientParam |= 131072L;        // CLIENT_MULTI_RESULTS
            if (connection.getAllowMultiQueries())
                clientParam |= 65536L;     // CLIENT_MULTI_QUERIES
        } else {
            clientParam |= 16384L;         // CLIENT_RESERVED
            has41NewNewProt = false;
        }
        use41Extensions = true;
    }
    int passwordLength = 16;
    int userLength = user == null ? 0 : user.length();
    int databaseLength = database == null ? 0 : database.length();
    int packLength = (userLength + passwordLength + databaseLength) * 2 + 7 + 4 + 33;
    Buffer packet = null;
    // SSL check: plain (non-SSL) connection
    if (!connection.getUseSSL()) {
        if ((serverCapabilities & 32768) != 0) {
            clientParam |= 32768L;         // CLIENT_SECURE_CONNECTION
            // secure authentication
            if (versionMeetsMinimum(4, 1, 1))
                secureAuth411(null, packLength, user, password, database, true);
            else
                secureAuth(null, packLength, user, password, database, true);
        } else {
            packet = new Buffer(packLength);
            if ((clientParam & 16384L) != 0L) {
                if (versionMeetsMinimum(4, 1, 1)) {
                    packet.writeLong(clientParam);
                    packet.writeLong(maxThreeBytes);
                    packet.writeByte((byte) 8);
                    packet.writeBytesNoNull(new byte[23]);
                } else {
                    packet.writeLong(clientParam);
                    packet.writeLong(maxThreeBytes);
                }
            } else {
                packet.writeInt((int) clientParam);
                packet.writeLongInt(maxThreeBytes);
            }
            packet.writeString(user, "Cp1252", connection);
            if (protocolVersion > 9)
                packet.writeString(Util.newCrypt(password, seed), "Cp1252", connection);
            else
                packet.writeString(Util.oldCrypt(password, seed), "Cp1252", connection);
            if (useConnectWithDb)
                packet.writeString(database, "Cp1252", connection);
            send(packet, packet.getPosition());
        }
    } else {
        // SSL connection: negotiate SSL, then authenticate
        negotiateSSLConnection(user, password, database, packLength);
    }
    if (!versionMeetsMinimum(4, 1, 1))
        checkErrorPacket();
    if ((serverCapabilities & 32) != 0 && connection.getUseCompression()) {
        deflater = new Deflater();
        useCompression = true;
        mysqlInput = new CompressedInputStream(connection, mysqlInput);
    }
    if (!useConnectWithDb)
        changeDatabaseTo(database);
    try {
        // handshake finished
        mysqlConnection = socketFactory.afterHandshake();
    } catch (IOException ioEx) {
        throw SQLError.createCommunicationsException(connection, lastPacketSentTimeMs, lastPacketReceivedTimeMs, ioEx, getExceptionInterceptor());
    }
}

// negotiate the SSL connection
private void negotiateSSLConnection(String user, String password, String database, int packLength) throws SQLException {
    if (!ExportControlled.enabled())
        throw new ConnectionFeatureNotAvailableException(connection, lastPacketSentTimeMs, null);
    boolean doSecureAuth = false;
    if ((serverCapabilities & 32768) != 0) {
        clientParam |= 32768L;             // CLIENT_SECURE_CONNECTION
        doSecureAuth = true;
    }
    clientParam |= 2048L;                  // CLIENT_SSL
    Buffer packet = new Buffer(packLength);
    if (use41Extensions)
        packet.writeLong(clientParam);
    else
        packet.writeInt((int) clientParam);
    send(packet, packet.getPosition());
    // switch the plain socket to an SSL socket
    ExportControlled.transformSocketToSSLSocket(this);
    packet.clear();
    if (doSecureAuth) {
        if (versionMeetsMinimum(4, 1, 1))
            secureAuth411(null, packLength, user, password, database, true);
        else
            secureAuth411(null, packLength, user, password, database, true);
    } else {
        if (use41Extensions) {
            packet.writeLong(clientParam);
            packet.writeLong(maxThreeBytes);
        } else {
            packet.writeInt((int) clientParam);
            packet.writeLongInt(maxThreeBytes);
        }
        packet.writeString(user);
        if (protocolVersion > 9)
            packet.writeString(Util.newCrypt(password, seed));
        else
            packet.writeString(Util.oldCrypt(password, seed));
        if ((serverCapabilities & 8) != 0 && database != null && database.length() > 0)
            packet.writeString(database);
        // send the packet
        send(packet, packet.getPosition());
    }
}

// send a packet that is too large for one frame, split into several packets
private final void sendSplitPackets(Buffer packet) throws SQLException {
    try {
        Buffer headerPacket = splitBufRef != null ? (Buffer) splitBufRef.get() : null;
        if (headerPacket == null) {
            headerPacket = new Buffer(maxThreeBytes + 4);
            splitBufRef = new SoftReference(headerPacket);
        }
        int len = packet.getPosition();
        int splitSize = maxThreeBytes;
        int originalPacketPos = 4;
        byte origPacketBytes[] = packet.getByteBuffer();
        byte headerPacketBytes[] = headerPacket.getByteBuffer();
        int packetLen;
        for (; len >= maxThreeBytes; len -= splitSize) {
            packetSequence++;
            headerPacket.setPosition(0);
            headerPacket.writeLongInt(splitSize);
            headerPacket.writeByte(packetSequence);
            System.arraycopy(origPacketBytes, originalPacketPos, headerPacketBytes, 4, splitSize);
            packetLen = splitSize + 4;
            if (!useCompression) {
                mysqlOutput.write(headerPacketBytes, 0, splitSize + 4);
                mysqlOutput.flush();
            } else {
                headerPacket.setPosition(0);
                Buffer packetToSend = compressPacket(headerPacket, 4, splitSize, 4);
                packetLen = packetToSend.getPosition();
                mysqlOutput.write(packetToSend.getByteBuffer(), 0, packetLen);
                mysqlOutput.flush();
            }
            originalPacketPos += splitSize;
        }
        headerPacket.clear();
        headerPacket.setPosition(0);
        headerPacket.writeLongInt(len - 4);
        packetSequence++;
        headerPacket.writeByte(packetSequence);
        if (len != 0)
            System.arraycopy(origPacketBytes, originalPacketPos, headerPacketBytes, 4, len - 4);
        packetLen = len - 4;
        if (!useCompression) {
            // write the packet to the output stream (the field: protected BufferedOutputStream mysqlOutput)
            mysqlOutput.write(headerPacket.getByteBuffer(), 0, len);
            mysqlOutput.flush();
        } else {
            headerPacket.setPosition(0);
            Buffer packetToSend = compressPacket(headerPacket, 4, packetLen, 4);
            packetLen = packetToSend.getPosition();
            mysqlOutput.write(packetToSend.getByteBuffer(), 0, packetLen);
            mysqlOutput.flush();
        }
    } catch (IOException ioEx) {
        throw SQLError.createCommunicationsException(connection, lastPacketSentTimeMs, lastPacketReceivedTimeMs, ioEx, getExceptionInterceptor());
    }
}
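sendSplitPackets relies on the MySQL packet framing: each packet starts with a 4-byte header (a 3-byte little-endian payload length followed by a 1-byte sequence number), and a payload that fills a packet completely is followed by another packet. The framing can be sketched in isolation as below. PacketFramingSketch is a hypothetical class, and it assumes the sequence number starts at 0 and takes the chunk size as a parameter so small inputs can demonstrate the splitting (the driver uses maxThreeBytes and continues from its current packetSequence).

```java
import java.io.ByteArrayOutputStream;

// Hypothetical illustration of MySQL-style packet framing with splitting.
final class PacketFramingSketch {
    // Frames a payload as one or more packets of at most maxChunk payload bytes each.
    static byte[] frame(byte[] payload, int maxChunk) {
        if (maxChunk <= 0 || maxChunk > 0xFFFFFF) {
            throw new IllegalArgumentException("maxChunk must be in 1..16777215");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int offset = 0;
        int sequence = 0;
        int len;
        do {
            len = Math.min(maxChunk, payload.length - offset);
            // 3-byte little-endian payload length
            out.write(len & 0xFF);
            out.write((len >>> 8) & 0xFF);
            out.write((len >>> 16) & 0xFF);
            // 1-byte sequence number
            out.write(sequence++ & 0xFF);
            out.write(payload, offset, len);
            offset += len;
            // a full chunk means another packet follows, possibly an empty one
        } while (len == maxChunk);
        return out.toByteArray();
    }
}
```

Framing a 5-byte payload with a chunk size of 2 produces three packets with payload lengths 2, 2 and 1, which is 17 bytes in total including the three headers.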
The MysqlIO handshake mainly sends the user name, password, database name, and related information to the server.
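To make the layout of that login data concrete, here is a rough sketch of the non-4.1 branch shown above (writeInt, writeLongInt, then three writeString calls). It is an illustration only: LegacyLoginPayloadSketch and build are hypothetical names, the password is assumed to be already scrambled by the caller, UTF-8 encoding is an assumption, and the server capability check is omitted. It also assumes that Buffer.writeInt emits 2 bytes, writeLongInt 3 bytes, and writeString a null-terminated string, all little-endian, which is how I understand the Connector/J 5.1 Buffer class.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of the legacy login payload; not Connector/J code. */
final class LegacyLoginPayloadSketch {

    static byte[] build(int clientFlags, int maxPacketSize, String user,
                        String scrambledPassword, String database) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Client flags: 2 bytes, little-endian (assumed to mirror Buffer.writeInt).
        out.write(clientFlags & 0xFF);
        out.write((clientFlags >>> 8) & 0xFF);
        // Maximum packet size: 3 bytes, little-endian (assumed to mirror Buffer.writeLongInt).
        out.write(maxPacketSize & 0xFF);
        out.write((maxPacketSize >>> 8) & 0xFF);
        out.write((maxPacketSize >>> 16) & 0xFF);
        // User name, scrambled password, and optional database, each null-terminated.
        writeNullTerminated(out, user);
        writeNullTerminated(out, scrambledPassword);
        if (database != null && database.length() > 0) {
            writeNullTerminated(out, database);
        }
        return out.toByteArray();
    }

    private static void writeNullTerminated(ByteArrayOutputStream out, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8); // encoding is an assumption
        out.write(bytes, 0, bytes.length);
        out.write(0);
    }

    public static void main(String[] args) {
        // "abc" stands in for an already scrambled password.
        byte[] payload = build(0x0005, 0xFFFFFF, "root", "abc", "test");
        System.out.println("payload length = " + payload.length);
    }
}
```

The real driver wraps this payload in the 4-byte packet header before writing it to mysqlOutput, and the secure authentication path (secureAuth411) uses a different layout that this sketch does not cover.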
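Summary:
The main job of createNewIO is to initialize MysqlIO. When MysqlIO is constructed, it first creates the socket factory class, which is actually StandardSocketFactory, obtains the socket connection from StandardSocketFactory, and initializes MysqlIO's input and output streams. When a Statement is executed, MysqlIO sends the SQL command to the server. During the handshake, MysqlIO mainly sends the user name, password, database name, and related information.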