Java源码示例:org.apache.parquet.column.page.DataPageV1
示例1
/**
 * Points this reader at a new data page and resets the read position.
 *
 * @param page the page to read from; checked for null before use
 */
public void setPage(DataPage page) {
  Preconditions.checkNotNull(page, "Cannot read from null page");
  this.page = page;

  // Dispatch on the concrete page version; the visitor's return value is not used.
  DataPage.Visitor<ValuesReader> pageInitializer = new DataPage.Visitor<ValuesReader>() {
    @Override
    public ValuesReader visit(DataPageV1 v1Page) {
      initFromPage(v1Page);
      return null;
    }

    @Override
    public ValuesReader visit(DataPageV2 v2Page) {
      initFromPage(v2Page);
      return null;
    }
  };
  this.page.accept(pageInitializer);

  this.triplesRead = 0;
  this.hasNext = triplesCount > triplesRead;
}
示例2
/**
 * Sets up the repetition-level, definition-level and value readers for a V1 data page.
 *
 * <p>The three sections are read from a single stream over the page bytes, in that order.
 *
 * @param initPage the V1 page to initialize from
 * @throws ParquetDecodingException if an {@link IOException} occurs while reading the page
 */
protected void initFromPage(DataPageV1 initPage) {
  this.triplesCount = initPage.getValueCount();
  ValuesReader repetitionReader =
      initPage.getRlEncoding().getValuesReader(desc, ValuesType.REPETITION_LEVEL);
  this.repetitionLevels = new ValuesReaderIntIterator(repetitionReader);

  try {
    BytesInput pageBytes = initPage.getBytes();
    LOG.debug("page size {} bytes and {} records", pageBytes.size(), triplesCount);

    LOG.debug("reading repetition levels at 0");
    ByteBufferInputStream stream = pageBytes.toInputStream();
    repetitionReader.initFromPage(triplesCount, stream);

    LOG.debug("reading definition levels at {}", stream.position());
    initDefinitionLevelsReader(initPage, desc, stream, triplesCount);

    LOG.debug("reading data at {}", stream.position());
    initDataReader(initPage.getValueEncoding(), stream, initPage.getValueCount());
  } catch (IOException ioe) {
    throw new ParquetDecodingException("could not read page " + initPage + " in col " + desc, ioe);
  }
}
示例3
/**
 * Switches this reader to the given data page, initializes it according to its
 * page version and then advances to the first triple.
 *
 * @param page the page to read from; checked for null before use
 */
public void setPage(DataPage page) {
  Preconditions.checkNotNull(page, "Cannot read from null page");
  this.page = page;

  // Version-specific initialization; the visitor result is ignored.
  DataPage.Visitor<ValuesReader> versionDispatcher = new DataPage.Visitor<ValuesReader>() {
    @Override
    public ValuesReader visit(DataPageV1 pageV1) {
      initFromPage(pageV1);
      return null;
    }

    @Override
    public ValuesReader visit(DataPageV2 pageV2) {
      initFromPage(pageV2);
      return null;
    }
  };
  this.page.accept(versionDispatcher);

  this.triplesRead = 0;
  advance();
}
示例4
/**
 * Initializes the repetition-level, definition-level and data readers from a V1 page.
 *
 * @param page the V1 page to read
 * @throws ParquetDecodingException if an {@link IOException} occurs while reading the page
 */
