版本3.0.3

© 2009-2022 VMware, Inc. 保留所有权利。

您可以复制本文档供您自己使用或分发给其他人,前提是您不收取此类副本的任何费用,并且每份副本都包含本版权声明,无论是以印刷版还是电子版形式分发。

前言

本节提供 Spring Cloud Task 参考文档的简要概述。将其视为文档其余部分的地图。您可以以线性方式阅读本参考指南,如果您不感兴趣,也可以跳过某些部分。

1. 关于文档

您可以复制本文档供您自己使用或分发给其他人,前提是您不收取此类副本的任何费用,并且每份副本都包含本版权声明,无论是以印刷版还是电子版形式分发。

2. 寻求帮助

Spring Cloud Task 遇到问题?我们很乐意提供帮助!

所有 Spring Cloud Task 都是开源的,包括文档。如果您发现文档存在问题或者只是想改进它们,请参与其中

3. 第一步

如果您刚刚开始使用 Spring Cloud Task 或一般的“Spring”,我们建议您阅读入门章节。

要从头开始,请阅读以下部分:

要遵循本教程,请阅读 开发您的第一个 Spring Cloud 任务应用程序
要运行您的示例,请阅读 运行示例

入门

如果您刚刚开始使用 Spring Cloud Task,您应该阅读本节。在这里,我们回答基本的“什么?”、“如何?”和“为什么?” 问题。我们首先简单介绍一下 Spring Cloud Task。然后,我们构建一个 Spring Cloud Task 应用程序,并讨论一些核心原则。

4. Spring Cloud任务介绍

Spring Cloud Task 可以轻松创建短期微服务。它提供了允许在生产环境中按需执行短期 JVM 进程的功能。

5. 系统要求

您需要安装 Java(Java 17 或更高版本)。要构建,您还需要安装 Maven。

5.1. 数据库要求

Spring Cloud Task 使用关系数据库来存储执行任务的结果。虽然您可以在没有数据库的情况下开始开发任务(任务状态记录为任务存储库更新的一部分),但对于生产环境,您希望使用受支持的数据库。Spring Cloud Task 目前支持以下数据库:

  • 数据库2

  • 氢2

  • HSQL数据库

  • 数据库

  • 甲骨文

  • Postgres

  • 数据库服务器

6. 开发您的第一个 Spring Cloud 任务应用程序

一个好的起点是简单的“你好,世界!” 应用程序,因此我们创建Spring Cloud Task相当于突出框架的功能。大多数 IDE 对 Apache Maven 都有很好的支持,因此我们使用它作为该项目的构建工具。

spring.io 网站包含许多使用 Spring Boot 的Getting Started指南” 。如果您需要解决特定问题,请先检查那里。您可以通过转到Spring Initializr并创建一个新项目来简化以下步骤 。这样做会自动生成一个新的项目结构,以便您可以立即开始编码。我们建议尝试使用 Spring Initializr 以熟悉它。

6.1. 使用 Spring Initializr 创建 Spring 任务项目

现在我们可以创建并测试一个打印Hello, World!到控制台的应用程序。

为此:

  1. 访问Spring Initialzr网站。

    1. 创建一个新的 Maven 项目,其名称为io.spring.demo工件名称为helloworld

    2. 在“依赖关系”文本框中,键入task并选择Cloud Task依赖关系。

    3. 在“依赖关系”文本框中,键入jdbc并选择JDBC依赖关系。

    4. 在“依赖关系”文本框中,键入h2并选择H2. (或您最喜欢的数据库)

    5. 单击生成项目按钮

  2. 解压 helloworld.zip 文件并将项目导入到您最喜欢的 IDE 中。

6.2. 编写代码

要完成我们的应用程序,我们需要使用HelloworldApplication以下内容更新生成的内容,以便它启动任务。

package io.spring.Helloworld;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.task.configuration.EnableTask;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@EnableTask
public class HelloworldApplication {

    @Bean
    public ApplicationRunner applicationRunner() {
        return new HelloWorldApplicationRunner();
    }

    public static void main(String[] args) {
        SpringApplication.run(HelloworldApplication.class, args);
    }

    public static class HelloWorldApplicationRunner implements ApplicationRunner {

        @Override
        public void run(ApplicationArguments args) throws Exception {
            System.out.println("Hello, World!");

        }
    }
}

虽然看起来很小,但正在发生很多事情。有关 Spring Boot 详细信息的更多信息,请参阅 Spring Boot 参考文档

现在我们可以打开该application.properties文件了src/main/resources。我们需要在 中配置两个属性application.properties

  • application.name:设置应用程序名称(翻译为任务名称)

  • logging.level:设置 Spring Cloud Task 的日志记录,DEBUG以便了解正在发生的情况。

以下示例展示了如何同时执行这两项操作:

logging.level.org.springframework.cloud.task=DEBUG
spring.application.name=helloWorld

6.2.1. 任务自动配置

当包含 Spring Cloud Task Starter 依赖项时,Task 自动配置所有 bean 以引导其功能。此配置的一部分注册了TaskRepository和基础设施以供其使用。

在我们的演示中,TaskRepository使用嵌入式 H2 数据库来记录任务的结果。这个 H2 嵌入式数据库对于生产环境来说不是一个实用的解决方案,因为一旦任务结束,H2 DB 就会消失。但是,为了获得快速入门体验,我们可以在示例中使用它,并回显该存储库中正在更新的内容到日志。在配置部分(本文档后面)中,我们介绍了如何自定义 Spring Cloud Task 提供的各个部分的配置。

当我们的示例应用程序运行时,Spring Boot 启动HelloWorldCommandLineRunner 并输出“Hello, World!” 消息到标准输出。在存储库中记录TaskLifecycleListener 任务的开始和结束。

6.2.2. 主要方法

main 方法充当任何 java 应用程序的入口点。我们的主要方法委托给 Spring Boot 的SpringApplication类。

6.2.3. 应用程序运行者

Spring 包含许多引导应用程序逻辑的方法。*RunnerSpring Boot 通过其接口(CommandLineRunner或)以有组织的方式提供了一种方便的方法ApplicationRunner。行为良好的任务可以通过使用这两个运行程序之一来引导任何逻辑。

任务的生命周期是从*Runner#run执行方法之前到它们全部完成之后考虑的。Spring Boot 允许应用程序使用多种 *Runner实现,Spring Cloud Task 也是如此。

CommandLineRunnerSpring Cloud Task 不会记录从 a或 以外的机制引导的任何处理(例如 ApplicationRunner通过使用)。InitializingBean#afterPropertiesSet

6.3. 运行示例

此时,我们的应用程序应该可以工作了。由于此应用程序是基于 Spring Boot 的,因此我们可以从应用程序的根目录使用命令行来运行它$ mvn spring-boot:run,如以下示例所示(及其输出):

$ mvn clean spring-boot:run
....... . . .
....... . . . (Maven log output here)
....... . . .

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.3.RELEASE)

