Skip to content

Commit 235a1d2

Browse files
committed
Create UFW services(on normal method, wo import fabric)
1 parent 2d16ed6 commit 235a1d2

File tree

10 files changed

+318
-8
lines changed

10 files changed

+318
-8
lines changed

app/interfaces/cli/run.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,3 @@ def docker_interactive_run(self):
8282
print("Неверный ввод")
8383
# overload
8484
self.docker_interactive_run()
85-

app/services/distro/arch/docker.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22

33
from app.utils.subprocess_utils import run_commands
44

5+
56
class DockerService:
67
def __init__(self) -> None:
78
self.status: bool = True if which("docker") else False
8-
9+
910
def install(self):
1011
run_commands(
1112
[
@@ -14,7 +15,7 @@ def install(self):
1415
["systemctl", "start", "docker"],
1516
]
1617
)
17-
18+
1819
def uninstall(self):
1920
run_commands(
2021
[
@@ -24,4 +25,4 @@ def uninstall(self):
2425
["rm", "-rf", "/var/lib/docker"],
2526
["rm", "-rf", "/var/lib/containerd"],
2627
]
27-
)
28+
)

app/services/distro/arch/ufw.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import time
2+
from subprocess import run
3+
4+
from app.services.ufw import UFWService
5+
from app.utils.subprocess_utils import run_commands
6+
7+
8+
class ArchUFWService(UFWService):
9+
def __init__(self):
10+
super().__init__()
11+
self.package_manager = "pacman"
12+
13+
def install(self):
14+
print("Installing UFW for Arch Linux...")
15+
16+
run_commands([[self.package_manager, "-Sy", "--noconfirm", "ufw"]])
17+
18+
run_commands(
19+
[
20+
["ufw", "default", "deny", "incoming"],
21+
["ufw", "default", "allow", "outgoing"],
22+
]
23+
)
24+
25+
for action, port, description in self.default_rules:
26+
print(f"Adding rule: {action} {port} ({description})")
27+
run_commands([["ufw", action, port, "comment", description]])
28+
29+
run_commands(
30+
[
31+
["systemctl", "enable", "ufw"],
32+
["systemctl", "start", "ufw"],
33+
]
34+
)
35+
36+
run_commands([["ufw", "--force", "enable"]])
37+
38+
self._wait_for_status("active")
39+
40+
run_commands(
41+
[
42+
["ufw", "status", "verbose"],
43+
["systemctl", "status", "ufw"],
44+
]
45+
)
46+
47+
print("UFW successfully installed and configured")
48+
49+
def uninstall(self):
50+
print("Uninstalling UFW for Arch Linux...")
51+
52+
run_commands([["ufw", "--force", "disable"]])
53+
54+
run_commands(
55+
[
56+
["systemctl", "stop", "ufw"],
57+
["systemctl", "disable", "ufw"],
58+
]
59+
)
60+
61+
self._wait_for_status("inactive")
62+
63+
run_commands([[self.package_manager, "-Rns", "--noconfirm", "ufw"]])
64+
65+
print("UFW successfully uninstalled")
66+
67+
def _check_service_status(self, expected_status: str) -> bool:
68+
result = run(["systemctl", "is-active", "ufw"], text=True, capture_output=True)
69+
return result.stdout.strip() == expected_status
70+
71+
def _wait_for_status(self, expected_status: str, timeout: int = 10):
72+
start_time = time.time()
73+
while time.time() - start_time < timeout:
74+
if self._check_service_status(expected_status):
75+
return
76+
time.sleep(0.5)
77+
print(f"Warning: failed to reach status '{expected_status}'")

app/services/distro/debian/fail2ban.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import time
22
from subprocess import run
3+
34
from app.utils.subprocess_utils import run_commands
45

56

