-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver_runtime.py
More file actions
120 lines (82 loc) · 3.83 KB
/
server_runtime.py
File metadata and controls
120 lines (82 loc) · 3.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from __future__ import annotations
from typing import Any
from mesh import SovereignMesh
def _extract_path_id(path: str, prefix: str, suffix: str = "") -> str:
    """Return the identifier embedded in *path* between *prefix* and *suffix*.

    Args:
        path: Request path; ``None`` or another falsy value is treated as "".
        prefix: Leading route fragment that must precede the identifier.
        suffix: Optional trailing route fragment that must follow the
            identifier. Trailing slashes after the suffix are ignored.

    Returns:
        The identifier with surrounding slashes removed, or "" when *path*
        does not start with *prefix* or (when given) does not end with
        *suffix*.
    """
    token = str(path or "")
    if suffix:
        # Tolerate "/.../<id>/<suffix>/" the same way as "/.../<id>/<suffix>".
        token = token.rstrip("/")
        # Slicing by length alone would silently chop characters off an
        # identifier when the suffix is not actually there.
        if not token.endswith(suffix):
            return ""
        token = token[: -len(suffix)]
    if not token.startswith(prefix):
        return ""
    return token[len(prefix):].strip("/")
def get_manifest(mesh: SovereignMesh) -> dict[str, Any]:
    """Return whatever ``mesh.get_manifest()`` produces, unchanged."""
    manifest = mesh.get_manifest()
    return manifest
def get_device_profile(mesh: SovereignMesh) -> dict[str, Any]:
    """Return an ``ok`` envelope holding a shallow copy of ``mesh.device_profile``."""
    response: dict[str, Any] = {"status": "ok"}
    # Copy so callers cannot mutate the mesh's own mapping through the response.
    response["device_profile"] = dict(mesh.device_profile)
    return response
def update_device_profile(mesh: SovereignMesh, data: dict[str, Any]) -> dict[str, Any]:
    """Forward the ``device_profile`` entry of *data* to ``mesh.update_device_profile``.

    A missing or falsy entry is sent as an empty dict; otherwise a shallow
    copy of the entry is sent.
    """
    requested = data.get("device_profile") or {}
    profile = dict(requested)
    return mesh.update_device_profile(profile)
def list_peers(mesh: SovereignMesh, *, limit: int = 25) -> dict[str, Any]:
    """Return ``mesh.list_peers(limit=limit)``; *limit* defaults to 25."""
    peers = mesh.list_peers(limit=limit)
    return peers
def stream_snapshot(
    mesh: SovereignMesh,
    *,
    since_seq: int = 0,
    limit: int = 50,
) -> dict[str, Any]:
    """Return ``mesh.stream_snapshot(...)`` for the given *since_seq* and *limit*.

    Defaults are ``since_seq=0`` and ``limit=50``; both are forwarded by
    keyword.
    """
    window = {"since_seq": since_seq, "limit": limit}
    return mesh.stream_snapshot(**window)
def accept_handshake(mesh: SovereignMesh, data: dict[str, Any]) -> dict[str, Any]:
    """Pass *data* straight through to ``mesh.accept_handshake`` and return its result."""
    outcome = mesh.accept_handshake(data)
    return outcome
def acquire_lease(mesh: SovereignMesh, data: dict[str, Any]) -> dict[str, Any]:
    """Call ``mesh.acquire_lease`` with the entries of *data* as keyword arguments.

    A falsy *data* (``None`` or empty) results in a call with no keyword
    arguments.
    """
    # NOTE(review): every payload key becomes a keyword argument, so a key the
    # mesh method does not accept will surface as a TypeError from the call.
    kwargs = dict(data) if data else {}
    return mesh.acquire_lease(**kwargs)
def heartbeat_lease(mesh: SovereignMesh, data: dict[str, Any]) -> dict[str, Any]:
    """Call ``mesh.heartbeat_lease`` with the lease id and TTL read from *data*.

    Args:
        mesh: Object providing ``heartbeat_lease(lease_id, ttl_seconds=...)``.
        data: Payload; reads ``lease_id`` (default "") and ``ttl_seconds``
            (default 300 when missing or falsy).

    Returns:
        Whatever ``mesh.heartbeat_lease`` returns.

    Raises:
        ValueError: If ``ttl_seconds`` cannot be converted by ``int()``.
    """
    # Coerce through str() so a non-string id (e.g. a JSON number) is
    # normalised instead of failing on ``.strip()``.
    lease_id = str(data.get("lease_id") or "").strip()
    ttl_seconds = int(data.get("ttl_seconds") or 300)
    return mesh.heartbeat_lease(lease_id, ttl_seconds=ttl_seconds)
def release_lease(mesh: SovereignMesh, data: dict[str, Any]) -> dict[str, Any]:
    """Call ``mesh.release_lease`` with the lease id and status read from *data*.

    Args:
        mesh: Object providing ``release_lease(lease_id, status=...)``.
        data: Payload; reads ``lease_id`` (default "") and ``status``
            (default "released" when missing or falsy).

    Returns:
        Whatever ``mesh.release_lease`` returns.
    """
    # Coerce through str() so non-string payload values are normalised
    # instead of failing on ``.strip()``.
    lease_id = str(data.get("lease_id") or "").strip()
    status = str(data.get("status") or "released").strip()
    return mesh.release_lease(lease_id, status=status)