2018-07-23 17:44:34.426  INFO 1978 --- [           main] i.s.d.helloworld.HelloworldApplication   : Starting HelloworldApplication on Glenns-MBP-2.attlocal.net with PID 1978 (/Users/glennrenfro/project/helloworld/target/classes started by glennrenfro in /Users/glennrenfro/project/helloworld)
2018-07-23 17:44:34.430  INFO 1978 --- [           main] i.s.d.helloworld.HelloworldApplication   : No active profile set, falling back to default profiles: default
2018-07-23 17:44:34.472  INFO 1978 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@1d24f32d: startup date [Mon Jul 23 17:44:34 EDT 2018]; root of context hierarchy
2018-07-23 17:44:35.280  INFO 1978 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2018-07-23 17:44:35.410  INFO 1978 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2018-07-23 17:44:35.419 DEBUG 1978 --- [           main] o.s.c.t.c.SimpleTaskConfiguration        : Using org.springframework.cloud.task.configuration.DefaultTaskConfigurer TaskConfigurer
2018-07-23 17:44:35.420 DEBUG 1978 --- [           main] o.s.c.t.c.DefaultTaskConfigurer          : No EntityManager was found, using DataSourceTransactionManager
2018-07-23 17:44:35.522 DEBUG 1978 --- [           main] o.s.c.t.r.s.TaskRepositoryInitializer    : Initializing task schema for h2 database
2018-07-23 17:44:35.525  INFO 1978 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executing SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql]
2018-07-23 17:44:35.558  INFO 1978 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executed SQL script from class path resource [org/springframework/cloud/task/schema-h2.sql] in 33 ms.
2018-07-23 17:44:35.728  INFO 1978 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2018-07-23 17:44:35.730  INFO 1978 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Bean with name 'dataSource' has been autodetected for JMX exposure
2018-07-23 17:44:35.733  INFO 1978 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2018-07-23 17:44:35.738  INFO 1978 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
2018-07-23 17:44:35.762 DEBUG 1978 --- [           main] o.s.c.t.r.support.SimpleTaskRepository   : Creating: TaskExecution{executionId=0, parentExecutionId=null, exitCode=null, taskName='application', startTime=Mon Jul 23 17:44:35 EDT 2018, endTime=null, exitMessage='null', externalExecutionId='null', errorMessage='null', arguments=[]}
2018-07-23 17:44:35.772  INFO 1978 --- [           main] i.s.d.helloworld.HelloworldApplication   : Started HelloworldApplication in 1.625 seconds (JVM running for 4.764)
Hello, World!
2018-07-23 17:44:35.782 DEBUG 1978 --- [           main] o.s.c.t.r.support.SimpleTaskRepository   : Updating: TaskExecution with executionId=1 with the following {exitCode=0, endTime=Mon Jul 23 17:44:35 EDT 2018, exitMessage='null', errorMessage='null'}

前面的输出中有三行是我们感兴趣的:

  • SimpleTaskRepository记录 中条目的创建TaskRepository

  • 我们的执行CommandLineRunner,通过“Hello, World!”来演示。输出。

  • SimpleTaskRepository将任务的完成情况记录在TaskRepository.

可以在此处的 Spring Cloud 任务项目的示例模块中找到一个简单的任务应用程序 。

特征

本节将更详细地介绍 Spring Cloud Task,包括如何使用它、如何配置它以及适当的扩展点。

7. Spring Cloud 任务的生命周期

在大多数情况下,现代云环境是围绕执行预计不会结束的进程而设计的。如果它们确实结束,通常会重新启动。虽然大多数平台确实有某种方法来运行一个进程,该进程在结束时不会重新启动,但该运行的结果通常不会以可消耗的方式维护。Spring Cloud Task 提供了在环境中执行短期进程并记录结果的能力。这样做可以实现围绕短期进程的微服务架构,以及通过消息集成任务来实现长期运行的服务。

虽然此功能在云环境中很有用,但在传统部署模型中也可能出现相同的问题。当使用 cron 等调度程序运行 Spring Boot 应用程序时,能够在应用程序完成后监视其结果会很有用。

Spring Cloud Task 采用的方法是 Spring Boot 应用程序可以有开始和结束,并且仍然成功。批处理应用程序是预期结束(通常是短暂的)进程如何发挥作用的示例之一。

Spring Cloud Task 记录给定任务的生命周期事件。大多数长时间运行的进程(以大多数 Web 应用程序为代表)不会保存其生命周期事件。Spring Cloud Task 的核心任务就是这样做的。

生命周期由单个任务执行组成。这是配置为任务的 Spring Boot 应用程序的物理执行(即,它具有 Sprint Cloud 任务依赖项)。

在任务开始时,在 运行任何实现之前,会在 中创建一个CommandLineRunner记录启动事件的条目。该事件是由Spring框架触发的。这向系统表明所有 bean 都已准备好使用,并且在运行Spring Boot 提供的任何或实现之前出现。ApplicationRunnerTaskRepositorySmartLifecycle#startCommandLineRunnerApplicationRunner

任务的记录仅在成功引导 ApplicationContext. 如果上下文根本无法引导,则不会记录任务的运行。

当 Spring Boot 的所有调用完成*Runner#run或失败 ApplicationContext(由 指示ApplicationFailedEvent)时,任务执行将在存储库中更新结果。

