当前位置:首页 > 科技  > 软件

SpringBatch高阶应用:大数据批处理框架实战指南

来源: 责编: 时间:2024-05-07 09:12:51 94观看
导读本篇文章主要内容:通过Spring Batch从一个库中读取数据进过处理后写入到另外一个库中。
1. 环境准备1.1 引入依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch

本篇文章主要内容:通过Spring Batch从一个库中读取数据进过处理后写入到另外一个库中。
OiH28资讯网——每日最新资讯28at.com

1. 环境准备

1.1 引入依赖

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-batch</artifactId></dependency><dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-data-jpa</artifactId></dependency>

2.2 配置Job

配置Job启动器OiH28资讯网——每日最新资讯28at.com

@BeanJobLauncher userJobLauncher(JobRepository userJobRepository) {  SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;  jobLauncher.setJobRepository(userJobRepository) ;  return jobLauncher ;}

配置任务Repository存储元信息OiH28资讯网——每日最新资讯28at.com

@BeanJobRepository userJobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) {  JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean() ;  factory.setDatabaseType("mysql") ;  factory.setTransactionManager(transactionManager) ;  factory.setDataSource(dataSource) ;  try {    factory.afterPropertiesSet() ;     return factory.getObject() ;  } catch (Exception e) {    throw new RuntimeException(e) ;  }}

配置ItemReader读取器OiH28资讯网——每日最新资讯28at.com

@BeanItemReader<User> userReader(JobOperator jobOperator) throws Exception {  JpaPagingItemReaderBuilder<User> builder = new JpaPagingItemReaderBuilder<>() ;  builder.entityManagerFactory(entityManagerFactory) ;  // 每次分页查询多少条数据  builder.pageSize(10) ;  builder.queryString("select u from User u where u.uid <= 50") ;  builder.saveState(true) ;  builder.name("userReader") ;  return builder.build() ;}

配置数据源,该数据源是用来写入操作的OiH28资讯网——每日最新资讯28at.com

public DataSource dataSource() {  HikariDataSource dataSource = new HikariDataSource() ;  dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/testjpa?serverTimezone=GMT%2B8&useSSL=false") ;  dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver") ;  dataSource.setUsername("root") ;  dataSource.setPassword("xxxooo") ;  return dataSource ;}

配置ItemWriter用来写入操作(当前库的数据写入到另外一个库,上面的数据源)OiH28资讯网——每日最新资讯28at.com

