package au.gov.amsa.ais.rx;

import au.gov.amsa.ais.AisMessage;
import au.gov.amsa.ais.AisNmeaBuffer;
import au.gov.amsa.ais.AisNmeaMessage;
import au.gov.amsa.ais.AisParseException;
import au.gov.amsa.ais.LineAndTime;
import au.gov.amsa.ais.Timestamped;
import au.gov.amsa.ais.message.AisPosition;
import au.gov.amsa.ais.message.AisPositionA;
import au.gov.amsa.risky.format.AisClass;
import au.gov.amsa.risky.format.BinaryFixes;
import au.gov.amsa.risky.format.BinaryFixesFormat;
import au.gov.amsa.risky.format.BinaryFixesWriter;
import au.gov.amsa.risky.format.Fix;
import au.gov.amsa.risky.format.FixImpl;
import au.gov.amsa.risky.format.NavigationalStatus;
import au.gov.amsa.streams.Strings;
import au.gov.amsa.util.Files;
import au.gov.amsa.util.nmea.NmeaMessage;
import au.gov.amsa.util.nmea.NmeaMessageParseException;
import au.gov.amsa.util.nmea.NmeaUtil;
import com.github.davidmoten.rx.Checked;
import com.github.davidmoten.rx.slf4j.Logging;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;

