package edu.iu.dsc.tws.task.impl;

import edu.iu.dsc.tws.api.checkpointing.CheckpointingClient;
import edu.iu.dsc.tws.api.comms.Communicator;
import edu.iu.dsc.tws.api.compute.executor.ExecutionPlan;
import edu.iu.dsc.tws.api.compute.executor.IExecution;
import edu.iu.dsc.tws.api.compute.executor.INodeInstance;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.compute.modifiers.Collector;
import edu.iu.dsc.tws.api.compute.modifiers.Receptor;
import edu.iu.dsc.tws.api.compute.nodes.ISink;
import edu.iu.dsc.tws.api.compute.nodes.ISource;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskSchedulePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.Worker;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerPlan;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.dataset.DataObject;
import edu.iu.dsc.tws.api.dataset.DataPartition;
import edu.iu.dsc.tws.api.resource.WorkerEnvironment;
import edu.iu.dsc.tws.dataset.DataObjectImpl;
import edu.iu.dsc.tws.dataset.EmptyDataObject;
import edu.iu.dsc.tws.executor.core.ExecutionPlanBuilder;
import edu.iu.dsc.tws.executor.threading.Executor;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.tsched.taskscheduler.TaskScheduler;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/task/impl/TaskExecutor.class */
/**
 * Worker-side entry point for planning and running compute graphs.
 *
 * <p>An instance holds the worker's configuration, id, peer list, communicator and
 * checkpointing client, and uses them to build {@link ExecutionPlan}s and to run them
 * through a single, lazily created {@link Executor}. The executor is created by the first
 * call that needs one and is reused by every later call, so the configuration and operation
 * mode of that first call are the ones in effect for the lifetime of this object.
 *
 * <p>The lazy executor creation is not synchronized.
 */
public class TaskExecutor {
    private static final Logger LOG = Logger.getLogger(TaskExecutor.class.getName());

    private final Config config;
    private final int workerID;
    private final List<JobMasterAPI.WorkerInfo> workerInfoList;
    private final Communicator communicator;
    private final CheckpointingClient checkpointingClient;

    /** Created on first use by one of the execute methods; {@code null} until then. */
    private Executor executor;

    /**
     * Creates a task executor from explicitly supplied worker resources.
     *
     * @param config base configuration used for scheduling, plan building and execution
     * @param i id of this worker
     * @param list information about all workers; used to build the worker plan
     * @param communicator communicator whose channel is handed to the executor
     * @param checkpointingClient client passed on to the execution plan builder
     */
    public TaskExecutor(Config config, int i, List<JobMasterAPI.WorkerInfo> list, Communicator communicator, CheckpointingClient checkpointingClient) {
        this.config = config;
        this.workerID = i;
        this.workerInfoList = list;
        this.communicator = communicator;
        this.checkpointingClient = checkpointingClient;
    }

    /**
     * Creates a task executor from the resources exposed by a worker environment.
     *
     * @param workerEnvironment source of the config, worker id, worker list, communicator
     *     and (through its worker controller) the checkpointing client
     */
    public TaskExecutor(WorkerEnvironment workerEnvironment) {
        this(workerEnvironment.getConfig(),
                workerEnvironment.getWorkerId(),
                workerEnvironment.getWorkerList(),
                workerEnvironment.getCommunicator(),
                workerEnvironment.getWorkerController().getCheckpointingClient());
    }

    /**
     * Schedules a single graph over the known workers and builds its execution plan.
     *
     * @param computeGraph graph to schedule
     * @return the execution plan built for this worker
     */
    public ExecutionPlan plan(ComputeGraph computeGraph) {
        TaskScheduler taskScheduler = new TaskScheduler();
        taskScheduler.initialize(this.config);
        ExecutionPlanBuilder planBuilder = newPlanBuilder();
        return planBuilder.build(this.config, computeGraph,
                taskScheduler.schedule(computeGraph, createWorkerPlan()));
    }

    /**
     * Schedules several graphs together and builds one execution plan per graph.
     *
     * @param computeGraphArr graphs to schedule
     * @return execution plans keyed by graph name, in the order the graphs were passed
     */
    public Map<String, ExecutionPlan> plan(ComputeGraph... computeGraphArr) {
        WorkerPlan workerPlan = createWorkerPlan();
        TaskScheduler taskScheduler = new TaskScheduler();
        taskScheduler.initialize(this.config);
        Map<String, TaskSchedulePlan> schedules = taskScheduler.schedule(workerPlan, computeGraphArr);

        // LinkedHashMap keeps the plans in the same order as the input graphs.
        Map<String, ExecutionPlan> plans = new LinkedHashMap<>();
        for (ComputeGraph computeGraph : computeGraphArr) {
            String graphName = computeGraph.getGraphName();
            // A fresh builder is used for every graph, as each build call is independent.
            plans.put(graphName,
                    newPlanBuilder().build(this.config, computeGraph, schedules.get(graphName)));
        }
        return plans;
    }

