Skip to content

Commit 3319820

Browse files
committed
added dedup function
1 parent a624770 commit 3319820

File tree

4 files changed

+59
-1
lines changed

4 files changed

+59
-1
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ SUBCOMMANDS:
2828
cat Concatenate multiple files or all files in a directory
2929
convert Convert file format (CSV, Parquet, JSON)
3030
count Count the number of rows in a file
31+
dedup Remove duplicate rows
3132
describe Show summary statistics for a file
3233
help Prints this message or the help of the given subcommand(s)
3334
query Run a SQL query on a file

src/bin/main.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use datafusion::prelude::*;
2-
use dfkit::commands::{cat, convert, count, describe, dfsplit, query, reverse, schema, sort, view};
2+
use dfkit::commands::{cat, convert, count, describe, dfsplit, query, reverse, schema, sort, view, dedup};
33
use dfkit::utils::{DfKitError, parse_file_list};
44
use std::env;
55
use std::path::PathBuf;
@@ -101,6 +101,14 @@ pub enum Commands {
101101
#[structopt(short, long, parse(from_os_str))]
102102
output: PathBuf,
103103
},
104+
105+
#[structopt(about = "Remove duplicate rows")]
106+
Dedup {
107+
#[structopt(short, long, parse(from_os_str))]
108+
filename: PathBuf,
109+
#[structopt(short, long, parse(from_os_str))]
110+
output: Option<PathBuf>,
111+
}
104112
}
105113

106114
#[tokio::main]
@@ -158,6 +166,9 @@ async fn main() -> Result<(), DfKitError> {
158166
let file_list = parse_file_list(files, dir)?;
159167
cat(&ctx, file_list, &output).await?;
160168
}
169+
Commands::Dedup { filename, output } => {
170+
dedup(&ctx, &filename, output).await?;
171+
}
161172
}
162173

163174
Ok(())

src/commands.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,3 +218,22 @@ pub async fn cat(
218218

219219
Ok(())
220220
}
221+
222+
pub async fn dedup(
223+
ctx: &SessionContext,
224+
filename: &Path,
225+
output: Option<PathBuf>,
226+
) -> Result<(), DfKitError> {
227+
let _ = register_table(&ctx, "t", &filename).await?;
228+
let df = ctx.sql("SELECT DISTINCT * FROM t").await?;
229+
230+
if let Some(out_path) = output {
231+
let file_type = file_type(&filename)?;
232+
write_output(df, &out_path, &file_type).await?;
233+
println!("Deduplicated file written to: {}", out_path.display());
234+
} else {
235+
df.show().await?;
236+
}
237+
238+
Ok(())
239+
}

tests/test.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,3 +376,30 @@ fn test_url_file_split_to_local() {
376376

377377
assert_eq!(files.len(), 5);
378378
}
379+
380+
381+
#[test]
382+
fn test_dedup_csv_file() {
383+
let temp = tempdir().unwrap();
384+
let input_path = create_extended_csv(temp.path());
385+
fs::write(&input_path, "name,age\nalice,30\nbob,40\ncharlie,25\nalice,30").unwrap();
386+
387+
let mut cmd = Command::cargo_bin("dfkit").unwrap();
388+
let output = cmd
389+
.args(["dedup", "--filename", input_path.to_str().unwrap()])
390+
.assert()
391+
.success()
392+
.get_output()
393+
.stdout
394+
.clone();
395+
396+
assert_snapshot!(String::from_utf8(output).unwrap(), @r"
397+
+---------+-----+
398+
| name | age |
399+
+---------+-----+
400+
| bob | 40 |
401+
| charlie | 25 |
402+
| alice | 30 |
403+
+---------+-----+
404+
");
405+
}

0 commit comments

Comments
 (0)