67
class Fail2BanService:
78
def __init__(self):
8-
99
self.path_to_jail_config = "/etc/fail2ban/jail.d/sshd.local"
1010
self.jail_str_config = """
1111
[sshd]

app/services/distro/debian/ufw.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import time
2+
from subprocess import run
3+
4+
from app.services.ufw import UFWService
5+
from app.utils.subprocess_utils import run_commands
6+
7+
8+
class DebianUFWService(UFWService):
9+
def __init__(self):
10+
super().__init__()
11+
self.package_manager = "apt"
12+
13+
def install(self):
14+
print("Installing UFW for Debian/Ubuntu...")
15+
16+
run_commands([[self.package_manager, "update"]])
17+
run_commands([[self.package_manager, "install", "ufw", "-y"]])
18+
19+
run_commands(
20+
[
21+
["ufw", "default", "deny", "incoming"],
22+
["ufw", "default", "allow", "outgoing"],
23+
]
24+
)
25+
26+
for action, port, description in self.default_rules:
27+
print(f"Adding rule: {action} {port} ({description})")
28+
run_commands([["ufw", action, port, "comment", description]])
29+
30+
run_commands([["ufw", "--force", "enable"]])
31+
32+
self._wait_for_status("active")
33+
34+
run_commands(
35+
[
36+
["ufw", "status", "verbose"],
37+
["systemctl", "status", "ufw"],
38+
]
39+
)
40+
41+
print("UFW successfully installed and configured")
42+
43+
def uninstall(self):
44+
print("Uninstalling UFW for Debian/Ubuntu...")
45+
46+
run_commands([["ufw", "--force", "disable"]])
47+
48+
self._wait_for_status("inactive")
49+
50+
run_commands(
51+
[
52+
["systemctl", "stop", "ufw"],
53+
["systemctl", "disable", "ufw"],
54+
]
55+
)
56+
57+
run_commands([[self.package_manager, "remove", "ufw", "-y"]])
58+
run_commands([[self.package_manager, "autoremove", "-y"]])
59+
60+
print("UFW successfully uninstalled")
61+
62+
def _check_service_status(self, expected_status: str) -> bool:
63+
result = run(["systemctl", "is-active", "ufw"], text=True, capture_output=True)
64+
return result.stdout.strip() == expected_status
65+
66+
def _wait_for_status(self, expected_status: str, timeout: int = 10):
67+
start_time = time.time()
68+
while time.time() - start_time < timeout:
69+
if self._check_service_status(expected_status):
70+
return
71+
time.sleep(0.5)
72+
print(f"Warning: failed to reach status '{expected_status}'")

app/services/distro/wrt/fail2ban.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import time
22
from subprocess import run
3+
34
from app.utils.subprocess_utils import run_commands
45

56

67
class Fail2BanService:
78
def __init__(self):
8-
99
self.path_to_jail_config = "/etc/fail2ban/jail.d/sshd.local"
1010
self.jail_str_config = """
1111
[sshd]

