implemented index alias to make import atomic (#46)

This commit is contained in:
Rok Garbas 2020-05-22 14:50:44 +02:00 committed by GitHub
parent b264d7af55
commit 2600984760
Failed to generate hash of commit
3 changed files with 72 additions and 72 deletions

View file

@ -29,8 +29,7 @@ click_log.basic_config(logger)
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
INDEX_SCHEMA_VERSION = 1
ANALYSIS = { ANALYSIS = {
"analyzer": { "analyzer": {
"nixAttrName": { "nixAttrName": {
@ -70,6 +69,43 @@ ANALYSIS = {
}, },
}, },
} }
PACKAGES_MAPPING = dict(
properties=dict(
attr_name=dict(
type="text", analyzer="nixAttrName", fields={"raw": {"type": "keyword"}},
),
attr_set=dict(type="keyword"),
pname=dict(type="keyword"),
pversion=dict(type="keyword"),
description=dict(type="text"),
longDescription=dict(type="text"),
license=dict(
type="nested",
properties=dict(fullName=dict(type="text"), url=dict(type="text"),),
),
maintainers=dict(
type="nested",
properties=dict(
name=dict(type="text"),
email=dict(type="text"),
github=dict(type="text"),
),
),
platforms=dict(type="keyword"),
position=dict(type="text"),
homepage=dict(type="keyword"),
),
)
OPTIONS_MAPPING = dict(
properties=dict(
option_name=dict(type="keyword"),
description=dict(type="text"),
type=dict(type="keyword"),
default=dict(type="text"),
example=dict(type="text"),
source=dict(type="keyword"),
),
)
def get_last_evaluation(channel): def get_last_evaluation(channel):
@ -235,75 +271,30 @@ def get_options(evaluation):
return len(options), gen return len(options), gen
def recreate_index(es, channel): def create_index(es, index, mapping):
packages_index = f"{channel}-packages" if es.indices.exists(index):
if es.indices.exists(packages_index): logger.debug(f"create_index: index '{index}' already exists")
es.indices.delete(index=packages_index) return
logger.debug(
f"recreate_index: index '{packages_index}' already exists and was deleted"
)
es.indices.create( es.indices.create(
index=packages_index, index=index,
body=dict( body={
settings=dict(number_of_shards=1, analysis=ANALYSIS), "settings": {"number_of_shards": 1, "analysis": ANALYSIS},
mappings=dict( "mappings": mapping,
properties=dict( },
attr_name=dict(
type="text",
analyzer="nixAttrName",
fields={"raw": {"type": "keyword"}},
),
attr_set=dict(type="keyword"),
pname=dict(type="keyword"),
pversion=dict(type="keyword"),
description=dict(type="text"),
longDescription=dict(type="text"),
license=dict(
type="nested",
properties=dict(
fullName=dict(type="text"), url=dict(type="text"),
),
),
maintainers=dict(
type="nested",
properties=dict(
name=dict(type="text"),
email=dict(type="text"),
github=dict(type="text"),
),
),
platforms=dict(type="keyword"),
position=dict(type="text"),
homepage=dict(type="keyword"),
),
),
),
) )
logger.debug(f"recreate_index: index '{packages_index}' was created") logger.debug(f"create_index: index '{index}' was created")
options_index = f"{channel}-options"
if es.indices.exists(options_index): def create_index_name(type_, channel, evaluation):
es.indices.delete(index=options_index) return (
logger.debug( f"latest-{channel}-{type_}",
f"recreate_index: index '{options_index}' already exists and was deleted" f"evaluation-{INDEX_SCHEMA_VERSION}-{channel}-{evaluation['revisions_since_start']}-{evaluation['git_revision']}-{type_}",
)
es.indices.create(
index=options_index,
body=dict(
settings=dict(number_of_shards=1, analysis=ANALYSIS),
mappings=dict(
properties=dict(
option_name=dict(type="keyword"),
description=dict(type="text"),
type=dict(type="keyword"),
default=dict(type="text"),
example=dict(type="text"),
source=dict(type="keyword"),
),
),
),
) )
logger.debug(f"recreate_index: index '{options_index}' was created")
def update_alias(es, name, index):
es.indices.put_alias(index=index, name=name)
logger.debug(f"'{name}' alias now points to '{index}' index")
@click.command() @click.command()
@ -324,7 +315,12 @@ def main(es_url, channel, verbose):
evaluation = get_last_evaluation(channel) evaluation = get_last_evaluation(channel)
es = elasticsearch.Elasticsearch([es_url]) es = elasticsearch.Elasticsearch([es_url])
recreate_index(es, channel)
# ensure indexes exist
packages_alias, packages_index = create_index_name("packages", channel, evaluation)
options_alias, options_index = create_index_name("options", channel, evaluation)
create_index(es, packages_index, PACKAGES_MAPPING)
create_index(es, options_index, OPTIONS_MAPPING)
# write packages # write packages
number_of_packages, gen_packages = get_packages(evaluation) number_of_packages, gen_packages = get_packages(evaluation)
@ -333,7 +329,7 @@ def main(es_url, channel, verbose):
progress = tqdm.tqdm(unit="packages", total=number_of_packages) progress = tqdm.tqdm(unit="packages", total=number_of_packages)
successes = 0 successes = 0
for ok, action in elasticsearch.helpers.streaming_bulk( for ok, action in elasticsearch.helpers.streaming_bulk(
client=es, index=f"{channel}-packages", actions=gen_packages() client=es, index=packages_index, actions=gen_packages()
): ):
progress.update(1) progress.update(1)
successes += ok successes += ok
@ -346,12 +342,16 @@ def main(es_url, channel, verbose):
progress = tqdm.tqdm(unit="options", total=number_of_options) progress = tqdm.tqdm(unit="options", total=number_of_options)
successes = 0 successes = 0
for ok, action in elasticsearch.helpers.streaming_bulk( for ok, action in elasticsearch.helpers.streaming_bulk(
client=es, index=f"{channel}-options", actions=gen_options() client=es, index=options_index, actions=gen_options()
): ):
progress.update(1) progress.update(1)
successes += ok successes += ok
print("Indexed %d/%d options" % (successes, number_of_options)) print("Indexed %d/%d options" % (successes, number_of_options))
# update alias
update_alias(es, packages_alias, packages_index)
update_alias(es, options_alias, options_index)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View file

@ -219,7 +219,7 @@ makeRequest :
makeRequest options channel query from size = makeRequest options channel query from size =
ElasticSearch.makeRequest ElasticSearch.makeRequest
"option_name" "option_name"
("nixos-" ++ channel ++ "-options") ("latest-nixos-" ++ channel ++ "-options")
decodeResultItemSource decodeResultItemSource
options options
query query

View file

@ -287,7 +287,7 @@ makeRequest :
makeRequest options channel query from size = makeRequest options channel query from size =
ElasticSearch.makeRequest ElasticSearch.makeRequest
"attr_name" "attr_name"
("nixos-" ++ channel ++ "-packages") ("latest-nixos-" ++ channel ++ "-packages")
decodeResultItemSource decodeResultItemSource
options options
query query