Launching Tasks from a Stream
1. 通过 Stream 启动 Task
您可以使用 tasklauncher-dataflow Sink 从 Stream 中启动任务。Sink 连接 Data Flow server 并使用 REST API 接口启动已定义好的 Task。task launch request(任务启动请求)
接受一个 JSON payload ,可以指定 Task Name ,以及命令行参数和部署属性。
app-starters-task-launch-request-common 配合 Spring Cloud Stream 的 functional composition 功能,可以将任何 Source 和 Sink 的 输出 转换为 task launch request.
添加依赖 app-starters-task-launch-request-common
,自动配置java.util.function.Function
实现,通过 Spring Cloud Function 注册为taskLaunchRequest
。
例如,您可以从 time source 开始,添加以下依赖项,重新构建,将其注册为自定义的 source 。我们将这个例子名称定义为 time-tlr
Spring Cloud Stream Initializr 为创建Stream Application 提供了很好的起点。
接下来, 注册 一个 tasklauncher-dataflow
Sink ,并创建一个Task (我们将使用timestamp task )。
上述的 Stream 每分钟会生成一个 task launch request。该请求提供了启动的任务名称:{"name":"timestamp-task"}
。
以下 Stream definition 说明了命令行参数的使用。它将生成消息,如下为任务提供命令行参数:
请注意,SpEL表达式会映射所有有效消息到time
命令行参数,以及静态参数 foo=bar
接下来,您可以使用shell命令task execution list
查看 Task 的执行列表,如下所示(包括输出结果):
在此示例中,我们展示了如何使用time
Source 以固定速率启动 Task 。此模式可以应用于任何 Source 进行启动 Task 以及响应任何事件。
1. 从流中启动 Composed Task
可以使用tasklauncher-dataflow
Sink 启动 Composed Task ,如下所述。由于我们直接使用 ComposedTaskRunner,所以在创建启动 Composed Task 的 Stream 之前,我们需要为 Composed Task 运行器本身以及 Composed Task 设置 Task Defintion。假设我们创建以下 Composed Task Defintion : AAA && BBB
。第一步是创建 Task Defintion ,如下例所示:
ComposedTaskRunner
可在此处找到 Releases 版本。
现在我们已经准备好了 Composed Task Defintion 所需的 Task Defintion,我们需要创建一个启动ComposedTaskRunner
的Stream。因此,在这种情况下,我们需要
time
,Source 会定时发送启动 Task 的请求,如上所述。tasklauncher-dataflow
,用于启动ComposedTaskRunner
的 Sink。
该 Stream 应类似于以下内容:
现在,我们重点关注一下ComposedTaskRunner
启动所需的配置:
graph:这是由
ComposedTaskRunner
执行的 graph 。在这里是AAA&&BBB
。increment-instance-enabled:配置表示
ComposedTaskRunner
每次执行都是唯一的。ComposedTaskRunner
是使用 Spring Batch 构建的。因此,我们希望每次启动时都有一个新的ComposedTaskRunner
实例。要做到这一点,我们将increment-instance-enabled
成为true
。
Last updated