package com.appian.documentunderstanding.queue.kafka;

import com.appian.documentunderstanding.OcrOperationStatus;
import com.appian.documentunderstanding.client.ClientResponse;
import com.appian.documentunderstanding.client.DocumentUnderstandingClient;
import com.appian.documentunderstanding.client.DocumentUnderstandingClientProvider;
import com.appian.documentunderstanding.client.google.GoogleClientConnectionTester;
import com.appian.documentunderstanding.common.DocumentExtractionLimitsConfiguration;
import com.appian.documentunderstanding.common.DocumentExtractionMetricConstants;
import com.appian.documentunderstanding.prediction.keyvalue.DocumentUnderstandingKvpEsSpringConfig;
import com.appian.documentunderstanding.queue.DocExtractJobService;
import com.appian.documentunderstanding.queue.metrics.DocExtractKafkaMetricsCollector;
import com.appian.kafka.KafkaTopicManager;
import com.appiancorp.common.monitoring.ProductMetricsAggregatedDataCollector;
import com.appiancorp.common.monitoring.Stopwatch;
import com.appiancorp.documentunderstanding.persistence.DocExtractJob;
import com.appiancorp.documentunderstanding.persistence.GoogleBatch;
import com.appiancorp.documentunderstanding.persistence.Vendor;
import com.appiancorp.security.auth.SecurityEscalator;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/appian/documentunderstanding/queue/kafka/BatchQueueManager.class */
public class BatchQueueManager extends OffsetQueueManager<BatchMessageToken> {
    // Not referenced inside this class. NOTE(review): presumably the poll size used when the
    // batch-topic consumer is configured elsewhere -- confirm at the use site.
    static final int MAX_NUM_POLLED_MESSAGES_FROM_BATCH_TOPIC = 1;
    // Used to publish download messages once a vendor batch reports COMPLETE.
    private final KafkaTopicManager kafkaTopicManager;
    // Records retry counts and status-query latency for the poll topic.
    private final DocExtractKafkaMetricsCollector metricsCollector;
    // Resolves the vendor-specific client used to query job status.
    private final DocumentUnderstandingClientProvider documentUnderstandingClientProvider;
    // Supplies the maximum vendor processing time used when deciding whether a batch has expired.
    private final DocumentExtractionLimitsConfiguration documentExtractionLimitsConfiguration;
    private static final Logger LOG = LoggerFactory.getLogger(BatchQueueManager.class);
    // Minimum age of a Kafka record before it is considered ready to process; also the divisor
    // used to turn the maximum vendor processing time into a retry-count threshold.
    static final Duration RECORD_PAUSE_TIME = Duration.of(30, ChronoUnit.SECONDS);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.appian.documentunderstanding.queue.kafka.BatchQueueManager$1, reason: invalid class name */
    /* loaded from: input_file:com/appian/documentunderstanding/queue/kafka/BatchQueueManager$1.class */
    /**
     * Compiler-generated (synthetic) lookup table that maps {@link OcrOperationStatus} ordinals
     * to dense switch case numbers: COMPLETE = 1, IN_PROGRESS = 2, ERROR = 3. Any constant that
     * is not listed keeps the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$appian$documentunderstanding$OcrOperationStatus = new int[OcrOperationStatus.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at runtime
            // (NoSuchFieldError) only leaves its own slot at 0 instead of failing class init.
            try {
                $SwitchMap$com$appian$documentunderstanding$OcrOperationStatus[OcrOperationStatus.COMPLETE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Intentionally ignored: the slot stays 0.
            }
            try {
                $SwitchMap$com$appian$documentunderstanding$OcrOperationStatus[OcrOperationStatus.IN_PROGRESS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Intentionally ignored: the slot stays 0.
            }
            try {
                $SwitchMap$com$appian$documentunderstanding$OcrOperationStatus[OcrOperationStatus.ERROR.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Intentionally ignored: the slot stays 0.
            }
        }
    }

    /**
     * Creates a queue manager for the batch poll topic.
     *
     * @param docExtractJobService job/batch persistence service, handed to the superclass
     * @param kafkaTopicManager used to publish download messages
     * @param securityEscalator used to run persistence calls as admin, handed to the superclass
     * @param docExtractKafkaMetricsCollector collector for retry and latency metrics
     * @param documentUnderstandingClientProvider resolves the client for a given vendor
     * @param documentExtractionLimitsConfiguration source of the maximum vendor processing time
     */
    public BatchQueueManager(
            DocExtractJobService docExtractJobService,
            KafkaTopicManager kafkaTopicManager,
            SecurityEscalator securityEscalator,
            DocExtractKafkaMetricsCollector docExtractKafkaMetricsCollector,
            DocumentUnderstandingClientProvider documentUnderstandingClientProvider,
            DocumentExtractionLimitsConfiguration documentExtractionLimitsConfiguration) {
        super(docExtractJobService, securityEscalator);
        this.documentExtractionLimitsConfiguration = documentExtractionLimitsConfiguration;
        this.documentUnderstandingClientProvider = documentUnderstandingClientProvider;
        this.metricsCollector = docExtractKafkaMetricsCollector;
        this.kafkaTopicManager = kafkaTopicManager;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Handles a message that could not be processed: records the error/retry metrics and marks
     * every job of the batch, and the batch itself, as {@link OcrOperationStatus#ERROR}.
     *
     * <p>This is best-effort. Any exception raised while doing so is logged (with its stack
     * trace) and not rethrown.
     *
     * @param batchMessageToken the message that failed
     */
    @Override // com.appian.documentunderstanding.queue.kafka.QueueManager
    public void onMessageFailure(BatchMessageToken batchMessageToken) {
        // Sentinel that shows up in the log when the batch id lookup itself is what failed.
        Long batchId = -1L;
        try {
            batchId = getBatchId(batchMessageToken);
            ProductMetricsAggregatedDataCollector.recordData(DocumentExtractionMetricConstants.DE_START_EXTRACTION_QUEUE_ERROR_POLL, batchMessageToken.getRetryCount());
            this.metricsCollector.recordStatusRetries(batchMessageToken.getRetryCount(), BatchKafkaTopic.DOC_EXTRACT_POLL_TOPIC);
            completeBatchWithJobStatusInDb(OcrOperationStatus.ERROR, batchId);
        } catch (Exception e) {
            // Every value has its own placeholder; the trailing exception is treated by SLF4J
            // as the throwable to log, so the cause is no longer lost.
            LOG.warn("Document Extraction: Failed to change the job status to {} for batch [{}] (batch id [{}]).",
                    OcrOperationStatus.ERROR, batchMessageToken.getVendorJobId(), batchId, e);
        }
    }

    /**
     * Sets the given status on every job that belongs to the batch and then marks the batch
     * itself as completed. Persistence calls are executed through the security escalator as
     * admin.
     *
     * @param ocrOperationStatus status to store on each job of the batch
     * @param l id of the batch; must not be {@code null} (it is unboxed for the job lookup)
     */
    void completeBatchWithJobStatusInDb(OcrOperationStatus ocrOperationStatus, Long l) {
        // Typed list instead of a raw List so the element type is DocExtractJob rather than
        // Object. The cast is kept because runAsAdmin's return type is not visible here.
        @SuppressWarnings("unchecked")
        List<DocExtractJob> jobsInBatch = (List<DocExtractJob>) this.securityEscalator.runAsAdmin(() -> {
            return this.docExtractJobService.listJobsByBatch(l.longValue());
        });
        for (DocExtractJob job : jobsInBatch) {
            updateDocExtractJobStatusInDb(ocrOperationStatus, job.getId());
        }
        this.securityEscalator.runAsAdmin(() -> {
            this.docExtractJobService.completeBatchById(l);
        });
    }

    /**
     * Processes the first message of the polled list. Only the element at index 0 is examined.
     *
     * @param list polled messages; may be empty
     * @return the result of {@code processMessage} for the first message, or {@code 0} when the
     *     list is {@code null}/empty or no client could be obtained for the message's vendor.
     *     NOTE(review): presumably the number of messages whose offset may be advanced --
     *     confirm against OffsetQueueManager.
     */
    @Override // com.appian.documentunderstanding.queue.kafka.OffsetQueueManager
    public int processMessages(List<BatchMessageToken> list) {
        // Nothing polled: report nothing processed instead of failing on get(0).
        if (list == null || list.isEmpty()) {
            return 0;
        }
        BatchMessageToken batchMessageToken = list.get(0);
        return getClient(batchMessageToken.getVendor())
                .map(documentUnderstandingClient -> processMessage(batchMessageToken, documentUnderstandingClient))
                .orElse(0);
    }

    /**
     * Queries the vendor for the batch status and reacts to it.
     *
     * <ul>
     *   <li>COMPLETE: fans out download messages, records metrics, returns 1 (or 0 if the
     *       completion handling reports failure).</li>
     *   <li>IN_PROGRESS: returns 0 unless the batch has expired; an expired batch is marked
     *       ERROR, timeout/error metrics are recorded, and 1 is returned.</li>
     *   <li>ERROR or any other status: delegates to {@link #onMessageFailure} and returns 1.</li>
     * </ul>
     *
     * <p>The switch is on the enum itself rather than on an ordinal lookup table with numeric
     * case labels, so the branches cannot drift if unrelated integer constants change.
     *
     * @param batchMessageToken the poll message being processed
     * @param documentUnderstandingClient client for the message's vendor
     * @return 1 when the message was handled, 0 when it was not
     */
    private int processMessage(BatchMessageToken batchMessageToken, DocumentUnderstandingClient documentUnderstandingClient) {
        OcrOperationStatus status = getOperationStatus(batchMessageToken, documentUnderstandingClient);
        switch (status) {
            case COMPLETE:
                if (!processMessageWithCompleteStatus(batchMessageToken)) {
                    return 0;
                }
                this.metricsCollector.recordStatusRetries(batchMessageToken.getRetryCount(), BatchKafkaTopic.DOC_EXTRACT_POLL_TOPIC);
                ProductMetricsAggregatedDataCollector.recordData(DocumentExtractionMetricConstants.DE_START_EXTRACTION_QUEUE_COMPLETE_POLL, batchMessageToken.getRetryCount());
                return 1;
            case IN_PROGRESS:
                Long batchId = getBatchId(batchMessageToken);
                if (!hasBatchExpired(batchId, batchMessageToken.getRetryCount())) {
                    return 0;
                }
                LOG.warn("Document Extraction: Expiring batch [{}]", batchId);
                this.metricsCollector.recordStatusRetries(batchMessageToken.getRetryCount(), BatchKafkaTopic.DOC_EXTRACT_POLL_TOPIC);
                ProductMetricsAggregatedDataCollector.recordData(DocumentExtractionMetricConstants.DE_START_EXTRACTION_QUEUE_TIMEOUT);
                ProductMetricsAggregatedDataCollector.recordData(DocumentExtractionMetricConstants.DE_START_EXTRACTION_QUEUE_ERROR_POLL, batchMessageToken.getRetryCount());
                completeBatchWithJobStatusInDb(OcrOperationStatus.ERROR, batchId);
                return 1;
            case ERROR:
            default:
                onMessageFailure(batchMessageToken);
                return 1;
        }
    }

    /**
     * Handles a batch the vendor reports as complete: publishes one download message per job in
     * the batch and then marks the batch as completed.
     *
     * <p>The batch is looked up by the token's batch id when present, otherwise by the latest
     * batch for the token's vendor job id. A job whose download message cannot be sent is
     * logged and set to {@link OcrOperationStatus#ERROR}; the remaining jobs are still
     * processed.
     *
     * @param batchMessageToken the poll message for the completed batch
     * @return always {@code true}
     */
    private boolean processMessageWithCompleteStatus(BatchMessageToken batchMessageToken) {
        GoogleBatch googleBatch = (GoogleBatch) this.securityEscalator.runAsAdmin(() -> {
            return batchMessageToken.getBatchId() != null ? this.docExtractJobService.getBatchById(batchMessageToken.getBatchId()) : this.docExtractJobService.getLatestBatchByVendorJobId(batchMessageToken.getVendorJobId());
        });
        // Typed list instead of a raw List so the loop variable can be a DocExtractJob. The
        // cast is kept because runAsAdmin's return type is not visible here.
        @SuppressWarnings("unchecked")
        List<DocExtractJob> jobsInBatch = (List<DocExtractJob>) this.securityEscalator.runAsAdmin(() -> {
            return this.docExtractJobService.listJobsByBatch(googleBatch.getId().longValue());
        });
        for (DocExtractJob docExtractJob : jobsInBatch) {
            try {
                this.kafkaTopicManager.sendMessage(new DownloadMessageToken(docExtractJob.getVendor(), googleBatch.getGoogleJobId(), docExtractJob.getId()), DownloadKafkaTopic.DOC_EXTRACT_DOWNLOAD_TOPIC);
            } catch (Exception e) {
                // One failed publish must not block the other jobs of the batch.
                LOG.error(String.format("Unable to send a Download message to kafka for id: %d", docExtractJob.getId()), e);
                updateDocExtractJobStatusInDb(OcrOperationStatus.ERROR, docExtractJob.getId());
            }
        }
        this.securityEscalator.runAsAdmin(() -> {
            this.docExtractJobService.completeBatchById(googleBatch.getId());
        });
        return true;
    }

    /**
     * Looks up the client for the given vendor.
     *
     * @param vendor vendor whose client is requested
     * @return the client, or an empty Optional when the provider lookup throws (the failure is
     *     logged as a warning)
     */
    private Optional<DocumentUnderstandingClient> getClient(Vendor vendor) {
        Optional<DocumentUnderstandingClient> resolved;
        try {
            resolved = Optional.of(this.documentUnderstandingClientProvider.get(vendor));
        } catch (Exception providerFailure) {
            LOG.warn("Document Extraction: Apparent misconfiguration of AI provider, or provider is unavailable", providerFailure);
            resolved = Optional.empty();
        }
        return resolved;
    }

    /**
     * Asks the vendor client for the status of the token's vendor job and records how long the
     * query took.
     *
     * @param batchMessageToken message carrying the vendor job id
     * @param documentUnderstandingClient client used for the status query
     * @return the status reported by the vendor; {@code IN_PROGRESS} when the response is
     *     flagged as a retryable error or the query fails with an {@link IOException}
     */
    private OcrOperationStatus getOperationStatus(BatchMessageToken batchMessageToken, DocumentUnderstandingClient documentUnderstandingClient) {
        try {
            Stopwatch latencyTimer = new Stopwatch();
            ClientResponse<OcrOperationStatus> response = documentUnderstandingClient.getJobStatus(batchMessageToken.getVendorJobId());
            this.metricsCollector.recordQueryStatusLatency(latencyTimer, BatchKafkaTopic.DOC_EXTRACT_POLL_TOPIC);
            if (response.isRetryableError()) {
                // A retryable error is reported as "still running" so the batch is polled again.
                return OcrOperationStatus.IN_PROGRESS;
            }
            return response.getResultObject();
        } catch (IOException ioFailure) {
            LOG.warn(String.format("Document Extraction: Unexpected error getting job status for vendor job : %s", batchMessageToken.getVendorJobId()), ioFailure);
            return OcrOperationStatus.IN_PROGRESS;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Counts the leading records that are ready to be processed. Counting stops at the first
     * non-null record that is still inside the pause window; null records are counted as ready.
     *
     * @param list polled records, in the order they should be processed
     * @return index of the first record that is not ready, or the list size if all are ready
     */
    @Override // com.appian.documentunderstanding.queue.kafka.QueueManager
    public int getNumberOfRecordsToProcess(List<ConsumerRecord<Void, byte[]>> list) {
        Instant now = Instant.now();
        int readyCount = 0;
        for (ConsumerRecord<Void, byte[]> candidate : list) {
            if (candidate != null && recordIsNotReadyToProcess(now, candidate)) {
                return readyCount;
            }
            readyCount++;
        }
        return readyCount;
    }

    /**
     * Tells whether a record is still inside the pause window.
     *
     * @param instant the reference "now"
     * @param consumerRecord record whose timestamp (epoch milliseconds) is checked
     * @return {@code true} when the record's timestamp is strictly later than
     *     {@code instant - RECORD_PAUSE_TIME}
     */
    private boolean recordIsNotReadyToProcess(Instant instant, ConsumerRecord<Void, byte[]> consumerRecord) {
        Instant readyCutoff = instant.minus(RECORD_PAUSE_TIME);
        Instant recordTime = Instant.ofEpochMilli(consumerRecord.timestamp());
        return recordTime.isAfter(readyCutoff);
    }

    /**
     * Resolves the batch id for a message.
     *
     * @param batchMessageToken the poll message
     * @return the token's own batch id when it has one; otherwise the id of the latest batch
     *     found for the token's vendor job id (looked up as admin)
     */
    private Long getBatchId(BatchMessageToken batchMessageToken) {
        Long idFromToken = batchMessageToken.getBatchId();
        if (idFromToken != null) {
            return idFromToken;
        }
        GoogleBatch latestBatch = (GoogleBatch) this.securityEscalator.runAsAdmin(() -> {
            return this.docExtractJobService.getLatestBatchByVendorJobId(batchMessageToken.getVendorJobId());
        });
        return latestBatch.getId();
    }

    /**
     * Decides whether a batch that is still in progress has exceeded the maximum vendor
     * processing time.
     *
     * <p>The retry count is checked first: until it exceeds
     * {@code ceil(maxVendorProcessingTimeSeconds / RECORD_PAUSE_TIME)}, the batch is reported as
     * not expired without reading its create time. After that the create time (epoch
     * milliseconds) decides. A batch with no create time is treated as expired.
     *
     * @param l id of the batch
     * @param i retry count of the message being processed
     * @return {@code true} when the batch should be expired
     */
    private boolean hasBatchExpired(Long l, int i) {
        int maxVendorProcessingTimeSeconds = this.documentExtractionLimitsConfiguration.getMaxVendorProcessingTimeSeconds();
        // Divide as double: integer division would truncate the quotient before ceil() sees it,
        // turning the rounding-up into a no-op.
        int minRetriesBeforeExpiry = (int) Math.ceil((double) maxVendorProcessingTimeSeconds / RECORD_PAUSE_TIME.getSeconds());
        if (i <= minRetriesBeforeExpiry) {
            return false;
        }
        Long createTimeMillis = (Long) this.securityEscalator.runAsAdmin(() -> {
            return this.docExtractJobService.getBatchCreateTime(l);
        });
        if (createTimeMillis == null) {
            LOG.error("Document Extraction: Expiring batch due to inability to find a create time for batch [{}]", l);
            return true;
        }
        Instant expiryCutoff = Instant.now().minus(Duration.ofSeconds(maxVendorProcessingTimeSeconds));
        // Expired once the batch was created at or before the cutoff.
        return !expiryCutoff.isBefore(Instant.ofEpochMilli(createTimeMillis.longValue()));
    }
}
