/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.provisioning;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaTopicProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

public class KafkaTopicProvisioner
implements ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>>,
InitializingBean {
    // Commons-logging logger shared by all instances of this provisioner.
    private static final Log logger = LogFactory.getLog(KafkaTopicProvisioner.class);
    // Default value for the operation timeout; not referenced by any other visible code in this class.
    private static final int DEFAULT_OPERATION_TIMEOUT = 30;
    // Binder-level configuration supplied to the constructor; consulted for the auto-create,
    // auto-alter and auto-add-partitions switches, the minimum partition count and the
    // default replication factor.
    private final KafkaBinderConfigurationProperties configurationProperties;
    // Timeout value (30); the blocking waits on admin-client futures in this class use this
    // value together with TimeUnit.SECONDS.
    private final int operationTimeout = 30;
    // Properties handed to AdminClient.create(...); populated in the constructor.
    private final Map<String, Object> adminClientProperties;
    // Retry strategy for metadata and topic-creation calls; set through the setter or
    // defaulted in afterPropertiesSet().
    private RetryOperations metadataRetryOperations;

    public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties, KafkaProperties kafkaProperties, AdminClientConfigCustomizer adminClientConfigCustomizer) {
        Assert.isTrue((kafkaProperties != null ? 1 : 0) != 0, (String)"KafkaProperties cannot be null");
        this.configurationProperties = kafkaBinderConfigurationProperties;
        this.adminClientProperties = kafkaProperties.buildAdminProperties();
        KafkaTopicProvisioner.normalalizeBootPropsWithBinder(this.adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties);
        if (adminClientConfigCustomizer != null) {
            adminClientConfigCustomizer.configure(this.adminClientProperties);
        }
    }

    /**
     * Replaces the retry strategy used for metadata lookups and topic creation.
     *
     * @param metadataRetryOperations the retry operations to use; when left {@code null},
     * {@link #afterPropertiesSet()} installs a default
     */
    public void setMetadataRetryOperations(RetryOperations metadataRetryOperations) {
        this.metadataRetryOperations = metadataRetryOperations;
    }

    /**
     * Installs a default retry template when none was injected: at most 10 attempts with
     * exponential back-off starting at 100, doubling each time and capped at 1000
     * (intervals as interpreted by {@link ExponentialBackOffPolicy}).
     */
    @Override
    public void afterPropertiesSet() {
        if (this.metadataRetryOperations != null) {
            // A caller-provided strategy always wins.
            return;
        }
        SimpleRetryPolicy attemptsPolicy = new SimpleRetryPolicy();
        attemptsPolicy.setMaxAttempts(10);

        ExponentialBackOffPolicy exponentialBackOff = new ExponentialBackOffPolicy();
        exponentialBackOff.setInitialInterval(100L);
        exponentialBackOff.setMultiplier(2.0);
        exponentialBackOff.setMaxInterval(1000L);

        RetryTemplate defaultTemplate = new RetryTemplate();
        defaultTemplate.setRetryPolicy(attemptsPolicy);
        defaultTemplate.setBackOffPolicy(exponentialBackOff);
        this.metadataRetryOperations = defaultTemplate;
    }

    /**
     * Provisions the outbound topic {@code name} and reports how many partitions it has.
     * <p>
     * The topic is created (subject to the binder's auto-create setting) and, when
     * auto-creation is enabled, its description is fetched through the metadata retry
     * operations. If no description is available the partition count is reported as 0.
     *
     * @param name the topic name; validated before any broker call
     * @param properties producer properties supplying the partition count and topic settings
     * @return the producer destination for {@code name}
     * @throws ProvisioningException if the topic cannot be created or described
     */
    @Override
    public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<KafkaProducerProperties> properties) {
        if (logger.isInfoEnabled()) {
            logger.info("Using kafka topic for outbound: " + name);
        }
        KafkaTopicUtils.validateTopicName(name);
        try (AdminClient adminClient = this.createAdminClient()) {
            this.createTopic(adminClient, name, properties.getPartitionCount(), false, properties.getExtension().getTopic());
            Map<String, TopicDescription> descriptionsByTopic = new HashMap<>();
            if (this.configurationProperties.isAutoCreateTopics()) {
                this.metadataRetryOperations.execute(retryContext -> {
                    try {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Attempting to retrieve the description for the topic: " + name);
                        }
                        KafkaFuture<Map<String, TopicDescription>> pending = adminClient.describeTopics(Collections.singletonList(name)).all();
                        descriptionsByTopic.putAll(pending.get(this.operationTimeout, TimeUnit.SECONDS));
                    }
                    catch (Exception ex) {
                        // Wrapped so the retry template sees a single failure type for every attempt.
                        throw new ProvisioningException("Problems encountered with partitions finding for: " + name, ex);
                    }
                    return null;
                });
            }
            TopicDescription description = descriptionsByTopic.get(name);
            int partitions = (description == null) ? 0 : description.partitions().size();
            return new KafkaProducerDestination(name, partitions);
        }
    }

    /**
     * Provisions the inbound destination(s) named by {@code name}.
     * <p>
     * For a multiplexed consumer, {@code name} is treated as a comma-delimited list: each
     * trimmed entry is provisioned individually and a single destination carrying the
     * original, unsplit name is returned. Otherwise {@code name} is provisioned as one topic.
     *
     * @param name the topic name, or a comma-delimited list of names when multiplexed
     * @param group the consumer group
     * @param properties the consumer properties
     * @return the provisioned consumer destination
     */
    @Override
    public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
        if (properties.isMultiplex()) {
            String[] topics = StringUtils.commaDelimitedListToStringArray(name);
            for (int i = 0; i < topics.length; i++) {
                this.doProvisionConsumerDestination(topics[i].trim(), group, properties);
            }
            return new KafkaConsumerDestination(name);
        }
        return this.doProvisionConsumerDestination(name, group, properties);
    }

    /**
     * Provisions a single inbound topic.
     * <p>
     * Topic patterns are never provisioned (and may not be combined with a DLQ). For a
     * concrete topic the expected partition count is {@code instanceCount * concurrency};
     * the topic is created if the binder allows it and, when auto-creation is enabled, the
     * actual partition count is read back and the DLQ topic is created if requested.
     *
     * @param name the topic name or pattern
     * @param group the consumer group; blank means an anonymous subscription
     * @param properties the consumer properties
     * @return the consumer destination, including partition count and DLQ name when known
     * @throws IllegalArgumentException if a DLQ is requested for a pattern or an anonymous
     * subscription, or if the instance count is zero
     * @throws ProvisioningException if reading the topic description or creating the DLQ fails
     */
    private ConsumerDestination doProvisionConsumerDestination(String name, String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
        KafkaConsumerProperties extension = properties.getExtension();
        if (extension.isDestinationIsPattern()) {
            Assert.isTrue(!extension.isEnableDlq(), "enableDLQ is not allowed when listening to topic patterns");
            if (logger.isDebugEnabled()) {
                logger.debug("Listening to a topic pattern - " + name + " - no provisioning performed");
            }
            return new KafkaConsumerDestination(name);
        }

        KafkaTopicUtils.validateTopicName(name);
        boolean anonymous = !StringUtils.hasText(group);
        Assert.isTrue(!anonymous || !extension.isEnableDlq(), "DLQ support is not available for anonymous subscriptions");
        if (properties.getInstanceCount() == 0) {
            throw new IllegalArgumentException("Instance count cannot be zero");
        }
        int partitionCount = properties.getInstanceCount() * properties.getConcurrency();

        try (AdminClient adminClient = this.createAdminClient()) {
            this.createTopic(adminClient, name, partitionCount, extension.isAutoRebalanceEnabled(), extension.getTopic());
            if (!this.configurationProperties.isAutoCreateTopics()) {
                // Nothing was created, so the broker is not queried for the partition count.
                return new KafkaConsumerDestination(name);
            }
            KafkaFuture<Map<String, TopicDescription>> pendingDescriptions = adminClient.describeTopics(Collections.singletonList(name)).all();
            try {
                Map<String, TopicDescription> descriptions = pendingDescriptions.get(this.operationTimeout, TimeUnit.SECONDS);
                int partitions = descriptions.get(name).partitions().size();
                ConsumerDestination withDlq = this.createDlqIfNeedBe(adminClient, name, group, properties, anonymous, partitions);
                return withDlq != null ? withDlq : new KafkaConsumerDestination(name, partitions);
            }
            catch (Exception ex) {
                throw new ProvisioningException("Provisioning exception encountered for " + name, ex);
            }
        }
    }

    /**
     * Builds a new admin client from the properties assembled in the constructor.
     * Callers are responsible for closing the returned client.
     *
     * @return a freshly created {@link AdminClient}
     */
    AdminClient createAdminClient() {
        return AdminClient.create(adminClientProperties);
    }

    /**
     * Reconciles admin-client properties with the binder configuration, in place.
     * <p>
     * The binder's connection string becomes {@code bootstrap.servers} when the admin
     * properties have none, or when the binder's connection string differs from its
     * default. Every binder {@code configuration} entry whose key is a known admin-client
     * config name is then copied over the admin properties.
     *
     * @param adminProps the admin properties to update
     * @param bootProps the Boot Kafka properties (not read by this method)
     * @param binderProps the binder configuration
     * @throws IllegalStateException if the binder {@code configuration} contains {@code bootstrap.servers}
     */
    public static void normalalizeBootPropsWithBinder(Map<String, Object> adminProps, KafkaProperties bootProps, KafkaBinderConfigurationProperties binderProps) {
        String kafkaConnectionString = binderProps.getKafkaConnectionString();
        boolean noBootstrapServers = ObjectUtils.isEmpty(adminProps.get("bootstrap.servers"));
        if (noBootstrapServers || !kafkaConnectionString.equals(binderProps.getDefaultKafkaConnectionString())) {
            adminProps.put("bootstrap.servers", kafkaConnectionString);
        }
        Map<String, String> binderProperties = binderProps.getConfiguration();
        Set<String> adminConfigNames = AdminClientConfig.configNames();
        for (Map.Entry<String, String> entry : binderProperties.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            if (key.equals("bootstrap.servers")) {
                throw new IllegalStateException("Set binder bootstrap servers via the 'brokers' property, not 'configuration'");
            }
            if (!adminConfigNames.contains(key)) {
                // Not an admin-client setting; leave the admin properties untouched.
                continue;
            }
            Object replaced = adminProps.put(key, value);
            if (replaced != null && logger.isDebugEnabled()) {
                logger.debug("Overrode boot property: [" + key + "], from: [" + replaced + "] to: [" + value + "]");
            }
        }
    }

    /**
     * Creates the dead-letter topic for {@code name} when a DLQ is enabled and the
     * subscription is not anonymous.
     * <p>
     * The DLQ topic is the configured DLQ name, or {@code error.<name>.<group>} when none
     * is configured; its partition count is the configured DLQ partition count, or
     * {@code partitions} when none is configured.
     *
     * @param adminClient the admin client to use
     * @param name the source topic name
     * @param group the consumer group
     * @param properties the consumer properties
     * @param anonymous whether the subscription has no group
     * @param partitions the partition count of the source topic
     * @return a destination carrying the DLQ name, or {@code null} when no DLQ applies
     * @throws ProvisioningException if the DLQ topic cannot be created
     */
    private ConsumerDestination createDlqIfNeedBe(AdminClient adminClient, String name, String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties, boolean anonymous, int partitions) {
        KafkaConsumerProperties extension = properties.getExtension();
        if (anonymous || !extension.isEnableDlq()) {
            return null;
        }
        String dlqTopic = extension.getDlqName();
        if (!StringUtils.hasText(dlqTopic)) {
            dlqTopic = "error." + name + "." + group;
        }
        Integer configuredDlqPartitions = extension.getDlqPartitions();
        int dlqPartitions = (configuredDlqPartitions != null) ? configuredDlqPartitions : partitions;
        try {
            this.createTopicAndPartitions(adminClient, dlqTopic, dlqPartitions, extension.isAutoRebalanceEnabled(), extension.getTopic());
        }
        catch (Error error) {
            // Errors are never wrapped.
            throw error;
        }
        catch (Throwable throwable) {
            throw new ProvisioningException("Provisioning exception encountered for " + name, throwable);
        }
        return new KafkaConsumerDestination(name, partitions, dlqTopic);
    }

    /**
     * Creates the topic if the binder configuration allows it, translating every failure
     * other than an {@link Error} into a {@link ProvisioningException}.
     *
     * @param adminClient the admin client to use
     * @param name the topic name
     * @param partitionCount the expected number of partitions
     * @param tolerateLowerPartitionsOnBroker whether fewer partitions on the broker are acceptable
     * @param properties the topic-level settings
     * @throws ProvisioningException if provisioning fails
     */
    private void createTopic(AdminClient adminClient, String name, int partitionCount, boolean tolerateLowerPartitionsOnBroker, KafkaTopicProperties properties) {
        try {
            this.createTopicIfNecessary(adminClient, name, partitionCount, tolerateLowerPartitionsOnBroker, properties);
        }
        catch (Error error) {
            // Errors are never wrapped.
            throw error;
        }
        catch (Throwable throwable) {
            throw new ProvisioningException("Provisioning exception encountered for " + name, throwable);
        }
    }

    /**
     * Delegates to {@code createTopicAndPartitions} when the binder's auto-create setting
     * is on; otherwise only logs that auto-creation is disabled.
     *
     * @param adminClient the admin client to use
     * @param topicName the topic name
     * @param partitionCount the expected number of partitions
     * @param tolerateLowerPartitionsOnBroker whether fewer partitions on the broker are acceptable
     * @param properties the topic-level settings
     * @throws Throwable any failure raised while creating or inspecting the topic
     */
    private void createTopicIfNecessary(AdminClient adminClient, String topicName, int partitionCount, boolean tolerateLowerPartitionsOnBroker, KafkaTopicProperties properties) throws Throwable {
        if (!this.configurationProperties.isAutoCreateTopics()) {
            logger.info("Auto creation of topics is disabled.");
            return;
        }
        this.createTopicAndPartitions(adminClient, topicName, partitionCount, tolerateLowerPartitionsOnBroker, properties);
    }

    /**
     * Makes sure {@code topicName} exists on the broker with enough partitions.
     * <p>
     * When the broker already lists the topic, its configs are altered if the binder's
     * auto-alter setting is on, and its partition count is compared with the expected
     * one: missing partitions are added when auto-add is on; otherwise a shortfall is
     * logged (when tolerated) or rejected.
     * <p>
     * When the broker does not list the topic, it is created through the metadata retry
     * operations. A {@link TopicExistsException} reported during creation (for example
     * because another instance created the topic concurrently) is logged as a warning
     * and treated as success; every other failure is logged and rethrown.
     *
     * @param adminClient the admin client to use
     * @param topicName the topic to create or verify
     * @param partitionCount the number of partitions the caller expects
     * @param tolerateLowerPartitionsOnBroker whether fewer partitions on the broker are acceptable
     * @param topicProperties topic-level settings (replica assignments, replication factor, configs)
     * @throws ProvisioningException if the topic has fewer partitions than expected and
     * neither auto-adding nor tolerance is enabled
     * @throws Throwable any failure reported by the admin client while waiting for a result
     */
    private void createTopicAndPartitions(AdminClient adminClient, String topicName, int partitionCount, boolean tolerateLowerPartitionsOnBroker, KafkaTopicProperties topicProperties) throws Throwable {
        Set<String> existingTopics = adminClient.listTopics().names().get(this.operationTimeout, TimeUnit.SECONDS);

        if (!existingTopics.contains(topicName)) {
            int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(), partitionCount);
            this.metadataRetryOperations.execute(context -> {
                Map<Integer, List<Integer>> replicasAssignments = topicProperties.getReplicasAssignments();
                NewTopic newTopic;
                if (replicasAssignments != null && replicasAssignments.size() > 0) {
                    newTopic = new NewTopic(topicName, replicasAssignments);
                }
                else {
                    short replicationFactor = topicProperties.getReplicationFactor() != null
                            ? topicProperties.getReplicationFactor().shortValue()
                            : this.configurationProperties.getReplicationFactor();
                    newTopic = new NewTopic(topicName, effectivePartitionCount, replicationFactor);
                }
                if (topicProperties.getProperties().size() > 0) {
                    newTopic.configs(topicProperties.getProperties());
                }
                CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
                try {
                    createTopicsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);
                }
                catch (ExecutionException ex) {
                    Throwable cause = ex.getCause();
                    if (cause instanceof TopicExistsException) {
                        // Lost a creation race: the topic is there, which is all we need.
                        if (logger.isWarnEnabled()) {
                            logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
                        }
                    }
                    else {
                        // Surface the broker-side failure; fall back to the wrapper if it has no cause.
                        Throwable failure = (cause != null) ? cause : ex;
                        logger.error("Failed to create topics", failure);
                        throw failure;
                    }
                }
                catch (Exception ex) {
                    // Timeouts and interruptions carry no cause, so rethrow the exception itself
                    // instead of its (null) cause.
                    logger.error("Failed to create topics", ex);
                    throw ex;
                }
                return null;
            });
            return;
        }

        if (this.configurationProperties.isAutoAlterTopics()) {
            this.alterTopicConfigsIfNecessary(adminClient, topicName, topicProperties);
        }
        boolean autoAddPartitions = this.configurationProperties.isAutoAddPartitions();
        // The binder-wide minimum only applies when partitions may actually be added.
        int effectivePartitionCount = autoAddPartitions
                ? Math.max(this.configurationProperties.getMinPartitionCount(), partitionCount)
                : partitionCount;
        Map<String, TopicDescription> topicDescriptions = adminClient.describeTopics(Collections.singletonList(topicName)).all().get(this.operationTimeout, TimeUnit.SECONDS);
        TopicDescription topicDescription = topicDescriptions.get(topicName);
        int partitionSize = topicDescription.partitions().size();
        if (partitionSize >= effectivePartitionCount) {
            return;
        }
        if (autoAddPartitions) {
            CreatePartitionsResult partitions = adminClient.createPartitions(Collections.singletonMap(topicName, NewPartitions.increaseTo(effectivePartitionCount)));
            partitions.all().get(this.operationTimeout, TimeUnit.SECONDS);
        }
        else if (tolerateLowerPartitionsOnBroker) {
            logger.warn("The number of expected partitions was: " + partitionCount + ", but " + partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead.There will be " + (effectivePartitionCount - partitionSize) + " idle consumers");
        }
        else {
            throw new ProvisioningException("The number of expected partitions was: " + partitionCount + ", but " + partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`");
        }
    }

    /**
     * Brings the broker-side configs of {@code topicName} in line with the configured
     * topic properties.
     * <p>
     * A property is sent as a {@code SET} operation when the broker has no entry for it
     * or when the broker's value differs from the configured one. Nothing is sent when
     * every configured property already matches.
     *
     * @param adminClient the admin client to use
     * @param topicName the topic whose configs are compared
     * @param topicProperties the topic-level settings holding the desired configs
     * @throws InterruptedException if interrupted while waiting for the broker
     * @throws ExecutionException if the broker reports a failure
     * @throws TimeoutException if the broker does not answer in time
     */
    private void alterTopicConfigsIfNecessary(AdminClient adminClient, String topicName, KafkaTopicProperties topicProperties) throws InterruptedException, ExecutionException, TimeoutException {
        ConfigResource topicConfigResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        KafkaFuture<Map<ConfigResource, Config>> pendingConfigs = adminClient.describeConfigs(Collections.singletonList(topicConfigResource)).all();
        Config brokerConfig = pendingConfigs.get(this.operationTimeout, TimeUnit.SECONDS).get(topicConfigResource);

        List<AlterConfigOp> updatedConfigEntries = topicProperties.getProperties().entrySet().stream()
                .filter(desired -> {
                    ConfigEntry current = brokerConfig.get(desired.getKey());
                    // Missing on the broker, or present with a different value.
                    return current == null || !current.value().equals(desired.getValue());
                })
                .map(desired -> new AlterConfigOp(new ConfigEntry(desired.getKey(), desired.getValue()), AlterConfigOp.OpType.SET))
                .collect(Collectors.toList());

        if (updatedConfigEntries.isEmpty()) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Attempting to alter configs " + updatedConfigEntries + " for the topic:" + topicName);
        }
        Map<ConfigResource, Collection<AlterConfigOp>> alterConfigForTopics = new HashMap<>();
        alterConfigForTopics.put(topicConfigResource, updatedConfigEntries);
        AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(alterConfigForTopics);
        alterConfigsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);
    }

    /**
     * Obtains the partitions of {@code topicName} through {@code callable}, under the
     * metadata retry operations, and checks that enough partitions exist.
     * <p>
     * Failures of {@code callable} are logged and treated as "no partitions", except
     * {@link UnknownTopicOrPartitionException}, which is rethrown. When no partitions were
     * obtained, the topic is described through a short-lived admin client so that an
     * unknown topic surfaces as {@link UnknownTopicOrPartitionException}; other execution
     * failures of that lookup are only logged.
     *
     * @param partitionCount the minimum number of partitions expected
     * @param tolerateLowerPartitionsOnBroker whether fewer partitions are merely logged instead of rejected
     * @param callable supplier of the partition information
     * @param topicName the topic being inspected
     * @return the partitions returned by {@code callable}, or an empty collection if it failed
     * @throws BinderException if the retry operations ultimately fail
     */
    public Collection<PartitionInfo> getPartitionsForTopic(int partitionCount, boolean tolerateLowerPartitionsOnBroker, Callable<Collection<PartitionInfo>> callable, String topicName) {
        try {
            return this.metadataRetryOperations.execute(retryContext -> {
                Collection<PartitionInfo> partitions = Collections.emptyList();
                try {
                    partitions = callable.call();
                }
                catch (UnknownTopicOrPartitionException ex) {
                    throw ex;
                }
                catch (Exception ex) {
                    logger.error("Failed to obtain partition information", ex);
                }

                boolean nothingFound = CollectionUtils.isEmpty(partitions);
                if (nothingFound) {
                    try (AdminClient adminClient = AdminClient.create(this.adminClientProperties)) {
                        adminClient.describeTopics(Collections.singletonList(topicName)).all().get();
                    }
                    catch (ExecutionException ex) {
                        if (ex.getCause() instanceof UnknownTopicOrPartitionException) {
                            throw (UnknownTopicOrPartitionException) ex.getCause();
                        }
                        logger.warn("No partitions have been retrieved for the topic (" + topicName + "). This will affect the health check.");
                    }
                }

                int partitionSize = nothingFound ? 0 : partitions.size();
                if (partitionSize < partitionCount) {
                    if (!tolerateLowerPartitionsOnBroker) {
                        throw new IllegalStateException("The number of expected partitions was: " + partitionCount + ", but " + partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead");
                    }
                    logger.warn("The number of expected partitions was: " + partitionCount + ", but " + partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead.There will be " + (partitionCount - partitionSize) + " idle consumers");
                }
                return partitions;
            });
        }
        catch (Exception ex) {
            logger.error("Cannot initialize Binder", ex);
            throw new BinderException("Cannot initialize binder:", ex);
        }
    }

    private static final class KafkaConsumerDestination
    implements ConsumerDestination {
        private final String consumerDestinationName;
        private final int partitions;
        private final String dlqName;

        KafkaConsumerDestination(String consumerDestinationName) {
            this(consumerDestinationName, 0, null);
        }

        KafkaConsumerDestination(String consumerDestinationName, int partitions) {
            this(consumerDestinationName, partitions, null);
        }

        KafkaConsumerDestination(String consumerDestinationName, Integer partitions, String dlqName) {
            this.consumerDestinationName = consumerDestinationName;
            this.partitions = partitions;
            this.dlqName = dlqName;
        }

        public String getName() {
            return this.consumerDestinationName;
        }

        public String toString() {
            return "KafkaConsumerDestination{consumerDestinationName='" + this.consumerDestinationName + '\'' + ", partitions=" + this.partitions + ", dlqName='" + this.dlqName + '\'' + '}';
        }
    }

    private static final class KafkaProducerDestination
    implements ProducerDestination {
        private final String producerDestinationName;
        private final int partitions;

        KafkaProducerDestination(String destinationName, Integer partitions) {
            this.producerDestinationName = destinationName;
            this.partitions = partitions;
        }

        public String getName() {
            return this.producerDestinationName;
        }

        public String getNameForPartition(int partition) {
            return this.producerDestinationName;
        }

        public String toString() {
            return "KafkaProducerDestination{producerDestinationName='" + this.producerDestinationName + '\'' + ", partitions=" + this.partitions + '}';
        }
    }
}