/* loaded from: input_file:au/gov/amsa/ais/rx/Streams.class */
public class Streams {
    /**
     * Public buffer-size constant (100).
     * NOTE(review): the extract methods in this class pass the literal 100 to the NMEA
     * buffering transformers; presumably that is this constant inlined by the compiler -- confirm.
     */
    public static final int BUFFER_SIZE = 100;
    /** Charset used when decoding NMEA text read from sockets, plain streams and gzip files. */
    private static final Charset UTF8 = Charset.forName("UTF-8");
    /** Class-wide SLF4J logger; also referenced by the nested socket reader class. */
    private static Logger log = LoggerFactory.getLogger(Streams.class);
    /**
     * Converts a timestamped AIS message into at most one {@link Fix}.
     *
     * <p>Emits nothing when the message is not an {@link AisPosition}, when latitude or longitude
     * is missing or outside [-90, 90] / [-180, 180], or when any runtime exception occurs while
     * building the fix (the exception is logged at warn level and swallowed).
     */
    private static final Func1<Timestamped<AisMessage>, Observable<Fix>> TO_FIX = timestamped -> {
        try {
            AisMessage message = timestamped.message();
            if (!(message instanceof AisPosition)) {
                return Observable.empty();
            }
            AisPosition position = (AisPosition) message;
            if (position.getLatitude() == null || position.getLongitude() == null) {
                return Observable.empty();
            }
            // Comparisons are kept in "out of range" form so that NaN is treated as before.
            double latitude = position.getLatitude().doubleValue();
            double longitude = position.getLongitude().doubleValue();
            if (latitude < -90.0d || latitude > 90.0d || longitude < -180.0d || longitude > 180.0d) {
                return Observable.empty();
            }

            // The source is not decoded; a present source is recorded as the marker value 1.
            Optional<Short> source;
            if (position.getSource() != null) {
                source = Optional.of((short) 1);
            } else {
                source = Optional.empty();
            }

            // Class A reports carry a navigational status; it is mapped by ordinal onto the
            // fix-format enum of the same name.
            Optional<NavigationalStatus> navigationalStatus;
            if (position instanceof AisPositionA) {
                navigationalStatus = Optional.of(
                        NavigationalStatus.values()[((AisPositionA) position).getNavigationalStatus().ordinal()]);
            } else {
                navigationalStatus = Optional.empty();
            }

            Optional<Float> speedOverGroundKnots;
            if (position.getSpeedOverGroundKnots() == null) {
                speedOverGroundKnots = Optional.empty();
            } else {
                speedOverGroundKnots = Optional.of(Float.valueOf(position.getSpeedOverGroundKnots().floatValue()));
            }

            // Course must lie in [0, 360) to be kept.
            Optional<Float> courseOverGround;
            if (position.getCourseOverGround() == null
                    || position.getCourseOverGround().doubleValue() >= 360.0d
                    || position.getCourseOverGround().doubleValue() < 0.0d) {
                courseOverGround = Optional.empty();
            } else {
                courseOverGround = Optional.of(Float.valueOf(position.getCourseOverGround().floatValue()));
            }

            // Heading must lie in [0, 360) to be kept.
            Optional<Float> heading;
            if (position.getTrueHeading() == null
                    || position.getTrueHeading().intValue() >= 360
                    || position.getTrueHeading().intValue() < 0) {
                heading = Optional.empty();
            } else {
                heading = Optional.of(Float.valueOf(position.getTrueHeading().floatValue()));
            }

            AisClass aisClass = position instanceof AisPositionA ? AisClass.A : AisClass.B;

            // The fifth argument (always empty here) is not populated by this conversion.
            return Observable.just(new FixImpl(
                    position.getMmsi(),
                    position.getLatitude().floatValue(),
                    position.getLongitude().floatValue(),
                    timestamped.time(),
                    Optional.empty(),
                    source,
                    navigationalStatus,
                    speedOverGroundKnots,
                    courseOverGround,
                    heading,
                    aisClass));
        } catch (RuntimeException e) {
            log.warn(e.getMessage(), e);
            return Observable.empty();
        }
    };
    /**
     * Parses one text line into an {@link NmeaMessage}; yields an empty {@link Optional} when
     * parsing throws any runtime exception.
     */
    public static final Func1<String, Optional<NmeaMessage>> LINE_TO_NMEA_MESSAGE = line -> {
        Optional<NmeaMessage> parsed;
        try {
            parsed = Optional.of(NmeaUtil.parseNmea(line));
        } catch (RuntimeException unparseable) {
            // Unparseable lines are dropped silently by design of this mapper.
            parsed = Optional.empty();
        }
        return parsed;
    };
    /**
     * Decodes an {@link NmeaMessage} into a timestamped {@link AisMessage}; yields an empty
     * {@link Optional} when decoding throws any runtime exception.
     */
    public static final Func1<NmeaMessage, Optional<Timestamped<AisMessage>>> TO_AIS_MESSAGE = nmeaMessage -> {
        try {
            AisNmeaMessage aisNmea = new AisNmeaMessage(nmeaMessage);
            return Optional.of(aisNmea.getTimestampedMessage());
        } catch (RuntimeException undecodable) {
            // Messages that cannot be decoded are dropped silently.
            return Optional.empty();
        }
    };
    /**
     * Decodes an {@link NmeaMessage} into a {@link TimestampedAndLine} that always carries the
     * original NMEA line.
     *
     * <p>On success the result holds the decoded message and a {@code null} error. When decoding
     * fails with an {@link AisParseException} the result holds no message and the exception's
     * message as the error text. Any other runtime exception is logged at warn level and rethrown.
     *
     * <p>The current wall-clock time is passed to {@code getTimestampedMessage}; presumably it is
     * used as the timestamp when the NMEA line carries none -- confirm against AisNmeaMessage.
     */
    public static final Func1<NmeaMessage, TimestampedAndLine<AisMessage>> TO_AIS_MESSAGE_AND_LINE = nmeaMessage -> {
        String line = nmeaMessage.toLine();
        try {
            // Explicit type arguments replace the raw-type construction (no unchecked conversion).
            return new TimestampedAndLine<AisMessage>(
                    Optional.of(new AisNmeaMessage(nmeaMessage).getTimestampedMessage(System.currentTimeMillis())),
                    line,
                    null);
        } catch (AisParseException e) {
            return new TimestampedAndLine<AisMessage>(Optional.empty(), line, e.getMessage());
        } catch (RuntimeException e) {
            log.warn(e.getMessage(), e);
            throw e;
        }
    };
    /**
     * Converts an optional group of NMEA messages (the parts of one multi-line sentence) into a
     * {@link TimestampedAndLines}.
     *
     * <ul>
     * <li>absent group: no message, no lines, {@code null} error;</li>
     * <li>group that cannot be concatenated: no message, the lines, error "could not concat";</li>
     * <li>{@link AisParseException} while decoding: no message, the lines, the exception message;</li>
     * <li>otherwise: the decoded message, the lines, {@code null} error.</li>
     * </ul>
     *
     * <p>Runtime exceptions other than {@link AisParseException} are not caught here.
     */
    public static final Func1<Optional<List<NmeaMessage>>, TimestampedAndLines<AisMessage>> TO_AIS_MESSAGE_AND_LINES = optional -> {
        if (!optional.isPresent()) {
            return new TimestampedAndLines<AisMessage>(Optional.empty(), Collections.emptyList(), null);
        }
        // Typed locals replace the raw List casts and raw stream pipeline.
        List<NmeaMessage> parts = optional.get();
        List<String> lines = parts.stream().map(part -> part.toLine()).collect(Collectors.toList());
        Optional<NmeaMessage> concatenated = AisNmeaBuffer.concatenateMessages(parts);
        if (!concatenated.isPresent()) {
            return new TimestampedAndLines<AisMessage>(Optional.empty(), lines, "could not concat");
        }
        try {
            return new TimestampedAndLines<AisMessage>(
                    Optional.of(new AisNmeaMessage(concatenated.get()).getTimestampedMessage(System.currentTimeMillis())),
                    lines,
                    null);
        } catch (AisParseException e) {
            return new TimestampedAndLines<AisMessage>(Optional.empty(), lines, e.getMessage());
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: au.gov.amsa.ais.rx.Streams$8, reason: invalid class name */
    /* loaded from: input_file:au/gov/amsa/ais/rx/Streams$8.class */
    public class AnonymousClass8 implements Observable.OnSubscribe<String> {
        private Socket socket = null;
        private BufferedReader reader = null;
        final /* synthetic */ HostPort val$hostPort;

        AnonymousClass8(HostPort hostPort) {
            this.val$hostPort = hostPort;
        }

        public void call(Subscriber<? super String> subscriber) {
            try {
                synchronized (this) {
                    Streams.log.info("creating new socket");
                    this.socket = Streams.createSocket(this.val$hostPort.getHost(), this.val$hostPort.getPort());
                }
                Streams.log.info("waiting one second before attempting connect");
                Thread.sleep(1000L);
                this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream(), Streams.UTF8));
                subscriber.add(createSubscription());
                while (!subscriber.isUnsubscribed()) {
                    try {
                        String readLine = this.reader.readLine();
                        if (readLine != null) {
                            subscriber.onNext(readLine);
                        } else {
                            cancel();
                            subscriber.onCompleted();
                        }
                    } catch (IOException e) {
                        if (!subscriber.isUnsubscribed()) {
                            throw e;
                        }
                        return;
                    }
                }
            } catch (Exception e2) {
                Streams.log.warn(e2.getMessage(), e2);
                cancel();
                subscriber.onError(e2);
            }
        }

        private Subscription createSubscription() {
            return new Subscription() { // from class: au.gov.amsa.ais.rx.Streams.8.1
                private final AtomicBoolean subscribed = new AtomicBoolean(true);

                public boolean isUnsubscribed() {
                    return !this.subscribed.get();
                }

                public void unsubscribe() {
                    this.subscribed.set(false);
                    AnonymousClass8.this.cancel();
                }
            };
        }

        public void cancel() {
            Streams.log.info("cancelling socket read");
            synchronized (this) {
                if (this.socket != null) {
                    if (this.reader != null) {
                        try {
                            this.reader.close();
                        } catch (IOException e) {
                        }
                    }
                    try {
                        this.socket.close();
                        this.socket = null;
                    } catch (IOException e2) {
                    }
                }
            }
        }
    }

