package com.hazelcast.jet.python;

import com.hazelcast.logging.ILogger;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/python/JetToPythonServer.class */
/**
 * Launches the {@code jet_to_python_main.sh} script as a child process and
 * manages its lifecycle.
 * <p>
 * {@link #start()} runs the script through {@code /bin/sh} in the base
 * directory, handing it the port of a temporary loopback server socket. The
 * child is expected to connect back to that socket and send one line of text
 * holding an integer, which {@code start()} returns (the log message describes
 * it as the port the Python process is listening on). {@link #stop()} asks the
 * child to terminate and waits until both the process and the thread that
 * forwards its output have finished.
 */
class JetToPythonServer {
    /** Name of the launcher script, resolved relative to {@link #baseDir}. */
    private static final String MAIN_SHELL_SCRIPT = "jet_to_python_main.sh";

    /** Interval, in seconds, between liveness checks while waiting on the Python process. */
    private static final long WAIT_INTERVAL_SECONDS = 2L;

    private final File baseDir;
    private final ILogger logger;
    private Process pythonProcess;
    private String pythonProcessPid;
    private Thread stdoutLoggingThread;

    /**
     * Creates a server wrapper; no process is started until {@link #start()} is called.
     *
     * @param path    directory that contains the launcher script; used as the
     *                working directory of the child process
     * @param iLogger logger that receives lifecycle messages and is passed to
     *                {@code PythonServiceContext.logStdOut}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public JetToPythonServer(@Nonnull Path path, @Nonnull ILogger iLogger) {
        this.baseDir = path.toFile();
        this.logger = iLogger;
    }

    /**
     * Starts the Python process and waits for it to report back.
     * <p>
     * A server socket is bound to an ephemeral port on the loopback address and
     * that port is passed to the launcher script as its only argument. The
     * method then accepts a connection on that socket, retrying every
     * {@value #WAIT_INTERVAL_SECONDS} seconds for as long as the child process
     * is alive, and parses the first line received as an integer.
     *
     * @return the integer sent by the Python process over the handshake connection
     * @throws IOException if the socket or process cannot be set up, if the
     *                     process dies before connecting, or if it does not send
     *                     a valid integer line
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public int start() throws IOException {
        try (ServerSocket serverSocket = new ServerSocket()) {
            serverSocket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // Plain concatenation rather than String.format("%d"): the latter is
            // locale-sensitive and may render the port with non-ASCII digits.
            String command = "./" + MAIN_SHELL_SCRIPT + " " + serverSocket.getLocalPort();
            this.pythonProcess = new ProcessBuilder("/bin/sh", "-c", command)
                    .directory(this.baseDir)
                    .redirectErrorStream(true)
                    .start();
            this.stdoutLoggingThread = PythonServiceContext.logStdOut(this.logger, this.pythonProcess, "python-main");
            this.pythonProcessPid = PythonServiceContext.processPid(this.pythonProcess);
            this.logger.info("Started Python process: " + this.pythonProcessPid);

            // Bound the accept() call so that a dead child is noticed instead of blocking forever.
            serverSocket.setSoTimeout((int) TimeUnit.SECONDS.toMillis(WAIT_INTERVAL_SECONDS));
            while (true) {
                try (Socket clientSocket = serverSocket.accept();
                     BufferedReader reader = new BufferedReader(
                             new InputStreamReader(clientSocket.getInputStream(), StandardCharsets.UTF_8))) {
                    int serverPort = parsePort(reader.readLine());
                    this.logger.info("Python process " + this.pythonProcessPid + " listening on port " + serverPort);
                    return serverPort;
                } catch (SocketTimeoutException e) {
                    // Nobody connected within the interval: keep waiting only while the child is alive.
                    if (!this.pythonProcess.isAlive()) {
                        throw new IOException("Python process died before completing initialization", e);
                    }
                }
            }
        }
    }

    /**
     * Converts the handshake line into an integer.
     *
     * @param line the line read from the handshake connection, or {@code null}
     *             if the stream ended before a line was received
     * @return the parsed integer
     * @throws IOException if {@code line} is {@code null} or is not a valid integer
     */
    private static int parsePort(String line) throws IOException {
        if (line == null) {
            throw new IOException("Python process closed the connection without reporting its port");
        }
        try {
            return Integer.parseInt(line);
        } catch (NumberFormatException e) {
            throw new IOException("Python process reported an invalid port: '" + line + "'", e);
        }
    }

    /**
     * Stops the Python process and waits for it and its output-logging thread
     * to finish.
     * <p>
     * The line {@code stop} is written to the process's standard input. The
     * process is then awaited in {@value #WAIT_INTERVAL_SECONDS}-second steps;
     * after each step that elapses without the process exiting,
     * {@link Process#destroyForcibly()} is invoked. Interruption of the calling
     * thread does not abort the wait; instead the interrupt status is restored
     * just before the method returns.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    @SuppressFBWarnings(value = {"OS_OPEN_STREAM"}, justification = "PrintStream wraps Python's stdin, not to be closed")
    public void stop() {
        try {
            new PrintStream(this.pythonProcess.getOutputStream(), true, StandardCharsets.UTF_8.name()).println("stop");
        } catch (UnsupportedEncodingException e) {
            this.logger.info("UTF_8 reported as unsupported encoding??");
        }

        boolean interrupted = false;
        while (true) {
            try {
                if (this.pythonProcess.waitFor(WAIT_INTERVAL_SECONDS, TimeUnit.SECONDS)) {
                    break;
                }
                this.logger.warning("Python process " + this.pythonProcessPid
                        + " still not done, sending a 'destroyForcibly' signal");
                this.pythonProcess.destroyForcibly();
            } catch (InterruptedException e) {
                // Keep waiting: returning now would leave the child process running.
                this.logger.info("Ignoring interruption signal in order to prevent Python process leak");
                interrupted = true;
            }
        }

        while (true) {
            try {
                this.stdoutLoggingThread.join();
                break;
            } catch (InterruptedException e) {
                // Keep waiting: returning now would leave the logging thread running.
                this.logger.info("Ignoring interruption signal in order to prevent thread leak ("
                        + this.stdoutLoggingThread.getName() + ')');
                interrupted = true;
            }
        }

        if (interrupted) {
            // Re-assert the interrupt that was deliberately deferred above.
            Thread.currentThread().interrupt();
        }
    }
}
