From 0d3488a3811c8d58bd570af64cc29840df9ba439 Mon Sep 17 00:00:00 2001 From: diogo464 Date: Sun, 10 Aug 2025 11:48:22 +0100 Subject: reorganized repo --- src/main.rs | 1154 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1154 insertions(+) create mode 100644 src/main.rs (limited to 'src') diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..3a8e5f9 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,1154 @@ +use std::{ + collections::{HashMap, VecDeque}, + fmt::Display, + fs::File, + io::{BufWriter, Read as _, Write}, + os::unix::fs::MetadataExt, + path::{Path, PathBuf}, + str::FromStr, + sync::{Arc, Mutex}, + time::SystemTime, +}; + +use clap::{Args, Parser, Subcommand}; +use sha2::Digest; +use slotmap::SlotMap; + +const BLOB_STORE_HIERARCHY_DEPTH: usize = 4; + +#[derive(Debug)] +pub struct InvalidBlobId; + +impl Display for InvalidBlobId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "invalid blob id") + } +} + +impl std::error::Error for InvalidBlobId {} + +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)] +pub struct BlobId([u8; 32]); + +impl BlobId { + pub fn hash(&self, data: &[u8]) -> BlobId { + let mut hasher = sha2::Sha256::default(); + hasher.write_all(data).unwrap(); + Self(hasher.finalize().try_into().unwrap()) + } + + pub fn as_bytes(&self) -> &[u8; 32] { + &self.0 + } +} + +impl Display for BlobId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + hex::display(&self.0).fmt(f) + } +} + +impl FromStr for BlobId { + type Err = InvalidBlobId; + + fn from_str(s: &str) -> Result { + let decoded = hex::decode(s).map_err(|_| InvalidBlobId)?; + Ok(Self(decoded.try_into().map_err(|_| InvalidBlobId)?)) + } +} + +#[derive(Debug, Clone)] +pub struct BlobStore(PathBuf); + +impl BlobStore { + pub fn new(path: impl Into) -> Self { + Self(path.into()) + } +} + +pub fn blob_path(store: &BlobStore, blob_id: &BlobId) -> PathBuf { + let encoded = hex::encode(blob_id.as_bytes()); + let mut path = store.0.clone(); + for depth in 0..BLOB_STORE_HIERARCHY_DEPTH { + let base_idx = depth * 2; + path.push(&encoded[base_idx..base_idx + 2]); + } + path.push(encoded); + path +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum ImportMode { + Move, + Copy, + HardLink, +} + +pub fn blob_import_file( + store: &BlobStore, + mode: ImportMode, + blob_id: &BlobId, + file_path: &Path, +) -> std::io::Result<()> { + let blob_path = blob_path(store, blob_id); + if blob_path.exists() { + return Ok(()); + } + + if let Some(parent) = blob_path.parent() { + std::fs::create_dir_all(parent)?; + } + match mode { + ImportMode::Move => { + std::fs::rename(file_path, blob_path)?; + } + ImportMode::Copy => { + todo!() + } + ImportMode::HardLink => match std::fs::hard_link(file_path, blob_path) { + Ok(()) => {} + Err(err) => { + if err.kind() != std::io::ErrorKind::AlreadyExists { + panic!("{err}"); + } + } + }, + } + Ok(()) +} + +pub fn blob_hash_file(file_path: &Path) -> std::io::Result { + let mut file = std::fs::File::open(file_path)?; + let mut buf = vec![0u8; 1 * 1024 * 1024]; + let mut hasher = sha2::Sha256::default(); + loop { + let n = file.read(&mut buf)?; + if n == 0 { + break; + } + hasher.write_all(&buf[..n])?; + } + Ok(BlobId(hasher.finalize().try_into().unwrap())) +} + +pub fn blob_size(store: &BlobStore, blob_id: &BlobId) -> std::io::Result { + let blob_path = blob_path(store, blob_id); + Ok(blob_path.metadata()?.size()) +} + +const OPERATION_KIND_CREATE_FILE: &'static str = "create_file"; +const OPERATION_KIND_CREATE_DIR: &'static str = "create_dir"; +const OPERATION_KIND_REMOVE: &'static str = "remove"; +const OPERATION_KIND_RENAME: &'static str = "rename"; + +#[derive(Debug)] +pub struct InvalidOperation; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Operation { + pub header: OperationHeader, + pub data: OperationData, +} + +impl FromStr for Operation { + type Err = InvalidOperation; + + fn from_str(s: &str) -> Result { + let err = || InvalidOperation; + let mut iter = s.split('\t'); + + let timestamp_str = iter.next().ok_or_else(err)?; + let revision_str = iter.next().ok_or_else(err)?; + let email_str = iter.next().ok_or_else(err)?; + let kind_str = iter.next().ok_or_else(err)?; + + let timestamp = timestamp_str.parse().map_err(|_| err())?; + let revision = revision_str.parse().map_err(|_| err())?; + let email = email_str.to_string(); + let data = match kind_str { + OPERATION_KIND_CREATE_FILE => { + let path_str = iter.next().ok_or_else(err)?; + let blob_str = iter.next().ok_or_else(err)?; + let size_str = iter.next().ok_or_else(err)?; + + let path = path_str.parse().map_err(|_| err())?; + let blob = blob_str.parse().map_err(|_| err())?; + let size = size_str.parse().map_err(|_| err())?; + + OperationData::CreateFile(OperationCreateFile { path, blob, size }) + } + OPERATION_KIND_CREATE_DIR => { + let path_str = iter.next().ok_or_else(err)?; + + let path = path_str.parse().map_err(|_| err())?; + + OperationData::CreateDir(OperationCreateDir { path }) + } + OPERATION_KIND_RENAME => { + let old_str = iter.next().ok_or_else(err)?; + let new_str = iter.next().ok_or_else(err)?; + + let old = old_str.parse().map_err(|_| err())?; + let new = new_str.parse().map_err(|_| err())?; + + OperationData::Rename(OperationRename { old, new }) + } + OPERATION_KIND_REMOVE => { + let path_str = iter.next().ok_or_else(err)?; + + let path = path_str.parse().map_err(|_| err())?; + + OperationData::Remove(OperationRemove { path }) + } + _ => return Err(err()), + }; + + Ok(Self { + header: OperationHeader { + timestamp, + revision, + email, + }, + data, + }) + } +} + +impl Display for Operation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}\t{}\t{}\t", + self.header.timestamp, self.header.revision, self.header.email + )?; + match &self.data { + OperationData::CreateFile(data) => write!( + f, + "{}\t{}\t{}\t{}", + OPERATION_KIND_CREATE_FILE, data.path, data.blob, data.size + ), + OperationData::CreateDir(data) => { + write!(f, "{}\t{}", OPERATION_KIND_CREATE_DIR, data.path) + } + OperationData::Remove(data) => write!(f, "{}\t{}", OPERATION_KIND_REMOVE, data.path), + OperationData::Rename(data) => { + write!(f, "{}\t{}\t{}", OPERATION_KIND_RENAME, data.old, data.new) + } + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct OperationHeader { + pub timestamp: u64, + pub revision: u64, + pub email: String, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum OperationData { + CreateFile(OperationCreateFile), + CreateDir(OperationCreateDir), + Remove(OperationRemove), + Rename(OperationRename), +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct OperationCreateFile { + pub path: DrivePath, + pub blob: BlobId, + pub size: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct OperationCreateDir { + pub path: DrivePath, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct OperationRemove { + pub path: DrivePath, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct OperationRename { + pub old: DrivePath, + pub new: DrivePath, +} + +#[derive(Debug)] +pub struct InvalidDrivePath(String); + +impl std::fmt::Display for InvalidDrivePath { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "invalid path: {}", self.0) + } +} + +impl std::error::Error for InvalidDrivePath {} + +#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)] +pub struct DrivePath(Vec); + +impl Display for DrivePath { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.is_root() { + f.write_str("/")?; + } else { + for comp in self.components() { + f.write_str("/")?; + comp.fmt(f)?; + } + } + Ok(()) + } +} + +impl DrivePath { + pub fn is_root(&self) -> bool { + self.0.is_empty() + } + + pub fn push(&self, component: DrivePathComponent) -> DrivePath { + let mut components = self.0.clone(); + components.push(component); + Self(components) + } + + pub fn components(&self) -> &[DrivePathComponent] { + self.0.as_slice() + } + + pub fn parent(&self) -> DrivePath { + if self.0.is_empty() { + Self(Default::default()) + } else { + let slice = self.0.as_slice(); + Self(slice[..slice.len() - 1].to_owned()) + } + } + + pub fn last(&self) -> Option { + self.0.last().cloned() + } + + pub fn split(&self) -> Option<(DrivePath, DrivePathComponent)> { + if self.0.is_empty() { + None + } else { + Some(( + Self(self.0[..self.0.len() - 1].to_owned()), + self.0[self.0.len() - 1].clone(), + )) + } + } +} + +impl FromStr for DrivePath { + type Err = InvalidDrivePath; + + fn from_str(s: &str) -> Result { + let mut components = Vec::default(); + for unchecked_component in s.trim().trim_matches('/').split('/') { + match unchecked_component.parse() { + Ok(component) => components.push(component), + Err(err) => { + return Err(InvalidDrivePath(format!( + "path contained invalid component '{unchecked_component}': {err}" + ))); + } + }; + } + Ok(Self(components)) + } +} + +#[derive(Debug)] +pub struct InvalidDrivePathComponent(&'static str); + +impl Display for InvalidDrivePathComponent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl InvalidDrivePathComponent { + pub const fn new(msg: &'static str) -> Self { + Self(msg) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct DrivePathComponent(String); + +impl Display for DrivePathComponent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl DrivePathComponent { + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl FromStr for DrivePathComponent { + type Err = InvalidDrivePathComponent; + + fn from_str(s: &str) -> Result { + if s.is_empty() { + return Err(InvalidDrivePathComponent::new( + "path component cannot be empty", + )); + } + + if s.contains('\t') { + return Err(InvalidDrivePathComponent::new( + "path component cannot contain tabs", + )); + } + + if s == "." || s == ".." { + return Err(InvalidDrivePathComponent::new( + "path component cannot be '.' or '..'", + )); + } + + if s.contains('/') { + return Err(InvalidDrivePathComponent::new( + "path component cannot contain '/'", + )); + } + + if s.len() > 256 { + return Err(InvalidDrivePathComponent::new("path component is too long")); + } + + Ok(Self(s.to_string())) + } +} + +#[derive(Debug)] +pub struct FsError(String); + +impl From for FsError { + fn from(value: String) -> Self { + Self(value) + } +} + +impl From<&str> for FsError { + fn from(value: &str) -> Self { + Self(value.to_string()) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum FsNodeKind { + File, + Directory, +} + +pub struct FsNode { + pub kind: FsNodeKind, + pub author: String, + pub lastmod: u64, + pub children: HashMap, + pub blob: BlobId, + pub size: u64, +} + +slotmap::new_key_type! { pub struct FsNodeId; } + +pub struct Fs { + pub root: FsNodeId, + pub nodes: SlotMap, +} + +impl Default for Fs { + fn default() -> Self { + let mut nodes: SlotMap = Default::default(); + let root = nodes.insert(FsNode { + kind: FsNodeKind::Directory, + author: "system".to_string(), + lastmod: 0, + children: Default::default(), + blob: Default::default(), + size: 0, + }); + Self { root, nodes } + } +} + +pub fn apply(fs: &mut Fs, op: &Operation) -> Result<(), FsError> { + match &op.data { + OperationData::CreateFile(data) => apply_create_file(fs, &op.header, &data), + OperationData::CreateDir(data) => apply_create_dir(fs, &op.header, &data), + OperationData::Remove(data) => apply_remove(fs, &op.header, &data), + OperationData::Rename(data) => apply_rename(fs, &op.header, &data), + } +} + +pub fn apply_create_file( + fs: &mut Fs, + header: &OperationHeader, + data: &OperationCreateFile, +) -> Result<(), FsError> { + if data.path.is_root() { + return Err(FsError::from("cannot create file at root")); + } + + let mut parent_id = fs.root; + for comp in data.path.parent().components() { + let node = &mut fs.nodes[parent_id]; + if node.kind != FsNodeKind::Directory { + return Err(FsError::from("cannot create file under another file")); + } + parent_id = match node.children.get(comp) { + Some(id) => *id, + None => { + let id = fs.nodes.insert(FsNode { + kind: FsNodeKind::Directory, + author: header.email.clone(), + lastmod: header.timestamp, + children: Default::default(), + blob: Default::default(), + size: 0, + }); + fs.nodes[parent_id].children.insert(comp.clone(), id); + id + } + }; + } + + let name = data.path.last().unwrap(); + let parent = &mut fs.nodes[parent_id]; + + if parent.kind != FsNodeKind::Directory { + return Err(FsError::from("cannot create file under another file")); + } + + match parent.children.get(&name).copied() { + Some(node_id) => { + let node = &mut fs.nodes[node_id]; + if node.kind == FsNodeKind::Directory { + return Err(FsError::from( + "node at path already exists but is a directory", + )); + } + node.author = header.email.clone(); + node.lastmod = header.timestamp; + node.blob = data.blob.clone(); + } + None => { + let id = fs.nodes.insert(FsNode { + kind: FsNodeKind::File, + author: header.email.clone(), + lastmod: header.timestamp, + children: Default::default(), + blob: data.blob.clone(), + size: data.size, + }); + fs.nodes[parent_id].children.insert(name, id); + } + } + Ok(()) +} + +pub fn apply_create_dir( + fs: &mut Fs, + header: &OperationHeader, + data: &OperationCreateDir, +) -> Result<(), FsError> { + let (parent, name) = data + .path + .split() + .ok_or_else(|| FsError::from("cannot recreate root directory"))?; + + let mut parent_id = fs.root; + for comp in parent.components() { + let node = &fs.nodes[parent_id]; + parent_id = match node.children.get(comp) { + Some(&child_id) => { + let child = &fs.nodes[child_id]; + if child.kind == FsNodeKind::File { + return Err(FsError::from("cannot create directory under file")); + } + child_id + } + None => { + let id = fs.nodes.insert(FsNode { + kind: FsNodeKind::Directory, + author: header.email.clone(), + lastmod: header.timestamp, + children: Default::default(), + blob: Default::default(), + size: 0, + }); + fs.nodes[parent_id].children.insert(comp.clone(), id); + id + } + }; + } + + let parent = &fs.nodes[parent_id]; + match parent.children.get(&name).copied() { + Some(child_id) => { + let child = &fs.nodes[child_id]; + if child.kind != FsNodeKind::Directory { + return Err(FsError::from( + "cannot create directory, the given path is already a file", + )); + } + } + None => { + let id = fs.nodes.insert(FsNode { + kind: FsNodeKind::Directory, + author: header.email.clone(), + lastmod: header.timestamp, + children: Default::default(), + blob: Default::default(), + size: 0, + }); + + let parent = &mut fs.nodes[parent_id]; + parent.children.insert(name, id); + } + } + + Ok(()) +} + +pub fn apply_remove( + fs: &mut Fs, + _header: &OperationHeader, + data: &OperationRemove, +) -> Result<(), FsError> { + let (parent, name) = data + .path + .split() + .ok_or_else(|| FsError::from("cannot remove root directory"))?; + + let parent_id = match find_node(fs, &parent) { + Some(id) => id, + None => return Ok(()), + }; + + let parent = &mut fs.nodes[parent_id]; + parent.children.remove(&name); + + Ok(()) +} + +fn find_node(fs: &Fs, path: &DrivePath) -> Option { + let mut node_id = fs.root; + for comp in path.components() { + let node = &fs.nodes[node_id]; + node_id = *node.children.get(comp)?; + } + Some(node_id) +} + +pub fn apply_rename( + fs: &mut Fs, + _header: &OperationHeader, + data: &OperationRename, +) -> Result<(), FsError> { + let (old_parent, old_name) = data + .old + .split() + .ok_or_else(|| FsError::from("cannot move root directory"))?; + + let (new_parent, new_name) = data + .new + .split() + .ok_or_else(|| FsError::from("move destination cannot be root directory"))?; + + let old_parent_id = find_node(fs, &old_parent).unwrap(); + let old_parent = &mut fs.nodes[old_parent_id]; + let node_id = old_parent.children.remove(&old_name).unwrap(); + + let new_parent_id = find_node(fs, &new_parent).unwrap(); + let new_parent = &mut fs.nodes[new_parent_id]; + new_parent.children.insert(new_name, node_id); + + Ok(()) +} + +#[derive(Debug, Parser)] +struct Cli { + #[clap(subcommand)] + cmd: Cmd, +} + +#[derive(Debug, Parser)] +struct CliCommon { + #[clap(long, default_value = "./", env = "FCTDRIVE_PATH")] + drive: PathBuf, +} + +#[derive(Debug, Subcommand)] +enum Cmd { + CreateFile(CreateFileArgs), + CreateDir(CreateDirArgs), + Remove(RemoveArgs), + Rename(RenameArgs), + Ls(LsArgs), + Import(ImportArgs), + Blob(BlobArgs), +} + +#[derive(Debug, Args)] +struct CreateFileArgs { + #[clap(flatten)] + common: CliCommon, + + #[clap(long)] + timestamp: Option, + + #[clap(long)] + email: String, + + #[clap(long)] + path: DrivePath, + + #[clap(long)] + file: std::path::PathBuf, +} + +#[derive(Debug, Args)] +struct CreateDirArgs { + #[clap(flatten)] + common: CliCommon, + + #[clap(long)] + timestamp: Option, + + #[clap(long)] + email: String, + + #[clap(long)] + path: DrivePath, +} + +#[derive(Debug, Args)] +struct RemoveArgs { + #[clap(flatten)] + common: CliCommon, + + #[clap(long)] + timestamp: Option, + + #[clap(long)] + email: String, + + #[clap(long)] + path: DrivePath, +} + +#[derive(Debug, Args)] +struct RenameArgs { + #[clap(flatten)] + common: CliCommon, + + #[clap(long)] + timestamp: Option, + + #[clap(long)] + email: String, + + #[clap(long)] + old: DrivePath, + + #[clap(long)] + new: DrivePath, +} + +#[derive(Debug, Args)] +struct LsArgs { + #[clap(flatten)] + common: CliCommon, + + #[clap(short, long)] + recursive: bool, + + path: Option, +} + +#[derive(Debug, Args)] +struct ImportArgs { + #[clap(flatten)] + common: CliCommon, + + #[clap(long)] + timestamp: Option, + + #[clap(long)] + email: String, + + path: PathBuf, +} + +#[derive(Debug, Args)] +struct BlobArgs { + #[clap(flatten)] + common: CliCommon, + + blob_id: BlobId, +} + +fn main() { + let cli = Cli::parse(); + + match cli.cmd { + Cmd::CreateFile(args) => cmd_create_file(args), + Cmd::CreateDir(args) => cmd_create_dir(args), + Cmd::Remove(args) => cmd_remove(args), + Cmd::Rename(args) => cmd_rename(args), + Cmd::Ls(args) => cmd_ls(args), + Cmd::Import(args) => cmd_import(args), + Cmd::Blob(args) => cmd_blob(args), + } +} + +fn cmd_create_file(args: CreateFileArgs) { + let store = common_create_blob_store(&args.common); + let file_blob_id = blob_hash_file(&args.file).unwrap(); + let file_blob_size = blob_size(&store, &file_blob_id).unwrap(); + + let _lock = common_write_lock(&args.common); + let mut fs = Fs::default(); + let mut ops = common_read_log_file(&args.common); + ops.iter().for_each(|op| apply(&mut fs, op).unwrap()); + + let timestamp = args.timestamp.unwrap_or_else(get_timestamp); + let revision = get_next_revision(&ops); + blob_import_file(&store, ImportMode::HardLink, &file_blob_id, &args.file).unwrap(); + + let new_op = Operation { + header: OperationHeader { + timestamp, + revision, + email: args.email, + }, + data: OperationData::CreateFile(OperationCreateFile { + path: args.path, + blob: file_blob_id, + size: file_blob_size, + }), + }; + apply(&mut fs, &new_op).unwrap(); + ops.push(new_op); + common_write_log_file(&args.common, &ops); +} + +fn cmd_create_dir(args: CreateDirArgs) { + let _lock = common_write_lock(&args.common); + let mut fs = Fs::default(); + let mut ops = common_read_log_file(&args.common); + ops.iter().for_each(|op| apply(&mut fs, op).unwrap()); + + let timestamp = args.timestamp.unwrap_or_else(get_timestamp); + let revision = get_next_revision(&ops); + + let new_op = Operation { + header: OperationHeader { + timestamp, + revision, + email: args.email, + }, + data: OperationData::CreateDir(OperationCreateDir { path: args.path }), + }; + apply(&mut fs, &new_op).unwrap(); + ops.push(new_op); + common_write_log_file(&args.common, &ops); +} + +fn cmd_remove(args: RemoveArgs) { + let _lock = common_write_lock(&args.common); + let mut fs = Fs::default(); + let mut ops = common_read_log_file(&args.common); + ops.iter().for_each(|op| apply(&mut fs, op).unwrap()); + + let timestamp = args.timestamp.unwrap_or_else(get_timestamp); + let revision = get_next_revision(&ops); + + let new_op = Operation { + header: OperationHeader { + timestamp, + revision, + email: args.email, + }, + data: OperationData::Remove(OperationRemove { path: args.path }), + }; + apply(&mut fs, &new_op).unwrap(); + ops.push(new_op); + common_write_log_file(&args.common, &ops); +} + +fn cmd_rename(args: RenameArgs) { + let _lock = common_write_lock(&args.common); + let mut fs = Fs::default(); + let mut ops = common_read_log_file(&args.common); + ops.iter().for_each(|op| apply(&mut fs, op).unwrap()); + + let timestamp = args.timestamp.unwrap_or_else(get_timestamp); + let revision = get_next_revision(&ops); + + let new_op = Operation { + header: OperationHeader { + timestamp, + revision, + email: args.email, + }, + data: OperationData::Rename(OperationRename { + old: args.old, + new: args.new, + }), + }; + + apply(&mut fs, &new_op).unwrap(); + ops.push(new_op); + common_write_log_file(&args.common, &ops); +} + +type FsNodeWriter<'a> = std::io::BufWriter>; + +fn cmd_ls(args: LsArgs) { + let stdout = std::io::stdout().lock(); + let mut writer = BufWriter::new(stdout); + let mut fs = Fs::default(); + let ops = common_read_log_file(&args.common); + ops.iter().for_each(|op| apply(&mut fs, op).unwrap()); + let node_id = match args.path { + Some(path) => find_node(&fs, &path), + None => Some(fs.root), + }; + if let Some(node_id) = node_id { + write_node( + &mut writer, + &fs, + node_id, + Default::default(), + args.recursive, + true, + ); + } + writer.flush().unwrap(); + std::process::exit(0); +} + +fn write_node( + writer: &mut FsNodeWriter, + fs: &Fs, + node_id: FsNodeId, + path: DrivePath, + recursive: bool, + recurse: bool, +) { + let node = &fs.nodes[node_id]; + match node.kind { + FsNodeKind::File => { + writeln!( + writer, + "{}\tfile\t{}\t{}\t{}\t{}", + path, node.lastmod, node.blob, node.size, node.author + ) + .unwrap(); + } + FsNodeKind::Directory => { + writeln!( + writer, + "{}\tdir\t{}\t-\t-\t{}", + path, node.lastmod, node.author + ) + .unwrap(); + if recursive || recurse { + for (child_comp, child_id) in node.children.iter() { + let child_path = path.push(child_comp.clone()); + write_node(writer, fs, *child_id, child_path, recursive, false); + } + } + } + } +} + +#[derive(Debug, Clone)] +struct Queue(Arc>>); + +impl Default for Queue { + fn default() -> Self { + Self(Arc::new(Mutex::new(Default::default()))) + } +} + +impl From> for Queue { + fn from(value: Vec) -> Self { + Self(Arc::new(Mutex::new(value.into()))) + } +} + +impl Queue { + pub fn push(&self, v: T) { + self.0.lock().unwrap().push_back(v); + } + + pub fn pop(&self) -> Option { + self.0.lock().unwrap().pop_front() + } +} + +fn cmd_import(args: ImportArgs) { + let _lock = common_write_lock(&args.common); + + let mut ops = common_read_log_file(&args.common); + + let store = BlobStore::new("blobs"); + let timestamp = args.timestamp.unwrap_or_else(get_timestamp); + let root = args.path.canonicalize().unwrap(); + + let files = Queue::from(collect_all_file_paths(&root)); + let num_threads = std::thread::available_parallelism().unwrap().get(); + let mut handles = Vec::default(); + for _ in 0..num_threads { + let root = root.clone(); + let email = args.email.clone(); + let files = files.clone(); + let store = store.clone(); + let handle = std::thread::spawn(move || { + let mut ops = Vec::default(); + while let Some(file) = files.pop() { + let rel = file.strip_prefix(&root).unwrap(); + let drive_path = rel.to_str().unwrap().parse::().unwrap(); + let blob_id = blob_hash_file(&file).unwrap(); + blob_import_file(&store, ImportMode::HardLink, &blob_id, &file).unwrap(); + let blob_size = blob_size(&store, &blob_id).unwrap(); + let op = Operation { + header: OperationHeader { + timestamp, + revision: 0, + email: email.clone(), + }, + data: OperationData::CreateFile(OperationCreateFile { + path: drive_path, + blob: blob_id, + size: blob_size, + }), + }; + ops.push(op); + } + ops + }); + handles.push(handle); + } + + let mut fs = Fs::default(); + let mut next_rev = get_next_revision(&ops); + for handle in handles { + let mut task_ops = handle.join().unwrap(); + for op in &mut task_ops { + op.header.revision = next_rev; + next_rev += 1; + } + ops.extend(task_ops); + } + ops.iter().for_each(|op| apply(&mut fs, op).unwrap()); + common_write_log_file(&args.common, &ops); +} + +fn cmd_blob(args: BlobArgs) { + let store = common_create_blob_store(&args.common); + let path = blob_path(&store, &args.blob_id); + println!("{}", path.display()); +} + +fn collect_all_file_paths(root: &Path) -> Vec { + let mut queue = vec![root.to_path_buf()]; + let mut files = vec![]; + while let Some(path) = queue.pop() { + if path.is_dir() { + let mut read = path.read_dir().unwrap(); + while let Some(entry) = read.next() { + let entry = entry.unwrap(); + queue.push(entry.path()); + } + } else { + files.push(path); + } + } + files +} + +fn common_create_blob_store(common: &CliCommon) -> BlobStore { + BlobStore::new(common.drive.join("blobs")) +} + +fn common_log_file_path(common: &CliCommon) -> PathBuf { + common.drive.join("log.txt") +} + +fn common_log_temp_file_path(common: &CliCommon) -> PathBuf { + common.drive.join("log.txt.tmp") +} + +fn common_read_log_file(common: &CliCommon) -> Vec { + let log_file_path = common_log_file_path(common); + let mut operations = Vec::default(); + if std::fs::exists(&log_file_path).unwrap() { + let contents = std::fs::read_to_string(&log_file_path).unwrap(); + for line in contents.lines() { + let operation = line.parse().unwrap(); + operations.push(operation); + } + } + operations +} + +fn common_write_log_file(common: &CliCommon, ops: &[Operation]) { + let log_file_path = common_log_file_path(common); + let log_temp_file_path = common_log_temp_file_path(common); + { + let file = File::options() + .create(true) + .write(true) + .truncate(true) + .open(&log_temp_file_path) + .unwrap(); + let mut writer = BufWriter::new(file); + for op in ops { + writeln!(writer, "{op}").unwrap(); + } + writer.flush().unwrap(); + } + std::fs::rename(&log_temp_file_path, &log_file_path).unwrap(); +} + +fn common_write_lock_path(common: &CliCommon) -> PathBuf { + common.drive.join("write.lock") +} + +fn common_write_lock(common: &CliCommon) -> File { + let lock_path = common_write_lock_path(common); + let file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .open(lock_path) + .unwrap(); + file.lock().unwrap(); + file +} + +fn get_timestamp() -> u64 { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() +} + +fn get_next_revision(ops: &[Operation]) -> u64 { + match ops.last() { + Some(op) => op.header.revision + 1, + None => 0, + } +} -- cgit