Java源码示例:org.apache.flink.yarn.configuration.YarnConfigOptions
示例1
/**
 * Reads the value of {@link YarnConfigOptions#APPLICATION_TAGS} and passes the resulting
 * tag set to the given submission context through the reflector.
 *
 * <p>The configured value is split on commas; every piece is trimmed and pieces that are
 * empty after trimming are dropped.
 *
 * @param appContext submission context that receives the tags
 * @throws InvocationTargetException declared by the signature for the reflective setter call
 * @throws IllegalAccessException declared by the signature for the reflective setter call
 */
private void setApplicationTags(final ApplicationSubmissionContext appContext)
        throws InvocationTargetException, IllegalAccessException {
    final ApplicationSubmissionContextReflector contextReflector = ApplicationSubmissionContextReflector.getInstance();
    final String configuredTags = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);

    final Set<String> tags = new HashSet<>();
    for (final String rawTag : configuredTags.split(",")) {
        final String candidate = rawTag.trim();
        if (candidate.isEmpty()) {
            // nothing left after trimming, skip this piece
            continue;
        }
        tags.add(candidate);
    }

    contextReflector.setApplicationTags(appContext, tags);
}
示例2
/**
 * Checks that constructing a {@code YarnClusterDescriptor} with the user jar inclusion
 * option set to {@code "DISABLED"} fails with an {@link IllegalArgumentException}.
 *
 * @see <a href="https://issues.apache.org/jira/browse/FLINK-11781">FLINK-11781</a>
 */
@Test
public void testThrowsExceptionIfUserTriesToDisableUserJarInclusionInSystemClassPath() {
    final Configuration disabledUserJarConfig = new Configuration();
    disabledUserJarConfig.setString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR, "DISABLED");

    IllegalArgumentException rejection = null;
    try {
        new YarnClusterDescriptor(
            disabledUserJarConfig,
            yarnConfiguration,
            temporaryFolder.getRoot().getAbsolutePath(),
            yarnClient,
            true);
    } catch (final IllegalArgumentException e) {
        rejection = e;
    }

    // the constructor must not have completed normally
    if (rejection == null) {
        fail("Expected exception not thrown");
    }
    assertThat(rejection.getMessage(), containsString("cannot be set to DISABLED anymore"));
}
示例3
/**
 * Checks that the cluster id is picked up from a .yarn-properties file whose location
 * is given through {@link YarnConfigOptions#PROPERTIES_FILE_LOCATION}.
 */
@Test
public void testResumeFromYarnPropertiesFile() throws Exception {
    final File propertiesDir = writeYarnPropertiesFile(validPropertiesFile);

    final Configuration config = new Configuration();
    config.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, propertiesDir.getAbsolutePath());

    final String tmpRootPath = tmp.getRoot().getAbsolutePath();
    final FlinkYarnSessionCli yarnCli = new FlinkYarnSessionCli(config, tmpRootPath, "y", "yarn");

    // no command line arguments: the id has to come from the properties file
    final String[] noArguments = new String[] {};
    final CommandLine parsed = yarnCli.parseCommandLineOptions(noArguments, true);
    final ApplicationId resolvedId = yarnCli.getClusterId(parsed);

    assertEquals(TEST_YARN_APPLICATION_ID, resolvedId);
}
示例4
/**
 * Checks that an application id passed via {@code -yid} is the one returned as cluster id,
 * even though a yarn properties file location is configured.
 */
@Test
public void testYarnIDOverridesPropertiesFile() throws Exception {
    final File propertiesDir = writeYarnPropertiesFile(validPropertiesFile);

    final Configuration config = new Configuration();
    config.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, propertiesDir.getAbsolutePath());

    final String tmpRootPath = tmp.getRoot().getAbsolutePath();
    final FlinkYarnSessionCli yarnCli = new FlinkYarnSessionCli(config, tmpRootPath, "y", "yarn");

    final String[] arguments = new String[] {"-yid", TEST_YARN_APPLICATION_ID_2.toString() };
    final CommandLine parsed = yarnCli.parseCommandLineOptions(arguments, true);
    final ApplicationId resolvedId = yarnCli.getClusterId(parsed);

    assertEquals(TEST_YARN_APPLICATION_ID_2, resolvedId);
}
示例5
/**
 * Builds a Flink configuration with application attempts, ZooKeeper high availability,
 * a fixed-delay restart strategy and a minimal heap cutoff, and creates a cluster
 * descriptor from it.
 *
 * @return the descriptor returned by {@code createYarnClusterDescriptor}
 */
