Stream生命周期

1. Stream 生命周期

Stream 的生命周期经历以下阶段:

  1. Stream Applications 的 升级回滚

Skipper 是一个服务器,您可以在不同的云平台上部署,以用来管理 Spring Boot Applications 。

Skipper中的 Applications 应该包含 Applications 的资源位置、服务配置以及对应的包。您可以认为 Skipper 包类似于apt-getbrew等工具中的包。

当 Data Flow 部署一个 Stream 时,它将生成一个包并将其上传到Skipper,该包表示 Stream 中的 Applications 。在 Stream 中对 Applications 进行升级或回滚,后续命令同时将传递给 Skipper 。此外,Stream definition 是从包中进行逆向工程而来的,Stream 的状态也委托给 Skipper。

1.1. 注册 Stream App

使用app register命令注册一个Stream App时,您必须提供唯一的 name ,以及 type 和 URI 。type 参数请指定 sourceprocessorsink。命令通过URI解析到具体的包,这里有几个例子:

dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.1
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.2
dataflow:>app register --name mysource --type source --uri maven://com.example:mysource:0.0.3

dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
> mysource-0.0.1 <                 
   │mysource-0.0.2                     
   │mysource-0.0.3                     
╚═══╧══════════════════╧═════════╧════╧════╝

dataflow:>app register --name myprocessor --type processor --uri file:///Users/example/myprocessor-1.2.3.jar

dataflow:>app register --name mysink --type sink --uri https://example.com/mysink-2.0.1.jar

App URI应符合以下架构格式之一:

  • maven schema

maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
  • http schema

http://<web-path>/<artifactName>-<version>.jar
  • file schema

file:///<local-path>/<artifactName>-<version>.jar
  • docker schema

docker:<docker-image-path>/<imageName>:<version>

URI的 <version> 部分对于需要多版本控制的 Stream App 来说是必需指定的。Skipper 可以利用多版本 Stream App的特性,利用部署时的版本指定,做到运行时升级或回滚这些 App。

如果您想添加一个使用RabbitMQ 构建的 http log 的 Applications ,您可以执行以下操作:

dataflow:>app register --name http --type source --uri maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.1.BUILD-SNAPSHOT
dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.1.BUILD-SNAPSHOT

如果您想一次添加多个 Applications,可以将它们存储在配置文件中,其存储的格式为 <type>.<name>,值为URI。

上述例子可改为以下方式保存到文件中:(例如stream-apps.properties):

source.http=maven://org.springframework.cloud.stream.app:http-source-rabbit:1.2.1.BUILD-SNAPSHOT
sink.log=maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.1.BUILD-SNAPSHOT

接下来,需要批量导入 Applications,请使用该app import命令并指定--uri 的文件位置,如下所示:

dataflow:>app import --uri file:///<YOUR_FILE_LOCATION>/stream-apps.properties

使用 --type 指定 Applications 的 source,processor,sink 类型进行注册。app type 的 Applications 只允许在 Stream Application DSL中使用,DSL中使用,而不是|符号,并且 Data Flow 不能配置 Applications 的 Spring Cloud Stream 属性。使用--type app 注册的应用程序不一定是 Spring Cloud Stream 的 Applications ,它可以是任何 Spring 创建的 Applications 。有关使用此应用程序类型的更多信息,请参阅Stream Application DSL简介

可以为相同的应用程序注册多个版本(例如相同的名称和类型),但只能将其中一个设置为默认版本。部署 Streams 时使用默认版本。

首次注册的 Applications ,它将被标记为默认版本。可以使用以下app default命令更改默认 Applications 版本:

dataflow:>app default --id source:mysource --version 0.0.2
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
   │mysource-0.0.1                     
> mysource-0.0.2 <                 
   │mysource-0.0.3                     
╚═══╧══════════════════╧═════════╧════╧════╝

使用命令 app list --id <type:name> 可以列出指定Applications 的所有版本。

命令 app unregister 有一个可选 --version 参数,用于指定要取消注册的 Applications 版本。