def heartbeat_attempt(mesh: SovereignMesh, attempt_id: str, data: dict[str, Any]) -> dict[str, Any]:
    """Heartbeat a job attempt and wrap the mesh's answer in an ``ok`` envelope.

    Reads ``ttl_seconds`` (default 300 when missing or falsy) and ``metadata``
    (default empty dict, shallow-copied) from *data*, and passes the stripped
    *attempt_id* to ``mesh.heartbeat_job_attempt``.
    """
    normalized_id = str(attempt_id or "").strip()
    ttl = int(data.get("ttl_seconds") or 300)
    extra = dict(data.get("metadata") or {})
    attempt = mesh.heartbeat_job_attempt(normalized_id, ttl_seconds=ttl, metadata=extra)
    return {"status": "ok", "attempt": attempt}
def heartbeat_attempt_from_path(mesh: SovereignMesh, path: str, data: dict[str, Any]) -> dict[str, Any]:
    """Heartbeat the attempt whose id sits in a ``/mesh/jobs/attempts/<id>/heartbeat`` path."""
    attempt_id = _extract_path_id(path, "/mesh/jobs/attempts/", "/heartbeat")
    return heartbeat_attempt(mesh, attempt_id, data)
def complete_attempt(mesh: SovereignMesh, attempt_id: str, data: dict[str, Any]) -> dict[str, Any]:
    """Call ``mesh.complete_job_attempt`` with fields read from *data*.

    Args:
        mesh: Object providing ``complete_job_attempt``.
        attempt_id: Attempt identifier; stripped, falsy values become "".
        data: Payload; reads ``result`` (passed as-is), ``media_type``
            (default "application/json"), ``executor`` (default "") and
            ``metadata`` (default empty dict, shallow-copied).

    Returns:
        Whatever ``mesh.complete_job_attempt`` returns.
    """
    # Coerce through str() so non-string payload values are normalised
    # instead of failing on ``.strip()``.
    media_type = str(data.get("media_type") or "application/json").strip()
    executor = str(data.get("executor") or "").strip()
    return mesh.complete_job_attempt(
        str(attempt_id or "").strip(),
        data.get("result"),
        media_type=media_type,
        executor=executor,
        metadata=dict(data.get("metadata") or {}),
    )
def complete_attempt_from_path(mesh: SovereignMesh, path: str, data: dict[str, Any]) -> dict[str, Any]:
    """Complete the attempt whose id sits in a ``/mesh/jobs/attempts/<id>/complete`` path."""
    attempt_id = _extract_path_id(path, "/mesh/jobs/attempts/", "/complete")
    return complete_attempt(mesh, attempt_id, data)
def fail_attempt(mesh: SovereignMesh, attempt_id: str, data: dict[str, Any]) -> dict[str, Any]:
    """Call ``mesh.fail_job_attempt`` with fields read from *data*.

    Args:
        mesh: Object providing ``fail_job_attempt``.
        attempt_id: Attempt identifier; stripped, falsy values become "".
        data: Payload; reads ``error`` (default "job attempt failed"),
            ``retryable`` (default True when the key is absent, otherwise
            its truthiness) and ``metadata`` (default empty dict,
            shallow-copied).

    Returns:
        Whatever ``mesh.fail_job_attempt`` returns.
    """
    # Coerce through str() so a non-string error value (e.g. a numeric code)
    # is normalised instead of failing on ``.strip()``.
    error = str(data.get("error") or "job attempt failed").strip()
    return mesh.fail_job_attempt(
        str(attempt_id or "").strip(),
        error=error,
        retryable=bool(data.get("retryable", True)),
        metadata=dict(data.get("metadata") or {}),
    )
def fail_attempt_from_path(mesh: SovereignMesh, path: str, data: dict[str, Any]) -> dict[str, Any]:
    """Fail the attempt whose id sits in a ``/mesh/jobs/attempts/<id>/fail`` path."""
    attempt_id = _extract_path_id(path, "/mesh/jobs/attempts/", "/fail")
    return fail_attempt(mesh, attempt_id, data)
def accept_handoff(mesh: SovereignMesh, data: dict[str, Any]) -> dict[str, Any]:
    """Pass *data* straight through to ``mesh.accept_handoff`` and return its result."""
    outcome = mesh.accept_handoff(data)
    return outcome
# Public API of this module: the names exported by ``from ... import *``.
# The private helper ``_extract_path_id`` is intentionally not listed.
__all__ = [
"accept_handshake",
"accept_handoff",
"acquire_lease",
"complete_attempt",
"complete_attempt_from_path",
"fail_attempt",
"fail_attempt_from_path",
"get_device_profile",
"get_manifest",
"heartbeat_attempt",
"heartbeat_attempt_from_path",
"heartbeat_lease",
"list_peers",
"release_lease",
"stream_snapshot",
"update_device_profile",
]