@Nonnull
private YarnClusterDescriptor setupYarnClusterDescriptor() {
    final Configuration config = new Configuration();

    // YARN application attempts
    config.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "10");

    // ZooKeeper based high availability
    config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, storageDir);
    config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 1000);

    // fixed-delay restarts with the maximum number of attempts
    config.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
    config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);

    // minimum containerized heap cutoff
    config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);

    return createYarnClusterDescriptor(config);
}
示例6
/**
 * Reads the value of {@link YarnConfigOptions#APPLICATION_TAGS} and passes the resulting
 * tag set to the given submission context through the reflector.
 *
 * <p>The configured value is split on commas; every piece is trimmed and pieces that are
 * empty after trimming are dropped.
 *
 * @param appContext submission context that receives the tags
 * @throws InvocationTargetException declared by the signature for the reflective setter call
 * @throws IllegalAccessException declared by the signature for the reflective setter call
 */
private void setApplicationTags(final ApplicationSubmissionContext appContext)
        throws InvocationTargetException, IllegalAccessException {
    final ApplicationSubmissionContextReflector contextReflector = ApplicationSubmissionContextReflector.getInstance();
    final String configuredTags = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);

    final Set<String> tags = new HashSet<>();
    for (final String rawTag : configuredTags.split(",")) {
        final String candidate = rawTag.trim();
        if (candidate.isEmpty()) {
            // nothing left after trimming, skip this piece
            continue;
        }
        tags.add(candidate);
    }

    contextReflector.setApplicationTags(appContext, tags);
}
示例7
/**
 * Checks that the cluster id is picked up from a .yarn-properties file whose location
 * is given through {@link YarnConfigOptions#PROPERTIES_FILE_LOCATION}.
 */
@Test
public void testResumeFromYarnPropertiesFile() throws Exception {
    final File propertiesDir = writeYarnPropertiesFile(validPropertiesFile);

    final Configuration config = new Configuration();
    config.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, propertiesDir.getAbsolutePath());

    final String tmpRootPath = tmp.getRoot().getAbsolutePath();
    final FlinkYarnSessionCli yarnCli = new FlinkYarnSessionCli(config, tmpRootPath, "y", "yarn");

    // no command line arguments: the id has to come from the properties file
    final String[] noArguments = new String[] {};
    final CommandLine parsed = yarnCli.parseCommandLineOptions(noArguments, true);
    final ApplicationId resolvedId = yarnCli.getClusterId(parsed);

    assertEquals(TEST_YARN_APPLICATION_ID, resolvedId);
}
示例8
/**
 * Checks that an application id passed via {@code -yid} is the one returned as cluster id,
 * even though a yarn properties file location is configured.
 */
@Test
public void testYarnIDOverridesPropertiesFile() throws Exception {
    final File propertiesDir = writeYarnPropertiesFile(validPropertiesFile);

    final Configuration config = new Configuration();
    config.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, propertiesDir.getAbsolutePath());

    final String tmpRootPath = tmp.getRoot().getAbsolutePath();
    final FlinkYarnSessionCli yarnCli = new FlinkYarnSessionCli(config, tmpRootPath, "y", "yarn");

    final String[] arguments = new String[] {"-yid", TEST_YARN_APPLICATION_ID_2.toString() };
    final CommandLine parsed = yarnCli.parseCommandLineOptions(arguments, true);
    final ApplicationId resolvedId = yarnCli.getClusterId(parsed);

    assertEquals(TEST_YARN_APPLICATION_ID_2, resolvedId);
}
示例9
/**
 * Builds a Flink configuration with application attempts, ZooKeeper high availability,
 * a fixed-delay restart strategy and a minimal heap cutoff, and creates a cluster
 * descriptor from it.
 *
 * @return the descriptor returned by {@code createYarnClusterDescriptor}
 */
