package com.appian.kafka;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/appian/kafka/KafkaTopicManager.class */
/**
 * Keeps per-topic Kafka state: the {@link KafkaMessageHandler} and {@link Producer} registered
 * for each topic name, plus the consumer supports that were added for it.
 *
 * <p>All three lookup tables are {@link ConcurrentHashMap}s keyed by topic name. Topics are
 * registered once, from the constructor; messages are then sent through
 * {@link #sendMessage(Object, String)} and consumers are attached through the
 * {@code register*Consumer} methods.
 *
 * <p>NOTE(review): this source was produced by a decompiler and {@code ensureAvailable} was not
 * recovered (see the note on that method), so parts of the original behavior are not visible here.
 */
public class KafkaTopicManager {
    /**
     * 5,000,000 ms (roughly 83 minutes). Not referenced by any code visible in this file;
     * presumably consumed by the non-decompiled {@code ensureAvailable} - TODO confirm.
     */
    private static final long MAX_TOPIC_WAIT_TIME_MILLIS = 5_000_000L;
    private static final Logger LOG = Logger.getLogger(KafkaTopicManager.class);

    /** Topic name to the handler registered for that topic. */
    private final Map<String, KafkaMessageHandler<?>> messageHandlerMap = new ConcurrentHashMap<>();
    /** Topic name to the producer created for that topic. */
    private final Map<String, Producer<Void, ?>> producerMap = new ConcurrentHashMap<>();
    /** Topic name to every consumer support added for that topic, in registration order. */
    private final Map<String, List<KafkaConsumerSupport<?>>> consumerSupportMap = new ConcurrentHashMap<>();
    private final TopicManagerSelector topicManagerSelector;
    private final KafkaProducerFactory producerFactory;

    /**
     * Creates the manager and registers every supplied topic, in list order.
     *
     * @param topicManagerSelector used to pick the topology / topic manager for a topic name
     * @param kafkaProducerFactory used to create one producer per registered topic
     * @param list topics to register; each contributes its topic name and message handler
     */
    public KafkaTopicManager(TopicManagerSelector topicManagerSelector, KafkaProducerFactory kafkaProducerFactory, List<RegisteredKafkaTopic<?>> list) {
        this.topicManagerSelector = topicManagerSelector;
        this.producerFactory = kafkaProducerFactory;
        for (RegisteredKafkaTopic<?> registeredTopic : list) {
            registerTopic(registeredTopic.getTopicName(), registeredTopic.getMessageHandler());
        }
    }

    /**
     * Registers a topic: runs {@code ensureAvailable}, stores the handler, then creates and stores
     * a producer for the topic. A later registration under the same name replaces both entries.
     *
     * @param str topic name
     * @param kafkaMessageHandler handler to associate with the topic
     */
    @VisibleForTesting
    <T> void registerTopic(String str, KafkaMessageHandler<T> kafkaMessageHandler) {
        ensureAvailable(str);
        this.messageHandlerMap.put(str, kafkaMessageHandler);
        this.producerMap.put(str, this.producerFactory.createProducer(str, this.topicManagerSelector.selectTopology(str), kafkaMessageHandler));
    }

