Fixing the flush error thrown by @Scheduled tasks when using JPA with multiple data sources


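The project is built on Spring Boot + JPA (multiple data sources) + Elasticsearch, and its main job is to compute statistics for each scan type from ES every 5 minutes. To make testing easier I also kept a controller method that triggers the same logic. The endpoint test passed, so I assumed everything was fine, but when the scheduled task fired it threw a "no transaction is in progress" exception. Stepping through with a debugger showed that the key flush looks for was missing from the ThreadLocal, hence the error. After some searching on Google and Baidu, it turned out that developers everywhere have hit the same problem. Most answers were similar, yet many of them did not solve my case, so I am sharing my solution here in the hope that it helps anyone who runs into something similar.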

Environment: Spring Boot + JPA (multiple data sources) + ES

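Problem: a method scheduled with @Scheduled calls JPA's saveAll() and flush() internally and fails at runtime with the exception "no transaction is in progress". The same logic works fine when it is triggered through a controller request.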

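Cause: Spring binds the current transaction to the executing thread. In a controller request the call runs in the request's context, where beans injected with @Autowired have their @Transactional annotations honored. A @Scheduled task, however, is run by Spring on a separate scheduler thread whose context differs from that of the request thread. There the @Transactional annotation was not taking effect, so no transaction was bound to the thread when flush() ran, which produced the error.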
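To make the thread-bound part of that explanation concrete, here is a minimal plain-Java sketch (no Spring involved). The class and the CURRENT_TX variable are hypothetical stand-ins for a thread-bound resource such as the current transaction; the sketch only shows that a value bound through a ThreadLocal on one thread is not visible from another thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadBoundContextDemo {

    // Hypothetical stand-in for a thread-bound resource such as the current transaction.
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Returns the value that a separate worker thread sees in CURRENT_TX. */
    static String valueSeenByWorker() throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<String> read = CURRENT_TX::get;
            return worker.submit(read).get();
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        CURRENT_TX.set("tx-1");                  // bound on the calling thread only
        System.out.println(CURRENT_TX.get());    // prints tx-1
        System.out.println(valueSeenByWorker()); // prints null
        CURRENT_TX.remove();
    }
}
```

Whatever starts the transaction therefore has to run on the same thread that later calls flush().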

Solution 1: inject the transaction manager where @Scheduled is used. For details, see

https://stackoverflow.com/questions/33248846/spring4-scheduled-transaction-throws-no-transaction-is-in-progress-at-flush-fo
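As a rough sketch of what Solution 1 could look like, assuming Spring is on the classpath: the task below receives a transaction manager and wraps its work in a TransactionTemplate, so the transaction is opened on the scheduler thread itself. The class name and the placeholder body are hypothetical; the bean name "transactionManagerPrimary" is the one defined in the configuration class later in this post.

```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

@Component
public class TransactionalScheduledTask {

    private final TransactionTemplate txTemplate;

    // Assumption: a transaction manager bean named "transactionManagerPrimary" exists.
    public TransactionalScheduledTask(
            @Qualifier("transactionManagerPrimary") PlatformTransactionManager txManager) {
        this.txTemplate = new TransactionTemplate(txManager);
    }

    @Scheduled(cron = "0 0/5 * * * ?")
    public void run() {
        // Everything inside the callback runs in one transaction on the scheduler thread.
        txTemplate.execute(status -> {
            // Hypothetical placeholder: call repository.saveAll(...) and repository.flush() here.
            return null;
        });
    }
}
```

With several data sources, the qualifier matters: the template should use the transaction manager that belongs to the repository being flushed.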

Solution 2: do not inject the repository with @Autowired; pass it in through the constructor instead.

The full implementation is as follows:

1. Maven dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.1.20</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
</dependency>

2. Directory structure and configuration file

spring:

## Multiple data source configuration: data source for external-network tracking data
  datasource:
    primary:
      url: jdbc:mysql://xx.xx.xx.xxx:3403/db_waybill_center?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=true
      username: xxxxxx
      password: xxxxxx
      max-active: 10
      max-idle: 5
      min-idle: 0
      driver-class-name: com.mysql.cj.jdbc.Driver
      type: com.alibaba.druid.pool.DruidDataSource

    # Second data source
    second:
      url: jdbc:mysql://xx.xx.xx.xxx:3403/db_waybill_center_shipid_mgmt?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=true
      username: xxxxxx
      password: xxxxxx
      max-active: 10
      max-idle: 5
      min-idle: 0
      driver-class-name: com.mysql.cj.jdbc.Driver
      type: com.alibaba.druid.pool.DruidDataSource


  # Hibernate configuration
  jpa:
    properties:
      hibernate:
        hbm2ddl:
          auto: none
        dialect: org.hibernate.dialect.MySQL5InnoDBDialect
    show-sql: true


## Elasticsearch configuration
  elasticsearch:
    rest:
      connection-timeout: 60000
      read-timeout: 60000
      uris: xx.xx.xx.xxx:1039,xx.xx.xx.xxx:1033,xx.xx.xx.xxx:1029
      username: xxxxxx
      password: xxxxxx

[Figure: code directory structure]

3. Multi-data-source configuration classes:

package com.yundasys.waybillcenter.waybillcenterstreamcalculation.conf.db;

import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import java.util.Map;
import java.util.Objects;

/**
 * @author liumch
 * date 2020/11/4 17:29
 * description JPA entity manager configuration for the primary data source
 */

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        entityManagerFactoryRef="entityManagerFactoryPrimary",
        transactionManagerRef="transactionManagerPrimary",
        basePackages= { "com.yundasys.waybillcenter.waybillcenterstreamcalculation.repository.primary" })
public class PrimaryEntityManagerConfig {

    @Bean(name = "primaryDS")
    @Qualifier("primaryDS")
    @ConfigurationProperties(prefix = "spring.datasource.primary")
    @Primary
    public DataSource primaryDS(){
        return DruidDataSourceBuilder.create().build();
    }


    @Resource
    private JpaProperties jpaProperties;

    @Resource
    private HibernateProperties hibernateProperties;


    @Primary
    @Bean(name = "entityManagerPrimary")
    public EntityManager entityManager(EntityManagerFactoryBuilder builder) {
        return Objects.requireNonNull(entityManagerFactoryPrimary(builder).getObject()).createEntityManager();

    }

    @Primary
    @Bean(name = "entityManagerFactoryPrimary")
    public LocalContainerEntityManagerFactoryBean entityManagerFactoryPrimary (EntityManagerFactoryBuilder builder) {
        // Works around jpaProperties.getHibernateProperties(hibernateSettings) no longer working in 2.1.x
        Map<String, Object> properties = hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings());
        return builder.dataSource(primaryDS())
                // .properties(getVendorProperties())
                .properties(properties)
                // Location of the entity classes
                .packages("com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.entity")
                // Name of the persistence unit
                .persistenceUnit("primaryPersistenceUnit")
                .build();

    }

//    This version causes the "no transaction" error
//    @Primary
//    @Bean(name = "transactionManagerPrimary")
//    public PlatformTransactionManager transactionManagerPrimary(DataSource dataSource)
//    {
//        return new DataSourceTransactionManager(dataSource);
//    }

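    /* This is the key point: the transaction manager must be a JpaTransactionManager.
       The PlatformTransactionManager version commented out above (a DataSourceTransactionManager) causes the error. */
    /* The IDE may highlight "builder" in red; this can be ignored. */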
    @Primary
    @Bean(name = "transactionManagerPrimary")
    public JpaTransactionManager transactionManager(EntityManagerFactoryBuilder builder) {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(entityManagerFactoryPrimary(builder).getObject());
        return transactionManager;
    }
}

package com.yundasys.waybillcenter.waybillcenterstreamcalculation.conf.db;

import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import java.util.Map;
import java.util.Objects;

/**
 * @author liumch
 * date 2020/11/4 17:29
 * description JPA entity manager configuration for the second data source
 */

@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        entityManagerFactoryRef="entityManagerFactorySecond",
        transactionManagerRef="transactionManagerSecond",
        basePackages= { "com.yundasys.waybillcenter.waybillcenterstreamcalculation.repository.second" })
public class SecondEntityManagerConfig {

    @Bean(name = "SecondDS")
    @Qualifier("SecondDS")
    @ConfigurationProperties(prefix = "spring.datasource.second")
    public DataSource secondDS(){
        return DruidDataSourceBuilder.create().build();
    }

    @Resource
    private JpaProperties jpaProperties;

    @Resource
    private HibernateProperties hibernateProperties;


    @Bean(name = "entityManagerSecond")
    public EntityManager entityManager(EntityManagerFactoryBuilder builder) {
        return Objects.requireNonNull(entityManagerFactorySecond(builder).getObject()).createEntityManager();
    }


    @Bean(name = "entityManagerFactorySecond")
    public LocalContainerEntityManagerFactoryBean entityManagerFactorySecond (EntityManagerFactoryBuilder builder) {
        // Works around jpaProperties.getHibernateProperties(hibernateSettings) no longer working in 2.1.x
        Map<String, Object> properties = hibernateProperties.determineHibernateProperties(jpaProperties.getProperties(), new HibernateSettings());
        return builder.dataSource(secondDS())
                // .properties(getVendorProperties())
                .properties(properties)
                // Location of the entity classes
                .packages("com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.entity")
                // Name of the persistence unit
                .persistenceUnit("SecondPersistenceUnit")
                .build();

    }



//    @Bean(name = "transactionManagerSecond")
//    public PlatformTransactionManager transactionManagerYdServer(DataSource dataSource)
//    {
//        return new DataSourceTransactionManager(dataSource);
//    }

    @Bean(name = "transactionManagerSecond")
    public JpaTransactionManager transactionManager(EntityManagerFactoryBuilder builder) {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(entityManagerFactorySecond(builder).getObject());
        return transactionManager;
    }
}

4. Repository interface

package com.yundasys.waybillcenter.waybillcenterstreamcalculation.repository.primary;

import com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.entity.EsStaticScanTypeEntity;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.stereotype.Repository;


/**
 * @author liuyu
 */
@Repository
public interface EsStaticScanTypeRepository extends JpaRepository<EsStaticScanTypeEntity, Long>, JpaSpecificationExecutor<EsStaticScanTypeEntity> {

}

5. Core calculation service (runs an aggregation query against ES and writes the result to the database)

package com.yundasys.waybillcenter.waybillcenterstreamcalculation.service;

import com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.dto.CountScanTypeDto;
import com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.entity.EsStaticScanTypeEntity;
import com.yundasys.waybillcenter.waybillcenterstreamcalculation.repository.primary.EsStaticScanTypeRepository;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.FastDateFormat;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

/**
 * @program: waybill-center-stream
 * @description:
 * @author: liumch
 * @create: 2021-07-09 16:03
 */
@Service
@Slf4j
public class CalculateRuleService {
    @Resource
    RestHighLevelClient elasticsearchClient;
    
    final EsStaticScanTypeRepository repository;

    /** This is the key point: injecting the repository with @Autowired instead causes the error. */
    public CalculateRuleService(EsStaticScanTypeRepository repository) {
        this.repository = repository;
    }

    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
    /**
     * Index name format: the "smi_" prefix followed by the date
     */
    private static final FastDateFormat INDEX_PREFIX_FORMATTER = FastDateFormat.getInstance("'smi_'yyyyMMdd", Locale.CHINA);


    /**
     * Computes the per-scan-type statistics from ES
     */
    public void esCalc(long start, long end) {
//        start = 1625623201000L;
//        end = 1625625000000L;
        List<CountScanTypeDto> countScanTypeDtos = new ArrayList<>(20);
        final long[] scanTypes = new long[]{14L, 24L, 10L, 18L, 64L};
        BoolQueryBuilder externalQueryBuilder = QueryBuilders.boolQuery();

        externalQueryBuilder.filter(
                QueryBuilders.termsQuery("scanType", scanTypes))
                .filter(QueryBuilders.rangeQuery("scanTime").gte(start).lt(end))
                .queryName("queryByScanType");

        // An aggregation needs a field to group on, much like a SQL aggregate function needs GROUP BY
        TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("GroupByScanType").field("scanType").size(50)
                .subAggregation(AggregationBuilders.count("count").field("shipId.keyword"));

        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(externalQueryBuilder)
                .aggregation(aggregationBuilder)
                .size(0)
                .clearRescorers();

//        String index = "smi_20210707";
        String index = INDEX_PREFIX_FORMATTER.format(new Date());
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices(index);
        searchRequest.source(sourceBuilder);
        log.info("Request: " + searchRequest.toString());
        SearchResponse search = null;
        try {
            search = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("Failed to fetch data from ES", e);
            return;
        }
        final Aggregations aggregations = search.getAggregations();
        for (Aggregation agg : aggregations) {
            String name = agg.getName();
            if (name.equals(aggregationBuilder.getName())) {
                for (long scanType : scanTypes) {
                    MultiBucketsAggregation.Bucket bucket = ((ParsedLongTerms) agg).getBucketByKey("" + scanType);
                    if (null != bucket) {
                        countScanTypeDtos.add(new CountScanTypeDto(scanType, bucket.getDocCount()));
                    }
                }

            }
        }
        log.info("Result: " + countScanTypeDtos.toString());
        saveDb(countScanTypeDtos);
    }

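    // Note: esCalc() calls this method through "this", so with Spring's default proxy-based
    // transaction handling that call bypasses the proxy and this annotation is not applied to it.
    @Transactional(rollbackFor = RuntimeException.class)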
    public int saveDb(List<CountScanTypeDto> dtos) {
        final Date date = new Date();
        final Integer day = Integer.parseInt(LocalDate.now().format(DATE_TIME_FORMATTER));
        final List<EsStaticScanTypeEntity> entities = dtos.stream()
                .map(x -> new EsStaticScanTypeEntity(null, (int) x.getScanType(), (int) x.getCount(), day, date, date)
                ).collect(Collectors.toList());
        final List<EsStaticScanTypeEntity> typeEntities = repository.saveAll(entities);
        repository.flush();
        return typeEntities.size();
    }

}
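One detail in the service above may be worth checking when @Transactional appears to have no effect: esCalc() calls saveDb() directly on the same object. Spring applies declarative transactions through a proxy by default, and a call made through "this" never reaches that proxy. The following plain-Java sketch illustrates the mechanism, with a JDK dynamic proxy standing in for Spring's transactional proxy; the Calc interface and CalcImpl class are hypothetical and only mirror the method names used above.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class SelfInvocationDemo {

    interface Calc {
        void esCalc();
        void saveDb();
    }

    /** Plain target object: esCalc() calls saveDb() through "this". */
    static class CalcImpl implements Calc {
        public void esCalc() { saveDb(); }
        public void saveDb() { }
    }

    /** Calls esCalc() on a proxy and returns the names of the methods the proxy intercepted. */
    static List<String> interceptedCalls() {
        List<String> intercepted = new ArrayList<>();
        Calc target = new CalcImpl();
        Calc proxy = (Calc) Proxy.newProxyInstance(
                Calc.class.getClassLoader(),
                new Class<?>[]{Calc.class},
                (p, method, args) -> {
                    intercepted.add(method.getName()); // a transaction would be opened here
                    return method.invoke(target, args);
                });
        proxy.esCalc();
        return intercepted;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls()); // prints [esCalc]
    }
}
```

Only esCalc is intercepted; the inner saveDb() call goes straight to the target. Common ways around this are to put the transactional method in a separate bean or to make the entry method itself transactional.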

6. Scheduled task

@Component
@EnableScheduling
@Slf4j
public class OfflineCalcTask {

    private final CalculateRuleService calculateRuleService;

    public OfflineCalcTask(CalculateRuleService calculateRuleService) {
        this.calculateRuleService = calculateRuleService;
    }


    @Scheduled(cron = "0 0/5 * * * ?")
    public void calcByScanType() {
        TimeRangeDto timeRangeDto = CommonUtil.timeRangeByInterval();
        calculateRuleService.esCalc(timeRangeDto.getStart(), timeRangeDto.getEnd());

    }


}
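The CommonUtil.timeRangeByInterval() helper and TimeRangeDto are not shown in this post. Purely as an assumption about what such a helper might do, the sketch below computes the most recent fully elapsed 5-minute window as a half-open range [start, end) in epoch milliseconds, which matches the gte(start).lt(end) range query in esCalc. The class, the Window type and the method name are all hypothetical.

```java
import java.time.Duration;

public class TimeWindowSketch {

    /** Hypothetical counterpart of TimeRangeDto: half-open window [start, end) in epoch millis. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Returns the most recent fully elapsed window of the given length at nowMillis. */
    static Window previousWindow(long nowMillis, Duration interval) {
        long size = interval.toMillis();
        long end = Math.floorDiv(nowMillis, size) * size; // align down to the interval boundary
        return new Window(end - size, end);
    }

    public static void main(String[] args) {
        Window w = previousWindow(System.currentTimeMillis(), Duration.ofMinutes(5));
        System.out.println(w.start + " .. " + w.end);
    }
}
```

Aligning to the interval boundary keeps consecutive runs from overlapping or leaving gaps, even if the scheduler fires a little late.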