如果应用程序要求ApplicationContext在任务完成时关闭(*Runner#run已调用所有方法并且已更新任务存储库),请将该属性设置spring.cloud.task.closecontextEnabled 为 true。

7.1. 任务执行

中存储的信息TaskRepository在类中建模TaskExecution并包含以下信息:

场地 描述

executionid

任务运行的唯一 ID。

exitCode

从实现生成的退出代码ExitCodeExceptionMapper。如果没有生成退出代码但ApplicationFailedEvent抛出了退出代码,则设置为 1。否则,假定为 0。

taskName

任务的名称,由配置的TaskNameResolver.

startTime

任务开始的时间,如调用所示SmartLifecycle#start

endTime

任务完成的时间,如 所示ApplicationReadyEvent

exitMessage

退出时可用的任何信息。这可以通过编程方式设置 TaskExecutionListener

errorMessage

如果异常是任务结束的原因(如 所示 ApplicationFailedEvent),则该异常的堆栈跟踪存储在此处。

arguments

List传递到可执行引导应用程序时的字符串命令行参数。

7.2. 映射退出代码

当任务完成时,它会尝试向操作系统返回退出代码。如果我们看一下原来的示例,我们可以看到我们没有控制应用程序的这方面。因此,如果抛出异常,JVM 将返回一段代码,该代码可能对您的调试有任何用处,也可能没有用处。

因此,Spring Boot 提供了一个接口 ,ExitCodeExceptionMapper可让您将未捕获的异常映射到退出代码。这样做可以让您在退出代码级别指示出了什么问题。此外,通过以这种方式映射退出代码,Spring Cloud Task 记录返回的退出代码。

如果任务以 SIG-INT 或 SIG-TERM 终止,则退出代码为零,除非代码中另有指定。

当任务运行时,退出代码在存储库中存储为 null。任务完成后,将根据本节前面描述的准则存储适当的退出代码。

8. 配置

Spring Cloud Task 提供了即用型配置,如 DefaultTaskConfigurerSimpleTaskConfiguration类中所定义。本节将介绍默认设置以及如何根据您的需求自定义 Spring Cloud Task。

8.1. 数据源

Spring Cloud Task 使用数据源来存储任务执行的结果。默认情况下,我们提供 H2 的内存实例,以提供简单的引导开发方法。但是,在生产环境中,您可能想要配置自己的DataSource.

如果您的应用程序仅使用一个DataSource并且既充当业务模式又充当任务存储库,那么您所需要做的就是提供任何一个DataSource(最简单的方法是通过 Spring Boot 的配置约定)。Spring Cloud Task 自动将其 DataSource用于存储库。

如果您的应用程序使用多个DataSource任务存储库,则需要使用适当的配置任务存储库DataSource。这种定制可以通过 的实现来完成 TaskConfigurer

8.2. 表前缀

TaskRepository任务表的表前缀是其中一项可修改的属性。默认情况下,它们都以 开头TASK_TASK_EXECUTIONTASK_EXECUTION_PARAMS 是两个例子。但是,修改此前缀有潜在的原因。如果模式名称需要添加到表名称前面,或者同一模式中需要多组任务表,则必须更改表前缀。您可以通过将 设为spring.cloud.task.tablePrefix所需的前缀来实现此目的,如下所示:

spring.cloud.task.tablePrefix=yourPrefix

通过使用spring.cloud.task.tablePrefix,用户承担创建任务表的责任,该任务表既满足任务表模式的标准,又进行了用户业务需求所需的修改。在创建您自己的任务 DDL 时,您可以使用 Spring Cloud 任务架构 DDL 作为指南,如此处 所示

8.3. 启用/禁用表初始化

如果您正在创建任务表并且不希望 Spring Cloud Task 在任务启动时创建它们,请将属性设置spring.cloud.task.initialize-enabledfalse,如下所示:

spring.cloud.task.initialize-enabled=false

它默认为true.

该属性spring.cloud.task.initialize.enable已被弃用。

8.4. 外部生成的任务 ID

在某些情况下,您可能需要考虑任务请求时间和基础结构实际启动任务之间的时间差。TaskExecutionSpring Cloud Task 允许您在请求任务时创建一个。然后将生成的执行ID传递TaskExecution给任务,以便它可以TaskExecution在任务的生命周期中更新。

可以通过调用引用保存对象的数据存储的实现上的方法TaskExecution来创建A。createTaskExecutionTaskRepositoryTaskExecution

为了将您的任务配置为使用生成的TaskExecutionId,请添加以下属性:

spring.cloud.task.executionid=yourtaskId

8.5。外部任务 ID

Spring Cloud Task 允许您为每个 TaskExecution. 为了将您的任务配置为使用生成的TaskExecutionId,请添加以下属性:

spring.cloud.task.external-execution-id=<externalTaskId>

8.6。父任务 ID

Spring Cloud Task 允许您为每个TaskExecution. 例如,一个任务执行另一个或多个任务,并且您想要记录哪个任务启动了每个子任务。为了配置您的任务以设置父任务, TaskExecutionId请在子任务上添加以下属性:

spring.cloud.task.parent-execution-id=<parentExecutionTaskId>

8.7. 任务配置器

TaskConfigurer是一个策略接口,可让您自定义 Spring Cloud Task 组件的配置方式。默认情况下,我们提供DefaultTaskConfigurer提供逻辑默认值的:Map基于内存的组件(如果没有 DataSource提供,则对开发有用)和基于 JDBC 的组件(如果有可用的,则有用DataSource )。

TaskConfigurer允许您配置三个主要组件:

成分 描述 默认(由 提供DefaultTaskConfigurer

TaskRepository

TaskRepository要使用的实现。

SimpleTaskRepository

TaskExplorer

TaskExplorer要使用的(用于只读访问任务存储库的组件)的实现。

SimpleTaskExplorer

PlatformTransactionManager

运行任务更新时使用的事务管理器。

JdbcTransactionManagerDataSource如果使用 a 。ResourcelessTransactionManager如果不是。

您可以通过创建接口的自定义实现来自定义上表中描述的任何组件TaskConfigurer。通常,扩展 (如果未找到DefaultTaskConfigurera 则提供)并覆盖所需的 getter 就足够了。TaskConfigurer但是,可能需要从头开始实现您自己的。

用户不应该直接使用 getter 方法,TaskConfigurer除非他们使用它来提供作为 Spring Bean 公开的实现。

8.8。任务执行监听器

TaskExecutionListener允许您注册任务生命周期中发生的特定事件的侦听器。为此,请创建一个实现该 TaskExecutionListener接口的类。实现该接口的类TaskExecutionListener 会收到以下事件的通知:

  • onTaskStartup:在将其存储TaskExecutionTaskRepository.

  • onTaskEndTaskExecution:在更新条目TaskRepository并标记任务的最终状态之前。

  • onTaskFailedonTaskEnd:在任务抛出未处理的异常时调用该方法之前。

Spring Cloud Task 还允许您TaskExecution使用以下方法注释将侦听器添加到 bean 内的方法:

  • @BeforeTask:在将其存储TaskExecution到之前TaskRepository

  • @AfterTask:在更新条目之前TaskExecution标记TaskRepository 任务的最终状态。

  • @FailedTask@AfterTask:在任务抛出未处理的异常时调用该方法之前。

以下示例显示了正在使用的三个注释:

 public class MyBean {

    @BeforeTask
    public void methodA(TaskExecution taskExecution) {
    }

    @AfterTask
    public void methodB(TaskExecution taskExecution) {
    }

    @FailedTask
    public void methodC(TaskExecution taskExecution, Throwable throwable) {
    }
}
ApplicationListener在链中 插入早于TaskLifecycleListener现有的可能会导致意想不到的效果。

8.8.1. 任务执行监听器抛出的异常

如果事件处理程序引发异常TaskExecutionListener,则该事件处理程序的所有侦听器处理都会停止。例如,如果onTaskStartup启动了三个侦听器并且第一个事件处理程序引发异常,则不会调用onTaskStartup其他两个方法。onTaskStartup但是,会调用其他事件处理程序(onTaskEndonTaskFailedTaskExecutionListeners

事件处理程序引发异常时返回的退出代码是ExitCodeEventTaskExecutionListener 报告的退出代码 。如果没有发出,则评估抛出的异常以查看它是否是 ExitCodeGenerator类型。如果是,则返回退出代码。否则, 返回。ExitCodeEventExitCodeGenerator1

如果方法中引发异常onTaskStartup,应用程序的退出代码将为1onTaskEnd如果在 a or方法中引发异常onTaskFailed ,则应用程序的退出代码将是使用上面列举的规则建立的代码。

如果在 、onTaskStartuponTaskEnd中引发异常,onTaskFailed 您无法使用 覆盖应用程序的退出代码ExitCodeExceptionMapper

8.8.2. 退出消息

您可以使用 .a 文件以编程方式设置任务的退出消息 TaskExecutionListener。这是通过设置 来完成的TaskExecution’s exitMessage,然后将其传递到TaskExecutionListener. 以下示例显示了用 注释的方法@AfterTask ExecutionListener

@AfterTask
public void afterMe(TaskExecution taskExecution) {
    taskExecution.setExitMessage("AFTER EXIT MESSAGE");
}

ExitMessage可以在任何侦听器事件(onTaskStartuponTaskFailed和)处设置An onTaskEnd。三个侦听器的优先顺序如下:

  1. onTaskEnd

  2. onTaskFailed

  3. onTaskStartup

例如,如果您exitMessageonTaskStartuponTaskFailed 侦听器设置了 ,并且任务顺利结束,则exitMessage和 侦听器onTaskStartup 将存储在存储库中。否则,如果发生故障,则会存储exitMessage来自的信息。此外,如果您使用 侦听器onTaskFailed设置,则中的将取代和中的退出消息。exitMessageonTaskEndexitMessageonTaskEndonTaskStartuponTaskFailed

8.9。限制 Spring Cloud 任务实例

Spring Cloud Task 允许您确定一次只能运行一个具有给定任务名称的任务。为此,您需要为每个任务执行建立任务名称和设置 spring.cloud.task.single-instance-enabled=true。当第一个任务执行正在运行时,任何其他时间您尝试运行具有相同 任务名称和“spring.cloud.task.single-instance-enabled=true”的任务时,该任务都会失败并显示以下错误消息Task with name "application" is already running.spring.cloud.task.single-instance-enabled的值为false。以下示例显示如何spring.cloud.task.single-instance-enabled设置true

spring.cloud.task.single-instance-enabled=true or false

要使用此功能,您必须将以下 Spring Integration 依赖项添加到您的应用程序中:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jdbc</artifactId>
</dependency>
如果任务因启用此功能且另一个任务正在使用相同的任务名称运行而失败,则应用程序的退出代码将为 1。

8.9.1. Spring AOT 和本机编译的单实例使用

要在创建本机编译的应用程序时使用 Spring Cloud Task 的单实例功能,您需要在构建时启用该功能。为此,请添加 process-aot 执行并将其设置spring.cloud.task.single-step-instance-enabled=true为 JVM 参数,如下所示:

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            <configuration>
                <jvmArguments>
                    -Dspring.cloud.task.single-instance-enabled=true
                </jvmArguments>
            </configuration>
        </execution>
    </executions>
</plugin>

8.10。启用 ApplicationRunner 和 CommandLineRunner 的观察

启用任务观察ApplicationRunnerCommandLineRunner设置spring.cloud.task.observation.enabled为 true。

SimpleMeterRegistry可以在此处找到具有观察功能的示例任务应用程序。

8.11。禁用 Spring Cloud 任务自动配置

如果 Spring Cloud Task 不应自动配置实现,您可以禁用 Task 的自动配置。这可以通过将以下注释添加到您的任务应用程序来完成:

@EnableAutoConfiguration(exclude={SimpleTaskAutoConfiguration.class})

您还可以通过将该spring.cloud.task.autoconfiguration.enabled属性设置为 来禁用任务自动配置false

8.12. 关闭上下文

如果应用程序要求ApplicationContext在任务完成时关闭(*Runner#run已调用所有方法并且已更新任务存储库),请将该属性设置spring.cloud.task.closecontextEnabledtrue

关闭上下文的另一种情况是任务执行完成但应用程序未终止时。在这些情况下,上下文保持打开状态,因为已分配线程(例如:如果您正在使用 TaskExecutor)。在这些情况下,启动任务时将该spring.cloud.task.closecontextEnabled属性设置为。true一旦任务完成,这将关闭应用程序的上下文。从而允许应用程序终止。

8.13。启用任务指标

Spring Cloud Task 与 Micrometer 集成并为其执行的任务创建观察。要启用任务可观察性集成,您必须将spring-boot-starter-actuator、您的首选注册表实现(如果您想发布指标)和微米跟踪(如果您想发布跟踪数据)添加到您的任务应用程序。使用 Influx 启用任务可观察性和指标的 Maven 依赖项示例集如下:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-influx</artifactId>
    <scope>runtime</scope>
</dependency>

8.14。Spring 任务和 Spring Cloud 任务属性

该术语task是行业中经常使用的词。在一个这样的示例中,Spring Boot 提供了这些属性,spring.task而 Spring Cloud Task 则提供了这些spring.cloud.task属性。这在过去引起了一些混乱,认为这两组属性是直接相关的。然而,它们代表了 Spring 生态系统中提供的两组不同的功能。

  • spring.task指的是配置ThreadPoolTaskScheduler.

  • spring.cloud.task指配置 Spring Cloud Task 功能的属性。

本节将更详细地介绍 Spring Cloud Task 与 Spring Batch 的集成。本节介绍跟踪作业执行和执行作业的任务之间的关联以及通过 Spring Cloud Deployer 进行远程分区。

9. 将作业执行与执行该作业的任务相关联

Spring Boot 提供了在 uber-jar 中执行批处理作业的工具。Spring Boot 对此功能的支持使开发人员可以在该执行中执行多个批处理作业。Spring Cloud Task 提供了将作业的执行(一次作业执行)与任务的执行关联起来的能力,以便可以追溯到另一​​个。

Spring Cloud Task 通过使用TaskBatchExecutionListener. 默认情况下,此侦听器在任何配置了 Spring Batch 作业(通过在Job上下文中定义类型的 bean)和 spring-cloud-task-batch类路径上的 jar 的上下文中自动配置。侦听器被注入到满足这些条件的所有作业中。

9.1. 重写 TaskBatchExecutionListener

为了防止侦听器被注入到当前上下文中的任何批处理作业中,您可以使用标准 Spring Boot 机制禁用自动配置。

要仅将侦听器注入到上下文中的特定作业中,请覆盖 batchTaskExecutionListenerBeanPostProcessor并提供作业 bean ID 列表,如以下示例所示:

public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
    TaskBatchExecutionListenerBeanPostProcessor postProcessor =
        new TaskBatchExecutionListenerBeanPostProcessor();

    postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));

    return postProcessor;
}
您可以在 Spring Cloud 任务项目的示例模块中找到示例批处理应用程序, 此处

10. 远程分区

Spring Cloud Deployer 提供了在大多数云基础设施上启动基于 Spring Boot 的应用程序的工具。并将工作步骤执行的启动委托给 Spring Cloud Deployer DeployerPartitionHandlerDeployerStepExecutionHandler

要配置DeployerStepExecutionHandler,您必须提供Resource 代表要执行的 Spring Boot über-jar 、 aTaskLauncherHandler和 a JobExplorer。您可以配置任何环境属性以及一次执行的最大工作线程数、轮询结果的时间间隔(默认为 10 秒)以及超时(默认为 -1 或无超时)。以下示例显示了配置方式PartitionHandler

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
        JobExplorer jobExplorer) throws Exception {

    MavenProperties mavenProperties = new MavenProperties();
    mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
        new MavenProperties.RemoteRepository(repository))));

    Resource resource =
        MavenResource.parse(String.format("%s:%s:%s",
                "io.spring.cloud",
                "partitioned-batch-job",
                "1.1.0.RELEASE"), mavenProperties);

    DeployerPartitionHandler partitionHandler =
        new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");

    List<String> commandLineArgs = new ArrayList<>(3);
    commandLineArgs.add("--spring.profiles.active=worker");
    commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
    commandLineArgs.add("--spring.batch.initializer.enabled=false");

    partitionHandler.setCommandLineArgsProvider(
        new PassThroughCommandLineArgsProvider(commandLineArgs));
    partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
    partitionHandler.setMaxWorkers(2);
    partitionHandler.setApplicationName("PartitionedBatchJobTask");

    return partitionHandler;
}
将环境变量传递给分区时,每个分区可能位于具有不同环境设置的不同计算机上。因此,您应该仅传递所需的环境变量。

