Java源码示例:org.apache.parquet.column.page.DataPageV1

示例1
/**
 * Installs {@code page} as the current page and resets the read position.
 *
 * <p>The page is dispatched through a {@code DataPage.Visitor} so that the
 * {@code initFromPage} overload matching the page version (V1 or V2) is invoked.
 *
 * @param page the data page to read from; checked with {@code Preconditions.checkNotNull}
 */
public void setPage(DataPage page) {
  Preconditions.checkNotNull(page, "Cannot read from null page");
  this.page = page;

  // The visitor is used purely for dispatch; both branches return null.
  DataPage.Visitor<ValuesReader> pageInitializer = new DataPage.Visitor<ValuesReader>() {
    @Override
    public ValuesReader visit(DataPageV1 v1Page) {
      initFromPage(v1Page);
      return null;
    }

    @Override
    public ValuesReader visit(DataPageV2 v2Page) {
      initFromPage(v2Page);
      return null;
    }
  };
  this.page.accept(pageInitializer);

  // Start from the first triple; there is something to read only if the page is non-empty.
  this.triplesRead = 0;
  this.hasNext = this.triplesRead < this.triplesCount;
}
 
示例2
/**
 * Initializes the level and value readers from a V1 data page.
 *
 * <p>The page bytes are consumed from a single stream in this order: repetition levels,
 * definition levels (via {@code initDefinitionLevelsReader}), then the data values.
 *
 * @param initPage the V1 page to initialize from
 * @throws ParquetDecodingException if an {@link IOException} occurs while reading the page
 */
protected void initFromPage(DataPageV1 initPage) {
  this.triplesCount = initPage.getValueCount();

  ValuesReader repetitionReader =
      initPage.getRlEncoding().getValuesReader(desc, ValuesType.REPETITION_LEVEL);
  this.repetitionLevels = new ValuesReaderIntIterator(repetitionReader);

  try {
    BytesInput pageBytes = initPage.getBytes();
    LOG.debug("page size {} bytes and {} records", pageBytes.size(), triplesCount);

    LOG.debug("reading repetition levels at 0");
    ByteBufferInputStream stream = pageBytes.toInputStream();
    repetitionReader.initFromPage(triplesCount, stream);

    LOG.debug("reading definition levels at {}", stream.position());
    initDefinitionLevelsReader(initPage, desc, stream, triplesCount);

    LOG.debug("reading data at {}", stream.position());
    initDataReader(initPage.getValueEncoding(), stream, initPage.getValueCount());
  } catch (IOException ioe) {
    throw new ParquetDecodingException("could not read page " + initPage + " in col " + desc, ioe);
  }
}
 
示例3
/**
 * Makes {@code page} the current page, initializes the readers for its version and moves
 * to the first triple by calling {@code advance()}.
 *
 * @param page the data page to read from; checked with {@code Preconditions.checkNotNull}
 */
public void setPage(DataPage page) {
  Preconditions.checkNotNull(page, "Cannot read from null page");
  this.page = page;

  // Dispatch on the concrete page version; the visitor result itself is unused.
  DataPage.Visitor<ValuesReader> versionDispatcher = new DataPage.Visitor<ValuesReader>() {
    @Override
    public ValuesReader visit(DataPageV1 pageV1) {
      initFromPage(pageV1);
      return null;
    }

    @Override
    public ValuesReader visit(DataPageV2 pageV2) {
      initFromPage(pageV2);
      return null;
    }
  };
  this.page.accept(versionDispatcher);

  this.triplesRead = 0;
  advance();
}
 
示例4
/**
 * Initializes the repetition-level, definition-level and value readers from a V1 data page.
 *
 * <p>All three sections are read from one stream over the page bytes, in that order.
 *
 * @param page the V1 page to initialize from
 * @throws ParquetDecodingException if an {@link IOException} occurs while reading the page
 */
