searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

spark与flink应用程序开发的一些共性与类比

2023-08-17 04:22:46
15
0

spark和flink是当今最主流的两款计算引擎,探究在数据集、算子、应用程序开发流程、并行运行参数等使用层面的一些共性,能引发我们对计算引擎一些通用机制的思考,从而加深对计算引擎一些设计共性的抽象理解。

数据集的共性与类比

类比维度

spark

flink

实现类

Rdd、Dataset

DataStream、DataSet

是否支持分区分片

支持

支持

是否支持副本复制

支持

支持

   数据集方面,spark有数据集Rdd,flink有数据集DataStream,他们的共性都为分布式数据集,都有分片和副本的概念,分片是为了能并行处理,副本是为了高可用。

算子的共性与类比

算子分类

spark算子

flink算子

一对一算子

Rdd.map(Function)、Rdd.mapToPair(PairFunction)、Rdd.filter(Function)、

DataStream.map(RichMapFunction)、DataStream.filter(RichFilterFunction)、DataStream.process(ProcessFunction)、KeyedStream.process(KeyedProcessFunction)

一对多算子

Rdd.flatMap(FlatMapFunction)

DataStream.flatMap(RichFlatMapFunction)

多对一算子

Rdd.reduce(Function)

DataStream.reduce(RichReduceFunction)

多对多算子

Rdd.mapPartitions(FlatMapFunction)

WindowedStream.apply(WindowFunction)、WindowedStream.process(ProcessWindowFunction)、AllWindowedStream.apply(AllWindowFunction)、AllWindowedStream.process(ProcessAllWindowFunction

洗牌算子

Rdd.groupBy(Function)

DataStream.KeyBy(KeySelector)、DataStream.partitionCustom(Partitioner)

集合类算子

PairRdd.join(PairRdd)、PairRdd.leftOuterJoin(PairRdd)、PairRdd.rightOuterJoin(PairRdd)、PairRdd.intersection(PairRdd)

、PairRdd.union(PairRdd)、PairRdd.subtract(PairRdd)

DataStream.union(DataStream)、DataStream.join(DataStream)、DataStream.coGroup(DataStream)

如上表总结所示,spark和flink都存在以上类别的算子:

  • 一对一算子:算子把输入的一个元素,经过计算,输出一个元素。
  • 一对多算子:算子把输入的一个元素,经过计算,输出多个元素。
  • 多对一算子:算子把输入的多个元素,经过计算,输出一个元素。
  • 多对多算子:算子把输入的多个元素,经过计算,也输出多个元素。
  • 洗牌算子:把一个有分区属性的数据集,经过洗牌运算,依据设定的规则,把数据集进行重新分区。
  • 集合类算子:并集运算、交集运算、关联运算、笛卡尔积运算等集合相关的运算。

应用程序开发流程的共性与类比

spark和flink应用程序开发,都存在以下基本步骤:

(1)创建执行环境。

(2)把上下文参数传入执行环境。

(3)为执行环境设置其它参数。

(4)设置输入数据。

(5)执行算子操作。

spark应用程序开发流程样例

    @Test

    public static JavaRDD<String> testMapRdd(JavaRDD<String> sourceRdd) throws Exception {

        // 1、创建执行环境

        SparkSession spark = SparkSession.builder().appName("SparkApiTest")

                .config("spark.sql.warehouse.dir", "spark-warehouse")

                .config("spark.master", "local[2]")

                .config("spark.driver.host", "localhost").getOrCreate();

        // 2、把上下文参数传入执行环境

        spark.conf().set("0", "v0");

        // 3、为执行环境设置其它参数

        spark.sparkContext().conf().set("spark.defalut.parallelism", 4);

        // 4、设置输入数据

        JavaRDD<String> srcRdd = spark.read().textFile("data/rddtest/source_data.txt").javaRDD();

        // 5、执行算子操作

        JavaRDD<String> mapRdd = sourceRdd.map(new Function<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override

            public String call(String s) throws Exception {

                spark.conf().get("0");

                return s;

            }

        });

        printRdd(mapRdd);

        return mapRdd;

    }

