package com.appian.kafka;

import com.appian.komodo.util.kafka.SingleTopicPartitionConsumer;
import com.appiancorp.common.monitoring.Stopwatch;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/appian/kafka/KafkaConsumerListener.class */
public class KafkaConsumerListener<T> implements Runnable {
    public static final Logger log = Logger.getLogger(KafkaConsumerListener.class);
    private final AtomicBoolean isClosed;
    private final AtomicBoolean isRunning;
    private KafkaConsumerSupport<T> kafkaTopic;
    private final String listenerId;
    private final String listenerName;
    private final ExecutorService handleMessages;
    private final KafkaMetricsCollector metricsCollector;
    private final Supplier<Long> highWaterMarkQueueSize;
    private final Supplier<Double> highWaterMarkTimeSec;
    private int consecutiveFailureCount;
    private final boolean shouldCommit;
    private Consumer<Void, byte[]> consumer;
    private SingleTopicPartitionConsumer singleTopicPartitionConsumer;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:com/appian/kafka/KafkaConsumerListener$KafkaTask.class */
    public class KafkaTask implements Callable<Integer> {
        List<ConsumerRecord<Void, byte[]>> records;
        List<T> dataItems;
        String listenerName;

        KafkaTask(List<ConsumerRecord<Void, byte[]>> list, List<T> list2, String str) {
            this.records = list;
            this.dataItems = list2;
            this.listenerName = str;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Integer call() {
            int onKafkaTaskFailure;
            if (KafkaConsumerListener.log.isDebugEnabled()) {
                KafkaConsumerListener.log.debug(this.listenerName + " received " + this.records.size() + " items from the partitions [" + this.records.stream().map((v0) -> {
                    return v0.partition();
                }).distinct().collect(Collectors.toList()) + "] with the last offset = " + this.records.get(this.records.size() - 1).offset());
            }
            try {
                onKafkaTaskFailure = KafkaConsumerListener.this.kafkaTopic.processMessages(this.dataItems);
                if (onKafkaTaskFailure < this.records.size()) {
                    KafkaConsumerListener.log.warn(KafkaConsumerListener.this.kafkaTopic.toString() + "' only commited to index " + onKafkaTaskFailure + " (exclusive) when processing " + this.dataItems);
                } else if (KafkaConsumerListener.log.isDebugEnabled()) {
                    KafkaConsumerListener.log.debug(KafkaConsumerListener.this.kafkaTopic.toString() + "' processed " + this.dataItems);
                }
            } catch (Exception e) {
                KafkaConsumerListener.log.error(KafkaConsumerListener.this.kafkaTopic.toString() + "' has failed to process " + this.dataItems + " with the error: " + e.getMessage(), e);
                onKafkaTaskFailure = KafkaConsumerListener.this.onKafkaTaskFailure(this.dataItems);
            }
            if (onKafkaTaskFailure > 0) {
                KafkaConsumerListener.this.consecutiveFailureCount = 0;
            }
            return Integer.valueOf(onKafkaTaskFailure);
        }
    }

    public KafkaConsumerListener(String str, Supplier<Long> supplier, Supplier<Double> supplier2, KafkaMetricsCollector kafkaMetricsCollector) {
        this(str, supplier, supplier2, kafkaMetricsCollector, true);
    }

    public KafkaConsumerListener(String str, Supplier<Long> supplier, Supplier<Double> supplier2, KafkaMetricsCollector kafkaMetricsCollector, boolean z) {
        this.isClosed = new AtomicBoolean(false);
        this.isRunning = new AtomicBoolean(false);
        this.handleMessages = Executors.newFixedThreadPool(1);
        this.consecutiveFailureCount = 0;
        this.singleTopicPartitionConsumer = null;
        this.listenerId = str;
        this.listenerName = getClass().getSimpleName() + "[" + this.listenerId + "]";
        this.highWaterMarkQueueSize = supplier;
        this.highWaterMarkTimeSec = supplier2;
        this.metricsCollector = kafkaMetricsCollector;
        this.shouldCommit = z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setKafkaTopic(KafkaConsumerSupport<T> kafkaConsumerSupport) {
        this.kafkaTopic = kafkaConsumerSupport;
    }

    private void reconnectConsumers() {
        this.metricsCollector.incrementConsumerRecreation(this.kafkaTopic.getTopicName());
        if (this.singleTopicPartitionConsumer != null) {
            log.info("Closing existing consumer to reconnect " + this.listenerName);
            this.singleTopicPartitionConsumer.close();
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                this.isClosed.set(true);
            }
        }
        this.consumer = this.kafkaTopic.createKafkaConsumer();
        log.info("created kafka consumer for " + this.listenerName);
        this.singleTopicPartitionConsumer = this.kafkaTopic.createSingleTopicPartitionConsumer(this.consumer);
        log.info("created SingleTopicPartitionConsumer for " + this.listenerName);
    }