private void initFromPage(DataPageV1 page) {
  this.triplesCount = page.getValueCount();

  ValuesReader repetitionReader = page.getRlEncoding().getValuesReader(desc, REPETITION_LEVEL);
  ValuesReader definitionReader = page.getDlEncoding().getValuesReader(desc, DEFINITION_LEVEL);
  this.repetitionLevels = new ValuesReaderIntIterator(repetitionReader);
  this.definitionLevels = new ValuesReaderIntIterator(definitionReader);

  try {
    BytesInput pageBytes = page.getBytes();
    LOG.debug("page size {} bytes and {} records", pageBytes.size(), triplesCount);

    LOG.debug("reading repetition levels at 0");
    ByteBufferInputStream stream = pageBytes.toInputStream();
    repetitionReader.initFromPage(triplesCount, stream);

    LOG.debug("reading definition levels at {}", stream.position());
    definitionReader.initFromPage(triplesCount, stream);

    LOG.debug("reading data at {}", stream.position());
    initDataReader(page.getValueEncoding(), stream, page.getValueCount());
  } catch (IOException ioe) {
    throw new ParquetDecodingException("could not read page " + page + " in col " + desc, ioe);
  }
}
 
示例5
/**
 * Prepares this reader for a V1 data page: records the page value count, sets up the
 * run-length decoder for definition levels and hands the remaining stream to
 * {@code prepareNewPage}.
 *
 * @param page the V1 page to read
 * @throws UnsupportedOperationException if the definition levels are not RLE encoded while
 *     the column's max definition level is non-zero
 * @throws IOException if reading the page bytes fails; the original exception is the cause
 */
private void readPageV1(DataPageV1 page) throws IOException {
	this.pageValueCount = page.getValueCount();
	ValuesReader repetitionReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);

	// Initialize the decoders.
	int maxDefinitionLevel = descriptor.getMaxDefinitionLevel();
	boolean rleDefinitionLevels = page.getDlEncoding() == Encoding.RLE;
	if (!rleDefinitionLevels && maxDefinitionLevel != 0) {
		throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
	}
	this.runLenDecoder = new RunLengthDecoder(BytesUtils.getWidthFromMaxInt(maxDefinitionLevel));

	try {
		ByteBufferInputStream stream = page.getBytes().toInputStream();
		// The stream holds repetition levels, then definition levels, then the values.
		repetitionReader.initFromPage(pageValueCount, stream);
		this.runLenDecoder.initFromStream(pageValueCount, stream);
		prepareNewPage(page.getValueEncoding(), stream);
	} catch (IOException ioe) {
		throw new IOException("could not read page " + page + " in col " + descriptor, ioe);
	}
}
 
示例6
/**
 * Fetches the next page from {@code pageReader} and routes it to {@code readPageV1} or
 * {@code readPageV2} depending on its version.
 */
private void readPage() {
  LOG.debug("loading page");
  DataPage nextPage = pageReader.readPage();

  // Void visitor: used only to dispatch on the concrete page version.
  DataPage.Visitor<Void> versionDispatcher = new DataPage.Visitor<Void>() {
    @Override
    public Void visit(DataPageV1 v1Page) {
      readPageV1(v1Page);
      return null;
    }

    @Override
    public Void visit(DataPageV2 v2Page) {
      readPageV2(v2Page);
      return null;
    }
  };
  nextPage.accept(versionDispatcher);
}
 
示例7
/**
 * Sets up the repetition-level, definition-level and data readers for a V1 page and then
 * signals that the new page is ready via {@code newPageInitialized}.
 *
 * @param page the V1 page to read
 * @throws ParquetDecodingException if an {@link IOException} occurs while reading the page
 */
private void readPageV1(DataPageV1 page) {
  ValuesReader repetitionReader = page.getRlEncoding().getValuesReader(path, REPETITION_LEVEL);
  ValuesReader definitionReader = page.getDlEncoding().getValuesReader(path, DEFINITION_LEVEL);
  this.repetitionLevelColumn = new ValuesReaderIntIterator(repetitionReader);
  this.definitionLevelColumn = new ValuesReaderIntIterator(definitionReader);

  int pageValueCount = page.getValueCount();
  try {
    BytesInput pageBytes = page.getBytes();
    LOG.debug("page size {} bytes and {} values", pageBytes.size(), pageValueCount);

    LOG.debug("reading repetition levels at 0");
    ByteBufferInputStream stream = pageBytes.toInputStream();
    repetitionReader.initFromPage(pageValueCount, stream);

    LOG.debug("reading definition levels at {}", stream.position());
    definitionReader.initFromPage(pageValueCount, stream);

    LOG.debug("reading data at {}", stream.position());
    initDataReader(page.getValueEncoding(), stream, pageValueCount);
  } catch (IOException ioe) {
    throw new ParquetDecodingException("could not read page " + page + " in col " + path, ioe);
  }

  newPageInitialized(page);
}
 
