/*
 * Decompiled with CFR 0.152.
 */
package com.hivemq.client.internal.mqtt;

import com.hivemq.client.internal.mqtt.MqttBlockingClient;
import com.hivemq.client.internal.mqtt.MqttClientConfig;
import com.hivemq.client.internal.mqtt.MqttRxClient;
import com.hivemq.client.internal.mqtt.message.connect.MqttConnect;
import com.hivemq.client.internal.mqtt.message.connect.MqttConnectBuilder;
import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnect;
import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnectBuilder;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublishBuilder;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribe;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribeBuilder;
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribe;
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribeBuilder;
import com.hivemq.client.internal.mqtt.util.MqttChecks;
import com.hivemq.client.internal.rx.RxFutureConverter;
import com.hivemq.client.internal.shaded.org.jetbrains.annotations.NotNull;
import com.hivemq.client.internal.shaded.org.jetbrains.annotations.Nullable;
import com.hivemq.client.internal.util.Checks;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
import com.hivemq.client.mqtt.mqtt5.message.disconnect.Mqtt5Disconnect;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.Mqtt5Subscribe;
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
import io.reactivex.FlowableSubscriber;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * {@link Mqtt5AsyncClient} implementation that exposes {@link CompletableFuture}- and callback-based
 * operations by forwarding every call to a wrapped {@link MqttRxClient}.
 */
public class MqttAsyncClient implements Mqtt5AsyncClient {

    /** Reactive client that every operation of this class is forwarded to. */
    private final @NotNull MqttRxClient delegate;

    /**
     * Post-processes the future of a subscribe operation.
     * <p>
     * If the subscribe message carries exactly one subscription the given future is returned as is. Otherwise a
     * new future is returned that completes with the result of {@link MqttBlockingClient#handleSubAck} applied
     * to the received SubAck, or completes exceptionally if the given future failed or that call threw.
     * <p>
     * Cancellation of the returned (mapped) future is not forwarded to {@code future} by this method.
     *
     * @param future    the future of the subscribe operation.
     * @param subscribe the subscribe message that was sent.
     * @return {@code future} itself or a future mapping its result as described above.
     */
    private static @NotNull CompletableFuture<@NotNull Mqtt5SubAck> handleSubAck(
            @NotNull CompletableFuture<@NotNull Mqtt5SubAck> future, @NotNull MqttSubscribe subscribe) {

        if (subscribe.getSubscriptions().size() == 1) {
            return future;
        }
        CompletableFuture<Mqtt5SubAck> mappedFuture = new CompletableFuture<>();
        future.whenComplete((subAck, throwable) -> {
            if (throwable != null) {
                mappedFuture.completeExceptionally(throwable);
                return;
            }
            try {
                mappedFuture.complete(MqttBlockingClient.handleSubAck(subAck));
            } catch (Throwable t) {
                // Whatever the SubAck handling throws becomes the failure of the mapped future.
                mappedFuture.completeExceptionally(t);
            }
        });
        return mappedFuture;
    }

    /**
     * Post-processes the future of an unsubscribe operation.
     * <p>
     * If the unsubscribe message carries exactly one topic filter the given future is returned as is. Otherwise
     * a new future is returned that completes with the result of {@link MqttBlockingClient#handleUnsubAck}
     * applied to the received UnsubAck, or completes exceptionally if the given future failed or that call threw.
     * <p>
     * Cancellation of the returned (mapped) future is not forwarded to {@code future} by this method.
     *
     * @param future      the future of the unsubscribe operation.
     * @param unsubscribe the unsubscribe message that was sent.
     * @return {@code future} itself or a future mapping its result as described above.
     */
    private static @NotNull CompletableFuture<@NotNull Mqtt5UnsubAck> handleUnsubAck(
            @NotNull CompletableFuture<@NotNull Mqtt5UnsubAck> future, @NotNull MqttUnsubscribe unsubscribe) {

        if (unsubscribe.getTopicFilters().size() == 1) {
            return future;
        }
        CompletableFuture<Mqtt5UnsubAck> mappedFuture = new CompletableFuture<>();
        future.whenComplete((unsubAck, throwable) -> {
            if (throwable != null) {
                mappedFuture.completeExceptionally(throwable);
                return;
            }
            try {
                mappedFuture.complete(MqttBlockingClient.handleUnsubAck(unsubAck));
            } catch (Throwable t) {
                // Whatever the UnsubAck handling throws becomes the failure of the mapped future.
                mappedFuture.completeExceptionally(t);
            }
        });
        return mappedFuture;
    }

