Java源码示例:org.apache.nifi.provenance.serialization.RecordReader

示例1
/**
 * Looks up the event time of the first record stored in the given provenance log file.
 *
 * @param provenanceLogFile the log file to inspect; may be {@code null}
 * @return the first record's event time; the file's last-modified time when the reader
 *         yields no record; or {@code null} when the argument is {@code null} or the
 *         file could not be read
 */
private Long getFirstEntryTime(final File provenanceLogFile) {
    if (provenanceLogFile == null) {
        return null;
    }

    try (final RecordReader recordReader = RecordReaders.newRecordReader(provenanceLogFile, null, Integer.MAX_VALUE)) {
        final StandardProvenanceEventRecord earliest = recordReader.nextRecord();
        if (earliest != null) {
            return earliest.getEventTime();
        }

        // No record was returned, so fall back to the file's own timestamp.
        return provenanceLogFile.lastModified();
    } catch (final FileNotFoundException | EOFException missingOrEmpty) {
        // file no longer exists or there's no record in this file
        return null;
    } catch (final IOException readFailure) {
        logger.warn("Failed to read first entry in file {} due to {}", provenanceLogFile, readFailure.toString());
        logger.warn("", readFailure);
        return null;
    }
}
 
示例2
/**
 * Finds the timestamp of the earliest readable provenance event.
 * <p>
 * Log files are visited in the order returned by {@code getSortedLogFiles()}. The event time
 * of the first record that can be read is returned; files that yield no record or that fail
 * with an {@link IOException} are skipped.
 *
 * @return the event time of the first readable event, or {@code 0L} if there is none
 */
private long determineFirstEventTimestamp() {
    // An empty list simply falls through the loop to the 0L default below.
    for (final File logFile : getSortedLogFiles()) {
        try (final RecordReader reader = RecordReaders.newRecordReader(logFile, null, Integer.MAX_VALUE)) {
            final StandardProvenanceEventRecord event = reader.nextRecord();
            if (event != null) {
                return event.getEventTime();
            }
        } catch (final IOException ioe) {
            // Pass the exception as the trailing argument so the cause is logged too, then move on to the next file.
            logger.warn("Failed to obtain timestamp of first event from Provenance Event Log File {}", logFile, ioe);
        }
    }

    return 0L;
}
 
示例3
/**
 * Reads the provenance event that the given index document refers to.
 * <p>
 * The reader is first positioned using the document's block index when one is present,
 * otherwise using the byte offset from {@code getByteOffset}. Records are then read until one
 * whose event ID equals the document's identifier field is found. If the document has no
 * identifier field, the first record read is returned.
 *
 * @param d      the index document describing where the event is stored
 * @param reader the reader over the event file
 * @return the matching event, or {@code null} (after logging a warning) if the reader is
 *         exhausted first
 * @throws IOException if positioning the reader or reading a record fails
 */
private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
    final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
    if (blockField == null) {
        reader.skipTo(getByteOffset(d, reader));
    } else {
        reader.skipToBlock(blockField.numericValue().intValue());
    }

    // The identifier field does not depend on the record being read, so look it up once.
    final IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());

    StandardProvenanceEventRecord record;
    while ((record = reader.nextRecord()) != null) {
        if (idField == null || idField.numericValue().longValue() == record.getEventId()) {
            break;
        }
    }

    if (record == null) {
        logger.warn("Failed to read Provenance Event for '{}'. The event file may be missing or corrupted", d);
    }

    return record;
}
 
示例4
/**
 * Prints every provenance event contained in the file named by the single command-line argument.
 * <p>
 * Each event is rendered with {@code stringify}, together with its zero-based position and the
 * number of bytes the reader reports as consumed once that event has been read.
 *
 * @param args exactly one element: the path of the event file to dump
 * @throws IOException if the file cannot be opened or read
 */
public static void main(final String[] args) throws IOException {
    if (args.length != 1) {
        printUsage();
        return;
    }

    final File eventFile = new File(args[0]);
    if (!eventFile.exists()) {
        System.out.println("Cannot find file " + eventFile.getAbsolutePath());
        return;
    }

    try (final RecordReader reader = RecordReaders.newRecordReader(eventFile, Collections.emptyList(), 65535)) {
        int position = 0;
        for (StandardProvenanceEventRecord event = reader.nextRecord(); event != null; event = reader.nextRecord()) {
            final long bytesConsumed = reader.getBytesConsumed();
            System.out.println(stringify(event, position, bytesConsumed));
            position++;
        }
    }
}
 
