Skip to content

Commit 5f25659

Browse files
committed
Fancy progress bar!
1 parent 7d305d3 commit 5f25659

3 files changed

Lines changed: 57 additions & 46 deletions

File tree

src/sync/collect.rs

Lines changed: 45 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{
33
collections::{HashMap, HashSet},
44
path::PathBuf,
55
};
6-
use tokio::sync::mpsc::Receiver;
6+
use tokio::sync::mpsc::UnboundedReceiver;
77

88
use crate::{
99
asset::AssetRef,
@@ -16,11 +16,11 @@ use crate::{
1616
pub struct CollectResults {
1717
pub new_lockfile: Lockfile,
1818
pub input_sources: HashMap<String, NodeSource>,
19-
pub new_count: u32,
19+
pub new_count: u64,
2020
}
2121

2222
pub async fn collect_events(
23-
mut rx: Receiver<super::Event>,
23+
mut rx: UnboundedReceiver<super::Event>,
2424
target: SyncTarget,
2525
inputs: InputMap,
2626
mp: MultiProgress,
@@ -43,6 +43,16 @@ pub async fn collect_events(
4343

4444
while let Some(event) = rx.recv().await {
4545
match event {
46+
super::Event::Discovered(path) => {
47+
if !seen_paths.contains(&path) {
48+
progress.discovered += 1;
49+
}
50+
}
51+
super::Event::InFlight(path) => {
52+
if !seen_paths.contains(&path) {
53+
progress.in_flight.insert(path.clone());
54+
}
55+
}
4656
super::Event::Finished {
4757
state,
4858
input_name,
@@ -81,18 +91,13 @@ pub async fn collect_events(
8191

8292
progress.in_flight.remove(&path);
8393
}
84-
super::Event::InFlight(path) => {
85-
if !seen_paths.contains(&path) {
86-
progress.in_flight.insert(path.clone());
87-
}
88-
}
8994
super::Event::Failed(path) => {
9095
progress.failed += 1;
9196
progress.in_flight.remove(&path);
9297
}
9398
}
9499

95-
progress.update_msg();
100+
progress.update();
96101
}
97102

98103
progress.finish();
@@ -105,30 +110,39 @@ pub async fn collect_events(
105110
}
106111

107112
struct Progress {
108-
spinner: ProgressBar,
113+
inner: ProgressBar,
109114
target: SyncTarget,
110115
in_flight: HashSet<PathBuf>,
111-
synced: u32,
112-
new: u32,
113-
dupes: u32,
114-
failed: u32,
116+
discovered: u64,
117+
synced: u64,
118+
new: u64,
119+
dupes: u64,
120+
failed: u64,
115121
}
116122

117123
impl Progress {
124+
fn get_style(finished: bool) -> ProgressStyle {
125+
ProgressStyle::default_bar()
126+
.template(&format!(
127+
"{{prefix:.{prefix_color}.bold}}{bar} {{pos}}/{{len}} assets: ({{msg}})",
128+
prefix_color = if finished { "green" } else { "cyan" },
129+
bar = if finished { "" } else { " [{bar:40}]" },
130+
))
131+
.unwrap()
132+
.progress_chars("=> ")
133+
}
134+
118135
fn new(mp: MultiProgress, target: SyncTarget) -> Self {
119136
let spinner = mp.add(ProgressBar::new_spinner());
120-
spinner.set_style(
121-
ProgressStyle::default_spinner()
122-
.template("{spinner:.cyan} {msg}")
123-
.unwrap(),
124-
);
137+
spinner.set_style(Progress::get_style(false));
138+
spinner.set_prefix("Syncing");
125139
spinner.enable_steady_tick(std::time::Duration::from_millis(100));
126-
spinner.set_message("Starting sync...");
127140

128141
Self {
129-
spinner,
142+
inner: spinner,
130143
target,
131144
in_flight: HashSet::new(),
145+
discovered: 0,
132146
synced: 0,
133147
new: 0,
134148
dupes: 0,
@@ -137,16 +151,13 @@ impl Progress {
137151
}
138152

139153
fn get_msg(&self) -> String {
140-
let mut str = format!("Synced {} files", self.synced + self.dupes);
141-
142154
let mut parts = Vec::new();
143155

144156
if self.new > 0 {
145157
let target_msg = match self.target {
146158
SyncTarget::Cloud { dry_run: true } => "checked",
147159
SyncTarget::Cloud { dry_run: false } => "uploaded",
148-
SyncTarget::Studio => "written to content folder",
149-
SyncTarget::Debug => "written to debug folder",
160+
SyncTarget::Studio | SyncTarget::Debug => "written",
150161
};
151162
parts.push(format!("{} {}", self.new, target_msg));
152163
}
@@ -160,29 +171,26 @@ impl Progress {
160171

161172
let in_flight = self.in_flight.len();
162173
if in_flight > 0 {
163-
parts.push(format!("{} in-flight", in_flight));
174+
parts.push(format!("{} processing", in_flight));
164175
}
165176

166177
let failed = self.failed;
167178
if failed > 0 {
168179
parts.push(format!("{} failed", failed));
169180
}
170181

171-
if parts.is_empty() {
172-
return str;
173-
}
174-
175-
str.push_str(" (");
176-
str.push_str(&parts.join(", "));
177-
str.push(')');
178-
str
182+
parts.join(", ")
179183
}
180184

181-
fn update_msg(&self) {
182-
self.spinner.set_message(self.get_msg());
185+
fn update(&self) {
186+
self.inner.set_position(self.synced + self.dupes);
187+
self.inner.set_length(self.discovered);
188+
self.inner.set_message(self.get_msg());
183189
}
184190

185191
fn finish(&self) {
186-
self.spinner.finish_with_message(self.get_msg());
192+
self.inner.set_prefix("Synced");
193+
self.inner.set_style(Progress::get_style(true));
194+
self.inner.finish();
187195
}
188196
}

src/sync/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ impl TargetBackend {
4141

4242
#[derive(Debug)]
4343
enum Event {
44+
Discovered(PathBuf),
45+
InFlight(PathBuf),
4446
Finished {
4547
state: EventState,
4648
input_name: String,
@@ -49,7 +51,6 @@ enum Event {
4951
hash: String,
5052
asset_ref: Option<AssetRef>,
5153
},
52-
InFlight(PathBuf),
5354
Failed(PathBuf),
5455
}
5556

@@ -71,7 +72,7 @@ pub async fn sync(args: SyncArgs, mp: MultiProgress) -> anyhow::Result<()> {
7172
db
7273
});
7374

74-
let (event_tx, event_rx) = mpsc::channel::<Event>(100);
75+
let (event_tx, event_rx) = mpsc::unbounded_channel::<Event>();
7576

7677
let collector_handle = tokio::spawn({
7778
let inputs = config.inputs.clone();

src/sync/walk.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::{
1919
sync::Arc,
2020
};
2121
use tokio::{
22-
sync::{Mutex, Semaphore, mpsc::Sender},
22+
sync::{Mutex, Semaphore, mpsc::UnboundedSender},
2323
task::JoinSet,
2424
};
2525
use walkdir::WalkDir;
@@ -39,7 +39,7 @@ struct InputState {
3939
bleed: bool,
4040
}
4141

42-
pub async fn walk(params: Params, config: &Config, tx: &Sender<super::Event>) {
42+
pub async fn walk(params: Params, config: &Config, tx: &UnboundedSender<super::Event>) {
4343
let params = Arc::new(params);
4444

4545
for (input_name, input) in &config.inputs {
@@ -78,14 +78,16 @@ pub async fn walk(params: Params, config: &Config, tx: &Sender<super::Event>) {
7878
let semaphore = semaphore.clone();
7979
let tx = tx.clone();
8080

81+
tx.send(super::Event::Discovered(path.clone())).unwrap();
82+
8183
join_set.spawn(async move {
8284
let _permit = semaphore.acquire_owned().await.unwrap();
8385

84-
tx.send(super::Event::InFlight(path.clone())).await.unwrap();
86+
tx.send(super::Event::InFlight(path.clone())).unwrap();
8587

8688
if let Err(e) = process_entry(state.clone(), &path, &tx).await {
8789
warn!("Failed to process file {}: {e:?}", path.display());
88-
tx.send(super::Event::Failed(path.clone())).await.unwrap();
90+
tx.send(super::Event::Failed(path.clone())).unwrap();
8991
}
9092
});
9193
}
@@ -97,7 +99,7 @@ pub async fn walk(params: Params, config: &Config, tx: &Sender<super::Event>) {
9799
async fn process_entry(
98100
state: Arc<InputState>,
99101
path: &Path,
100-
tx: &Sender<super::Event>,
102+
tx: &UnboundedSender<super::Event>,
101103
) -> anyhow::Result<()> {
102104
debug!("Handling entry: {}", path.display());
103105

@@ -137,7 +139,7 @@ async fn process_entry(
137139
asset_ref: lockfile_entry.map(Into::into),
138140
hash: asset.hash.clone(),
139141
};
140-
tx.send(event).await.unwrap();
142+
tx.send(event).unwrap();
141143

142144
return Ok(());
143145
}
@@ -163,7 +165,7 @@ async fn process_entry(
163165
hash: asset.hash.clone(),
164166
asset_ref,
165167
};
166-
tx.send(event).await.unwrap();
168+
tx.send(event).unwrap();
167169

168170
Ok(())
169171
}

0 commit comments

Comments
 (0)