    /**
     * Creates an asynchronous view on the given reactive client.
     *
     * @param delegate the reactive client all operations are forwarded to.
     */
    MqttAsyncClient(@NotNull MqttRxClient delegate) {
        this.delegate = delegate;
    }

    /**
     * Connects using {@link MqttConnect#DEFAULT}.
     *
     * @return the future of the connect operation.
     */
    @Override
    public @NotNull CompletableFuture<@NotNull Mqtt5ConnAck> connect() {
        return connect(MqttConnect.DEFAULT);
    }

    /**
     * Connects with the given Connect message, after passing it through {@link MqttChecks#connect}.
     *
     * @param connect the Connect message to send.
     * @return the future of the connect operation.
     */
    @Override
    public @NotNull CompletableFuture<@NotNull Mqtt5ConnAck> connect(@Nullable Mqtt5Connect connect) {
        MqttConnect mqttConnect = MqttChecks.connect(connect);
        return RxFutureConverter.toFuture(delegate.connect(mqttConnect));
    }

    /**
     * Creates a Connect message builder whose send step calls {@link #connect(Mqtt5Connect)}.
     *
     * @return the builder.
     */
    public @NotNull MqttConnectBuilder.Send<CompletableFuture<Mqtt5ConnAck>> connectWith() {
        return new MqttConnectBuilder.Send<CompletableFuture<Mqtt5ConnAck>>(this::connect);
    }

    /**
     * Subscribes with the given Subscribe message, after passing it through {@link MqttChecks#subscribe}.
     *
     * @param subscribe the Subscribe message to send.
     * @return the future of the subscribe operation, post-processed by
     *         {@link #handleSubAck(CompletableFuture, MqttSubscribe)}.
     */
    @Override
    public @NotNull CompletableFuture<@NotNull Mqtt5SubAck> subscribe(@Nullable Mqtt5Subscribe subscribe) {
        MqttSubscribe mqttSubscribe = MqttChecks.subscribe(subscribe);
        return handleSubAck(RxFutureConverter.toFuture(delegate.subscribe(mqttSubscribe)), mqttSubscribe);
    }

    /**
     * Subscribes and hands every matching Publish to the callback; manual acknowledgement is disabled.
     *
     * @param subscribe the Subscribe message to send.
     * @param callback  the consumer of the incoming Publish messages.
     * @return the future of the subscribe operation.
     */
    @Override
    public @NotNull CompletableFuture<@NotNull Mqtt5SubAck> subscribe(
            @Nullable Mqtt5Subscribe subscribe, @Nullable Consumer<@NotNull Mqtt5Publish> callback) {

        return subscribe(subscribe, callback, false);
    }

    /**
     * Subscribes and hands every matching Publish to the callback on the given executor; manual acknowledgement
     * is disabled.
     *
     * @param subscribe the Subscribe message to send.
     * @param callback  the consumer of the incoming Publish messages.
     * @param executor  the executor used for the callback.
     * @return the future of the subscribe operation.
     */
    @Override
    public @NotNull CompletableFuture<@NotNull Mqtt5SubAck> subscribe(
            @Nullable Mqtt5Subscribe subscribe,
            @Nullable Consumer<@NotNull Mqtt5Publish> callback,
            @Nullable Executor executor) {

        return subscribe(subscribe, callback, executor, false);
    }

    /**
     * Subscribes and hands every matching Publish to the callback.
     * <p>
     * The Subscribe message is passed through {@link MqttChecks#subscribe} and the callback through
     * {@link Checks#notNull} before anything is sent.
     *
     * @param subscribe             the Subscribe message to send.
     * @param callback              the consumer of the incoming Publish messages.
     * @param manualAcknowledgement flag forwarded unchanged to {@code subscribePublishes} of the delegate.
     * @return the future of the subscribe operation, post-processed by
     *         {@link #handleSubAck(CompletableFuture, MqttSubscribe)}.
     */
    @Override
    public @NotNull CompletableFuture<@NotNull Mqtt5SubAck> subscribe(
            @Nullable Mqtt5Subscribe subscribe,
            @Nullable Consumer<@NotNull Mqtt5Publish> callback,
            boolean manualAcknowledgement) {

        MqttSubscribe mqttSubscribe = MqttChecks.subscribe(subscribe);
        Checks.notNull(callback, "Callback");

        return handleSubAck(
                delegate.subscribePublishes(mqttSubscribe, manualAcknowledgement)
                        .subscribeSingleFuture(new CallbackSubscriber(callback)),
                mqttSubscribe);
    }

