Skip to content

Commit 9720c31

Browse files
committed
Merge branch 'bugfixes/358-unhandled-worker-thread-exc'
A DoS would happen in many situations, including TLS errors and attempts to close the underlying sockets erroring out. This patch aims to prevent situations in which the worker threads are killed by arbitrary exceptions that bubble up to their entry-point layers without being handled properly, or at all. PR #649 Fixes #358 Fixes #354 Ref #310 Ref #346 Ref #375 Ref #599 Ref #641 Resolves #365
2 parents 029600e + a7665f3 commit 9720c31

File tree

3 files changed

+334
-3
lines changed

3 files changed

+334
-3
lines changed

.flake8

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ per-file-ignores =
110110
cheroot/test/test_dispatch.py: DAR101, DAR201, S101, WPS111, WPS121, WPS302, WPS422, WPS430
111111
cheroot/test/test_ssl.py: C818, DAR101, DAR201, DAR301, DAR401, E800, I001, I003, I004, I005, S101, S309, S404, S603, WPS100, WPS110, WPS111, WPS114, WPS121, WPS130, WPS201, WPS202, WPS204, WPS210, WPS211, WPS218, WPS219, WPS222, WPS226, WPS231, WPS300, WPS301, WPS317, WPS318, WPS324, WPS326, WPS335, WPS336, WPS337, WPS352, WPS408, WPS420, WPS421, WPS422, WPS432, WPS436, WPS440, WPS441, WPS442, WPS450, WPS509, WPS510, WPS608
112112
cheroot/test/test_server.py: DAR101, DAR201, DAR301, I001, I003, I004, I005, S101, WPS110, WPS111, WPS118, WPS121, WPS122, WPS130, WPS201, WPS202, WPS210, WPS218, WPS226, WPS229, WPS300, WPS317, WPS318, WPS324, WPS326, WPS421, WPS422, WPS430, WPS432, WPS433, WPS436, WPS437, WPS442, WPS507, WPS509, WPS608
113-
cheroot/test/test_conn.py: B007, DAR101, DAR201, DAR301, DAR401, E800, I001, I003, I004, I005, N802, N805, RST304, S101, S310, WPS100, WPS110, WPS111, WPS114, WPS115, WPS120, WPS121, WPS122, WPS201, WPS202, WPS204, WPS210, WPS211, WPS213, WPS214, WPS218, WPS219, WPS226, WPS231, WPS301, WPS306, WPS317, WPS318, WPS323, WPS326, WPS361, WPS420, WPS421, WPS422, WPS425, WPS429, WPS430, WPS432, WPS435, WPS436, WPS437, WPS440, WPS442, WPS447, WPS462, WPS508, WPS509, WPS510, WPS526
113+
cheroot/test/test_conn.py: B007, DAR101, DAR201, DAR301, DAR401, E800, I001, I003, I004, I005, N802, N805, RST304, S101, S310, WPS100, WPS110, WPS111, WPS114, WPS115, WPS120, WPS121, WPS122, WPS201, WPS202, WPS204, WPS210, WPS211, WPS213, WPS214, WPS218, WPS219, WPS226, WPS231, WPS301, WPS306, WPS317, WPS318, WPS323, WPS326, WPS361, WPS402, WPS420, WPS421, WPS422, WPS425, WPS429, WPS430, WPS432, WPS435, WPS436, WPS437, WPS440, WPS442, WPS447, WPS462, WPS508, WPS509, WPS510, WPS526
114114
cheroot/test/webtest.py: B007, DAR101, DAR201, DAR401, I001, I003, I004, N802, RST303, RST304, S101, S104, WPS100, WPS110, WPS111, WPS115, WPS120, WPS121, WPS122, WPS201, WPS202, WPS204, WPS210, WPS211, WPS213, WPS214, WPS220, WPS221, WPS223, WPS229, WPS230, WPS231, WPS236, WPS301, WPS306, WPS317, WPS323, WPS326, WPS338, WPS361, WPS414, WPS420, WPS421, WPS422, WPS430, WPS432, WPS433, WPS437, WPS440, WPS501, WPS503, WPS505, WPS601
115115
cheroot/testing.py: B014, C815, DAR101, DAR201, DAR301, I001, I003, S104, WPS100, WPS202, WPS211, WPS229, WPS301, WPS306, WPS317, WPS414, WPS420, WPS422, WPS430, WPS503, WPS526
116116
cheroot/workers/threadpool.py: B007, DAR101, DAR201, E800, I001, I003, I004, RST201, RST203, RST301, WPS100, WPS110, WPS111, WPS121, WPS125, WPS211, WPS214, WPS220, WPS229, WPS230, WPS231, WPS304, WPS306, WPS317, WPS318, WPS322, WPS326, WPS335, WPS338, WPS362, WPS410, WPS414, WPS420, WPS422, WPS428, WPS432, WPS440, WPS462, WPS501, WPS505, WPS601, WPS602, WPS609

