/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.Timer;
import java.io.OutputStream;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.consumer.Blacklist;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.TopicFilter;
import kafka.consumer.Whitelist;
import kafka.consumer.ZookeeperConsumerConnector;
import kafka.javaapi.consumer.ConsumerRebalanceListener;
import kafka.metrics.KafkaMetricsGroup;
import kafka.metrics.KafkaMetricsGroup$class;
import kafka.tools.MirrorMaker;
import kafka.tools.MirrorMaker$defaultMirrorMakerMessageHandler$;
import kafka.utils.CommandLineUtils$;
import kafka.utils.CoreUtils$;
import kafka.utils.Log4jController$;
import kafka.utils.Logging$class;
import kafka.utils.ZKConfig$;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.RichInt$;
import scala.sys.package$;
import scala.util.control.ControlThrowable;

/*
 * Duplicate member names - consider using --renamedupmembers true
 */
public final class MirrorMaker$
implements KafkaMetricsGroup {
    public static final MirrorMaker$ MODULE$;
    private MirrorMaker.MirrorMakerProducer producer;
    private Seq<MirrorMaker.MirrorMakerThread> mirrorMakerThreads;
    private final AtomicBoolean kafka$tools$MirrorMaker$$isShuttingDown;
    private final AtomicInteger kafka$tools$MirrorMaker$$numDroppedMessages;
    private MirrorMaker.MirrorMakerMessageHandler kafka$tools$MirrorMaker$$messageHandler;
    private int kafka$tools$MirrorMaker$$offsetCommitIntervalMs;
    private boolean kafka$tools$MirrorMaker$$abortOnSendFailure;
    private volatile boolean kafka$tools$MirrorMaker$$exitingOnSendFailure;
    private final String loggerName;
    private final Logger logger;
    private String logIdent;
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    private volatile boolean bitmap$0;

    static {
        new MirrorMaker$();
    }

    @Override
    public MetricName metricName(String name, Map<String, String> tags) {
        return KafkaMetricsGroup$class.metricName(this, name, tags);
    }

    @Override
    public MetricName explicitMetricName(String group, String typeName, String name, Map<String, String> tags) {
        return KafkaMetricsGroup$class.explicitMetricName(this, group, typeName, name, tags);
    }

    @Override
    public <T> Gauge<T> newGauge(String name, Gauge<T> metric, Map<String, String> tags) {
        return KafkaMetricsGroup$class.newGauge(this, name, metric, tags);
    }

    @Override
    public Meter newMeter(String name, String eventType, TimeUnit timeUnit, Map<String, String> tags) {
        return KafkaMetricsGroup$class.newMeter(this, name, eventType, timeUnit, tags);
    }

    @Override
    public Histogram newHistogram(String name, boolean biased, Map<String, String> tags) {
        return KafkaMetricsGroup$class.newHistogram(this, name, biased, tags);
    }

    @Override
    public Timer newTimer(String name, TimeUnit durationUnit, TimeUnit rateUnit, Map<String, String> tags) {
        return KafkaMetricsGroup$class.newTimer(this, name, durationUnit, rateUnit, tags);
    }

    @Override
    public void removeMetric(String name, Map<String, String> tags) {
        KafkaMetricsGroup$class.removeMetric(this, name, tags);
    }

    @Override
    public <T> Map<String, String> newGauge$default$3() {
        return KafkaMetricsGroup$class.newGauge$default$3(this);
    }

    @Override
    public Map<String, String> newMeter$default$4() {
        return KafkaMetricsGroup$class.newMeter$default$4(this);
    }

    @Override
    public Map<String, String> removeMetric$default$2() {
        return KafkaMetricsGroup$class.removeMetric$default$2(this);
    }

    @Override
    public Map<String, String> newTimer$default$4() {
        return KafkaMetricsGroup$class.newTimer$default$4(this);
    }

    @Override
    public boolean newHistogram$default$2() {
        return KafkaMetricsGroup$class.newHistogram$default$2(this);
    }

    @Override
    public Map<String, String> newHistogram$default$3() {
        return KafkaMetricsGroup$class.newHistogram$default$3(this);
    }

    @Override
    public String loggerName() {
        return this.loggerName;
    }

    private Logger logger$lzycompute() {
        MirrorMaker$ mirrorMaker$ = this;
        synchronized (mirrorMaker$) {
            if (!this.bitmap$0) {
                this.logger = Logging$class.logger(this);
                this.bitmap$0 = true;
            }
            return this.logger;
        }
    }

    @Override
    public Logger logger() {
        return this.bitmap$0 ? this.logger : this.logger$lzycompute();
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    @Override
    public Log4jController$ kafka$utils$Logging$$log4jController() {
        return this.kafka$utils$Logging$$log4jController;
    }

    @Override
    public void kafka$utils$Logging$_setter_$loggerName_$eq(String x$1) {
        this.loggerName = x$1;
    }

    @Override
    public void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ x$1) {
        this.kafka$utils$Logging$$log4jController = x$1;
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging$class.msgWithLogIdent(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging$class.trace(this, msg);
    }

    @Override
    public Object trace(Function0<Throwable> e) {
        return Logging$class.trace(this, e);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.trace(this, msg, e);
    }

    @Override
    public void swallowTrace(Function0<BoxedUnit> action) {
        Logging$class.swallowTrace(this, action);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging$class.isDebugEnabled(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging$class.isTraceEnabled(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging$class.debug(this, msg);
    }

    @Override
    public Object debug(Function0<Throwable> e) {
        return Logging$class.debug(this, e);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.debug(this, msg, e);
    }

    @Override
    public void swallowDebug(Function0<BoxedUnit> action) {
        Logging$class.swallowDebug(this, action);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging$class.info(this, msg);
    }

    @Override
    public Object info(Function0<Throwable> e) {
        return Logging$class.info(this, e);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.info(this, msg, e);
    }

    @Override
    public void swallowInfo(Function0<BoxedUnit> action) {
        Logging$class.swallowInfo(this, action);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging$class.warn(this, msg);
    }

    @Override
    public Object warn(Function0<Throwable> e) {
        return Logging$class.warn(this, e);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.warn(this, msg, e);
    }

    @Override
    public void swallowWarn(Function0<BoxedUnit> action) {
        Logging$class.swallowWarn(this, action);
    }

    @Override
    public void swallow(Function0<BoxedUnit> action) {
        Logging$class.swallow(this, action);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging$class.error(this, msg);
    }

    @Override
    public Object error(Function0<Throwable> e) {
        return Logging$class.error(this, e);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.error(this, msg, e);
    }

    @Override
    public void swallowError(Function0<BoxedUnit> action) {
        Logging$class.swallowError(this, action);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging$class.fatal(this, msg);
    }

    @Override
    public Object fatal(Function0<Throwable> e) {
        return Logging$class.fatal(this, e);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.fatal(this, msg, e);
    }

    public MirrorMaker.MirrorMakerProducer producer() {
        return this.producer;
    }

    public void producer_$eq(MirrorMaker.MirrorMakerProducer x$1) {
        this.producer = x$1;
    }

    private Seq<MirrorMaker.MirrorMakerThread> mirrorMakerThreads() {
        return this.mirrorMakerThreads;
    }

    private void mirrorMakerThreads_$eq(Seq<MirrorMaker.MirrorMakerThread> x$1) {
        this.mirrorMakerThreads = x$1;
    }

    public AtomicBoolean kafka$tools$MirrorMaker$$isShuttingDown() {
        return this.kafka$tools$MirrorMaker$$isShuttingDown;
    }

    public AtomicInteger kafka$tools$MirrorMaker$$numDroppedMessages() {
        return this.kafka$tools$MirrorMaker$$numDroppedMessages;
    }

    public MirrorMaker.MirrorMakerMessageHandler kafka$tools$MirrorMaker$$messageHandler() {
        return this.kafka$tools$MirrorMaker$$messageHandler;
    }

    private void kafka$tools$MirrorMaker$$messageHandler_$eq(MirrorMaker.MirrorMakerMessageHandler x$1) {
        this.kafka$tools$MirrorMaker$$messageHandler = x$1;
    }

    public int kafka$tools$MirrorMaker$$offsetCommitIntervalMs() {
        return this.kafka$tools$MirrorMaker$$offsetCommitIntervalMs;
    }

    private void kafka$tools$MirrorMaker$$offsetCommitIntervalMs_$eq(int x$1) {
        this.kafka$tools$MirrorMaker$$offsetCommitIntervalMs = x$1;
    }

    public boolean kafka$tools$MirrorMaker$$abortOnSendFailure() {
        return this.kafka$tools$MirrorMaker$$abortOnSendFailure;
    }

    private void kafka$tools$MirrorMaker$$abortOnSendFailure_$eq(boolean x$1) {
        this.kafka$tools$MirrorMaker$$abortOnSendFailure = x$1;
    }

    public boolean kafka$tools$MirrorMaker$$exitingOnSendFailure() {
        return this.kafka$tools$MirrorMaker$$exitingOnSendFailure;
    }

    public void kafka$tools$MirrorMaker$$exitingOnSendFailure_$eq(boolean x$1) {
        this.kafka$tools$MirrorMaker$$exitingOnSendFailure = x$1;
    }

    public void main(String[] args) {
        Throwable throwable2;
        block16: {
            this.info((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Starting mirror maker";
                }
            });
            try {
                Seq<MirrorMaker.MirrorMakerBaseConsumer> seq;
                OptionParser parser = new OptionParser(false);
                ArgumentAcceptingOptionSpec consumerConfigOpt = parser.accepts("consumer.config", "Embedded consumer config for consuming from the source cluster.").withRequiredArg().describedAs("config file").ofType(String.class);
                OptionSpecBuilder useNewConsumerOpt = parser.accepts("new.consumer", "Use new consumer in mirror maker (this is the default).");
                ArgumentAcceptingOptionSpec producerConfigOpt = parser.accepts("producer.config", "Embedded producer config.").withRequiredArg().describedAs("config file").ofType(String.class);
                ArgumentAcceptingOptionSpec numStreamsOpt = parser.accepts("num.streams", "Number of consumption streams.").withRequiredArg().describedAs("Number of threads").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(1), (Object[])new Integer[0]);
                ArgumentAcceptingOptionSpec whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to mirror.").withRequiredArg().describedAs("Java regex (String)").ofType(String.class);
                ArgumentAcceptingOptionSpec blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to mirror. Only old consumer supports blacklist.").withRequiredArg().describedAs("Java regex (String)").ofType(String.class);
                ArgumentAcceptingOptionSpec offsetCommitIntervalMsOpt = parser.accepts("offset.commit.interval.ms", "Offset commit interval in ms.").withRequiredArg().describedAs("offset commit interval in millisecond").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(60000), (Object[])new Integer[0]);
                ArgumentAcceptingOptionSpec consumerRebalanceListenerOpt = parser.accepts("consumer.rebalance.listener", "The consumer rebalance listener to use for mirror maker consumer.").withRequiredArg().describedAs("A custom rebalance listener of type ConsumerRebalanceListener").ofType(String.class);
                ArgumentAcceptingOptionSpec rebalanceListenerArgsOpt = parser.accepts("rebalance.listener.args", "Arguments used by custom rebalance listener for mirror maker consumer.").withRequiredArg().describedAs("Arguments passed to custom rebalance listener constructor as a string.").ofType(String.class);
                ArgumentAcceptingOptionSpec messageHandlerOpt = parser.accepts("message.handler", "Message handler which will process every record in-between consumer and producer.").withRequiredArg().describedAs("A custom message handler of type MirrorMakerMessageHandler").ofType(String.class);
                ArgumentAcceptingOptionSpec messageHandlerArgsOpt = parser.accepts("message.handler.args", "Arguments used by custom message handler for mirror maker.").withRequiredArg().describedAs("Arguments passed to message handler constructor.").ofType(String.class);
                ArgumentAcceptingOptionSpec abortOnSendFailureOpt = parser.accepts("abort.on.send.failure", "Configure the mirror maker to exit on a failed send.").withRequiredArg().describedAs("Stop the entire mirror maker when a send failure occurs").ofType(String.class).defaultsTo((Object)"true", (Object[])new String[0]);
                OptionSpecBuilder helpOpt = parser.accepts("help", "Print this message.");
                if (args.length == 0) {
                    throw CommandLineUtils$.MODULE$.printUsageAndDie(parser, "Continuously copy data between two Kafka clusters.");
                }
                OptionSet options = parser.parse(args);
                if (options.has((OptionSpec)helpOpt)) {
                    parser.printHelpOn((OutputStream)System.out);
                    throw package$.MODULE$.exit(0);
                }
                CommandLineUtils$.MODULE$.checkRequiredArgs(parser, options, (Seq<OptionSpec<?>>)Predef$.MODULE$.wrapRefArray((Object[])new OptionSpec[]{consumerConfigOpt, producerConfigOpt}));
                Properties consumerProps = Utils.loadProps((String)((String)options.valueOf((OptionSpec)consumerConfigOpt)));
                boolean useOldConsumer = consumerProps.containsKey(ZKConfig$.MODULE$.ZkConnectProp());
                if (useOldConsumer) {
                    if (options.has((OptionSpec)useNewConsumerOpt)) {
                        this.error((Function0<String>)new Serializable(){
                            public static final long serialVersionUID = 0L;

                            public final String apply() {
                                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"The consumer configuration parameter `", "` is not valid when using --new.consumer"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{ZKConfig$.MODULE$.ZkConnectProp()}));
                            }
                        });
                        throw package$.MODULE$.exit(1);
                    }
                    if (consumerProps.containsKey("bootstrap.servers")) {
                        this.error((Function0<String>)new Serializable(){
                            public static final long serialVersionUID = 0L;

                            public final String apply() {
                                return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"The configuration parameters `", "` (old consumer) and "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{ZKConfig$.MODULE$.ZkConnectProp()}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"`", "` (new consumer) cannot be used together."})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"bootstrap.servers"}))).toString();
                            }
                        });
                        throw package$.MODULE$.exit(1);
                    }
                    if (List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new ArgumentAcceptingOptionSpec[]{whitelistOpt, blacklistOpt})).count((Function1)new Serializable(options){
                        public static final long serialVersionUID = 0L;
                        private final OptionSet options$1;

                        public final boolean apply(OptionSpec<?> x$1) {
                            return this.options$1.has(x$1);
                        }
                        {
                            this.options$1 = options$1;
                        }
                    }) != 1) {
                        this.error((Function0<String>)new Serializable(){
                            public static final long serialVersionUID = 0L;

                            public final String apply() {
                                return "Exactly one of whitelist or blacklist is required.";
                            }
                        });
                        throw package$.MODULE$.exit(1);
                    }
                } else {
                    if (options.has((OptionSpec)blacklistOpt)) {
                        this.error((Function0<String>)new Serializable(){
                            public static final long serialVersionUID = 0L;

                            public final String apply() {
                                return "blacklist can not be used when using new consumer in mirror maker. Use whitelist instead.";
                            }
                        });
                        throw package$.MODULE$.exit(1);
                    }
                    if (!options.has((OptionSpec)whitelistOpt)) {
                        this.error((Function0<String>)new Serializable(){
                            public static final long serialVersionUID = 0L;

                            public final String apply() {
                                return "whitelist must be specified when using new consumer in mirror maker.";
                            }
                        });
                        throw package$.MODULE$.exit(1);
                    }
                    if (!consumerProps.containsKey("partition.assignment.strategy")) {
                        System.err.println("WARNING: The default partition assignment strategy of the new-consumer-based mirror maker will change from 'range' to 'roundrobin' in an upcoming release (so that better load balancing can be achieved). If you prefer to make this switch in advance of that release add the following to the corresponding new-consumer config: 'partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor'");
                    }
                }
                this.kafka$tools$MirrorMaker$$abortOnSendFailure_$eq(new StringOps(Predef$.MODULE$.augmentString((String)options.valueOf((OptionSpec)abortOnSendFailureOpt))).toBoolean());
                this.kafka$tools$MirrorMaker$$offsetCommitIntervalMs_$eq((Integer)options.valueOf((OptionSpec)offsetCommitIntervalMsOpt));
                int numStreams = (Integer)options.valueOf((OptionSpec)numStreamsOpt);
                Runtime.getRuntime().addShutdownHook(new Thread(){

                    public void run() {
                        MirrorMaker$.MODULE$.cleanShutdown();
                    }
                });
                Properties producerProps = Utils.loadProps((String)((String)options.valueOf((OptionSpec)producerConfigOpt)));
                boolean sync = producerProps.getProperty("producer.type", "async").equals("sync");
                producerProps.remove("producer.type");
                this.maybeSetDefaultProperty(producerProps, "retries", ((Object)BoxesRunTime.boxToInteger((int)Integer.MAX_VALUE)).toString());
                this.maybeSetDefaultProperty(producerProps, "max.block.ms", ((Object)BoxesRunTime.boxToLong((long)Long.MAX_VALUE)).toString());
                this.maybeSetDefaultProperty(producerProps, "acks", "all");
                this.maybeSetDefaultProperty(producerProps, "max.in.flight.requests.per.connection", "1");
                producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
                producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
                this.producer_$eq(new MirrorMaker.MirrorMakerProducer(sync, producerProps));
                if (useOldConsumer) {
                    String rebalanceListenerArgs;
                    String customRebalanceListenerClass = (String)options.valueOf((OptionSpec)consumerRebalanceListenerOpt);
                    None$ customRebalanceListener = customRebalanceListenerClass == null ? None$.MODULE$ : ((rebalanceListenerArgs = (String)options.valueOf((OptionSpec)rebalanceListenerArgsOpt)) == null ? new Some(CoreUtils$.MODULE$.createObject(customRebalanceListenerClass, (Seq<Object>)Predef$.MODULE$.wrapRefArray(new Object[0]))) : new Some(CoreUtils$.MODULE$.createObject(customRebalanceListenerClass, (Seq<Object>)Predef$.MODULE$.wrapRefArray(new Object[]{rebalanceListenerArgs}))));
                    seq = this.createOldConsumers(numStreams, consumerProps, (Option<ConsumerRebalanceListener>)customRebalanceListener, (Option<String>)Option$.MODULE$.apply(options.valueOf((OptionSpec)whitelistOpt)), (Option<String>)Option$.MODULE$.apply(options.valueOf((OptionSpec)blacklistOpt)));
                } else {
                    String rebalanceListenerArgs;
                    String customRebalanceListenerClass = (String)options.valueOf((OptionSpec)consumerRebalanceListenerOpt);
                    None$ customRebalanceListener = customRebalanceListenerClass == null ? None$.MODULE$ : ((rebalanceListenerArgs = (String)options.valueOf((OptionSpec)rebalanceListenerArgsOpt)) == null ? new Some(CoreUtils$.MODULE$.createObject(customRebalanceListenerClass, (Seq<Object>)Predef$.MODULE$.wrapRefArray(new Object[0]))) : new Some(CoreUtils$.MODULE$.createObject(customRebalanceListenerClass, (Seq<Object>)Predef$.MODULE$.wrapRefArray(new Object[]{rebalanceListenerArgs}))));
                    seq = this.createNewConsumers(numStreams, consumerProps, (Option<org.apache.kafka.clients.consumer.ConsumerRebalanceListener>)customRebalanceListener, (Option<String>)Option$.MODULE$.apply(options.valueOf((OptionSpec)whitelistOpt)));
                }
                Seq<MirrorMaker.MirrorMakerBaseConsumer> mirrorMakerConsumers = seq;
                this.mirrorMakerThreads_$eq((Seq<MirrorMaker.MirrorMakerThread>)((Seq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numStreams).map((Function1)new Serializable(mirrorMakerConsumers){
                    public static final long serialVersionUID = 0L;
                    private final Seq mirrorMakerConsumers$1;

                    public final MirrorMaker.MirrorMakerThread apply(int i) {
                        return new MirrorMaker.MirrorMakerThread((MirrorMaker.MirrorMakerBaseConsumer)this.mirrorMakerConsumers$1.apply(i), i);
                    }
                    {
                        this.mirrorMakerConsumers$1 = mirrorMakerConsumers$1;
                    }
                }, IndexedSeq$.MODULE$.canBuildFrom())));
                String customMessageHandlerClass = (String)options.valueOf((OptionSpec)messageHandlerOpt);
                String messageHandlerArgs = (String)options.valueOf((OptionSpec)messageHandlerArgsOpt);
                this.kafka$tools$MirrorMaker$$messageHandler_$eq(customMessageHandlerClass == null ? MirrorMaker$defaultMirrorMakerMessageHandler$.MODULE$ : (messageHandlerArgs == null ? (MirrorMaker.MirrorMakerMessageHandler)CoreUtils$.MODULE$.createObject(customMessageHandlerClass, (Seq<Object>)Predef$.MODULE$.wrapRefArray(new Object[0])) : (MirrorMaker.MirrorMakerMessageHandler)CoreUtils$.MODULE$.createObject(customMessageHandlerClass, (Seq<Object>)Predef$.MODULE$.wrapRefArray(new Object[]{messageHandlerArgs}))));
            }
            catch (Throwable throwable2) {
                Throwable throwable3 = throwable2;
                if (throwable3 instanceof ControlThrowable) {
                    ControlThrowable controlThrowable = (ControlThrowable)throwable3;
                    throw (Throwable)controlThrowable;
                }
                if (throwable3 == null) break block16;
                Throwable throwable4 = throwable3;
                this.error((Function0<String>)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final String apply() {
                        return "Exception when starting mirror maker.";
                    }
                }, (Function0<Throwable>)new Serializable(throwable4){
                    public static final long serialVersionUID = 0L;
                    private final Throwable x8$1;

                    public final Throwable apply() {
                        return this.x8$1;
                    }
                    {
                        this.x8$1 = x8$1;
                    }
                });
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            }
            this.mirrorMakerThreads().foreach((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final void apply(MirrorMaker.MirrorMakerThread x$1) {
                    x$1.start();
                }
            });
            this.mirrorMakerThreads().foreach((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final void apply(MirrorMaker.MirrorMakerThread x$2) {
                    x$2.awaitShutdown();
                }
            });
            return;
        }
        throw throwable2;
    }

    private Seq<MirrorMaker.MirrorMakerBaseConsumer> createOldConsumers(int numStreams, Properties consumerConfigProps, Option<ConsumerRebalanceListener> customRebalanceListener, Option<String> whitelist, Option<String> blacklist) {
        block4: {
            TopicFilter topicFilter;
            IndexedSeq connectors;
            block3: {
                block2: {
                    this.maybeSetDefaultProperty(consumerConfigProps, "auto.commit.enable", "false");
                    this.maybeSetDefaultProperty(consumerConfigProps, "consumer.timeout.ms", "10000");
                    String groupIdString = consumerConfigProps.getProperty("group.id");
                    connectors = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numStreams).map((Function1)new Serializable(consumerConfigProps, groupIdString){
                        public static final long serialVersionUID = 0L;
                        private final Properties consumerConfigProps$1;
                        private final String groupIdString$1;

                        public final ZookeeperConsumerConnector apply(int i) {
                            this.consumerConfigProps$1.setProperty("client.id", new StringBuilder().append((Object)this.groupIdString$1).append((Object)"-").append((Object)((Object)BoxesRunTime.boxToInteger((int)i)).toString()).toString());
                            ConsumerConfig consumerConfig = new ConsumerConfig(this.consumerConfigProps$1);
                            return new ZookeeperConsumerConnector(consumerConfig);
                        }
                        {
                            this.consumerConfigProps$1 = consumerConfigProps$1;
                            this.groupIdString$1 = groupIdString$1;
                        }
                    }, IndexedSeq$.MODULE$.canBuildFrom());
                    if (!whitelist.isDefined()) break block2;
                    topicFilter = new Whitelist((String)whitelist.get());
                    break block3;
                }
                if (!blacklist.isDefined()) break block4;
                topicFilter = new Blacklist((String)blacklist.get());
            }
            TopicFilter filterSpec = topicFilter;
            return (Seq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numStreams).map((Function1)new Serializable(customRebalanceListener, connectors, filterSpec){
                public static final long serialVersionUID = 0L;
                private final Option customRebalanceListener$1;
                private final IndexedSeq connectors$1;
                private final TopicFilter filterSpec$1;

                /*
                 * WARNING - void declaration
                 */
                public final MirrorMaker.MirrorMakerOldConsumer apply(int i) {
                    void var2_2;
                    MirrorMaker.MirrorMakerOldConsumer consumer = new MirrorMaker.MirrorMakerOldConsumer((ZookeeperConsumerConnector)this.connectors$1.apply(i), this.filterSpec$1);
                    MirrorMaker.InternalRebalanceListenerForOldConsumer consumerRebalanceListener = new MirrorMaker.InternalRebalanceListenerForOldConsumer(consumer, (Option<ConsumerRebalanceListener>)this.customRebalanceListener$1);
                    ((ZookeeperConsumerConnector)this.connectors$1.apply(i)).setConsumerRebalanceListener(consumerRebalanceListener);
                    return var2_2;
                }
                {
                    this.customRebalanceListener$1 = customRebalanceListener$1;
                    this.connectors$1 = connectors$1;
                    this.filterSpec$1 = filterSpec$1;
                }
            }, IndexedSeq$.MODULE$.canBuildFrom());
        }
        throw new IllegalArgumentException("Either whitelist or blacklist should be defined!");
    }

    public Seq<MirrorMaker.MirrorMakerBaseConsumer> createNewConsumers(int numStreams, Properties consumerConfigProps, Option<org.apache.kafka.clients.consumer.ConsumerRebalanceListener> customRebalanceListener, Option<String> whitelist) {
        this.maybeSetDefaultProperty(consumerConfigProps, "enable.auto.commit", "false");
        consumerConfigProps.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerConfigProps.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
        String groupIdString = consumerConfigProps.getProperty("group.id");
        IndexedSeq consumers = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numStreams).map((Function1)new Serializable(consumerConfigProps, groupIdString){
            public static final long serialVersionUID = 0L;
            private final Properties consumerConfigProps$2;
            private final String groupIdString$2;

            public final KafkaConsumer<byte[], byte[]> apply(int i) {
                this.consumerConfigProps$2.setProperty("client.id", new StringBuilder().append((Object)this.groupIdString$2).append((Object)"-").append((Object)((Object)BoxesRunTime.boxToInteger((int)i)).toString()).toString());
                return new KafkaConsumer(this.consumerConfigProps$2);
            }
            {
                this.consumerConfigProps$2 = consumerConfigProps$2;
                this.groupIdString$2 = groupIdString$2;
            }
        }, IndexedSeq$.MODULE$.canBuildFrom());
        whitelist.getOrElse((Function0)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Nothing$ apply() {
                throw new IllegalArgumentException("White list cannot be empty for new consumer");
            }
        });
        return (Seq)consumers.map((Function1)new Serializable(customRebalanceListener, whitelist){
            public static final long serialVersionUID = 0L;
            private final Option customRebalanceListener$2;
            private final Option whitelist$1;

            public final MirrorMaker.MirrorMakerNewConsumer apply(KafkaConsumer<byte[], byte[]> consumer) {
                return new MirrorMaker.MirrorMakerNewConsumer((Consumer<byte[], byte[]>)consumer, (Option<org.apache.kafka.clients.consumer.ConsumerRebalanceListener>)this.customRebalanceListener$2, (Option<String>)this.whitelist$1);
            }
            {
                this.customRebalanceListener$2 = customRebalanceListener$2;
                this.whitelist$1 = whitelist$1;
            }
        }, IndexedSeq$.MODULE$.canBuildFrom());
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void commitOffsets(MirrorMaker.MirrorMakerBaseConsumer mirrorMakerConsumer) {
        if (this.kafka$tools$MirrorMaker$$exitingOnSendFailure()) {
            this.info((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Exiting on send failure, skip committing offsets.";
                }
            });
            return;
        }
        this.trace((Function0<String>)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply() {
                return "Committing offsets.";
            }
        });
        try {
            mirrorMakerConsumer.commit();
            return;
        }
        catch (CommitFailedException commitFailedException) {
            this.warn((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return new StringBuilder().append((Object)"Failed to commit offsets because the consumer group has rebalanced and assigned partitions to another instance. If you see this regularly, it could indicate that you need to either increase ").append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"the consumer's ", " or reduce the number of records "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"session.timeout.ms"}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"handled on each iteration with ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"max.poll.records"}))).toString();
                }
            });
            return;
        }
        catch (WakeupException wakeupException) {
            mirrorMakerConsumer.commit();
            throw wakeupException;
        }
    }

    public void cleanShutdown() {
        if (this.kafka$tools$MirrorMaker$$isShuttingDown().compareAndSet(false, true)) {
            this.info((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Start clean shutdown.";
                }
            });
            this.info((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Shutting down consumer threads.";
                }
            });
            if (this.mirrorMakerThreads() != null) {
                this.mirrorMakerThreads().foreach((Function1)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final void apply(MirrorMaker.MirrorMakerThread x$3) {
                        x$3.shutdown();
                    }
                });
                this.mirrorMakerThreads().foreach((Function1)new Serializable(){
                    public static final long serialVersionUID = 0L;

                    public final void apply(MirrorMaker.MirrorMakerThread x$4) {
                        x$4.awaitShutdown();
                    }
                });
            }
            this.info((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Closing producer.";
                }
            });
            this.producer().close();
            this.info((Function0<String>)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return "Kafka mirror maker shutdown successfully";
                }
            });
        }
    }

    private void maybeSetDefaultProperty(Properties properties, String propertyName, String defaultValue) {
        String propertyValue = properties.getProperty(propertyName);
        properties.setProperty(propertyName, (String)Option$.MODULE$.apply((Object)propertyValue).getOrElse((Function0)new Serializable(defaultValue){
            public static final long serialVersionUID = 0L;
            private final String defaultValue$1;

            public final String apply() {
                return this.defaultValue$1;
            }
            {
                this.defaultValue$1 = defaultValue$1;
            }
        }));
        String string = properties.getProperty(propertyName);
        String string2 = defaultValue;
        if (string == null ? string2 != null : !string.equals(string2)) {
            this.info((Function0<String>)new Serializable(propertyName, propertyValue){
                public static final long serialVersionUID = 0L;
                private final String propertyName$1;
                private final String propertyValue$1;

                public final String apply() {
                    return new StringOps(Predef$.MODULE$.augmentString("Property %s is overridden to %s - data loss or message reordering is possible.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.propertyName$1, this.propertyValue$1}));
                }
                {
                    this.propertyName$1 = propertyName$1;
                    this.propertyValue$1 = propertyValue$1;
                }
            });
        }
    }

    private MirrorMaker$() {
        MODULE$ = this;
        Logging$class.$init$(this);
        KafkaMetricsGroup$class.$init$(this);
        this.producer = null;
        this.mirrorMakerThreads = null;
        this.kafka$tools$MirrorMaker$$isShuttingDown = new AtomicBoolean(false);
        this.kafka$tools$MirrorMaker$$numDroppedMessages = new AtomicInteger(0);
        this.kafka$tools$MirrorMaker$$messageHandler = null;
        this.kafka$tools$MirrorMaker$$offsetCommitIntervalMs = 0;
        this.kafka$tools$MirrorMaker$$abortOnSendFailure = true;
        this.kafka$tools$MirrorMaker$$exitingOnSendFailure = false;
        this.newGauge("MirrorMaker-numDroppedMessages", new Gauge<Object>(){

            public int value() {
                return MirrorMaker$.MODULE$.kafka$tools$MirrorMaker$$numDroppedMessages().get();
            }
        }, this.newGauge$default$3());
    }
}