示例5
/**
 * Retrieves the provenance event with the given ID, if it is stored.
 *
 * @param id the ID of the event to fetch
 * @return the event, or an empty Optional if no file is known for the ID or the file does not
 *         contain an event with exactly that ID
 * @throws IOException if the event file cannot be opened or read
 */
@Override
public Optional<ProvenanceEventRecord> getEvent(final long id) throws IOException {
    final Optional<File> eventFile = getPathForEventId(id);
    if (!eventFile.isPresent()) {
        return Optional.empty();
    }

    try (final RecordReader reader = recordReaderFactory.newRecordReader(eventFile.get(), Collections.emptyList(), config.getMaxAttributeChars())) {
        // skipToEvent may hand back an event with a higher ID when the requested one is not in
        // this reader, so keep the result only if its ID is exactly the one that was asked for.
        return reader.skipToEvent(id).filter(candidate -> candidate.getEventId() == id);
    }
}
 
示例6
/**
 * Writes a single event to an uncompressed journal with a table of contents, then verifies
 * that exactly that one event can be read back from block 0.
 */
@Test
public void testSimpleWriteWithToc() throws IOException {
    final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite");
    final File tocFile = TocUtil.getTocFile(journalFile);
    final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
    final RecordWriter writer = createWriter(journalFile, tocWriter, false, 1024 * 1024);

    writer.writeHeader(1L);
    writer.writeRecord(createEvent());
    writer.close();

    // The TOC reader is declared as a resource so it is closed even when an assertion fails,
    // and before the storage directory is deleted below.
    try (final TocReader tocReader = new StandardTocReader(tocFile);
        final FileInputStream fis = new FileInputStream(journalFile);
        final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
        assertEquals(0, reader.getBlockIndex());
        reader.skipToBlock(0);
        final StandardProvenanceEventRecord recovered = reader.nextRecord();
        assertNotNull(recovered);

        assertEquals("nifi://unit-test", recovered.getTransitUri());
        assertNull(reader.nextRecord());
    }

    FileUtils.deleteFile(journalFile.getParentFile(), true);
}
 
示例7
/**
 * Writes a single event to a compressed journal with a table of contents, then verifies
 * that exactly that one event can be read back from block 0.
 */
@Test
public void testSingleRecordCompressed() throws IOException {
    final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
    final File tocFile = TocUtil.getTocFile(journalFile);
    final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
    final RecordWriter writer = createWriter(journalFile, tocWriter, true, 8192);

    writer.writeHeader(1L);
    writer.writeRecord(createEvent());
    writer.close();

    // The TOC reader is declared as a resource so it is closed even when an assertion fails,
    // and before the storage directory is deleted below.
    try (final TocReader tocReader = new StandardTocReader(tocFile);
        final FileInputStream fis = new FileInputStream(journalFile);
        final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
        assertEquals(0, reader.getBlockIndex());
        reader.skipToBlock(0);
        final StandardProvenanceEventRecord recovered = reader.nextRecord();
        assertNotNull(recovered);

        assertEquals("nifi://unit-test", recovered.getTransitUri());
        assertNull(reader.nextRecord());
    }

    FileUtils.deleteFile(journalFile.getParentFile(), true);
}
 
示例8
/**
 * Reads every record from every regular file directly inside the given directory and checks
 * the event IDs against a running counter that starts at zero and advances once per record.
 *
 * @param storageDir the directory whose files are read; sub-directories are ignored
 * @param exact      when {@code true}, each event ID must equal the counter value at the time
 *                   it is read; otherwise the ID only has to be at least the counter value
 * @return the total number of records read across all files
 * @throws IOException if a file cannot be opened or read
 */
private long checkJournalRecords(final File storageDir, final Boolean exact) throws IOException {
    final File[] storagefiles = storageDir.listFiles();
    assertNotNull(storagefiles);

    long counter = 0;
    for (final File file : storagefiles) {
        if (!file.isFile()) {
            continue;
        }

        try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) {
            ProvenanceEventRecord r;
            while ((r = reader.nextRecord()) != null) {
                if (exact) {
                    assertTrue(counter++ == r.getEventId());
                } else {
                    assertTrue(counter++ <= r.getEventId());
                }
            }
        }
    }
    return counter;
}
 
示例9
/**
 * Writes one event using a schema that has an additional field, then verifies that the first
 * read returns that event and a second read returns {@code null}.
 */
