Amoro优化任务启动过程


发布于 2025-06-10 / 19 阅读 / 0 评论 /
amoro优化任务的提交和执行过程

AMS服务启动

Amoro是一个后台服务,通过调度的方式生成和启动优化任务。

后台服务的启动入口在AmoroServiceContainer类中,启动过程如下:

org.apache.amoro.server.AmoroServiceContainer#main
    // 初始化AmoroServiceContainer实例
    service = new AmoroServiceContainer();
    //添加shutdown hook
    addShutdownHook(new Thread(() -> service.dispose())
    // 阻塞等待进入Leader状态
    service.waitLeaderShip();
    // 启动AMS 
    service.startService();
        tableService = new DefaultTableService
        // 初始化DefaultOptimizingService实例
        optimizingService = new DefaultOptimizingService
            tableHandlerChain = new TableRuntimeHandlerImpl();
        // optimizingService.getTableRuntimeHandler()返回optimizingService实例中的tableHandlerChain属性
        tableService.addHandlerChain(optimizingService.getTableRuntimeHandler())
        tableService.initialize()
            // 从表table_runtime查询表优化列表
            tableRuntimeMetaList = TableMetaMapper.selectTableRuntimeMetas
            // 将tableRuntimeMetaList转化为List<TableRuntime> tableRuntimes
            // headHandler为optimizingService实例中的tableHandlerChain属性
            headHandler.initialize(tableRuntimes)
                RuntimeHandlerChain#initialize(tableRuntimes)
                    TableRuntimeHandlerImpl#initHandler(tableRuntimes)
                        // 初始化定时器
                        optimizerMaintainTimer = new Timer("OptimizerMaintainer", true);
                        // 启动定时任务,延迟时间和实践间隔由optimizer.maintainer-check-interval配置决定,默认是30分钟
                        optimizerMaintainTimer.schedule(new ResourceMaintainer(), optimizerMaintainInterval, optimizerMaintainInterval);
    // 阻塞等待进入Follower状态
    service.waitFollowerShip();

这里启动了定时器,定时执行ResourceMaintainer中的任务逻辑

ResourceMaintainer定时任务

org.apache.amoro.server.DefaultOptimizingService.ResourceMaintainer定义定时任务的逻辑

private class ResourceMaintainer extends TimerTask {

  @Override
  public void run() {
    try {
      // 从resource_group表中查询资源组信息
      List<ResourceGroup> optimizerGroups =
          getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
      for (ResourceGroup group : optimizerGroups) {
        // 将资源组转换为优化队列
        if (!optimizingQueueByGroup.containsKey(group.getName())) {
          contructResourceGroup(group);
        }
      }
      
      // 针对每个优化队列,自动启动优化器
      optimizingQueueByGroup.forEach(
          (optimizerGroup, optimizingQueue) -> {
            // 从表table_runtime中查询对应的优化器组是否在运行(optimizing_status_code == 600)
            if (isGroupRunning(optimizerGroup)) {
              // 优化器组在运行
              int instances = groupConf.getIntValue("instances");
              // 判断是否有这个资源组关联的optimizer
              List<OptimizerInstance> optimizers = getOptimizersByGroupName(optimizerGroup);
              if (optimizers.isEmpty() || optimizers.size() < instances) {
                // 此资源组没有对应的优化器
                int num = instances - optimizers.size();
                // 启动对应数量的优化器
                for (int i = 0; i < num; i++) {
                  scaleOutOptimizer(optimizerGroup, groupConf);
                }
              }
            }
          });
      // audo close optimizer
      checkAndCloseOrphanedOptimizers();
    } catch (RuntimeException e) {
      LOG.error("Maintainer optimizer instance abnormal failed. try next round.", e);
    }
  }
}  

最后根据并发度设置需要启动优化器的数量,由scaleOutOptimizer方法启动对应的优化器

scaleOutOptimizer启动优化器

scaleOutOptimizer是org.apache.amoro.server.DefaultOptimizingService中的方法。

org.apache.amoro.server.DefaultOptimizingService#scaleOutOptimizer启动优化器
    String optimizerGroup // 资源组名称
    JSONObject optimizerConf // 优化器配置
    // 根据资源组名称查询resource_group表,获取资源组记录
    resourceGroup = ResourceMapper.selectResourceGroup(optimizerGroup)
    resourceProfile = clusterService.getOptimizeResource(resourceGroup)
    // 从optimizerConf获取并发度
    parallelism = optimizerConf.getIntValue("parallelism");
    // 生成资源抽象
    resource = new Resource.Builder().build()
    // 获取ResourceContainer,发送请求
    ResourceContainers.get(resource.getContainerName()).requestResource(resource);
        AbstractResourceContainer#requestResource(resource)
            // 具体的优化器启动
            FlinkOptimizerContainer#doScaleOut(resource)

ResourceContainers.get(resource.getContainerName())方法根据容器名称返回对应的容器实例,容器定义在config.yaml配置文件中。

容器对象初始化

ResourceContainers中保有一个globalContainers对象,存放配置的容器。

优化器对象初始化过程如下:

org.apache.amoro.server.AmoroServiceContainer#main
    // 初始化AmoroServiceContainer实例
    service = new AmoroServiceContainer();
        // 初始化配置
        AmoroServiceContainer#initConfig
            AmoroServiceContainer.ConfigurationHelper#init
                AmoroServiceContainer.ConfigurationHelper#initContainerConfig
                    // 从config.yaml中获取container配置
                    List<ContainerMetadata> containerList
                    // 初始化container
                    ResourceContainers#init(containerList)
                        // 将containerList每个元素封装为ContainerWrapper实例,并存入ResourceContainers.globalContainers静态变量中,key值为name
                        Map<String, ContainerWrapper> globalContainers

容器类型有以下四种