searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

SpringBoot整合ElasticJob

2023-05-25 01:08:01
16
0
一、简介

Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job整合了Quartz和Zookeeper,封装而成的一个分布式调度框架
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务调度
ElasticJob Cloud 采用自研 Mesos Framework 的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。它通过弹性调度、资源管控、以及作业治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的作业生态。

二、功能列表
1、任务分片
  • 将整体任务拆解为多个子任务
  • 可通过服务器的增减弹性伸缩任务处理能力
  • 分布式协调,任务服务器上下线的全自动发现与处理

Elastic Job中任务分片项的概念,使得任务可以在分布式的环境下运行,每条任务服务器只运行分配该服务器的分片。随着服务器增加或宕机,Elastic Job近乎实时的感知服务器数量的变更,从而重新分配分片。
例如、新增服务器时,通过注册中心临时节点的变化感知新服务器,并在下次任务调度的时候重新分片,新服务将承载一部分作业。服务器宕机时,注册中心感知,并将在下次运行将分片转移到存活的服务器,达到高可用。本次宕机未执行的作业,可以通过失效转移的方式继续执行。

2、多任务类型
  • 基于时间驱动的任务
  • 基于数据驱动的任务
  • 同事支持常驻任务和瞬时任务
  • 多语言任务支持
3、云原生
  • 完美结合Mesos或Kubernetes等调度平台
  • 任务不依赖IP、磁盘、数据等有状态组件
  • 合理的资源调度、基于Netflix的Fenzo进行资源的分配
4、容错性
  • 支持定时自我故障检测与自动修复
  • 分布式任务分片唯一保证
  • 支持失效转移和错过任务重触发

弹性扩容缩容在下次作业运行前重分片,但本次作业执行的过程中,下线的服务器所分配的作业将不会重新被分配。失效转移功能可以在本次作业运行中用空闲服务器抓取孤立作业分片执行。同样失效转移功能也会牺牲部分性能。

5、任务聚合
  • 相同任务聚合至相同的执行器统一处理
  • 节省系统资源和初始化开销
  • 动态调配追加资源至新分配的任务
易用性
  • 完善的运维平台
  • 提供任务执行历史数据追踪能力
  • 注册中心数据一键dump用于备份与调试问题
三、快速集成
1、引入maven依赖

版本使用2.1.5,官方有一段时间没更新elasticJob了,近期重新启动,即将发布3.0.0版本

 <properties>
            <lasted.release.version>2.1.5</lasted.release.version>
        </properties>
        <!-- import elastic-job lite core -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>${lasted.release.version}</version>
        </dependency>

        <!-- import other module if need -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>${lasted.release.version}</version>
        </dependency>

2、配置ZooKeeper

@Configuration
public class JobRegistryCenterConfig {
    
    @Bean
    public ZookeeperRegistryCenter registryCenter(){
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("x.x.x.x:2181","elastic-job-ns");
        zookeeperConfiguration.setMaxRetries(5);
        ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        zookeeperRegistryCenter.init();
        return zookeeperRegistryCenter;
    }

}

3、配置Job调度器

@Configuration
public class MyJobConfig {
    private final String cron = "1/5 * * * * ?";
    private final int shardingTotalCount = 5;
    private final String shardingItemParameters = "0=A,1=B,2=C";
    private final String jobParameters = "parameter";
    @Resource
    private ZookeeperRegistryCenter regCenter;
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final MySimpleJob simpleJob) {
        return new SpringJobScheduler(simpleJob, regCenter,
                getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));
    }
    /**
     * 配置任务详细信息
     * @param jobClass
     * @param cron
     * @param shardingTotalCount
     * @param shardingItemParameters
     * @return
     */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron,
                                                         final int shardingTotalCount,
                                                         final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                        .shardingItemParameters(shardingItemParameters).build()
                , jobClass.getCanonicalName())
        ).overwrite(true).build();
    }
}

4、编写任务逻辑

@Component
public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        String a = String.format("Thread ID: %s, 作业分片总数: %s, " +
                        "当前分片项: %s.当前参数: %s," +
                        "作业名称: %s.作业自定义参数: %s"
                ,
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()
        );
        System.out.println(a);
    }
}

5、运行效果

Thread ID: 75, 作业分片总数: 5, 当前分片项: 0.当前参数: A,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 76, 作业分片总数: 5, 当前分片项: 1.当前参数: B,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 77, 作业分片总数: 5, 当前分片项: 2.当前参数: C,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 77, 作业分片总数: 5, 当前分片项: 3.当前参数: D,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 77, 作业分片总数: 5, 当前分片项: 4.当前参数: E,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 48, 作业分片总数: 5, 当前分片项: 0.当前参数: A,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 49, 作业分片总数: 5, 当前分片项: 1.当前参数: B,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 48, 作业分片总数: 5, 当前分片项: 2.当前参数: C,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 49, 作业分片总数: 5, 当前分片项: 3.当前参数: D,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 48, 作业分片总数: 5, 当前分片项: 4.当前参数: E,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 77, 作业分片总数: 5, 当前分片项: 0.当前参数: A,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 77, 作业分片总数: 5, 当前分片项: 1.当前参数: B,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
参考文档

Elastic-王者归来

分布式定时任务调度神器

SpringBoot整合Elastic-job

0条评论
0 / 1000
x****n
7文章数
0粉丝数
x****n
7 文章 | 0 粉丝
x****n
7文章数
0粉丝数
x****n
7 文章 | 0 粉丝
原创

SpringBoot整合ElasticJob