    /* loaded from: input_file:au/gov/amsa/ais/rx/Streams$TimestampedAndLine.class */
    public static class TimestampedAndLine<T extends AisMessage> {
        private final Optional<Timestamped<T>> message;
        private final String line;
        private final String error;

        public TimestampedAndLine(Optional<Timestamped<T>> optional, String str, String str2) {
            this.message = optional;
            this.line = str;
            this.error = str2;
        }

        public Optional<Timestamped<T>> getMessage() {
            return this.message;
        }

        public String getLine() {
            return this.line;
        }

        public String getError() {
            return this.error;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            if (this.message.isPresent()) {
                sb.append("message=" + this.message);
            } else {
                sb.append("error=" + this.error);
            }
            sb.append(", line=");
            sb.append(this.line);
            return sb.toString();
        }
    }

    /* loaded from: input_file:au/gov/amsa/ais/rx/Streams$TimestampedAndLines.class */
    public static class TimestampedAndLines<T extends AisMessage> {
        private final Optional<Timestamped<T>> message;
        private final List<String> lines;
        private final String error;

        public TimestampedAndLines(Optional<Timestamped<T>> optional, List<String> list, String str) {
            this.message = optional;
            this.lines = list;
            this.error = str;
        }

        public Optional<Timestamped<T>> getMessage() {
            return this.message;
        }

