package org.opentestfactory.services.components.logger;

import ch.qos.logback.core.AppenderBase;
import jakarta.inject.Inject;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.opentestfactory.dto.v1.WorkflowLogNotification;
import org.opentestfactory.messages.OTFMessageAPI;
import org.opentestfactory.services.components.bus.BusApiClient;

/* loaded from: input_file:WEB-INF/lib/otf-microservice-components-1.9.0.jar:org/opentestfactory/services/components/logger/WorkflowAppender.class */
/**
 * Logback appender that turns each log event emitted inside a workflow context into a
 * {@link WorkflowLogNotification} and publishes it through the {@link BusApiClient}.
 *
 * <p>{@link #append(Object)} only enqueues the notification into a bounded queue; a daemon
 * worker thread, created by {@link #start()}, takes notifications off the queue and publishes
 * them. When the queue is full, the appending thread publishes the oldest queued
 * notifications itself until there is room for the new one.
 *
 * <p>The raw {@code AppenderBase} supertype is kept on purpose so that existing code and
 * configuration referring to this class keep compiling unchanged.
 */
public class WorkflowAppender extends AppenderBase {
    /** Queue capacity used until {@link #setQueueSize(Integer)} is called. */
    private static final int DEFAULT_QUEUE_SIZE = 100;

    /** How long the worker waits for a notification before re-checking the run flag. */
    private static final long POLL_TIMEOUT_MILLIS = 500L;

    private final BusApiClient busApiClient;

    /**
     * Run flag of the publication loop. Declared {@code volatile} because it is written by
     * the thread calling {@link #start()}/{@link #stop()} and read by the worker thread.
     */
    private volatile boolean run = false;

    /** Bounded buffer between the appending threads and the publication worker. */
    protected LinkedBlockingQueue<WorkflowLogNotification> loggingQueue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE);

    /** Worker thread created by {@link #start()}; {@code null} until then. */
    protected Thread publicationWorker;

    /** Daemon thread that runs the publication loop of the enclosing appender. */
    private class WorkflowAppenderWorker extends Thread {
        public WorkflowAppenderWorker() {
            setDaemon(true);
            setName("WFAppenderPublicationWorker-" + hashCode());
        }

        @Override
        public void run() {
            try {
                WorkflowAppender.this.publish();
            } catch (InterruptedException e) {
                WorkflowAppender.this.addWarn("Logging worker thread interrupted " + e.getMessage());
                // Restore the interrupt status for whoever inspects this thread afterwards.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates the appender.
     *
     * @param busApiClient client used to publish the log notifications
     */
    @Inject
    public WorkflowAppender(BusApiClient busApiClient) {
        this.busApiClient = busApiClient;
    }

    /**
     * Replaces the logging queue with a new empty one of the given capacity.
     *
     * @param num capacity of the new queue
     * @throws IllegalStateException if the appender is already running
     */
    public synchronized void setQueueSize(Integer num) {
        if (this.run) {
            throw new IllegalStateException("Already running, cannot change queue size.");
        }
        addInfo("Logging queue size will be: " + num);
        this.loggingQueue = new LinkedBlockingQueue<>(num.intValue());
    }

    /**
     * Starts the appender and its publication worker. Calling it on an already started
     * appender is a no-op, so that a second worker thread is never spawned.
     */
    @Override
    public void start() {
        if (isStarted()) {
            return;
        }
        super.start();
        this.run = true;
        this.publicationWorker = new WorkflowAppenderWorker();
        this.publicationWorker.start();
        addInfo("Started");
    }

    /**
     * Stops the appender and asks the publication worker to leave its loop. The worker is
     * not interrupted; it notices the request the next time it checks the run flag.
     */
    @Override
    public void stop() {
        this.run = false;
        addInfo("Initiated stop.");
        super.stop();
    }

    /**
     * Queues a notification for the given log event when a workflow context is available;
     * otherwise the event is ignored.
     *
     * <p>If the queue is full, the oldest queued notifications are published from the
     * calling thread until the new notification can be enqueued.
     *
     * @param obj the log event; its {@code toString()} becomes the notification's log text
     */
    @Override
    protected void append(Object obj) {
        addInfo("Got message");
        WorkflowContext workflowContext = WorkflowContext.get();
        if (workflowContext == null) {
            return;
        }
        addInfo("In workflow, queuing notification");
        WorkflowLogNotification notification = buildNotification(workflowContext, obj);
        if (this.loggingQueue.offer(notification)) {
            addInfo("Notification queued");
            return;
        }
        addInfo("The queue is full, publishing old messages before enqueuing.");
        do {
            // The worker may have emptied the queue since the failed offer, in which case
            // poll() returns null and there is nothing to publish before retrying.
            WorkflowLogNotification oldest = this.loggingQueue.poll();
            if (oldest != null) {
                publishSafely(oldest);
            }
        } while (!this.loggingQueue.offer(notification));
    }

    /**
     * Builds the notification for a log event, copying the job and step identifiers and
     * origins that are present in the workflow context.
     *
     * @param workflowContext context supplying the workflow id and the optional job/step data
     * @param obj the log event; its {@code toString()} is used as the log text
     * @return the populated notification
     */
    public WorkflowLogNotification buildNotification(WorkflowContext workflowContext, Object obj) {
        WorkflowLogNotification workflowLogNotification = new WorkflowLogNotification(OTFMessageAPI.CURRENT_API_VERSION, getName(), workflowContext.workflowId, new WorkflowLogNotification.LogSpec(obj.toString()));
        if (workflowContext.hasJobId()) {
            workflowLogNotification.addJobId(workflowContext.jobId());
        }
        if (workflowContext.hasJobOrigin()) {
            workflowLogNotification.addJobOrigin(workflowContext.jobOrigin());
        }
        if (workflowContext.hasStepId()) {
            workflowLogNotification.addStepId(workflowContext.stepId());
        }
        if (workflowContext.hasStepOrigin()) {
            workflowLogNotification.addStepOrigin(workflowContext.stepOrigin());
        }
        return workflowLogNotification;
    }

    /**
     * Publication loop run by the worker thread: waits for a notification, publishes it and
     * everything else immediately available, then waits again, until the run flag is cleared.
     *
     * @throws InterruptedException if the worker is interrupted while waiting on the queue
     */
    private void publish() throws InterruptedException {
        while (this.run) {
            // The timed wait bounds how long a stop request can go unnoticed.
            WorkflowLogNotification notification = this.loggingQueue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            // Drain what is already queued without blocking, then go back to the timed wait.
            while (notification != null) {
                addInfo("Publishing message as notification.");
                publishSafely(notification);
                notification = this.loggingQueue.poll();
            }
        }
        addInfo("Got out of publishing main loop.");
    }

    /** Publishes one notification, reporting a runtime failure as an appender error status. */
    private void publishSafely(WorkflowLogNotification notification) {
        try {
            this.busApiClient.publishEvent(notification);
        } catch (RuntimeException e) {
            addError("Failed to publish log " + notification.getSpec().getLogs(), e);
        }
    }
}