private void initFromPage(DataPageV1 page) {
  this.triplesCount = page.getValueCount();

  ValuesReader repetitionReader = page.getRlEncoding().getValuesReader(desc, REPETITION_LEVEL);
  ValuesReader definitionReader = page.getDlEncoding().getValuesReader(desc, DEFINITION_LEVEL);
  this.repetitionLevels = new ValuesReaderIntIterator(repetitionReader);
  this.definitionLevels = new ValuesReaderIntIterator(definitionReader);

  try {
    BytesInput pageBytes = page.getBytes();
    LOG.debug("page size {} bytes and {} records", pageBytes.size(), triplesCount);

    LOG.debug("reading repetition levels at 0");
    ByteBufferInputStream stream = pageBytes.toInputStream();
    repetitionReader.initFromPage(triplesCount, stream);

    LOG.debug("reading definition levels at {}", stream.position());
    definitionReader.initFromPage(triplesCount, stream);

    LOG.debug("reading data at {}", stream.position());
    initDataReader(page.getValueEncoding(), stream, page.getValueCount());
  } catch (IOException ioe) {
    throw new ParquetDecodingException("could not read page " + page + " in col " + desc, ioe);
  }
}
示例5
/**
 * Prepares the decoders for a V1 data page.
 *
 * <p>Definition levels are only accepted when they are RLE encoded, unless the column's
 * maximum definition level is zero.
 *
 * @param page the V1 page to decode
 * @throws UnsupportedOperationException if the definition-level encoding is not accepted
 * @throws IOException if reading the page bytes fails (the original exception is the cause)
 */
private void readPageV1(DataPageV1 page) throws IOException {
  this.pageValueCount = page.getValueCount();
  ValuesReader repetitionReader =
      page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);

  // Initialize the decoders.
  boolean definitionLevelsSupported =
      page.getDlEncoding() == Encoding.RLE || descriptor.getMaxDefinitionLevel() == 0;
  if (!definitionLevelsSupported) {
    throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
  }

  int definitionBitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
  this.runLenDecoder = new RunLengthDecoder(definitionBitWidth);

  try {
    ByteBufferInputStream stream = page.getBytes().toInputStream();
    repetitionReader.initFromPage(pageValueCount, stream);
    this.runLenDecoder.initFromStream(pageValueCount, stream);
    prepareNewPage(page.getValueEncoding(), stream);
  } catch (IOException cause) {
    throw new IOException("could not read page " + page + " in col " + descriptor, cause);
  }
}
示例6
/**
 * Reads the next page from {@code pageReader} and hands it to the reader
 * matching its page version.
 */
private void readPage() {
  LOG.debug("loading page");
  DataPage nextPage = pageReader.readPage();

  DataPage.Visitor<Void> versionDispatcher = new DataPage.Visitor<Void>() {
    @Override
    public Void visit(DataPageV1 pageV1) {
      readPageV1(pageV1);
      return null;
    }

    @Override
    public Void visit(DataPageV2 pageV2) {
      readPageV2(pageV2);
      return null;
    }
  };
  nextPage.accept(versionDispatcher);
}
示例7
/**
 * Initializes the repetition-level, definition-level and data readers from a V1 page
 * and then signals that a new page has been initialized.
 *
 * @param page the V1 page to read
 * @throws ParquetDecodingException if an {@link IOException} occurs while reading the page
 */
private void readPageV1(DataPageV1 page) {
  ValuesReader repetitionReader = page.getRlEncoding().getValuesReader(path, REPETITION_LEVEL);
  ValuesReader definitionReader = page.getDlEncoding().getValuesReader(path, DEFINITION_LEVEL);
  this.repetitionLevelColumn = new ValuesReaderIntIterator(repetitionReader);
  this.definitionLevelColumn = new ValuesReaderIntIterator(definitionReader);

  int valuesInPage = page.getValueCount();
  try {
    BytesInput pageBytes = page.getBytes();
    LOG.debug("page size {} bytes and {} values", pageBytes.size(), valuesInPage);

    LOG.debug("reading repetition levels at 0");
    ByteBufferInputStream stream = pageBytes.toInputStream();
    repetitionReader.initFromPage(valuesInPage, stream);

    LOG.debug("reading definition levels at {}", stream.position());
    definitionReader.initFromPage(valuesInPage, stream);

    LOG.debug("reading data at {}", stream.position());
    initDataReader(page.getValueEncoding(), stream, valuesInPage);
  } catch (IOException ioe) {
    throw new ParquetDecodingException("could not read page " + page + " in col " + path, ioe);
  }

  newPageInitialized(page);
}
示例8
/**
 * Returns the statistics carried by the given data page, for either page version.
 *
 * @param page the data page to inspect
 * @param <T> the comparable value type the statistics are cast to (unchecked)
 * @return the page's statistics as returned by its {@code getStatistics()}
 */
