/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.FullSnapshotResources;
import org.apache.flink.runtime.state.FullSnapshotUtil;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsSavepointStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyValueStateIterator;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.function.SupplierWithException;

/**
 * Writes a full snapshot of keyed state to a checkpoint stream and returns the resulting
 * {@link KeyedStateHandle} wrapped in a {@link SnapshotResult}.
 *
 * <p>Layout of the produced stream, as written by this class:
 *
 * <ol>
 *   <li>the keyed-backend meta data, written through a {@link KeyedBackendSerializationProxy};
 *   <li>for every key-group that has data, a section that starts at the offset recorded in the
 *       {@link KeyGroupRangeOffsets} and is written through the stream compression decorator of
 *       the snapshot resources. A section consists of a k/v-state id (short) followed by
 *       length-prefixed key/value byte arrays. The last pair before a change of k/v-state or
 *       before the end of the key-group gets the "meta data follows" flag set in its key, and is
 *       followed by either the next k/v-state id or {@link #END_OF_KEY_GROUP_MARK}.
 * </ol>
 *
 * @param <K> type of the key, as used by the {@link FullSnapshotResources}.
 */
public class FullSnapshotAsyncWriter<K>
implements SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> {
    /** Value written (as a short) right after the last key/value pair of every key-group. */
    private static final int END_OF_KEY_GROUP_MARK = 0xFFFF;

    /** Supplies the stream (plus result provider) that the snapshot is written to. */
    @Nonnull
    private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier;
    /** Source of the key-group range, serializers, meta info, compression and the k/v iterator. */
    @Nonnull
    private final FullSnapshotResources<K> snapshotResources;
    /** Decides which kind of state handle {@link #get(CloseableRegistry)} produces. */
    @Nonnull
    private final SnapshotType snapshotType;

    /**
     * Creates a writer for one snapshot.
     *
     * @param snapshotType type of the snapshot; a savepoint type yields a
     *     {@link KeyGroupsSavepointStateHandle}, anything else a {@link KeyGroupsStateHandle}.
     * @param checkpointStreamSupplier supplier of the output stream to write to.
     * @param snapshotResources resources that describe and provide the state to write.
     * @throws NullPointerException if any argument is {@code null}.
     */
    public FullSnapshotAsyncWriter(@Nonnull SnapshotType snapshotType, @Nonnull SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier, @Nonnull FullSnapshotResources<K> snapshotResources) {
        // Fail fast here instead of with an anonymous NPE somewhere inside get().
        this.checkpointStreamSupplier = Objects.requireNonNull(checkpointStreamSupplier, "checkpointStreamSupplier");
        this.snapshotResources = Objects.requireNonNull(snapshotResources, "snapshotResources");
        this.snapshotType = Objects.requireNonNull(snapshotType, "snapshotType");
    }

    /**
     * Writes the complete snapshot and finalizes the stream.
     *
     * <p>The stream is registered with {@code snapshotCloseableRegistry} for the duration of the
     * write. If an exception escapes the write, the stream stays registered and is not closed
     * here.
     *
     * @param snapshotCloseableRegistry registry the output stream is registered with while writing.
     * @return the snapshot result built from the finalized stream and the recorded key-group offsets.
     * @throws IOException if the stream is no longer registered once writing has finished, or if
     *     writing fails.
     * @throws Exception if obtaining the stream from the supplier fails.
     */
    @Override
    public SnapshotResult<KeyedStateHandle> get(CloseableRegistry snapshotCloseableRegistry) throws Exception {
        final KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(this.snapshotResources.getKeyGroupRange());
        final CheckpointStreamWithResultProvider checkpointStreamWithResultProvider = this.checkpointStreamSupplier.get();

        snapshotCloseableRegistry.registerCloseable(checkpointStreamWithResultProvider);
        this.writeSnapshotToOutputStream(checkpointStreamWithResultProvider, keyGroupRangeOffsets);

        // Only finalize the stream if this call is the one that removes it from the registry.
        if (!snapshotCloseableRegistry.unregisterCloseable(checkpointStreamWithResultProvider)) {
            throw new IOException("Stream is already unregistered/closed.");
        }

        final CheckpointStreamWithResultProvider.KeyedStateHandleFactory stateHandleFactory = this.snapshotType.isSavepoint() ? KeyGroupsSavepointStateHandle::new : KeyGroupsStateHandle::new;
        return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult(), keyGroupRangeOffsets, stateHandleFactory);
    }

    /**
     * Writes the meta data followed by all key/value data to the checkpoint output stream.
     *
     * @param checkpointStreamWithResultProvider provider of the stream to write to.
     * @param keyGroupRangeOffsets receives the stream offset at which each key-group starts.
     * @throws IOException if writing fails.
     * @throws InterruptedException if the writing thread is found to be interrupted.
     */
    private void writeSnapshotToOutputStream(@Nonnull CheckpointStreamWithResultProvider checkpointStreamWithResultProvider, @Nonnull KeyGroupRangeOffsets keyGroupRangeOffsets) throws IOException, InterruptedException {
        // The view is intentionally not closed: the underlying stream must stay open for the
        // key/value data and is finalized by get().
        final DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(checkpointStreamWithResultProvider.getCheckpointOutputStream());
        this.writeKVStateMetaData(outputView);
        try (KeyValueStateIterator kvStateIterator = this.snapshotResources.createKVStateIterator()) {
            this.writeKVStateData(kvStateIterator, checkpointStreamWithResultProvider, keyGroupRangeOffsets);
        }
    }

    /**
     * Writes the key serializer, the state meta info snapshots and the "compression used" flag.
     *
     * @param outputView view on the checkpoint output stream.
     * @throws IOException if writing fails.
     */
    private void writeKVStateMetaData(DataOutputView outputView) throws IOException {
        // Compression counts as "used" whenever the decorator is not the uncompressed singleton.
        final boolean usingCompression = !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, this.snapshotResources.getStreamCompressionDecorator());
        final KeyedBackendSerializationProxy<K> serializationProxy = new KeyedBackendSerializationProxy<>(this.snapshotResources.getKeySerializer(), this.snapshotResources.getMetaInfoSnapshots(), usingCompression);
        serializationProxy.write(outputView);
    }

    /**
     * Writes all key/value pairs of the iterator, recording the start offset of every key-group.
     *
     * <p>The loop works with a one-element lookahead: a pair is only written once the iterator
     * has advanced to its successor, because whether the "meta data follows" flag must be set on
     * a key depends on what comes next.
     *
     * @param mergeIterator iterator over the key/value pairs to write.
     * @param checkpointStreamWithResultProvider provider of the stream to write to.
     * @param keyGroupRangeOffsets receives the stream offset at which each key-group starts.
     * @throws IOException if writing fails.
     * @throws InterruptedException if the writing thread is found to be interrupted.
     */
    private void writeKVStateData(KeyValueStateIterator mergeIterator, CheckpointStreamWithResultProvider checkpointStreamWithResultProvider, KeyGroupRangeOffsets keyGroupRangeOffsets) throws IOException, InterruptedException {
        byte[] previousKey = null;
        byte[] previousValue = null;
        DataOutputViewStreamWrapper kgOutView = null;
        OutputStream kgOutStream = null;
        CheckpointStateOutputStream checkpointOutputStream = checkpointStreamWithResultProvider.getCheckpointOutputStream();
        try {
            // Preamble: open the first key-group and remember the first pair as lookahead.
            if (mergeIterator.isValid()) {
                keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), checkpointOutputStream.getPos());
                kgOutStream = this.snapshotResources.getStreamCompressionDecorator().decorateWithCompression(checkpointOutputStream);
                kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
                kgOutView.writeShort(mergeIterator.kvStateId());
                previousKey = mergeIterator.key();
                previousValue = mergeIterator.value();
                mergeIterator.next();
            }
            // Main loop: write the remembered pair, then any meta data that separates it from
            // the pair the iterator currently points to.
            while (mergeIterator.isValid()) {
                assert (!FullSnapshotUtil.hasMetaDataFollowsFlag(previousKey));
                if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
                    // Interruption is only checked on key-group / k/v-state boundaries.
                    this.checkInterrupted();
                    // Flag the key so that it signals meta data after this pair.
                    FullSnapshotUtil.setMetaDataFollowsFlagInKey(previousKey);
                }
                this.writeKeyValuePair(previousKey, previousValue, kgOutView);
                if (mergeIterator.isNewKeyGroup()) {
                    // Finish the current key-group section ...
                    kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
                    kgOutStream.close();
                    // ... and start the next one at the current position of the underlying stream.
                    keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), checkpointOutputStream.getPos());
                    kgOutStream = this.snapshotResources.getStreamCompressionDecorator().decorateWithCompression(checkpointOutputStream);
                    kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
                    kgOutView.writeShort(mergeIterator.kvStateId());
                } else if (mergeIterator.isNewKeyValueState()) {
                    // Same key-group, different k/v-state: only the new state id is written.
                    kgOutView.writeShort(mergeIterator.kvStateId());
                }
                previousKey = mergeIterator.key();
                previousValue = mergeIterator.value();
                mergeIterator.next();
            }
            // Epilogue: the last remembered pair is always followed by the end mark.
            if (previousKey != null) {
                assert (!FullSnapshotUtil.hasMetaDataFollowsFlag(previousKey));
                FullSnapshotUtil.setMetaDataFollowsFlagInKey(previousKey);
                this.writeKeyValuePair(previousKey, previousValue, kgOutView);
                kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
                kgOutStream.close();
                // Closed regularly; prevents the quiet close below from closing it again.
                kgOutStream = null;
            }
        }
        finally {
            // Only non-null if an exception interrupted the writing of a key-group section.
            IOUtils.closeQuietly(kgOutStream);
        }
    }

    /**
     * Serializes the key and then the value as byte arrays.
     *
     * @param key serialized key bytes.
     * @param value serialized value bytes.
     * @param out view to write to.
     * @throws IOException if writing fails.
     */
    private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
        BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
        BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
    }

    /**
     * Throws if the current thread has its interrupt flag set. The flag is left set, because
     * {@link Thread#isInterrupted()} does not clear it.
     *
     * @throws InterruptedException if the current thread is interrupted.
     */
    private void checkInterrupted() throws InterruptedException {
        // NOTE(review): the message names RocksDB although nothing in this class is
        // RocksDB-specific; kept unchanged because it is runtime-visible text - confirm before
        // rewording.
        if (Thread.currentThread().isInterrupted()) {
            throw new InterruptedException("RocksDB snapshot interrupted.");
        }
    }
}