请注意,在上面的示例中,我们将最大工作线程数设置为 2。设置最大工作线程数可确定一次应运行的最大分区数。

Resource执行的预计是一个 Spring Boot über-jar,并 在当前上下文中DeployerStepExecutionHandler配置为 a 。CommandLineRunner上例中列举的存储库应该是über-jar所在的远程存储库。管理者和工作人员都希望能够看到用作作业存储库和任务存储库的同一数据存储。一旦底层基础设施引导了 Spring Boot jar 并且 Spring Boot 启动了DeployerStepExecutionHandler,步骤处理程序就会执行所请求的 Step。以下示例展示了如何配置DeployerStepExecutionHandler

@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
    DeployerStepExecutionHandler handler =
        new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);

    return handler;
}
您可以在 Spring Cloud Task 项目的示例模块中找到示例远程分区应用程序, 此处

10.1. 异步启动远程批处理分区

默认情况下,批处理分区按顺序启动。但是,在某些情况下,这可能会影响性能,因为每次启动都会阻塞,直到配置资源(例如:在 Kubernetes 中配置 pod)。在这些情况下,您可以ThreadPoolTaskExecutorDeployerPartitionHandler. 这将根据 的配置启动远程批处理分区ThreadPoolTaskExecutor。例如:

    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setThreadNamePrefix("default_task_executor_thread");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.initialize();
        return executor;
    }

    @Bean
    public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
        TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
        Resource resource = this.resourceLoader
            .getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");

        DeployerPartitionHandler partitionHandler =
            new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                "workerStep", taskRepository, executor);
    ...
    }
我们需要关闭上下文,因为使用 会使ThreadPoolTaskExecutor线程处于活动状态,因此应用程序不会终止。要正确关闭应用程序,我们需要将spring.cloud.task.closecontextEnabled属性设置为true

10.2. 为 Kubernetes 平台开发批量分区应用程序的注意事项

  • 在 Kubernetes 平台上部署分区应用程序时,必须对 Spring Cloud Kubernetes Deployer 使用以下依赖项:

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId>
    </dependency>
  • 任务应用程序及其分区的应用程序名称需要遵循以下正则表达式模式:[a-z0-9]([-a-z0-9]*[a-z0-9])。否则,将引发异常。

11. 批量信息消息

Spring Cloud Task 为批处理作业提供了发出信息性消息的能力。“ Spring Batch Events ”部分详细介绍了此功能。

12. 批处理作业退出代码

如前所述,Spring Cloud Task 应用程序支持记录任务执行的退出代码的功能。但是,如果您在任务中运行 Spring Batch 作业,则无论批处理作业执行如何完成,在使用默认批处理/引导行为时,任务的结果始终为零。请记住,任务是启动应用程序,并且从任务返回的退出代码与启动应用程序相同。要覆盖此行为并允许任务在批处理作业返回 BatchStatus 时返回非零的退出代码 FAILED设置spring.cloud.task.batch.fail-on-job-failuretrue。那么退出代码可以是1(默认)或者基于 指定 ExitCodeGenerator

