package io.micronaut.http.netty.stream;

import io.micronaut.core.annotation.Internal;
import io.micronaut.http.netty.content.HttpContentUtil;
import io.netty.handler.codec.http.HttpContent;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

@Internal
/* loaded from: input_file:WEB-INF/lib/micronaut-http-netty-4.1.11.jar:io/micronaut/http/netty/stream/JsonSubscriber.class */
public final class JsonSubscriber implements CoreSubscriber<HttpContent> {
    private final AtomicBoolean empty = new AtomicBoolean(true);
    private final CoreSubscriber<? super HttpContent> upstream;

    public JsonSubscriber(CoreSubscriber<? super HttpContent> coreSubscriber) {
        this.upstream = coreSubscriber;
    }

    @Override // reactor.core.CoreSubscriber
    public Context currentContext() {
        return this.upstream.currentContext();
    }

    @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        this.upstream.onSubscribe(subscription);
    }

    @Override // org.reactivestreams.Subscriber
    public void onNext(HttpContent httpContent) {
        if (this.empty.compareAndSet(true, false)) {
            this.upstream.onNext(HttpContentUtil.prefixOpenBracket(httpContent));
        } else {
            this.upstream.onNext(HttpContentUtil.prefixComma(httpContent));
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        this.upstream.onError(th);
    }

    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        if (this.empty.get()) {
            this.upstream.onNext(HttpContentUtil.prefixOpenBracket(HttpContentUtil.closeBracket()));
        } else {
            this.upstream.onNext(HttpContentUtil.closeBracket());
        }
        this.upstream.onComplete();
    }

    public static Flux<HttpContent> lift(Publisher<HttpContent> publisher) {
        return (Flux) Operators.lift((scannable, coreSubscriber) -> {
            return new JsonSubscriber(coreSubscriber);
        }).apply(publisher);
    }
}
