searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享

Flink的classLoader问题

2024-05-28 02:19:44
20
0
类的加载,就是指利用类加载器(ClassLoader)通过类的全限定名来获取定义此类的二进制字节码流,进而构造出类的定义。
Flink作为基于JVM的框架,在flink-conf.yaml中提供了控制类加载策略的参数classloader.resolve-order,可选项有child-first(默认)和parent-first
Parent-first:优先从Flink集群的lib中的jar包加载类如果没有该类就从用户的jar包中加载类
child-first:优先从jar包中包含的类加载类,如果没有从Flink集群的lib中的jar包加载类
其中 ParentFirstClassLoader直接调用了父加载器的loadClass()方法
  1. parent-first

 
JVM中类加载器的层次关系和默认loadClass()方法的逻辑由双亲委派模型(parents delegation model)来体现
如果一个类加载器要加载一个类,它首先不会自己尝试加载这个类,而是把加载的请求委托给父加载器完成,所有的类加载请求最终都应该传递给最顶层的启动类加载器。只有当父加载器无法加载到这个类时,子加载器才会尝试自己加载
用户代码的类加载器是Custom ClassLoader,Flink框架本身的类加载器是Application ClassLoader。用户代码中的类先由Flink框架的类加载器加载,再由用户代码的类加载器加载。但是,Flink默认并不采用parent-first策略
 

ParentFirstClassLoader是如何保证同一个Flink的类,优先加载集群的而非用户jar包中的?

用户提交的Flink任务运行于TaskManager进程内,TaskManager运行时的类加载器是Application ClassLoader(用户程序运行时的默认类加载器),当我们使用自定义类加载器加载需要的类时,会自底向上逐一查找,如果ParentFirstClassLoader中有这个类就再次从Application ClassLoader中查找这个类,如果Application ClassLoader中也有这个类,就会继续从Extension ClassLoader中查找,直到父加载器加载不到这个类时,才会使用当前类加载器加载这个类。这样就巧妙的利用双亲委派模型实现了parent-first的类加载机制。
  1. child-first

 
程序中引入的Flink Connector总是依赖于固定的Cassandra版本,用户代码中为了兼容实际使用的Cassandra版本,会引入一个更低或更高的依赖。而同一个组件不同版本的类定义有可能会不同(即使类的全限定名是相同的),如果仍然用双亲委派模型,就会因为Flink框架指定版本的类先加载,而出现莫名其妙的兼容性问题,如NoSuchMethodError、IllegalAccessError等。
Flink实现了ChildFirstClassLoader类加载器并作为默认策略。它打破了双亲委派模型,使得用户代码的类先加载
public final class ChildFirstClassLoader extends FlinkUserCodeClassLoader {
    private final String[] alwaysParentFirstPatterns;
 
    public ChildFirstClassLoader(
            URL[] urls,
            ClassLoader parent,
            String[] alwaysParentFirstPatterns,
            Consumer<Throwable> classLoadingExceptionHandler) {
        super(urls, parent, classLoadingExceptionHandler);
        this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
    }
 
    @Override
    protected synchronized Class<?> loadClassWithoutExceptionHandling(
            String name,
            boolean resolve) throws ClassNotFoundException {
        // First, check if the class has already been loaded
        Class<?> c = findLoadedClass(name);
 
        if (c == null) {
            // check whether the class should go parent-first
            for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {
                if (name.startsWith(alwaysParentFirstPattern)) {
                    return super.loadClassWithoutExceptionHandling(name, resolve);
                }
            }
 
            try {
                // check the URLs
                c = findClass(name);
            } catch (ClassNotFoundException e) {
                // let URLClassLoader do it, which will eventually call the parent
                c = super.loadClassWithoutExceptionHandling(name, resolve);
            }
        }
 
        if (resolve) {
            resolveClass(c);
        }
        return c;
    }
 
    @Override
    public URL getResource(String name) {
        // first, try and find it via the URLClassloader
        URL urlClassLoaderResource = findResource(name);
        if (urlClassLoaderResource != null) {
            return urlClassLoaderResource;
        }
        // delegate to super
        return super.getResource(name);
    }
 
