package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.class */
/**
 * Tests for {@link SortMergeSubpartitionReader}.
 *
 * <p>Each test runs against a freshly written {@link PartitionedFile} containing {@link
 * #numSubpartitions} subpartitions with {@link #numBuffersPerSubpartition} buffers of {@link
 * #bufferSize} bytes each; every buffer is filled with the same random payload ({@link #dataBytes}).
 * The reader under test is always created for subpartition 0.
 */
class SortMergeSubpartitionReaderTest {

    /** Size in bytes of every buffer written to the partitioned file and of every read segment. */
    private static final int bufferSize = 1024;

    /** Payload written into every buffer; re-randomized before each test. */
    private static final byte[] dataBytes = new byte[bufferSize];

    /** Number of subpartitions written to the partitioned file. */
    private static final int numSubpartitions = 10;

    /** Number of buffers written per subpartition. */
    private static final int numBuffersPerSubpartition = 10;

    private PartitionedFile partitionedFile;

    private FileChannel dataFileChannel;

    private FileChannel indexFileChannel;

    /**
     * Writes the partitioned file into the JUnit-managed temporary directory and opens read-only
     * channels on its data and index files.
     *
     * @param path temporary directory injected by JUnit
     */
    @BeforeEach
    void before(@TempDir Path path) throws Exception {
        new Random().nextBytes(dataBytes);
        partitionedFile =
                PartitionTestUtils.createPartitionedFile(
                        path.toString(),
                        numSubpartitions,
                        numBuffersPerSubpartition,
                        bufferSize,
                        dataBytes);
        dataFileChannel = openFileChannel(partitionedFile.getDataFilePath());
        indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
    }

    /** Closes both file channels (ignoring close failures) and deletes the partitioned file. */
    @AfterEach
    void after() {
        IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
        partitionedFile.deleteQuietly();
    }

