Java源码示例:org.apache.flink.yarn.configuration.YarnConfigOptions

示例1
/**
 * Reads the value of {@link YarnConfigOptions#APPLICATION_TAGS} from the Flink configuration and
 * hands the resulting tag set to the given submission context via the reflector.
 *
 * <p>The configured string is split on commas; each piece is trimmed and pieces that are empty
 * after trimming are dropped.
 *
 * @param appContext submission context that receives the tags
 * @throws InvocationTargetException declared by this method; presumably raised by the reflector call
 * @throws IllegalAccessException declared by this method; presumably raised by the reflector call
 */
private void setApplicationTags(final ApplicationSubmissionContext appContext) throws InvocationTargetException,
	IllegalAccessException {

	final ApplicationSubmissionContextReflector contextReflector = ApplicationSubmissionContextReflector.getInstance();
	final String configuredTags = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);

	final Set<String> tags = new HashSet<>();
	for (final String rawTag : configuredTags.split(",")) {
		final String candidate = rawTag.trim();
		if (candidate.isEmpty()) {
			// nothing left after trimming (e.g. "a, ,b"), so skip this piece
			continue;
		}
		tags.add(candidate);
	}

	contextReflector.setApplicationTags(appContext, tags);
}
 
示例2
/**
 * Verifies that constructing a {@code YarnClusterDescriptor} with
 * {@link YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR} set to {@code "DISABLED"} is rejected
 * with an {@link IllegalArgumentException}.
 *
 * @see <a href="https://issues.apache.org/jira/browse/FLINK-11781">FLINK-11781</a>
 */
@Test
public void testThrowsExceptionIfUserTriesToDisableUserJarInclusionInSystemClassPath() {
	final Configuration config = new Configuration();
	config.setString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR, "DISABLED");

	IllegalArgumentException rejection = null;
	try {
		new YarnClusterDescriptor(
			config,
			yarnConfiguration,
			temporaryFolder.getRoot().getAbsolutePath(),
			yarnClient,
			true);
	} catch (final IllegalArgumentException e) {
		rejection = e;
	}

	// reaching this point without a captured exception means the constructor accepted the value
	if (rejection == null) {
		fail("Expected exception not thrown");
	}
	assertThat(rejection.getMessage(), containsString("cannot be set to DISABLED anymore"));
}
 
示例3
/**
 * Checks that the cluster id is resolved from a .yarn-properties file whose directory is
 * configured through {@link YarnConfigOptions#PROPERTIES_FILE_LOCATION}.
 */
@Test
public void testResumeFromYarnPropertiesFile() throws Exception {
	final File propertiesDir = writeYarnPropertiesFile(validPropertiesFile);

	final Configuration config = new Configuration();
	config.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, propertiesDir.getAbsolutePath());

	final FlinkYarnSessionCli cli =
		new FlinkYarnSessionCli(config, tmp.getRoot().getAbsolutePath(), "y", "yarn");

	// no command line arguments are passed, so the id can only come from the properties file
	final String[] noArguments = {};
	final CommandLine parsed = cli.parseCommandLineOptions(noArguments, true);

	assertEquals(TEST_YARN_APPLICATION_ID, cli.getClusterId(parsed));
}
 
示例4
/**
 * Checks that an application id passed with {@code -yid} is the one returned as cluster id,
 * even though a properties file location is configured as well.
 */
@Test
public void testYarnIDOverridesPropertiesFile() throws Exception {
	final File propertiesDir = writeYarnPropertiesFile(validPropertiesFile);

	final Configuration config = new Configuration();
	config.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, propertiesDir.getAbsolutePath());

	final FlinkYarnSessionCli cli =
		new FlinkYarnSessionCli(config, tmp.getRoot().getAbsolutePath(), "y", "yarn");

	final String[] arguments = {"-yid", TEST_YARN_APPLICATION_ID_2.toString()};
	final CommandLine parsed = cli.parseCommandLineOptions(arguments, true);

	assertEquals(TEST_YARN_APPLICATION_ID_2, cli.getClusterId(parsed));
}
 
