#! /usr/bin/env nix-shell
#! nix-shell -i python3 -p python3 python3Packages.click python3Packages.click-log python3Packages.elasticsearch python3Packages.boto3 python3Packages.tqdm

# develop:
# $ nix-shell -p python3Packages.black python3Packages.mypy python3Packages.flake8
#
# format:
# $ nix-shell -p python3Packages.black --command "black import-channel"
#
# lint:
# $ nix-shell -p python3Packages.flake8 --command "flake8 --ignore E501,E265 import-channel"

"""Import the packages and options of a NixOS channel into Elasticsearch.

The script resolves the latest evaluation of a channel from the public
``nix-releases`` S3 bucket, extracts packages (via ``nix-env``) and NixOS
options (via ``nix-build -A options``), and bulk-indexes both into a pair
of per-channel Elasticsearch indices.
"""

import json
import logging
import os.path
import shlex
import subprocess

import boto3
import botocore
import botocore.client
import click
import click_log
import elasticsearch
import elasticsearch.helpers
import tqdm

logger = logging.getLogger("import-channel")
click_log.basic_config(logger)

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

# Shared Elasticsearch analysis settings: a custom analyzer that splits Nix
# attribute names on separators / camelCase and drops generic stopwords.
ANALYSIS = {
    "analyzer": {
        "nixAttrName": {
            "type": "custom",
            "tokenizer": "nix_attrname",
            "filter": ["lowercase", "nix_stopwords"],
        },
    },
    "tokenizer": {
        "nix_attrname": {
            "type": "pattern",
            # Split on attrname separators like _, .
            "pattern": "|".join(
                [
                    "[_.-]",  # Common separators like underscores, dots and dashes
                    "\\d+?Packages",  # python37Packages -> python
                    # Camelcase tokenizer adapted from
                    # https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-pattern-analyzer.html
                    "".join(
                        [
                            "(?<=[\\p{L}&&[^\\p{Lu}]])"  # lower case
                            "(?=\\p{Lu})",  # followed by upper case
                            "|",
                            "(?<=\\p{Lu})"  # or upper case
                            "(?=\\p{Lu}[\\p{L}&&[^\\p{Lu}]])",  # followed by lower case
                        ]
                    ),
                ]
            ),
        },
    },
    "filter": {
        "nix_stopwords": {
            "type": "stop",
            "ignore_case": True,
            "stopwords": ["packages", "package", "options", "option"],
        },
    },
}


def get_last_evaluation(channel):
    """Return the most recent evaluation of *channel*.

    Lists the public ``nix-releases`` S3 bucket and picks the evaluation with
    the highest ``revisions_since_start``.

    :param channel: channel name, e.g. ``nixos-20.03``
    :return: dict with ``revisions_since_start``, ``git_revision`` and
        ``prefix`` keys
    :raises RuntimeError: when no evaluation exists for the channel
    """
    logger.debug(f"Retrieving last evaluation for {channel} channel")

    project, project_version = channel.split("-", 1)
    logger.debug(f"get_last_evaluation: project='{project}'")
    logger.debug(f"get_last_evaluation: project_version='{project_version}'")

    bucket = "nix-releases"
    prefix = f"{project}/{project_version}/"
    logger.debug(
        f"get_last_evaluation: list all evaluations in '{bucket}' bucket under '{prefix}' prefix"
    )
    # The bucket is world-readable; sign nothing so no credentials are needed.
    s3 = boto3.client(
        "s3", config=botocore.client.Config(signature_version=botocore.UNSIGNED)
    )
    s3_result = s3.list_objects(Bucket=bucket, Prefix=prefix, Delimiter="/")
    evaluations = []
    # "CommonPrefixes" is absent when nothing matched; fall back to an empty list.
    for item in s3_result.get("CommonPrefixes") or []:
        if not item:
            continue
        logger.debug(f"get_last_evaluation: evaluation in raw {item}")
        prefix = item.get("Prefix")
        evaluation = prefix[len(f"{project}/{project_version}/{channel}") :]
        if evaluation.startswith("beta"):
            evaluation = evaluation[len("beta") :]
        try:
            revisions_since_start, git_revision = (
                evaluation.lstrip(".").rstrip("/").split(".")
            )
            revisions_since_start = int(revisions_since_start)
        except ValueError:
            # Prefix does not have the "<channel>.<n>.<rev>/" shape; skip it.
            continue
        evaluation = {
            "revisions_since_start": revisions_since_start,
            "git_revision": git_revision,
            "prefix": prefix,
        }
        logger.debug(f"get_last_evaluation: evaluation {evaluation}")
        evaluations.append(evaluation)

    logger.debug(
        f"get_last_evaluation: {len(evaluations)} evaluations found for {channel} channel"
    )
    if not evaluations:
        raise RuntimeError(f"no evaluation found for {channel} channel")
    evaluations = sorted(evaluations, key=lambda i: i["revisions_since_start"])
    logger.debug(f"get_last_evaluation: last evaluation is: {evaluations[-1]}")
    return evaluations[-1]