示例8
/**
 * Returns the statistics carried by {@code page}, for either page version.
 *
 * @param page the data page whose statistics are wanted
 * @param <T> the comparable value type the caller expects the statistics to describe
 * @return the result of {@code getStatistics()} on the page, cast (unchecked) to
 *     {@code Statistics<T>}
 */
private static <T extends Comparable<T>>
Statistics<T> getStatisticsFromPageHeader(DataPage page) {
  DataPage.Visitor<Statistics<T>> statisticsExtractor = new DataPage.Visitor<Statistics<T>>() {
    @Override
    @SuppressWarnings("unchecked")
    public Statistics<T> visit(DataPageV1 v1Page) {
      return (Statistics<T>) v1Page.getStatistics();
    }

    @Override
    @SuppressWarnings("unchecked")
    public Statistics<T> visit(DataPageV2 v2Page) {
      return (Statistics<T>) v2Page.getStatistics();
    }
  };
  return page.accept(statisticsExtractor);
}
 
示例9
/**
 * Creates the definition-level reader for a V1 page from the page's definition-level
 * encoding, exposes it as {@code definitionLevels} and initializes it from {@code in}.
 *
 * @param dataPageV1 the V1 page supplying the definition-level encoding
 * @param desc the column descriptor passed to the encoding when creating the reader
 * @param in the stream the reader is initialized from
 * @param triplesCount the value count passed to the reader's {@code initFromPage}
 * @throws IOException if initializing the reader from the stream fails
 */
@Override
protected void initDefinitionLevelsReader(DataPageV1 dataPageV1, ColumnDescriptor desc, ByteBufferInputStream in,
                                          int triplesCount) throws IOException {
  ValuesReader definitionReader =
      dataPageV1.getDlEncoding().getValuesReader(desc, ValuesType.DEFINITION_LEVEL);
  definitionLevels = new ValuesReaderIntIterator(definitionReader);
  definitionReader.initFromPage(triplesCount, in);
}
 
示例10
/**
 * Asserts that the next page of the column at {@code path} is a V1 page holding the expected
 * number of values and exactly the expected bytes.
 *
 * @param schema the schema used to resolve {@code path} to a column description
 * @param pages the page store to read from
 * @param path the column path
 * @param values the expected value count of the page
 * @param bytes the expected page content
 * @throws IOException if converting either side to a byte array fails
 */
private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes)
    throws IOException {
  DataPageV1 nextPage =
      (DataPageV1) pages.getPageReader(schema.getColumnDescription(path)).readPage();
  assertEquals(values, nextPage.getValueCount());

  byte[] expectedContent = bytes.toByteArray();
  byte[] actualContent = nextPage.getBytes().toByteArray();
  assertArrayEquals(expectedContent, actualContent);
}
 
示例11
/**
 * Stores a copy of the given page bytes as a {@code DataPageV1} and updates the running
 * memory-size and value-count totals.
 *
 * <p>The totals are only updated once the page has been built and added, so a failure while
 * copying the bytes does not leave them out of step with {@code pages}.
 *
 * @param bytesInput the page content; copied before being stored
 * @param valueCount the number of values in the page; must not be 0
 * @param statistics the page statistics
 * @param rlEncoding the repetition-level encoding
 * @param dlEncoding the definition-level encoding
 * @param valuesEncoding the values encoding
 * @throws ParquetEncodingException if {@code valueCount} is 0
 * @throws ArithmeticException if the page size does not fit in an {@code int}
 * @throws IOException if copying the page bytes fails
 */
