Cobar啟動完成,監聽特定端口。整個認證的流程圖:

NIOAcceptor類繼承自Thread類,該類的對象會以線程的方式運行,進行連接的監聽。
NIOAcceptor啟動的初始化過程如下:
1 、打開一個selector,獲取一個ServerSocketChannel對象,對該對象的socket綁定特定的監聽端口,并設置該channel為非阻塞模式,然后想selector注冊該channel,綁定感興趣的事件位OP_ACCEPT。
| public NIOAcceptor(String name, int port, FrontendConnectionFactory factory) throws IOException { |
| this .selector = Selector.open(); |
| this .serverChannel = ServerSocketChannel.open(); |
| this .serverChannel.socket().bind( new InetSocketAddress(port)); |
| this .serverChannel.configureBlocking( false ); |
| this .serverChannel.register(selector, SelectionKey.OP_ACCEPT); |
2、 然后會啟動該線程,線程的run函數如下:
| final Selector selector = this .selector; |
| Set keys = selector.selectedKeys(); |
| for (SelectionKey key : keys) { |
| if (key.isValid() && key.isAcceptable()) { |
| LOGGER.warn(getName(), e); |
3 、 該線程會一直循環監聽想該selector注冊過的server channel所感興趣的事件(OP_ACCEPT),當有新的連接請求時,selector就會返回,keys就是請求連接的所有的包含channel的key集合。
SelectionKey有如下屬性:
interest集合(使用&操作SelectionKey.OP_ACCEPT和key.interestOps())ready集合(key.readyOps(),可以使用&操作檢測該集合,也可以使用is方法)Channel(key.channel())Selector(key.selector())附加對象(key.attach(obj) Object obj = key.attachment())4、 然后遍歷該集合,如果集合中的key沒有被cancel,并且這個key的channel已經做好接受一個新的socket連接的準備,則接受該連接。
accept()的具體代碼如下:
| SocketChannel channel = null ; |
| channel = serverChannel.accept(); |
| channel.configureBlocking( false ); |
| FrontendConnection c = factory.make(channel); |
| c.setId(ID_GENERATOR.getId()); |
| NIOProcessor processor = nextProcessor(); |
| c.setProcessor(processor); |
| processor.postRegister(c); |
| LOGGER.warn(getName(), e); |
首先從serverchannel中accept后會返回一個socketchannel對象,然后設置該socket channel屬性位非阻塞模式,然后將channel交給ServerConnectionFactory工廠,會產生一個ServerConnection對象。

FrontendConnectionFactory是一個抽象類,其中的getConnection方法是抽象方法,有具體子類連接工廠來實現。FrontendConnectionFactory的make方法對channel中的socket進行屬性設置(接收和發送的緩沖區大小、延時、KeepAlive等),然后調用具體調用具體子類(ServerConnectionFactory)的getConnection來返回一個ServerConnection,返回后會在進行設置一下該ServerConnection的包頭大小、最大包大小、設置連接的發送緩沖區隊列、超時時間、字符編碼,到此,工廠完成了新建連接的工作,返回一個連接的對象。返回后將該連接分配給一個processor,該processor會將該連接保存,processor也會對連接進行定期檢查。
5、 processor還會向自己的reactorR進行注冊該連接,加入reactorR的處理隊列,并喚醒阻塞的select()方法。
反應堆中Reactor的R線程運行代碼:
| final Selector selector = this .selector; |
| int res = selector.select(); |
| LOGGER.debug(reactCount + ">>NIOReactor接受連接數:" + res); |
| Set keys = selector.selectedKeys(); |
| for (SelectionKey key : keys) { |
| Object att = key.attachment(); |
| if (att != null && key.isValid()) { |
| int readyOps = key.readyOps(); |
| if ((readyOps & SelectionKey.OP_READ) != ) { |
| LOGGER.debug( "select讀事件" ); |
| read((NIOConnection) att); |
| } else if ((readyOps & SelectionKey.OP_WRITE) != ) { |
| LOGGER.debug( "select寫事件" ); |
| write((NIOConnection) att); |
該R線程也會一直循環運行,如果向該selector注冊過的channel沒有對應的感興趣的事件發生,就會阻塞,直到有感興趣的事件發生或被wakeup。返回后會運行register函數,將之前加入該reactor連接隊列中的所有連接向該selector注冊OP_READ事件。該注冊的動作會調用Connection對象中的register方法進行注冊
channel.register(selector, SelectionKey.OP_READ, this);
注意最后一個this指針參數,表示將該連接作為附件,注冊到selector,當有感興趣的時間發生時,函數selector.selectedKeys()返回的SelectionKey集合中的對象中使用key.attachment()即可獲取到上面注冊時綁定的connection對象指針附件。目的就是為了通過該附件對象調用該連接類中定義的read函數來完成功能。如下所示:
| private void read(NIOConnection c) { |
| c.error(ErrorCode.ERR_READ, e); |
6、 連接類中定義的read函數定義在AbstractConnection類中。在該read函數
(該read函數涉及到的邏輯比較復雜,先不深究)中,完成從channel中讀取數據到buffer,然后從buffer中提取byte數據交給具體子類(FrontendConnection)的handle()方法進行處理。
7、 該方法會從processor的線程池中獲取一個線程,來異步執行數據的處理。處理會調用成員handler的handle方法來對數據進行處理。這里,在FrontendConnection的構造函數中定handler設置為FrontendAuthenticator(進行前端認證)。
| public void handle( final byte [] data) { |
| processor.getHandler().execute( new Runnable() { |
| error(ErrorCode.ERR_HANDLE_DATA, t); |
8、 handler在構造函數中初始化成前端認證處理器,用于處理前端權限認證。
| public FrontendConnection(SocketChannel channel) { |
| this .handler = new FrontendAuthenticator( this ); |
9、 由于Cobar是基于MySQL協議的,所以需要分析一下MySQL協議的具體格式。下面就先分析一下MySQL認證數據包的格式:
每個報文都分為消息頭和消息體兩部分,其中消息頭是固定的四個字節,報文結構如下:

登錄認證報文的報文數據部分格式如下:

10、 FrontendAuthenticator類對上面的數據包的具體處理如下:
讀取信息到認證包對象核對用戶核對密碼檢查schema如果出現錯誤,會提示相應的錯誤信息,如果正確會向客戶端發送認證成功提示。
| public void handle( byte [] data) { |
| if (data.length == QuitPacket.QUIT.length && data[ ] == MySQLPacket.COM_QUIT) { |
| AuthPacket auth = new AuthPacket(); |
| if (!checkUser(auth.user, source.getHost())) { |
| failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "'" ); |
| if (!checkPassword(auth.password, auth.user)) { |
| failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "'" ); |
| switch (checkSchema(auth.database, auth.user)) { |
| case ErrorCode.ER_BAD_DB_ERROR: |
| failure(ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + auth.database + "'" ); |
| case ErrorCode.ER_DBACCESS_DENIED_ERROR: |
| String s = "Access denied for user '" + auth.user + "' to database '" + auth.database + "'" ; |
| failure(ErrorCode.ER_DBACCESS_DENIED_ERROR, s); |
在上面的auth.read函數中會按9中的協議格式進行讀取數據到auth對象。認證成功后會執行:
| protected void success(AuthPacket auth) { |
| source.setAuthenticated( true ); |
| source.setUser(auth.user); |
| source.setSchema(auth.database); |
| source.setCharsetIndex(auth.charsetIndex); |
| source.setHandler( new FrontendCommandHandler(source)); |
| ByteBuffer buffer = source.allocate(); |
| source.write(source.writeToBuffer(AUTH_OK, buffer)); |
可以看到,在上面的函數中,設置連接對象source中的成員(是否認證、用戶、數據庫、編碼、處理該連接后續數據包的處理器【handle方法】)
然后回復認證成功的消息。后面客戶端再發送消息,會交給前端命令處理器進行處理。
客戶端進行鏈接的時候Cobar服務器的輸出:
| 16:59:19,388 INFO=============================================== |
| 16:59:19,389 INFOCobar is ready to startup ... |
| 16:59:19,389 INFOStartup processors ... |
| 16:59:19,455 INFOStartup connector ... |
| 16:59:19,460 INFOInitialize dataNodes ... |
| 16:59:19,506 INFOdnTest1:0 init success |
| 16:59:19,514 INFOdnTest3:0 init success |
| 16:59:19,517 INFOdnTest2:0 init success |
| 16:59:19,527 INFOCobarServer is started and listening on 8066 |
| 16:59:19,527 INFO=============================================== |
| 16:59:23,459 DEBUG 1>>NIOReactor接受連接數:0 |
| 16:59:23,464 DEBUG 2>>NIOReactor接受連接數:1 |
| 16:59:23,465 DEBUG select讀事件 |
| 16:59:23,465 INFOcom.alibaba.cobar.net.handler.FrontendAuthenticator接收的請求長度:62 |
| 16:59:23,468 INFO[thread=Processor1-H0,class=ServerConnection,host=192.168.137.8,port=46101,schema=null]'root' login success |
客戶端得到的回復:
| yan@yan-Z400:~$ mysql -uroot -p** -P8066 -h192.168.137.8 |
| Welcome to the MySQL monitor.Commands end with ; or /g. |
| Your MySQL connection id is 1 |
| Server version: 5.1.48-cobar-1.2.7 Cobar Server (ALIBABA) |
| Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved. |
| Oracle is a registered trademark of Oracle Corporation and/or its |
| affiliates. Other names may be trademarks of their respective |
| Type 'help;' or '/h' for help. Type '/c' to clear the current input statement. |
MySQL客戶端的命令處理,具體后續會分析。
作者:GeekCome出處:極客來
原文:分布式數據庫中間件–(2) Cobar與客戶端的握手認證
提示:本文版權歸作者,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接。
如果對文章有任何問題,都可以在評論中留言,我會盡可能的答復您,謝謝你的閱讀
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com