private static <T extends Comparable<T>> Statistics<T> getStatisticsFromPageHeader(DataPage page) {
  DataPage.Visitor<Statistics<T>> statisticsExtractor = new DataPage.Visitor<Statistics<T>>() {
    @Override
    @SuppressWarnings("unchecked")
    public Statistics<T> visit(DataPageV1 pageV1) {
      return (Statistics<T>) pageV1.getStatistics();
    }

    @Override
    @SuppressWarnings("unchecked")
    public Statistics<T> visit(DataPageV2 pageV2) {
      return (Statistics<T>) pageV2.getStatistics();
    }
  };
  return page.accept(statisticsExtractor);
}
示例9
/**
 * Creates the definition-level reader from the page's definition-level encoding,
 * exposes it through {@code definitionLevels} and initializes it from the stream.
 *
 * @param dataPageV1 the V1 page supplying the definition-level encoding
 * @param desc the column descriptor passed to the encoding
 * @param in the page stream the reader is initialized from
 * @param triplesCount the value count passed to the reader
 * @throws IOException if initializing the reader from the stream fails
 */
@Override
protected void initDefinitionLevelsReader(DataPageV1 dataPageV1, ColumnDescriptor desc, ByteBufferInputStream in,
    int triplesCount) throws IOException {
  final ValuesReader levelsReader =
      dataPageV1.getDlEncoding().getValuesReader(desc, ValuesType.DEFINITION_LEVEL);
  definitionLevels = new ValuesReaderIntIterator(levelsReader);
  levelsReader.initFromPage(triplesCount, in);
}
示例10
/**
 * Asserts that the next page of the given column is a V1 data page with the
 * expected value count and byte content.
 *
 * @param schema the schema used to resolve the column description for {@code path}
 * @param pages the page store to read the page from
 * @param path the column path
 * @param values the expected value count of the page
 * @param bytes the expected page bytes
 * @throws IOException if converting either side to a byte array fails
 */
private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes)
    throws IOException {
  DataPageV1 nextPage =
      (DataPageV1) pages.getPageReader(schema.getColumnDescription(path)).readPage();
  assertEquals(values, nextPage.getValueCount());

  byte[] expectedBytes = bytes.toByteArray();
  byte[] actualBytes = nextPage.getBytes().toByteArray();
  assertArrayEquals(expectedBytes, actualBytes);
}
示例11
/**
 * Stores a copy of the given page bytes as a {@code DataPageV1} and updates the
 * running memory-size and value-count totals.
 *
 * @param bytesInput the page content; it is copied before being stored
 * @param valueCount number of values in the page; must not be zero
 * @param statistics statistics attached to the page
 * @param rlEncoding repetition-level encoding of the page
 * @param dlEncoding definition-level encoding of the page
 * @param valuesEncoding value encoding of the page
 * @throws ParquetEncodingException if {@code valueCount} is zero or the page size does not
 *     fit in an {@code int}
 * @throws IOException if copying the page bytes fails
 */
@Override
public void writePage(BytesInput bytesInput, int valueCount, Statistics statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding)
    throws IOException {
  if (valueCount == 0) {
    throw new ParquetEncodingException("illegal page of 0 values");
  }
  // The page stores its size as an int; reject oversized input up front instead of letting
  // the narrowing cast silently wrap, and before any counters are modified.
  long pageSize = bytesInput.size();
  if (pageSize > Integer.MAX_VALUE) {
    throw new ParquetEncodingException(
        "Cannot write page larger than Integer.MAX_VALUE bytes: " + pageSize);
  }
  memSize += pageSize;
  pages.add(new DataPageV1(BytesInput.copy(bytesInput), valueCount, (int) pageSize, statistics, rlEncoding, dlEncoding, valuesEncoding));
  totalValueCount += valueCount;
  LOG.debug("page written for {} bytes and {} records", pageSize, valueCount);
}
示例12
/**
 * Writes page-level CRC checksums with read-side verification disabled, then checks that
 * every page carries a correct checksum and the expected content. This exercises the
 * write path without the risk of failing inside read-path verification.
 */
