Composed Tasks

Spring Cloud Data Flow lets a user create a directed graph in which each node is a task application. The graph is composed by using the DSL for composed tasks. A composed task can be created through the RESTful API, the Spring Cloud Data Flow Shell, or the Spring Cloud Data Flow UI.

1. Configuring the Composed Task Runner

A composed task is executed by the Composed Task Runner.

1.1. Registering the Composed Task Runner

By default, the Composed Task Runner is not registered with Spring Cloud Data Flow. Consequently, to launch composed tasks, we must first register the Composed Task Runner as an application with Spring Cloud Data Flow, as follows:

app register --name composed-task-runner --type task --uri maven://org.springframework.cloud.task.app:composedtaskrunner-task:2.1.0.RELEASE

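You can also configure Spring Cloud Data Flow to use a different task name for the Composed Task Runner. To do so, set the spring.cloud.dataflow.task.composedTaskRunnerName property to the name of your choice. You can then register the Composed Task Runner application under that name.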

1.2. Configuring the Composed Task Runner

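The Composed Task Runner application has a dataflow.server.uri property that is used for validating and launching child tasks. It defaults to localhost:9393. If the Spring Cloud Data Flow server is deployed on a platform such as Cloud Foundry, YARN, or Kubernetes, you need to provide an address at which the server can be reached. You can either provide the dataflow.server.uri property to the Composed Task Runner application when launching a composed task, or set the spring.cloud.dataflow.server.uri property when the Spring Cloud Data Flow server starts. In the latter case, the dataflow.server.uri property of the Composed Task Runner application is set automatically when a composed task is launched.

In some cases, you may wish to execute an instance of the Composed Task Runner through the Task Launcher sink. In that case, you must configure the Composed Task Runner to use the same datasource that Spring Cloud Data Flow is using. The datasource properties are set with the TaskLaunchRequest, through either the commandlineArguments or the environmentProperties switch. This is necessary because the Composed Task Runner monitors the task_executions table to check the status of the tasks that it is running. Using the information in that table, it decides how to proceed through the graph.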

Configuration Options

The ComposedTaskRunner task has the following configuration options:

  • increment-instance-enabled Allows a single ComposedTaskRunner instance to be re-executed without changing the parameters. (Boolean, default: false) With the default of false, a ComposedTaskRunner instance can be executed only once with a given set of parameters; if true, it can be re-executed. ComposedTaskRunner is built using Spring Batch, so upon a successful execution the batch job is considered complete. To launch the same ComposedTaskRunner definition multiple times, you must either set the increment-instance-enabled property to true or change the parameters for the definition on each launch.

  • interval-time-between-checks The amount of time, in milliseconds, that the ComposedTaskRunner waits between checks of the database to see whether a task has completed. (Integer, default: 10000) ComposedTaskRunner uses the datastore to determine the status of each child task. This interval tells ComposedTaskRunner how often it should check the status of its child tasks.

  • max-wait-time The maximum amount of time, in milliseconds, that an individual step can run before the execution of the composed task is failed. (Integer, default: 0) It determines the maximum time each child task is allowed to run before the Composed Task Runner (CTR) terminates with a failure. The default of 0 indicates no timeout.

  • split-thread-allow-core-thread-timeout Specifies whether to allow split core threads to time out. (Boolean, default: false) Sets the policy governing whether core threads may time out and terminate if no tasks arrive within the keep-alive time, being replaced if needed when new tasks arrive.

  • split-thread-core-pool-size Split’s core pool size. (Integer, default: 1) Each child task contained in a split requires a thread in order to execute. For example, a definition such as <AAA || BBB || CCC> && <DDD || EEE> requires a split-thread-core-pool-size of 3, because the largest split contains 3 child tasks. A count of 2 would mean that AAA and BBB run in parallel, but CCC waits until either AAA or BBB finishes before it runs. Then DDD and EEE run in parallel.

  • split-thread-keep-alive-seconds Split’s thread keep-alive time, in seconds. (Integer, default: 60) If the pool currently has more than corePoolSize threads, excess threads are terminated if they have been idle for more than the keepAliveTime.

  • split-thread-max-pool-size Split’s maximum pool size, that is, the maximum number of threads allowed for the thread pool. (Integer, default: Integer.MAX_VALUE)

  • split-thread-queue-capacity Capacity for Split’s BlockingQueue. (Integer, default: Integer.MAX_VALUE) The queue interacts with the pool sizes as follows:

    • If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.

    • If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.

    • If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

  • split-thread-wait-for-tasks-to-complete-on-shutdown Whether to wait for scheduled tasks to complete on shutdown, not interrupting running tasks and executing all tasks in the queue. (Boolean, default: false)
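As a rough aid for choosing split-thread-core-pool-size, the following Python sketch counts the child tasks in the largest split of a definition. The function name required_split_core_pool_size is hypothetical, and the sketch assumes flat (non-nested) splits written as <A || B || ...>; it is not part of Spring Cloud Data Flow.

```python
import re


def required_split_core_pool_size(definition: str) -> int:
    """Return the number of child tasks in the largest split.

    Hypothetical helper: assumes flat (non-nested) splits such as
    "<AAA || BBB || CCC>".
    """
    # Capture the body of every <...> split in the definition.
    split_bodies = re.findall(r"<([^<>]*)>", definition)
    # Each "||" separates two parallel child tasks within one split.
    sizes = [len(body.split("||")) for body in split_bodies]
    # Without any split, fall back to the documented default of 1.
    return max(sizes, default=1)


print(required_split_core_pool_size("<AAA || BBB || CCC> && <DDD || EEE>"))  # prints 3
```

For the definition used in the option description, the largest split holds three child tasks, so the sketch reports 3.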

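Note: when you set the options above through environment variables, convert the option name to uppercase and replace the dashes with underscores. For example, increment-instance-enabled becomes INCREMENT_INSTANCE_ENABLED.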
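The naming rule for environment variables can be sketched in a few lines of Python. The helper name to_env_var is hypothetical and only illustrates the conversion described above.

```python
def to_env_var(option: str) -> str:
    """Convert a dashed option name to its environment-variable form.

    Hypothetical helper: uppercases the name and replaces each dash
    with an underscore.
    """
    return option.upper().replace("-", "_")


print(to_env_var("increment-instance-enabled"))  # prints INCREMENT_INSTANCE_ENABLED
```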

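2. The Lifecycle of a Composed Task

The lifecycle of a composed task is covered in the following parts: creating, launching, destroying, stopping, and restarting.

2.1. Creating a Composed Task

The DSL for composed tasks is used when creating a task definition through the task create command, as shown in the following example: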

dataflow:> app register --name timestamp --type task --uri maven://org.springframework.cloud.task.app:timestamp-task:
dataflow:> app register --name mytaskapp --type task --uri file:///home/tasks/mytask.jar
dataflow:> task create my-composed-task --definition "mytaskapp && timestamp"
dataflow:> task launch my-composed-task

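The preceding example assumes that none of the applications used by the composed task have been registered yet. Consequently, in the first two steps, we register two task applications. We then create our composed task definition by using the task create command. In this example, the composed task DSL means that mytaskapp is launched first, followed by timestamp.

Before we launch the my-composed-task definition, we can view what Spring Cloud Data Flow generated for us by running the task list command, as shown (with its output) in the following example: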

dataflow:>task list
╔══════════════════════════╤══════════════════════╤═══════════╗
║        Task Name         │   Task Definition    │Task Status║
╠══════════════════════════╪══════════════════════╪═══════════╣
║my-composed-task          │mytaskapp && timestamp│unknown    ║
║my-composed-task-mytaskapp│mytaskapp             │unknown    ║
║my-composed-task-timestamp│timestamp             │unknown    ║
╚══════════════════════════╧══════════════════════╧═══════════╝

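In this example, Spring Cloud Data Flow created three task definitions: one for each of the applications that make up the composed task (my-composed-task-mytaskapp and my-composed-task-timestamp) and one for the composed task definition itself (my-composed-task). We also see that the name of each child task is the name of the composed task followed by the name of the application, separated by a dash (-). For example, my-composed-task-mytaskapp is my-composed-task plus -mytaskapp.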
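The naming convention for the generated child task definitions can be illustrated with a short Python sketch. The function child_task_name is hypothetical; it only mirrors the rule stated above and does not call Spring Cloud Data Flow.

```python
def child_task_name(composed_task_name: str, app_name: str) -> str:
    """Build a child task definition name.

    Hypothetical helper: joins the composed task name and the
    application name with a dash, as described in the text.
    """
    return f"{composed_task_name}-{app_name}"


# Names generated for the "mytaskapp && timestamp" definition.
for app in ("mytaskapp", "timestamp"):
    print(child_task_name("my-composed-task", app))
```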

Task Application Parameters

The applications that make up a composed task definition can also take parameters, as shown in the following example:

dataflow:> task create my-composed-task \
--definition "mytaskapp --displayMessage=hello && timestamp --format=YYYY"

2.2. Launching a Composed Task

A composed task is launched in the same way as a stand-alone task, as follows:

task launch my-composed-task

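Once the task is launched, and assuming all the tasks complete successfully, three task executions appear when you run task execution list, as shown in the following example: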

dataflow:>task execution list
╔══════════════════════════╤═══╤════════════════════════════╤════════════════════════════╤═════════╗
║        Task Name         │ID │         Start Time         │          End Time          │Exit Code║
╠══════════════════════════╪═══╪════════════════════════════╪════════════════════════════╪═════════╣
║my-composed-task-timestamp│713│Wed Apr 12 16:43:07 EDT 2017│Wed Apr 12 16:43:07 EDT 2017│0        ║
║my-composed-task-mytaskapp│712│Wed Apr 12 16:42:57 EDT 2017│Wed Apr 12 16:42:57 EDT 2017│0        ║
║my-composed-task          │711│Wed Apr 12 16:42:55 EDT 2017│Wed Apr 12 16:43:15 EDT 2017│0        ║
╚══════════════════════════╧═══╧════════════════════════════╧════════════════════════════╧═════════╝

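In the preceding example, we see that my-composed-task launched and that it also launched the other tasks in sequential order. An Exit Code of 0 means that each of them executed successfully.

Passing Properties to the Child Tasks

To set properties for the child tasks in a composed task graph at launch time, use the format app.<composed task definition name>.<child task app name>.<property>. The following composed task definition serves as an example: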

dataflow:> task create my-composed-task --definition "mytaskapp  && mytimestamp"

To have mytaskapp display "HELLO" and to set the mytimestamp timestamp format to "YYYY" for this composed task definition, launch the task as follows:

task launch my-composed-task \
--properties "app.my-composed-task.mytaskapp.displayMessage=HELLO,
              app.my-composed-task.mytimestamp.timestamp.format=YYYY"

Similar to application properties, deployer properties can also be set for child tasks by using the format deployer.<composed task definition name>.<child task app name>.<deployer-property>, as follows:

task launch my-composed-task \
--properties "deployer.my-composed-task.mytaskapp.memory=2048m,
              app.my-composed-task.mytimestamp.timestamp.format=HH:mm:ss"
Launched task 'a1'
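The two key formats can be assembled programmatically. The following Python sketch builds the value passed to --properties; the helpers child_property and launch_properties are hypothetical names introduced for illustration, and the sketch only concatenates strings in the documented format.

```python
def child_property(prefix: str, composed: str, child: str, prop: str, value: str) -> str:
    """Build one <prefix>.<composed task>.<child app>.<property>=<value> entry.

    Hypothetical helper: prefix must be "app" or "deployer", the two
    prefixes described in the text.
    """
    if prefix not in ("app", "deployer"):
        raise ValueError("prefix must be 'app' or 'deployer'")
    return f"{prefix}.{composed}.{child}.{prop}={value}"


def launch_properties(entries: list) -> str:
    """Join the entries with commas, as expected by --properties."""
    return ",".join(entries)


props = launch_properties([
    child_property("deployer", "my-composed-task", "mytaskapp", "memory", "2048m"),
    child_property("app", "my-composed-task", "mytimestamp", "timestamp.format", "HH:mm:ss"),
])
print(props)
```

The resulting string corresponds to the --properties value used in the launch command above, written on a single line.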

Passing Arguments to the Composed Task Runner

You can pass configuration arguments to the Composed Task Runner by using the --arguments option.

For example:

dataflow:>task create my-composed-task \
--definition "<aaa: timestamp || bbb: timestamp>"
Created new task 'my-composed-task'

dataflow:>task launch my-composed-task \
--arguments "--increment-instance-enabled=true 
             --max-wait-time=50000 
             --split-thread-core-pool-size=4" \
--properties "app.my-composed-task.bbb.timestamp.format=dd/MM/yyyy HH:mm:ss"

Launched task 'my-composed-task'

Exit Statuses

The following list shows how the exit status is set for each step (task) contained in the composed task after that step executes:

  • If the TaskExecution has an ExitMessage, that is used as the ExitStatus.

  • If no ExitMessage is present and the ExitCode is set to zero, then the ExitStatus for the step is COMPLETED.

  • If no ExitMessage is present and the ExitCode is set to any non-zero number, the ExitStatus for the step is FAILED.
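The three rules above amount to a small decision function. The following Python sketch is an illustration only: step_exit_status is a hypothetical name, and treating an empty string the same as a missing ExitMessage is an assumption, not something the source states.

```python
from typing import Optional


def step_exit_status(exit_code: int, exit_message: Optional[str] = None) -> str:
    """Derive a step's ExitStatus from a child task's exit code and message.

    Hypothetical helper. Assumption: None and "" both count as
    "no ExitMessage present".
    """
    if exit_message:
        # Rule 1: an ExitMessage, when present, is used as the ExitStatus.
        return exit_message
    # Rules 2 and 3: otherwise the exit code decides.
    return "COMPLETED" if exit_code == 0 else "FAILED"


print(step_exit_status(0))            # prints COMPLETED
print(step_exit_status(1))            # prints FAILED
print(step_exit_status(0, "CUSTOM"))  # prints CUSTOM
```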

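2.3. Destroying a Composed Task

The command used to destroy a stand-alone task is the same as the command used to destroy a composed task. The only difference is that destroying a composed task also destroys the child tasks associated with it. The following example shows the task list before and after using the destroy command: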

dataflow:>task list
╔══════════════════════════╤══════════════════════╤═══════════╗
║        Task Name         │   Task Definition    │Task Status║
╠══════════════════════════╪══════════════════════╪═══════════╣
║my-composed-task          │mytaskapp && timestamp│COMPLETED  ║
║my-composed-task-mytaskapp│mytaskapp             │COMPLETED  ║
║my-composed-task-timestamp│timestamp             │COMPLETED  ║
╚══════════════════════════╧══════════════════════╧═══════════╝
...
dataflow:>task destroy my-composed-task
dataflow:>task list
╔═════════╤═══════════════╤═══════════╗
║Task Name│Task Definition│Task Status║
╚═════════╧═══════════════╧═══════════╝

2.4. Stopping a Composed Task

If you need to stop the execution of a composed task, you can do so through one of the following:

  • RESTful API

  • Spring Cloud Data Flow Dashboard

To stop a composed task through the dashboard, select the Jobs tab and click the Stop button next to the task you want to stop.

The composed task stops once the currently running child task completes.

2.5. Restarting a Composed Task

If a composed task fails during execution and its status is marked as FAILED, the task can be restarted. You can do so through one of the following:

  • RESTful API

  • The shell

  • Spring Cloud Data Flow Dashboard

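To restart a composed task through the shell, launch the task with the same parameters. To restart a composed task through the dashboard, select the Jobs tab and click the Restart button next to the task you want to restart.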
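For the RESTful API route, the following Python sketch prepares (but does not send) a stop or restart request. It rests on several assumptions that are not stated in this section: that the server exposes job executions at /jobs/executions/{id}, that a PUT with stop=true or restart=true triggers the action, and that the server listens at http://localhost:9393. The function job_execution_request and the execution ID 42 are hypothetical; check the REST API guide for your server version before relying on these paths.

```python
from urllib.request import Request


def job_execution_request(server_uri: str, execution_id: int, action: str) -> Request:
    """Prepare a PUT request that stops or restarts a job execution.

    Hypothetical helper. Assumption: the endpoint has the form
    /jobs/executions/{id}?stop=true or ?restart=true.
    """
    if action not in ("stop", "restart"):
        raise ValueError("action must be 'stop' or 'restart'")
    url = f"{server_uri.rstrip('/')}/jobs/executions/{execution_id}?{action}=true"
    return Request(url, method="PUT")


# Build the request only; pass it to urllib.request.urlopen to send it.
request = job_execution_request("http://localhost:9393", 42, "restart")
print(request.get_method(), request.full_url)
```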
