package org.graylog.scheduler.job;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.assertj.core.api.Assertions;
import org.graylog.events.JobSchedulerTestClock;
import org.graylog.events.TestEventProcessorParameters;
import org.graylog.events.processor.EventProcessorEngine;
import org.graylog.events.processor.EventProcessorExecutionJob;
import org.graylog.events.processor.EventProcessorParameters;
import org.graylog.scheduler.JobDefinitionDto;
import org.graylog.scheduler.JobExecutionContext;
import org.graylog.scheduler.JobExecutionException;
import org.graylog.scheduler.JobScheduleStrategies;
import org.graylog.scheduler.JobTriggerDto;
import org.graylog.scheduler.JobTriggerStatus;
import org.graylog.scheduler.JobTriggerUpdate;
import org.graylog.scheduler.JobTriggerUpdates;
import org.graylog.scheduler.schedule.IntervalJobSchedule;
import org.graylog.scheduler.schedule.OnceJobSchedule;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;

/* loaded from: input_file:org/graylog/scheduler/job/EventProcessorExecutionJobTest.class */
/**
 * Unit tests for {@link EventProcessorExecutionJob}.
 *
 * <p>Every test runs the job against a mocked {@link EventProcessorEngine} and a
 * {@link JobSchedulerTestClock} that starts at {@code 2019-01-01T00:00:00.000Z}, and then
 * inspects the returned {@link JobTriggerUpdate} (next time, data and status) as well as
 * the interactions with the engine mock.
 */
public class EventProcessorExecutionJobTest {

    /** Event definition ID configured in every job definition built by these tests. */
    private static final String EVENT_DEFINITION_ID = "processor-1";

    @Rule
    public final MockitoRule mockitoRule = MockitoJUnit.rule();

    @Mock
    private EventProcessorEngine eventProcessorEngine;
    private JobScheduleStrategies jobScheduleStrategies;
    private JobSchedulerTestClock clock;

    /** Resets the test clock to a fixed instant and rebuilds the schedule strategies on top of it. */
    @Before
    public void setUp() {
        clock = new JobSchedulerTestClock(DateTime.parse("2019-01-01T00:00:00.000Z"));
        jobScheduleStrategies = new JobScheduleStrategies(clock);
    }

    /**
     * Runs the job with a 60s window, a 60s hop and a 1s interval schedule. Expects one engine
     * call with the configured parameters, a next time one second after the trigger's next
     * time, new timerange data and no status change.
     */
    @Test
    public void execute() throws Exception {
        final DateTime now = clock.nowUTC();
        final long windowSize = Duration.standardSeconds(60L).getMillis();
        final long hopSize = Duration.standardSeconds(60L).getMillis();
        final TestEventProcessorParameters parameters = TestEventProcessorParameters.create(now.minus(windowSize), now);
        final JobDefinitionDto definition = jobDefinition(windowSize, hopSize, parameters);
        final EventProcessorExecutionJob job = newJob(definition);
        final JobTriggerDto trigger = intervalTrigger(definition, now, 1L);

        final JobTriggerUpdate update = job.execute(contextFor(definition, trigger));

        verifyEngineExecutedOnceWith(parameters);
        Assertions.assertThat(update.nextTime()).isPresent().get().isEqualTo(now.plusSeconds(1));
        Assertions.assertThat(update.data()).isPresent().get().isEqualTo(
                EventProcessorExecutionJob.Data.builder()
                        .timerangeFrom(now.plusMillis(1))
                        .timerangeTo(now.plus(windowSize))
                        .build());
        Assertions.assertThat(update.status()).isNotPresent();
    }

