Java源码示例:org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings
示例1
/**
 * Builds a {@link JobGraph} from the given vertices, attaches checkpointing settings to it and
 * stores the supplied savepoint restore settings on it.
 *
 * @param savepointRestoreSettings restore settings set on the resulting job graph
 * @param jobVertices vertices the job graph is created from
 * @return the configured job graph
 */
@Nonnull
private JobGraph createJobGraphFromJobVerticesWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings, JobVertex... jobVertices) {
    final JobGraph graph = new JobGraph(jobVertices);

    // Checkpointing must be enabled, since resuming from a savepoint requires it.
    final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
        1000L,
        1000L,
        1000L,
        1,
        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
        true);

    graph.setSnapshotSettings(
        new JobCheckpointingSettings(
            Collections.emptyList(),
            Collections.emptyList(),
            Collections.emptyList(),
            coordinatorConfig,
            null));
    graph.setSavepointRestoreSettings(savepointRestoreSettings);

    return graph;
}
示例2
/**
 * Builds a {@link JobGraph} from the given vertices, attaches checkpointing settings to it and
 * stores the supplied savepoint restore settings on it.
 *
 * @param savepointRestoreSettings restore settings set on the resulting job graph
 * @param jobVertices vertices the job graph is created from
 * @return the configured job graph
 */
@Nonnull
private JobGraph createJobGraphFromJobVerticesWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings, JobVertex... jobVertices) {
    final JobGraph graph = new JobGraph(jobVertices);

    // Checkpointing must be enabled, since resuming from a savepoint requires it.
    final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
        1000L,
        1000L,
        1000L,
        1,
        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
        true,
        false,
        0);

    graph.setSnapshotSettings(
        new JobCheckpointingSettings(
            Collections.emptyList(),
            Collections.emptyList(),
            Collections.emptyList(),
            coordinatorConfig,
            null));
    graph.setSavepointRestoreSettings(savepointRestoreSettings);

    return graph;
}
示例3
/**
 * Creates checkpointing settings with empty vertex lists whose coordinator configuration
 * is built with the given checkpoint interval as its first argument.
 *
 * @param checkpointInterval interval handed to the coordinator configuration
 * @return the newly created checkpointing settings
 */
private static JobCheckpointingSettings createCheckpointSettingsWithInterval(final long checkpointInterval) {
    return new JobCheckpointingSettings(
        Collections.emptyList(),
        Collections.emptyList(),
        Collections.emptyList(),
        new CheckpointCoordinatorConfiguration(
            checkpointInterval,
            Long.MAX_VALUE,
            Long.MAX_VALUE,
            Integer.MAX_VALUE,
            CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
            true,
            false,
            0),
        null);
}
示例4
/**
 * Builds a {@link JobGraph} from the given vertices, attaches checkpointing settings to it and
 * stores the supplied savepoint restore settings on it.
 *
 * @param savepointRestoreSettings restore settings set on the resulting job graph
 * @param jobVertices vertices the job graph is created from
 * @return the configured job graph
 */
@Nonnull
private JobGraph createJobGraphFromJobVerticesWithCheckpointing(SavepointRestoreSettings savepointRestoreSettings, JobVertex... jobVertices) {
    final JobGraph graph = new JobGraph(jobVertices);

    // Checkpointing must be enabled, since resuming from a savepoint requires it.
    final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
        1000L,
        1000L,
        1000L,
        1,
        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
        true,
        false,
        false,
        0);

    graph.setSnapshotSettings(
        new JobCheckpointingSettings(
            Collections.emptyList(),
            Collections.emptyList(),
            Collections.emptyList(),
            coordinatorConfig,
            null));
    graph.setSavepointRestoreSettings(savepointRestoreSettings);

    return graph;
}
示例5
/**
 * Creates checkpointing settings with empty vertex lists whose coordinator configuration
 * is built with the given checkpoint interval as its first argument.
 *
 * @param checkpointInterval interval handed to the coordinator configuration
 * @return the newly created checkpointing settings
 */
