Skip to content

Commit b226973

Browse files
authored
Merge pull request #9 from YanivZalach/feat/reactor_backend
Feat/reactor backend
2 parents 50a3721 + 33ca330 commit b226973

File tree

8 files changed

+575
-469
lines changed

8 files changed

+575
-469
lines changed

backend/iceberg_inventory_builder.py

Lines changed: 463 additions & 443 deletions
Large diffs are not rendered by default.

backend/main.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1+
import threading
2+
import traceback
13
from dotenv import load_dotenv
24
from pathlib import Path
35
from flask import (
46
Flask,
57
jsonify,
68
request,
7-
Response,
8-
redirect,
99
send_from_directory,
1010
)
1111
from pyspark.errors import AnalysisException
@@ -19,6 +19,9 @@
1919
load_dotenv()
2020
app = Flask(__name__, static_url_path="/static")
2121

22+
_in_flight_lock = threading.Lock()
23+
_in_flight: dict = {}
24+
2225

2326
@app.route("/", defaults={"path": ""})
2427
@app.route("/<path:path>")
@@ -36,18 +39,45 @@ def serve(path):
3639
def graph_data():
3740
table_name = request.form.get("table_name")
3841
date_value = request.form.get("date")
42+
key = (table_name, date_value)
43+
44+
with _in_flight_lock:
45+
if key in _in_flight:
46+
state = _in_flight[key]
47+
is_leader = False
48+
else:
49+
state = {"event": threading.Event(), "result": None, "error": None}
50+
_in_flight[key] = state
51+
is_leader = True
52+
53+
if not is_leader:
54+
logger.info(f"Duplicate request for {key}, waiting for in-flight result")
55+
state["event"].wait()
56+
if state["error"]:
57+
return jsonify({"error": state["error"]}), 400
58+
return jsonify(state["result"])
3959

4060
try:
4161
verify_iceberg_table(table_name)
4262
table_data = IcebergInventoryBuilder(table_name, date_value).collect()
43-
data = normalize_graph_data(table_data)
44-
45-
return jsonify(data)
63+
state["result"] = normalize_graph_data(table_data)
64+
return jsonify(state["result"])
4665

4766
except AnalysisException as e:
48-
logger.error(f"Spark Error: {e}")
67+
logger.error(f"Spark Error: {e}\n{traceback.format_exc()}")
68+
state["error"] = str(e)
4969
return jsonify({"error": str(e)}), 400
5070

71+
except Exception as e:
72+
logger.error(f"Unexpected error: {e}\n{traceback.format_exc()}")
73+
state["error"] = str(e)
74+
return jsonify({"error": str(e)}), 500
75+
76+
finally:
77+
state["event"].set()
78+
with _in_flight_lock:
79+
_in_flight.pop(key, None)
80+
5181

5282
if __name__ == "__main__":
5383
app.run(host="0.0.0.0", port=APPLICATION_PORT)

backend/utils.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from typing import List
22
import json
3+
from pyspark.sql import functions as F
34
import os
45
from contextlib import suppress
56
from datetime import datetime
@@ -126,6 +127,35 @@ def format_node_info(file_info: Dict[str, Any]) -> str:
126127
return formatted_info
127128

128129

130+
def get_metadata_row_slim_df_from_path(metadata_path: str):
131+
spark = SparkSession.builder.getOrCreate()
132+
df = spark.read.option("multiLine", True).json(metadata_path)
133+
existing = set(df.columns)
134+
135+
scalar_cols = [
136+
"current-schema-id",
137+
"current-snapshot-id",
138+
"default-sort-order-id",
139+
"default-spec-id",
140+
"last-column-id",
141+
"last-partition-id",
142+
"last-sequence-number",
143+
"last-updated-ms",
144+
"location",
145+
"table-uuid",
146+
]
147+
json_cols = ["properties", "refs"]
148+
149+
return df.select(
150+
*[F.col(column) for column in scalar_cols if column in existing],
151+
*[
152+
F.to_json(F.col(column)).alias(column)
153+
for column in json_cols
154+
if column in existing
155+
],
156+
)
157+
158+
129159
def get_json_metadata_from_path(metadata_path: str) -> Dict[str, Any]:
130160
spark = SparkSession.builder.getOrCreate()
131161

@@ -142,7 +172,7 @@ def get_json_metadata_from_path(metadata_path: str) -> Dict[str, Any]:
142172
return row.asDict(recursive=True)
143173

144174

145-
def _update_col_metric(source_list, metric_name, column_metrics):
175+
def update_col_metric(source_list, metric_name, column_metrics):
146176
for row in source_list:
147177
col_id = row.key
148178
if col_id not in column_metrics:

docker_demo/create_mock_tables.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@
99
# ─────────────────────────────────────────────
1010

