Java源码示例:org.apache.parquet.column.page.DataPageV2
示例1
/**
 * Points this reader at a new data page and resets its per-page read position.
 *
 * <p>The page is dispatched through a {@link DataPage.Visitor} so that the matching
 * {@code initFromPage} overload runs for the page's concrete version.
 *
 * @param page the page to read from; passed through a not-null precondition check first
 */
public void setPage(DataPage page) {
  Preconditions.checkNotNull(page, "Cannot read from null page");
  this.page = page;

  // The visitor is used purely for its side effect; its return value is ignored.
  DataPage.Visitor<ValuesReader> pageInitializer = new DataPage.Visitor<ValuesReader>() {
    @Override
    public ValuesReader visit(DataPageV1 v1Page) {
      initFromPage(v1Page);
      return null;
    }

    @Override
    public ValuesReader visit(DataPageV2 v2Page) {
      initFromPage(v2Page);
      return null;
    }
  };
  page.accept(pageInitializer);

  // Nothing has been consumed yet, so there is a next triple exactly when the page is non-empty.
  this.triplesRead = 0;
  this.hasNext = triplesCount > 0;
}
示例2
/**
 * Installs {@code page} as the current page, initializes the readers for its version and
 * positions the reader via {@code advance()}.
 *
 * @param page the page to read from; passed through a not-null precondition check first
 */
public void setPage(DataPage page) {
  Preconditions.checkNotNull(page, "Cannot read from null page");
  this.page = page;

  // Version-specific setup; both visitor methods return null because the result is unused.
  page.accept(new DataPage.Visitor<ValuesReader>() {
    @Override
    public ValuesReader visit(DataPageV2 v2) {
      initFromPage(v2);
      return null;
    }

    @Override
    public ValuesReader visit(DataPageV1 v1) {
      initFromPage(v1);
      return null;
    }
  });

  triplesRead = 0;
  advance();
}
示例3
/**
 * Pulls the next page from {@code pageReader} and forwards it to {@code readPageV1} or
 * {@code readPageV2}, depending on the page's concrete version.
 */
private void readPage() {
  LOG.debug("loading page");
  DataPage nextPage = pageReader.readPage();

  DataPage.Visitor<Void> versionDispatcher = new DataPage.Visitor<Void>() {
    @Override
    public Void visit(DataPageV1 v1Page) {
      readPageV1(v1Page);
      return null;
    }

    @Override
    public Void visit(DataPageV2 v2Page) {
      readPageV2(v2Page);
      return null;
    }
  };
  nextPage.accept(versionDispatcher);
}
示例4
/**
 * Returns the statistics attached to a data page, whichever page version it is.
 *
 * @param page the data page to inspect
 * @param <T> the comparable value type the caller expects the statistics to describe
 * @return the page's statistics, cast (unchecked) to {@code Statistics<T>}
 */
private static <T extends Comparable<T>> Statistics<T> getStatisticsFromPageHeader(DataPage page) {
  return page.accept(new DataPage.Visitor<Statistics<T>>() {
    @Override
    public Statistics<T> visit(DataPageV1 v1Page) {
      // Unchecked: nothing here verifies that the page's statistics really are of type T.
      @SuppressWarnings("unchecked")
      Statistics<T> typed = (Statistics<T>) v1Page.getStatistics();
      return typed;
    }

    @Override
    public Statistics<T> visit(DataPageV2 v2Page) {
      @SuppressWarnings("unchecked")
      Statistics<T> typed = (Statistics<T>) v2Page.getStatistics();
      return typed;
    }
  });
}
示例5
/**
 * Prepares this reader for a V2 data page: records the value count, builds the repetition-level
 * iterator, then sets up the definition-level reader and the data reader.
 *
 * @param initPage the V2 page to initialize from
 * @throws ParquetDecodingException wrapping any {@link IOException} raised during setup
 */
