案例:將mysql中新增的數據實時同步到Hive中。
以上案例需要用到的處理器有:“CaptureChangeMySQL”、“RouteOnAttribute”、“EvaluateJsonPath”、“ReplaceText”、“PutHiveQL”。
首先通過“CaptureChangeMySQL”讀取MySQL中數據的變化(需要開啟MySQL binlog日志),將Binlog中變化的數據同步到“RouteOnAttribute”處理器,通過此處理器獲取上游數據屬性,獲取對應binlog操作類型,再將想要處理的數據路由到“EvaluateJsonPath”處理器,該處理器可以將json格式的binlog數據解析,通過自定義json 表達式獲取json數據中的屬性放入FlowFile屬性,將FlowFile通過“ReplaceText”處理器獲取上游FowFile屬性,動態拼接sql替換所有的FlowFile內容,將拼接好的sql組成FlowFile路由到“PutHiveQL”將數據寫入到Hive表。
(資料圖片)
mysql-binlog是MySQL數據庫的二進制日志,記錄了所有的DDL和DML(除了數據查詢語句)語句信息。一般來說開啟二進制日志大概會有1%的性能損耗。這里需要開啟MySQL的binlog日志方便后期使用“CaptureChangeMySQL”處理器來獲取MySQL中的CDC事件。MySQL的版本最好是5.7版本之上。
[root@node2 ~]# mysql -u root -p123456mysql> show variables like "log_%";
在/etc/my.cnf文件中[mysqld]下寫入以下內容:
[mysqld]#隨機指定一個不能和其他集群中機器重名的字符串server-id=123#配置binlog日志目錄,配置后會自動開啟binlog日志,并寫入該目錄log-bin=/var/lib/mysql/mysql-bin
[root@node2 ~]# service mysqld restart[root@node2 ~]# mysql -u root -p123456mysql> show variables like "log_%";
“CaptureChangeMySQL”主要是從MySQL數據庫捕獲CDC(Change Data Capture)事件。CDC事件包括INSERT,UPDATE,DELETE操作,事件按操作發生時的順序輸出為單獨的FlowFile文件。
關于“CaptureChangeMySQL”處理器的“Properties”主要配置的說明如下:
配置項 | 默認值 | 允許值 | 描述 |
---|---|---|---|
MySQL Hosts(MySQL 節點) | MySQL集群節點相對應的主機名/端口項的列表。多個節點使用逗號分隔,格式為:host1:port、host2:port…,處理器將嘗試按順序連接到列表中的主機。如果一個節點關閉,并且群集啟用了故障轉移,那么處理器將連接到活動節點。 | ||
MySQL Driver Class Name(MySQL驅動名稱) | com.mysql.jdbc.Driver | MySQL數據庫驅動程序類的類名。 | |
MySQL Driver Location(s)(MySQL驅動的位置) | 包含MySQL驅動程序包及其依賴項的文件/文件夾和/或url的逗號分隔列表(如果有),例如"/var/tmp/mysql-connector-java-5.1.38-bin.jar文件"。 | ||
Username(用戶名) | 訪問MySQL集群的用戶名。 | ||
Password(密碼) | 訪問MySQL集群的密碼。 | ||
Database/Schema Name Pattern(匹配數據庫/Schema) | 用于根據CDC事件列表匹配數據庫(或模式,具體取決于RDBMS類型)的正則表達式。正則表達式必須與存儲在RDBMS中的數據庫名稱匹配。如果未設置屬性,則數據庫名稱將不會用于篩選CDC事件。 | ||
Table Name Pattern(匹配表) | 用于匹配影響匹配表的CDC事件的正則表達式(regex)。regex必須與存儲在數據庫中的表名匹配。如果未設置屬性,則不會根據表名篩選任何事件。 | ||
Max Wait Time(最大連接等待時長) | 30 seconds | 允許建立連接的最長時間,零表示實際上沒有限制。 | |
Distributed Map Cache Client(分布式緩存客戶端) | 指定用于保存處理器所需的各種表、列等信息的分布式映射緩存客戶端控制器服務。如果未指定,則生成的事件將不包括列類型或名稱等信息。 | ||
Retrieve All Records(檢索所有記錄) | true | ?true?false | 指定是否獲取所有可用的CDC事件,而不考慮當前的binlog文件名或位置。如果處理器狀態中存在binlog文件名和位置值,則忽略此屬性的值。這允許4種不同的配置:1).如果處理器State中存在binlog數據,則State用來確定開始位置,并忽略Retrieve All Records的值。(目前NiFi版本測試有問題)2).如果處理器State中不存在binlog數據,此值設置為true意味著從頭開始讀取Binlog 數據。3).如果處理器State中不存在binlog數據,并且沒有指定binlog文件名和位置,此值設置為false意味著從binlog尾部開始讀取數據。4).如果處理器State中不存在binlog數據,并指定binlog文件名和位置,此值設置為false意味著從指定binlog尾部開始讀取數據。 |
Include Begin/Commit Events(包含開始/提交事件) | false | ?true?false | 指定是否發出與二進制日志中的開始或提交事件相對應的事件。如果下游流中需要開始/提交事件,則設置為true,否則設置為false,這將抑制這些事件的生成并可以提高流性能。 |
Include DDL Events(標準表/列名) | false | ?true?false | 指定是否發出與數據定義語言(DDL)事件對應的事件,如ALTER TABLE、TRUNCATE TABLE。如果下游流中需要DDL事件,則設置為true,否則設置為false。為false時這將抑制這些事件的生成,并可以提高流性能。 |
配置步驟如下:
監控mysql變化需要設置“DistributedMapCacheClient”控制服務,其對應的Server中存儲處理器所需的各種表、列等信息,所以這里需要首先配置“DistributeMapCacheServer”控制服務。
?
?
由于這里使用“CaptureChangeMySQL”處理器監控“MySQL”中的數據,所以設置調度訪問周期為“10s”,防止一直監聽MySQL binlog數據,帶來性能消耗。
?
在“CaptureChangeMySQL”處理器中配置“PROPERTIES”,配置如下:
MySQL Host : 192.168.179.5:3306MySQL Driver Class Name:com.mysql.jdbc.DriverMySQL Driver Location(s):/root/test/mysql-connector-java-5.1.47.jar注意:這里需要在每臺NiFi節點上創建對應目錄,上傳mysql驅動包。
“PROPERTIES”配置如下:
此外,在“PROPERTIES”中還需要配置“Distributed Map Cache Client”控制服務,來讀取“DistributeMapCacheServer”控制服務中的緩存數據:
?
另外,這里我們只是監控表“test2”對應的CDC事件,這里設置匹配表名為“test2”,最終“PROPERTIES”的配置如下:
注意:以上“Table Name Pattern”這里配置對應的Value值為:test2,也可以不配置,不配置會監控所有MySQL表的變化對應的binlog事件。當后面向Hive表中插入新增和更新數據時,對應MySQL中的元數據表也會變化,也會監控到對應的binlog事件。為了避免后期出現監控到其他表的binlog日志,這里建議配置上“test2”。
登錄mysql ,使用“mynifi”庫,創建表“test2”。暫時設置“CaptureChangeMySQL”處理器“success”事件自動終止并啟動,向表中插入對應的數據查看“CaptureChangeMySQL”處理器能否正常監控事件。
在mysql中創建對應的表:
use mynifi;create table test2 (id int,name varchar(255),age int);
啟動“CaptureChangeMySQL”處理器:
向表“test2”中插入以下數據:
insert into test2 values (1,"zs",18);update test2 set name = "ls" where id = 1;delete from test2 where id = 1;
可以在“CaptureChangeMySQL”處理器中右鍵“View data provenance”查看捕獲到的“insert”、“update”、“delete”事件:
注意問題:在配置好“CaptureChangeMySQL”處理器啟動后,當MySQL中有數據插入、修改、刪除時當前處理器會讀取MySql binlog日志,并在當前處理器中記錄讀取binlog的位置狀態。正常來說這里關閉“CaptureChangeMySQL”處理器后再次啟動,會接著保存的binlog位置繼續讀取(可以參照“PROPERTIES”屬性中“Retrieve All Records”配置說明),但是經過測試,此NiFi版本出現以下錯誤(無效的binlog位置,目測是一個版本bug錯誤):
所以在之后的測試中,我們可以將“CaptureChangeMysql”處理器讀取binlog的狀態清空,然后再次啟動即可,這里會重復讀取MySQL之前已經檢測到的新增、修改、刪除數據。
清空“CaptureChangeMysql”讀取binlog狀態:
“RouteOnAttribute”是根據FlowFile的屬性使用屬性表達式進行數據路由。
關于“RouteOnAttribute”處理器的“Properties”主要配置的說明如下:
配置項 | 默認值 | 描述 |
---|---|---|
Routing Strategy(路由策略) | Route to Property name | 指定在計算表達式語言時如何使用哪個關系。有如下幾個關系可選擇:?Route to Property nameFlowFile的副本將被路由到對應的表達式計算結果為"true"的每個關系。?Route to "matched" if all match要求所有用戶定義的表達式求值都為"true",才認為FlowFile是匹配的。?Route to "matched" if any matches至少有一個用戶定義的表達式求值為"true",才能認為FlowFile是匹配的。 |
注意:該處理器允許用戶自定義屬性并指定該屬性的匹配表達式。屬性與動態屬性指定的屬性表達式相匹配的FileFlow,映射到動態屬性上。
配置如下:
注意:以上自定義的屬性中update、insert、delete對應的json 表達式寫法為:${cdc.event.type:equals("delete")},代表匹配對應類型的FlowFile,“cdc.event.type”是上游FlowFile中的屬性,“equales”是對應的方法,“delete”使用單引號引起,表示匹配的CDC事件。
“EvaluatejsonPath”處理器將根據上游“RouteOnAttribute”匹配的事件將內容映射成FlowFile屬性,方便后期拼接SQL獲取數據,上游匹配到的FlowFile中的數據格式為:
EvaluatejsonPath”處理器配置如下:
連接關系中,我們這里只關注“insert”和“update”的數據,后期獲取對應的屬性將插入和更新的數據插入到Hive表中,對于“delete”的數據可以路由到其他關系中,例如需要將刪除數據插入到另外的Hive表中,可以再設置個分支處理。這里我們將“delete”和“failure”的數據設置自動終止關系。
設置“RouteOnAttribute”處理器其他匹配路由關系為自動終止:
“ReplaceText”處理器可以獲取“EvaluatejsonPath”轉換后FlowFile中的屬性來替換原有數據組成一個“insert into ... values (... ...)”語句,方便后續將數據插入到Hive中。“ReplaceText”處理器的配置如下:
在“Replacement Value”中配置“insert into ${tablename} values (${id},"${name}",${age})”
注意:
以上獲取的tablename名稱為“test2”,后面這個sql是要將數據插入到Hive中的,所以這里在Hive中也應該創建“test2”的表名稱,或者將表名稱寫成固定表,后期在Hive中創建對應的表即可。
另外,需要注意${name}在插入Hive中時對應的列為字符串,這里需要加上單引號。
配置“EvaluatjsonPath”處理器“failure”和“unmatch”路由關系為自動終止。
訪問Hive有兩種方式:HiveServer2和Hive Client,Hive Client需要Hive和Hadoop的jar包,配置環境。HiveServer2使得連接Hive的Client從Yarn和HDFS集群中獨立出來,不需要每個幾點都配置Hive和Hadoop的jar包和一系列環境。
NiFi連接Hive就是使用了HiveServer2方式連接,所以這里需要配置HiveServer2。
配置HiveServer2步驟如下:
#在Hive 服務端 $HIVE_HOME/etc/hive-site.xml中配置: hive.server2.thrift.port 10000 hive.server2.thrift.bind.host 192.168.179.4
hadoop.proxyuser.root.hosts * hadoop.proxyuser.root.groups *
nohup hive --service metastore >> ./nohup.out 2>&1 &nohup hive --service hiveserver2 >> ./nohup.out 2>&1 &
[root@node3 test]# beelinebeeline> !connect jdbc:hive2://node1:10000 rootEnter password for jdbc:hive2://node1:10000: 沒有密碼直接跳過即可0: jdbc:hive2://node1:10000> show tables;+------------------------------------+| tab_name |+------------------------------------+| personinfo || test2 |+------------------------------------+
以上配置完成后,還需要將配置好的core-site.xml文件發送到各個NiFi節點對應的路徑/root/test下替換原有的core-site.xml文件。之后重啟NiFi集群,各個NiFi節點上執行命令:
service nifi restart
“PutHiveQL”主要執行HiveQL的DDL/DML命令,傳入給該處理器的FlowFile內容是要執行的HiveQL命令。HiveQL命令可以使用“?”來指定參數,這種情況下,參數必須存在于FlowFile的屬性中,命名約定為hiveql.args.N.type和hiveql.args.N.value,其中N為正整數。
關于“PutHiveQL”處理器的“Properties”主要配置的說明如下:
配置項 | 默認值 | 允許值 | 描述 |
---|---|---|---|
Hive Database Connection Pooling Servic(Hive數據庫連接池服務) | Hive Controller服務,用于獲取與Hive數據庫的連接。 | ||
Batch Size(批次大小) | 100 | 一批次讀取FlowFile的個數。 | |
Character Set(編碼) | UTF-8 | 指定數據的編碼格式。 | |
Statement Delimiter(語句分隔符) | ; | 語句分隔符,用于分隔多個語句腳本中的SQL語句。 | |
Rollback On Failure(失敗時回滾) | false | ?true?false | 指定如何處理錯誤。默認false指的是如果在處理FlowFile時發生錯誤,則FlowFile將根據錯誤類型路由到“failure”或“retry”關系,處理器繼續處理下一個FlowFile。相反,可以設置為true回滾當前已處理的FlowFile,并立即停止進一步的處理。如果設置為true啟用,失敗的FlowFiles將停留在輸入關系中并會反復處理,直到成功處理或通過其他方式將其刪除為止。可以設置足夠大的“Yield Duration”避免重試次數過多。 |
“PutHiveQL”處理器的配置如下:
?
?
點擊之后,配置“HiveConnectionPool”控制服務:
注意以上需要配置:
“Database Connection URL” :這里是Hive的HiveServer2啟動的節點,也就是服務端節點。“jdbc:hive2://192.168.179.4:10000”“Hive Configuration Resources”:“/root/test/hive-site.xml,/root/test/core-site.xml,/root/test/hdfs-site.xml”,這里需要將以上各個文件在NiFi集群各個節點對應位置準備好。“Database User”:root,這里防止操作Hive對應的HDFS時權限問題。配置完成后,需要啟用對應的“HiveConnectionPool”控制服務:
最終配置“PROPERTIES”為:
?
設置“ReplaceText”處理器“failure”路由關系為自動終止:
設置“PutHiveQL”處理器路由關系為自動終止:
?
動HDFS,啟動Hive服務端和客戶端,創建表“test2”
create table test2 (id int,name string,age int )row format delimited fields terminated by "\t";
首先清空“CaptureChangeMySQL”處理器的狀態,單獨啟動“CaptureChangeMySQL”處理器,清空重新消費的數據(以上主要就是避免此版本NiFi bug問題),啟動當前案例中其他NiFi處理器。
然后向MySQL中插入以下數據:
insert into test2 values (1,"zs",18);update test2 set name = "ls" where id = 1;delete from test2 where id = 1;
NiFi頁面:
Hive表test2中的結果:
在實際法律問題情景中,個案情況都有所差異,為了高效解決您的問題,保障合法權益,建議您直接向專業律師說明情況,解決您的實際問題。 立即在線咨詢 >
法律保,中國知名的 法律咨詢網站,能夠為廣大用戶提供在線 免費法律咨詢服務。
CopyRight@2003-2023 falvbao.net.cn ALL Rights Reservrd 版權所有
皖ICP備2022009963號-45
違法和不良信息聯系郵箱:39 60 29 14 2 @qq.com