    /**
     * Same setup as {@link #execute()}, but the engine stub moves the clock forward by ten
     * seconds while it runs. The expected next time and data are still derived from the
     * time captured before execution, not from the advanced clock.
     */
    @Test
    public void executeWithNextTimeNotBasedOnCurrentTime() throws Exception {
        final DateTime now = clock.nowUTC();
        final long windowSize = Duration.standardSeconds(60L).getMillis();
        final long hopSize = Duration.standardSeconds(60L).getMillis();
        final TestEventProcessorParameters parameters = TestEventProcessorParameters.create(now.minus(windowSize), now);
        final JobDefinitionDto definition = jobDefinition(windowSize, hopSize, parameters);
        final EventProcessorExecutionJob job = newJob(definition);
        final JobTriggerDto trigger = intervalTrigger(definition, now, 1L);
        final JobExecutionContext context = contextFor(definition, trigger);
        advanceClockOnEngineExecution(10L);

        final JobTriggerUpdate update = job.execute(context);

        verifyEngineExecutedOnceWith(parameters);
        Assertions.assertThat(update.nextTime()).isPresent().get().isEqualTo(now.plusSeconds(1));
        Assertions.assertThat(update.data()).isPresent().get().isEqualTo(
                EventProcessorExecutionJob.Data.builder()
                        .timerangeFrom(now.plusMillis(1))
                        .timerangeTo(now.plus(windowSize))
                        .build());
        Assertions.assertThat(update.status()).isNotPresent();
    }

    /**
     * The job definition carries a timerange ten days in the past, while the trigger data
     * carries the most recent 60 seconds. Expects the engine to be called with the trigger
     * data's timerange rather than the one from the definition.
     */
    @Test
    public void executeWithTriggerDataTimerange() throws Exception {
        final DateTime now = clock.nowUTC();
        final long windowSize = Duration.standardSeconds(60L).getMillis();
        final long hopSize = Duration.standardSeconds(60L).getMillis();
        final DateTime definitionFrom = now.minusDays(10).minus(windowSize);
        final DateTime definitionTo = now.minusDays(10);
        final DateTime triggerFrom = now.minus(windowSize);
        final TestEventProcessorParameters parameters = TestEventProcessorParameters.create(definitionFrom, definitionTo);
        final JobDefinitionDto definition = jobDefinition(windowSize, hopSize, parameters);
        final EventProcessorExecutionJob job = newJob(definition);
        final JobTriggerDto trigger = JobTriggerDto.builderWithClock(clock)
                .id("trigger-1")
                .jobDefinitionId(definition.id())
                .startTime(now)
                .nextTime(now)
                .status(JobTriggerStatus.RUNNABLE)
                .schedule(IntervalJobSchedule.builder().interval(1L).unit(TimeUnit.SECONDS).build())
                .data(EventProcessorExecutionJob.Data.create(triggerFrom, now))
                .build();
        final JobExecutionContext context = contextFor(definition, trigger);
        advanceClockOnEngineExecution(5L);

        final JobTriggerUpdate update = job.execute(context);

        verifyEngineExecutedOnceWith(parameters.withTimerange(triggerFrom, now));
        Assertions.assertThat(update.nextTime()).isPresent().get().isEqualTo(now.plusSeconds(1));
        Assertions.assertThat(update.data()).isPresent().get().isEqualTo(
                EventProcessorExecutionJob.Data.builder()
                        .timerangeFrom(now.plusMillis(1))
                        .timerangeTo(now.plus(windowSize))
                        .build());
        Assertions.assertThat(update.status()).isNotPresent();
    }

    /**
     * Configures parameters whose "from" lies one second after "to". Expects a
     * {@link JobExecutionException} whose update keeps the trigger's next time, has no data
     * and sets the status to {@link JobTriggerStatus#ERROR}; the engine must not be called.
     */
    @Test
    public void executeWithInvalidTimerange() throws Exception {
        final DateTime now = clock.nowUTC();
        final long windowSize = Duration.standardSeconds(60L).getMillis();
        final long hopSize = Duration.standardSeconds(60L).getMillis();
        final JobDefinitionDto definition = jobDefinition(
                windowSize, hopSize, TestEventProcessorParameters.create(now.plusSeconds(1), now));
        final EventProcessorExecutionJob job = newJob(definition);
        final JobTriggerDto trigger = intervalTrigger(definition, now, 1L);
        final JobExecutionContext context = contextFor(definition, trigger);

        Assertions.assertThatThrownBy(() -> {
            job.execute(context);
        }).isInstanceOf(JobExecutionException.class).hasMessageContaining("is not after").satisfies(thrown -> {
            final JobExecutionException exception = (JobExecutionException) thrown;
            Assertions.assertThat(exception.getTrigger()).isEqualTo(trigger);
            Assertions.assertThat(exception.getUpdate()).satisfies(update -> {
                Assertions.assertThat(update.nextTime()).isPresent().get().isEqualTo(now);
                Assertions.assertThat(update.data()).isNotPresent();
                Assertions.assertThat(update.status()).isPresent().get().isEqualTo(JobTriggerStatus.ERROR);
            });
        });

        verifyEngineNeverExecuted();
    }