cheroot/test/test_conn.py

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Tests for TCP connection handling, including proper and timely close."""
22

33
import errno
4+
from re import match as _matches_pattern
45
import socket
56
import time
67
import logging
@@ -700,6 +701,263 @@ def _close_kernel_socket(self):
700701
assert _close_kernel_socket.exception_leaked is exception_leaks
701702

702703

704+
def test_broken_connection_during_http_communication_fallback(  # noqa: WPS118
    monkeypatch,
    test_client,
    testing_server,
    wsgi_server_thread,
):
    """Test that unhandled internal error cascades into shutdown.

    The request handler's ``read_request_line`` is replaced with a stub
    that raises ``ConnectionResetError(666)`` and also makes the
    connection's ``rfile.close`` and ``wfile.write`` raise the same
    error. After sending a request line, the server thread is expected
    to terminate, leaving exactly three error log entries: a warning,
    an info-level connection error and a critical fatal-exception
    report.
    """
    def _raise_connection_reset(*_args, **_kwargs):
        raise ConnectionResetError(666)

    def _read_request_line(self):
        # NOTE: Sabotage the connection file objects first so that any
        # NOTE: later attempt to close/write them fails the same way.
        broken_conn = self.conn
        monkeypatch.setattr(
            broken_conn.rfile, 'close', _raise_connection_reset,
        )
        monkeypatch.setattr(
            broken_conn.wfile, 'write', _raise_connection_reset,
        )
        _raise_connection_reset()

    request_handler_cls = (
        test_client.server_instance.ConnectionClass.RequestHandlerClass
    )
    monkeypatch.setattr(
        request_handler_cls,
        'read_request_line',
        _read_request_line,
    )

    test_client.get_connection().send(b'GET / HTTP/1.1')
    wsgi_server_thread.join()  # no extra logs upon server termination

    # NOTE: Snapshot the log before wiping it; the order of these two
    # NOTE: statements matters.
    actual_log_entries = testing_server.error_log.calls[:]
    testing_server.error_log.calls.clear()  # prevent post-test assertions

    socket_error_regex = r'^socket\.error 666$'
    connection_error_regex = (
        '^Got a connection error while handling a connection '
        r'from .*:\d{1,5} \(666\)'
    )
    fatal_exception_regex = (
        r'A fatal exception happened\. Setting the server interrupt flag '
        r'to ConnectionResetError\(666\) and giving up\.\n\nPlease, '
        'report this on the Cheroot tracker at '
        r'<https://github\.com/cherrypy/cheroot/issues/new/choose>, '
        'providing a full reproducer with as much context and details '
        r'as possible\.$'
    )
    expected_log_entries = (
        (logging.WARNING, socket_error_regex),
        (logging.INFO, connection_error_regex),
        (logging.CRITICAL, fatal_exception_regex),
    )

    assert len(actual_log_entries) == len(expected_log_entries)

    paired_entries = zip(expected_log_entries, actual_log_entries)
    for expected_entry, actual_entry in paired_entries:
        expected_log_level, expected_msg_regex = expected_entry
        actual_msg, actual_log_level, _tb = actual_entry
        assert expected_log_level == actual_log_level
        assert _matches_pattern(expected_msg_regex, actual_msg) is not None, (
            f'{actual_msg !r} does not match {expected_msg_regex !r}'
        )
759+
760+
761+
def test_kb_int_from_http_handler(
    test_client,
    testing_server,
    wsgi_server_thread,
):
    """Test that a keyboard interrupt from HTTP handler causes shutdown.

    A handler raising ``KeyboardInterrupt`` is registered under
    ``/kb_intr`` and requested. The server thread is then expected to
    terminate, leaving exactly three error log entries: two debug
    records about the shutdown request and the interrupt flag, plus an
    info record announcing the shutdown.
    """
    def _trigger_kb_intr(_req, _resp):
        raise KeyboardInterrupt('simulated test handler keyboard interrupt')
    testing_server.wsgi_app.handlers['/kb_intr'] = _trigger_kb_intr

    # NOTE: Only the request head is sent; the response is never read.
    http_conn = test_client.get_connection()
    http_conn.putrequest('GET', '/kb_intr', skip_host=True)
    http_conn.putheader('Host', http_conn.host)
    http_conn.endheaders()
    wsgi_server_thread.join()  # no extra logs upon server termination

    # NOTE: Snapshot the log before wiping it; the order of these two
    # NOTE: statements matters.
    actual_log_entries = testing_server.error_log.calls[:]
    testing_server.error_log.calls.clear()  # prevent post-test assertions

    shutdown_request_regex = (
        '^Got a server shutdown request while handling a connection '
        r'from .*:\d{1,5} \(simulated test handler keyboard interrupt\)$'
    )
    interrupt_flag_regex = (
        '^Setting the server interrupt flag to KeyboardInterrupt'
        r"\('simulated test handler keyboard interrupt'\)$"
    )
    shutting_down_regex = '^Keyboard Interrupt: shutting down$'
    expected_log_entries = (
        (logging.DEBUG, shutdown_request_regex),
        (logging.DEBUG, interrupt_flag_regex),
        (logging.INFO, shutting_down_regex),
    )

    assert len(actual_log_entries) == len(expected_log_entries)

    paired_entries = zip(expected_log_entries, actual_log_entries)
    for expected_entry, actual_entry in paired_entries:
        expected_log_level, expected_msg_regex = expected_entry
        actual_msg, actual_log_level, _tb = actual_entry
        assert expected_log_level == actual_log_level
        assert _matches_pattern(expected_msg_regex, actual_msg) is not None, (
            f'{actual_msg !r} does not match {expected_msg_regex !r}'
        )
807+
808+
809+
def test_unhandled_exception_in_request_handler(
    mocker,
    monkeypatch,
    test_client,
    testing_server,
    wsgi_server_thread,
):
    """Ensure worker threads are resilient to in-handler exceptions.

    A handler raising a custom ``BaseException`` subclass is registered
    under ``/scary_exc`` and requested. The worker thread list must
    still hold ten entries afterwards. The test then requests a
    shutdown itself and expects exactly two error log entries: the
    unhandled error report and the shutdown notice.
    """

    class SillyMistake(BaseException):  # noqa: WPS418, WPS431
        """A simulated crash within an HTTP handler."""

    def _trigger_scary_exc(_req, _resp):
        raise SillyMistake('simulated unhandled exception 💣 in test handler')

    testing_server.wsgi_app.handlers['/scary_exc'] = _trigger_scary_exc

    connection_cls = test_client.server_instance.ConnectionClass
    server_connection_close_spy = mocker.spy(connection_cls, 'close')

    http_conn = test_client.get_connection()
    http_conn.putrequest('GET', '/scary_exc', skip_host=True)
    http_conn.putheader('Host', http_conn.host)
    http_conn.endheaders()

    # NOTE: Spin until the server side closes the connection. This
    # NOTE: ensures the log entry is recorded before it is inspected and
    # NOTE: before the shutdown is requested, keeping the entries' order
    # NOTE: stable and their presence non-flaky.
    while not server_connection_close_spy.called:  # noqa: WPS328
        pass

    expected_worker_count = 10
    assert len(testing_server.requests._threads) == expected_worker_count
    # NOTE: Wait for the pool to report all the workers as idle again.
    while testing_server.requests.idle < expected_worker_count:  # noqa: WPS328
        pass
    assert len(testing_server.requests._threads) == expected_worker_count
    testing_server.interrupt = SystemExit('test requesting shutdown')
    assert not testing_server.requests._threads
    wsgi_server_thread.join()  # no extra logs upon server termination

    # NOTE: Snapshot the log before wiping it; the order of these two
    # NOTE: statements matters.
    actual_log_entries = testing_server.error_log.calls[:]
    testing_server.error_log.calls.clear()  # prevent post-test assertions

    unhandled_error_regex = (
        '^Unhandled error while processing an incoming connection '
        'SillyMistake'
        r"\('simulated unhandled exception 💣 in test handler'\)$"
    )
    shutting_down_regex = '^SystemExit raised: shutting down$'
    expected_log_entries = (
        (logging.ERROR, unhandled_error_regex),
        (logging.INFO, shutting_down_regex),
    )

    assert len(actual_log_entries) == len(expected_log_entries)

    paired_entries = zip(expected_log_entries, actual_log_entries)
    for expected_entry, actual_entry in paired_entries:
        expected_log_level, expected_msg_regex = expected_entry
        actual_msg, actual_log_level, _tb = actual_entry
        assert expected_log_level == actual_log_level
        assert _matches_pattern(expected_msg_regex, actual_msg) is not None, (
            f'{actual_msg !r} does not match {expected_msg_regex !r}'
        )
876+
877+
878+
def test_remains_alive_post_unhandled_exception(
    mocker,
    monkeypatch,
    test_client,
    testing_server,
    wsgi_server_thread,
):
    """Ensure worker threads are resilient to unhandled exceptions.

    The request handler's ``read_request_line`` is wrapped so that it
    raises a custom ``BaseException`` subclass after calling the
    original implementation. Once a request line is sent, all ten
    worker threads must still be listed and alive. The test then
    requests a shutdown itself and expects exactly two error log
    entries: the unhandled error report and the shutdown notice.
    """

    class ScaryCrash(BaseException):  # noqa: WPS418, WPS431
        """A simulated crash during HTTP parsing."""

    connection_cls = test_client.server_instance.ConnectionClass
    request_handler_cls = connection_cls.RequestHandlerClass
    _orig_read_request_line = request_handler_cls.read_request_line

    def _read_request_line(self):
        # NOTE: Let the real parsing happen first, then blow up.
        _orig_read_request_line(self)
        raise ScaryCrash(666)

    monkeypatch.setattr(
        request_handler_cls,
        'read_request_line',
        _read_request_line,
    )

    server_connection_close_spy = mocker.spy(connection_cls, 'close')

    # NOTE: The initial worker thread count is 10.
    expected_worker_count = 10
    assert len(testing_server.requests._threads) == expected_worker_count

    test_client.get_connection().send(b'GET / HTTP/1.1')

    # NOTE: Spin until the server side closes the connection. This
    # NOTE: ensures the log entry is recorded before it is inspected and
    # NOTE: before the shutdown is requested, keeping the entries' order
    # NOTE: stable and their presence non-flaky.
    while not server_connection_close_spy.called:  # noqa: WPS328
        pass

    # NOTE: This checks for whether there's any crashed threads
    while testing_server.requests.idle < expected_worker_count:  # noqa: WPS328
        pass
    assert len(testing_server.requests._threads) == expected_worker_count
    for worker_thread in testing_server.requests._threads:
        assert worker_thread.is_alive()
    testing_server.interrupt = SystemExit('test requesting shutdown')
    assert not testing_server.requests._threads
    wsgi_server_thread.join()  # no extra logs upon server termination

    # NOTE: Snapshot the log before wiping it; the order of these two
    # NOTE: statements matters.
    actual_log_entries = testing_server.error_log.calls[:]
    testing_server.error_log.calls.clear()  # prevent post-test assertions

    unhandled_error_regex = (
        '^Unhandled error while processing an incoming connection '
        r'ScaryCrash\(666\)$'
    )
    shutting_down_regex = '^SystemExit raised: shutting down$'
    expected_log_entries = (
        (logging.ERROR, unhandled_error_regex),
        (logging.INFO, shutting_down_regex),
    )

    assert len(actual_log_entries) == len(expected_log_entries)

    paired_entries = zip(expected_log_entries, actual_log_entries)
    for expected_entry, actual_entry in paired_entries:
        expected_log_level, expected_msg_regex = expected_entry
        actual_msg, actual_log_level, _tb = actual_entry
        assert expected_log_level == actual_log_level
        assert _matches_pattern(expected_msg_regex, actual_msg) is not None, (
            f'{actual_msg !r} does not match {expected_msg_regex !r}'
        )
959+
960+
703961
@pytest.mark.parametrize(
704962
'timeout_before_headers',
705963
(

cheroot/workers/threadpool.py

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"""
77

