/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.cli.app;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import java.util.ArrayList;
import java.util.Objects;
import java.util.function.BiConsumer;
import javax.annotation.PostConstruct;
import org.eclipse.hono.application.client.ApplicationClient;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageContext;
import org.eclipse.hono.application.client.amqp.AmqpApplicationClient;
import org.eclipse.hono.cli.app.AbstractApplicationClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;

@Component
@Profile(value={"receiver"})
public class Receiver
extends AbstractApplicationClient {
    private static final String TYPE_TELEMETRY = "telemetry";
    private static final String TYPE_EVENT = "event";
    private static final String TYPE_ALL = "all";
    @Value(value="${message.type}")
    protected String messageType = "all";
    private BiConsumer<String, DownstreamMessage<? extends MessageContext>> messageHandler = (arg_0, arg_1) -> this.handleMessage(arg_0, arg_1);
    private ApplicationClient<? extends MessageContext> client;

    @Autowired
    public final void setApplicationClient(ApplicationClient<? extends MessageContext> client) {
        this.client = Objects.requireNonNull(client);
    }

    void setMessageHandler(BiConsumer<String, DownstreamMessage<? extends MessageContext>> messageHandler) {
        this.messageHandler = Objects.requireNonNull(messageHandler);
    }

    @PostConstruct
    Future<CompositeFuture> start() {
        Future startFuture;
        if (this.client instanceof AmqpApplicationClient) {
            AmqpApplicationClient amqpClient = (AmqpApplicationClient)this.client;
            startFuture = amqpClient.connect().onComplete(con -> amqpClient.addReconnectListener(client -> this.createConsumer())).mapEmpty();
        } else {
            startFuture = Future.succeededFuture();
        }
        return startFuture.compose(c -> this.createConsumer()).onComplete(arg_0 -> this.handleCreateConsumerStatus(arg_0));
    }

    private CompositeFuture createConsumer() {
        Handler closeHandler = cause -> {
            this.log.info("close handler of consumer is called", cause);
            this.vertx.setTimer((long)this.connectionRetryInterval, reconnect -> {
                this.log.info("attempting to re-create the consumer ...");
                this.createConsumer();
            });
        };
        ArrayList<Future> consumerFutures = new ArrayList<Future>();
        if (this.messageType.equals(TYPE_EVENT) || this.messageType.equals(TYPE_ALL)) {
            consumerFutures.add(this.client.createEventConsumer(this.tenantId, msg -> this.messageHandler.accept(TYPE_EVENT, msg), closeHandler));
        }
        if (this.messageType.equals(TYPE_TELEMETRY) || this.messageType.equals(TYPE_ALL)) {
            consumerFutures.add(this.client.createTelemetryConsumer(this.tenantId, msg -> this.messageHandler.accept(TYPE_TELEMETRY, msg), closeHandler));
        }
        if (consumerFutures.isEmpty()) {
            consumerFutures.add(Future.failedFuture((String)String.format("Invalid message type [\"%s\"]. Valid types are \"telemetry\", \"event\" or \"all\"", this.messageType)));
        }
        return CompositeFuture.all(consumerFutures);
    }

    private void handleMessage(String endpoint, DownstreamMessage<? extends MessageContext> msg) {
        String deviceId = msg.getDeviceId();
        Buffer payload = msg.getPayload();
        this.log.info("received {} message [device: {}, content-type: {}]: {}", new Object[]{endpoint, deviceId, msg.getContentType(), payload});
        this.log.info("... with properties: {}", (Object)msg.getProperties().getPropertiesMap());
    }

    private void handleCreateConsumerStatus(AsyncResult<CompositeFuture> startup) {
        if (startup.succeeded()) {
            this.log.info("Receiver [tenant: {}, mode: {}] created successfully, hit ctrl-c to exit", (Object)this.tenantId, (Object)this.messageType);
        } else {
            this.log.error("Error occurred during initialization of receiver: {}", (Object)startup.cause().getMessage());
            this.vertx.close();
        }
    }
}