        public List<String> getLines() {
            return this.lines;
        }

        public String getError() {
            return this.error;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            if (this.message.isPresent()) {
                sb.append("message=" + this.message);
            } else {
                sb.append("error=" + this.error);
            }
            sb.append(", lines=");
            sb.append(this.lines);
            return sb.toString();
        }
    }

    /**
     * Streams text lines from the given host and port.
     *
     * @param str host name
     * @param i port number
     * @return the lines read from the socket
     */
    public static Observable<String> connect(String str, int i) {
        HostPort endpoint = new HostPort(str, i);
        return connect(endpoint);
    }

    /**
     * Wraps a single connection attempt with a one-minute {@code timeout} followed by an
     * unconditional {@code retry()}.
     */
    private static Observable<String> connect(HostPort hostPort) {
        Observable<String> singleAttempt = connectOnce(hostPort);
        Observable<String> guarded = singleAttempt.timeout(1L, TimeUnit.MINUTES);
        return guarded.retry();
    }

    /**
     * Connects to the given host and port and decodes the received lines via
     * {@link #extract(Observable)}.
     *
     * @param str host name
     * @param i port number
     */
    public static Observable<TimestampedAndLine<AisMessage>> connectAndExtract(String str, int i) {
        Observable<String> lines = connect(str, i);
        return extract(lines);
    }

    /**
     * Parses lines as NMEA (dropping unparseable ones), aggregates multi-line sentences with a
     * buffer of {@link #BUFFER_SIZE}, and decodes each aggregated message together with its line.
     */
    public static Observable<TimestampedAndLine<AisMessage>> extract(Observable<String> observable) {
        Observable<NmeaMessage> parsed = observable
                .map(LINE_TO_NMEA_MESSAGE)
                .compose(Streams.<NmeaMessage>valueIfPresent());
        return parsed
                .compose(aggregateMultiLineNmea(BUFFER_SIZE))
                .map(TO_AIS_MESSAGE_AND_LINE);
    }

    /**
     * Parses lines as NMEA (dropping unparseable ones), feeds them through an NMEA buffer of
     * {@link #BUFFER_SIZE}, and maps every buffer result to a {@link TimestampedAndLines}.
     */
    public static Observable<TimestampedAndLines<AisMessage>> extractWithLines(Observable<String> observable) {
        Observable<NmeaMessage> parsed = observable
                .map(LINE_TO_NMEA_MESSAGE)
                .compose(Streams.<NmeaMessage>valueIfPresent());
        return parsed
                .compose(addToBuffer(BUFFER_SIZE))
                .map(TO_AIS_MESSAGE_AND_LINES);
    }

