Java源码示例:org.apache.nifi.nar.NarUnpacker
示例1
/**
 * Unpacks the NARs described by the test properties, discovers the extensions,
 * generates the component documentation into a temporary folder and checks that
 * the {@code index.html} produced for {@code WriteResourceToStream} contains the
 * expected description and relationship texts.
 *
 * @throws IOException if the temporary folder cannot be created or the generated file cannot be read
 * @throws ClassNotFoundException declared for the class-loader initialisation calls made here
 */
@Test
public void testProcessorLoadsNarResources() throws IOException, ClassNotFoundException {
    // The folder is created by hand (not through a JUnit @Rule), so nothing deletes it
    // automatically; the finally block below removes it.
    TemporaryFolder temporaryFolder = new TemporaryFolder();
    temporaryFolder.create();
    try {
        // Point the component docs directory at the temporary folder.
        NiFiProperties properties = loadSpecifiedProperties("/conf/nifi.properties",
                NiFiProperties.COMPONENT_DOCS_DIRECTORY,
                temporaryFolder.getRoot().getAbsolutePath());

        NarUnpacker.unpackNars(properties);
        NarClassLoaders.getInstance().init(properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());
        ExtensionManager.discoverExtensions(NarClassLoaders.getInstance().getExtensionClassLoaders());

        DocGenerator.generate(properties);

        File processorDirectory = new File(temporaryFolder.getRoot(), "org.apache.nifi.processors.WriteResourceToStream");
        File indexHtml = new File(processorDirectory, "index.html");
        Assert.assertTrue(indexHtml + " should have been generated", indexHtml.exists());

        String generatedHtml = FileUtils.readFileToString(indexHtml);
        Assert.assertNotNull(generatedHtml);
        Assert.assertTrue(generatedHtml.contains("This example processor loads a resource from the nar and writes it to the FlowFile content"));
        Assert.assertTrue(generatedHtml.contains("files that were successfully processed"));
        Assert.assertTrue(generatedHtml.contains("files that were not successfully processed"));
        Assert.assertTrue(generatedHtml.contains("resources"));
    } finally {
        // Remove the generated documentation whether the assertions passed or not.
        temporaryFolder.delete();
    }
}
示例2
/**
 * Unpacks the NARs described by the test properties, discovers the extensions of the
 * system bundle and the NAR bundles, generates the component documentation into a
 * temporary folder and checks that the {@code index.html} produced for
 * {@code WriteResourceToStream} contains the expected description and relationship texts.
 * The documentation is looked up under {@code group/id/version/className} of the
 * processor's bundle coordinate.
 *
 * @throws IOException if the temporary folder cannot be created or the generated file cannot be read
 * @throws ClassNotFoundException declared for the class-loader initialisation calls made here
 */
