AMS服务启动
Amoro是一个后台服务,通过调度的方式生成和启动优化任务。
后台服务的启动入口在AmoroServiceContainer类中,启动过程如下:
org.apache.amoro.server.AmoroServiceContainer#main
// 初始化AmoroServiceContainer实例
service = new AmoroServiceContainer();
//添加shutdown hook
addShutdownHook(new Thread(() -> service.dispose())
// 阻塞等待进入Leader状态
service.waitLeaderShip();
// 启动AMS
service.startService();
tableService = new DefaultTableService
// 初始化DefaultOptimizingService实例
optimizingService = new DefaultOptimizingService
tableHandlerChain = new TableRuntimeHandlerImpl();
// optimizingService.getTableRuntimeHandler()返回optimizingService实例中的tableHandlerChain属性
tableService.addHandlerChain(optimizingService.getTableRuntimeHandler())
tableService.initialize()
// 从表table_runtime查询表优化列表
tableRuntimeMetaList = TableMetaMapper.selectTableRuntimeMetas
// 将tableRuntimeMetaList转化为List<TableRuntime> tableRuntimes
// headHandler为optimizingService实例中的tableHandlerChain属性
headHandler.initialize(tableRuntimes)
RuntimeHandlerChain#initialize(tableRuntimes)
TableRuntimeHandlerImpl#initHandler(tableRuntimes)
// 初始化定时器
optimizerMaintainTimer = new Timer("OptimizerMaintainer", true);
// 启动定时任务,延迟时间和实践间隔由optimizer.maintainer-check-interval配置决定,默认是30分钟
optimizerMaintainTimer.schedule(new ResourceMaintainer(), optimizerMaintainInterval, optimizerMaintainInterval);
// 阻塞等待进入Follower状态
service.waitFollowerShip();
这里启动了定时器,定时执行ResourceMaintainer中的任务逻辑
ResourceMaintainer定时任务
org.apache.amoro.server.DefaultOptimizingService.ResourceMaintainer定义定时任务的逻辑
private class ResourceMaintainer extends TimerTask {
@Override
public void run() {
try {
// 从resource_group表中查询资源组信息
List<ResourceGroup> optimizerGroups =
getAs(ResourceMapper.class, ResourceMapper::selectResourceGroups);
for (ResourceGroup group : optimizerGroups) {
// 将资源组转换为优化队列
if (!optimizingQueueByGroup.containsKey(group.getName())) {
contructResourceGroup(group);
}
}
// 针对每个优化队列,自动启动优化器
optimizingQueueByGroup.forEach(
(optimizerGroup, optimizingQueue) -> {
// 从表table_runtime中查询对应的优化器组是否在运行(optimizing_status_code == 600)
if (isGroupRunning(optimizerGroup)) {
// 优化器组在运行
int instances = groupConf.getIntValue("instances");
// 判断是否有这个资源组关联的optimizer
List<OptimizerInstance> optimizers = getOptimizersByGroupName(optimizerGroup);
if (optimizers.isEmpty() || optimizers.size() < instances) {
// 此资源组没有对应的优化器
int num = instances - optimizers.size();
// 启动对应数量的优化器
for (int i = 0; i < num; i++) {
scaleOutOptimizer(optimizerGroup, groupConf);
}
}
}
});
// audo close optimizer
checkAndCloseOrphanedOptimizers();
} catch (RuntimeException e) {
LOG.error("Maintainer optimizer instance abnormal failed. try next round.", e);
}
}
}
最后根据并发度设置需要启动优化器的数量,由scaleOutOptimizer方法启动对应的优化器
scaleOutOptimizer启动优化器
scaleOutOptimizer是org.apache.amoro.server.DefaultOptimizingService中的方法。
org.apache.amoro.server.DefaultOptimizingService#scaleOutOptimizer启动优化器
String optimizerGroup // 资源组名称
JSONObject optimizerConf // 优化器配置
// 根据资源组名称查询resource_group表,获取资源组记录
resourceGroup = ResourceMapper.selectResourceGroup(optimizerGroup)
resourceProfile = clusterService.getOptimizeResource(resourceGroup)
// 从optimizerConf获取并发度
parallelism = optimizerConf.getIntValue("parallelism");
// 生成资源抽象
resource = new Resource.Builder().build()
// 获取ResourceContainer,发送请求
ResourceContainers.get(resource.getContainerName()).requestResource(resource);
AbstractResourceContainer#requestResource(resource)
// 具体的优化器启动
FlinkOptimizerContainer#doScaleOut(resource)
ResourceContainers.get(resource.getContainerName())方法根据容器名称返回对应的容器实例,容器定义在config.yaml配置文件中。
容器对象初始化
ResourceContainers中保有一个globalContainers对象,存放配置的容器。
优化器对象初始化过程如下:
org.apache.amoro.server.AmoroServiceContainer#main
// 初始化AmoroServiceContainer实例
service = new AmoroServiceContainer();
// 初始化配置
AmoroServiceContainer#initConfig
AmoroServiceContainer.ConfigurationHelper#init
AmoroServiceContainer.ConfigurationHelper#initContainerConfig
// 从config.yaml中获取container配置
List<ContainerMetadata> containerList
// 初始化container
ResourceContainers#init(containerList)
// 将containerList每个元素封装为ContainerWrapper实例,并存入ResourceContainers.globalContainers静态变量中,key值为name
Map<String, ContainerWrapper> globalContainers
容器类型有以下四种