@Nonnull
private YarnClusterDescriptor setupYarnClusterDescriptor() {
    final Configuration config = new Configuration();

    // YARN application attempts
    config.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "10");

    // ZooKeeper based high availability
    config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
    config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, storageDir);
    config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
    config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 1000);

    // fixed-delay restarts with the maximum number of attempts
    config.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
    config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);

    // minimum containerized heap cutoff
    config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100);

    return createYarnClusterDescriptor(config);
}
示例10
/**
 * Determines the number of cpu cores to request, using the configured
 * {@link YarnConfigOptions#VCORES} value as fallback.
 *
 * <p>The resolved value is rounded up to a whole number that is at least 1. A message is
 * logged whenever the rounded value differs from the resolved one.
 *
 * @param configuration configuration to read the cpu settings from
 * @return the rounded core count wrapped in a {@code CPUResource}
 * @throws IllegalConfigurationException if the rounded core count exceeds {@code Integer.MAX_VALUE}
 */
@VisibleForTesting
static CPUResource getDefaultCpus(final Configuration configuration) {
    final int vcoresFallback = configuration.getInteger(YarnConfigOptions.VCORES);
    final double resolvedCores = TaskExecutorProcessUtils
        .getCpuCoresWithFallback(configuration, vcoresFallback)
        .getValue()
        .doubleValue();

    @SuppressWarnings("NumericCastThatLosesPrecision")
    final long wholeCores = Math.max((long) Math.ceil(resolvedCores), 1L);

    //noinspection FloatingPointEquality
    final boolean wasAdjusted = wholeCores != resolvedCores;
    if (wasAdjusted) {
        LOG.info(
            "The amount of cpu cores must be a positive integer on Yarn. Rounding {} up to the closest positive integer {}.",
            resolvedCores,
            wholeCores);
    }

    if (wholeCores > Integer.MAX_VALUE) {
        final String message = String.format(
            "The amount of cpu cores %d cannot exceed Integer.MAX_VALUE: %d",
            wholeCores,
            Integer.MAX_VALUE);
        throw new IllegalConfigurationException(message);
    }

    return new CPUResource(wholeCores);
}
示例11
/**
 * Creates a cluster descriptor from the given Flink and YARN configuration.
 *
 * <p>All reference arguments are null-checked. Several fields (queue, name, application type,
 * node label, ZooKeeper namespace) are read from {@code flinkConfiguration} right here.
 *
 * @param flinkConfiguration Flink configuration; must not be null
 * @param yarnConfiguration YARN configuration; must not be null
 * @param yarnClient YARN client; must not be null
 * @param yarnClusterInformationRetriever cluster information retriever; must not be null
 * @param sharedYarnClient stored as-is in the {@code sharedYarnClient} field
 */
public YarnClusterDescriptor(
Configuration flinkConfiguration,
YarnConfiguration yarnConfiguration,
YarnClient yarnClient,
YarnClusterInformationRetriever yarnClusterInformationRetriever,
boolean sharedYarnClient) {
this.yarnConfiguration = Preconditions.checkNotNull(yarnConfiguration);
this.yarnClient = Preconditions.checkNotNull(yarnClient);
this.yarnClusterInformationRetriever = Preconditions.checkNotNull(yarnClusterInformationRetriever);
this.sharedYarnClient = sharedYarnClient;
this.flinkConfiguration = Preconditions.checkNotNull(flinkConfiguration);
this.userJarInclusion = getUserJarInclusionMode(flinkConfiguration);
// The two optional values below are applied through instance callbacks only when present.
// NOTE(review): these callbacks run while the object is still being constructed; keep them
// after the field assignments above unless they are confirmed not to read those fields.
getLocalFlinkDistPath(flinkConfiguration).ifPresent(this::setLocalJarPath);
decodeDirsToShipToCluster(flinkConfiguration).ifPresent(this::addShipFiles);
// Plain string options copied into fields.
this.yarnQueue = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_QUEUE);
this.customName = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_NAME);
this.applicationType = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TYPE);
this.nodeLabel = flinkConfiguration.getString(YarnConfigOptions.NODE_LABEL);
// we want to ignore the default value at this point.
// (null is passed as the override default, so an unset option yields null here)
this.zookeeperNamespace = flinkConfiguration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, null);
}
示例12
/**
 * Reads the value of {@link YarnConfigOptions#APPLICATION_TAGS} and passes the resulting
 * tag set to the given submission context through the reflector.
 *
 * <p>The configured value is split on commas; every piece is trimmed and pieces that are
 * empty after trimming are dropped.
 *
 * @param appContext submission context that receives the tags
 * @throws InvocationTargetException declared by the signature for the reflective setter call
 * @throws IllegalAccessException declared by the signature for the reflective setter call
 */