@BeanItemWriter<User> userWriter() {  // 通过JDBC批量处理  JdbcBatchItemWriterBuilder<User> builder = new JdbcBatchItemWriterBuilder<>() ;  DataSource dataSource = dataSource() ;  builder.dataSource(dataSource) ;  builder.sql("insert into st (id, name, sex, mobile, age, birthday) values (?, ?, ?, ?, ?, ?)") ;  builder.itemPreparedStatementSetter(new ItemPreparedStatementSetter<User>() {    @Override    public void setValues(User item, PreparedStatement ps) throws SQLException {      ps.setInt(1, item.getUid()) ;      ps.setString(2, item.getName()) ;      ps.setString(3, item.getSex()) ;      ps.setString(4, item.getMobile()) ;      ps.setInt(5, item.getAge()) ;      ps.setObject(6, item.getBirthday()) ;    }  }) ;  return builder.build() ;}

配置ItemProcessor处理器,数据从当前库读取处理后经过处理后再写入另外的库中OiH28资讯网——每日最新资讯28at.com

@BeanItemProcessor<User, User> userProcessor() {  return new ItemProcessor<User, User>() {    @Override    public User process(User item) throws Exception {      System.out.printf("%s - 开始处理数据:%s%n", Thread.currentThread().getName(), item.toString()) ;      // 模拟耗时操作      TimeUnit.SECONDS.sleep(1) ;      // 在这里你可以对数据进行相应的处理。      return item ;    }  } ;}

配置Step将ItemReader、ItemProcessor、ItemWriter串联在一起。OiH28资讯网——每日最新资讯28at.com

@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {  return steps.get("userStep1")    .<User, User>chunk(5)    .reader(userReader)    .processor(userProcessor)    .writer(userWriter)    .build() ;}

配置Job,Job是封装整个批处理流程的实体。在 Spring Batch 中,Job只是Step实例的容器。它将逻辑上属于一个流程的多个步骤组合在一起,并允许对所有步骤的全局属性(如可重启性)进行配置。作业配置包含:OiH28资讯网——每日最新资讯28at.com

  • 简单的工作名称。
  • Step实例的定义和排序。
  • Job是否可重新启动。
@BeanJob userJob(Step userStep1, Step userStep2) {  return jobs.get("userJob").start(userStep1).build();}

以上是Spring Batch定义配置一个Job所需的核心组件。接下来会以上面的基础配置进行高阶知识点进行介绍。OiH28资讯网——每日最新资讯28at.com

2. 高阶配置管理

2.1 通过Controller接口启动Job

@RequestMapping("/userJob")public class UserJobController {  @Resource  private JobLauncher userJobLauncher ;  @GetMapping("/start")  public Object start() throws Exception {    JobParameters jobParameters = new JobParameters() ;    this.userJobLauncher.run(userJob, jobParameters) ;    return "started" ;  }}

通过JobLauncher#run方法启动Job。当你调用该接口时,你会发现接口一直不会返回,一直阻塞,下图是Job的启动序列OiH28资讯网——每日最新资讯28at.com

图片图片OiH28资讯网——每日最新资讯28at.com

根据上图能知道,当你调用run方法后,会等待整个Job退出状态为FINISHED或者FAILED后才能结束。所以,你需要异步完成,以便 SimpleJobLauncher 立即返回给调用者。而正确的序列应该是如下:OiH28资讯网——每日最新资讯28at.com

图片图片OiH28资讯网——每日最新资讯28at.com

上图通过异步方式启动Job序列。
OiH28资讯网——每日最新资讯28at.com

2.2 异步启动Job

@BeanTaskExecutor taskExecutor() {  ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor() ;  taskExecutor.setThreadNamePrefix("spring_batch_launcher") ;  taskExecutor.setCorePoolSize(10) ;  taskExecutor.setMaxPoolSize(10) ;  taskExecutor.initialize() ;   return taskExecutor ;}@BeanJobLauncher userJobLauncher(JobRepository userJobRepository) {  SimpleJobLauncher jobLauncher = new SimpleJobLauncher() ;  jobLauncher.setJobRepository(userJobRepository) ;  jobLauncher.setTaskExecutor(taskExecutor()) ;  return jobLauncher ;}

通过上面配置后,Job启动将是异步的会直接返回JobExecution。OiH28资讯网——每日最新资讯28at.com

2.3 重启Job

当一个Job正在执行,由于断电或者强制终止了程序。当程序恢复后你希望能够接着程序终止前的进度继续执行,这时候你需要进行如下的操作(本人没有发现有什么API能够操作的,可能文档没看仔细)。
OiH28资讯网——每日最新资讯28at.com

当程序非正常终止是,下面两张表的状态都是STARTED,END_TIME为null
OiH28资讯网——每日最新资讯28at.com

batch_job_execution表OiH28资讯网——每日最新资讯28at.com

图片图片OiH28资讯网——每日最新资讯28at.com

batch_step_execution表OiH28资讯网——每日最新资讯28at.com

图片图片OiH28资讯网——每日最新资讯28at.com

想要重新启动必须将上面的状态修改为STOPPED,END_TIME字段设置上值(是什么值无所谓)。OiH28资讯网——每日最新资讯28at.com

然后我们就可以继续使用上面的Controller接口启动任务继续执行了。OiH28资讯网——每日最新资讯28at.com

2.4 多线程执行Step

为了加快程序的执行,我们可以为Step配置线程池
OiH28资讯网——每日最新资讯28at.com

@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {  return steps.get("userStep1")    .<User, User>chunk(5)    .reader(userReader)    .processor(userProcessor)    .writer(userWriter)    // 配置线程池    .taskExecutor(taskExecutor())    .build() ;}

注意:Step中使用的任何池化资源(如数据源)都可能对并发性设置限制。请确保这些资源池至少与步骤中所需的并发线程数一样大。OiH28资讯网——每日最新资讯28at.com

通过上面配置线程池后,你将在控制台看到如下输出。OiH28资讯网——每日最新资讯28at.com

图片图片OiH28资讯网——每日最新资讯28at.com

默认将有4个线程同时进行处理。可以通过如下配置进行调整OiH28资讯网——每日最新资讯28at.com

@BeanStep userStep1(ItemReader<User> userReader, ItemProcessor<User, User> userProcessor, ItemWriter<User> userWriter) {  return steps.get("userStep1")      // ...      // 节流限制10,这里配置的大小应该与你的数据库连接池大小及使用的线程池核心线程数一致。      .throttleLimit(10)      .build() ;}

2.5 重复启动Job

要想重复启动Job,我们可以在启动Job时设置不同的JobParameters参数,只要参数不同那么就可以重复的启动Job。如下示例:OiH28资讯网——每日最新资讯28at.com

@GetMapping("/start/{page}")public Object start(@PathVariable("page") Long page) throws Exception {  Map<String, JobParameter> parameters = new HashMap<>() ;  // 每次设置的参数值不同即可。  parameters.put("page", new JobParameter(page)) ;  JobParameters jobParameters = new JobParameters(parameters) ;  this.userJobLauncher.run(userJob, jobParameters) ;  return "started" ;}

以上是本篇文章的全部内容,希望对你有帮助。OiH28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-87012-0.htmlSpringBatch高阶应用:大数据批处理框架实战指南

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 架构设计中如何应对接口级故障?

下一篇: Web Components 取代 Vue?我觉得不太行!

标签:
  • 热门焦点
Top