@Test
public void testWriteOnVerifyOff() throws IOException {
  Configuration configuration = new Configuration();
  configuration.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
  configuration.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);

  Path file = writeSimpleParquetFile(configuration, CompressionCodecName.UNCOMPRESSED);

  try (ParquetFileReader fileReader =
      getParquetFileReader(file, configuration, Arrays.asList(colADesc, colBDesc))) {
    PageReadStore rowGroup = fileReader.readNextRowGroup();

    // Column A, first and second page.
    DataPageV1 firstPageOfA = readNextPage(colADesc, rowGroup);
    assertCrcSetAndCorrect(firstPageOfA, colAPage1Bytes);
    assertCorrectContent(firstPageOfA.getBytes().toByteArray(), colAPage1Bytes);

    DataPageV1 secondPageOfA = readNextPage(colADesc, rowGroup);
    assertCrcSetAndCorrect(secondPageOfA, colAPage2Bytes);
    assertCorrectContent(secondPageOfA.getBytes().toByteArray(), colAPage2Bytes);

    // Column B, first and second page.
    DataPageV1 firstPageOfB = readNextPage(colBDesc, rowGroup);
    assertCrcSetAndCorrect(firstPageOfB, colBPage1Bytes);
    assertCorrectContent(firstPageOfB.getBytes().toByteArray(), colBPage1Bytes);

    DataPageV1 secondPageOfB = readNextPage(colBDesc, rowGroup);
    assertCrcSetAndCorrect(secondPageOfB, colBPage2Bytes);
    assertCorrectContent(secondPageOfB.getBytes().toByteArray(), colBPage2Bytes);
  }
}
示例13
/**
 * Writes checksums and verifies them while reading. Checks that the CRC is set on every
 * page and that the written content can be read back when checksums are enabled on both
 * the write path and the read path.
 */
@Test
public void testWriteOnVerifyOn() throws IOException {
  Configuration configuration = new Configuration();
  configuration.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
  configuration.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);

  Path file = writeSimpleParquetFile(configuration, CompressionCodecName.UNCOMPRESSED);

  try (ParquetFileReader fileReader =
      getParquetFileReader(file, configuration, Arrays.asList(colADesc, colBDesc))) {
    PageReadStore rowGroup = fileReader.readNextRowGroup();

    // Column A, first and second page.
    DataPageV1 firstPageOfA = readNextPage(colADesc, rowGroup);
    assertCrcSetAndCorrect(firstPageOfA, colAPage1Bytes);
    assertCorrectContent(firstPageOfA.getBytes().toByteArray(), colAPage1Bytes);

    DataPageV1 secondPageOfA = readNextPage(colADesc, rowGroup);
    assertCrcSetAndCorrect(secondPageOfA, colAPage2Bytes);
    assertCorrectContent(secondPageOfA.getBytes().toByteArray(), colAPage2Bytes);

    // Column B, first and second page.
    DataPageV1 firstPageOfB = readNextPage(colBDesc, rowGroup);
    assertCrcSetAndCorrect(firstPageOfB, colBPage1Bytes);
    assertCorrectContent(firstPageOfB.getBytes().toByteArray(), colBPage1Bytes);

    DataPageV1 secondPageOfB = readNextPage(colBDesc, rowGroup);
    assertCrcSetAndCorrect(secondPageOfB, colBPage2Bytes);
    assertCorrectContent(secondPageOfB.getBytes().toByteArray(), colBPage2Bytes);
  }
}
示例14
/**
 * Checks that the checksum is computed over the compressed form of the page data and
 * that checksum verification succeeds for a compressed file.
 */
