欢迎大家来到IT世界,在知识的湖畔探索吧!
目前项目采用Spring Boot + JPA(多数据源)+ ES开发,主要目的是每5分钟从ES中统计一次各个扫描类型的数据。为方便测试,还特意留了一个controller方法。接口测试一切正常,原以为这样就万事大吉了,没想到定时任务触发的时候,却直接抛出了"no transaction is in progress"异常。顿时懵了!跟踪调试后发现,主要是因为在ThreadLocal中找不到flush所需的相关key,所以报错。Google加百度搜索了一番,发现国内外的开发同学都遇到过相同的问题。答案大多类似,但是很多根本解决不了我的问题。于是决定将我的解决方案分享出来,希望可以给遇到类似问题的同学一点帮助。
环境:springboot+jpa(多数据源)+es
问题描述:某个接口使用@Scheduled定时调度,内部方法中调用了JPA的saveAll()和flush()方法,方法执行时抛出异常"no transaction is in progress"。但是,通过controller发送接口请求的时候,却是正常的。
问题原因:在controller请求中,因为处于主线程的context上下文中,所以通过@Autowired注入的对象可以识别@Transactional注解;但是,@Scheduled注解的定时任务是由Spring通过子线程来执行的。子线程的context上下文与主线程不同,无法识别@Transactional注解,从而导致报错。
解决方案1:在@Scheduled注解的地方注入事务管理器。具体参考
https://stackoverflow.com/questions/33248846/spring4-scheduled-transaction-throws-no-transaction-is-in-progress-at-flush-fo
解决方案2:不要采用@Autowired注解,而是通过构造函数的方式注入repository对象。同时,事务管理器要使用JpaTransactionManager,而不是DataSourceTransactionManager(见下文配置类中的注释)。
具体实现代码如下:
1、maven依赖
<!-- Spring Data Elasticsearch starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

<!-- Spring Data JPA starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<!-- MySQL JDBC driver; only required at runtime -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <scope>runtime</scope>
</dependency>

<!-- Alibaba Druid connection pool starter -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid-spring-boot-starter</artifactId>
    <version>1.1.20</version>
</dependency>

<!-- Google Guava -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
</dependency>
欢迎大家来到IT世界,在知识的湖畔探索吧!
2、目录结构及配置文件
# NOTE(review): the nesting below was reconstructed from the property prefixes used by the
# Java configuration (spring.datasource.primary / spring.datasource.second) and from the
# standard Spring Boot property names (spring.jpa.*, spring.elasticsearch.rest.*) - confirm
# against the real application.yml.
spring:
  ## Multi-datasource configuration
  datasource:
    # Primary data source (external-network tracking data)
    primary:
      url: jdbc:mysql://xx.xx.xx.xxx:3403/db_waybill_center?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=true
      username: xxxxxx
      password: xxxxxx
      max-active: 10
      max-idle: 5
      min-idle: 0
      driver-class-name: com.mysql.cj.jdbc.Driver
      type: com.alibaba.druid.pool.DruidDataSource
    # Second data source
    second:
      url: jdbc:mysql://xx.xx.xx.xxx:3403/db_waybill_center_shipid_mgmt?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&useSSL=true
      username: xxxxxx
      password: xxxxxx
      max-active: 10
      max-idle: 5
      min-idle: 0
      driver-class-name: com.mysql.cj.jdbc.Driver
      type: com.alibaba.druid.pool.DruidDataSource
  # Hibernate / JPA configuration
  jpa:
    properties:
      hibernate:
        hbm2ddl:
          auto: none
        dialect: org.hibernate.dialect.MySQL5InnoDBDialect
    show-sql: true
  ## Elasticsearch configuration
  elasticsearch:
    rest:
      connection-timeout: 60000
      read-timeout: 60000
      uris: xx.xx.xx.xxx:1039,xx.xx.xx.xxx:1033,xx.xx.xx.xxx:1029
      username: xxxxxx
      password: xxxxxx
