Launching Tasks from a Stream
1. 通过 Stream 启动 Task
您可以使用 tasklauncher-dataflow Sink 从 Stream 中启动任务。Sink 连接 Data Flow server 并使用 REST API 接口启动已定义好的 Task。task launch request(任务启动请求)
接受一个 JSON payload ,可以指定 Task Name ,以及命令行参数和部署属性。
app-starters-task-launch-request-common 配合 Spring Cloud Stream 的 functional composition 功能,可以将任何 Source 和 Sink 的 输出 转换为 task launch request.
添加依赖 app-starters-task-launch-request-common
,自动配置java.util.function.Function
实现,通过 Spring Cloud Function 注册为taskLaunchRequest
。
例如,您可以从 time source 开始,添加以下依赖项,重新构建,将其注册为自定义的 source 。我们将这个例子名称定义为 time-tlr
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>app-starters-task-launch-request-common</artifactId>
</dependency>
接下来, 注册 一个 tasklauncher-dataflow
Sink ,并创建一个Task (我们将使用timestamp task )。
stream create --name task-every-minute \
--definition \
"time-tlr --trigger.fixed-delay=60
--spring.cloud.stream.function.definition=taskLaunchRequest
--task.launch.request.task-name=timestamp-task | tasklauncher-dataflow" \
--deploy
上述的 Stream 每分钟会生成一个 task launch request。该请求提供了启动的任务名称:{"name":"timestamp-task"}
。
以下 Stream definition 说明了命令行参数的使用。它将生成消息,如下为任务提供命令行参数:
{
"args": [
"foo=bar",
"time=12/03/18 17:44:12"
],
"deploymentProps": {},
"name": "timestamp-task"
}
stream create --name task-every-second \
--definition \
"time-tlr --spring.cloud.stream.function.definition=taskLaunchRequest
--task.launch.request.task-name=timestamp-task
--task.launch.request.args=foo=bar
--task.launch.request.arg-expressions=time=payload | tasklauncher-dataflow" \
--deploy
请注意,SpEL表达式会映射所有有效消息到time
命令行参数,以及静态参数 foo=bar
接下来,您可以使用shell命令task execution list
查看 Task 的执行列表,如下所示(包括输出结果):
dataflow:>task execution list
╔════════════════════╤══╤════════════════════════════╤════════════════════════════╤═════════╗
║ Task Name │ID│ Start Time │ End Time │Exit Code║
╠════════════════════╪══╪════════════════════════════╪════════════════════════════╪═════════╣
║timestamp-task_26176│4 │Tue May 02 12:13:49 EDT 2017│Tue May 02 12:13:49 EDT 2017│0 ║
║timestamp-task_32996│3 │Tue May 02 12:12:49 EDT 2017│Tue May 02 12:12:49 EDT 2017│0 ║
║timestamp-task_58971│2 │Tue May 02 12:11:50 EDT 2017│Tue May 02 12:11:50 EDT 2017│0 ║
║timestamp-task_13467│1 │Tue May 02 12:10:50 EDT 2017│Tue May 02 12:10:50 EDT 2017│0 ║
╚════════════════════╧══╧════════════════════════════╧════════════════════════════╧═════════╝
在此示例中,我们展示了如何使用time
Source 以固定速率启动 Task 。此模式可以应用于任何 Source 进行启动 Task 以及响应任何事件。
1. 从流中启动 Composed Task
可以使用tasklauncher-dataflow
Sink 启动 Composed Task ,如下所述。由于我们直接使用 ComposedTaskRunner,所以在创建启动 Composed Task 的 Stream 之前,我们需要为 Composed Task 运行器本身以及 Composed Task 设置 Task Defintion。假设我们创建以下 Composed Task Defintion : AAA && BBB
。第一步是创建 Task Defintion ,如下例所示:
task create composed-task-runner --definition "composed-task-runner"
task create AAA --definition "timestamp"
task create BBB --definition "timestamp"
现在我们已经准备好了 Composed Task Defintion 所需的 Task Defintion,我们需要创建一个启动ComposedTaskRunner
的Stream。因此,在这种情况下,我们需要
time
,Source 会定时发送启动 Task 的请求,如上所述。tasklauncher-dataflow
,用于启动ComposedTaskRunner
的 Sink。
该 Stream 应类似于以下内容:
stream create ctr-stream \
--definition \
"time --fixed-delay=30
--task.launch.request.task-name=composed-task-launcher
--task.launch.request.args=
--graph=AAA&&BBB,
--increment-instance-enabled=true | tasklauncher-dataflow"
现在,我们重点关注一下ComposedTaskRunner
启动所需的配置:
graph:这是由
ComposedTaskRunner
执行的 graph 。在这里是AAA&&BBB
。increment-instance-enabled:配置表示
ComposedTaskRunner
每次执行都是唯一的。ComposedTaskRunner
是使用 Spring Batch 构建的。因此,我们希望每次启动时都有一个新的ComposedTaskRunner
实例。要做到这一点,我们将increment-instance-enabled
成为true
。
Last updated
Was this helpful?