private void setApplicationTags(final ApplicationSubmissionContext appContext)
        throws InvocationTargetException, IllegalAccessException {
    final ApplicationSubmissionContextReflector contextReflector = ApplicationSubmissionContextReflector.getInstance();
    final String configuredTags = flinkConfiguration.getString(YarnConfigOptions.APPLICATION_TAGS);

    final Set<String> tags = new HashSet<>();
    for (final String rawTag : configuredTags.split(",")) {
        final String candidate = rawTag.trim();
        if (candidate.isEmpty()) {
            // nothing left after trimming, skip this piece
            continue;
        }
        tags.add(candidate);
    }

    contextReflector.setApplicationTags(appContext, tags);
}
示例13
/**
 * Copies host, RPC port and application id of the given report into the Flink configuration.
 *
 * <p>The reported host and RPC port are written to both the JobManager and the REST
 * address/port options; the application id is stored as a string.
 *
 * @param report application report to read from; must not be null
 */
private void setClusterEntrypointInfoToConfig(final ApplicationReport report) {
    checkNotNull(report);

    final ApplicationId applicationId = report.getApplicationId();
    final String entrypointHost = report.getHost();
    final int rpcPort = report.getRpcPort();

    LOG.info("Found Web Interface {}:{} of application '{}'.", entrypointHost, rpcPort, applicationId);

    // JobManager endpoint
    flinkConfiguration.setString(JobManagerOptions.ADDRESS, entrypointHost);
    flinkConfiguration.setInteger(JobManagerOptions.PORT, rpcPort);

    // REST endpoint receives the same host and port
    flinkConfiguration.setString(RestOptions.ADDRESS, entrypointHost);
    flinkConfiguration.setInteger(RestOptions.PORT, rpcPort);

    final String applicationIdString = ConverterUtils.toString(applicationId);
    flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, applicationIdString);
}
示例14
/**
 * Writes the directories given with the ship option on the command line into
 * {@link YarnConfigOptions#SHIP_DIRECTORIES}.
 *
 * <p>Does nothing when the option is absent. Values that are not directories are logged
 * with a warning and mapped to {@code null}.
 *
 * @param configuration configuration to write to; must not be null
 * @param cmd parsed command line; must not be null
 */
private void encodeDirsToShipToCluster(final Configuration configuration, final CommandLine cmd) {
    checkNotNull(cmd);
    checkNotNull(configuration);

    if (!cmd.hasOption(shipPath.getOpt())) {
        return;
    }

    ConfigUtils.encodeArrayToConfig(
        configuration,
        YarnConfigOptions.SHIP_DIRECTORIES,
        cmd.getOptionValues(this.shipPath.getOpt()),
        (String candidate) -> {
            final File candidateDir = new File(candidate);
            if (!candidateDir.isDirectory()) {
                LOG.warn("Ship directory {} is not a directory. Ignoring it.", candidateDir.getAbsolutePath());
                return null;
            }
            return candidate;
        });
}
示例15
/**
 * Returns a copy of the given configuration enriched with the entries of the yarn
 * properties file.
 *
 * <p>The application id is set when the properties file contains one; afterwards every
 * decoded dynamic property is written as a string entry.
 *
 * @param configuration base configuration; it is copied, not modified
 * @return the enriched copy
 * @throws FlinkException declared by the signature; presumably raised while decoding the dynamic properties
 */