    @Override // java.lang.Runnable
    public void run() {
        this.isRunning.set(true);
        long pollTimeout = this.kafkaTopic.getPollTimeout() / 3;
        log.info("Starting  " + this.listenerName + " ...");
        reconnectConsumers();
        while (!this.isClosed.get()) {
            try {
                try {
                    List<ConsumerRecord<Void, byte[]>> poll = this.singleTopicPartitionConsumer.poll(this.kafkaTopic.getPollTimeout());
                    if (poll.isEmpty()) {
                        this.metricsCollector.recordNumberInQueueMetric(0L, this.kafkaTopic.getTopicName());
                    }
                    ImmutableSet<TopicPartition> of = ImmutableSet.of(this.singleTopicPartitionConsumer.getTopicPartition());
                    if (!this.isClosed.get() && !poll.isEmpty()) {
                        processRecords(pollTimeout, poll, of);
                    }
                } catch (Exception e) {
                    String format = String.format(this.listenerName + " has failed with the error: %s", e.getMessage());
                    if (this.isClosed.get()) {
                        log.info(format);
                    } else {
                        log.error(format, e);
                        reconnectConsumers();
                    }
                }
            } finally {
                this.consumer.close();
                this.isRunning.set(false);
                log.info(this.listenerName + " has stopped");
            }
        }
    }

    private void processRecords(long j, List<ConsumerRecord<Void, byte[]>> list, ImmutableSet<TopicPartition> immutableSet) {
        recordNumQueued(this.singleTopicPartitionConsumer, immutableSet, this.kafkaTopic.getTopicName());
        Long valueOf = Long.valueOf(this.singleTopicPartitionConsumer.position() - list.size());
        Integer num = null;
        try {
            try {
                this.consumer.pause(immutableSet);
                int numberOfRecordsToProcess = this.kafkaTopic.getNumberOfRecordsToProcess(list);
                if (numberOfRecordsToProcess == 0) {
                    this.singleTopicPartitionConsumer.poll(this.kafkaTopic.timeToPauseOnEmptyProcessList(list));
                    if (0 == 0) {
                        try {
                            this.consumer.resume(immutableSet);
                            if (0 == 0) {
                                this.singleTopicPartitionConsumer.seek(valueOf.longValue());
                            } else {
                                if (this.shouldCommit) {
                                    this.singleTopicPartitionConsumer.commitOffset(valueOf.longValue() + num.intValue());
                                }
                                if (num.intValue() < list.size()) {
                                    this.singleTopicPartitionConsumer.seek(valueOf.longValue() + num.intValue());
                                }
                            }
                        } catch (Exception e) {
                            log.warn("Exception when Kafka consumer resuming or committing offset for: " + this.listenerName, e);
                            reconnectConsumers();
                            return;
                        }
                    }
                    return;
                }
                List<ConsumerRecord<Void, byte[]>> subList = list.subList(0, numberOfRecordsToProcess);
                Stopwatch stopwatch = new Stopwatch();
                List<T> convertToMessageToken = convertToMessageToken(subList);
                Integer doCommitValue = getDoCommitValue(j, this.handleMessages.submit(new KafkaTask(subList, convertToMessageToken, this.listenerName)), convertToMessageToken, stopwatch);
                this.metricsCollector.recordLatency(stopwatch, this.kafkaTopic.getTopicName());
                if (0 == 0) {
                    try {
                        this.consumer.resume(immutableSet);
                        if (doCommitValue == null) {
                            this.singleTopicPartitionConsumer.seek(valueOf.longValue());
                        } else {
                            if (this.shouldCommit) {
                                this.singleTopicPartitionConsumer.commitOffset(valueOf.longValue() + doCommitValue.intValue());
                            }
                            if (doCommitValue.intValue() < list.size()) {
                                this.singleTopicPartitionConsumer.seek(valueOf.longValue() + doCommitValue.intValue());
                            }
                        }
                    } catch (Exception e2) {
                        log.warn("Exception when Kafka consumer resuming or committing offset for: " + this.listenerName, e2);
                        reconnectConsumers();
                    }
                }
            } catch (IllegalStateException e3) {
                log.warn("Appears that the Kafka partition has been re-assigned before pause for: " + this.listenerName, e3);
                reconnectConsumers();
                if (1 == 0) {
                    try {
                        this.consumer.resume(immutableSet);
                        if (0 == 0) {
                            this.singleTopicPartitionConsumer.seek(valueOf.longValue());
                        } else {
                            if (this.shouldCommit) {
                                this.singleTopicPartitionConsumer.commitOffset(valueOf.longValue() + num.intValue());
                            }
                            if (num.intValue() < list.size()) {
                                this.singleTopicPartitionConsumer.seek(valueOf.longValue() + num.intValue());
                            }
                        }
                    } catch (Exception e4) {
                        log.warn("Exception when Kafka consumer resuming or committing offset for: " + this.listenerName, e4);
                        reconnectConsumers();
                    }
                }
            }
        } catch (Throwable th) {
            if (0 == 0) {
                try {
                    this.consumer.resume(immutableSet);
                    if (0 == 0) {
                        this.singleTopicPartitionConsumer.seek(valueOf.longValue());
                    } else {
                        if (this.shouldCommit) {
                            this.singleTopicPartitionConsumer.commitOffset(valueOf.longValue() + num.intValue());
                        }
                        if (num.intValue() < list.size()) {
                            this.singleTopicPartitionConsumer.seek(valueOf.longValue() + num.intValue());
                        }
                    }
                } catch (Exception e5) {
                    log.warn("Exception when Kafka consumer resuming or committing offset for: " + this.listenerName, e5);
                    reconnectConsumers();
                    throw th;
                }
            }
            throw th;
        }
    }