@Override
public void writePage(BytesInput bytesInput, int valueCount, Statistics statistics, Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding)
    throws IOException {
  if (valueCount == 0) {
    throw new ParquetEncodingException("illegal page of 0 values");
  }
  long pageSize = bytesInput.size();
  // Fail loudly instead of silently truncating a size that does not fit in an int.
  int pageSizeAsInt = Math.toIntExact(pageSize);
  DataPageV1 page = new DataPageV1(BytesInput.copy(bytesInput), valueCount, pageSizeAsInt, statistics, rlEncoding, dlEncoding, valuesEncoding);
  pages.add(page);
  memSize += pageSize;
  totalValueCount += valueCount;
  LOG.debug("page written for {} bytes and {} records", pageSize, valueCount);
}
 
示例12
/**
 * Writes page-level CRC checksums with read-path verification disabled, then checks that the
 * stored checksums are correct. This confirms that correct checksums are written without the
 * read-path verification potentially failing first.
 */
@Test
public void testWriteOnVerifyOff() throws IOException {
  Configuration configuration = new Configuration();
  configuration.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
  configuration.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);

  Path file = writeSimpleParquetFile(configuration, CompressionCodecName.UNCOMPRESSED);

  try (ParquetFileReader fileReader =
      getParquetFileReader(file, configuration, Arrays.asList(colADesc, colBDesc))) {
    PageReadStore rowGroup = fileReader.readNextRowGroup();

    // Column A, first page.
    DataPageV1 firstPageOfA = readNextPage(colADesc, rowGroup);
    assertCrcSetAndCorrect(firstPageOfA, colAPage1Bytes);
    assertCorrectContent(firstPageOfA.getBytes().toByteArray(), colAPage1Bytes);

    // Column A, second page.
    DataPageV1 secondPageOfA = readNextPage(colADesc, rowGroup);
    assertCrcSetAndCorrect(secondPageOfA, colAPage2Bytes);
    assertCorrectContent(secondPageOfA.getBytes().toByteArray(), colAPage2Bytes);

    // Column B, first page.
    DataPageV1 firstPageOfB = readNextPage(colBDesc, rowGroup);
    assertCrcSetAndCorrect(firstPageOfB, colBPage1Bytes);
    assertCorrectContent(firstPageOfB.getBytes().toByteArray(), colBPage1Bytes);

    // Column B, second page.
    DataPageV1 secondPageOfB = readNextPage(colBDesc, rowGroup);
    assertCrcSetAndCorrect(secondPageOfB, colBPage2Bytes);
    assertCorrectContent(secondPageOfB.getBytes().toByteArray(), colBPage2Bytes);
  }
}
 
示例13
/**
 * Enables checksums on both the write and the read path and checks that the CRC is set on
 * every page and that the data read back equals what was written.
 */
@Test
public void testWriteOnVerifyOn() throws IOException {
  Configuration configuration = new Configuration();
  configuration.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
  configuration.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);

  Path file = writeSimpleParquetFile(configuration, CompressionCodecName.UNCOMPRESSED);

  try (ParquetFileReader fileReader =
      getParquetFileReader(file, configuration, Arrays.asList(colADesc, colBDesc))) {
    PageReadStore rowGroup = fileReader.readNextRowGroup();

    // Both pages of column A, in order.
    DataPageV1 pageA1 = readNextPage(colADesc, rowGroup);
    assertCrcSetAndCorrect(pageA1, colAPage1Bytes);
    assertCorrectContent(pageA1.getBytes().toByteArray(), colAPage1Bytes);

    DataPageV1 pageA2 = readNextPage(colADesc, rowGroup);
    assertCrcSetAndCorrect(pageA2, colAPage2Bytes);
    assertCorrectContent(pageA2.getBytes().toByteArray(), colAPage2Bytes);

    // Both pages of column B, in order.
    DataPageV1 pageB1 = readNextPage(colBDesc, rowGroup);
    assertCrcSetAndCorrect(pageB1, colBPage1Bytes);
    assertCorrectContent(pageB1.getBytes().toByteArray(), colBPage1Bytes);

    DataPageV1 pageB2 = readNextPage(colBDesc, rowGroup);
    assertCrcSetAndCorrect(pageB2, colBPage2Bytes);
    assertCorrectContent(pageB2.getBytes().toByteArray(), colBPage2Bytes);
  }
}
 
