-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathutils.py
More file actions
155 lines (124 loc) · 5.29 KB
/
utils.py
File metadata and controls
155 lines (124 loc) · 5.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import os
import tempfile
import time
import subprocess
import threading
import logging
from contextlib import contextmanager
from functools import lru_cache
import librosa
from config import Config
logger = logging.getLogger(__name__)
# Custom exceptions
class AudioProcessingError(Exception):
pass
class DownloadError(Exception):
pass
class ValidationError(Exception):
pass
def get_user_directories(session_id):
base_dir = os.path.join(Config.BASE_DIR, session_id)
return {
'download': os.path.join(base_dir, 'downloads'),
'separated': os.path.join(base_dir, 'separated'),
'final': os.path.join(base_dir, 'final_output')
}
@contextmanager
def temp_audio_file(suffix='.wav'):
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
try:
yield temp_file.name
finally:
try:
os.remove(temp_file.name)
except Exception as e:
logger.warning(f"Failed to cleanup temp file {temp_file.name}: {str(e)}")
def sanitize_filename(filename):
filename = os.path.basename(filename)
filename = ''.join(c for c in filename if c.isalnum() or c in '._-')
if len(filename) > 255:
filename = filename[:255]
if not any(filename.lower().endswith(ext) for ext in Config.ALLOWED_EXTENSIONS):
raise ValidationError(f"Invalid file extension for {filename}")
return filename
def validate_audio_file_fast(file_path):
try:
with open(file_path, 'rb') as f:
header = f.read(4)
if not header.startswith((b'RIFF', b'ID3', b'\xff\xfb')):
raise ValidationError("Invalid audio file format")
return True
except Exception as e:
raise ValidationError(f"File validation failed: {str(e)}")
@lru_cache(maxsize=32)
def get_audio_metadata(file_path):
try:
duration = librosa.get_duration(path=file_path)
return {'duration': duration}
except Exception as e:
logger.warning(f"Failed to get metadata for {file_path}: {str(e)}")
return {'duration': 0}
def emit_log(socketio, message, type="info", error_context=None, sid=None):
socketio.emit('log_message', {'message': message, 'type': type}, room=sid)
log_level = 'info' if type == 'success' else type
if type == "error" and error_context:
logger.error(f"{message} | Context: {error_context}", exc_info=True)
else:
getattr(logger, log_level)(message)
def emit_progress(socketio, file_index, total_files, step, total_steps, stage_name="Processing", sid=None):
progress_per_file = 100 / total_files
progress_per_step_in_file = progress_per_file / total_steps
current_file_base_progress = (file_index - 1) * progress_per_file
current_step_progress = step * progress_per_step_in_file
total_progress = current_file_base_progress + current_step_progress
socketio.emit('progress_update', {'progress': round(total_progress, 1), 'stage': stage_name}, room=sid)
def progress_heartbeat(socketio, process_name, stop_event, sid=None):
start_time = time.time()
while not stop_event.is_set():
elapsed = time.time() - start_time
emit_log(socketio, f"⏳ {process_name} in progress ({elapsed:.1f}s elapsed)...", "info", sid=sid)
time.sleep(Config.LOG_UPDATE_INTERVAL)
def run_subprocess_with_timeout(command, timeout=Config.MAX_PROCESSING_TIME, progress_callback=None, sid=None):
try:
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
universal_newlines=True
)
start_time = time.time()
stderr_output = []
def monitor_progress():
while process.poll() is None:
line = process.stderr.readline()
if line:
stderr_output.append(line)
if progress_callback and '|' in line:
try:
progress = line.split('|')[1].split('/')[0].strip()
progress = float(progress) / 35.1 * 100
progress_callback(round(progress, 1))
except (IndexError, ValueError):
pass
if time.time() - start_time > timeout:
process.terminate()
raise AudioProcessingError(f"Subprocess timed out after {timeout} seconds")
time.sleep(0.1)
if progress_callback:
monitor_thread = threading.Thread(target=monitor_progress)
monitor_thread.daemon = True
monitor_thread.start()
stdout, stderr = process.communicate(timeout=timeout)
stderr_output.append(stderr)
if process.returncode != 0:
raise AudioProcessingError(f"Subprocess failed: {''.join(stderr_output)}")
return subprocess.CompletedProcess(command, process.returncode, stdout, ''.join(stderr_output))
except subprocess.TimeoutExpired:
process.terminate()
raise AudioProcessingError(f"Subprocess timed out after {timeout} seconds")
except subprocess.CalledProcessError as e:
raise AudioProcessingError(f"Subprocess failed: {e.stderr}")
except Exception as e:
raise AudioProcessingError(f"Subprocess error: {str(e)}")