protected void initFromPage(DataPageV2 initPage) {
  this.triplesCount = initPage.getValueCount();

  int maxRepetitionLevel = desc.getMaxRepetitionLevel();
  this.repetitionLevels = newRLEIterator(maxRepetitionLevel, initPage.getRepetitionLevels());

  try {
    // How definition levels are read is left to the initDefinitionLevelsReader implementation.
    initDefinitionLevelsReader(initPage, desc);
    LOG.debug("page data size {} bytes and {} records", initPage.getData().size(), triplesCount);
    initDataReader(initPage.getDataEncoding(), initPage.getData().toInputStream(), triplesCount);
  } catch (IOException ioe) {
    String message = "could not read page " + initPage + " in col " + desc;
    throw new ParquetDecodingException(message, ioe);
  }
}
示例6
/**
 * Prepares this reader for a V2 data page: records the value count, builds the repetition- and
 * definition-level iterators, and initializes the data reader.
 *
 * @param page the V2 page to initialize from
 * @throws ParquetDecodingException wrapping any {@link IOException} raised by the data reader setup
 */
private void initFromPage(DataPageV2 page) {
  this.triplesCount = page.getValueCount();

  int maxRepetitionLevel = desc.getMaxRepetitionLevel();
  this.repetitionLevels = newRLEIterator(maxRepetitionLevel, page.getRepetitionLevels());

  int maxDefinitionLevel = desc.getMaxDefinitionLevel();
  this.definitionLevels = newRLEIterator(maxDefinitionLevel, page.getDefinitionLevels());

  LOG.debug("page data size {} bytes and {} records", page.getData().size(), triplesCount);
  try {
    initDataReader(page.getDataEncoding(), page.getData().toInputStream(), triplesCount);
  } catch (IOException ioe) {
    String message = "could not read page " + page + " in col " + desc;
    throw new ParquetDecodingException(message, ioe);
  }
}
示例7
/**
 * Sets up decoding for a V2 data page: installs a fresh run-length decoder over the page's
 * definition levels and prepares the value decoder for the page's data encoding.
 *
 * @param page the V2 page to read
 * @throws IOException if the decoders cannot be initialized; failures from
 *     {@code prepareNewPage} are re-thrown with the page and column added to the message
 */
private void readPageV2(DataPageV2 page) throws IOException {
  this.pageValueCount = page.getValueCount();

  int definitionLevelBitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
  // The second argument is false so the decoder does not read a length prefix from the
  // stream: V2 pages already split the page bytes into separate sections.
  RunLengthDecoder definitionLevelDecoder = new RunLengthDecoder(definitionLevelBitWidth, false);
  this.runLenDecoder = definitionLevelDecoder;
  this.runLenDecoder.initFromStream(this.pageValueCount, page.getDefinitionLevels().toInputStream());

  try {
    prepareNewPage(page.getDataEncoding(), page.getData().toInputStream());
  } catch (IOException cause) {
    String message = "could not read page " + page + " in col " + descriptor;
    throw new IOException(message, cause);
  }
}
示例8
/**
 * Sets up the level iterators and the data reader for a V2 data page, then signals that the
 * new page is ready via {@code newPageInitialized}.
 *
 * @param page the V2 page to read
 * @throws ParquetDecodingException wrapping any {@link IOException} raised by the data reader setup
 */
private void readPageV2(DataPageV2 page) {
  int maxRepetitionLevel = path.getMaxRepetitionLevel();
  this.repetitionLevelColumn = newRLEIterator(maxRepetitionLevel, page.getRepetitionLevels());

  int maxDefinitionLevel = path.getMaxDefinitionLevel();
  this.definitionLevelColumn = newRLEIterator(maxDefinitionLevel, page.getDefinitionLevels());

  int valuesInPage = page.getValueCount();
  LOG.debug("page data size {} bytes and {} values", page.getData().size(), valuesInPage);
  try {
    initDataReader(page.getDataEncoding(), page.getData().toInputStream(), valuesInPage);
  } catch (IOException ioe) {
    String message = "could not read page " + page + " in col " + path;
    throw new ParquetDecodingException(message, ioe);
  }

  newPageInitialized(page);
}
示例9
/**
 * Writes {@code rows} values to a required binary column through {@link ColumnWriterV2},
 * checks the row and value totals reported by the produced pages, then reads every value
 * back and checks that the converter received all of them.
 */