@Test
public void testProcessorLoadsNarResources() throws IOException, ClassNotFoundException {
    // The folder is created by hand (not through a JUnit @Rule), so nothing deletes it
    // automatically; the finally block below removes it.
    TemporaryFolder temporaryFolder = new TemporaryFolder();
    temporaryFolder.create();
    try {
        // Point the component docs directory at the temporary folder.
        NiFiProperties properties = loadSpecifiedProperties("/conf/nifi.properties",
                NiFiProperties.COMPONENT_DOCS_DIRECTORY,
                temporaryFolder.getRoot().getAbsolutePath());

        final Bundle systemBundle = SystemBundle.create(properties);
        final ExtensionMapping mapping = NarUnpacker.unpackNars(properties, systemBundle);

        NarClassLoadersHolder.getInstance().init(properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());

        final ExtensionDiscoveringManager extensionManager = new StandardExtensionDiscoveringManager();
        extensionManager.discoverExtensions(systemBundle, NarClassLoadersHolder.getInstance().getBundles());

        DocGenerator.generate(properties, extensionManager, mapping);

        // Build the documentation path from the first bundle coordinate mapped to the processor.
        final String extensionClassName = "org.apache.nifi.processors.WriteResourceToStream";
        final BundleCoordinate coordinate = mapping.getProcessorNames().get(extensionClassName).stream().findFirst().get();
        final String path = coordinate.getGroup() + "/" + coordinate.getId() + "/" + coordinate.getVersion() + "/" + extensionClassName;

        File processorDirectory = new File(temporaryFolder.getRoot(), path);
        File indexHtml = new File(processorDirectory, "index.html");
        Assert.assertTrue(indexHtml + " should have been generated", indexHtml.exists());

        String generatedHtml = FileUtils.readFileToString(indexHtml, Charset.defaultCharset());
        Assert.assertNotNull(generatedHtml);
        Assert.assertTrue(generatedHtml.contains("This example processor loads a resource from the nar and writes it to the FlowFile content"));
        Assert.assertTrue(generatedHtml.contains("files that were successfully processed"));
        Assert.assertTrue(generatedHtml.contains("files that were not successfully processed"));
        Assert.assertTrue(generatedHtml.contains("resources"));
    } finally {
        // Remove the generated documentation whether the assertions passed or not.
        temporaryFolder.delete();
    }
}
示例3
/**
 * Runs the start-up sequence: publishes the Kerberos configuration file as a system
 * property (when one is configured), installs the uncaught-exception handler and the
 * shutdown hook, starts the bootstrap listener when a bootstrap port is supplied,
 * clears the web working directory, unpacks the NARs, initialises the NAR class
 * loaders, and finally instantiates {@code org.apache.nifi.web.server.JettyServer}
 * reflectively through the framework class loader and starts it unless a shutdown
 * has already been requested.
 *
 * @param properties the NiFi properties driving the start-up
 * @param rootClassLoader the class loader used for the system bundle and as the root of the NAR class loaders
 * @throws RuntimeException if the bootstrap port system property is not an integer in the range 1 - 65535
 * @throws IllegalStateException if the framework bundle has no class loader
 */
public NiFi(final NiFiProperties properties, ClassLoader rootClassLoader)
        throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {

    // There can only be one krb5.conf for the overall Java process so set this globally during
    // start up so that processors and our Kerberos authentication code don't have to set this
    final File kerberosConfigFile = properties.getKerberosConfigurationFile();
    if (kerberosConfigFile != null) {
        final String kerberosConfigFilePath = kerberosConfigFile.getAbsolutePath();
        LOGGER.info("Setting java.security.krb5.conf to {}", kerberosConfigFilePath);
        System.setProperty("java.security.krb5.conf", kerberosConfigFilePath);
    }

    setDefaultUncaughtExceptionHandler();

    // register the shutdown hook
    addShutdownHook();

    final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
    if (bootstrapPort != null) {
        try {
            final int port = Integer.parseInt(bootstrapPort);

            if (port < 1 || port > 65535) {
                throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
            }

            bootstrapListener = new BootstrapListener(this, port);
            bootstrapListener.start();
        } catch (final NumberFormatException nfe) {
            // Keep the parse failure as the cause so the offending value shows up in the stack trace.
            throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535", nfe);
        }
    } else {
        LOGGER.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
        bootstrapListener = null;
    }

    // delete the web working dir - if the application does not start successfully
    // the web app directories might be in an invalid state. when this happens
    // jetty will not attempt to re-extract the war into the directory. by removing
    // the working directory, we can be assured that it will attempt to extract the
    // war every time the application starts.
    File webWorkingDir = properties.getWebWorkingDirectory();
    FileUtils.deleteFilesInDirectory(webWorkingDir, null, LOGGER, true, true);
    FileUtils.deleteFile(webWorkingDir, LOGGER, 3);

    detectTimingIssues();

    // redirect JUL log events
    initLogging();

    final Bundle systemBundle = SystemBundle.create(properties, rootClassLoader);

    // expand the nars
    final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties, systemBundle);

    // load the extensions classloaders
    NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
    narClassLoaders.init(rootClassLoader,
            properties.getFrameworkWorkingDirectory(), properties.getExtensionsWorkingDirectory());

    // load the framework classloader
    final ClassLoader frameworkClassLoader = narClassLoaders.getFrameworkBundle().getClassLoader();
    if (frameworkClassLoader == null) {
        throw new IllegalStateException("Unable to find the framework NAR ClassLoader.");
    }

    final Set<Bundle> narBundles = narClassLoaders.getBundles();

    // load the server from the framework classloader
    Thread.currentThread().setContextClassLoader(frameworkClassLoader);
    Class<?> jettyServer = Class.forName("org.apache.nifi.web.server.JettyServer", true, frameworkClassLoader);
    Constructor<?> jettyConstructor = jettyServer.getConstructor(NiFiProperties.class, Set.class);

    // The reported duration covers server construction as well as start().
    final long startTime = System.nanoTime();
    nifiServer = (NiFiServer) jettyConstructor.newInstance(properties, narBundles);
    nifiServer.setExtensionMapping(extensionMapping);
    nifiServer.setBundles(systemBundle, narBundles);

    if (shutdown) {
        LOGGER.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
    } else {
        nifiServer.start();

        if (bootstrapListener != null) {
            bootstrapListener.sendStartedStatus(true);
        }

        final long duration = System.nanoTime() - startTime;
        LOGGER.info("Controller initialization took {} nanoseconds ({} seconds).",
                duration, (int) TimeUnit.SECONDS.convert(duration, TimeUnit.NANOSECONDS));
    }
}
示例4
/**
 * Entry point with two modes.
 * <p>
 * When the first argument equals {@code EXTRACT_NARS}, every {@code *.nar} file in
 * {@code $NIFI_HOME/lib} is unpacked into {@code $NIFI_HOME/work/stateless-nars} and
 * the JVM exits with status 0.
 * <p>
 * Otherwise the first entry of the working directory whose name starts with
 * {@code nifi-framework} is located, a {@link URLClassLoader} is built over its
 * bundled dependencies, and the static {@code launch(String[], ClassLoader, File)}
 * method of {@code PROGRAM_CLASS_NAME} is invoked through that class loader.
 * <p>
 * {@code NIFI_HOME} defaults to the current directory when unset or empty.
 *
 * @param args command-line arguments; passed on unchanged to the launched program
 * @throws FileNotFoundException if no {@code nifi-framework*} entry exists in the working directory
 * @throws IOException if the working directory cannot be created or the bundled dependencies cannot be listed
 */
