feat: add repo importing

This commit is contained in:
2026-01-17 19:32:37 -08:00
parent 831259a6a6
commit fb62081ca8
28 changed files with 1823 additions and 116 deletions

View File

@@ -1,21 +0,0 @@
use crate::error::Result;
use sqlx::{PgPool, query_scalar};
pub struct AccountRepository<'a> {
pub pool: &'a PgPool,
}
impl<'a> AccountRepository<'a> {
pub fn new(pool: &'a PgPool) -> Self {
Self { pool }
}
pub async fn get_access_token(&self, user_id: &str) -> Result<String> {
query_scalar("SELECT access_token FROM account WHERE user_id = $1")
.bind(user_id)
.fetch_one(self.pool)
.await
.map_err(|_| crate::error::Error::AccessToken)
}
}

View File

@@ -0,0 +1,19 @@
use crate::{auth::User, error::Result, user::RepositorySchema};
use actix_web::{HttpResponse, web};
use serde::Serialize;
use crate::AppState;
#[derive(Serialize)]
struct AddResponse {
id: String,
}
pub async fn add_repo(
app_state: web::Data<AppState>,
user: web::ReqData<User>,
payload: web::Json<RepositorySchema>,
) -> Result<HttpResponse> {
let repo = payload.into_inner();
app_state.user.add_repository(&user.id, repo).await
}

View File

