/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.discovery;

import com.google.common.collect.ImmutableMap;
import java.io.Closeable;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.Map;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.impl.discovery.State;
import org.apache.sling.distribution.journal.impl.discovery.TopologyChangeHandler;
import org.apache.sling.distribution.journal.impl.discovery.TopologyView;
import org.apache.sling.distribution.journal.impl.discovery.TopologyViewDiff;
import org.apache.sling.distribution.journal.impl.discovery.TopologyViewManager;
import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
import org.apache.sling.distribution.journal.messages.LogMessage;
import org.apache.sling.distribution.journal.messages.SubscriberConfig;
import org.apache.sling.distribution.journal.messages.SubscriberState;
import org.apache.sling.distribution.journal.shared.AgentId;
import org.apache.sling.distribution.journal.shared.PublisherConfigurationAvailable;
import org.apache.sling.distribution.journal.shared.Topics;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicyOption;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
@Component(service={DiscoveryService.class})
public class DiscoveryService
implements Runnable {
    public static final String KEY_MESSAGE = "message";
    public static final String TOPIC_DISTRIBUTION_LOG = "org/apache/sling/distribution/journal/log";
    private static final Logger LOG = LoggerFactory.getLogger(DiscoveryService.class);
    private static final long REFRESH_TTL_MS = 30000L;
    @Reference
    private JournalAvailable journalAvailable;
    @Reference
    private PublisherConfigurationAvailable publisherConfigurationAvailable;
    @Reference
    private MessagingProvider messagingProvider;
    @Reference
    private Topics topics;
    @Reference(policyOption=ReferencePolicyOption.GREEDY, cardinality=ReferenceCardinality.OPTIONAL)
    private volatile TopologyChangeHandler topologyChangeHandler;
    @Reference
    private EventAdmin eventAdmin;
    private volatile ServiceRegistration<?> reg;
    private final TopologyViewManager viewManager = new TopologyViewManager(30000L);
    private Closeable poller;

    public DiscoveryService() {
    }

    public DiscoveryService(MessagingProvider messagingProvider, TopologyChangeHandler topologyChangeHandler, Topics topics, EventAdmin eventAdmin) {
        this.messagingProvider = messagingProvider;
        this.topologyChangeHandler = topologyChangeHandler;
        this.topics = topics;
        this.eventAdmin = eventAdmin;
    }

    @Activate
    public void activate(BundleContext context) {
        this.poller = this.messagingProvider.createPoller(this.topics.getDiscoveryTopic(), Reset.latest, new HandlerAdapter[]{HandlerAdapter.create(DiscoveryMessage.class, this::handleDiscovery), HandlerAdapter.create(LogMessage.class, this::handleLog)});
        this.startTopologyViewUpdaterTask(context);
        LOG.info("Discovery service started");
    }

    @Deactivate
    public void deactivate() {
        if (this.reg != null) {
            this.reg.unregister();
        }
        IOUtils.closeQuietly((Closeable)this.poller);
        LOG.info("Discovery service stopped");
    }

    public TopologyView getTopologyView() {
        return this.viewManager.getCurrentView();
    }

    @Override
    public void run() {
        TopologyView oldView = this.viewManager.updateView();
        TopologyView newView = this.viewManager.getCurrentView();
        this.handleChanges(newView, oldView);
    }

    private void handleChanges(TopologyView newView, TopologyView oldView) {
        if (!newView.equals(oldView)) {
            String msg = String.format("TopologyView changed from %s to %s", oldView, newView);
            TopologyViewDiff diffView = new TopologyViewDiff(oldView, newView);
            if (diffView.subscribedAgentsChanged()) {
                LOG.info(msg);
            } else {
                LOG.debug(msg);
            }
            TopologyChangeHandler handler = this.topologyChangeHandler;
            if (handler != null) {
                handler.changed(diffView);
            }
        }
    }

    private void startTopologyViewUpdaterTask(BundleContext context) {
        Hashtable<String, Comparable<Boolean>> props = new Hashtable<String, Comparable<Boolean>>();
        ((Dictionary)props).put("scheduler.concurrent", false);
        ((Dictionary)props).put("scheduler.period", 5L);
        this.reg = context.registerService(Runnable.class.getName(), (Object)this, props);
    }

    public void handleDiscovery(MessageInfo info, DiscoveryMessage disMsg) {
        long now = System.currentTimeMillis();
        AgentId subAgentId = new AgentId(disMsg.getSubSlingId(), disMsg.getSubAgentName());
        for (SubscriberState subStateMsg : disMsg.getSubscriberStates()) {
            SubscriberConfig subConfig = disMsg.getSubscriberConfiguration();
            State subState = new State(subStateMsg.getPubAgentName(), subAgentId.getAgentId(), now, subStateMsg.getOffset(), subStateMsg.getRetries(), subConfig.getMaxRetries(), subConfig.isEditable());
            this.viewManager.refreshState(subState);
        }
    }

    public void handleLog(MessageInfo info, LogMessage logMsg) {
        Event event = new Event(TOPIC_DISTRIBUTION_LOG, (Map)ImmutableMap.of((Object)KEY_MESSAGE, (Object)logMsg));
        this.eventAdmin.postEvent(event);
    }
}

