package edu.iu.dsc.tws.task.window.core;

import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.modifiers.Closable;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.task.window.IWindowCompute;
import edu.iu.dsc.tws.task.window.api.GlobalStreamId;
import edu.iu.dsc.tws.task.window.api.IEvictionPolicy;
import edu.iu.dsc.tws.task.window.api.ITimestampExtractor;
import edu.iu.dsc.tws.task.window.api.IWindow;
import edu.iu.dsc.tws.task.window.api.IWindowMessage;
import edu.iu.dsc.tws.task.window.api.WindowLifeCycleListener;
import edu.iu.dsc.tws.task.window.config.WindowConfig;
import edu.iu.dsc.tws.task.window.event.WatermarkEventGenerator;
import edu.iu.dsc.tws.task.window.exceptions.InvalidWindow;
import edu.iu.dsc.tws.task.window.manage.WindowManager;
import edu.iu.dsc.tws.task.window.policy.eviction.count.CountEvictionPolicy;
import edu.iu.dsc.tws.task.window.policy.eviction.count.WatermarkCountEvictionPolicy;
import edu.iu.dsc.tws.task.window.policy.eviction.duration.DurationEvictionPolicy;
import edu.iu.dsc.tws.task.window.policy.eviction.duration.WatermarkDurationEvictionPolicy;
import edu.iu.dsc.tws.task.window.policy.trigger.IWindowingPolicy;
import edu.iu.dsc.tws.task.window.policy.trigger.count.CountWindowPolicy;
import edu.iu.dsc.tws.task.window.policy.trigger.count.WatermarkCountWindowPolicy;
import edu.iu.dsc.tws.task.window.policy.trigger.duration.DurationWindowPolicy;
import edu.iu.dsc.tws.task.window.policy.trigger.duration.WatermarkDurationWindowPolicy;
import edu.iu.dsc.tws.task.window.strategy.IWindowStrategy;
import edu.iu.dsc.tws.task.window.util.WindowParameter;
import edu.iu.dsc.tws.task.window.util.WindowUtils;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/task/window/core/BaseWindowedSink.class */
public abstract class BaseWindowedSink<T> extends AbstractSingleWindowDataSink<T> implements IWindowCompute<T>, Closable {
    /** Logger shared by all instances of this sink. */
    private static final Logger LOG = Logger.getLogger(BaseWindowedSink.class.getName());
    /** Default watermark interval value; initialize falls back to this value when no interval was configured. */
    private static final long DEFAULT_WATERMARK_INTERVAL = 1000;
    /** Default maximum lag; used when no allowed lateness was configured. */
    private static final long DEFAULT_MAX_LAG = 0;
    /** Lag handed to the watermark generator and the watermark duration eviction policy (presumably milliseconds - confirm). */
    private long maxLagMs = DEFAULT_MAX_LAG;
    /** Set by withWatermarkInterval; null means the default interval is used. */
    private WindowConfig.Duration watermarkInterval = null;
    /** Set by withAllowedLateness; null means the default maximum lag is used. */
    private WindowConfig.Duration allowedLateness = null;
    /** Created in prepare; receives every accepted message. */
    private WindowManager<T> windowManager;
    /** Trigger policy chosen in setPolicies and started in start. */
    private IWindowingPolicy<T> windowingPolicy;
    /** Replaced by each of the with*Window builder methods. */
    private WindowParameter windowParameter;
    /** Created in prepare through newWindowLifeCycleListener. */
    private WindowLifeCycleListener<T> windowLifeCycleListener;
    /** Eviction policy chosen in setPolicies. */
    private IEvictionPolicy<T> evictionPolicy;
    /** Supplied through withWindow, otherwise derived from windowParameter in initialize. */
    private IWindow iWindow;
    /** Not read or written anywhere in this class. */
    private T collectiveOutput;
    /** Most recent window message received by the lifecycle listener's onActivation callback. */
    private IWindowMessage<T> collectiveEvents;
    /** When non-null, messages are processed on the timestamped (watermark) path. */
    private ITimestampExtractor<T> iTimestampExtractor;
    /** Created in initialize only when a timestamp extractor is set. */
    private WatermarkEventGenerator<T> watermarkEventGenerator;