private static JobCheckpointingSettings createCheckpointSettingsWithInterval(final long checkpointInterval) {
    return new JobCheckpointingSettings(
        Collections.emptyList(),
        Collections.emptyList(),
        Collections.emptyList(),
        new CheckpointCoordinatorConfiguration(
            checkpointInterval,
            Long.MAX_VALUE,
            Long.MAX_VALUE,
            Integer.MAX_VALUE,
            CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
            true,
            false,
            false,
            0),
        null);
}
示例6
/**
 * Verifies that a job built without enabling checkpointing reports a checkpoint interval of
 * {@code Long.MAX_VALUE}, is not exactly-once, and uses
 * {@link CheckpointingMode#AT_LEAST_ONCE} as the checkpoint mode of its first vertex.
 */
@Test
public void testDisabledCheckpointing() throws Exception {
    final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.fromElements(0).print();

    final StreamGraph graph = environment.getStreamGraph();
    assertFalse("Checkpointing enabled", graph.getCheckpointConfig().isCheckpointingEnabled());

    final JobGraph generatedJobGraph = StreamingJobGraphGenerator.createJobGraph(graph);

    // Coordinator-level view of the (disabled) checkpointing configuration.
    final JobCheckpointingSettings settings = generatedJobGraph.getCheckpointingSettings();
    assertEquals(Long.MAX_VALUE, settings.getCheckpointCoordinatorConfiguration().getCheckpointInterval());
    assertFalse(settings.getCheckpointCoordinatorConfiguration().isExactlyOnce());

    // Task-level view: inspect the configuration of the first topologically sorted vertex.
    final List<JobVertex> sortedVertices = generatedJobGraph.getVerticesSortedTopologicallyFromSources();
    final JobVertex firstVertex = sortedVertices.get(0);
    final StreamConfig firstVertexConfig = new StreamConfig(firstVertex.getConfiguration());
    assertEquals(CheckpointingMode.AT_LEAST_ONCE, firstVertexConfig.getCheckpointMode());
}
示例7
/**
 * Verifies that the coordinator configuration handed to a {@link CheckpointStatsTracker}
 * is the one returned by {@code getJobCheckpointingConfiguration()}.
 */
@Test
public void testGetSnapshottingSettings() throws Exception {
    final ExecutionJobVertex mockedVertex = mock(ExecutionJobVertex.class);
    when(mockedVertex.getJobVertexId()).thenReturn(new JobVertexID());
    when(mockedVertex.getParallelism()).thenReturn(1);

    final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
        181238123L,
        19191992L,
        191929L,
        123,
        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
        false);

    final JobCheckpointingSettings settings = new JobCheckpointingSettings(
        Collections.singletonList(new JobVertexID()),
        Collections.singletonList(new JobVertexID()),
        Collections.singletonList(new JobVertexID()),
        coordinatorConfig,
        null);

    final CheckpointStatsTracker statsTracker = new CheckpointStatsTracker(
        0,
        Collections.singletonList(mockedVertex),
        settings.getCheckpointCoordinatorConfiguration(),
        new UnregisteredMetricsGroup());

    assertEquals(
        settings.getCheckpointCoordinatorConfiguration(),
        statsTracker.getJobCheckpointingConfiguration());
}
示例8
/**
 * Builds an {@link ExecutionGraph} for an otherwise empty job named "test" that has
 * checkpointing settings attached.
 *
 * @param configuration configuration passed on to the execution graph builder
 * @return the execution graph produced by {@link ExecutionGraphBuilder}
 * @throws Exception if building the graph fails
 */
private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
    final ScheduledExecutorService sharedExecutor = TestingUtils.defaultExecutor();

    final JobID id = new JobID();
    final JobGraph graph = new JobGraph(id, "test");

    final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
        100,
        10 * 60 * 1000,
        0,
        1,
        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
        false);
    graph.setSnapshotSettings(
        new JobCheckpointingSettings(
            Collections.<JobVertexID>emptyList(),
            Collections.<JobVertexID>emptyList(),
            Collections.<JobVertexID>emptyList(),
            coordinatorConfig,
            null));

    final Time tenSeconds = Time.seconds(10L);

    // The same executor and the same timeout are each passed for two builder arguments.
    return ExecutionGraphBuilder.buildGraph(
        null,
        graph,
        configuration,
        sharedExecutor,
        sharedExecutor,
        new ProgrammedSlotProvider(1),
        getClass().getClassLoader(),
        new StandaloneCheckpointRecoveryFactory(),
        tenSeconds,
        new NoRestartStrategy(),
        new UnregisteredMetricsGroup(),
        1,
        blobWriter,
        tenSeconds,
        LoggerFactory.getLogger(getClass()));
}
示例9
/**
 * Verifies that a stream graph without checkpointing enabled produces a job graph whose
 * checkpoint interval is {@code Long.MAX_VALUE}.
 */
