Java源码示例:org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport
示例1
@Benchmark
@Threads(1)
public void writeUsingSparkWriter() throws IOException {
  // Serialize the Spark-converted schema once up front; ParquetWriteSupport
  // reads it back from the "row.attributes" property at write time.
  String sparkSchemaJson = SparkSchemaUtil.convert(SCHEMA).json();
  try (FileAppender<InternalRow> writer =
      Parquet.write(Files.localOutput(dataFile))
          .schema(SCHEMA)
          .writeSupport(new ParquetWriteSupport())
          .set("org.apache.spark.sql.parquet.row.attributes", sparkSchemaJson)
          .set("spark.sql.parquet.writeLegacyFormat", "false")
          .set("spark.sql.parquet.binaryAsString", "false")
          .set("spark.sql.parquet.int96AsTimestamp", "false")
          .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
          .build()) {
    writer.addAll(rows);
  }
}
示例2(与示例1代码相同,仅供对照)
@Benchmark
@Threads(1)
public void writeUsingSparkWriter() throws IOException {
  // Convert the Iceberg schema to its Spark StructType equivalent; the JSON
  // form is how ParquetWriteSupport receives the row schema.
  StructType sparkType = SparkSchemaUtil.convert(SCHEMA);
  try (FileAppender<InternalRow> writer = Parquet.write(Files.localOutput(dataFile))
      .writeSupport(new ParquetWriteSupport())
      .set("org.apache.spark.sql.parquet.row.attributes", sparkType.json())
      .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
      .set("spark.sql.parquet.int96AsTimestamp", "false")
      .set("spark.sql.parquet.binaryAsString", "false")
      .set("spark.sql.parquet.writeLegacyFormat", "false")
      .schema(SCHEMA)
      .build()) {
    // Append every benchmark row through the same appender instance.
    writer.addAll(rows);
  }
}
示例3
/**
 * Creates a row appender for the given output file in the requested format.
 *
 * <p>The schema written is always the partition spec's schema; table
 * {@code properties} are forwarded to the Parquet and Avro builders.
 */
public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat format) {
  Schema writeSchema = spec.schema();
  try {
    switch (format) {
      case PARQUET:
        // ParquetWriteSupport is driven by Spark SQL conf keys; the Spark
        // schema travels as JSON under the "row.attributes" property.
        return Parquet.write(file)
            .writeSupport(new ParquetWriteSupport())
            .set("org.apache.spark.sql.parquet.row.attributes", convert(writeSchema).json())
            .set("spark.sql.parquet.writeLegacyFormat", "false")
            .set("spark.sql.parquet.binaryAsString", "false")
            .set("spark.sql.parquet.int96AsTimestamp", "false")
            .set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
            .setAll(properties)
            .schema(writeSchema)
            .build();

      case AVRO:
        return Avro.write(file)
            .createWriterFunc(ignored -> new SparkAvroWriter(writeSchema))
            .setAll(properties)
            .schema(writeSchema)
            .build();

      case ORC: {
        // NOTE(review): unlike PARQUET/AVRO, table properties are not
        // forwarded here — confirm whether the ORC builder supports setAll.
        @SuppressWarnings("unchecked")
        SparkOrcWriter orcWriter = new SparkOrcWriter(ORC.write(file)
            .schema(writeSchema)
            .build());
        return orcWriter;
      }

      default:
        throw new UnsupportedOperationException("Cannot write unknown format: " + format);
    }
  } catch (IOException e) {
    // Surface checked I/O failures as the project's unchecked wrapper.
    throw new RuntimeIOException(e);
  }
}