/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.temporal;

import java.io.IOException;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.table.data.RowData;

/**
 * Abstract two-input stream operator over {@link RowData} that maintains a single
 * processing-time "cleanup" timer per key and invokes {@link #cleanupState(long)} when that
 * timer fires.
 *
 * <p>State cleaning is enabled only when the configured minimum retention time is greater than
 * {@code 1}. When it is disabled, {@link #registerProcessingCleanupTimer()},
 * {@link #cleanupLastTimer()} and {@link #onProcessingTime(InternalTimer)} do nothing, and the
 * cleanup-timestamp state is never created.
 *
 * <p>Subclasses are expected to call {@link #registerProcessingCleanupTimer()} to (re)arm the
 * timer and to implement {@link #cleanupState(long)} to drop their own state.
 */
@Internal
public abstract class BaseTwoInputStreamOperatorWithStateRetention
        extends AbstractStreamOperator<RowData>
        implements TwoInputStreamOperator<RowData, RowData, RowData>,
                Triggerable<Object, VoidNamespace> {

    private static final long serialVersionUID = -5953921797477294258L;

    /** Name of the value state holding the timestamp of the latest registered cleanup timer. */
    private static final String CLEANUP_TIMESTAMP = "cleanup-timestamp";

    /** Name under which the internal timer service is requested. */
    private static final String TIMERS_STATE_NAME = "timers";

    /**
     * Minimum retention: the cleanup timer is re-registered once
     * {@code now + minRetentionTime} exceeds the currently registered cleanup time.
     */
    private final long minRetentionTime;

    /** Maximum retention: a newly registered cleanup timer fires at {@code now + maxRetentionTime}. */
    private final long maxRetentionTime;

    /** {@code true} iff {@code minRetentionTime > 1}; gates every cleanup-related code path. */
    protected final boolean stateCleaningEnabled;

    /** Timestamp of the latest registered cleanup timer; only initialized when cleaning is enabled. */
    private transient ValueState<Long> latestRegisteredCleanupTimer;

    /** Timer service wrapper created in {@link #open()}. */
    private transient SimpleTimerService timerService;

    /**
     * Creates the operator and derives {@link #stateCleaningEnabled} from the minimum retention.
     *
     * @param minRetentionTime minimum retention time; cleaning is enabled only if this is
     *     greater than {@code 1}
     * @param maxRetentionTime offset added to the current processing time when a new cleanup
     *     timer is registered
     */
    protected BaseTwoInputStreamOperatorWithStateRetention(
            long minRetentionTime, long maxRetentionTime) {
        this.minRetentionTime = minRetentionTime;
        this.maxRetentionTime = maxRetentionTime;
        this.stateCleaningEnabled = minRetentionTime > 1L;
    }

    /**
     * Initializes the timer service and, when state cleaning is enabled, the value state that
     * remembers the latest registered cleanup timestamp.
     *
     * @throws Exception if the timer service or the state cannot be obtained
     */
    @Override
    public void open() throws Exception {
        initializeTimerService();

        if (stateCleaningEnabled) {
            ValueStateDescriptor<Long> cleanupStateDescriptor =
                    new ValueStateDescriptor<>(CLEANUP_TIMESTAMP, Types.LONG);
            latestRegisteredCleanupTimer = getRuntimeContext().getState(cleanupStateDescriptor);
        }
    }

    /** Requests the internal timer service (with this operator as trigger target) and wraps it. */
    private void initializeTimerService() {
        InternalTimerService<VoidNamespace> internalTimerService =
                getInternalTimerService(TIMERS_STATE_NAME, VoidNamespaceSerializer.INSTANCE, this);
        timerService = new SimpleTimerService(internalTimerService);
    }

    /**
     * Registers (or pushes back) the processing-time cleanup timer.
     *
     * <p>A new timer is registered when none is recorded yet, or when
     * {@code now + minRetentionTime} is later than the recorded cleanup time. Does nothing when
     * state cleaning is disabled.
     *
     * @throws IOException if the cleanup-timestamp state cannot be read or updated
     */
    protected void registerProcessingCleanupTimer() throws IOException {
        if (!stateCleaningEnabled) {
            return;
        }

        final long now = timerService.currentProcessingTime();
        // null means no cleanup timer has been recorded in state.
        final Long registeredCleanupTime = latestRegisteredCleanupTimer.value();

        if (registeredCleanupTime == null || now + minRetentionTime > registeredCleanupTime) {
            updateCleanupTimer(now, registeredCleanupTime);
        }
    }

    /**
     * Replaces the previously registered cleanup timer (if any) with one firing at
     * {@code currentProcessingTime + maxRetentionTime} and records the new timestamp in state.
     *
     * @param currentProcessingTime processing time used as the base of the new timer
     * @param previousCleanupTime timestamp of the timer to delete, or {@code null} if there is none
     * @throws IOException if the cleanup-timestamp state cannot be updated
     */
    private void updateCleanupTimer(long currentProcessingTime, Long previousCleanupTime)
            throws IOException {
        if (previousCleanupTime != null) {
            timerService.deleteProcessingTimeTimer(previousCleanupTime);
        }

        final long newCleanupTime = currentProcessingTime + maxRetentionTime;
        timerService.registerProcessingTimeTimer(newCleanupTime);
        latestRegisteredCleanupTimer.update(newCleanupTime);
    }

    /**
     * Clears the recorded cleanup timestamp and deletes the corresponding processing-time timer,
     * if one is recorded. Does nothing when state cleaning is disabled.
     *
     * @throws IOException if the cleanup-timestamp state cannot be read
     */
    protected void cleanupLastTimer() throws IOException {
        if (!stateCleaningEnabled) {
            return;
        }

        final Long registeredCleanupTime = latestRegisteredCleanupTimer.value();
        if (registeredCleanupTime != null) {
            // Same order as before: drop the state entry first, then the timer itself.
            latestRegisteredCleanupTimer.clear();
            timerService.deleteProcessingTimeTimer(registeredCleanupTime);
        }
    }

    /**
     * Invoked when a processing-time timer fires. If the timer's timestamp equals the recorded
     * cleanup timestamp, {@link #cleanupState(long)} is called and the recorded timestamp is
     * cleared; any other timer is ignored. Does nothing when state cleaning is disabled.
     *
     * @param timer the timer that fired
     * @throws Exception if reading the state or cleaning up fails
     */
    @Override
    public final void onProcessingTime(InternalTimer<Object, VoidNamespace> timer)
            throws Exception {
        if (!stateCleaningEnabled) {
            return;
        }

        final long timerTime = timer.getTimestamp();
        final Long cleanupTime = latestRegisteredCleanupTimer.value();

        // Compare as primitives so the check is by value, never by boxed identity.
        if (cleanupTime != null && cleanupTime.longValue() == timerTime) {
            cleanupState(cleanupTime);
            latestRegisteredCleanupTimer.clear();
        }
    }

    /**
     * Hook for subclasses to discard their state; called from
     * {@link #onProcessingTime(InternalTimer)} when the registered cleanup timer fires.
     *
     * @param time the timestamp of the cleanup timer that fired
     */
    public abstract void cleanupState(long time);
}