@Test
public void testAddOneRecordReadTwice() throws IOException {
    final RecordField extraField = new SimpleRecordField("Unit Test Field", FieldType.STRING, Repetition.EXACTLY_ONE);

    final Map<RecordField, Object> extraValues = new HashMap<>();
    extraValues.put(extraField, "hello");

    final Consumer<List<RecordField>> appendExtraField = fields -> fields.add(extraField);

    try (final ByteArraySchemaRecordWriter schemaWriter = createSchemaWriter(appendExtraField, extraValues)) {
        schemaWriter.writeHeader(1L);
        schemaWriter.writeRecord(createEvent());
    }

    try (final InputStream journalIn = new FileInputStream(journalFile);
        final TocReader toc = new StandardTocReader(tocFile);
        final RecordReader recordReader = createReader(journalIn, journalFile.getName(), toc, 10000)) {

        // First read yields the single event that was written ...
        assertNotNull(recordReader.nextRecord());

        // ... and the second read finds nothing more.
        assertNull(recordReader.nextRecord());
    }
}
 
示例10
/**
 * Finds the timestamp of the earliest readable provenance event.
 * <p>
 * Log files are visited in the order returned by {@code getSortedLogFiles()}. The event time
 * of the first record that can be read is returned; files that yield no record or that fail
 * with an {@link IOException} are skipped.
 *
 * @return the event time of the first readable event, or {@code 0L} if there is none
 */
private long determineFirstEventTimestamp() {
    // An empty list simply falls through the loop to the 0L default below.
    for (final File logFile : getSortedLogFiles()) {
        try (final RecordReader reader = RecordReaders.newRecordReader(logFile, null, Integer.MAX_VALUE)) {
            final StandardProvenanceEventRecord event = reader.nextRecord();
            if (event != null) {
                return event.getEventTime();
            }
        } catch (final IOException ioe) {
            // Pass the exception as the trailing argument so the cause is logged too, then move on to the next file.
            logger.warn("Failed to obtain timestamp of first event from Provenance Event Log File {}", logFile, ioe);
        }
    }

    return 0L;
}
 
示例11
/**
 * Reads every record from every regular file directly inside the given directory and checks
 * the event IDs against a running counter that starts at zero and advances once per record.
 *
 * @param storageDir the directory whose files are read; sub-directories are ignored
 * @param exact      when {@code true}, each event ID must equal the counter value at the time
 *                   it is read; otherwise the ID only has to be at least the counter value
 * @return the total number of records read across all files
 * @throws IOException if a file cannot be opened or read
 */
private long checkJournalRecords(final File storageDir, final Boolean exact) throws IOException {
    final File[] storagefiles = storageDir.listFiles();
    assertNotNull(storagefiles);

    long counter = 0;
    for (final File file : storagefiles) {
        if (!file.isFile()) {
            continue;
        }

        try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) {
            ProvenanceEventRecord r;
            while ((r = reader.nextRecord()) != null) {
                if (exact) {
                    assertTrue(counter++ == r.getEventId());
                } else {
                    assertTrue(counter++ <= r.getEventId());
                }
            }
        }
    }
    return counter;
}
 
示例12
/**
 * Looks up the event time of the first record stored in the given provenance log file.
 *
 * @param provenanceLogFile the log file to inspect; may be {@code null}
 * @return the first record's event time; the file's last-modified time when the reader
 *         yields no record; or {@code null} when the argument is {@code null} or the
 *         file could not be read
 */
private Long getFirstEntryTime(final File provenanceLogFile) {
    if (provenanceLogFile == null) {
        return null;
    }

    try (final RecordReader recordReader = RecordReaders.newRecordReader(provenanceLogFile, null, Integer.MAX_VALUE)) {
        final StandardProvenanceEventRecord earliest = recordReader.nextRecord();
        if (earliest != null) {
            return earliest.getEventTime();
        }

        // No record was returned, so fall back to the file's own timestamp.
        return provenanceLogFile.lastModified();
    } catch (final FileNotFoundException | EOFException missingOrEmpty) {
        // file no longer exists or there's no record in this file
        return null;
    } catch (final IOException readFailure) {
        logger.warn("Failed to read first entry in file {} due to {}", provenanceLogFile, readFailure.toString());
        logger.warn("", readFailure);
        return null;
    }
}
 
示例13
/**
 * Finds the timestamp of the earliest readable provenance event.
 * <p>
 * Log files are visited in the order returned by {@code getSortedLogFiles()}. The event time
 * of the first record that can be read is returned; files that yield no record or that fail
 * with an {@link IOException} are skipped.
 *
 * @return the event time of the first readable event, or {@code 0L} if there is none
 */