dataflow:>app unregister --name mysource --type source --version 0.0.1
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
> mysource-0.0.2 <                 
   │mysource-0.0.3                     
╚═══╧══════════════════╧═════════╧════╧════╝

如果没有指定—version,则取消注册默认版本。

Stream 中的所有 Applications 都应该为要部署的 Stream设置默认版本。否则,在部署期间,它们将被视为未注册的 Applications 。使用 app default 以设置默认值。

app default --id source:mysource --version 0.0.3
dataflow:>app list --id source:mysource
╔═══╤══════════════════╤═════════╤════╤════╗
║app│      source      │processor│sink│task║
╠═══╪══════════════════╪═════════╪════╪════╣
   │mysource-0.0.2                     
> mysource-0.0.3 <                 
╚═══╧══════════════════╧═════════╧════╧════╝

stream deploy需要默认的 Applications 版本。stream updatestream rollback命令可以使用所有(默认和非默认)注册的Applications 版本。

dataflow:>stream create foo --definition "mysource | log"

这将使用默认的mysource版本(0.0.3)创建 Stream。然后我们可以像这样将版本更新到0.0.2:

dataflow:>stream update foo --properties version.mysource=0.0.2

只有预先注册的 Applications 可用于deploy,update或rollback Stream。

试图将mysource更新到0.0.1版本(未注册)将会失败!

17.1.1。注册支持的应用和任务

为方便起见,所有现成的 Applications 和 task/batch Starters 都提供了带有application-URI(适用于maven和docker)的静态文件。您可以指向此文件并批量导入所有应用程序URI。如前所述,您也可以单独注册它们,使用自定义属性文件,其中只包含所需的 Applications URI。建议将自定义属性文件中包含的Applications URI 集中在一个列表中管理。

Spring Cloud Stream App Starters

下表中连接到dataflow.spring.io网站的是基于Spring Cloud Stream 2.0.x和Spring Boot 2.0.x构建的 Stream App Starters:

下表中连接到dataflow.spring.io网站的是基于Spring Cloud Stream 2.1.x和Spring Boot 2.1.x构建的 Stream App Starters:

App Starter 的 actuator endpoints 默认是启用的。您可以通过在部署Stream时,使用以下配置来禁用 actuator:

app.*.spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration

在Kubernetes上,请参阅 Liveness and readiness probes部分,以配置 actuator endpoints。

从Spring Cloud Stream 2.1 GA 开始,我们与 Spring Cloud Function 编程模型结合的更加紧密。在此基础上,通过 Einstein 版本发布,现在可以选择一些 Stream App Starters,函数式编程模型将他们合并为一个应用程序。查看 "Composed Function Support in Spring Cloud Data Flow" 博客,通过示例了解有关开发人员和项目经验的更多信息。

Spring Cloud Task App Starters

下表中连接到dataflow.spring.io网站的是基于Spring Cloud Stream 2.0.x和Spring Boot 2.0.x构建的 Task App Starters:

下表中连接到dataflow.spring.io网站的是基于Spring Cloud Stream 2.1.x和Spring Boot 2.1.x构建的 Task App Starters:

您可以通过 Task App Starters Project 和相关参考文档中找到有关 Task App Starters 的更多信息。有关Stream App Starters 的更多信息,请查看Stream App Starters 和相关参考文档。

例如,如果您的应用程序需要批量注册使用 Kafka,您可以使用以下命令:

$ dataflow:>app import --uri https://dataflow.spring.io/kafka-maven-latest

或者,如果您的应用程序需要批量注册使用 RabbitMQ ,您可以使用以下命令:

$ dataflow:>app import --uri https://dataflow.spring.io/rabbitmq-maven-latest

您还可以指定--local选项(默认情况下为true)表示是在shell进程本身内解析属性文件位置。如果应从 Data Flow Server 解析文件位置,请指定--local false

使用app registerapp import命令时,如果 Applications 已使用相同的 name , type , version 注册,默认情况下不会覆盖它。如果要覆盖预先存在的app uri或metadata-uri坐标,请指定--force选项。