@Test
public void testDisabledCheckpointing() throws Exception {
    final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamGraph graph = new StreamGraph(environment);
    assertFalse("Checkpointing enabled", graph.getCheckpointConfig().isCheckpointingEnabled());

    final JobGraph generatedJobGraph = StreamingJobGraphGenerator.createJobGraph(graph);
    final JobCheckpointingSettings settings = generatedJobGraph.getCheckpointingSettings();

    assertEquals(
        Long.MAX_VALUE,
        settings.getCheckpointCoordinatorConfiguration().getCheckpointInterval());
}
示例10
/**
 * Verifies that the coordinator configuration handed to a {@link CheckpointStatsTracker}
 * is the one returned by {@code getJobCheckpointingConfiguration()}.
 */
@Test
public void testGetSnapshottingSettings() throws Exception {
    final ExecutionJobVertex mockedVertex = mock(ExecutionJobVertex.class);
    when(mockedVertex.getJobVertexId()).thenReturn(new JobVertexID());
    when(mockedVertex.getParallelism()).thenReturn(1);

    final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
        181238123L,
        19191992L,
        191929L,
        123,
        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
        false,
        false,
        0);

    final JobCheckpointingSettings settings = new JobCheckpointingSettings(
        Collections.singletonList(new JobVertexID()),
        Collections.singletonList(new JobVertexID()),
        Collections.singletonList(new JobVertexID()),
        coordinatorConfig,
        null);

    final CheckpointStatsTracker statsTracker = new CheckpointStatsTracker(
        0,
        Collections.singletonList(mockedVertex),
        settings.getCheckpointCoordinatorConfiguration(),
        new UnregisteredMetricsGroup());

    assertEquals(
        settings.getCheckpointCoordinatorConfiguration(),
        statsTracker.getJobCheckpointingConfiguration());
}
示例11
/**
 * Builds an {@link ExecutionGraph} for an otherwise empty job named "test" that has
 * checkpointing settings attached.
 *
 * @param configuration configuration passed on to the execution graph builder
 * @return the execution graph produced by {@link ExecutionGraphBuilder}
 * @throws Exception if building the graph fails
 */
private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
    final ScheduledExecutorService sharedExecutor = TestingUtils.defaultExecutor();

    final JobID id = new JobID();
    final JobGraph graph = new JobGraph(id, "test");

    final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
        100,
        10 * 60 * 1000,
        0,
        1,
        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
        false,
        false,
        0);
    graph.setSnapshotSettings(
        new JobCheckpointingSettings(
            Collections.<JobVertexID>emptyList(),
            Collections.<JobVertexID>emptyList(),
            Collections.<JobVertexID>emptyList(),
            coordinatorConfig,
            null));

    final Time tenSeconds = Time.seconds(10L);

    // The same executor and the same timeout are each passed for two builder arguments.
    return ExecutionGraphBuilder.buildGraph(
        null,
        graph,
        configuration,
        sharedExecutor,
        sharedExecutor,
        new ProgrammedSlotProvider(1),
        getClass().getClassLoader(),
        new StandaloneCheckpointRecoveryFactory(),
        tenSeconds,
        new NoRestartStrategy(),
        new UnregisteredMetricsGroup(),
        blobWriter,
        tenSeconds,
        LoggerFactory.getLogger(getClass()),
        NettyShuffleMaster.INSTANCE,
        NoOpPartitionTracker.INSTANCE);
}
示例12
/**
 * Verifies that a stream graph without checkpointing enabled produces a job graph whose
 * checkpoint interval is {@code Long.MAX_VALUE}.
 */