private long determineFirstEventTimestamp() {
    // An empty list simply falls through the loop to the 0L default below.
    for (final File logFile : getSortedLogFiles()) {
        try (final RecordReader reader = RecordReaders.newRecordReader(logFile, null, Integer.MAX_VALUE)) {
            final StandardProvenanceEventRecord event = reader.nextRecord();
            if (event != null) {
                return event.getEventTime();
            }
        } catch (final IOException ioe) {
            // Pass the exception as the trailing argument so the cause is logged too, then move on to the next file.
            logger.warn("Failed to obtain timestamp of first event from Provenance Event Log File {}", logFile, ioe);
        }
    }

    return 0L;
}
 
示例14
/**
 * Reads the provenance event that the given index document refers to.
 * <p>
 * The reader is first positioned using the document's block index when one is present,
 * otherwise using the byte offset from {@code getByteOffset}. Records are then read until one
 * whose event ID equals the document's identifier field is found. If the document has no
 * identifier field, the first record read is returned.
 *
 * @param d      the index document describing where the event is stored
 * @param reader the reader over the event file
 * @return the matching event, or {@code null} (after logging a warning) if the reader is
 *         exhausted first
 * @throws IOException if positioning the reader or reading a record fails
 */
private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
    final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
    if (blockField == null) {
        reader.skipTo(getByteOffset(d, reader));
    } else {
        reader.skipToBlock(blockField.numericValue().intValue());
    }

    // The identifier field does not depend on the record being read, so look it up once.
    final IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());

    StandardProvenanceEventRecord record;
    while ((record = reader.nextRecord()) != null) {
        if (idField == null || idField.numericValue().longValue() == record.getEventId()) {
            break;
        }
    }

    if (record == null) {
        logger.warn("Failed to read Provenance Event for '{}'. The event file may be missing or corrupted", d);
    }

    return record;
}
 
示例15
/**
 * Prints every provenance event contained in the file named by the single command-line argument.
 * <p>
 * Each event is rendered with {@code stringify}, together with its zero-based position and the
 * number of bytes the reader reports as consumed once that event has been read.
 *
 * @param args exactly one element: the path of the event file to dump
 * @throws IOException if the file cannot be opened or read
 */
public static void main(final String[] args) throws IOException {
    if (args.length != 1) {
        printUsage();
        return;
    }

    final File eventFile = new File(args[0]);
    if (!eventFile.exists()) {
        System.out.println("Cannot find file " + eventFile.getAbsolutePath());
        return;
    }

    try (final RecordReader reader = RecordReaders.newRecordReader(eventFile, Collections.emptyList(), 65535)) {
        int position = 0;
        for (StandardProvenanceEventRecord event = reader.nextRecord(); event != null; event = reader.nextRecord()) {
            final long bytesConsumed = reader.getBytesConsumed();
            System.out.println(stringify(event, position, bytesConsumed));
            position++;
        }
    }
}
 
示例16
/**
 * Retrieves the provenance event with the given ID, if it is stored.
 *
 * @param id the ID of the event to fetch
 * @return the event, or an empty Optional if no file is known for the ID or the file does not
 *         contain an event with exactly that ID
 * @throws IOException if the event file cannot be opened or read
 */
@Override
public Optional<ProvenanceEventRecord> getEvent(final long id) throws IOException {
    final Optional<File> eventFile = getPathForEventId(id);
    if (!eventFile.isPresent()) {
        return Optional.empty();
    }

    try (final RecordReader reader = recordReaderFactory.newRecordReader(eventFile.get(), Collections.emptyList(), config.getMaxAttributeChars())) {
        // skipToEvent may hand back an event with a higher ID when the requested one is not in
        // this reader, so keep the result only if its ID is exactly the one that was asked for.
        return reader.skipToEvent(id).filter(candidate -> candidate.getEventId() == id);
    }
}
 
示例17
/**
 * Reads every record from every regular file directly inside the given directory and checks
 * the event IDs against a running counter that starts at zero and advances once per record.
 *
 * @param storageDir the directory whose files are read; sub-directories are ignored
 * @param exact      when {@code true}, each event ID must equal the counter value at the time
 *                   it is read; otherwise the ID only has to be at least the counter value
 * @return the total number of records read across all files
 * @throws IOException if a file cannot be opened or read
 */
private long checkJournalRecords(final File storageDir, final Boolean exact) throws IOException {
    final File[] storagefiles = storageDir.listFiles();
    assertNotNull(storagefiles);

    long counter = 0;
    for (final File file : storagefiles) {
        if (!file.isFile()) {
            continue;
        }

        try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) {
            ProvenanceEventRecord r;
            while ((r = reader.nextRecord()) != null) {
                if (exact) {
                    assertTrue(counter++ == r.getEventId());
                } else {
                    assertTrue(counter++ <= r.getEventId());
                }
            }
        }
    }
    return counter;
}
 