但是,请注意,一旦下载,Data Flow Server 会根据资源位置在本地服务上缓存 Applications。如果资源位置没有改变(即使实际资源大小有改动),也不会重新下载。另一方面,当使用maven:// 为resources时,仍然可能绕过缓存(例如使用的版本为-SNAPSHOT)。

此外,如果已经部署了 Stream 并使用某个版本的已注册应用程序,那么(强制)重新注册使用其他应用程序将无效,直到再次部署该Stream。

某些情况下,资源在服务器端解析。在其他情况下,URI将传递到解析它的运行时容器实例。有关更多详细信息,请参阅每个数据流服务器的特定文档。

1.2. 白名单应用程序属性

Stream 和 Task Applications 是采用Spring Boot构建的应用程序,它们可以使用很多常见应用程序属性,例如server.port 系列,和 spring.jmx、logging的属性。在创建Applications 时,可以通过配置白名单属性,使 shell 和 UI 可以在通过选项卡或在下拉框中显示选项时将它们作为主要属性首先显示。

白名单配置方式如下,创建一个文件名为spring-configuration-metadata-whitelist.propertiesMETA-INF资源目录。在这个文件中可以使用两个属性。

  • 第一个命名configuration-properties.classes。该值是包含如下注解@ConfigurationProperty 的完整类名,通过, 分隔。

  • 第二个键是configuration-properties.names。其值是以,分隔的 property 列表。这实用属性的全名,例如server.port,或者将部分名称列入白名单的属性名称,例如spring.jmx

Spring Cloud Stream application starters 可以找到很多很好的例子。以下示例来自上述例子中的spring-configuration-metadata-whitelist.properties文件:

configuration-properties.classes=org.springframework.cloud.stream.app.file.sink.FileSinkProperties

如果我们还想添加server.port白名单,它将成为以下行:

configuration-properties.classes=org.springframework.cloud.stream.app.file.sink.FileSinkProperties
configuration-properties.names=server.port

确保添加 'spring-boot-configuration-processor' 依赖,以便生成configuration metadata 配置文件。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>

1.3. 创建和使用 Dedicated Metadata Artifact

通过创建 Dedicated Metadata Artifact,您可以更详细的描述 Stream 或Task Application 支持的主要属性。这个jar文件只包含关于 configuration properties Metadata 的 Spring boot JSON文件和上一节描述的白名单文件。

下面的示例显示了用于log sink 的配置文件:

$ jar tvf log-sink-rabbit-1.2.1.BUILD-SNAPSHOT-metadata.jar
373848 META-INF/spring-configuration-metadata.json
   174 META-INF/spring-configuration-metadata-whitelist.properties

请注意,该spring-configuration-metadata.json文件非常大。这是因为它包含log 的所有属性的(有些来自spring-boot-actuator.jar,有些来自 spring-boot-autoconfigure.jar,有些来自spring-cloud-starter-stream-sink-log.jar,等等)。Data Flow 始终依赖于这些属性,即便其中有组件不可用,但这里所有属性都已合并为单个文件。

为了简化开发(你不会想尝试手工制作这个巨大的JSON文件),你可以在你的构建中使用以下插件:

<plugin>
 	<groupId>org.springframework.cloud</groupId>
 	<artifactId>spring-cloud-app-starter-metadata-maven-plugin</artifactId>
 	<executions>
 		<execution>
 			<id>aggregate-metadata</id>
 			<phase>compile</phase>
 			<goals>
 				<goal>aggregate-metadata</goal>
 			</goals>
 		</execution>
 	</executions>
 </plugin>

这个插件创建的Json文件是对 spring-boot-configuration-processor的补充。请确保同时配置这两个选项。