    /**
     * Builds an execution plan from an already computed task schedule.
     *
     * @param computeGraph graph the schedule belongs to
     * @param taskSchedulePlan schedule to turn into an execution plan
     * @return the execution plan built for this worker
     */
    public ExecutionPlan executionPlan(ComputeGraph computeGraph, TaskSchedulePlan taskSchedulePlan) {
        return newPlanBuilder().build(this.config, computeGraph, taskSchedulePlan);
    }

    /**
     * Executes a plan and blocks until it finishes, layering {@code config} on top of the
     * base configuration.
     *
     * <p>The merged configuration is only used if this call is the one that creates the
     * executor; an already existing executor keeps the configuration it was created with.
     *
     * @param config overrides applied on top of the base configuration
     * @param computeGraph graph whose operation mode is used when creating the executor
     * @param executionPlan plan to run
     */
    public void execute(Config config, ComputeGraph computeGraph, ExecutionPlan executionPlan) {
        Config merged = Config.newBuilder().putAll(this.config).putAll(config).build();
        Executor exec = executorFor(merged, computeGraph);
        exec.execute(executionPlan);
        exec.waitFor(executionPlan);
    }

    /**
     * Executes a plan and blocks until it finishes.
     *
     * @param computeGraph graph whose operation mode is used when creating the executor
     * @param executionPlan plan to run
     */
    public void execute(ComputeGraph computeGraph, ExecutionPlan executionPlan) {
        Executor exec = executorFor(this.config, computeGraph);
        exec.execute(executionPlan);
        exec.waitFor(executionPlan);
    }

    /**
     * Executes a plan without the {@code waitFor} call that {@link #execute} performs;
     * pair it with {@link #waitFor(ComputeGraph, ExecutionPlan)}.
     *
     * @param computeGraph graph whose operation mode is used when creating the executor
     * @param executionPlan plan to run
     */
    public void itrExecute(ComputeGraph computeGraph, ExecutionPlan executionPlan) {
        executorFor(this.config, computeGraph).execute(executionPlan);
    }

    /**
     * Waits for a plan on the existing executor.
     *
     * @param computeGraph unused; kept for interface compatibility
     * @param executionPlan plan to wait for
     * @throws IllegalStateException if no executor has been created yet
     */
    public void waitFor(ComputeGraph computeGraph, ExecutionPlan executionPlan) {
        if (this.executor == null) {
            throw new IllegalStateException("Cannot call waitFor before calling execute");
        }
        this.executor.waitFor(executionPlan);
    }

    /**
     * Hands the plan to the executor's {@code iExecute} and returns the resulting handle.
     *
     * @param computeGraph graph whose operation mode is used when creating the executor
     * @param executionPlan plan to run
     * @return the execution handle returned by the executor
     */
    public IExecution iExecute(ComputeGraph computeGraph, ExecutionPlan executionPlan) {
        return executorFor(this.config, computeGraph).iExecute(executionPlan);
    }

    /**
     * Adds a named input to every instance of one task.
     *
     * <p>Does nothing when the plan has no node map for the task.
     *
     * @param computeGraph unused; kept for interface compatibility
     * @param executionPlan plan holding the task instances
     * @param str name of the task that receives the input
     * @param str2 name under which the input is registered
     * @param dataObject the input data
     * @throws RuntimeException if an instance of the task is not a {@link Receptor}
     */
    public void addInput(ComputeGraph computeGraph, ExecutionPlan executionPlan, String str, String str2, DataObject<?> dataObject) {
        Map<Integer, INodeInstance> nodes = executionPlan.getNodes(str);
        if (nodes == null) {
            return;
        }
        for (INodeInstance instance : nodes.values()) {
            Object node = instance.getNode();
            if (!(node instanceof Receptor)) {
                throw new RuntimeException("Cannot add input to non input instance: " + instance);
            }
            ((Receptor) node).add(str2, dataObject);
        }
    }

    /**
     * Adds a named input to every node in the plan that is both a {@link Receptor} and an
     * {@link ISource}; all other nodes are skipped.
     *
     * @param computeGraph unused; kept for interface compatibility
     * @param executionPlan plan holding the node instances
     * @param str name under which the input is registered
     * @param dataObject the input data
     * @throws RuntimeException if the plan has no node map
     */
    public void addSourceInput(ComputeGraph computeGraph, ExecutionPlan executionPlan, String str, DataObject<Object> dataObject) {
        Map<Integer, INodeInstance> nodes = executionPlan.getNodes();
        if (nodes == null) {
            throw new RuntimeException(String.format(
                    "%d Failed to set input for non-existing sources: %s",
                    this.workerID, executionPlan.getNodeNames()));
        }
        for (INodeInstance instance : nodes.values()) {
            Object node = instance.getNode();
            if ((node instanceof Receptor) && (node instanceof ISource)) {
                ((Receptor) node).add(str, dataObject);
            }
        }
    }

