use std::env; use crate::error::Result; use actix_web::HttpResponse; use aws_sdk_dynamodb::types::AttributeValue; use chrono::Utc; use serde::{Deserialize, Serialize}; use sqlx::{PgPool, query_scalar}; pub struct UserRepository { pool: PgPool, dynamodb_client: aws_sdk_dynamodb::Client, table_name: String, } #[derive(Serialize, Deserialize)] pub struct RepositorySchema { pub id: String, pub full_name: String, } impl UserRepository { pub fn new(pool: PgPool, dynamodb_client: aws_sdk_dynamodb::Client) -> Self { Self { pool, dynamodb_client, table_name: env::var("REPOS_TABLE_NAME") .expect("environment variable REPOS_TABLE_NAME must be set"), } } pub async fn get_access_token(&self, user_id: &str) -> Result { query_scalar("SELECT access_token FROM account WHERE user_id = $1") .bind(user_id) .fetch_one(&self.pool) .await .map_err(|_| crate::error::Error::AccessToken) } pub async fn get_repositories(&self, user_id: &str) -> Result> { let response = self .dynamodb_client .query() .table_name(&self.table_name) .key_condition_expression("pk = :pk AND begins_with(sk, :sk)") .expression_attribute_values(":pk", AttributeValue::S(format!("USER#{user_id}"))) .expression_attribute_values(":sk", AttributeValue::S("REPO#".into())) .send() .await? .items() .iter() .filter_map(|item| { Some(RepositorySchema { id: item .get("sk")? .as_s() .ok()? .strip_prefix("REPO#")? .to_string(), full_name: item.get("full_name")?.as_s().ok()?.to_string(), }) }) .collect::>(); Ok(response) } pub async fn global_repositories(&self) -> Result { let response = self .dynamodb_client .query() .key_condition_expression("pk = :pk") .expression_attribute_values(":pk", AttributeValue::S("REPOS".into())) .send() .await? .items() .iter() .filter_map(|item| { if (*item.get("approved")?.as_bool().ok()?) == false { return None; }; Some(item.get("full_name")?.as_s().ok()?.to_string()) }) .collect::>(); Ok(HttpResponse::Ok().json(response)) } pub async fn add_repository( &self, user_id: &str, repo: RepositorySchema, ) -> Result { let now = Utc::now().to_rfc3339(); let response = self .dynamodb_client .put_item() .table_name(&self.table_name) .condition_expression("attribute_not_exists(sk)") .item("pk", AttributeValue::S(format!("USER#{user_id}"))) .item("sk", AttributeValue::S(format!("REPO#{}", repo.id))) .item("full_name", AttributeValue::S(repo.full_name.to_string())) .item("gsi1pk", AttributeValue::S("REPOS".into())) .item("gsi1sk", AttributeValue::S(now.clone())) .item("imported_at", AttributeValue::S(now)) .item("approved", AttributeValue::Bool(false)) .send() .await; match response { Ok(_) => Ok(HttpResponse::Ok().json(repo)), Err(err) => { if err .as_service_error() .map(|e| e.is_conditional_check_failed_exception()) .unwrap_or(false) { Err(crate::error::Error::AlreadyExists) } else { Err(err.into()) } } } } }