示例5
/**
 * Builds a cluster descriptor from a fresh configuration that sets the application attempts,
 * ZooKeeper-based high availability, a fixed-delay restart strategy and the minimum
 * containerized heap cutoff.
 *
 * @return the descriptor produced by {@code createYarnClusterDescriptor} for that configuration
 */
@Nonnull
private YarnClusterDescriptor setupYarnClusterDescriptor() {
	final Configuration config = new Configuration();

	// YARN application attempts
	config.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "10");

	// high availability backed by the test ZooKeeper server
	config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
	config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, storageDir);
	config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
	config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 1000);

	// restart strategy
	config.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
	config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);

	// minimum heap cutoff
	config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);

	return createYarnClusterDescriptor(config);
}
 
示例6
/**
 * Parses the comma-separated value of {@link YarnConfigOptions#APPLICATION_TAGS} and sets the
 * resulting tags on the given submission context through the reflector.
 *
 * <p>Each entry is trimmed; entries that are empty after trimming are not added.
 *
 * @param appContext submission context to tag
 * @throws InvocationTargetException declared by this method; presumably raised by the reflector call
 * @throws IllegalAccessException declared by this method; presumably raised by the reflector call
 */
private void setApplicationTags(final ApplicationSubmissionContext appContext) throws InvocationTargetException,
	IllegalAccessException {

	final ApplicationSubmissionContextReflector contextReflector = ApplicationSubmissionContextReflector.getInstance();
	final String commaSeparatedTags = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);

	final Set<String> cleanedTags = new HashSet<>();
	final String[] pieces = commaSeparatedTags.split(",");
	for (final String piece : pieces) {
		final String stripped = piece.trim();
		if (stripped.isEmpty()) {
			// skip pieces that held only whitespace
			continue;
		}
		cleanedTags.add(stripped);
	}

	contextReflector.setApplicationTags(appContext, cleanedTags);
}
 
示例7
/**
 * Ensures the CLI resolves the cluster id from a .yarn-properties file located in the directory
 * configured via {@link YarnConfigOptions#PROPERTIES_FILE_LOCATION}.
 */
@Test
public void testResumeFromYarnPropertiesFile() throws Exception {
	final File yarnPropertiesDir = writeYarnPropertiesFile(validPropertiesFile);
	final String yarnPropertiesLocation = yarnPropertiesDir.getAbsolutePath();

	final Configuration config = new Configuration();
	config.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, yarnPropertiesLocation);

	final FlinkYarnSessionCli sessionCli = new FlinkYarnSessionCli(
		config,
		tmp.getRoot().getAbsolutePath(),
		"y",
		"yarn");

	// parse an empty argument list: the id has to be taken from the properties file
	final CommandLine emptyCommandLine = sessionCli.parseCommandLineOptions(new String[] {}, true);
	final ApplicationId resolvedId = sessionCli.getClusterId(emptyCommandLine);

	assertEquals(TEST_YARN_APPLICATION_ID, resolvedId);
}
 
示例8
/**
 * Ensures that the id given with {@code -yid} on the command line is returned as cluster id
 * when a properties file location is configured too.
 */
@Test
public void testYarnIDOverridesPropertiesFile() throws Exception {
	final File yarnPropertiesDir = writeYarnPropertiesFile(validPropertiesFile);
	final String yarnPropertiesLocation = yarnPropertiesDir.getAbsolutePath();

	final Configuration config = new Configuration();
	config.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, yarnPropertiesLocation);

	final FlinkYarnSessionCli sessionCli = new FlinkYarnSessionCli(
		config,
		tmp.getRoot().getAbsolutePath(),
		"y",
		"yarn");

	final String explicitId = TEST_YARN_APPLICATION_ID_2.toString();
	final CommandLine parsed = sessionCli.parseCommandLineOptions(new String[] {"-yid", explicitId}, true);
	final ApplicationId resolvedId = sessionCli.getClusterId(parsed);

	assertEquals(TEST_YARN_APPLICATION_ID_2, resolvedId);
}
 
示例9
/**
 * Creates a cluster descriptor whose configuration sets the application attempts, ZooKeeper
 * high availability, a fixed-delay restart strategy and the minimum containerized heap cutoff.
 *
 * @return the descriptor created by {@code createYarnClusterDescriptor}
 */
