package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
import org.apache.flink.runtime.state.changelog.LocalChangelogRegistryImpl;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StateChangelogStorage} implementation that hands out {@link FsStateChangelogWriter}s.
 *
 * <p>Every writer created by one storage instance shares the same {@link
 * StateChangeUploadScheduler}, {@link TaskChangelogRegistry}, {@link LocalRecoveryConfig} and
 * {@link LocalChangelogRegistry}. Closing the storage closes the upload scheduler and shuts down
 * the executor that this class created for the local changelog registry (if any).
 */
@Experimental
@ThreadSafe
public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {
    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogStorage.class);

    /** Upload scheduler shared by all writers; closed in {@link #close()}. */
    private final StateChangeUploadScheduler uploader;

    /** Threshold (in bytes) handed to every writer created by this storage. */
    private final long preEmptivePersistThresholdInBytes;

    /** Counter that supplies the low bits of each writer's log id; starts at 0. */
    private final AtomicInteger logIdGenerator = new AtomicInteger(0);

    private final TaskChangelogRegistry changelogRegistry;

    /**
     * Registry handed to every writer. Never {@code null}: it is {@link
     * LocalChangelogRegistry#NO_OP} unless local recovery is enabled.
     */
    @Nonnull
    private final LocalChangelogRegistry localChangelogRegistry;

    /**
     * Executor backing {@link #localChangelogRegistry}; {@code null} when local recovery is
     * disabled. Owned by this class and shut down in {@link #close()}.
     */
    @Nullable
    private final ExecutorService localChangelogExecutor;

    /** The local recovery configuration, forwarded to every writer. */
    @Nonnull
    private final LocalRecoveryConfig localRecoveryConfig;

    /**
     * Creates a storage whose {@link TaskChangelogRegistry} is the default one, sized by {@link
     * FsStateChangelogOptions#NUM_DISCARD_THREADS}.
     *
     * @throws IOException propagated from the delegated constructor
     */
    public FsStateChangelogStorage(JobID jobID, Configuration configuration, TaskManagerJobMetricGroup taskManagerJobMetricGroup, LocalRecoveryConfig localRecoveryConfig) throws IOException {
        this(jobID, configuration, taskManagerJobMetricGroup, TaskChangelogRegistry.defaultChangelogRegistry(((Integer) configuration.get(FsStateChangelogOptions.NUM_DISCARD_THREADS)).intValue()), localRecoveryConfig);
    }

    /**
     * Creates a storage whose upload scheduler is built from {@code configuration} and whose
     * pre-emptive persist threshold is read from {@link
     * FsStateChangelogOptions#PREEMPTIVE_PERSIST_THRESHOLD}.
     *
     * @throws IOException propagated from the delegated constructor
     */
    public FsStateChangelogStorage(JobID jobID, Configuration configuration, TaskManagerJobMetricGroup taskManagerJobMetricGroup, TaskChangelogRegistry taskChangelogRegistry, LocalRecoveryConfig localRecoveryConfig) throws IOException {
        this(StateChangeUploadScheduler.fromConfig(jobID, configuration, new ChangelogStorageMetricGroup(taskManagerJobMetricGroup), taskChangelogRegistry, localRecoveryConfig), ((MemorySize) configuration.get(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD)).getBytes(), taskChangelogRegistry, localRecoveryConfig);
    }

    /**
     * Test-only constructor that uploads directly through a {@link StateChangeFsUploader} rooted
     * at {@code basePath} and uses the default pre-emptive persist threshold.
     *
     * @param compression forwarded unchanged to {@link StateChangeFsUploader}; NOTE(review):
     *     presumably toggles compression - confirm against that constructor
     * @param bufferSize forwarded unchanged to {@link StateChangeFsUploader}; NOTE(review):
     *     presumably a buffer size - confirm against that constructor
     * @throws IOException if resolving the file system of {@code basePath} fails
     */
    @VisibleForTesting
    public FsStateChangelogStorage(JobID jobID, Path basePath, boolean compression, int bufferSize, ChangelogStorageMetricGroup changelogStorageMetricGroup, TaskChangelogRegistry taskChangelogRegistry, LocalRecoveryConfig localRecoveryConfig) throws IOException {
        this(StateChangeUploadScheduler.directScheduler(new StateChangeFsUploader(jobID, basePath, basePath.getFileSystem(), compression, bufferSize, changelogStorageMetricGroup, taskChangelogRegistry)), ((MemorySize) FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD.defaultValue()).getBytes(), taskChangelogRegistry, localRecoveryConfig);
    }

    /**
     * Primary constructor; all other constructors delegate here.
     *
     * <p>When {@code localRecoveryConfig} reports local recovery as enabled, a single-thread
     * executor is created for a {@link LocalChangelogRegistryImpl}; otherwise the no-op registry
     * is used and no executor is created.
     *
     * @param stateChangeUploadScheduler scheduler shared by all writers; closed by {@link #close()}
     * @param preEmptivePersistThresholdInBytes threshold handed to every writer
     * @param taskChangelogRegistry registry handed to every writer
     * @param localRecoveryConfig local recovery configuration; must not be {@code null}
     */
    @VisibleForTesting
    public FsStateChangelogStorage(StateChangeUploadScheduler stateChangeUploadScheduler, long preEmptivePersistThresholdInBytes, TaskChangelogRegistry taskChangelogRegistry, LocalRecoveryConfig localRecoveryConfig) {
        super(ChangelogStreamHandleReader.DIRECT_READER);
        this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes;
        this.changelogRegistry = taskChangelogRegistry;
        this.uploader = stateChangeUploadScheduler;
        this.localRecoveryConfig = localRecoveryConfig;
        if (localRecoveryConfig.isLocalRecoveryEnabled()) {
            // Keep a handle on the executor so close() can release its thread.
            this.localChangelogExecutor = Executors.newSingleThreadExecutor();
            this.localChangelogRegistry = new LocalChangelogRegistryImpl(this.localChangelogExecutor);
        } else {
            this.localChangelogExecutor = null;
            this.localChangelogRegistry = LocalChangelogRegistry.NO_OP;
        }
    }

    /**
     * Creates a writer with a fresh log id.
     *
     * <p>The log id is a {@link UUID} whose most significant bits are 0 and whose least
     * significant bits are the next value of an internal counter, so ids are unique per storage
     * instance only.
     *
     * @param operatorID identifier of the operator, used for logging only
     * @param keyGroupRange key group range passed to the writer
     * @param mailboxExecutor mailbox executor passed to the writer
     * @return a new writer bound to this storage's uploader and registries
     */
    @Override
    public FsStateChangelogWriter createWriter(String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor mailboxExecutor) {
        UUID logId = new UUID(0L, this.logIdGenerator.getAndIncrement());
        LOG.info("createWriter for operator {}/{}: {}", operatorID, keyGroupRange, logId);
        return new FsStateChangelogWriter(logId, keyGroupRange, this.uploader, this.preEmptivePersistThresholdInBytes, mailboxExecutor, this.changelogRegistry, this.localRecoveryConfig, this.localChangelogRegistry);
    }

    /**
     * Alias retained for code compiled against the decompiler-assigned method name.
     *
     * @deprecated use {@link #createWriter(String, KeyGroupRange, MailboxExecutor)}
     */
    @Deprecated
    public FsStateChangelogWriter m8createWriter(String str, KeyGroupRange keyGroupRange, MailboxExecutor mailboxExecutor) {
        return createWriter(str, keyGroupRange, mailboxExecutor);
    }

    /**
     * Closes the upload scheduler and shuts down the local changelog executor, if one was
     * created.
     *
     * <p>The executor is shut down in a {@code finally} block so its thread is released even
     * when closing the uploader fails. {@code shutdown()} does not cancel tasks that were already
     * submitted, but tasks submitted afterwards are rejected.
     *
     * @throws Exception if closing the upload scheduler fails
     */
    @Override // org.apache.flink.changelog.fs.FsStateChangelogStorageForRecovery
    public void close() throws Exception {
        try {
            this.uploader.close();
        } finally {
            if (this.localChangelogExecutor != null) {
                this.localChangelogExecutor.shutdown();
            }
        }
    }

    /** Returns the availability provider of the underlying upload scheduler. */
    public AvailabilityProvider getAvailabilityProvider() {
        return this.uploader.getAvailabilityProvider();
    }
}
