-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp1.py
More file actions
236 lines (190 loc) · 10.1 KB
/
app1.py
File metadata and controls
236 lines (190 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
import pandas as pd
import streamlit as st
import altair as alt
from io import StringIO
st.set_page_config(page_title="Branch Performance Dashboard", layout="wide")
st.title("📊 Company Overview")
# --- Blinking star function ---
def blinking_star():
blinking_css = """
<style>
@keyframes blink {
0% { opacity: 1; }
50% { opacity: 0; }
100% { opacity: 1; }
}
.blink {
animation: blink 1s infinite;
font-size: 24px;
color: gold;
font-weight: bold;
}
</style>
"""
st.markdown(blinking_css, unsafe_allow_html=True)
return '<span class="blink">⭐✨</span>'
# --- Performance Status Display (with Star) ---
def performance_status_display(ratio):
if ratio >= 3:
return blinking_star() # Blinking star for PW
elif ratio > 1:
return "⭐" # Single star for NPW with ratio > 1
else:
return "" # No star for NPW with ratio <= 1
# --- Sidebar: File Uploads ---
st.sidebar.header("📤 Upload CSV Files")
uploaded_employees = st.sidebar.file_uploader("Upload Employees CSV", type="csv")
uploaded_branches = st.sidebar.file_uploader("Upload Branches CSV", type="csv")
uploaded_transactions = st.sidebar.file_uploader("Upload Transactions CSV", type="csv")
# --- Function to Load and Process Data ---
def load_data(emp_file, branch_file, trans_file):
employees = pd.read_csv(emp_file)
branches = pd.read_csv(branch_file)
transactions = pd.read_csv(trans_file)
# Merge datasets
emp_branch = pd.merge(employees, branches, on='BranchID', how='left')
df = pd.merge(transactions, emp_branch, on='EmployeeID', how='left')
# Parse dates
df['Date'] = pd.to_datetime(df['Date'])
df['Year'] = df['Date'].dt.year
df['Month'] = df['Date'].dt.month
# Pivot table by transaction type
pivot_df = df.pivot_table(
index=['EmployeeID', 'EmployeeName', 'BranchName', 'Year', 'Month'],
columns='Type',
values='Amount',
aggfunc='sum',
fill_value=0
).reset_index()
# Flatten columns
pivot_df.columns = ['_'.join(col).strip() if isinstance(col, tuple) else col for col in pivot_df.columns.values]
# Clean and compute Net Income
pivot_df['BranchName'] = pivot_df['BranchName'].astype(str)
for col in ['Revenue', 'Expense', 'Salary']:
if col not in pivot_df.columns:
pivot_df[col] = 0
else:
pivot_df[col] = pd.to_numeric(pivot_df[col], errors='coerce').fillna(0)
pivot_df['Net Income'] = pivot_df['Revenue'] - pivot_df['Expense'] - pivot_df['Salary']
return pivot_df
# --- If all files uploaded ---
if uploaded_employees and uploaded_branches and uploaded_transactions:
df = load_data(uploaded_employees, uploaded_branches, uploaded_transactions)
# Prepare filter values
years = sorted(df['Year'].dropna().unique())
months = sorted(df['Month'].dropna().unique())
branches = sorted(df['BranchName'].dropna().unique())
employees = sorted(df['EmployeeName'].dropna().unique())
month_names = {1:"Jan", 2:"Feb", 3:"Mar", 4:"Apr", 5:"May", 6:"Jun",
7:"Jul", 8:"Aug", 9:"Sep", 10:"Oct", 11:"Nov", 12:"Dec"}
# --- Sidebar Filters ---
st.sidebar.header("📅 Filters")
selected_year = st.sidebar.selectbox("Select Year", ["All"] + [str(year) for year in years], index=0)
selected_month = st.sidebar.selectbox("Select Month", ["All"] + [month_names[m] for m in months], index=0)
selected_branches = st.sidebar.multiselect("Select Branch(es)", branches, default=branches)
selected_employees = st.sidebar.multiselect("Select Employee(s)", employees, default=employees)
# --- Fetch Data Button ---
fetch_data = st.sidebar.button("🔍 Fetch Data")
if fetch_data:
filtered_df = df.copy()
# Apply filters
if selected_year != "All":
filtered_df = filtered_df[filtered_df['Year'] == int(selected_year)]
if selected_month != "All":
month_num = [k for k, v in month_names.items() if v == selected_month][0]
filtered_df = filtered_df[filtered_df['Month'] == month_num]
if selected_branches:
filtered_df = filtered_df[filtered_df['BranchName'].isin(selected_branches)]
if selected_employees:
filtered_df = filtered_df[filtered_df['EmployeeName'].isin(selected_employees)]
# Net income recalculation
filtered_df['Net Income'] = filtered_df['Revenue'] - filtered_df['Expense'] - filtered_df['Salary']
# --- Metrics ---
total_sales = filtered_df['Revenue'].sum()
total_expenses = filtered_df['Expense'].sum() + filtered_df['Salary'].sum()
net_income = total_sales - total_expenses
avg_customer_rating = 4.69
total_branches = filtered_df['BranchName'].nunique()
total_employees = filtered_df['EmployeeID'].nunique()
if total_expenses > 0:
performance_ratio = total_sales / total_expenses
else:
performance_ratio = float('inf') # Prevent div by zero
performance_status = "PW" if performance_ratio >= 3 else "NPW"
# --- Show Company Overview Metrics ---
with st.container():
col1, col2, col3, col4 = st.columns(4)
col1.metric("Total Sales", f"${total_sales:,.0f}")
col2.metric("Total Expenses", f"${total_expenses:,.0f}")
col3.metric("Net Income", f"${net_income:,.0f}")
col4.metric("Avg. Customer Rating", f"{avg_customer_rating:.2f}")
col5, col6, col7, col8 = st.columns(4)
col5.metric("Total Branches", total_branches)
col6.metric("Performance Ratio", f"{performance_ratio:.2f}x")
if performance_status == "PW":
perf_status_display = blinking_star()
else:
perf_status_display = "⭐"
col7.markdown(f"**Performance Status:** {perf_status_display}", unsafe_allow_html=True)
col8.metric("Total Employees", total_employees)
# --- Branch Summary (Formatted as Company Overview) ---
st.subheader("📍 Summary by Branch")
branch_summary = filtered_df.groupby("BranchName")[['Expense', 'Revenue', 'Salary', 'Net Income']].sum().reset_index()
# Calculate total unique employees per branch
emp_count = filtered_df.groupby("BranchName")['EmployeeID'].nunique().reset_index().rename(columns={'EmployeeID': 'Total Employees'})
# Merge employee count into branch summary
branch_summary = branch_summary.merge(emp_count, on='BranchName', how='left')
# Calculate Performance Ratio and Status
branch_summary['Performance Ratio'] = branch_summary.apply(
lambda row: row['Revenue'] / (row['Expense'] + row['Salary']) if (row['Expense'] + row['Salary']) > 0 else float('inf'),
axis=1
)
# Apply performance status function for stars
branch_summary['Performance Status'] = branch_summary['Performance Ratio'].apply(
lambda ratio: performance_status_display(ratio)
)
# Show formatted metrics for each branch
for index, row in branch_summary.iterrows():
with st.container():
col1, col2, col3, col4 = st.columns(4)
col1.metric(f"Total Sales ({row['BranchName']})", f"${row['Revenue']:,.0f}")
col2.metric(f"Total Expenses ({row['BranchName']})", f"${row['Expense'] + row['Salary']:,.0f}")
col3.metric(f"Net Income ({row['BranchName']})", f"${row['Net Income']:,.0f}")
col4.metric(f"Total Employees ({row['BranchName']})", row['Total Employees'])
col5, col6, col7 = st.columns(3)
col5.metric(f"Performance Ratio ({row['BranchName']})", f"{row['Performance Ratio']:.2f}x")
col6.markdown(f"**Performance Status:** {row['Performance Status']}", unsafe_allow_html=True)
# --- Download Raw Data Button ---
csv = filtered_df.to_csv(index=False)
st.download_button("Download Raw Data (CSV)", csv, file_name="filtered_data.csv", mime="text/csv")
st.markdown("---")
# --- Employee Summary (Formatted as Company Overview) ---
st.subheader("🧑💼 Summary by Employee")
emp_branch_summary = filtered_df.groupby(["EmployeeName", "BranchName"])[['Expense', 'Revenue', 'Salary', 'Net Income']].sum().reset_index()
# Calculate Performance Ratio for each employee
emp_branch_summary['Performance Ratio'] = emp_branch_summary.apply(
lambda row: row['Revenue'] / (row['Expense'] + row['Salary']) if (row['Expense'] + row['Salary']) > 0 else float('inf'),
axis=1
)
# Apply performance status function for stars
emp_branch_summary['Performance Status'] = emp_branch_summary['Performance Ratio'].apply(
lambda ratio: performance_status_display(ratio)
)
# Show formatted metrics for each employee
for index, row in emp_branch_summary.iterrows():
with st.container():
col1, col2, col3, col4 = st.columns(4)
col1.metric(f"Total Sales ({row['EmployeeName']})", f"${row['Revenue']:,.0f}")
col2.metric(f"Total Expenses ({row['EmployeeName']})", f"${row['Expense'] + row['Salary']:,.0f}")
col3.metric(f"Net Income ({row['EmployeeName']})", f"${row['Net Income']:,.0f}")
col4.metric(f"Branch ({row['EmployeeName']})", row['BranchName'])
col5, col6, col7 = st.columns(3)
col5.metric(f"Performance Ratio ({row['EmployeeName']})", f"{row['Performance Ratio']:.2f}x")
col6.markdown(f"**Performance Status:** {row['Performance Status']}", unsafe_allow_html=True)
# --- Download Raw Data Button for Employee Summary ---
csv = emp_branch_summary.to_csv(index=False)
st.download_button("Download Raw Data (Employee Summary) CSV", csv, file_name="employee_summary_data.csv", mime="text/csv")
else:
st.info("👈 Use the filters and click **Fetch Data** to update the dashboard.")
else:
st.warning("🚨 Please upload all three CSV files in the sidebar to get started.")