@Nonnull
private YarnClusterDescriptor setupYarnClusterDescriptor() {
	final Configuration descriptorConfig = new Configuration();

	descriptorConfig.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "10");

	// ZooKeeper-based high availability
	final String zkQuorum = zkServer.getConnectString();
	descriptorConfig.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
	descriptorConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH, storageDir);
	descriptorConfig.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkQuorum);
	descriptorConfig.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 1000);

	// fixed-delay restarts with the largest possible attempt count
	descriptorConfig.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
	descriptorConfig.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);

	final int heapCutoffMin = 100;
	descriptorConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, heapCutoffMin);

	return createYarnClusterDescriptor(descriptorConfig);
}
 
示例10
/**
 * Computes the default CPU resource from the given configuration.
 *
 * <p>The value of {@link YarnConfigOptions#VCORES} is passed as the fallback to
 * {@code TaskExecutorProcessUtils.getCpuCoresWithFallback}. The resulting number is rounded up
 * to a whole number that is at least 1; a message is logged when rounding changed the value.
 *
 * @param configuration configuration to read the core settings from
 * @return a {@code CPUResource} built from the rounded core count
 * @throws IllegalConfigurationException if the rounded core count exceeds {@link Integer#MAX_VALUE}
 */
@VisibleForTesting
static CPUResource getDefaultCpus(final Configuration configuration) {
	final int vcoresFallback = configuration.getInteger(YarnConfigOptions.VCORES);
	final double requestedCores = TaskExecutorProcessUtils
		.getCpuCoresWithFallback(configuration, vcoresFallback)
		.getValue()
		.doubleValue();

	@SuppressWarnings("NumericCastThatLosesPrecision")
	final long wholeCores = Math.max((long) Math.ceil(requestedCores), 1L);

	//noinspection FloatingPointEquality
	final boolean roundingChangedValue = wholeCores != requestedCores;
	if (roundingChangedValue) {
		LOG.info(
			"The amount of cpu cores must be a positive integer on Yarn. Rounding {} up to the closest positive integer {}.",
			requestedCores,
			wholeCores);
	}

	final boolean fitsIntoInt = wholeCores <= Integer.MAX_VALUE;
	if (!fitsIntoInt) {
		throw new IllegalConfigurationException(String.format(
			"The amount of cpu cores %d cannot exceed Integer.MAX_VALUE: %d",
			wholeCores,
			Integer.MAX_VALUE));
	}

	//noinspection NumericCastThatLosesPrecision
	return new CPUResource(wholeCores);
}
 
示例11
/**
 * Creates a cluster descriptor from the given Flink and YARN configuration.
 *
 * <p>Besides storing its arguments, the constructor derives the user jar inclusion mode, the
 * optional local Flink dist jar path, the optional ship directories, the YARN queue, the
 * application name and type, the node label and the ZooKeeper namespace from
 * {@code flinkConfiguration}.
 *
 * @param flinkConfiguration Flink configuration to read the settings from; must not be null
 * @param yarnConfiguration YARN configuration; must not be null
 * @param yarnClient YARN client; must not be null
 * @param yarnClusterInformationRetriever retriever for YARN cluster information; must not be null
 * @param sharedYarnClient stored as-is in the {@code sharedYarnClient} field
 */
public YarnClusterDescriptor(
		Configuration flinkConfiguration,
		YarnConfiguration yarnConfiguration,
		YarnClient yarnClient,
		YarnClusterInformationRetriever yarnClusterInformationRetriever,
		boolean sharedYarnClient) {

	this.yarnConfiguration = Preconditions.checkNotNull(yarnConfiguration);
	this.yarnClient = Preconditions.checkNotNull(yarnClient);
	this.yarnClusterInformationRetriever = Preconditions.checkNotNull(yarnClusterInformationRetriever);
	this.sharedYarnClient = sharedYarnClient;

	this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
	this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration);

	// Both values are optional; the corresponding setter/adder is only invoked when a value is present.
	getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath);
	decodeDirsToShipToCluster(flinkConfiguration).ifPresent(this::addShipFiles);

	this.yarnQueue = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_QUEUE);
	this.customName = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_NAME);
	this.applicationType = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TYPE);
	this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);

	// we want to ignore the default value at this point.
	// Passing null as the override default yields null when no cluster id is configured.
	this.zookeeperNamespace = flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, null);
}
 
