Ballista 分布式查询引擎 - 集群共享状态
2024-04-24
集群共享状态会被存储在共享 kv 中,被所有 Scheduler 节点读写。
/// 集群节点状态
#[tonic::async_trait]
pub trait ClusterState: Send + Sync + 'static {
/// 启动时初始化
async fn init(&self) -> Result<()> {
Ok(())
}
/// 将可执行的 tasks 分配到对应的 executor 计算资源槽上
async fn bind_schedulable_tasks(
&self,
distribution: TaskDistributionPolicy,
active_jobs: Arc<HashMap<String, JobInfoCache>>,
executors: Option<HashSet<String>>,
) -> Result<Vec<BoundTask>>;
/// 解绑 tasks,释放 executor 计算资源
async fn unbind_tasks(&self, executor_slots: Vec<ExecutorSlot>) -> Result<()>;
/// 注册 executor
async fn register_executor(&self, metadata: ExecutorMetadata, spec: ExecutorData)
-> Result<()>;
/// 保存 executor 元信息
async fn save_executor_metadata(&self, metadata: ExecutorMetadata) -> Result<()>;
/// 读取 executor 元信息
async fn get_executor_metadata(&self, executor_id: &str) -> Result<ExecutorMetadata>;
/// 保存 executor 最新心跳信息
async fn save_executor_heartbeat(&self, heartbeat: ExecutorHeartbeat) -> Result<()>;
/// 从集群中移除 executor
async fn remove_executor(&self, executor_id: &str) -> Result<()>;
/// 返回所有活跃 executor 的最新心跳信息
fn executor_heartbeats(&self) -> HashMap<String, ExecutorHeartbeat>;
/// 读取 executor 最新心跳信息
fn get_executor_heartbeat(&self, executor_id: &str) -> Option<ExecutorHeartbeat>;
}
/// 集群作业状态
#[tonic::async_trait]
pub trait JobState: Send + Sync {
/// 接收作业进本地队列(并没有存储在共享 kv 中)
fn accept_job(&self, job_id: &str, queued_at: u64) -> Result<()>;
/// 提交作业
async fn submit_job(&self, job_id: String, graph: &ExecutionGraph) -> Result<()>;
/// 获取所有活跃 job id
async fn get_jobs(&self) -> Result<HashSet<String>>;
/// 获取作业状态
async fn get_job_status(&self, job_id: &str) -> Result<Option<JobStatus>>;
/// 获取作业的执行图(分布式执行计划)
async fn get_execution_graph(&self, job_id: &str) -> Result<Option<ExecutionGraph>>;
/// 保存作业执行图状态
async fn save_job(&self, job_id: &str, graph: &ExecutionGraph) -> Result<()>;
/// 标记作业失败(planning阶段失败)
async fn fail_unscheduled_job(&self, job_id: &str, reason: String) -> Result<()>;
/// 移除作业
async fn remove_job(&self, job_id: &str) -> Result<()>;
/// 读取会话
async fn get_session(&self, session_id: &str) -> Result<Arc<SessionContext>>;
/// 创建一个会话
async fn create_session(&self, config: &BallistaConfig) -> Result<Arc<SessionContext>>;
}
Executor 元信息
主要包括 Executor 的 id、ip 和 端口等信息。在 Executor 节点启动时,会往 Scheduler 注册,Scheduler 会将 Executor 元信息存储到共享 kv 中。
Executor 心跳信息
主要包括 Executor 的 id、此次心跳时间戳 和 executor 状态。
- 在 Executor 启动注册时,会往共享 kv 中存入心跳信息
- 在 Executor 定时发送心跳时,会往共享 kv 中覆盖心跳信息
- 在检查 Executor 心跳是否过期的定时任务中,会移除共享 kv 中过期的 Executor 的心跳信息
Executor 资源信息
主要包括 Executor 还有多少计算资源(一个任务执行占据一个资源槽位)。
- 在注册 executor 时,会增加资源槽位总量
- 在分配可执行 tasks 到 executor 时,会消耗资源槽位
- 在 tasks 执行完毕时,释放资源槽位
会话信息
会话信息主要包括作业配置(如配置多少并行度),Client 连接 Scheduler 时,会创建一个会话并存储在共享 kv 中。
作业执行图(分布式执行计划)
主要包括整个分布式执行计划,以及每个 stage 的执行状态。
- 在提交作业时,会将整个执行图保存到共享 kv 中
- 在作业执行成功/终止作业/作业执行有更新时,会更新整个执行图到共享 kv 中
- 在作业执行完毕后,会从共享 kv 中移除
作业状态
作业状态因为读取频繁,被额外单独存储(作业状态可以从执行图中获取)。
- 在提交作业时,会将作业状态保存到共享 kv 中
- 在作业执行成功/终止作业/作业执行有更新时,会更新作业状态到共享 kv 中
- 在作业执行完毕后,会从共享 kv 中移除