package com.aizuda.snailjob.server.job.task.support.dispatch;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import com.aizuda.snailjob.common.core.enums.JobTaskTypeEnum;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.exception.SnailJobServerException;
import com.aizuda.snailjob.server.job.task.dto.ReduceTaskDTO;
import com.aizuda.snailjob.server.job.task.support.JobTaskConverter;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorContext;
import com.aizuda.snailjob.server.job.task.support.executor.job.JobExecutorFactory;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerateContext;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGenerator;
import com.aizuda.snailjob.server.job.task.support.generator.task.JobTaskGeneratorFactory;
import com.aizuda.snailjob.server.job.task.support.handler.DistributedLockHandler;
import com.aizuda.snailjob.server.job.task.support.handler.JobTaskBatchHandler;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.JobTaskMapper;
import com.aizuda.snailjob.template.datasource.persistence.mapper.WorkflowTaskBatchMapper;
import com.aizuda.snailjob.template.datasource.persistence.po.Job;
import com.aizuda.snailjob.template.datasource.persistence.po.JobTask;
import com.aizuda.snailjob.template.datasource.persistence.po.WorkflowTaskBatch;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.baomidou.mybatisplus.extension.plugins.pagination.PageDTO;
import java.lang.invoke.SerializedLambda;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import lombok.Generated;
import org.apache.pekko.actor.AbstractActor;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Scope("prototype")
@Component("JobReduceActor")
/* loaded from: input_file:com/aizuda/snailjob/server/job/task/support/dispatch/ReduceActor.class */
public class ReduceActor extends AbstractActor {
    private static final String KEY = "job_generate_reduce_{0}_{1}";
    private final DistributedLockHandler distributedLockHandler;
    private final JobMapper jobMapper;
    private final JobTaskMapper jobTaskMapper;
    private final WorkflowTaskBatchMapper workflowTaskBatchMapper;
    private final JobTaskBatchHandler jobTaskBatchHandler;

    public AbstractActor.Receive createReceive() {
        return receiveBuilder().match(ReduceTaskDTO.class, reduceTaskDTO -> {
            SnailJobLog.LOCAL.info("执行Reduce, [{}]", new Object[]{JsonUtil.toJsonString(reduceTaskDTO)});
            try {
                Assert.notNull(reduceTaskDTO.getMrStage(), () -> {
                    return new SnailJobServerException("mrStage can not be null");
                });
                Assert.notNull(reduceTaskDTO.getJobId(), () -> {
                    return new SnailJobServerException("jobId can not be null");
                });
                Assert.notNull(reduceTaskDTO.getTaskBatchId(), () -> {
                    return new SnailJobServerException("taskBatchId can not be null");
                });
                this.distributedLockHandler.lockWithDisposableAndRetry(() -> {
                    doReduce(reduceTaskDTO);
                }, MessageFormat.format(KEY, reduceTaskDTO.getTaskBatchId(), reduceTaskDTO.getJobId()), Duration.ofSeconds(1L), Duration.ofSeconds(2L), 6);
            } catch (Exception e) {
                SnailJobLog.LOCAL.error("Reduce processing exception. [{}]", new Object[]{reduceTaskDTO, e});
            }
        }).build();
    }

    private void doReduce(ReduceTaskDTO reduceTaskDTO) {
        if (CollUtil.isNotEmpty(this.jobTaskMapper.selectList(new PageDTO(1L, 1L), (Wrapper) ((LambdaQueryWrapper) ((LambdaQueryWrapper) new LambdaQueryWrapper().select(new SFunction[]{(v0) -> {
            return v0.getId();
        }}).eq((v0) -> {
            return v0.getTaskBatchId();
        }, reduceTaskDTO.getTaskBatchId())).eq((v0) -> {
            return v0.getMrStage();
        }, reduceTaskDTO.getMrStage())).orderByAsc((v0) -> {
            return v0.getId();
        })))) {
            return;
        }
        Job job = (Job) this.jobMapper.selectById(reduceTaskDTO.getJobId());
        if (JobTaskTypeEnum.MAP_REDUCE.getType() != job.getTaskType().intValue()) {
            return;
        }
        String argStr = this.jobTaskBatchHandler.getArgStr(reduceTaskDTO.getTaskBatchId(), job);
        JobTaskGenerator taskInstance = JobTaskGeneratorFactory.getTaskInstance(Integer.valueOf(JobTaskTypeEnum.MAP_REDUCE.getType()));
        JobTaskGenerateContext jobTaskInstanceGenerateContext = JobTaskConverter.INSTANCE.toJobTaskInstanceGenerateContext(job);
        jobTaskInstanceGenerateContext.setTaskBatchId(reduceTaskDTO.getTaskBatchId());
        jobTaskInstanceGenerateContext.setMrStage(reduceTaskDTO.getMrStage());
        jobTaskInstanceGenerateContext.setWfContext(reduceTaskDTO.getWfContext());
        jobTaskInstanceGenerateContext.setArgsStr(argStr);
        List<JobTask> generate = taskInstance.generate(jobTaskInstanceGenerateContext);
        if (CollUtil.isEmpty(generate)) {
            SnailJobLog.LOCAL.warn("Job task is empty, taskBatchId:[{}]", new Object[]{reduceTaskDTO.getTaskBatchId()});
            return;
        }
        String str = null;
        if (Objects.nonNull(reduceTaskDTO.getWorkflowTaskBatchId())) {
            str = ((WorkflowTaskBatch) this.workflowTaskBatchMapper.selectOne((Wrapper) new LambdaQueryWrapper().select(new SFunction[]{(v0) -> {
                return v0.getWfContext();
            }, (v0) -> {
                return v0.getId();
            }}).eq((v0) -> {
                return v0.getId();
            }, reduceTaskDTO.getWorkflowTaskBatchId()))).getWfContext();
        }
        JobExecutorFactory.getJobExecutor(Integer.valueOf(JobTaskTypeEnum.MAP_REDUCE.getType())).execute(buildJobExecutorContext(reduceTaskDTO, job, generate, str));
    }