    /**
     * Parses lines as NMEA, aggregates multi-line sentences with a buffer of
     * {@link #BUFFER_SIZE}, decodes them, and emits only the messages that decoded successfully.
     */
    public static Observable<Timestamped<AisMessage>> extractMessages(Observable<String> observable) {
        Observable<NmeaMessage> aggregated = observable
                .map(LINE_TO_NMEA_MESSAGE)
                .compose(Streams.<NmeaMessage>valueIfPresent())
                .compose(aggregateMultiLineNmea(BUFFER_SIZE));
        return aggregated
                .map(TO_AIS_MESSAGE)
                .compose(Streams.<Timestamped<AisMessage>>valueIfPresent());
    }

    /** Returns a function that reports whether an {@link Optional} holds a value. */
    public static <T> Func1<Optional<T>, Boolean> isPresent() {
        return Optional::isPresent;
    }

    /**
     * Returns a function that unwraps an {@link Optional} with {@code get()}; it throws for an
     * empty optional, so apply it only after filtering with {@link #isPresent()}.
     */
    public static <T> Func1<Optional<T>, T> toValue() {
        return Optional::get;
    }

    /** Returns a transformer that drops empty optionals and unwraps the remaining ones. */
    public static <T> Observable.Transformer<Optional<T>, T> valueIfPresent() {
        return source -> source.filter(Streams.<T>isPresent()).map(Streams.<T>toValue());
    }

    /**
     * Decodes NMEA lines into AIS messages and converts the usable position reports into fixes,
     * flat-mapping with a maximum concurrency of 1.
     */
    public static Observable<Fix> extractFixes(Observable<String> observable) {
        Observable<Timestamped<AisMessage>> messages = extractMessages(observable);
        return messages.flatMap(TO_FIX, 1);
    }

    /**
     * Streams the lines of a plain-text NMEA file. The file is opened per subscription and
     * closed when the stream is disposed (eagerly, per the final {@code true} argument).
     */
    public static Observable<String> nmeaFrom(File file) {
        return Observable.using(
                Checked.f0(() -> new FileInputStream(file)),
                stream -> nmeaFrom(stream),
                stream -> {
                    try {
                        stream.close();
                    } catch (IOException ignored) {
                        // best-effort close of a read-only stream
                    }
                },
                true);
    }

    /** Decodes the stream as UTF-8 and splits the text on {@code "\n"}. */
    public static Observable<String> nmeaFrom(InputStream inputStream) {
        InputStreamReader reader = new InputStreamReader(inputStream, UTF8);
        return Strings.split(Strings.from(reader), "\n");
    }

    /**
     * Streams the lines of a gzip-compressed NMEA file.
     *
     * @param str path of the gzip file
     */
    public static Observable<String> nmeaFromGzip(String str) {
        File gzipFile = new File(str);
        return nmeaFromGzip(gzipFile);
    }

    /** Maps each gzip file to the stream of its NMEA lines (one inner observable per file). */
    public static Observable<Observable<String>> nmeasFromGzip(Observable<File> observable) {
        Func1<File, Observable<String>> toLines = gzipFile -> nmeaFromGzip(gzipFile.getPath());
        return observable.map(toLines);
    }

    /**
     * Streams the lines of a gzip-compressed NMEA file, decoded as UTF-8 and split on
     * {@code "\n"}. The file is opened per subscription and closed when the stream is disposed
     * (eagerly, per the final {@code true} argument).
     *
     * @param file gzip file to read
     * @return the lines of the decompressed content
     */
    public static Observable<String> nmeaFromGzip(File file) {
        return Observable.using(
                () -> openGzipReader(file),
                reader -> Strings.split(Strings.from(reader), "\n"),
                reader -> {
                    try {
                        reader.close();
                    } catch (IOException ignored) {
                        // best-effort close of a read-only stream
                    }
                },
                true);
    }