@Test
public void test() throws Exception {
  MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
  ColumnDescriptor col = schema.getColumns().get(0);
  MemPageWriter pageWriter = new MemPageWriter();
  ParquetProperties writerProperties = ParquetProperties.builder()
      .withDictionaryPageSize(1024)
      .withWriterVersion(PARQUET_2_0)
      .withPageSize(2048)
      .build();
  ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, writerProperties);

  // Write phase: force a page boundary after every 1000th value.
  for (int row = 0; row < rows; row++) {
    columnWriterV2.write(Binary.fromString("bar" + row % 10), 0, 0);
    boolean atPageBoundary = (row + 1) % 1000 == 0;
    if (atPageBoundary) {
      columnWriterV2.writePage();
    }
  }
  columnWriterV2.writePage();
  columnWriterV2.finalizeColumnChunk();

  // Page bookkeeping: totals across all pages must match what was written.
  List<DataPage> pages = pageWriter.getPages();
  int totalValues = 0;
  int totalRows = 0;
  for (DataPage dataPage : pages) {
    totalValues += dataPage.getValueCount();
    DataPageV2 v2Page = (DataPageV2) dataPage;
    totalRows += v2Page.getRowCount();
  }
  assertEquals(rows, totalRows);
  assertEquals(rows, totalValues);

  // Read phase: every value sits at repetition level 0 and definition level 0.
  MemPageReader pageReader = new MemPageReader(rows, pages.iterator(), pageWriter.getDictionaryPage());
  ValidatingConverter converter = new ValidatingConverter();
  ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter, VersionParser.parse(Version.FULL_VERSION));
  int consumed = 0;
  while (consumed < rows) {
    assertEquals(0, columnReader.getCurrentRepetitionLevel());
    assertEquals(0, columnReader.getCurrentDefinitionLevel());
    columnReader.writeCurrentValueToConverter();
    columnReader.consume();
    consumed++;
  }
  assertEquals(rows, converter.count);
}
示例10
/**
 * Writes {@code rows} nulls to an optional binary column through {@link ColumnWriterV2},
 * checks the row and value totals reported by the produced pages, then consumes every entry
 * and checks that the converter never received a value.
 */
@Test
public void testOptional() throws Exception {
  MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }");
  ColumnDescriptor col = schema.getColumns().get(0);
  MemPageWriter pageWriter = new MemPageWriter();
  ParquetProperties writerProperties = ParquetProperties.builder()
      .withDictionaryPageSize(1024)
      .withWriterVersion(PARQUET_2_0)
      .withPageSize(2048)
      .build();
  ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, writerProperties);

  // Write phase: only nulls, with a forced page boundary after every 1000th entry.
  for (int row = 0; row < rows; row++) {
    columnWriterV2.writeNull(0, 0);
    boolean atPageBoundary = (row + 1) % 1000 == 0;
    if (atPageBoundary) {
      columnWriterV2.writePage();
    }
  }
  columnWriterV2.writePage();
  columnWriterV2.finalizeColumnChunk();

  // Page bookkeeping: nulls still count towards both the value and the row totals.
  List<DataPage> pages = pageWriter.getPages();
  int totalValues = 0;
  int totalRows = 0;
  for (DataPage dataPage : pages) {
    totalValues += dataPage.getValueCount();
    DataPageV2 v2Page = (DataPageV2) dataPage;
    totalRows += v2Page.getRowCount();
  }
  assertEquals(rows, totalRows);
  assertEquals(rows, totalValues);

  // Read phase: entries are consumed without being written to the converter.
  MemPageReader pageReader = new MemPageReader(rows, pages.iterator(), pageWriter.getDictionaryPage());
  ValidatingConverter converter = new ValidatingConverter();
  ColumnReader columnReader = new ColumnReaderImpl(col, pageReader, converter, VersionParser.parse(Version.FULL_VERSION));
  int consumed = 0;
  while (consumed < rows) {
    assertEquals(0, columnReader.getCurrentRepetitionLevel());
    assertEquals(0, columnReader.getCurrentDefinitionLevel());
    columnReader.consume();
    consumed++;
  }
  assertEquals(0, converter.count);
}
示例11
/**
 * Stores a V2 data page in this writer's in-memory page list and updates the running
 * byte and value totals.
 *
 * @param rowCount number of rows in the page
 * @param nullCount number of nulls in the page
 * @param valueCount number of values in the page; must not be zero
 * @param repetitionLevels encoded repetition levels
 * @param definitionLevels encoded definition levels
 * @param dataEncoding encoding of {@code data}
 * @param data encoded values
 * @param statistics statistics for the page
 * @throws ParquetEncodingException if {@code valueCount} is zero
 * @throws IOException declared by the overridden method
 */