public static void main(final String[] args) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {

    String nifiHome = System.getenv("NIFI_HOME");
    if (nifiHome == null || nifiHome.isEmpty()) {
        nifiHome = ".";
    }

    final File libDir = new File(nifiHome + "/lib");
    final File narWorkingDirectory = new File(nifiHome + "/work/stateless-nars");

    if (args.length >= 1 && args[0].equals(EXTRACT_NARS)) {
        if (!libDir.exists()) {
            System.out.println("Specified lib directory <" + libDir + "> does not exist");
            return;
        }

        final File[] narFiles = libDir.listFiles(file -> file.getName().endsWith(".nar"));
        if (narFiles == null) {
            System.out.println("Could not obtain listing of lib directory <" + libDir + ">");
            return;
        }

        if (!narWorkingDirectory.exists() && !narWorkingDirectory.mkdirs()) {
            throw new IOException("Could not create NAR working directory <" + narWorkingDirectory + ">");
        }

        logger.info("Unpacking {} NARs", narFiles.length);
        final long startUnpack = System.nanoTime();
        for (final File narFile : narFiles) {
            NarUnpacker.unpackNar(narFile, narWorkingDirectory);
        }

        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startUnpack);
        logger.info("Finished unpacking {} NARs in {} millis", narFiles.length, millis);

        System.exit(0);
    }

    // listFiles returns null when the directory is missing or unreadable and an empty array
    // when nothing matches; check both explicitly rather than catching the resulting
    // NullPointerException / ArrayIndexOutOfBoundsException.
    final File[] frameworkCandidates = narWorkingDirectory.listFiles(file -> file.getName().startsWith("nifi-framework"));
    if (frameworkCandidates == null || frameworkCandidates.length == 0) {
        throw new FileNotFoundException("Could not find core stateless dependencies in the working directory <" + narWorkingDirectory + ">");
    }
    final File frameworkWorkingDirectory = frameworkCandidates[0];

    final File bundledDependenciesDir = new File(frameworkWorkingDirectory, NarUnpacker.BUNDLED_DEPENDENCIES_DIRECTORY);
    final File[] jarFiles = bundledDependenciesDir.listFiles();
    if (jarFiles == null) {
        throw new IOException("Could not obtain listing of NiFi-Framework NAR's bundled dependencies in working directory <" + bundledDependenciesDir + ">");
    }
    final URL[] jarUrls = toURLs(jarFiles);

    // The class loader is intentionally left open: the launched program is loaded from it.
    final ClassLoader rootClassLoader = Thread.currentThread().getContextClassLoader();
    final URLClassLoader frameworkClassLoader = new URLClassLoader(jarUrls, rootClassLoader);
    Thread.currentThread().setContextClassLoader(frameworkClassLoader);

    final Class<?> programClass = Class.forName(PROGRAM_CLASS_NAME, true, frameworkClassLoader);
    final Method launchMethod = programClass.getMethod("launch", String[].class, ClassLoader.class, File.class);
    launchMethod.setAccessible(true);
    launchMethod.invoke(null, args, rootClassLoader, narWorkingDirectory);
}
示例5
/**
 * Prepares the test environment: loads the NiFi properties from the given path,
 * deletes the database repository directory, copies the test libraries into
 * {@code target/test-classes/access-control/lib}, unpacks the NARs, discovers the
 * extensions, starts the test server and loads its flow, and creates a test user
 * whose client is built from a truststore-only TLS configuration.
 *
 * @param nifiPropertiesPath path of the NiFi properties file to load
 * @throws IllegalStateException if the library target directory cannot be created or the
 *                               library source directory cannot be listed
 * @throws Exception if any other set-up step fails
 */
