/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.mutiny.helpers;

import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.mutiny.subscription.MultiEmitter;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * A {@link Flow.Processor} that can also be driven through the {@link MultiEmitter} API.
 * <p>
 * Every signal is delegated to an internal {@link UnicastProcessor}. On top of that delegation
 * this class keeps a counter of the outstanding downstream demand (see {@link #requested()}) and
 * runs an optional termination callback (see {@link #onTermination(Runnable)}) at most once, when
 * the downstream cancels or when a failure/completion signal has been forwarded to it.
 * <p>
 * {@link #context()} is not supported; its exception message states that this class is used in tests.
 *
 * @param <T> the type of the items flowing through the processor
 */
public class MultiEmitterProcessor<T>
implements Flow.Processor<T, T>,
MultiEmitter<T> {
    /** Processor that actually buffers and dispatches the signals. */
    private final UnicastProcessor<T> processor;
    /** Callback to run on termination; cleared when it is consumed by {@link #fireTermination()}. */
    private final AtomicReference<Runnable> onTermination = new AtomicReference<>();
    /** Flipped to {@code true} exactly once, by the first call to {@link #fireTermination()}. */
    private final AtomicBoolean terminated = new AtomicBoolean();
    /** Outstanding demand: increased by downstream requests, decreased on each {@link #onNext(Object)}. */
    private final AtomicLong requested = new AtomicLong();

    private MultiEmitterProcessor() {
        this.processor = UnicastProcessor.create();
    }

    /**
     * Creates a new processor backed by a fresh {@link UnicastProcessor}.
     *
     * @param <T> the type of the items
     * @return the new instance
     */
    public static <T> MultiEmitterProcessor<T> create() {
        return new MultiEmitterProcessor<T>();
    }

    /**
     * Emits an item; equivalent to calling {@link #onNext(Object)}.
     *
     * @param item the item to emit
     * @return this emitter, to allow chaining
     */
    @Override
    public MultiEmitter<T> emit(T item) {
        this.onNext(item);
        return this;
    }

    /**
     * Emits a failure; equivalent to calling {@link #onError(Throwable)}.
     *
     * @param failure the failure to propagate
     */
    @Override
    public void fail(Throwable failure) {
        this.onError(failure);
    }

    /**
     * Emits the completion signal; equivalent to calling {@link #onComplete()}.
     */
    @Override
    public void complete() {
        this.onComplete();
    }

    /**
     * Registers the callback to run on termination, replacing any previously registered one.
     * <p>
     * The callback is only picked up by the first termination event: a callback registered after
     * termination has already fired is stored but never invoked.
     *
     * @param onTermination the callback to run
     * @return this emitter, to allow chaining
     */
    @Override
    public MultiEmitter<T> onTermination(Runnable onTermination) {
        this.onTermination.set(onTermination);
        return this;
    }

    /**
     * Reports whether termination has fired.
     *
     * @return {@code true} once the downstream has cancelled, or once a failure or completion
     *         signal has been forwarded to the downstream subscriber; {@code false} before that
     */
    @Override
    public boolean isCancelled() {
        return this.terminated.get();
    }

    /**
     * Returns the current value of the demand counter.
     *
     * @return the current value of the counter
     */
    @Override
    public long requested() {
        return this.requested.get();
    }

    /**
     * Subscribes {@code subscriber} to the internal processor through a wrapper that records the
     * requested amounts and fires the termination callback on cancellation, failure and completion.
     *
     * @param subscriber the downstream subscriber, must not be {@code null}
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override
    public void subscribe(final Flow.Subscriber<? super T> subscriber) {
        // Fail fast, before the internal processor has been subscribed to, instead of failing
        // later when the subscription is handed to the null subscriber.
        if (subscriber == null) {
            throw new NullPointerException("subscriber");
        }
        // Explicit (parameterized) upcast: keeps the call bound to AbstractMulti's subscribe method.
        ((AbstractMulti<T>) this.processor).subscribe(new Flow.Subscriber<T>() {

            @Override
            public void onSubscribe(final Flow.Subscription subscription) {
                subscriber.onSubscribe(new Flow.Subscription() {

                    @Override
                    public void request(long l) {
                        // Only valid (positive) amounts are counted as demand. The raw value is
                        // still forwarded so the upstream subscription can deal with an invalid
                        // request itself.
                        if (l > 0L) {
                            Subscriptions.add(MultiEmitterProcessor.this.requested, l);
                        }
                        subscription.request(l);
                    }

                    @Override
                    public void cancel() {
                        subscription.cancel();
                        MultiEmitterProcessor.this.fireTermination();
                    }
                });
            }

            @Override
            public void onNext(T item) {
                subscriber.onNext(item);
            }

            @Override
            public void onError(Throwable failure) {
                // The downstream is notified first, then the termination callback runs.
                subscriber.onError(failure);
                MultiEmitterProcessor.this.fireTermination();
            }

            @Override
            public void onComplete() {
                // The downstream is notified first, then the termination callback runs.
                subscriber.onComplete();
                MultiEmitterProcessor.this.fireTermination();
            }
        });
    }

    /**
     * Marks this processor as terminated and runs the registered callback, if any.
     * Only the first call has an effect; the callback reference is cleared when it is consumed.
     */
    private void fireTermination() {
        if (this.terminated.compareAndSet(false, true)) {
            Runnable callback = this.onTermination.getAndSet(null);
            if (callback != null) {
                callback.run();
            }
        }
    }

    /**
     * Forwards the upstream subscription to the internal processor.
     *
     * @param subscription the upstream subscription
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.processor.onSubscribe(subscription);
    }

    /**
     * Consumes one unit from the demand counter, then forwards the item to the internal processor.
     *
     * @param item the item to forward
     */
    @Override
    public void onNext(T item) {
        // NOTE(review): presumably Subscriptions.subtract never drops the counter below zero - confirm.
        Subscriptions.subtract(this.requested, 1L);
        this.processor.onNext(item);
    }

    /**
     * Forwards the failure to the internal processor.
     *
     * @param failure the failure to forward
     */
    @Override
    public void onError(Throwable failure) {
        this.processor.onError(failure);
    }

    /**
     * Forwards the completion signal to the internal processor.
     */
    @Override
    public void onComplete() {
        this.processor.onComplete();
    }

    /**
     * Exposes this processor as a {@link Multi}.
     *
     * @return a {@link Multi} created from this publisher
     */
    public Multi<T> toMulti() {
        return Multi.createFrom().publisher(this);
    }

    /**
     * Not supported by this class.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public Context context() {
        throw new UnsupportedOperationException("This class is used in tests");
    }
}

