package edu.iu.dsc.tws.task.impl.cdfw;

import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.ProtocolStringList;
import edu.iu.dsc.tws.api.comms.Communicator;
import edu.iu.dsc.tws.api.compute.executor.ExecutionPlan;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.dataset.DataObject;
import edu.iu.dsc.tws.api.resource.JobListener;
import edu.iu.dsc.tws.api.util.KryoSerializer;
import edu.iu.dsc.tws.master.worker.JMWorkerAgent;
import edu.iu.dsc.tws.master.worker.JMWorkerMessenger;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.CDFWJobAPI;
import edu.iu.dsc.tws.task.impl.TaskExecutor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/task/impl/cdfw/CDFWRuntime.class */
public class CDFWRuntime implements JobListener {
    private static final Logger LOG = Logger.getLogger(CDFWRuntime.class.getName());
    private int workerId;
    private TaskExecutor taskExecutor;
    private Map<String, Map<String, DataObject<Object>>> outPuts = new HashMap();
    private BlockingQueue<Any> executeMessageQueue = new LinkedBlockingQueue();
    private KryoSerializer serializer = new KryoSerializer();

    public CDFWRuntime(Config config, int i, List<JobMasterAPI.WorkerInfo> list, Communicator communicator) {
        this.taskExecutor = new TaskExecutor(config, i, list, communicator, null);
        this.workerId = i;
    }

    public boolean execute() {
        while (true) {
            try {
                Any take = this.executeMessageQueue.take();
                if (take.is(CDFWJobAPI.ExecuteMessage.class)) {
                    if (handleExecuteMessage(take)) {
                        return false;
                    }
                } else {
                    if (take.is(CDFWJobAPI.CDFWJobCompletedMessage.class)) {
                        LOG.log(Level.INFO, this.workerId + "Received CDFW job completed message. Leaving execution loop");
                        LOG.log(Level.INFO, this.workerId + " Execution Completed");
                        return true;
                    }
                    LOG.log(Level.WARNING, this.workerId + "Unknown message for cdfw task execution");
                }
            } catch (InterruptedException e) {
                LOG.log(Level.SEVERE, "Unable to insert message to the queue", (Throwable) e);
            }
        }
    }

    private boolean handleExecuteMessage(Any any) {
        JMWorkerMessenger jMWorkerMessenger = JMWorkerAgent.getJMWorkerAgent().getJMWorkerMessenger();
        try {
            CDFWJobAPI.SubGraph graph = any.unpack(CDFWJobAPI.ExecuteMessage.class).getGraph();
            ComputeGraph computeGraph = (ComputeGraph) this.serializer.deserialize(graph.getGraphSerialized().toByteArray());
            if (computeGraph == null) {
                LOG.severe(this.workerId + " Unable to find the subgraph " + graph.getName());
                return true;
            }
            ExecutionPlan plan = this.taskExecutor.plan(computeGraph);
            if (graph.getInputsList().size() != 0) {
                for (CDFWJobAPI.Input input : graph.getInputsList()) {
                    String name = input.getName();
                    String parentGraph = input.getParentGraph();
                    if (!this.outPuts.containsKey(parentGraph)) {
                        throw new RuntimeException("We cannot find the input graph: " + parentGraph);
                    }
                    Map<String, DataObject<Object>> map = this.outPuts.get(parentGraph);
                    if (!map.containsKey(name)) {
                        throw new RuntimeException("We cannot find the input: " + name);
                    }
                    this.taskExecutor.addSourceInput(computeGraph, plan, name, map.get(name));
                }
            }
            for (CDFWJobAPI.Input input2 : graph.getInputsList()) {
                this.taskExecutor.addSourceInput(computeGraph, plan, input2.getName(), this.outPuts.get(input2.getParentGraph()).get(input2.getName()));
            }
            this.taskExecutor.execute(computeGraph, plan);
            LOG.log(Level.INFO, this.workerId + " Completed subgraph : " + graph.getName());
            CDFWJobAPI.ExecuteCompletedMessage build = CDFWJobAPI.ExecuteCompletedMessage.newBuilder().setSubgraphName(graph.getName()).build();
            ProtocolStringList<String> outputsList = graph.getOutputsList();
            HashMap hashMap = new HashMap();
            for (String str : outputsList) {
                hashMap.put(str, this.taskExecutor.getSinkOutput(computeGraph, plan, str));
            }
            this.outPuts.put(graph.getName(), hashMap);
            if (!jMWorkerMessenger.sendToDriver(build)) {
                LOG.severe("Unable to send the subgraph completed message :" + build);
            }
            return false;
        } catch (InvalidProtocolBufferException e) {
            LOG.log(Level.SEVERE, "Unable to unpack received message ", e);
            return false;
        }
    }

    public void workersScaledUp(int i) {
        LOG.log(Level.INFO, this.workerId + "Workers scaled up msg received. Instances added: " + i);
    }

    public void workersScaledDown(int i) {
        LOG.log(Level.INFO, this.workerId + "Workers scaled down msg received. Instances removed: " + i);
    }

    public void driverMessageReceived(Any any) {
        try {
            this.executeMessageQueue.put(any);
        } catch (InterruptedException e) {
            LOG.log(Level.SEVERE, "Unable to insert message to the queue", (Throwable) e);
        }
    }

    public void allWorkersJoined(List<JobMasterAPI.WorkerInfo> list) {
        LOG.log(Level.INFO, this.workerId + "All workers joined msg received");
    }
}