flink应用程序开发流程样例

    @Test

    public void windowFunctionTest2() throws Exception {

        // 1、创建执行环境

        env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、把上下文参数传入执行环境

        ParameterTool params = ParameterTool.fromPropertiesFile(FlinkStreamingTest.class.getResourceAsStream("/application.properties"));

        env.getConfig().setGlobalJobParameters(params);

        // 3、为执行环境设置其它参数

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.getConfig().setAutoWatermarkInterval(10000);

        env.setParallelism(2);

        // 4、设置输入数据

        text = env.addSource(new RichSourceFunctionTest())

                .setParallelism(1)

                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksTest());

        // 5、执行算子操作

        text.map(new RichMapFunction<String, String> {

        private static long serialVersionUID = 1L;

        @Override

        public String map(String value) throws Exception {

            ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

            parameterTool.get("hostName");

            return value;

        }

    }

).print();

        env.execute("flink streaming");

    }

并行运行参数的共性与类比

并行运行参数

参数定义

spark参数设置样例

flink参数设置样例

运行模式

--master yarn

--deploy-mode client

-m yarn-cluster

任务管理容器的核数

--driver-cores 3

 

任务管理容器的内存

--driver-memory 1g

-yjm 1024

任务执行容器的数量

--num-executors 2

-yn 2

任务执行容器的core数量

--executor-cores 2

-ys 2

任务执行容器的内存大小

--executor-memory 1g

-ytm 1024

任务并发数

--conf spark.default.parallelism=100

-p 100

spark yarn模式运行命令样例

bin/spark-submit \

  --master yarn \

  --deploy-mode client \

  --num-executors 50 \

  --executor-cores 1 \

  --executor-memory 2g \

  --driver-cores 1 \

  --driver-memory 4g \

  xxx ./data/batch/WordCount.jar

flink yarn模式运行命令样例

bin/flink run \

  -m yarn-cluster \

  -yjm 1024 \

  -yn 2 \

  -ys 2 \

  -ytm 1024 \

  -p 10 \

  -c xxx ./data/batch/WordCount.jar

0条评论
0 / 1000
谢****瑜
2文章数
0粉丝数
谢****瑜
2 文章 | 0 粉丝
谢****瑜
2文章数
0粉丝数
谢****瑜
2 文章 | 0 粉丝
原创

spark与flink应用程序开发的一些共性与类比

2023-08-17 04:22:46
15
0

spark和flink是当今最主流的两款计算引擎,探究在数据集、算子、应用程序开发流程、并行运行参数等使用层面的一些共性,能引发我们对计算引擎一些通用机制的思考,从而加深对计算引擎一些设计共性的抽象理解。

数据集的共性与类比

类比维度

spark

flink

实现类

Rdd、Dataset

DataStream、DataSet

是否支持分区分片

支持

支持

是否支持副本复制

支持

支持

   数据集方面,spark有数据集Rdd,flink有数据集DataStream,他们的共性都为分布式数据集,都有分片和副本的概念,分片是为了能并行处理,副本是为了高可用。

算子的共性与类比

算子分类

spark算子

flink算子

一对一算子

Rdd.map(Function)、Rdd.mapToPair(PairFunction)、Rdd.filter(Function)、

DataStream.map(RichMapFunction)、DataStream.filter(RichFilterFunction)、DataStream.process(ProcessFunction)、KeyedStream.process(KeyedProcessFunction)

一对多算子

Rdd.flatMap(FlatMapFunction)

DataStream.flatMap(RichFlatMapFunction)

多对一算子

Rdd.reduce(Function)

DataStream.reduce(RichReduceFunction)

多对多算子

Rdd.mapPartitions(FlatMapFunction)