Dedicated Metadata Artifact 的好处包括:

  • 轻量。(Dedicated Metadata Artifact 通常只有几KB,而实际应用程序的大小为几MB)因此,下载速度更快,在使用Applications 或 Dashboaard UI 时可以得到更快的反馈。

  • 由于更轻,因此当 metadata 是唯一需要的配置时,它们可用于资源受限的环境(例如PaaS)。

  • 对于不直接处理Spring Boot 的 uber-jar 环境(例如,基于Docker的运行时,或者Kubernetes和Cloud Foundry),这是提供Applications 支持的属性 metadata 的唯一方法。

不过请记住,在处理uber jar 时,这完全是可选的。uber jar本身也已经包含了元数据。

1.4. 使用Companion Artifact

如果您已经拥有 Companion Artifact ,您下一步需要让系统能够使用它。

使用app register 注册单个应用程序时,在shell中使用——metada -uri选项,如下所示:

dataflow:>app register --name log --type sink
    --uri maven://org.springframework.cloud.stream.app:log-sink:2.1.0.RELEASE
    --metadata-uri maven://org.springframework.cloud.stream.app:log-sink:jar:metadata:2.1.0.RELEASE

使用app import命令注册多个文件时,该文件除了<type>.<name>.metadata行外还应包含一行<type>.<name>。严格来说,这是可选的(如果某些应用程序有它,但有些应用程序没有,也是可以工作的),但这是最好的做法。