app/services/distro/wrt/ufw.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import time
2+
from subprocess import run
3+
4+
from app.services.ufw import UFWService
5+
from app.utils.subprocess_utils import run_commands
6+
7+
8+
class WrtUFWService(UFWService):
9+
def __init__(self):
10+
super().__init__()
11+
12+
def install(self):
13+
print("OpenWrt uses built-in firewall")
14+
print("Configuring firewall for OpenWrt...")
15+
16+
fw_version = self._detect_firewall_version()
17+
18+
if fw_version == "fw4":
19+
self._configure_fw4()
20+
else:
21+
self._configure_fw3()
22+
23+
run_commands([["/etc/init.d/firewall", "restart"]])
24+
25+
self._wait_for_status("running")
26+
27+
run_commands(
28+
[
29+
["/etc/init.d/firewall", "status"],
30+
["iptables", "-L", "-n", "-v"],
31+
]
32+
)
33+
34+
print("Firewall successfully configured")
35+
36+
def uninstall(self):
37+
print("Cannot remove built-in OpenWrt firewall")
38+
print("Resetting firewall to default settings...")
39+
40+
run_commands(
41+
[
42+
["uci", "revert", "firewall"],
43+
["uci", "commit", "firewall"],
44+
["/etc/init.d/firewall", "restart"],
45+
]
46+
)
47+
48+
print("Firewall reset to default settings")
49+
50+
def _detect_firewall_version(self) -> str:
51+
result = run(["which", "fw4"], capture_output=True)
52+
return "fw4" if result.returncode == 0 else "fw3"
53+
54+
def _configure_fw3(self):
55+
print("Configuring fw3...")
56+
57+
commands = [
58+
["uci", "set", "firewall.ssh=rule"],
59+
["uci", "set", "firewall.ssh.name=Allow-SSH"],
60+
["uci", "set", "firewall.ssh.src=wan"],
61+
["uci", "set", "firewall.ssh.proto=tcp"],
62+
["uci", "set", "firewall.ssh.dest_port=22"],
63+
["uci", "set", "firewall.ssh.target=ACCEPT"],
64+
["uci", "set", "firewall.http=rule"],
65+
["uci", "set", "firewall.http.name=Allow-HTTP"],
66+
["uci", "set", "firewall.http.src=wan"],
67+
["uci", "set", "firewall.http.proto=tcp"],
68+
["uci", "set", "firewall.http.dest_port=80"],
69+
["uci", "set", "firewall.http.target=ACCEPT"],
70+
["uci", "set", "firewall.https=rule"],
71+
["uci", "set", "firewall.https.name=Allow-HTTPS"],
72+
["uci", "set", "firewall.https.src=wan"],
73+
["uci", "set", "firewall.https.proto=tcp"],
74+
["uci", "set", "firewall.https.dest_port=443"],
75+
["uci", "set", "firewall.https.target=ACCEPT"],
76+
["uci", "commit", "firewall"],
77+
]
78+
79+
run_commands(commands)
80+
81+
def _configure_fw4(self):
82+
print("Configuring fw4...")
83+
self._configure_fw3()
84+
85+
def _check_service_status(self, expected_status: str) -> bool:
86+
result = run(["/etc/init.d/firewall", "status"], capture_output=True, text=True)
87+
if expected_status == "running":
88+
return result.returncode == 0
89+
else:
90+
return result.returncode != 0
91+
92+
def _wait_for_status(self, expected_status: str, timeout: int = 10):
93+
start_time = time.time()
94+
while time.time() - start_time < timeout:
95+
if self._check_service_status(expected_status):
96+
return
97+
time.sleep(0.5)
98+
print(f"Warning: failed to reach status '{expected_status}'")

app/services/docker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@
1111
else:
1212
raise OSError("Unsuported distro!")
1313

14-
dockerservice = DockerService()
14+
dockerservice = DockerService()

app/services/fail2ban.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@
1111
else:
1212
raise OSError("Unsuported distro!")
1313

14-
fail2banservice = Fail2BanService()
14+
fail2banservice = Fail2BanService()

app/services/ufw.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
from abc import ABC, abstractmethod
2+
3+
from app.utils import sysinfo_utils
4+
5+
6+
class UFWService(ABC):
7+
"""Base class for UFW management"""
8+
9+
def __init__(self):
10+
self.default_rules = [
11+
("allow", "22/tcp", "SSH"),
12+
("allow", "80/tcp", "HTTP"),
13+
("allow", "443/tcp", "HTTPS"),
14+
]
15+
16+
@abstractmethod
17+
def install(self):
18+
"""Install UFW"""
19+
pass
20+
21+
@abstractmethod
22+
def uninstall(self):
23+
"""Uninstall UFW"""
24+
pass
25+
26+
@abstractmethod
27+
def _check_service_status(self, expected_status: str) -> bool:
28+
"""Check service status"""
29+
pass
30+
31+
@staticmethod
32+
def create():
33+
"""Factory method to create instance based on OS"""
34+
__os: str | None = sysinfo_utils.detect_distro().lower()
35+
36+
if __os == "debian":
37+
from app.services.distro.debian.ufw import DebianUFWService
38+
39+
return DebianUFWService()
40+
elif __os == "arch":
41+
from app.services.distro.arch.ufw import ArchUFWService
42+
43+
return ArchUFWService()
44+
elif __os == "openwrt":
45+
from app.services.distro.wrt.ufw import WrtUFWService
46+
47+
return WrtUFWService()
48+
else:
49+
raise OSError(f"Unsupported distro: {__os}")
50+
51+
@staticmethod
52+
def is_supported() -> bool:
53+
"""Check if current OS is supported"""
54+
try:
55+
__os = sysinfo_utils.detect_distro().lower()
56+
return __os in ["debian", "arch", "openwrt"]
57+
except Exception:
58+
return False
59+
60+
@staticmethod
61+
def get_distro() -> str:
62+
"""Get current distribution"""
63+
return sysinfo_utils.detect_distro().lower()

0 commit comments

Comments
 (0)