@Test
public void testCompression() throws IOException {
  Configuration configuration = new Configuration();
  configuration.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
  configuration.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);

  Path file = writeSimpleParquetFile(configuration, CompressionCodecName.SNAPPY);

  try (ParquetFileReader fileReader =
      getParquetFileReader(file, configuration, Arrays.asList(colADesc, colBDesc))) {
    PageReadStore rowGroup = fileReader.readNextRowGroup();

    // The CRC is compared against the snappy-compressed bytes, the content against the raw bytes.
    DataPageV1 firstPageOfA = readNextPage(colADesc, rowGroup);
    assertCrcSetAndCorrect(firstPageOfA, snappy(colAPage1Bytes));
    assertCorrectContent(firstPageOfA.getBytes().toByteArray(), colAPage1Bytes);

    DataPageV1 secondPageOfA = readNextPage(colADesc, rowGroup);
    assertCrcSetAndCorrect(secondPageOfA, snappy(colAPage2Bytes));
    assertCorrectContent(secondPageOfA.getBytes().toByteArray(), colAPage2Bytes);

    DataPageV1 firstPageOfB = readNextPage(colBDesc, rowGroup);
    assertCrcSetAndCorrect(firstPageOfB, snappy(colBPage1Bytes));
    assertCorrectContent(firstPageOfB.getBytes().toByteArray(), colBPage1Bytes);

    DataPageV1 secondPageOfB = readNextPage(colBDesc, rowGroup);
    assertCrcSetAndCorrect(secondPageOfB, snappy(colBPage2Bytes));
    assertCorrectContent(secondPageOfB.getBytes().toByteArray(), colBPage2Bytes);
  }
}
示例15
/**
 * Checks adherence to the checksum calculation specification: the CRC is computed over the
 * compressed concatenation of the repetition levels, the definition levels and the actual
 * data. Sample data with a nested schema containing nulls is used so that the repetition
 * and definition levels are non-trivial.
 */
@Test
public void testNestedWithNulls() throws IOException {
  Configuration configuration = new Configuration();

  // Reference file: written without checksums, used only to obtain the raw page bytes
  // from which the reference CRC is calculated.
  configuration.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
  configuration.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
  Path referenceFile =
      writeNestedWithNullsSampleParquetFile(configuration, false, CompressionCodecName.SNAPPY);

  try (ParquetFileReader referenceReader =
      getParquetFileReader(referenceFile, configuration, Arrays.asList(colCIdDesc, colDValDesc))) {
    PageReadStore referenceRowGroup = referenceReader.readNextRowGroup();
    byte[] colCIdPageBytes = readNextPage(colCIdDesc, referenceRowGroup).getBytes().toByteArray();
    byte[] colDValPageBytes = readNextPage(colDValDesc, referenceRowGroup).getBytes().toByteArray();

    // Checked file: same data, written and read with checksums enabled.
    configuration.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
    configuration.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
    Path checkedFile =
        writeNestedWithNullsSampleParquetFile(configuration, false, CompressionCodecName.SNAPPY);

    try (ParquetFileReader checkedReader =
        getParquetFileReader(checkedFile, configuration, Arrays.asList(colCIdDesc, colDValDesc))) {
      PageReadStore checkedRowGroup = checkedReader.readNextRowGroup();

      DataPageV1 idPage = readNextPage(colCIdDesc, checkedRowGroup);
      assertCrcSetAndCorrect(idPage, snappy(colCIdPageBytes));
      assertCorrectContent(idPage.getBytes().toByteArray(), colCIdPageBytes);

      DataPageV1 valPage = readNextPage(colDValDesc, checkedRowGroup);
      assertCrcSetAndCorrect(valPage, snappy(colDValPageBytes));
      assertCorrectContent(valPage.getBytes().toByteArray(), colDValPageBytes);
    }
  }
}
示例16
/**
 * Checks the checksums of a dictionary page and of the data page that follows it against
 * the snappy-compressed bytes taken from a reference file written without checksums.
 */