3、多数据源配置类:
package com.yundasys.waybillcenter.waybillcenterstreamcalculation.conf.db;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import java.util.Map;
import java.util.Objects;
/**
 * JPA wiring for the primary data source: the data source itself, its entity manager
 * factory, an entity manager and the transaction manager used by the repositories under
 * {@code repository.primary}.
 *
 * @author liumch
 * date 2020/11/4 17:29
 */
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        entityManagerFactoryRef = "entityManagerFactoryPrimary",
        transactionManagerRef = "transactionManagerPrimary",
        basePackages = {"com.yundasys.waybillcenter.waybillcenterstreamcalculation.repository.primary"})
public class PrimaryEntityManagerConfig {

    @Resource
    private JpaProperties jpaProperties;

    @Resource
    private HibernateProperties hibernateProperties;

    /**
     * Builds the primary Druid data source; its settings are bound from the
     * {@code spring.datasource.primary} properties.
     *
     * @return the primary data source
     */
    @Bean(name = "primaryDS")
    @Qualifier("primaryDS")
    @ConfigurationProperties(prefix = "spring.datasource.primary")
    @Primary
    public DataSource primaryDS() {
        return DruidDataSourceBuilder.create().build();
    }

    /**
     * Creates an entity manager from the primary entity manager factory.
     *
     * <p>NOTE(review): {@code createEntityManager()} returns one application-managed
     * instance that is then shared as a singleton bean; an {@code EntityManager} is
     * generally not safe for concurrent use - confirm who injects this bean.
     *
     * @param builder Spring Boot's entity manager factory builder
     * @return an entity manager bound to the primary persistence unit
     */
    @Primary
    @Bean(name = "entityManagerPrimary")
    public EntityManager entityManager(EntityManagerFactoryBuilder builder) {
        final LocalContainerEntityManagerFactoryBean factoryBean = entityManagerFactoryPrimary(builder);
        return Objects.requireNonNull(factoryBean.getObject()).createEntityManager();
    }

    /**
     * Declares the entity manager factory of the primary persistence unit.
     *
     * @param builder Spring Boot's entity manager factory builder
     * @return the factory bean for {@code primaryPersistenceUnit}
     */
    @Primary
    @Bean(name = "entityManagerFactoryPrimary")
    public LocalContainerEntityManagerFactoryBean entityManagerFactoryPrimary(EntityManagerFactoryBuilder builder) {
        // Per the original author: replaces jpaProperties.getHibernateProperties(hibernateSettings),
        // which stopped working on 2.1.x.
        final Map<String, Object> vendorProperties = hibernateProperties.determineHibernateProperties(
                jpaProperties.getProperties(), new HibernateSettings());
        return builder
                .dataSource(primaryDS())
                .properties(vendorProperties)
                // Package that holds the entity classes.
                .packages("com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.entity")
                // Name of the persistence unit.
                .persistenceUnit("primaryPersistenceUnit")
                .build();
    }

    /**
     * Declares the JPA transaction manager for the primary persistence unit.
     *
     * <p>Per the original author's notes this is the critical part: an earlier variant that
     * returned {@code new DataSourceTransactionManager(dataSource)} as a
     * {@code PlatformTransactionManager} led to the "no transaction" error. An IDE may
     * highlight the {@code builder} parameter as unresolved; the author says to ignore it.
     *
     * @param builder Spring Boot's entity manager factory builder
     * @return the transaction manager registered as {@code transactionManagerPrimary}
     */
    @Primary
    @Bean(name = "transactionManagerPrimary")
    public JpaTransactionManager transactionManager(EntityManagerFactoryBuilder builder) {
        final JpaTransactionManager jpaTxManager = new JpaTransactionManager();
        jpaTxManager.setEntityManagerFactory(entityManagerFactoryPrimary(builder).getObject());
        return jpaTxManager;
    }
}
package com.yundasys.waybillcenter.waybillcenterstreamcalculation.conf.db;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateProperties;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateSettings;
import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import java.util.Map;
import java.util.Objects;
/**
 * JPA wiring for the second data source: the data source itself, its entity manager
 * factory, an entity manager and the transaction manager used by the repositories under
 * {@code repository.second}.
 *
 * @author liumch
 * date 2020/11/4 17:29
 */
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        entityManagerFactoryRef = "entityManagerFactorySecond",
        transactionManagerRef = "transactionManagerSecond",
        basePackages = {"com.yundasys.waybillcenter.waybillcenterstreamcalculation.repository.second"})
