# The piwheels project
# Copyright (c) 2017 Ben Nuttall <https://github.com/bennuttall>
# Copyright (c) 2017 Dave Jones <dave@waveform.org.uk>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Defines the :class:`PiWheelsSlave` class. An instance of this is the
entry-point for the :program:`piw-slave` script.
.. autoclass:: PiWheelsSlave
:members:
.. autofunction:: duration
"""
import os
import sys
import signal
import logging
import socket
import tempfile
from datetime import datetime, timedelta, timezone
from time import sleep
from random import randint
import dateutil.parser
from .. import __version__, terminal, transport, protocols, info, platform
from ..systemd import get_systemd
from .builder import Builder, Wheel
UTC = timezone.utc
class MasterTimeout(IOError):
"Exception raised when the master fails to respond before a timeout."
class TerminateTimeout(IOError):
"Exception raised when a build fails to terminate in a reasonable time."
[docs]class PiWheelsSlave:
"""
This is the main class for the :program:`piw-slave` script. It connects
(over 0MQ sockets) to a master (see :program:`piw-master`) then loops
around the slave protocol (see the :doc:`slaves` chapter). It retrieves
source packages directly from `PyPI`_, attempts to build a wheel in a
sandbox directory and, if successful, transmits the results to the master.
.. _PyPI: https://pypi.python.org/
"""
def __init__(self):
self.logger = logging.getLogger('slave')
self.config = None
self.slave_id = None
self.builder = None
self.pypi_url = None
self.systemd = None
def __call__(self, args=None):
sys.excepthook = terminal.error_handler
parser = terminal.configure_parser("""
The piw-slave script is intended to be run on a standalone machine to build
packages on behalf of the piw-master script. It is intended to be run as an
unprivileged user with a clean home-directory. Any build dependencies you wish
to use must already be installed. The script will run until it is explicitly
terminated, either by Ctrl+C, SIGTERM, or by the remote piw-master script.
""")
parser.add_argument(
'--debug', action='store_true', help="Set logging to debug level")
parser.add_argument(
'-m', '--master', env_var='PIW_MASTER', metavar='HOST',
default='localhost',
help="The IP address or hostname of the master server "
"(default: %(default)s)")
parser.add_argument(
'-t', '--timeout', env_var='PIW_TIMEOUT', metavar='DURATION',
default='3h', type=duration,
help="The time to wait before assuming a build has failed "
"(default: %(default)s)")
parser.add_argument(
'-d', '--dir', metavar='DIR', default=tempfile.gettempdir(),
help="The temporary directory to use when building wheels "
"(default: %(default)s)")
parser.add_argument(
'-L', '--label', metavar='STR', default=socket.gethostname(),
help="The label to transmit to the master identifying this "
"build slave (default: %(default)s)")
self.config = parser.parse_args(args)
if self.config.debug:
self.config.log_level = logging.DEBUG
terminal.configure_logging(self.config.log_level, self.config.log_file)
self.logger.info('PiWheels Slave version %s', __version__)
if os.geteuid() == 0:
self.logger.fatal('Slave must not be run as root')
return 1
if datetime.now(tz=UTC) < datetime(2020, 6, 1, tzinfo=UTC):
self.logger.fatal('System clock is far in the past')
return 1
self.systemd = get_systemd()
signal.signal(signal.SIGTERM, sig_term)
ctx = transport.Context()
queue = None
try:
while True:
queue = ctx.socket(
transport.REQ, protocol=reversed(protocols.slave_driver),
logger=self.logger)
queue.hwm = 10
queue.connect('tcp://{master}:5555'.format(
master=self.config.master))
self.systemd.ready()
try:
self.slave_id = None
self.main_loop(queue)
except MasterTimeout:
self.systemd.reloading()
self.logger.warning('Resetting connection')
queue.close(linger=1)
finally:
self.clean_up_build()
except SystemExit:
self.logger.warning('Shutting down on SIGTERM')
finally:
self.systemd.stopping()
queue.send_msg('BYE')
queue.close()
ctx.close()
# A general note about the design of the slave: the build slave is
# deliberately designed to be "brittle". In other words to fall over and
# die loudly in the event anything happens to go wrong (other than utterly
# expected failures like wheels occasionally failing to build and file
# transfers occasionally needing a retry). Hence all the apparently silly
# asserts littering the functions below.
# This is in stark contrast to the master which is expected to stay up and
# carry on running even if a build slave goes bat-shit crazy and starts
# sending nonsense (in which case it should calmly ignore it and/or attempt
# to kill said slave with a "BYE" message).
[docs] def main_loop(self, queue, master_timeout=timedelta(minutes=5)):
"""
The main messaging loop. Sends the initial request, and dispatches
replies via :meth:`handle_reply`. Implements a *timeout* for responses
from the master and raises :exc:`MasterTimeout` if *timeout* seconds
are exceeded.
"""
os_name, os_version = info.get_os_name_version()
msg, data = 'HELLO', [
self.config.timeout, master_timeout,
platform.get_impl_ver(), platform.get_abi_tag(),
platform.get_platform(), self.config.label,
os_name, os_version,
info.get_board_revision(), info.get_board_serial(),
]
while True:
queue.send_msg(msg, data)
start = datetime.now(tz=UTC)
while True:
self.systemd.watchdog_ping()
if queue.poll(1):
msg, data = queue.recv_msg()
msg, data = self.handle_reply(msg, data)
break
elif datetime.now(tz=UTC) - start > master_timeout:
self.logger.warning('Timed out waiting for master')
raise MasterTimeout()
[docs] def clean_up_build(self, timeout=timedelta(minutes=1)):
"""
Terminate any existing build and clean up its temporary storage. Raises
an exception if termination does not occur in a reasonable time.
"""
if self.builder is not None:
try:
if self.builder.is_alive():
self.logger.info('Terminating current build')
self.builder.stop()
self.builder.join(timeout.total_seconds())
if self.builder.is_alive():
self.logger.fatal('Build failed to terminate')
raise TerminateTimeout()
else:
self.logger.info('Removing temporary build directories')
self.builder.close()
finally:
# Always set self.builder to None to ensure we don't re-try
self.builder = None
[docs] def handle_reply(self, msg, data):
"""
Dispatch a message from the master to an appropriate handler method.
"""
handler = {
'ACK': lambda: self.do_ack(*data),
'SLEEP': lambda: self.do_sleep(data),
'BUILD': lambda: self.do_build(*data),
'CONT': self.do_cont,
'SEND': lambda: self.do_send(data),
'DONE': self.do_done,
'DIE': self.do_die,
}[msg]
return handler()
[docs] def get_status(self):
"""
Returns the set of statistics periodically required by the master when
reporting our status.
"""
return (
[datetime.now(tz=UTC)] +
list(info.get_disk_stats(self.config.dir)) +
list(info.get_mem_stats()) +
list(info.get_swap_stats()) +
[os.getloadavg()[0], info.get_cpu_temp()]
)
[docs] def do_ack(self, new_id, pypi_url):
"""
In response to our initial "HELLO" (detailing our various :pep:`425`
tags), the master is expected to send "ACK" back with an integer
identifier and the URL of the PyPI repository to download from. We use
the identifier in all future log messages for the ease of the
administrator.
We reply with "IDLE" to indicate we're ready to accept a build job.
"""
assert self.slave_id is None, 'Duplicate ACK'
self.slave_id = int(new_id)
self.pypi_url = pypi_url
self.logger = logging.getLogger('slave-%d' % self.slave_id)
self.logger.info('Connected to master')
return 'IDLE', self.get_status()
[docs] def do_sleep(self, paused):
"""
If, in response to an "IDLE" message we receive "SLEEP" this indicates
the master has nothing for us to do currently. Sleep for a little while
then try "IDLE" again.
"""
assert self.slave_id is not None, 'SLEEP before ACK'
if paused:
self.logger.info("Sleeping: paused")
else:
self.logger.info("Sleeping: no available jobs")
sleep(randint(5, 10))
return 'IDLE', self.get_status()
[docs] def do_build(self, package, version):
"""
Alternatively, in response to "IDLE", the master may send "BUILD"
*package* *version*. We should then attempt to build the specified
wheel and send back a "BUSY" message with more status info.
"""
assert self.slave_id is not None, 'BUILD before ACK'
assert not self.builder, 'Last build still exists'
self.logger.warning('Building package %s version %s', package, version)
self.builder = Builder(package, version, timeout=self.config.timeout,
index_url=self.pypi_url, dir=self.config.dir)
self.builder.start()
return 'BUSY', self.get_status()
[docs] def do_cont(self):
"""
Once we're busy building something the master will periodically ping
us with "CONT" if it wishes us to continue building. We wait up to
10 seconds for the build to finish and reply with "BUILT" (and a full
status report on the build) if it finishes in that time, or "BUSY"
and more status info otherwise.
If the master wishes to terminate a build prior to completion, it'll
send "DONE" instead of "CONT" and we move straight to clean-up.
"""
assert self.slave_id is not None, 'CONT before ACK'
assert self.builder, 'CONT before BUILD / after failed BUILD'
self.builder.join(randint(5, 10))
if not self.builder.is_alive():
if self.builder.status:
self.logger.info('Build succeeded')
else:
self.logger.warning('Build failed')
return 'BUILT', self.builder.as_message()[2:]
else:
return 'BUSY', self.get_status()
[docs] def do_send(self, filename):
"""
If a build succeeds and generates files (detailed in a "BUILT"
message), the master will reply with "SEND" *filename* indicating we
should transfer the specified file (this is done on a separate socket
with a different protocol; see :meth:`builder.Wheel.transfer` for more
details). Once the transfers concludes, reply to the master with
"SENT".
"""
assert self.slave_id is not None, 'SEND before ACK'
assert self.builder, 'SEND before BUILD / after failed BUILD'
assert self.builder.status, 'SEND after failed BUILD'
wheel = [f for f in self.builder.wheels if f.filename == filename][0]
self.logger.info(
'Sending %s to master on %s', wheel.filename, self.config.master)
ctx = transport.Context()
queue = ctx.socket(transport.DEALER, logger=self.logger)
queue.hwm = 10
queue.connect('tcp://{master}:5556'.format(master=self.config.master))
try:
wheel.transfer(queue, self.slave_id)
finally:
queue.close()
return 'SENT', protocols.NoData
[docs] def do_done(self):
"""
The master can send "DONE" at any point during a built to terminate it
prematurely. Alternately, this is also the standard response after a
successful build has finished and all files have been sent (and
successfully verified).
In response we must clean-up all resources associated with the build
(including terminating an on-going build) and return "IDLE" with the
usual stats.
"""
assert self.slave_id is not None, 'DONE before ACK'
assert self.builder, 'DONE before BUILD'
self.clean_up_build()
return 'IDLE', self.get_status()
[docs] def do_die(self):
"""
The master may respond with "DIE" at any time indicating we should
immediately terminate. We raise :exc:`SystemExit` to cause
:meth:`main_loop` to exit. Clean-up of any extant build is handled by
our caller.
"""
self.logger.warning('Master requested termination')
raise SystemExit(0)
[docs]def duration(s):
"""
Convert *s*, a string representing a duration, into a
:class:`datetime.timedelta`.
"""
return (
dateutil.parser.parse(s, default=datetime(1, 1, 1)) -
datetime(1, 1, 1)
)
def sig_term(signo, stack_frame):
"""
Handler for the SIGTERM signal; raises :exc:`SystemExit` which will cause
the :meth:`PiWheelsSlave.main_loop` method to terminate.
"""
# pylint: disable=unused-argument
raise SystemExit(0)
main = PiWheelsSlave() # pylint: disable=invalid-name