    /**
     * Subscribes and hands every matching Publish to the callback, observing the stream on a scheduler created
     * from the given executor.
     * <p>
     * The Subscribe message is passed through {@link MqttChecks#subscribe}, callback and executor through
     * {@link Checks#notNull} before anything is sent.
     *
     * @param subscribe             the Subscribe message to send.
     * @param callback              the consumer of the incoming Publish messages.
     * @param executor              the executor the scheduler for the callback is created from.
     * @param manualAcknowledgement flag forwarded unchanged to {@code subscribePublishesUnsafe} of the delegate.
     * @return the future of the subscribe operation, post-processed by
     *         {@link #handleSubAck(CompletableFuture, MqttSubscribe)}.
     */
    @Override
    public @NotNull CompletableFuture<@NotNull Mqtt5SubAck> subscribe(
            @Nullable Mqtt5Subscribe subscribe,
            @Nullable Consumer<@NotNull Mqtt5Publish> callback,
            @Nullable Executor executor,
            boolean manualAcknowledgement) {

        MqttSubscribe mqttSubscribe = MqttChecks.subscribe(subscribe);
        Checks.notNull(callback, "Callback");
        Checks.notNull(executor, "Executor");

        return handleSubAck(
                delegate.subscribePublishesUnsafe(mqttSubscribe, manualAcknowledgement)
                        .observeOnBoth(Schedulers.from(executor), true)
                        .subscribeSingleFuture(new CallbackSubscriber(callback)),
                mqttSubscribe);
    }

    /**
     * Creates a fluent builder for a Subscribe message with optional callback, executor and manual
     * acknowledgement settings.
     *
     * @return the builder.
     */
    @Override
    @NotNull
    public MqttSubscribeAndCallbackBuilder subscribeWith() {
        return new MqttSubscribeAndCallbackBuilder();
    }

    /**
     * Registers a callback for the Publish messages selected by the filter; manual acknowledgement is disabled.
     *
     * @param filter   the global publish filter.
     * @param callback the consumer of the incoming Publish messages.
     */
    @Override
    public void publishes(
            @Nullable MqttGlobalPublishFilter filter, @Nullable Consumer<@NotNull Mqtt5Publish> callback) {

        publishes(filter, callback, false);
    }

    /**
     * Registers a callback, executed on the given executor, for the Publish messages selected by the filter;
     * manual acknowledgement is disabled.
     *
     * @param filter   the global publish filter.
     * @param callback the consumer of the incoming Publish messages.
     * @param executor the executor used for the callback.
     */
    @Override
    public void publishes(
            @Nullable MqttGlobalPublishFilter filter,
            @Nullable Consumer<@NotNull Mqtt5Publish> callback,
            @Nullable Executor executor) {

        publishes(filter, callback, executor, false);
    }

    /**
     * Registers a callback for the Publish messages selected by the filter.
     * <p>
     * Filter and callback are passed through {@link Checks#notNull} first.
     *
     * @param filter                the global publish filter.
     * @param callback              the consumer of the incoming Publish messages.
     * @param manualAcknowledgement flag forwarded unchanged to {@code publishes} of the delegate.
     */
    @Override
    public void publishes(
            @Nullable MqttGlobalPublishFilter filter,
            @Nullable Consumer<@NotNull Mqtt5Publish> callback,
            boolean manualAcknowledgement) {

        Checks.notNull(filter, "Global publish filter");
        Checks.notNull(callback, "Callback");

        delegate.publishes(filter, manualAcknowledgement).subscribe(new CallbackSubscriber(callback));
    }

    /**
     * Registers a callback for the Publish messages selected by the filter, observing the stream on a scheduler
     * created from the given executor.
     * <p>
     * Filter, callback and executor are passed through {@link Checks#notNull} first.
     *
     * @param filter                the global publish filter.
     * @param callback              the consumer of the incoming Publish messages.
     * @param executor              the executor the scheduler for the callback is created from.
     * @param manualAcknowledgement flag forwarded unchanged to {@code publishesUnsafe} of the delegate.
     */
    @Override
    public void publishes(
            @Nullable MqttGlobalPublishFilter filter,
            @Nullable Consumer<@NotNull Mqtt5Publish> callback,
            @Nullable Executor executor,
            boolean manualAcknowledgement) {

        Checks.notNull(filter, "Global publish filter");
        Checks.notNull(callback, "Callback");
        Checks.notNull(executor, "Executor");

        delegate.publishesUnsafe(filter, manualAcknowledgement)
                .observeOn(Schedulers.from(executor), true)
                .subscribe(new CallbackSubscriber(callback));
    }

