'use strict';

const moment = require('moment');
const Service = require('egg').Service;

/**
 * Statistics orchestration for locally cleaned data, shared by all modules.
 *
 * Provides generic persistence helpers (`saveBefore`, `save`) and the daily
 * task driver (`task`) that walks a time range one day at a time and invokes
 * the per-source statistics services.
 */
class StatisticsService extends Service {
  /**
   * Check whether `model` already holds a document for `data.dateString`.
   *
   * @param {object} model - model exposing `findOne` and `modelName`
   * @param {object} data - payload; only `dateString` is read
   * @return {Promise<boolean>} true when a matching document exists
   */
  async saveBefore(model, data) {
    const { ctx } = this;
    const { dateString } = data;
    const existing = await model.findOne({ dateString });
    ctx.logger.info(`查询本地${model.modelName}时间${dateString}`);
    return Boolean(existing);
  }

  /**
   * Persist one statistics record.
   *
   * When `isUpdate` is truthy and a document with the same `dateString`
   * exists, that document is updated; in every other case a new document is
   * created.
   *
   * Side effect: on the update path `_id` is deleted from the caller's
   * `data` object in place before it is sent to `updateOne`.
   *
   * @param {object} model - model exposing `findOne`, `updateOne`, `create`
   * @param {object} data - record to store; must carry `dateString`
   * @param {boolean} [isUpdate] - update an existing record instead of inserting
   * @return {Promise<object>} the result of `updateOne` or `create`
   * @throws via `ctx.throw` when the update/insert yields a falsy result
   */
  async save(model, data, isUpdate) {
    const { ctx } = this;
    const { dateString } = data;
    ctx.logger.info(`执行本地${model.modelName}时间${dateString}`);

    if (isUpdate) {
      const existing = await model.findOne({ dateString });
      if (existing) {
        // Drop _id so the update payload does not try to overwrite it.
        delete data._id;
        const updateResult = await model.updateOne({ dateString }, data);
        if (!updateResult) {
          ctx.throw(`${model.modelName}更新数据异常`);
        }
        return updateResult;
      }
    }

    const createResult = await model.create(data);
    if (!createResult) {
      ctx.throw(`${model.modelName}插入数据异常`);
    }
    return createResult;
  }

  /**
   * Run every statistics job that is gated by upstream task records.
   *
   * Each job is wrapped in `sign`, which only invokes it when the listed
   * task records are recent enough; the jobs run strictly one after another.
   *
   * @param {object} baseData - value produced by `createBaseData`
   * @return {Promise<void>}
   */
  async taskCond(baseData) {
    const { ctx } = this;

    await this.sign([ 'app_task', 'ivi_online_user' ], baseData, async () => {
      await ctx.service.onlineUserService2.statistics(baseData); // re-processing
    });
    await this.sign([ 'app_task' ], baseData, async () => {
      await ctx.service.appBehaviorRecordService.statistics(baseData);
      await ctx.service.appExceptionRecordService.statistics(baseData);
      await ctx.service.appFeedbackRecordService.statistics(baseData);
    });
    await this.sign([ 't_rbac_user' ], baseData, async () => {
      await ctx.service.tRbacUserService.statistics(baseData); // re-processing
    });
    await this.sign([ 't_register_info' ], baseData, async () => {
      await ctx.service.tRegisterInfoService.statistics(baseData);
    });
    await this.sign([ 't_vehicle_record' ], baseData, async () => {
      await ctx.service.tVehicleRecordService2.statistics(baseData); // re-processing
    });
    await this.sign([ 'driving_behavior_info' ], baseData, async () => {
      await ctx.service.drivingBehaviorInfoService2.statistics(baseData); // re-processing
    });
    await this.sign([ 'stats_base_info' ], baseData, async () => {
      await ctx.service.statsBaseInfoService2.statistics(baseData); // re-processing
    });
    await this.sign([ 'ivi_behavior_record' ], baseData, async () => {
      await ctx.service.iviBehaviorRecordService.statistics(baseData); // re-processing
    });
    await this.sign([ 'msc_query_record' ], baseData, async () => {
      await ctx.service.msgQueryRecordService.statistics(baseData);
    });
    await this.sign([ 'msg_center_record' ], baseData, async () => {
      await ctx.service.msgCenterRecordService2.statistics(baseData);
    });
  }

  /**
   * Run the statistics jobs that are not gated by `sign`, sequentially.
   *
   * @param {object} baseData - value produced by `createBaseData`
   * @return {Promise<void>}
   */
  async taskDo(baseData) {
    const { ctx } = this;
    await ctx.service.tVehicleReportInfoService.statistics(baseData); // re-processing
    await ctx.service.tBoxAutoTestService.statistics(baseData); // re-processing
    await ctx.service.tFsmHitoryService.statistics(baseData); // re-processing
  }