    /**
     * Opens a UTF-8 reader over the decompressed content of {@code file}.
     *
     * <p>The GZIPInputStream constructor reads the gzip header and can throw after the file has
     * already been opened; in that case the file stream is closed here so the descriptor is not
     * leaked.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be opened or
     *         is not readable as gzip
     */
    private static InputStreamReader openGzipReader(File file) {
        FileInputStream fileStream = null;
        try {
            fileStream = new FileInputStream(file);
            return new InputStreamReader(
                    new GZIPInputStream(new BufferedInputStream(fileStream, 1048576)), UTF8);
        } catch (IOException e) {
            if (fileStream != null) {
                try {
                    fileStream.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Subscribes to the observable and prints every item to the given stream. Errors have their
     * stack trace printed via {@code printStackTrace()}; completion is ignored.
     */
    public static void print(Observable<?> observable, final PrintStream printStream) {
        Observer<Object> printer = new Observer<Object>() {
            @Override
            public void onCompleted() {
                // nothing to do on completion
            }

            @Override
            public void onError(Throwable error) {
                error.printStackTrace();
            }

            @Override
            public void onNext(Object item) {
                printStream.println(item);
            }
        };
        observable.subscribe(printer);
    }

    /** Subscribes to the observable and prints every item to {@code System.out}. */
    public static void print(Observable<?> observable) {
        PrintStream standardOut = System.out;
        print(observable, standardOut);
    }

    /**
     * Returns a function mapping an NMEA line to a single {@link LineAndTime} built from the
     * line and its parsed unix time; the function emits nothing when the line has no time or
     * when parsing fails.
     */
    public static final Func1<String, Observable<LineAndTime>> toLineAndTime() {
        return line -> {
            try {
                Long unixTimeMillis = NmeaUtil.parseNmea(line).getUnixTimeMillis();
                if (unixTimeMillis == null) {
                    return Observable.empty();
                }
                return Observable.just(new LineAndTime(line, unixTimeMillis.longValue()));
            } catch (NmeaMessageParseException unparseable) {
                return Observable.empty();
            } catch (RuntimeException unexpected) {
                return Observable.empty();
            }
        };
    }

    /**
     * Reports whether the string contains a character below the space character other than
     * line feed or carriage return. The first such character is logged (as its numeric code) at
     * warn level.
     *
     * @return {@code false} for {@code null} or when no such character is found
     */
    private static boolean containsWeirdCharacters(String str) {
        if (str == null) {
            return false;
        }
        for (int index = 0; index < str.length(); index++) {
            char ch = str.charAt(index);
            boolean belowSpace = ch < ' ';
            boolean lineBreak = ch == '\n' || ch == '\r';
            if (belowSpace && !lineBreak) {
                log.warn("ch=" + ((int) ch));
                return true;
            }
        }
        return false;
    }

    /**
     * Returns a transformer that adds every NMEA message to an {@link AisNmeaBuffer} and emits
     * the result of each {@code add} call. A fresh buffer is created per subscription.
     *
     * @param i size passed to the {@link AisNmeaBuffer} constructor
     */
    public static final Observable.Transformer<NmeaMessage, Optional<List<NmeaMessage>>> addToBuffer(final int i) {
        return source -> Observable.defer(() -> {
            AisNmeaBuffer buffer = new AisNmeaBuffer(i);
            return source.map(nmea -> buffer.add(nmea));
        });
    }

    /**
     * Returns a transformer that collects the parts of multi-line NMEA sentences in an
     * {@link AisNmeaBuffer} and emits one concatenated message per completed group. A fresh
     * buffer is created per subscription; messages are processed with a concurrency of 1.
     *
     * @param i size passed to the {@link AisNmeaBuffer} constructor
     */
    public static final Observable.Transformer<NmeaMessage, NmeaMessage> aggregateMultiLineNmea(final int i) {
        return source -> Observable.defer(() -> {
            AisNmeaBuffer buffer = new AisNmeaBuffer(i);
            return source.flatMap(nmea -> Streams.addToBuffer(buffer, nmea), 1);
        });
    }

    /**
     * Returns the same transformation as {@link #aggregateMultiLineNmea(int)}: a per-subscription
     * {@link AisNmeaBuffer} that emits one concatenated message per completed group.
     *
     * @param i size passed to the {@link AisNmeaBuffer} constructor
     */
    public static final Observable.Transformer<NmeaMessage, NmeaMessage> aggregateMultiLineNmeaWithLines(final int i) {
        return aggregateMultiLineNmea(i);
    }

    /**
     * Adds one message to the buffer and emits the concatenated message when the add call
     * returns a group that can be concatenated; otherwise emits nothing. Runtime exceptions are
     * logged at warn level and result in an empty observable.
     */
    public static Observable<? extends NmeaMessage> addToBuffer(AisNmeaBuffer aisNmeaBuffer, NmeaMessage nmeaMessage) {
        try {
            Optional<List<NmeaMessage>> group = aisNmeaBuffer.add(nmeaMessage);
            if (group.isPresent()) {
                Optional<NmeaMessage> joined = AisNmeaBuffer.concatenateMessages(group.get());
                if (joined.isPresent()) {
                    return Observable.just(joined.get());
                }
            }
            return Observable.empty();
        } catch (RuntimeException e) {
            log.warn(e.getMessage(), e);
            return Observable.empty();
        }
    }

    /**
     * Creates an observable that, on subscription, opens a socket to the given endpoint and
     * emits the lines read from it. No timeout or retry is applied here.
     */
    public static Observable<String> connectOnce(HostPort hostPort) {
        Observable.OnSubscribe<String> socketReader = new AnonymousClass8(hostPort);
        return Observable.unsafeCreate(socketReader);
    }

    /**
     * Opens a socket to the given host and port.
     *
     * @param str host name
     * @param i port number
     * @throws RuntimeException wrapping any {@link IOException} (including
     *         {@code UnknownHostException}) raised while connecting
     */
    public static Socket createSocket(String str, int i) {
        try {
            return new Socket(str, i);
        } catch (IOException e) {
            // UnknownHostException is an IOException, so one handler covers both cases.
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns a function that reads a list of gzip NMEA files one after another, extracts fixes
     * from them, writes the fixes via {@code BinaryFixesWriter.writeFixes} and emits the total
     * number of fixes written. The work is subscribed on the given scheduler.
     *
     * <p>A file that fails while being read is logged at warn level and skipped.
     *
     * @param i not used by this method
     * @param scheduler scheduler the returned observable subscribes on
     * @param func1 passed as the first argument to {@code writeFixes}
     * @param i2 passed as the third argument to {@code writeFixes}
     * @param action1 invoked with each file before it is read
     */
    public static Func1<List<File>, Observable<Integer>> extractFixesFromNmeaGzAndAppendToFile(int i, Scheduler scheduler, Func1<Fix, String> func1, int i2, Action1<File> action1) {
        return files -> {
            Observable<String> lines = Observable.from(files).doOnNext(action1).concatMap(gzipFile -> {
                Observable<String> fileLines = nmeaFromGzip(gzipFile.getAbsolutePath());
                return fileLines
                        .doOnError(error -> log.warn("problem reading file " + gzipFile + ": " + error.getMessage()))
                        .onErrorResumeNext(Observable.empty());
            });
            Observable<Fix> fixes = extractFixes(lines);
            return BinaryFixesWriter.writeFixes(func1, fixes, i2, false, BinaryFixesFormat.WITHOUT_MMSI)
                    .reduce(0, countFixes())
                    .subscribeOn(scheduler);
        };
    }

    /** Returns an accumulator that adds the size of each list of fixes to a running total. */
    private static Func2<Integer, List<Fix>, Integer> countFixes() {
        return (runningTotal, batch) -> runningTotal + batch.size();
    }

    /**
     * Converts every gzip NMEA file found under {@code file} matching {@code pattern} into binary
     * fix files, then sorts the binary fix files by time.
     *
     * <p>The output directory {@code file2} is deleted immediately when this method is called
     * (not on subscription). Files are split into batches of roughly
     * {@code fileCount / availableProcessors} (at least 1) and batches are processed with a
     * flatMap concurrency of 1. The running total of fixes is logged periodically; the last total
     * is emitted, followed by whatever {@code BinaryFixes.sortBinaryFixFilesByTime} emits.
     *
     * @param file directory searched for input files
     * @param pattern file-name pattern passed to {@code Files.find}
     * @param file2 output directory (deleted first)
     * @param i passed to the logging operator's {@code every(...)}
     * @param i2 passed through to {@code extractFixesFromNmeaGzAndAppendToFile} as its fourth argument
     * @param scheduler scheduler used for the conversion and the sort
     * @param i3 passed through to {@code extractFixesFromNmeaGzAndAppendToFile} as its first argument
     * @param j passed as the second argument to {@code sortBinaryFixFilesByTime}
     * @param func1 passed through to {@code extractFixesFromNmeaGzAndAppendToFile}
     */
    public static Observable<Integer> writeFixesFromNmeaGz(File file, Pattern pattern, File file2, int i, int i2, Scheduler scheduler, int i3, long j, Func1<Fix, String> func1) {
        final List<File> files = Files.find(file, pattern);
        Action1<File> logProgress = new Action1<File>() {
            private final AtomicInteger count = new AtomicInteger();
            private Long start = null;

            @Override
            public void call(File current) {
                if (this.start == null) {
                    this.start = Long.valueOf(System.currentTimeMillis());
                }
                int processed = this.count.incrementAndGet();
                long elapsedMs = System.currentTimeMillis() - this.start.longValue();
                // Files per second, as the log label says (previously seconds per file was
                // logged under this label). Reported as 0 until some time has elapsed.
                double filesPerSecond = elapsedMs > 0 ? processed * 1000.0d / elapsedMs : 0.0d;
                Streams.log.info("file " + processed + " of " + files.size() + ", " + current.getName() + ", rateFilesPerSecond=" + filesPerSecond);
            }
        };
        deleteDirectory(file2);
        int filesPerBatch = Math.max(files.size() / Runtime.getRuntime().availableProcessors(), 1);
        return Observable.from(files)
                .buffer(filesPerBatch)
                .flatMap(extractFixesFromNmeaGzAndAppendToFile(i3, scheduler, func1, i2, logProgress), 1)
                .scan(0, (total, written) -> total + written)
                .lift(Logging.<Integer>logger().showCount().showMemory().showRateSince("rate", 5000L).every(i).log())
                .last()
                .doOnCompleted(() -> log.info("completed converting nmea to binary fixes, starting sort"))
                .concatWith(BinaryFixes.sortBinaryFixFilesByTime(file2, j, scheduler));
    }

    /**
     * Deletes the directory via {@code FileUtils.deleteDirectory}.
     *
     * @throws RuntimeException wrapping the {@link IOException} if deletion fails
     */
    private static void deleteDirectory(File file) {
        try {
            FileUtils.deleteDirectory(file);
        } catch (IOException deletionFailure) {
            throw new RuntimeException(deletionFailure);
        }
    }

    /**
     * Ad-hoc entry point: decodes a gzip NMEA file and prints the last 10000 decoded results to
     * standard output.
     *
     * @param strArr optional first element is the path of the gzip file; when absent the
     *        original hard-coded sample path is used
     */
    public static void main(String[] strArr) {
        String path = (strArr != null && strArr.length > 0)
                ? strArr[0]
                : "/media/an/nmea/2015/NMEA_ITU_20150521.gz";
        Observable<TimestampedAndLine<AisMessage>> decoded = extract(nmeaFromGzip(new File(path)));
        decoded.takeLast(10000).forEach(System.out::println);
    }
}
