package au.gov.amsa.risky.format;

import com.github.davidmoten.util.Optional;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Queue;
import java.util.zip.GZIPInputStream;
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observables.SyncOnSubscribe;

/* loaded from: input_file:au/gov/amsa/risky/format/BinaryFixesOnSubscribeWithBackp.class */
public final class BinaryFixesOnSubscribeWithBackp extends SyncOnSubscribe<State, Fix> {
    private final InputStream is;
    private final Optional<Integer> mmsi;
    private final BinaryFixesFormat format;

    /* loaded from: input_file:au/gov/amsa/risky/format/BinaryFixesOnSubscribeWithBackp$State.class */
    public static final class State {
        final InputStream is;
        final Optional<Integer> mmsi;
        final Queue<Fix> queue;

        public State(InputStream inputStream, Optional<Integer> optional, Queue<Fix> queue) {
            this.is = inputStream;
            this.mmsi = optional;
            this.queue = queue;
        }
    }

    public BinaryFixesOnSubscribeWithBackp(InputStream inputStream, Optional<Integer> optional, BinaryFixesFormat binaryFixesFormat) {
        this.is = inputStream;
        this.mmsi = optional;
        this.format = binaryFixesFormat;
    }

    public static Observable<Fix> from(final File file, final BinaryFixesFormat binaryFixesFormat) {
        return Observable.using(new Func0<InputStream>() { // from class: au.gov.amsa.risky.format.BinaryFixesOnSubscribeWithBackp.1
            /* renamed from: call, reason: merged with bridge method [inline-methods] */
            public InputStream m5call() {
                try {
                    return file.getName().endsWith(".gz") ? new GZIPInputStream(new FileInputStream(file)) : new FileInputStream(file);
                } catch (FileNotFoundException e) {
                    throw new RuntimeException(e);
                } catch (IOException e2) {
                    throw new RuntimeException(e2);
                }
            }
        }, new Func1<InputStream, Observable<Fix>>() { // from class: au.gov.amsa.risky.format.BinaryFixesOnSubscribeWithBackp.2
            public Observable<Fix> call(InputStream inputStream) {
                return Observable.create(new BinaryFixesOnSubscribeWithBackp(inputStream, BinaryFixesFormat.this == BinaryFixesFormat.WITH_MMSI ? Optional.absent() : Optional.of(Integer.valueOf(BinaryFixesUtil.getMmsi(file))), BinaryFixesFormat.this));
            }
        }, new Action1<InputStream>() { // from class: au.gov.amsa.risky.format.BinaryFixesOnSubscribeWithBackp.3
            public void call(InputStream inputStream) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }, true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: generateState, reason: merged with bridge method [inline-methods] */
    public State m4generateState() {
        return new State(this.is, this.mmsi, new LinkedList());
    }

    protected State next(State state, Observer<? super Fix> observer) {
        int recordSize = BinaryFixes.recordSize(this.format);
        Fix poll = state.queue.poll();
        if (poll != null) {
            observer.onNext(poll);
        } else {
            byte[] bArr = new byte[4096 * BinaryFixes.recordSize(this.format)];
            try {
                int read = state.is.read(bArr);
                if (read > 0) {
                    for (int i = 0; i < read; i += recordSize) {
                        ByteBuffer wrap = ByteBuffer.wrap(bArr, i, recordSize);
                        state.queue.add(BinaryFixesUtil.toFix(state.mmsi.isPresent() ? ((Integer) state.mmsi.get()).intValue() : wrap.getInt(), wrap));
                    }
                    observer.onNext(state.queue.remove());
                } else {
                    observer.onCompleted();
                }
            } catch (IOException e) {
                observer.onError(e);
            }
        }
        return state;
    }

    protected /* bridge */ /* synthetic */ Object next(Object obj, Observer observer) {
        return next((State) obj, (Observer<? super Fix>) observer);
    }
}
