Donald_Draper

How ConnectionImpl Creates MysqlIO

    Blog category:
  • JDBC
JDBC driver initialization (MySQL): http://donald-draper.iteye.com/blog/2342010
Obtaining a JDBC connection: http://donald-draper.iteye.com/blog/2342011
Obtaining a MySQL load-balanced connection: http://donald-draper.iteye.com/blog/2342089
Obtaining a MySQL master/slave replication (read/write splitting) connection: http://donald-draper.iteye.com/blog/2342108
How ConnectionImpl creates MysqlIO: http://donald-draper.iteye.com/blog/2342959
MySQL precompiled SQL: http://donald-draper.iteye.com/blog/2342960
Queries with the MySQL PreparedStatement: http://donald-draper.iteye.com/blog/2343083
Queries with the MySQL ServerPreparedStatement: http://donald-draper.iteye.com/blog/2343124
In the previous articles we looked at how a MySQL connection is obtained. The ConnectionImpl constructor calls a method named createNewIO. Let's see what this method does.
Creating MysqlIO
protected void createNewIO(boolean isForReconnect)
        throws SQLException
    {
        // Decompiler residue (monitorenter on mutex): in the original source the rest of
        // this method body runs inside synchronized(mutex) { ... }
        Properties mergedProps;
        long queriesIssuedFailedOverCopy;
        mergedProps = exposeAsProperties(props);
        queriesIssuedFailedOverCopy = queriesIssuedFailedOver;
        queriesIssuedFailedOver = 0L;
        // Standalone server case: high availability is off and the connection has not failed over
        if(!getHighAvailability() && !failedOver)
        {
            boolean connectionGood = false;
            Exception connectionNotEstablishedBecause = null;
            int hostIndex = 0;
            if(getRoundRobinLoadBalance())
                hostIndex = getNextRoundRobinHostIndex(getURL(), hostList);
            while(hostIndex < hostListSize) 
            {
                if(hostIndex == 0)
                    hasTriedMasterFlag = true;
                try
                {
                    String newHostPortPair = (String)hostList.get(hostIndex);
                    int newPort = 3306;
                    String hostPortPair[] = NonRegisteringDriver.parseHostPortPair(newHostPortPair);
                    String newHost = hostPortPair[0];
                    if(newHost == null || StringUtils.isEmptyOrWhitespaceOnly(newHost))
                        newHost = "localhost";
                    if(hostPortPair[1] != null)
                        try
                        {
                            newPort = Integer.parseInt(hostPortPair[1]);
                        }
                        catch(NumberFormatException nfe)
                        {
                            throw SQLError.createSQLException("Illegal connection port value '" + hostPortPair[1] + "'", "01S00", getExceptionInterceptor());
                        }
                    // Create the MysqlIO
                    io = new MysqlIO(newHost, newPort, mergedProps, getSocketFactoryClassName(), this, getSocketTimeout(), largeRowSizeThreshold.getValueAsInt());
                    // Perform the handshake
                    io.doHandshake(user, password, database);
                    // Record the thread ID reported by the server for this connection
                    connectionId = io.getThreadId();
                    isClosed = false;
                    boolean oldAutoCommit = getAutoCommit();//AutoCommit
                    int oldIsolationLevel = isolationLevel;// transaction isolation level
                    boolean oldReadOnly = isReadOnly();//ReadOnly
                    String oldCatalog = getCatalog();
                    initializePropsFromServer();
                    if(isForReconnect)
                    {
                        setAutoCommit(oldAutoCommit);
                        if(hasIsolationLevels)
                            setTransactionIsolation(oldIsolationLevel);
                        setCatalog(oldCatalog);
                    }
                    if(hostIndex != 0)
                    {
                        setFailedOverState();
                        queriesIssuedFailedOverCopy = 0L;
                    } else
                    {
                        failedOver = false;
                        queriesIssuedFailedOverCopy = 0L;
                        if(hostListSize > 1)
                            setReadOnlyInternal(false);
                        else
                            setReadOnlyInternal(oldReadOnly);
                    }
                    connectionGood = true;
                    break;
                }
                catch(Exception EEE)
                {
                    if(io != null)
                        io.forceClose();
                    connectionNotEstablishedBecause = EEE;
                    connectionGood = false;
                    if(EEE instanceof SQLException)
                    {
                        SQLException sqlEx = (SQLException)EEE;
                        String sqlState = sqlEx.getSQLState();
                        if(sqlState == null || !sqlState.equals("08S01"))
                            throw sqlEx;
                    }
                    if(getRoundRobinLoadBalance())
                    {
                        hostIndex = getNextRoundRobinHostIndex(getURL(), hostList) - 1;
                        continue;
                    }
                    if(hostListSize - 1 == hostIndex)
                        throw SQLError.createCommunicationsException(this, io == null ? 0L : io.getLastPacketSentTimeMs(), io == null ? 0L : io.getLastPacketReceivedTimeMs(), EEE, getExceptionInterceptor());
                    hostIndex++;
                }
            }
            if(!connectionGood)
            {
                SQLException chainedEx = SQLError.createSQLException(Messages.getString("Connection.UnableToConnect"), "08001", getExceptionInterceptor());
                chainedEx.initCause(connectionNotEstablishedBecause);
                throw chainedEx;
            }
        } else
        {
            // High-availability or failed-over case (the author describes this as the load-balanced cluster case)
            double timeout = getInitialTimeout();
            boolean connectionGood = false;
            Exception connectionException = null;
            int hostIndex = 0;
            if(getRoundRobinLoadBalance())
                hostIndex = getNextRoundRobinHostIndex(getURL(), hostList);
            for(; hostIndex < hostListSize && !connectionGood; hostIndex++)
            {
                if(hostIndex == 0)
                    hasTriedMasterFlag = true;
                if(preferSlaveDuringFailover && hostIndex == 0)
                    hostIndex++;
                for(int attemptCount = 0; attemptCount < getMaxReconnects() && !connectionGood; attemptCount++)
                {
                    try
                    {
                        if(io != null)
                            io.forceClose();
                        String newHostPortPair = (String)hostList.get(hostIndex);
                        int newPort = 3306;
                        String hostPortPair[] = NonRegisteringDriver.parseHostPortPair(newHostPortPair);
                        String newHost = hostPortPair[0];
                        if(newHost == null || StringUtils.isEmptyOrWhitespaceOnly(newHost))
                            newHost = "localhost";
                        if(hostPortPair[1] != null)
                            try
                            {
                                newPort = Integer.parseInt(hostPortPair[1]);
                            }
                            catch(NumberFormatException nfe)
                            {
                                throw SQLError.createSQLException("Illegal connection port value '" + hostPortPair[1] + "'", "01S00", getExceptionInterceptor());
                            }
                        // Create the MysqlIO
                        io = new MysqlIO(newHost, newPort, mergedProps, getSocketFactoryClassName(), this, getSocketTimeout(), largeRowSizeThreshold.getValueAsInt());
                        // Perform the handshake
                        io.doHandshake(user, password, database);
                        pingInternal(false, 0);
                        // Record the thread ID reported by the server for this connection
                        connectionId = io.getThreadId();
                        isClosed = false;
                        boolean oldAutoCommit = getAutoCommit();//AutoCommit
                        int oldIsolationLevel = isolationLevel;// transaction isolation level
                        boolean oldReadOnly = isReadOnly();//ReadOnly
                        String oldCatalog = getCatalog();
                        initializePropsFromServer();
                        if(isForReconnect)
                        {
                            setAutoCommit(oldAutoCommit);
                            if(hasIsolationLevels)
                                setTransactionIsolation(oldIsolationLevel);
                            setCatalog(oldCatalog);
                        }
                        connectionGood = true;
                        if(hostIndex != 0)
                        {
                            setFailedOverState();
                            queriesIssuedFailedOverCopy = 0L;
                            break;
                        }
                        failedOver = false;
                        queriesIssuedFailedOverCopy = 0L;
                        if(hostListSize > 1)
                            setReadOnlyInternal(false);
                        else
                            setReadOnlyInternal(oldReadOnly);
                        break;
                    }
                    catch(Exception EEE)
                    {
                        connectionException = EEE;
                    }
                    connectionGood = false;
                    if(getRoundRobinLoadBalance())
                        hostIndex = getNextRoundRobinHostIndex(getURL(), hostList) - 1;
                    if(connectionGood)
                        break;
                    if(attemptCount <= 0)
                        continue;
                    try
                    {
                        Thread.sleep((long)timeout * 1000L);
                    }
                    catch(InterruptedException IE) { }
                }

            }

        }
 }
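
Both branches of createNewIO resolve each host entry the same way: a blank host falls back to "localhost", a missing port falls back to 3306, and a non-numeric port is rejected. The following is a minimal sketch of that step only. The class HostPortSketch and its methods are hypothetical names, the split on the first ':' is a simplifying assumption and not the behavior of NonRegisteringDriver.parseHostPortPair, and the sketch throws IllegalArgumentException where the driver throws a SQLException with state "01S00".

```java
final class HostPortSketch {
    // Same default as the decompiled code: int newPort = 3306;
    static final int DEFAULT_PORT = 3306;

    // Splits "host:port" into {host, port}; the port slot is null when no ':' is present.
    // Assumption: a simple split on the first ':' (no IPv6 literal handling).
    static String[] split(String hostPortPair) {
        int idx = hostPortPair.indexOf(':');
        if (idx < 0) {
            return new String[] { hostPortPair, null };
        }
        return new String[] { hostPortPair.substring(0, idx), hostPortPair.substring(idx + 1) };
    }

    // A null or blank host falls back to "localhost".
    static String resolveHost(String host) {
        return (host == null || host.trim().isEmpty()) ? "localhost" : host;
    }

    // A missing port falls back to 3306; a non-numeric port is rejected.
    static int resolvePort(String port) {
        if (port == null) {
            return DEFAULT_PORT;
        }
        try {
            return Integer.parseInt(port);
        } catch (NumberFormatException nfe) {
            throw new IllegalArgumentException("Illegal connection port value '" + port + "'", nfe);
        }
    }

    public static void main(String[] args) {
        String[] pair = split("db1:3307");
        System.out.println(resolveHost(pair[0]) + " " + resolvePort(pair[1]));
    }
}
```

Keeping the defaults in two small functions makes the fallback rules easy to check in isolation, separate from the retry loop around them.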

//MysqlIO
class MysqlIO
{

    private static final int UTF8_CHARSET_INDEX = 33;
    private static final String CODE_PAGE_1252 = "Cp1252";
    protected static final int NULL_LENGTH = -1;
    protected static final int COMP_HEADER_LENGTH = 3;
    protected static final int MIN_COMPRESS_LEN = 50;
    protected static final int HEADER_LENGTH = 4;
    protected static final int AUTH_411_OVERHEAD = 33;
    private static int maxBufferSize = 65535;
    private static final int CLIENT_COMPRESS = 32;
    protected static final int CLIENT_CONNECT_WITH_DB = 8;
    private static final int CLIENT_FOUND_ROWS = 2;
    private static final int CLIENT_LOCAL_FILES = 128;
    private static final int CLIENT_LONG_FLAG = 4;
    private static final int CLIENT_LONG_PASSWORD = 1;
    private static final int CLIENT_PROTOCOL_41 = 512;
    private static final int CLIENT_INTERACTIVE = 1024;
    protected static final int CLIENT_SSL = 2048;
    private static final int CLIENT_TRANSACTIONS = 8192;
    protected static final int CLIENT_RESERVED = 16384;
    protected static final int CLIENT_SECURE_CONNECTION = 32768;
    private static final int CLIENT_MULTI_QUERIES = 65536;
    private static final int CLIENT_MULTI_RESULTS = 131072;
    private static final int SERVER_STATUS_IN_TRANS = 1;
    private static final int SERVER_STATUS_AUTOCOMMIT = 2;
    static final int SERVER_MORE_RESULTS_EXISTS = 8;
    private static final int SERVER_QUERY_NO_GOOD_INDEX_USED = 16;
    private static final int SERVER_QUERY_NO_INDEX_USED = 32;
    private static final int SERVER_QUERY_WAS_SLOW = 2048;
    private static final int SERVER_STATUS_CURSOR_EXISTS = 64;
    private static final String FALSE_SCRAMBLE = "xxxxxxxx";
    protected static final int MAX_QUERY_SIZE_TO_LOG = 1024;
    protected static final int MAX_QUERY_SIZE_TO_EXPLAIN = 1048576;
    protected static final int INITIAL_PACKET_SIZE = 1024;
    private static String jvmPlatformCharset = null;
    protected static final String ZERO_DATE_VALUE_MARKER = "0000-00-00";
    protected static final String ZERO_DATETIME_VALUE_MARKER = "0000-00-00 00:00:00";
    private static final int MAX_PACKET_DUMP_LENGTH = 1024;
    private boolean packetSequenceReset;
    protected int serverCharsetIndex;
    private Buffer reusablePacket;
    private Buffer sendPacket;
    private Buffer sharedSendPacket;
    protected BufferedOutputStream mysqlOutput;
    protected ConnectionImpl connection;//Mysql connection
    private Deflater deflater;
    protected InputStream mysqlInput;// MySQL input stream
    private LinkedList packetDebugRingBuffer;
    private RowData streamingData;
    protected Socket mysqlConnection;//mysql socket
    private SocketFactory socketFactory;// factory that creates the MySQL socket
    private SoftReference loadFileBufRef;
    private SoftReference splitBufRef;
    protected String host;//host
    protected String seed;
    private String serverVersion;
    private String socketFactoryClassName;
    private byte packetHeaderBuf[];
    private boolean colDecimalNeedsBump;
    private boolean hadWarnings;
    private boolean has41NewNewProt;
    private boolean hasLongColumnInfo;
    private boolean isInteractiveClient;
    private boolean logSlowQueries;
    private boolean platformDbCharsetMatches;
    private boolean profileSql;
    private boolean queryBadIndexUsed;
    private boolean queryNoIndexUsed;
    private boolean serverQueryWasSlow;
    private boolean use41Extensions;
    private boolean useCompression;
    private boolean useNewLargePackets;
    private boolean useNewUpdateCounts;
    private byte packetSequence;
    private byte readPacketSequence;
    private boolean checkPacketSequence;
    private byte protocolVersion;
    private int maxAllowedPacket;
    protected int maxThreeBytes;
    protected int port;
    protected int serverCapabilities;
    private int serverMajorVersion;
    private int serverMinorVersion;
    private int oldServerStatus;
    private int serverStatus;
    private int serverSubMinorVersion;
    private int warningCount;
    protected long clientParam;
    protected long lastPacketSentTimeMs;
    protected long lastPacketReceivedTimeMs;
    private boolean traceProtocol;
    private boolean enablePacketDebug;
    private Calendar sessionCalendar;
    private boolean useConnectWithDb;
    private boolean needToGrabQueryFromPacket;
    private boolean autoGenerateTestcaseScript;
    private long threadId;
    private boolean useNanosForElapsedTime;
    private long slowQueryThreshold;
    private String queryTimingUnits;
    private boolean useDirectRowUnpack;
    private int useBufferRowSizeThreshold;
    private int commandCount;
    private List statementInterceptors;
    private ExceptionInterceptor exceptionInterceptor;
    private int statementExecutionDepth;
    private boolean useAutoSlowLog;

    static 
    {
        OutputStreamWriter outWriter = null;
        try
        {
            outWriter = new OutputStreamWriter(new ByteArrayOutputStream());
            jvmPlatformCharset = outWriter.getEncoding();
        }
        finally
        {
            try
            {
                if(outWriter != null)
                    outWriter.close();
            }
            catch(IOException ioEx) { }
        }
    }
    // MysqlIO constructor
     public MysqlIO(String host, int port, Properties props, String socketFactoryClassName, ConnectionImpl conn, int socketTimeout, int useBufferRowSizeThreshold)
        throws IOException, SQLException
    {
        packetSequenceReset = false;
        reusablePacket = null;
        sendPacket = null;
        sharedSendPacket = null;
        mysqlOutput = null;
        deflater = null;
        mysqlInput = null;
        packetDebugRingBuffer = null;
        streamingData = null;
        mysqlConnection = null;
        socketFactory = null;
        this.host = null;
        serverVersion = null;
        this.socketFactoryClassName = null;
        packetHeaderBuf = new byte[4];
        colDecimalNeedsBump = false;
        hadWarnings = false;
        has41NewNewProt = false;
        hasLongColumnInfo = false;
        isInteractiveClient = false;
        logSlowQueries = false;
        platformDbCharsetMatches = true;
        profileSql = false;
        queryBadIndexUsed = false;
        queryNoIndexUsed = false;
        serverQueryWasSlow = false;
        use41Extensions = false;
        useCompression = false;
        useNewLargePackets = false;
        useNewUpdateCounts = false;
        packetSequence = 0;
        readPacketSequence = -1;
        checkPacketSequence = false;
        protocolVersion = 0;
        maxAllowedPacket = 1048576;
        maxThreeBytes = 16581375;
        this.port = 3306;
        serverMajorVersion = 0;
        serverMinorVersion = 0;
        oldServerStatus = 0;
        serverStatus = 0;
        serverSubMinorVersion = 0;
        warningCount = 0;
        clientParam = 0L;
        lastPacketSentTimeMs = 0L;
        lastPacketReceivedTimeMs = 0L;
        traceProtocol = false;
        enablePacketDebug = false;
        useDirectRowUnpack = true;
        commandCount = 0;
        statementExecutionDepth = 0;
        connection = conn;
        if(connection.getEnablePacketDebug())
            packetDebugRingBuffer = new LinkedList();
        traceProtocol = connection.getTraceProtocol();
        useAutoSlowLog = connection.getAutoSlowLog();
        this.useBufferRowSizeThreshold = useBufferRowSizeThreshold;
        useDirectRowUnpack = connection.getUseDirectRowUnpack();
        logSlowQueries = connection.getLogSlowQueries();
        reusablePacket = new Buffer(1024);
        sendPacket = new Buffer(1024);
        this.port = port;
        this.host = host;
        this.socketFactoryClassName = socketFactoryClassName;
        // Create the socket factory
        socketFactory = createSocketFactory();
        exceptionInterceptor = connection.getExceptionInterceptor();
        try
        {
            // Obtain the connection from the socket factory
            mysqlConnection = socketFactory.connect(this.host, this.port, props);
            if(socketTimeout != 0)
                try
                {
                    mysqlConnection.setSoTimeout(socketTimeout);
                }
                catch(Exception ex) { }
            // Before the handshake
            mysqlConnection = socketFactory.beforeHandshake();
            if(connection.getUseReadAheadInput())
                mysqlInput = new ReadAheadInputStream(mysqlConnection.getInputStream(), 16384, connection.getTraceProtocol(), connection.getLog());
            else
            if(connection.useUnbufferedInput())
                // Use the raw (unbuffered) input stream of mysqlConnection
                mysqlInput = mysqlConnection.getInputStream();
            else
                mysqlInput = new BufferedInputStream(mysqlConnection.getInputStream(), 16384);
            // Initialize the mysqlOutput output stream
            mysqlOutput = new BufferedOutputStream(mysqlConnection.getOutputStream(), 16384);
            isInteractiveClient = connection.getInteractiveClient();
            profileSql = connection.getProfileSql();
            sessionCalendar = Calendar.getInstance();
            autoGenerateTestcaseScript = connection.getAutoGenerateTestcaseScript();
            needToGrabQueryFromPacket = profileSql || logSlowQueries || autoGenerateTestcaseScript;
            if(connection.getUseNanosForElapsedTime() && Util.nanoTimeAvailable())
            {
                useNanosForElapsedTime = true;
                queryTimingUnits = Messages.getString("Nanoseconds");
            } else
            {
                queryTimingUnits = Messages.getString("Milliseconds");
            }
            if(connection.getLogSlowQueries())
                calculateSlowQueryThreshold();
        }
        catch(IOException ioEx)
        {
            throw SQLError.createCommunicationsException(connection, 0L, 0L, ioEx, getExceptionInterceptor());
        }
    }
}

// Create the socket factory
socketFactory = createSocketFactory();
 private SocketFactory createSocketFactory()
        throws SQLException
    {
        if(socketFactoryClassName == null)
            throw SQLError.createSQLException(Messages.getString("MysqlIO.75"), "08001", getExceptionInterceptor());
        // The try/catch below is reconstructed from decompiler residue ("Exception ex; ex;");
        // the exact extent of the original try block is not recoverable from the listing.
        try
        {
            // Instantiate the configured factory class reflectively
            return (SocketFactory)Class.forName(socketFactoryClassName).newInstance();
        }
        catch(Exception ex)
        {
            SQLException sqlEx = SQLError.createSQLException(Messages.getString("MysqlIO.76") + socketFactoryClassName + Messages.getString("MysqlIO.77"), "08001", getExceptionInterceptor());
            sqlEx.initCause(ex);
            throw sqlEx;
        }
    }
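
createSocketFactory is an instance of a common pattern: load a class by its configured name, instantiate it through its no-argument constructor, and wrap any reflective failure in a single domain exception that keeps the cause. Here is a self-contained sketch of that pattern. FactoryLoaderSketch, SocketFactoryLike and DefaultFactory are hypothetical stand-ins, not driver classes, and IllegalStateException stands in for the SQLException with state "08001" used above.

```java
final class FactoryLoaderSketch {
    // Hypothetical stand-in for com.mysql.jdbc.SocketFactory.
    interface SocketFactoryLike {
        String name();
    }

    // Hypothetical default implementation with a public no-argument constructor.
    public static final class DefaultFactory implements SocketFactoryLike {
        public String name() {
            return "default";
        }
    }

    // Loads and instantiates the factory class named by className.
    static SocketFactoryLike load(String className) {
        if (className == null) {
            throw new IllegalStateException("No socket factory class name configured");
        }
        try {
            Class<?> cls = Class.forName(className);
            // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance()
            return (SocketFactoryLike) cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Wrap the failure and keep the original cause, as the driver does with initCause
            throw new IllegalStateException("Could not create socket factory '" + className + "'", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(load(DefaultFactory.class.getName()).name());
    }
}
```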

Where does socketFactoryClassName come from? It is defined in ConnectionPropertiesImpl:
//ConnectionPropertiesImpl
public class ConnectionPropertiesImpl
    implements Serializable, ConnectionProperties
{
 public ConnectionPropertiesImpl()
{
  // Socket factory class property; the default value is StandardSocketFactory
  socketFactoryClassName = new StringConnectionProperty("socketFactory", (com.mysql.jdbc.StandardSocketFactory.class).getName(), Messages.getString("ConnectionProperties.socketFactory"), "3.0.3", CONNECTION_AND_AUTH_CATEGORY, 4);
  }
}

As ConnectionPropertiesImpl shows, the socket factory class defaults to StandardSocketFactory unless the "socketFactory" property says otherwise.
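
To make the default concrete, here is a small sketch of resolving the factory class name from connection properties. The property key "socketFactory" and the default class name come from the listing above. The class SocketFactoryPropertySketch is hypothetical, and treating a blank value as "not set" is an assumption of this sketch rather than confirmed driver behavior.

```java
import java.util.Properties;

final class SocketFactoryPropertySketch {
    // Default taken from ConnectionPropertiesImpl in the listing above.
    static final String DEFAULT_FACTORY = "com.mysql.jdbc.StandardSocketFactory";

    // Returns the configured factory class name, or the default when none is set.
    // Assumption: a blank value is treated the same as a missing one.
    static String resolveFactoryClassName(Properties props) {
        String configured = props.getProperty("socketFactory");
        if (configured == null || configured.trim().isEmpty()) {
            return DEFAULT_FACTORY;
        }
        return configured.trim();
    }

    public static void main(String[] args) {
        System.out.println(resolveFactoryClassName(new Properties()));
    }
}
```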
// Obtain the connection from the socket factory
mysqlConnection = socketFactory.connect(this.host, this.port, props);

//StandardSocketFactory
public class StandardSocketFactory
    implements SocketFactory
{
    public static final String TCP_NO_DELAY_PROPERTY_NAME = "tcpNoDelay";
    public static final String TCP_KEEP_ALIVE_DEFAULT_VALUE = "true";
    public static final String TCP_KEEP_ALIVE_PROPERTY_NAME = "tcpKeepAlive";
    public static final String TCP_RCV_BUF_PROPERTY_NAME = "tcpRcvBuf";
    public static final String TCP_SND_BUF_PROPERTY_NAME = "tcpSndBuf";
    public static final String TCP_TRAFFIC_CLASS_PROPERTY_NAME = "tcpTrafficClass";
    public static final String TCP_RCV_BUF_DEFAULT_VALUE = "0";
    public static final String TCP_SND_BUF_DEFAULT_VALUE = "0";
    public static final String TCP_TRAFFIC_CLASS_DEFAULT_VALUE = "0";
    public static final String TCP_NO_DELAY_DEFAULT_VALUE = "true";
    private static Method setTraficClassMethod;
    protected String host;
    protected int port;
    protected Socket rawSocket;

    static 
    {
        try
        {
            setTraficClassMethod = (java.net.Socket.class).getMethod("setTrafficClass", new Class[] {
                Integer.TYPE
            });
        }
        catch(SecurityException e)
        {
            setTraficClassMethod = null;
        }
        catch(NoSuchMethodException e)
        {
            setTraficClassMethod = null;
        }
    }
    // Socket factory constructor
       public StandardSocketFactory()
    {
        host = null;
        port = 3306;
        rawSocket = null;
    }
    // Obtain the socket
     public Socket connect(String hostname, int portNumber, Properties props)
        throws SocketException, IOException
    {
        if(props != null)
        {
            host = hostname;
            port = portNumber;
            Method connectWithTimeoutMethod = null;
            Method socketBindMethod = null;
            Class socketAddressClass = null;
            String localSocketHostname = props.getProperty("localSocketAddress");
            String connectTimeoutStr = props.getProperty("connectTimeout");
            int connectTimeout = 0;
            boolean wantsTimeout = connectTimeoutStr != null && connectTimeoutStr.length() > 0 && !connectTimeoutStr.equals("0");
            boolean wantsLocalBind = localSocketHostname != null && localSocketHostname.length() > 0;
            boolean needsConfigurationBeforeConnect = socketNeedsConfigurationBeforeConnect(props);
            if(wantsTimeout || wantsLocalBind || needsConfigurationBeforeConnect)
            {
                if(connectTimeoutStr != null)
                    try
                    {
                        connectTimeout = Integer.parseInt(connectTimeoutStr);
                    }
                    catch(NumberFormatException nfe)
                    {
                        throw new SocketException("Illegal value '" + connectTimeoutStr + "' for connectTimeout");
                    }
                try
                {
                    socketAddressClass = Class.forName("java.net.SocketAddress");
                    // Look up Socket.connect(SocketAddress, int) via reflection
                    connectWithTimeoutMethod = (java.net.Socket.class).getMethod("connect", new Class[] {
                        socketAddressClass, Integer.TYPE
                    });
                    // Look up Socket.bind(SocketAddress) via reflection
                    socketBindMethod = (java.net.Socket.class).getMethod("bind", new Class[] {
                        socketAddressClass
                    });
                }
                catch(NoClassDefFoundError noClassDefFound) { }
                catch(NoSuchMethodException noSuchMethodEx) { }
                catch(Throwable catchAll) { }
                if(wantsLocalBind && socketBindMethod == null)
                    throw new SocketException("Can't specify \"localSocketAddress\" on JVMs older than 1.4");
                if(wantsTimeout && connectWithTimeoutMethod == null)
                    throw new SocketException("Can't specify \"connectTimeout\" on JVMs older than 1.4");
            }
            if(host != null)
            {
                // Host is set and no local bind, connect timeout, or pre-connect configuration is needed
                if(!wantsLocalBind && !wantsTimeout && !needsConfigurationBeforeConnect)
                {
                    InetAddress possibleAddresses[] = InetAddress.getAllByName(host);
                    Throwable caughtWhileConnecting = null;
                    int i = 0;
                    do
                    {
                        if(i >= possibleAddresses.length)
                            break;
                        try
                        {
			    
                            rawSocket = new Socket(possibleAddresses[i], port);
                            // Configure the socket
                            configureSocket(rawSocket, props);
                            break;
                        }
                        catch(Exception ex)
                        {
                            caughtWhileConnecting = ex;
                            i++;
                        }
                    } while(true);
                    if(rawSocket == null)
                        unwrapExceptionToProperClassAndThrowIt(caughtWhileConnecting);
                } else
                {
                    // Otherwise a local bind, a connect timeout, or pre-connect configuration was requested
                    try
                    {
                        InetAddress possibleAddresses[] = InetAddress.getAllByName(host);
                        Throwable caughtWhileConnecting = null;
                        Object localSockAddr = null;
                        Class inetSocketAddressClass = null;
                        Constructor addrConstructor = null;
                        try
                        {
                            inetSocketAddressClass = Class.forName("java.net.InetSocketAddress");
                            // Look up the InetSocketAddress(InetAddress, int) constructor
                            addrConstructor = inetSocketAddressClass.getConstructor(new Class[] {
                                java.net.InetAddress.class, Integer.TYPE
                            });
                            if(wantsLocalBind)
                                // If a local bind is requested, build the local socket address (port 0)
                                localSockAddr = addrConstructor.newInstance(new Object[] {
                                    InetAddress.getByName(localSocketHostname), new Integer(0)
                                });
                        }
                        catch(Throwable ex)
                        {
                            unwrapExceptionToProperClassAndThrowIt(ex);
                        }
                        int i = 0;
                        do
                        {
                            if(i >= possibleAddresses.length)
                                break;
                            try
                            {
                                rawSocket = new Socket();
                                // Configure the socket
                                configureSocket(rawSocket, props);
                                // Create the remote InetSocketAddress
                                Object sockAddr = addrConstructor.newInstance(new Object[] {
                                    possibleAddresses[i], new Integer(port)
                                });
                                // Bind the socket to the local address
                                socketBindMethod.invoke(rawSocket, new Object[] {
                                    localSockAddr
                                });
                                // Connect to the server, honoring connectTimeout
                                connectWithTimeoutMethod.invoke(rawSocket, new Object[] {
                                    sockAddr, new Integer(connectTimeout)
                                });
                                break;
                            }
                            catch(Exception ex)
                            {
                                rawSocket = null;
                                caughtWhileConnecting = ex;
                                i++;
                            }
                        } while(true);
                        if(rawSocket == null)
                            unwrapExceptionToProperClassAndThrowIt(caughtWhileConnecting);
                    }
                    catch(Throwable t)
                    {
                        unwrapExceptionToProperClassAndThrowIt(t);
                    }
                }
                return rawSocket;
            }
        }
        throw new SocketException("Unable to create socket");
   }
   // Configure the socket
    private void configureSocket(Socket sock, Properties props)
        throws SocketException, IOException
    {
        try
        {
            // Set TCP_NODELAY
            sock.setTcpNoDelay(Boolean.valueOf(props.getProperty("tcpNoDelay", "true")).booleanValue());
            String keepAlive = props.getProperty("tcpKeepAlive", "true");
            if(keepAlive != null && keepAlive.length() > 0)
                // Enable or disable TCP keep-alive
                sock.setKeepAlive(Boolean.valueOf(keepAlive).booleanValue());
            int receiveBufferSize = Integer.parseInt(props.getProperty("tcpRcvBuf", "0"));
            if(receiveBufferSize > 0)
                // Set the receive buffer size
                sock.setReceiveBufferSize(receiveBufferSize);
            int sendBufferSize = Integer.parseInt(props.getProperty("tcpSndBuf", "0"));
            if(sendBufferSize > 0)
                // Set the send buffer size
                sock.setSendBufferSize(sendBufferSize);
            int trafficClass = Integer.parseInt(props.getProperty("tcpTrafficClass", "0"));
            if(trafficClass > 0 && setTraficClassMethod != null)
                // Set the IP traffic class through the reflectively obtained Socket.setTrafficClass
                setTraficClassMethod.invoke(sock, new Object[] {
                    new Integer(trafficClass)
                });
        }
        catch(Throwable t)
        {
            unwrapExceptionToProperClassAndThrowIt(t);
        }
    }
}
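
StandardSocketFactory reaches Socket.connect and Socket.bind through reflection so that the same class can load on JVMs older than 1.4. On a current JVM the same steps can be written directly. The sketch below is a simplified illustration under that assumption, not the driver's code: ConnectSketch and connectsToLocalListener are hypothetical names, only the tcpNoDelay, tcpKeepAlive and connectTimeout properties are handled, and local binding is left out.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Properties;

final class ConnectSketch {
    // Tries every address the host name resolves to and returns the first socket that connects.
    static Socket connect(String host, int port, Properties props) throws IOException {
        int connectTimeoutMs = Integer.parseInt(props.getProperty("connectTimeout", "0"));
        IOException lastFailure = null;
        for (InetAddress address : InetAddress.getAllByName(host)) {
            Socket socket = new Socket();
            try {
                // Configure before connecting, as the bind/timeout branch above does
                socket.setTcpNoDelay(Boolean.parseBoolean(props.getProperty("tcpNoDelay", "true")));
                socket.setKeepAlive(Boolean.parseBoolean(props.getProperty("tcpKeepAlive", "true")));
                // A timeout of 0 means wait indefinitely
                socket.connect(new InetSocketAddress(address, port), connectTimeoutMs);
                return socket;
            } catch (IOException e) {
                lastFailure = e;
                try { socket.close(); } catch (IOException ignored) { }
            }
        }
        throw lastFailure != null ? lastFailure : new IOException("Unable to create socket");
    }

    // Demo helper: connects to a throwaway listener on the loopback interface.
    static boolean connectsToLocalListener() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket listener = new ServerSocket(0, 1, loopback)) {
            Properties props = new Properties();
            props.setProperty("connectTimeout", "2000");
            try (Socket socket = connect(loopback.getHostAddress(), listener.getLocalPort(), props)) {
                return socket.isConnected() && socket.getTcpNoDelay();
            }
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectsToLocalListener());
    }
}
```

Like the original, the loop remembers the last failure and only throws after every resolved address has been tried.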

// Before the handshake
mysqlConnection = socketFactory.beforeHandshake();
//StandardSocketFactory
// Returns the socket that connect() already initialized
public Socket beforeHandshake()
        throws SocketException, IOException
{
   return rawSocket;
}

// After the handshake
public Socket afterHandshake()
        throws SocketException, IOException
{
        return rawSocket;
}

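As the code above shows, the main job of createNewIO is to initialize a MysqlIO. The MysqlIO constructor first creates the socket factory (StandardSocketFactory by default), obtains a socket connection from it, and then initializes the MysqlIO input and output streams. When a Statement executes, MysqlIO is what sends the SQL command to the server.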
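
Everything MysqlIO reads from mysqlInput or writes to mysqlOutput is framed as packets, which is why the class carries HEADER_LENGTH = 4 and a 4-byte packetHeaderBuf. As an illustration, here is a sketch of decoding such a header. It assumes the standard MySQL client/server packet layout (a 3-byte little-endian payload length followed by a 1-byte sequence number), which is not spelled out in the listing. PacketHeaderSketch and its methods are hypothetical names.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

final class PacketHeaderSketch {
    // Same value as MysqlIO.HEADER_LENGTH in the listing above.
    static final int HEADER_LENGTH = 4;

    // Decodes a 4-byte header into {payloadLength, sequenceNumber}.
    // Assumed layout: bytes 0..2 are the payload length, little-endian; byte 3 is the sequence number.
    static int[] parseHeader(byte[] header) {
        if (header.length != HEADER_LENGTH) {
            throw new IllegalArgumentException("Header must be " + HEADER_LENGTH + " bytes");
        }
        int payloadLength = (header[0] & 0xff)
                | ((header[1] & 0xff) << 8)
                | ((header[2] & 0xff) << 16);
        int sequenceNumber = header[3] & 0xff;
        return new int[] { payloadLength, sequenceNumber };
    }

    // Reads exactly one header from the stream and decodes it.
    static int[] readHeader(InputStream in) throws IOException {
        byte[] header = new byte[HEADER_LENGTH];
        new DataInputStream(in).readFully(header);
        return parseHeader(header);
    }

    public static void main(String[] args) {
        int[] h = parseHeader(new byte[] { 0x4a, 0x00, 0x00, 0x00 });
        System.out.println("length=" + h[0] + " seq=" + h[1]);
    }
}
```

With three length bytes, the largest payload a single header can describe is 16777215 bytes.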
// MysqlIO handshake
void doHandshake(String user, String password, String database)
        throws SQLException
    {
        checkPacketSequence = false;
        readPacketSequence = 0;
        Buffer buf = readPacket();
        protocolVersion = buf.readByte();
        if(protocolVersion == -1)
        {
            try
            {
                mysqlConnection.close();
            }
            catch(Exception e) { }
            int errno = 2000;
            errno = buf.readInt();
            String serverErrorMessage = buf.readString("ASCII", getExceptionInterceptor());
            StringBuffer errorBuf = new StringBuffer(Messages.getString("MysqlIO.10"));
            errorBuf.append(serverErrorMessage);
            errorBuf.append("\"");
            String xOpen = SQLError.mysqlToSqlState(errno, connection.getUseSqlStateCodes());
            throw SQLError.createSQLException(SQLError.get(xOpen) + ", " + errorBuf.toString(), xOpen, errno, getExceptionInterceptor());
        }
        // Server version string
        serverVersion = buf.readString("ASCII", getExceptionInterceptor());
        int point = serverVersion.indexOf('.');
        if(point != -1)
        {
            try
            {
                int n = Integer.parseInt(serverVersion.substring(0, point));
                serverMajorVersion = n;
            }
            catch(NumberFormatException NFE1) { }
            String remaining = serverVersion.substring(point + 1, serverVersion.length());
            point = remaining.indexOf('.');
            if(point != -1)
            {
                try
                {
                    int n = Integer.parseInt(remaining.substring(0, point));
                    serverMinorVersion = n;
                }
                catch(NumberFormatException nfe) { }
                remaining = remaining.substring(point + 1, remaining.length());
                int pos;
                for(pos = 0; pos < remaining.length() && remaining.charAt(pos) >= '0' && remaining.charAt(pos) <= '9'; pos++);
                try
                {
                    int n = Integer.parseInt(remaining.substring(0, pos));
                    serverSubMinorVersion = n;
                }
                catch(NumberFormatException nfe) { }
            }
        }
        if(versionMeetsMinimum(4, 0, 8))
        {
            maxThreeBytes = 16777215;
            useNewLargePackets = true;
        } else
        {
            maxThreeBytes = 16581375;
            useNewLargePackets = false;
        }
        colDecimalNeedsBump = versionMeetsMinimum(3, 23, 0);
        colDecimalNeedsBump = !versionMeetsMinimum(3, 23, 15);
        useNewUpdateCounts = versionMeetsMinimum(3, 22, 5);
        threadId = buf.readLong();
        seed = buf.readString("ASCII", getExceptionInterceptor());
        serverCapabilities = 0;
        if(buf.getPosition() < buf.getBufLength())
            serverCapabilities = buf.readInt();
        if(versionMeetsMinimum(4, 1, 1))
        {
            int position = buf.getPosition();
            serverCharsetIndex = buf.readByte() & 255;
            serverStatus = buf.readInt();
            checkTransactionState(0);
            buf.setPosition(position + 16);
            String seedPart2 = buf.readString("ASCII", getExceptionInterceptor());
            StringBuffer newSeed = new StringBuffer(20);
            newSeed.append(seed);
            newSeed.append(seedPart2);
            seed = newSeed.toString();
        }
        if((serverCapabilities & 32) != 0 && connection.getUseCompression())
            clientParam |= 32L;
        // Database information
        useConnectWithDb = database != null && database.length() > 0 && !connection.getCreateDatabaseIfNotExist();
        if(useConnectWithDb)
            clientParam |= 8L;
        if((serverCapabilities & 2048) == 0 && connection.getUseSSL())
        {
            if(connection.getRequireSSL())
            {
                connection.close();
                forceClose();
                throw SQLError.createSQLException(Messages.getString("MysqlIO.15"), "08001", getExceptionInterceptor());
            }
            connection.setUseSSL(false);
        }
        if((serverCapabilities & 4) != 0)
        {
            clientParam |= 4L;
            hasLongColumnInfo = true;
        }
        if(!connection.getUseAffectedRows())
            clientParam |= 2L;
        if(connection.getAllowLoadLocalInfile())
            clientParam |= 128L;
        if(isInteractiveClient)
            clientParam |= 1024L;
        if(protocolVersion > 9)
            clientParam |= 1L;
        else
            clientParam &= -2L;
        if(versionMeetsMinimum(4, 1, 0))
        {
            if(versionMeetsMinimum(4, 1, 1))
            {
                clientParam |= 512L;
                has41NewNewProt = true;
                clientParam |= 8192L;
                clientParam |= 131072L;
                if(connection.getAllowMultiQueries())
                    clientParam |= 65536L;
            } else
            {
                clientParam |= 16384L;
                has41NewNewProt = false;
            }
            use41Extensions = true;
        }
        int passwordLength = 16;
        int userLength = user == null ? 0 : user.length();
        int databaseLength = database == null ? 0 : database.length();
        int packLength = (userLength + passwordLength + databaseLength) * 2 + 7 + 4 + 33;
        Buffer packet = null;
	// SSL disabled: authenticate over the plain socket
        if(!connection.getUseSSL())
        {
            if((serverCapabilities & 32768) != 0)
            {
                clientParam |= 32768L;
		// Secure authentication
                if(versionMeetsMinimum(4, 1, 1))
                    secureAuth411(null, packLength, user, password, database, true);
                else
                    secureAuth(null, packLength, user, password, database, true);
            } else
            {
                packet = new Buffer(packLength);
                if((clientParam & 16384L) != 0L)
                {
                    if(versionMeetsMinimum(4, 1, 1))
                    {
                        packet.writeLong(clientParam);
                        packet.writeLong(maxThreeBytes);
                        packet.writeByte((byte)8);
                        packet.writeBytesNoNull(new byte[23]);
                    } else
                    {
                        packet.writeLong(clientParam);
                        packet.writeLong(maxThreeBytes);
                    }
                } else
                {
                    packet.writeInt((int)clientParam);
                    packet.writeLongInt(maxThreeBytes);
                }
                packet.writeString(user, "Cp1252", connection);
                if(protocolVersion > 9)
                    packet.writeString(Util.newCrypt(password, seed), "Cp1252", connection);
                else
                    packet.writeString(Util.oldCrypt(password, seed), "Cp1252", connection);
                if(useConnectWithDb)
                    packet.writeString(database, "Cp1252", connection);
                send(packet, packet.getPosition());
            }
        } else
        {
	    // SSL enabled: negotiate the SSL connection
            negotiateSSLConnection(user, password, database, packLength);
        }
        if(!versionMeetsMinimum(4, 1, 1))
            checkErrorPacket();
        if((serverCapabilities & 32) != 0 && connection.getUseCompression())
        {
            deflater = new Deflater();
            useCompression = true;
            mysqlInput = new CompressedInputStream(connection, mysqlInput);
        }
        if(!useConnectWithDb)
            changeDatabaseTo(database);
        try
        {
	   // Handshake complete
            mysqlConnection = socketFactory.afterHandshake();
        }
        catch(IOException ioEx)
        {
            throw SQLError.createCommunicationsException(connection, lastPacketSentTimeMs, lastPacketReceivedTimeMs, ioEx, getExceptionInterceptor());
        }
}
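The version handling in doHandshake can be tried out in isolation with a simplified sketch. ServerVersionSketch is a hypothetical helper, not a driver class: it takes the leading digits of each dotted component (the driver parses the first two components whole) and reproduces the comparison that versionMeetsMinimum appears to perform, plus the packet size limit chosen from it.

```java
/** Hypothetical helper mirroring the version parsing in doHandshake. */
final class ServerVersionSketch {
    final int major;
    final int minor;
    final int subMinor;

    ServerVersionSketch(String serverVersion) {
        // Split into at most three parts, e.g. "5.7.12-log" -> "5", "7", "12-log".
        String[] parts = serverVersion.split("\\.", 3);
        major = leadingInt(parts, 0);
        minor = leadingInt(parts, 1);
        subMinor = leadingInt(parts, 2);
    }

    /** Parses the leading ASCII digits of a component; returns 0 if there are none. */
    private static int leadingInt(String[] parts, int index) {
        if (index >= parts.length) {
            return 0;
        }
        String s = parts[index];
        int pos = 0;
        while (pos < s.length() && s.charAt(pos) >= '0' && s.charAt(pos) <= '9') {
            pos++;
        }
        try {
            return Integer.parseInt(s.substring(0, pos));
        } catch (NumberFormatException e) {
            return 0;
        }
    }

    /** True if this version is at least major.minor.subMinor. */
    boolean meetsMinimum(int maj, int min, int sub) {
        if (major != maj) {
            return major > maj;
        }
        if (minor != min) {
            return minor > min;
        }
        return subMinor >= sub;
    }

    /** Packet size limit as selected in doHandshake. */
    int maxThreeBytes() {
        return meetsMinimum(4, 0, 8) ? 16777215 : 16581375;
    }
}
```

For "5.7.12-log" this yields major 5, minor 7 and sub-minor 12, so the 4.1.1 checks in doHandshake would all take the newer protocol branches.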
// Negotiate the SSL connection
private void negotiateSSLConnection(String user, String password, String database, int packLength)
        throws SQLException
    {
        if(!ExportControlled.enabled())
            throw new ConnectionFeatureNotAvailableException(connection, lastPacketSentTimeMs, null);
        boolean doSecureAuth = false;
        if((serverCapabilities & 32768) != 0)
        {
            clientParam |= 32768L;
            doSecureAuth = true;
        }
        clientParam |= 2048L;
        Buffer packet = new Buffer(packLength);
        if(use41Extensions)
            packet.writeLong(clientParam);
        else
            packet.writeInt((int)clientParam);
        send(packet, packet.getPosition());
        ExportControlled.transformSocketToSSLSocket(this);
        packet.clear();
        if(doSecureAuth)
        {
            if(versionMeetsMinimum(4, 1, 1))
                secureAuth411(null, packLength, user, password, database, true);
            else
                secureAuth411(null, packLength, user, password, database, true);
        } else
        {
            if(use41Extensions)
            {
                packet.writeLong(clientParam);
                packet.writeLong(maxThreeBytes);
            } else
            {
                packet.writeInt((int)clientParam);
                packet.writeLongInt(maxThreeBytes);
            }
            packet.writeString(user);
            if(protocolVersion > 9)
                packet.writeString(Util.newCrypt(password, seed));
            else
                packet.writeString(Util.oldCrypt(password, seed));
            if((serverCapabilities & 8) != 0 && database != null && database.length() > 0)
                packet.writeString(database);
            // Send the packet
            send(packet, packet.getPosition());
        }
    }
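The numeric literals OR-ed into clientParam are capability flags of the MySQL client/server protocol. The decompiled code has lost their names, so the sketch below maps the values used in doHandshake and negotiateSSLConnection back to the names from the public protocol documentation. CapabilityFlagsSketch and describe are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical lookup table for the capability bits seen in the handshake code. */
final class CapabilityFlagsSketch {
    static final Map<String, Long> FLAGS = new LinkedHashMap<>();

    static {
        FLAGS.put("CLIENT_LONG_PASSWORD", 1L);
        FLAGS.put("CLIENT_FOUND_ROWS", 2L);
        FLAGS.put("CLIENT_LONG_FLAG", 4L);
        FLAGS.put("CLIENT_CONNECT_WITH_DB", 8L);
        FLAGS.put("CLIENT_COMPRESS", 32L);
        FLAGS.put("CLIENT_LOCAL_FILES", 128L);
        FLAGS.put("CLIENT_PROTOCOL_41", 512L);
        FLAGS.put("CLIENT_INTERACTIVE", 1024L);
        FLAGS.put("CLIENT_SSL", 2048L);
        FLAGS.put("CLIENT_TRANSACTIONS", 8192L);
        FLAGS.put("CLIENT_RESERVED", 16384L);
        FLAGS.put("CLIENT_SECURE_CONNECTION", 32768L);
        FLAGS.put("CLIENT_MULTI_STATEMENTS", 65536L);
        FLAGS.put("CLIENT_MULTI_RESULTS", 131072L);
    }

    /** Returns the names of all known flags set in clientParam, lowest bit first. */
    static List<String> describe(long clientParam) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Long> flag : FLAGS.entrySet()) {
            if ((clientParam & flag.getValue()) != 0) {
                names.add(flag.getKey());
            }
        }
        return names;
    }
}
```

Read this way, negotiateSSLConnection sets CLIENT_SSL (2048) unconditionally and CLIENT_SECURE_CONNECTION (32768) only when the server advertises it.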
// Send a SQL packet, splitting it into several packets when it is too large
 private final void sendSplitPackets(Buffer packet)
        throws SQLException
    {
        try
        {
            Buffer headerPacket = splitBufRef != null ? (Buffer)splitBufRef.get() : null;
            if(headerPacket == null)
            {
                headerPacket = new Buffer(maxThreeBytes + 4);
                splitBufRef = new SoftReference(headerPacket);
            }
            int len = packet.getPosition();
            int splitSize = maxThreeBytes;
            int originalPacketPos = 4;
            byte origPacketBytes[] = packet.getByteBuffer();
            byte headerPacketBytes[] = headerPacket.getByteBuffer();
            int packetLen;
            for(; len >= maxThreeBytes; len -= splitSize)
            {
                packetSequence++;
                headerPacket.setPosition(0);
                headerPacket.writeLongInt(splitSize);
                headerPacket.writeByte(packetSequence);
                System.arraycopy(origPacketBytes, originalPacketPos, headerPacketBytes, 4, splitSize);
                packetLen = splitSize + 4;
                if(!useCompression)
                {
                    mysqlOutput.write(headerPacketBytes, 0, splitSize + 4);
                    mysqlOutput.flush();
                } else
                {
                    headerPacket.setPosition(0);
                    Buffer packetToSend = compressPacket(headerPacket, 4, splitSize, 4);
                    packetLen = packetToSend.getPosition();
                    mysqlOutput.write(packetToSend.getByteBuffer(), 0, packetLen);
                    mysqlOutput.flush();
                }
                originalPacketPos += splitSize;
            }

            headerPacket.clear();
            headerPacket.setPosition(0);
            headerPacket.writeLongInt(len - 4);
            packetSequence++;
            headerPacket.writeByte(packetSequence);
            if(len != 0)
                System.arraycopy(origPacketBytes, originalPacketPos, headerPacketBytes, 4, len - 4);
            packetLen = len - 4;
            if(!useCompression)
            {
	        // Write the packet to the output stream.
	        // mysqlOutput is a MysqlIO field, declared as: protected BufferedOutputStream mysqlOutput;
                mysqlOutput.write(headerPacket.getByteBuffer(), 0, len);
                mysqlOutput.flush();
            } else
            {
                headerPacket.setPosition(0);
                Buffer packetToSend = compressPacket(headerPacket, 4, packetLen, 4);
                packetLen = packetToSend.getPosition();
                mysqlOutput.write(packetToSend.getByteBuffer(), 0, packetLen);
                mysqlOutput.flush();
            }
        }
        catch(IOException ioEx)
        {
            throw SQLError.createCommunicationsException(connection, lastPacketSentTimeMs, lastPacketReceivedTimeMs, ioEx, getExceptionInterceptor());
        }
    }
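The splitting logic in sendSplitPackets is easier to follow without the buffer reuse and compression. The sketch below is a simplified illustration, not driver code. It assumes a payload without the 4-byte header space that the driver's Buffer reserves, and a sequence number starting at 0. Each frame carries a 3-byte little-endian length and a 1-byte sequence number. A payload that is an exact multiple of the chunk size is followed by an empty frame, as in the loop above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified version of the packet splitting in sendSplitPackets. */
final class PacketSplitSketch {
    static final int HEADER_LENGTH = 4;

    /** Builds one frame: 3-byte little-endian length, 1-byte sequence, then the data. */
    static byte[] frame(byte[] payload, int offset, int length, int sequence) {
        byte[] out = new byte[HEADER_LENGTH + length];
        out[0] = (byte) (length & 0xff);
        out[1] = (byte) ((length >>> 8) & 0xff);
        out[2] = (byte) ((length >>> 16) & 0xff);
        out[3] = (byte) sequence;
        System.arraycopy(payload, offset, out, HEADER_LENGTH, length);
        return out;
    }

    /** Splits payload into frames of at most maxChunk data bytes each. */
    static List<byte[]> split(byte[] payload, int maxChunk) {
        if (maxChunk <= 0) {
            throw new IllegalArgumentException("maxChunk must be positive");
        }
        List<byte[]> frames = new ArrayList<>();
        int offset = 0;
        int remaining = payload.length;
        int sequence = 0;
        // Full-size chunks first, mirroring the for loop in sendSplitPackets.
        while (remaining >= maxChunk) {
            frames.add(frame(payload, offset, maxChunk, sequence++));
            offset += maxChunk;
            remaining -= maxChunk;
        }
        // The last frame holds the rest and may be empty.
        frames.add(frame(payload, offset, remaining, sequence));
        return frames;
    }
}
```

In the driver the chunk size is maxThreeBytes; a small value such as 4 makes the behaviour easy to inspect by hand.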

The main job of the MysqlIO handshake is to send the user name, password, database, and related information to the server.
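For servers at version 4.1.1 or later, doHandshake delegates to secureAuth411, whose body is not shown in this article. The sketch below follows the publicly documented mysql_native_password scheme instead of the driver source: the client sends SHA1(password) XOR SHA1(seed + SHA1(SHA1(password))), so the password never crosses the wire in clear text. The class and method names are hypothetical, and encoding the password as UTF-8 is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch of the 4.1+ password scramble; not the driver's own code. */
final class NativePasswordSketch {
    /** SHA-1 over the concatenation of the given byte arrays. */
    static byte[] sha1(byte[]... parts) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            for (byte[] part : parts) {
                md.update(part);
            }
            return md.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    /** Byte-wise XOR of two arrays of equal length. */
    static byte[] xor(byte[] a, byte[] b) {
        byte[] out = new byte[a.length];
        for (int i = 0; i < a.length; i++) {
            out[i] = (byte) (a[i] ^ b[i]);
        }
        return out;
    }

    /** Client side: computes the token sent in place of the password. */
    static byte[] scramble(String password, byte[] seed) {
        byte[] stage1 = sha1(password.getBytes(StandardCharsets.UTF_8));
        byte[] stage2 = sha1(stage1);
        return xor(stage1, sha1(seed, stage2));
    }

    /** Server side: recovers stage1 from the token and compares its hash with the stored value. */
    static boolean serverAccepts(byte[] token, byte[] seed, byte[] storedStage2) {
        byte[] stage1 = xor(token, sha1(seed, storedStage2));
        return MessageDigest.isEqual(sha1(stage1), storedStage2);
    }
}
```

The seed here corresponds to the value doHandshake assembles from the two seed parts in the server greeting.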

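Summary:
The main job of createNewIO is to initialize MysqlIO. The MysqlIO constructor first creates the socket factory, which is actually a StandardSocketFactory, obtains the socket connection from it, and initializes MysqlIO's input and output streams. When a Statement executes, MysqlIO sends the SQL command to the server. The main job of the MysqlIO handshake is to send the user name, password, database, and related information to the server.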