1111
spark.sql("DROP TABLE IF EXISTS default.events")
12-
spark.sql(
13-
"""
12+
spark.sql("""
1413
CREATE TABLE default.events (
1514
event_id INT,
1615
event_name STRING,
@@ -24,8 +23,7 @@
2423
'write.update.mode' = 'merge-on-read',
2524
'write.merge.mode' = 'merge-on-read'
2625
)
27-
"""
28-
)
26+
""")
2927
print("✅ Created table `default.events`, partitioned by hour(event_ts)\n")
3028

3129
# ─────────────────────────────────────────────
@@ -175,8 +173,7 @@
175173
# ─────────────────────────────────────────────
176174

177175
spark.sql("DROP TABLE IF EXISTS default.logging")
178-
spark.sql(
179-
"""
176+
spark.sql("""
180177
CREATE TABLE default.logging (
181178
event_id INT,
182179
event_name STRING,
@@ -190,8 +187,7 @@
190187
'write.update.mode' = 'merge-on-read',
191188
'write.merge.mode' = 'merge-on-read'
192189
)
193-
"""
194-
)
190+
""")
195191
print("✅ Created table `default.logging`, partitioned by hour(event_ts)\n")
196192

197193
# ─────────────────────────────────────────────

frontend/src/mocks/handlers.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

frontend/src/pages/FileTreePage.jsx

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,12 @@ export default function FileTreePage() {
323323
.sort((a, b) => a.name.localeCompare(b.name))
324324
}, [metadata])
325325

326+
useEffect(() => {
327+
if (selectedBranch !== null) return
328+
const mainBranch = branches.find(b => b.name === 'main')
329+
if (mainBranch) setSelectedBranch('main')
330+
}, [branches])
331+
326332
const displayedSnapshots = useMemo(() => {
327333
if (!selectedBranch) return snapshots
328334
const branch = branches.find(b => b.name === selectedBranch)

frontend/src/pages/GraphPage.jsx

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ export default function GraphPage() {
6868
liveEdges.update(liveEdges.get().map(e => ({ ...e, hidden: false })))
6969
setStickyNode(null)
7070
setIsFullView(true)
71+
history.replaceState({ graphSelection: null }, '')
7172
requestAnimationFrame(() => { network.redraw(); network.fit() })
7273
}, [])
7374

@@ -232,16 +233,31 @@ export default function GraphPage() {
232233
🔒 Locked View
233234
</span>
234235
)}
235-
{sticky.rows.map((r, i) => (
236-
<div key={i}>
237-
<span className="block font-bold text-slate-500 text-[0.65rem] uppercase tracking-wider mb-1">
238-
{r.label}
239-
</span>
240-
<span className="block font-mono bg-[#0d1117] text-slate-200 px-3 py-2 rounded-lg text-xs whitespace-pre overflow-x-auto break-normal">
241-
{r.value}
242-
</span>
243-
</div>
244-
))}
236+
{sticky.rows.map((r, i) => {
237+
let displayValue = r.value
238+
const tryParseJson = (str) => {
239+
try { return JSON.parse(str) } catch { return undefined }
240+
}
241+
const asPythonToJson = (str) => str
242+
.replace(/'/g, '"')
243+
.replace(/\bTrue\b/g, 'true')
244+
.replace(/\bFalse\b/g, 'false')
245+
.replace(/\bNone\b/g, 'null')
246+
const parsed = tryParseJson(r.value) ?? tryParseJson(asPythonToJson(r.value))
247+
if (parsed !== undefined && typeof parsed === 'object' && parsed !== null) {
248+
displayValue = JSON.stringify(parsed, null, 2)
249+
}
250+
return (
251+
<div key={i}>
252+
<span className="block font-bold text-slate-500 text-[0.65rem] uppercase tracking-wider mb-1">
253+
{r.label}
254+
</span>
255+
<span className="block font-mono bg-[#0d1117] text-slate-200 px-3 py-2 rounded-lg text-xs whitespace-pre overflow-x-auto break-normal">
256+
{displayValue}
257+
</span>
258+
</div>
259+
)
260+
})}
245261
</div>
246262
</div>
247263
)}

frontend/src/pages/TableLayout.jsx

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,20 @@ export default function TableLayout() {
7878
}
7979

8080
const body = new URLSearchParams({ table_name: tableName, date })
81+
const controller = new AbortController()
8182

8283
fetch('/api/v1/graph-data', {
8384
method: 'POST',
8485
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
8586
body: body.toString(),
87+
signal: controller.signal,
8688
})
8789
.then(async (res) => {
8890
const text = await res.text()
8991
const data = JSONbig({ storeAsString: true }).parse(text)
92+
93+
console.log(data)
94+
9095
if (!res.ok) throw new Error(data.error || 'Request failed')
9196
sessionStorage.setItem(cacheKey, text)
9297
return data
@@ -96,9 +101,12 @@ export default function TableLayout() {
96101
setLoading(false)
97102
})
98103
.catch((err) => {
104+
if (err.name === 'AbortError') return
99105
setError(err.message || 'An unexpected error occurred.')
100106
setLoading(false)
101107
})
108+
109+
return () => controller.abort()
102110
}, [tableName, date])
103111

104112
if (loading) {

0 commit comments

Comments
 (0)