package com.appian.documentunderstanding.queue.kafka;

import com.appian.documentunderstanding.OcrOperationStatus;
import com.appian.documentunderstanding.client.ClientResponse;
import com.appian.documentunderstanding.client.DocumentUnderstandingClient;
import com.appian.documentunderstanding.client.DocumentUnderstandingClientProvider;
import com.appian.documentunderstanding.client.JobsBatch;
import com.appian.documentunderstanding.common.DocumentExtractionLimitsConfiguration;
import com.appian.documentunderstanding.exception.DocExtractionException;
import com.appian.documentunderstanding.exception.DocExtractionGenericException;
import com.appian.documentunderstanding.queue.DocExtractJobService;
import com.appian.documentunderstanding.queue.metrics.DocExtractKafkaMetricsCollectorImpl;
import com.appian.kafka.KafkaTopicManager;
import com.appiancorp.documentunderstanding.persistence.DocExtractBatch;
import com.appiancorp.documentunderstanding.persistence.DocExtractJob;
import com.appiancorp.documentunderstanding.persistence.Vendor;
import com.appiancorp.security.auth.SecurityEscalator;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/appian/documentunderstanding/queue/kafka/PendingQueueManager.class */
/**
 * Queue manager for pending document-extraction messages.
 *
 * <p>Each call to {@link #processMessages(List)} groups the leading messages that share the first
 * message's vendor and processor id, starts a batch for their jobs through the vendor's
 * {@link DocumentUnderstandingClient}, persists the batch, and publishes a message for it to
 * {@link BatchKafkaTopic#DOC_EXTRACT_POLL_TOPIC}.
 */
public class PendingQueueManager extends OffsetQueueManager<PendingMessageToken> {
    private static final Logger LOG = LoggerFactory.getLogger(PendingQueueManager.class);
    private final KafkaTopicManager kafkaTopicManager;
    private final DocumentExtractionLimitsConfiguration limitsConfiguration;
    private final DocumentUnderstandingClientProvider clientProvider;

    /**
     * Counts null and non-null consumer records until a limit of non-null records is reached.
     */
    private static class RecordCountAggregator {
        private final int limit;
        private int numberOfNulls = 0;
        private int numberOfNonNulls = 0;

        /**
         * @param limit maximum number of non-null records to accept
         */
        public RecordCountAggregator(int limit) {
            this.limit = limit;
        }

        /**
         * Adds one record to the running counts.
         *
         * @param consumerRecord the record to count; {@code null} records are tallied separately
         * @return this aggregator, for chaining
         */
        public RecordCountAggregator aggregate(ConsumerRecord<Void, byte[]> consumerRecord) {
            if (consumerRecord == null) {
                this.numberOfNulls++;
            } else {
                this.numberOfNonNulls++;
            }
            return this;
        }

        /**
         * @return {@code true} while fewer than {@code limit} non-null records have been counted
         */
        public boolean withinBatchSize() {
            return this.numberOfNonNulls < this.limit;
        }

        /**
         * @return the number of records counted so far, null and non-null combined
         */
        public int getTotalRecordsAggregated() {
            return this.numberOfNonNulls + this.numberOfNulls;
        }
    }

    /**
     * Creates a manager for the pending queue.
     *
     * @param docExtractJobService job service, passed to the superclass
     * @param documentExtractionLimitsConfiguration source of the per-batch request limit and the
     *     concurrent-batch limit
     * @param kafkaTopicManager used to publish the message for a started batch
     * @param securityEscalator escalator, passed to the superclass
     * @param documentUnderstandingClientProvider resolves the client for a message's vendor
     */
    public PendingQueueManager(DocExtractJobService docExtractJobService, DocumentExtractionLimitsConfiguration documentExtractionLimitsConfiguration, KafkaTopicManager kafkaTopicManager, SecurityEscalator securityEscalator, DocumentUnderstandingClientProvider documentUnderstandingClientProvider) {
        super(docExtractJobService, securityEscalator);
        this.kafkaTopicManager = kafkaTopicManager;
        this.limitsConfiguration = documentExtractionLimitsConfiguration;
        this.clientProvider = documentUnderstandingClientProvider;
    }

    /**
     * Marks the job referenced by a failed message as {@link OcrOperationStatus#ERROR}.
     *
     * @param pendingMessageToken the message that failed
     */
    @Override
    public void onMessageFailure(PendingMessageToken pendingMessageToken) {
        updateDocExtractJobStatusInDb(OcrOperationStatus.ERROR, pendingMessageToken.getAppianJobId());
    }

