Launching Tasks from a Stream

1. 通过 Stream 启动 Task

您可以使用 tasklauncher-dataflow Sink 从 Stream 中启动任务。Sink 连接 Data Flow server 并使用 REST API 接口启动已定义好的 Task。task launch request(任务启动请求)接受一个 JSON payload ,可以指定 Task Name ,以及命令行参数和部署属性。

app-starters-task-launch-request-common 配合 Spring Cloud Stream 的 functional composition 功能,可以将任何 Source 和 Sink 的 输出 转换为 task launch request.

添加依赖 app-starters-task-launch-request-common,自动配置java.util.function.Function实现,通过 Spring Cloud Function 注册为taskLaunchRequest

例如,您可以从 time source 开始,添加以下依赖项,重新构建,将其注册为自定义的 source 。我们将这个例子名称定义为 time-tlr

<dependency>
    <groupId>org.springframework.cloud.stream.app</groupId>
    <artifactId>app-starters-task-launch-request-common</artifactId>
</dependency>

Spring Cloud Stream Initializr 为创建Stream Application 提供了很好的起点。

接下来, 注册 一个 tasklauncher-dataflow Sink ,并创建一个Task (我们将使用timestamp task )。

stream create --name task-every-minute \
--definition \
"time-tlr --trigger.fixed-delay=60 
          --spring.cloud.stream.function.definition=taskLaunchRequest 
          --task.launch.request.task-name=timestamp-task | tasklauncher-dataflow" \
--deploy

上述的 Stream 每分钟会生成一个 task launch request。该请求提供了启动的任务名称:{"name":"timestamp-task"}

以下 Stream definition 说明了命令行参数的使用。它将生成消息,如下为任务提供命令行参数:

{
    "args": [
        "foo=bar",
        "time=12/03/18 17:44:12"
    ],
    "deploymentProps": {},
    "name": "timestamp-task"
}
stream create --name task-every-second \
--definition \
"time-tlr --spring.cloud.stream.function.definition=taskLaunchRequest 
          --task.launch.request.task-name=timestamp-task
          --task.launch.request.args=foo=bar 
          --task.launch.request.arg-expressions=time=payload | tasklauncher-dataflow" \
--deploy

请注意,SpEL表达式会映射所有有效消息到time命令行参数,以及静态参数 foo=bar

接下来,您可以使用shell命令task execution list查看 Task 的执行列表,如下所示(包括输出结果):

dataflow:>task execution list
╔════════════════════╤══╤════════════════════════════╤════════════════════════════╤═════════╗
║     Task Name      │ID│         Start Time         │          End Time          │Exit Code║
╠════════════════════╪══╪════════════════════════════╪════════════════════════════╪═════════╣
║timestamp-task_26176│4 │Tue May 02 12:13:49 EDT 2017│Tue May 02 12:13:49 EDT 2017│0        ║
║timestamp-task_32996│3 │Tue May 02 12:12:49 EDT 2017│Tue May 02 12:12:49 EDT 2017│0        ║
║timestamp-task_58971│2 │Tue May 02 12:11:50 EDT 2017│Tue May 02 12:11:50 EDT 2017│0        ║
║timestamp-task_13467│1 │Tue May 02 12:10:50 EDT 2017│Tue May 02 12:10:50 EDT 2017│0        ║
╚════════════════════╧══╧════════════════════════════╧════════════════════════════╧═════════╝

在此示例中,我们展示了如何使用time Source 以固定速率启动 Task 。此模式可以应用于任何 Source 进行启动 Task 以及响应任何事件。

1. 从流中启动 Composed Task

可以使用tasklauncher-dataflow Sink 启动 Composed Task ,如下所述。由于我们直接使用 ComposedTaskRunner,所以在创建启动 Composed Task 的 Stream 之前,我们需要为 Composed Task 运行器本身以及 Composed Task 设置 Task Defintion。假设我们创建以下 Composed Task Defintion : AAA && BBB。第一步是创建 Task Defintion ,如下例所示:

task create composed-task-runner --definition "composed-task-runner"
task create AAA --definition "timestamp"
task create BBB --definition "timestamp"

ComposedTaskRunner可在此处找到 Releases 版本。

现在我们已经准备好了 Composed Task Defintion 所需的 Task Defintion,我们需要创建一个启动ComposedTaskRunner的Stream。因此,在这种情况下,我们需要

  • time ,Source 会定时发送启动 Task 的请求,如上所述

  • tasklauncher-dataflow,用于启动ComposedTaskRunner的 Sink。

该 Stream 应类似于以下内容:

stream create ctr-stream \
--definition \
"time --fixed-delay=30 
      --task.launch.request.task-name=composed-task-launcher 
      --task.launch.request.args=
            --graph=AAA&&BBB,
            --increment-instance-enabled=true | tasklauncher-dataflow"

现在,我们重点关注一下ComposedTaskRunner启动所需的配置:

  • graph:这是由ComposedTaskRunner执行的 graph 。在这里是 AAA&&BBB

  • increment-instance-enabled:配置表示ComposedTaskRunner每次执行都是唯一的。 ComposedTaskRunner是使用 Spring Batch 构建的。因此,我们希望每次启动时都有一个新的 ComposedTaskRunner 实例。要做到这一点,我们将increment-instance-enabled成为true

Last updated