@Test
public void testDictionaryEncoding() throws IOException {
  Configuration configuration = new Configuration();

  // Reference file: dictionary encoded sample written via the non-checksum code path,
  // used only to extract the raw bytes from which the reference CRC is calculated.
  configuration.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
  configuration.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
  Path referenceFile =
      writeNestedWithNullsSampleParquetFile(configuration, true, CompressionCodecName.SNAPPY);

  try (ParquetFileReader referenceReader =
      getParquetFileReader(referenceFile, configuration, Collections.singletonList(colDValDesc))) {
    PageReadStore referenceRowGroup = referenceReader.readNextRowGroup();
    // Read (decompressed) dictionary page
    byte[] dictPageBytes = readDictPage(colDValDesc, referenceRowGroup).getBytes().toByteArray();
    byte[] colDValPageBytes = readNextPage(colDValDesc, referenceRowGroup).getBytes().toByteArray();

    // Checked file: same data, written and read with checksums enabled.
    configuration.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
    configuration.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
    Path checkedFile =
        writeNestedWithNullsSampleParquetFile(configuration, true, CompressionCodecName.SNAPPY);

    try (ParquetFileReader checkedReader =
        getParquetFileReader(checkedFile, configuration, Collections.singletonList(colDValDesc))) {
      PageReadStore checkedRowGroup = checkedReader.readNextRowGroup();

      DictionaryPage dictionaryPage = readDictPage(colDValDesc, checkedRowGroup);
      assertCrcSetAndCorrect(dictionaryPage, snappy(dictPageBytes));
      assertCorrectContent(dictionaryPage.getBytes().toByteArray(), dictPageBytes);

      DataPageV1 valPage = readNextPage(colDValDesc, checkedRowGroup);
      assertCrcSetAndCorrect(valPage, snappy(colDValPageBytes));
      assertCorrectContent(valPage.getBytes().toByteArray(), colDValPageBytes);
    }
  }
}
示例17
/**
 * Extracts the statistics object from a data page, whichever page version it is.
 *
 * @param page the data page to inspect
 * @param <T> the comparable value type the statistics are cast to (unchecked)
 * @return the value of the page's {@code getStatistics()}
 */
private static <T extends Comparable<T>> Statistics<T> getStatisticsFromPageHeader(DataPage page) {
  DataPage.Visitor<Statistics<T>> headerStatistics = new DataPage.Visitor<Statistics<T>>() {
    @Override
    @SuppressWarnings("unchecked")
    public Statistics<T> visit(DataPageV1 v1Page) {
      return (Statistics<T>) v1Page.getStatistics();
    }

    @Override
    @SuppressWarnings("unchecked")
    public Statistics<T> visit(DataPageV2 v2Page) {
      return (Statistics<T>) v2Page.getStatistics();
    }
  };
  return page.accept(headerStatistics);
}
示例18
/**
 * Formats one summary line for a V1 data page: row group and page number, codec,
 * value encoding, value count, size per value, total compressed size, null count
 * (empty when not set) and min/max.
 *
 * @param page the V1 page to describe
 * @return the formatted line
 */
@Override
public String visit(DataPageV1 page) {
  String enc = encodingAsString(page.getValueEncoding(), false);
  long totalSize = page.getCompressedSize();
  int count = page.getValueCount();

  String numNulls;
  if (page.getStatistics().isNumNullsSet()) {
    numNulls = Long.toString(page.getStatistics().getNumNulls());
  } else {
    numNulls = "";
  }

  float perValue = ((float) totalSize) / count;
  String minMax = minMaxAsString(page.getStatistics());

  return String.format(
      "%3d-%-3d %-5s %s %-2s %-7d %-10s %-10s %-8s %-7s %s",
      rowGroupNum,
      pageNum,
      "data",
      shortCodec,
      enc,
      count,
      humanReadable(perValue),
      humanReadable(totalSize),
      "",
      numNulls,
      minMax);
}
示例19
/**
 * Creates a vectorized definition-level reader for the column and initializes it
 * from the page stream.
 *
 * @param dataPageV1 the V1 page being initialized (not used here)
 * @param desc the column descriptor the reader is created for
 * @param in the page stream the reader is initialized from
 * @param triplesCount the value count passed to the reader
 * @throws IOException if initializing the reader from the stream fails
 */