@@ -1,8 +1,10 @@
use actix_web::{HttpRequest, HttpResponse, web};
use std::collections::HashSet;
use actix_web::{HttpResponse, web};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::{AppState, account::AccountRepository, auth::User, error::Result};
use crate::{AppState, auth::User, error::Result};
#[derive(Debug, Serialize, Deserialize)]
pub struct Repository {
@@ -15,6 +17,9 @@ pub struct Repository {
pub stars: Option<usize>,
pub updated_at: DateTime<Utc>,
pub private: bool,
pub default_branch: String,
#[serde(default)]
pub added: bool,
}
pub async fn get_repos(
@@ -22,8 +27,7 @@ pub async fn get_repos(
req: web::ReqData<User>,
) -> Result<HttpResponse> {
let user = req.into_inner();
let query = AccountRepository::new(&app_state.pool);
let token = query.get_access_token(&user.id).await?;
let token = app_state.user.get_access_token(&user.id).await?;
let response = app_state
.reqwest_client
@@ -32,11 +36,24 @@ pub async fn get_repos(
.send()
.await?;
response.error_for_status_ref()?;
let added_ids = app_state
.user
.get_repositories(&user.id)
.await?
.into_iter()
.map(|r| r.id.clone())
.collect::<HashSet<String>>();
tracing::debug!(added_response = ?added_ids);
let data = response
.json::<Vec<Repository>>()
.await?
.into_iter()
.filter(|r| r.private == false)
.filter_map(|mut r| {
(!r.private).then(|| {
r.added = added_ids.contains(&r.id.to_string());
r
})
})
.collect::<Vec<Repository>>();
Ok(HttpResponse::Ok().json(data))

View File

@@ -1,2 +1,3 @@
pub mod add_repo;
pub mod get_repos;
pub mod search_repos;

View File

@@ -1,6 +1,4 @@
use crate::{
account::AccountRepository, auth::User, endpoints::get_repos::Repository, error::Result,
};
use crate::{auth::User, endpoints::get_repos::Repository, error::Result};
use actix_web::{
HttpRequest, HttpResponse,
web::{self, ReqData},
@@ -25,9 +23,7 @@ pub async fn search_repos(
req: ReqData<User>,
) -> Result<HttpResponse> {
let user = req.into_inner();
let token = AccountRepository::new(&app_state.pool)
.get_access_token(&user.id)
.await?;
let token = app_state.user.get_access_token(&user.id).await?;
let search_query = format!("user:{} {} fork:true", user.name, query.q);
let response = app_state

View File

@@ -1,4 +1,5 @@
use actix_web::{HttpResponse, ResponseError, http::StatusCode};
use aws_sdk_dynamodb::error::SdkError;
use serde::Serialize;
use thiserror::Error;
@@ -18,6 +19,16 @@ pub enum Error {
Jwx(#[from] jsonwebtoken::errors::Error),
#[error("token expired")]
TokenExpired,
#[error("dynamodb error: {0}")]
DynamoDB(String),
#[error("item already exists")]
AlreadyExists,
}
impl<E: std::fmt::Debug> From<SdkError<E>> for Error {
fn from(err: SdkError<E>) -> Self {
Error::DynamoDB(format!("{:?}", err))
}
}
#[derive(Serialize)]
@@ -33,6 +44,9 @@ impl ResponseError for Error {
Error::TokenExpired => HttpResponse::Unauthorized().json(ErrorResponse {
error: "token expired".to_string(),
}),
Error::AlreadyExists => HttpResponse::BadRequest().json(ErrorResponse {
error: "item already exists".to_string(),
}),
_ => HttpResponse::InternalServerError().finish(),
}
}

View File

@@ -1,8 +1,8 @@
mod account;
mod auth;
mod endpoints;
mod error;
mod middleware;
mod user;
use std::env;
@@ -10,20 +10,24 @@ use actix_web::{
App, HttpServer,
middleware::from_fn,
rt::System,
web::{self, route},
web::{self},
};
use aws_config::BehaviorVersion;
use sqlx::PgPool;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{
EnvFilter, fmt::format::FmtSpan, layer::SubscriberExt, util::SubscriberInitExt,
};
use crate::auth::{Auth, JWT};
use crate::{
auth::{Auth, JWT},
user::UserRepository,
};
struct AppState {
reqwest_client: reqwest::Client,
pool: PgPool,
auth: Auth,
user: UserRepository,
}
async fn run() -> std::io::Result<()> {
@@ -35,14 +39,20 @@ async fn run() -> std::io::Result<()> {
))
.build()
.expect("failed to create reqwest client");
let config = aws_config::load_defaults(BehaviorVersion::v2026_01_12()).await;
let dynamodb_client = aws_sdk_dynamodb::Client::new(&config);
let app_data = web::Data::new(AppState {
reqwest_client: reqwest_client.clone(),
pool: PgPool::connect(
&env::var("DATABASE_URL").expect("DATABASE_URL environment variable must be set"),
)
.await
.expect("error connecting to db"),
auth: Auth::JWT(JWT::new(reqwest_client.clone())),
user: UserRepository::new(
PgPool::connect(
&env::var("DATABASE_URL").expect("DATABASE_URL environment variable must be set"),
)
.await
.expect("error connecting to db"),
dynamodb_client,
),
});
HttpServer::new(move || {
@@ -70,6 +80,8 @@ async fn run() -> std::io::Result<()> {
"/repos/search",
web::get().to(endpoints::search_repos::search_repos),
)
.wrap(from_fn(middleware::protected))
.route("/repo/add", web::post().to(endpoints::add_repo::add_repo))
.wrap(from_fn(middleware::protected)),
),
),
@@ -90,7 +102,8 @@ fn main() -> std::io::Result<()> {
.add_directive("reqwest=info".parse().unwrap())
.add_directive("hyper=info".parse().unwrap())
.add_directive("h2=info".parse().unwrap())
.add_directive("rustls=info".parse().unwrap());
.add_directive("rustls=info".parse().unwrap())
.add_directive("aws=info".parse().unwrap());
tracing_subscriber::registry()
.with(tracing_env_filter)

104
api/src/user.rs Normal file
View File

@@ -0,0 +1,104 @@
use std::env;
use crate::error::Result;
use actix_web::HttpResponse;
use aws_sdk_dynamodb::types::AttributeValue;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use sqlx::{PgPool, query_scalar};
pub struct UserRepository {
pool: PgPool,
dynamodb_client: aws_sdk_dynamodb::Client,
table_name: String,
}
#[derive(Serialize, Deserialize)]
pub struct RepositorySchema {
pub id: String,
pub name: String,
}
impl UserRepository {
pub fn new(pool: PgPool, dynamodb_client: aws_sdk_dynamodb::Client) -> Self {
Self {
pool,
dynamodb_client,
table_name: env::var("REPOS_TABLE_NAME")
.expect("environment variable REPOS_TABLE_NAME must be set"),
}
}
pub async fn get_access_token(&self, user_id: &str) -> Result<String> {
query_scalar("SELECT access_token FROM account WHERE user_id = $1")
.bind(user_id)
.fetch_one(&self.pool)
.await
.map_err(|_| crate::error::Error::AccessToken)
}
pub async fn get_repositories(&self, user_id: &str) -> Result<Vec<RepositorySchema>> {
let response = self
.dynamodb_client
.query()
.table_name(&self.table_name)
.key_condition_expression("pk = :pk AND begins_with(sk, :sk)")
.expression_attribute_values(":pk", AttributeValue::S(format!("USER#{user_id}")))
.expression_attribute_values(":sk", AttributeValue::S("REPO#".into()))
.send()
.await?
.items()
.iter()
.filter_map(|item| {
Some(RepositorySchema {
id: item
.get("sk")?
.as_s()
.ok()?
.strip_prefix("REPO#")?
.to_string(),
name: item.get("full_name")?.as_s().ok()?.to_string(),
})
})
.collect::<Vec<RepositorySchema>>();
Ok(response)
}
pub async fn add_repository(
&self,
user_id: &str,
repo: RepositorySchema,
) -> Result<HttpResponse> {
let now = Utc::now().to_rfc3339();
let response = self
.dynamodb_client
.put_item()
.table_name(&self.table_name)
.condition_expression("attribute_not_exists(sk)")
.item("pk", AttributeValue::S(format!("USER#{user_id}")))
.item("sk", AttributeValue::S(format!("REPO#{}", repo.id)))
.item("full_name", AttributeValue::S(repo.name.to_string()))
.item("gsi1pk", AttributeValue::S("REPOS".into()))
.item("gsi1sk", AttributeValue::S(now.clone()))
.item("imported_at", AttributeValue::S(now))
.send()
.await;
match response {
Ok(_) => Ok(HttpResponse::Ok().json(repo)),
Err(err) => {
if err
.as_service_error()
.map(|e| e.is_conditional_check_failed_exception())
.unwrap_or(false)
{
Err(crate::error::Error::AlreadyExists)
} else {
Err(err.into())
}
}
}
}
}