private Configuration applyYarnProperties(Configuration configuration) throws FlinkException {
    final Configuration merged = new Configuration(configuration);

    final String storedApplicationId = yarnPropertiesFile.getProperty(YARN_APPLICATION_ID_KEY);
    if (storedApplicationId != null) {
        merged.setString(YarnConfigOptions.APPLICATION_ID, storedApplicationId);
    }

    // the YARN client's dynamic properties are stored in encoded form
    final String encodedDynamicProperties = yarnPropertiesFile.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
    final Map<String, String> decodedDynamicProperties = getDynamicProperties(encodedDynamicProperties);
    decodedDynamicProperties.forEach((key, value) -> merged.setString(key, value));

    return merged;
}
示例16
/**
 * Registers the flink-dist jar as a local resource, or returns the descriptor that is
 * already known.
 *
 * @param localJarPath path of the flink-dist jar
 * @return the descriptor of the flink-dist resource
 * @throws IOException declared by the signature for the registration call
 * @throws ClusterDeploymentException if no flink-dist descriptor is known yet although
 *         provided shared libs are present
 */
public YarnLocalResourceDescriptor uploadFlinkDist(final Path localJarPath) throws IOException, ClusterDeploymentException {
    // already known: nothing to register
    if (flinkDist != null) {
        return flinkDist;
    }

    if (!providedSharedLibs.isEmpty()) {
        final String optionKey = YarnConfigOptions.PROVIDED_LIB_DIRS.key();
        throw new ClusterDeploymentException(
            "The \"" + optionKey + "\" has to also include the lib/, plugin/ and flink-dist jar. In other case, it cannot be used.");
    }

    final String resourceName = localJarPath.getName();
    flinkDist = registerSingleLocalResource(resourceName, localJarPath, "", true, false);
    return flinkDist;
}
示例17
/**
 * Checks that an application id passed via {@code -yid} is the cluster id reported by the
 * client factory, even though a yarn properties file location is configured.
 */
@Test
public void testYarnIDOverridesPropertiesFile() throws Exception {
    final File propertiesDir = writeYarnPropertiesFile(validPropertiesFile);

    final Configuration config = new Configuration();
    config.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, propertiesDir.getAbsolutePath());
    final FlinkYarnSessionCli yarnCli = createFlinkYarnSessionCli(config);

    final String[] arguments = new String[] {"-yid", TEST_YARN_APPLICATION_ID_2.toString() };
    final CommandLine parsed = yarnCli.parseCommandLineOptions(arguments, true);

    // resolve the cluster id through the effective executor configuration
    final Configuration effectiveConfig = yarnCli.applyCommandLineOptionsToConfiguration(parsed);
    final ClusterClientFactory<ApplicationId> factory = getClusterClientFactory(effectiveConfig);
    final ApplicationId resolvedId = factory.getClusterId(effectiveConfig);

    assertEquals(TEST_YARN_APPLICATION_ID_2, resolvedId);
}
示例18
/**
 * Runs the detached mode test with Kerberos properties that point to an already installed
 * keytab and disable shipping of the local keytab, then verifies the result.
 */
@Test(timeout = 60000) // timeout after a minute.
public void testDetachedModeSecureWithPreInstallKeytab() throws Exception {
    runTest(() -> {
        final Map<String, String> kerberosSettings = new HashMap<>();

        if (SecureTestEnvironment.getTestKeytab() != null) {
            // keytab used by the client to log in
            kerberosSettings.put(
                SecurityOptions.KERBEROS_LOGIN_KEYTAB.key(),
                SecureTestEnvironment.getTestKeytab());
            // the same keytab serves as the pre-installed Yarn local keytab,
            // since both reuse the same temporary folder "tmp"
            kerberosSettings.put(
                YarnConfigOptions.LOCALIZED_KEYTAB_PATH.key(),
                SecureTestEnvironment.getTestKeytab());
            // switch off keytab localization
            kerberosSettings.put(YarnConfigOptions.SHIP_LOCAL_KEYTAB.key(), "false");
        }

        if (SecureTestEnvironment.getHadoopServicePrincipal() != null) {
            kerberosSettings.put(
                SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key(),
                SecureTestEnvironment.getHadoopServicePrincipal());
        }

        runDetachedModeTest(kerberosSettings);
        verifyResultContainsKerberosKeytab();
    });
}
示例19
/**
 * Asserts that the uber jar stored under the application's {@code .flink} directory has the
 * expected replication.
 *
 * <p>A positive {@link YarnConfigOptions#FILE_REPLICATION} value is the expectation;
 * otherwise the DFS replication from the yarn configuration is expected.
 *
 * @param configuration Flink configuration holding the file replication setting
 * @param applicationId id of the application whose staging directory is inspected
 * @throws Exception declared by the signature for the file system calls
 */