    /**
     * Unsubscribes with the given Unsubscribe message, after passing it through {@link MqttChecks#unsubscribe}.
     *
     * @param unsubscribe the Unsubscribe message to send.
     * @return the future of the unsubscribe operation, post-processed by
     *         {@link #handleUnsubAck(CompletableFuture, MqttUnsubscribe)}.
     */
    @Override
    public @NotNull CompletableFuture<@NotNull Mqtt5UnsubAck> unsubscribe(@Nullable Mqtt5Unsubscribe unsubscribe) {
        MqttUnsubscribe mqttUnsubscribe = MqttChecks.unsubscribe(unsubscribe);
        return handleUnsubAck(RxFutureConverter.toFuture(delegate.unsubscribe(mqttUnsubscribe)), mqttUnsubscribe);
    }

    /**
     * Creates an Unsubscribe message builder whose send step calls {@link #unsubscribe(Mqtt5Unsubscribe)}.
     *
     * @return the builder.
     */
    public @NotNull MqttUnsubscribeBuilder.Send<CompletableFuture<Mqtt5UnsubAck>> unsubscribeWith() {
        return new MqttUnsubscribeBuilder.Send<CompletableFuture<Mqtt5UnsubAck>>(this::unsubscribe);
    }

    /**
     * Publishes the given Publish message, after passing it through {@link MqttChecks#publish}.
     *
     * @param publish the Publish message to send.
     * @return the future of the publish operation.
     */
    @Override
    public @NotNull CompletableFuture<@NotNull Mqtt5PublishResult> publish(@Nullable Mqtt5Publish publish) {
        MqttPublish mqttPublish = MqttChecks.publish(publish);
        return RxFutureConverter.toFuture(delegate.publish(mqttPublish));
    }

    /**
     * Creates a Publish message builder whose send step calls {@link #publish(Mqtt5Publish)}.
     *
     * @return the builder.
     */
    public @NotNull MqttPublishBuilder.Send<CompletableFuture<Mqtt5PublishResult>> publishWith() {
        return new MqttPublishBuilder.Send<CompletableFuture<Mqtt5PublishResult>>(this::publish);
    }

    /**
     * Forwards a re-authentication request to the delegate.
     *
     * @return the future of the reauth operation.
     */
    @Override
    @NotNull
    public CompletableFuture<Void> reauth() {
        return RxFutureConverter.toFuture(delegate.reauth());
    }

    /**
     * Disconnects using {@link MqttDisconnect#DEFAULT}.
     *
     * @return the future of the disconnect operation.
     */
    @Override
    @NotNull
    public CompletableFuture<Void> disconnect() {
        return disconnect(MqttDisconnect.DEFAULT);
    }

    /**
     * Disconnects with the given Disconnect message, after passing it through {@link MqttChecks#disconnect}.
     *
     * @param disconnect the Disconnect message to send.
     * @return the future of the disconnect operation.
     */
    @Override
    @NotNull
    public CompletableFuture<Void> disconnect(@Nullable Mqtt5Disconnect disconnect) {
        MqttDisconnect mqttDisconnect = MqttChecks.disconnect(disconnect);
        return RxFutureConverter.toFuture(delegate.disconnect(mqttDisconnect));
    }

    /**
     * Creates a Disconnect message builder whose send step calls {@link #disconnect(Mqtt5Disconnect)}.
     *
     * @return the builder.
     */
    public @NotNull MqttDisconnectBuilder.Send<CompletableFuture<Void>> disconnectWith() {
        return new MqttDisconnectBuilder.Send<CompletableFuture<Void>>(this::disconnect);
    }

    /**
     * @return the configuration of the wrapped delegate.
     */
    @Override
    @NotNull
    public MqttClientConfig getConfig() {
        return delegate.getConfig();
    }

    /**
     * @return the wrapped reactive client.
     */
    @Override
    @NotNull
    public MqttRxClient toRx() {
        return delegate;
    }

    /**
     * @return the blocking view obtained from the wrapped reactive client.
     */
    @Override
    @NotNull
    public MqttBlockingClient toBlocking() {
        return delegate.toBlocking();
    }