public class SecondEntityManagerConfig {

    @Resource
    private JpaProperties jpaProperties;

    @Resource
    private HibernateProperties hibernateProperties;

    /**
     * Builds the second Druid data source; its settings are bound from the
     * {@code spring.datasource.second} properties.
     *
     * @return the second data source
     */
    @Bean(name = "SecondDS")
    @Qualifier("SecondDS")
    @ConfigurationProperties(prefix = "spring.datasource.second")
    public DataSource secondDS() {
        return DruidDataSourceBuilder.create().build();
    }

    /**
     * Creates an entity manager from the second entity manager factory.
     *
     * <p>NOTE(review): {@code createEntityManager()} returns one application-managed
     * instance that is then shared as a singleton bean; an {@code EntityManager} is
     * generally not safe for concurrent use - confirm who injects this bean.
     *
     * @param builder Spring Boot's entity manager factory builder
     * @return an entity manager bound to the second persistence unit
     */
    @Bean(name = "entityManagerSecond")
    public EntityManager entityManager(EntityManagerFactoryBuilder builder) {
        final LocalContainerEntityManagerFactoryBean factoryBean = entityManagerFactorySecond(builder);
        return Objects.requireNonNull(factoryBean.getObject()).createEntityManager();
    }

    /**
     * Declares the entity manager factory of the second persistence unit.
     *
     * @param builder Spring Boot's entity manager factory builder
     * @return the factory bean for {@code SecondPersistenceUnit}
     */
    @Bean(name = "entityManagerFactorySecond")
    public LocalContainerEntityManagerFactoryBean entityManagerFactorySecond(EntityManagerFactoryBuilder builder) {
        // Per the original author: replaces jpaProperties.getHibernateProperties(hibernateSettings),
        // which stopped working on 2.1.x.
        final Map<String, Object> vendorProperties = hibernateProperties.determineHibernateProperties(
                jpaProperties.getProperties(), new HibernateSettings());
        return builder
                .dataSource(secondDS())
                .properties(vendorProperties)
                // Package that holds the entity classes.
                .packages("com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.entity")
                // Name of the persistence unit.
                .persistenceUnit("SecondPersistenceUnit")
                .build();
    }

    /**
     * Declares the JPA transaction manager for the second persistence unit.
     *
     * <p>An earlier variant built a {@code DataSourceTransactionManager} from the data
     * source instead; it was replaced by this {@code JpaTransactionManager}.
     *
     * @param builder Spring Boot's entity manager factory builder
     * @return the transaction manager registered as {@code transactionManagerSecond}
     */
    @Bean(name = "transactionManagerSecond")
    public JpaTransactionManager transactionManager(EntityManagerFactoryBuilder builder) {
        final JpaTransactionManager jpaTxManager = new JpaTransactionManager();
        jpaTxManager.setEntityManagerFactory(entityManagerFactorySecond(builder).getObject());
        return jpaTxManager;
    }
}
4、repository接口
package com.yundasys.waybillcenter.waybillcenterstreamcalculation.repository.primary;
import com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.entity.EsStaticScanTypeEntity;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.stereotype.Repository;
/**
 * Spring Data JPA repository for {@link EsStaticScanTypeEntity}, keyed by a {@code Long} id.
 * It declares no query methods of its own and only inherits those of
 * {@link JpaRepository} and {@link JpaSpecificationExecutor}.
 *
 * @author liuyu
 */