示例12
/**
 * Sets the application tags configured under {@link YarnConfigOptions#APPLICATION_TAGS} on the
 * given submission context by means of the reflector.
 *
 * <p>The configured string is split at commas, every part is trimmed, and parts that end up
 * empty are ignored.
 *
 * @param appContext submission context that receives the tags
 * @throws InvocationTargetException declared by this method; presumably raised by the reflector call
 * @throws IllegalAccessException declared by this method; presumably raised by the reflector call
 */
private void setApplicationTags(final ApplicationSubmissionContext appContext) throws InvocationTargetException,
		IllegalAccessException {

	final ApplicationSubmissionContextReflector contextReflector = ApplicationSubmissionContextReflector.getInstance();
	final String tagList = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);

	final Set<String> nonEmptyTags = new HashSet<>();
	for (final String part : tagList.split(",")) {
		final String cleaned = part.trim();
		final boolean hasContent = !cleaned.isEmpty();
		if (hasContent) {
			nonEmptyTags.add(cleaned);
		}
	}

	contextReflector.setApplicationTags(appContext, nonEmptyTags);
}
 
示例13
/**
 * Copies the host, RPC port and application id of the given report into the Flink
 * configuration.
 *
 * <p>The same host/port pair is written to both the JobManager and the REST address options.
 *
 * @param report application report to read from; must not be null
 */
private void setClusterEntrypointInfoToConfig(final ApplicationReport report) {
	checkNotNull(report);

	final ApplicationId appId = report.getApplicationId();
	final String entrypointHost = report.getHost();
	final int entrypointPort = report.getRpcPort();

	LOG.info("Found Web Interface {}:{} of application '{}'.", entrypointHost, entrypointPort, appId);

	// JobManager endpoint
	flinkConfiguration.setString(JobManagerOptions.ADDRESS, entrypointHost);
	flinkConfiguration.setInteger(JobManagerOptions.PORT, entrypointPort);

	// REST endpoint
	flinkConfiguration.setString(RestOptions.ADDRESS, entrypointHost);
	flinkConfiguration.setInteger(RestOptions.PORT, entrypointPort);

	final String appIdAsString = ConverterUtils.toString(appId);
	flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, appIdAsString);
}
 
示例14
/**
 * Writes the ship directories given on the command line into the configuration under
 * {@link YarnConfigOptions#SHIP_DIRECTORIES}.
 *
 * <p>Nothing is written when the ship option is absent. For each given path that is not a
 * directory a warning is logged and the mapper yields {@code null} for it.
 *
 * @param configuration configuration to write to; must not be null
 * @param cmd parsed command line; must not be null
 */
private void encodeDirsToShipToCluster(final Configuration configuration, final CommandLine cmd) {
	checkNotNull(cmd);
	checkNotNull(configuration);

	if (!cmd.hasOption(shipPath.getOpt())) {
		return;
	}

	final String[] requestedPaths = cmd.getOptionValues(this.shipPath.getOpt());
	ConfigUtils.encodeArrayToConfig(
			configuration,
			YarnConfigOptions.SHIP_DIRECTORIES,
			requestedPaths,
			(String candidate) -> {
				final File candidateDir = new File(candidate);
				if (!candidateDir.isDirectory()) {
					LOG.warn("Ship directory {} is not a directory. Ignoring it.", candidateDir.getAbsolutePath());
					return null;
				}
				return candidate;
			});
}
 
示例15
/**
 * Returns a copy of the given configuration enriched with the values from the YARN properties
 * file.
 *
 * <p>The application id is set when the properties file contains one, and every dynamic
 * property decoded from the file is set as a string entry.
 *
 * @param configuration base configuration; it is copied, not modified
 * @return the enriched copy
 * @throws FlinkException declared by this method; presumably raised while decoding the dynamic properties
 */
