Composed Tasks
Spring Cloud Data Flow允许用户创建一个 directed graph(有向图) ,其中图上的每个节点都是一个 task applications 。这是通过使用 DSL 进行组合而来的。可以通过RESTful API,Spring Cloud Data Flow Shell或Spring Cloud Data Flow UI 创建Composed Tasks。
1. Configuring the Composed Task Runner
Composed Task 通过 Composed Task Runner 的进行执行。
1.1. Registering the Composed Task Runner
默认情况下,Composed Task Runner 不会在 Spring Cloud Data Flow中注册。因此,要启动组合任务,我们必须首先将Composed Task Runner注册为 Spring Cloud Data Flow 的 Applications ,如下所示:
app register --name composed-task-runner --type task --uri maven://org.springframework.cloud.task.app:composedtaskrunner-task:2.1.0.RELEASE
您还可以通过 Spring Cloud Data Flow 为 Composed Task Runner 定义不同的 task 名称。设置spring.cloud.dataflow.task.composedTaskRunnerName
指定您需要自定义的名称。然后,您可以使用该名称注册 Composed Task Runner Applications。
1.2. Configuring the Composed Task Runner
Composed Task Runner Applications 可以配置 dataflow.server.uri
属性,用于验证和启动子 Tasks。默认值为localhost:9393
。如果您运行了 Spring Cloud Data Flow 服务器,就像在Cloud Foundry,YARN 或 Kubernetes 上部署服务器一样,您需要提供可用于访问服务器的地址。您可以在启动Composed Task 时为Composed Task Runner Applications 提供dataflow.server.uri
属性,也可以在Spring Cloud Data Server启动时设置 spring.cloud.dataflow.server.uri
属性。对于后一种的情况,在启动 Composed Task 时会自动设置Composed Task Runner Applications 的dataflow.server.uri
属性。
在某些情况下,您希望通过 Task Launcher sink 执行 Composed Task Runner 的实例。在这种情况下,您必须配置 Composed Task Runner 和 Spring Cloud Data Flow 使用的相同数据源。数据源配置是通过使用commandlineArguments(命令行参数)
工具或environmentProperties(环境变量)
用 TaskLaunchRequest 设置的。因为Composed Task Runner 会监控 task_executions
表来检查正在运行的任务的状态。使用表中的信息,它决定如何展示图表。
配置选项
ComposedTaskRunner Task 拥有以下可配置项:
increment-instance-enabled Allows a single ComposedTaskRunner instance to be re-executed without changing the parameters. Default is false which means a ComposedTaskRunner instance can only be executed once with a given set of parameters, if true it can be re-executed. (Boolean, default: false). ComposedTaskRunner is built using Spring Batch and thus upon a successful execution the batch job is considered complete. To launch the same ComposedTaskRunner definition multiple times you must set the
increment-instance-enabled
property to true or change the parameters for the definition for each launch.interval-time-between-checks The amount of time in millis that the ComposedTaskRunner will wait between checks of the database to see if a task has completed. (Integer, default: 10000). ComposedTaskRunner uses the datastore to determine the status of each child tasks. This interval indicates to ComposedTaskRunner how often it should check the status its child tasks.
max-wait-time The maximum amount of time in millis that a individual step can run before the execution of the Composed task is failed (Integer, default: 0). Determines the maximum time each child task is allowed to run before the CTR will terminate with a failure. The default of
0
indicates no timeout.split-thread-allow-core-thread-timeout Specifies whether to allow split core threads to timeout. Default is false; (Boolean, default: false) Sets the policy governing whether core threads may timeout and terminate if no tasks arrive within the keep-alive time, being replaced if needed when new tasks arrive.
split-thread-core-pool-size Split’s core pool size. Default is 1; (Integer, default: 1) Each child task contained in a split requires a thread in order to execute. So for example a definition like:
<AAA || BBB || CCC> && <DDD || EEE>
would require a split-thread-core-pool-size of 3. This is because the largest split contains 3 child tasks. A count of 2 would mean thatAAA
andBBB
would run in parallel but CCC would wait until eitherAAA
orBBB
to finish in order to run. ThenDDD
andEEE
would run in parallel.split-thread-keep-alive-seconds Split’s thread keep alive seconds. Default is 60. (Integer, default: 60) If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime.
split-thread-max-pool-size Split’s maximum pool size. Default is {@code Integer.MAX_VALUE} (Integer, default: <none>). Establish the maximum number of threads allowed for the thread pool.
split-thread-queue-capacity Capacity for Split’s BlockingQueue. Default is {@code Integer.MAX_VALUE}. (Integer, default: <none>)
If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
split-thread-wait-for-tasks-to-complete-on-shutdown Whether to wait for scheduled tasks to complete on shutdown, not interrupting running tasks and executing all tasks in the queue. Default is false; (Boolean, default: false)
注意当上面的配置项通过环境变量进行设置是,请转换为大写,删除短划线并替换为下划线。例如:increment-instance-enabled将为INCREMENT_INSTANCE_ENABLED。
2. Composed Task 的生命周期
组合任务的生命周期有三个部分:
2.1. 创建 Composed Task
通过 task create 命令创建 Task definition 时,同时使用 Composed Task 的 DSL,如以下示例所示:
在上面的示例中,假设我们的所有 Applications 都没进行注册。因此,在前两个步骤中,我们注册了两个Task Applications 。然后,我们使用 task create
命令创建我们的 Composed Task Definition。上上面的示例中,Composed Task DSL 的含义是先运行 mytaskapp 然后在运行 timestamp。
我们通过配置 definition 启动 my-composed-task
之前,通过执行task list
命令,可以查看Spring Cloud Data Flow为我们生成了什么。如下图所示(包括其输出):
在该示例中,Spring Cloud Data Flow创建了三个任务定义,my-composed-task-mytaskapp
和my-composed-task-timestamp
用于组成Composed task ,my-composed-task
用于Composed Task的定义 。我们还看到,每个子Task 的名称都由 Composed Task 的名称 加上 Applications 的名称组合而成,用短划线分隔-
(例如:my-composed-task-mytaskapp 由 my-composed-task
加上-
和mytaskapp
而来)。
Task Application Parameters
Composed Task Definitions 中的 Applications 也可以配置参数,如以下示例所示:
2.2. 启动 Composed Task
启动 Composed Task 的方式与启动独立 Task 的方式相同,如下所示:
task launch my-composed-task
启动 Task 后,假设所有任务都成功完成,执行task execution list
时可以看到三个 Task 执行,如下例所示:
在前面的示例中,我们看到已my-compose-task
启动,并且它还按顺序启动了其他Task。Exit Code
为0
表示这些都执行成功了。
将属性传递给子 Task
要在 Task 启动时为Composed Task graph 图中的子 task 设置属性,可以使用app.<composed task definition name>.<child task app name>.<property>
使用以下的 Composed Task Definition 作为示例:
要让 mytaskapp
显示“HELLO”并将mytimestamp
时间戳格式设置为“Composed Task”定义的“YYYY”,需要使用如下方式启动:
与 Applications 属性配置方式类似,deployer
时也可以使用如下格式为子Task 设置属性deployer.<composed task definition name>.<child task app name>.<deployer-property>
传递参数到Composed task runner
可以使用——arguments
选项将配置参数传递给 Composed task runner。
例如:
Exit Statuses
下面的列表显示了如何为每个步骤执行后的Composed Task 中包含的每个步骤(Task)设置 Exit 状态:
If the
TaskExecution
has anExitMessage
, that is used as theExitStatus
.If no
ExitMessage
is present and theExitCode
is set to zero, then theExitStatus
for the step isCOMPLETED
.If no
ExitMessage
is present and theExitCode
is set to any non-zero number, theExitStatus
for the step isFAILED
.
2.3 Destroying Composed Task
用于 Destroying (销毁) 单独 Task 的命令与用于 Destroying (销毁) Composed Task 的命令相同。唯一的区别是 Destroying(销毁) Composed Task 也会 Destroying(销毁) 与之相关的子任务。以下示例显示使用该destroy
命令之前和之后的任务列表:
2.4. Stopping Composed Task
如果需要 Stopping Composed Task 执行,您可以通过以下方式执行:
RESTful API
Spring Cloud Data Flow Dashboard
要通过 Dashboard 停止 Composed Task ,请选择 Jobs 选项卡,然后单击要停止的 Task 旁边的 Stop 按钮。
当前正在运行的子 Task 完成时,将停止Composed Task。。
2.5. Restarting Composed Task
如果Composed Task 在执行期间失败并且状态被标记为FAILED
,则可以重新启动 Task 。你可以通过以下方式这样做:
RESTful API
The shell
Spring Cloud Data Flow Dashboard
要通过shell重新启动 Composed Task ,请使用相同的参数启动Task 。要通过 Dashboard 重新启动 Composed Task ,请选择 Jobs 选项卡并单击要重新启动的 Task 旁边的 restart 按钮。
Last updated