Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions mitogen/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def set_blocking(fd, blocking):

PY24 = sys.version_info < (2, 5)
PY3 = sys.version_info > (3,)
if PY3:
if sys.version_info >= (3, 0):
import pickle
import _thread as thread
from io import BytesIO
Expand Down Expand Up @@ -237,6 +237,13 @@ def set_blocking(fd, blocking):
#: writing small trailer chunks.
CHUNK_SIZE = 131072

#: Markers sent by bootstrap stages when it's ready to receive the next stage,
#: e.g. compressed :class:`mitogen.core`.
EC0 = b('MITO000')
EC1 = b('MITO001')
EC2 = b('MITO002 %d %d' % sys.version_info[0:2])
EC2_PATTERN = b(r'MITO002 (\d) (\d\d?)')

_tls = threading.local()


Expand Down Expand Up @@ -2357,10 +2364,13 @@ class MitogenProtocol(Protocol):
#: peer.
on_message = None

def __init__(self, router, remote_id, auth_id=None,
local_id=None, parent_ids=None):
def __init__(
self, router, remote_id, auth_id=None, local_id=None, parent_ids=None,
remote_python_version=None,
):
self._router = router
self.remote_id = remote_id
self.remote_python_version = remote_python_version
#: If not :data:`None`, :class:`Router` stamps this into
#: :attr:`Message.auth_id` of every message received on this stream.
self.auth_id = auth_id
Expand Down Expand Up @@ -4130,7 +4140,8 @@ def _setup_master(self):
self.router,
parent_id,
local_id=self.config['context_id'],
parent_ids=self.config['parent_ids']
parent_ids=self.config['parent_ids'],
remote_python_version=self.config['parent_python_version'],
)
for f in in_fp, out_fp:
fd = f.fileno()
Expand Down Expand Up @@ -4304,7 +4315,7 @@ def main(self):
_v and LOG.debug('Recovered sys.executable: %r', sys.executable)

if self.config.get('send_ec2', True):
self.stream.transmit_side.write(b('MITO002\n'))
self.stream.transmit_side.write(EC2 + b('\n'))
self.broker._py24_25_compat()
self.log_handler.uncork()
self.dispatcher.run()
Expand Down
2 changes: 1 addition & 1 deletion mitogen/fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def _child_main(self, childfp):
self.options.on_fork()
mitogen.core.set_blocking(childfp.fileno(), True)

childfp.send(b('MITO002\n'))
childfp.send(mitogen.core.EC2 + b('\n'))

# Expected by the ExternalContext.main().
os.dup2(childfp.fileno(), 1)
Expand Down
32 changes: 14 additions & 18 deletions mitogen/parent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1186,18 +1186,8 @@ def on_unrecognized_partial_line_received(self, line):

class BootstrapProtocol(RegexProtocol):
"""
Respond to stdout of a child during bootstrap. Wait for :attr:`EC0_MARKER`
to be written by the first stage to indicate it can receive the bootstrap,
then await :attr:`EC1_MARKER` to indicate success, and
:class:`MitogenProtocol` can be enabled.
"""
#: Sentinel value emitted by the first stage to indicate it is ready to
#: receive the compressed bootstrap. For :mod:`mitogen.ssh` this must have
#: length of at least `max(len('password'), len('debug1:'))`
EC0_MARKER = b('MITO000')
EC1_MARKER = b('MITO001')
EC2_MARKER = b('MITO002')

Respond to readiness markers sent to parent by child doing bootstrap.
"""
def __init__(self, broker):
super(BootstrapProtocol, self).__init__()
self._writer = mitogen.core.BufferedWriter(broker, self)
Expand All @@ -1213,18 +1203,22 @@ def _on_ec1_received(self, line, match):
LOG.debug('%r: first stage received mitogen.core source', self)

def _on_ec2_received(self, line, match):
LOG.debug('%r: new child booted successfully', self)
self.stream.conn._complete_connection()
py_major, py_minor = int(match.group(1)), int(match.group(2))
LOG.debug(
'%r: new child booted successfully on Python %d.%d',
self, py_major, py_minor,
)
self.stream.conn._complete_connection((py_major, py_minor))
return False

def on_unrecognized_line_received(self, line):
LOG.debug('%s: stdout: %s', self.stream.name,
line.decode('utf-8', 'replace'))

PATTERNS = [
(re.compile(EC0_MARKER), _on_ec0_received),
(re.compile(EC1_MARKER), _on_ec1_received),
(re.compile(EC2_MARKER), _on_ec2_received),
(re.compile(mitogen.core.EC0), _on_ec0_received),
(re.compile(mitogen.core.EC1), _on_ec1_received),
(re.compile(mitogen.core.EC2_PATTERN), _on_ec2_received),
]


Expand Down Expand Up @@ -1510,6 +1504,7 @@ def get_econtext_config(self):
'blacklist': self._router.get_module_blacklist(),
'max_message_size': self.options.max_message_size,
'version': mitogen.__version__,
'parent_python_version': sys.version_info[0:2],
}

def get_preamble(self):
Expand Down Expand Up @@ -1546,7 +1541,7 @@ def _adorn_eof_error(self, e):
if self.eof_error_hint:
e.args = ('%s\n\n%s' % (e.args[0], self.eof_error_hint),)

def _complete_connection(self):
def _complete_connection(self, remote_python_version):
self._timer.cancel()
if not self.exception:
mitogen.core.unlisten(self._router.broker, 'shutdown',
Expand All @@ -1556,6 +1551,7 @@ def _complete_connection(self):
MitogenProtocol(
router=self._router,
remote_id=self.context.context_id,
remote_python_version=remote_python_version,
)
)
self._router.route_monitor.notice_stream(self.stdio_stream)
Expand Down
4 changes: 2 additions & 2 deletions tests/first_stage_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def test_valid_syntax(self):
stdout, stderr = proc.communicate()
self.assertEqual(0, proc.returncode)
self.assertEqual(stdout,
mitogen.parent.BootstrapProtocol.EC0_MARKER+b('\n'))
mitogen.core.EC0 + b('\n'))
self.assertIn(
b("Error -3 while decompressing data"), # Unknown compression method
stderr,
Expand Down Expand Up @@ -356,7 +356,7 @@ def test_premature_eof(self):
self.assertEqual(0, returncode)
self.assertEqual(
proc.stdout.read(),
mitogen.parent.BootstrapProtocol.EC0_MARKER + b("\n"),
mitogen.core.EC0 + b("\n"),
)
self.assertIn(
b("Error -5 while decompressing data"),
Expand Down
Loading