package com.appian.documentunderstanding.queue.kafka;

import com.appian.documentunderstanding.common.DocumentExtractionLimitsConfiguration;
import com.appian.documentunderstanding.queue.metrics.DocExtractKafkaMetricsCollectorImpl;
import com.appian.kafka.KafkaConsumerProcessor;
import com.appian.kafka.KafkaTopicManager;
import com.google.common.collect.Iterables;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/appian/documentunderstanding/queue/kafka/PendingKafkaConsumer.class */
/**
 * Kafka consumer processor for {@link PendingMessageToken} messages on the
 * document-extraction pending topic.
 *
 * <p>On construction the instance registers itself with the supplied
 * {@link KafkaTopicManager}; message handling is delegated to the
 * {@link PendingQueueManager}.
 */
public class PendingKafkaConsumer implements KafkaConsumerProcessor<PendingMessageToken> {
    private static final Logger LOG = LoggerFactory.getLogger(PendingKafkaConsumer.class);
    private static final String DOC_EXTRACT_PENDING_CONSUMER_GROUP = "DOC_EXTRACT_PENDING_CONSUMER_GROUP";

    /** Base pause, in milliseconds, always returned by {@link #timeToPauseOnEmptyProcessList(List)}. */
    static final int MINIMUM_TIME_TO_PAUSE = 2000;

    /** Record age threshold used when computing the extra pause time. */
    private static final Duration TIME_TO_PAUSE_PER_RECORD = Duration.of(30, ChronoUnit.SECONDS);

    private final PendingQueueManager pendingQueueManager;

    /**
     * Creates the consumer and registers it with {@code kafkaTopicManager} for the pending topic.
     *
     * @param documentExtractionLimitsConfiguration supplies the maximum number of records polled from the pending topic
     * @param kafkaMonitoringConfiguration supplies the count and processing-time high-water marks passed to the registration
     * @param pendingQueueManager delegate that performs the actual message processing
     * @param kafkaTopicManager manager this consumer registers itself with
     */
    public PendingKafkaConsumer(DocumentExtractionLimitsConfiguration documentExtractionLimitsConfiguration, KafkaMonitoringConfiguration kafkaMonitoringConfiguration, PendingQueueManager pendingQueueManager, KafkaTopicManager kafkaTopicManager) {
        this.pendingQueueManager = pendingQueueManager;
        int maxNumberOfPolledRecordsFromPending = documentExtractionLimitsConfiguration.getMaxNumberOfPolledRecordsFromPending();
        // NOTE(review): `this` is handed out before the constructor returns; keep this call
        // as the last statement so the instance is fully initialised when it is registered.
        kafkaTopicManager.registerQueueConsumer(
                PendingKafkaTopic.DOC_EXTRACT_PENDING_TOPIC,
                PendingKafkaTopic.DOC_EXTRACT_PENDING_TOPIC,
                maxNumberOfPolledRecordsFromPending,
                kafkaMonitoringConfiguration::getPendingTopicCountHighWaterMark,
                kafkaMonitoringConfiguration::getPendingTopicCurrentProcessingTimeHighWaterMarkMs,
                DocExtractKafkaMetricsCollectorImpl.DOC_EXTRACT_KAFKA_METRICS_COLLECTOR,
                this,
                DOC_EXTRACT_PENDING_CONSUMER_GROUP);
    }

    /**
     * Delegates processing of the given messages to the {@link PendingQueueManager}.
     *
     * @param list messages to process
     * @return the value returned by {@code PendingQueueManager.processMessages}
     */
    public int processMessages(List<PendingMessageToken> list) {
        return this.pendingQueueManager.processMessages(list);
    }

    /**
     * Logs a warning and forwards the given messages to
     * {@code PendingQueueManager.onMessagesFailure}. Any exception raised while doing so is
     * logged (with its stack trace) and not rethrown.
     *
     * @param list messages being dead-lettered
     */
    public void onDeadLetteringDataItems(List<PendingMessageToken> list) {
        try {
            LOG.warn("Document Extraction: Kafka failed to process messages and is committing {} messages to unblock the queue", list.size());
            this.pendingQueueManager.onMessagesFailure(list);
        } catch (Exception e) {
            // Best effort: do not propagate, but keep the cause in the log.
            LOG.warn("Document Extraction: Failed to process the {} errored PendingMessageToken messages", list.size(), e);
        }
    }

    /**
     * @return the message class handled by this consumer, {@code PendingMessageToken.class}
     */
    public Class<PendingMessageToken> getSupportedMessageType() {
        return PendingMessageToken.class;
    }

    /**
     * Delegates to {@code PendingQueueManager.getNumberOfRecordsToProcess}.
     *
     * @param list polled consumer records
     * @return the value returned by the delegate
     */
    public int getNumberOfRecordsToProcess(List<ConsumerRecord<Void, byte[]>> list) {
        return this.pendingQueueManager.getNumberOfRecordsToProcess(list);
    }

    /**
     * Computes a pause time from the first non-null record in {@code list}.
     *
     * <p>If there is no non-null record, or that record's timestamp is already at least
     * {@link #TIME_TO_PAUSE_PER_RECORD} behind the current system time, the result is
     * {@link #MINIMUM_TIME_TO_PAUSE}. Otherwise the remaining time until the record reaches that
     * age is added to {@link #MINIMUM_TIME_TO_PAUSE}. The result is capped at
     * {@link Integer#MAX_VALUE}.
     *
     * @param list polled consumer records; may contain {@code null} entries
     * @return the pause time in milliseconds, never less than {@link #MINIMUM_TIME_TO_PAUSE}
     */
    public int timeToPauseOnEmptyProcessList(List<ConsumerRecord<Void, byte[]>> list) {
        int firstRecordIndex = Iterables.indexOf(list, Objects::nonNull);
        if (firstRecordIndex == -1) {
            return MINIMUM_TIME_TO_PAUSE;
        }
        long recordAgeMillis = System.currentTimeMillis() - list.get(firstRecordIndex).timestamp();
        long remainingMillis = TIME_TO_PAUSE_PER_RECORD.toMillis() - recordAgeMillis;
        if (remainingMillis <= 0) {
            return MINIMUM_TIME_TO_PAUSE;
        }
        // Clamp before narrowing so a timestamp far in the future cannot overflow the int result.
        long cappedRemainingMillis = Math.min(remainingMillis, (long) Integer.MAX_VALUE - MINIMUM_TIME_TO_PAUSE);
        return MINIMUM_TIME_TO_PAUSE + (int) cappedRemainingMillis;
    }
}