@Test
public void testDisabledCheckpointing() throws Exception {
    final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamGraph graph = new StreamGraph(environment.getConfig(), environment.getCheckpointConfig());
    assertFalse("Checkpointing enabled", graph.getCheckpointConfig().isCheckpointingEnabled());

    final JobGraph generatedJobGraph = StreamingJobGraphGenerator.createJobGraph(graph);
    final JobCheckpointingSettings settings = generatedJobGraph.getCheckpointingSettings();

    assertEquals(
        Long.MAX_VALUE,
        settings.getCheckpointCoordinatorConfiguration().getCheckpointInterval());
}
示例13
/**
 * Verifies that the coordinator configuration handed to a {@link CheckpointStatsTracker}
 * is the one returned by {@code getJobCheckpointingConfiguration()}.
 */
@Test
public void testGetSnapshottingSettings() throws Exception {
    final ExecutionJobVertex mockedVertex = mock(ExecutionJobVertex.class);
    when(mockedVertex.getJobVertexId()).thenReturn(new JobVertexID());
    when(mockedVertex.getParallelism()).thenReturn(1);

    final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
        181238123L,
        19191992L,
        191929L,
        123,
        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
        false,
        false,
        false,
        0);

    final JobCheckpointingSettings settings = new JobCheckpointingSettings(
        Collections.singletonList(new JobVertexID()),
        Collections.singletonList(new JobVertexID()),
        Collections.singletonList(new JobVertexID()),
        coordinatorConfig,
        null);

    final CheckpointStatsTracker statsTracker = new CheckpointStatsTracker(
        0,
        Collections.singletonList(mockedVertex),
        settings.getCheckpointCoordinatorConfiguration(),
        new UnregisteredMetricsGroup());

    assertEquals(
        settings.getCheckpointCoordinatorConfiguration(),
        statsTracker.getJobCheckpointingConfiguration());
}
示例14
/**
 * Creates checkpointing settings in which the IDs of the given vertices are used for all
 * three vertex lists.
 *
 * @param vertices vertices whose IDs are collected
 * @return checkpointing settings built from the collected IDs and a coordinator configuration
 *     with 1 max concurrent checkpoint, interval 10 and timeout 100000
 */
private static JobCheckpointingSettings createCheckpointSettings(JobVertex... vertices) {
    final CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder configBuilder =
        new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder();

    final List<JobVertexID> vertexIds = Arrays.stream(vertices)
        .map(vertex -> vertex.getID())
        .collect(Collectors.toList());

    final CheckpointCoordinatorConfiguration coordinatorConfig = configBuilder
        .setMaxConcurrentCheckpoints(1)
        .setCheckpointInterval(10)
        .setCheckpointTimeout(100_000)
        .build();

    // The very same list instance is passed for all three vertex-list arguments.
    return new JobCheckpointingSettings(vertexIds, vertexIds, vertexIds, coordinatorConfig, null);
}
示例15
/**
 * Attaches checkpointing settings to the given job graph. Input vertices are registered as
 * trigger vertices; every vertex is placed in the list that is passed for the two remaining
 * vertex-list arguments. Periodic checkpointing is disabled by using {@code Long.MAX_VALUE}
 * as the interval.
 *
 * @param jobGraph job graph that receives the snapshot settings
 * @param stateBackend state backend to serialize into the settings, or {@code null} for none
 * @throws RuntimeException if the state backend cannot be serialized
 */
public static void enableCheckpointing(final JobGraph jobGraph, @Nullable StateBackend stateBackend) {
    final List<JobVertexID> inputVertexIds = new ArrayList<>();
    final List<JobVertexID> everyVertexId = new ArrayList<>();

    for (final JobVertex current : jobGraph.getVertices()) {
        final boolean isInput = current.isInputVertex();
        if (isInput) {
            inputVertexIds.add(current.getID());
        }
        everyVertexId.add(current.getID());
    }

    final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
        Long.MAX_VALUE, // disable periodical checkpointing
        DEFAULT_CHECKPOINT_TIMEOUT_MS,
        0,
        1,
        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
        false,
        false,
        false,
        0);

    SerializedValue<StateBackend> backendPayload;
    if (stateBackend == null) {
        backendPayload = null;
    } else {
        try {
            backendPayload = new SerializedValue<>(stateBackend);
        } catch (IOException e) {
            throw new RuntimeException("could not serialize state backend", e);
        }
    }

    final JobCheckpointingSettings settings = new JobCheckpointingSettings(
        inputVertexIds,
        everyVertexId,
        everyVertexId,
        coordinatorConfig,
        backendPayload);
    jobGraph.setSnapshotSettings(settings);
}
示例16
/**
 * Puts a master hook factory and a state backend that both wrap an object whose class is not
 * on the class path into the checkpointing settings, copies the job graph through
 * serialization and builds an execution graph from the copy with the user class loader.
 * Asserts that one master hook is registered and that the state backend deserializes to a
 * {@code CustomStateBackend}.
 */
