package com.appian.komodo.util.kafka;

import com.google.auto.factory.AutoFactory;
import com.google.auto.factory.Provided;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import komodo.shaded.com.google.common.base.Throwables;
import komodo.shaded.com.google.common.collect.ImmutableList;
import komodo.shaded.com.google.common.collect.ImmutableMap;
import komodo.shaded.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
@AutoFactory
/* loaded from: input_file:com/appian/komodo/util/kafka/SingleTopicPartitionConsumer.class */
public class SingleTopicPartitionConsumer implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(SingleTopicPartitionConsumer.class);
    private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 1000;
    private static final long RETRY_INTERVAL_MS = 250;
    private static final long LOG_PROGRESS_INTERVAL_MS = 5000;
    private final TopicPartition topicPartition;
    private final Consumer<Void, byte[]> consumer;
    private final long pollTimeout;

    /* loaded from: input_file:com/appian/komodo/util/kafka/SingleTopicPartitionConsumer$TopicCreationException.class */
    public static class TopicCreationException extends RuntimeException {
        static final long serialVersionUID = 1;

        TopicCreationException(Throwable th) {
            super(th);
        }
    }

    public static SingleTopicPartitionConsumer fromExistingTopic(TopicPartition topicPartition, Consumer<Void, byte[]> consumer) {
        return new SingleTopicPartitionConsumer(topicPartition, consumer, null, DEFAULT_KAFKA_POLL_TIMEOUT);
    }

    public static SingleTopicPartitionConsumer fromExistingTopic(TopicPartition topicPartition, Consumer<Void, byte[]> consumer, TopicManager topicManager) {
        return new SingleTopicPartitionConsumer(topicPartition, consumer, topicManager, DEFAULT_KAFKA_POLL_TIMEOUT);
    }

    public static SingleTopicPartitionConsumer fromExistingTopicWithRebalance(TopicPartition topicPartition, Consumer<Void, byte[]> consumer, TopicManager topicManager) {
        return new SingleTopicPartitionConsumer(topicPartition, consumer, topicManager, DEFAULT_KAFKA_POLL_TIMEOUT, true);
    }

    public SingleTopicPartitionConsumer(TopicPartition topicPartition, @Provided Consumer<Void, byte[]> consumer, @Nullable @Provided TopicManager topicManager, long j) {
        this(topicPartition, consumer, topicManager, j, false);
    }

    public SingleTopicPartitionConsumer(TopicPartition topicPartition, @Provided Consumer<Void, byte[]> consumer, @Nullable @Provided TopicManager topicManager, long j, boolean z) {
        this.topicPartition = topicPartition;
        this.consumer = consumer;
        this.pollTimeout = j;
        String str = topicPartition.topic();
        if (z) {
            this.consumer.subscribe(ImmutableList.of(str));
        } else {
            this.consumer.assign(ImmutableList.of(topicPartition));
        }
        ensureTopicAvailable(topicManager, str);
    }

    private void ensureTopicAvailable(TopicManager topicManager, String str) {
        if (topicManager == null) {
            return;
        }
        while (true) {
            try {
                topicManager.ensureAvailable(this);
                blockUntilTopicAvailable();
                return;
            } catch (Exception e) {
                if (Throwables.getRootCause(e) instanceof InterruptedException) {
                    LOG.debug("Interrupted creating topic {}", str, e);
                } else {
                    LOG.error("Non retryable error creating topic {}", str, e);
                }
                close();
                throw new TopicCreationException(e);
            } catch (RetriableException e2) {
                LOG.debug("Retrying to ensure topic {} is available.", str, e2);
            }
        }
    }

    public TopicPartition getTopicPartition() {
        return this.topicPartition;
    }

    public long seekToBeginning() {
        OffsetAndMetadata committed = this.consumer.committed(this.topicPartition);
        if (committed == null) {
            this.consumer.seekToBeginning(Collections.singletonList(this.topicPartition));
        } else {
            this.consumer.seek(this.topicPartition, committed.offset());
        }
        return position();
    }

    public long seekToEnd() {
        this.consumer.seekToEnd(Collections.singletonList(this.topicPartition));
        return position();
    }

    public void commitOffset(long j) {
        this.consumer.commitSync(ImmutableMap.of(getTopicPartition(), new OffsetAndMetadata(j, "Commit Offset")));
    }

    public long committedOffset() {
        OffsetAndMetadata committed = this.consumer.committed(getTopicPartition());
        if (committed != null) {
            return committed.offset();
        }
        return 0L;
    }

    public Optional<Long> beginningOffset() {
        try {
            return Optional.ofNullable((Long) this.consumer.beginningOffsets(Collections.singletonList(this.topicPartition)).get(this.topicPartition));
        } catch (KafkaException e) {
            return Optional.empty();
        }
    }

    public List<ConsumerRecord<Void, byte[]>> pollIndefinitely() {
        List<ConsumerRecord<Void, byte[]>> poll;
        do {
            poll = poll(this.pollTimeout);
        } while (poll.isEmpty());
        return poll;
    }

    public List<ConsumerRecord<Void, byte[]>> poll() {
        return poll(this.pollTimeout);
    }

    public List<ConsumerRecord<Void, byte[]>> poll(long j) {
        List<ConsumerRecord<Void, byte[]>> emptyList;
        try {
            emptyList = this.consumer.poll(Duration.ofMillis(j)).records(this.topicPartition);
        } catch (OffsetOutOfRangeException e) {
            LOG.warn("Unexpected OffsetOutOfRangeException for {}", this.topicPartition, e);
            throw e;
        } catch (NoOffsetForPartitionException e2) {
            LOG.info("No offset committed for partition {} on initial poll() call, seeking to beginning of topic partition", this.topicPartition);
            seekToBeginning();
            emptyList = Collections.emptyList();
        }
        return emptyList;
    }

    public long position() {
        try {
            return this.consumer.position(this.topicPartition);
        } catch (NoOffsetForPartitionException e) {
            LOG.info("No offset committed for partition {} on initial position() call, seeking to beginning of topic partition", this.topicPartition);
            return seekToBeginning();
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            LOG.debug("Closing consumer for topic {}", this.topicPartition.topic());
            this.consumer.close();
        } catch (IllegalStateException e) {
            LOG.debug("Consumer was already closed for topic {}", this.topicPartition.topic());
        }
    }

    public void seek(long j) {
        this.consumer.seek(this.topicPartition, j);
    }

    public List<PartitionInfo> partitionsFor() {
        return this.consumer.partitionsFor(this.topicPartition.topic());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clearCommitSync() {
        commitOffset(((Long) this.consumer.beginningOffsets(Collections.singletonList(this.topicPartition)).get(this.topicPartition)).longValue());
    }

    public void blockUntilTopicAvailable() {
        long j = 0;
        while (!partitionAvailable()) {
            poll();
            LOG.info("Waiting for Kafka to make topic {} available ...", this.topicPartition.topic());
            Uninterruptibles.sleepUninterruptibly(RETRY_INTERVAL_MS, TimeUnit.MILLISECONDS);
            j += RETRY_INTERVAL_MS;
            if (j % LOG_PROGRESS_INTERVAL_MS == 0) {
                LOG.warn("Waited over {}s for topic {} to be available.", Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(j)), this.topicPartition.topic());
            }
        }
    }

    private boolean partitionAvailable() {
        String str = this.topicPartition.topic();
        List<PartitionInfo> partitionsFor = partitionsFor();
        if (partitionsFor != null) {
            Stream<R> map = partitionsFor.stream().filter(partitionInfo -> {
                return (partitionInfo.topic() == null || partitionInfo.leader() == null) ? false : true;
            }).map((v0) -> {
                return v0.topic();
            });
            Objects.requireNonNull(str);
            if (map.anyMatch((v1) -> {
                return r1.equals(v1);
            })) {
                return true;
            }
        }
        return false;
    }

    public void wakeup() {
        this.consumer.wakeup();
    }
}