public OneWaySslAccessControlHelper(final String nifiPropertiesPath) throws Exception {
    // configure the location of the nifi properties
    File nifiPropertiesFile = new File(nifiPropertiesPath);
    System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, nifiPropertiesFile.getAbsolutePath());

    NiFiProperties props = NiFiProperties.createBasicNiFiProperties(nifiPropertiesPath);
    flowXmlPath = props.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE);

    // delete the database directory to avoid issues with re-registration in testRequestAccessUsingToken
    FileUtils.deleteDirectory(props.getDatabaseRepositoryPath().toFile());

    final File libTargetDir = new File("target/test-classes/access-control/lib");
    // mkdirs() also returns false when the directory already exists, hence the isDirectory() check.
    if (!libTargetDir.mkdirs() && !libTargetDir.isDirectory()) {
        throw new IllegalStateException("Could not create library target directory <" + libTargetDir + ">");
    }

    final File libSourceDir = new File("src/test/resources/lib");
    // listFiles() returns null when the path is not a directory or cannot be read.
    final File[] libFiles = libSourceDir.listFiles();
    if (libFiles == null) {
        throw new IllegalStateException("Could not obtain listing of library source directory <" + libSourceDir + ">");
    }
    for (final File libFile : libFiles) {
        final File libDestFile = new File(libTargetDir, libFile.getName());
        Files.copy(libFile.toPath(), libDestFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
    }

    final Bundle systemBundle = SystemBundle.create(props);
    NarUnpacker.unpackNars(props, systemBundle);
    NarClassLoadersHolder.getInstance().init(props.getFrameworkWorkingDirectory(), props.getExtensionsWorkingDirectory());

    // load extensions
    final ExtensionDiscoveringManager extensionManager = new StandardExtensionDiscoveringManager();
    extensionManager.discoverExtensions(systemBundle, NarClassLoadersHolder.getInstance().getBundles());
    ExtensionManagerHolder.init(extensionManager);

    // start the server
    server = new NiFiTestServer("src/main/webapp", CONTEXT_PATH, props);
    server.startServer();
    server.loadFlow();

    // get the base url
    baseUrl = server.getBaseUrl() + CONTEXT_PATH;

    // Create a TlsConfiguration for the truststore properties only
    TlsConfiguration trustOnlyTlsConfiguration = TlsConfiguration.fromNiFiPropertiesTruststoreOnly(props);

    // create the user
    final Client client = WebUtils.createClient(null, SslContextFactory.createSslContext(trustOnlyTlsConfiguration));
    user = new NiFiTestUser(client, null);
}
示例6
/**
 * Prepares the test environment: publishes the given properties path as the
 * {@code NiFiProperties.PROPERTIES_FILE_PATH} system property and loads the NiFi
 * properties, copies the test libraries into
 * {@code target/test-classes/access-control/lib}, unpacks the NARs, discovers the
 * extensions, starts the test server and loads its flow, creates one test user per
 * authorizer identity, and populates the initial data flow as the read/write user.
 *
 * @param nifiPropertiesPath path of the NiFi properties file to use
 * @throws IllegalStateException if the library target directory cannot be created or the
 *                               library source directory cannot be listed
 * @throws Exception if any other set-up step fails
 */