@Test
public void testDeserializationOfUserCodeWithUserClassLoader() throws Exception {
    final ClassLoader userClassLoader = new URLClassLoader(new URL[0], getClass().getClassLoader());
    final Serializable outOfClassPath = CommonTestUtils.createObjectForClassNotInClassPath(userClassLoader);

    final MasterTriggerRestoreHook.Factory[] hookFactories = { new TestFactory(outOfClassPath) };
    final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = new SerializedValue<>(hookFactories);

    final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
        1000L,
        10000L,
        0L,
        1,
        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
        true);
    final SerializedValue<StateBackend> serializedBackend =
        new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath));

    final JobGraph jobGraph = new JobGraph(new JobID(), "test job");
    jobGraph.setSnapshotSettings(
        new JobCheckpointingSettings(
            Collections.<JobVertexID>emptyList(),
            Collections.<JobVertexID>emptyList(),
            Collections.<JobVertexID>emptyList(),
            coordinatorConfig,
            serializedBackend,
            serializedHooks));

    // Round-trip the job graph through serialization to check the behavior
    // under distributed execution.
    final JobGraph copiedGraph = CommonTestUtils.createCopySerializable(jobGraph);

    final Time tenSeconds = Time.seconds(10L);
    final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
        null,
        copiedGraph,
        new Configuration(),
        TestingUtils.defaultExecutor(),
        TestingUtils.defaultExecutor(),
        mock(SlotProvider.class),
        userClassLoader,
        new StandaloneCheckpointRecoveryFactory(),
        tenSeconds,
        new NoRestartStrategy(),
        new UnregisteredMetricsGroup(),
        10,
        VoidBlobWriter.getInstance(),
        tenSeconds,
        log);

    assertEquals(1, executionGraph.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());
    // The state backend is read back from the original (uncopied) job graph.
    assertTrue(jobGraph.getCheckpointingSettings().getDefaultStateBackend().deserializeValue(userClassLoader) instanceof CustomStateBackend);
}
示例17
/**
 * Checks that a metric registered on the JobManager can be read through JMX: after the job is
 * reported as running, exactly one {@code lastCheckpointSize} MBean for the job must exist and
 * its {@code Value} attribute must equal {@code -1L}.
 */
@Test
public void testJobManagerJMXMetricAccess() throws Exception {
    final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2));

    try {
        final JobVertex source = new JobVertex("Source");
        source.setInvokableClass(BlockingInvokable.class);

        final JobGraph jobGraph = new JobGraph("TestingJob", source);
        final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
            500,
            500,
            50,
            5,
            CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
            true);
        jobGraph.setSnapshotSettings(
            new JobCheckpointingSettings(
                Collections.<JobVertexID>emptyList(),
                Collections.<JobVertexID>emptyList(),
                Collections.<JobVertexID>emptyList(),
                coordinatorConfig,
                null));

        final ClusterClient<?> clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
        clusterClient.setDetached(true);
        clusterClient.submitJob(jobGraph, JMXJobManagerMetricTest.class.getClassLoader());

        // Retry the status query until the predicate accepts it (status RUNNING),
        // waiting at most for the time that is left on the deadline.
        FutureUtils.retrySuccessfulWithDelay(
            () -> clusterClient.getJobStatus(jobGraph.getJobID()),
            Time.milliseconds(10),
            deadline,
            status -> status == JobStatus.RUNNING,
            TestingUtils.defaultScheduledExecutor()
        ).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

        final MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();
        final ObjectName metricPattern = new ObjectName("org.apache.flink.jobmanager.job.lastCheckpointSize:job_name=TestingJob,*");
        final Set<ObjectName> matchingNames = platformServer.queryNames(metricPattern, null);

        Assert.assertEquals(1, matchingNames.size());
        assertEquals(-1L, platformServer.getAttribute(matchingNames.iterator().next(), "Value"));

        BlockingInvokable.unblock();
    } finally {
        // Unblock again here so the invokable is released even when a step above throws.
        BlockingInvokable.unblock();
    }
}
示例18
/**
 * Puts a master hook factory and a state backend that both wrap an object obtained from a new
 * class loader into the checkpointing settings, copies the job graph through serialization
 * and builds an execution graph from the copy with that class loader. Asserts that one master
 * hook is registered and that the state backend deserializes to a {@code CustomStateBackend}.
 */