2023-05-25 01:08:01
16
0
一、简介

Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job整合了Quartz和Zookeeper,封装而成的一个分布式调度框架
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务调度
ElasticJob Cloud 采用自研 Mesos Framework 的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。它通过弹性调度、资源管控、以及作业治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的作业生态。

二、功能列表
1、任务分片
  • 将整体任务拆解为多个子任务
  • 可通过服务器的增减弹性伸缩任务处理能力
  • 分布式协调,任务服务器上下线的全自动发现与处理

Elastic Job中任务分片项的概念,使得任务可以在分布式的环境下运行,每条任务服务器只运行分配该服务器的分片。随着服务器增加或宕机,Elastic Job近乎实时的感知服务器数量的变更,从而重新分配分片。
例如、新增服务器时,通过注册中心临时节点的变化感知新服务器,并在下次任务调度的时候重新分片,新服务将承载一部分作业。服务器宕机时,注册中心感知,并将在下次运行将分片转移到存活的服务器,达到高可用。本次宕机未执行的作业,可以通过失效转移的方式继续执行。

2、多任务类型
  • 基于时间驱动的任务
  • 基于数据驱动的任务
  • 同事支持常驻任务和瞬时任务
  • 多语言任务支持
3、云原生
  • 完美结合Mesos或Kubernetes等调度平台
  • 任务不依赖IP、磁盘、数据等有状态组件
  • 合理的资源调度、基于Netflix的Fenzo进行资源的分配
4、容错性
  • 支持定时自我故障检测与自动修复
  • 分布式任务分片唯一保证
  • 支持失效转移和错过任务重触发

弹性扩容缩容在下次作业运行前重分片,但本次作业执行的过程中,下线的服务器所分配的作业将不会重新被分配。失效转移功能可以在本次作业运行中用空闲服务器抓取孤立作业分片执行。同样失效转移功能也会牺牲部分性能。

5、任务聚合
  • 相同任务聚合至相同的执行器统一处理
  • 节省系统资源和初始化开销
  • 动态调配追加资源至新分配的任务
易用性
  • 完善的运维平台
  • 提供任务执行历史数据追踪能力
  • 注册中心数据一键dump用于备份与调试问题
三、快速集成
1、引入maven依赖

版本使用2.1.5,官方有一段时间没更新elasticJob了,近期重新启动,即将发布3.0.0版本

 <properties>
            <lasted.release.version>2.1.5</lasted.release.version>
        </properties>
        <!-- import elastic-job lite core -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>${lasted.release.version}</version>
        </dependency>

        <!-- import other module if need -->
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>${lasted.release.version}</version>
        </dependency>

2、配置ZooKeeper

@Configuration
public class JobRegistryCenterConfig {
    
    @Bean
    public ZookeeperRegistryCenter registryCenter(){
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("x.x.x.x:2181","elastic-job-ns");
        zookeeperConfiguration.setMaxRetries(5);
        ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        zookeeperRegistryCenter.init();
        return zookeeperRegistryCenter;
    }

}

3、配置Job调度器

@Configuration
public class MyJobConfig {
    private final String cron = "1/5 * * * * ?";
    private final int shardingTotalCount = 5;
    private final String shardingItemParameters = "0=A,1=B,2=C";
    private final String jobParameters = "parameter";
    @Resource
    private ZookeeperRegistryCenter regCenter;
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(final MySimpleJob simpleJob) {
        return new SpringJobScheduler(simpleJob, regCenter,
                getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));
    }
    /**
     * 配置任务详细信息
     * @param jobClass
     * @param cron
     * @param shardingTotalCount
     * @param shardingItemParameters
     * @return
     */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron,
                                                         final int shardingTotalCount,
                                                         final String shardingItemParameters) {
        return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
                JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
                        .shardingItemParameters(shardingItemParameters).build()
                , jobClass.getCanonicalName())
        ).overwrite(true).build();
    }
}

4、编写任务逻辑

@Component
public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        String a = String.format("Thread ID: %s, 作业分片总数: %s, " +
                        "当前分片项: %s.当前参数: %s," +
                        "作业名称: %s.作业自定义参数: %s"
                ,
                Thread.currentThread().getId(),
                shardingContext.getShardingTotalCount(),
                shardingContext.getShardingItem(),
                shardingContext.getShardingParameter(),
                shardingContext.getJobName(),
                shardingContext.getJobParameter()
        );
        System.out.println(a);
    }
}

5、运行效果

Thread ID: 75, 作业分片总数: 5, 当前分片项: 0.当前参数: A,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 76, 作业分片总数: 5, 当前分片项: 1.当前参数: B,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 77, 作业分片总数: 5, 当前分片项: 2.当前参数: C,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 77, 作业分片总数: 5, 当前分片项: 3.当前参数: D,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 77, 作业分片总数: 5, 当前分片项: 4.当前参数: E,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 48, 作业分片总数: 5, 当前分片项: 0.当前参数: A,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 49, 作业分片总数: 5, 当前分片项: 1.当前参数: B,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 48, 作业分片总数: 5, 当前分片项: 2.当前参数: C,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 49, 作业分片总数: 5, 当前分片项: 3.当前参数: D,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 48, 作业分片总数: 5, 当前分片项: 4.当前参数: E,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 77, 作业分片总数: 5, 当前分片项: 0.当前参数: A,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
Thread ID: 77, 作业分片总数: 5, 当前分片项: 1.当前参数: B,作业名称: cn.draven.elastic.job.job.MySimpleJob.作业自定义参数: 
参考文档

Elastic-王者归来

分布式定时任务调度神器

SpringBoot整合Elastic-job

文章来自个人专栏
Spring
5 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0