WindowedStream.apply(WindowFunction)、WindowedStream.process(ProcessWindowFunction)、AllWindowedStream.apply(AllWindowFunction)、AllWindowedStream.process(ProcessAllWindowFunction

洗牌算子

Rdd.groupBy(Function)

DataStream.KeyBy(KeySelector)、DataStream.partitionCustom(Partitioner)

集合类算子

PairRdd.join(PairRdd)、PairRdd.leftOuterJoin(PairRdd)、PairRdd.rightOuterJoin(PairRdd)、PairRdd.intersection(PairRdd)

、PairRdd.union(PairRdd)、PairRdd.subtract(PairRdd)

DataStream.union(DataStream)、DataStream.join(DataStream)、DataStream.coGroup(DataStream)

如上表总结所示,spark和flink都存在以上类别的算子:

  • 一对一算子:算子把输入的一个元素,经过计算,输出一个元素。
  • 一对多算子:算子把输入的一个元素,经过计算,输出多个元素。
  • 多对一算子:算子把输入的多个元素,经过计算,输出一个元素。
  • 多对多算子:算子把输入的多个元素,经过计算,也输出多个元素。
  • 洗牌算子:把一个有分区属性的数据集,经过洗牌运算,依据设定的规则,把数据集进行重新分区。
  • 集合类算子:并集运算、交集运算、关联运算、笛卡尔积运算等集合相关的运算。

应用程序开发流程的共性与类比

spark和flink应用程序开发,都存在以下基本步骤:

(1)创建执行环境。

(2)把上下文参数传入执行环境。

(3)为执行环境设置其它参数。

(4)设置输入数据。

(5)执行算子操作。

spark应用程序开发流程样例

    @Test

    public static JavaRDD<String> testMapRdd(JavaRDD<String> sourceRdd) throws Exception {

        // 1、创建执行环境

        SparkSession spark = SparkSession.builder().appName("SparkApiTest")

                .config("spark.sql.warehouse.dir", "spark-warehouse")

                .config("spark.master", "local[2]")

                .config("spark.driver.host", "localhost").getOrCreate();

        // 2、把上下文参数传入执行环境

        spark.conf().set("0", "v0");

        // 3、为执行环境设置其它参数

        spark.sparkContext().conf().set("spark.defalut.parallelism", 4);

        // 4、设置输入数据

        JavaRDD<String> srcRdd = spark.read().textFile("data/rddtest/source_data.txt").javaRDD();

        // 5、执行算子操作

        JavaRDD<String> mapRdd = sourceRdd.map(new Function<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override

            public String call(String s) throws Exception {

                spark.conf().get("0");

                return s;

            }

        });

        printRdd(mapRdd);

        return mapRdd;

    }

flink应用程序开发流程样例

    @Test

    public void windowFunctionTest2() throws Exception {

        // 1、创建执行环境

        env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、把上下文参数传入执行环境

        ParameterTool params = ParameterTool.fromPropertiesFile(FlinkStreamingTest.class.getResourceAsStream("/application.properties"));

        env.getConfig().setGlobalJobParameters(params);

        // 3、为执行环境设置其它参数

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.getConfig().setAutoWatermarkInterval(10000);

        env.setParallelism(2);

        // 4、设置输入数据

        text = env.addSource(new RichSourceFunctionTest())

                .setParallelism(1)

                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksTest());

        // 5、执行算子操作

        text.map(new RichMapFunction<String, String> {

        private static long serialVersionUID = 1L;

        @Override

        public String map(String value) throws Exception {

            ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

            parameterTool.get("hostName");

            return value;

        }

    }

).print();

        env.execute("flink streaming");

    }

并行运行参数的共性与类比

并行运行参数

参数定义

spark参数设置样例

flink参数设置样例

运行模式

--master yarn

--deploy-mode client

-m yarn-cluster

任务管理容器的核数

--driver-cores 3

 

任务管理容器的内存

--driver-memory 1g

-yjm 1024

任务执行容器的数量

--num-executors 2

-yn 2

任务执行容器的core数量

--executor-cores 2

-ys 2

任务执行容器的内存大小

--executor-memory 1g

-ytm 1024

任务并发数

--conf spark.default.parallelism=100

-p 100

spark yarn模式运行命令样例

bin/spark-submit \

  --master yarn \

  --deploy-mode client \

  --num-executors 50 \

  --executor-cores 1 \

  --executor-memory 2g \

  --driver-cores 1 \

  --driver-memory 4g \

  xxx ./data/batch/WordCount.jar

flink yarn模式运行命令样例

bin/flink run \

  -m yarn-cluster \

  -yjm 1024 \

  -yn 2 \

  -ys 2 \

  -ytm 1024 \

  -p 10 \

  -c xxx ./data/batch/WordCount.jar

文章来自个人专栏
Happy to share
2 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0