示例14
/**
 * Checks that the checksum is computed over the compressed form of the page data and that
 * checksum verification succeeds when reading a Snappy-compressed file.
 */
@Test
public void testCompression() throws IOException {
  Configuration configuration = new Configuration();
  configuration.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
  configuration.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);

  Path file = writeSimpleParquetFile(configuration, CompressionCodecName.SNAPPY);

  try (ParquetFileReader fileReader =
      getParquetFileReader(file, configuration, Arrays.asList(colADesc, colBDesc))) {
    PageReadStore rowGroup = fileReader.readNextRowGroup();

    // The CRC is compared against the compressed bytes, the content against the raw bytes.
    DataPageV1 pageA1 = readNextPage(colADesc, rowGroup);
    assertCrcSetAndCorrect(pageA1, snappy(colAPage1Bytes));
    assertCorrectContent(pageA1.getBytes().toByteArray(), colAPage1Bytes);

    DataPageV1 pageA2 = readNextPage(colADesc, rowGroup);
    assertCrcSetAndCorrect(pageA2, snappy(colAPage2Bytes));
    assertCorrectContent(pageA2.getBytes().toByteArray(), colAPage2Bytes);

    DataPageV1 pageB1 = readNextPage(colBDesc, rowGroup);
    assertCrcSetAndCorrect(pageB1, snappy(colBPage1Bytes));
    assertCorrectContent(pageB1.getBytes().toByteArray(), colBPage1Bytes);

    DataPageV1 pageB2 = readNextPage(colBDesc, rowGroup);
    assertCrcSetAndCorrect(pageB2, snappy(colBPage2Bytes));
    assertCorrectContent(pageB2.getBytes().toByteArray(), colBPage2Bytes);
  }
}
 
示例15
/**
 * Checks adherence to the checksum calculation specification: the CRC is computed over the
 * compressed concatenation of the repetition levels, definition levels and the actual data.
 * The sample data uses a nested schema containing nulls so that the repetition and
 * definition levels are non-trivial.
 */
@Test
public void testNestedWithNulls() throws IOException {
  Configuration configuration = new Configuration();

  // First write the sample file through the non-checksum code path and extract the raw page
  // bytes; the reference CRC is computed from them.
  configuration.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
  configuration.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
  Path referenceFile =
      writeNestedWithNullsSampleParquetFile(configuration, false, CompressionCodecName.SNAPPY);

  try (ParquetFileReader referenceReader =
      getParquetFileReader(referenceFile, configuration, Arrays.asList(colCIdDesc, colDValDesc))) {
    PageReadStore referenceRowGroup = referenceReader.readNextRowGroup();
    byte[] expectedIdBytes = readNextPage(colCIdDesc, referenceRowGroup).getBytes().toByteArray();
    byte[] expectedValBytes = readNextPage(colDValDesc, referenceRowGroup).getBytes().toByteArray();

    // Then write the same sample file with checksums enabled.
    configuration.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
    configuration.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
    Path checksummedFile =
        writeNestedWithNullsSampleParquetFile(configuration, false, CompressionCodecName.SNAPPY);

    try (ParquetFileReader checksummedReader =
        getParquetFileReader(checksummedFile, configuration, Arrays.asList(colCIdDesc, colDValDesc))) {
      PageReadStore rowGroup = checksummedReader.readNextRowGroup();

      DataPageV1 idPage = readNextPage(colCIdDesc, rowGroup);
      assertCrcSetAndCorrect(idPage, snappy(expectedIdBytes));
      assertCorrectContent(idPage.getBytes().toByteArray(), expectedIdBytes);

      DataPageV1 valPage = readNextPage(colDValDesc, rowGroup);
      assertCrcSetAndCorrect(valPage, snappy(expectedValBytes));
      assertCorrectContent(valPage.getBytes().toByteArray(), expectedValBytes);
    }
  }
}
 
示例16
/**
 * Checks checksums for a dictionary-encoded column: both the dictionary page and the data
 * page must carry a CRC matching the Snappy-compressed reference bytes, and both must
 * decode to the reference content.
 */