@Repository
public interface EsStaticScanTypeRepository
        extends JpaRepository<EsStaticScanTypeEntity, Long>,
                JpaSpecificationExecutor<EsStaticScanTypeEntity> {
}
5、核心的计算服务(从es中聚合查询,并添加到数据库)
package com.yundasys.waybillcenter.waybillcenterstreamcalculation.service;
import com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.dto.CountScanTypeDto;
import com.yundasys.waybillcenter.waybillcenterstreamcalculation.domain.entity.EsStaticScanTypeEntity;
import com.yundasys.waybillcenter.waybillcenterstreamcalculation.repository.primary.EsStaticScanTypeRepository;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.FastDateFormat;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
/**
 * Counts documents per scan type in Elasticsearch for a time window and stores the counts
 * through {@link EsStaticScanTypeRepository}.
 *
 * @program: waybill-center-stream
 * @author: liumch
 * @create: 2021-07-09 16:03
 */
@Service
@Slf4j
public class CalculateRuleService {

    /** Pattern used to turn a date into the integer statistics day (yyyyMMdd). */
    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");

    /**
     * Index name format: the literal prefix {@code smi_} followed by yyyyMMdd.
     */
    private static final FastDateFormat INDEX_PREFIX_FORMATTER = FastDateFormat.getInstance("'smi_'yyyyMMdd", Locale.CHINA);

    /** Name of the terms aggregation that groups documents by scan type. */
    private static final String SCAN_TYPE_AGGREGATION = "GroupByScanType";

    @Resource
    RestHighLevelClient elasticsearchClient;

    final EsStaticScanTypeRepository repository;

    /**
     * Receives the repository through constructor injection. Per the original author's note
     * this is the critical part: switching to {@code @Autowired} caused the error.
     *
     * @param repository repository used to store the per-scan-type counts
     */
    public CalculateRuleService(EsStaticScanTypeRepository repository) {
        this.repository = repository;
    }

    /**
     * Queries Elasticsearch for the number of documents per scan type whose {@code scanTime}
     * lies in {@code [start, end)} and saves the result via {@link #saveDb(List)}.
     *
     * <p>If the search call fails, the error is logged and nothing is saved. A response
     * without aggregations is treated as an empty result.
     *
     * <p>NOTE(review): {@code saveDb} is invoked directly on {@code this}. With Spring's
     * default proxy-based transaction handling such a self-invocation does not pass through
     * the transactional proxy, so the {@code @Transactional} on {@code saveDb} presumably
     * does not apply on this path - confirm and restructure if one enclosing transaction is
     * required.
     *
     * <p>NOTE(review): the index name is derived from the current date rather than from
     * {@code start}/{@code end}; a window just before midnight that is processed after
     * midnight would presumably be looked up in the next day's index - confirm how the
     * {@code smi_} indices are partitioned.
     *
     * @param start inclusive lower bound of {@code scanTime}
     * @param end   exclusive upper bound of {@code scanTime}
     */
    public void esCalc(long start, long end) {
        final long[] scanTypes = new long[]{14L, 24L, 10L, 18L, 64L};
        final SearchRequest searchRequest = buildSearchRequest(start, end, scanTypes);
        log.info("请求语句:" + searchRequest.toString());

        final SearchResponse search;
        try {
            search = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            // Best effort: a failed run is logged and skipped.
            log.error("获取es数据失败", e);
            return;
        }

        final List<CountScanTypeDto> countScanTypeDtos = collectCounts(search.getAggregations(), scanTypes);
        log.info("返回结果" + countScanTypeDtos.toString());
        saveDb(countScanTypeDtos);
    }