@Test
public void testDeserializationOfUserCodeWithUserClassLoader() throws Exception {
    final CommonTestUtils.ObjectAndClassLoader loaded = CommonTestUtils.createObjectFromNewClassLoader();
    final ClassLoader userClassLoader = loaded.getClassLoader();
    final Serializable outOfClassPath = loaded.getObject();

    final MasterTriggerRestoreHook.Factory[] hookFactories = { new TestFactory(outOfClassPath) };
    final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = new SerializedValue<>(hookFactories);

    final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
        1000L,
        10000L,
        0L,
        1,
        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
        true,
        false,
        0);
    final SerializedValue<StateBackend> serializedBackend =
        new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath));

    final JobGraph jobGraph = new JobGraph(new JobID(), "test job");
    jobGraph.setSnapshotSettings(
        new JobCheckpointingSettings(
            Collections.<JobVertexID>emptyList(),
            Collections.<JobVertexID>emptyList(),
            Collections.<JobVertexID>emptyList(),
            coordinatorConfig,
            serializedBackend,
            serializedHooks));

    // Round-trip the job graph through serialization to check the behavior
    // under distributed execution.
    final JobGraph copiedGraph = CommonTestUtils.createCopySerializable(jobGraph);

    final Time tenSeconds = Time.seconds(10L);
    final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
        null,
        copiedGraph,
        new Configuration(),
        TestingUtils.defaultExecutor(),
        TestingUtils.defaultExecutor(),
        mock(SlotProvider.class),
        userClassLoader,
        new StandaloneCheckpointRecoveryFactory(),
        tenSeconds,
        new NoRestartStrategy(),
        new UnregisteredMetricsGroup(),
        VoidBlobWriter.getInstance(),
        tenSeconds,
        log,
        NettyShuffleMaster.INSTANCE,
        NoOpPartitionTracker.INSTANCE);

    assertEquals(1, executionGraph.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());
    // The state backend is read back from the original (uncopied) job graph.
    assertTrue(jobGraph.getCheckpointingSettings().getDefaultStateBackend().deserializeValue(userClassLoader) instanceof CustomStateBackend);
}
示例19
/**
 * Checks that a metric registered on the JobManager can be read through JMX: after the job is
 * reported as running, exactly one {@code lastCheckpointSize} MBean for the job must exist and
 * its {@code Value} attribute must equal {@code -1L}.
 */
@Test
public void testJobManagerJMXMetricAccess() throws Exception {
    final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2));

    try {
        final JobVertex source = new JobVertex("Source");
        source.setInvokableClass(BlockingInvokable.class);

        final JobGraph jobGraph = new JobGraph("TestingJob", source);
        final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
            500,
            500,
            50,
            5,
            CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
            true,
            false,
            0);
        jobGraph.setSnapshotSettings(
            new JobCheckpointingSettings(
                Collections.<JobVertexID>emptyList(),
                Collections.<JobVertexID>emptyList(),
                Collections.<JobVertexID>emptyList(),
                coordinatorConfig,
                null));

        final ClusterClient<?> clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
        clusterClient.setDetached(true);
        clusterClient.submitJob(jobGraph, JMXJobManagerMetricTest.class.getClassLoader());

        // Retry the status query until the predicate accepts it (status RUNNING),
        // waiting at most for the time that is left on the deadline.
        FutureUtils.retrySuccessfulWithDelay(
            () -> clusterClient.getJobStatus(jobGraph.getJobID()),
            Time.milliseconds(10),
            deadline,
            status -> status == JobStatus.RUNNING,
            TestingUtils.defaultScheduledExecutor()
        ).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

        final MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();
        final ObjectName metricPattern = new ObjectName("org.apache.flink.jobmanager.job.lastCheckpointSize:job_name=TestingJob,*");
        final Set<ObjectName> matchingNames = platformServer.queryNames(metricPattern, null);

        Assert.assertEquals(1, matchingNames.size());
        assertEquals(-1L, platformServer.getAttribute(matchingNames.iterator().next(), "Value"));

        BlockingInvokable.unblock();
    } finally {
        // Unblock again here so the invokable is released even when a step above throws.
        BlockingInvokable.unblock();
    }
}
示例20
/**
 * Puts a master hook factory and a state backend that both wrap a serializable object
 * obtained from a new class loader into the checkpointing settings, copies the job graph
 * through serialization and builds an execution graph from the copy with that class loader.
 * Asserts that one master hook is registered and that the state backend deserializes to a
 * {@code CustomStateBackend}.
 */