private Configuration applyYarnProperties(Configuration configuration) throws FlinkException {
	final Configuration enriched = new Configuration(configuration);

	final String storedApplicationId = yarnPropertiesFile.getProperty(YARN_APPLICATION_ID_KEY);
	if (storedApplicationId != null) {
		enriched.setString(YarnConfigOptions.APPLICATION_ID, storedApplicationId);
	}

	// handle the YARN client's dynamic properties
	final String encodedDynamicProperties = yarnPropertiesFile.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
	final Map<String, String> decodedDynamicProperties = getDynamicProperties(encodedDynamicProperties);
	decodedDynamicProperties.forEach((key, value) -> enriched.setString(key, value));

	return enriched;
}
 
示例16
/**
 * Registers the Flink dist jar as a local resource and remembers the resulting descriptor.
 *
 * <p>When a descriptor is already known it is returned right away, without registering again.
 *
 * @param localJarPath path of the Flink dist jar
 * @return the descriptor of the Flink dist jar
 * @throws IOException declared by this method; presumably raised while registering the resource
 * @throws ClusterDeploymentException if no descriptor is known yet although provided shared libs exist
 */
public YarnLocalResourceDescriptor uploadFlinkDist(final Path localJarPath) throws IOException, ClusterDeploymentException {
	if (flinkDist != null) {
		return flinkDist;
	}

	if (!providedSharedLibs.isEmpty()) {
		throw new ClusterDeploymentException("The \"" + YarnConfigOptions.PROVIDED_LIB_DIRS.key() + "\"" +
				" has to also include the lib/, plugin/ and flink-dist jar." +
				" In other case, it cannot be used.");
	}

	final String resourceKey = localJarPath.getName();
	flinkDist = registerSingleLocalResource(resourceKey, localJarPath, "", true, false);
	return flinkDist;
}
 
示例17
/**
 * Checks that the id passed with {@code -yid} ends up as the cluster id reported by the
 * cluster client factory, although a properties file location is configured.
 */
@Test
public void testYarnIDOverridesPropertiesFile() throws Exception {
	final File propertiesDir = writeYarnPropertiesFile(validPropertiesFile);

	final Configuration config = new Configuration();
	config.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, propertiesDir.getAbsolutePath());

	final FlinkYarnSessionCli cli = createFlinkYarnSessionCli(config);
	final String[] arguments = {"-yid", TEST_YARN_APPLICATION_ID_2.toString()};
	final CommandLine parsed = cli.parseCommandLineOptions(arguments, true);

	// resolve the id through the executor configuration derived from the command line
	final Configuration executorConfig = cli.applyCommandLineOptionsToConfiguration(parsed);
	final ClusterClientFactory<ApplicationId> factory = getClusterClientFactory(executorConfig);

	assertEquals(TEST_YARN_APPLICATION_ID_2, factory.getClusterId(executorConfig));
}
 
示例18
/**
 * Runs the detached mode test with security properties that point to a pre-installed keytab
 * and switch off shipping of the local keytab, then verifies the Kerberos keytab result.
 */
@Test(timeout = 60000) // timeout after a minute.
public void testDetachedModeSecureWithPreInstallKeytab() throws Exception {
	runTest(() -> {
		final Map<String, String> securityProps = new HashMap<>();

		final String testKeytab = SecureTestEnvironment.getTestKeytab();
		if (testKeytab != null) {
			// client login keytab
			securityProps.put(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key(), testKeytab);
			// pre-install Yarn local keytab, since both reuse the same temporary folder "tmp"
			securityProps.put(YarnConfigOptions.LOCALIZED_KEYTAB_PATH.key(), testKeytab);
			// unset keytab localization
			securityProps.put(YarnConfigOptions.SHIP_LOCAL_KEYTAB.key(), "false");
		}

		final String servicePrincipal = SecureTestEnvironment.getHadoopServicePrincipal();
		if (servicePrincipal != null) {
			securityProps.put(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key(), servicePrincipal);
		}

		runDetachedModeTest(securityProps);
		verifyResultContainsKerberosKeytab();
	});
}
 
