在线播放的A站本免费少妇_亚洲成在人线在线播放无码_伊人大香人妻在线播放_亚洲精华国产精华液_亚洲AV综合色区无码三区诱

每日速讀!大數(shù)據(jù)Flink進階(十四):Flink On Standalone任務(wù)提交

2023-04-09 23:03:04 來源:騰訊云

?Flink On Standalone任務(wù)提交

Flink On Standalone 即Flink任務(wù)運行在Standalone集群中,Standlone集群部署時采用Session模式來構(gòu)建集群,即:首先構(gòu)建一個Flink集群,F(xiàn)link集群資源就固定了,所有提交到該集群的Flink作業(yè)都運行在這一個集群中,如果集群中提交的任務(wù)多資源不夠時,需要手動增加節(jié)點,所以Flink 基于Standalone運行任務(wù)一般用在開發(fā)測試或者企業(yè)實時業(yè)務(wù)較少的場景下。


(資料圖片僅供參考)

Flink On Standalone 任務(wù)提交支持Session會話模式和Application應(yīng)用模式,不支持Per-Job單作業(yè)模式。下面介紹基于Standalone 的Session會話模式和Application應(yīng)用模式任務(wù)提交命令和原理,演示兩類任務(wù)提交模式的代碼還是以上一章節(jié)中讀取Socket 數(shù)據(jù)進行實時WordCount統(tǒng)計代碼為例,代碼如下:

