package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManagerTest.class */
/**
 * Tests for {@link TaskExecutorManager}.
 *
 * <p>Every test creates its manager inside a try-with-resources statement so that the manager is
 * closed even when an assertion fails, and so that a failure in {@code close()} is attached to
 * the primary failure as a suppressed exception instead of replacing it.
 */
class TaskExecutorManagerTest {

    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    /**
     * Requests a worker for a 3-core slot, then registers a task executor that offers a 2-core
     * slot, and checks that the pending slot count is still 1 afterwards.
     */
    @Test
    void testPendingSlotNotFulfilledIfProfilesAreNotExactMatch() {
        final WorkerResourceSpec workerResourceSpec =
                new WorkerResourceSpec.Builder().setCpuCores(3.0d).build();
        final ResourceProfile requestedSlotProfile =
                ResourceProfile.newBuilder().setCpuCores(3.0d).build();
        final ResourceProfile offeredSlotProfile =
                ResourceProfile.newBuilder().setCpuCores(2.0d).build();

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setDefaultWorkerResourceSpec(workerResourceSpec)
                        .setNumSlotsPerWorker(1)
                        .setMaxNumSlots(2)
                        .createTaskExecutorManager()) {

            taskExecutorManager.allocateWorker(requestedSlotProfile);
            Assertions.assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots())
                    .isEqualTo(1);

            createAndRegisterTaskExecutor(taskExecutorManager, 1, offeredSlotProfile);