private void extraVerification(Configuration configuration, ApplicationId applicationId) throws Exception {
    final FileSystem fileSystem = FileSystem.get(getYarnConfiguration());

    final String relativeJarPath = ".flink/" + applicationId.toString() + "/" + flinkUberjar.getName();
    final Path uberJarPath = new Path(fileSystem.getHomeDirectory(), relativeJarPath);
    final FileStatus uberJarStatus = fileSystem.getFileStatus(uberJarPath);

    final int configuredReplication = configuration.getInteger(YarnConfigOptions.FILE_REPLICATION);
    final int dfsReplication = YARN_CONFIGURATION.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT);

    // Without a positive Flink-side setting the yarn configuration value applies.
    final int expectedReplication;
    if (configuredReplication > 0) {
        expectedReplication = configuredReplication;
    } else {
        expectedReplication = dfsReplication;
    }

    assertEquals(expectedReplication, uberJarStatus.getReplication());
}
示例20
/**
 * Asserts whether the lib folder exists in the application's staging directory.
 *
 * <p>If {@link YarnConfigOptions#PROVIDED_LIB_DIRS} is configured with at least one entry,
 * the lib folder must be absent; otherwise it must be present.
 *
 * @param flinkConfig Flink configuration to read the provided lib dirs from
 * @param appId id of the application whose staging directory is checked
 * @throws IOException declared by the signature for the file system calls
 */
private void checkStagingDirectory(Configuration flinkConfig, ApplicationId appId) throws IOException {
    final List<String> configuredLibDirs = flinkConfig.get(YarnConfigOptions.PROVIDED_LIB_DIRS);
    final boolean libDirsProvided = configuredLibDirs != null && !configuredLibDirs.isEmpty();

    try (final FileSystem fileSystem = FileSystem.get(YARN_CONFIGURATION)) {
        final Path stagingDir = new Path(fileSystem.getHomeDirectory(), ".flink/" + appId.toString());
        final boolean libUploaded = fileSystem.exists(new Path(stagingDir, flinkLibFolder.getName()));

        if (libDirsProvided) {
            assertFalse(
                "The provided lib dirs is set, so the lib directory should not be uploaded to staging directory.",
                libUploaded);
        } else {
            assertTrue(
                "The lib directory should be uploaded to staging directory.",
                libUploaded);
        }
    }
}
示例21
/**
 * Rejects configurations whose {@link YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR} value
 * equals {@code "DISABLED"}, ignoring case.
 *
 * @param config configuration to inspect
 * @throws IllegalArgumentException if the option is set to DISABLED
 */
private static void throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(final Configuration config) {
    final String configuredMode = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
    if (!"DISABLED".equalsIgnoreCase(configuredMode)) {
        return;
    }

    final String message = String.format(
        "Config option %s cannot be set to DISABLED anymore (see FLINK-11781)",
        YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key());
    throw new IllegalArgumentException(message);
}
示例22
/**
 * Deploys a session cluster with the vcores option set to {@code Integer.MAX_VALUE} and
 * expects the deployment to fail with an {@code IllegalConfigurationException} as cause.
 */
