Source code for piwheels.master.mr_chase

# The piwheels project
#   Copyright (c) 2017 Ben Nuttall <https://github.com/bennuttall>
#   Copyright (c) 2017 Dave Jones <dave@waveform.org.uk>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the copyright holder nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

"""
Defines the :class:`MrChase` task; see class for more details.

.. autoclass:: MrChase
    :members:
"""

from datetime import datetime, timezone
from logging import Logger
from piwheels.format import canonicalize_name

from .. import const, protocols, transport, tasks
from ..states import BuildState
from .the_oracle import DbClient
from .file_juggler import FsClient
from .slave_driver import build_armv6l_hack


UTC = timezone.utc


[docs]class MrChase(tasks.PauseableTask): """ This task handles smuggling packages into the database manually. It is the task that the :program:`piw-import` script talks to in order to import packages. Internally, the task is essentially an abbreviated :class:`~slave_driver.SlaveDriver` (in as much as it has to perform similar database and file-system interactions) but without having to handle talking to lots of build slaves. """ name = 'master.mr_chase' def __init__(self, config): super().__init__(config) import_queue = self.socket( transport.ROUTER, protocol=protocols.mr_chase) import_queue.bind(config.import_queue) self.register(import_queue, self.handle_import) self.status_queue = self.socket( transport.PUSH, protocol=protocols.monitor_stats) self.status_queue.hwm = 10 self.status_queue.connect(const.INT_STATUS_QUEUE) self.web_queue = self.socket( transport.REQ, protocol=reversed(protocols.the_scribe)) self.web_queue.connect(config.web_queue) self.stats_queue = self.socket( transport.PUSH, protocol=reversed(protocols.big_brother)) self.stats_queue.connect(config.stats_queue) self.skip_queue = self.socket( transport.REQ, protocol=protocols.cloud_gazer) self.skip_queue.connect(const.SKIP_QUEUE) self.db = DbClient(config, self.logger) self.fs = FsClient(config, self.logger) self.states = {}
[docs] def close(self): self.fs.close() self.db.close() super().close()
[docs] def handle_import(self, queue): """ Handle requests from :program:`piw-import` instances. See the :doc:`importer` and :doc:`remove` chapters for an overview of the protocol for messages between the importer and :class:`MrChase`. """ # pylint: disable=too-many-locals try: address, msg, data = queue.recv_addr_msg() except IOError as e: # XXX How do we ditch states of errored / unresponsive clients? self.logger.error(str(e)) return try: state = self.states[address] except KeyError: if msg == 'IMPORT': state = BuildState.from_message(data) # XXX Slave ID is always 0 ... what happens if two simultaneous # imports are attempted, particularly re the file-expect # mechanism? state._slave_id = 0 self.states[address] = state elif msg in ('ADDPKG', 'ADDVER', 'REMPKG', 'REMVER', 'REBUILD'): # No need to store state for these tools state = data elif msg == 'SENT': self.logger.error('SENT before IMPORT') queue.send_addr_msg(address, 'ERROR', 'protocol violation') return handler = { 'ADDPKG': self.do_add_package, 'ADDVER': self.do_add_version, 'IMPORT': self.do_import, 'REMPKG': self.do_remove_package, 'REMVER': self.do_remove_version, 'REBUILD': self.do_rebuild, 'SENT': self.do_sent, }[msg] msg, data = handler(state) if msg in ('DONE', 'ERROR'): self.states.pop(address, None) queue.send_addr_msg(address, msg, data)
[docs] def do_import(self, state): """ Handler for the importer's initial "IMPORT" message. This method checks the information in the state passes some simple tests, then ensures that the requested package and version exist in the database (creating them if necessary). """ # pylint: disable=too-many-return-statements if not state.status: self.logger.error('attempting to add failed build') return 'ERROR', 'importing a failed build is not supported' if not state.files: self.logger.error('attempting to add empty build') return 'ERROR', 'no files listed for import' build_armv6l_hack(state) build_abis = self.db.get_build_abis() if state.abi_tag not in build_abis: self.logger.error('invalid ABI: %s', state.abi_tag) return 'ERROR', 'invalid ABI: %s' % state.abi_tag if not self.db.test_package_version(state.package, state.version): self.logger.error('unknown package version %s %s', state.package, state.version) return 'ERROR', 'unknown package version %s %s' % ( state.package, state.version) try: self.db.log_build(state) except IOError as err: self.logger.error('failed to log build: %s', err) return 'ERROR', str(err) self.logger.info('registered build for %s %s', state.package, state.version) if state.status and not state.transfers_done: self.fs.expect(0, state.files[state.next_file]) self.logger.info('send %s', state.next_file) return 'SEND', state.next_file else: # XXX We'll never reach this branch at the moment, but in future we # might well support failed builds (as another method of skipping # builds) self.web_queue.send_msg('LOG', (state.build_id, state.output)) self.web_queue.recv_msg() self.web_queue.send_msg('PROJECT', state.package) self.web_queue.recv_msg() return 'DONE', protocols.NoData
[docs] def do_sent(self, state): """ Handler for the importer's "SENT" message indicating that it's finished sending the requested file to :class:`FileJuggler`. The file is verified (as in :class:`SlaveDriver`) and, if this is successful, a mesasge is sent to :class:`TheScribe` to regenerate the package's index. If further files remain to be transferred, another "SEND" message is returned to the build slave. Otherwise, "DONE" is sent to free all build resources. If a transfer fails to verify, another "SEND" message with the same filename is returned to the build slave. """ if self.fs.verify(0, state.package): self.logger.info('verified transfer of %s', state.next_file) state.files[state.next_file].verified() if state.transfers_done: self.web_queue.send_msg('LOG', (state.build_id, state.output)) self.web_queue.recv_msg() self.web_queue.send_msg('BOTH', state.package) self.web_queue.recv_msg() return 'DONE', 'IMPORT' else: self.fs.expect(0, state.files[state.next_file]) self.logger.info('send %s', state.next_file) return 'SEND', state.next_file else: self.logger.info('re-send %s', state.next_file) return 'SEND', state.next_file
[docs] def do_add_package(self, state): """ Handler for the remover's "ADDPKG" message, indicating a request to add a package to the system, or update it. """ display_name, description, skip, unskip, aliases = state package = canonicalize_name(display_name) aliases = set(aliases) | {package, display_name} # Ensure display_name sorts last, so it is treated as display name aliases = sorted(aliases, key=lambda s: s == display_name) self.logger.info('adding package %s', package) if self.db.add_new_package(package, skip, description): rewrite = 'BOTH' msg, data = 'DONE', 'NEWPKG' else: self.logger.info('updating package %s', package) if skip: return 'ERROR', 'SKIPPKG' if unskip: self.db.skip_package(package, reason='') if description: self.db.set_package_description(package, description) rewrite = 'PROJECT' msg, data = 'DONE', 'UPDPKG' self.do_add_package_aliases(package, aliases) self.web_queue.send_msg(rewrite, package) self.web_queue.recv_msg() return msg, data
[docs] def do_add_version(self, state): """ Handler for the remover's "ADDVER" message, indicating a request to add a specific version of a package to the system, or update it. """ ( display_name, version, skip, unskip, released, yank, unyank, aliases ) = state package = canonicalize_name(display_name) aliases = set(aliases) | {package, display_name} # Ensure display_name sorts last, so it is treated as display name aliases = sorted(aliases, key=lambda s: s == display_name) self.logger.info('adding version %s %s', package, version) if not self.db.test_package(package): return 'ERROR', 'NOPKG' if self.db.add_new_package_version(package, version, released, skip): if yank: self.db.yank_version(package, version) rewrite = 'PROJECT' msg, data = 'DONE', 'NEWVER' else: self.logger.info('updating version %s %s', package, version) if skip: return 'ERROR', 'SKIPVER' if unskip: self.db.skip_package_version(package, version, reason='') if yank: return 'ERROR', 'YANKVER' if unyank: self.db.unyank_version(package, version) rewrite = 'BOTH' msg, data = 'DONE', 'UPDVER' self.do_add_package_aliases(package, aliases) self.web_queue.send_msg(rewrite, package) self.web_queue.recv_msg() return msg, data
[docs] def do_add_package_aliases(self, package, aliases): "Add aliases for a package name" for alias in aliases: self.db.add_package_name(package, alias, datetime.now(tz=UTC))
[docs] def do_remove_package(self, state): """ Handler for the remover's "REMPKG" message, indicating a request to remove or alter a whole package. """ package, builds, skip = state package = canonicalize_name(package) if not self.db.test_package(package): self.logger.error('unknown package %s', package) return 'ERROR', 'NOPKG' if skip or builds: if skip: self.logger.info('marking package %s as skipped', package) self.db.skip_package(package, skip) msg = 'SKIPPKG' if builds: self.logger.info('deleting all builds for package %s', package) for version in self.db.get_project_data(package)['releases']: self.db.delete_build(package, version) msg = 'DELPKGBLD' else: self.logger.info('deleting package %s', package) # FKs will take care of removing builds here self.db.delete_package(package) msg = 'DELPKG' self.web_queue.send_msg('DELPKG', package) self.skip_queue.send_msg('DELPKG', package) self.web_queue.recv_msg() self.skip_queue.recv_msg() return 'DONE', msg
[docs] def do_remove_version(self, state): """ Handler for the remover's "REMVER" message, indicating a request to remove or alter a specific package version. """ package, version, builds, skip, yank = state package = canonicalize_name(package) if not self.db.test_package_version(package, version): self.logger.error('unknown package version %s %s', package, version) return 'ERROR', 'NOVER' if skip or builds or yank: if skip: self.logger.info('marking %s %s as skipped', package, version) self.db.skip_package_version(package, version, skip) msg = 'SKIPVER' if yank: self.logger.info('yanking %s %s', package, version) self.db.yank_version(package, version) self.web_queue.send_msg('BOTH', package) self.web_queue.recv_msg() msg = 'YANKVER' if builds: self.logger.info('deleting all builds for %s %s', package, version) self.db.delete_build(package, version) msg = 'DELVERBLD' else: self.logger.info('removing %s %s', package, version) self.db.delete_version(package, version) msg = 'DELVER' if msg in ('SKIPVER', 'DELVER', 'DELVERBLD'): self.web_queue.send_msg('DELVER', [package, version]) self.skip_queue.send_msg('DELVER', [package, version]) self.web_queue.recv_msg() self.skip_queue.recv_msg() return 'DONE', msg
[docs] def do_rebuild(self, state): """ Handler for the rebuilder's "REBUILD" message, indicating a request to rebuild part of the website. """ part, *state = state if part in ('HOME', 'SEARCH'): self.logger.info('requesting rebuild of homepage and search') self.stats_queue.send_msg('HOME') else: # ('PROJECT', 'BOTH'): package, = state if package is None: self.logger.warning('requesting rebuild of *all* pages') for package in self.db.get_all_packages(): self.web_queue.send_msg(part, package) self.web_queue.recv_msg() elif self.db.test_package(package): self.logger.info('requesting rebuild of pages for %s', package) self.web_queue.send_msg(part, package) self.web_queue.recv_msg() else: return 'ERROR', 'unknown package %s' % package return 'DONE', 'REBUILD'