@Override
public void writePageV2(int rowCount, int nullCount, int valueCount,
    BytesInput repetitionLevels, BytesInput definitionLevels,
    Encoding dataEncoding, BytesInput data, Statistics<?> statistics) throws IOException {
  if (valueCount == 0) {
    throw new ParquetEncodingException("illegal page of 0 values");
  }

  long pageBytes = repetitionLevels.size() + definitionLevels.size() + data.size();
  memSize += pageBytes;

  // Each section goes through copy(...) before it is stored in the page list.
  DataPageV2 storedPage = DataPageV2.uncompressed(
      rowCount, nullCount, valueCount,
      copy(repetitionLevels), copy(definitionLevels),
      dataEncoding, copy(data), statistics);
  pages.add(storedPage);

  totalValueCount += valueCount;
  LOG.debug("page written for {} bytes and {} records", pageBytes, valueCount);
}
示例12
/**
 * Reads the next page of the column at {@code path} and asserts that it is a V2 page whose
 * header counts and byte sections match the expected values.
 *
 * @param schema schema used to resolve {@code path} to a column description
 * @param pages page store to read from
 * @param path column path
 * @param values expected value count
 * @param rows expected row count
 * @param nullCount expected null count
 * @param repetition expected repetition-level bytes
 * @param definition expected definition-level bytes
 * @param data expected data bytes
 * @param uncompressedSize expected uncompressed size
 * @throws IOException if the page sections cannot be converted to byte arrays
 */
private void validateV2Page(MessageType schema, PageReadStore pages, String[] path, int values, int rows, int nullCount,
    byte[] repetition, byte[] definition, byte[] data, int uncompressedSize) throws IOException {
  // The cast fails with a ClassCastException if the page read is not a V2 page.
  DataPageV2 v2Page = (DataPageV2) pages.getPageReader(schema.getColumnDescription(path)).readPage();

  // Header fields.
  assertEquals(values, v2Page.getValueCount());
  assertEquals(rows, v2Page.getRowCount());
  assertEquals(nullCount, v2Page.getNullCount());
  assertEquals(uncompressedSize, v2Page.getUncompressedSize());

  // Page sections, compared byte for byte.
  assertArrayEquals(repetition, v2Page.getRepetitionLevels().toByteArray());
  assertArrayEquals(definition, v2Page.getDefinitionLevels().toByteArray());
  assertArrayEquals(data, v2Page.getData().toByteArray());
}
示例13
/**
 * Reads the statistics carried by a data page, for either page version.
 *
 * @param page the data page to inspect
 * @param <T> the comparable value type the caller expects the statistics to describe
 * @return the page's statistics, cast (unchecked) to {@code Statistics<T>}
 */
private static <T extends Comparable<T>> Statistics<T> getStatisticsFromPageHeader(DataPage page) {
  DataPage.Visitor<Statistics<T>> statisticsExtractor = new DataPage.Visitor<Statistics<T>>() {
    // Both casts are unchecked: the page does not expose its statistics' value type here.
    @Override
    @SuppressWarnings("unchecked")
    public Statistics<T> visit(DataPageV1 v1Page) {
      return (Statistics<T>) v1Page.getStatistics();
    }

    @Override
    @SuppressWarnings("unchecked")
    public Statistics<T> visit(DataPageV2 v2Page) {
      return (Statistics<T>) v2Page.getStatistics();
    }
  };
  return page.accept(statisticsExtractor);
}
示例14
/**
 * Formats a one-line summary of a V2 data page: row-group and page number, compression,
 * encoding, value count, per-value and total compressed size, row count, null count and
 * min/max statistics.
 *
 * @param page the V2 page to describe
 * @return the formatted summary line
 */
@Override
public String visit(DataPageV2 page) {
  String encodingName = encodingAsString(page.getDataEncoding(), false);
  long compressedBytes = page.getCompressedSize();
  int valueCount = page.getValueCount();
  int rowCount = page.getRowCount();
  int nullCount = page.getNullCount();
  // Float division: a zero value count gives Infinity/NaN rather than an exception.
  float bytesPerValue = (float) compressedBytes / valueCount;
  String minMax = minMaxAsString(page.getStatistics());

  String compression;
  if (page.isCompressed()) {
    compression = shortCodec;
  } else {
    compression = "_";
  }

  String perValueText = humanReadable(bytesPerValue);
  String totalText = humanReadable(compressedBytes);
  return String.format("%3d-%-3d %-5s %s %-2s %-7d %-10s %-10s %-8d %-7s %s",
      rowGroupNum, pageNum, "data", compression, encodingName, valueCount, perValueText,
      totalText, rowCount, nullCount, minMax);
}
示例15
/**
 * Installs a vectorized definition-level reader built from the column descriptor.
 *
 * @param dataPageV2 the V2 page being initialized; not consulted by this implementation
 * @param desc the column descriptor handed to {@code newVectorizedDefinitionLevelReader}
 */