@Test
public void testDictionaryEncoding() throws IOException {
  Configuration configuration = new Configuration();

  // First write the dictionary-encoded sample file through the non-checksum code path and
  // extract the raw bytes; the reference CRC is computed from them.
  configuration.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
  configuration.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
  Path referenceFile =
      writeNestedWithNullsSampleParquetFile(configuration, true, CompressionCodecName.SNAPPY);

  try (ParquetFileReader referenceReader =
      getParquetFileReader(referenceFile, configuration, Collections.singletonList(colDValDesc))) {
    PageReadStore referenceRowGroup = referenceReader.readNextRowGroup();
    // Read (decompressed) dictionary page
    byte[] expectedDictBytes = readDictPage(colDValDesc, referenceRowGroup).getBytes().toByteArray();
    byte[] expectedValBytes = readNextPage(colDValDesc, referenceRowGroup).getBytes().toByteArray();

    // Then write the same sample file with checksums enabled.
    configuration.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
    configuration.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
    Path checksummedFile =
        writeNestedWithNullsSampleParquetFile(configuration, true, CompressionCodecName.SNAPPY);

    try (ParquetFileReader checksummedReader =
        getParquetFileReader(checksummedFile, configuration, Collections.singletonList(colDValDesc))) {
      PageReadStore rowGroup = checksummedReader.readNextRowGroup();

      DictionaryPage dictionaryPage = readDictPage(colDValDesc, rowGroup);
      assertCrcSetAndCorrect(dictionaryPage, snappy(expectedDictBytes));
      assertCorrectContent(dictionaryPage.getBytes().toByteArray(), expectedDictBytes);

      DataPageV1 valPage = readNextPage(colDValDesc, rowGroup);
      assertCrcSetAndCorrect(valPage, snappy(expectedValBytes));
      assertCorrectContent(valPage.getBytes().toByteArray(), expectedValBytes);
    }
  }
}
 
示例17
/**
 * Extracts the statistics stored on a data page, regardless of whether it is a V1 or V2 page.
 *
 * @param page the data page to inspect
 * @param <T> the comparable value type the caller expects the statistics to describe
 * @return the page's {@code getStatistics()} result, cast (unchecked) to {@code Statistics<T>}
 */
private static <T extends Comparable<T>> Statistics<T> getStatisticsFromPageHeader(DataPage page) {
  DataPage.Visitor<Statistics<T>> headerStatistics = new DataPage.Visitor<Statistics<T>>() {
    @Override
    @SuppressWarnings("unchecked")
    public Statistics<T> visit(DataPageV1 pageV1) {
      return (Statistics<T>) pageV1.getStatistics();
    }

    @Override
    @SuppressWarnings("unchecked")
    public Statistics<T> visit(DataPageV2 pageV2) {
      return (Statistics<T>) pageV2.getStatistics();
    }
  };
  return page.accept(headerStatistics);
}
 
示例18
/**
 * Formats a one-line summary of a V1 data page: row group and page number, codec, value
 * encoding, value count, size per value, total compressed size, null count (blank when not
 * set) and the min/max string.
 *
 * @param page the V1 data page to describe
 * @return the formatted summary line
 */
@Override
public String visit(DataPageV1 page) {
  String encoding = encodingAsString(page.getValueEncoding(), false);
  long compressedBytes = page.getCompressedSize();
  int valueCount = page.getValueCount();

  // The null count is only shown when the statistics actually record it.
  String nullCount;
  if (page.getStatistics().isNumNullsSet()) {
    nullCount = Long.toString(page.getStatistics().getNumNulls());
  } else {
    nullCount = "";
  }

  float bytesPerValue = ((float) compressedBytes) / valueCount;
  String minMax = minMaxAsString(page.getStatistics());

  return String.format("%3d-%-3d  %-5s %s %-2s %-7d %-10s %-10s %-8s %-7s %s",
      rowGroupNum, pageNum, "data", shortCodec, encoding, valueCount,
      humanReadable(bytesPerValue), humanReadable(compressedBytes), "", nullCount, minMax);
}
 