    /* loaded from: input_file:edu/iu/dsc/tws/task/window/core/BaseWindowedSink$WindowedLateOutputCollector.class */
    /**
     * Simple holder that keeps a list of messages next to an (initially absent)
     * window message. Nothing in the enclosing class instantiates it.
     *
     * @param <T> payload type of the held messages
     */
    private static class WindowedLateOutputCollector<T> {
        /** Messages supplied at construction time; the list is stored as-is, not copied. */
        private final List<IMessage<T>> messageList;
        /** Starts out as {@code null}; never assigned elsewhere in this class. */
        private IWindowMessage<T> iWindowMessage;

        WindowedLateOutputCollector(List<IMessage<T>> messages) {
            messageList = messages;
        }
    }

    /**
     * Hook invoked with a window's messages. In this class it is called from the
     * lifecycle listener's {@code onActivation} callback with that callback's
     * first argument; the returned value is ignored by that caller.
     *
     * @param iWindowMessage the window message handed over by the listener
     * @return implementation-defined flag
     */
    public abstract boolean execute(IWindowMessage<T> iWindowMessage);

    /**
     * Hook invoked from the lifecycle listener's {@code onExpiry} callback; the
     * returned value is ignored by that caller.
     *
     * @param iWindowMessage the window message passed to {@code onExpiry}
     * @return implementation-defined flag
     */
    public abstract boolean getExpire(IWindowMessage<T> iWindowMessage);

    /**
     * Hook invoked from {@code execute(IMessage)} for a timestamped message that
     * the watermark generator's {@code track} call rejected; the returned value
     * is ignored by that caller.
     *
     * @param iMessage the rejected message
     * @return implementation-defined flag
     */
    public abstract boolean getLateMessages(IMessage<T> iMessage);

    /**
     * Prepares the sink: delegates to the superclass, creates the lifecycle
     * listener and the window manager bound to it, then runs
     * {@link #initialize(TaskContext)}.
     *
     * @param config      configuration forwarded to the superclass
     * @param taskContext task context forwarded to the superclass and to initialize
     */
    public void prepare(Config config, TaskContext taskContext) {
        super.prepare(config, taskContext);
        WindowLifeCycleListener<T> listener = newWindowLifeCycleListener();
        this.windowLifeCycleListener = listener;
        this.windowManager = new WindowManager<>(listener);
        initialize(taskContext);
    }

    /**
     * Resolves the window, optionally creates the watermark generator, installs
     * the policies and starts windowing.
     *
     * <p>If no window was supplied through {@code withWindow}, one is built from
     * the window parameters set by the {@code with*Window} builders. A watermark
     * generator is created only when a timestamp extractor is configured; it uses
     * the configured watermark interval and allowed lateness, or their defaults.
     *
     * <p>An {@code InvalidWindow} failure is logged at SEVERE level and not
     * rethrown, so the caller continues with a sink whose policies may be unset.
     *
     * @param taskContext context used to look up the incoming streams
     * @throws IllegalStateException if neither a window nor window parameters were configured
     */
    public void initialize(TaskContext taskContext) {
        try {
            if (this.iWindow == null) {
                if (this.windowParameter == null) {
                    // Fail with a clear message instead of a bare NullPointerException.
                    throw new IllegalStateException(
                            "No window configured: call withWindow(...) or one of the with*Window(...) methods first");
                }
                this.iWindow = WindowUtils.getWindow(
                        this.windowParameter.getWindowCountSize(),
                        this.windowParameter.getSlidingCountSize(),
                        this.windowParameter.getWindowDurationSize(),
                        this.windowParameter.getSldingDurationSize());
            }
            if (isTimestamped()) {
                long watermarkIntervalValue = this.watermarkInterval != null
                        ? this.watermarkInterval.value
                        : DEFAULT_WATERMARK_INTERVAL;
                this.maxLagMs = this.allowedLateness != null
                        ? this.allowedLateness.value
                        : DEFAULT_MAX_LAG;
                this.watermarkEventGenerator = new WatermarkEventGenerator<>(
                        this.windowManager,
                        this.maxLagMs,
                        watermarkIntervalValue,
                        getComponentStreams(taskContext));
            }
            setPolicies(this.iWindow.getWindowStrategy());
            start();
        } catch (InvalidWindow e) {
            // Route the failure through the logger (with the cause) rather than stderr.
            LOG.log(Level.SEVERE, "Failed to initialize windowing", e);
        }
    }

