-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathblob.py
More file actions
77 lines (61 loc) · 1.83 KB
/
blob.py
File metadata and controls
77 lines (61 loc) · 1.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from dataclasses import dataclass
from functools import cached_property
from os.path import join, exists
import yaml
from utz import run, err, Log, solo
from utz.s3 import parse_bkt_key, get_etag
from ctbk.paths import S3
class DvcBlob:
    """Base for objects whose data file at ``path`` has a sibling ``<path>.dvc`` file.

    Subclasses supply ``path``; everything else here is derived from the YAML
    contents of the ``.dvc`` file next to it.
    """

    path: str  # location of the tracked data file; provided by subclasses

    @property
    def dvc_path(self) -> str:
        """Path of the ``.dvc`` file: ``path`` with ``.dvc`` appended."""
        return "{}.dvc".format(self.path)

    @cached_property
    def dvc_spec(self):
        """Parsed YAML contents of the ``.dvc`` file.

        The file is read on first access and the result is cached on the
        instance for subsequent accesses.
        """
        with open(self.dvc_path) as fh:
            spec = yaml.safe_load(fh)
        return spec

    @property
    def out(self):
        """The ``outs`` entry of the spec, passed through ``solo``.

        NOTE(review): ``solo`` presumably returns the only element and
        rejects anything but a single-element list -- confirm against ``utz``.
        """
        outs = self.dvc_spec['outs']
        return solo(outs)

    @property
    def dep(self):
        """The ``deps`` entry of the spec, passed through ``solo`` (see ``out``)."""
        deps = self.dvc_spec['deps']
        return solo(deps)

    @property
    def etag(self) -> str:
        """The ``etag`` value recorded on the dependency in the ``.dvc`` file."""
        dependency = self.dep
        return dependency['etag']
@dataclass(init=False)
class Blob(DvcBlob):
    """An S3 object that is imported into the local tree with ``dvc import-url``.

    The positional constructor arguments are forwarded, as one tuple, to
    ``parse_bkt_key``, which must yield a ``(bucket, key)`` pair.
    """

    bkt: str  # bucket component returned by ``parse_bkt_key``
    key: str  # key component returned by ``parse_bkt_key``

    def __init__(self, *args: str):
        self.bkt, self.key = parse_bkt_key(args)

    @property
    def url(self) -> str:
        """``s3://<bkt>/<key>`` URL of the remote object."""
        return f"s3://{self.bkt}/{self.key}"

    @property
    def path(self) -> str:
        """Local path of the imported file: ``S3`` joined with bucket and key."""
        return join(S3, self.bkt, self.key)

    @property
    def s3_etag(self) -> str:
        """ETag returned by ``get_etag`` for this bucket/key (queried on every access)."""
        return get_etag(self.bkt, self.key)

    def update(
        self,
        dry_run: bool = False,
        log: Log = err,
        verbose: bool = False,
    ) -> bool:
        """Import the object, or re-import it when its S3 ETag has changed.

        Args:
            dry_run: Forwarded to ``run``.
                NOTE(review): presumably the command is then only reported,
                not executed -- confirm against ``utz.run``.
            log: Callable that receives status messages.
            verbose: Also log when the existing import is already up to date.

        Returns:
            ``True`` if an import command was issued, ``False`` if the ETag
            recorded in the ``.dvc`` file already matches the one from S3.
        """
        dvc_path = self.dvc_path
        if not exists(dvc_path):
            log(f"{dvc_path} not found; importing")
            self._import_url(dry_run=dry_run)
            return True

        etag0 = self.etag
        etag1 = self.s3_etag
        if etag0 == etag1:
            if verbose:
                log(f"{dvc_path} (ETag {etag0}) is up to date")
            return False

        log(f"{dvc_path} etag changed ({etag0} → {etag1}); re-importing")
        # `-f` is needed because the `.dvc` file already exists.
        self._import_url('-f', dry_run=dry_run)
        return True

    def _import_url(self, *flags: str, dry_run: bool = False) -> None:
        """Run ``dvc import-url [flags] <url> <path>`` and drop the cached spec."""
        run('dvc', 'import-url', *flags, self.url, self.path, dry_run=dry_run)
        # The command (re)writes the `.dvc` file, so a previously cached
        # `dvc_spec` (and the `etag` derived from it) would be stale. Without
        # this, a second `update()` on the same instance compares against the
        # old ETag and re-imports again.
        self.__dict__.pop('dvc_spec', None)