/*
 * Decompiled with CFR 0.152.
 */
package org.opentestfactory.services.components.logger;

import ch.qos.logback.core.AppenderBase;
import jakarta.inject.Inject;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.opentestfactory.dto.v1.WorkflowLogNotification;
import org.opentestfactory.messages.OTFMessage;
import org.opentestfactory.services.components.bus.BusApiClient;
import org.opentestfactory.services.components.logger.WorkflowContext;

public class WorkflowAppender
extends AppenderBase {
    private static final Integer DEFAULT_QUEUE_SIZE = 100;
    private final BusApiClient busApiClient;
    private boolean run = false;
    protected LinkedBlockingQueue<WorkflowLogNotification> loggingQueue = new LinkedBlockingQueue(DEFAULT_QUEUE_SIZE);
    protected Thread publicationWorker;

    @Inject
    public WorkflowAppender(BusApiClient busApiClient) {
        this.busApiClient = busApiClient;
    }

    public synchronized void setQueueSize(Integer size) {
        if (this.run) {
            throw new IllegalStateException("Already running, cannot change queue size.");
        }
        this.addInfo("Logging queue size will be: " + size);
        this.loggingQueue = new LinkedBlockingQueue(size);
    }

    public void start() {
        super.start();
        this.run = true;
        this.publicationWorker = new WorkflowAppenderWorker();
        this.publicationWorker.start();
        this.addInfo("Started");
    }

    public void stop() {
        this.run = false;
        this.addInfo("Initiated stop.");
        super.stop();
    }

    protected void append(Object e) {
        this.addInfo("Got message");
        WorkflowContext ctx = WorkflowContext.get();
        if (ctx != null) {
            this.addInfo("In workflow, queuing notification");
            WorkflowLogNotification notification = this.buildNotification(ctx, e);
            if (this.loggingQueue.offer(notification)) {
                this.addInfo("Notification queued");
            } else {
                this.addInfo("The queue is full, publishing old messages before enqueuing.");
                do {
                    WorkflowLogNotification oldNotification = this.loggingQueue.poll();
                    this.busApiClient.publishEvent((OTFMessage)oldNotification);
                } while (!this.loggingQueue.offer(notification));
            }
        }
    }

    public WorkflowLogNotification buildNotification(WorkflowContext ctx, Object e) {
        WorkflowLogNotification notification = new WorkflowLogNotification("opentestfactory.org/v1alpha1", this.getName(), ctx.workflowId, new WorkflowLogNotification.LogSpec(new String[]{e.toString()}));
        if (ctx.hasJobId()) {
            notification.addJobId(ctx.jobId());
        }
        if (ctx.hasJobOrigin()) {
            notification.addJobOrigin(ctx.jobOrigin());
        }
        if (ctx.hasStepId()) {
            notification.addStepId(ctx.stepId());
        }
        if (ctx.hasStepOrigin()) {
            notification.addStepOrigin(ctx.stepOrigin());
        }
        return notification;
    }

    private void publish() throws InterruptedException {
        while (this.run) {
            WorkflowLogNotification notification = this.loggingQueue.poll(500L, TimeUnit.MILLISECONDS);
            while (notification != null) {
                this.addInfo("Publishing message as notification.");
                try {
                    this.busApiClient.publishEvent((OTFMessage)notification);
                }
                catch (RuntimeException e) {
                    this.addError("Failed to publish log " + String.valueOf(notification.getSpec().getLogs()), e);
                }
                notification = this.loggingQueue.poll();
            }
        }
        this.addInfo("Got out of publishing main loop.");
    }

    private class WorkflowAppenderWorker
    extends Thread {
        public WorkflowAppenderWorker() {
            this.setDaemon(true);
            this.setName("WFAppenderPublicationWorker-" + this.hashCode());
        }

        @Override
        public void run() {
            try {
                WorkflowAppender.this.publish();
            }
            catch (InterruptedException ex) {
                WorkflowAppender.this.addWarn("Logging worker thread interrupted " + ex.getMessage());
                Thread.currentThread().interrupt();
            }
        }
    }
}