示例19
/**
 * Creates a new vectorized definition-level reader for the column and initializes it from
 * the given stream.
 *
 * @param dataPageV1 the V1 page being initialized (not used by this implementation)
 * @param desc the column descriptor used to create the reader
 * @param in the stream the reader is initialized from
 * @param triplesCount the value count passed to the reader's {@code initFromPage}
 * @throws IOException if initializing the reader from the stream fails
 */
@Override
protected void initDefinitionLevelsReader(DataPageV1 dataPageV1, ColumnDescriptor desc, ByteBufferInputStream in,
                                          int triplesCount) throws IOException {
  vectorizedDefinitionLevelReader = newVectorizedDefinitionLevelReader(desc);
  vectorizedDefinitionLevelReader.initFromPage(triplesCount, in);
}
 
示例20
/**
 * Hook for subclasses to set up definition-level reading for a V1 data page.
 *
 * <p>NOTE(review): presumably called while a V1 page is being initialized, with {@code in}
 * positioned at the start of the definition levels; confirm against the caller.
 *
 * @param dataPageV1 the V1 page being initialized
 * @param descriptor the descriptor of the column being read
 * @param in the stream to read the definition levels from
 * @param count the value count supplied by the caller
 * @throws IOException if the implementation fails while reading from {@code in}
 */
protected abstract void initDefinitionLevelsReader(DataPageV1 dataPageV1, ColumnDescriptor descriptor,
ByteBufferInputStream in, int count) throws IOException;
 
示例21
/**
 * Reads {@code readNumber} values from this column reader into {@code vector}, loading
 * further pages from {@code pageReader} whenever the current page is exhausted.
 *
 * <p>Each loop iteration reads at most the values left in the current page. Pages flagged
 * as dictionary encoded are read as dictionary ids, which are either left for lazy decoding
 * or decoded right away; other pages are read directly with {@code readBatch}.
 *
 * @param readNumber the number of values to read
 * @param vector the vector the values are written into
 * @throws IOException if one of the underlying page-loading or decoding calls fails with it
 * @throws RuntimeException if a page is neither a {@code DataPageV1} nor a {@code DataPageV2}
 */
@Override
public final void readToVector(int readNumber, VECTOR vector) throws IOException {
	// rowId is the write offset into the vector for the current batch.
	int rowId = 0;
	// Dictionary id storage is only reserved when this reader has a dictionary.
	WritableIntVector dictionaryIds = null;
	if (dictionary != null) {
		dictionaryIds = vector.reserveDictionaryIds(readNumber);
	}
	while (readNumber > 0) {
		// Compute the number of values we want to read in this page.
		int leftInPage = (int) (endOfPageValueCount - valuesRead);
		if (leftInPage == 0) {
			// Current page is exhausted: load the next one and dispatch on its version.
			DataPage page = pageReader.readPage();
			if (page instanceof DataPageV1) {
				readPageV1((DataPageV1) page);
			} else if (page instanceof DataPageV2) {
				readPageV2((DataPageV2) page);
			} else {
				throw new RuntimeException("Unsupported page type: " + page.getClass());
			}
			// Recomputed after the load; presumably readPageV1/readPageV2 advance
			// endOfPageValueCount -- TODO confirm.
			leftInPage = (int) (endOfPageValueCount - valuesRead);
		}
		// Never read past the end of the current page in one step.
		int num = Math.min(readNumber, leftInPage);
		if (isCurrentPageDictionaryEncoded) {
			// Read and decode dictionary ids.
			runLenDecoder.readDictionaryIds(
					num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder);

			if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) {
				// Column vector supports lazy decoding of dictionary values so just set the dictionary.
				// We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
				// non-dictionary encoded values have already been added).
				vector.setDictionary(new ParquetDictionary(dictionary));
			} else {
				readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds);
			}
		} else {
			if (vector.hasDictionary() && rowId != 0) {
				// This batch already has dictionary encoded values but this new page is not. The batch
				// does not support a mix of dictionary and not so we will decode the dictionary.
				readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds());
			}
			vector.setDictionary(null);
			readBatch(rowId, num, vector);
		}

		// Advance the reader position, the vector offset and the remaining count together.
		valuesRead += num;
		rowId += num;
		readNumber -= num;
	}
}
 