    @Override
    public Enumeration<URL> getResources(String name) throws IOException {
        // first get resources from URLClassloader
        Enumeration<URL> urlClassLoaderResources = findResources(name);
        final List<URL> result = new ArrayList<>();
 
        while (urlClassLoaderResources.hasMoreElements()) {
            result.add(urlClassLoaderResources.nextElement());
        }
 
        // get parent urls
        Enumeration<URL> parentResources = getParent().getResources(name);
        while (parentResources.hasMoreElements()) {
            result.add(parentResources.nextElement());
        }
 
        return new Enumeration<URL>() {
            Iterator<URL> iter = result.iterator();
 
            public boolean hasMoreElements() {
                return iter.hasNext();
            }
 
            public URL nextElement() {
                return iter.next();
            }
        };
    }
 
核心逻辑位于loadClassWithoutExceptionHandling()方法中,简述如下:
  • 调用findLoadedClass()方法检查全限定名name对应的类是否已经加载过,若没有加载过,再继续往下执行。
  • 检查要加载的类是否以alwaysParentFirstPatterns集合中的前缀开头。如果是,则调用父类的对应方法,以parent-first的方式来加载它。
  • 如果类不符合alwaysParentFirstPatterns集合的条件,就调用findClass()方法在用户代码中查找并获取该类的定义(该方法在URLClassLoader中有默认实现)。如果找不到,再fallback到父加载器来加载。 最后,若resolve参数为true,就调用resolveClass()方法链接该类,最后返回对应的Class对象。
  • child-first策略避开了“先把加载的请求委托给父加载器完成”这一步骤,只有特定的某些类一定要“遵循旧制”。alwaysParentFirstPatterns集合中的这些类都是Java、Flink等组件的基础,不能被用户代码冲掉。它由以下两个参数来指定:classloader.parent-first-patterns.default,不建议修改,固定为以下这些值:
 

child-first是如何打破双亲委派机制的

同一个Flink集群也需要运行各种各样的Flink任务,而且如果用户使用的某个jar和Flink集群的jar版本不一致,需要优先加载用户的jar怎么办?Flink的ChildFirstClassLoader类加载提供了优先从用户jar包加载类的机制
这里alwaysParentFirstPatterns就是classloader.parent-first-patterns.default和classloader.parent-first-patterns.additional配置的类前缀集合,即这两个配置项中的类就算使用child-first类加载方式,也会从父加载器中加载,而非用户jar包中加载。这也是为什么Flink官网的例子中,flink相关的jar在pom引入的时候scope都采用了provided,既减少了用户jar包的大小,又能在集群上正常运行,最主要的是如果集群更新了版本,程序可以直接享受更新后的功能,而不用重新打包。

3、类冲突处理

因为类加载出现ClassNotFound或者NoSuchMethodError和NoSuchFieldError等异常。这种情况可能是因为你的jar包中的版本和集群的版本不一致,优先加载了集群的class
考虑对项目中引入的冲突类作shade。如com.typesafe的类发生冲突,maven项目加入如下配置:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                    <goal>shade</goal>
            </goals>
            <configuration>
                    
                    <relocations>
                            <relocation>
                                    <pattern>com.typesafe</pattern>
                                    <shadedPattern>com.mycompany.com.typesafe</shadedPattern>
                            </relocation>
                    </relocations>
                    
