Skip to content
Go back

深度剖析 Mega 系统中的 Orion-Server 与 Orion客户端

Edit page

深度复盘 Orion-Server 开发的全流程,梳理如何搭建一个 Rust 分布式系统

title-picture

Table of contents

Open Table of contents

1. 任务概述 Github issue

在 UI 上显示注册到 Orion 的客户端信息,并显示每个客户端状态。

  1. Orion 需要提供客户端注册信息,每个客户端至少包括Client ID(唯一标识), Hostname / IP, Instance ID(如 Pod 名), 启动时间, Orion 版本, 支持的能力(可选), 最近心跳时间
  2. 显示客户端状态
    1. 核心状态: Idle(空闲), Busy(正在构建), Error(构建失败), Lost / Offline(最近心跳超时)
    2. 可扩展状态: downloading source, preparing environment, running build, uploading artifacts
  3. Orion 需要记录客户端状态变化
    1. 在任务开始时标记客户端为 Busy
    2. 在任务结束成功后标记为 Idle
    3. 在任务失败后标记为 Error
    4. 定期心跳(例如每 5 秒)更新 Online 状态
    5. 10~30 秒无心跳 → Offline

整体任务并不困困难,我们可以在 Orion-Server 声明两个接口,/info 与 /status/{id} ,一个用于请求所有 Orion 客户端的信息;一个通过获取到的客户端 ID 来获取 Orion 客户端 status。唯一需要注意的时 前端在请求 /status/{id} 时,需要采取轮询的方式,以便实时获取 客户端 状态的变化。

那现在唯一需要我们花心思处理就是如何在 Orion-Server 中提供注册信息以及实时更新 客户端 状态,这需要我们对整个 Orion 分布式系统进行分析。

2 Orion 分布式系统职责

Mega 是 Google Piper 的一个非官方开源实现,本质上是一个代码托管平台(类似 Github)。而 Orion-Server 正是作为构建调度与执行系统,通常会有多个客户端(agent/worker)注册到 Orion-Server,用于执行:代码构建(Buck2)分析任务, 测试任务编译 / 预处理 / 输出产物 等等,有点类似于 Github Actions

3. Orion 分布式系统的全梳理

在这部分我们来介绍,Orion 作为一个分布式系统,他的注册与构建流程。

题外话:一个典型的分布式系统的分布式工作流程

  1. 多个构建客户端连接到服务器
  2. 用户提交构建任务
  3. 服务器分配任务给空闲工作者
  4. 工作者执行构建并实时报告状态
  5. 服务器收集日志并持久化结果 Orion 构建系统这种分布式系统的典型实现

3. 1 Orion-Server(服务器) 的启动

/// Starts the Orion server with the specified port
/// Initializes database connection, sets up routes, and starts health check tasks
pub async fn start_server(port: u16) {
    let db_url = std::env::var("DATABASE_URL").expect("DATABASE_URL is not set in .env file");
    let conn = Database::connect(db_url)
        .await
        .expect("Database connection failed");

    let state = AppState::new(conn, None);

    /// .......

    let app = Router::new()
        .route("/", get(|| async { "Hello, World!" }))
        .merge(api::routers())
        .merge(SwaggerUi::new("/swagger-ui").url("/api-doc/openapi.json", ApiDoc::openapi()))
        .with_state(state)
        .layer(TraceLayer::new_for_http())
        .layer(
            ServiceBuilder::new().layer(
                CorsLayer::new()
                    .allow_origin(origins)
                    .allow_headers(vec![
                        http::header::AUTHORIZATION,
                        http::header::CONTENT_TYPE,
                    ])
                    .allow_methods([
                        Method::GET,
                        Method::POST,
                        Method::OPTIONS,
                        Method::DELETE,
                        Method::PUT,
                    ])
                    .allow_credentials(true),
            ),
        );

    /// .......
}orion-server/src/server.rs

和绝大多数服务器代码一样,orion-server 实现了数据库的连接;路由的初始化;连接健康检查,当然最后还有服务器的启动。

让我们把目光放到 api::routers()上,这是我们利用 websocket 与客户端进行长连接的核心,包括处理客户端信息的接收与发送

/// Creates and configures all API routes
pub fn routers() -> Router<AppState> {
    Router::new()
        .route("/ws", any(ws_handler)) // ws_handler 为 WebSocket 处理的核心函数
        /// ......
}

当我们的客户端 Orion 连接到 服务器时,会触发 ws_handler

async fn ws_handler(
    ws: WebSocketUpgrade,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
    State(state): State<AppState>,
) -> impl IntoResponse {
    tracing::info!("{addr} connected. Waiting for registration...");
    ws.on_upgrade(move |socket| handle_socket(socket, addr, state))
}

Edit page
Share this post on: