/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.client.kafka.consumer;

import io.quarkus.runtime.annotations.RegisterForReflection;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.ext.healthchecks.Status;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.common.metrics.Metrics;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.kafka.KafkaClientFactory;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.client.kafka.consumer.HonoKafkaConsumerHelper;
import org.eclipse.hono.client.kafka.metrics.KafkaClientMetricsSupport;
import org.eclipse.hono.client.util.ServiceClient;
import org.eclipse.hono.util.Futures;
import org.eclipse.hono.util.Lifecycle;
import org.eclipse.hono.util.LifecycleStatus;
import org.eclipse.hono.util.Pair;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RegisterForReflection(targets={KafkaReadStreamImpl.class})
public class HonoKafkaConsumer<V>
implements Lifecycle,
ServiceClient {
    public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 250L;
    private static final long WAIT_FOR_REBALANCE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(30L);
    private static final long OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(30L);
    private static final String MSG_CONSUMER_NOT_INITIALIZED_STARTED = "consumer not initialized/started";
    private static final Logger LOG = LoggerFactory.getLogger(HonoKafkaConsumer.class);
    protected final Vertx vertx;
    protected final Map<String, String> consumerConfig;
    protected final Set<String> topics;
    protected final Pattern topicPattern;
    protected final LifecycleStatus lifecycleStatus = new LifecycleStatus();
    private final AtomicReference<Pair<Promise<Void>, Promise<Void>>> subscriptionUpdatedAndPartitionsAssignedPromiseRef = new AtomicReference();
    private final AtomicBoolean pollingPaused = new AtomicBoolean();
    private final AtomicBoolean recordFetchingPaused = new AtomicBoolean();
    private Handler<KafkaConsumerRecord<String, V>> recordHandler;
    private KafkaConsumer<String, V> kafkaConsumer;
    private Context context;
    private ExecutorService kafkaConsumerWorker;
    private volatile Set<String> subscribedTopicPatternTopics = new HashSet<String>();
    private Handler<Set<TopicPartition>> onPartitionsAssignedHandler;
    private Handler<Set<TopicPartition>> onRebalanceDoneHandler;
    private Handler<Set<TopicPartition>> onPartitionsRevokedHandler;
    private Handler<Set<TopicPartition>> onPartitionsLostHandler;
    private boolean respectTtl = true;
    private Duration pollTimeout = Duration.ofMillis(250L);
    private Supplier<Consumer<String, V>> kafkaConsumerSupplier;
    private KafkaClientMetricsSupport metricsSupport;
    private Long pollPauseTimeoutTimerId;
    private Duration consumerCreationRetriesTimeout = Duration.ZERO;

    public HonoKafkaConsumer(Vertx vertx, Set<String> topics, Handler<KafkaConsumerRecord<String, V>> recordHandler, Map<String, String> consumerConfig) {
        this(vertx, Objects.requireNonNull(topics), (Pattern)null, consumerConfig);
        this.setRecordHandler(Objects.requireNonNull(recordHandler));
    }

    public HonoKafkaConsumer(Vertx vertx, Pattern topicPattern, Handler<KafkaConsumerRecord<String, V>> recordHandler, Map<String, String> consumerConfig) {
        this(vertx, null, Objects.requireNonNull(topicPattern), consumerConfig);
        this.setRecordHandler(Objects.requireNonNull(recordHandler));
    }

    protected HonoKafkaConsumer(Vertx vertx, Set<String> topics, Pattern topicPattern, Map<String, String> consumerConfig) {
        this.vertx = Objects.requireNonNull(vertx);
        if (topics == null == (topicPattern == null)) {
            throw new NullPointerException("exactly one of topics or topicPattern has to be set");
        }
        this.topicPattern = topicPattern;
        this.topics = Optional.ofNullable(topics).map(HashSet::new).orElse(null);
        this.consumerConfig = Objects.requireNonNull(consumerConfig);
        if (!consumerConfig.containsKey("group.id")) {
            if ("true".equals(consumerConfig.get("enable.auto.commit"))) {
                throw new IllegalArgumentException("%s config entry has to be set if auto-commit is enabled".formatted("group.id"));
            }
            LOG.trace("no group.id set, using a random UUID as default and disabling auto-commit");
            consumerConfig.put("group.id", UUID.randomUUID().toString());
            consumerConfig.put("enable.auto.commit", "false");
        }
    }

    public final void setRecordHandler(Handler<KafkaConsumerRecord<String, V>> handler) {
        Objects.requireNonNull(handler);
        if (!this.lifecycleStatus.isStopped()) {
            throw new IllegalStateException("Record handler can only be set if consumer has not been started yet");
        }
        this.recordHandler = handler;
    }

    protected final void addTopic(String topicName) {
        Objects.requireNonNull(topicName);
        if (!this.lifecycleStatus.isStopped()) {
            throw new IllegalStateException("Topics can only be set if consumer has not been started yet");
        }
        if (this.topics == null) {
            throw new IllegalStateException("Cannot add topic on consumer which has been created with a topic pattern");
        }
        this.topics.add(topicName);
    }

    public final void addOnKafkaConsumerReadyHandler(Handler<AsyncResult<Void>> handler) {
        if (handler != null) {
            this.lifecycleStatus.addOnStartedHandler(handler);
        }
    }

    public final void setOnPartitionsAssignedHandler(Handler<Set<TopicPartition>> onPartitionsAssignedHandler) {
        this.onPartitionsAssignedHandler = Objects.requireNonNull(onPartitionsAssignedHandler);
    }

    public final void setOnRebalanceDoneHandler(Handler<Set<TopicPartition>> handler) {
        this.onRebalanceDoneHandler = Objects.requireNonNull(handler);
    }

    public final void setOnPartitionsRevokedHandler(Handler<Set<TopicPartition>> onPartitionsRevokedHandler) {
        this.onPartitionsRevokedHandler = Objects.requireNonNull(onPartitionsRevokedHandler);
    }

    public final void setOnPartitionsLostHandler(Handler<Set<TopicPartition>> onPartitionsLostHandler) {
        this.onPartitionsLostHandler = Objects.requireNonNull(onPartitionsLostHandler);
    }

    public final void setMetricsSupport(KafkaClientMetricsSupport metricsSupport) {
        this.metricsSupport = metricsSupport;
    }

    public final void setConsumerCreationRetriesTimeout(Duration consumerCreationRetriesTimeout) {
        this.consumerCreationRetriesTimeout = consumerCreationRetriesTimeout;
    }

    public final void setRespectTtl(boolean respectTtl) {
        this.respectTtl = respectTtl;
    }

    public final void setPollTimeout(Duration pollTimeout) {
        this.pollTimeout = Objects.requireNonNull(pollTimeout);
        if (this.kafkaConsumer != null) {
            this.kafkaConsumer.asStream().pollTimeout(pollTimeout);
        }
    }

    public void setKafkaConsumerSupplier(Supplier<Consumer<String, V>> supplier) {
        this.kafkaConsumerSupplier = supplier;
    }

    public final boolean pauseRecordFetching() {
        if (!this.recordFetchingPaused.compareAndSet(false, true)) {
            return false;
        }
        this.runOnKafkaWorkerThread(v -> {
            Set<org.apache.kafka.common.TopicPartition> partitions = this.getUnderlyingConsumer().assignment();
            if (!partitions.isEmpty()) {
                this.getUnderlyingConsumer().pause(partitions);
            }
        });
        return true;
    }

    public final boolean resumeRecordFetching() {
        if (!this.recordFetchingPaused.compareAndSet(true, false)) {
            return false;
        }
        this.runOnKafkaWorkerThread(v -> {
            Set<org.apache.kafka.common.TopicPartition> partitions = this.getUnderlyingConsumer().assignment();
            if (!partitions.isEmpty()) {
                this.getUnderlyingConsumer().resume(partitions);
            }
        });
        return true;
    }

    public final boolean isRecordFetchingPaused() {
        return this.recordFetchingPaused.get();
    }

    public final boolean pauseRecordHandlingAndPolling(Duration timeout) {
        if (!this.pollingPaused.compareAndSet(false, true)) {
            return false;
        }
        this.pollPauseTimeoutTimerId = this.vertx.setTimer(timeout.toMillis(), tid -> {
            this.pollPauseTimeoutTimerId = null;
            if (this.resumeRecordHandlingAndPolling()) {
                LOG.debug("resumed consumer record polling - timeout of {}ms was reached [client-id: {}]", (Object)timeout.toMillis(), (Object)this.getClientId());
            }
        });
        this.getKafkaConsumer().pause();
        return true;
    }

    public final boolean resumeRecordHandlingAndPolling() {
        if (!this.pollingPaused.compareAndSet(true, false)) {
            return false;
        }
        if (this.pollPauseTimeoutTimerId != null) {
            this.vertx.cancelTimer(this.pollPauseTimeoutTimerId);
            this.pollPauseTimeoutTimerId = null;
        }
        this.getKafkaConsumer().resume();
        return true;
    }

    public final boolean isRecordHandlingAndPollingPaused() {
        return this.pollingPaused.get();
    }

    protected final KafkaConsumer<String, V> getKafkaConsumer() {
        if (this.kafkaConsumer == null) {
            throw new IllegalStateException(MSG_CONSUMER_NOT_INITIALIZED_STARTED);
        }
        return this.kafkaConsumer;
    }

    protected final Consumer<String, V> getUnderlyingConsumer() {
        if (this.kafkaConsumer == null) {
            throw new IllegalStateException(MSG_CONSUMER_NOT_INITIALIZED_STARTED);
        }
        return this.kafkaConsumer.asStream().unwrap();
    }

    protected final String getClientId() {
        return this.consumerConfig.get("client.id");
    }

    @Override
    public void registerLivenessChecks(HealthCheckHandler livenessHandler) {
    }

    @Override
    public void registerReadinessChecks(HealthCheckHandler readinessHandler) {
        readinessHandler.register("kafka-consumer[%s]-creation".formatted(this.getClientId()), status -> {
            if (this.lifecycleStatus.isStarted()) {
                status.tryComplete(Status.OK());
            } else {
                JsonObject data = new JsonObject();
                if (this.lifecycleStatus.isStarting()) {
                    if (this.kafkaConsumer == null) {
                        LOG.debug("readiness check failed, consumer not created yet (Kafka server URL possibly not resolvable (yet)) [client-id: {}]", (Object)this.getClientId());
                        data.put("status", "consumer not created yet (Kafka server URL possibly not resolvable (yet))");
                    } else {
                        LOG.debug("readiness check failed, consumer initialization not finished yet [client-id: {}]", (Object)this.getClientId());
                        data.put("status", "consumer initialization not finished yet");
                    }
                }
                status.tryComplete(Status.KO(data));
            }
        });
    }

    public final HealthCheckResponse checkReadiness() {
        return HealthCheckResponse.builder().name("kafka-consumer-status").status(this.lifecycleStatus.isStarted()).build();
    }

    private Future<KafkaConsumer<String, V>> initConsumer(KafkaConsumer<String, V> consumer) {
        Promise initResult = Promise.promise();
        Optional.ofNullable(this.metricsSupport).ifPresent(ms -> ms.registerKafkaConsumer(consumer.unwrap()));
        consumer.handler(record -> {
            if (!initResult.future().isComplete()) {
                LOG.debug("postponing record handling until consumer has been initialized [topic: {}, partition: {}, offset: {}]\n", record.topic(), record.partition(), record.offset());
            }
            initResult.future().onSuccess(ok -> {
                if (this.respectTtl && KafkaRecordHelper.isTtlElapsed(record.headers())) {
                    this.onRecordHandlerSkippedForExpiredRecord((KafkaConsumerRecord<String, V>)record);
                } else {
                    try {
                        this.recordHandler.handle((KafkaConsumerRecord<String, V>)record);
                    }
                    catch (Exception e) {
                        LOG.warn("error handling record [topic: {}, partition: {}, offset: {}, headers: {}]", record.topic(), record.partition(), record.offset(), record.headers(), e);
                    }
                }
            });
        });
        consumer.batchHandler(this::onBatchOfRecordsReceived);
        consumer.exceptionHandler(error -> LOG.error("consumer error occurred [client-id: {}]", (Object)this.getClientId(), error));
        this.installRebalanceListeners();
        consumer.asStream().pollTimeout(Duration.ofMillis(10L));
        this.subscribeAndWaitForRebalance().onSuccess(ok -> {
            consumer.asStream().pollTimeout(this.pollTimeout);
            this.logSubscribedTopicsWhenConsumerIsReady();
            initResult.complete(consumer);
        }).onFailure(initResult::fail);
        return initResult.future();
    }

    @Override
    public Future<Void> start() {
        if (this.recordHandler == null) {
            throw new IllegalStateException("Record handler must be set");
        }
        if (this.lifecycleStatus.isStarting()) {
            LOG.debug("already starting consumer");
            return Future.succeededFuture();
        }
        if (!this.lifecycleStatus.setStarting()) {
            return Future.failedFuture(new IllegalStateException("consumer is already started/stopping"));
        }
        this.context = this.vertx.getOrCreateContext();
        Supplier<KafkaConsumer> consumerSupplier = () -> Optional.ofNullable(this.kafkaConsumerSupplier).map(s -> KafkaConsumer.create(this.vertx, (Consumer)s.get())).orElseGet(() -> KafkaConsumer.create(this.vertx, this.consumerConfig));
        this.runOnContext(v -> {
            KafkaClientFactory kafkaClientFactory = new KafkaClientFactory(this.vertx);
            kafkaClientFactory.createClientWithRetries(consumerSupplier, this.lifecycleStatus::isStarting, this.consumerConfig.get("bootstrap.servers"), this.consumerCreationRetriesTimeout).onFailure(t -> LOG.error("error creating consumer [client-id: {}]", (Object)this.getClientId(), t)).onSuccess(consumer -> {
                this.kafkaConsumer = consumer;
            }).compose(this::initConsumer).onSuccess(c -> this.lifecycleStatus.setStarted());
        });
        return Future.succeededFuture();
    }

    private void logSubscribedTopicsWhenConsumerIsReady() {
        if (this.topicPattern != null) {
            if (this.subscribedTopicPatternTopics.size() <= 5) {
                LOG.debug("consumer started, subscribed to topic pattern [{}], matching topics: {}", (Object)this.topicPattern, (Object)this.subscribedTopicPatternTopics);
            } else {
                LOG.debug("consumer started, subscribed to topic pattern [{}], matching {} topics", (Object)this.topicPattern, (Object)this.subscribedTopicPatternTopics.size());
            }
        } else {
            LOG.debug("consumer started, subscribed to topics {}", (Object)this.topics);
        }
    }

    protected void onBatchOfRecordsReceived(KafkaConsumerRecords<String, V> records) {
    }

    protected void onRecordHandlerSkippedForExpiredRecord(KafkaConsumerRecord<String, V> record) {
    }

    private void installRebalanceListeners() {
        this.replaceRebalanceListener(this.kafkaConsumer, new ConsumerRebalanceListener(){

            @Override
            public void onPartitionsAssigned(Collection<org.apache.kafka.common.TopicPartition> partitions) {
                Set<TopicPartition> partitionsSet = Helper.from(partitions);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("partitions assigned: [{}]", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
                }
                HonoKafkaConsumer.this.ensurePositionsHaveBeenSetIfNeeded(partitionsSet);
                HonoKafkaConsumer.this.updateSubscribedTopicPatternTopicsAndRemoveMetrics();
                if (HonoKafkaConsumer.this.recordFetchingPaused.get()) {
                    HonoKafkaConsumer.this.getUnderlyingConsumer().pause(partitions);
                }
                HonoKafkaConsumer.this.onPartitionsAssignedBlocking(partitionsSet);
                Set allAssignedPartitions = Optional.ofNullable(HonoKafkaConsumer.this.onRebalanceDoneHandler).map(h -> Helper.from(HonoKafkaConsumer.this.getKafkaConsumer().asStream().unwrap().assignment())).orElse(null);
                HonoKafkaConsumer.this.context.runOnContext(v -> {
                    HonoKafkaConsumer.this.onPartitionsAssigned(partitionsSet);
                    if (HonoKafkaConsumer.this.onRebalanceDoneHandler != null) {
                        HonoKafkaConsumer.this.onRebalanceDoneHandler.handle(allAssignedPartitions);
                    }
                });
            }

            @Override
            public void onPartitionsRevoked(Collection<org.apache.kafka.common.TopicPartition> partitions) {
                Set<TopicPartition> partitionsSet = Helper.from(partitions);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("partitions revoked: [{}]", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(partitions));
                }
                HonoKafkaConsumer.this.onPartitionsRevokedBlocking(partitionsSet);
                HonoKafkaConsumer.this.context.runOnContext(v -> HonoKafkaConsumer.this.onPartitionsRevoked(partitionsSet));
            }

            @Override
            public void onPartitionsLost(Collection<org.apache.kafka.common.TopicPartition> partitions) {
                Set<TopicPartition> partitionsSet = Helper.from(partitions);
                if (LOG.isInfoEnabled()) {
                    LOG.info("partitions lost: [{}] [client-id: {}]", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(partitions), (Object)HonoKafkaConsumer.this.getClientId());
                }
                HonoKafkaConsumer.this.onPartitionsLostBlocking(partitionsSet);
                HonoKafkaConsumer.this.context.runOnContext(v -> HonoKafkaConsumer.this.onPartitionsLost(partitionsSet));
            }
        });
    }

    private void ensurePositionsHaveBeenSetIfNeeded(Set<TopicPartition> assignedPartitions) {
        if (!assignedPartitions.isEmpty() && this.isAutoOffsetResetConfigLatest()) {
            LOG.trace("checking positions for {} newly assigned partitions...", (Object)assignedPartitions.size());
            Set<org.apache.kafka.common.TopicPartition> partitions = Helper.to(assignedPartitions);
            try {
                LinkedList<org.apache.kafka.common.TopicPartition> outOfRangeOffsetPartitions = new LinkedList<org.apache.kafka.common.TopicPartition>();
                Map<org.apache.kafka.common.TopicPartition, Long> beginningOffsets = this.getUnderlyingConsumer().beginningOffsets(partitions);
                partitions.forEach(partition -> {
                    long position = this.getUnderlyingConsumer().position((org.apache.kafka.common.TopicPartition)partition);
                    Long beginningOffset = (Long)beginningOffsets.get(partition);
                    if (beginningOffset != null && position < beginningOffset) {
                        LOG.debug("committed offset {} for [{}] is smaller than beginning offset, resetting it to the beginning offset {}", position, partition, beginningOffset);
                        this.getUnderlyingConsumer().seek((org.apache.kafka.common.TopicPartition)partition, beginningOffset);
                        outOfRangeOffsetPartitions.add((org.apache.kafka.common.TopicPartition)partition);
                    }
                });
                if (!outOfRangeOffsetPartitions.isEmpty()) {
                    LOG.info("found out-of-range committed offsets, corresponding records having already been deleted; positions were reset to beginning offsets; partitions: [{}] [client-id: {}]\n", (Object)HonoKafkaConsumerHelper.getPartitionsDebugString(outOfRangeOffsetPartitions), (Object)this.getClientId());
                }
            }
            catch (Exception e) {
                LOG.error("error checking positions for {} newly assigned partitions [client-id: {}]", assignedPartitions.size(), this.getClientId(), e);
            }
            LOG.trace("done checking positions for {} newly assigned partitions", (Object)assignedPartitions.size());
        }
    }

    protected final boolean isCooperativeRebalancingConfigured() {
        return Optional.ofNullable(this.consumerConfig.get("partition.assignment.strategy")).map(value -> value.equals(CooperativeStickyAssignor.class.getName())).orElse(false);
    }

    protected final boolean isAutoOffsetResetConfigLatest() {
        return Optional.ofNullable(this.consumerConfig.get("auto.offset.reset")).map(value -> value.equals("latest")).orElse(true);
    }

    private void updateSubscribedTopicPatternTopicsAndRemoveMetrics() {
        if (this.topicPattern != null) {
            Set<String> oldSubscribedTopicPatternTopics = this.subscribedTopicPatternTopics;
            try {
                this.subscribedTopicPatternTopics = new HashSet<String>(this.getUnderlyingConsumer().subscription());
            }
            catch (Exception e) {
                LOG.warn("error getting subscription", e);
            }
            Set deletedTopics = oldSubscribedTopicPatternTopics.stream().filter(t -> !this.subscribedTopicPatternTopics.contains(t)).collect(Collectors.toSet());
            if (!deletedTopics.isEmpty()) {
                this.runOnContext(v -> this.vertx.setTimer(OBSOLETE_METRICS_REMOVAL_DELAY_MILLIS, tid -> this.runOnKafkaWorkerThread(v2 -> this.removeMetricsForDeletedTopics(deletedTopics.stream().filter(t -> !this.subscribedTopicPatternTopics.contains(t))))));
            }
        }
    }

    private Future<Void> subscribeAndWaitForRebalance() {
        if (this.lifecycleStatus.isStopping() || this.lifecycleStatus.isStopped()) {
            return Future.failedFuture(new ServerErrorException(503, "already stopped"));
        }
        Promise partitionAssignmentDone = Promise.promise();
        Promise<Void> subscriptionUpdated = Promise.promise();
        Pair newPromisePair = Pair.of(subscriptionUpdated, partitionAssignmentDone);
        Pair<Promise<Void>, Promise<Void>> promisePair = this.subscriptionUpdatedAndPartitionsAssignedPromiseRef.updateAndGet(existingPair -> Optional.ofNullable(existingPair).orElse(newPromisePair));
        if (!promisePair.equals(newPromisePair)) {
            LOG.debug("subscribeAndWaitForRebalance: will wait for ongoing invocation to complete");
            return CompositeFuture.all(promisePair.one().future(), promisePair.two().future()).mapEmpty();
        }
        if (this.topicPattern != null) {
            this.kafkaConsumer.subscribe(this.topicPattern, subscriptionUpdated);
        } else {
            this.topics.forEach(topic -> HonoKafkaConsumerHelper.partitionsFor(this.kafkaConsumer, topic).onSuccess(partitions -> {
                if (partitions.isEmpty()) {
                    LOG.info("subscription topic doesn't exist as of now: {} [client-id: {}]", topic, (Object)this.getClientId());
                }
            }));
            this.kafkaConsumer.subscribe(this.topics, subscriptionUpdated);
        }
        if (this.kafkaConsumerWorker == null) {
            this.kafkaConsumerWorker = this.getKafkaConsumerWorker(this.kafkaConsumer);
        } else {
            this.vertx.setTimer(WAIT_FOR_REBALANCE_TIMEOUT_MILLIS, ar -> {
                if (!partitionAssignmentDone.future().isComplete()) {
                    this.subscriptionUpdatedAndPartitionsAssignedPromiseRef.compareAndSet(promisePair, null);
                    String errorMsg = "timed out waiting for rebalance and update of subscribed topics";
                    LOG.warn("timed out waiting for rebalance and update of subscribed topics");
                    partitionAssignmentDone.tryFail(new ServerErrorException(503, "timed out waiting for rebalance and update of subscribed topics"));
                }
            });
        }
        subscriptionUpdated.future().onFailure(thr -> this.subscriptionUpdatedAndPartitionsAssignedPromiseRef.compareAndSet(promisePair, null));
        return CompositeFuture.all(subscriptionUpdated.future(), partitionAssignmentDone.future()).mapEmpty();
    }

    protected void onPartitionsAssignedBlocking(Set<TopicPartition> partitionsSet) {
    }

    private void onPartitionsAssigned(Set<TopicPartition> partitionsSet) {
        Optional.ofNullable(this.subscriptionUpdatedAndPartitionsAssignedPromiseRef.getAndSet(null)).ifPresent(promisePair -> ((Promise)promisePair.two()).tryComplete());
        if (this.onPartitionsAssignedHandler != null) {
            this.onPartitionsAssignedHandler.handle(partitionsSet);
        }
    }

    protected void onPartitionsRevokedBlocking(Set<TopicPartition> partitionsSet) {
    }

    protected void onPartitionsLostBlocking(Set<TopicPartition> partitionsSet) {
    }

    private void onPartitionsRevoked(Set<TopicPartition> partitionsSet) {
        if (this.onPartitionsRevokedHandler != null) {
            this.onPartitionsRevokedHandler.handle(partitionsSet);
        }
    }

    private void onPartitionsLost(Set<TopicPartition> partitionsSet) {
        if (this.onPartitionsLostHandler != null) {
            this.onPartitionsLostHandler.handle(partitionsSet);
        }
    }

    @Override
    public Future<Void> stop() {
        return this.lifecycleStatus.runStopAttempt(() -> {
            if (this.pollPauseTimeoutTimerId != null) {
                this.vertx.cancelTimer(this.pollPauseTimeoutTimerId);
                this.pollPauseTimeoutTimerId = null;
            }
            return Optional.ofNullable(this.kafkaConsumer).map(consumer -> consumer.close().onComplete(ar -> Optional.ofNullable(this.metricsSupport).ifPresent(ms -> ms.unregisterKafkaConsumer(this.kafkaConsumer.unwrap())))).orElseGet(Future::succeededFuture).onFailure(t -> LOG.info("error stopping Kafka consumer", (Throwable)t));
        });
    }

    protected void runOnContext(Handler<Void> codeToRun) {
        Objects.requireNonNull(codeToRun);
        if (this.context != Vertx.currentContext()) {
            this.context.runOnContext(go -> codeToRun.handle(null));
        } else {
            codeToRun.handle(null);
        }
    }

    protected void runOnKafkaWorkerThread(Handler<Void> handler) {
        Objects.requireNonNull(handler);
        if (this.kafkaConsumerWorker == null) {
            throw new IllegalStateException(MSG_CONSUMER_NOT_INITIALIZED_STARTED);
        }
        if (this.lifecycleStatus.isStarted()) {
            this.kafkaConsumerWorker.submit(() -> {
                if (this.lifecycleStatus.isStarted()) {
                    try {
                        handler.handle(null);
                    }
                    catch (Exception ex) {
                        LOG.error("error running task on Kafka worker thread [client-id: {}]", (Object)this.getClientId(), (Object)ex);
                    }
                }
            });
        }
    }

    public final Set<String> getSubscribedTopicPatternTopics() {
        if (this.topicPattern == null) {
            return Set.of();
        }
        return new HashSet<String>(this.subscribedTopicPatternTopics);
    }

    public final boolean isAmongKnownSubscribedTopics(String topic) {
        Objects.requireNonNull(topic);
        if (this.topics != null) {
            return this.topics.contains(topic);
        }
        return this.subscribedTopicPatternTopics.contains(topic);
    }

    public final Future<Void> ensureTopicIsAmongSubscribedTopicPatternTopics(String topic) {
        Objects.requireNonNull(topic);
        if (this.topics != null) {
            throw new IllegalStateException("consumer doesn't use topic pattern");
        }
        if (!this.topicPattern.matcher(topic).find()) {
            throw new IllegalArgumentException("topic doesn't match pattern");
        }
        if (!this.lifecycleStatus.isStarted()) {
            return Future.failedFuture(new ServerErrorException(500, "not started"));
        }
        if (this.subscribedTopicPatternTopics.contains(topic)) {
            LOG.debug("ensureTopicIsAmongSubscribedTopics: topic is already subscribed [{}]", (Object)topic);
            return Future.succeededFuture();
        }
        HashSet<String> subscribedTopicPatternTopicsBefore = new HashSet<String>(this.subscribedTopicPatternTopics);
        Promise resultPromise = Promise.promise();
        AsyncResult topicCheckFuture = HonoKafkaConsumerHelper.partitionsFor(this.kafkaConsumer, topic).onFailure(thr -> LOG.warn("ensureTopicIsAmongSubscribedTopics: error getting partitions for topic [{}]", (Object)topic, thr)).compose(partitions -> {
            if (partitions.isEmpty()) {
                LOG.warn("ensureTopicIsAmongSubscribedTopics: topic doesn't exist and didn't get auto-created: {}", (Object)topic);
                return Future.failedFuture(new ServerErrorException(503, "command topic doesn't exist and didn't get auto-created"));
            }
            return Future.succeededFuture();
        }).onFailure(resultPromise::tryFail).mapEmpty();
        LOG.debug("ensureTopicIsAmongSubscribedTopics: wait for subscription update and rebalance [{}]", (Object)topic);
        this.subscribeAndWaitForRebalance().compose(v -> {
            boolean someTopicDeleted = subscribedTopicPatternTopicsBefore.stream().anyMatch(t -> !this.subscribedTopicPatternTopics.contains(t));
            if (!this.subscribedTopicPatternTopics.contains(topic)) {
                LOG.debug("ensureTopicIsAmongSubscribedTopics: subscription not updated with topic after rebalance; try again [topic: {}]", (Object)topic);
                return this.subscribeAndWaitForRebalance();
            }
            if (this.isCooperativeRebalancingConfigured() && someTopicDeleted && this.isAutoOffsetResetConfigLatest()) {
                return this.kafkaConsumer.assignment().compose(partitions -> {
                    if (partitions.stream().anyMatch(p -> p.getTopic().equals(topic))) {
                        return Future.succeededFuture(v);
                    }
                    LOG.debug("ensureTopicIsAmongSubscribedTopics: wait for another rebalance before considering update of topic subscription [{}] as done", (Object)topic);
                    Promise rebalanceResultPromise = Promise.promise();
                    this.runOnKafkaWorkerThread(v2 -> {
                        this.getUnderlyingConsumer().enforceRebalance();
                        this.runOnContext(v3 -> this.subscribeAndWaitForRebalance().onComplete(rebalanceResultPromise));
                    });
                    return rebalanceResultPromise.future();
                });
            }
            return Future.succeededFuture(v);
        }).compose(v -> {
            if (!this.subscribedTopicPatternTopics.contains(topic)) {
                LOG.warn("ensureTopicIsAmongSubscribedTopics: subscription not updated with topic after rebalance [topic: {}]", (Object)topic);
                return Future.failedFuture(new ServerErrorException(503, "subscription not updated with topic after rebalance"));
            }
            LOG.debug("ensureTopicIsAmongSubscribedTopics: done updating topic subscription [{}]", (Object)topic);
            return Future.succeededFuture(v);
        }).onComplete(ar -> Futures.tryHandleResult(resultPromise, ar));
        if (!this.isAutoOffsetResetConfigLatest()) {
            topicCheckFuture.onSuccess(v -> resultPromise.tryComplete());
        }
        return resultPromise.future();
    }

    private void removeMetricsForDeletedTopics(Stream<String> deletedTopics) {
        Metrics metrics = this.getInternalMetricsObject(this.kafkaConsumer.unwrap());
        if (metrics != null) {
            deletedTopics.forEach(topic -> {
                metrics.removeSensor("topic." + topic + ".bytes-fetched");
                metrics.removeSensor("topic." + topic + ".records-fetched");
            });
        }
    }

    private Metrics getInternalMetricsObject(Consumer<String, V> consumer) {
        if (consumer instanceof org.apache.kafka.clients.consumer.KafkaConsumer) {
            try {
                Field field = org.apache.kafka.clients.consumer.KafkaConsumer.class.getDeclaredField("metrics");
                field.setAccessible(true);
                return (Metrics)field.get(consumer);
            }
            catch (Exception e) {
                LOG.warn("failed to get metrics object", e);
            }
        }
        return null;
    }

    private void replaceRebalanceListener(KafkaConsumer<String, V> consumer, ConsumerRebalanceListener listener) {
        try {
            Field field = KafkaReadStreamImpl.class.getDeclaredField("rebalanceListener");
            field.setAccessible(true);
            field.set(consumer.asStream(), listener);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Failed to adapt rebalance listener", e);
        }
    }

    private ExecutorService getKafkaConsumerWorker(KafkaConsumer<String, V> consumer) {
        ExecutorService worker;
        try {
            Field field = KafkaReadStreamImpl.class.getDeclaredField("worker");
            field.setAccessible(true);
            worker = (ExecutorService)field.get(consumer.asStream());
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Failed to get worker", e);
        }
        if (worker == null) {
            throw new IllegalStateException("worker not set");
        }
        return worker;
    }
}