            // The registered slot is counted, but the pending slot must remain pending.
            Assertions.assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1);
            Assertions.assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots())
                    .isEqualTo(1);
        }
    }

    /**
     * Requests a worker, then registers a task executor whose only slot is reported with a job ID
     * and an allocation ID, and checks that the pending slot count is still 1 afterwards.
     */
    @Test
    void testPendingSlotNotFulfilledByAllocatedSlot() {
        final WorkerResourceSpec workerResourceSpec =
                new WorkerResourceSpec.Builder().setCpuCores(3.0d).build();
        final ResourceProfile requestedSlotProfile =
                ResourceProfile.newBuilder().setCpuCores(3.0d).build();

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setDefaultWorkerResourceSpec(workerResourceSpec)
                        .setNumSlotsPerWorker(1)
                        .setMaxNumSlots(2)
                        .createTaskExecutorManager()) {

            taskExecutorManager.allocateWorker(requestedSlotProfile);
            Assertions.assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots())
                    .isEqualTo(1);

            final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
            // The slot matches the requested profile but is reported with a job and allocation.
            final SlotStatus allocatedSlotStatus =
                    new SlotStatus(
                            new SlotID(taskExecutorConnection.getResourceID(), 0),
                            requestedSlotProfile,
                            JobID.generate(),
                            new AllocationID());
            taskExecutorManager.registerTaskManager(
                    taskExecutorConnection,
                    new SlotReport(allocatedSlotStatus),
                    ResourceProfile.ANY,
                    ResourceProfile.ANY);

            Assertions.assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1);
            Assertions.assertThat(taskExecutorManager.getNumberPendingTaskManagerSlots())
                    .isEqualTo(1);
        }
    }

    /**
     * Registers a task executor with a 10 ms task manager timeout and waits until the resource
     * allocator is told about an unwanted worker. At that point the slot must still be
     * registered; it only disappears once the task executor is explicitly unregistered.
     *
     * @throws Exception if waiting for the asynchronous pipeline fails
     */
    @Test
    void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
        final Time taskManagerTimeout = Time.milliseconds(10L);
        final CompletableFuture<InstanceID> unwantedWorkerFuture = new CompletableFuture<>();

        final TestingResourceAllocator resourceAllocator =
                createResourceAllocatorBuilder()
                        .setDeclareResourceNeededConsumer(
                                resourceDeclarations -> {
                                    Assertions.assertThat(resourceDeclarations).hasSize(1);
                                    final ResourceDeclaration resourceDeclaration =
                                            resourceDeclarations.iterator().next();
                                    Assertions.assertThat(resourceDeclaration.getNumNeeded())
                                            .isZero();
                                    Assertions.assertThat(resourceDeclaration.getUnwantedWorkers())
                                            .hasSize(1);
                                    unwantedWorkerFuture.complete(
                                            resourceDeclaration
                                                    .getUnwantedWorkers()
                                                    .iterator()
                                                    .next());
                                })
                        .build();

        final ExecutorService mainThreadExecutor = EXECUTOR_RESOURCE.getExecutor();

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setTaskManagerTimeout(taskManagerTimeout)
                        .setResourceAllocator(resourceAllocator)
                        .setMainThreadExecutor(mainThreadExecutor)
                        .createTaskExecutorManager()) {

            CompletableFuture.supplyAsync(
                            () -> {
                                final InstanceID registeredInstanceId =
                                        createAndRegisterTaskExecutor(
                                                taskExecutorManager, 1, ResourceProfile.ANY);
                                Assertions.assertThat(
                                                taskExecutorManager.getNumberRegisteredSlots())
                                        .isEqualTo(1);
                                return registeredInstanceId;
                            },
                            mainThreadExecutor)
                    .thenCombine(
                            unwantedWorkerFuture,
                            (registeredInstanceId, unwantedInstanceId) -> {
                                Assertions.assertThat(registeredInstanceId)
                                        .isEqualTo(unwantedInstanceId);
                                // Being declared unwanted must not have removed the slot yet.
                                Assertions.assertThat(
                                                taskExecutorManager.getNumberRegisteredSlots())
                                        .isEqualTo(1);
                                return registeredInstanceId;
                            })
                    .thenAccept(
                            instanceId -> {
                                taskExecutorManager.unregisterTaskExecutor(instanceId);
                                Assertions.assertThat(
                                                taskExecutorManager.getNumberRegisteredSlots())
                                        .isZero();
                            })
                    .get();
        }
    }

    /**
     * Allocates a worker, registers a matching task executor, occupies and frees its slot, and
     * then waits (with a 50 ms task manager timeout) until that task executor is reported to the
     * resource allocator as an unwanted worker.
     *
     * <p>The first resource declaration is expected to request one worker with no unwanted
     * workers; every later declaration is expected to request zero workers and name exactly one
     * unwanted worker.
     *
     * @throws Exception if waiting for the asynchronous pipeline fails
     */
    @Test
    void testTimeoutForUnusedTaskManager() throws Exception {
        final WorkerResourceSpec workerResourceSpec =
                new WorkerResourceSpec.Builder().setCpuCores(1.0d).build();
        final ResourceProfile slotProfile = ResourceProfile.newBuilder().setCpuCores(1.0d).build();
        final Time taskManagerTimeout = Time.milliseconds(50L);

        final AtomicInteger declareResourceCount = new AtomicInteger(0);
        final CompletableFuture<InstanceID> unwantedWorkerFuture = new CompletableFuture<>();

        final TestingResourceAllocator resourceAllocator =
                new TestingResourceAllocatorBuilder()
                        .setDeclareResourceNeededConsumer(
                                resourceDeclarations -> {
                                    Assertions.assertThat(resourceDeclarations.size()).isEqualTo(1);
                                    final ResourceDeclaration resourceDeclaration =
                                            resourceDeclarations.iterator().next();
                                    if (declareResourceCount.getAndIncrement() == 0) {
                                        // First declaration: the worker is being requested.
                                        Assertions.assertThat(resourceDeclaration.getNumNeeded())
                                                .isEqualTo(1);
                                        Assertions.assertThat(
                                                        resourceDeclaration.getUnwantedWorkers())
                                                .isEmpty();
                                    } else {
                                        // Later declarations: the worker is being released.
                                        Assertions.assertThat(resourceDeclaration.getNumNeeded())
                                                .isZero();
                                        Assertions.assertThat(
                                                        resourceDeclaration.getUnwantedWorkers())
                                                .hasSize(1);
                                        unwantedWorkerFuture.complete(
                                                resourceDeclaration
                                                        .getUnwantedWorkers()
                                                        .iterator()
                                                        .next());
                                    }
                                })
                        .build();

        final ExecutorService mainThreadExecutor = EXECUTOR_RESOURCE.getExecutor();

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setTaskManagerTimeout(taskManagerTimeout)
                        .setDefaultWorkerResourceSpec(workerResourceSpec)
                        .setResourceAllocator(resourceAllocator)
                        .setMainThreadExecutor(mainThreadExecutor)
                        .createTaskExecutorManager()) {

            CompletableFuture.supplyAsync(
                            () -> {
                                taskExecutorManager.allocateWorker(slotProfile);
                                final InstanceID registeredInstanceId =
                                        createAndRegisterTaskExecutor(
                                                taskExecutorManager, 1, slotProfile);
                                taskExecutorManager.occupySlot(registeredInstanceId);
                                taskExecutorManager.freeSlot(registeredInstanceId);
                                return registeredInstanceId;
                            },
                            mainThreadExecutor)
                    .thenAcceptBoth(
                            unwantedWorkerFuture,
                            (registeredInstanceId, unwantedInstanceId) ->
                                    Assertions.assertThat(registeredInstanceId)
                                            .isEqualTo(unwantedInstanceId))
                    .get();
        }
    }

    /**
     * Configures one redundant task manager and counts resource declarations while the scheduled
     * tasks of a manually triggered executor are run: none before a slot is occupied, exactly one
     * after the first trigger that follows the occupation, and no further one on the next trigger.
     */
    @Test
    void testRequestRedundantTaskManager() {
        final ResourceProfile slotProfile = ResourceProfile.newBuilder().setCpuCores(1.0d).build();
        final AtomicInteger declareResourceCount = new AtomicInteger(0);

        final TestingResourceAllocator resourceAllocator =
                new TestingResourceAllocatorBuilder()
                        .setDeclareResourceNeededConsumer(
                                ignored -> declareResourceCount.getAndIncrement())
                        .build();
        final ManuallyTriggeredScheduledExecutor scheduledExecutor =
                new ManuallyTriggeredScheduledExecutor();

        try (TaskExecutorManager taskExecutorManager =
                new TaskExecutorManagerBuilder(scheduledExecutor)
                        .setRedundantTaskManagerNum(1)
                        .setMaxNumSlots(10)
                        .setResourceAllocator(resourceAllocator)
                        .createTaskExecutorManager()) {

            // Nothing is in use yet, so no resources are declared.
            scheduledExecutor.triggerScheduledTasks();
            Assertions.assertThat(declareResourceCount).hasValue(0);

            final InstanceID instanceId =
                    createAndRegisterTaskExecutor(taskExecutorManager, 1, slotProfile);
            taskExecutorManager.occupySlot(instanceId);
            Assertions.assertThat(declareResourceCount).hasValue(0);

            scheduledExecutor.triggerScheduledTasks();
            Assertions.assertThat(declareResourceCount).hasValue(1);

            // A further trigger must not produce another declaration.
            scheduledExecutor.triggerScheduledTasks();
            Assertions.assertThat(declareResourceCount).hasValue(1);
        }
    }

    /**
     * Requests a 2-core slot while the default worker spec only has 1 core, and checks that no
     * worker is returned and the resource allocator is never called.
     */
    @Test
    void testWorkerOnlyAllocatedIfRequestedSlotCouldBeFulfilled() {
        final WorkerResourceSpec workerResourceSpec =
                new WorkerResourceSpec.Builder().setCpuCores(1.0d).build();
        final ResourceProfile requestedSlotProfile =
                ResourceProfile.newBuilder().setCpuCores(2.0d).build();
        final AtomicInteger declareResourceCount = new AtomicInteger(0);

        final TestingResourceAllocator resourceAllocator =
                createResourceAllocatorBuilder()
                        .setDeclareResourceNeededConsumer(
                                ignored -> declareResourceCount.incrementAndGet())
                        .build();

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setDefaultWorkerResourceSpec(workerResourceSpec)
                        .setNumSlotsPerWorker(1)
                        .setMaxNumSlots(1)
                        .setResourceAllocator(resourceAllocator)
                        .createTaskExecutorManager()) {

            Assertions.assertThat(taskExecutorManager.allocateWorker(requestedSlotProfile))
                    .isNotPresent();
            Assertions.assertThat(declareResourceCount).hasValue(0);
        }
    }

    /**
     * With a maximum of one slot, the first worker allocation declares one needed worker and a
     * second allocation does not trigger another declaration.
     */
    @Test
    void testMaxSlotLimitAllocateWorker() {
        final List<Integer> declaredNumNeeded = new ArrayList<>();

        final TestingResourceAllocator resourceAllocator =
                createResourceAllocatorBuilder()
                        .setDeclareResourceNeededConsumer(
                                resourceDeclarations -> {
                                    Assertions.assertThat(resourceDeclarations).hasSize(1);
                                    declaredNumNeeded.add(
                                            resourceDeclarations.iterator().next().getNumNeeded());
                                })
                        .build();

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setNumSlotsPerWorker(1)
                        .setMaxNumSlots(1)
                        .setResourceAllocator(resourceAllocator)
                        .createTaskExecutorManager()) {

            Assertions.assertThat(declaredNumNeeded).isEmpty();

            taskExecutorManager.allocateWorker(ResourceProfile.UNKNOWN);
            Assertions.assertThat(declaredNumNeeded).containsExactly(1);

            // The slot limit is reached, so the second request must not be declared.
            taskExecutorManager.allocateWorker(ResourceProfile.UNKNOWN);
            Assertions.assertThat(declaredNumNeeded).containsExactly(1);
        }
    }

    /**
     * With a maximum of one slot, registering two single-slot task executors leaves only one
     * slot registered.
     *
     * @throws Exception declared for compatibility with the original test signature
     */
    @Test
    void testMaxSlotLimitRegisterWorker() throws Exception {
        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder()
                        .setNumSlotsPerWorker(1)
                        .setMaxNumSlots(1)
                        .createTaskExecutorManager()) {

            createAndRegisterTaskExecutor(taskExecutorManager, 1, ResourceProfile.ANY);
            createAndRegisterTaskExecutor(taskExecutorManager, 1, ResourceProfile.ANY);

            Assertions.assertThat(taskExecutorManager.getNumberRegisteredSlots()).isEqualTo(1);
        }
    }

    /**
     * Registers two task executors with two slots each (of different profiles), calls
     * {@code occupySlot} once per task executor, and checks the free and registered resource
     * totals, both overall and per task executor.
     */
    @Test
    void testGetResourceOverview() {
        final ResourceProfile firstProfile = ResourceProfile.fromResources(1.0d, 10);
        final ResourceProfile secondProfile = ResourceProfile.fromResources(2.0d, 20);

        try (TaskExecutorManager taskExecutorManager =
                createTaskExecutorManagerBuilder().setMaxNumSlots(4).createTaskExecutorManager()) {

            final InstanceID firstInstanceId =
                    createAndRegisterTaskExecutor(taskExecutorManager, 2, firstProfile);
            final InstanceID secondInstanceId =
                    createAndRegisterTaskExecutor(taskExecutorManager, 2, secondProfile);

            taskExecutorManager.occupySlot(firstInstanceId);
            taskExecutorManager.occupySlot(secondInstanceId);

            // One slot's worth of resources is expected to remain free on each task executor.
            Assertions.assertThat(taskExecutorManager.getTotalFreeResources())
                    .isEqualTo(firstProfile.merge(secondProfile));
            Assertions.assertThat(taskExecutorManager.getTotalFreeResourcesOf(firstInstanceId))
                    .isEqualTo(firstProfile);
            Assertions.assertThat(taskExecutorManager.getTotalFreeResourcesOf(secondInstanceId))
                    .isEqualTo(secondProfile);

            // Registered totals cover both slots of each task executor.
            Assertions.assertThat(taskExecutorManager.getTotalRegisteredResources())
                    .isEqualTo(firstProfile.merge(secondProfile).multiply(2));
            Assertions.assertThat(
                            taskExecutorManager.getTotalRegisteredResourcesOf(firstInstanceId))
                    .isEqualTo(firstProfile.multiply(2));
            Assertions.assertThat(
                            taskExecutorManager.getTotalRegisteredResourcesOf(secondInstanceId))
                    .isEqualTo(secondProfile.multiply(2));
        }
    }

    /**
     * Creates a manager builder backed by the shared test executor and a default testing
     * resource allocator.
     *
     * @return a pre-configured builder that tests can customise further
     */
    private static TaskExecutorManagerBuilder createTaskExecutorManagerBuilder() {
        return new TaskExecutorManagerBuilder(
                        new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()))
                .setResourceAllocator(createResourceAllocatorBuilder().build());
    }

    /**
     * Creates a fresh builder for a testing resource allocator.
     *
     * @return a new {@link TestingResourceAllocatorBuilder}
     */
    private static TestingResourceAllocatorBuilder createResourceAllocatorBuilder() {
        return new TestingResourceAllocatorBuilder();
    }

    /**
     * Registers a new task executor that reports {@code i} slots, each with the given profile.
     *
     * @param taskExecutorManager manager to register the task executor with
     * @param i number of slots to report; slot indices run from 0 to {@code i - 1}
     * @param resourceProfile profile used for every reported slot and passed as the default slot
     *     profile; its {@code i}-fold multiple is passed as the total profile
     * @return the instance ID of the newly created task executor connection
     */
    private static InstanceID createAndRegisterTaskExecutor(
            TaskExecutorManager taskExecutorManager, int i, ResourceProfile resourceProfile) {
        final TaskExecutorConnection taskExecutorConnection = createTaskExecutorConnection();
        final List<SlotStatus> slotStatuses =
                IntStream.range(0, i)
                        .mapToObj(
                                slotIndex ->
                                        new SlotStatus(
                                                new SlotID(
                                                        taskExecutorConnection.getResourceID(),
                                                        slotIndex),
                                                resourceProfile))
                        .collect(Collectors.toList());

        taskExecutorManager.registerTaskManager(
                taskExecutorConnection,
                new SlotReport(slotStatuses),
                resourceProfile.multiply(i),
                resourceProfile);
        return taskExecutorConnection.getInstanceID();
    }

    /**
     * Creates a task executor connection with a generated resource ID and a testing gateway.
     *
     * @return a new {@link TaskExecutorConnection}
     */
    private static TaskExecutorConnection createTaskExecutorConnection() {
        return new TaskExecutorConnection(
                ResourceID.generate(),
                new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
    }
}
