自定义Runner代码设计说明文档

需要依赖的jar包

自定义runner任务依赖 runner-common.jar <dependency> <groupid>tencent.lhotse.runner</groupid> <artifactid>runner-common</artifactid> <version>1.2.2-SNAPSHOT</version> </dependency>


继承的基类

  • 与hive相关的实现可以继承AbstractTDWDDCTaskRunner这个基类,继承其中对数据库表的debug。
  • HDFS与传统数据库导入的runner可以继承HDFSToDBRunner或者DBToHdfsRunner,调用DBUtil类来调用JDBC执行DAO
  • 普通的runner直接继承jar包中的AbstractTaskRunner基类就可以了。

必须实现的抽象方法

重载AbstractTaskRunner的execute()方法和kill()方法。


启动任务流程

  • 新建main 函数 public static void main(String[] args) {} 并在main 函数中实例化runner对象,将args[0] 作为实例化对象的传入参数。
  • 在main函数中调用对象startwork()方法,启动自定义runner。

获取参数的方法

通过this.getTask()方法获取TaskRuntimeInfo对象。

通过TaskRuntimeInfo对象,获取获取用户填写参数:

  • 获取实例和任务信息

    task.getId() 获取实例id

    task.getType() 获取任务类型

    task.getCurRunDate() 获取实例数据时间

    task.getNextRunDate() 获取实例下一个数据时间

  • 获取用户在ui上填写的填写任务参数

    this.getProperties().get(key) //获取任务参数.


日志输出方法

this.writeLocalLog(Level.INFO, "**");


提交任务实例执行状态

this.commitTask(state, runtimeId, desc);

说明:常用运行状态有:RUNNING(正在运行),KILLED(已经停止),SUCCESSFUL(成功),FAILED(失败)


停止任务实例运行

停止实例运行前,清理runner资源,并提交停止状态。

我们实现了runner调度资源的清理方法, CommonUtils.killProcess(this.taskRuntime, this);

建议开发者,使用统一的停止方法,如下:

public void kill() throws IOException {
    this.writeLocalLog(Level.INFO, " hello word had been kill ");
    boolean killResult = false;
    try {
        killResult = CommonUtils.killProcess(this.taskRuntime, this);
        if (killResult) {
            this.writeLocalLog(Level.SEVERE, "kill job succeed!");
            this.commitTask(LState.KILLED, "", "kill job succeed!");
        } else {
            this.writeLocalLog(Level.SEVERE, "kill job failed!");
            this.commitTask(LState.HANGED, "", "kill job failed!");
        }
    } catch (Exception e) {
        this.writeLocalLog(Level.SEVERE,
                "kill job failed:" + CommonUtils.stackTraceToString(e));
        this.commitTask(LState.HANGED, "", "kill job failed!");
    }
}

停止任务实例运行

public void kill() throws IOException {
    this.writeLocalLog(Level.INFO, " hello word had been kill ");
    boolean killResult = false;
    try {
        killResult = CommonUtils.killProcess(this.taskRuntime, this);
        if (killResult) {
            this.writeLocalLog(Level.SEVERE, "kill job succeed!");
            this.commitTask(LState.KILLED, "", "kill job succeed!");
        } else {
            this.writeLocalLog(Level.SEVERE, "kill job failed!");
            this.commitTask(LState.HANGED, "", "kill job failed!");
        }
    } catch (Exception e) {
        this.writeLocalLog(Level.SEVERE,
                "kill job failed:" + CommonUtils.stackTraceToString(e));
        this.commitTask(LState.HANGED, "", "kill job failed!");
    }
}
HellowordRunner示例
import com.tencent.teg.dc.lhotse.proto.LhotseObject.LState;  
import com.tencent.teg.dc.lhotse.runner.AbstractCustomTaskTypeRunner;  
import com.tencent.teg.dc.lhotse.runner.util.*;  
import org.apache.commons.lang.StringUtils;  
import java.io.IOException;
import java.util.logging.Level;

public class Helloword extends AbstractCustomTaskTypeRunner {

public Helloword(String configFileName) {
    super(configFileName);
}

private boolean success = false;

public static void main(String[] args) {

    String configure = "";
    if(args == null ||args.length&lt;1){
        System.out.println("需要配置文件作为输入参数");
        System.exit(2);
    }else{
        configure = args[0];
    }

    Helloword runner = new Helloword(configure);
    runner.startWork();
}

@Override
public void execute() throws IOException {

    try {
        taskRuntime = this.getTask();
        String p1 = taskRuntime.getProperties().get("parameter1");

        this.writeLocalLog(Level.INFO, "get parameter 1 =" + p1);

        if(!"kill".equals(p1)){
            commitTaskAndLog(LState.FAILED, "", "parameter is kill");
        }
        success = true;
    } catch (Exception e) {
        this.writeLocalLog(Level.SEVERE ,"执行HelloWord runner 出现异常");

        String st = CommonUtils.stackTraceToString(e);
        this.writeLocalLog(Level.SEVERE, "Exception stackTrace: " + st);

        commitTaskAndLog(LState.RUNNING, "", "Exception: " + e.getMessage());
        throw new IOException(e);
    }
    finally {
        if (!success) {
            commitTaskAndLog(LState.FAILED, "", "failed");
        } else {
            commitTaskAndLog(LState.SUCCESSFUL, "", "success execute");

        }
    }
}


@Override
public void kill() throws IOException {
    this.writeLocalLog(Level.INFO, " hello word had been kill ");
    boolean killResult = false;
    try {
        killResult = CommonUtils.killProcess(this.taskRuntime, this);
        if (killResult) {
            this.writeLocalLog(Level.SEVERE, "kill job succeed!");
            this.commitTask(LState.KILLED, "", "kill job succeed!");
        } else {
            this.writeLocalLog(Level.SEVERE, "kill job failed!");
            this.commitTask(LState.HANGED, "", "kill job failed!");
        }
    } catch (Exception e) {
        this.writeLocalLog(Level.SEVERE,
                "kill job failed:" + CommonUtils.stackTraceToString(e));
        this.commitTask(LState.HANGED, "", "kill job failed!");
    }
}

private void commitTaskAndLog(LState state, String runtimeId, String desc) {
    try {
        if (desc != null && desc.length() &gt; 4000) {
            desc = StringUtils.substring(desc, 0, 4000);
        }
        this.commitTask(state, runtimeId, desc);
    }
    catch (Exception e) {
        String st = CommonUtils.stackTraceToString(e);
        this.writeLocalLog(Level.INFO, "Log_desc :" + desc);
        this.writeLocalLog(Level.SEVERE, "Commit task failed, StackTrace: " + st);
    }
}

}

results matching ""

    No results matching ""