此功能使用一种新功能ApplicationRunner来替换 Spring Boot 提供的功能。默认情况下,配置顺序相同。但是,如果您想自定义运行的顺序ApplicationRunner,可以通过设置 spring.cloud.task.batch.applicationRunnerOrder属性来设置其顺序。要让您的任务根据批处理作业执行的结果返回退出代码,您需要编写自己的 CommandLineRunner.

单步批量作业启动器

本节介绍如何使用 Spring Cloud Task 中包含的启动器Job来开发单个Spring Batch。Step此启动器允许您使用配置来定义ItemReaderItemWriter或完整的单步 Spring Batch Job。有关 Spring Batch 及其功能的更多信息,请参阅 Spring Batch 文档

要获取 Maven 的启动器,请将以下内容添加到您的构建中:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-single-step-batch-job</artifactId>
    <version>2.3.0</version>
</dependency>

要获取 Gradle 的启动器,请将以下内容添加到您的构建中:

compile "org.springframework.cloud:spring-cloud-starter-single-step-batch-job:2.3.0"

13. 定义工作

您可以使用 starter 来定义少至一个ItemReader或一个ItemWriter,或多至一个完整的Job. 在本节中,我们定义需要定义哪些属性来配置 Job.

13.1. 特性

首先,启动器提供了一组属性,让您可以通过一个步骤配置作业的基础知识:

表 1. 作业属性
财产 类型 默认值 描述

spring.batch.job.jobName

String

null

职位名称。

spring.batch.job.stepName

String

null

步骤的名称。

spring.batch.job.chunkSize

Integer

null

每笔交易要处理的项目数量。

配置上述属性后,您就拥有了一个具有单个基于块的步骤的作业。Map<String, Object>这个基于块的步骤将实例作为项目进行读取、处理和写入。但是,该步骤尚未执行任何操作。您需要配置一个ItemReader、一个可选的ItemProcessor和一个ItemWriter让它做一些事情。要配置其中之一,您可以使用属性并配置提供自动配置的选项之一,也可以使用标准 Spring 配置机制来配置您自己的选项。

如果您配置自己的输入和输出类型,则输入和输出类型必须与步骤中的其他类型匹配。本starter中的实现和实现均使用a作为ItemReader输入和输出项。 ItemWriterMap<String, Object>

14. ItemReader 实现的自动配置

该启动器为四种不同的ItemReader实现提供自动配置: AmqpItemReaderFlatFileItemReaderJdbcCursorItemReaderKafkaItemReader。在本节中,我们概述如何使用提供的自动配置来配置其中每一个。

14.1. AmqpItemReader

您可以使用 AMQP 从队列或主题中读取数据AmqpItemReader。此实现的自动配置ItemReader取决于两组配置。第一个是AmqpTemplate. 您可以自行配置或使用 Spring Boot 提供的自动配置。请参阅 Spring Boot AMQP 文档。配置完成后AmqpTemplate,您可以通过设置以下属性来启用批处理功能以支持它:

表 2.AmqpItemReader属性
财产 类型 默认值 描述

spring.batch.job.amqpitemreader.enabled

boolean

false

如果true,将执行自动配置。

spring.batch.job.amqpitemreader.jsonConverterEnabled

boolean

true

指示是否Jackson2JsonMessageConverter应该注册来解析消息。

有关更多信息,请参阅AmqpItemReader文档

14.2. 平面文件项读取器

FlatFileItemReader允许您读取平面文件(例如 CSV 和其他文件格式)。要读取文件,您可以通过正常的 Spring 配置自行提供一些组件(LineTokenizerRecordSeparatorPolicyFieldSetMapperLineMapperSkippedLinesCallback)。您还可以使用以下属性来配置读取器:

表 3.FlatFileItemReader属性
财产 类型 默认值 描述

spring.batch.job.flatfileitemreader.saveState

boolean

true

确定是否应保存状态以供重新启动。

spring.batch.job.flatfileitemreader.name

String

null

用于在 中提供唯一键的名称ExecutionContext

spring.batch.job.flatfileitemreader.maxItemcount

int

Integer.MAX_VALUE

从文件中读取的最大项目数。

spring.batch.job.flatfileitemreader.currentItemCount

int

0

已读的项目数。用于重新启动。

spring.batch.job.flatfileitemreader.comments

List<String>

空列表

指示文件中注释行(要忽略的行)的字符串列表。

spring.batch.job.flatfileitemreader.resource

Resource

null

要读取的资源。

spring.batch.job.flatfileitemreader.strict

boolean

true

如果设置为true,则如果未找到资源,读取器将引发异常。

spring.batch.job.flatfileitemreader.encoding

String

FlatFileItemReader.DEFAULT_CHARSET

读取文件时使用的编码。

spring.batch.job.flatfileitemreader.linesToSkip

int

0

指示在文件开头要跳过的行数。

spring.batch.job.flatfileitemreader.delimited

boolean

false

指示文件是否为分隔文件(CSV 和其他格式)。此属性只能有其中之一,或者spring.batch.job.flatfileitemreader.fixedLength可以true同时存在。

spring.batch.job.flatfileitemreader.delimiter

String

DelimitedLineTokenizer.DELIMITER_COMMA

如果读取分隔文件,则指示要解析的分隔符。

spring.batch.job.flatfileitemreader.quoteCharacter

char

DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER

用于确定用于引用值的字符。

spring.batch.job.flatfileitemreader.includedFields

List<Integer>

空列表

索引列表,用于确定记录中的哪些字段要包含在项目中。

spring.batch.job.flatfileitemreader.fixedLength

boolean

false

指示文件的记录是否按列号进行解析。此属性只能有其中之一,或者spring.batch.job.flatfileitemreader.delimited可以true同时存在。

spring.batch.job.flatfileitemreader.ranges

List<Range>

空列表

用于解析固定宽度记录的列范围列表。请参阅范围文档

spring.batch.job.flatfileitemreader.names

String []

null

从记录中解析的每个字段的名称列表。Map<String, Object>这些名称是从 this 返回的项目中的键ItemReader

spring.batch.job.flatfileitemreader.parsingStrict

boolean

true

如果设置为true,则如果字段无法映射,则映射失败。

14.3。JdbcCursorItemReader

对关系数据库运行JdbcCursorItemReader查询并迭代结果游标 ( ResultSet) 以提供结果项。此自动配置允许您提供 a PreparedStatementSetter、 aRowMapper或两者。您还可以使用以下属性来配置JdbcCursorItemReader

表 4.JdbcCursorItemReader属性
财产 类型 默认值 描述

spring.batch.job.jdbccursoritemreader.saveState

boolean

true

确定是否应保存状态以供重新启动。

spring.batch.job.jdbccursoritemreader.name

String

null

用于在 中提供唯一键的名称ExecutionContext

spring.batch.job.jdbccursoritemreader.maxItemcount

int

Integer.MAX_VALUE

从文件中读取的最大项目数。

spring.batch.job.jdbccursoritemreader.currentItemCount

int

0

已读的项目数。用于重新启动。

spring.batch.job.jdbccursoritemreader.fetchSize

int

向驱动程序发出提示,指示每次调用数据库系统要检索多少条记录。为了获得最佳性能,您通常希望将其设置为匹配块大小。

spring.batch.job.jdbccursoritemreader.maxRows

int

从数据库读取的最大项目数。

spring.batch.job.jdbccursoritemreader.queryTimeout

int

查询超时的毫秒数。

spring.batch.job.jdbccursoritemreader.ignoreWarnings

boolean

true

确定读者在处理时是否应忽略 SQL 警告。

spring.batch.job.jdbccursoritemreader.verifyCursorPosition

boolean

true

指示是否应在每次读取后验证光标的位置,以验证是否RowMapper未使光标前进。