    /**
     * Routes one incoming message into the window manager.
     *
     * <p>Without a timestamp extractor the message is added directly. Otherwise
     * its timestamp is extracted from the content and offered to the watermark
     * generator: accepted messages are added together with their timestamp,
     * rejected ones are handed to {@link #getLateMessages(IMessage)}.
     *
     * @param iMessage the incoming message
     * @return always {@code true}
     */
    @Override
    public boolean execute(IMessage<T> iMessage) {
        if (isTimestamped()) {
            long eventTime = this.iTimestampExtractor.extractTimestamp(iMessage.getContent());
            GlobalStreamId streamId = new GlobalStreamId(iMessage.edge());
            boolean accepted = this.watermarkEventGenerator.track(streamId, eventTime);
            if (accepted) {
                this.windowManager.add(iMessage, eventTime);
            } else {
                getLateMessages(iMessage);
            }
        } else {
            this.windowManager.add(iMessage);
        }
        return true;
    }

    /**
     * Discards any previously set window parameters and configures a fresh
     * {@code WindowParameter} through its {@code withTumblingCountWindow} method.
     *
     * @param j value forwarded unchanged (presumably the window size in messages - confirm)
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withTumblingCountWindow(long j) {
        WindowParameter parameter = new WindowParameter();
        this.windowParameter = parameter;
        parameter.withTumblingCountWindow(j);
        return this;
    }

    /**
     * Discards any previously set window parameters and configures a fresh
     * {@code WindowParameter} through its {@code withTumblingDurationWindow} method.
     *
     * @param j        value forwarded unchanged (presumably the window length - confirm)
     * @param timeUnit unit forwarded together with {@code j}
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withTumblingDurationWindow(long j, TimeUnit timeUnit) {
        WindowParameter parameter = new WindowParameter();
        this.windowParameter = parameter;
        parameter.withTumblingDurationWindow(j, timeUnit);
        return this;
    }

    /**
     * Discards any previously set window parameters and configures a fresh
     * {@code WindowParameter} through its {@code withSlidingingCountWindow} method.
     *
     * @param j  first value forwarded (presumably the window size - confirm)
     * @param j2 second value forwarded (presumably the slide size - confirm)
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withSlidingCountWindow(long j, long j2) {
        WindowParameter parameter = new WindowParameter();
        this.windowParameter = parameter;
        parameter.withSlidingingCountWindow(j, j2);
        return this;
    }

    /**
     * Discards any previously set window parameters and configures a fresh
     * {@code WindowParameter} through its {@code withSlidingDurationWindow} method.
     *
     * @param j         first duration value forwarded (presumably the window length - confirm)
     * @param timeUnit  unit forwarded with {@code j}
     * @param j2        second duration value forwarded (presumably the slide length - confirm)
     * @param timeUnit2 unit forwarded with {@code j2}
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withSlidingDurationWindow(long j, TimeUnit timeUnit, long j2, TimeUnit timeUnit2) {
        WindowParameter parameter = new WindowParameter();
        this.windowParameter = parameter;
        parameter.withSlidingDurationWindow(j, timeUnit, j2, timeUnit2);
        return this;
    }

    /**
     * Installs the timestamp extractor. A non-null extractor switches the sink to
     * the timestamped path: {@code execute(IMessage)} extracts a timestamp per
     * message and {@code initialize} creates a watermark generator.
     *
     * <p>NOTE(review): the parameter is a raw {@code ITimestampExtractor}, so the
     * assignment to the {@code ITimestampExtractor<T>} field is unchecked.
     *
     * @param iTimestampExtractor extractor to use; {@code null} leaves the sink untimestamped
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withCustomTimestampExtractor(ITimestampExtractor iTimestampExtractor) {
        this.iTimestampExtractor = iTimestampExtractor;
        return this;
    }

    /**
     * Clears the timestamp extractor, so the sink is treated as not timestamped.
     *
     * <p>NOTE(review): the name suggests installing a default extractor, but the
     * code only sets the field to {@code null} - confirm the intended behavior.
     *
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withTimestampExtractor() {
        this.iTimestampExtractor = null;
        return this;
    }

    /**
     * Stores the allowed lateness. {@code initialize} copies its {@code value}
     * into the maximum lag when a timestamp extractor is configured.
     *
     * @param j        lateness amount
     * @param timeUnit unit of {@code j}
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withAllowedLateness(long j, TimeUnit timeUnit) {
        WindowConfig.Duration lateness = new WindowConfig.Duration(j, timeUnit);
        this.allowedLateness = lateness;
        return this;
    }

    /**
     * Stores the watermark interval. {@code initialize} passes its {@code value}
     * to the watermark generator when a timestamp extractor is configured.
     *
     * @param j        interval amount
     * @param timeUnit unit of {@code j}
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withWatermarkInterval(long j, TimeUnit timeUnit) {
        WindowConfig.Duration interval = new WindowConfig.Duration(j, timeUnit);
        this.watermarkInterval = interval;
        return this;
    }

    /**
     * Supplies the window directly. When a non-null window is set,
     * {@code initialize} does not build one from the window parameters.
     *
     * @param iWindow window to use
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withWindow(IWindow iWindow) {
        this.iWindow = iWindow;
        return this;
    }

    /**
     * Builds the listener that connects window-manager callbacks to this sink's
     * abstract hooks.
     *
     * @return a listener whose {@code onExpiry} forwards to {@code getExpire} and
     *         whose {@code onActivation} records its first argument and forwards
     *         it to {@code execute(IWindowMessage)}
     */
    protected WindowLifeCycleListener<T> newWindowLifeCycleListener() {
        return new WindowLifeCycleListener<T>() {
            @Override
            public void onExpiry(IWindowMessage<T> expired) {
                BaseWindowedSink.this.getExpire(expired);
            }

            @Override
            public void onActivation(IWindowMessage<T> activated, IWindowMessage<T> unusedSecond, IWindowMessage<T> unusedThird) {
                // Only the first argument is used; the other two are ignored.
                BaseWindowedSink<T> owner = BaseWindowedSink.this;
                owner.collectiveEvents = activated;
                owner.execute(owner.collectiveEvents);
            }
        };
    }