示例22
/**
 * Tests whether corruption in the page content is detected by checksum verification.
 *
 * <p>The file is written with checksums, read fully into memory, corrupted in two places and
 * written back. It is then read once with verification disabled (the corruption must show up
 * in the page bytes) and once with verification enabled (the read must fail).
 *
 * <p>Each stream lives in its own try-with-resources block, so it is closed exactly once and
 * the rewritten file is fully closed before any reader opens it.
 */
@Test
public void testCorruptedPage() throws IOException {
  Configuration conf = new Configuration();
  conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);

  Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED);

  // Load the whole file into memory; the stream is closed by try-with-resources.
  InputFile inputFile = HadoopInputFile.fromPath(path, conf);
  byte[] fileBytes;
  try (SeekableInputStream inputStream = inputFile.newStream()) {
    fileBytes = new byte[Math.toIntExact(inputFile.getLength())];
    inputStream.readFully(fileBytes);
  }
  int fileLen = fileBytes.length;

  // There are 4 pages in total (2 per column), we corrupt the first page of the first column
  // and the second page of the second column. We do this by altering a byte roughly in the
  // middle of each page to be corrupted
  fileBytes[fileLen / 8]++;
  fileBytes[fileLen / 8 + ((fileLen / 4) * 3)]++;

  // Overwrite the file with the corrupted bytes; closing the stream here makes sure the
  // content is complete before it is read back.
  OutputFile outputFile = HadoopOutputFile.fromPath(path, conf);
  try (PositionOutputStream outputStream = outputFile.createOrOverwrite(1024 * 1024)) {
    outputStream.write(fileBytes);
  }

  // First we disable checksum verification, the corruption will go undetected as it is in the
  // data section of the page
  conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
  try (ParquetFileReader reader = getParquetFileReader(path, conf,
    Arrays.asList(colADesc, colBDesc))) {
    PageReadStore pageReadStore = reader.readNextRowGroup();

    DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
    assertFalse("Data in page was not corrupted",
      Arrays.equals(colAPage1.getBytes().toByteArray(), colAPage1Bytes));
    // Skip the two uncorrupted pages in between.
    readNextPage(colADesc, pageReadStore);
    readNextPage(colBDesc, pageReadStore);
    DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
    assertFalse("Data in page was not corrupted",
      Arrays.equals(colBPage2.getBytes().toByteArray(), colBPage2Bytes));
  }

  // Now we enable checksum verification, the corruption should be detected
  conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
  try (ParquetFileReader reader =
         getParquetFileReader(path, conf, Arrays.asList(colADesc, colBDesc))) {
    // We expect an exception on the first encountered corrupt page (in readAllPages)
    assertVerificationFailed(reader);
  }
}
 
示例23
/**
 * Reads the next page of a column and returns it as a V1 data page.
 *
 * @param colDesc the column to read from
 * @param pageReadStore the page store holding the column's pages
 * @return the next page, cast to {@code DataPageV1}
 */
private DataPageV1 readNextPage(ColumnDescriptor colDesc, PageReadStore pageReadStore) {
  final DataPageV1 nextPage = (DataPageV1) pageReadStore.getPageReader(colDesc).readPage();
  return nextPage;
}
 
示例24
/**
 * Asserts that the next page of the column at {@code path} holds the expected number of
 * values and, treated as a V1 page, exactly the expected bytes.
 *
 * @param schema the schema used to resolve {@code path} to a column description
 * @param pages the page store to read from
 * @param path the column path
 * @param values the expected value count of the page
 * @param bytes the expected page content
 * @throws IOException if converting either side to a byte array fails
 */
private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes) throws IOException {
  DataPage nextPage = pages.getPageReader(schema.getColumnDescription(path)).readPage();
  assertEquals(values, nextPage.getValueCount());

  // The value count is checked on the generic page first; only then is it narrowed to V1.
  byte[] expectedContent = bytes.toByteArray();
  DataPageV1 v1Page = (DataPageV1) nextPage;
  assertArrayEquals(expectedContent, v1Page.getBytes().toByteArray());
}