-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtables_dir.py
More file actions
79 lines (67 loc) · 2.21 KB
/
tables_dir.py
File metadata and controls
79 lines (67 loc) · 2.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from abc import ABC
from functools import cache
from os.path import exists, splitext, basename
from tempfile import mkdtemp
from typing import Type
import pandas as pd
from pandas import DataFrame
from utz import Unset, err
from utz.ym import Monthy
from ctbk.task import Task
from ctbk.util.df import save
from ctbk.util.read import Read
# Named tables: table name (the parquet file stem under a TablesDir's url) -> DataFrame.
Tables = dict[str, DataFrame]
class TablesDir(Task[Tables], ABC):
    """Task whose output is a directory of named parquet tables for one month.

    Subclasses implement :meth:`_dfs` to compute the tables; :meth:`checkpoint`
    writes each one to ``{self.url}/{name}.parquet`` through ``self.fs``.
    """

    def __init__(self, ym: Monthy, **kwargs):
        self.ym = ym
        super().__init__(**kwargs)

    def _dfs(self) -> Tables:
        """Compute the tables to write; must be overridden by subclasses."""
        raise NotImplementedError

    def _create(self, read: Read | None | Type[Unset] = Unset) -> Tables:
        """Build the tables by (re)writing the output directory.

        ``read`` is accepted for interface compatibility and ignored here.
        """
        return self.checkpoint()

    def paths(self) -> dict[str, str]:
        """Map each parquet file stem under ``self.url`` to its path on ``self.fs``.

        Only files matching ``*_*.parquet`` (an underscore in the file name) are
        listed; tables written without an underscore in their name are not seen
        here (NOTE(review): confirm this filter is intentional).
        """
        paths = self.fs.glob(f'{self.url}/*_*.parquet')
        return {splitext(basename(path))[0]: path for path in paths}

    def read(self) -> Tables:
        """Load every table listed by :meth:`paths` into a DataFrame."""
        tables = {}
        for name, path in self.paths().items():
            # Read through ``self.fs`` (the same filesystem ``paths`` globbed):
            # fsspec's ``glob`` may return protocol-stripped paths for remote
            # filesystems, which pandas' own path resolution would not find.
            with self.fs.open(path, 'rb') as f:
                tables[name] = pd.read_parquet(f)
        return tables

    # NOTE(review): ``functools.cache`` on a method keys on ``self`` and keeps
    # every instance alive for the life of the process (ruff B019). Left as-is
    # so callers relying on ``dfs.cache_clear()`` keep working.
    @cache
    def dfs(self) -> Tables:
        """Return the memoized result of ``self.create()``."""
        return self.create()

    @property
    def checkpoint_kwargs(self):
        """Extra keyword arguments forwarded to ``save`` for each table (none by default)."""
        return dict()

    def checkpoint(self) -> Tables:
        """Write every table from :meth:`_dfs` under ``self.url`` and return them.

        Each table ``name`` is saved to ``{self.url}/{name}.parquet``. If the
        directory did not exist beforehand and computing/writing fails, the
        directory is removed again and the original exception propagates.
        After a successful write, parquet files reported by :meth:`paths` that
        were not produced by this call are moved into a fresh temp directory
        (from ``tempfile.mkdtemp``, i.e. a local path — NOTE(review): for a
        remote ``self.fs`` this path is interpreted on that filesystem).
        """
        url = self.url
        # Check on the task's filesystem; ``os.path.exists`` only sees the local
        # disk and would misreport remote directories as missing (and then the
        # failure cleanup below would delete pre-existing data).
        created_dir = not self.fs.exists(url)
        if created_dir:
            self.fs.mkdirs(url)

        try:
            dfs = {}
            for name, df in self._dfs().items():
                save(df, url=f'{url}/{name}.parquet', **self.checkpoint_kwargs)
                dfs[name] = df
        except BaseException:
            if created_dir:
                err(f"Removing directory {url} after failed write")
                try:
                    # fsspec's ``delete`` is non-recursive by default and raises
                    # on a directory; we created this directory, so removing it
                    # recursively only discards what this call wrote.
                    self.fs.delete(url, recursive=True)  # TODO: remove all directory levels that were created
                except Exception as cleanup_exc:
                    # Best-effort cleanup: never mask the original failure.
                    err(f"Failed to remove {url}: {cleanup_exc}")
            raise

        # Parquets already in the directory that this run did not produce.
        stale = {
            name: path
            for name, path in self.paths().items()
            if name not in dfs
        }
        if stale:
            tmpdir = mkdtemp()
            err(f"Moving untracked parquets to {tmpdir}: {stale}")
            for name, path in stale.items():
                self.fs.mv(path, f'{tmpdir}/{name}.parquet')
        return dfs