    /*
     * NOTE(review): the decompiler could not recover this method (266 bytecode instructions were
     * skipped), so the body below is only a placeholder that always throws. As written here,
     * every call to registerTopic fails; the real logic must be restored from the original
     * class file before this source can be used.
     */
    /* JADX WARN: Removed duplicated region for block: B:20:0x00bc A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:24:0x0002 A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void ensureAvailable(java.lang.String r10) {
        /*
            Method dump skipped, instructions count: 266
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.appian.kafka.KafkaTopicManager.ensureAvailable(java.lang.String):void");
    }

    /**
     * Sends one message to a registered topic and blocks until the send has completed.
     *
     * <p>The record is sent with no key, the producer is flushed, and the resulting future is
     * awaited before returning.
     *
     * @param t message to send; must be an instance of the topic handler's supported message type
     * @param str topic name
     * @throws NullPointerException if no handler or no producer is registered for the topic
     * @throws IllegalArgumentException if {@code t} is not an instance of the handler's supported
     *     message type (this includes {@code null})
     * @throws Exception anything raised while sending or while waiting on the send future
     */
    public <T> void sendMessage(T t, String str) throws Exception {
        Producer<Void, ?> registeredProducer = this.producerMap.get(str);
        KafkaMessageHandler<?> handler = this.messageHandlerMap.get(str);
        if (handler == null) {
            throw new NullPointerException("KafkaMessageHandler not defined for topic [" + str + "]");
        }
        if (!handler.getSupportedMessageType().isInstance(t)) {
            throw new IllegalArgumentException("Attempted to send a message with message type that doesn't match the registered topic");
        }
        if (registeredProducer == null) {
            throw new NullPointerException("Kafka Producer not defined for topic [" + str + "]");
        }
        // The isInstance check above is what justifies treating the wildcard producer as a
        // producer of T: the message is known to be of the topic's registered message type.
        @SuppressWarnings("unchecked")
        Producer<Void, T> producer = (Producer<Void, T>) registeredProducer;
        Future<RecordMetadata> pendingSend = producer.send(new ProducerRecord<>(str, t));
        producer.flush();
        RecordMetadata recordMetadata = pendingSend.get();
        // Guarded so the message (and t.toString()) is only formatted when debug output is on.
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Kafka Record sent for %s to partition %s in topic %s with offset %d", t, recordMetadata.partition(), str, recordMetadata.offset()));
        }
    }

    /**
     * Builds a {@code BroadcastConsumerSupport} for the topic and records it.
     *
     * @param str topic name; a handler must already be registered for it
     * @param str2 forwarded as the first argument of the {@code KafkaConsumerListener} constructor
     * @param i forwarded to the {@code BroadcastConsumerSupport} constructor
     * @param supplier forwarded to the {@code KafkaConsumerListener} constructor
     * @param supplier2 forwarded to the {@code KafkaConsumerListener} constructor
     * @param kafkaMetricsCollector forwarded to the {@code KafkaConsumerListener} constructor
     * @param kafkaConsumerProcessor processor whose supported message type must equal the topic
     *     handler's supported message type
     * @throws NullPointerException if no handler is registered for the topic
     * @throws IllegalArgumentException if the processor's message type does not match the topic's
     */
    public <T> void registerBroadcastConsumer(String str, String str2, int i, Supplier<Long> supplier, Supplier<Double> supplier2, KafkaMetricsCollector kafkaMetricsCollector, KafkaConsumerProcessor<T> kafkaConsumerProcessor) {
        // Unlike the queue variant, the listener is built with an explicit trailing `false`.
        KafkaConsumerListener listener = new KafkaConsumerListener(str2, supplier, supplier2, kafkaMetricsCollector, false);
        addNewSupport(str, new BroadcastConsumerSupport(str, this.topicManagerSelector.selectTopology(str), i, this.topicManagerSelector.selectTopicManager(str), getAndValidateMessageHandler(str, kafkaConsumerProcessor), kafkaConsumerProcessor, listener));
    }

    /**
     * Builds a {@code QueueConsumerSupport} for the topic and records it.
     *
     * @param str topic name; a handler must already be registered for it
     * @param str2 forwarded as the first argument of the {@code KafkaConsumerListener} constructor
     * @param i forwarded to the {@code QueueConsumerSupport} constructor
     * @param supplier forwarded to the {@code KafkaConsumerListener} constructor
     * @param supplier2 forwarded to the {@code KafkaConsumerListener} constructor
     * @param kafkaMetricsCollector forwarded to the {@code KafkaConsumerListener} constructor
     * @param kafkaConsumerProcessor processor whose supported message type must equal the topic
     *     handler's supported message type
     * @param str3 forwarded as the last argument of the {@code QueueConsumerSupport} constructor
     * @throws NullPointerException if no handler is registered for the topic
     * @throws IllegalArgumentException if the processor's message type does not match the topic's
     */
    public <T> void registerQueueConsumer(String str, String str2, int i, Supplier<Long> supplier, Supplier<Double> supplier2, KafkaMetricsCollector kafkaMetricsCollector, KafkaConsumerProcessor<T> kafkaConsumerProcessor, String str3) {
        KafkaConsumerListener listener = new KafkaConsumerListener(str2, supplier, supplier2, kafkaMetricsCollector);
        addNewSupport(str, new QueueConsumerSupport(str, this.topicManagerSelector.selectTopology(str), i, this.topicManagerSelector.selectTopicManager(str), getAndValidateMessageHandler(str, kafkaConsumerProcessor), kafkaConsumerProcessor, listener, str3));
    }

    /** Returns the selector this manager was constructed with. */
    public TopicManagerSelector getTopicManagerSelector() {
        return this.topicManagerSelector;
    }

    /**
     * Looks up the handler registered for a topic and checks that it handles exactly the message
     * type the given processor declares.
     *
     * @param str topic name
     * @param kafkaConsumerProcessor processor that is about to be attached to the topic
     * @return the topic's handler, viewed as a handler of the processor's message type
     * @throws NullPointerException if no handler is registered for the topic
     * @throws IllegalArgumentException if the two supported message types are not equal
     */
    private <T> KafkaMessageHandler<T> getAndValidateMessageHandler(String str, KafkaConsumerProcessor<T> kafkaConsumerProcessor) {
        // Unchecked by necessity: the map stores handlers of unknown type. The equality check
        // on the supported message types below is what backs this cast.
        @SuppressWarnings("unchecked")
        KafkaMessageHandler<T> handler = (KafkaMessageHandler<T>) this.messageHandlerMap.get(str);
        if (handler == null) {
            // Same exception type and message as sendMessage uses for an unregistered topic,
            // instead of an anonymous NullPointerException from dereferencing null.
            throw new NullPointerException("KafkaMessageHandler not defined for topic [" + str + "]");
        }
        if (!handler.getSupportedMessageType().equals(kafkaConsumerProcessor.getSupportedMessageType())) {
            throw new IllegalArgumentException("Attempted to register consumer with message type that doesn't match the registered topic");
        }
        return handler;
    }

    /**
     * Appends a consumer support to the topic's list, creating the list on first use.
     *
     * <p>The append happens inside {@link Map#compute} so that creating the list and adding to it
     * are a single atomic step with respect to other updates of the same key.
     *
     * @param str topic name
     * @param kafkaConsumerSupport support to record
     */
    private <T> void addNewSupport(String str, KafkaConsumerSupport<T> kafkaConsumerSupport) {
        this.consumerSupportMap.compute(str, (key, existing) -> {
            List<KafkaConsumerSupport<?>> supports = existing;
            if (supports == null) {
                supports = new ArrayList<>();
            }
            supports.add(kafkaConsumerSupport);
            return supports;
        });
    }
}