    private static JobExecutorContext buildJobExecutorContext(ReduceTaskDTO reduceTaskDTO, Job job, List<JobTask> list, String str) {
        JobExecutorContext jobExecutorContext = JobTaskConverter.INSTANCE.toJobExecutorContext(job);
        jobExecutorContext.setTaskList(list);
        jobExecutorContext.setTaskBatchId(reduceTaskDTO.getTaskBatchId());
        jobExecutorContext.setWorkflowTaskBatchId(reduceTaskDTO.getWorkflowTaskBatchId());
        jobExecutorContext.setWorkflowNodeId(reduceTaskDTO.getWorkflowNodeId());
        jobExecutorContext.setWfContext(str);
        return jobExecutorContext;
    }

    @Generated
    public ReduceActor(DistributedLockHandler distributedLockHandler, JobMapper jobMapper, JobTaskMapper jobTaskMapper, WorkflowTaskBatchMapper workflowTaskBatchMapper, JobTaskBatchHandler jobTaskBatchHandler) {
        this.distributedLockHandler = distributedLockHandler;
        this.jobMapper = jobMapper;
        this.jobTaskMapper = jobTaskMapper;
        this.workflowTaskBatchMapper = workflowTaskBatchMapper;
        this.jobTaskBatchHandler = jobTaskBatchHandler;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1960525789:
                if (implMethodName.equals("getMrStage")) {
                    z = true;
                    break;
                }
                break;
            case -354109142:
                if (implMethodName.equals("getWfContext")) {
                    z = 3;
                    break;
                }
                break;
            case 98245393:
                if (implMethodName.equals("getId")) {
                    z = 2;
                    break;
                }
                break;
            case 916018042:
                if (implMethodName.equals("getTaskBatchId")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/baomidou/mybatisplus/core/toolkit/support/SFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/aizuda/snailjob/template/datasource/persistence/po/JobTask") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Long;")) {
                    return (v0) -> {
                        return v0.getTaskBatchId();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/baomidou/mybatisplus/core/toolkit/support/SFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/aizuda/snailjob/template/datasource/persistence/po/JobTask") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Integer;")) {
                    return (v0) -> {
                        return v0.getMrStage();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/baomidou/mybatisplus/core/toolkit/support/SFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/aizuda/snailjob/template/datasource/persistence/po/JobTask") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Long;")) {
                    return (v0) -> {
                        return v0.getId();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/baomidou/mybatisplus/core/toolkit/support/SFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/aizuda/snailjob/template/datasource/persistence/po/JobTask") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Long;")) {
                    return (v0) -> {
                        return v0.getId();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/baomidou/mybatisplus/core/toolkit/support/SFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/aizuda/snailjob/template/datasource/persistence/po/WorkflowTaskBatch") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Long;")) {
                    return (v0) -> {
                        return v0.getId();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/baomidou/mybatisplus/core/toolkit/support/SFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/aizuda/snailjob/template/datasource/persistence/po/WorkflowTaskBatch") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Long;")) {
                    return (v0) -> {
                        return v0.getId();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/baomidou/mybatisplus/core/toolkit/support/SFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/aizuda/snailjob/template/datasource/persistence/po/WorkflowTaskBatch") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.getWfContext();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