package com.lanson.flinkjava.code.chapter4;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;/** * 讀取Socket數(shù)據(jù)進行實時WordCount統(tǒng)計 */public class SocketWordCount {    public static void main(String[] args) throws Exception {        //1.準備環(huán)境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //2.讀取Socket數(shù)據(jù)        DataStreamSource ds = env.socketTextStream("node3", 9999);        //3.準備K,V格式數(shù)據(jù)        SingleOutputStreamOperator> tupleDS = ds.flatMap((String line, Collector> out) -> {            String[] words = line.split(",");            for (String word : words) {                out.collect(Tuple2.of(word, 1));            }        }).returns(Types.TUPLE(Types.STRING, Types.INT));        //4.聚合打印結(jié)果        tupleDS.keyBy(tp -> tp.f0).sum(1).print();        //5.execute觸發(fā)執(zhí)行        env.execute();    }}

將以上代碼進行打包,名稱為"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar",并在node3節(jié)點上啟動socket服務(wù)(nc -lk 9999)。

一、Standalone Session模式

1、任務(wù)提交命令

在Standalone集群搭建完成后,基于Standalone集群提交Flink任務(wù)方式就是使用的Session模式,提交任務(wù)之前首先啟動Standalone集群($FLINK_HOME/bin/start-cluster.sh),然后再提交任務(wù),Standalone Session模式提交任務(wù)命令如下:

[root@node1 ~]# cd /software/flink-1.16.0/bin/[root@node1 bin]# ./flink run -m node1:8081 -d -c com.lanson.flinkjava.code.chapter4.SocketWordCount /root/flink-jar-test/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

以上提交任務(wù)的參數(shù)解釋如下:

參數(shù)

解釋

-m

--jobmanager,指定提交任務(wù)連接的JobManager地址。

-c

--class,指定運行的class主類。

-d

--detached,任務(wù)提交后在后臺獨立運行,退出客戶端,也可不指定。

-p

--parallelism,執(zhí)行程序的并行度。

以上任務(wù)提交完成后,我們可以登錄Flink WebUI(https://node1:8081)查看啟動一個任務(wù):

再次按照以上命令提交Flink任務(wù)可以看到集群中會有2個任務(wù),說明Standalone Session模式下提交的所有Flink任務(wù)共享集群資源,如下:

以上提交Flink流任務(wù)的名稱默認為"Flink Streaming Job",也可以通過參數(shù)"pipeline.name"來自定義指定Job 名稱,提交命令如下:

./flink run -m node1:8081 -d -Dpipeline.name=socket-wc1 -c com.lanson.flinkjava.code.chapter4.SocketWordCount /root/flink-jar-test/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

提交之后,可以看到頁面中有三個任務(wù),最后一個任務(wù)提交的名稱改成了自定義任務(wù)名稱。

2、任務(wù)提交流程

Standalone Session模式提交任務(wù)中首先需要創(chuàng)建Flink集群,集群創(chuàng)建啟動的同時Dispatcher、JobMaster、ResourceManager對象一并創(chuàng)建、TaskManager也一并啟動,TaskManager會向集群ResourceManager匯報Slot信息,F(xiàn)link集群資源也就確定了。Standalone Session模式提交任務(wù)流程如下:

在客戶端提交Flink任務(wù),客戶端會將任務(wù)轉(zhuǎn)換成JobGraph提交給JobManager。Dispatcher將提交任務(wù)提交給JobMaster。JobMaster向ResourceManager申請Slot資源。ResourceManager會在對應(yīng)的TaskManager上劃分Slot資源。TaskManager向JobMaster offer Slot資源。JobMaster將任務(wù)對應(yīng)的task發(fā)送到TaskManager上執(zhí)行。

二、Standalone Application模式

1、任務(wù)提交命令

Standalone Application模式中不會預(yù)先創(chuàng)建Flink集群,在提交Flink 任務(wù)的同時會創(chuàng)建JobManager,啟動Flink集群,然后需要手動啟動TaskManager連接該Flink集群,啟動的TaskManager會根據(jù)$FLINK_HOME/conf/flink-conf.yaml配置文件中的"jobmanager.rpc.address"配置找JobManager,所以這里選擇在node1節(jié)點上提交任務(wù)并啟動JobManager,方便后續(xù)其他節(jié)點啟動TaskManager后連接該節(jié)點。Standalone Appliction模式提交任務(wù)步驟和命令如下:

1.1、準備Flink jar包

在node1節(jié)點上將Flink 打好的"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar"jar包放在 $FLINK_HOME/lib目錄下。

1.2、提交任務(wù),在node1 節(jié)點上啟動 JobManager

cd /software/flink-1.16.0/bin/
#執(zhí)行如下命令,啟動JobManager ./standalone-job.sh start --job-classname com.lanson.flinkjava.code.chapter4.SocketWordCount

執(zhí)行以上命令后會自動從$FLINK_HOME/lib中掃描所有jar包,執(zhí)行指定的入口類。命令執(zhí)行后可以訪問對應(yīng)的Flink WebUI:https://node1:8081,可以看到提交的任務(wù),但是由于還沒有執(zhí)行TaskManager任務(wù)無法執(zhí)行。

1.3、啟動TaskManager

在node1、node2、node3任意一臺節(jié)點上啟動taskManager,根據(jù)$FLINK_HOME/conf/flink-conf.yaml配置文件中"jobmanager.rpc.address"配置項會找到對應(yīng)node1 JobManager。

#在node1節(jié)點上啟動TaskManager[root@node1 ~]# cd /software/flink-1.16.0/bin/[root@node1 bin]# ./taskmanager.sh start#在node2節(jié)點上啟動TaskManager[root@node2 ~]# cd /software/flink-1.16.0/bin/[root@node2 bin]# ./taskmanager.sh start

啟動兩個TaskManager后可以看到Flink WebUI中對應(yīng)的有2個TaskManager,可以根據(jù)自己任務(wù)使用資源的情況,手動啟動多個TaskManager。

1.4、停止集群

#停止啟動的JobManager[root@node1 bin]# ./standalone-job.sh stop#停止啟動的TaskManager[root@node1 bin]# ./taskmanager.sh stop[root@node2 bin]# ./taskmanager.sh stop

我們可以以同樣的方式在其他節(jié)點上以Standalone Application模式提交先的Flink任務(wù),但是每次提交都是當前提交任務(wù)獨享集群資源。

2、任務(wù)提交流程

Standalone Application模式提交任務(wù)中提交任務(wù)的同時會啟動JobManager創(chuàng)建Flink集群,但是需要手動啟動TaskManager,這樣提交的任務(wù)才能正常運行,如果提交的任務(wù)使用資源多,還可以啟動多個TaskManager。Standalone Application模式提交任務(wù)流程如下:

在客戶端提交Flink任務(wù)的同時啟動JobManager,客戶端會將任務(wù)轉(zhuǎn)換成JobGraph提交給JobManager。Dispatcher會啟動JobMaster,Dispatcher將提交任務(wù)提交給JobMaster。JobMaster向ResourceManager申請Slot資源。手動啟動TaskManager,TaskManager會向ResourceManager注冊Slot資源ResourceManager會在對應(yīng)的TaskManager上劃分Slot資源。TaskManager向JobMaster offer Slot資源。JobMaster將任務(wù)對應(yīng)的task發(fā)送到TaskManager上執(zhí)行。

Standalone Application模式任務(wù)提交流程和Standalone Session模式類似,兩者區(qū)別主要是Standalone Session模式中啟動Flink集群時JobManager、TaskManager、JobMaster會預(yù)先啟動;Standalone Application模式中提交任務(wù)時同時啟動集群JobManager、JobMaster,需要手動啟動TaskManager。

?

標簽:

天天要聞:押注黃金!橋水中國去年末8.1億元布局A股三大黃金ETF

2023-04-06 10:25:02

每日關(guān)注!3月份全球制造業(yè)PMI環(huán)比下降 全球經(jīng)濟復蘇動能依然不足

2023-04-06 09:34:11

天天觀熱點:終于等到你!第九屆浙江遂昌湯顯祖文化節(jié)形象宣傳片正式發(fā)布

2023-04-06 08:41:11

環(huán)球消息!加快打造具有杭州特色的招標造價管理體系

2023-04-06 07:31:25

每日簡訊:“老弗士”普及好習慣 醫(yī)生義診送健康

2023-04-06 05:51:35

消息!李小萌談張繼科事件:面對惡意,勇敢說“不”

2023-04-06 00:51:28

黑龍江森林消防總隊開展清明節(jié)防火專項行動,7省多地域同步聯(lián)動|環(huán)球訊息

2023-04-05 21:42:01

全球熱點評!黃鶴樓與什么樓什么樓并稱江南三大名樓_黃鶴樓與什么樓并稱為江南三大名樓簡介介紹

2023-04-05 19:54:44

【時快訊】“安全”的360與“不安”的周鴻祎

2023-04-05 17:53:17

世界焦點!周生生鉑金多少錢一克(2023年04月05日)參考價格

2023-04-05 16:42:37

恩比德:我們似乎找到了所有輸球的辦法,但贏了就是贏了 熱點在線

2023-04-05 14:57:40

日系車銷量集體暴跌!日本經(jīng)濟最后的防線失守!

2023-04-05 13:20:08

重慶到長沙機票查詢_重慶到長沙機票|天天要聞

2023-04-05 11:58:50

媒體人:前外援久爾基奇已收到賠償,成都蓉城可進行新援注冊報名

2023-04-05 10:25:25

世界消息!重奪美國會眾議院?民主黨欲“翻盤”31個共和黨議席

2023-04-05 08:56:30
x 廣告
x 廣告

Copyright @  2015-2022 華中晚報網(wǎng)版權(quán)所有  備案號: 京ICP備12018864號-26   聯(lián)系郵箱:2 913 236 @qq.com