Java源码示例:org.apache.hadoop.hbase.io.FSDataInputStreamWrapper
示例1
/**
* @param fs
* @param p
* @param cacheConf
* @param in
* @param size
* @param r
* @param conf
* @param indexMaintainers
* @param viewConstants
* @param regionInfo
* @param regionStartKeyInHFile
* @param splitKey
* @throws IOException
*/
public IndexHalfStoreFileReader(final FileSystem fs, final Path p, final CacheConfig cacheConf,
final FSDataInputStreamWrapper in, long size, final Reference r,
final Configuration conf,
final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers,
final byte[][] viewConstants, final HRegionInfo regionInfo,
byte[] regionStartKeyInHFile, byte[] splitKey) throws IOException {
super(fs, p, in, size, cacheConf, conf);
this.splitkey = splitKey == null ? r.getSplitKey() : splitKey;
// Is it top or bottom half?
this.top = Reference.isTopFileRegion(r.getFileRegion());
this.splitRow = CellUtil.cloneRow(KeyValue.createKeyValueFromKey(splitkey));
this.indexMaintainers = indexMaintainers;
this.viewConstants = viewConstants;
this.regionInfo = regionInfo;
this.regionStartKeyInHFile = regionStartKeyInHFile;
this.offset = regionStartKeyInHFile.length;
}
示例2
@Override
public StoreFileReader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, FileSystem fs, Path p, FSDataInputStreamWrapper in, long size,
CacheConfig cacheConf, Reference r, StoreFileReader reader) throws IOException {
final StoreFileReader ret;
if(LOG.isDebugEnabled()) {
LOG.debug("==> RangerAuthorizationCoprocessor.preStoreFileReaderOpen()");
}
try {
activatePluginClassLoader();
ret = implRegionObserver.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, reader);
} finally {
deactivatePluginClassLoader();
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== RangerAuthorizationCoprocessor.preStoreFileReaderOpen()");
}
return ret;
}
示例3
@Override
public StoreFileReader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, FileSystem fs, Path p, FSDataInputStreamWrapper in, long size,
CacheConfig cacheConf, Reference r, StoreFileReader reader) throws IOException {
final StoreFileReader ret;
if(LOG.isDebugEnabled()) {
LOG.debug("==> RangerAuthorizationCoprocessor.postStoreFileReaderOpen()");
}
try {
activatePluginClassLoader();
ret = implRegionObserver.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, reader);
} finally {
deactivatePluginClassLoader();
}
if(LOG.isDebugEnabled()) {
LOG.debug("<== RangerAuthorizationCoprocessor.postStoreFileReaderOpen()");
}
return ret;
}
示例4
/**
* @param fs fileystem to read from
* @param p path to the file
* @param in {@link FSDataInputStreamWrapper}
* @param size Full size of the file
* @param cacheConf
* @param r original reference file. This will be not null only when reading a split file.
* @return a Reader instance to use instead of the base reader if overriding
* default behavior, null otherwise
* @throws IOException
*/
public StoreFileReader preStoreFileReaderOpen(final FileSystem fs, final Path p,
final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
final Reference r) throws IOException {
if (coprocEnvironments.isEmpty()) {
return null;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, null) {
@Override
public StoreFileReader call(RegionObserver observer) throws IOException {
return observer.preStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
getResult());
}
});
}
示例5
/**
* @param fs fileystem to read from
* @param p path to the file
* @param in {@link FSDataInputStreamWrapper}
* @param size Full size of the file
* @param cacheConf
* @param r original reference file. This will be not null only when reading a split file.
* @param reader the base reader instance
* @return The reader to use
* @throws IOException
*/
public StoreFileReader postStoreFileReaderOpen(final FileSystem fs, final Path p,
final FSDataInputStreamWrapper in, final long size, final CacheConfig cacheConf,
final Reference r, final StoreFileReader reader) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
return reader;
}
return execOperationWithResult(
new ObserverOperationWithResult<RegionObserver, StoreFileReader>(regionObserverGetter, reader) {
@Override
public StoreFileReader call(RegionObserver observer) throws IOException {
return observer.postStoreFileReaderOpen(this, fs, p, in, size, cacheConf, r,
getResult());
}
});
}
示例6
/**
* @param fs filesystem
* @param path Path to file to read
* @param cacheConf This must not be null. @see
* {@link org.apache.hadoop.hbase.io.hfile.CacheConfig#CacheConfig(Configuration)}
* @param primaryReplicaReader true if this is a reader for primary replica
* @param conf Configuration
* @return an active Reader instance
* @throws IOException Will throw a CorruptHFileException (DoNotRetryIOException subtype) if hfile
* is corrupt/invalid.
*/
public static Reader createReader(FileSystem fs, Path path, CacheConfig cacheConf,
boolean primaryReplicaReader, Configuration conf) throws IOException {
Preconditions.checkNotNull(cacheConf, "Cannot create Reader with null CacheConf");
FSDataInputStreamWrapper stream = new FSDataInputStreamWrapper(fs, path);
ReaderContext context = new ReaderContextBuilder()
.withFilePath(path)
.withInputStreamWrapper(stream)
.withFileSize(fs.getFileStatus(path).getLen())
.withFileSystem(stream.getHfs())
.withPrimaryReplicaReader(primaryReplicaReader)
.withReaderType(ReaderType.PREAD)
.build();
HFileInfo fileInfo = new HFileInfo(context, conf);
Reader reader = createReader(context, fileInfo, cacheConf, conf);
fileInfo.initMetaAndIndex(reader);
return reader;
}
示例7
/**
* @param fs
* @param p
* @param cacheConf
* @param in
* @param size
* @param r
* @param conf
* @param indexMaintainers
* @param viewConstants
* @param regionInfo
* @param regionStartKeyInHFile
* @param splitKey
* @throws IOException
*/
public IndexHalfStoreFileReader(final FileSystem fs, final Path p, final CacheConfig cacheConf,
final FSDataInputStreamWrapper in, long size, final Reference r,
final Configuration conf,
final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers,
final byte[][] viewConstants, final RegionInfo regionInfo,
byte[] regionStartKeyInHFile, byte[] splitKey, boolean primaryReplicaStoreFile,
AtomicInteger refCount, RegionInfo currentRegion) throws IOException {
super(fs, p, in, size, cacheConf, primaryReplicaStoreFile, refCount, false,
conf);
this.splitkey = splitKey == null ? r.getSplitKey() : splitKey;
// Is it top or bottom half?
this.top = Reference.isTopFileRegion(r.getFileRegion());
this.splitRow = CellUtil.cloneRow(new KeyValue.KeyOnlyKeyValue(splitkey));
this.indexMaintainers = indexMaintainers;
this.viewConstants = viewConstants;
this.childRegionInfo = regionInfo;
this.regionStartKeyInHFile = regionStartKeyInHFile;
this.offset = regionStartKeyInHFile.length;
this.refCount = refCount;
this.currentRegion = currentRegion;
}
示例8
public ReaderContext(Path filePath, FSDataInputStreamWrapper fsdis, long fileSize,
HFileSystem hfs, boolean primaryReplicaReader, ReaderType type) {
this.filePath = filePath;
this.fsdis = fsdis;
this.fileSize = fileSize;
this.hfs = hfs;
this.primaryReplicaReader = primaryReplicaReader;
this.type = type;
}
示例9
@VisibleForTesting
public ReaderContextBuilder withFileSystemAndPath(FileSystem fs, Path filePath)
throws IOException {
this.withFileSystem(fs)
.withFilePath(filePath)
.withFileSize(fs.getFileStatus(filePath).getLen())
.withInputStreamWrapper(new FSDataInputStreamWrapper(fs, filePath));
return this;
}
示例10
@Override
public StoreFileReader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
Reference r, StoreFileReader reader) throws IOException {
ctPreStoreFileReaderOpen.incrementAndGet();
return null;
}
示例11
@Override
public StoreFileReader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
Reference r, StoreFileReader reader) throws IOException {
ctPostStoreFileReaderOpen.incrementAndGet();
return reader;
}
示例12
@Test
public void testNewBlocksHaveDefaultChecksum() throws IOException {
Path path = new Path(TEST_UTIL.getDataTestDir(), "default_checksum");
FSDataOutputStream os = fs.create(path);
HFileContext meta = new HFileContextBuilder().build();
HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
for (int i = 0; i < 1000; ++i)
dos.writeInt(i);
hbw.writeHeaderAndData(os);
int totalSize = hbw.getOnDiskSizeWithHeader();
os.close();
// Use hbase checksums.
assertEquals(true, hfs.useHBaseChecksum());
FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
ReaderContext context = new ReaderContextBuilder()
.withInputStreamWrapper(is)
.withFileSize(totalSize)
.withFileSystem((HFileSystem) fs)
.withFilePath(path)
.build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context,
meta, ByteBuffAllocator.HEAP);
HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
assertTrue(!b.isSharedMem());
assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode());
}
示例13
/**
* Read all blocks from {@code path} to populate {@code blockCache}.
*/
private static void cacheBlocks(Configuration conf, CacheConfig cacheConfig, FileSystem fs,
Path path, HFileContext cxt) throws IOException {
FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, path);
long fileSize = fs.getFileStatus(path).getLen();
FixedFileTrailer trailer =
FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
ReaderContext context = new ReaderContextBuilder()
.withFilePath(path)
.withFileSize(fileSize)
.withFileSystem(fsdis.getHfs())
.withInputStreamWrapper(fsdis)
.build();
HFileInfo fileInfo = new HFileInfo(context, conf);
HFile.Reader reader = new HFilePreadReader(context, fileInfo, cacheConfig, conf);
fileInfo.initMetaAndIndex(reader);
long offset = trailer.getFirstDataBlockOffset(),
max = trailer.getLastDataBlockOffset();
List<HFileBlock> blocks = new ArrayList<>(4);
HFileBlock block;
while (offset <= max) {
block = reader.readBlock(offset, -1, /* cacheBlock */ true, /* pread */ false,
/* isCompaction */ false, /* updateCacheMetrics */ true, null, null);
offset += block.getOnDiskSizeWithHeader();
blocks.add(block);
}
LOG.info("read " + Iterables.toString(blocks));
reader.close();
}
示例14
@Override
public long getTotalBytesRead() {
return FSDataInputStreamWrapper.getTotalBytesRead();
}
示例15
@Override
public long getLocalBytesRead() {
return FSDataInputStreamWrapper.getLocalBytesRead();
}
示例16
@Override
public long getShortCircuitBytesRead() {
return FSDataInputStreamWrapper.getShortCircuitBytesRead();
}
示例17
@Override
public long getZeroCopyBytesRead() {
return FSDataInputStreamWrapper.getZeroCopyBytesRead();
}
示例18
public int processFile(Path file, boolean checkRootDir) throws IOException {
if (verbose) {
out.println("Scanning -> " + file);
}
if (checkRootDir) {
Path rootPath = CommonFSUtils.getRootDir(getConf());
String rootString = rootPath + rootPath.SEPARATOR;
if (!file.toString().startsWith(rootString)) {
// First we see if fully-qualified URI matches the root dir. It might
// also be an absolute path in the same filesystem, so we prepend the FS
// of the root dir and see if that fully-qualified URI matches.
FileSystem rootFS = rootPath.getFileSystem(getConf());
String qualifiedFile = rootFS.getUri().toString() + file.toString();
if (!qualifiedFile.startsWith(rootString)) {
err.println(
"ERROR, file (" + file + ") is not in HBase's root directory (" + rootString + ")");
return -2;
}
}
}
FileSystem fs = file.getFileSystem(getConf());
if (!fs.exists(file)) {
err.println("ERROR, file doesnt exist: " + file);
return -2;
}
HFile.Reader reader = HFile.createReader(fs, file, CacheConfig.DISABLED, true, getConf());
Map<byte[], byte[]> fileInfo = reader.getHFileInfo();
KeyValueStatsCollector fileStats = null;
if (verbose || printKey || checkRow || checkFamily || printStats || checkMobIntegrity) {
// scan over file and read key/value's and check if requested
HFileScanner scanner = reader.getScanner(false, false, false);
fileStats = new KeyValueStatsCollector();
boolean shouldScanKeysValues = false;
if (this.isSeekToRow) {
// seek to the first kv on this row
shouldScanKeysValues =
(scanner.seekTo(PrivateCellUtil.createFirstOnRow(this.row)) != -1);
} else {
shouldScanKeysValues = scanner.seekTo();
}
if (shouldScanKeysValues)
scanKeysValues(file, fileStats, scanner, row);
}
// print meta data
if (shouldPrintMeta) {
printMeta(reader, fileInfo);
}
if (printBlockIndex) {
out.println("Block Index:");
out.println(reader.getDataBlockIndexReader());
}
if (printBlockHeaders) {
out.println("Block Headers:");
/*
* TODO: this same/similar block iteration logic is used in HFileBlock#blockRange and
* TestLazyDataBlockDecompression. Refactor?
*/
FSDataInputStreamWrapper fsdis = new FSDataInputStreamWrapper(fs, file);
long fileSize = fs.getFileStatus(file).getLen();
FixedFileTrailer trailer =
FixedFileTrailer.readFromStream(fsdis.getStream(false), fileSize);
long offset = trailer.getFirstDataBlockOffset(),
max = trailer.getLastDataBlockOffset();
HFileBlock block;
while (offset <= max) {
block = reader.readBlock(offset, -1, /* cacheBlock */ false, /* pread */ false,
/* isCompaction */ false, /* updateCacheMetrics */ false, null, null);
offset += block.getOnDiskSizeWithHeader();
out.println(block);
}
}
if (printStats) {
fileStats.finish();
out.println("Stats:\n" + fileStats);
}
reader.close();
return 0;
}
示例19
public FSDataInputStreamWrapper getInputStreamWrapper() {
return this.fsdis;
}
示例20
public ReaderContextBuilder withInputStreamWrapper(FSDataInputStreamWrapper fsdis) {
this.fsdis = fsdis;
return this;
}
示例21
protected void testReaderV2Internals() throws IOException {
if(includesTag) {
TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
}
for (Compression.Algorithm algo : COMPRESSION_ALGORITHMS) {
for (boolean pread : new boolean[] { false, true }) {
LOG.info("testReaderV2: Compression algorithm: " + algo +
", pread=" + pread);
Path path = new Path(TEST_UTIL.getDataTestDir(), "blocks_v2_"
+ algo);
FSDataOutputStream os = fs.create(path);
HFileContext meta = new HFileContextBuilder()
.withCompression(algo)
.withIncludesMvcc(includesMemstoreTS)
.withIncludesTags(includesTag)
.withBytesPerCheckSum(HFile.DEFAULT_BYTES_PER_CHECKSUM)
.build();
HFileBlock.Writer hbw = new HFileBlock.Writer(null,
meta);
long totalSize = 0;
for (int blockId = 0; blockId < 2; ++blockId) {
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
for (int i = 0; i < 1234; ++i)
dos.writeInt(i);
hbw.writeHeaderAndData(os);
totalSize += hbw.getOnDiskSizeWithHeader();
}
os.close();
FSDataInputStream is = fs.open(path);
meta = new HFileContextBuilder()
.withHBaseCheckSum(true)
.withIncludesMvcc(includesMemstoreTS)
.withIncludesTags(includesTag)
.withCompression(algo).build();
ReaderContext context = new ReaderContextBuilder()
.withInputStreamWrapper(new FSDataInputStreamWrapper(is))
.withFileSize(totalSize)
.withFilePath(path)
.withFileSystem(fs)
.build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc);
HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
is.close();
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
b.sanityCheck();
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936,
b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
HFileBlock expected = b;
if (algo == GZ) {
is = fs.open(path);
ReaderContext readerContext = new ReaderContextBuilder()
.withInputStreamWrapper(new FSDataInputStreamWrapper(is))
.withFileSize(totalSize)
.withFilePath(path)
.withFileSystem(fs)
.build();
hbr = new HFileBlock.FSReaderImpl(readerContext, meta, alloc);
b = hbr.readBlockData(0,
2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), pread, false, true);
assertEquals(expected, b);
int wrongCompressedSize = 2172;
try {
hbr.readBlockData(0, wrongCompressedSize + HConstants.HFILEBLOCK_HEADER_SIZE, pread,
false, true);
fail("Exception expected");
} catch (IOException ex) {
String expectedPrefix = "Passed in onDiskSizeWithHeader=";
assertTrue("Invalid exception message: '" + ex.getMessage()
+ "'.\nMessage is expected to start with: '" + expectedPrefix
+ "'", ex.getMessage().startsWith(expectedPrefix));
}
assertRelease(b);
is.close();
}
assertRelease(expected);
}
}
}
示例22
protected void testConcurrentReadingInternals() throws IOException,
InterruptedException, ExecutionException {
for (Compression.Algorithm compressAlgo : COMPRESSION_ALGORITHMS) {
Path path = new Path(TEST_UTIL.getDataTestDir(), "concurrent_reading");
Random rand = defaultRandom();
List<Long> offsets = new ArrayList<>();
List<BlockType> types = new ArrayList<>();
writeBlocks(rand, compressAlgo, path, offsets, null, types, null);
FSDataInputStream is = fs.open(path);
long fileSize = fs.getFileStatus(path).getLen();
HFileContext meta = new HFileContextBuilder()
.withHBaseCheckSum(true)
.withIncludesMvcc(includesMemstoreTS)
.withIncludesTags(includesTag)
.withCompression(compressAlgo)
.build();
ReaderContext context = new ReaderContextBuilder()
.withInputStreamWrapper(new FSDataInputStreamWrapper(is))
.withFileSize(fileSize)
.withFilePath(path)
.withFileSystem(fs)
.build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context, meta, alloc);
Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec);
for (int i = 0; i < NUM_READER_THREADS; ++i) {
ecs.submit(new BlockReaderThread("reader_" + (char) ('A' + i), hbr,
offsets, types, fileSize));
}
for (int i = 0; i < NUM_READER_THREADS; ++i) {
Future<Boolean> result = ecs.take();
assertTrue(result.get());
if (detailedLogging) {
LOG.info(String.valueOf(i + 1)
+ " reader threads finished successfully (algo=" + compressAlgo
+ ")");
}
}
is.close();
}
}
示例23
@Test
public void testDataBlockEncryption() throws IOException {
final int blocks = 10;
final int[] blockSizes = new int[blocks];
for (int i = 0; i < blocks; i++) {
blockSizes[i] = (1024 + RNG.nextInt(1024 * 63)) / Bytes.SIZEOF_INT;
}
for (Compression.Algorithm compression : TestHFileBlock.COMPRESSION_ALGORITHMS) {
Path path = new Path(TEST_UTIL.getDataTestDir(), "block_v3_" + compression + "_AES");
LOG.info("testDataBlockEncryption: encryption=AES compression=" + compression);
long totalSize = 0;
HFileContext fileContext = new HFileContextBuilder()
.withCompression(compression)
.withEncryptionContext(cryptoContext)
.build();
FSDataOutputStream os = fs.create(path);
try {
for (int i = 0; i < blocks; i++) {
totalSize += writeBlock(os, fileContext, blockSizes[i]);
}
} finally {
os.close();
}
FSDataInputStream is = fs.open(path);
ReaderContext context = new ReaderContextBuilder()
.withInputStreamWrapper(new FSDataInputStreamWrapper(is))
.withFilePath(path)
.withFileSystem(fs)
.withFileSize(totalSize).build();
try {
HFileBlock.FSReaderImpl hbr = new HFileBlock.FSReaderImpl(context, fileContext,
ByteBuffAllocator.HEAP);
long pos = 0;
for (int i = 0; i < blocks; i++) {
pos += readAndVerifyBlock(pos, fileContext, hbr, blockSizes[i]);
}
} finally {
is.close();
}
}
}
示例24
@Test
public void testVerifyCheckSum() throws IOException {
int intCount = 10000;
for (ChecksumType ckt : ChecksumType.values()) {
Path path = new Path(TEST_UTIL.getDataTestDir(), "checksum" + ckt.getName());
FSDataOutputStream os = fs.create(path);
HFileContext meta = new HFileContextBuilder()
.withChecksumType(ckt)
.build();
HFileBlock.Writer hbw = new HFileBlock.Writer(null, meta);
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
for (int i = 0; i < intCount; ++i) {
dos.writeInt(i);
}
hbw.writeHeaderAndData(os);
int totalSize = hbw.getOnDiskSizeWithHeader();
os.close();
// Use hbase checksums.
assertEquals(true, hfs.useHBaseChecksum());
FSDataInputStreamWrapper is = new FSDataInputStreamWrapper(fs, path);
meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
ReaderContext context = new ReaderContextBuilder()
.withInputStreamWrapper(is)
.withFileSize(totalSize)
.withFileSystem((HFileSystem) fs)
.withFilePath(path)
.build();
HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(context,
meta, ByteBuffAllocator.HEAP);
HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
assertTrue(!b.isSharedMem());
// verify SingleByteBuff checksum.
verifySBBCheckSum(b.getBufferReadOnly());
// verify MultiByteBuff checksum.
verifyMBBCheckSum(b.getBufferReadOnly());
ByteBuff data = b.getBufferWithoutHeader();
for (int i = 0; i < intCount; i++) {
assertEquals(i, data.getInt());
}
try {
data.getInt();
fail();
} catch (BufferUnderflowException e) {
// expected failure
}
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
}
}
示例25
protected void testChecksumInternals(boolean useTags) throws IOException {
Compression.Algorithm algo = NONE;
for (boolean pread : new boolean[] { false, true }) {
for (int bytesPerChecksum : BYTES_PER_CHECKSUM) {
Path path = new Path(TEST_UTIL.getDataTestDir(), "checksumChunk_" +
algo + bytesPerChecksum);
FSDataOutputStream os = fs.create(path);
HFileContext meta = new HFileContextBuilder()
.withCompression(algo)
.withIncludesMvcc(true)
.withIncludesTags(useTags)
.withHBaseCheckSum(true)
.withBytesPerCheckSum(bytesPerChecksum)
.build();
HFileBlock.Writer hbw = new HFileBlock.Writer(null,
meta);
// write one block. The block has data
// that is at least 6 times more than the checksum chunk size
long dataSize = 0;
DataOutputStream dos = hbw.startWriting(BlockType.DATA);
for (; dataSize < 6 * bytesPerChecksum;) {
for (int i = 0; i < 1234; ++i) {
dos.writeInt(i);
dataSize += 4;
}
}
hbw.writeHeaderAndData(os);
long totalSize = hbw.getOnDiskSizeWithHeader();
os.close();
long expectedChunks = ChecksumUtil.numChunks(
dataSize + HConstants.HFILEBLOCK_HEADER_SIZE,
bytesPerChecksum);
LOG.info("testChecksumChunks: pread={}, bytesPerChecksum={}, fileSize={}, "
+ "dataSize={}, expectedChunks={}, compression={}", pread, bytesPerChecksum,
totalSize, dataSize, expectedChunks, algo.toString());
// Verify hbase checksums.
assertEquals(true, hfs.useHBaseChecksum());
// Read data back from file.
FSDataInputStream is = fs.open(path);
FSDataInputStream nochecksum = hfs.getNoChecksumFs().open(path);
meta = new HFileContextBuilder()
.withCompression(algo)
.withIncludesMvcc(true)
.withIncludesTags(useTags)
.withHBaseCheckSum(true)
.withBytesPerCheckSum(bytesPerChecksum)
.build();
ReaderContext context = new ReaderContextBuilder()
.withInputStreamWrapper(new FSDataInputStreamWrapper(is, nochecksum))
.withFileSize(totalSize)
.withFileSystem(hfs)
.withFilePath(path)
.build();
HFileBlock.FSReader hbr =
new HFileBlock.FSReaderImpl(context, meta, ByteBuffAllocator.HEAP);
HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
is.close();
b.sanityCheck();
assertEquals(dataSize, b.getUncompressedSizeWithoutHeader());
// verify that we have the expected number of checksum chunks
assertEquals(totalSize, HConstants.HFILEBLOCK_HEADER_SIZE + dataSize +
expectedChunks * HFileBlock.CHECKSUM_SIZE);
// assert that we did not encounter hbase checksum verification failures
assertEquals(0, HFile.getAndResetChecksumFailuresCount());
}
}
}
示例26
/**
* Called before creation of Reader for a store file.
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* effect in this hook.
*
* @param ctx the environment provided by the region server
* @param fs fileystem to read from
* @param p path to the file
* @param in {@link FSDataInputStreamWrapper}
* @param size Full size of the file
* @param cacheConf
* @param r original reference file. This will be not null only when reading a split file.
* @param reader the base reader, if not {@code null}, from previous RegionObserver in the chain
* @return a Reader instance to use instead of the base reader if overriding
* default behavior, null otherwise
* @deprecated For Phoenix only, StoreFileReader is not a stable interface.
*/
@Deprecated
// Passing InterfaceAudience.Private args FSDataInputStreamWrapper, CacheConfig and Reference.
// This is fine as the hook is deprecated any way.
default StoreFileReader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
Reference r, StoreFileReader reader) throws IOException {
return reader;
}
示例27
/**
* Called after the creation of Reader for a store file.
*
* @param ctx the environment provided by the region server
* @param fs fileystem to read from
* @param p path to the file
* @param in {@link FSDataInputStreamWrapper}
* @param size Full size of the file
* @param cacheConf
* @param r original reference file. This will be not null only when reading a split file.
* @param reader the base reader instance
* @return The reader to use
* @deprecated For Phoenix only, StoreFileReader is not a stable interface.
*/
@Deprecated
// Passing InterfaceAudience.Private args FSDataInputStreamWrapper, CacheConfig and Reference.
// This is fine as the hook is deprecated any way.
default StoreFileReader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
Reference r, StoreFileReader reader) throws IOException {
return reader;
}