88
import collections
9+
import logging
910
import threading
1011
import time
1112
import socket
@@ -107,14 +108,38 @@ def run(self):
107108
from the inner-layer code constitute a global server interrupt
108109
request. When they happen, the worker thread exits.
109110
111+
:raises BaseException: when an unexpected non-interrupt
112+
exception leaks from the inner layers
113+
110114
# noqa: DAR401 KeyboardInterrupt SystemExit
111115
"""
112116
self.server.stats['Worker Threads'][self.name] = self.stats
113117
self.ready = True
114118
try:
115119
self._process_connections_until_interrupted()
116-
except (KeyboardInterrupt, SystemExit) as ex:
117-
self.server.interrupt = ex
120+
except (KeyboardInterrupt, SystemExit) as interrupt_exc:
121+
interrupt_cause = interrupt_exc.__cause__ or interrupt_exc
122+
self.server.error_log(
123+
f'Setting the server interrupt flag to {interrupt_cause !r}',
124+
level=logging.DEBUG,
125+
)
126+
self.server.interrupt = interrupt_cause
127+
except BaseException as underlying_exc: # noqa: WPS424
128+
# NOTE: This is the last resort logging with the last dying breath
129+
# NOTE: of the worker. It is only reachable when exceptions happen
130+
# NOTE: in the `finally` branch of the internal try/except block.
131+
self.server.error_log(
132+
'A fatal exception happened. Setting the server interrupt flag'
133+
f' to {underlying_exc !r} and giving up.'
134+
'\N{NEW LINE}\N{NEW LINE}'
135+
'Please, report this on the Cheroot tracker at '
136+
'<https://github.com/cherrypy/cheroot/issues/new/choose>, '
137+
'providing a full reproducer with as much context and details as possible.',
138+
level=logging.CRITICAL,
139+
traceback=True,
140+
)
141+
self.server.interrupt = underlying_exc
142+
raise
118143
finally:
119144
self.ready = False
120145

@@ -123,6 +148,9 @@ def _process_connections_until_interrupted(self):
123148
124149
Retrieves incoming connections from thread pool, processing
125150
them one by one.
151+
152+
:raises SystemExit: on the internal requests to stop the
153+
server instance
126154
"""
127155
while True:
128156
conn = self.server.requests.get()
@@ -136,7 +164,52 @@ def _process_connections_until_interrupted(self):
136164
keep_conn_open = False
137165
try:
138166
keep_conn_open = conn.communicate()
167+
except ConnectionError as connection_error:
168+
keep_conn_open = False # Drop the connection cleanly
169+
self.server.error_log(
170+
'Got a connection error while handling a '
171+
f'connection from {conn.remote_addr !s}:'
172+
f'{conn.remote_port !s} ({connection_error !s})',
173+
level=logging.INFO,
174+
)
175+
continue
176+
except (KeyboardInterrupt, SystemExit) as shutdown_request:
177+
# Shutdown request
178+
keep_conn_open = False # Drop the connection cleanly
179+
self.server.error_log(
180+
'Got a server shutdown request while handling a '
181+
f'connection from {conn.remote_addr !s}:'
182+
f'{conn.remote_port !s} ({shutdown_request !s})',
183+
level=logging.DEBUG,
184+
)
185+
raise SystemExit(
186+
str(shutdown_request),
187+
) from shutdown_request
188+
except BaseException as unhandled_error: # noqa: WPS424
189+
# NOTE: Only a shutdown request should bubble up to the
190+
# NOTE: external cleanup code. Otherwise, this thread dies.
191+
# NOTE: If this were to happen, the threadpool would still
192+
# NOTE: list a dead thread without knowing its state. And
193+
# NOTE: the calling code would fail to schedule processing
194+
# NOTE: of new requests.
195+
self.server.error_log(
196+
'Unhandled error while processing an incoming '
197+
f'connection {unhandled_error !r}',
198+
level=logging.ERROR,
199+
traceback=True,
200+
)
201+
continue # Prevent the thread from dying
139202
finally:
203+
# NOTE: Any exceptions coming from within `finally` may
204+
# NOTE: kill the thread, causing the threadpool to only
205+
# NOTE: contain references to dead threads rendering the
206+
# NOTE: server defunct, effectively meaning a DoS.
207+
# NOTE: Ideally, things called here should process
208+
# NOTE: everything recoverable internally. Any unhandled
209+
# NOTE: errors will bubble up into the outer try/except
210+
# NOTE: block. They will be treated as fatal and turned
211+
# NOTE: into server shutdown requests and then reraised
212+
# NOTE: unconditionally.
140213
if keep_conn_open:
141214
self.server.put_conn(conn)
142215
else:

0 commit comments

Comments
 (0)