public AccessControlHelper(final String nifiPropertiesPath) throws Exception {
    // configure the location of the nifi properties
    File nifiPropertiesFile = new File(nifiPropertiesPath);
    System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, nifiPropertiesFile.getAbsolutePath());

    // NOTE(review): null is passed here; presumably the path is then taken from the system
    // property set just above - confirm against NiFiProperties.createBasicNiFiProperties.
    NiFiProperties props = NiFiProperties.createBasicNiFiProperties(null);
    flowXmlPath = props.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE);

    final File libTargetDir = new File("target/test-classes/access-control/lib");
    // mkdirs() also returns false when the directory already exists, hence the isDirectory() check.
    if (!libTargetDir.mkdirs() && !libTargetDir.isDirectory()) {
        throw new IllegalStateException("Could not create library target directory <" + libTargetDir + ">");
    }

    final File libSourceDir = new File("src/test/resources/lib");
    // listFiles() returns null when the path is not a directory or cannot be read.
    final File[] libFiles = libSourceDir.listFiles();
    if (libFiles == null) {
        throw new IllegalStateException("Could not obtain listing of library source directory <" + libSourceDir + ">");
    }
    for (final File libFile : libFiles) {
        final File libDestFile = new File(libTargetDir, libFile.getName());
        Files.copy(libFile.toPath(), libDestFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
    }

    final Bundle systemBundle = SystemBundle.create(props);
    NarUnpacker.unpackNars(props, systemBundle);
    NarClassLoadersHolder.getInstance().init(props.getFrameworkWorkingDirectory(), props.getExtensionsWorkingDirectory());

    // load extensions
    final ExtensionDiscoveringManager extensionManager = new StandardExtensionDiscoveringManager();
    extensionManager.discoverExtensions(systemBundle, NarClassLoadersHolder.getInstance().getBundles());
    ExtensionManagerHolder.init(extensionManager);

    // start the server
    server = new NiFiTestServer("src/main/webapp", CONTEXT_PATH, props);
    server.startServer();
    server.loadFlow();

    // get the base url
    baseUrl = server.getBaseUrl() + CONTEXT_PATH;

    // create the users - user purposefully decoupled from clientId (same user different browsers tabs)
    readUser = new NiFiTestUser(server.getClient(), NiFiTestAuthorizer.READ_USER_DN);
    writeUser = new NiFiTestUser(server.getClient(), NiFiTestAuthorizer.WRITE_USER_DN);
    readWriteUser = new NiFiTestUser(server.getClient(), NiFiTestAuthorizer.READ_WRITE_USER_DN);
    noneUser = new NiFiTestUser(server.getClient(), NiFiTestAuthorizer.NONE_USER_DN);
    privilegedUser = new NiFiTestUser(server.getClient(), NiFiTestAuthorizer.PRIVILEGED_USER_DN);
    executeCodeUser = new NiFiTestUser(server.getClient(), NiFiTestAuthorizer.EXECUTED_CODE_USER_DN);
    anonymousUser = new NiFiTestUser(server.getClient(), StringUtils.EMPTY);

    // populate the initial data flow
    NiFiWebApiTest.populateFlow(server.getClient(), baseUrl, readWriteUser, READ_WRITE_CLIENT_ID);
}