示例18
/**
 * Writes one event using a schema that has an additional field, then verifies that the first
 * read returns that event and a second read returns {@code null}.
 */
@Test
public void testAddOneRecordReadTwice() throws IOException {
    final RecordField extraField = new SimpleRecordField("Unit Test Field", FieldType.STRING, Repetition.EXACTLY_ONE);

    final Map<RecordField, Object> extraValues = new HashMap<>();
    extraValues.put(extraField, "hello");

    final Consumer<List<RecordField>> appendExtraField = fields -> fields.add(extraField);

    try (final ByteArraySchemaRecordWriter schemaWriter = createSchemaWriter(appendExtraField, extraValues)) {
        schemaWriter.writeHeader(1L);
        schemaWriter.writeRecord(createEvent());
    }

    try (final InputStream journalIn = new FileInputStream(journalFile);
        final TocReader toc = new StandardTocReader(tocFile);
        final RecordReader recordReader = createReader(journalIn, journalFile.getName(), toc, 10000)) {

        // First read yields the single event that was written ...
        assertNotNull(recordReader.nextRecord());

        // ... and the second read finds nothing more.
        assertNull(recordReader.nextRecord());
    }
}
 
示例19
/**
 * Records the maximum event ID reported by the reader for an expired journal file as the index
 * configuration's minimum indexed ID.
 * <p>
 * A failure to read the file is logged and otherwise ignored, so the file is returned in
 * every case.
 *
 * @param expiredFile the journal file that has expired
 * @return {@code expiredFile}, unchanged
 * @throws IOException declared by the signature; read failures are caught and logged here
 */
@Override
public File execute(final File expiredFile) throws IOException {
    try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, null, Integer.MAX_VALUE)) {
        final long maxEventId = reader.getMaxEventId();
        indexConfig.setMinIdIndexed(maxEventId);

        logger.info("Updated Minimum Event ID for Provenance Event Repository - Minimum Event ID now {}", maxEventId);
    } catch (final IOException ioe) {
        // Pass the exception as the trailing argument so the cause of the failure is logged too.
        logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath(), ioe);
    }

    return expiredFile;
}
 
示例20
/**
 * Determines the byte offset at which reading should start for the given index document.
 *
 * @param d      the index document
 * @param reader the reader whose table of contents resolves block indices
 * @return the offset of the document's block according to the reader's table of contents when
 *         the document has a block-index field; otherwise the value of the document's
 *         storage-file-offset field
 */
private long getByteOffset(final Document d, final RecordReader reader) {
    final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
    if ( blockField == null ) {
        // No block index on this document: use the offset stored on the document itself.
        final IndexableField offsetField = d.getField(FieldNames.STORAGE_FILE_OFFSET);
        return offsetField.numericValue().longValue();
    }

    final int blockIndex = blockField.numericValue().intValue();
    return reader.getTocReader().getBlockOffset(blockIndex);
}
 
示例21
/**
 * Writes ten events to a compressed journal, then verifies that all ten can be read back while
 * the reader stays on block 0, whether or not {@code skipToBlock(0)} is called before a read.
 */
@Test
public void testMultipleRecordsSameBlockCompressed() throws IOException {
    final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
    final File tocFile = TocUtil.getTocFile(journalFile);
    final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
    // new block each 1 MB of uncompressed data
    final RecordWriter writer = createWriter(journalFile, tocWriter, true, 1024 * 1024);

    writer.writeHeader(1L);
    for (int i = 0; i < 10; i++) {
        writer.writeRecord(createEvent());
    }
    writer.close();

    // The TOC reader is declared as a resource so it is closed even when an assertion fails,
    // and before the storage directory is deleted below.
    try (final TocReader tocReader = new StandardTocReader(tocFile);
        final FileInputStream fis = new FileInputStream(journalFile);
        final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
        for (int i = 0; i < 10; i++) {
            assertEquals(0, reader.getBlockIndex());

            // call skipToBlock on the first six iterations to ensure that we can; avoid calling it
            // on the remaining ones to ensure that it's okay.
            if (i <= 5) {
                reader.skipToBlock(0);
            }

            final StandardProvenanceEventRecord recovered = reader.nextRecord();
            assertNotNull(recovered);
            assertEquals("nifi://unit-test", recovered.getTransitUri());
        }

        assertNull(reader.nextRecord());
    }

    FileUtils.deleteFile(journalFile.getParentFile(), true);
}
 
