package org.apache.flink.runtime.operators.chaining;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.InternalOperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.FlatMapDriver;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.TaskTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.class */
public class ChainedOperatorsMetricTest extends TaskTestBase {
    private static final int MEMORY_MANAGER_SIZE = 3145728;
    private static final int NETWORK_BUFFER_SIZE = 1024;
    private static final TypeSerializerFactory<Record> serFact = RecordSerializerFactory.get();
    private final List<Record> outList = new ArrayList();
    private static final String HEAD_OPERATOR_NAME = "headoperator";
    private static final String CHAINED_OPERATOR_NAME = "chainedoperator";

    /* loaded from: input_file:org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest$DuplicatingFlatMapFunction.class */
    public static class DuplicatingFlatMapFunction extends RichFlatMapFunction<Record, Record> {
        private static final long serialVersionUID = -1152068682935346164L;

        public void flatMap(Record record, Collector<Record> collector) throws Exception {
            collector.collect(record);
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((Record) obj, (Collector<Record>) collector);
        }
    }

    @Test
    public void testOperatorIOMetricReuse() throws Exception {
        initEnvironment(3145728L, 1024);
        this.mockEnv = new MockEnvironmentBuilder().setTaskName(HEAD_OPERATOR_NAME).setManagedMemorySize(3145728L).setInputSplitProvider(this.inputSplitProvider).setBufferSize(1024).setMetricGroup(TaskManagerMetricGroup.createTaskManagerMetricGroup(NoOpMetricRegistry.INSTANCE, "host", ResourceID.generate()).addJob(new JobID(), "jobName").addTask(ExecutionGraphTestUtils.createExecutionAttemptId(), "task")).build();
        addInput(new UniformRecordGenerator(100, 20, false), 0);
        addOutput(this.outList);
        addChainedOperator();
        registerTask(FlatMapDriver.class, DuplicatingFlatMapFunction.class);
        new BatchTask(this.mockEnv).invoke();
        Assert.assertEquals(8000L, this.outList.size());
        TaskMetricGroup metricGroup = this.mockEnv.getMetricGroup();
        TaskIOMetricGroup iOMetricGroup = metricGroup.getIOMetricGroup();
        Counter numRecordsInCounter = iOMetricGroup.getNumRecordsInCounter();
        Counter numRecordsOutCounter = iOMetricGroup.getNumRecordsOutCounter();
        Assert.assertEquals(2000L, numRecordsInCounter.getCount());
        Assert.assertEquals(8000L, numRecordsOutCounter.getCount());
        OperatorIOMetricGroup iOMetricGroup2 = metricGroup.getOrAddOperator(HEAD_OPERATOR_NAME).getIOMetricGroup();
        Counter numRecordsInCounter2 = iOMetricGroup2.getNumRecordsInCounter();
        Counter numRecordsOutCounter2 = iOMetricGroup2.getNumRecordsOutCounter();
        Assert.assertEquals(2000L, numRecordsInCounter2.getCount());
        Assert.assertEquals(4000L, numRecordsOutCounter2.getCount());
        InternalOperatorIOMetricGroup iOMetricGroup3 = metricGroup.getOrAddOperator(CHAINED_OPERATOR_NAME).getIOMetricGroup();
        Counter numRecordsInCounter3 = iOMetricGroup3.getNumRecordsInCounter();
        Counter numRecordsOutCounter3 = iOMetricGroup3.getNumRecordsOutCounter();
        Assert.assertEquals(4000L, numRecordsInCounter3.getCount());
        Assert.assertEquals(8000L, numRecordsOutCounter3.getCount());
    }

    private void addChainedOperator() {
        TaskConfig taskConfig = new TaskConfig(new Configuration());
        taskConfig.addInputToGroup(0);
        taskConfig.setInputSerializer(serFact, 0);
        taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        taskConfig.setOutputSerializer(serFact);
        taskConfig.setDriverStrategy(DriverStrategy.FLAT_MAP);
        taskConfig.setStubWrapper(new UserCodeClassWrapper(DuplicatingFlatMapFunction.class));
        getTaskConfig().addChainedTask(ChainedFlatMapDriver.class, taskConfig, CHAINED_OPERATOR_NAME);
    }
}
