// aux-search/flake-info/src/elastic.rs
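//! Elasticsearch backend for flake-info: creates indices with a static
//! mapping, bulk-pushes [`Export`] documents, and manages index aliases.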

use std::collections::HashMap;

use clap::arg_enum;
pub use elasticsearch::http::transport::Transport;
use elasticsearch::{
http::response,
indices::{
IndicesCreateParts, IndicesDeleteAliasParts, IndicesDeleteParts, IndicesExistsParts,
IndicesGetAliasParts, IndicesPutAliasParts,
},
BulkOperation, Elasticsearch as Client,
};
use lazy_static::lazy_static;
use log::{info, warn};
use serde_json::{json, Value};
use thiserror::Error;

use crate::data::Export;
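// Index mapping and analyzer settings applied when creating a new index.
// The custom "edge" analyzer emits edge n-grams (2 to 50 characters) over
// letters, digits, punctuation and symbols, which supports prefix-style
// matching on attribute names, package names and descriptions.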
lazy_static! {
static ref MAPPING: Value = json!({
"mappings": {
"properties": {
"type": {"type": "keyword"},
"flake_name": {
"type": "text",
"analyzer": "english",
},
"flake_description": {
"type": "text",
"analyzer": "english",
},
"flake_resolved": {
"type": "nested",
"properties": {
"type": {
"type": "keyword"
},
"owner": {
"type": "keyword"
},
"repo": {
"type": "keyword"
},
"url" : {
"type": "keyword"
}
}
},
"flake_source": {
"type": "nested",
"properties": {
"type": {
"type": "keyword"
},
"owner": {
"type": "keyword"
},
"repo": {
"type": "keyword"
},
"desciption": {
"type": "text",
"analyzer": "english",
},
"git_ref": {
"type": "keyword"
},
"url": {
"type": "keyword"
},
}
},
"package_attr_name": {
"type": "keyword",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"package_attr_name_reverse": {
"type": "keyword",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"package_attr_name_query": {
"type": "keyword",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"package_attr_name_query_reverse": {
"type": "keyword",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"package_attr_set": {
"type": "keyword",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"package_attr_set_reverse": {
"type": "keyword",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"package_pname": {
"type": "keyword",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"package_pname_reverse": {
"type": "keyword",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"package_pversion": {
"type": "keyword"
},
"package_platforms": {
"type": "keyword"
},
"package_system": {
"type": "keyword"
},
"package_position": {
"type": "text"
},
"package_outputs": {
"type": "keyword"
},
"package_description": {
"type": "text",
"analyzer": "english",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"package_description_reverse": {
"type": "text",
"analyzer": "english",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"package_longDescription": {
"type": "text",
"analyzer": "english",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"package_longDescription_reverse": {
"type": "text",
"analyzer": "english",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"package_license": {
"type": "nested",
"properties": {
"fullName": {"type": "text"},
"url": {"type": "text"}},
},
"package_license_set": {"type": "keyword"},
"package_maintainers": {
"type": "nested",
"properties": {
"name": {"type": "text"},
"email": {"type": "text"},
"github": {"type": "text"},
},
},
"package_maintainers_set": {"type": "keyword"},
"package_homepage": {
"type": "keyword"
},
// Options fields
"option_name": {
"type": "keyword",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"option_name_reverse": {
"type": "keyword",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"option_name": {
"type": "keyword",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"option_name_reverse": {
"type": "keyword",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"option_description": {
"type": "text",
"analyzer": "english",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"option_description_reverse": {
"type": "text",
"analyzer": "english",
"fields": {"edge": {"type": "text", "analyzer": "edge"}},
},
"option_type": {"type": "keyword"},
"option_default": {"type": "text"},
"option_example": {"type": "text"},
"option_source": {"type": "keyword"},
}
},
"settings": {
"analysis": {
"normalizer": {
"lowercase": {"type": "custom", "char_filter": [], "filter": ["lowercase"]}
},
"tokenizer": {
"edge": {
"type": "edge_ngram",
"min_gram": 2,
"max_gram": 50,
"token_chars": [
"letter",
"digit",
// Either we use them or we would need to strip them before that.
"punctuation",
"symbol",
],
},
},
"analyzer": {
"edge": {"tokenizer": "edge", "filter": ["lowercase"]},
"lowercase": {
"type": "custom",
"tokenizer": "keyword",
"filter": ["lowercase"],
},
},
}
}
});
}
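/// Thin async wrapper around the official `elasticsearch` client.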
#[derive(Default)]
pub struct Elasticsearch {
client: Client,
}
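/// Errors surfaced while initializing indices, pushing exports, or updating aliases.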
#[derive(Error, Debug)]
pub enum ElasticsearchError {
#[error("Transport failed to initialize: {0}")]
TransportInitError(elasticsearch::Error),
#[error("Failed to send push exports: {0}")]
PushError(elasticsearch::Error),
#[error("Push exports returned bad result: {0:?}")]
PushResponseError(response::Exception),
#[error("Failed to iitialize index: {0}")]
InitIndexError(elasticsearch::Error),
#[error("Push exports returned bad result: {0:?}")]
InitResponseError(response::Exception),
#[error("An unexpected error occured in the elastic search client: {0}")]
ClientError(elasticsearch::Error),
#[error("Failed to serialize exported data: {0}")]
SerializationError(#[from] serde_json::Error),
#[error("An index with the name \"{0}\" already exists and the (default) stategy is abort")]
IndexExistsError(String),
}
impl Elasticsearch {
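/// Create a client talking to a single-node cluster at `url`.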
pub fn new(url: &str) -> Result<Self, ElasticsearchError> {
let transport =
Transport::single_node(url).map_err(ElasticsearchError::TransportInitError)?;
let client = Client::new(transport);
Ok(Elasticsearch { client })
}
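/// Create a client from an already configured [`Transport`].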
pub fn with_transport(transport: Transport) -> Self {
let client = Client::new(transport);
Elasticsearch { client }
}
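/// Push all `exports` into `config.index` as bulk index operations,
/// sent in chunks of 10,000 documents per request.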
pub async fn push_exports(
&self,
config: &Config<'_>,
exports: &[Export],
) -> Result<(), ElasticsearchError> {
// Issue bulk requests of at most 10,000 index operations each to keep
// request bodies a reasonable size.
let bodies = exports.chunks(10_000).map(|chunk| {
chunk
.iter()
.map(|e| BulkOperation::from(BulkOperation::index(e)))
});
for body in bodies {
let response = self
.client
.bulk(elasticsearch::BulkParts::Index(config.index))
.body(body.collect())
.send()
.await
.map_err(ElasticsearchError::PushError)?;
response
.exception()
.await
.map_err(ElasticsearchError::ClientError)?
.map(ElasticsearchError::PushResponseError)
.map_or(Ok(()), Err)?;
}
Ok(())
}
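/// Ensure that `config.index` exists, applying `config.exists_strategy`
/// when an index with that name is already present.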
pub async fn ensure_index(&self, config: &Config<'_>) -> Result<(), ElasticsearchError> {
let exists = self.check_index(config).await?;
if exists {
match config.exists_strategy {
ExistsStrategy::Abort => {
warn!(
"Index \"{}\" already exists, strategy is: Abort push",
config.index
);
return Err(ElasticsearchError::IndexExistsError(
config.index.to_owned(),
));
}
ExistsStrategy::Ignore => {
warn!(
"Index \"{}\" already exists, strategy is: Ignore, proceed push",
config.index
);
return Ok(());
}
ExistsStrategy::Recreate => {
warn!(
"Index \"{}\" already exists, strategy is: Recreate index",
config.index
);
self.clear_index(config).await?;
}
}
}
let response = self
.client
.indices()
.create(IndicesCreateParts::Index(config.index))
.body(MAPPING.as_object())
.send()
.await
.map_err(ElasticsearchError::InitIndexError)?;
response
.exception()
.await
.map_err(ElasticsearchError::ClientError)?
.map(ElasticsearchError::PushResponseError)
.map_or(Ok(()), Err)?;
Ok(())
}
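/// Return whether `config.index` exists (a 200 response to the exists request).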
pub async fn check_index(&self, config: &Config<'_>) -> Result<bool, ElasticsearchError> {
let response = self
.client
.indices()
.exists(IndicesExistsParts::Index(&[config.index]))
.send()
.await
.map_err(ElasticsearchError::InitIndexError)?;
Ok(response.status_code() == 200)
}
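/// Delete `config.index`.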
pub async fn clear_index(&self, config: &Config<'_>) -> Result<(), ElasticsearchError> {
let response = self
.client
.indices()
.delete(IndicesDeleteParts::Index(&[config.index]))
.send()
.await
.map_err(ElasticsearchError::InitIndexError)?;
response
.exception()
.await
.map_err(ElasticsearchError::ClientError)?
.map(ElasticsearchError::PushResponseError)
.map_or(Ok(()), Err)
}
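/// Make `alias` point at `index`, first removing the alias from all
/// indices it currently resolves to. The delete and put happen as
/// separate requests, so the switch is not atomic.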
pub async fn write_alias(
&self,
config: &Config<'_>,
index: &str,
alias: &str,
) -> Result<(), ElasticsearchError> {
// delete old alias
info!("Try deleting old alias");
let response = self
.client
.indices()
.get_alias(IndicesGetAliasParts::Name(&[alias]))
.send()
.await
.map_err(ElasticsearchError::InitIndexError)?;
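// The body of a successful get-alias response maps each index name
// carrying the alias to its alias metadata; only the keys are needed here.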
let indices = response
.json::<HashMap<String, Value>>()
.await
.map_err(ElasticsearchError::InitIndexError)?
.keys()
.cloned()
.collect::<Vec<String>>();
self.client
.indices()
.delete_alias(IndicesDeleteAliasParts::IndexName(
&indices.iter().map(AsRef::as_ref).collect::<Vec<_>>(),
&[alias],
))
.send()
.await
.map_err(ElasticsearchError::InitIndexError)?;
// put new alias
info!("Putting new alias");
let response = self
.client
.indices()
.put_alias(IndicesPutAliasParts::IndexName(&[index], alias))
.send()
.await
.map_err(ElasticsearchError::InitIndexError)?;
response
.exception()
.await
.map_err(ElasticsearchError::ClientError)?
.map(ElasticsearchError::PushResponseError)
.map_or(Ok(()), Err)
}
}
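/// Name of the target index and the strategy to apply when it already exists.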
#[derive(Debug)]
pub struct Config<'a> {
pub index: &'a str,
pub exists_strategy: ExistsStrategy,
}
arg_enum! {
/// Different strategies to deal with existing indices
/// Abort: cancel push, return with an error
/// Ignore: Reuse existing index, appending new data
/// Recreate: Drop the existing index and start with a new one
#[derive(Debug, Clone, Copy)]
pub enum ExistsStrategy {
Abort,
Ignore,
Recreate,
}
}
#[cfg(test)]
mod tests {
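// These tests run against a live Elasticsearch instance at
// http://localhost:9200 and share the "flakes_index" index, so they
// may interfere with each other when run in parallel.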
use std::path::Path;
use super::*;
use crate::{
data::{self, import::Kind},
process_flake,
};
#[tokio::test]
async fn test_delete() -> Result<(), Box<dyn std::error::Error>> {
let es = Elasticsearch::new("http://localhost:9200").unwrap();
let config = &Config {
index: "flakes_index",
exists_strategy: ExistsStrategy::Ignore,
};
es.ensure_index(config).await?;
es.clear_index(config).await?;
let exists = es.check_index(config).await?;
assert!(!exists);
Ok(())
}
#[tokio::test]
async fn test_init() -> Result<(), Box<dyn std::error::Error>> {
let es = Elasticsearch::new("http://localhost:9200").unwrap();
let config = &Config {
index: "flakes_index",
exists_strategy: ExistsStrategy::Recreate,
};
es.ensure_index(config).await?;
let exists = es.check_index(config).await?;
assert!(exists, "Index should exist");
Ok(())
}
#[tokio::test]
async fn test_push() -> Result<(), Box<dyn std::error::Error>> {
let sources: Vec<data::Source> =
data::Source::read_sources_file(Path::new("./examples/examples.in.json"))?;
let exports = sources
.iter()
.flat_map(|s| process_flake(s, &Kind::All, false, &[]))
.flatten()
.collect::<Vec<Export>>();
println!("{}", serde_json::to_string(&exports[1]).unwrap());
let es = Elasticsearch::new("http://localhost:9200").unwrap();
let config = &Config {
index: "flakes_index",
exists_strategy: ExistsStrategy::Recreate,
};
es.ensure_index(config).await?;
es.push_exports(config, &exports).await?;
Ok(())
}
#[tokio::test]
async fn test_abort_if_index_exists() -> Result<(), Box<dyn std::error::Error>> {
let es = Elasticsearch::new("http://localhost:9200").unwrap();
let config = &Config {
index: "flakes_index",
exists_strategy: ExistsStrategy::Abort,
};
es.ensure_index(&Config {
exists_strategy: ExistsStrategy::Ignore,
..*config
})
.await?;
assert!(matches!(
es.ensure_index(config).await,
Err(ElasticsearchError::IndexExistsError(_)),
));
es.clear_index(config).await?;
Ok(())
}
}