/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.hono.application.client.kafka.impl;

import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageProperties;
import org.eclipse.hono.application.client.kafka.KafkaMessageContext;
import org.eclipse.hono.application.client.kafka.KafkaMessageProperties;
import org.eclipse.hono.client.kafka.HonoTopic;
import org.eclipse.hono.client.kafka.KafkaRecordHelper;
import org.eclipse.hono.util.QoS;

/**
 * A {@link DownstreamMessage} whose content is taken from a Kafka consumer record.
 * <p>
 * Every value exposed by this class is read from the record in the constructor
 * and kept in a final field; the getters only hand those stored values back.
 */
public class KafkaDownstreamMessage implements DownstreamMessage<KafkaMessageContext> {

    /** Content type reported when the record headers do not provide one. */
    private static final String DEFAULT_CONTENT_TYPE = "application/octet-stream";
    /** Name of the header holding the time-to-live value. */
    private static final String HEADER_TIME_TO_LIVE = "ttl";
    /** Name of the header holding the time-till-disconnect value. */
    private static final String HEADER_TIME_TILL_DISCONNECT = "ttd";
    /** Name of the message property holding the correlation identifier. */
    private static final String PROPERTY_CORRELATION_ID = "correlation-id";
    /** Name of the message property holding the status code. */
    private static final String PROPERTY_STATUS = "status";

    private final String tenantId;
    private final String deviceId;
    private final MessageProperties properties;
    private final String contentType;
    private final KafkaMessageContext messageContext;
    private final QoS qos;
    private final Buffer payload;
    private final Instant creationTime;
    private final Duration timeToLive;
    private final Integer timeTillDisconnect;

    /**
     * Creates a downstream message from a Kafka consumer record.
     * <p>
     * The tenant is derived from the record's topic, the device identifier is the
     * record key and the payload is the record value. Content type, QoS, creation
     * time, time-to-live and time-till-disconnect are looked up in the record headers.
     *
     * @param record The consumer record to take the message content from.
     * @throws NullPointerException if record is {@code null}.
     * @throws IllegalArgumentException if no tenant identifier can be derived from
     *                                  the record's topic name.
     */
    public KafkaDownstreamMessage(final KafkaConsumerRecord<String, Buffer> record) {
        Objects.requireNonNull(record);

        tenantId = tenantOf(record.topic());
        deviceId = record.key();
        properties = new KafkaMessageProperties(record);

        // All remaining header lookups work on the same header list.
        final List<KafkaHeader> headers = record.headers();

        contentType = KafkaRecordHelper.getContentType(headers)
                .orElse(DEFAULT_CONTENT_TYPE);
        messageContext = new KafkaMessageContext(record);
        qos = KafkaRecordHelper.getQoS(headers)
                .orElse(QoS.AT_LEAST_ONCE);
        payload = record.value();
        creationTime = KafkaRecordHelper.getCreationTime(headers)
                .orElse(null);
        // The numeric ttl header value is interpreted as a number of milliseconds.
        timeToLive = KafkaRecordHelper.getHeaderValue(headers, HEADER_TIME_TO_LIVE, Long.class)
                .map(Duration::ofMillis)
                .orElse(null);
        timeTillDisconnect = KafkaRecordHelper.getHeaderValue(headers, HEADER_TIME_TILL_DISCONNECT, Integer.class)
                .orElse(null);
    }

    /**
     * Derives the tenant identifier from a topic name.
     *
     * @param topicName The name of the topic the record was read from.
     * @return The tenant identifier of the parsed topic, never {@code null}.
     * @throws IllegalArgumentException if the name cannot be parsed into a
     *                                  {@link HonoTopic} or the parsed topic has no
     *                                  tenant identifier.
     */
    private static String tenantOf(final String topicName) {
        final HonoTopic topic = HonoTopic.fromString(topicName);
        final String tenant = topic == null ? null : topic.getTenantId();
        if (tenant == null) {
            throw new IllegalArgumentException("Invalid topic name");
        }
        return tenant;
    }

    /**
     * @return The tenant identifier derived from the record's topic.
     */
    @Override
    public final String getTenantId() {
        return tenantId;
    }

    /**
     * @return The key of the record this message was created from.
     */
    @Override
    public final String getDeviceId() {
        return deviceId;
    }

    /**
     * @return The properties object created from the record.
     */
    @Override
    public final MessageProperties getProperties() {
        return properties;
    }

    /**
     * @return The content type found in the record headers, or
     *         {@code application/octet-stream} if none was found.
     */
    @Override
    public final String getContentType() {
        return contentType;
    }

    /**
     * @return The message context created from the record.
     */
    @Override
    public final KafkaMessageContext getMessageContext() {
        return messageContext;
    }

    /**
     * @return The QoS level found in the record headers, or
     *         {@link QoS#AT_LEAST_ONCE} if none was found.
     */
    @Override
    public final QoS getQos() {
        return qos;
    }

    /**
     * @return The value of the record this message was created from.
     */
    @Override
    public final Buffer getPayload() {
        return payload;
    }

    /**
     * @return The creation time found in the record headers, or {@code null}
     *         if none was found.
     */
    @Override
    public Instant getCreationTime() {
        return creationTime;
    }

    /**
     * @return The {@code ttl} header value as a duration in milliseconds, or
     *         {@code null} if the headers contain no such value.
     */
    @Override
    public Duration getTimeToLive() {
        return timeToLive;
    }

    /**
     * @return The {@code ttd} header value, or {@code null} if the headers
     *         contain no such value.
     */
    @Override
    public Integer getTimeTillDisconnect() {
        return timeTillDisconnect;
    }

    /**
     * @return The {@code correlation-id} entry of the message properties, as
     *         returned by {@link MessageProperties#getProperty}.
     */
    @Override
    public String getCorrelationId() {
        return properties.getProperty(PROPERTY_CORRELATION_ID, String.class);
    }

    /**
     * @return The {@code status} entry of the message properties, as returned
     *         by {@link MessageProperties#getProperty}.
     */
    @Override
    public Integer getStatus() {
        return properties.getProperty(PROPERTY_STATUS, Integer.class);
    }
}