@Test
public void testDeserializationOfUserCodeWithUserClassLoader() throws Exception {
    final ClassLoaderUtils.ObjectAndClassLoader<Serializable> loaded = ClassLoaderUtils.createSerializableObjectFromNewClassLoader();
    final ClassLoader userClassLoader = loaded.getClassLoader();
    final Serializable outOfClassPath = loaded.getObject();

    final MasterTriggerRestoreHook.Factory[] hookFactories = { new TestFactory(outOfClassPath) };
    final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = new SerializedValue<>(hookFactories);

    final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
        1000L,
        10000L,
        0L,
        1,
        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
        true,
        false,
        false,
        0);
    final SerializedValue<StateBackend> serializedBackend =
        new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath));

    final JobGraph jobGraph = new JobGraph(new JobID(), "test job");
    jobGraph.setSnapshotSettings(
        new JobCheckpointingSettings(
            Collections.<JobVertexID>emptyList(),
            Collections.<JobVertexID>emptyList(),
            Collections.<JobVertexID>emptyList(),
            coordinatorConfig,
            serializedBackend,
            serializedHooks));

    // Round-trip the job graph through serialization to check the behavior
    // under distributed execution.
    final JobGraph copiedGraph = CommonTestUtils.createCopySerializable(jobGraph);

    final Time tenSeconds = Time.seconds(10L);
    final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
        null,
        copiedGraph,
        new Configuration(),
        TestingUtils.defaultExecutor(),
        TestingUtils.defaultExecutor(),
        mock(SlotProvider.class),
        userClassLoader,
        new StandaloneCheckpointRecoveryFactory(),
        tenSeconds,
        new NoRestartStrategy(),
        new UnregisteredMetricsGroup(),
        VoidBlobWriter.getInstance(),
        tenSeconds,
        log,
        NettyShuffleMaster.INSTANCE,
        NoOpJobMasterPartitionTracker.INSTANCE);

    assertEquals(1, executionGraph.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());
    // The state backend is read back from the original (uncopied) job graph.
    assertTrue(jobGraph.getCheckpointingSettings().getDefaultStateBackend().deserializeValue(userClassLoader) instanceof CustomStateBackend);
}
示例21
/**
 * Builds an {@link ExecutionGraph} for an otherwise empty job named "test" that has
 * checkpointing settings attached.
 *
 * @param configuration configuration passed on to the execution graph builder
 * @return the execution graph produced by {@link ExecutionGraphBuilder}
 * @throws Exception if building the graph fails
 */
private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
    final ScheduledExecutorService sharedExecutor = TestingUtils.defaultExecutor();

    final JobID id = new JobID();
    final JobGraph graph = new JobGraph(id, "test");

    final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
        100,
        10 * 60 * 1000,
        0,
        1,
        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
        false,
        false,
        false,
        0);
    graph.setSnapshotSettings(
        new JobCheckpointingSettings(
            Collections.<JobVertexID>emptyList(),
            Collections.<JobVertexID>emptyList(),
            Collections.<JobVertexID>emptyList(),
            coordinatorConfig,
            null));

    final Time tenSeconds = Time.seconds(10L);

    // The same executor and the same timeout are each passed for two builder arguments.
    return ExecutionGraphBuilder.buildGraph(
        null,
        graph,
        configuration,
        sharedExecutor,
        sharedExecutor,
        new ProgrammedSlotProvider(1),
        getClass().getClassLoader(),
        new StandaloneCheckpointRecoveryFactory(),
        tenSeconds,
        new NoRestartStrategy(),
        new UnregisteredMetricsGroup(),
        blobWriter,
        tenSeconds,
        LoggerFactory.getLogger(getClass()),
        NettyShuffleMaster.INSTANCE,
        NoOpJobMasterPartitionTracker.INSTANCE);
}
示例22
/**
 * Checks that a metric registered on the JobManager can be read through JMX: after the job is
 * reported as running, exactly one {@code lastCheckpointSize} MBean for the job must exist and
 * its {@code Value} attribute must equal {@code -1L}.
 */
