use std::env; use crate::error::Result; use actix_web::HttpResponse; use aws_sdk_dynamodb::types::AttributeValue; use chrono::Utc; use serde::{Deserialize, Serialize}; use serde_dynamo::from_item; use sqlx::{PgPool, query_scalar}; pub struct UserRepository { pool: PgPool, dynamodb_client: aws_sdk_dynamodb::Client, table_name: String, } #[derive(Deserialize)] struct RepositoryDB { sk: String, full_name: String, owner_id: String, description: String, #[serde(default)] approved: bool, } impl RepositoryDB { fn into_repository(self) -> Option { Some(RepositoryDefinition { id: self.sk.strip_prefix("REPO#")?.to_string(), full_name: self.full_name, owner_id: self.owner_id, description: self.description, }) } fn into_global_response(self) -> Option { if !self.approved { return None; } Some(GlobalRepositoriesResponse { id: self.sk.strip_prefix("REPO#")?.to_string(), full_name: self.full_name, description: self.description, }) } } #[derive(Serialize, Deserialize)] pub struct RepositoryDefinition { pub id: String, pub full_name: String, #[serde(skip)] pub owner_id: String, pub description: String, } #[derive(Serialize)] pub struct GlobalRepositoriesResponse { pub id: String, pub full_name: String, pub description: String, } impl UserRepository { pub fn new(pool: PgPool, dynamodb_client: aws_sdk_dynamodb::Client) -> Self { Self { pool, dynamodb_client, table_name: env::var("REPOS_TABLE_NAME") .expect("environment variable REPOS_TABLE_NAME must be set"), } } pub async fn get_access_token(&self, user_id: &str) -> Result { query_scalar("SELECT access_token FROM account WHERE user_id = $1") .bind(user_id) .fetch_one(&self.pool) .await .map_err(|_| crate::error::Error::AccessToken) } pub async fn get_repositories_user(&self, user_id: &str) -> Result> { let response = self .dynamodb_client .query() .table_name(&self.table_name) .key_condition_expression("pk = :pk AND begins_with(sk, :sk)") .expression_attribute_values(":pk", AttributeValue::S(format!("USER#{user_id}"))) .expression_attribute_values(":sk", AttributeValue::S("REPO#".into())) .send() .await?; let repos = response .items() .iter() .filter_map(|item| { let dynamo_repo: RepositoryDB = from_item(item.clone()).ok()?; dynamo_repo.into_repository() }) .collect(); Ok(repos) } pub async fn global_repositories(&self) -> Result { let response = self .dynamodb_client .query() .table_name(&self.table_name) .index_name("gsi1") .key_condition_expression("gsi1pk = :pk") .expression_attribute_values(":pk", AttributeValue::S("REPOS".into())) .send() .await?; let repos: Vec = response .items() .iter() .filter_map(|item| { let dynamo_repo: RepositoryDB = from_item(item.clone()).ok()?; dynamo_repo.into_global_response() }) .collect(); Ok(HttpResponse::Ok().json(repos)) } pub async fn get_approved_repository( &self, repo_id: &str, ) -> Result> { let response = self .dynamodb_client .query() .table_name(&self.table_name) .index_name("gsi1") .key_condition_expression("gsi1pk = :pk") .filter_expression("sk = :sk AND approved = :approved") .expression_attribute_values(":pk", AttributeValue::S("REPOS".into())) .expression_attribute_values(":sk", AttributeValue::S(format!("REPO#{repo_id}"))) .expression_attribute_values(":approved", AttributeValue::Bool(true)) .send() .await?; let repo = response.items().first().and_then(|item| { let dynamo_repo: RepositoryDB = from_item(item.clone()).ok()?; dynamo_repo.into_repository() }); Ok(repo) } pub async fn add_repository( &self, user_id: &str, repo: RepositoryDefinition, ) -> Result { let now = Utc::now().to_rfc3339(); let response = self .dynamodb_client .put_item() .table_name(&self.table_name) .condition_expression("attribute_not_exists(sk)") .item("pk", AttributeValue::S(format!("USER#{user_id}"))) .item("sk", AttributeValue::S(format!("REPO#{}", repo.id))) .item("full_name", AttributeValue::S(repo.full_name.to_string())) .item("gsi1pk", AttributeValue::S("REPOS".into())) .item("gsi1sk", AttributeValue::S(now.clone())) .item("imported_at", AttributeValue::S(now)) .item("approved", AttributeValue::Bool(false)) .item("owner_id", AttributeValue::S(user_id.into())) .item("description", AttributeValue::S(repo.description.clone())) .send() .await; match response { Ok(_) => Ok(HttpResponse::Ok().json(repo)), Err(err) => { if err .as_service_error() .map(|e| e.is_conditional_check_failed_exception()) .unwrap_or(false) { Err(crate::error::Error::AlreadyExists) } else { Err(err.into()) } } } } }