/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade;
import org.apache.kafka.streams.state.internals.ReadOnlyWindowStoreFacade;

/**
 * Looks up queryable state stores among the tasks owned by a single {@link StreamThread}.
 *
 * <p>Stores are resolved by name and filtered by a {@link QueryableStoreType}. A lookup may
 * optionally be narrowed to one partition, in which case the owning task is derived from the
 * topology's topic groups.
 */
public class StreamThreadStateStoreProvider {
    private final StreamThread streamThread;
    private final InternalTopologyBuilder internalTopologyBuilder;

    /**
     * Creates a provider bound to one stream thread.
     *
     * @param streamThread            the thread whose tasks are searched for stores
     * @param internalTopologyBuilder the topology used to map a store name and partition to a task id
     */
    public StreamThreadStateStoreProvider(StreamThread streamThread, InternalTopologyBuilder internalTopologyBuilder) {
        this.streamThread = streamThread;
        this.internalTopologyBuilder = internalTopologyBuilder;
    }

    /**
     * Returns the stores of this thread that match the given query parameters.
     *
     * <p>If the thread is {@code DEAD} an empty list is returned. Otherwise the thread must be
     * {@code RUNNING}, or merely alive when stale stores are enabled; stale-store queries search
     * {@code allTasks()} while regular queries search only {@code activeTaskMap()}.
     *
     * <p>When a partition is requested, at most one store is returned (the one held by the task
     * for that partition); when no partition is requested, every matching store of every
     * searched task is returned.
     *
     * @param storeQueryParams the store name, store type, optional partition and stale-store flag
     * @param <T>              the store type produced by the requested {@link QueryableStoreType}
     * @return the matching stores; empty if none are hosted by this thread
     * @throws InvalidStateStoreException if the thread is not in a queryable state, if the
     *                                    requested partition cannot be mapped to a task, or if
     *                                    a matching store is not open
     */
    public <T> List<T> stores(StoreQueryParameters storeQueryParams) {
        // Read the thread state exactly once so the DEAD check and the queryable check below
        // are made against the same snapshot even if the thread transitions concurrently.
        StreamThread.State state = this.streamThread.state();
        if (state == StreamThread.State.DEAD) {
            // A dead thread hosts nothing; skip the topology lookup entirely.
            return Collections.emptyList();
        }

        String storeName = storeQueryParams.storeName();
        boolean staleStoresEnabled = storeQueryParams.staleStoresEnabled();
        boolean queryable = staleStoresEnabled ? state.isAlive() : state == StreamThread.State.RUNNING;
        if (!queryable) {
            throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " + state + ", not RUNNING" + (staleStoresEnabled ? " or REBALANCING" : ""));
        }

        // The parameter is a raw type (kept for signature compatibility), so the store type
        // comes back raw; callers choose T to match the type they put into the parameters.
        @SuppressWarnings("unchecked")
        QueryableStoreType<T> queryableStoreType = storeQueryParams.queryableStoreType();

        TaskId keyTaskId = this.createKeyTaskId(storeName, storeQueryParams.partition());
        Map<TaskId, Task> tasks = staleStoresEnabled ? this.streamThread.allTasks() : this.streamThread.activeTaskMap();

        if (keyTaskId != null) {
            // Partition-specific query: only the task owning that partition is relevant.
            Task task = tasks.get(keyTaskId);
            if (task == null) {
                return Collections.emptyList();
            }
            T store = this.validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId);
            return store == null ? Collections.<T>emptyList() : Collections.singletonList(store);
        }

        List<T> stores = new ArrayList<>();
        for (Task streamTask : tasks.values()) {
            T store = this.validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id());
            if (store != null) {
                stores.add(store);
            }
        }
        return stores;
    }

    /**
     * Checks a candidate store against the requested type and returns it in queryable form.
     *
     * <p>Timestamped key-value and window stores are wrapped in a read-only facade when the
     * caller asked for the plain key-value or window store type.
     *
     * @param store              the candidate store, possibly {@code null}
     * @param queryableStoreType the store type requested by the caller
     * @param storeName          the store name, used in error messages
     * @param taskId             the owning task id, used in error messages
     * @param <T>                the store type produced by {@code queryableStoreType}
     * @return the store (or a facade over it), or {@code null} if the store is absent or is
     *         not accepted by {@code queryableStoreType}
     * @throws InvalidStateStoreException if the store is accepted but not open
     */
    // The casts to T cannot be checked at runtime; they are guarded by queryableStoreType.accepts(store).
    @SuppressWarnings("unchecked")
    private <T> T validateAndListStores(StateStore store, QueryableStoreType<T> queryableStoreType, String storeName, TaskId taskId) {
        if (store == null || !queryableStoreType.accepts(store)) {
            return null;
        }
        if (!store.isOpen()) {
            throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + taskId + " because the store is not open. The state store may have migrated to another instances.");
        }
        if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
            return (T) new ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store);
        }
        if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
            return (T) new ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store);
        }
        return (T) store;
    }

    /**
     * Derives the id of the task that would hold the given store for the given partition.
     *
     * <p>The task id is built from the key of the first topic group whose source topics contain
     * all source topics registered for the store, combined with the requested partition.
     *
     * @param storeName the name of the store being queried
     * @param partition the requested partition, or {@code null} if the query is not partition-specific
     * @return the task id, or {@code null} if {@code partition} is {@code null}
     * @throws InvalidStateStoreException if no source topics are registered for the store, or
     *                                    if no topic group covers the store's source topics
     */
    private TaskId createKeyTaskId(String storeName, Integer partition) {
        if (partition == null) {
            return null;
        }
        List<String> sourceTopics = this.internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName);
        if (sourceTopics == null) {
            // Map.get yields null for a store name without an entry; report it the same way as
            // other lookup failures instead of failing later with a NullPointerException.
            throw new InvalidStateStoreException("Cannot get state store " + storeName + " because no source topics are registered for it in the topology");
        }
        Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = this.internalTopologyBuilder.topicGroups();
        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> topicGroup : topicGroups.entrySet()) {
            if (topicGroup.getValue().sourceTopics.containsAll(sourceTopics)) {
                return new TaskId(topicGroup.getKey(), partition);
            }
        }
        throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " + partition + " is not available on this instance");
    }
}