    /**
     * Collects the default output partition of every instance of one task.
     *
     * @param computeGraph unused; kept for interface compatibility
     * @param executionPlan plan holding the task instances
     * @param str name of the task to collect from; also passed to the result object
     * @param <T> element type the caller expects; not verified at runtime
     * @return the collected partitions, or an empty data object when the plan has no node
     *     map for the task
     * @throws RuntimeException if an instance of the task is not a {@link Collector}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> DataObject<T> getOutput(ComputeGraph computeGraph, ExecutionPlan executionPlan, String str) {
        Map<Integer, INodeInstance> nodes = executionPlan.getNodes(str);
        if (nodes == null) {
            return new EmptyDataObject<>();
        }
        DataObjectImpl<T> collected = new DataObjectImpl<>(str, this.config);
        for (INodeInstance instance : nodes.values()) {
            // The partition's element type is erased; the caller chooses T.
            DataPartition partition = requireCollector(instance).get();
            collected.addPartition((DataPartition<T>) partition);
        }
        return collected;
    }

    /**
     * Collects a named output partition from every node in the plan that is both a
     * {@link Collector} and an {@link ISink}; all other nodes are skipped.
     *
     * <p>Nodes that return {@code null} for the name are logged at warning level and skipped.
     *
     * @param computeGraph unused; kept for interface compatibility
     * @param executionPlan plan holding the node instances
     * @param str name of the data to collect
     * @param <T> element type the caller expects; not verified at runtime
     * @return the collected partitions, or an empty data object when the plan has no node map
     */
    public <T> DataObject<T> getSinkOutput(ComputeGraph computeGraph, ExecutionPlan executionPlan, String str) {
        Map<Integer, INodeInstance> nodes = executionPlan.getNodes();
        if (nodes == null) {
            // Same outcome as the getOutput overloads instead of a NullPointerException.
            return new EmptyDataObject<>();
        }
        DataObjectImpl<T> collected = new DataObjectImpl<>(this.config);
        for (INodeInstance instance : nodes.values()) {
            Object node = instance.getNode();
            if ((node instanceof Collector) && (node instanceof ISink)) {
                addNamedPartition(collected, (Collector) node, instance, str);
            }
        }
        return collected;
    }

    /**
     * Collects a named output partition of every instance of one task.
     *
     * <p>Instances that return {@code null} for the name are logged at warning level and
     * skipped.
     *
     * @param computeGraph unused; kept for interface compatibility
     * @param executionPlan plan holding the task instances
     * @param str name of the task to collect from
     * @param str2 name of the data to collect
     * @param <T> element type the caller expects; not verified at runtime
     * @return the collected partitions, or an empty data object when the plan has no node
     *     map for the task
     * @throws RuntimeException if an instance of the task is not a {@link Collector}
     */
    public <T> DataObject<T> getOutput(ComputeGraph computeGraph, ExecutionPlan executionPlan, String str, String str2) {
        Map<Integer, INodeInstance> nodes = executionPlan.getNodes(str);
        if (nodes == null) {
            return new EmptyDataObject<>();
        }
        DataObjectImpl<T> collected = new DataObjectImpl<>(this.config);
        for (INodeInstance instance : nodes.values()) {
            addNamedPartition(collected, requireCollector(instance), instance, str2);
        }
        return collected;
    }

    /**
     * Releases the executor, if one was ever created.
     */
    public void close() {
        if (this.executor != null) {
            this.executor.close();
        }
    }

    /** Creates a plan builder bound to this worker's id, peers, communicator and checkpointing client. */
    private ExecutionPlanBuilder newPlanBuilder() {
        return new ExecutionPlanBuilder(this.workerID, this.workerInfoList, this.communicator, this.checkpointingClient);
    }

    /** Returns the shared executor, creating it with {@code executorConfig} and the graph's operation mode if absent. */
    private Executor executorFor(Config executorConfig, ComputeGraph computeGraph) {
        if (this.executor == null) {
            this.executor = new Executor(executorConfig, this.workerID, this.communicator.getChannel(), computeGraph.getOperationMode());
        }
        return this.executor;
    }

    /** Returns the instance's node as a {@link Collector}, or throws if it is not one. */
    private static Collector requireCollector(INodeInstance instance) {
        Object node = instance.getNode();
        if (!(node instanceof Collector)) {
            throw new RuntimeException("Cannot collect from node because it is not a collector: " + instance);
        }
        return (Collector) node;
    }

    /** Adds the collector's partition for {@code dataName} to {@code target}, or logs a warning when it is null. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static <T> void addNamedPartition(DataObjectImpl<T> target, Collector collector, INodeInstance instance, String dataName) {
        // The partition's element type is erased; the caller chooses T.
        DataPartition partition = collector.get(dataName);
        if (partition != null) {
            target.addPartition((DataPartition<T>) partition);
        } else {
            LOG.warning(String.format("Task id %d returned null for data %s", instance.getId(), dataName));
        }
    }

    /** Builds a worker plan containing one {@link Worker} per known worker id. */
    private WorkerPlan createWorkerPlan() {
        List<Worker> workers = new ArrayList<>(this.workerInfoList.size());
        for (JobMasterAPI.WorkerInfo workerInfo : this.workerInfoList) {
            workers.add(new Worker(workerInfo.getWorkerID()));
        }
        return new WorkerPlan(workers);
    }
}
