package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.core.StringContains;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.class */
/**
 * Tests for deploying an {@link ExecutionVertex} to a slot via {@code deployToSlot}.
 *
 * <p>Each test obtains a vertex from {@code ExecutionGraphTestUtils.getExecutionVertex()},
 * moves its current execution attempt to {@link ExecutionState#SCHEDULED}, deploys it to a
 * {@link TestingLogicalSlot} and then checks the resulting execution state, the recorded
 * failure information and the per-state timestamps.
 *
 * <p>Test methods declare {@code throws Exception} so that an unexpected exception reaches
 * the test runner with its original type and stack trace instead of being flattened into a
 * message-only failure.
 */
public class ExecutionVertexDeploymentTest extends TestLogger {
    /** Message of the exception produced by {@link SubmitFailingSimpleAckingTaskManagerGateway}. */
    private static final String ERROR_MESSAGE = "test_failure_error_message";

    /**
     * Gateway whose {@code submitTask} hands back a fresh future that this class never
     * completes, so the submission stays pending from the caller's point of view.
     */
    private static class SubmitBlockingSimpleAckingTaskManagerGateway extends SimpleAckingTaskManagerGateway {
        private SubmitBlockingSimpleAckingTaskManagerGateway() {
        }

        /**
         * Ignores both arguments and returns a new, uncompleted future.
         *
         * @param taskDeploymentDescriptor unused
         * @param time unused
         * @return a future that is not completed by this gateway
         */
        @Override
        public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor taskDeploymentDescriptor, Time time) {
            return new CompletableFuture<>();
        }
    }

    /**
     * Gateway whose {@code submitTask} returns a future that is already completed
     * exceptionally with an {@link Exception} carrying {@link #ERROR_MESSAGE}.
     */
    public static class SubmitFailingSimpleAckingTaskManagerGateway extends SimpleAckingTaskManagerGateway {
        /**
         * Ignores both arguments and returns an exceptionally completed future.
         *
         * @param taskDeploymentDescriptor unused
         * @param time unused
         * @return a future completed exceptionally with the test error message
         */
        @Override
        public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor taskDeploymentDescriptor, Time time) {
            CompletableFuture<Acknowledge> failedSubmission = new CompletableFuture<>();
            failedSubmission.completeExceptionally(new Exception(ExecutionVertexDeploymentTest.ERROR_MESSAGE));
            return failedSubmission;
        }
    }

    /**
     * Deploys a scheduled vertex to a default testing slot and checks that it ends up in
     * DEPLOYING without failure info, that a second deployment attempt is rejected, and that
     * the CREATED and DEPLOYING timestamps were recorded.
     */
    @Test
    public void testDeployCall() throws Exception {
        ExecutionVertex vertex = createScheduledVertex();
        TestingLogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();

        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        assertRepeatedDeploymentRejected(vertex, slot);

        Assert.assertFalse(vertex.getFailureInfo().isPresent());
        assertTimestampRecorded(vertex, ExecutionState.CREATED);
        assertTimestampRecorded(vertex, ExecutionState.DEPLOYING);
    }

    /**
     * Same flow as {@link #testDeployCall()}, and additionally checks that no RUNNING
     * timestamp has been recorded after the deployment call.
     */
    @Test
    public void testDeployWithSynchronousAnswer() throws Exception {
        ExecutionVertex vertex = createScheduledVertex();
        TestingLogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();

        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        assertRepeatedDeploymentRejected(vertex, slot);

        Assert.assertFalse(vertex.getFailureInfo().isPresent());
        assertTimestampRecorded(vertex, ExecutionState.CREATED);
        assertTimestampRecorded(vertex, ExecutionState.DEPLOYING);
        Assert.assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0);
    }

    /**
     * Deploys a scheduled vertex and checks that repeated deployment is rejected both before
     * and after the DEPLOYING state has been asserted, and that no RUNNING timestamp exists.
     */
    @Test
    public void testDeployWithAsynchronousAnswer() throws Exception {
        ExecutionVertex vertex = createScheduledVertex();
        TestingLogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();

        vertex.deployToSlot(slot);

        // No repeated scheduling directly after the first deployment call ...
        assertRepeatedDeploymentRejected(vertex, slot);

        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        // ... and still none once the vertex has been observed in DEPLOYING.
        assertRepeatedDeploymentRejected(vertex, slot);

        assertTimestampRecorded(vertex, ExecutionState.CREATED);
        assertTimestampRecorded(vertex, ExecutionState.DEPLOYING);
        Assert.assertTrue(vertex.getStateTimestamp(ExecutionState.RUNNING) == 0);
    }

    /**
     * Deploys to a slot backed by {@link SubmitFailingSimpleAckingTaskManagerGateway} and
     * expects the vertex to be FAILED immediately after {@code deployToSlot} returns, with
     * failure info containing {@link #ERROR_MESSAGE}.
     */
    @Test
    public void testDeployFailedSynchronous() throws Exception {
        ExecutionVertex vertex = createScheduledVertex();
        TestingLogicalSlot slot = new TestingLogicalSlotBuilder()
                .setTaskManagerGateway(new SubmitFailingSimpleAckingTaskManagerGateway())
                .createTestingLogicalSlot();

        vertex.deployToSlot(slot);

        assertFailedWithTestErrorMessage(vertex);
    }

    /**
     * Deploys to a slot backed by {@link SubmitFailingSimpleAckingTaskManagerGateway} and
     * polls (at most 100 times, sleeping 10 ms between polls) until the vertex is FAILED and
     * exposes failure info, then runs the same checks as the synchronous variant.
     */
    @Test
    public void testDeployFailedAsynchronously() throws Exception {
        ExecutionVertex vertex = createScheduledVertex();
        TestingLogicalSlot slot = new TestingLogicalSlotBuilder()
                .setTaskManagerGateway(new SubmitFailingSimpleAckingTaskManagerGateway())
                .createTestingLogicalSlot();

        vertex.deployToSlot(slot);

        // Bounded wait for the failure to become visible on the vertex.
        for (int attempt = 0; attempt < 100; attempt++) {
            boolean failureVisible = vertex.getExecutionState() == ExecutionState.FAILED
                    && vertex.getFailureInfo().isPresent();
            if (failureVisible) {
                break;
            }
            Thread.sleep(10L);
        }

        assertFailedWithTestErrorMessage(vertex);
    }

    /**
     * Deploys to a slot whose gateway never completes the submission, fails the vertex
     * externally while it is DEPLOYING, and checks that the vertex is FAILED and that the
     * recorded failure deserializes to the exception that was passed to {@code fail}.
     */
    @Test
    public void testFailExternallyDuringDeploy() throws Exception {
        ExecutionVertex vertex = createScheduledVertex();
        TestingLogicalSlot slot = new TestingLogicalSlotBuilder()
                .setTaskManagerGateway(new SubmitBlockingSimpleAckingTaskManagerGateway())
                .createTestingLogicalSlot();

        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        Exception testError = new Exception("test error");
        vertex.fail(testError);

        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
        SerializedThrowable recordedFailure =
                (SerializedThrowable) vertex.getFailureInfo().map(info -> info.getException()).get();
        Assert.assertThat(recordedFailure.deserializeError(ClassLoader.getSystemClassLoader()), CoreMatchers.is(testError));

        assertTimestampRecorded(vertex, ExecutionState.CREATED);
        assertTimestampRecorded(vertex, ExecutionState.DEPLOYING);
        assertTimestampRecorded(vertex, ExecutionState.FAILED);
    }

    /**
     * Obtains a vertex from the test utilities, verifies it starts in CREATED and transitions
     * its current execution attempt to SCHEDULED.
     */
    private static ExecutionVertex createScheduledVertex() throws Exception {
        ExecutionVertex vertex = ExecutionGraphTestUtils.getExecutionVertex();
        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED);
        return vertex;
    }

    /** Asserts that deploying the vertex again is rejected with an {@link IllegalStateException}. */
    private static void assertRepeatedDeploymentRejected(ExecutionVertex vertex, TestingLogicalSlot slot) throws Exception {
        try {
            vertex.deployToSlot(slot);
            Assert.fail("Scheduled from wrong state");
        } catch (IllegalStateException expected) {
            // Expected: the vertex is no longer in a state from which it can be deployed.
        }
    }

    /** Asserts that a positive timestamp has been recorded for the given state. */
    private static void assertTimestampRecorded(ExecutionVertex vertex, ExecutionState state) {
        Assert.assertTrue(vertex.getStateTimestamp(state) > 0);
    }

    /**
     * Asserts that the vertex is FAILED, that its failure info mentions {@link #ERROR_MESSAGE},
     * and that CREATED, DEPLOYING and FAILED timestamps were recorded.
     */
    private static void assertFailedWithTestErrorMessage(ExecutionVertex vertex) {
        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
        Assert.assertTrue(vertex.getFailureInfo().isPresent());
        Assert.assertThat(
                vertex.getFailureInfo().map(info -> info.getExceptionAsString()).get(),
                StringContains.containsString(ERROR_MESSAGE));
        assertTimestampRecorded(vertex, ExecutionState.CREATED);
        assertTimestampRecorded(vertex, ExecutionState.DEPLOYING);
        assertTimestampRecorded(vertex, ExecutionState.FAILED);
    }
}