    /**
     * Fluent builder returned by {@link #subscribeWith()}.
     * <p>
     * Collects a Subscribe message together with an optional callback, an optional executor and the manual
     * acknowledgement flag, and on {@link #send()} dispatches to the matching {@code subscribe} overload of the
     * enclosing client.
     */
    private class MqttSubscribeAndCallbackBuilder
            extends MqttSubscribeBuilder<MqttSubscribeAndCallbackBuilder>
            implements Mqtt5AsyncClient.Mqtt5SubscribeAndCallbackBuilder.Start.Complete,
            Mqtt5AsyncClient.Mqtt5SubscribeAndCallbackBuilder.Call.Ex {

        /** Consumer of incoming Publish messages; {@code null} until {@link #callback(Consumer)} is called. */
        @Nullable
        private Consumer<Mqtt5Publish> callback;
        /** Executor for the callback; {@code null} until {@link #executor(Executor)} is called. */
        @Nullable
        private Executor executor;
        /** Manual acknowledgement flag; {@code false} unless set. */
        private boolean manualAcknowledgement;

        private MqttSubscribeAndCallbackBuilder() {
        }

        @Override
        @NotNull
        protected MqttSubscribeAndCallbackBuilder self() {
            return this;
        }

        /**
         * Sets the callback after passing it through {@link Checks#notNull}.
         *
         * @param callback the consumer of the incoming Publish messages.
         * @return this builder.
         */
        @Override
        @NotNull
        public MqttSubscribeAndCallbackBuilder callback(@Nullable Consumer<Mqtt5Publish> callback) {
            this.callback = Checks.notNull(callback, "Callback");
            return this;
        }

        /**
         * Sets the executor after passing it through {@link Checks#notNull}.
         *
         * @param executor the executor used for the callback.
         * @return this builder.
         */
        @Override
        @NotNull
        public MqttSubscribeAndCallbackBuilder executor(@Nullable Executor executor) {
            this.executor = Checks.notNull(executor, "Executor");
            return this;
        }

        /**
         * Sets the manual acknowledgement flag.
         *
         * @param manualAcknowledgement the flag value.
         * @return this builder.
         */
        @Override
        @NotNull
        public MqttSubscribeAndCallbackBuilder manualAcknowledgement(boolean manualAcknowledgement) {
            this.manualAcknowledgement = manualAcknowledgement;
            return this;
        }

        /**
         * Builds the Subscribe message and sends it through the enclosing client.
         * <p>
         * Without a callback, an executor or manual acknowledgement is rejected via {@link Checks#state} and the
         * plain subscribe is used. With a callback, the overload with or without executor is chosen depending
         * on whether an executor was set.
         *
         * @return the future of the subscribe operation.
         */
        @Override
        @NotNull
        public CompletableFuture<Mqtt5SubAck> send() {
            MqttSubscribe subscribe = build();
            if (callback == null) {
                Checks.state(executor == null, "Executor must not be given if callback is null.");
                Checks.state(!manualAcknowledgement, "Manual acknowledgement must not be true if callback is null.");
                return MqttAsyncClient.this.subscribe(subscribe);
            }
            if (executor == null) {
                return MqttAsyncClient.this.subscribe(subscribe, callback, manualAcknowledgement);
            }
            return MqttAsyncClient.this.subscribe(subscribe, callback, executor, manualAcknowledgement);
        }
    }

    /**
     * Subscriber that forwards every received Publish to a {@link Consumer} and ignores terminal signals.
     */
    private static class CallbackSubscriber implements FlowableSubscriber<Mqtt5Publish> {

        /** Consumer each received Publish is handed to. */
        @NotNull
        private final Consumer<Mqtt5Publish> callback;

        private CallbackSubscriber(@NotNull Consumer<Mqtt5Publish> callback) {
            this.callback = callback;
        }

        /** Requests {@link Long#MAX_VALUE} items up front, so no further demand is ever signalled. */
        @Override
        public void onSubscribe(@NotNull Subscription s) {
            s.request(Long.MAX_VALUE);
        }

        /** Hands the Publish to the callback. */
        @Override
        public void onNext(@NotNull Mqtt5Publish publish) {
            callback.accept(publish);
        }

        /** No-op: completion of the stream is not surfaced to the callback. */
        @Override
        public void onComplete() {
        }

        /**
         * No-op: the error is dropped here.
         * NOTE(review): presumably failures are reported to the caller through another channel - confirm.
         */
        @Override
        public void onError(@NotNull Throwable t) {
        }
    }
}

