Java源码示例:org.apache.nifi.provenance.serialization.RecordReader
示例1
private Long getFirstEntryTime(final File provenanceLogFile) {
if (provenanceLogFile == null) {
return null;
}
try (final RecordReader reader = RecordReaders.newRecordReader(provenanceLogFile, null, Integer.MAX_VALUE)) {
final StandardProvenanceEventRecord firstRecord = reader.nextRecord();
if (firstRecord == null) {
return provenanceLogFile.lastModified();
}
return firstRecord.getEventTime();
} catch (final FileNotFoundException | EOFException fnf) {
return null; // file no longer exists or there's no record in this file
} catch (final IOException ioe) {
logger.warn("Failed to read first entry in file {} due to {}", provenanceLogFile, ioe.toString());
logger.warn("", ioe);
return null;
}
}
示例2
private long determineFirstEventTimestamp() {
// Get the timestamp of the first event in the first Provenance Event Log File and the ID of the last event
// in the event file.
final List<File> logFiles = getSortedLogFiles();
if (logFiles.isEmpty()) {
return 0L;
}
for (final File logFile : logFiles) {
try (final RecordReader reader = RecordReaders.newRecordReader(logFile, null, Integer.MAX_VALUE)) {
final StandardProvenanceEventRecord event = reader.nextRecord();
if (event != null) {
return event.getEventTime();
}
} catch (final IOException ioe) {
logger.warn("Failed to obtain timestamp of first event from Provenance Event Log File {}", logFile);
}
}
return 0L;
}
示例3
private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
if ( blockField == null ) {
reader.skipTo(getByteOffset(d, reader));
} else {
reader.skipToBlock(blockField.numericValue().intValue());
}
StandardProvenanceEventRecord record;
while ( (record = reader.nextRecord()) != null) {
final IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
break;
}
}
if (record == null) {
logger.warn("Failed to read Provenance Event for '" + d + "'. The event file may be missing or corrupted");
}
return record;
}
示例4
public static void main(final String[] args) throws IOException {
if (args.length != 1) {
printUsage();
return;
}
final File file = new File(args[0]);
if (!file.exists()) {
System.out.println("Cannot find file " + file.getAbsolutePath());
return;
}
try (final RecordReader reader = RecordReaders.newRecordReader(file, Collections.emptyList(), 65535)) {
StandardProvenanceEventRecord event;
int index = 0;
while ((event = reader.nextRecord()) != null) {
final long byteOffset = reader.getBytesConsumed();
final String string = stringify(event, index++, byteOffset);
System.out.println(string);
}
}
}
示例5
@Override
public Optional<ProvenanceEventRecord> getEvent(final long id) throws IOException {
final Optional<File> option = getPathForEventId(id);
if (!option.isPresent()) {
return Optional.empty();
}
try (final RecordReader reader = recordReaderFactory.newRecordReader(option.get(), Collections.emptyList(), config.getMaxAttributeChars())) {
final Optional<ProvenanceEventRecord> eventOption = reader.skipToEvent(id);
if (!eventOption.isPresent()) {
return eventOption;
}
// If an event is returned, the event may be the one we want, or it may be an event with a
// higher event ID, if the desired event is not in the record reader. So we need to get the
// event and check the Event ID to know whether to return the empty optional or the Optional
// that was returned.
final ProvenanceEventRecord event = eventOption.get();
if (event.getEventId() == id) {
return eventOption;
} else {
return Optional.empty();
}
}
}
示例6
@Test
public void testSimpleWriteWithToc() throws IOException {
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
final File tocFile = TocUtil.getTocFile(journalFile);
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
final RecordWriter writer = createWriter(journalFile, tocWriter, false, 1024 * 1024);
writer.writeHeader(1L);
writer.writeRecord(createEvent());
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
try (final FileInputStream fis = new FileInputStream(journalFile);
final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
assertEquals(0, reader.getBlockIndex());
reader.skipToBlock(0);
final StandardProvenanceEventRecord recovered = reader.nextRecord();
assertNotNull(recovered);
assertEquals("nifi://unit-test", recovered.getTransitUri());
assertNull(reader.nextRecord());
}
FileUtils.deleteFile(journalFile.getParentFile(), true);
}
示例7
@Test
public void testSingleRecordCompressed() throws IOException {
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
final File tocFile = TocUtil.getTocFile(journalFile);
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);
writer.writeHeader(1L);
writer.writeRecord(createEvent());
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
try (final FileInputStream fis = new FileInputStream(journalFile);
final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
assertEquals(0, reader.getBlockIndex());
reader.skipToBlock(0);
final StandardProvenanceEventRecord recovered = reader.nextRecord();
assertNotNull(recovered);
assertEquals("nifi://unit-test", recovered.getTransitUri());
assertNull(reader.nextRecord());
}
FileUtils.deleteFile(journalFile.getParentFile(), true);
}
示例8
private long checkJournalRecords(final File storageDir, final Boolean exact) throws IOException {
File[] storagefiles = storageDir.listFiles();
long counter = 0;
assertNotNull(storagefiles);
for (final File file : storagefiles) {
if (file.isFile()) {
try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) {
ProvenanceEventRecord r;
ProvenanceEventRecord last = null;
while ((r = reader.nextRecord()) != null) {
if (exact) {
assertTrue(counter++ == r.getEventId());
} else {
assertTrue(counter++ <= r.getEventId());
}
}
}
}
}
return counter;
}
示例9
@Test
public void testAddOneRecordReadTwice() throws IOException {
final RecordField unitTestField = new SimpleRecordField("Unit Test Field", FieldType.STRING, Repetition.EXACTLY_ONE);
final Consumer<List<RecordField>> schemaModifier = fields -> fields.add(unitTestField);
final Map<RecordField, Object> toAdd = new HashMap<>();
toAdd.put(unitTestField, "hello");
try (final ByteArraySchemaRecordWriter writer = createSchemaWriter(schemaModifier, toAdd)) {
writer.writeHeader(1L);
writer.writeRecord(createEvent());
}
try (final InputStream in = new FileInputStream(journalFile);
final TocReader tocReader = new StandardTocReader(tocFile);
final RecordReader reader = createReader(in, journalFile.getName(), tocReader, 10000)) {
final ProvenanceEventRecord firstEvent = reader.nextRecord();
assertNotNull(firstEvent);
final ProvenanceEventRecord secondEvent = reader.nextRecord();
assertNull(secondEvent);
}
}
示例10
private long determineFirstEventTimestamp() {
// Get the timestamp of the first event in the first Provenance Event Log File and the ID of the last event
// in the event file.
final List<File> logFiles = getSortedLogFiles();
if (logFiles.isEmpty()) {
return 0L;
}
for (final File logFile : logFiles) {
try (final RecordReader reader = RecordReaders.newRecordReader(logFile, null, Integer.MAX_VALUE)) {
final StandardProvenanceEventRecord event = reader.nextRecord();
if (event != null) {
return event.getEventTime();
}
} catch (final IOException ioe) {
logger.warn("Failed to obtain timestamp of first event from Provenance Event Log File {}", logFile);
}
}
return 0L;
}
示例11
private long checkJournalRecords(final File storageDir, final Boolean exact) throws IOException {
File[] storagefiles = storageDir.listFiles();
long counter = 0;
assertNotNull(storagefiles);
for (final File file : storagefiles) {
if (file.isFile()) {
try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) {
ProvenanceEventRecord r;
ProvenanceEventRecord last = null;
while ((r = reader.nextRecord()) != null) {
if (exact) {
assertTrue(counter++ == r.getEventId());
} else {
assertTrue(counter++ <= r.getEventId());
}
}
}
}
}
return counter;
}
示例12
private Long getFirstEntryTime(final File provenanceLogFile) {
if (provenanceLogFile == null) {
return null;
}
try (final RecordReader reader = RecordReaders.newRecordReader(provenanceLogFile, null, Integer.MAX_VALUE)) {
final StandardProvenanceEventRecord firstRecord = reader.nextRecord();
if (firstRecord == null) {
return provenanceLogFile.lastModified();
}
return firstRecord.getEventTime();
} catch (final FileNotFoundException | EOFException fnf) {
return null; // file no longer exists or there's no record in this file
} catch (final IOException ioe) {
logger.warn("Failed to read first entry in file {} due to {}", provenanceLogFile, ioe.toString());
logger.warn("", ioe);
return null;
}
}
示例13
private long determineFirstEventTimestamp() {
// Get the timestamp of the first event in the first Provenance Event Log File and the ID of the last event
// in the event file.
final List<File> logFiles = getSortedLogFiles();
if (logFiles.isEmpty()) {
return 0L;
}
for (final File logFile : logFiles) {
try (final RecordReader reader = RecordReaders.newRecordReader(logFile, null, Integer.MAX_VALUE)) {
final StandardProvenanceEventRecord event = reader.nextRecord();
if (event != null) {
return event.getEventTime();
}
} catch (final IOException ioe) {
logger.warn("Failed to obtain timestamp of first event from Provenance Event Log File {}", logFile);
}
}
return 0L;
}
示例14
private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
if ( blockField == null ) {
reader.skipTo(getByteOffset(d, reader));
} else {
reader.skipToBlock(blockField.numericValue().intValue());
}
StandardProvenanceEventRecord record;
while ( (record = reader.nextRecord()) != null) {
final IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
break;
}
}
if (record == null) {
logger.warn("Failed to read Provenance Event for '" + d + "'. The event file may be missing or corrupted");
}
return record;
}
示例15
public static void main(final String[] args) throws IOException {
if (args.length != 1) {
printUsage();
return;
}
final File file = new File(args[0]);
if (!file.exists()) {
System.out.println("Cannot find file " + file.getAbsolutePath());
return;
}
try (final RecordReader reader = RecordReaders.newRecordReader(file, Collections.emptyList(), 65535)) {
StandardProvenanceEventRecord event;
int index = 0;
while ((event = reader.nextRecord()) != null) {
final long byteOffset = reader.getBytesConsumed();
final String string = stringify(event, index++, byteOffset);
System.out.println(string);
}
}
}
示例16
@Override
public Optional<ProvenanceEventRecord> getEvent(final long id) throws IOException {
final Optional<File> option = getPathForEventId(id);
if (!option.isPresent()) {
return Optional.empty();
}
try (final RecordReader reader = recordReaderFactory.newRecordReader(option.get(), Collections.emptyList(), config.getMaxAttributeChars())) {
final Optional<ProvenanceEventRecord> eventOption = reader.skipToEvent(id);
if (!eventOption.isPresent()) {
return eventOption;
}
// If an event is returned, the event may be the one we want, or it may be an event with a
// higher event ID, if the desired event is not in the record reader. So we need to get the
// event and check the Event ID to know whether to return the empty optional or the Optional
// that was returned.
final ProvenanceEventRecord event = eventOption.get();
if (event.getEventId() == id) {
return eventOption;
} else {
return Optional.empty();
}
}
}
示例17
private long checkJournalRecords(final File storageDir, final Boolean exact) throws IOException {
File[] storagefiles = storageDir.listFiles();
long counter = 0;
assertNotNull(storagefiles);
for (final File file : storagefiles) {
if (file.isFile()) {
try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) {
ProvenanceEventRecord r;
ProvenanceEventRecord last = null;
while ((r = reader.nextRecord()) != null) {
if (exact) {
assertTrue(counter++ == r.getEventId());
} else {
assertTrue(counter++ <= r.getEventId());
}
}
}
}
}
return counter;
}
示例18
@Test
public void testAddOneRecordReadTwice() throws IOException {
final RecordField unitTestField = new SimpleRecordField("Unit Test Field", FieldType.STRING, Repetition.EXACTLY_ONE);
final Consumer<List<RecordField>> schemaModifier = fields -> fields.add(unitTestField);
final Map<RecordField, Object> toAdd = new HashMap<>();
toAdd.put(unitTestField, "hello");
try (final ByteArraySchemaRecordWriter writer = createSchemaWriter(schemaModifier, toAdd)) {
writer.writeHeader(1L);
writer.writeRecord(createEvent());
}
try (final InputStream in = new FileInputStream(journalFile);
final TocReader tocReader = new StandardTocReader(tocFile);
final RecordReader reader = createReader(in, journalFile.getName(), tocReader, 10000)) {
final ProvenanceEventRecord firstEvent = reader.nextRecord();
assertNotNull(firstEvent);
final ProvenanceEventRecord secondEvent = reader.nextRecord();
assertNull(secondEvent);
}
}
示例19
@Override
public File execute(final File expiredFile) throws IOException {
try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, null, Integer.MAX_VALUE)) {
final long maxEventId = reader.getMaxEventId();
indexConfig.setMinIdIndexed(maxEventId);
logger.info("Updated Minimum Event ID for Provenance Event Repository - Minimum Event ID now {}", maxEventId);
} catch (final IOException ioe) {
logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
}
return expiredFile;
}
示例20
private long getByteOffset(final Document d, final RecordReader reader) {
final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
if ( blockField != null ) {
final int blockIndex = blockField.numericValue().intValue();
final TocReader tocReader = reader.getTocReader();
return tocReader.getBlockOffset(blockIndex);
}
return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
}
示例21
@Test
public void testMultipleRecordsSameBlockCompressed() throws IOException {
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
final File tocFile = TocUtil.getTocFile(journalFile);
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
// new record each 1 MB of uncompressed data
final RecordWriter writer = createWriter(journalFile, tocWriter, true, 1024 * 1024);
writer.writeHeader(1L);
for (int i = 0; i < 10; i++) {
writer.writeRecord(createEvent());
}
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
try (final FileInputStream fis = new FileInputStream(journalFile);
final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
for (int i = 0; i < 10; i++) {
assertEquals(0, reader.getBlockIndex());
// call skipToBlock half the time to ensure that we can; avoid calling it
// the other half of the time to ensure that it's okay.
if (i <= 5) {
reader.skipToBlock(0);
}
final StandardProvenanceEventRecord recovered = reader.nextRecord();
assertNotNull(recovered);
assertEquals("nifi://unit-test", recovered.getTransitUri());
}
assertNull(reader.nextRecord());
}
FileUtils.deleteFile(journalFile.getParentFile(), true);
}
示例22
@Test
public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
final File tocFile = TocUtil.getTocFile(journalFile);
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
// new block each 10 bytes
final RecordWriter writer = createWriter(journalFile, tocWriter, true, 100);
writer.writeHeader(1L);
for (int i = 0; i < 10; i++) {
writer.writeRecord(createEvent());
}
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
try (final FileInputStream fis = new FileInputStream(journalFile);
final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
for (int i = 0; i < 10; i++) {
final StandardProvenanceEventRecord recovered = reader.nextRecord();
System.out.println(recovered);
assertNotNull(recovered);
assertEquals(i, recovered.getEventId());
assertEquals("nifi://unit-test", recovered.getTransitUri());
final Map<String, String> updatedAttrs = recovered.getUpdatedAttributes();
assertNotNull(updatedAttrs);
assertEquals(2, updatedAttrs.size());
assertEquals("1.txt", updatedAttrs.get("filename"));
assertTrue(updatedAttrs.containsKey("uuid"));
}
assertNull(reader.nextRecord());
}
FileUtils.deleteFile(journalFile.getParentFile(), true);
}
示例23
@Test
public void testFieldAddedToSchema() throws IOException {
final RecordField unitTestField = new SimpleRecordField("Unit Test Field", FieldType.STRING, Repetition.EXACTLY_ONE);
final Consumer<List<RecordField>> schemaModifier = fields -> fields.add(unitTestField);
final Map<RecordField, Object> toAdd = new HashMap<>();
toAdd.put(unitTestField, "hello");
try (final ByteArraySchemaRecordWriter writer = createSchemaWriter(schemaModifier, toAdd)) {
writer.writeHeader(1L);
writer.writeRecord(createEvent());
writer.writeRecord(createEvent());
}
try (final InputStream in = new FileInputStream(journalFile);
final TocReader tocReader = new StandardTocReader(tocFile);
final RecordReader reader = createReader(in, journalFile.getName(), tocReader, 10000)) {
for (int i = 0; i < 2; i++) {
final StandardProvenanceEventRecord event = reader.nextRecord();
assertNotNull(event);
assertEquals("1234", event.getComponentId());
assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
assertNotNull(event.getUpdatedAttributes());
assertFalse(event.getUpdatedAttributes().isEmpty());
}
}
}
示例24
@Override
public File execute(final File expiredFile) throws IOException {
try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, null, Integer.MAX_VALUE)) {
final long maxEventId = reader.getMaxEventId();
indexConfig.setMinIdIndexed(maxEventId);
logger.info("Updated Minimum Event ID for Provenance Event Repository - Minimum Event ID now {}", maxEventId);
} catch (final IOException ioe) {
logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath());
}
return expiredFile;
}
示例25
private long getByteOffset(final Document d, final RecordReader reader) {
final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
if ( blockField != null ) {
final int blockIndex = blockField.numericValue().intValue();
final TocReader tocReader = reader.getTocReader();
return tocReader.getBlockOffset(blockIndex);
}
return d.getField(FieldNames.STORAGE_FILE_OFFSET).numericValue().longValue();
}
示例26
private void assertRecoveredRecord(File journalFile, TocReader tocReader, String expectedTransitUri, int expectedBlockIndex) throws IOException {
try (final FileInputStream fis = new FileInputStream(journalFile);
final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
assertEquals(expectedBlockIndex, reader.getBlockIndex());
reader.skipToBlock(expectedBlockIndex);
final StandardProvenanceEventRecord recovered = reader.nextRecord();
assertNotNull(recovered);
assertEquals(expectedTransitUri, recovered.getTransitUri());
assertNull(reader.nextRecord());
}
}
示例27
@Test
public void testMultipleRecordsSameBlockCompressed() throws IOException {
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
final File tocFile = TocUtil.getTocFile(journalFile);
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
// new record each 1 MB of uncompressed data
final RecordWriter writer = createWriter(journalFile, tocWriter, true, 1024 * 1024);
writer.writeHeader(1L);
for (int i = 0; i < 10; i++) {
writer.writeRecord(createEvent());
}
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
try (final FileInputStream fis = new FileInputStream(journalFile);
final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
for (int i = 0; i < 10; i++) {
assertEquals(0, reader.getBlockIndex());
// call skipToBlock half the time to ensure that we can; avoid calling it
// the other half of the time to ensure that it's okay.
if (i <= 5) {
reader.skipToBlock(0);
}
final StandardProvenanceEventRecord recovered = reader.nextRecord();
assertNotNull(recovered);
assertEquals("nifi://unit-test", recovered.getTransitUri());
}
assertNull(reader.nextRecord());
}
FileUtils.deleteFile(journalFile.getParentFile(), true);
}
示例28
@Test
public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
final File tocFile = TocUtil.getTocFile(journalFile);
final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
// new block each 10 bytes
final RecordWriter writer = createWriter(journalFile, tocWriter, true, 100);
writer.writeHeader(1L);
for (int i = 0; i < 10; i++) {
writer.writeRecord(createEvent());
}
writer.close();
final TocReader tocReader = new StandardTocReader(tocFile);
try (final FileInputStream fis = new FileInputStream(journalFile);
final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
for (int i = 0; i < 10; i++) {
final StandardProvenanceEventRecord recovered = reader.nextRecord();
System.out.println(recovered);
assertNotNull(recovered);
assertEquals(i, recovered.getEventId());
assertEquals("nifi://unit-test", recovered.getTransitUri());
final Map<String, String> updatedAttrs = recovered.getUpdatedAttributes();
assertNotNull(updatedAttrs);
assertEquals(2, updatedAttrs.size());
assertEquals("1.txt", updatedAttrs.get("filename"));
assertTrue(updatedAttrs.containsKey("uuid"));
}
assertNull(reader.nextRecord());
}
FileUtils.deleteFile(journalFile.getParentFile(), true);
}
示例29
@Test
public void testFieldAddedToSchema() throws IOException {
final RecordField unitTestField = new SimpleRecordField("Unit Test Field", FieldType.STRING, Repetition.EXACTLY_ONE);
final Consumer<List<RecordField>> schemaModifier = fields -> fields.add(unitTestField);
final Map<RecordField, Object> toAdd = new HashMap<>();
toAdd.put(unitTestField, "hello");
try (final ByteArraySchemaRecordWriter writer = createSchemaWriter(schemaModifier, toAdd)) {
writer.writeHeader(1L);
writer.writeRecord(createEvent());
writer.writeRecord(createEvent());
}
try (final InputStream in = new FileInputStream(journalFile);
final TocReader tocReader = new StandardTocReader(tocFile);
final RecordReader reader = createReader(in, journalFile.getName(), tocReader, 10000)) {
for (int i = 0; i < 2; i++) {
final StandardProvenanceEventRecord event = reader.nextRecord();
assertNotNull(event);
assertEquals("1234", event.getComponentId());
assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
assertNotNull(event.getUpdatedAttributes());
assertFalse(event.getUpdatedAttributes().isEmpty());
}
}
}
示例30
@Override
public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords, final NiFiUser user) throws IOException {
final List<ProvenanceEventRecord> records = new ArrayList<>(maxRecords);
final List<Path> paths = getPathsForId(firstRecordId);
if (paths == null || paths.isEmpty()) {
return records;
}
for (final Path path : paths) {
try (RecordReader reader = RecordReaders.newRecordReader(path.toFile(), getAllLogFiles(), maxAttributeChars)) {
// if this is the first record, try to find out the block index and jump directly to
// the block index. This avoids having to read through a lot of data that we don't care about
// just to get to the first record that we want.
if (records.isEmpty()) {
final TocReader tocReader = reader.getTocReader();
if (tocReader != null) {
final Integer blockIndex = tocReader.getBlockIndexForEventId(firstRecordId);
if (blockIndex != null) {
reader.skipToBlock(blockIndex);
}
}
}
StandardProvenanceEventRecord record;
while (records.size() < maxRecords && (record = reader.nextRecord()) != null) {
if (record.getEventId() >= firstRecordId && isAuthorized(record, user)) {
records.add(record);
}
}
} catch (final EOFException | FileNotFoundException fnfe) {
// assume file aged off (or there's no data in file, in case of EOFException, which indicates that data was cached
// in operating system and entire O/S crashed and always.sync was not turned on.)
} catch (final IOException ioe) {
logger.error("Failed to read Provenance Event File {} due to {}", path.toFile(), ioe.toString());
logger.error("", ioe);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to read Provenance Event File " + path.toFile() + " due to " + ioe.toString());
}
if (records.size() >= maxRecords) {
break;
}
}
if (logger.isDebugEnabled()) {
logger.debug("Retrieving up to {} records starting at Event ID {}; returning {} events", maxRecords, firstRecordId, records.size());
}
return records;
}