    /**
     * Configures a timerange that ends one day after the current clock time. Expects the
     * engine not to be called and the next time to equal the end of that timerange, with
     * neither data nor status in the update.
     */
    @Test
    public void executeWithTimerangeInTheFuture() throws Exception {
        final DateTime now = clock.nowUTC();
        final long windowSize = Duration.standardSeconds(60L).getMillis();
        final long hopSize = Duration.standardSeconds(60L).getMillis();
        final DateTime timerangeTo = now.plusDays(1);
        final JobDefinitionDto definition = jobDefinition(
                windowSize, hopSize, TestEventProcessorParameters.create(now, timerangeTo));
        final EventProcessorExecutionJob job = newJob(definition);
        final JobTriggerDto trigger = intervalTrigger(definition, now, 1L);

        final JobTriggerUpdate update = job.execute(contextFor(definition, trigger));

        verifyEngineNeverExecuted();
        Assertions.assertThat(update.nextTime()).isPresent().get().isEqualTo(timerangeTo);
        Assertions.assertThat(update.data()).isNotPresent();
        Assertions.assertThat(update.status()).isNotPresent();
    }

    /**
     * Uses a {@link OnceJobSchedule} instead of an interval schedule. Expects one engine
     * call and an update without next time, data or status.
     */
    @Test
    public void executeWithOnceSchedule() throws Exception {
        final DateTime now = clock.nowUTC();
        final long windowSize = Duration.standardSeconds(60L).getMillis();
        final long hopSize = Duration.standardSeconds(60L).getMillis();
        final TestEventProcessorParameters parameters = TestEventProcessorParameters.create(now.minus(windowSize), now);
        final JobDefinitionDto definition = jobDefinition(windowSize, hopSize, parameters);
        final EventProcessorExecutionJob job = newJob(definition);
        final JobTriggerDto trigger = JobTriggerDto.builderWithClock(clock)
                .id("trigger-1")
                .jobDefinitionId(definition.id())
                .startTime(now)
                .nextTime(now)
                .status(JobTriggerStatus.RUNNABLE)
                .schedule(OnceJobSchedule.create())
                .build();

        final JobTriggerUpdate update = job.execute(contextFor(definition, trigger));

        verifyEngineExecutedOnceWith(parameters);
        Assertions.assertThat(update.nextTime()).isNotPresent();
        Assertions.assertThat(update.data()).isNotPresent();
        Assertions.assertThat(update.status()).isNotPresent();
    }

    /**
     * Uses a 60s window with a 5s hop and a 5s interval schedule, while the engine stub
     * advances the clock by seven seconds. Expects the next time to be five seconds after
     * the trigger's next time and the new timerange to be the old one shifted by the hop
     * size (with the "from" bound moved one millisecond forward).
     */
    @Test
    public void executeWithHoppingWindow() throws Exception {
        // Move the clock first so that "now - window" is the instant the clock was created with.
        clock.plus(60L, TimeUnit.SECONDS);
        final DateTime now = clock.nowUTC();
        final long windowSize = Duration.standardSeconds(60L).getMillis();
        final long hopSize = Duration.standardSeconds(5L).getMillis();
        final TestEventProcessorParameters parameters = TestEventProcessorParameters.create(now.minus(windowSize), now);
        final JobDefinitionDto definition = jobDefinition(windowSize, hopSize, parameters);
        final EventProcessorExecutionJob job = newJob(definition);
        final JobTriggerDto trigger = intervalTrigger(definition, now, 5L);
        final JobExecutionContext context = contextFor(definition, trigger);
        advanceClockOnEngineExecution(7L);

        final JobTriggerUpdate update = job.execute(context);

        verifyEngineExecutedOnceWith(parameters);
        Assertions.assertThat(update.nextTime()).isPresent().get().isEqualTo(now.plusSeconds(5));
        Assertions.assertThat(update.data()).isPresent().get().isEqualTo(
                EventProcessorExecutionJob.Data.builder()
                        .timerangeFrom(now.plus(hopSize).minus(windowSize).plusMillis(1))
                        .timerangeTo(now.plus(hopSize))
                        .build());
        Assertions.assertThat(update.status()).isNotPresent();
    }