@Override
protected void initDefinitionLevelsReader(DataPageV1 dataPageV1, ColumnDescriptor desc, ByteBufferInputStream in,
    int triplesCount) throws IOException {
  vectorizedDefinitionLevelReader = newVectorizedDefinitionLevelReader(desc);
  vectorizedDefinitionLevelReader.initFromPage(triplesCount, in);
}
示例20
/**
 * Hook for subclasses to set up definition-level reading for a V1 data page.
 *
 * <p>NOTE(review): presumably called with the stream positioned just after the repetition
 * levels, so implementations are expected to consume the definition levels from
 * {@code in} — confirm against the caller.
 *
 * @param dataPageV1 the V1 page being initialized
 * @param descriptor the descriptor of the column being read
 * @param in the stream over the page bytes
 * @param count the number of values in the page — TODO confirm
 * @throws IOException if reading from the stream fails
 */
protected abstract void initDefinitionLevelsReader(DataPageV1 dataPageV1, ColumnDescriptor descriptor,
ByteBufferInputStream in, int count) throws IOException;
示例21
/**
 * Reads {@code readNumber} values from this column reader into {@code vector}.
 *
 * <p>Values are consumed page by page: whenever the current page has no values left, the
 * next page is fetched from {@code pageReader} and set up as a V1 or V2 data page before
 * reading continues. Dictionary-encoded pages either hand the dictionary to the vector
 * (lazy decoding) or are decoded eagerly, depending on the state of the batch.
 *
 * @param readNumber number of values still to read; the loop runs until it reaches zero
 * @param vector destination vector that receives the values
 * @throws IOException declared by this method; NOTE(review): presumably raised while
 *     setting up a new page — confirm
 * @throws RuntimeException if the page read is neither a {@code DataPageV1} nor a
 *     {@code DataPageV2}
 */
@Override
public final void readToVector(int readNumber, VECTOR vector) throws IOException {
// Index in the vector at which the next values are written.
int rowId = 0;
WritableIntVector dictionaryIds = null;
if (dictionary != null) {
dictionaryIds = vector.reserveDictionaryIds(readNumber);
}
while (readNumber > 0) {
// Compute the number of values we want to read in this page.
int leftInPage = (int) (endOfPageValueCount - valuesRead);
if (leftInPage == 0) {
// Current page is exhausted: load the next one and dispatch on its version.
DataPage page = pageReader.readPage();
if (page instanceof DataPageV1) {
readPageV1((DataPageV1) page);
} else if (page instanceof DataPageV2) {
readPageV2((DataPageV2) page);
} else {
throw new RuntimeException("Unsupported page type: " + page.getClass());
}
// Recomputed from the same fields after loading the new page.
leftInPage = (int) (endOfPageValueCount - valuesRead);
}
// Never read past the end of the current page in a single step.
int num = Math.min(readNumber, leftInPage);
if (isCurrentPageDictionaryEncoded) {
// Read and decode dictionary ids.
runLenDecoder.readDictionaryIds(
num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder);
if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) {
// Column vector supports lazy decoding of dictionary values so just set the dictionary.
// We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
// non-dictionary encoded values have already been added).
vector.setDictionary(new ParquetDictionary(dictionary));
} else {
readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds);
}
} else {
if (vector.hasDictionary() && rowId != 0) {
// This batch already has dictionary encoded values but this new page is not. The batch
// does not support a mix of dictionary and not so we will decode the dictionary.
readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds());
}
vector.setDictionary(null);
readBatch(rowId, num, vector);
}
// Advance the counters by the number of values handled in this step.
valuesRead += num;
rowId += num;
readNumber -= num;
}
}
示例22
/**
 * Tests whether corruption in the page content is detected by checksum verification.
 *
 * <p>A file is written with checksums, two bytes of it are altered, and the file is
 * overwritten with the altered content. With verification disabled the corrupted pages are
 * read back without error; with verification enabled the read is expected to fail.
 */
