自定义Runner代码设计说明文档
需要依赖的jar包
自定义runner任务依赖 runner-common.jar
<dependency>
<groupid>tencent.lhotse.runner</groupid>
<artifactid>runner-common</artifactid>
<version>1.2.2-SNAPSHOT</version>
</dependency>
继承的基类
- 与hive相关的实现可以继承AbstractTDWDDCTaskRunner这个基类,继承其中对数据库表的debug。
- HDFS与传统数据库导入的runner可以继承HDFSToDBRunner或者DBToHdfsRunner,调用DBUtil类来调用JDBC执行DAO
- 普通的runner直接继承jar包中的AbstractTaskRunner基类就可以了。
必须实现的抽象方法
重载AbstractTaskRunner的execute()方法和kill()方法。
启动任务流程
- 新建main 函数 public static void main(String[] args) {} 并在main 函数中实例化runner对象,将args[0] 作为实例化对象的传入参数。
- 在main函数中调用对象startwork()方法,启动自定义runner。
获取参数的方法
通过this.getTask()方法获取TaskRuntimeInfo对象。
通过TaskRuntimeInfo对象,获取获取用户填写参数:
获取实例和任务信息
task.getId() 获取实例id
task.getType() 获取任务类型
task.getCurRunDate() 获取实例数据时间
task.getNextRunDate() 获取实例下一个数据时间
获取用户在ui上填写的填写任务参数
this.getProperties().get(key) //获取任务参数.
日志输出方法
this.writeLocalLog(Level.INFO, "**");
提交任务实例执行状态
this.commitTask(state, runtimeId, desc);
说明:常用运行状态有:RUNNING(正在运行),KILLED(已经停止),SUCCESSFUL(成功),FAILED(失败)
停止任务实例运行
停止实例运行前,清理runner资源,并提交停止状态。
我们实现了runner调度资源的清理方法, CommonUtils.killProcess(this.taskRuntime, this);
建议开发者,使用统一的停止方法,如下:
public void kill() throws IOException {
this.writeLocalLog(Level.INFO, " hello word had been kill ");
boolean killResult = false;
try {
killResult = CommonUtils.killProcess(this.taskRuntime, this);
if (killResult) {
this.writeLocalLog(Level.SEVERE, "kill job succeed!");
this.commitTask(LState.KILLED, "", "kill job succeed!");
} else {
this.writeLocalLog(Level.SEVERE, "kill job failed!");
this.commitTask(LState.HANGED, "", "kill job failed!");
}
} catch (Exception e) {
this.writeLocalLog(Level.SEVERE,
"kill job failed:" + CommonUtils.stackTraceToString(e));
this.commitTask(LState.HANGED, "", "kill job failed!");
}
}
停止任务实例运行
public void kill() throws IOException {
this.writeLocalLog(Level.INFO, " hello word had been kill ");
boolean killResult = false;
try {
killResult = CommonUtils.killProcess(this.taskRuntime, this);
if (killResult) {
this.writeLocalLog(Level.SEVERE, "kill job succeed!");
this.commitTask(LState.KILLED, "", "kill job succeed!");
} else {
this.writeLocalLog(Level.SEVERE, "kill job failed!");
this.commitTask(LState.HANGED, "", "kill job failed!");
}
} catch (Exception e) {
this.writeLocalLog(Level.SEVERE,
"kill job failed:" + CommonUtils.stackTraceToString(e));
this.commitTask(LState.HANGED, "", "kill job failed!");
}
}
HellowordRunner示例
import com.tencent.teg.dc.lhotse.proto.LhotseObject.LState;
import com.tencent.teg.dc.lhotse.runner.AbstractCustomTaskTypeRunner;
import com.tencent.teg.dc.lhotse.runner.util.*;
import org.apache.commons.lang.StringUtils;
import java.io.IOException;
import java.util.logging.Level;
public class Helloword extends AbstractCustomTaskTypeRunner {
public Helloword(String configFileName) {
super(configFileName);
}
private boolean success = false;
public static void main(String[] args) {
String configure = "";
if(args == null ||args.length<1){
System.out.println("需要配置文件作为输入参数");
System.exit(2);
}else{
configure = args[0];
}
Helloword runner = new Helloword(configure);
runner.startWork();
}
@Override
public void execute() throws IOException {
try {
taskRuntime = this.getTask();
String p1 = taskRuntime.getProperties().get("parameter1");
this.writeLocalLog(Level.INFO, "get parameter 1 =" + p1);
if(!"kill".equals(p1)){
commitTaskAndLog(LState.FAILED, "", "parameter is kill");
}
success = true;
} catch (Exception e) {
this.writeLocalLog(Level.SEVERE ,"执行HelloWord runner 出现异常");
String st = CommonUtils.stackTraceToString(e);
this.writeLocalLog(Level.SEVERE, "Exception stackTrace: " + st);
commitTaskAndLog(LState.RUNNING, "", "Exception: " + e.getMessage());
throw new IOException(e);
}
finally {
if (!success) {
commitTaskAndLog(LState.FAILED, "", "failed");
} else {
commitTaskAndLog(LState.SUCCESSFUL, "", "success execute");
}
}
}
@Override
public void kill() throws IOException {
this.writeLocalLog(Level.INFO, " hello word had been kill ");
boolean killResult = false;
try {
killResult = CommonUtils.killProcess(this.taskRuntime, this);
if (killResult) {
this.writeLocalLog(Level.SEVERE, "kill job succeed!");
this.commitTask(LState.KILLED, "", "kill job succeed!");
} else {
this.writeLocalLog(Level.SEVERE, "kill job failed!");
this.commitTask(LState.HANGED, "", "kill job failed!");
}
} catch (Exception e) {
this.writeLocalLog(Level.SEVERE,
"kill job failed:" + CommonUtils.stackTraceToString(e));
this.commitTask(LState.HANGED, "", "kill job failed!");
}
}
private void commitTaskAndLog(LState state, String runtimeId, String desc) {
try {
if (desc != null && desc.length() > 4000) {
desc = StringUtils.substring(desc, 0, 4000);
}
this.commitTask(state, runtimeId, desc);
}
catch (Exception e) {
String st = CommonUtils.stackTraceToString(e);
this.writeLocalLog(Level.INFO, "Log_desc :" + desc);
this.writeLocalLog(Level.SEVERE, "Commit task failed, StackTrace: " + st);
}
}
}