    /**
     * Starts a batch for the leading messages that share the first message's vendor and
     * processor id.
     *
     * @param list the pending messages, in order
     * @return the number of leading messages to treat as processed; {@code 0} when {@code list}
     *     is empty or when starting the batch reported a retryable error
     */
    @Override
    public int processMessages(List<PendingMessageToken> list) {
        // Nothing to inspect: without this guard list.get(0) throws IndexOutOfBoundsException.
        if (list.isEmpty()) {
            return 0;
        }
        int batchLimit = this.limitsConfiguration.getMaxNumberOfRequestsPerBatch();
        PendingMessageToken first = list.get(0);
        Vendor vendor = first.getVendor();
        String processorId = first.getProcessorId();
        int processedCount = calculateMessagesProcessed(list, batchLimit, vendor, processorId);
        List<Long> candidateJobIds = getFilteredJobIds(list, batchLimit, vendor, processorId);
        try {
            DocumentUnderstandingClient client = this.clientProvider.get(vendor);
            Collection<DocExtractJob> jobs = getDocExtractJobs(candidateJobIds);
            if (jobs.isEmpty()) {
                return processedCount;
            }
            JobsBatch jobsBatch = new JobsBatch(jobs, processorId);
            List<Long> batchedJobIds = jobs.stream().map(DocExtractJob::getId).collect(Collectors.toList());
            try {
                ClientResponse<DocExtractBatch> response = client.startBatch(jobsBatch);
                if (response.isRetryableError()) {
                    LOG.warn("Received a retryable error when starting ocr batch. Batch will be retried next time the pending topic is polled");
                    return 0;
                }
                // NOTE(review): the metric label is always GOOGLE, even though the client is
                // resolved from the message's vendor - confirm this is intended.
                DocExtractKafkaMetricsCollectorImpl.DOC_EXTRACT_KAFKA_METRICS_COLLECTOR.recordBatchSize(jobs.size(), Vendor.GOOGLE.name());
                sendBatch(client, batchedJobIds, response);
            } catch (DocExtractionException e) {
                LOG.error("Non retryable error when starting batch, marking all messages as processed and setting status to error", e);
                batchedJobIds.forEach(jobId -> updateDocExtractJobStatusInDb(OcrOperationStatus.ERROR, jobId));
                DocExtractKafkaMetricsCollectorImpl.DOC_EXTRACT_KAFKA_METRICS_COLLECTOR.recordBatchError(Vendor.GOOGLE.name());
            }
            return processedCount;
        } catch (DocExtractionGenericException e) {
            LOG.error("Unable to get document understanding client, vendor on message was {}", vendor, e);
            candidateJobIds.forEach(jobId -> updateDocExtractJobStatusInDb(OcrOperationStatus.ERROR, jobId));
            return processedCount;
        }
    }

    /**
     * Selects the job ids to consider for the next batch.
     *
     * <p>Takes at most {@code limit} leading messages. If they carry more than one vendor, only
     * the leading run matching {@code vendor} is kept. From what remains, every message whose
     * processor id equals {@code processorId} contributes its job id. Unlike
     * {@link #calculateMessagesProcessed}, this processor-id filter is not restricted to a
     * contiguous prefix.
     *
     * @param messages the pending messages, in order
     * @param limit maximum number of leading messages to inspect
     * @param vendor vendor of the first message
     * @param processorId processor id of the first message
     * @return job ids of the selected messages, in message order
     */
    private List<Long> getFilteredJobIds(List<PendingMessageToken> messages, int limit, Vendor vendor, String processorId) {
        List<PendingMessageToken> candidates = messages.stream().limit(limit).collect(Collectors.toList());
        Set<Vendor> vendors = candidates.stream().map(PendingMessageToken::getVendor).collect(Collectors.toSet());
        if (vendors.size() > 1) {
            LOG.error("Different vendors found when attempting to process messages vendors={}", vendors);
            int end = 1;
            // Null-safe comparison, matching calculateMessagesProcessed: the first token's
            // vendor may be null while later tokens carry one.
            while (end < candidates.size() && Objects.equals(vendor, candidates.get(end).getVendor())) {
                end++;
            }
            candidates = candidates.subList(0, end);
        }
        return candidates.stream()
                .filter(token -> Objects.equals(processorId, token.getProcessorId()))
                .map(PendingMessageToken::getAppianJobId)
                .collect(Collectors.toList());
    }

