• 鄭州
        您的位置: 法律包 > 綜合 > 詳情

        環球消息!大數據NiFi(二十):實時同步MySQL數據到Hive

        來源: 騰訊云 2023-02-27 15:20:21

        ?實時同步MySQL數據到Hive

        案例:將mysql中新增的數據實時同步到Hive中。

        以上案例需要用到的處理器有:“CaptureChangeMySQL”、“RouteOnAttribute”、“EvaluateJsonPath”、“ReplaceText”、“PutHiveQL”。

        首先通過“CaptureChangeMySQL”讀取MySQL中數據的變化(需要開啟MySQL binlog日志),將Binlog中變化的數據同步到“RouteOnAttribute”處理器,通過此處理器獲取上游數據屬性,獲取對應binlog操作類型,再將想要處理的數據路由到“EvaluateJsonPath”處理器,該處理器可以將json格式的binlog數據解析,通過自定義json 表達式獲取json數據中的屬性放入FlowFile屬性,將FlowFile通過“ReplaceText”處理器獲取上游FowFile屬性,動態拼接sql替換所有的FlowFile內容,將拼接好的sql組成FlowFile路由到“PutHiveQL”將數據寫入到Hive表。


        (資料圖片)

        一、開啟MySQL的binlog日志

        mysql-binlog是MySQL數據庫的二進制日志,記錄了所有的DDL和DML(除了數據查詢語句)語句信息。一般來說開啟二進制日志大概會有1%的性能損耗。這里需要開啟MySQL的binlog日志方便后期使用“CaptureChangeMySQL”處理器來獲取MySQL中的CDC事件。MySQL的版本最好是5.7版本之上。

        1、登錄mysql查看MySQL是否開啟binlog日志

        [root@node2 ~]# mysql -u root -p123456mysql> show variables like "log_%";

        2 、開啟mysql binlog日志

        在/etc/my.cnf文件中[mysqld]下寫入以下內容:

        [mysqld]#隨機指定一個不能和其他集群中機器重名的字符串server-id=123#配置binlog日志目錄,配置后會自動開啟binlog日志,并寫入該目錄log-bin=/var/lib/mysql/mysql-bin

        3、重啟mysql 服務,重新查看binlog日志情況

        [root@node2 ~]# service mysqld restart[root@node2 ~]# mysql -u root -p123456mysql> show variables like "log_%";

        二、???????配置“CaptureChangeMySQL”處理器

        “CaptureChangeMySQL”主要是從MySQL數據庫捕獲CDC(Change Data Capture)事件。CDC事件包括INSERT,UPDATE,DELETE操作,事件按操作發生時的順序輸出為單獨的FlowFile文件。

        關于“CaptureChangeMySQL”處理器的“Properties”主要配置的說明如下:

        配置項

        默認值

        允許值

        描述

        MySQL Hosts(MySQL 節點)

        MySQL集群節點相對應的主機名/端口項的列表。多個節點使用逗號分隔,格式為:host1:port、host2:port…,處理器將嘗試按順序連接到列表中的主機。如果一個節點關閉,并且群集啟用了故障轉移,那么處理器將連接到活動節點。

        MySQL Driver Class Name(MySQL驅動名稱)

        com.mysql.jdbc.Driver

        MySQL數據庫驅動程序類的類名。

        MySQL Driver Location(s)(MySQL驅動的位置)

        包含MySQL驅動程序包及其依賴項的文件/文件夾和/或url的逗號分隔列表(如果有),例如"/var/tmp/mysql-connector-java-5.1.38-bin.jar文件"。

        Username(用戶名)

        訪問MySQL集群的用戶名。

        Password(密碼)

        訪問MySQL集群的密碼。

        Database/Schema Name Pattern(匹配數據庫/Schema)

        用于根據CDC事件列表匹配數據庫(或模式,具體取決于RDBMS類型)的正則表達式。正則表達式必須與存儲在RDBMS中的數據庫名稱匹配。如果未設置屬性,則數據庫名稱將不會用于篩選CDC事件。

        Table Name Pattern(匹配表)

        用于匹配影響匹配表的CDC事件的正則表達式(regex)。regex必須與存儲在數據庫中的表名匹配。如果未設置屬性,則不會根據表名篩選任何事件。

        Max Wait Time(最大連接等待時長)

        30 seconds

        允許建立連接的最長時間,零表示實際上沒有限制。

        Distributed Map Cache Client(分布式緩存客戶端)

        指定用于保存處理器所需的各種表、列等信息的分布式映射緩存客戶端控制器服務。如果未指定,則生成的事件將不包括列類型或名稱等信息。

        Retrieve All Records(檢索所有記錄)

        true

        ?true?false

        指定是否獲取所有可用的CDC事件,而不考慮當前的binlog文件名或位置。如果處理器狀態中存在binlog文件名和位置值,則忽略此屬性的值。這允許4種不同的配置:1).如果處理器State中存在binlog數據,則State用來確定開始位置,并忽略Retrieve All Records的值。(目前NiFi版本測試有問題)2).如果處理器State中不存在binlog數據,此值設置為true意味著從頭開始讀取Binlog 數據。3).如果處理器State中不存在binlog數據,并且沒有指定binlog文件名和位置,此值設置為false意味著從binlog尾部開始讀取數據。4).如果處理器State中不存在binlog數據,并指定binlog文件名和位置,此值設置為false意味著從指定binlog尾部開始讀取數據。

        Include Begin/Commit Events(包含開始/提交事件)

        false

        ?true?false

        指定是否發出與二進制日志中的開始或提交事件相對應的事件。如果下游流中需要開始/提交事件,則設置為true,否則設置為false,這將抑制這些事件的生成并可以提高流性能。

        Include DDL Events(標準表/列名)

        false

        ?true?false

        指定是否發出與數據定義語言(DDL)事件對應的事件,如ALTER TABLE、TRUNCATE TABLE。如果下游流中需要DDL事件,則設置為true,否則設置為false。為false時這將抑制這些事件的生成,并可以提高流性能。

        配置步驟如下:

        1、創建“CaptureChangeMySQL”處理器

        2、配置“DistributeMapCacheServer”控制服務

        監控mysql變化需要設置“DistributedMapCacheClient”控制服務,其對應的Server中存儲處理器所需的各種表、列等信息,所以這里需要首先配置“DistributeMapCacheServer”控制服務。

        ?

        ?

        3、配置“SCHEDULING”

        由于這里使用“CaptureChangeMySQL”處理器監控“MySQL”中的數據,所以設置調度訪問周期為“10s”,防止一直監聽MySQL binlog數據,帶來性能消耗。

        ?

        4、配置“PROPERTIES”

        在“CaptureChangeMySQL”處理器中配置“PROPERTIES”,配置如下:

        MySQL Host : 192.168.179.5:3306MySQL Driver Class Name:com.mysql.jdbc.DriverMySQL Driver Location(s):/root/test/mysql-connector-java-5.1.47.jar

        注意:這里需要在每臺NiFi節點上創建對應目錄,上傳mysql驅動包。

        “PROPERTIES”配置如下:

        此外,在“PROPERTIES”中還需要配置“Distributed Map Cache Client”控制服務,來讀取“DistributeMapCacheServer”控制服務中的緩存數據:

        ?

        另外,這里我們只是監控表“test2”對應的CDC事件,這里設置匹配表名為“test2”,最終“PROPERTIES”的配置如下:

        注意:以上“Table Name Pattern”這里配置對應的Value值為:test2,也可以不配置,不配置會監控所有MySQL表的變化對應的binlog事件。當后面向Hive表中插入新增和更新數據時,對應MySQL中的元數據表也會變化,也會監控到對應的binlog事件。為了避免后期出現監控到其他表的binlog日志,這里建議配置上“test2”。

        5、啟動MySQL,創建表“test2”測試“CaptureChangeMySQL”處理器

        登錄mysql ,使用“mynifi”庫,創建表“test2”。暫時設置“CaptureChangeMySQL”處理器“success”事件自動終止并啟動,向表中插入對應的數據查看“CaptureChangeMySQL”處理器能否正常監控事件。

        在mysql中創建對應的表:

        use mynifi;create table test2 (id int,name varchar(255),age int);

        啟動“CaptureChangeMySQL”處理器:

        向表“test2”中插入以下數據:

        insert into test2 values (1,"zs",18);update test2 set name = "ls" where id = 1;delete from test2 where id = 1;

        可以在“CaptureChangeMySQL”處理器中右鍵“View data provenance”查看捕獲到的“insert”、“update”、“delete”事件:

        注意問題:在配置好“CaptureChangeMySQL”處理器啟動后,當MySQL中有數據插入、修改、刪除時當前處理器會讀取MySql binlog日志,并在當前處理器中記錄讀取binlog的位置狀態。正常來說這里關閉“CaptureChangeMySQL”處理器后再次啟動,會接著保存的binlog位置繼續讀取(可以參照“PROPERTIES”屬性中“Retrieve All Records”配置說明),但是經過測試,此NiFi版本出現以下錯誤(無效的binlog位置,目測是一個版本bug錯誤):

        所以在之后的測試中,我們可以將“CaptureChangeMysql”處理器讀取binlog的狀態清空,然后再次啟動即可,這里會重復讀取MySQL之前已經檢測到的新增、修改、刪除數據。

        清空“CaptureChangeMysql”讀取binlog狀態:

        三、??????????????配置“RouteOnAttribute”處理器

        “RouteOnAttribute”是根據FlowFile的屬性使用屬性表達式進行數據路由。

        關于“RouteOnAttribute”處理器的“Properties”主要配置的說明如下:

        配置項

        默認值

        描述

        Routing Strategy(路由策略)

        Route to Property name

        指定在計算表達式語言時如何使用哪個關系。有如下幾個關系可選擇:?Route to Property nameFlowFile的副本將被路由到對應的表達式計算結果為"true"的每個關系。?Route to "matched" if all match要求所有用戶定義的表達式求值都為"true",才認為FlowFile是匹配的。?Route to "matched" if any matches至少有一個用戶定義的表達式求值為"true",才能認為FlowFile是匹配的。

        注意:該處理器允許用戶自定義屬性并指定該屬性的匹配表達式。屬性與動態屬性指定的屬性表達式相匹配的FileFlow,映射到動態屬性上。

        配置如下:

        1、創建“RouteOnAttribute”處理器

        2、配置“PROPERTIES”自定義屬性

        注意:以上自定義的屬性中update、insert、delete對應的json 表達式寫法為:${cdc.event.type:equals("delete")},代表匹配對應類型的FlowFile,“cdc.event.type”是上游FlowFile中的屬性,“equales”是對應的方法,“delete”使用單引號引起,表示匹配的CDC事件。

        3、連接“CaptureChangeMySQL”處理器與“RouteOnAttribute”處理器

        四、配置“EvaluatejsonPath”處理器

        “EvaluatejsonPath”處理器將根據上游“RouteOnAttribute”匹配的事件將內容映射成FlowFile屬性,方便后期拼接SQL獲取數據,上游匹配到的FlowFile中的數據格式為:

        EvaluatejsonPath”處理器配置如下:

        1、配置“EvaluatejsonPath”的“PROPERTIES”屬性

        2、連接“RouteOnAttribute”處理器和“EvaluatejsonPath”處理器

        連接關系中,我們這里只關注“insert”和“update”的數據,后期獲取對應的屬性將插入和更新的數據插入到Hive表中,對于“delete”的數據可以路由到其他關系中,例如需要將刪除數據插入到另外的Hive表中,可以再設置個分支處理。這里我們將“delete”和“failure”的數據設置自動終止關系。

        設置“RouteOnAttribute”處理器其他匹配路由關系為自動終止:

        五、??????????????配置“ReplaceText”處理器

        “ReplaceText”處理器可以獲取“EvaluatejsonPath”轉換后FlowFile中的屬性來替換原有數據組成一個“insert into ... values (... ...)”語句,方便后續將數據插入到Hive中。“ReplaceText”處理器的配置如下:

        1、配置“RelaceText”處理器“PROPERTIES”屬性

        在“Replacement Value”中配置“insert into ${tablename} values (${id},"${name}",${age})”

        注意:

        以上獲取的tablename名稱為“test2”,后面這個sql是要將數據插入到Hive中的,所以這里在Hive中也應該創建“test2”的表名稱,或者將表名稱寫成固定表,后期在Hive中創建對應的表即可。

        另外,需要注意${name}在插入Hive中時對應的列為字符串,這里需要加上單引號。

        2、連接“EvaluatejsonPath”處理器與“ReplaceText”處理器

        配置“EvaluatjsonPath”處理器“failure”和“unmatch”路由關系為自動終止。

        六、??????????????配置Hive 支持HiveServer2

        訪問Hive有兩種方式:HiveServer2和Hive Client,Hive Client需要Hive和Hadoop的jar包,配置環境。HiveServer2使得連接Hive的Client從Yarn和HDFS集群中獨立出來,不需要每個幾點都配置Hive和Hadoop的jar包和一系列環境。

        NiFi連接Hive就是使用了HiveServer2方式連接,所以這里需要配置HiveServer2。

        配置HiveServer2步驟如下:

        1、在Hive服務端配置hive-site.xml

        #在Hive 服務端 $HIVE_HOME/etc/hive-site.xml中配置: hive.server2.thrift.port 10000hive.server2.thrift.bind.host192.168.179.4

        2、在每臺Hadoop 節點配置core-site.xml

             hadoop.proxyuser.root.hosts     *       hadoop.proxyuser.root.groups        * 

        3、重啟HDFS ,Hive ,在Hive服務端啟動Metastore和HiveServer2服務

        nohup hive --service metastore >> ./nohup.out 2>&1 &nohup hive --service hiveserver2 >> ./nohup.out 2>&1 &

        4、在客戶端通過beeline連接Hive

        [root@node3 test]# beelinebeeline> !connect jdbc:hive2://node1:10000 rootEnter password for jdbc:hive2://node1:10000: 沒有密碼直接跳過即可0: jdbc:hive2://node1:10000> show tables;+------------------------------------+|              tab_name              |+------------------------------------+| personinfo                         || test2                              |+------------------------------------+

        以上配置完成后,還需要將配置好的core-site.xml文件發送到各個NiFi節點對應的路徑/root/test下替換原有的core-site.xml文件。之后重啟NiFi集群,各個NiFi節點上執行命令:

        service nifi restart

        七、配置“PutHiveQL”處理器

        “PutHiveQL”主要執行HiveQL的DDL/DML命令,傳入給該處理器的FlowFile內容是要執行的HiveQL命令。HiveQL命令可以使用“?”來指定參數,這種情況下,參數必須存在于FlowFile的屬性中,命名約定為hiveql.args.N.type和hiveql.args.N.value,其中N為正整數。

        關于“PutHiveQL”處理器的“Properties”主要配置的說明如下:

        配置項

        默認值

        允許值

        描述

        Hive Database Connection Pooling Servic(Hive數據庫連接池服務)

        Hive Controller服務,用于獲取與Hive數據庫的連接。

        Batch Size(批次大小)

        100

        一批次讀取FlowFile的個數。

        Character Set(編碼)

        UTF-8

        指定數據的編碼格式。

        Statement Delimiter(語句分隔符)

        ;

        語句分隔符,用于分隔多個語句腳本中的SQL語句。

        Rollback On Failure(失敗時回滾)

        false

        ?true?false

        指定如何處理錯誤。默認false指的是如果在處理FlowFile時發生錯誤,則FlowFile將根據錯誤類型路由到“failure”或“retry”關系,處理器繼續處理下一個FlowFile。相反,可以設置為true回滾當前已處理的FlowFile,并立即停止進一步的處理。如果設置為true啟用,失敗的FlowFiles將停留在輸入關系中并會反復處理,直到成功處理或通過其他方式將其刪除為止。可以設置足夠大的“Yield Duration”避免重試次數過多。

        “PutHiveQL”處理器的配置如下:

        1、創建“PutHiveQL”處理器

        ?

        2、 配置“PROPERTIES”

        ?

        點擊之后,配置“HiveConnectionPool”控制服務:

        注意以上需要配置:

        “Database Connection URL” :這里是Hive的HiveServer2啟動的節點,也就是服務端節點。“jdbc:hive2://192.168.179.4:10000”“Hive Configuration Resources”:“/root/test/hive-site.xml,/root/test/core-site.xml,/root/test/hdfs-site.xml”,這里需要將以上各個文件在NiFi集群各個節點對應位置準備好。“Database User”:root,這里防止操作Hive對應的HDFS時權限問題。

        配置完成后,需要啟用對應的“HiveConnectionPool”控制服務:

        最終配置“PROPERTIES”為:

        3、連接“ReplaceText”處理器與“PutHiveQL”處理器并設置關系

        ?

        設置“ReplaceText”處理器“failure”路由關系為自動終止:

        設置“PutHiveQL”處理器路由關系為自動終止:

        ?

        八、??????????????運行測試

        1、在Hive中創建表“test2”

        動HDFS,啟動Hive服務端和客戶端,創建表“test2”

        create table test2 (id int,name string,age int )row format delimited fields terminated by "\t";

        2、啟動NiFi處理數據流程,向MySQL中寫入數據,查看Hive中表數據

        首先清空“CaptureChangeMySQL”處理器的狀態,單獨啟動“CaptureChangeMySQL”處理器,清空重新消費的數據(以上主要就是避免此版本NiFi bug問題),啟動當前案例中其他NiFi處理器。

        然后向MySQL中插入以下數據:

        insert into test2 values (1,"zs",18);update test2 set name = "ls" where id = 1;delete from test2 where id = 1;

        NiFi頁面:

        Hive表test2中的結果:

        標簽: Hive 云數據庫 Server Java
        溫馨提示:

        在實際法律問題情景中,個案情況都有所差異,為了高效解決您的問題,保障合法權益,建議您直接向專業律師說明情況,解決您的實際問題。 立即在線咨詢 >

        相關知識推薦
        操作
        分享
        15037178970

        公眾服務

        法制網公眾號

        快速找律師 / 免費咨詢

        查法律知識 / 查看解答 / 隨時追問

        律師服務(工作日8:30-18:00 ,非工作日請QQ留言)

        律師加盟

        律師營銷服務

        在線客服:

        加盟熱線:

        律師營銷診斷

        營銷分析 / 回復咨詢

        案件接洽 / 合作加盟

        法律包,中國知名的 法律咨詢網站,能夠為廣大用戶提供在線 免費法律咨詢服務。
        CopyRight@2003-2022 fazhi.net ALL Rights Reservrd 版權所有
        皖ICP備2022009963號-41
        違法和不良信息聯系郵箱:39 60 29 14 2 @qq.com

        亚洲一区二区三区高清不卡| 亚洲欧洲日韩不卡| 国产亚洲精品免费| 日韩亚洲国产综合高清| 亚洲最新中文字幕| 亚洲美女视频网站| 亚洲人成网站在线观看播放| 亚洲午夜福利717| 国产成A人亚洲精V品无码| 亚洲精品无码不卡在线播放HE | 亚洲国产美女精品久久| 亚洲字幕在线观看| 亚洲a级在线观看| 四虎必出精品亚洲高清| 亚洲熟妇少妇任你躁在线观看| 亚洲人成人77777网站不卡| 亚洲一线产区二线产区精华| 亚洲精品一二三区| 亚洲人成色99999在线观看| 亚洲国产精品无码久久98| 日韩精品亚洲专区在线影视| 亚洲成aⅴ人片久青草影院| 亚洲国产成人精品女人久久久| 亚洲人成电影在线播放| 亚洲乱码国产一区三区| 亚洲AV天天做在线观看| 亚洲美女视频一区二区三区| 亚洲免费视频播放| 在线综合亚洲中文精品| 亚洲国产av玩弄放荡人妇| 国产亚洲视频在线观看| 伊人亚洲综合青草青草久热| 国产亚洲av片在线观看16女人 | 亚洲av中文无码字幕色不卡| 亚洲 自拍 另类小说综合图区| 久久久久亚洲AV成人网人人软件 | 亚洲中文字幕人成乱码| 亚洲国产精品成人综合色在线| 国产亚洲漂亮白嫩美女在线| 91麻豆精品国产自产在线观看亚洲 | 亚洲国产欧美日韩精品一区二区三区|