spring.batch.job.jdbccursoritemreader.driverSupportsAbsolute

boolean

false

指示驱动程序是否支持光标的绝对定位。

spring.batch.job.jdbccursoritemreader.useSharedExtendedConnection

boolean

false

指示连接是否与其他处理共享(因此是事务的一部分)。

spring.batch.job.jdbccursoritemreader.sql

String

null

从中读取的 SQL 查询。

您还可以使用以下属性专门为读者指定 JDBC DataSource: 。JdbcCursorItemReader特性

财产 类型 默认值 描述

spring.batch.job.jdbccursoritemreader.datasource.enable

boolean

false

确定是否JdbcCursorItemReader DataSource应启用。

jdbccursoritemreader.datasource.url

String

null

数据库的 JDBC URL。

jdbccursoritemreader.datasource.username

String

null

数据库的登录用户名。

jdbccursoritemreader.datasource.password

String

null

数据库的登录密码。

jdbccursoritemreader.datasource.driver-class-name

String

null

JDBC 驱动程序的完全限定名称。

如果未指定DataSource,则将使用 默认值。 JDBCCursorItemReaderjdbccursoritemreader_datasource

14.4. KafkaItemReader

从 Kafka 主题中提取数据分区非常有用,而且正是它的功能 KafkaItemReader。要配置KafkaItemReader,需要两项配置。首先,需要使用 Spring Boot 的 Kafka 自动配置来配置 Kafka(请参阅 Spring Boot Kafka 文档)。从 Spring Boot 配置 Kafka 属性后,您可以KafkaItemReader 通过设置以下属性来配置其自身:

表 5.KafkaItemReader属性
财产 类型 默认值 描述

spring.batch.job.kafkaitemreader.name

String

null

用于在 中提供唯一键的名称ExecutionContext

spring.batch.job.kafkaitemreader.topic

String

null

要阅读的主题的名称。

spring.batch.job.kafkaitemreader.partitions

List<Integer>

空列表

要从中读取的分区索引列表。

spring.batch.job.kafkaitemreader.pollTimeOutInSeconds

long

30

操作超时poll()

spring.batch.job.kafkaitemreader.saveState

boolean

true

确定是否应保存状态以供重新启动。

请参阅KafkaItemReader文档

14.5。原生编译

单步批处理的优点是,当您使用 JVM 时,它允许您在运行时动态选择要使用的读取器和写入器 Bean。但是,当您使用本机编译时,您必须在构建时而不是运行时确定读取器和写入器。以下示例执行此操作:

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            <configuration>
                <jvmArguments>
                    -Dspring.batch.job.flatfileitemreader.name=fooReader
                    -Dspring.batch.job.flatfileitemwriter.name=fooWriter
                </jvmArguments>
            </configuration>
        </execution>
    </executions>
</plugin>

15. 项目处理器配置

单步批处理作业自动配置接受一个ItemProcessor是否在ApplicationContext. 如果找到正确的类型 ( ItemProcessor<Map<String, Object>, Map<String, Object>>),则会将其自动连接到步骤中。

16. ItemWriter 实现的自动配置

此启动器为ItemWriter与支持的ItemReader实现相匹配的实现提供自动配置:AmqpItemWriterFlatFileItemWriterJdbcItemWriterKafkaItemWriter。本节介绍如何使用自动配置来配置受支持的ItemWriter.

16.1. AmqpItemWriter

要写入 RabbitMQ 队列,您需要两组配置。首先,您需要一个 AmqpTemplate. 实现这一点的最简单方法是使用 Spring Boot 的 RabbitMQ 自动配置。请参阅Spring Boot AMQP 文档

配置完后,您可以通过设置以下属性来AmqpTemplate配置:AmqpItemWriter

表 6.AmqpItemWriter属性
财产 类型 默认值 描述

spring.batch.job.amqpitemwriter.enabled

boolean

false

如果true,则自动配置运行。

spring.batch.job.amqpitemwriter.jsonConverterEnabled

boolean

true

指示是否Jackson2JsonMessageConverter应该注册转换消息。

16.2. FlatFileItemWriter

要将文件写入作为步骤的输出,您可以配置FlatFileItemWriter. 自动配置接受已显式配置的组件(例如LineAggregatorFieldExtractorFlatFileHeaderCallback或 a FlatFileFooterCallback)以及通过设置以下指定属性进行配置的组件:

表 7.FlatFileItemWriter属性
财产 类型 默认值 描述

spring.batch.job.flatfileitemwriter.resource

Resource

null

要读取的资源。

spring.batch.job.flatfileitemwriter.delimited

boolean

false

指示输出文件是否是分隔文件。如果truespring.batch.job.flatfileitemwriter.formatted一定是false

spring.batch.job.flatfileitemwriter.formatted

boolean

false

指示输出文件是否为格式化文件。如果truespring.batch.job.flatfileitemwriter.delimited一定是false

spring.batch.job.flatfileitemwriter.format

String

null

用于生成格式化文件的输出的格式。格式化是通过使用 来执行的String.format

spring.batch.job.flatfileitemwriter.locale

Locale

Locale.getDefault()

Locale生成文件时要使用的。

spring.batch.job.flatfileitemwriter.maximumLength

int

0

记录的最大长度。如果为 0,则大小无界。

spring.batch.job.flatfileitemwriter.minimumLength

int

0

最小记录长度。

spring.batch.job.flatfileitemwriter.delimiter

String

,

用于String分隔分隔文件中的字段。

spring.batch.job.flatfileitemwriter.encoding

String

FlatFileItemReader.DEFAULT_CHARSET

写入文件时使用的编码。

spring.batch.job.flatfileitemwriter.forceSync

boolean

false

指示文件是否应在刷新时强制同步到磁盘。

spring.batch.job.flatfileitemwriter.names

String []

null

从记录中解析的每个字段的名称列表。Map<String, Object>这些名称是this 收到的项目的键ItemWriter

spring.batch.job.flatfileitemwriter.append

boolean

false

指示如果找到输出文件是否应附加文件。

spring.batch.job.flatfileitemwriter.lineSeparator

String

FlatFileItemWriter.DEFAULT_LINE_SEPARATOR

使用什么String来分隔输出文件中的行。

spring.batch.job.flatfileitemwriter.name

String

null

用于在 中提供唯一键的名称ExecutionContext

spring.batch.job.flatfileitemwriter.saveState

boolean

true

确定是否应保存状态以供重新启动。

spring.batch.job.flatfileitemwriter.shouldDeleteIfEmpty

boolean

false

如果设置为true,则作业完成时将删除空文件(没有输出)。

spring.batch.job.flatfileitemwriter.shouldDeleteIfExists

boolean

true

如果设置为true并且在输出文件应位于的位置找到文件,则该文件会在步骤开始之前删除。

spring.batch.job.flatfileitemwriter.transactional

boolean

FlatFileItemWriter.DEFAULT_TRANSACTIONAL

指示读取器是否是事务性队列(指示读取的项目在失败时返回到队列)。

16.3。JdbcBatchItemWriter

要将步骤的输出写入关系数据库,此启动器提供了自动配置JdbcBatchItemWriter. 自动配置允许您通过设置以下属性来提供自己的ItemPreparedStatementSetter或和配置选项:ItemSqlParameterSourceProvider

表 8.JdbcBatchItemWriter属性
财产 类型 默认值 描述

spring.batch.job.jdbcbatchitemwriter.name

String

null

用于在 中提供唯一键的名称ExecutionContext

spring.batch.job.jdbcbatchitemwriter.sql

String

null

用于插入每个项目的 SQL。

spring.batch.job.jdbcbatchitemwriter.assertUpdates

boolean

true

是否验证每次插入都会导致至少一条记录的更新。

您还可以使用以下属性专门为编写器指定 JDBC 数据源: 。JdbcBatchItemWriter特性

财产 类型 默认值 描述

spring.batch.job.jdbcbatchitemwriter.datasource.enable

boolean

false