    /**
     * Creates a trigger policy: count-based when {@code count} is non-null,
     * otherwise duration-based.
     *
     * @param count           count configuration, or {@code null} to use {@code duration}
     * @param duration        duration configuration; dereferenced only when {@code count} is {@code null}
     * @param windowManager   manager handed to the new policy
     * @param iEvictionPolicy eviction policy handed to the new policy
     * @return the newly created policy
     */
    public IWindowingPolicy<T> getWindowingPolicy(WindowConfig.Count count, WindowConfig.Duration duration, WindowManager<T> windowManager, IEvictionPolicy<T> iEvictionPolicy) {
        if (count == null) {
            return new DurationWindowPolicy(duration.value, windowManager, iEvictionPolicy);
        }
        return new CountWindowPolicy(count.value, windowManager, iEvictionPolicy);
    }

    /**
     * Creates an eviction policy: count-based when {@code count} is non-null,
     * otherwise duration-based.
     *
     * @param count    count configuration, or {@code null} to use {@code duration}
     * @param duration duration configuration; dereferenced only when {@code count} is {@code null}
     * @return the newly created policy
     */
    public IEvictionPolicy<T> getEvictionPolicy(WindowConfig.Count count, WindowConfig.Duration duration) {
        if (count == null) {
            return new DurationEvictionPolicy(duration.value);
        }
        return new CountEvictionPolicy(count.value);
    }

