parquet-cli/README.md (14 additions, 0 deletions)
@@ -134,3 +134,17 @@ Usage: parquet [options] [command] [command options]
See 'parquet help <command>' for more information on a specific command.
```

### Configuration Options

- `--conf` or `--property`: Set a configuration property in the format `key=value`. Can be specified multiple times; properties set this way are applied on top of the command's Hadoop configuration and override matching keys.

Examples:
```bash
# Write Avro UUID fields as the Parquet UUID logical type
parquet convert input.avro -o output.parquet --conf parquet.avro.write-parquet-uuid=true

# Write lists with the three-level structure instead of the legacy two-level layout
parquet convert input.avro -o output.parquet --conf parquet.avro.write-old-list-structure=false

# Multiple properties
parquet convert-csv input.csv -o output.parquet --schema schema.avsc \
  --conf parquet.avro.write-parquet-uuid=true \
  --conf parquet.avro.write-old-list-structure=false
```
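
For reference, here is a minimal Java sketch of what these flags translate to: the command simply copies each `key=value` pair onto a Hadoop `Configuration` and hands it to `AvroParquetWriter` via `withConf`, as the changes below show. The schema, output path, and record in this sketch are illustrative placeholders, not part of this change.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

public class ConfFlagSketch {
  public static void main(String[] args) throws Exception {
    // Programmatic equivalent of:
    //   --conf parquet.avro.write-parquet-uuid=true --conf parquet.avro.write-old-list-structure=false
    Configuration conf = new Configuration();
    conf.set("parquet.avro.write-parquet-uuid", "true");
    conf.set("parquet.avro.write-old-list-structure", "false");

    // Placeholder schema; a real run would derive this from the input data
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path("output.parquet"))
        .withSchema(schema)
        .withDataModel(GenericData.get())
        .withConf(conf) // the properties take effect through the Configuration
        .build()) {
      GenericRecord record = new GenericData.Record(schema);
      record.put("id", "1");
      writer.write(record);
    }
  }
}
```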
ConvertCSVCommand.java
@@ -36,6 +36,7 @@
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.cli.BaseCommand;
import org.apache.parquet.cli.csv.AvroCSV;
@@ -117,6 +118,11 @@ public ConvertCSVCommand(Logger console) {
description = "Remove any data already in the target view or dataset")
boolean overwrite = false;

@Parameter(
names = {"--conf", "--property"},
description = "Set a configuration property (format: key=value). Can be specified multiple times.")
List<String> confProperties;

@Override
@SuppressWarnings("unchecked")
public int run() throws IOException {
@@ -168,6 +174,21 @@ public int run() throws IOException {
}
}

Configuration conf = new Configuration(getConf());

if (confProperties != null) {
for (String prop : confProperties) {
String[] parts = prop.split("=", 2);
if (parts.length != 2) {
throw new IllegalArgumentException("Configuration property must be in format key=value: " + prop);
}
String key = parts[0].trim();
String value = parts[1].trim();
conf.set(key, value);
console.debug("Set configuration property: {}={}", key, value);
}
}

try (ParquetWriter<Record> writer = AvroParquetWriter.<Record>builder(qualifiedPath(outputPath))
.withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
.withWriteMode(overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE)
@@ -177,7 +198,7 @@ public int run() throws IOException {
.withPageSize(pageSize)
.withRowGroupSize(rowGroupSize)
.withDataModel(GenericData.get())
.withConf(getConf())
.withConf(conf)
.withSchema(csvSchema)
.build()) {
for (String target : targets) {
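
A brief aside on the parsing above: `split("=", 2)` splits only on the first `=`, so a value may itself contain `=` characters, and both key and value are trimmed before being applied. A minimal sketch of that behavior (the property name is a made-up placeholder):

```java
import org.apache.hadoop.conf.Configuration;

public class ConfParseSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    String prop = " some.example.key = a=b=c "; // hypothetical property
    String[] parts = prop.split("=", 2);        // at most two parts, so '=' in the value survives
    if (parts.length != 2) {
      throw new IllegalArgumentException("Configuration property must be in format key=value: " + prop);
    }
    conf.set(parts[0].trim(), parts[1].trim());
    System.out.println(conf.get("some.example.key")); // prints "a=b=c"
  }
}
```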
ConvertCommand.java
@@ -34,6 +34,7 @@
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
@@ -95,6 +96,11 @@ public ConvertCommand(Logger console) {
@Parameter(names = "--dictionary-size", description = "Max dictionary page size")
int dictionaryPageSize = ParquetWriter.DEFAULT_PAGE_SIZE;

@Parameter(
names = {"--conf", "--property"},
description = "Set a configuration property (format: key=value). Can be specified multiple times.")
List<String> confProperties;

@Override
@SuppressWarnings("unchecked")
public int run() throws IOException {
@@ -119,13 +125,28 @@ public int run() throws IOException {
outFS.delete(outPath);
}

Configuration conf = new Configuration(getConf());

if (confProperties != null) {
for (String prop : confProperties) {
String[] parts = prop.split("=", 2);
if (parts.length != 2) {
throw new IllegalArgumentException("Configuration property must be in format key=value: " + prop);
}
String key = parts[0].trim();
String value = parts[1].trim();
conf.set(key, value);
console.debug("Set configuration property: {}={}", key, value);
}
}

Iterable<Record> reader = openDataFile(source, projection);
boolean threw = true;
long count = 0;
try {
try (ParquetWriter<Record> writer = AvroParquetWriter.<Record>builder(qualifiedPath(outputPath))
.withWriterVersion(v2 ? PARQUET_2_0 : PARQUET_1_0)
.withConf(getConf())
.withConf(conf)
.withCompressionCodec(codec)
.withRowGroupSize(rowGroupSize)
.withDictionaryPageSize(dictionaryPageSize < 64 ? 64 : dictionaryPageSize)
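
Another aside, a small sketch of the Hadoop `Configuration` semantics the commands rely on: `new Configuration(getConf())` clones the base configuration, so a later `conf.set(...)` lets a `--conf` value override a matching key while leaving the original configuration untouched.

```java
import org.apache.hadoop.conf.Configuration;

public class ConfOverrideSketch {
  public static void main(String[] args) {
    Configuration base = new Configuration();
    base.set("parquet.avro.write-old-list-structure", "true");

    Configuration conf = new Configuration(base); // copy constructor clones current properties
    conf.set("parquet.avro.write-old-list-structure", "false"); // the --conf value wins

    System.out.println(conf.get("parquet.avro.write-old-list-structure")); // "false"
    System.out.println(base.get("parquet.avro.write-old-list-structure")); // still "true"
  }
}
```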
ConvertCSVCommand tests
@@ -61,4 +61,18 @@ public void testConvertCSVCommandWithDifferentSchemas() throws IOException {
command.setConf(new Configuration());
command.run();
}

@Test
public void testConvertCSVCommandWithGenericConf() throws IOException {
File file = csvFile();
ConvertCSVCommand command = new ConvertCSVCommand(createLogger());
command.targets = Arrays.asList(file.getAbsolutePath());
File output = new File(getTempFolder(), getClass().getSimpleName() + "_with_generic_conf.parquet");
command.outputPath = output.getAbsolutePath();
command.confProperties =
Arrays.asList("parquet.avro.write-parquet-uuid=true", "parquet.avro.write-old-list-structure=false");
command.setConf(new Configuration());
Assert.assertEquals(0, command.run());
Assert.assertTrue(output.exists());
}
}
ConvertCommand tests
@@ -37,4 +37,58 @@ public void testConvertCommand() throws IOException {
Assert.assertEquals(0, command.run());
Assert.assertTrue(output.exists());
}

@Test
public void testConvertCommandWithGenericConf() throws IOException {
File file = toAvro(parquetFile());
ConvertCommand command = new ConvertCommand(createLogger());
command.targets = Arrays.asList(file.getAbsolutePath());
File output = new File(getTempFolder(), "converted_with_generic_conf.parquet");
command.outputPath = output.getAbsolutePath();
command.confProperties = Arrays.asList(
"parquet.avro.write-parquet-uuid=true",
"parquet.avro.write-old-list-structure=false",
"test.property=test.value");
command.setConf(new Configuration());

Assert.assertEquals(0, command.run());
Assert.assertTrue(output.exists());
}

@Test
public void testConvertCommandConfigurationValidation() throws IOException {
File file = toAvro(parquetFile());
ConvertCommand command = new ConvertCommand(createLogger());
command.targets = Arrays.asList(file.getAbsolutePath());
File output = new File(getTempFolder(), "converted_with_config_validation.parquet");
command.outputPath = output.getAbsolutePath();

command.confProperties =
Arrays.asList("parquet.avro.write-parquet-uuid=true", "parquet.avro.write-old-list-structure=false");

command.setConf(new Configuration());

Assert.assertEquals(0, command.run());
Assert.assertTrue(output.exists());

File output2 = new File(getTempFolder(), "converted_with_config_validation2.parquet");
command.outputPath = output2.getAbsolutePath();
command.confProperties =
Arrays.asList("parquet.avro.write-parquet-uuid=false", "parquet.avro.write-old-list-structure=true");

Assert.assertEquals(0, command.run());
Assert.assertTrue(output2.exists());
}

@Test(expected = IllegalArgumentException.class)
public void testConvertCommandWithInvalidConf() throws IOException {
File file = toAvro(parquetFile());
ConvertCommand command = new ConvertCommand(createLogger());
command.targets = Arrays.asList(file.getAbsolutePath());
File output = new File(getTempFolder(), "converted_with_invalid_conf.parquet");
command.outputPath = output.getAbsolutePath();
command.confProperties = Arrays.asList("invalid-property-format");
command.setConf(new Configuration());
command.run();
}
}
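
The tests assert only that an output file is produced. A hedged sketch of how the effect of a property such as `parquet.avro.write-parquet-uuid=true` could be inspected after the fact, using the standard footer API (`ParquetFileReader` and `HadoopInputFile`); the file path is a placeholder:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class InspectSchemaSketch {
  public static void main(String[] args) throws Exception {
    Path path = new Path("output.parquet"); // placeholder path
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      MessageType schema = reader.getFooter().getFileMetaData().getSchema();
      // With parquet.avro.write-parquet-uuid=true, an Avro uuid field should appear as
      // fixed_len_byte_array(16) annotated with the UUID logical type in this schema.
      System.out.println(schema);
    }
  }
}
```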