欢迎大家来到IT世界,在知识的湖畔探索吧!
前言
物联网设备采集到的实时数据以csv格式文件存储,需要定时导入到mongoDB数据库,数据文件大概20多M(天),10万左右数据量。
概述
前端基于VUE AntDesign实现UI及进度条,后端采用Java Spring Boot。服务器端采用redis存储处理进度,前端以特定key定时调用获取后端redis存储的处理进度信息。
后端代码
数据上传导入接口:
// 导入方法
@PostMapping("/importdata")
@ResponseBody
@RepeatSubmit(interval = 1000, message = "请求过于频繁")
public AjaxResult importData(MultipartFile file,String wid,String equipmentId, boolean updateSupport)
{
String message = "ok";
try{
message = System.currentTimeMillis() + "_" + UUID.randomUUID().toString();
AsyncManager.me().execute(asyncTask(file.getInputStream(),wid,equipmentId,message,file.getSize()));
return AjaxResult.success(message);
}catch (Exception e){
e.printStackTrace();
message = e.getMessage();
return AjaxResult.error(message);
}
}
欢迎大家来到IT世界,在知识的湖畔探索吧!
上传后异步线程处理数据导入,并以时间+uuid生成唯一标识,返回前端。
查询处理进度接口:
欢迎大家来到IT世界,在知识的湖畔探索吧!@GetMapping("/importdata/{uuid}")
@ResponseBody
public AjaxResult getProgressValue(@PathVariable("uuid") String uuid)
{
String message = "";
try{
String error = redisCache.getCacheObject(uuid +"error");
if(error != null && !error.isEmpty()){
return AjaxResult.error(error);
}
String data = redisCache.getCacheObject(uuid);
if(data == "" || data == null){
return AjaxResult.error(uuid + "此key值不存在");
}
return AjaxResult.success(message,data);
}catch (Exception e){
e.printStackTrace();
message = e.getMessage();
return AjaxResult.error(message);
}
}
根据前端传入的标识key从redis里获取处理进度,如果异步线程出现异常,error信息也缓存在redis里。
采用InputStream流式处理数据导入,使用BufferedReader方式按行读取数据,然后200条数据以batch的方式保存到mongodb数据库。
InputStreamReader ir = null;
BufferedReader br = null;
try{
Long totalSize = Long.valueOf(0);
List<Wits58> list = new ArrayList<Wits58>();
List<String> row = new ArrayList<String>();
List<String> fields = new ArrayList<String>();
ir = new InputStreamReader(is,"UTF-8");
br =new BufferedReader(ir);
欢迎大家来到IT世界,在知识的湖畔探索吧!setProgress(uuid,"0%");
while ((line = br.readLine()) != null){
totalSize = totalSize + Long.valueOf(line.getBytes().length);
row.clear();
list.add(entity);
if(list.size()>200){
importCSVData(list,wid,equipmentId);
list.clear();
}
String p = String.format("%.1f", ((totalSize.doubleValue() / fileSize.doubleValue()) * 100)) + "";
setProgress(uuid,p);
处理完毕:
redisCache.setCacheObject(uuid,
"100");
setProgress(uuid,"100");
出现异常:redisCache.setCacheObject(uuid +"error",
e.getMessage());
数据入库:
private void importCSVData(List<Wits58> dataRow,String wid,String equipmentId){
List<MongoWits58> importMongoList = new ArrayList<MongoWits58>();
for (Wits58 item :dataRow) {
item.setEquipmentID(equipmentId);
item.setWid(wid);
MongoWits58 mongoWits = MongoWits58.convertToWits58(item);
importMongoList.add(mongoWits);
}
iMongoWits58Service.addWits58Batch(importMongoList,equipmentId);
}
前端代码
<a-form-model-item label="请选择从采集设备导出CSV格式数据文件" :rules="[{ required: true, message: '请选择从采集设备导出CSV格式数据文件!' }]" >
<a-upload
:file-list="fileList"
:show-upload-list="true"
:remove="handleRemove"
:before-upload="beforeUpload"
>
<a-button> <a-icon type="upload" /> 选择文件</a-button>
</a-upload>
</a-form-model-item>
<a-form-model-item label="数据导入完成之前,请勿关闭此页面" >
<div class="baseinfo">
<a-progress :percent="percent" strokeWidth="26" class="baseProgess" />
</div>
</a-form-model-item>
<div class="bottom-control">
<a-button :disabled="fileList.length === 0"
:loading="uploading" type="primary" @click="submitForm">
{{ uploading ? '导入中...' : '导入' }}
</a-button>
</div>
methods: {
handleRemove(file) {
//const index = this.fileList.indexOf(file);
//const newFileList = this.fileList.slice();
//newFileList.splice(index, 1);
this.fileList = [];
},
beforeUpload(file) {
const isCSV = file.type === 'application/vnd.ms-excel';
if (!isCSV) {
this.$message.error(`${file.name} 文件格式不正确,请选择CSV文件上传。`);
return Upload.LIST_IGNORE
}
this.fileList = [file];
return true;
},
/** 提交按钮 */
submitForm: function () {
if(this.form.wellName == "" || this.form.wellName == undefined){
this.$message.error(`请输入井号`);
return false;
}
if(this.fileList == false){
this.$message.error(`请选择CSV文件上传。`);
return false;
}
if (this.form.equipmentId !== undefined) {
const formData = new FormData();
this.fileList.forEach(file => {
formData.append("file", file);
});
formData.append("wid", this.form.wellName);
formData.append("equipmentId", this.form.equipmentCode);
this.form.formData = formData;
this.uploading = true;
importData(formData).then(response => {
console.info(response.msg) ;
this.uploadKey = response.msg;
//this.open = false
//this.uploading = false;
this.$emit('ok')
});
this.processVisible = true
this.percent = 0
let that = this
let interval = setInterval(function() {
getImportData(that.uploadKey).then(response =>{
var msg = response.msg;
console.info(msg)
if(!!msg){
clearInterval(interval);
this.$message.error(
'导入失败',
3
)
that.uploading = false;
}
var percentCount = response.data;
if(percentCount == "100"){
clearInterval(interval);
that.$message.success(
'实时数据导入成功',
3
);
that.processVisible = false
that.open = false
that.uploading = false;
}
that.percent = percentCount;
},response =>{
clearInterval(interval);
that.processVisible = false
that.uploading = false;
}
)
总结: 虽然是小功能,也碰到一些问题,最初的设计直接从文件流读取全部数据到List,然后导入mongodb,因为能获取记录总数和当前处理数,可精确记录处理的进度。发现有时用户数据文件很大,会出现内存问题。更改为按行读边读边入库的模式。按文件大小和已处理数据大小的比率计算处理进度。期间也考虑过把数据文件存储到服务器上后台单开线程做数据导入处理,这样还需要专门设计查看线程执行情况的功能,最后放弃了。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/17818.html