spark和flink是当今最主流的两款计算引擎,探究在数据集、算子、应用程序开发流程、并行运行参数等使用层面的一些共性,能引发我们对计算引擎一些通用机制的思考,从而加深对计算引擎一些设计共性的抽象理解。
数据集的共性与类比
类比维度 |
spark |
flink |
实现类 |
Rdd、Dataset |
DataStream、DataSet |
是否支持分区分片 |
支持 |
支持 |
是否支持副本复制 |
支持 |
支持 |
数据集方面,spark有数据集Rdd,flink有数据集DataStream,他们的共性都为分布式数据集,都有分片和副本的概念,分片是为了能并行处理,副本是为了高可用。
算子的共性与类比
算子分类 |
spark算子 |
flink算子 |
一对一算子 |
Rdd.map(Function)、Rdd.mapToPair(PairFunction)、Rdd.filter(Function)、 |
DataStream.map(RichMapFunction)、DataStream.filter(RichFilterFunction)、DataStream.process(ProcessFunction)、KeyedStream.process(KeyedProcessFunction) |
一对多算子 |
Rdd.flatMap(FlatMapFunction) |
DataStream.flatMap(RichFlatMapFunction) |
多对一算子 |
Rdd.reduce(Function) |
DataStream.reduce(RichReduceFunction) |
多对多算子 |
Rdd.mapPartitions(FlatMapFunction) |
WindowedStream.apply(WindowFunction)、WindowedStream.process(ProcessWindowFunction)、AllWindowedStream.apply(AllWindowFunction)、AllWindowedStream.process(ProcessAllWindowFunction |
洗牌算子 |
Rdd.groupBy(Function) |
DataStream.KeyBy(KeySelector)、DataStream.partitionCustom(Partitioner) |
集合类算子 |
PairRdd.join(PairRdd)、PairRdd.leftOuterJoin(PairRdd)、PairRdd.rightOuterJoin(PairRdd)、PairRdd.intersection(PairRdd) 、PairRdd.union(PairRdd)、PairRdd.subtract(PairRdd) |
DataStream.union(DataStream)、DataStream.join(DataStream)、DataStream.coGroup(DataStream) |
如上表总结所示,spark和flink都存在以上类别的算子:
- 一对一算子:算子把输入的一个元素,经过计算,输出一个元素。
- 一对多算子:算子把输入的一个元素,经过计算,输出多个元素。
- 多对一算子:算子把输入的多个元素,经过计算,输出一个元素。
- 多对多算子:算子把输入的多个元素,经过计算,也输出多个元素。
- 洗牌算子:把一个有分区属性的数据集,经过洗牌运算,依据设定的规则,把数据集进行重新分区。
- 集合类算子:并集运算、交集运算、关联运算、笛卡尔积运算等集合相关的运算。
应用程序开发流程的共性与类比
spark和flink应用程序开发,都存在以下基本步骤:
(1)创建执行环境。
(2)把上下文参数传入执行环境。
(3)为执行环境设置其它参数。
(4)设置输入数据。
(5)执行算子操作。
spark应用程序开发流程样例
@Test public static JavaRDD<String> testMapRdd(JavaRDD<String> sourceRdd) throws Exception { // 1、创建执行环境 SparkSession spark = SparkSession.builder().appName("SparkApiTest") .config("spark.sql.warehouse.dir", "spark-warehouse") .config("spark.master", "local[2]") .config("spark.driver.host", "localhost").getOrCreate(); // 2、把上下文参数传入执行环境 spark.conf().set("0", "v0"); // 3、为执行环境设置其它参数 spark.sparkContext().conf().set("spark.defalut.parallelism", 4); // 4、设置输入数据 JavaRDD<String> srcRdd = spark.read().textFile("data/rddtest/source_data.txt").javaRDD(); // 5、执行算子操作 JavaRDD<String> mapRdd = sourceRdd.map(new Function<String, String>() { private static final long serialVersionUID = 1L; @Override public String call(String s) throws Exception { spark.conf().get("0"); return s; } }); printRdd(mapRdd); return mapRdd; } |
flink应用程序开发流程样例
@Test public void windowFunctionTest2() throws Exception { // 1、创建执行环境 env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2、把上下文参数传入执行环境 ParameterTool params = ParameterTool.fromPropertiesFile(FlinkStreamingTest.class.getResourceAsStream("/application.properties")); env.getConfig().setGlobalJobParameters(params); // 3、为执行环境设置其它参数 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setAutoWatermarkInterval(10000); env.setParallelism(2); // 4、设置输入数据 text = env.addSource(new RichSourceFunctionTest()) .setParallelism(1) .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksTest()); // 5、执行算子操作 text.map(new RichMapFunction<String, String> { private static long serialVersionUID = 1L; @Override public String map(String value) throws Exception { ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); parameterTool.get("hostName"); return value; } } ).print(); env.execute("flink streaming"); } |
并行运行参数的共性与类比
并行运行参数
参数定义 |
spark参数设置样例 |
flink参数设置样例 |
运行模式 |
--master yarn --deploy-mode client |
-m yarn-cluster |
任务管理容器的核数 |
--driver-cores 3 |
|
任务管理容器的内存 |
--driver-memory 1g |
-yjm 1024 |
任务执行容器的数量 |
--num-executors 2 |
-yn 2 |
任务执行容器的core数量 |
--executor-cores 2 |
-ys 2 |
任务执行容器的内存大小 |
--executor-memory 1g |
-ytm 1024 |
任务并发数 |
--conf spark.default.parallelism=100 |
-p 100 |
spark yarn模式运行命令样例
bin/spark-submit \ --master yarn \ --deploy-mode client \ --num-executors 50 \ --executor-cores 1 \ --executor-memory 2g \ --driver-cores 1 \ --driver-memory 4g \ xxx ./data/batch/WordCount.jar |
flink yarn模式运行命令样例
bin/flink run \ -m yarn-cluster \ -yjm 1024 \ -yn 2 \ -ys 2 \ -ytm 1024 \ -p 10 \ -c xxx ./data/batch/WordCount.jar |