示例22
/**
 * Writes ten events to a compressed journal created with a size argument of 100, then verifies
 * that the events are read back in order with event IDs 0 through 9 and the expected transit
 * URI and updated attributes.
 */
@Test
public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
    final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
    final File tocFile = TocUtil.getTocFile(journalFile);
    final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
    // new block each 100 bytes
    final RecordWriter writer = createWriter(journalFile, tocWriter, true, 100);

    writer.writeHeader(1L);
    for (int i = 0; i < 10; i++) {
        writer.writeRecord(createEvent());
    }
    writer.close();

    // The TOC reader is declared as a resource so it is closed even when an assertion fails,
    // and before the storage directory is deleted below.
    try (final TocReader tocReader = new StandardTocReader(tocFile);
        final FileInputStream fis = new FileInputStream(journalFile);
        final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
        for (int i = 0; i < 10; i++) {
            final StandardProvenanceEventRecord recovered = reader.nextRecord();
            System.out.println(recovered);
            assertNotNull(recovered);
            assertEquals(i, recovered.getEventId());
            assertEquals("nifi://unit-test", recovered.getTransitUri());

            final Map<String, String> updatedAttrs = recovered.getUpdatedAttributes();
            assertNotNull(updatedAttrs);
            assertEquals(2, updatedAttrs.size());
            assertEquals("1.txt", updatedAttrs.get("filename"));
            assertTrue(updatedAttrs.containsKey("uuid"));
        }

        assertNull(reader.nextRecord());
    }

    FileUtils.deleteFile(journalFile.getParentFile(), true);
}
 
示例23
/**
 * Writes two events using a schema that has an additional field, then verifies that both events
 * can still be read back with the expected component ID, event type and non-empty updated
 * attributes.
 */
@Test
public void testFieldAddedToSchema() throws IOException {
    final RecordField extraField = new SimpleRecordField("Unit Test Field", FieldType.STRING, Repetition.EXACTLY_ONE);

    final Map<RecordField, Object> extraValues = new HashMap<>();
    extraValues.put(extraField, "hello");

    final Consumer<List<RecordField>> appendExtraField = fields -> fields.add(extraField);

    try (final ByteArraySchemaRecordWriter schemaWriter = createSchemaWriter(appendExtraField, extraValues)) {
        schemaWriter.writeHeader(1L);
        schemaWriter.writeRecord(createEvent());
        schemaWriter.writeRecord(createEvent());
    }

    try (final InputStream journalIn = new FileInputStream(journalFile);
        final TocReader toc = new StandardTocReader(tocFile);
        final RecordReader recordReader = createReader(journalIn, journalFile.getName(), toc, 10000)) {

        int remaining = 2;
        while (remaining-- > 0) {
            final StandardProvenanceEventRecord recovered = recordReader.nextRecord();
            assertNotNull(recovered);
            assertEquals("1234", recovered.getComponentId());
            assertEquals(ProvenanceEventType.RECEIVE, recovered.getEventType());

            assertNotNull(recovered.getUpdatedAttributes());
            assertFalse(recovered.getUpdatedAttributes().isEmpty());
        }
    }
}
 
示例24
/**
 * Records the maximum event ID reported by the reader for an expired journal file as the index
 * configuration's minimum indexed ID.
 * <p>
 * A failure to read the file is logged and otherwise ignored, so the file is returned in
 * every case.
 *
 * @param expiredFile the journal file that has expired
 * @return {@code expiredFile}, unchanged
 * @throws IOException declared by the signature; read failures are caught and logged here
 */
@Override
public File execute(final File expiredFile) throws IOException {
    try (final RecordReader reader = RecordReaders.newRecordReader(expiredFile, null, Integer.MAX_VALUE)) {
        final long maxEventId = reader.getMaxEventId();
        indexConfig.setMinIdIndexed(maxEventId);

        logger.info("Updated Minimum Event ID for Provenance Event Repository - Minimum Event ID now {}", maxEventId);
    } catch (final IOException ioe) {
        // Pass the exception as the trailing argument so the cause of the failure is logged too.
        logger.warn("Failed to obtain max ID present in journal file {}", expiredFile.getAbsolutePath(), ioe);
    }

    return expiredFile;
}
 
示例25
/**
 * Determines the byte offset at which reading should start for the given index document.
 *
 * @param d      the index document
 * @param reader the reader whose table of contents resolves block indices
 * @return the offset of the document's block according to the reader's table of contents when
 *         the document has a block-index field; otherwise the value of the document's
 *         storage-file-offset field
 */
