/*
 * Decompiled with CFR 0.152.
 */
package io.fabric8.kubernetes.client.dsl.internal;

import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.api.model.StatusCause;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.dsl.ExecListener;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.fabric8.kubernetes.client.dsl.internal.ExecWatchInputStream;
import io.fabric8.kubernetes.client.dsl.internal.OperationSupport;
import io.fabric8.kubernetes.client.dsl.internal.PodOperationContext;
import io.fabric8.kubernetes.client.http.HttpResponse;
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.http.WebSocketHandshakeException;
import io.fabric8.kubernetes.client.http.WebSocketUpgradeResponse;
import io.fabric8.kubernetes.client.utils.InputStreamPumper;
import io.fabric8.kubernetes.client.utils.KubernetesSerialization;
import io.fabric8.kubernetes.client.utils.internal.SerialExecutor;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecWebSocketListener
implements ExecWatch,
AutoCloseable,
WebSocket.Listener {
    static final String CAUSE_REASON_EXIT_CODE = "ExitCode";
    static final String REASON_NON_ZERO_EXIT_CODE = "NonZeroExitCode";
    static final String STATUS_SUCCESS = "Success";
    private static final long MAX_QUEUE_SIZE = 0x1000000L;
    static final Logger LOGGER = LoggerFactory.getLogger(ExecWebSocketListener.class);
    private static final String HEIGHT = "Height";
    private static final String WIDTH = "Width";
    private final InputStream in;
    private final OutputStream input;
    private final ListenerStream out;
    private final ListenerStream error;
    private final ListenerStream errorChannel;
    private final boolean terminateOnError;
    private final ExecListener listener;
    private final AtomicReference<WebSocket> webSocketRef = new AtomicReference();
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private final SerialExecutor serialExecutor;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CompletableFuture<Integer> exitCode = new CompletableFuture();
    private KubernetesSerialization serialization;

    public static String toString(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer).toString();
    }

    public ExecWebSocketListener(PodOperationContext context, Executor executor, KubernetesSerialization serialization) {
        this.serialization = serialization;
        this.listener = context.getExecListener();
        Integer bufferSize = context.getBufferSize();
        if (context.isRedirectingIn()) {
            this.input = InputStreamPumper.writableOutputStream(this::sendWithErrorChecking, bufferSize);
            this.in = null;
        } else {
            this.input = null;
            this.in = context.getIn();
        }
        this.terminateOnError = context.isTerminateOnError();
        this.out = this.createStream("stdOut", context.getOutput());
        this.error = this.createStream("stdErr", context.getError());
        this.errorChannel = this.createStream("errorChannel", context.getErrorChannel());
        this.serialExecutor = new SerialExecutor(executor);
    }

    private ListenerStream createStream(String name, PodOperationContext.StreamContext streamContext) {
        ListenerStream stream = new ListenerStream(name);
        if (streamContext == null) {
            return stream;
        }
        OutputStream os = streamContext.getOutputStream();
        if (os == null) {
            stream.inputStream = new ExecWatchInputStream(() -> this.webSocketRef.get().request());
            this.exitCode.whenComplete(stream.inputStream::onExit);
            stream.handler = b -> stream.inputStream.consume(Arrays.asList(b));
        } else {
            WritableByteChannel channel = Channels.newChannel(os);
            stream.handler = b -> this.asyncWrite(channel, b);
        }
        return stream;
    }

    private void asyncWrite(WritableByteChannel channel, ByteBuffer b) {
        CompletableFuture.runAsync(() -> {
            try {
                channel.write(b);
            }
            catch (IOException e) {
                throw KubernetesClientException.launderThrowable(e);
            }
        }, this.serialExecutor).whenComplete((v, t) -> {
            this.webSocketRef.get().request();
            if (t != null) {
                if (this.closed.get()) {
                    LOGGER.debug("Stream write failed after close", (Throwable)t);
                } else {
                    LOGGER.warn("Stream write failed", (Throwable)t);
                }
            }
        });
    }

    @Override
    public void close() {
        if (this.closed.get()) {
            return;
        }
        this.closeWebSocketOnce(1000, "Closing...");
    }

    private void cleanUpOnce() {
        this.executorService.shutdownNow();
        this.serialExecutor.shutdownNow();
    }

    private void closeWebSocketOnce(int code, String reason) {
        try {
            WebSocket ws = this.webSocketRef.get();
            if (ws != null) {
                ws.sendClose(code, reason);
            }
        }
        catch (Throwable t) {
            LOGGER.debug("Error closing WebSocket.", t);
        }
    }

    @Override
    public void onOpen(WebSocket webSocket) {
        try {
            this.exitCode.whenComplete((i, t) -> webSocket.request());
            this.webSocketRef.set(webSocket);
            if (this.in != null && !this.executorService.isShutdown()) {
                InputStreamPumper.pump(InputStreamPumper.asInterruptible(this.in), this::send, this.executorService);
            }
        }
        finally {
            if (this.listener != null) {
                this.listener.onOpen();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onError(WebSocket webSocket, Throwable t) {
        this.closed.set(true);
        WebSocketUpgradeResponse response = null;
        try {
            if (t instanceof WebSocketHandshakeException && (response = ((WebSocketHandshakeException)t).getResponse()) != null) {
                Status status = OperationSupport.createStatus(response, this.serialization);
                status.setMessage(t.getMessage());
                t = new KubernetesClientException(status).initCause(t);
            }
            this.cleanUpOnce();
        }
        finally {
            if (this.exitCode.isDone()) {
                LOGGER.debug("Exec failure after done", t);
            } else {
                try {
                    if (this.listener != null) {
                        SimpleResponse execResponse = null;
                        if (response != null) {
                            execResponse = new SimpleResponse(response);
                        }
                        this.listener.onFailure(t, execResponse);
                    } else {
                        LOGGER.error("Exec Failure", t);
                    }
                }
                finally {
                    this.exitCode.completeExceptionally(t);
                }
            }
        }
    }

    @Override
    public void onMessage(WebSocket webSocket, String text) {
        LOGGER.debug("Exec Web Socket: onMessage(String)");
        this.onMessage(webSocket, ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public void onMessage(WebSocket webSocket, ByteBuffer bytes) {
        boolean close = false;
        try {
            byte streamID = bytes.get(0);
            bytes.position(1);
            ByteBuffer byteString = bytes.slice();
            if (byteString.remaining() == 0) {
                webSocket.request();
                return;
            }
            switch (streamID) {
                case 1: {
                    this.out.handle(byteString, webSocket);
                    return;
                }
                case 2: {
                    if (this.terminateOnError) {
                        String stringValue = ExecWebSocketListener.toString(bytes);
                        this.exitCode.completeExceptionally(new KubernetesClientException(stringValue));
                        close = true;
                        return;
                    } else {
                        this.error.handle(byteString, webSocket);
                        return;
                    }
                }
                case 3: {
                    close = true;
                    try {
                        this.errorChannel.handle(bytes, webSocket);
                        return;
                    }
                    finally {
                        this.handleExitStatus(byteString);
                    }
                }
                default: {
                    throw new IOException("Unknown stream ID " + streamID);
                }
            }
        }
        catch (IOException e) {
            throw KubernetesClientException.launderThrowable(e);
        }
        finally {
            if (close) {
                this.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleExitStatus(ByteBuffer bytes) {
        Status status = null;
        int code = -1;
        try {
            String stringValue = ExecWebSocketListener.toString(bytes);
            status = this.serialization.unmarshal(stringValue, Status.class);
            if (status != null) {
                code = ExecWebSocketListener.parseExitCode(status);
            }
        }
        catch (Exception e) {
            LOGGER.warn("Could not determine exit code", e);
        }
        try {
            if (this.listener != null) {
                this.listener.onExit(code, status);
            }
        }
        finally {
            this.exitCode.complete(code);
        }
    }

    @Override
    public void onClose(WebSocket webSocket, int code, String reason) {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        this.closeWebSocketOnce(code, reason);
        LOGGER.debug("Exec Web Socket: On Close with code:[{}], due to: [{}]", (Object)code, (Object)reason);
        this.serialExecutor.execute(() -> {
            try {
                if (this.exitCode.complete(null)) {
                    LOGGER.debug("Exec Web Socket: completed with a null exit code - no status was received prior to onClose");
                }
                this.cleanUpOnce();
            }
            finally {
                if (this.listener != null) {
                    this.listener.onClose(code, reason);
                }
            }
        });
    }

    @Override
    public OutputStream getInput() {
        return this.input;
    }

    @Override
    public InputStream getOutput() {
        return this.out.inputStream;
    }

    @Override
    public InputStream getError() {
        return this.error.inputStream;
    }

    @Override
    public InputStream getErrorChannel() {
        return this.errorChannel.inputStream;
    }

    @Override
    public void resize(int cols, int rows) {
        if (cols < 0 || rows < 0) {
            return;
        }
        try {
            HashMap<String, Integer> map = new HashMap<String, Integer>(4);
            map.put(HEIGHT, rows);
            map.put(WIDTH, cols);
            byte[] bytes = this.serialization.asJson(map).getBytes(StandardCharsets.UTF_8);
            this.send(bytes, 0, bytes.length, (byte)4);
        }
        catch (Exception e) {
            throw KubernetesClientException.launderThrowable(e);
        }
    }

    private void send(byte[] bytes, int offset, int length, byte flag) {
        if (length > 0) {
            this.waitForQueue(length);
            WebSocket ws = this.webSocketRef.get();
            byte[] toSend = new byte[length + 1];
            toSend[0] = flag;
            System.arraycopy(bytes, offset, toSend, 1, length);
            if (!ws.send(ByteBuffer.wrap(toSend))) {
                this.exitCode.completeExceptionally(new IOException("could not send"));
            }
        }
    }

    private void send(byte[] bytes, int offset, int length) {
        this.send(bytes, offset, length, (byte)0);
    }

    void sendWithErrorChecking(byte[] bytes, int offset, int length) {
        this.checkError();
        this.send(bytes, offset, length);
        this.checkError();
    }

    public static int parseExitCode(Status status) {
        if (STATUS_SUCCESS.equals(status.getStatus())) {
            return 0;
        }
        if (REASON_NON_ZERO_EXIT_CODE.equals(status.getReason())) {
            if (status.getDetails() == null) {
                return -1;
            }
            List<StatusCause> causes = status.getDetails().getCauses();
            if (causes == null) {
                return -1;
            }
            return causes.stream().filter(c -> CAUSE_REASON_EXIT_CODE.equals(c.getReason())).map(StatusCause::getMessage).map(Integer::valueOf).findFirst().orElse(-1);
        }
        return -1;
    }

    @Override
    public CompletableFuture<Integer> exitCode() {
        return this.exitCode;
    }

    final void waitForQueue(int length) {
        try {
            while (this.webSocketRef.get().queueSize() + (long)length > 0x1000000L && !Thread.interrupted()) {
                this.checkError();
                Thread.sleep(50L);
            }
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    final void checkError() {
        if (this.exitCode.isDone()) {
            try {
                this.exitCode.getNow(null);
            }
            catch (CompletionException e) {
                throw KubernetesClientException.launderThrowable(e.getCause());
            }
        }
    }

    private final class ListenerStream {
        private MessageHandler handler;
        private ExecWatchInputStream inputStream;
        private String name;

        public ListenerStream(String name) {
            this.name = name;
        }

        private void handle(ByteBuffer byteString, WebSocket webSocket) throws IOException {
            if (this.handler != null) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("exec message received {} bytes on channel {}", (Object)byteString.remaining(), (Object)this.name);
                }
                this.handler.handle(byteString);
            } else {
                if (LOGGER.isDebugEnabled()) {
                    String message = ExecWebSocketListener.toString(byteString);
                    if (message.length() > 200) {
                        message = message.substring(0, 197) + "...";
                    }
                    LOGGER.debug("exec message received on channel {}: {}", (Object)this.name, (Object)message);
                }
                webSocket.request();
            }
        }
    }

    @FunctionalInterface
    public static interface MessageHandler {
        public void handle(ByteBuffer var1) throws IOException;
    }

    private final class SimpleResponse
    implements ExecListener.Response {
        private final HttpResponse<?> response;

        private SimpleResponse(HttpResponse<?> response) {
            this.response = response;
        }

        @Override
        public int code() {
            return this.response.code();
        }

        @Override
        public String body() throws IOException {
            return this.response.bodyString();
        }
    }
}