    /**
     * Selects the eviction and trigger policies and installs them on the window
     * manager.
     *
     * <p>When no timestamp extractor is configured, the policies supplied by the
     * strategy are used as-is. When one is configured, count and duration
     * policies are replaced by their watermark-aware counterparts, sized from the
     * window parameters.
     *
     * <p>NOTE(review): on the timestamped path a strategy policy that is neither
     * count- nor duration-based leaves the corresponding field unchanged, and the
     * window parameters are dereferenced even if the window came from
     * {@code withWindow} - confirm both cases are unreachable.
     *
     * @param iWindowStrategy strategy that supplies the base policies
     */
    public void setPolicies(IWindowStrategy<T> iWindowStrategy) {
        IEvictionPolicy<T> strategyEviction = iWindowStrategy.getEvictionPolicy();
        if (!isTimestamped()) {
            this.evictionPolicy = strategyEviction;
        } else {
            if (strategyEviction instanceof CountEvictionPolicy) {
                LOG.info("WatermarkCountEvictionPolicy selected");
                this.evictionPolicy = new WatermarkCountEvictionPolicy(this.windowParameter.getWindowCountSize().value);
            }
            if (strategyEviction instanceof DurationEvictionPolicy) {
                LOG.info("WatermarkDurationEvictionPolicy selected");
                this.evictionPolicy = new WatermarkDurationEvictionPolicy(this.windowParameter.getWindowDurationSize().value, this.maxLagMs);
            }
        }

        // The strategy builds its trigger policy from the eviction policy chosen above.
        IWindowingPolicy<T> strategyTrigger = iWindowStrategy.getWindowingPolicy(this.windowManager, this.evictionPolicy);
        if (!isTimestamped()) {
            this.windowingPolicy = strategyTrigger;
        } else {
            if (strategyTrigger instanceof CountWindowPolicy) {
                LOG.info("WatermarkCountWindowingPolicy selected");
                this.windowingPolicy = new WatermarkCountWindowPolicy(this.windowParameter.getSlidingCountSize().value, this.windowManager, this.evictionPolicy, this.windowManager);
            }
            if (strategyTrigger instanceof DurationWindowPolicy) {
                LOG.info("WatermarkDurationWindowingPolicy selected");
                this.windowingPolicy = new WatermarkDurationWindowPolicy(this.windowParameter.getSldingDurationSize().value, this.windowManager, this.windowManager, this.evictionPolicy);
            }
        }

        this.windowManager.setEvictionPolicy(this.evictionPolicy);
        this.windowManager.setWindowingPolicy(this.windowingPolicy);
    }

    /**
     * Starts the watermark generator, if one was created, and then the trigger
     * policy.
     */
    public void start() {
        if (this.watermarkEventGenerator != null) {
            LOG.log(Level.INFO, "Starting WatermarkGenerator");
            LOG.fine("Starting watermark generator");
            this.watermarkEventGenerator.start();
        }
        LOG.fine("Starting windowing policy");
        this.windowingPolicy.start();
    }

    /**
     * Shuts down the watermark generator, if one was created, and then the window
     * manager.
     *
     * <p>The window manager is shut down in a {@code finally} block so that a
     * failure while stopping the generator cannot leave it running. Calling this
     * before {@code prepare} (no window manager yet) is a no-op.
     */
    public void close() {
        try {
            if (this.watermarkEventGenerator != null) {
                this.watermarkEventGenerator.shutdown();
            }
        } finally {
            if (this.windowManager != null) {
                this.windowManager.shutdown();
            }
        }
    }

    /**
     * Does nothing: the body is empty, so no state of this sink is reset.
     */
    public void reset() {
    }

    /**
     * Reports whether a timestamp extractor has been configured.
     *
     * @return {@code true} when {@code iTimestampExtractor} is non-null
     */
    private boolean isTimestamped() {
        return this.iTimestampExtractor != null;
    }

    /**
     * Returns the stream ids of the task's incoming edges.
     *
     * @param taskContext context whose in-edges are wrapped
     * @return the set produced by {@link #wrapGlobalStreamId(TaskContext)}
     */
    private Set<GlobalStreamId> getComponentStreams(TaskContext taskContext) {
        return wrapGlobalStreamId(taskContext);
    }

    /**
     * Wraps every key of the task's in-edge map in a {@code GlobalStreamId}.
     *
     * @param taskContext context supplying the in-edge map
     * @return a new mutable set with one id per in-edge key
     */
    private Set<GlobalStreamId> wrapGlobalStreamId(TaskContext taskContext) {
        Set<GlobalStreamId> streamIds = new HashSet<>();
        // Iterate as Object and cast, so this compiles whether or not the map is declared with String keys.
        for (Object edge : taskContext.getInEdges().keySet()) {
            streamIds.add(new GlobalStreamId((String) edge));
        }
        return streamIds;
    }
}