            </configuration>
        </execution>
    </executions>
</plugin>
 
项目打包后,就会自动把引入和源码中相关的com.typesafe开头的类修改为com.mycompany.com.typesafe,由于类的路径不同,类加载器在加载用户jar中的类时就不会与集群中的其他版本冲突。

4、类加载出现ClassCastException问题

现象:当移除用法jar包中的类依赖,只在Flink的lib中添加jar包,作业可以正常运行
表明:Flink作业的classloader 先加载custom 的类,然后加载Flink 集群中的类,出现冲突
更改参数为:classloader.resolve-order: parent-first
 
 
 
0条评论
0 / 1000
张****领
4文章数
0粉丝数
张****领
4 文章 | 0 粉丝
张****领
4文章数
0粉丝数
张****领
4 文章 | 0 粉丝

Flink的classLoader问题

2024-05-28 02:19:44
20
0
类的加载,就是指利用类加载器(ClassLoader)通过类的全限定名来获取定义此类的二进制字节码流,进而构造出类的定义。
Flink作为基于JVM的框架,在flink-conf.yaml中提供了控制类加载策略的参数classloader.resolve-order,可选项有child-first(默认)和parent-first
Parent-first:优先从Flink集群的lib中的jar包加载类如果没有该类就从用户的jar包中加载类
child-first:优先从jar包中包含的类加载类,如果没有从Flink集群的lib中的jar包加载类
其中 ParentFirstClassLoader直接调用了父加载器的loadClass()方法
  1. parent-first

 
JVM中类加载器的层次关系和默认loadClass()方法的逻辑由双亲委派模型(parents delegation model)来体现
如果一个类加载器要加载一个类,它首先不会自己尝试加载这个类,而是把加载的请求委托给父加载器完成,所有的类加载请求最终都应该传递给最顶层的启动类加载器。只有当父加载器无法加载到这个类时,子加载器才会尝试自己加载
用户代码的类加载器是Custom ClassLoader,Flink框架本身的类加载器是Application ClassLoader。用户代码中的类先由Flink框架的类加载器加载,再由用户代码的类加载器加载。但是,Flink默认并不采用parent-first策略
 

ParentFirstClassLoader是如何保证同一个Flink的类,优先加载集群的而非用户jar包中的?

用户提交的Flink任务运行于TaskManager进程内,TaskManager运行时的类加载器是Application ClassLoader(用户程序运行时的默认类加载器),当我们使用自定义类加载器加载需要的类时,会自底向上逐一查找,如果ParentFirstClassLoader中有这个类就再次从Application ClassLoader中查找这个类,如果Application ClassLoader中也有这个类,就会继续从Extension ClassLoader中查找,直到父加载器加载不到这个类时,才会使用当前类加载器加载这个类。这样就巧妙的利用双亲委派模型实现了parent-first的类加载机制。
  1. child-first

 
程序中引入的Flink Connector总是依赖于固定的Cassandra版本,用户代码中为了兼容实际使用的Cassandra版本,会引入一个更低或更高的依赖。而同一个组件不同版本的类定义有可能会不同(即使类的全限定名是相同的),如果仍然用双亲委派模型,就会因为Flink框架指定版本的类先加载,而出现莫名其妙的兼容性问题,如NoSuchMethodError、IllegalAccessError等。
Flink实现了ChildFirstClassLoader类加载器并作为默认策略。它打破了双亲委派模型,使得用户代码的类先加载
public final class ChildFirstClassLoader extends FlinkUserCodeClassLoader {
    private final String[] alwaysParentFirstPatterns;
 
    public ChildFirstClassLoader(
            URL[] urls,
            ClassLoader parent,
            String[] alwaysParentFirstPatterns,
            Consumer<Throwable> classLoadingExceptionHandler) {
        super(urls, parent, classLoadingExceptionHandler);
        this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
    }
 
    @Override
    protected synchronized Class<?> loadClassWithoutExceptionHandling(
            String name,
            boolean resolve) throws ClassNotFoundException {
        // First, check if the class has already been loaded
        Class<?> c = findLoadedClass(name);
 
        if (c == null) {
            // check whether the class should go parent-first
            for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {
                if (name.startsWith(alwaysParentFirstPattern)) {
                    return super.loadClassWithoutExceptionHandling(name, resolve);
                }
            }
 
            try {
                // check the URLs
                c = findClass(name);
            } catch (ClassNotFoundException e) {
                // let URLClassLoader do it, which will eventually call the parent
                c = super.loadClassWithoutExceptionHandling(name, resolve);
            }
        }
 
        if (resolve) {
            resolveClass(c);
        }
        return c;
    }
 
    @Override
    public URL getResource(String name) {
        // first, try and find it via the URLClassloader
        URL urlClassLoaderResource = findResource(name);
        if (urlClassLoaderResource != null) {
            return urlClassLoaderResource;
        }
        // delegate to super
        return super.getResource(name);
    }
 