确定是否JdbcCursorItemReader DataSource应启用。

jdbcbatchitemwriter.datasource.url

String

null

数据库的 JDBC URL。

jdbcbatchitemwriter.datasource.username

String

null

数据库的登录用户名。

jdbcbatchitemwriter.datasource.password

String

null

数据库的登录密码。

jdbcbatchitemreader.datasource.driver-class-name

String

null

JDBC 驱动程序的完全限定名称。

如果未指定DataSource,则将使用 默认值。 JdbcBatchItemWriterjdbcbatchitemwriter_datasource

16.4。KafkaItemWriter

要将步骤输出写入 Kafka 主题,您需要KafkaItemWriter. 该启动器KafkaItemWriter通过使用两个地方的设施来提供自动配置。第一个是 Spring Boot 的 Kafka 自动配置。(请参阅Spring Boot Kafka 文档。)其次,此启动器允许您在编写器上配置两个属性。

表 9.KafkaItemWriter属性
财产 类型 默认值 描述

spring.batch.job.kafkaitemwriter.topic

String

null

要写入的 Kafka 主题。

spring.batch.job.kafkaitemwriter.delete

boolean

false

传递给 writer 的项目是否全部作为删除事件发送到主题。

有关配置选项的更多信息KafkaItemWriter,请参阅 KafkaItemWiter文档

16.5。春季AOT

将 Spring AOT 与 Single Step Batch Starter 结合使用时,您必须在编译时设置读取器和写入器名称属性(除非您为读取器和/或写入器创建 bean)。为此,您必须在 boot maven 插件或 gradle 插件中包含您希望用作参数或环境变量的读取器和写入器的名称。例如,如果您希望在 Maven 中启用FlatFileItemReaderFlatFileItemWriter,它将如下所示:

    <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <executions>
            <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            </execution>
        </executions>
        <configuration>
            <arguments>
                <argument>--spring.batch.job.flatfileitemreader.name=foobar</argument>
                <argument>--spring.batch.job.flatfileitemwriter.name=fooWriter</argument>
            </arguments>
        </configuration>
    </plugin>

Spring Cloud Stream集成

任务本身可能很有用,但是将任务集成到更大的生态系统中可以使其对于更复杂的处理和编排有用。本节介绍 Spring Cloud Task 与 Spring Cloud Stream 的集成选项。

17. 从 Spring Cloud Stream 启动任务

您可以从流启动任务。为此,请创建一个接收器来侦听包含 aTaskLaunchRequest作为其有效负载的消息。其中TaskLaunchRequest包含:

  • uri:到要执行的任务工件。

  • applicationName:与任务关联的名称。如果未设置 applicationName,则会TaskLaunchRequest生成由以下内容组成的任务名称:Task-<UUID>

  • commandLineArguments:包含任务的命令行参数的列表。

  • environmentProperties:包含任务要使用的环境变量的映射。

  • deploymentProperties:包含部署者用于部署任务的属性的映射。

如果有效负载属于不同类型,则接收器将引发异常。

例如,可以创建一个流,该流具有一个处理器,该处理器从 HTTP 源接收数据并创建GenericMessage包含消息的流TaskLaunchRequest,并将消息发送到其输出通道。然后,任务接收器将从其输入通道接收消息,然后启动任务。

要创建taskSink,您只需要创建一个包含该 EnableTaskLauncher注解的Spring Boot应用程序,如下例所示:

@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
    public static void main(String[] args) {
        SpringApplication.run(TaskSinkApplication.class, args);
    }
}

Spring Cloud Task 项目的示例模块包含示例 Sink 和 Processor。要将这些示例安装到本地 Maven 存储库中,请从 属性设置为 的spring-cloud-task-samples目录运行 Maven 构建,如以下示例所示:skipInstallfalse

mvn clean install

maven.remoteRepositories.springRepo.url属性必须设置为 über-jar 所在的远程存储库的位置。如果不设置,则没有远程存储库,因此仅依赖于本地存储库。

17.1。Spring Cloud 数据流

要在 Spring Cloud Data Flow 中创建流,必须首先注册我们创建的任务接收器应用程序。在以下示例中,我们使用 Spring Cloud Data Flow shell 注册 Processor 和 Sink 示例应用程序:

app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>

以下示例展示了如何从 Spring Cloud Data Flow shell 创建流:

stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy

18. Spring Cloud 任务事件

当任务通过 Spring Cloud Stream 通道运行时,Spring Cloud Task 提供了通过 Spring Cloud Stream 通道发出事件的功能。任务侦听器用于TaskExecution在名为 的消息通道上发布task-events。此功能会自动装配到其类路径上具有spring-cloud-streamspring-cloud-stream-<binder>和已定义任务的任何任务中。

要禁用事件发射侦听器,请将该spring.cloud.task.events.enabled 属性设置为false

定义了适当的类路径后,以下任务将TaskExecution在通道上作为事件发出task-events(在任务开始和结束时):

@SpringBootApplication
public class TaskEventsApplication {

    public static void main(String[] args) {
        SpringApplication.run(TaskEventsApplication.class, args);
    }

    @Configuration
    public static class TaskConfiguration {

        @Bean
        public ApplicationRunner applicationRunner() {
            return new ApplicationRunner() {
                @Override
                public void run(ApplicationArguments args) {
                    System.out.println("The ApplicationRunner was executed");
                }
            };
        }
    }
}
绑定器实现也需要位于类路径上。
示例任务事件应用程序可以在 Spring Cloud 任务项目的示例模块中找到, 此处

18.1. 禁用特定任务事件

要禁用任务事件,您可以将该spring.cloud.task.events.enabled属性设置为 false

19. 春季批次活动

通过任务执行 Spring Batch 作业时,Spring Cloud Task 可以配置为根据 Spring Batch 中可用的 Spring Batch 侦听器发出信息性消息。具体来说,以下 Spring Batch 侦听器会自动配置到每个批处理作业中,并在通过 Spring Cloud Task 运行时在关联的 Spring Cloud Stream 通道上发出消息:

  • JobExecutionListener倾听job-execution-events

  • StepExecutionListener倾听step-execution-events

  • ChunkListener倾听chunk-events

  • ItemReadListener倾听item-read-events

  • ItemProcessListener倾听item-process-events

  • ItemWriteListener倾听item-write-events

  • SkipListener倾听skip-events

当上下文中存在AbstractJob适当的bean(aJob和a )时,这些侦听器会自动配置为any 。TaskLifecycleListener监听这些事件的配置的处理方式与绑定到任何其他 Spring Cloud Stream 通道的方式相同。我们的任务(运行批处理作业的任务)充当 a Source,侦听应用程序充当 aProcessor或 a Sink

一个示例是让应用程序侦听通道job-execution-events以了解作业的启动和停止。要配置监听应用程序,您可以将输入配置如下job-execution-events

spring.cloud.stream.bindings.input.destination=job-execution-events

绑定器实现也需要位于类路径上。
可以在 Spring Cloud 任务项目的示例模块中找到示例批处理事件应用程序, 此处

19.1。将批量事件发送到不同通道

Spring Cloud Task 为批处理事件提供的选项之一是能够更改特定侦听器可以向其发出消息的通道。为此,请使用以下配置: spring.cloud.stream.bindings.<the channel>.destination=<new destination>。例如,如果StepExecutionListener需要将其消息发送到另一个名为的通道 my-step-execution-events而不是默认通道step-execution-events,则可以添加以下配置:

spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events

19.2。禁用批量事件

要禁用所有批处理事件的侦听器功能,请使用以下配置:

spring.cloud.task.batch.events.enabled=false

要禁用特定批处理事件,请使用以下配置:

spring.cloud.task.batch.events.<batch event listener>.enabled=false:

以下列表显示了您可以禁用的各个侦听器:

spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false

19.3。发出批量事件的顺序

默认情况下,批处理事件具有Ordered.LOWEST_PRECEDENCE. 要更改此值(例如,更改为 5 ),请使用以下配置:

spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5

附录

20. 任务存储库架构

本附录提供任务存储库中使用的数据库模式的 ERD。

任务图式

20.1. 表格信息

任务执行

存储任务执行信息。

列名 必需的 类型 字段长度 笔记

TASK_EXECUTION_ID

真的

BIGINT

X

Spring Cloud 任务框架在应用程序启动时建立从TASK_SEQ. 或者,如果记录是在任务外部创建的,则必须在创建记录时填充该值。

开始时间

错误的

日期时间(6)

X

Spring Cloud 任务框架在应用程序启动时建立该值。

时间结束

错误的

日期时间(6)

X

Spring Cloud Task Framework 在应用程序退出时建立该值。

任务名称

错误的

VARCHAR

100

应用程序启动时的 Spring Cloud 任务框架会将其设置为“应用程序”,除非用户使用spring.application.name.

退出代码

错误的

整数

X

遵循 Spring Boot 默认值,除非被用户覆盖(如此处所述

退出消息

错误的

VARCHAR

2500

用户定义如此处所述

错误信息

错误的

VARCHAR

2500

Spring Cloud Task Framework 在应用程序退出时建立该值。

最近更新时间

真的

时间戳

X

Spring Cloud 任务框架在应用程序启动时建立该值。或者,如果记录是在任务外部创建的,则必须在创建记录时填充该值。

EXTERNAL_EXECUTION_ID

错误的

VARCHAR

250

如果spring.cloud.task.external-execution-id设置了该属性,那么 Spring Cloud Task Framework 在应用程序启动时会将其设置为指定的值。更多信息可以在这里找到

PARENT_TASK_EXECUTION_ID

错误的

BIGINT

X

如果spring.cloud.task.parent-execution-id设置了该属性,那么 Spring Cloud Task Framework 在应用程序启动时会将其设置为指定的值。更多信息可以在这里找到

任务执行参数

存储用于任务执行的参数

列名 必需的 类型 字段长度

TASK_EXECUTION_ID

真的

BIGINT

X

任务参数

错误的

VARCHAR

2500

任务_任务_批次

用于将任务执行链接到批处理执行。

列名 必需的 类型 字段长度

TASK_EXECUTION_ID

真的

BIGINT

X

作业_执行_ID

真的

BIGINT

X

任务锁定

用于此处single-instance-enabled讨论的功能。

列名 必需的 类型 字段长度 笔记

钥匙

真的

字符

36

该锁的 UUID

地区

真的

VARCHAR

100

用户可以使用该字段建立一组锁。

CLIENT_ID

真的

字符

36

包含要锁定的应用程序名称的任务执行 ID。

创建日期

真的

约会时间

X

条目创建日期

可以在此处 找到为每种数据库类型设置表的 DDL 。

20.2. SQL服务器

默认情况下,Spring Cloud Task 使用序列表来确定TASK_EXECUTION_ID表的顺序TASK_EXECUTION。但是,当使用 SQL Server 同时启动多个任务时,这可能会导致表上发生死锁TASK_SEQ。解决方案是删除该TASK_EXECUTION_SEQ表并使用相同名称创建一个序列。例如:

DROP TABLE TASK_SEQ;

CREATE SEQUENCE [DBO].[TASK_SEQ] AS BIGINT
 START WITH 1
 INCREMENT BY 1;
将 设定START WITH 为比当前执行 ID 更高的值。

21. 构建本文档

该项目使用 Maven 生成此文档。要自己生成它,请运行以下命令:$ mvn clean install -DskipTests -P docs

22.可观测性元数据

22.1。可观察性 - 指标

您可以在下面找到该项目声明的所有指标的列表。

22.1.1。任务活跃

围绕任务执行创建的指标。

指标名称 spring.cloud.task(由约定类定义org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention)。类型 timer.

指标名称 spring.cloud.task.active(由约定类定义org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention)。类型 long task timer.

*.active 指标中可能会缺少启动观察后添加的键值。
千分尺内部用作nanoseconds基本单元。然而,每个后端决定实际的基本单元。(即普罗米修斯使用秒)

封闭类的完全限定名称org.springframework.cloud.task.listener.TaskExecutionObservation

所有标签必须加前缀spring.cloud.taskprefix!
表 10. 低基数键

姓名

描述

spring.cloud.task.cf.app.id (必需的)

CF云的应用程序ID。

spring.cloud.task.cf.app.name (必需的)

CF云的应用程序名称。

spring.cloud.task.cf.app.version (必需的)

CF云应用程序版本。

spring.cloud.task.cf.instance.index (必需的)

CF云的实例索引。

spring.cloud.task.cf.org.name (必需的)

CF 云的组织名称。

spring.cloud.task.cf.space.id (必需的)

CF云的空间id。

spring.cloud.task.cf.space.name (必需的)

CF云的空间名称。

spring.cloud.task.execution.id (必需的)

任务执行id。

spring.cloud.task.exit.code (必需的)

任务退出代码。

spring.cloud.task.external.execution.id (必需的)

任务的外部执行 ID。

spring.cloud.task.name (必需的)

任务名称测量。

spring.cloud.task.parent.execution.id (必需的)

任务父执行 ID。

spring.cloud.task.status (必需的)

任务状态。可以是成功,也可以是失败。

22.1.2. 任务运行者观察

执行任务运行程序时创建的观察。

指标名称 spring.cloud.task.runner(由约定类定义org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention)。类型 timer.

指标名称 spring.cloud.task.runner.active(由约定类定义org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention)。类型 long task timer.

*.active 指标中可能会缺少启动观察后添加的键值。
千分尺内部用作nanoseconds基本单元。然而,每个后端决定实际的基本单元。(即普罗米修斯使用秒)

封闭类的完全限定名称org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation

所有标签必须加前缀spring.cloud.taskprefix!
表 11. 低基数键

姓名

描述

spring.cloud.task.runner.bean-name (必需的)

Spring Cloud Task 执行的 bean 的名称。

22.2. 可观察性 - 跨度

您可以在下面找到该项目声明的所有跨度的列表。

22.2.1. 任务活动跨度

围绕任务执行创建的指标。

跨度名称 spring.cloud.task(由约定类定义org.springframework.cloud.task.listener.DefaultTaskExecutionObservationConvention)。

封闭类的完全限定名称org.springframework.cloud.task.listener.TaskExecutionObservation

所有标签必须加前缀spring.cloud.taskprefix!
表 12. 标签键

姓名

描述

spring.cloud.task.cf.app.id (必需的)

CF云的应用程序ID。

spring.cloud.task.cf.app.name (必需的)

CF云的应用程序名称。

spring.cloud.task.cf.app.version (必需的)

CF云应用程序版本。

spring.cloud.task.cf.instance.index (必需的)

CF云的实例索引。

spring.cloud.task.cf.org.name (必需的)

CF 云的组织名称。

spring.cloud.task.cf.space.id (必需的)

CF云的空间id。

spring.cloud.task.cf.space.name (必需的)

CF云的空间名称。

spring.cloud.task.execution.id (必需的)

任务执行id。

spring.cloud.task.exit.code (必需的)

任务退出代码。

spring.cloud.task.external.execution.id (必需的)

任务的外部执行 ID。

spring.cloud.task.name (必需的)

任务名称测量。

spring.cloud.task.parent.execution.id (必需的)

任务父执行 ID。

spring.cloud.task.status (必需的)

任务状态。可以是成功,也可以是失败。

22.2.2. 任务运行器观察跨度

执行任务运行程序时创建的观察。

跨度名称 spring.cloud.task.runner(由约定类定义org.springframework.cloud.task.configuration.observation.DefaultTaskObservationConvention)。

封闭类的完全限定名称org.springframework.cloud.task.configuration.observation.TaskDocumentedObservation

所有标签必须加前缀spring.cloud.taskprefix!
表 13. 标签键

姓名

描述

spring.cloud.task.runner.bean-name (必需的)

Spring Cloud Task 执行的 bean 的名称。