package cn.ponfee.scheduler.supervisor.thread;

import cn.ponfee.scheduler.common.lock.DoInLocked;
import cn.ponfee.scheduler.common.util.Jsons;
import cn.ponfee.scheduler.core.base.AbstractHeartbeatThread;
import cn.ponfee.scheduler.core.enums.ExecuteState;
import cn.ponfee.scheduler.core.model.SchedInstance;
import cn.ponfee.scheduler.core.model.SchedTask;
import cn.ponfee.scheduler.supervisor.manager.SchedulerJobManager;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;

/* loaded from: input_file:cn/ponfee/scheduler/supervisor/thread/RunningInstanceScanner.class */
/**
 * Heartbeat thread that periodically looks up the instances returned by
 * {@link SchedulerJobManager#findExpireRunning} and, for each one, either renews its
 * update time, redispatches its waiting tasks, or terminates it.
 *
 * <p>The per-round work is executed through the supplied {@link DoInLocked}.
 * NOTE(review): presumably this serializes the scan across supervisors; confirm against
 * the {@code DoInLocked} implementation.
 */
public class RunningInstanceScanner extends AbstractHeartbeatThread {

    /**
     * Maximum number of instances fetched per scan round. Also used to detect a "full"
     * batch, so the query limit and the size check can never drift apart.
     */
    private static final int QUERY_BATCH_SIZE = 200;

    private final DoInLocked doInLocked;
    private final SchedulerJobManager schedulerJobManager;

    /** Look-back window subtracted from "now" to build the expiry threshold: 4 x heartbeat period. */
    private final long beforeMilliseconds;

    /**
     * Creates the scanner.
     *
     * @param heartbeatPeriod     value handed unchanged to the superclass constructor
     *                            (presumably the heartbeat period in milliseconds; confirm
     *                            against {@code AbstractHeartbeatThread})
     * @param doInLocked          wrapper through which each scan round is executed
     * @param schedulerJobManager manager used to query and update instances and tasks
     */
    public RunningInstanceScanner(long heartbeatPeriod, DoInLocked doInLocked, SchedulerJobManager schedulerJobManager) {
        super(heartbeatPeriod);
        this.doInLocked = doInLocked;
        this.schedulerJobManager = schedulerJobManager;
        // heartbeatPeriodMs is inherited; shifting left by two multiplies it by four.
        this.beforeMilliseconds = this.heartbeatPeriodMs << 2;
    }

    /**
     * Runs one scan round.
     *
     * @return {@code true} when no worker has been discovered, or when the locked call
     *         yields {@code null}; otherwise the result of {@link #process()}. How the
     *         flag is interpreted is up to the superclass, which is not visible here.
     */
    protected boolean heartbeat() {
        if (this.schedulerJobManager.hasNotDiscoveredWorkers()) {
            this.log.warn("Not found available worker.");
            return true;
        }
        Boolean result = (Boolean) this.doInLocked.apply(this::process);
        // A null result is treated the same as true.
        return result == null || result;
    }

    /**
     * Fetches up to {@link #QUERY_BATCH_SIZE} instances whose threshold time is older than
     * {@code now - beforeMilliseconds} and handles each of them.
     *
     * @return {@code true} if nothing was found or fewer than a full batch was returned;
     *         {@code false} if the batch was full
     */
    private boolean process() {
        Date now = new Date();
        Date expireTime = new Date(now.getTime() - this.beforeMilliseconds);
        List<SchedInstance> instances = this.schedulerJobManager.findExpireRunning(expireTime, QUERY_BATCH_SIZE);
        if (CollectionUtils.isEmpty(instances)) {
            return true;
        }
        for (SchedInstance instance : instances) {
            processEach(instance, now);
        }
        return instances.size() < QUERY_BATCH_SIZE;
    }

    /**
     * Handles a single instance returned by the scan.
     *
     * <ul>
     *   <li>No task in WAITING state: renew the update time if
     *       {@code hasAliveExecuting} reports true, otherwise terminate the instance.</li>
     *   <li>Some task in WAITING state: renew the update time and, only if the renewal
     *       succeeds, dispatch the waiting tasks kept by {@code filterDispatchingTask}.</li>
     * </ul>
     *
     * @param schedInstance the instance to handle
     * @param date          the timestamp of this scan round, used for the renewal
     */
    private void processEach(SchedInstance schedInstance, Date date) {
        List<SchedTask> tasks = this.schedulerJobManager.findMediumTaskByInstanceId(schedInstance.getInstanceId().longValue());
        List<SchedTask> waitingTasks = tasks.stream()
            .filter(schedTask -> ExecuteState.WAITING.equals(schedTask.getExecuteState()))
            .collect(Collectors.toList());

        if (CollectionUtils.isEmpty(waitingTasks)) {
            if (this.schedulerJobManager.hasAliveExecuting(tasks)) {
                this.schedulerJobManager.renewUpdateTime(schedInstance, date);
            } else {
                this.log.info("Scan instance, all worker dead, terminate the sched instance: {}", schedInstance.getInstanceId());
                this.schedulerJobManager.terminateDeadInstance(schedInstance.getInstanceId().longValue());
            }
            return;
        }

        // Redispatch only when the renewal succeeded.
        // NOTE(review): a failed renewal presumably means another party already updated the instance; confirm.
        if (!this.schedulerJobManager.renewUpdateTime(schedInstance, date)) {
            return;
        }
        List<SchedTask> dispatchingTasks = this.schedulerJobManager.filterDispatchingTask(waitingTasks);
        if (CollectionUtils.isNotEmpty(dispatchingTasks)) {
            this.log.info("Redispatch sched instance: {} | {}", schedInstance, Jsons.toJson(dispatchingTasks));
            this.schedulerJobManager.dispatch(this.schedulerJobManager.getJob(schedInstance.getJobId().longValue()), schedInstance, dispatchingTasks);
        }
    }
}
