# The piwheels project
# Copyright (c) 2017 Ben Nuttall <https://github.com/bennuttall>
# Copyright (c) 2017 Dave Jones <dave@waveform.org.uk>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the copyright holder nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Defines the :class:`SlaveDriver` task; see class for more details.
.. autoclass:: SlaveDriver
:members:
"""
from datetime import datetime, timedelta, timezone
from .. import const, protocols, tasks, transport
from ..states import SlaveState, FileState
from .the_oracle import DbClient
from .file_juggler import FsClient
UTC = timezone.utc
class SlaveDriver(tasks.PausingTask):
    """
    This task handles interaction with the build slaves using the slave
    protocol. Interaction is driven by the slaves (i.e. the master doesn't
    *push* jobs, rather the slaves *request* a job and the master replies with
    the next (package, version) tuple from the internal "builds" queue).

    The task also incidentally interacts with several other queues: the
    internal "status" queue is sent details of every reply sent to a build
    slave (the :meth:`~.PiWheelsMaster.main_loop` method passes this
    information on to any listening monitors). Also, the internal "indexes"
    queue is informed of any packages that need web page indexes re-building
    (as a result of a successful build).
    """
    # pylint: disable=too-many-instance-attributes

    # Task name used for logging / identification by the master
    name = 'master.slave_driver'
    def __init__(self, config):
        """
        Construct the task, binding or connecting every queue used to talk
        to the build slaves and to the other master tasks.

        :param config:
            The master configuration object; provides the queue addresses
            bound/connected below and the PyPI simple index URL.
        """
        super().__init__(config, control_protocol=protocols.slave_driver_control)
        # Per-ABI lists of (package, version) pairs waiting to be built, and
        # per-ABI {(package, version): expiry-datetime} maps of builds to
        # exclude (because they're active, recently assigned, or deleted)
        self.abi_queues = {}
        self.excluded_builds = {}
        # ROUTER socket that the build slaves connect to
        slave_queue = self.socket(
            transport.ROUTER, protocol=protocols.slave_driver)
        slave_queue.bind(config.slave_queue)
        self.register(slave_queue, self.handle_slave)
        # PULL socket receiving refreshed build queues from TheArchitect
        builds_queue = self.socket(
            transport.PULL, protocol=reversed(protocols.the_architect))
        builds_queue.hwm = 10
        builds_queue.bind(config.builds_queue)
        self.register(builds_queue, self.handle_build)
        # PUSH socket reporting slave state changes to listening monitors;
        # also attached to SlaveState so state objects can report themselves
        self.status_queue = self.socket(
            transport.PUSH, protocol=protocols.monitor_stats)
        self.status_queue.hwm = 10
        self.status_queue.connect(const.INT_STATUS_QUEUE)
        SlaveState.status_queue = self.status_queue
        # REQ socket asking TheScribe to rebuild web pages / indexes
        self.web_queue = self.socket(
            transport.REQ, protocol=reversed(protocols.the_scribe))
        self.web_queue.connect(config.web_queue)
        # PUSH socket feeding build-queue statistics to BigBrother
        self.stats_queue = self.socket(
            transport.PUSH, protocol=reversed(protocols.big_brother))
        self.stats_queue.connect(config.stats_queue)
        # REP socket handling deletion requests from CloudGazer
        delete_queue = self.socket(
            transport.REP, protocol=reversed(protocols.cloud_gazer))
        delete_queue.bind(const.SKIP_QUEUE)
        self.register(delete_queue, self.handle_delete)
        # RPC clients for the database and file-system tasks
        self.db = DbClient(config, self.logger)
        self.fs = FsClient(config, self.logger)
        # Mapping of slave address to SlaveState for all known slaves
        self.slaves = {}
        self.pypi_simple = config.pypi_simple
        # Periodically cull slaves that have stopped responding
        self.every(timedelta(seconds=10), self.remove_expired)
    def close(self):
        """
        Close the file-system and database clients, detach the class-level
        status queue from :class:`~.states.SlaveState`, then perform the
        base class tear-down (which closes the task's own sockets).
        """
        # NOTE: tear-down is roughly the reverse of construction order
        self.fs.close()
        self.db.close()
        SlaveState.status_queue = None
        super().close()
[docs] def list_slaves(self):
"""
Additional task control method to trigger a "HELLO" message to the
internal control queue. See :meth:`~.tasks.Task.quit` for more
information.
"""
self._ctrl('HELLO')
[docs] def kill_slave(self, slave_id):
"""
Additional task control method to trigger a "KILL" message to the
internal control queue. See :meth:`handle_control` for more
information.
"""
self._ctrl('KILL', slave_id)
[docs] def sleep_slave(self, slave_id):
"""
Additional task control method to trigger a "SLEEP" message to the
internal control queue. See :meth:`handle_control` for more
information.
"""
self._ctrl('SLEEP', slave_id)
[docs] def skip_slave(self, slave_id):
"""
Additional task control method to trigger a "SKIP" message to the
internal control queue. See :meth:`handle_control` for more
information.
"""
self._ctrl('SKIP', slave_id)
[docs] def wake_slave(self, slave_id):
"""
Additional task control method to trigger a "WAKE" message to the
internal control queue. See :meth:`handle_control` for more
information.
"""
self._ctrl('WAKE', slave_id)
[docs] def remove_expired(self):
"""
Remove slaves which have exceeded their timeout.
"""
expired = {
address: slave
for address, slave in self.slaves.items()
if slave.expired
}
for address, slave in expired.items():
if slave.reply[0] == 'BUILD':
package, version = slave.reply[1]
self.logger.warning(
'slave %d (%s): timed out while building %s %s for %s',
slave.slave_id, slave.label, package, version,
slave.native_abi)
else:
self.logger.warning(
'slave %d (%s): timed out during %s',
slave.slave_id, slave.label, slave.reply[0])
# Send a fake DIE message to the status queue so that listening
# monitors know to remove the entry
slave.reply = ('DIE', None)
del self.slaves[address]
[docs] def handle_control(self, queue):
"""
Handle incoming requests to the internal control queue.
This class understands a couple of extra control messages unique to it,
specifically "KILL" to tell a build slave to terminate, "SKIP" to tell
a build slave to terminate its current build immmediately, and "HELLO"
to cause all "HELLO" messages from build slaves to be replayed (for the
benefit of a newly attached monitor process).
"""
try:
super().handle_control(queue)
except tasks.TaskControl as ctrl:
if ctrl.msg in ('KILL', 'SLEEP', 'SKIP', 'WAKE'):
for slave in self.slaves.values():
if ctrl.data is None or slave.slave_id == ctrl.data:
{
'KILL': slave.kill,
'SLEEP': slave.sleep,
'SKIP': slave.skip,
'WAKE': slave.wake,
}[ctrl.msg]()
elif ctrl.msg == 'HELLO':
for slave in self.slaves.values():
slave.hello()
else:
raise # pragma: no cover
    def handle_build(self, queue):
        """
        Refresh the ABI-specific queues of package versions waiting to be
        built. The queues are limited to 1000 packages per ABI, and are kept
        as lists ordered by release date. When a message arrives from
        :class:`TheArchitect` it refreshes (replaces) all current queues.
        There is, however, still a duplication possibility as
        :class:`TheArchitect` doesn't know what packages are actively being
        built; this method handles filtering out such packages.

        Even if the active builds fail (because build slaves crash, or the
        network dies) this doesn't matter as a future re-run of the build
        queue query will return these packages again, and if no build slaves
        are actively working on them at that time they will then be retried.
        """
        try:
            msg, new_queues = queue.recv_msg()
        except IOError as e:
            self.logger.error(str(e))
        else:
            self.logger.info('refreshing build-queue')
            now = datetime.now(tz=UTC)
            build_abis = self.db.get_build_abis()
            # Prune expired entries from the excluded_builds buffer and add
            # empty dicts for new ABIs
            for abi in build_abis:
                try:
                    excluded = self.excluded_builds[abi]
                except KeyError:
                    excluded = {}
                self.excluded_builds[abi] = {
                    key: expires
                    for key, expires in excluded.items()
                    if expires > now
                }
            # Set up the new queues without recent builds (and converting
            # list-pairs into tuples); excluded entries may name a specific
            # version or a whole package (version None)
            # NOTE(review): this assumes every ABI key in new_queues appears
            # in build_abis — an unknown ABI would raise KeyError on
            # self.excluded_builds[abi]; confirm TheArchitect only queries
            # the configured build ABIs
            self.abi_queues = {
                abi: [
                    (package, version) for package, version in new_queue
                    if (package, version) not in self.excluded_builds[abi]
                    and (package, None) not in self.excluded_builds[abi]
                ]
                for abi, new_queue in new_queues.items()
            }
            # Make sure every build ABI has a queue, even if empty
            for abi in build_abis:
                self.abi_queues.setdefault(abi, [])
            # Report the new queue lengths to BigBrother
            self.stats_queue.send_msg('STATBQ', {
                abi: len(queue)
                for (abi, queue) in self.abi_queues.items()
            })
[docs] def handle_delete(self, queue):
"""
Handle package or version deletion requests.
When the PyPI upstream deletes a version or package, the
:class:`CloudGazer` task requests that other tasks perform the deletion
on its behalf. In the case of this task, this involves cancelling any
pending builds of that package (version), and ignoring any builds
involving that package (version) in the next queue update from
:class:`TheArchitect`.
"""
msg, data = queue.recv_msg()
if msg == 'DELVER':
del_pkg, del_ver = data
elif msg == 'DELPKG':
del_pkg, del_ver = data, None
self.logger.info('marking package %s %s as excluded', del_pkg, del_ver)
for abi in self.db.get_build_abis():
try:
excluded = self.excluded_builds[abi]
except KeyError:
excluded = {}
self.excluded_builds[abi] = excluded
excluded[(del_pkg, del_ver)] = (
datetime.now(tz=UTC) + timedelta(hours=1))
try:
build_queue = self.abi_queues[abi]
except KeyError:
build_queue = []
self.abi_queues[abi] = [
(pkg, ver)
for pkg, ver in build_queue
if (del_pkg, del_ver) not in ((pkg, ver), (pkg, None))
]
for slave in self.slaves.values():
if slave.reply[0] == 'BUILD':
build_pkg, build_ver = slave.reply[1]
if build_pkg == del_pkg and del_ver in (None, build_ver):
self.logger.info('skipping deleted package %s %s',
del_pkg, del_ver)
slave.skip()
queue.send_msg('OK')
[docs] def handle_slave(self, queue):
"""
Handle requests from build slaves.
See the :doc:`slaves` chapter for an overview of the protocol for
messages between build slaves and :class:`SlaveDriver`. This method
retrieves the message from the build slave, finds the associated
:class:`~.states.SlaveState` and updates it with the message, then
calls the appropriate message handler. The handler will be expected to
return a reply (in the usual form of a list of strings) or ``None`` if
no reply should be sent (e.g. for a final "BYE" message).
"""
try:
address, msg, data = queue.recv_addr_msg()
except IOError as e:
self.logger.error(str(e))
return
try:
slave = self.slaves[address]
except KeyError:
if msg == 'HELLO':
slave = SlaveState(address, *data)
else:
# XXX Tell the slave to die?
self.logger.error('invalid first message from slave: %s', msg)
return
slave.request = msg, data
handler = {
'HELLO': self.do_hello,
'BYE': self.do_bye,
'IDLE': self.do_idle,
'BUSY': self.do_busy,
'BUILT': self.do_built,
'SENT': self.do_sent,
}[msg]
msg, data = handler(slave)
if msg is not None:
slave.reply = msg, data
queue.send_addr_msg(address, msg, data)
    def do_hello(self, slave):
        """
        Handler for the build slave's initial "HELLO" message. This
        associates the specified *slave* state with the slave's address and
        returns "ACK" with the master's id for the slave and the PyPI simple
        index URL. The id is communicated back simply for consistency of
        logging; administrators can correlate master log messages with slave
        log messages when both have the same id number; we can't use IP
        address for this as multiple slaves can run on one machine.

        :param SlaveState slave:
            The object representing the current status of the build slave.
        """
        self.logger.warning(
            'slave %d (%s): hello (build_timeout=%s, busy_timeout=%s, abi=%s, '
            'platform=%s, os_name=%s, os_version=%s, board_revision=%s, '
            'board_serial=%s)',
            slave.slave_id, slave.label, slave.build_timeout,
            slave.busy_timeout, slave.native_abi, slave.native_platform,
            slave.os_name, slave.os_version, slave.board_revision,
            slave.board_serial)
        self.slaves[slave.address] = slave
        return 'ACK', [slave.slave_id, self.pypi_simple]
[docs] def do_bye(self, slave):
"""
Handler for the build slave's final "BYE" message upon shutdown. This
removes the associated state from the internal ``slaves`` dict.
:param SlaveState slave:
The object representing the current status of the build slave.
"""
self.logger.warning('slave %d (%s): shutdown',
slave.slave_id, slave.label)
# Send a fake DIE message to the status queue so that listening
# monitors know to remove the entry
slave.reply = 'DIE', protocols.NoData
del self.slaves[slave.address]
return None, None
    def do_idle(self, slave):
        """
        Handler for the build slave's "IDLE" message (which is effectively
        the slave requesting work). If the master wants to terminate the
        slave, it sends back "DIE". If the build queue (for the slave's ABI)
        is empty or the task is currently paused, "SLEEP" is returned
        indicating the slave should wait a while and then try again.

        If a job can be retrieved from the (ABI specific) build queue, then
        a "BUILD" message is sent back with the required package and version.

        :param SlaveState slave:
            The object representing the current status of the build slave.
        """
        # IDLE is only valid after an ACK, SLEEP, or DONE reply
        if slave.reply[0] not in ('ACK', 'SLEEP', 'DONE'):
            self.logger.error(
                'slave %d (%s): protocol error (IDLE after %s)',
                slave.slave_id, slave.label, slave.reply[0])
            return 'DIE', protocols.NoData
        elif slave.killed:
            return 'DIE', protocols.NoData
        elif self.paused:
            self.logger.info(
                'slave %d (%s): sleeping because master is paused',
                slave.slave_id, slave.label)
            return 'SLEEP', True
        else:
            try:
                abi_queue = self.abi_queues[slave.native_abi]
                excluded_builds = self.excluded_builds[slave.native_abi]
            except KeyError:
                # No queue for this ABI yet. Note: excluded_builds may be
                # left unbound here, but the while loop below cannot run in
                # that case (abi_queue is empty) so it is never referenced
                abi_queue = []
            try:
                while abi_queue:
                    package, version = abi_queue.pop(0)
                    if (package, version) not in excluded_builds:
                        self.logger.info(
                            'slave %d (%s): build %s %s',
                            slave.slave_id, slave.label, package, version)
                        # Exclude this job until its build timeout expires
                        # so other slaves aren't handed the same work
                        excluded_builds[(package, version)] = (
                            datetime.now(tz=UTC) + slave.build_timeout)
                        return 'BUILD', [package, version]
                self.logger.info(
                    'slave %d (%s): sleeping because no builds',
                    slave.slave_id, slave.label)
                return 'SLEEP', False
            finally:
                # Only push queue stats if there's space in the stats_queue
                # (it's not essential; just a nice-to-have)
                if self.stats_queue.poll(0, transport.POLLOUT):
                    self.stats_queue.send_msg('STATBQ', {
                        abi: len(queue)
                        for (abi, queue) in self.abi_queues.items()
                    })
[docs] def do_busy(self, slave):
"""
Handler for the build slave's "BUSY" message, which is sent
periodically during package builds. If the slave fails to respond with
a BUSY ping for a duration longer than :attr:`SlaveState.busy_timeout`
then the master will assume the slave has died and remove it from the
internal state mapping (if the slave happens to resurrect itself later
the master will simply treat it as a new build slave).
In response to "BUSY" the master can respond "CONT" to indicate the
build should continue processing, or "DONE" to indicate that the build
slave should immediately terminate and discard the build and return to
"IDLE" state.
"""
if slave.skipped:
self.logger.info('slave %d (%s): build skipped',
slave.slave_id, slave.label)
return 'DONE', protocols.NoData
else:
return 'CONT', protocols.NoData
[docs] def do_built(self, slave):
"""
Handler for the build slave's "BUILT" message, which is sent after an
attempted package build succeeds or fails. The handler logs the result
in the database and, if files have been generated by the build, informs
the :class:`~.file_juggler.FileJuggler` task to expect a file transfer
before sending "SEND" back to the build slave with the required
filename.
If no files were generated (e.g. in the case of a failed build, or a
degenerate success), "DONE" is returned indicating that the build slave
is free to discard all resources generated during the build and return
to its idle state.
"""
if slave.reply[0] != 'BUILD':
self.logger.error(
'slave %d (%s): protocol error (BUILT after %s)',
slave.slave_id, slave.label, slave.reply[0])
return 'DIE', protocols.NoData
elif slave.skipped:
# If the build was skipped, throw away the result without recording
# success or failure (it may have been skipped because we know
# there's something wrong with the slave)
self.logger.info('slave %d (%s): build skipped',
slave.slave_id, slave.label)
return 'DONE', protocols.NoData
else:
build_armv6l_hack(slave.build)
if slave.build.status and not slave.build.transfers_done:
self.logger.info('slave %d (%s): build succeeded',
slave.slave_id, slave.label)
self.fs.expect(slave.slave_id,
slave.build.files[slave.build.next_file])
self.logger.info('slave %d (%s): send %s',
slave.slave_id, slave.label,
slave.build.next_file)
return 'SEND', slave.build.next_file
else:
self.logger.info('slave %d (%s): build failed',
slave.slave_id, slave.label)
self.db.log_build(slave.build)
self.web_queue.send_msg('PROJECT', slave.build.package)
self.web_queue.recv_msg()
return 'DONE', protocols.NoData
    def do_sent(self, slave):
        """
        Handler for the build slave's "SENT" message indicating that it's
        finished sending the requested file to :class:`FileJuggler`. The
        :class:`FsClient` RPC mechanism is used to ask :class:`FileJuggler`
        to verify the transfer against the stored hash and, if this is
        successful, a message is sent to :class:`TheScribe` to regenerate
        the package's index.

        If further files remain to be transferred, another "SEND" message is
        returned to the build slave. Otherwise, "DONE" is sent to free all
        build resources.

        If a transfer fails to verify, another "SEND" message with the same
        filename is returned to the build slave.

        :param SlaveState slave:
            The object representing the current status of the build slave.
        """
        # SENT is only valid after the master requested a file with SEND
        if slave.reply[0] != 'SEND':
            self.logger.error(
                'slave %d (%s): protocol error (SENT after %s)',
                slave.slave_id, slave.label, slave.reply[0])
            return 'DIE', protocols.NoData
        elif self.fs.verify(slave.slave_id, slave.build.package):
            # Transfer verified: mark the file done
            slave.build.files[slave.build.next_file].verified()
            self.logger.info(
                'slave %d (%s): verified transfer of %s',
                slave.slave_id, slave.label, slave.reply[1])
            if slave.build.transfers_done:
                # All files received: record the build and rebuild both the
                # package index and project page
                self.db.log_build(slave.build)
                self.web_queue.send_msg('BOTH', slave.build.package)
                self.web_queue.recv_msg()
                return 'DONE', protocols.NoData
            else:
                # More files to come: prime FileJuggler for the next one
                self.fs.expect(slave.slave_id,
                               slave.build.files[slave.build.next_file])
                self.logger.info('slave %d (%s): send %s',
                                 slave.slave_id, slave.label,
                                 slave.build.next_file)
                return 'SEND', slave.build.next_file
        else:
            # Verification failed: ask for the same file again
            self.logger.info('slave %d (%s): re-send %s',
                             slave.slave_id, slave.label,
                             slave.build.next_file)
            return 'SEND', slave.build.next_file
def build_armv6l_hack(build):
    """
    A dirty hack for armv6l wheels; if the build contains any arch-specific
    wheels for armv7l, generate equivalent armv6l entries from them (with
    the transferred flag set to True as nothing actually needs
    transferring).

    :param build:
        The build state whose ``files`` mapping is augmented in place.
    """
    # Iterate over a snapshot since we may add entries while looping
    for state in list(build.files.values()):
        if state.platform_tag != 'linux_armv7l':
            continue
        # Swap the platform tag at the end of the filename
        # (len('linux_armv7l.whl') == 16)
        armv6l_name = state.filename[:-16] + 'linux_armv6l.whl'
        if armv6l_name not in build.files:
            build.files[armv6l_name] = FileState(
                armv6l_name, state.filesize, state.filehash,
                state.package_tag, state.package_version_tag,
                state.py_version_tag, state.abi_tag, 'linux_armv6l',
                state.dependencies, True)