深度复盘 Orion-Server 开发的全流程,梳理如何搭建一个 Rust 分布式系统

Table of contents
Open Table of contents
1. 任务概述 Github issue
在 UI 上显示注册到 Orion 的客户端信息,并显示每个客户端状态。
- Orion 需要提供客户端注册信息,每个客户端至少包括:
Client ID(唯一标识),Hostname / IP,Instance ID(如 Pod 名),启动时间, Orion 版本,支持的能力(可选),最近心跳时间 - 显示客户端状态
- 核心状态:
Idle(空闲),Busy(正在构建),Error(构建失败),Lost / Offline(最近心跳超时) - 可扩展状态:
downloading source,preparing environment,running build,uploading artifacts
- 核心状态:
- Orion 需要记录客户端状态变化
- 在任务开始时标记客户端为 Busy
- 在任务结束成功后标记为 Idle
- 在任务失败后标记为 Error
- 定期心跳(例如每 5 秒)更新 Online 状态
- 10~30 秒无心跳 → Offline
整体任务并不困困难,我们可以在 Orion-Server 声明两个接口,/info 与 /status/{id} ,一个用于请求所有 Orion 客户端的信息;一个通过获取到的客户端 ID 来获取 Orion 客户端 status。唯一需要注意的时 前端在请求 /status/{id} 时,需要采取轮询的方式,以便实时获取 客户端 状态的变化。
那现在唯一需要我们花心思处理就是如何在 Orion-Server 中提供注册信息以及实时更新 客户端 状态,这需要我们对整个 Orion 分布式系统进行分析。
2 Orion 分布式系统职责
Mega 是 Google Piper 的一个非官方开源实现,本质上是一个代码托管平台(类似 Github)。而 Orion-Server 正是作为构建调度与执行系统,通常会有多个客户端(agent/worker)注册到 Orion-Server,用于执行:代码构建(Buck2),分析任务, 测试任务,编译 / 预处理 / 输出产物 等等,有点类似于 Github Actions
3. Orion 分布式系统的全梳理
在这部分我们来介绍,Orion 作为一个分布式系统,他的注册与构建流程。
题外话:一个典型的分布式系统的分布式工作流程
- 多个构建客户端连接到服务器
- 用户提交构建任务
- 服务器分配任务给空闲工作者
- 工作者执行构建并实时报告状态
- 服务器收集日志并持久化结果 Orion 构建系统这种分布式系统的典型实现
3. 1 Orion-Server(服务器) 的启动
/// Starts the Orion server with the specified port
/// Initializes database connection, sets up routes, and starts health check tasks
pub async fn start_server(port: u16) {
let db_url = std::env::var("DATABASE_URL").expect("DATABASE_URL is not set in .env file");
let conn = Database::connect(db_url)
.await
.expect("Database connection failed");
let state = AppState::new(conn, None);
/// .......
let app = Router::new()
.route("/", get(|| async { "Hello, World!" }))
.merge(api::routers())
.merge(SwaggerUi::new("/swagger-ui").url("/api-doc/openapi.json", ApiDoc::openapi()))
.with_state(state)
.layer(TraceLayer::new_for_http())
.layer(
ServiceBuilder::new().layer(
CorsLayer::new()
.allow_origin(origins)
.allow_headers(vec![
http::header::AUTHORIZATION,
http::header::CONTENT_TYPE,
])
.allow_methods([
Method::GET,
Method::POST,
Method::OPTIONS,
Method::DELETE,
Method::PUT,
])
.allow_credentials(true),
),
);
/// .......
}orion-server/src/server.rs
和绝大多数服务器代码一样,orion-server 实现了数据库的连接;路由的初始化;连接健康检查,当然最后还有服务器的启动。
让我们把目光放到 api::routers()上,这是我们利用 websocket 与客户端进行长连接的核心,包括处理客户端信息的接收与发送
/// Creates and configures all API routes
pub fn routers() -> Router<AppState> {
Router::new()
.route("/ws", any(ws_handler)) // ws_handler 为 WebSocket 处理的核心函数
/// ......
}
当我们的客户端 Orion 连接到 服务器时,会触发 ws_handler
async fn ws_handler(
ws: WebSocketUpgrade,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(state): State<AppState>,
) -> impl IntoResponse {
tracing::info!("{addr} connected. Waiting for registration...");
ws.on_upgrade(move |socket| handle_socket(socket, addr, state))
}