示例19
/**
 * Asserts that the uber jar stored under the application's {@code .flink/<applicationId>}
 * directory in the file system home directory has the expected replication.
 *
 * @param configuration Flink configuration providing {@link YarnConfigOptions#FILE_REPLICATION}
 * @param applicationId id of the application whose uploaded jar is checked
 * @throws Exception declared by this method; presumably raised by the file system access
 */
private void extraVerification(Configuration configuration, ApplicationId applicationId) throws Exception {
	final FileSystem fileSystem = FileSystem.get(getYarnConfiguration());

	final String relativeJarPath = ".flink/" + applicationId.toString() + "/" + flinkUberjar.getName();
	final Path uploadedJar = new Path(fileSystem.getHomeDirectory(), relativeJarPath);
	final FileStatus uploadedJarStatus = fileSystem.getFileStatus(uploadedJar);

	final int configuredReplication = configuration.getInteger(YarnConfigOptions.FILE_REPLICATION);
	final int yarnReplication = YARN_CONFIGURATION.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT);

	// A non-positive YarnConfigOptions.FILE_REPLICATION means the yarn configuration value is expected.
	final int expectedReplication;
	if (configuredReplication > 0) {
		expectedReplication = configuredReplication;
	} else {
		expectedReplication = yarnReplication;
	}

	assertEquals(expectedReplication, uploadedJarStatus.getReplication());
}
 
示例20
/**
 * Asserts whether the lib folder exists in the application's staging directory: it must be
 * absent when {@link YarnConfigOptions#PROVIDED_LIB_DIRS} is configured and present otherwise.
 *
 * @param flinkConfig Flink configuration to read the provided lib dirs from
 * @param appId id of the application whose staging directory is checked
 * @throws IOException declared by this method; presumably raised by the file system access
 */
private void checkStagingDirectory(Configuration flinkConfig, ApplicationId appId) throws IOException {
	final List<String> configuredLibDirs = flinkConfig.get(YarnConfigOptions.PROVIDED_LIB_DIRS);
	final boolean libDirsProvided = configuredLibDirs != null && !configuredLibDirs.isEmpty();

	try (final FileSystem fileSystem = FileSystem.get(YARN_CONFIGURATION)) {
		final Path stagingDir = new Path(fileSystem.getHomeDirectory(), ".flink/" + appId.toString());
		final Path stagedLibDir = new Path(stagingDir, flinkLibFolder.getName());
		final boolean libDirUploaded = fileSystem.exists(stagedLibDir);

		if (libDirsProvided) {
			assertFalse(
				"The provided lib dirs is set, so the lib directory should not be uploaded to staging directory.",
				libDirUploaded);
		} else {
			assertTrue(
				"The lib directory should be uploaded to staging directory.",
				libDirUploaded);
		}
	}
}
 
示例21
/**
 * Rejects configurations in which {@link YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR} is set to
 * {@code DISABLED} (compared case-insensitively).
 *
 * @param config configuration to inspect
 * @throws IllegalArgumentException if the option is set to {@code DISABLED}
 */
private static void throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(final Configuration config) {
	final String configuredInclusion = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
	if (!"DISABLED".equalsIgnoreCase(configuredInclusion)) {
		return;
	}

	final String optionKey = YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key();
	throw new IllegalArgumentException(
		String.format("Config option %s cannot be set to DISABLED anymore (see FLINK-11781)", optionKey));
}
 
示例22
/**
 * Deploys a session cluster with {@link YarnConfigOptions#VCORES} set to
 * {@link Integer#MAX_VALUE} and expects the deployment to fail with a
 * {@code ClusterDeploymentException} caused by an {@code IllegalConfigurationException}.
 */
