Description
Today, users are expected to implement their own safeguards to keep workers with incompatible versions from interacting with each other. Such interaction can easily happen during a rollout: old workers get decommissioned while new workers are added to the cluster. If users do nothing, old workers will count the new ones as part of the cluster, and new ones will see old workers as part of the cluster.
This is problematic, because the interaction of several workers with different versions can result either in errors or wrong results.
The scope of this problem goes beyond this project. There are several reasons why users might want to control worker versioning themselves:
- Workers running the same version of Distributed DataFusion but different versions of DataFusion itself
- Workers where new versions have custom UDFs registered but old ones still don't
- Any other reason this project can't control.
Because of this, users should have control over worker versioning, but the actual protocol for advertising worker versions can be provided by this project.
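To illustrate what a user-controlled version could capture, here is a minimal sketch that folds the reasons listed above into a single opaque string. The function name `compose_worker_version`, its parameters, and the string layout are hypothetical and not part of this project; any scheme works as long as compatible workers produce identical strings.

```rust
/// Hypothetical helper: derives one opaque version string from everything
/// that must match between workers for them to be considered compatible.
pub fn compose_worker_version(
    app_version: &str,
    datafusion_version: &str,
    udf_names: &[&str],
) -> String {
    // Sort and deduplicate the UDF names so that registration order
    // does not change the resulting version string.
    let mut udfs: Vec<&str> = udf_names.to_vec();
    udfs.sort_unstable();
    udfs.dedup();
    format!(
        "app={};datafusion={};udfs={}",
        app_version,
        datafusion_version,
        udfs.join(",")
    )
}
```

With a scheme like this, two workers compare equal only when the application version, the DataFusion version, and the set of registered UDFs all match.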
Based on the protocol proposed in #375, we could have something like this in the proto specification:
service WorkerService {
+ rpc WorkerInfo(WorkerInfoRequest) returns (WorkerInfoResponse);
rpc SetPlan(SetPlanRequest) returns (SetPlanResponse);
rpc ExecuteTask(ExecuteTaskRequest) returns (stream FlightData);
}
message WorkerInfoRequest {
// empty message, but declared anyway for backward compatibility, in case we want to add fields in the future
}
message WorkerInfoResponse {
// user-defined version of the worker
string version = 1;
}
And the actual worker structure could be something like:
#[derive(Clone)]
pub struct Worker {
    pub(crate) version: String, // defaults to an empty string
    ...
}
impl Worker {
    /// Sets an arbitrary user-defined version for the worker.
    pub fn with_version(mut self, version: String) -> Self {
        self.version = version;
        self
    }
}
As users are expected to implement their own WorkerResolver::get_urls() method, they would probably like to perform the version filtering there, in their own code. If there's any helper tool we can provide for that, it would be really nice. It can be as simple as a function like:
async fn worker_has_version(url: Url, expected_version: &str) -> bool;
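As a rough sketch of how such a helper might be used inside a user's own URL resolution, the snippet below filters a list of candidate URLs down to those reporting the expected version. It is synchronous and uses plain strings so that it stays self-contained. The name `filter_urls_by_version` and the `fetch_version` closure are hypothetical; the closure stands in for a call to the proposed `WorkerInfo` RPC.

```rust
/// Hypothetical helper: keeps only the URLs whose worker reports
/// `expected_version`.
///
/// `fetch_version` is a stand-in for the proposed `WorkerInfo` RPC. It returns
/// `None` when the worker is unreachable or does not implement the RPC, in
/// which case the worker is excluded.
pub fn filter_urls_by_version<F>(
    urls: &[String],
    expected_version: &str,
    mut fetch_version: F,
) -> Vec<String>
where
    F: FnMut(&str) -> Option<String>,
{
    urls.iter()
        .filter(|url| fetch_version(url.as_str()).as_deref() == Some(expected_version))
        .cloned()
        .collect()
}
```

Excluding unreachable or non-responding workers is one possible design choice: during a rollout it errs on the side of never mixing versions, at the cost of temporarily shrinking the set of usable workers.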