package org.apache.flink.contrib.streaming.state;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.shaded.guava31.com.google.common.collect.Streams;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.class */
public class RocksDBStateDownloader extends RocksDBStateDataTransfer {
    public RocksDBStateDownloader(int i) {
        super(i);
    }

    public void transferAllStateDataToDirectory(Collection<StateHandleDownloadSpec> collection, CloseableRegistry closeableRegistry) throws Exception {
        CloseableRegistry closeableRegistry2 = new CloseableRegistry();
        closeableRegistry.registerCloseable(closeableRegistry2);
        try {
            try {
                FutureUtils.completeAll((List) transferAllStateDataToDirectoryAsync(collection, closeableRegistry2).collect(Collectors.toList())).get();
                if (closeableRegistry.unregisterCloseable(closeableRegistry2)) {
                    IOUtils.closeQuietly(closeableRegistry2);
                }
            } catch (Exception e) {
                collection.stream().map((v0) -> {
                    return v0.getDownloadDestination();
                }).map((v0) -> {
                    return v0.toFile();
                }).forEach(FileUtils::deleteDirectoryQuietly);
                Throwable stripException = ExceptionUtils.stripException(ExceptionUtils.stripExecutionException(e), RuntimeException.class);
                if (!(stripException instanceof IOException)) {
                    throw new FlinkRuntimeException("Failed to download data for state handles.", e);
                }
                throw ((IOException) stripException);
            }
        } catch (Throwable th) {
            if (closeableRegistry.unregisterCloseable(closeableRegistry2)) {
                IOUtils.closeQuietly(closeableRegistry2);
            }
            throw th;
        }
    }

    private Stream<CompletableFuture<Void>> transferAllStateDataToDirectoryAsync(Collection<StateHandleDownloadSpec> collection, CloseableRegistry closeableRegistry) {
        return collection.stream().flatMap(stateHandleDownloadSpec -> {
            return Streams.concat(new Stream[]{stateHandleDownloadSpec.getStateHandle().getSharedState().stream(), stateHandleDownloadSpec.getStateHandle().getPrivateState().stream()}).map(handleAndLocalPath -> {
                String localPath = handleAndLocalPath.getLocalPath();
                StreamStateHandle handle = handleAndLocalPath.getHandle();
                Path resolve = stateHandleDownloadSpec.getDownloadDestination().resolve(localPath);
                return ThrowingRunnable.unchecked(() -> {
                    downloadDataForStateHandle(resolve, handle, closeableRegistry);
                });
            });
        }).map(runnable -> {
            return CompletableFuture.runAsync(runnable, this.executorService);
        });
    }

    private void downloadDataForStateHandle(Path path, StreamStateHandle streamStateHandle, CloseableRegistry closeableRegistry) throws IOException {
        if (closeableRegistry.isClosed()) {
            return;
        }
        try {
            Closeable openInputStream = streamStateHandle.openInputStream();
            closeableRegistry.registerCloseable(openInputStream);
            Files.createDirectories(path.getParent(), new FileAttribute[0]);
            OutputStream newOutputStream = Files.newOutputStream(path, new OpenOption[0]);
            closeableRegistry.registerCloseable(newOutputStream);
            byte[] bArr = new byte[8192];
            while (true) {
                int read = openInputStream.read(bArr);
                if (read == -1) {
                    closeableRegistry.unregisterAndCloseAll(new Closeable[]{newOutputStream, openInputStream});
                    return;
                }
                newOutputStream.write(bArr, 0, read);
            }
        } catch (Exception e) {
            IOUtils.closeQuietly(closeableRegistry);
            throw new IOException(e);
        }
    }

    @Override // org.apache.flink.contrib.streaming.state.RocksDBStateDataTransfer, java.io.Closeable, java.lang.AutoCloseable
    public /* bridge */ /* synthetic */ void close() {
        super.close();
    }
}
