• <fieldset id="8imwq"><menu id="8imwq"></menu></fieldset>
  • <bdo id="8imwq"><input id="8imwq"></input></bdo>
    最新文章專題視頻專題問答1問答10問答100問答1000問答2000關鍵字專題1關鍵字專題50關鍵字專題500關鍵字專題1500TAG最新視頻文章推薦1 推薦3 推薦5 推薦7 推薦9 推薦11 推薦13 推薦15 推薦17 推薦19 推薦21 推薦23 推薦25 推薦27 推薦29 推薦31 推薦33 推薦35 推薦37視頻文章20視頻文章30視頻文章40視頻文章50視頻文章60 視頻文章70視頻文章80視頻文章90視頻文章100視頻文章120視頻文章140 視頻2關鍵字專題關鍵字專題tag2tag3文章專題文章專題2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章專題3
    問答文章1 問答文章501 問答文章1001 問答文章1501 問答文章2001 問答文章2501 問答文章3001 問答文章3501 問答文章4001 問答文章4501 問答文章5001 問答文章5501 問答文章6001 問答文章6501 問答文章7001 問答文章7501 問答文章8001 問答文章8501 問答文章9001 問答文章9501
    當前位置: 首頁 - 科技 - 知識百科 - 正文

    Impala源代碼分析(1)-Impala架構和RPC

    來源:懂視網 責編:小采 時間:2020-11-09 13:24:14
    文檔

    Impala源代碼分析(1)-Impala架構和RPC

    Impala源代碼分析(1)-Impala架構和RPC:Impala總共分為3個組件:impalad, statestored, client/impala-shell。關于這三個組件的基本功能在這篇文章中已經介紹過了。 Client: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。無論哪個其實就是一個Thrif
    推薦度:
    導讀Impala源代碼分析(1)-Impala架構和RPC:Impala總共分為3個組件:impalad, statestored, client/impala-shell。關于這三個組件的基本功能在這篇文章中已經介紹過了。 Client: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。無論哪個其實就是一個Thrif

    Impala總共分為3個組件:impalad, statestored, client/impala-shell。關于這三個組件的基本功能在這篇文章中已經介紹過了。 Client?: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。無論哪個其實就是一個Thrift的client,連接到impala

    Impala總共分為3個組件:impalad, statestored, client/impala-shell。關于這三個組件的基本功能在這篇文章中已經介紹過了。

    Client?: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。無論哪個其實就是一個Thrift的client,連接到impalad的21000端口。

    Impalad: 分為frontend和backend兩部分,這個進程有三個ThriftServer(beeswax_server, hs2_server, be_server)對系統外和系統內提供服務。

    Statestored: 集群內各個backend service的數據交換中心,每個backend會在statestored注冊,以后statestored會與所有注冊過的backend交換update消息。

    RPC

    Component Service Port Access Requirement Comment
    ImpalaDaemon Impala Daemon Backend Port 22000 Internal ImpalaBackendService export
    Impala Daemon Frontend Port 21000 External ImpalaService export
    Impala Daemon HTTP Server Port 25000 External Impala debug web server
    StateStoreSubscriber Service Port 23000 Internal StateStoreSubscriberService
    ?ImpalaStateStore Daemon StateStore HTTP Server Port 25010 External StateStore debug web server
    StateStore Service Port 24000 Internal StateStoreService export

    下面介紹三個組件之間的Thrift RPC(“<->”前面的表示RPC client,“<->”后面的表示RPC server)

    (1)Client <-> impalad(frontend)

    BeeswaxService(beeswax.thrift): client通過query()提交SQL請求,然后異步調用get_state()監聽該SQL的查詢進度,一旦完成,調用fetch()取回結果。

    TCLIService(cli_service.thrift): client提交SQL請求,功能和上面類似,更豐富的就是對DDL操作的支持,例如GetTables()返回指定table的元數據。

    ImpalaService和ImpalaHiveServer2Service(ImpalaService.thrift)分別是上面兩個類的子類,各自豐富了點功能而已,核心功能沒啥大變化。

    (2)Impalad(backend) <-> statestored

    StateStoreService(StateStoreService.thrift): statestored保存整個系統所有backend service狀態的全局數據庫,這里是個單節點中央數據交換中心(該節點保存的狀態是soft state,一旦宕機,保存的狀態信息就沒了)。例如每個impala backend啟動的時候會調用StateStoreService.RegisterService()向statestored注冊自己(其實是通過跟這個backend service捆綁在一起的StateStoreSubscriber標識的),然后再調用StateStoreService.RegisterSubscription()表明這個StateStoreSubscriber接收來自statestored的update。

    (3)Statestord <-> impalad(backend)

    StateStoreSubscriberService(StateStoreSubscriberService.thrift): backend向statestored調用RegisterSubscription之后,statestored就會定期向backend這邊捆綁的StateStoreSubscriber發送該backend的狀態更新信息。然后backend這邊調用StateStoreSubscriberService.UpdateState()更新相關狀態。同時這個UpdateState()調用在impalad backend/StateStoreSubscriber這端還會返回該backend的一些update信息給statestored。

    (4)Impalad(backend) <-> other impalad(backend) (這兩個是互為client/server的)

    ImpalaInternalService(ImpalaInternalService.thrift):某個backend的coordinator要向其他backend的execute engine發送執行某個plan fragment的請求(提交ExecPlanFragment并要求返回ReportExecStatus)。這部分功能會在backend分析中詳細討論。

    (5)Impalad backend <-> other frontend

    ImpalaPlanService(ImpalaPlanService.thrift):可以由其他形式的frontend生成TExecRequest然后交給backend執行。

    另外,Impala frontend是用Java寫的,而backend使用C++寫的。Frontend負責把輸入的SQL解析,然后生成執行計劃,之后通過Thrift的序列化/反序列化的方式傳給backend。TExecRequest(frontend.thrift)是中間傳輸的數據結構,表示了一個Query/DML/DDL的查詢請求,也是SQL執行過程中在frontend和backend之間的數據接口。所以我們可以把impala-frontend換掉,用其他的形式拼湊出這個TExecRequest就可以傳給backend執行,這也就是前面說的ImpalaPlanService干的事。

    impala組件執行流程

    1, impala-shell

    client就可以通過Beeswax和HiveServer2的Thrift API向Impala提交query。這兩種訪問接口的作用是一樣的(都是用于client提交query,返回query result)。

    Impala_shell.py是通過Beeswax方式訪問impala的,下面我們看看impala_shell.py是怎么向impalad提交query的。

    (1)通過OptionParser()解析命令行參數。如果參數中有—query或者—query_file,則執行execute_queries_non_interactive_mode(options),這是非交互查詢(也就是就查詢一個SQL或者一個寫滿SQL的文件);否則進入ImpalaShell.cmdloop (intro)循環。

    (2)進入命令行循環后,一般是先connect某一個impalad,輸入”connect localhost:21000”,進入do_connect(self, args)函數。這個函數根據用戶指定的host和port,生成與相應的impalad的socket連接。最重要的就是這行代碼:

    self.imp_service = ImpalaService.Client(protocol)

    至此imp_service就是client端的代理了,所有請求都通過它提交。

    (3)下面以select命令為例說明,如果client輸入這樣的命令”select col1, col2 from tbl”,則進入do_select(self, args)函數。在這個函數里首先生成BeeswaxService.Query對象,向這個對象填充query statement和configuration。然后進入__query_with_result()函數通過imp_service.query(query)提交query。注意ImpalaService都是異步的,提交之后返回一個QueryHandle,然后就是在一個while循環里不斷__get_query_state()查詢狀態。如果發現這個SQL的狀態是FINISHED,那么就通過fetch() RPC獲取結果。

    2, statestored

    Statestored進程對外提供StateStoreService RPC服務,而StateStoreSubscriberService RPC服務是在impalad進程中提供的。StateStoreService這個RPC的邏輯實現是在StateStore這個類里面實現的。

    Statestored收到backend發送的RegisterService RPC請求時,調用StateStore::RegisterService()處理,主要做兩件事:

    (1)根據TRegisterServiceRequest提供的service_id把該service加入StateStore.service_instances_。

    通常在整個impala集群只存在名為“impala_backend_service”這一個服務,所以service_id=”impala_backend_service”。而每個backend捆綁的是不一樣的,所以就形成了service和backend一對多的關系,這個關系存儲在StateStore.service_instances_組。

    (2)Impalad backend在向statestored RegisterService的時候,會把subscriber_address發送過去。在statestored端,會根據這個subscriber_address生成對應的Subscriber對象(表示與該Subscriber捆綁的backend)。把與該backend綁定的Subscriber加入StateStore.subscribers_這個map里。每個Subscriber有個唯一的id,這樣分布在集群內的impala backend就有了全局唯一id了。

    這樣如果以后某個backend/StateStoreSubscriber fail或者其中運行的SQL任務出了問題,在statestored這里就會有體現了,那么就會通知給其他相關的backend。

    那么每個backend是怎么update的呢?StateStore::UpdateLoop()負責定期向各個backend推送其所訂閱的service的所有成員的更新,目前的更新策略是全量更新,未來會考慮增量更新。

    3, impalad

    Impalad進程的服務被wrapper在ImpalaServer這個類中。ImpalaServer包括fe和be的功能,實現了ImpalaService(Beeswax), ImpalaHiveServer2Service(HiveServer2)和ImpalaInternelService API。

    全局函數CreateImpalaServer()創建了一個ImpalaServer其中包含了多個ThriftServer:

    (1)創建一個名為beeswax_server的ThriftServer對系統外提供ImpalaService(Beeswax)服務,主要服務于Query查詢,是fe/frontend的核心服務,端口21000

    (2)創建一個名為hs2_server的ThriftServer對系統外提供ImpalaHiveServer2Service服務,提供Query, DML, DDL相關操作,端口21050

    (3)創建一個名為be_server的ThriftServer對系統內其他impalad提供ImpalaInternalService,端口22000

    (4)創建ImpalaServer對象,前面三個ThriftServer的TProcessor被賦值這個ImpalaServer對象,所以對前面三個Thrift服務的RPC請求都交由這個ImpalaServer對象處理。最典型的例子就是我們通過Beeswax接口提交了一個BeeswaxService.query()請求,在impalad端的處理邏輯是由void ImpalaServer::query(QueryHandle& query_handle, const Query& query)這個函數(在impala-beeswax-server.cc中實現)完成的。

    下面是impalad-main.cc的主函數:

    int main(int argc, char** argv) {
     //參數解析,開啟日志(基于Google gflags和glog)
     InitDaemon(argc, argv);
     LlvmCodeGen::InitializeLlvm();
     // Enable Kerberos security if requested.
     if (!FLAGS_principal.empty()) {
     EXIT_IF_ERROR(InitKerberos("Impalad"));
     }
     //因為frontend, HBase等相關組件是由Java開發的,所以下面這幾行都是初始化JNI相關的reference和method id
     JniUtil::InitLibhdfs();
     EXIT_IF_ERROR(JniUtil::Init());
     EXIT_IF_ERROR(HBaseTableScanner::Init());
     EXIT_IF_ERROR(HBaseTableCache::Init());
     InitFeSupport();
     //ExecEnv類是impalad backend上Query/PlanFragment的執行環境。
     //生成SubscriptionManager, SimpleScheduler和各種Cache
     ExecEnv exec_env;
     //生成Beeswax, hive-server2和backend三種ThriftServer用于接收client請求,不過這三種服務的后端真正的處理邏輯都是ImpalaServer* server這個對象。
     ThriftServer* beeswax_server = NULL;
     ThriftServer* hs2_server = NULL;
     ThriftServer* be_server = NULL;
     ImpalaServer* server =
     CreateImpalaServer(&exec_env, FLAGS_fe_port, FLAGS_hs2_port, FLAGS_be_port,
     &beeswax_server, &hs2_server, &be_server);
     //因為be_server是對系統內提供服務的,先啟動它。
     be_server->Start();
     //這里面關鍵是啟動了SubscriptionManager和Scheduler
     Status status = exec_env.StartServices();
     if (!status.ok()) {
     LOG(ERROR) << "Impalad services did not start correctly, exiting";
     ShutdownLogging();
     exit(1);
     }
     // register be service *after* starting the be server thread and after starting
     // the subscription mgr handler thread
     scoped_ptr cb;
     if (FLAGS_use_statestore) {
     THostPort host_port;
     host_port.port = FLAGS_be_port;
     host_port.ipaddress = FLAGS_ipaddress;
     host_port.hostname = FLAGS_hostname;
     //注冊這個be服務到statestored,整個集群里所有的be服務組成一個group,這樣以后來了Query請求就可以在各個backend之間dispatch了。
     Status status =
     exec_env.subscription_mgr()->RegisterService(IMPALA_SERVICE_ID, host_port);
     unordered_set services;
     services.insert(IMPALA_SERVICE_ID);
     //注冊callback函數,每當StateStoreSubscriber接收到來自statestored的update之后調用該函數。
     cb.reset(new SubscriptionManager::UpdateCallback(
     bind(mem_fn(&ImpalaServer::MembershipCallback), server, _1)));
     exec_env.subscription_mgr()->RegisterSubscription(services, "impala.server",
     cb.get());
     if (!status.ok()) {
     LOG(ERROR) << "Could not register with state store service: "
     << status.GetErrorMsg(); ShutdownLogging(); exit(1); } } // this blocks until the beeswax and hs2 servers terminate //前面對內服務的be_server已經成功啟動,下面啟動對外服務的beeswax_server和hs2_server beeswax_server->Start();
     hs2_server->Start();
     beeswax_server->Join();
     hs2_server->Join();
     delete be_server;
     delete beeswax_server;
     delete hs2_server;
    }
    

    exec_env.StartServices()調用SubscriptionManager.Start(),進一步調用StateStoreSubscriber.Start()啟動一個ThriftServer。

    StateStoreSubscriber實現了StateStoreSubscriberService(StateStoreSubscriberService.thrift中定義),用于接收來自statestored的update,并把與這個StateStoreSubscriber捆綁的backend的update反饋給statestored。這樣這個backend就可以對其他backend可見,這樣就可以接受其他impala backend發來的任務更新了(當然,接收backend更新是通過statestored中轉的)。

    參考文獻:

    http://www.sizeofvoid.net/wp-content/uploads/ImpalaIntroduction2.pdf

    聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com

    文檔

    Impala源代碼分析(1)-Impala架構和RPC

    Impala源代碼分析(1)-Impala架構和RPC:Impala總共分為3個組件:impalad, statestored, client/impala-shell。關于這三個組件的基本功能在這篇文章中已經介紹過了。 Client: 可以是Python CLI(官方提供的impala_shell.py),JDBC/ODBC或者Hue。無論哪個其實就是一個Thrif
    推薦度:
    標簽: 源代碼 分析 rpc
    • 熱門焦點

    最新推薦

    猜你喜歡

    熱門推薦

    專題
    Top
    主站蜘蛛池模板: 国产成人精品免费午夜app| 欧美日激情日韩精品| 国产精品一区二区不卡| 亚洲成人国产精品| 国产精品V亚洲精品V日韩精品| 精品亚洲aⅴ在线观看| 欧美精品一区二区在线精品| 秋霞午夜鲁丝片午夜精品久| 久久精品国产亚洲av麻豆小说 | 在线观看国产精品日韩av| 国产成人精品久久亚洲高清不卡 | 国产精品无码a∨精品| 午夜DY888国产精品影院| 欧美日韩精品系列一区二区三区国产一区二区精品 | 国内精品久久久久久久亚洲| 精品久久久久久国产| 经典国产乱子伦精品视频| 亚洲国产精品18久久久久久| 日本精品久久久久久久久免费| 国产精品夜色视频一级区| 一区二区三区精品| 华人在线精品免费观看| 99久久99久久久精品齐齐| 久久99精品国产自在现线小黄鸭 | 国产精品小视频免费无限app| 久久精品国产亚洲综合色| 999成人精品视频在线| 成人区人妻精品一区二区不卡视频 | 99国产精品久久久久久久成人热| 无码国产精品一区二区免费模式| 亚洲精品国产日韩无码AV永久免费网| 久久久久九国产精品| 国产综合成人色产三级高清在线精品发布 | 国产99视频精品免费专区| 国产精品亚洲一区二区三区在线| 人人妻人人澡人人爽欧美精品| 香蕉久久夜色精品升级完成| 亚洲国产精品VA在线观看麻豆| 无码人妻精品一区二区三区久久久 | 奇米精品视频一区二区三区| 久久精品人成免费|