    @Override
    public Enumeration<URL> getResources(String name) throws IOException {
        // first get resources from URLClassloader
        Enumeration<URL> urlClassLoaderResources = findResources(name);
        final List<URL> result = new ArrayList<>();
 
        while (urlClassLoaderResources.hasMoreElements()) {
            result.add(urlClassLoaderResources.nextElement());
        }
 
        // get parent urls
        Enumeration<URL> parentResources = getParent().getResources(name);
        while (parentResources.hasMoreElements()) {
            result.add(parentResources.nextElement());
        }
 
        return new Enumeration<URL>() {
            Iterator<URL> iter = result.iterator();
 
            public boolean hasMoreElements() {
                return iter.hasNext();
            }
 
            public URL nextElement() {
                return iter.next();
            }
        };
    }
 
核心逻辑位于loadClassWithoutExceptionHandling()方法中,简述如下:
  • 调用findLoadedClass()方法检查全限定名name对应的类是否已经加载过,若没有加载过,再继续往下执行。
  • 检查要加载的类是否以alwaysParentFirstPatterns集合中的前缀开头。如果是,则调用父类的对应方法,以parent-first的方式来加载它。
  • 如果类不符合alwaysParentFirstPatterns集合的条件,就调用findClass()方法在用户代码中查找并获取该类的定义(该方法在URLClassLoader中有默认实现)。如果找不到,再fallback到父加载器来加载。 最后,若resolve参数为true,就调用resolveClass()方法链接该类,最后返回对应的Class对象。
  • child-first策略避开了“先把加载的请求委托给父加载器完成”这一步骤,只有特定的某些类一定要“遵循旧制”。alwaysParentFirstPatterns集合中的这些类都是Java、Flink等组件的基础,不能被用户代码冲掉。它由以下两个参数来指定:classloader.parent-first-patterns.default,不建议修改,固定为以下这些值:
 

child-first是如何打破双亲委派机制的

同一个Flink集群也需要运行各种各样的Flink任务,而且如果用户使用的某个jar和Flink集群的jar版本不一致,需要优先加载用户的jar怎么办?Flink的ChildFirstClassLoader类加载提供了优先从用户jar包加载类的机制
这里alwaysParentFirstPatterns就是classloader.parent-first-patterns.default和classloader.parent-first-patterns.additional配置的类前缀集合,即这两个配置项中的类就算使用child-first类加载方式,也会从父加载器中加载,而非用户jar包中加载。这也是为什么Flink官网的例子中,flink相关的jar在pom引入的时候scope都采用了provided,既减少了用户jar包的大小,又能在集群上正常运行,最主要的是如果集群更新了版本,程序可以直接享受更新后的功能,而不用重新打包。

3、类冲突处理

因为类加载出现ClassNotFound或者NoSuchMethodError和NoSuchFieldError等异常。这种情况可能是因为你的jar包中的版本和集群的版本不一致,优先加载了集群的class
考虑对项目中引入的冲突类作shade。如com.typesafe的类发生冲突,maven项目加入如下配置:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                    <goal>shade</goal>
            </goals>
            <configuration>
                    
                    <relocations>
                            <relocation>
                                    <pattern>com.typesafe</pattern>
                                    <shadedPattern>com.mycompany.com.typesafe</shadedPattern>
                            </relocation>
                    </relocations>
                    
            </configuration>
        </execution>
    </executions>
</plugin>
 
项目打包后,就会自动把引入和源码中相关的com.typesafe开头的类修改为com.mycompany.com.typesafe,由于类的路径不同,类加载器在加载用户jar中的类时就不会与集群中的其他版本冲突。

4、类加载出现ClassCastException问题

现象:当移除用法jar包中的类依赖,只在Flink的lib中添加jar包,作业可以正常运行
表明:Flink作业的classloader 先加载custom 的类,然后加载Flink 集群中的类,出现冲突
更改参数为:classloader.resolve-order: parent-first
 
 
 
文章来自个人专栏
Flink的学习
4 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0