private long getByteOffset(final Document d, final RecordReader reader) {
    final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
    if ( blockField == null ) {
        // No block index on this document: use the offset stored on the document itself.
        final IndexableField offsetField = d.getField(FieldNames.STORAGE_FILE_OFFSET);
        return offsetField.numericValue().longValue();
    }

    final int blockIndex = blockField.numericValue().intValue();
    return reader.getTocReader().getBlockOffset(blockIndex);
}
 
示例26
/**
 * Asserts that the journal file contains exactly one record, readable from the expected block,
 * and that the record carries the expected transit URI.
 *
 * @param journalFile        the journal file to read
 * @param tocReader          the table-of-contents reader handed to the record reader
 * @param expectedTransitUri the transit URI the recovered record must have
 * @param expectedBlockIndex the block index the reader must report initially, and the block
 *                           that is skipped to before reading
 * @throws IOException if the journal cannot be opened or read
 */
private void assertRecoveredRecord(File journalFile, TocReader tocReader, String expectedTransitUri, int expectedBlockIndex) throws IOException {
    try (final FileInputStream journalIn = new FileInputStream(journalFile);
         final RecordReader recordReader = createReader(journalIn, journalFile.getName(), tocReader, 2048)) {
        assertEquals(expectedBlockIndex, recordReader.getBlockIndex());
        recordReader.skipToBlock(expectedBlockIndex);

        final StandardProvenanceEventRecord onlyRecord = recordReader.nextRecord();
        assertNotNull(onlyRecord);
        assertEquals(expectedTransitUri, onlyRecord.getTransitUri());

        // Nothing may follow the single expected record.
        assertNull(recordReader.nextRecord());
    }
}
 
示例27
/**
 * Writes ten events to a compressed journal, then verifies that all ten can be read back while
 * the reader stays on block 0, whether or not {@code skipToBlock(0)} is called before a read.
 */
@Test
public void testMultipleRecordsSameBlockCompressed() throws IOException {
    final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
    final File tocFile = TocUtil.getTocFile(journalFile);
    final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
    // new block each 1 MB of uncompressed data
    final RecordWriter writer = createWriter(journalFile, tocWriter, true, 1024 * 1024);

    writer.writeHeader(1L);
    for (int i = 0; i < 10; i++) {
        writer.writeRecord(createEvent());
    }
    writer.close();

    // The TOC reader is declared as a resource so it is closed even when an assertion fails,
    // and before the storage directory is deleted below.
    try (final TocReader tocReader = new StandardTocReader(tocFile);
        final FileInputStream fis = new FileInputStream(journalFile);
        final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
        for (int i = 0; i < 10; i++) {
            assertEquals(0, reader.getBlockIndex());

            // call skipToBlock on the first six iterations to ensure that we can; avoid calling it
            // on the remaining ones to ensure that it's okay.
            if (i <= 5) {
                reader.skipToBlock(0);
            }

            final StandardProvenanceEventRecord recovered = reader.nextRecord();
            assertNotNull(recovered);
            assertEquals("nifi://unit-test", recovered.getTransitUri());
        }

        assertNull(reader.nextRecord());
    }

    FileUtils.deleteFile(journalFile.getParentFile(), true);
}
 
示例28
/**
 * Writes ten events to a compressed journal created with a size argument of 100, then verifies
 * that the events are read back in order with event IDs 0 through 9 and the expected transit
 * URI and updated attributes.
 */
@Test
public void testMultipleRecordsMultipleBlocksCompressed() throws IOException {
    final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz");
    final File tocFile = TocUtil.getTocFile(journalFile);
    final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false);
    // new block each 100 bytes
    final RecordWriter writer = createWriter(journalFile, tocWriter, true, 100);

    writer.writeHeader(1L);
    for (int i = 0; i < 10; i++) {
        writer.writeRecord(createEvent());
    }
    writer.close();

    // The TOC reader is declared as a resource so it is closed even when an assertion fails,
    // and before the storage directory is deleted below.
    try (final TocReader tocReader = new StandardTocReader(tocFile);
        final FileInputStream fis = new FileInputStream(journalFile);
        final RecordReader reader = createReader(fis, journalFile.getName(), tocReader, 2048)) {
        for (int i = 0; i < 10; i++) {
            final StandardProvenanceEventRecord recovered = reader.nextRecord();
            System.out.println(recovered);
            assertNotNull(recovered);
            assertEquals(i, recovered.getEventId());
            assertEquals("nifi://unit-test", recovered.getTransitUri());

            final Map<String, String> updatedAttrs = recovered.getUpdatedAttributes();
            assertNotNull(updatedAttrs);
            assertEquals(2, updatedAttrs.size());
            assertEquals("1.txt", updatedAttrs.get("filename"));
            assertTrue(updatedAttrs.containsKey("uuid"));
        }

        assertNull(reader.nextRecord());
    }

    FileUtils.deleteFile(journalFile.getParentFile(), true);
}
 