    private List<T> convertToMessageToken(List<ConsumerRecord<Void, byte[]>> list) {
        return (List) list.stream().map((v0) -> {
            return v0.value();
        }).map(bArr -> {
            try {
                return this.kafkaTopic.getMessageHandler().fromKafkaMessageValue(bArr);
            } catch (Exception e) {
                log.error(String.format("The listener for Kafka Consumer [%s] has failed to parse message: [%s]%nError: %s", this.listenerName, bArr, e.getMessage()), e);
                return null;
            }
        }).collect(Collectors.toList());
    }

    private Integer getDoCommitValue(long j, Future<Integer> future, List<T> list, Stopwatch stopwatch) {
        Integer num = null;
        do {
            try {
                num = future.get(j, TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException e) {
                log.error(this.kafkaTopic.toString() + "' has failed to execute " + list + " with the error: " + e.getMessage(), e);
                num = 0;
                if (e instanceof InterruptedException) {
                    this.isClosed.set(true);
                }
            } catch (CancellationException e2) {
                log.error(this.kafkaTopic.toString() + "' has been canceled for " + list + " with the error: " + e2.getMessage(), e2);
                num = 0;
            } catch (TimeoutException e3) {
                log.debug("About to send heartbeat to kafka broker - waiting for utility thread to complete task...");
                this.singleTopicPartitionConsumer.poll(0L);
            }
            recordCurrentItemProcessingTime(stopwatch, this.kafkaTopic.getTopicName());
            if (num != null) {
                break;
            }
        } while (!this.isClosed.get());
        return num;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int onKafkaTaskFailure(List<T> list) {
        this.consecutiveFailureCount++;
        if (this.consecutiveFailureCount < this.kafkaTopic.getMaxConsecutiveFailuresAllowed()) {
            return 0;
        }
        int min = Math.min(this.kafkaTopic.getMaxDataItemsToDeadletterOnError(), list.size());
        this.kafkaTopic.onDeadLetteringDataItems((List) list.stream().limit(min).collect(Collectors.toList()));
        return min;
    }

    private boolean recordNumQueued(SingleTopicPartitionConsumer singleTopicPartitionConsumer, ImmutableSet<TopicPartition> immutableSet, String str) {
        long longValue = ((Long) this.consumer.endOffsets(immutableSet).get(singleTopicPartitionConsumer.getTopicPartition())).longValue() - this.consumer.position((TopicPartition) immutableSet.iterator().next());
        this.metricsCollector.recordNumberInQueueMetric(longValue, str);
        if (longValue <= this.highWaterMarkQueueSize.get().longValue()) {
            return false;
        }
        log.warn("Potentially backed up kafka queue. The Number of messages in the queue is:  " + longValue + " for Topic " + str);
        return true;
    }

    private boolean recordCurrentItemProcessingTime(Stopwatch stopwatch, String str) {
        double recordCurrentTimeSpentProcessing = this.metricsCollector.recordCurrentTimeSpentProcessing(stopwatch, str);
        if (recordCurrentTimeSpentProcessing <= this.highWaterMarkTimeSec.get().doubleValue()) {
            return false;
        }
        log.warn("Potentially long message processed. Current time spent processing this record is " + recordCurrentTimeSpentProcessing + " seconds for Topic: " + str);
        return true;
    }

    @VisibleForTesting
    void cancel() {
        this.isClosed.set(true);
    }

    @VisibleForTesting
    public boolean isRunning() {
        return this.isRunning.get();
    }
}