@Test
public void testConfigOverwrite() throws ClusterDeploymentException {
    final Configuration config = new Configuration();
    // set the vcores option to Integer.MAX_VALUE
    config.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
    config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);

    final YarnClusterDescriptor descriptor = new YarnClusterDescriptor(
        config,
        yarnConfiguration,
        temporaryFolder.getRoot().getAbsolutePath(),
        yarnClient,
        true);
    descriptor.setLocalJarPath(new Path(flinkJar.getPath()));

    // minimal cluster specification
    final ClusterSpecification specification = new ClusterSpecification.ClusterSpecificationBuilder()
        .setMasterMemoryMB(1)
        .setTaskManagerMemoryMB(1)
        .setNumberTaskManagers(1)
        .setSlotsPerTaskManager(1)
        .createClusterSpecification();

    try {
        descriptor.deploySessionCluster(specification);
        fail("The deploy call should have failed.");
    } catch (ClusterDeploymentException deploymentFailure) {
        // any cause other than an IllegalConfigurationException is unexpected and rethrown
        final boolean expectedCause = deploymentFailure.getCause() instanceof IllegalConfigurationException;
        if (!expectedCause) {
            throw deploymentFailure;
        }
    } finally {
        descriptor.close();
    }
}
示例23
/**
 * Expects a {@code FlinkException} when a {@code FlinkYarnSessionCli} is created with a
 * properties file location that holds an invalid yarn properties file.
 */
@Test(expected = FlinkException.class)
public void testInvalidYarnPropertiesFile() throws Exception {
    final File propertiesDir = writeYarnPropertiesFile(invalidPropertiesFile);

    final Configuration config = new Configuration();
    config.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, propertiesDir.getAbsolutePath());

    final String tmpRootPath = tmp.getRoot().getAbsolutePath();
    new FlinkYarnSessionCli(config, tmpRootPath, "y", "yarn");
}
示例24
/**
 * Configures the submission context to keep containers across application attempts and
 * sets the attempt failures validity interval from the Flink configuration.
 *
 * @param appContext submission context to configure
 * @throws InvocationTargetException declared by the signature for the reflective setter calls
 * @throws IllegalAccessException declared by the signature for the reflective setter calls
 */
private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext)
        throws InvocationTargetException, IllegalAccessException {
    final ApplicationSubmissionContextReflector contextReflector = ApplicationSubmissionContextReflector.getInstance();

    contextReflector.setKeepContainersAcrossApplicationAttempts(appContext, true);

    final long validityInterval =
        flinkConfiguration.getLong(YarnConfigOptions.APPLICATION_ATTEMPT_FAILURE_VALIDITY_INTERVAL);
    contextReflector.setAttemptFailuresValidityInterval(appContext, validityInterval);
}
示例25
/**
 * Rejects configurations whose {@link YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR} value
 * equals {@code "DISABLED"}, ignoring case.
 *
 * @param config configuration to inspect
 * @throws IllegalArgumentException if the option is set to DISABLED
 */
private static void throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(final Configuration config) {
    final String configuredMode = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
    if (!"DISABLED".equalsIgnoreCase(configuredMode)) {
        return;
    }

    final String message = String.format(
        "Config option %s cannot be set to DISABLED anymore (see FLINK-11781)",
        YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key());
    throw new IllegalArgumentException(message);
}
示例26
/**
 * Checks that creating a cluster descriptor with the user jar inclusion option set to
 * {@code "DISABLED"} fails with an {@link IllegalArgumentException}.
 *
 * @see <a href="https://issues.apache.org/jira/browse/FLINK-11781">FLINK-11781</a>
 */
@Test
public void testThrowsExceptionIfUserTriesToDisableUserJarInclusionInSystemClassPath() {
    final Configuration disabledUserJarConfig = new Configuration();
    disabledUserJarConfig.setString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR, "DISABLED");

    IllegalArgumentException rejection = null;
    try {
        createYarnClusterDescriptor(disabledUserJarConfig);
    } catch (final IllegalArgumentException e) {
        rejection = e;
    }

    // the factory call must not have completed normally
    if (rejection == null) {
        fail("Expected exception not thrown");
    }
    assertThat(rejection.getMessage(), containsString("cannot be set to DISABLED anymore"));
}
示例27
/**
 * Deploys a session cluster with the vcores option set to {@code Integer.MAX_VALUE} and
 * expects the deployment to fail with an {@code IllegalConfigurationException} as cause.
 */