@Test
public void testConfigOverwrite() throws ClusterDeploymentException {
	final Configuration config = new Configuration();
	// overwrite vcores in config
	config.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
	config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);

	final YarnClusterDescriptor descriptor = new YarnClusterDescriptor(
		config,
		yarnConfiguration,
		temporaryFolder.getRoot().getAbsolutePath(),
		yarnClient,
		true);

	descriptor.setLocalJarPath(new Path(flinkJar.getPath()));

	// configure slots
	final ClusterSpecification.ClusterSpecificationBuilder specBuilder = new ClusterSpecification.ClusterSpecificationBuilder();
	final ClusterSpecification spec = specBuilder
		.setMasterMemoryMB(1)
		.setTaskManagerMemoryMB(1)
		.setNumberTaskManagers(1)
		.setSlotsPerTaskManager(1)
		.createClusterSpecification();

	try {
		descriptor.deploySessionCluster(spec);

		fail("The deploy call should have failed.");
	} catch (ClusterDeploymentException e) {
		// we expect the cause to be an IllegalConfigurationException
		final boolean expectedCause = e.getCause() instanceof IllegalConfigurationException;
		if (!expectedCause) {
			throw e;
		}
	} finally {
		descriptor.close();
	}
}
 
示例23
/**
 * Expects a {@code FlinkException} when the CLI is created with a properties file location that
 * holds an invalid yarn properties file.
 */
@Test(expected = FlinkException.class)
public void testInvalidYarnPropertiesFile() throws Exception {
	final File propertiesDir = writeYarnPropertiesFile(invalidPropertiesFile);

	final Configuration config = new Configuration();
	config.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, propertiesDir.getAbsolutePath());

	// the constructor call itself is the action under test
	final String configurationDirectory = tmp.getRoot().getAbsolutePath();
	new FlinkYarnSessionCli(config, configurationDirectory, "y", "yarn");
}
 
示例24
/**
 * Configures the submission context to keep containers across application attempts and sets
 * the attempt failures validity interval taken from
 * {@link YarnConfigOptions#APPLICATION_ATTEMPT_FAILURE_VALIDITY_INTERVAL}.
 *
 * @param appContext submission context to configure
 * @throws InvocationTargetException declared by this method; presumably raised by the reflector calls
 * @throws IllegalAccessException declared by this method; presumably raised by the reflector calls
 */
private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws
	InvocationTargetException, IllegalAccessException {

	final ApplicationSubmissionContextReflector contextReflector = ApplicationSubmissionContextReflector.getInstance();

	contextReflector.setKeepContainersAcrossApplicationAttempts(appContext, true);

	final long failuresValidityInterval =
		flinkConfiguration.getLong(YarnConfigOptions.APPLICATION_ATTEMPT_FAILURE_VALIDITY_INTERVAL);
	contextReflector.setAttemptFailuresValidityInterval(appContext, failuresValidityInterval);
}
 
示例25
/**
 * Fails when {@link YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR} is configured as
 * {@code DISABLED}, ignoring case.
 *
 * @param config configuration to check
 * @throws IllegalArgumentException if the option is set to {@code DISABLED}
 */
private static void throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(final Configuration config) {
	final String inclusionSetting = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
	final boolean disabledRequested = "DISABLED".equalsIgnoreCase(inclusionSetting);
	if (!disabledRequested) {
		return;
	}

	throw new IllegalArgumentException(String.format(
		"Config option %s cannot be set to DISABLED anymore (see FLINK-11781)",
		YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key()));
}
 
示例26
/**
 * Verifies that creating a cluster descriptor with
 * {@link YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR} set to {@code "DISABLED"} fails with an
 * {@link IllegalArgumentException}.
 *
 * @see <a href="https://issues.apache.org/jira/browse/FLINK-11781">FLINK-11781</a>
 */
@Test
public void testThrowsExceptionIfUserTriesToDisableUserJarInclusionInSystemClassPath() {
	final Configuration config = new Configuration();
	config.setString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR, "DISABLED");

	IllegalArgumentException rejection = null;
	try {
		createYarnClusterDescriptor(config);
	} catch (final IllegalArgumentException e) {
		rejection = e;
	}

	// no captured exception means the descriptor was created despite the DISABLED setting
	if (rejection == null) {
		fail("Expected exception not thrown");
	}
	assertThat(rejection.getMessage(), containsString("cannot be set to DISABLED anymore"));
}
 
示例27
/**
 * Deploys a session cluster with {@link YarnConfigOptions#VCORES} set to
 * {@link Integer#MAX_VALUE} and expects a {@code ClusterDeploymentException} whose cause is an
 * {@code IllegalConfigurationException}.
 */