    /**
     * Checks that {@code Data.create(from, to)} exposes both bounds unchanged and that
     * swapping the arguments raises an {@link IllegalArgumentException} whose message
     * mentions "from" and "to".
     */
    @Test
    public void dataObject() {
        final DateTime now = DateTime.now(DateTimeZone.UTC);
        final DateTime from = now.minusMinutes(10);
        final DateTime to = now.plusMinutes(10);

        Assertions.assertThat(EventProcessorExecutionJob.Data.create(from, to)).satisfies(data -> {
            Assertions.assertThat(data.timerangeFrom()).isEqualTo(from);
            Assertions.assertThat(data.timerangeTo()).isEqualTo(to);
        });

        Assertions.assertThatThrownBy(() -> {
            EventProcessorExecutionJob.Data.create(to, from);
        }).hasMessageContaining("from").hasMessageContaining("to").isInstanceOf(IllegalArgumentException.class);
    }

    /** Builds the "job-1" definition with an execution job config for {@link #EVENT_DEFINITION_ID}. */
    private static JobDefinitionDto jobDefinition(long processingWindowSize,
                                                  long processingHopSize,
                                                  TestEventProcessorParameters parameters) {
        return JobDefinitionDto.builder()
                .id("job-1")
                .title("Test")
                .description("A test")
                .config(EventProcessorExecutionJob.Config.builder()
                        .eventDefinitionId(EVENT_DEFINITION_ID)
                        .processingWindowSize(processingWindowSize)
                        .processingHopSize(processingHopSize)
                        .parameters(parameters)
                        .build())
                .build();
    }

    /** Creates the job under test, wired to the test clock and the mocked engine. */
    private EventProcessorExecutionJob newJob(JobDefinitionDto definition) {
        return new EventProcessorExecutionJob(jobScheduleStrategies, clock, eventProcessorEngine, definition);
    }

    /**
     * Builds a runnable "trigger-1" without data whose start and next time are both
     * {@code time} and which uses an interval schedule of {@code intervalSeconds} seconds.
     */
    private JobTriggerDto intervalTrigger(JobDefinitionDto definition, DateTime time, long intervalSeconds) {
        return JobTriggerDto.builderWithClock(clock)
                .id("trigger-1")
                .jobDefinitionId(definition.id())
                .startTime(time)
                .nextTime(time)
                .status(JobTriggerStatus.RUNNABLE)
                .schedule(IntervalJobSchedule.builder().interval(intervalSeconds).unit(TimeUnit.SECONDS).build())
                .build();
    }

    /** Builds a running execution context for the given definition and trigger. */
    private JobExecutionContext contextFor(JobDefinitionDto definition, JobTriggerDto trigger) {
        return JobExecutionContext.builder()
                .definition(definition)
                .trigger(trigger)
                .isRunning(new AtomicBoolean(true))
                .jobTriggerUpdates(new JobTriggerUpdates(clock, jobScheduleStrategies, trigger))
                .build();
    }

    /** Stubs the engine so that every execution moves the test clock forward by {@code seconds}. */
    private void advanceClockOnEngineExecution(long seconds) throws Exception {
        Mockito.doAnswer(invocation -> {
            clock.plus(seconds, TimeUnit.SECONDS);
            return null;
        }).when(eventProcessorEngine)
                .execute(ArgumentMatchers.<String>any(), ArgumentMatchers.<EventProcessorParameters>any());
    }

    /** Verifies exactly one engine execution for {@link #EVENT_DEFINITION_ID} with the given parameters. */
    private void verifyEngineExecutedOnceWith(EventProcessorParameters expectedParameters) throws Exception {
        Mockito.verify(eventProcessorEngine, Mockito.times(1)).execute(EVENT_DEFINITION_ID, expectedParameters);
    }

    /** Verifies that the engine was never asked to execute anything. */
    private void verifyEngineNeverExecuted() throws Exception {
        Mockito.verify(eventProcessorEngine, Mockito.never())
                .execute(ArgumentMatchers.<String>any(), ArgumentMatchers.<EventProcessorParameters>any());
    }
}