以下示例显示了一个Dockerized app,其中 metadata artifact 托管在 Maven 存储库中(通过http://file://同样可以检索它)。

...
source.http=docker:springcloudstream/http-source-rabbit:latest
source.http.metadata=maven://org.springframework.cloud.stream.app:http-source-rabbit:jar:metadata:2.1.0.RELEASE
...

1.5. 创建自定义的 Applications

虽然有现成的源,处理器,接收器应用程序,但您可以扩展这些应用程序或编写自定义的Spring Cloud Stream Applications。

使用 Spring Initializr 创建Spring Cloud Stream Applications的过程在Spring Cloud Stream 文档中有详细介绍。一个 Applications 可以绑定多个 binders 。如果需要这样做,请参阅[passing_producer_consumer_properties]

为了支持属性白名单,在 Spring Cloud Data Flow 中运行的 Spring Cloud Stream Applications 可能包含Spring Boot configuration-processor作为可选依赖项,如以下示例所示:

<dependencies>
  <!-- other dependencies -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
  </dependency>
</dependencies>

确保POM中包含spring-boot-maven-plugin,该插件是创建在Spring Cloud Data Flow中注册的可执行jar所必需的。Spring Initialzr 默认将插件包含在生成的POM中。

创建自定义应用程序后,可以按 Register a Stream App 的说明执行注册。

2. 创建一个 Stream

Spring Cloud Data Flow Server 公开了一套完整的 RESTful API 来管理 Stream 的生命周期,但最简单的方法是通过Spring Cloud Data Flow shell。按照 Getting Started 部分的说明启动 shell 。

在 stream --definitions 的帮助下创建一个欣的 Stream 。这些定义是由一个简单的DSL构建的。例如,考虑一下如果我们执行以下shell命令会发生什么:

dataflow:> stream create --definition "time | log" --name ticktock

这定义了一个名为ticktock的 Stream ,它基于DSL表达式time|log。DSL使用“管道”符号(|)将源连接到 sink。

stream info命令显示了关于 Stream 的有效信息,如下面的示例所示:

dataflow:>stream info ticktock
╔═══════════╤═════════════════╤══════════╗
║Stream Name│Stream Definition│  Status  
╠═══════════╪═════════════════╪══════════╣
║ticktock   │time | log       │undeployed║
╚═══════════╧═════════════════╧══════════╝

2.1. Application Properties

Stream 中的每一个 Applications 都有相对应的 properties。部署 Applications 时,properties 将通过命令行参数或环境变量应用于app,具体方式取决于部署形式。

下面的例子展示了在创建 Stream 时自定义 properties:

dataflow:> stream create --definition "time | log" --name ticktock

shell命令 app info --name <appName> --type <appType>显示app可用的属性。详情可参考 Whitelisting application properties

以下清单显示了time应用的可用属性:

dataflow:> app info --name time --type source
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║         Option Name          │         Description          │           Default            │             Type             ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║trigger.time-unit             │The TimeUnit to apply to delay│<none>                        │java.util.concurrent.TimeUnit ║
║                              │values.                       │                              │                              ║
║trigger.fixed-delay           │Fixed delay for periodic      │1                             │java.lang.Integer             ║
║                              │triggers.                     │                              │                              ║
║trigger.cron                  │Cron expression value for the │<none>                        │java.lang.String              ║
║                              │Cron Trigger.                 │                              │                              ║
║trigger.initial-delay         │Initial delay for periodic    │0                             │java.lang.Integer             ║
║                              │triggers.                     │                              │                              ║
║trigger.max-messages          │Maximum messages per poll, -1 │1                             │java.lang.Long                ║
║                              │means infinity.               │                              │                              ║
║trigger.date-format           │Format for the date value.    │<none>                        │java.lang.String              ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝

以下列表显示了log应用程序的可用属性:

dataflow:> app info --name log --type sink
╔══════════════════════════════╤══════════════════════════════╤══════════════════════════════╤══════════════════════════════╗
║         Option Name          │         Description          │           Default            │             Type             ║
╠══════════════════════════════╪══════════════════════════════╪══════════════════════════════╪══════════════════════════════╣
║log.name                      │The name of the logger to use.│<none>                        │java.lang.String              ║
║log.level                     │The level at which to log     │<none>                        │org.springframework.integratio║
║                              │messages.                     │                              │n.handler.LoggingHandler$Level║
║log.expression                │A SpEL expression (against the│payload                       │java.lang.String              ║
║                              │incoming message) to evaluate │                              │                              ║
║                              │as the logged message.        │                              │                              ║
╚══════════════════════════════╧══════════════════════════════╧══════════════════════════════╧══════════════════════════════╝

在创建时可以指定timelog Application 的 properties,stream如下所示:

dataflow:> stream create --definition "time --fixed-delay=5 | log --level=WARN" --name ticktock

需要注意的是,在前面的示例中,fixed-delaylevel属性是timelog通过 shell 提供的 'short-form' 属性名称。这些 'short-form' 属性名称仅适用于可用的属性。其余情况下,应使用完整的名称进行设置。

2.2. 常见的 Application Properties

除了通过 DSL 的方式进行配置外,Spring Cloud Data Flow 还提供了一种机制,用于配置所有由其管理的Stream Applications 设置公共 properties。具体方式是通过添加前缀为 spring.cloud.dataflow.applicationProperties 的属性来实现。这样做时,在启动服务后,服务器将不带前缀的所有属性传递给它启动的实例。

例如,通过使用以下选项启动 Data Flow server,可以将所有已启动的 Applications 配置为使用特定的 Kafka 代理:

--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.brokers=192.168.1.100:9092
--spring.cloud.dataflow.applicationProperties.stream.spring.cloud.stream.kafka.binder.zkNodes=192.168.1.100:2181

这样做会将属性spring.cloud.stream.kafka.binder.brokers以及spring.cloud.stream.kafka.binder.zkNodes 传递给所有启动的Application。

使用此机制配置的 properties 的优先级低于 Stream 部署配置的peoperties 。如果在 Stream 部署时指定了具有相同密钥的属性(例如,app.http.spring.cloud.stream.kafka.binder),则会覆盖公共属性。

3. 部署一个 Stream

本节将讲述当 Spring Cloud Data Flow 负责部署 Stream 时,如何部署 Stream 。它包括了利用Skipper服务部署和升级 Stream 。描述如何将部署 properties 应用于部署 Stream 的两种方法。

首先创建ticktock Stream:

dataflow:> stream create --definition "time | log" --name ticktock

其次部署 Stream ,请使用以下shell命令:

dataflow:> stream deploy --name ticktock

Data Flow Server 将 timelog 委托到 Skipper 中进行解析和部署。

stream info命令显示有关 Stream 的有用信息,包括部署 properties :

dataflow:>stream info --name ticktock
╔═══════════╤═════════════════╤═════════╗
║Stream Name│Stream Definition│  Status 
╠═══════════╪═════════════════╪═════════╣
║ticktock   │time | log       │deploying║
╚═══════════╧═════════════════╧═════════╝

Stream Deployment properties: {
  "log" : {
    "resource" : "maven://org.springframework.cloud.stream.app:log-sink-rabbit",
    "spring.cloud.deployer.group" : "ticktock",
    "version" : "2.0.1.RELEASE"
  },
  "time" : {
    "resource" : "maven://org.springframework.cloud.stream.app:time-source-rabbit",
    "spring.cloud.deployer.group" : "ticktock",
    "version" : "2.0.1.RELEASE"
  }
}

stream deploy 命令有一个重要的可选命令参数(称为——platformName)。通过配置 Skipper 可以部署到多个 platform。Skipper 预先配置了一个名为 default 的platform ,将 Applications 部署到本地运行 Skipper 的机器上。——platformName是默认值是 default 。如果您通常只部署到一个platform,在安装Skipper时,您可以覆盖 platform 的默认配置。如果部署多个,将为platform指定一个具体的 platformName 值,通过 stream platform-list命令进行查看。

在前面的示例中,time source 每秒将当前时间作为消息发送,并且 log sink 使用日志记录框架输出它。您可以查看stdout日志(具有<instance>后缀)。日志文件位于Data Flow Server 日志输出中显示的目录中,如下所示:

$ tail -f /var/folders/wn/8jxm_tbd1vj28c8vj37n900m0000gn/T/spring-cloud-dataflow-912434582726479179/ticktock-1464788481708/ticktock.log/stdout_0.log
2016-06-01 09:45:11.250  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:11
2016-06-01 09:45:12.250  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:12
2016-06-01 09:45:13.251  INFO 79194 --- [  kafka-binder-] log.sink    : 06/01/16 09:45:13

您还可以通过在创建 Stream 时,配置--deploy来同时进行创建和部署Stream

dataflow:> stream create --definition "time | log" --name ticktock --deploy

然而,在实际的用例中,同时创建和部署 Stream 并不常见。 原因是当您使用stream deploy命令时,您可以定义 Applications 映射到平台的属性(例如,要使用的容器的内存大小,要运行的每个应用程序的数量以及是否启用数据分区功能)。同时还可以覆盖创建时 Stream 设置的 Applications 属性。接下来的部分将详细介绍此功能。

3.1. 部署属性

部署 Stream 时,您可以指定 Applications 部署和配置方式的属性。有关详细信息,请参阅 Deployment Properties

4. 销毁 Stream

您可以通过shell 命令stream destroy来删除流,如下所示:

dataflow:> stream destroy --name ticktock

如果已经部署了 Stream ,请在删除Stream 之前先解除部署。

5. 取消部署 Stream

通常,您希望停止 Stream ,但保留名称和定义以备将来使用。在这种情况下,可以执行undeploy取消部署流。

dataflow:> stream undeploy --name ticktock
dataflow:> stream deploy --name ticktock

您可以稍后执行deploy命令来重新启动它。

dataflow:> stream deploy --name ticktock

6. 验证Stream

有时,Stream definition 的中包含的一个或多个应用程序,在注册过程中包含无效的URI。这可能是因为在应用程序注册时输入的URI无效,或者应用程序从要从中提取它的存储库中删除。要验证流中包含的所有应用程序都是可解析的,用户可以使用validate命令。例如:

dataflow:>stream validate ticktock
╔═══════════╤═════════════════╗
║Stream Name│Stream Definition║
╠═══════════╪═════════════════╣
║ticktock   │time | log       
╚═══════════╧═════════════════╝


ticktock is a valid stream.
╔═══════════╤═════════════════╗
 App Name  │Validation Status║
╠═══════════╪═════════════════╣
║source:time│valid            
║sink:log   │valid            
╚═══════════╧═════════════════╝

在上面的示例中,用户验证了 ticktock Stream。正如我们所看到的那样,source:timesink:log都是有效的。现在让我们看看如果我们有一个带有无效URI的注册应用程序的 stream definition 会发生什么。

dataflow:>stream validate bad-ticktock
╔════════════╤═════════════════╗
║Stream Name │Stream Definition║
╠════════════╪═════════════════╣
║bad-ticktock│bad-time | log   
╚════════════╧═════════════════╝


bad-ticktock is an invalid stream.
╔═══════════════╤═════════════════╗
   App Name    │Validation Status║
╠═══════════════╪═════════════════╣
║source:bad-time│invalid          
║sink:log       │valid            
╚═══════════════╧═════════════════╝

在这种情况下,Spring Cloud Data Flow 声明 Stream 无效,因为source:bad-time具有无效的URI。

7. 更新 Stream

要更新 Stream,请使用stream update命令。它接受命令参数——properties——propertiesFile。在使用Skipper时,有一个重要前缀,即version。如果部署了http | log Stream ,并且在部署时注册的版本是 1.1.0.RELEASE:

dataflow:> stream create --name httptest --definition "http --server.port=9000 | log"
dataflow:> stream deploy --name httptest
dataflow:>stream info httptest
╔══════════════════════════════╤══════════════════════════════╤════════════════════════════╗
             Name                          DSL                        Status            
╠══════════════════════════════╪══════════════════════════════╪════════════════════════════╣
║httptest                      │http --server.port=9000 | log │deploying                   
╚══════════════════════════════╧══════════════════════════════╧════════════════════════════╝

Stream Deployment properties: {
  "log" : {
    "spring.cloud.deployer.indexed" : "true",
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:log-sink-rabbit" : "1.1.0.RELEASE"
  },
  "http" : {
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:http-source-rabbit" : "1.1.0.RELEASE"
  }
}

然后,以下命令将更新Stream以使用1.2.0.RELEASElog Applications。在使用Applications 的特定版本更新 Stream 之前,我们需要确保 Applications 已经注册过该版本。

dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-rabbit:1.2.0.RELEASE
Successfully registered application 'sink:log'
dataflow:>stream update --name httptest --properties version.log=1.2.0.RELEASE

只有预先注册的 Applications 版本可用于deploy,updaterollback流。

为了验证部署属性和更新版本,我们可以使用stream info,如下面的示例所示(及其输出):

dataflow:>stream info httptest
╔══════════════════════════════╤══════════════════════════════╤════════════════════════════╗
             Name                          DSL                        Status            
╠══════════════════════════════╪══════════════════════════════╪════════════════════════════╣
║httptest                      │http --server.port=9000 | log │deploying                   
╚══════════════════════════════╧══════════════════════════════╧════════════════════════════╝

Stream Deployment properties: {
  "log" : {
    "spring.cloud.deployer.indexed" : "true",
    "spring.cloud.deployer.count" : "1",
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:log-sink-rabbit" : "1.2.0.RELEASE"
  },
  "http" : {
    "spring.cloud.deployer.group" : "httptest",
    "maven://org.springframework.cloud.stream.app:http-source-rabbit" : "1.1.0.RELEASE"
  }
}

8. 强制更新 Stream

升级流时,即使应用程序或部署属性没有更改,也可以使用--force选项来强制部署 Applications 的新实例。当 Applications 本身在启动时(例如从Spring Cloud Config Server)获得配置信息时,就需要采取这种方式。可以使用--app-names指定要强制更新的 Applications 。如果不指定任何应用程序名称,所有应用程序都将被强制升级。您可以指定 --force--app-names 以及--properties--propertiesFile选项。

9. Stream 版本

Skipper保存了Stream 的部署记录。更新 Stream 之后,将会产生 Stream 的第二个版本。您可以使用stream history --name <name-of-stream>查询版本的历史。

dataflow:>stream history --name httptest
╔═══════╤════════════════════════════╤════════╤════════════╤═══════════════╤════════════════╗
║Version│        Last updated         Status │Package Name│Package Version│  Description   
╠═══════╪════════════════════════════╪════════╪════════════╪═══════════════╪════════════════╣
║2      │Mon Nov 27 22:41:16 EST 2017│DEPLOYED│httptest    │1.0.0          │Upgrade complete║
║1      │Mon Nov 27 22:40:41 EST 2017│DELETED │httptest    │1.0.0          │Delete complete 
╚═══════╧════════════════════════════╧════════╧════════════╧═══════════════╧════════════════╝

10. Stream Manifests

Skipper保存所有 Applications 的 Manifests 。包括Applications的属性以及部署过程中替换覆盖的所有属性。这表示在platform上完成了部署。您可以使用以下命令查看 Sttream 的任何版本的 manifest:

stream manifest --name <name-of-stream> --releaseVersion <optional-version>

如果--releaseVersion未指定,则返回上一版本的 manifest。

以下示例显示了 manifest 的用法:

dataflow:>stream manifest --name httptest

使用该命令会产生以下输出:

# Source: log.yml
apiVersion: skipper.spring.io/v1
kind: SpringCloudDeployerApplication
metadata:
  name: log
spec:
  resource: maven://org.springframework.cloud.stream.app:log-sink-rabbit
  version: 1.2.0.RELEASE
  applicationProperties:
    spring.metrics.export.triggers.application.includes: integration**
    spring.cloud.dataflow.stream.app.label: log
    spring.cloud.stream.metrics.key: httptest.log.${spring.cloud.application.guid}
    spring.cloud.stream.bindings.input.group: httptest
    spring.cloud.stream.metrics.properties: spring.application.name,spring.application.index,spring.cloud.application.*,spring.cloud.dataflow.*
    spring.cloud.dataflow.stream.name: httptest
    spring.cloud.dataflow.stream.app.type: sink
    spring.cloud.stream.bindings.input.destination: httptest.http
  deploymentProperties:
    spring.cloud.deployer.indexed: true
    spring.cloud.deployer.group: httptest
    spring.cloud.deployer.count: 1

---
# Source: http.yml
apiVersion: skipper.spring.io/v1
kind: SpringCloudDeployerApplication
metadata:
  name: http
spec:
  resource: maven://org.springframework.cloud.stream.app:http-source-rabbit
  version: 1.2.0.RELEASE
  applicationProperties:
    spring.metrics.export.triggers.application.includes: integration**
    spring.cloud.dataflow.stream.app.label: http
    spring.cloud.stream.metrics.key: httptest.http.${spring.cloud.application.guid}
    spring.cloud.stream.bindings.output.producer.requiredGroups: httptest
    spring.cloud.stream.metrics.properties: spring.application.name,spring.application.index,spring.cloud.application.*,spring.cloud.dataflow.*
    server.port: 9000
    spring.cloud.stream.bindings.output.destination: httptest.http
    spring.cloud.dataflow.stream.name: httptest
    spring.cloud.dataflow.stream.app.type: source
  deploymentProperties:
    spring.cloud.deployer.group: httptest

大多数部署时的属性和 Applications 的属性都是由 Data Flow 设置的,以使 Applications 之间能够互相通信。

11. 回滚 Stream

您可以使用命令stream rollback回滚到 Stream 的前一个版本。

dataflow:>stream rollback --name httptest

可选的--releaseVersion命令参数指定 Stream 的版本。如果未指定,则回滚将转到上一个 Stream 版本。

12. Application Count

application count 是系统的一个动态属性,用于指定应用程序实例的数量。有关更多信息,请参阅 Application Count

13. Skipper’s 的升级策略

Skipper有一个简单的 'red/black‘ 升级策略。它先部署一个全新的 Applications 版本,使用与当前运行版本相同的实例,并检查 Applications 的/health端点。如果新 Applications 健康状况良好,则取消部署前一个Applications。如果新应用程序的健康状况不佳,则所有新 Applications 将被卸载,升级将被认为不成功。

Last updated