def get_packages(evaluation):
    """Return ``(count, gen)`` for the packages of *evaluation*.

    ``gen()`` is a generator factory that yields one Elasticsearch document
    per package, with normalized license/maintainer/platform fields.

    :param evaluation: dict as returned by :func:`get_last_evaluation`
    """
    logger.debug(
        f"get_packages: Retrieving list of packages for '{evaluation['git_revision']}' revision"
    )
    result = subprocess.run(
        shlex.split(
            f"nix-env -f '<nixpkgs>' -I nixpkgs=https://github.com/NixOS/nixpkgs-channels/archive/{evaluation['git_revision']}.tar.gz --arg config 'import {CURRENT_DIR}/packages-config.nix' -qa --json"
        ),
        stdout=subprocess.PIPE,
        check=True,
    )
    packages = json.loads(result.stdout).items()
    packages = list(packages)

    def gen():
        for attr_name, data in packages:
            position = data["meta"].get("position")
            if position and position.startswith("/nix/store"):
                # Strip the "/nix/store/<hash>-" prefix (44 characters) so the
                # position is relative to the nixpkgs checkout.
                position = position[44:]

            # meta.license may be a string, a single dict or a list of either;
            # normalize to a list of {fullName, url} dicts.
            licenses = data["meta"].get("license")
            if licenses:
                if isinstance(licenses, str):
                    licenses = [dict(fullName=licenses)]
                elif isinstance(licenses, dict):
                    licenses = [licenses]
                licenses = [
                    dict(fullName=license, url=None)
                    if isinstance(license, str)
                    else dict(
                        fullName=license.get("fullName"),
                        url=license.get("url"),
                    )
                    for license in licenses
                ]
            else:
                licenses = []

            # meta.maintainers entries may be plain names or dicts.
            maintainers = [
                dict(name=maintainer, email=None, github=None)
                if isinstance(maintainer, str)
                else dict(
                    name=maintainer.get("name"),
                    email=maintainer.get("email"),
                    github=maintainer.get("github"),
                )
                for maintainer in data["meta"].get("maintainers", [])
            ]

            # Keep only non-empty string platforms.
            platforms = [
                platform
                for platform in data["meta"].get("platforms", [])
                if isinstance(platform, str) and platform
            ]

            # The attribute-set prefix (e.g. "python37Packages") is only kept
            # when it looks like a package or plugin collection.
            attr_set = None
            if "." in attr_name:
                attr_set = attr_name.split(".")[0]
                if not attr_set.endswith(("Packages", "Plugins")):
                    attr_set = None

            yield dict(
                id=attr_name,
                attr_name=attr_name,
                attr_set=attr_set,
                pname=data["pname"],
                pversion=data["version"],
                description=data["meta"].get("description"),
                longDescription=data["meta"].get("longDescription", ""),
                license=licenses,
                maintainers=maintainers,
                platforms=platforms,
                position=position,
                homepage=data["meta"].get("homepage"),
            )

    logger.debug(f"get_packages: Found {len(packages)} packages")
    return len(packages), gen


def get_options(evaluation):
    """Return ``(count, gen)`` for the NixOS options of *evaluation*.

    Builds the ``options`` attribute of the evaluation's nixpkgs and reads
    the generated ``options.json``.  ``gen()`` yields one document per
    option; the count is 0 when the options file is missing.

    :param evaluation: dict as returned by :func:`get_last_evaluation`
    """
    result = subprocess.run(
        shlex.split(
            f"nix-build --no-out-link -A options -I nixpkgs=https://github.com/NixOS/nixpkgs-channels/archive/{evaluation['git_revision']}.tar.gz"
        ),
        stdout=subprocess.PIPE,
        check=True,
    )
    options = []
    options_file = result.stdout.strip().decode()
    options_file = f"{options_file}/share/doc/nixos/options.json"
    if os.path.exists(options_file):
        with open(options_file) as f:
            options = json.load(f).items()
        options = list(options)

    def gen():
        for name, option in options:
            example = option.get("example")
            # literalExample wrappers carry the real text in their "text" key.
            if (
                example
                and isinstance(example, dict)
                and example.get("_type") == "literalExample"
            ):
                example = str(example["text"])
            yield dict(
                id=name,
                option_name=name,
                description=option.get("description"),
                type=option.get("type"),
                default=str(option.get("default")),
                example=str(example),
                source=option.get("declarations", [None])[0],
            )

    return len(options), gen