示例29
/**
 * Writes two events using a schema that has an additional field, then verifies that both events
 * can still be read back with the expected component ID, event type and non-empty updated
 * attributes.
 */
@Test
public void testFieldAddedToSchema() throws IOException {
    final RecordField extraField = new SimpleRecordField("Unit Test Field", FieldType.STRING, Repetition.EXACTLY_ONE);

    final Map<RecordField, Object> extraValues = new HashMap<>();
    extraValues.put(extraField, "hello");

    final Consumer<List<RecordField>> appendExtraField = fields -> fields.add(extraField);

    try (final ByteArraySchemaRecordWriter schemaWriter = createSchemaWriter(appendExtraField, extraValues)) {
        schemaWriter.writeHeader(1L);
        schemaWriter.writeRecord(createEvent());
        schemaWriter.writeRecord(createEvent());
    }

    try (final InputStream journalIn = new FileInputStream(journalFile);
        final TocReader toc = new StandardTocReader(tocFile);
        final RecordReader recordReader = createReader(journalIn, journalFile.getName(), toc, 10000)) {

        int remaining = 2;
        while (remaining-- > 0) {
            final StandardProvenanceEventRecord recovered = recordReader.nextRecord();
            assertNotNull(recovered);
            assertEquals("1234", recovered.getComponentId());
            assertEquals(ProvenanceEventType.RECEIVE, recovered.getEventType());

            assertNotNull(recovered.getUpdatedAttributes());
            assertFalse(recovered.getUpdatedAttributes().isEmpty());
        }
    }
}
 
示例30
/**
 * Collects up to {@code maxRecords} provenance events whose IDs are at least
 * {@code firstRecordId} and for which {@code isAuthorized} accepts the given user.
 * <p>
 * The files returned by {@code getPathsForId} are read in order until enough events have been
 * gathered. Files that are missing or end prematurely are skipped silently; any other read
 * failure is logged and reported through the event reporter, after which the next file is tried.
 *
 * @param firstRecordId the lowest event ID that may be returned
 * @param maxRecords    the maximum number of events to return
 * @param user          the user on whose behalf events are retrieved
 * @return the events found, possibly empty
 * @throws IOException declared by the signature; read failures on individual files are
 *                     handled inside the method
 */
@Override
public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords, final NiFiUser user) throws IOException {
    final List<ProvenanceEventRecord> matched = new ArrayList<>(maxRecords);

    final List<Path> candidatePaths = getPathsForId(firstRecordId);
    if (candidatePaths == null || candidatePaths.isEmpty()) {
        return matched;
    }

    for (final Path candidatePath : candidatePaths) {
        try (RecordReader reader = RecordReaders.newRecordReader(candidatePath.toFile(), getAllLogFiles(), maxAttributeChars)) {
            // While nothing has been collected yet, use the table of contents (when there is one)
            // to jump straight to the block holding the first wanted event instead of reading
            // through data that we don't care about.
            if (matched.isEmpty()) {
                final TocReader toc = reader.getTocReader();
                if (toc != null) {
                    final Integer startBlock = toc.getBlockIndexForEventId(firstRecordId);
                    if (startBlock != null) {
                        reader.skipToBlock(startBlock);
                    }
                }
            }

            while (matched.size() < maxRecords) {
                final StandardProvenanceEventRecord candidate = reader.nextRecord();
                if (candidate == null) {
                    break;
                }

                if (candidate.getEventId() >= firstRecordId && isAuthorized(candidate, user)) {
                    matched.add(candidate);
                }
            }
        } catch (final EOFException | FileNotFoundException ignored) {
            // assume file aged off (or there's no data in file, in case of EOFException, which indicates that data was cached
            // in operating system and entire O/S crashed and always.sync was not turned on.)
        } catch (final IOException readFailure) {
            logger.error("Failed to read Provenance Event File {} due to {}", candidatePath.toFile(), readFailure.toString());
            logger.error("", readFailure);
            eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to read Provenance Event File " + candidatePath.toFile() + " due to " + readFailure.toString());
        }

        if (matched.size() >= maxRecords) {
            break;
        }
    }

    if (logger.isDebugEnabled()) {
        logger.debug("Retrieving up to {} records starting at Event ID {}; returning {} events", maxRecords, firstRecordId, matched.size());
    }

    return matched;
}