  /**
   * Daily task driver: process `[startTime, endTime)` one day at a time.
   *
   * Outside of `NODE_ENV === 'development'` the run is guarded by a Redis
   * lock keyed on the current date; if `app.redis` is missing or the lock is
   * already held, the method returns without doing anything.
   *
   * Start of the range, in priority order:
   *   1. `startTime` when given;
   *   2. the `create_date` of the oldest stored StatisticsModel record
   *      (filtered by `type: flag` when `flag` is set) - those records are
   *      then deleted so the days they cover are retried;
   *   3. `ctx.helper.yesterday()`.
   *
   * A failure while processing a day is logged and stored as a
   * StatisticsModel record; the loop then continues with the next day.
   *
   * @param {string} [flag] - 'cond' runs `taskCond`, 'do' runs `taskDo`,
   *   falsy runs both; any other value runs neither
   * @param {*} [startTime] - anything `moment()` accepts
   * @param {*} [endTime] - epoch milliseconds, a Date, or a date string
   *   `moment()` can parse; defaults to `ctx.helper.today()`
   * @return {Promise<void>}
   */
  async task(flag, startTime, endTime) {
    const { ctx } = this;

    if (process.env.NODE_ENV !== 'development') {
      if (!this.app.redis) {
        return;
      }
      // Distributed lock so the scheduled task is not executed more than once.
      // The key is computed once, and SET ... EX ... NX sets value and TTL
      // atomically: a caller that loses the race no longer refreshes the
      // holder's TTL, and the key can never be left without an expiry.
      const lockKey = `chart_task_${moment().format('YYYY-MM-DD')}`;
      const acquired = await this.app.redis.set(lockKey, '1', 'EX', 10 * 60, 'NX');
      if (!acquired) {
        return;
      }
    }

    // To backfill a historical period, pass startTime/endTime; the loop below
    // feeds each day into createBaseData.
    let cursor;
    if (startTime) {
      cursor = moment(startTime);
    } else {
      const StatisticsModel = ctx.model.Local.StatisticsModel;
      const filter = flag ? { type: flag } : {};
      const oldest = await StatisticsModel.findOne(filter).sort({ create_date: 1 });
      if (oldest) {
        cursor = moment(oldest.create_date);
        await StatisticsModel.deleteMany(filter);
      }
    }
    if (!cursor) {
      cursor = moment(this.ctx.helper.yesterday());
    }

    // Number() applies the same coercion the `<` comparison would; only when
    // that yields NaN (e.g. a 'YYYY-MM-DD' string) is the value parsed as a
    // date, so such an endTime no longer makes the loop silently skip.
    const endBound = endTime || this.ctx.helper.today();
    let endMs = Number(endBound);
    if (Number.isNaN(endMs)) {
      endMs = moment(endBound).valueOf();
    }

    while (cursor.valueOf() < endMs) {
      const dayStart = cursor.valueOf();
      const dayEnd = cursor.add(1, 'days').valueOf(); // add() advances cursor in place
      const baseData = this.createBaseData(dayStart, dayEnd);
      try {
        if (!flag || flag === 'cond') {
          await this.taskCond(baseData);
        }
        if (!flag || flag === 'do') {
          await this.taskDo(baseData);
        }
      } catch (e) {
        const errData = {
          ...baseData.initData,
          start_time: new Date(),
          end_time: new Date(),
          type: flag,
          msg: `任务出现错误${baseData.initData.dateString}:${e.message}`,
        };
        this.ctx.logger.info(errData);
        await ctx.model.Local.StatisticsModel.create(errData);
      }
    }
  }

  /**
   * Gate `callback` on the freshness of upstream task records.
   *
   * Loads the TaskRecordModel documents whose `_id` is in `ids` and runs
   * `callback` only if every returned document has `last_task_data` at or
   * after `baseData.timeRangData.endTime`. Otherwise a "delayed" record of
   * type 'cond' is logged and stored in StatisticsModel.
   *
   * NOTE(review): `every` is true for an empty array, so the callback also
   * runs when none of `ids` has a task record - confirm this is intended.
   *
   * @param {string[]} ids - task record ids that must be up to date
   * @param {object} baseData - value produced by `createBaseData`
   * @param {Function} callback - async job to run when the gate passes
   * @return {Promise<void>}
   */
  async sign(ids, baseData, callback) {
    const { ctx } = this;
    const records = await ctx.model.TaskRecordModel.find({ _id: { $in: ids } });

    const upToDate = records
      ? records.every(item => moment(item.last_task_data).valueOf() >= baseData.timeRangData.endTime)
      : false;

    if (upToDate) {
      // Upstream data is ready: run the job.
      await callback();
      return;
    }

    // Upstream data is not ready: record the delay.
    const errData = {
      ...baseData.initData,
      start_time: new Date(),
      end_time: new Date(),
      type: 'cond',
      msg: `任务出现延时${baseData.initData.dateString}:${ids.join(',')}`,
    };
    this.ctx.logger.info(errData);
    await ctx.model.Local.StatisticsModel.create(errData);
  }

  /**
   * Build the shared input object for one statistics run.
   *
   * @param {*} [start] - range start; defaults to `ctx.helper.yesterday()`
   * @param {*} [end] - range end; defaults to `ctx.helper.today()`
   * @return {{timeRangData: {startTime: *, endTime: *}, initData: object, isForceUpdate: boolean}}
   *   `initData` holds year/month/day/dateString derived from the start plus
   *   `create_date`; `isForceUpdate` is always false
   * @throws via `ctx.throw` when the start is greater than the end
   */
  createBaseData(start, end) {
    const isForceUpdate = false;
    const yesterday = start || this.ctx.helper.yesterday();
    const today = end || this.ctx.helper.today();
    const startMoment = moment(yesterday);

    const initData = {
      year: startMoment.year(),
      month: startMoment.month() + 1, // moment months are zero-based
      day: startMoment.date(),
      dateString: startMoment.format('YYYY-MM-DD'),
      create_date: yesterday,
    };

    if (yesterday > today) {
      this.ctx.throw('任务开始时间大于结束时间');
    }

    // A wider range can be handled by calling this once per day in a loop.
    const timeRangData = { startTime: yesterday, endTime: today };
    return { timeRangData, initData, isForceUpdate };
  }
}

module.exports = StatisticsService;