@Test
public void testJobManagerJMXMetricAccess() throws Exception {
    final Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2));

    try {
        final JobVertex source = new JobVertex("Source");
        source.setInvokableClass(BlockingInvokable.class);

        final JobGraph jobGraph = new JobGraph("TestingJob", source);
        final CheckpointCoordinatorConfiguration coordinatorConfig = new CheckpointCoordinatorConfiguration(
            500,
            500,
            50,
            5,
            CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
            true,
            false,
            false,
            0);
        jobGraph.setSnapshotSettings(
            new JobCheckpointingSettings(
                Collections.<JobVertexID>emptyList(),
                Collections.<JobVertexID>emptyList(),
                Collections.<JobVertexID>emptyList(),
                coordinatorConfig,
                null));

        final ClusterClient<?> clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
        ClientUtils.submitJob(clusterClient, jobGraph);

        // Retry the status query until the predicate accepts it (status RUNNING),
        // waiting at most for the time that is left on the deadline.
        FutureUtils.retrySuccessfulWithDelay(
            () -> clusterClient.getJobStatus(jobGraph.getJobID()),
            Time.milliseconds(10),
            deadline,
            status -> status == JobStatus.RUNNING,
            TestingUtils.defaultScheduledExecutor()
        ).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

        final MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();
        final ObjectName metricPattern = new ObjectName("org.apache.flink.jobmanager.job.lastCheckpointSize:job_name=TestingJob,*");
        final Set<ObjectName> matchingNames = platformServer.queryNames(metricPattern, null);

        Assert.assertEquals(1, matchingNames.size());
        assertEquals(-1L, platformServer.getAttribute(matchingNames.iterator().next(), "Value"));

        BlockingInvokable.unblock();
    } finally {
        // Unblock again here so the invokable is released even when a step above throws.
        BlockingInvokable.unblock();
    }
}
示例23
/**
 * Stores the settings used for asynchronous snapshots. Passing {@code null} means
 * that snapshotting is not enabled.
 *
 * @param settings the snapshot settings to store, or {@code null} to leave snapshotting disabled
 */
public void setSnapshotSettings(JobCheckpointingSettings settings) {
    snapshotSettings = settings;
}
示例24
/**
 * Returns the settings used for asynchronous snapshots.
 *
 * @return the stored snapshot settings, or {@code null} when checkpointing is not enabled
 */
public JobCheckpointingSettings getCheckpointingSettings() {
    return this.snapshotSettings;
}
示例25
/**
 * Stores the settings used for asynchronous snapshots. Passing {@code null} means
 * that snapshotting is not enabled.
 *
 * @param settings the snapshot settings to store, or {@code null} to leave snapshotting disabled
 */
public void setSnapshotSettings(JobCheckpointingSettings settings) {
    snapshotSettings = settings;
}
示例26
/**
 * Returns the settings used for asynchronous snapshots.
 *
 * @return the stored snapshot settings, or {@code null} when checkpointing is not enabled
 */
public JobCheckpointingSettings getCheckpointingSettings() {
    return this.snapshotSettings;
}
示例27
/**
 * Stores the settings used for asynchronous snapshots. Passing {@code null} means
 * that snapshotting is not enabled.
 *
 * @param settings the snapshot settings to store, or {@code null} to leave snapshotting disabled
 */
public void setSnapshotSettings(JobCheckpointingSettings settings) {
    snapshotSettings = settings;
}
示例28
/**
 * Returns the settings used for asynchronous snapshots.
 *
 * @return the stored snapshot settings, or {@code null} when checkpointing is not enabled
 */
public JobCheckpointingSettings getCheckpointingSettings() {
    return this.snapshotSettings;
}