@Override
protected void initDefinitionLevelsReader(DataPageV2 dataPageV2, ColumnDescriptor desc) {
  // Only the descriptor is needed here; the page itself is left untouched.
  vectorizedDefinitionLevelReader = newVectorizedDefinitionLevelReader(desc);
}
示例16
/**
 * Builds the definition-level iterator for a V2 page from the page's definition-level bytes.
 *
 * @param dataPageV2 the V2 page whose definition levels are read
 * @param desc the column descriptor supplying the maximum definition level
 */
@Override
protected void initDefinitionLevelsReader(DataPageV2 dataPageV2, ColumnDescriptor desc) {
  int maxDefinitionLevel = desc.getMaxDefinitionLevel();
  definitionLevels = newRLEIterator(maxDefinitionLevel, dataPageV2.getDefinitionLevels());
}
示例17
/**
 * Reads {@code readNumber} values from this column reader into {@code vector}, pulling
 * further pages from {@code pageReader} whenever the current page has no values left.
 *
 * <p>Values are appended starting at row 0 of the vector. Dictionary-encoded pages are
 * either left as dictionary ids (lazy decoding) or decoded immediately; when a
 * non-dictionary page follows dictionary-encoded rows in the same call, the rows already
 * written are decoded first so the vector does not mix both representations.
 *
 * @param readNumber the number of values to read
 * @param vector the destination vector
 * @throws IOException declared by this method; presumably raised by the page-reading and
 *     batch-decoding helpers called here
 * @throws RuntimeException if a page is neither a {@code DataPageV1} nor a {@code DataPageV2}
 */
@Override
public final void readToVector(int readNumber, VECTOR vector) throws IOException {
// Index in the vector of the next row to fill.
int rowId = 0;
// Only reserved when this column has a dictionary.
WritableIntVector dictionaryIds = null;
if (dictionary != null) {
dictionaryIds = vector.reserveDictionaryIds(readNumber);
}
while (readNumber > 0) {
// Compute the number of values we want to read in this page.
int leftInPage = (int) (endOfPageValueCount - valuesRead);
if (leftInPage == 0) {
// Current page is exhausted: load the next one and recompute what it offers.
DataPage page = pageReader.readPage();
if (page instanceof DataPageV1) {
readPageV1((DataPageV1) page);
} else if (page instanceof DataPageV2) {
readPageV2((DataPageV2) page);
} else {
throw new RuntimeException("Unsupported page type: " + page.getClass());
}
leftInPage = (int) (endOfPageValueCount - valuesRead);
}
// Never read past the end of the current page in a single step.
int num = Math.min(readNumber, leftInPage);
if (isCurrentPageDictionaryEncoded) {
// Read and decode dictionary ids.
runLenDecoder.readDictionaryIds(
num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder);
if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) {
// Column vector supports lazy decoding of dictionary values so just set the dictionary.
// We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
// non-dictionary encoded values have already been added).
vector.setDictionary(new ParquetDictionary(dictionary));
} else {
readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds);
}
} else {
if (vector.hasDictionary() && rowId != 0) {
// This batch already has dictionary encoded values but this new page is not. The batch
// does not support a mix of dictionary and not so we will decode the dictionary.
readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds());
}
vector.setDictionary(null);
readBatch(rowId, num, vector);
}
// Advance the page position, the vector position and the remaining-work counter together.
valuesRead += num;
rowId += num;
readNumber -= num;
}
}
示例18
/**
 * Round-trip test: writes a single V2 data page through {@code ColumnChunkPageWriteStore}
 * into a Parquet file, reads it back with {@code ParquetFileReader}, and checks the page
 * header fields, the page sections, and the column/offset index entries for that page.
 */