    /**
     * Persists the started batch, links the jobs to it, and publishes the batch message to
     * {@link BatchKafkaTopic#DOC_EXTRACT_POLL_TOPIC}.
     *
     * <p>If any of that fails, every job in {@code jobIds} is marked
     * {@link OcrOperationStatus#ERROR} and, when the batch has an id, the batch is completed.
     *
     * @param documentUnderstandingClient client used to build the batch message token
     * @param jobIds ids of the jobs included in the batch
     * @param clientResponse response returned when the batch was started
     */
    private void sendBatch(DocumentUnderstandingClient documentUnderstandingClient, List<Long> jobIds, ClientResponse<DocExtractBatch> clientResponse) {
        DocExtractBatch batch = clientResponse.getResultObject();
        try {
            this.securityEscalator.runAsAdmin(() -> {
                this.docExtractJobService.updateDocExtractJobsWithGoogleBatch(jobIds, this.docExtractJobService.createBatch(batch).getId());
            });
            this.kafkaTopicManager.sendMessage(documentUnderstandingClient.buildMessageTokenForBatch(batch), BatchKafkaTopic.DOC_EXTRACT_POLL_TOPIC);
        } catch (Exception e) {
            LOG.error("Unable to send batch message to kafka", e);
            jobIds.forEach(jobId -> updateDocExtractJobStatusInDb(OcrOperationStatus.ERROR, jobId));
            if (batch == null || batch.getId() == null) {
                return;
            }
            this.securityEscalator.runAsAdmin(() -> {
                this.docExtractJobService.completeBatchById(batch.getId());
            });
        }
    }

    /**
     * Loads the jobs for the given ids and keeps only those that can still be batched.
     *
     * @param jobIds ids of the jobs to load
     * @return the loaded jobs that pass {@link #isEligibleForBatch(DocExtractJob)}
     */
    private Collection<DocExtractJob> getDocExtractJobs(List<Long> jobIds) {
        Collection<DocExtractJob> loaded = this.securityEscalator.runAsAdmin(() -> {
            return this.docExtractJobService.getLazy(jobIds);
        });
        return loaded.stream().filter(PendingQueueManager::isEligibleForBatch).collect(Collectors.toList());
    }

    /**
     * @param job the job to check
     * @return {@code true} when the job is not in {@link OcrOperationStatus#ERROR}, has an input
     *     document, and that document is not yet attached to a batch
     */
    private static boolean isEligibleForBatch(DocExtractJob job) {
        return !OcrOperationStatus.ERROR.equals(job.getJobStatus())
                && job.getGoogleInputDocument() != null
                && job.getGoogleInputDocument().getBatch() == null;
    }

    /**
     * Counts the leading messages that share the first message's processor id and vendor.
     *
     * @param messages the pending messages, in order; the first one is always counted
     * @param limit upper bound on the returned count
     * @param vendor vendor of the first message
     * @param processorId processor id of the first message
     * @return the length of the matching leading run, capped at {@code limit}
     */
    private int calculateMessagesProcessed(List<PendingMessageToken> messages, int limit, Vendor vendor, String processorId) {
        int count = 1;
        while (count < messages.size()
                && Objects.equals(processorId, messages.get(count).getProcessorId())
                && Objects.equals(vendor, messages.get(count).getVendor())) {
            count++;
        }
        return Math.min(count, limit);
    }

    /**
     * Decides how many of the polled records to hand over for processing.
     *
     * @param list the polled records; entries may be {@code null}
     * @return {@code 0} when no batch slot is available; otherwise the number of leading records,
     *     nulls included, consumed until the per-batch limit of non-null records is reached
     */
    @Override
    public int getNumberOfRecordsToProcess(List<ConsumerRecord<Void, byte[]>> list) {
        if (!areBatchSlotsAvailable()) {
            return 0;
        }
        RecordCountAggregator aggregator = new RecordCountAggregator(this.limitsConfiguration.getMaxNumberOfRequestsPerBatch());
        for (ConsumerRecord<Void, byte[]> consumerRecord : list) {
            if (!aggregator.withinBatchSize()) {
                break;
            }
            aggregator.aggregate(consumerRecord);
        }
        return aggregator.getTotalRecordsAggregated();
    }

    /**
     * @return {@code true} when the in-flight batch count is below the configured maximum number
     *     of concurrent batches
     */
    private boolean areBatchSlotsAvailable() {
        Integer inFlightBatches = this.securityEscalator.runAsAdmin(this.docExtractJobService::getInFlightBatchCount);
        return inFlightBatches < this.limitsConfiguration.getMaxNumberOfConcurrentBatches();
    }
}