    /**
     * Verifies the queued-buffer count, the number of availability notifications and the number
     * of segments left in the caller's queue across successive {@code readBuffers} calls.
     */
    @Test
    void testReadBuffers() throws Exception {
        CountingAvailabilityListener listener = new CountingAvailabilityListener();
        SortMergeSubpartitionReader reader = createSortMergeSubpartitionReader(listener);

        // Nothing has been read yet.
        Assertions.assertThat(listener.numNotifications).isZero();
        Assertions.assertThat(reader.unsynchronizedGetNumberOfQueuedBuffers()).isZero();

        // First read: both segments are taken, one buffer gets queued, listener fires once.
        Queue<MemorySegment> segments = createsMemorySegments(2);
        reader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(listener.numNotifications).isEqualTo(1);
        Assertions.assertThat(reader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(1);
        Assertions.assertThat(segments).isEmpty();

        // Second read while buffers are still queued: no additional notification.
        segments = createsMemorySegments(2);
        reader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(listener.numNotifications).isEqualTo(1);
        Assertions.assertThat(reader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(2);
        Assertions.assertThat(segments).isEmpty();

        // Drain the reader so that the next read starts from an empty queue again.
        while (reader.unsynchronizedGetNumberOfQueuedBuffers() > 0) {
            Preconditions.checkNotNull(reader.getNextBuffer()).buffer().recycleBuffer();
        }

        // Third read with more segments than remaining data: listener fires again and one
        // segment is left unused in the caller's queue.
        segments = createsMemorySegments(numBuffersPerSubpartition);
        reader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(listener.numNotifications).isEqualTo(2);
        Assertions.assertThat(reader.unsynchronizedGetNumberOfQueuedBuffers())
                .isEqualTo(numBuffersPerSubpartition - 2);
        Assertions.assertThat(segments.size()).isEqualTo(1);
    }

    /**
     * Verifies that a fresh reader yields no buffer and reports itself unavailable, and that
     * every buffer polled after a read carries the written payload together with the expected
     * backlog and next data type.
     */
    @Test
    void testPollBuffers() throws Exception {
        SortMergeSubpartitionReader reader =
                createSortMergeSubpartitionReader(new CountingAvailabilityListener());

        Assertions.assertThat(reader.getNextBuffer()).isNull();
        Assertions.assertThat(reader.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable())
                .isFalse();

        reader.readBuffers(
                createsMemorySegments(numBuffersPerSubpartition), FreeingBufferRecycler.INSTANCE);

        for (int credit = numBuffersPerSubpartition - 1; credit >= 0; credit--) {
            if (!reader.getAvailabilityAndBacklog(credit).isAvailable()) {
                continue;
            }

            ResultSubpartition.BufferAndBacklog polled =
                    Preconditions.checkNotNull(reader.getNextBuffer());
            // Copy the polled buffer into a fresh unpooled segment before inspecting its bytes.
            Buffer fullBuffer =
                    polled.buffer()
                            .getFullBufferData(
                                    MemorySegmentFactory.allocateUnpooledSegment(
                                            polled.buffer().readableBytes()));

            Assertions.assertThat(ByteBuffer.wrap(dataBytes))
                    .isEqualTo(fullBuffer.getNioBufferReadable());
            Assertions.assertThat(polled.buffersInBacklog())
                    .isEqualTo(credit == 0 ? 0 : credit - 1);
            Assertions.assertThat(
                            credit <= 1 ? Buffer.DataType.NONE : Buffer.DataType.DATA_BUFFER)
                    .isEqualTo(polled.getNextDataType());

            fullBuffer.recycleBuffer();
        }
    }

    /**
     * Verifies that {@code fail} releases the reader, drops all queued buffers, records the
     * failure cause, notifies the listener once more and hands every segment back through the
     * recycler.
     */
    @Test
    void testFail() throws Exception {
        int numSegments = 5;
        Queue<MemorySegment> segments = createsMemorySegments(numSegments);
        try {
            CountingAvailabilityListener listener = new CountingAvailabilityListener();
            SortMergeSubpartitionReader reader = createSortMergeSubpartitionReader(listener);

            // Recycled segments are put straight back into the queue they were taken from.
            reader.readBuffers(segments, segments::add);
            Assertions.assertThat(listener.numNotifications).isEqualTo(1);
            Assertions.assertThat(reader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(4);

            reader.fail(new RuntimeException("Test exception."));
            Assertions.assertThat(reader.getReleaseFuture()).isDone();
            Assertions.assertThat(reader.unsynchronizedGetNumberOfQueuedBuffers()).isZero();
            Assertions.assertThat(reader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
            Assertions.assertThat(reader.isReleased()).isTrue();
            Assertions.assertThat(listener.numNotifications).isEqualTo(2);
            Assertions.assertThat(reader.getFailureCause()).isNotNull();
        } finally {
            // Checked on success and on failure alike: no segment may be lost.
            Assertions.assertThat(segments).hasSize(numSegments);
        }
    }

    /**
     * Verifies that {@code releaseAllResources} releases the reader, drops all queued buffers
     * without recording a failure cause or sending a further notification, and hands every
     * segment back through the recycler.
     */
    @Test
    void testReleaseAllResources() throws Exception {
        int numSegments = 5;
        Queue<MemorySegment> segments = createsMemorySegments(numSegments);
        try {
            CountingAvailabilityListener listener = new CountingAvailabilityListener();
            SortMergeSubpartitionReader reader = createSortMergeSubpartitionReader(listener);

            // Recycled segments are put straight back into the queue they were taken from.
            reader.readBuffers(segments, segments::add);
            Assertions.assertThat(listener.numNotifications).isEqualTo(1);
            Assertions.assertThat(reader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(4);

            reader.releaseAllResources();
            Assertions.assertThat(reader.getReleaseFuture()).isDone();
            Assertions.assertThat(reader.unsynchronizedGetNumberOfQueuedBuffers()).isZero();
            Assertions.assertThat(reader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
            Assertions.assertThat(reader.isReleased()).isTrue();
            Assertions.assertThat(listener.numNotifications).isEqualTo(1);
            Assertions.assertThat(reader.getFailureCause()).isNull();
        } finally {
            // Checked on success and on failure alike: no segment may be lost.
            Assertions.assertThat(segments).hasSize(numSegments);
        }
    }

    /**
     * Verifies that {@code readBuffers} on a released reader throws {@link
     * IllegalStateException} and that no segment goes missing in the process.
     */
    @Test
    void testReadBuffersAfterReleased() throws Exception {
        int numSegments = 5;
        Queue<MemorySegment> segments = createsMemorySegments(numSegments);
        try {
            SortMergeSubpartitionReader reader =
                    createSortMergeSubpartitionReader(new CountingAvailabilityListener());

            reader.readBuffers(segments, segments::add);
            reader.releaseAllResources();

            Assertions.assertThatThrownBy(() -> reader.readBuffers(segments, segments::add))
                    .isInstanceOf(IllegalStateException.class);
        } finally {
            // Checked on success and on failure alike: no segment may be lost.
            Assertions.assertThat(segments).hasSize(numSegments);
        }
    }

    /** Verifies that a released reader returns {@code null} from {@code getNextBuffer}. */
    @Test
    void testPollBuffersAfterReleased() throws Exception {
        SortMergeSubpartitionReader reader =
                createSortMergeSubpartitionReader(new CountingAvailabilityListener());

        reader.readBuffers(
                createsMemorySegments(numBuffersPerSubpartition), FreeingBufferRecycler.INSTANCE);
        Assertions.assertThat(reader.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable())
                .isTrue();

        reader.releaseAllResources();
        Assertions.assertThat(reader.getNextBuffer()).isNull();
    }

    /**
     * Creates a reader for subpartition 0 of the test's partitioned file.
     *
     * @param bufferAvailabilityListener listener handed to the new reader
     * @return a reader whose underlying file reader has been asserted to have remaining data
     */
    private SortMergeSubpartitionReader createSortMergeSubpartitionReader(
            BufferAvailabilityListener bufferAvailabilityListener) throws Exception {
        PartitionedFileReader fileReader =
                new PartitionedFileReader(
                        partitionedFile,
                        0,
                        dataFileChannel,
                        indexFileChannel,
                        BufferReaderWriterUtil.allocatedHeaderBuffer(),
                        PartitionedFileWriteReadTest.createAndConfigIndexEntryBuffer());
        Assertions.assertThat(fileReader.hasRemaining()).isTrue();
        return new SortMergeSubpartitionReader(bufferAvailabilityListener, fileReader);
    }

    /**
     * Opens a read-only channel on the given file.
     *
     * @param path file to open
     * @return the opened channel; the caller is responsible for closing it
     * @throws IOException if the file cannot be opened
     */
    private static FileChannel openFileChannel(Path path) throws IOException {
        return FileChannel.open(path, StandardOpenOption.READ);
    }

    /**
     * Allocates unpooled memory segments of {@link #bufferSize} bytes each.
     *
     * @param i number of segments to allocate
     * @return a mutable queue holding the {@code i} new segments
     */
    private static Queue<MemorySegment> createsMemorySegments(int i) {
        Queue<MemorySegment> segments = new ArrayDeque<>(i);
        for (int allocated = 0; allocated < i; allocated++) {
            segments.add(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
        }
        return segments;
    }
}