@Test
public void testConfigOverwrite() throws ClusterDeploymentException {
	final Configuration config = new Configuration();
	// overwrite vcores in config
	config.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
	config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);

	final YarnClusterDescriptor descriptor = createYarnClusterDescriptor(config);
	descriptor.setLocalJarPath(new Path(flinkJar.getPath()));

	// configure slots
	final ClusterSpecification.ClusterSpecificationBuilder specBuilder = new ClusterSpecification.ClusterSpecificationBuilder();
	final ClusterSpecification spec = specBuilder
		.setMasterMemoryMB(1)
		.setTaskManagerMemoryMB(1)
		.setNumberTaskManagers(1)
		.setSlotsPerTaskManager(1)
		.createClusterSpecification();

	try {
		descriptor.deploySessionCluster(spec);

		fail("The deploy call should have failed.");
	} catch (ClusterDeploymentException e) {
		// we expect the cause to be an IllegalConfigurationException
		final Throwable cause = e.getCause();
		if (!(cause instanceof IllegalConfigurationException)) {
			throw e;
		}
	} finally {
		descriptor.close();
	}
}
 
示例28
/**
 * Expects the CLI constructor to fail with a {@code FlinkException} when the configured
 * properties file location contains an invalid yarn properties file.
 */
@Test(expected = FlinkException.class)
public void testInvalidYarnPropertiesFile() throws Exception {
	final File invalidPropertiesDir = writeYarnPropertiesFile(invalidPropertiesFile);
	final String invalidPropertiesLocation = invalidPropertiesDir.getAbsolutePath();

	final Configuration config = new Configuration();
	config.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, invalidPropertiesLocation);

	// constructing the CLI is the action under test
	new FlinkYarnSessionCli(config, tmp.getRoot().getAbsolutePath(), "y", "yarn");
}
 
示例29
/**
 * Returns the usrlib directory when {@link YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR} is set
 * to {@code DISABLED}, and an empty optional for every other setting.
 *
 * @param configuration configuration to read the user jar inclusion mode from
 * @return the usrlib directory for the {@code DISABLED} mode, otherwise empty
 * @throws IllegalStateException presumably raised by {@code checkState} when the mode is
 *     {@code DISABLED} but no usrlib directory was found
 */
public static Optional<File> getUsrLibDir(final Configuration configuration) {
	final YarnConfigOptions.UserJarInclusion inclusionMode = configuration
			.getEnum(YarnConfigOptions.UserJarInclusion.class, YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
	final Optional<File> usrLibCandidate = tryFindUserLibDirectory();

	final boolean inclusionDisabled = inclusionMode == YarnConfigOptions.UserJarInclusion.DISABLED;

	checkState(
			!inclusionDisabled || usrLibCandidate.isPresent(),
			"The %s is set to %s. But the usrlib directory does not exist.",
			YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(),
			YarnConfigOptions.UserJarInclusion.DISABLED);

	if (inclusionDisabled) {
		return usrLibCandidate;
	}
	return Optional.empty();
}
 
示例30
/**
 * Determines the local path of the Flink dist jar.
 *
 * <p>A path configured under {@link YarnConfigOptions#FLINK_DIST_JAR} is used as-is. Without
 * one, the decoded jar path is used, but only if it ends with {@code .jar}.
 *
 * @param configuration configuration to read the dist jar path from
 * @return the dist jar path, or empty when none is configured and the decoded path is not a jar
 */
private Optional<Path> getLocalFlinkDistPath(final Configuration configuration) {
	final String configuredJar = configuration.getString(YarnConfigOptions.FLINK_DIST_JAR);
	if (configuredJar != null) {
		return Optional.of(new Path(configuredJar));
	}

	LOG.info("No path for the flink jar passed. Using the location of " + getClass() + " to locate the jar");

	// check whether it's actually a jar file --> when testing we execute this class without a flink-dist jar
	final String classLocation = getDecodedJarPath();
	if (!classLocation.endsWith(".jar")) {
		return Optional.empty();
	}

	final File jarFile = new File(classLocation);
	return Optional.of(new Path(jarFile.toURI()));
}