package com.appian.komodo.util.kafka;

import com.appian.komodo.topology.KafkaTopology;
import com.appian.komodo.util.kafka.admin.KafkaAdminUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import komodo.shaded.com.google.common.annotations.VisibleForTesting;
import komodo.shaded.com.google.common.base.Throwables;
import komodo.shaded.com.google.common.collect.ImmutableList;
import komodo.shaded.com.google.common.collect.ImmutableMap;
import komodo.shaded.com.google.common.collect.ImmutableSet;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates, inspects, reconfigures and clears Kafka topics through an {@link AdminClient}.
 *
 * <p>An instance built with the two-argument constructor creates its own admin client and closes
 * it in {@link #close()}; an instance that is handed an admin client leaves it open.
 */
@Singleton
public class TopicManager implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(TopicManager.class);

    public static final ListenerName PLAINTEXT_LISTENER = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT);

    private static final String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets";
    private static final String TRANSACTION_STATE_TOPIC = "__transaction_state";
    private static final long RETRY_INTERVAL_MS = 250;
    private static final long CLUSTER_CHECK_INTERVAL_MS = 500;
    private static final long LOG_PROGRESS_INTERVAL_MS = 5000;
    private static final long PARTITION_REASSIGNMENT_SLEEP_MS = 1000;
    private static final long MIN_BROKERS_REGISTERED_SLEEP_MS = 1000;
    private static final String OFFSET_TOPIC_SEGMENT_SIZE_CONFIG = "offsets.topic.segment.bytes";
    private static final String TRANSACTION_STATE_TOPIC_SEGMENT_SIZE_CONFIG = "transaction.state.log.segment.bytes";
    private static final String TRANSACTION_STATE_TOPIC_MIN_ISR_CONFIG = "transaction.state.log.min.isr";

    /** Timeout, in seconds, applied to the admin requests that are issued with a bounded wait. */
    private static final long ADMIN_REQUEST_TIMEOUT_SECONDS = 30L;
    /** Timeout, in seconds, for looking up the registered brokers. */
    private static final long BROKER_LOOKUP_TIMEOUT_SECONDS = 5L;
    /** Number of one-second polls spent waiting for enough brokers before giving up. */
    private static final int MAX_BROKER_WAIT_ATTEMPTS = 30;
    /** The in-progress reassignment message is logged once per this many polls. */
    private static final int REASSIGNMENT_LOG_EVERY_N_POLLS = 10;

    private final KafkaTopology topologySettings;
    private final KafkaTopicConfiguration kafkaTopicConfiguration;

    @VisibleForTesting
    protected AdminClient adminClient;

    /** True only when this instance created {@link #adminClient} itself and therefore must close it. */
    private boolean needCleanup;

    /**
     * Creates a manager that uses the supplied admin client. The client is not closed by {@link #close()}.
     *
     * @param kafkaTopology topology settings (replication factor, Kafka hosts)
     * @param kafkaTopicConfiguration source of per-topic configuration properties
     * @param adminClient admin client used for every cluster operation
     */
    @Inject
    public TopicManager(KafkaTopology kafkaTopology, KafkaTopicConfiguration kafkaTopicConfiguration, AdminClient adminClient) {
        this.topologySettings = kafkaTopology;
        this.kafkaTopicConfiguration = kafkaTopicConfiguration;
        this.adminClient = adminClient;
    }

    /**
     * Creates a manager with its own admin client, obtained from {@code KafkaUtils.createAdminClient}.
     * That client is closed by {@link #close()}.
     *
     * @param kafkaTopology topology settings, also used to create the admin client
     * @param kafkaTopicConfiguration source of per-topic configuration properties
     */
    public TopicManager(KafkaTopology kafkaTopology, KafkaTopicConfiguration kafkaTopicConfiguration) {
        this(kafkaTopology, kafkaTopicConfiguration, KafkaUtils.createAdminClient(kafkaTopology));
        this.needCleanup = true;
    }

    /**
     * Reports whether partition information is available for the given topic partition.
     *
     * @param topicPartition the topic partition to look up; validated first
     * @return {@code true} if partition info is present, {@code false} if it is absent or the lookup
     *     failed with an {@link UnknownTopicOrPartitionException}
     * @throws ExecutionException if the lookup failed for any other reason
     * @throws InterruptedException if interrupted while waiting for the lookup
     */
    public boolean topicExists(TopicPartition topicPartition) throws ExecutionException, InterruptedException {
        TopicPartitionRuntimeException.validateTopicPartition(topicPartition);
        try {
            return KafkaAdminUtils.getTopicPartitionInfo(this.adminClient, topicPartition).isPresent();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                return false;
            }
            throw e;
        }
    }

    /**
     * Reassigns the partitions of the consumer offsets topic when their replica count differs from
     * the configured replication factor. A missing topic is ignored; any other failure is rethrown
     * wrapped in a {@link RuntimeException}.
     */
    public void reassignConsumerOffsetsPartitionsIfNecessary() throws IllegalStateException {
        try {
            reassignTopicPartitionsIfNecessary(CONSUMER_OFFSETS_TOPIC);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                throw Throwables.propagate(e);
            }
        }
    }

    /**
     * Applies topic-level configuration to the consumer offsets and transaction state topics when
     * they carry no dynamic topic configuration yet. Values that depend on the broker are read from
     * the configuration of the first registered broker. Failures are logged and never propagated.
     */
    public void reconfigureInternalTopicsIfNecessary() {
        try {
            String brokerId = String.valueOf(getActiveNodes().iterator().next().id());
            ConfigResource offsetsTopic = new ConfigResource(ConfigResource.Type.TOPIC, CONSUMER_OFFSETS_TOPIC);
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
            ConfigResource transactionTopic = new ConfigResource(ConfigResource.Type.TOPIC, TRANSACTION_STATE_TOPIC);
            Map<ConfigResource, KafkaFuture<Config>> described =
                    this.adminClient.describeConfigs(ImmutableSet.of(offsetsTopic, transactionTopic, broker)).values();

            boolean reconfigureOffsets = lacksDynamicTopicConfig(described.get(offsetsTopic), CONSUMER_OFFSETS_TOPIC);
            boolean reconfigureTransactions = lacksDynamicTopicConfig(described.get(transactionTopic), TRANSACTION_STATE_TOPIC);

            if (reconfigureOffsets) {
                LOG.info("Reconfiguring {} due to missing configs", offsetsTopic);
                Config brokerConfig = described.get(broker).get();
                Collection<AlterConfigOp> ops = ImmutableList.of(
                        setConfig("cleanup.policy", "compact"),
                        setConfig("segment.bytes", brokerConfig.get(OFFSET_TOPIC_SEGMENT_SIZE_CONFIG).value()),
                        setConfig("compression.type", "producer"));
                Map<ConfigResource, Collection<AlterConfigOp>> request = ImmutableMap.of(offsetsTopic, ops);
                this.adminClient.incrementalAlterConfigs(request).all().get();
            }
            if (reconfigureTransactions) {
                LOG.info("Reconfiguring {} due to missing configs", transactionTopic);
                Config brokerConfig = described.get(broker).get();
                Collection<AlterConfigOp> ops = ImmutableList.of(
                        setConfig("unclean.leader.election.enable", Boolean.FALSE.toString()),
                        setConfig("compression.type", "uncompressed"),
                        setConfig("cleanup.policy", "compact"),
                        setConfig("min.insync.replicas", brokerConfig.get(TRANSACTION_STATE_TOPIC_MIN_ISR_CONFIG).value()),
                        setConfig("segment.bytes", brokerConfig.get(TRANSACTION_STATE_TOPIC_SEGMENT_SIZE_CONFIG).value()));
                Map<ConfigResource, Collection<AlterConfigOp>> request = ImmutableMap.of(transactionTopic, ops);
                this.adminClient.incrementalAlterConfigs(request).all().get();
            }
        } catch (InterruptedException e) {
            // Best-effort operation: report the failure but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
            LOG.error("Unable to reconfigure internal topics", e);
        } catch (RuntimeException e) {
            LOG.error("Unable to reconfigure internal topics due to runtime exception", e);
        } catch (Exception e) {
            LOG.error("Unable to reconfigure internal topics", e);
        }
    }

    /**
     * Returns whether none of the described config entries comes from dynamic topic configuration.
     * A failed describe is logged at debug level and reported as {@code false}, so that topic is skipped.
     */
    private static boolean lacksDynamicTopicConfig(KafkaFuture<Config> describedConfig, String topicName)
            throws InterruptedException {
        try {
            return describedConfig.get().entries().stream()
                    .noneMatch(entry -> entry.source().equals(ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG));
        } catch (ExecutionException e) {
            LOG.debug("Skipping config update for {}", topicName, e);
            return false;
        }
    }

    /** Builds a SET operation for a single config entry. */
    private static AlterConfigOp setConfig(String name, String value) {
        return new AlterConfigOp(new ConfigEntry(name, value), AlterConfigOp.OpType.SET);
    }

    /**
     * Ensures the topic partition read by the given consumer is available.
     *
     * @see #ensureAvailable(TopicPartition)
     */
    public void ensureAvailable(SingleTopicPartitionConsumer singleTopicPartitionConsumer) throws InterruptedException, ExecutionException {
        ensureAvailable(singleTopicPartitionConsumer.getTopicPartition());
    }

    /**
     * Ensures the topic is available with at least {@code i} partitions.
     *
     * @param str topic name
     * @param i number of partitions; translated to the highest partition index {@code i - 1}
     * @see #ensureAvailable(TopicPartition)
     */
    public void ensureAvailable(String str, int i) throws InterruptedException, ExecutionException {
        ensureAvailable(new TopicPartition(str, i - 1));
    }

    /**
     * Ensures the given topic partition is available: waits for an existing topic to expose the
     * required partitions, or creates the topic otherwise, and then applies the configured topic
     * properties. The required partition count is {@code topicPartition.partition() + 1}.
     *
     * @param topicPartition the partition that must be available; validated first
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException if an admin request failed
     */
    public void ensureAvailable(TopicPartition topicPartition) throws InterruptedException, IllegalStateException, ExecutionException {
        TopicPartitionRuntimeException.validateTopicPartition(topicPartition);
        String topic = topicPartition.topic().intern();
        int partitionCount = topicPartition.partition() + 1;
        // NOTE(review): this locks on the manager while deleteTopicData locks on the interned topic
        // name, so the two do not exclude each other - confirm that this is intended.
        synchronized (this) {
            if (topicExists(topicPartition)) {
                waitForTopicPartitionAvailability(topic, partitionCount);
            } else {
                createTopic(topic, partitionCount);
            }
        }
        LOG.info("Topic {} available.", topic);
        alterConfigs(topic);
    }

    /**
     * Sets every configured property of the topic through an incremental config update, waiting a
     * bounded time for the result. Failures are logged as warnings and not propagated.
     */
    private void alterConfigs(String str) {
        try {
            ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
            Collection<AlterConfigOp> ops = this.kafkaTopicConfiguration.getTopicProperties(str).entrySet().stream()
                    .map(entry -> setConfig(entry.getKey(), entry.getValue()))
                    .collect(Collectors.toList());
            Map<ConfigResource, Collection<AlterConfigOp>> request = ImmutableMap.of(topicResource, ops);
            this.adminClient.incrementalAlterConfigs(request).all().get(ADMIN_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            LOG.info("Updated configs for topic {}", str);
        } catch (InterruptedException e) {
            // Still best-effort, but do not lose the interruption.
            Thread.currentThread().interrupt();
            LOG.warn("Encountered the following exception while attempting to alter configs", e);
        } catch (Exception e) {
            LOG.warn("Encountered the following exception while attempting to alter configs", e);
        }
    }

    /**
     * Looks up the partitions of the topic and reassigns those whose replica count differs from
     * the configured replication factor.
     *
     * @return {@code true} if a reassignment was performed
     */
    @VisibleForTesting
    protected boolean reassignTopicPartitionsIfNecessary(String str) throws InterruptedException, ExecutionException {
        return reassignTopicPartitionsIfNecessary(str, KafkaAdminUtils.getTopicPartitions(this.adminClient, str));
    }

    /**
     * Reassigns every listed partition whose replica count differs from the configured replication
     * factor, after waiting for enough brokers to be registered.
     *
     * @return {@code false} if no partition needed a change, {@code true} otherwise
     */
    private boolean reassignTopicPartitionsIfNecessary(String str, List<TopicPartitionInfo> list) throws InterruptedException, ExecutionException {
        List<TopicPartitionInfo> misreplicated = list.stream()
                .filter(info -> info.replicas().size() != this.topologySettings.getReplicationFactor())
                .collect(Collectors.toList());
        if (misreplicated.isEmpty()) {
            return false;
        }
        Collection<Node> activeNodes = waitForMinimumReplicasActive(str);
        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = misreplicated.stream()
                .collect(Collectors.toMap(
                        info -> toTopicPartition(str, info),
                        info -> getNewPartitionAssignment(this.topologySettings.getReplicationFactor(), activeNodes)));
        doReassignment(str, reassignments);
        return !reassignments.isEmpty();
    }

    /** Combines a topic name with the partition index carried by the partition info. */
    private TopicPartition toTopicPartition(String str, TopicPartitionInfo topicPartitionInfo) {
        return new TopicPartition(str, topicPartitionInfo.partition());
    }

    /**
     * Requests the given reassignments repeatedly until the topic's current replica lists match
     * the requested targets, waiting for in-progress reassignments after each accepted request.
     */
    @VisibleForTesting
    protected void doReassignment(String str, Map<TopicPartition, Optional<NewPartitionReassignment>> map) throws InterruptedException, ExecutionException {
        LOG.info("Updating replication factor for topic {}", str);
        while (!assignmentsMatch(str, map)) {
            if (requestPartitionReassignment(str, map)) {
                waitForInProgressPartitionReassignments(map.keySet());
            }
        }
    }

    /** Returns whether every requested target replica list equals the partition's current replica list. */
    private boolean assignmentsMatch(String str, Map<TopicPartition, Optional<NewPartitionReassignment>> map) throws ExecutionException, InterruptedException {
        List<TopicPartitionInfo> currentPartitions = KafkaAdminUtils.getTopicPartitions(this.adminClient, str);
        for (Map.Entry<TopicPartition, Optional<NewPartitionReassignment>> entry : map.entrySet()) {
            Optional<NewPartitionReassignment> requested = entry.getValue();
            if (requested.isPresent()
                    && !requested.get().targetReplicas().equals(parseReplicas(currentPartitions, entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** Returns the replica broker ids of the matching partition, or an empty list if it is not listed. */
    private List<Integer> parseReplicas(List<TopicPartitionInfo> list, TopicPartition topicPartition) {
        for (TopicPartitionInfo info : list) {
            if (info.partition() == topicPartition.partition()) {
                return info.replicas().stream().map(Node::id).collect(Collectors.toList());
            }
        }
        return Collections.emptyList();
    }

    /**
     * Polls once per second until no reassignment is in progress for the given partitions. If the
     * state cannot be determined, a warning is logged and the wait ends.
     *
     * @param set partitions whose reassignments are awaited
     * @throws InterruptedException if interrupted while querying or sleeping
     */
    @VisibleForTesting
    protected void waitForInProgressPartitionReassignments(Set<TopicPartition> set) throws InterruptedException {
        int polls = 0;
        while (true) {
            Map<TopicPartition, ?> inProgress;
            try {
                // Query once per iteration, inside the handler, so a failed lookup ends the wait
                // with a warning instead of escaping.
                inProgress = this.adminClient.listPartitionReassignments(set).reassignments().get();
            } catch (ExecutionException e) {
                LOG.warn("Unable to determine if partition reassignment still in progress", e);
                return;
            }
            if (inProgress.isEmpty()) {
                return;
            }
            if (polls % REASSIGNMENT_LOG_EVERY_N_POLLS == 0) {
                LOG.info("Waiting for active partition reassignment to complete for {}", inProgress);
            }
            Thread.sleep(PARTITION_REASSIGNMENT_SLEEP_MS);
            polls++;
        }
    }

    /**
     * Waits, polling once per second for a bounded number of attempts, until at least as many
     * brokers are registered as the configured replication factor.
     *
     * @return the registered brokers seen by the last lookup
     * @throws ExecutionException wrapping an {@code UnreachableNodesException} if there are still
     *     too few brokers after the last attempt
     */
    private Collection<Node> waitForMinimumReplicasActive(String str) throws InterruptedException, ExecutionException {
        Collection<Node> activeNodes = getActiveNodes();
        int attemptsLeft = MAX_BROKER_WAIT_ATTEMPTS;
        while (!minimumReplicasActive(activeNodes) && attemptsLeft > 0) {
            LOG.info("Waiting for {} brokers to be available to increase replication factor for topic {}", this.topologySettings.getReplicationFactor(), str);
            Thread.sleep(MIN_BROKERS_REGISTERED_SLEEP_MS);
            activeNodes = getActiveNodes();
            attemptsLeft--;
        }
        // Decide on the final broker list rather than the attempt counter: the last poll may have
        // succeeded exactly when the counter reached zero.
        if (!minimumReplicasActive(activeNodes)) {
            throw new ExecutionException(new UnreachableNodesException());
        }
        return activeNodes;
    }

    /** Returns the brokers currently registered, as reported by {@code KafkaAdminUtils.getRegisteredBrokers}. */
    @VisibleForTesting
    public Collection<Node> getActiveNodes() {
        return KafkaAdminUtils.getRegisteredBrokers(this.adminClient, BROKER_LOOKUP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /** Returns whether the number of brokers is at least the configured replication factor. */
    private boolean minimumReplicasActive(Collection<Node> collection) {
        return collection.size() >= this.topologySettings.getReplicationFactor();
    }

    /**
     * Submits the reassignment request and waits a bounded time for it to be accepted.
     *
     * @return {@code true} if the request completed; {@code false} if it timed out or the thread
     *     was interrupted (the interrupt flag is restored)
     * @throws RuntimeException wrapping the {@link ExecutionException} if the request failed
     */
    private boolean requestPartitionReassignment(String str, Map<TopicPartition, Optional<NewPartitionReassignment>> map) {
        try {
            this.adminClient.alterPartitionReassignments(map).all().get(ADMIN_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            return true;
        } catch (InterruptedException e) {
            LOG.warn("Partition reassignment for topic {} interrupted", str, e);
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            LOG.warn("Partition reassignment for topic {} timed out", str, e);
            return false;
        }
    }

    /** Creates the topic and then waits until all of its requested partitions are visible. */
    private void createTopic(String str, int i) throws InterruptedException, ExecutionException {
        LOG.debug("Creating topic {} partition count {}.", str, i);
        createTopic0(str, i);
        waitForTopicPartitionAvailability(str, i);
    }

    /**
     * Issues the create-topic request, retrying after a pause on failure. If the topic already
     * exists, its partition count is increased when it is below {@code i}. A failure caused by
     * {@code UnreachableNodesException} is rethrown instead of retried.
     */
    private void createTopic0(String str, int i) throws InterruptedException, ExecutionException {
        Map<String, String> topicProperties = this.kafkaTopicConfiguration.getTopicProperties(str);
        while (true) {
            try {
                int availableBrokers = waitForMinimumReplicasActive(str).size();
                short replicationFactor = (short) Math.min(availableBrokers, this.topologySettings.getReplicationFactor());
                NewTopic newTopic = new NewTopic(str, i, replicationFactor);
                newTopic.configs(topicProperties);
                this.adminClient.createTopics(ImmutableList.of(newTopic), new CreateTopicsOptions()).all().get();
                return;
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof TopicExistsException) {
                    if (getPartitionCount(str) < i) {
                        increasePartitionCountTo(str, i);
                    }
                    return;
                }
                if (cause instanceof UnreachableNodesException) {
                    throw e;
                }
                LOG.warn("Error when trying to create topic {}", str, cause);
                TimeUnit.MILLISECONDS.sleep(CLUSTER_CHECK_INTERVAL_MS);
            }
        }
    }

    /** Requests that the topic be grown to {@code i} partitions; a rejected request is only logged. */
    private void increasePartitionCountTo(String str, int i) throws InterruptedException {
        try {
            Map<String, NewPartitions> request = new HashMap<>();
            request.put(str, NewPartitions.increaseTo(i));
            this.adminClient.createPartitions(request).all().get();
        } catch (InvalidPartitionsException | ExecutionException e) {
            LOG.debug("Could not set partition count to requested {}", i, e);
        }
    }

    /**
     * Returns the partition count of the topic, computed as the highest partition index plus one.
     *
     * @param str topic name; validated first
     * @return the partition count, or {@code 0} if the topic has no partitions or the lookup failed
     */
    public int getPartitionCount(String str) {
        TopicValidationRuntimeException.validate(str);
        try {
            return KafkaAdminUtils.getTopicPartitions(this.adminClient, str).stream()
                    .mapToInt(TopicPartitionInfo::partition)
                    .max()
                    .orElse(-1) + 1;
        } catch (InterruptedException e) {
            // The signature cannot propagate the interruption, so at least keep the flag set.
            Thread.currentThread().interrupt();
            LOG.debug("Failed to fetch partition info for {}", str, e);
            return 0;
        } catch (ExecutionException e) {
            LOG.debug("Failed to fetch partition info for {}", str, e);
            return 0;
        }
    }

    /**
     * Polls until partitions {@code 0 .. i-1} of the topic are all listed, then reassigns replicas
     * if necessary. Failed lookups and reassignments are logged at debug level and retried.
     *
     * @return the result of the replica reassignment check
     * @throws InterruptedException if interrupted while querying, reassigning or sleeping
     */
    private boolean waitForTopicPartitionAvailability(String str, int i) throws InterruptedException {
        while (true) {
            try {
                List<TopicPartitionInfo> partitions = KafkaAdminUtils.getTopicPartitions(this.adminClient, str);
                if (!partitions.isEmpty()) {
                    long found = partitions.stream()
                            .mapToInt(TopicPartitionInfo::partition)
                            .filter(partition -> partition >= 0 && partition < i)
                            .count();
                    if (found == i) {
                        return reassignTopicPartitionsIfNecessary(str, partitions);
                    }
                    LOG.debug("Found {} of {} expected partitions for topic {}", found, i, str);
                }
            } catch (ExecutionException e) {
                // Only execution failures are retried; an InterruptedException propagates so this
                // otherwise endless loop can be cancelled.
                LOG.debug("Failed to retrieve topic partition info for {}", str, e);
            }
            TimeUnit.MILLISECONDS.sleep(CLUSTER_CHECK_INTERVAL_MS);
        }
    }

    /** @deprecated delegates to {@link #deleteTopicData(SingleTopicPartitionConsumer)}; call that directly. */
    @Deprecated
    public void ensureDeleted(SingleTopicPartitionConsumer singleTopicPartitionConsumer) throws InterruptedException, ExecutionException {
        deleteTopicData(singleTopicPartitionConsumer);
    }

    /** @deprecated delegates to {@link #deleteTopicData(TopicPartition)}; call that directly. */
    @Deprecated
    public void ensureDeleted(TopicPartition topicPartition) throws InterruptedException, ExecutionException {
        deleteTopicData(topicPartition);
    }

    /** @deprecated delegates to {@link #deleteTopicData(SingleTopicPartitionConsumer)}; call that directly. */
    @Deprecated
    public void deleteAndRecreate(SingleTopicPartitionConsumer singleTopicPartitionConsumer) throws InterruptedException, ExecutionException {
        deleteTopicData(singleTopicPartitionConsumer);
    }

    /** @deprecated delegates to {@link #deleteTopicData(TopicPartition)}; call that directly. */
    @Deprecated
    public void deleteAndRecreate(TopicPartition topicPartition) throws InterruptedException, ExecutionException {
        deleteTopicData(topicPartition);
    }

    /**
     * Deletes the data of the consumer's topic partition and then clears the consumer's committed offset.
     *
     * @param singleTopicPartitionConsumer consumer whose topic partition is cleared
     */
    public void deleteTopicData(SingleTopicPartitionConsumer singleTopicPartitionConsumer) throws InterruptedException, ExecutionException {
        deleteTopicData(singleTopicPartitionConsumer.getTopicPartition());
        LOG.debug("Clearing committed offset for {} topic consumer.", singleTopicPartitionConsumer.getTopicPartition().topic().intern());
        singleTopicPartitionConsumer.clearCommitSync();
    }

    /**
     * Deletes all records of the topic partition up to its current end offset and waits until the
     * beginning offset has reached that end offset. Does nothing if the topic does not exist.
     *
     * @param topicPartition the partition to clear; validated first
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException if the existence check or the delete request failed
     */
    public void deleteTopicData(TopicPartition topicPartition) throws InterruptedException, ExecutionException {
        TopicPartitionRuntimeException.validateTopicPartition(topicPartition);
        String topic = topicPartition.topic().intern();
        synchronized (topic) {
            if (!topicExists(topicPartition)) {
                LOG.info("Topic {} does not exist", topic);
                return;
            }
            LOG.debug("Retrieving endOffset for topic {} to delete all topic data", topic);
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", String.join(",", this.topologySettings.getKafkaHosts()));
            properties.setProperty("client.id", "GetOffsetShell");
            properties.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
            properties.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
            // try-with-resources closes the consumer on every exit path, including failures.
            try (KafkaConsumer<byte[], byte[]> offsetConsumer = new KafkaConsumer<>(properties)) {
                List<TopicPartition> target = Collections.singletonList(topicPartition);
                long endOffset = offsetConsumer.endOffsets(target).get(topicPartition).longValue();
                LOG.info("Deleting topic data for {} up to offset {}", topic, endOffset);
                this.adminClient.deleteRecords(ImmutableMap.of(topicPartition, RecordsToDelete.beforeOffset(endOffset))).all().get();
                long waitedMs = 0;
                while (offsetConsumer.beginningOffsets(target).get(topicPartition).longValue() != endOffset) {
                    LOG.debug("Waiting for Kafka to delete topic data for {}...", topic);
                    TimeUnit.MILLISECONDS.sleep(RETRY_INTERVAL_MS);
                    waitedMs += RETRY_INTERVAL_MS;
                    if (waitedMs % LOG_PROGRESS_INTERVAL_MS == 0) {
                        LOG.warn("Waited over {}s for Kafka to delete topic data for {}", TimeUnit.MILLISECONDS.toSeconds(waitedMs), topic);
                    }
                }
            }
            LOG.info("Topic {} data deleted", topic);
        }
    }

    /**
     * Picks {@code i} target replicas from the given brokers. Brokers are grouped by rack (brokers
     * without a rack share the key {@code ""}), each group and the rack order are shuffled, and
     * brokers are then taken one per rack in turn until {@code i} have been chosen.
     *
     * @param i number of replicas to choose
     * @param collection candidate brokers
     * @return the new assignment, always present
     * @throws IllegalArgumentException if {@code i} exceeds the number of candidate brokers
     */
    @VisibleForTesting
    static Optional<NewPartitionReassignment> getNewPartitionAssignment(int i, Collection<Node> collection) {
        if (i > collection.size()) {
            // Without this guard the selection loop below could never collect enough brokers.
            throw new IllegalArgumentException(
                    "Cannot choose " + i + " replicas from " + collection.size() + " brokers");
        }
        Map<String, List<Integer>> brokerIdsByRack = new HashMap<>();
        for (Node node : collection) {
            String rack = node.hasRack() ? node.rack() : "";
            brokerIdsByRack.computeIfAbsent(rack, key -> new ArrayList<>()).add(node.id());
        }
        Map<String, Iterator<Integer>> rackCursors = new HashMap<>();
        brokerIdsByRack.forEach((rack, brokerIds) -> {
            Collections.shuffle(brokerIds);
            rackCursors.put(rack, brokerIds.iterator());
        });
        List<String> racks = new ArrayList<>(rackCursors.keySet());
        Collections.shuffle(racks);

        List<Integer> targetReplicas = new ArrayList<>();
        int rackIndex = 0;
        while (targetReplicas.size() < i) {
            Iterator<Integer> cursor = rackCursors.get(racks.get(rackIndex));
            if (cursor.hasNext()) {
                targetReplicas.add(cursor.next());
            }
            rackIndex = (rackIndex + 1) % racks.size();
        }
        return Optional.of(new NewPartitionReassignment(targetReplicas));
    }

    /** Closes the admin client, but only when this instance created it. */
    @Override
    public void close() {
        if (this.needCleanup) {
            this.adminClient.close();
        }
    }
}