    /**
     * Builds the aggregation-only search request against today's {@code smi_} index.
     */
    private SearchRequest buildSearchRequest(long start, long end, long[] scanTypes) {
        final BoolQueryBuilder query = QueryBuilders.boolQuery();
        query.filter(QueryBuilders.termsQuery("scanType", scanTypes))
                .filter(QueryBuilders.rangeQuery("scanTime").gte(start).lt(end))
                .queryName("queryByScanType");

        // A terms aggregation plays the role of GROUP BY in SQL. The "count" sub-aggregation
        // is part of the request, but the result below is read from each bucket's doc count.
        final TermsAggregationBuilder byScanType = AggregationBuilders.terms(SCAN_TYPE_AGGREGATION)
                .field("scanType")
                .size(50)
                .subAggregation(AggregationBuilders.count("count").field("shipId.keyword"));

        final SearchSourceBuilder source = new SearchSourceBuilder();
        source.query(query)
                .aggregation(byScanType)
                // size(0): only the aggregation is needed, no hits.
                .size(0)
                .clearRescorers();

        final SearchRequest request = new SearchRequest();
        request.indices(INDEX_PREFIX_FORMATTER.format(new Date()));
        request.source(source);
        return request;
    }

    /**
     * Extracts one DTO per requested scan type that has a bucket in the scan-type aggregation.
     */
    private List<CountScanTypeDto> collectCounts(Aggregations aggregations, long[] scanTypes) {
        final List<CountScanTypeDto> counts = new ArrayList<>(20);
        if (aggregations == null) {
            // The response carried no aggregations at all; treat it as "no data".
            return counts;
        }
        final Aggregation byScanType = aggregations.get(SCAN_TYPE_AGGREGATION);
        if (byScanType == null) {
            return counts;
        }
        final ParsedLongTerms terms = (ParsedLongTerms) byScanType;
        for (long scanType : scanTypes) {
            final MultiBucketsAggregation.Bucket bucket = terms.getBucketByKey("" + scanType);
            if (null != bucket) {
                counts.add(new CountScanTypeDto(scanType, bucket.getDocCount()));
            }
        }
        return counts;
    }

    /**
     * Converts the counts to entities, saves them and flushes the repository.
     *
     * @param dtos per-scan-type counts; {@code null} or empty means nothing to save
     * @return the number of saved entities, {@code 0} when there was nothing to save
     */
    @Transactional(rollbackFor = RuntimeException.class)
    public int saveDb(List<CountScanTypeDto> dtos) {
        if (dtos == null || dtos.isEmpty()) {
            return 0;
        }
        // Read the clock once so the timestamps and the statistics day always agree.
        final Date date = new Date();
        final LocalDate statDay = date.toInstant().atZone(java.time.ZoneId.systemDefault()).toLocalDate();
        final Integer day = Integer.parseInt(statDay.format(DATE_TIME_FORMATTER));
        final List<EsStaticScanTypeEntity> entities = dtos.stream()
                .map(x -> new EsStaticScanTypeEntity(null, (int) x.getScanType(), (int) x.getCount(), day, date, date))
                .collect(Collectors.toList());
        final List<EsStaticScanTypeEntity> typeEntities = repository.saveAll(entities);
        repository.flush();
        return typeEntities.size();
    }
}
6、Scheduled task任务
/**
 * Scheduled entry point that triggers the per-scan-type statistics calculation.
 */
@Component
@EnableScheduling
@Slf4j
public class OfflineCalcTask {

    private final CalculateRuleService calculateRuleService;

    /**
     * @param calculateRuleService service that runs the calculation, supplied via constructor injection
     */
    public OfflineCalcTask(CalculateRuleService calculateRuleService) {
        this.calculateRuleService = calculateRuleService;
    }

    /**
     * Runs at second 0 of every fifth minute: obtains a time range from
     * {@code CommonUtil.timeRangeByInterval()} and passes its bounds to the service.
     */
    @Scheduled(cron = "0 0/5 * * * ?")
    public void calcByScanType() {
        final TimeRangeDto window = CommonUtil.timeRangeByInterval();
        calculateRuleService.esCalc(window.getStart(), window.getEnd());
    }
}
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/48971.html