@Test
public void testCorruptedPage() throws IOException {
  Configuration conf = new Configuration();
  conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);

  Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED);

  // Load the whole file into memory; the stream is closed by try-with-resources.
  InputFile inputFile = HadoopInputFile.fromPath(path, conf);
  int fileLen = (int) inputFile.getLength();
  byte[] fileBytes = new byte[fileLen];
  try (SeekableInputStream inputStream = inputFile.newStream()) {
    inputStream.readFully(fileBytes);
  }

  // There are 4 pages in total (2 per column), we corrupt the first page of the first column
  // and the second page of the second column. We do this by altering a byte roughly in the
  // middle of each page to be corrupted
  fileBytes[fileLen / 8]++;
  fileBytes[fileLen / 8 + ((fileLen / 4) * 3)]++;

  // Overwrite the file with the corrupted bytes. The stream must be fully closed before the
  // file is read again, which the end of this try-with-resources block guarantees.
  OutputFile outputFile = HadoopOutputFile.fromPath(path, conf);
  try (PositionOutputStream outputStream = outputFile.createOrOverwrite(1024 * 1024)) {
    outputStream.write(fileBytes);
  }

  // First we disable checksum verification, the corruption will go undetected as it is in the
  // data section of the page
  conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
  try (ParquetFileReader reader = getParquetFileReader(path, conf,
      Arrays.asList(colADesc, colBDesc))) {
    PageReadStore pageReadStore = reader.readNextRowGroup();

    DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
    assertFalse("Data in page was not corrupted",
        Arrays.equals(colAPage1.getBytes().toByteArray(), colAPage1Bytes));
    // Skip the two untouched pages (second page of column A, first page of column B).
    readNextPage(colADesc, pageReadStore);
    readNextPage(colBDesc, pageReadStore);
    DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
    assertFalse("Data in page was not corrupted",
        Arrays.equals(colBPage2.getBytes().toByteArray(), colBPage2Bytes));
  }

  // Now we enable checksum verification, the corruption should be detected
  conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
  try (ParquetFileReader reader =
      getParquetFileReader(path, conf, Arrays.asList(colADesc, colBDesc))) {
    // We expect an exception on the first encountered corrupt page (in readAllPages)
    assertVerificationFailed(reader);
  }
}
示例23
/**
 * Reads the next page of a column and returns it as a V1 data page.
 *
 * @param colDesc the column whose page reader is used
 * @param pageReadStore the page store providing the page reader
 * @return the next page, cast to {@code DataPageV1}
 */
private DataPageV1 readNextPage(ColumnDescriptor colDesc, PageReadStore pageReadStore) {
  return (DataPageV1) pageReadStore
      .getPageReader(colDesc)
      .readPage();
}
示例24
/**
 * Asserts that the next page of the given column has the expected value count and,
 * treated as a V1 data page, the expected byte content.
 *
 * @param schema the schema used to resolve the column description for {@code path}
 * @param pages the page store to read the page from
 * @param path the column path
 * @param values the expected value count of the page
 * @param bytes the expected page bytes
 * @throws IOException if converting either side to a byte array fails
 */
private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes) throws IOException {
  DataPage nextPage = pages.getPageReader(schema.getColumnDescription(path)).readPage();
  assertEquals(values, nextPage.getValueCount());

  // The cast happens only after the value count has been checked.
  byte[] expectedBytes = bytes.toByteArray();
  DataPageV1 v1Page = (DataPageV1) nextPage;
  byte[] actualBytes = v1Page.getBytes().toByteArray();
  assertArrayEquals(expectedBytes, actualBytes);
}