@Test
public void test() throws Exception {
// Start from a clean output directory.
Path file = new Path("target/test/TestColumnChunkPageWriteStore/test.parquet");
Path root = file.getParent();
FileSystem fs = file.getFileSystem(conf);
if (fs.exists(root)) {
fs.delete(root, true);
}
fs.mkdirs(root);
MessageType schema = MessageTypeParser.parseMessageType("message test { repeated binary bar; }");
ColumnDescriptor col = schema.getColumns().get(0);
Encoding dataEncoding = PLAIN;
int valueCount = 10;
// d, r and v are distinct marker ints written as the definition-level, repetition-level
// and data sections, so each section can be told apart when read back.
int d = 1;
int r = 2;
int v = 3;
BytesInput definitionLevels = BytesInput.fromInt(d);
BytesInput repetitionLevels = BytesInput.fromInt(r);
Statistics<?> statistics = Statistics.getBuilderForReading(Types.required(PrimitiveTypeName.BINARY).named("test_binary"))
.build();
BytesInput data = BytesInput.fromInt(v);
int rowCount = 5;
int nullCount = 1;
statistics.incrementNumNulls(nullCount);
statistics.setMinMaxFromBytes(new byte[] {0, 1, 2}, new byte[] {0, 1, 2, 3});
// File position where the page starts and the number of bytes it occupies; both are
// compared against the offset index in the read phase.
long pageOffset;
long pageSize;
// Write phase.
{
OutputFileForTesting outputFile = new OutputFileForTesting(file, conf);
ParquetFileWriter writer = new ParquetFileWriter(outputFile, schema, Mode.CREATE,
ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
writer.start();
writer.startBlock(rowCount);
pageOffset = outputFile.out().getPos();
{
ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema,
new HeapByteBufferAllocator(), Integer.MAX_VALUE);
PageWriter pageWriter = store.getPageWriter(col);
pageWriter.writePageV2(
rowCount, nullCount, valueCount,
repetitionLevels, definitionLevels,
dataEncoding, data,
statistics);
store.flushToFileWriter(writer);
// Measured right after the flush, before the block is ended.
pageSize = outputFile.out().getPos() - pageOffset;
}
writer.endBlock();
writer.end(new HashMap<String, String>());
}
// Read phase.
{
ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER);
ParquetFileReader reader = new ParquetFileReader(
conf, footer.getFileMetaData(), file, footer.getBlocks(), schema.getColumns());
PageReadStore rowGroup = reader.readNextRowGroup();
PageReader pageReader = rowGroup.getPageReader(col);
DataPageV2 page = (DataPageV2)pageReader.readPage();
assertEquals(rowCount, page.getRowCount());
assertEquals(nullCount, page.getNullCount());
assertEquals(valueCount, page.getValueCount());
assertEquals(d, intValue(page.getDefinitionLevels()));
assertEquals(r, intValue(page.getRepetitionLevels()));
assertEquals(dataEncoding, page.getDataEncoding());
assertEquals(v, intValue(page.getData()));
// Checking column/offset indexes for the one page
ColumnChunkMetaData column = footer.getBlocks().get(0).getColumns().get(0);
ColumnIndex columnIndex = reader.readColumnIndex(column);
assertArrayEquals(statistics.getMinBytes(), columnIndex.getMinValues().get(0).array());
assertArrayEquals(statistics.getMaxBytes(), columnIndex.getMaxValues().get(0).array());
assertEquals(statistics.getNumNulls(), columnIndex.getNullCounts().get(0).longValue());
assertFalse(columnIndex.getNullPages().get(0));
OffsetIndex offsetIndex = reader.readOffsetIndex(column);
assertEquals(1, offsetIndex.getPageCount());
assertEquals(pageSize, offsetIndex.getCompressedPageSize(0));
assertEquals(0, offsetIndex.getFirstRowIndex(0));
assertEquals(pageOffset, offsetIndex.getOffset(0));
// NOTE(review): the reader is closed only when every assertion above passes.
reader.close();
}
}
示例19
/**
 * Sets up whatever this reader uses to read definition levels for a V2 data page.
 * Subclasses supply the concrete mechanism.
 *
 * @param dataPageV2 the V2 page being initialized
 * @param descriptor the descriptor of the column the page belongs to
 */
protected abstract void initDefinitionLevelsReader(DataPageV2 dataPageV2, ColumnDescriptor descriptor);