import channel script should be idempotent (#47)
This commit is contained in:
parent
2600984760
commit
c45a1581b1
|
@ -271,10 +271,11 @@ def get_options(evaluation):
|
||||||
return len(options), gen
|
return len(options), gen
|
||||||
|
|
||||||
|
|
||||||
def create_index(es, index, mapping):
|
def ensure_index(es, index, mapping):
|
||||||
if es.indices.exists(index):
|
if es.indices.exists(index):
|
||||||
logger.debug(f"create_index: index '{index}' already exists")
|
logger.debug(f"ensure_index: index '{index}' already exists")
|
||||||
return
|
return False
|
||||||
|
|
||||||
es.indices.create(
|
es.indices.create(
|
||||||
index=index,
|
index=index,
|
||||||
body={
|
body={
|
||||||
|
@ -282,10 +283,12 @@ def create_index(es, index, mapping):
|
||||||
"mappings": mapping,
|
"mappings": mapping,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
logger.debug(f"create_index: index '{index}' was created")
|
logger.debug(f"ensure_index: index '{index}' was created")
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
def create_index_name(type_, channel, evaluation):
|
def ensure_index_name(type_, channel, evaluation):
|
||||||
return (
|
return (
|
||||||
f"latest-{channel}-{type_}",
|
f"latest-{channel}-{type_}",
|
||||||
f"evaluation-{INDEX_SCHEMA_VERSION}-{channel}-{evaluation['revisions_since_start']}-{evaluation['git_revision']}-{type_}",
|
f"evaluation-{INDEX_SCHEMA_VERSION}-{channel}-{evaluation['revisions_since_start']}-{evaluation['git_revision']}-{type_}",
|
||||||
|
@ -317,40 +320,44 @@ def main(es_url, channel, verbose):
|
||||||
es = elasticsearch.Elasticsearch([es_url])
|
es = elasticsearch.Elasticsearch([es_url])
|
||||||
|
|
||||||
# ensure indexes exist
|
# ensure indexes exist
|
||||||
packages_alias, packages_index = create_index_name("packages", channel, evaluation)
|
options_alias, options_index = ensure_index_name("options", channel, evaluation)
|
||||||
options_alias, options_index = create_index_name("options", channel, evaluation)
|
packages_alias, packages_index = ensure_index_name("packages", channel, evaluation)
|
||||||
create_index(es, packages_index, PACKAGES_MAPPING)
|
packages_index_created = ensure_index(es, packages_index, PACKAGES_MAPPING)
|
||||||
create_index(es, options_index, OPTIONS_MAPPING)
|
options_index_created = ensure_index(es, options_index, OPTIONS_MAPPING)
|
||||||
|
|
||||||
# write packages
|
# write packages
|
||||||
number_of_packages, gen_packages = get_packages(evaluation)
|
if packages_index_created:
|
||||||
if number_of_packages:
|
number_of_packages, gen_packages = get_packages(evaluation)
|
||||||
click.echo("Indexing packages...")
|
if number_of_packages:
|
||||||
progress = tqdm.tqdm(unit="packages", total=number_of_packages)
|
click.echo("Indexing packages...")
|
||||||
successes = 0
|
progress = tqdm.tqdm(unit="packages", total=number_of_packages)
|
||||||
for ok, action in elasticsearch.helpers.streaming_bulk(
|
successes = 0
|
||||||
client=es, index=packages_index, actions=gen_packages()
|
for ok, action in elasticsearch.helpers.streaming_bulk(
|
||||||
):
|
client=es, index=packages_index, actions=gen_packages()
|
||||||
progress.update(1)
|
):
|
||||||
successes += ok
|
progress.update(1)
|
||||||
click.echo("Indexed %d/%d packages" % (successes, number_of_packages))
|
successes += ok
|
||||||
|
click.echo("Indexed %d/%d packages" % (successes, number_of_packages))
|
||||||
|
|
||||||
# write options
|
# write options
|
||||||
number_of_options, gen_options = get_options(evaluation)
|
if options_index_created:
|
||||||
if number_of_options:
|
number_of_options, gen_options = get_options(evaluation)
|
||||||
click.echo("Indexing options...")
|
if number_of_options:
|
||||||
progress = tqdm.tqdm(unit="options", total=number_of_options)
|
click.echo("Indexing options...")
|
||||||
successes = 0
|
progress = tqdm.tqdm(unit="options", total=number_of_options)
|
||||||
for ok, action in elasticsearch.helpers.streaming_bulk(
|
successes = 0
|
||||||
client=es, index=options_index, actions=gen_options()
|
for ok, action in elasticsearch.helpers.streaming_bulk(
|
||||||
):
|
client=es, index=options_index, actions=gen_options()
|
||||||
progress.update(1)
|
):
|
||||||
successes += ok
|
progress.update(1)
|
||||||
print("Indexed %d/%d options" % (successes, number_of_options))
|
successes += ok
|
||||||
|
print("Indexed %d/%d options" % (successes, number_of_options))
|
||||||
|
|
||||||
# update alias
|
# update alias
|
||||||
update_alias(es, packages_alias, packages_index)
|
if packages_index_created:
|
||||||
update_alias(es, options_alias, options_index)
|
update_alias(es, packages_alias, packages_index)
|
||||||
|
if options_index_created:
|
||||||
|
update_alias(es, options_alias, options_index)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
Loading…
Reference in a new issue