def recreate_index(es, channel):
    """Drop and recreate the per-channel packages and options indices.

    :param es: ``elasticsearch.Elasticsearch`` client
    :param channel: channel name used as index-name prefix
    """
    packages_index = f"{channel}-packages"
    if es.indices.exists(packages_index):
        es.indices.delete(index=packages_index)
        logger.debug(
            f"recreate_index: index '{packages_index}' already exists and was deleted"
        )
    es.indices.create(
        index=packages_index,
        body=dict(
            settings=dict(number_of_shards=1, analysis=ANALYSIS),
            mappings=dict(
                properties=dict(
                    attr_name=dict(
                        type="text",
                        analyzer="nixAttrName",
                        # Keep an untokenized copy for exact matches/sorting.
                        fields={"raw": {"type": "keyword"}},
                    ),
                    attr_set=dict(type="keyword"),
                    pname=dict(type="keyword"),
                    pversion=dict(type="keyword"),
                    description=dict(type="text"),
                    longDescription=dict(type="text"),
                    license=dict(
                        type="nested",
                        properties=dict(
                            fullName=dict(type="text"),
                            url=dict(type="text"),
                        ),
                    ),
                    maintainers=dict(
                        type="nested",
                        properties=dict(
                            name=dict(type="text"),
                            email=dict(type="text"),
                            github=dict(type="text"),
                        ),
                    ),
                    platforms=dict(type="keyword"),
                    position=dict(type="text"),
                    homepage=dict(type="keyword"),
                ),
            ),
        ),
    )
    logger.debug(f"recreate_index: index '{packages_index}' was created")

    options_index = f"{channel}-options"
    if es.indices.exists(options_index):
        es.indices.delete(index=options_index)
        logger.debug(
            f"recreate_index: index '{options_index}' already exists and was deleted"
        )
    es.indices.create(
        index=options_index,
        body=dict(
            settings=dict(number_of_shards=1, analysis=ANALYSIS),
            mappings=dict(
                properties=dict(
                    option_name=dict(type="keyword"),
                    description=dict(type="text"),
                    type=dict(type="keyword"),
                    default=dict(type="text"),
                    example=dict(type="text"),
                    source=dict(type="keyword"),
                ),
            ),
        ),
    )
    logger.debug(f"recreate_index: index '{options_index}' was created")


@click.command()
@click.option("-u", "--es-url", help="Elasticsearch connection url")
@click.option("-c", "--channel", help="NixOS channel name")
@click.option("-v", "--verbose", count=True)
def main(es_url, channel, verbose):
    """Index a channel's packages and options into Elasticsearch."""
    # -v => WARNING, -vv (or more) => DEBUG, otherwise CRITICAL only.
    logging_level = "CRITICAL"
    if verbose == 1:
        logging_level = "WARNING"
    elif verbose >= 2:
        logging_level = "DEBUG"
    logger.setLevel(getattr(logging, logging_level))
    logger.debug(f"Verbosity is {verbose}")
    logger.debug(f"Logging set to {logging_level}")

    evaluation = get_last_evaluation(channel)
    es = elasticsearch.Elasticsearch([es_url])
    recreate_index(es, channel)

    # write packages
    number_of_packages, gen_packages = get_packages(evaluation)
    if number_of_packages:
        click.echo("Indexing packages...")
        progress = tqdm.tqdm(unit="packages", total=number_of_packages)
        successes = 0
        for ok, _action in elasticsearch.helpers.streaming_bulk(
            client=es, index=f"{channel}-packages", actions=gen_packages()
        ):
            progress.update(1)
            successes += ok
        progress.close()
        click.echo("Indexed %d/%d packages" % (successes, number_of_packages))

    # write options
    number_of_options, gen_options = get_options(evaluation)
    if number_of_options:
        click.echo("Indexing options...")
        progress = tqdm.tqdm(unit="options", total=number_of_options)
        successes = 0
        for ok, _action in elasticsearch.helpers.streaming_bulk(
            client=es, index=f"{channel}-options", actions=gen_options()
        ):
            progress.update(1)
            successes += ok
        progress.close()
        click.echo("Indexed %d/%d options" % (successes, number_of_options))


if __name__ == "__main__":
    main()

# vi:ft=python