@Test
public void testConfigOverwrite() throws ClusterDeploymentException {
    final Configuration config = new Configuration();
    // set the vcores option to Integer.MAX_VALUE
    config.setInteger(YarnConfigOptions.VCORES, Integer.MAX_VALUE);
    config.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);

    final YarnClusterDescriptor descriptor = createYarnClusterDescriptor(config);
    descriptor.setLocalJarPath(new Path(flinkJar.getPath()));

    // minimal cluster specification
    final ClusterSpecification specification = new ClusterSpecification.ClusterSpecificationBuilder()
        .setMasterMemoryMB(1)
        .setTaskManagerMemoryMB(1)
        .setNumberTaskManagers(1)
        .setSlotsPerTaskManager(1)
        .createClusterSpecification();

    try {
        descriptor.deploySessionCluster(specification);
        fail("The deploy call should have failed.");
    } catch (ClusterDeploymentException deploymentFailure) {
        // any cause other than an IllegalConfigurationException is unexpected and rethrown
        final boolean expectedCause = deploymentFailure.getCause() instanceof IllegalConfigurationException;
        if (!expectedCause) {
            throw deploymentFailure;
        }
    } finally {
        descriptor.close();
    }
}
示例28
/**
 * Expects a {@code FlinkException} when a {@code FlinkYarnSessionCli} is created with a
 * properties file location that holds an invalid yarn properties file.
 */
@Test(expected = FlinkException.class)
public void testInvalidYarnPropertiesFile() throws Exception {
    final File propertiesDir = writeYarnPropertiesFile(invalidPropertiesFile);

    final Configuration config = new Configuration();
    config.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, propertiesDir.getAbsolutePath());

    final String tmpRootPath = tmp.getRoot().getAbsolutePath();
    new FlinkYarnSessionCli(config, tmpRootPath, "y", "yarn");
}
示例29
/**
 * Returns the usrlib directory when {@link YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR}
 * is set to {@code DISABLED}, and an empty optional for every other mode.
 *
 * @param configuration configuration to read the user jar inclusion mode from
 * @return the usrlib directory for the DISABLED mode, otherwise {@link Optional#empty()}
 * @throws IllegalStateException (via {@code checkState}) if the mode is DISABLED but no
 *         usrlib directory was found
 */
public static Optional<File> getUsrLibDir(final Configuration configuration) {
    final YarnConfigOptions.UserJarInclusion inclusionMode = configuration.getEnum(
        YarnConfigOptions.UserJarInclusion.class,
        YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
    final Optional<File> usrLib = tryFindUserLibDirectory();

    final boolean inclusionDisabled = inclusionMode == YarnConfigOptions.UserJarInclusion.DISABLED;

    // in DISABLED mode the usrlib directory has to exist
    checkState(
        !inclusionDisabled || usrLib.isPresent(),
        "The %s is set to %s. But the usrlib directory does not exist.",
        YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(),
        YarnConfigOptions.UserJarInclusion.DISABLED);

    if (inclusionDisabled) {
        return usrLib;
    }
    return Optional.empty();
}
示例30
/**
 * Resolves the path of the flink-dist jar.
 *
 * <p>The value of {@link YarnConfigOptions#FLINK_DIST_JAR} is used when it is set. Otherwise
 * the decoded jar path of this class is used, but only if it ends with {@code ".jar"}.
 *
 * @param configuration configuration to read the flink-dist jar option from
 * @return the path of the jar, or an empty optional if the option is unset and the decoded
 *         path is not a jar file
 */
private Optional<Path> getLocalFlinkDistPath(final Configuration configuration) {
    final String localJarPath = configuration.getString(YarnConfigOptions.FLINK_DIST_JAR);
    if (localJarPath != null) {
        return Optional.of(new Path(localJarPath));
    }

    // Parameterized logging: same message text, but no string building when INFO is disabled.
    LOG.info("No path for the flink jar passed. Using the location of {} to locate the jar", getClass());

    // check whether it's actually a jar file --> when testing we execute this class without a flink-dist jar
    final String decodedPath = getDecodedJarPath();
    return decodedPath.endsWith(".jar")
        ? Optional.of(new Path(new File(decodedPath).toURI()))
        : Optional.empty();
}