123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- 'use strict';
- const moment = require('moment');
- const Service = require('egg').Service;
- // 本地清洗数据的统计业务 通用所有模块
- class StatisticsService extends Service {
- async saveBefore(model, data) {
- const { ctx } = this;
- const { dateString } = data;
- const result = await model.findOne({ dateString });
- ctx.logger.info(`查询本地${model.modelName}时间${dateString}`);
- if (result) {
- return true;
- }
- return false;
- }
- async save(model, data, isUpdate) {
- const { ctx } = this;
- const { dateString } = data;
- ctx.logger.info(`执行本地${model.modelName}时间${dateString}`);
- if (isUpdate) {
- const result = await model.findOne({ dateString });
- if (result) {
- delete data._id;
- const updateResult = await model.updateOne({ dateString }, data);
- if (!updateResult) {
- ctx.throw(`${model.modelName}更新数据异常`);
- }
- return updateResult;
- }
- }
- const createResult = await model.create(data);
- if (!createResult) {
- ctx.throw(`${model.modelName}插入数据异常`);
- }
- return createResult;
- }
- async taskCond(baseData) {
- const { ctx } = this;
- await this.sign([ 'app_task', 'ivi_online_user' ], baseData, async () => {
- await ctx.service.onlineUserService2.statistics(baseData);// 重新处理
- });
- await this.sign([ 'app_task' ], baseData, async () => {
- await ctx.service.appBehaviorRecordService.statistics(baseData);
- await ctx.service.appExceptionRecordService.statistics(baseData);
- await ctx.service.appFeedbackRecordService.statistics(baseData);
- });
- await this.sign([ 't_rbac_user' ], baseData, async () => {
- await ctx.service.tRbacUserService.statistics(baseData);// 重新处理
- });
- await this.sign([ 't_register_info' ], baseData, async () => {
- await ctx.service.tRegisterInfoService.statistics(baseData);
- });
- await this.sign([ 't_vehicle_record' ], baseData, async () => {
- await ctx.service.tVehicleRecordService2.statistics(baseData);// 重新处理
- });
- await this.sign([ 'driving_behavior_info' ], baseData, async () => {
- await ctx.service.drivingBehaviorInfoService2.statistics(baseData);// 重新处理
- });
- await this.sign([ 'stats_base_info' ], baseData, async () => {
- await ctx.service.statsBaseInfoService2.statistics(baseData);// 重新处理
- });
- await this.sign([ 'ivi_behavior_record' ], baseData, async () => {
- await ctx.service.iviBehaviorRecordService.statistics(baseData);// 重新处理
- });
- await this.sign([ 'msc_query_record' ], baseData, async () => {
- await ctx.service.msgQueryRecordService.statistics(baseData);
- });
- await this.sign([ 'msg_center_record' ], baseData, async () => {
- await ctx.service.msgCenterRecordService2.statistics(baseData);
- });
- }
- async taskDo(baseData) {
- const { ctx } = this;
- await ctx.service.tVehicleReportInfoService.statistics(baseData);// 重新处理
- await ctx.service.tBoxAutoTestService.statistics(baseData);// 重新处理
- await ctx.service.tFsmHitoryService.statistics(baseData);// 重新处理
- }
- async task(flag, startTime, endTime) { // flag "cond" "do"
- const { ctx } = this;
- // 更新历史时间段数据 => 修改createBaseData的参数做循环请求即可
- if (process.env.NODE_ENV === 'development') {
- } else {
- if (this.app.redis) { // 分布式锁 防止重复执行定时任务
- const result = await this.app.redis.setnx(`chart_task_${moment().format('YYYY-MM-DD')}`, '1');
- await this.app.redis.expire(`chart_task_${moment().format('YYYY-MM-DD')}`, 10 * 60);
- if (!result) {
- return;
- }
- } else {
- return;
- }
- }
- let satrtMoment;
- if (startTime) {
- satrtMoment = moment(startTime);
- } else {
- if (flag) {
- const newVar = await ctx.model.Local.StatisticsModel.findOne({ type: flag }).sort({ create_date: 1 });
- if (newVar) {
- satrtMoment = moment(newVar.create_date);
- await ctx.model.Local.StatisticsModel.deleteMany({ type: flag });
- }
- } else {
- const newVar = await ctx.model.Local.StatisticsModel.findOne({}).sort({ create_date: 1 });
- if (newVar) {
- satrtMoment = moment(newVar.create_date);
- await ctx.model.Local.StatisticsModel.deleteMany();
- }
- }
- }
- if (!satrtMoment) {
- satrtMoment = moment(this.ctx.helper.yesterday());
- }
- endTime = endTime || this.ctx.helper.today();
- while (satrtMoment.valueOf() < endTime) {
- const startTime = satrtMoment.valueOf();
- const eTime = satrtMoment.add(1, 'days').valueOf();
- const baseData = this.createBaseData(startTime, eTime);
- try {
- if (!flag) {
- await this.taskCond(baseData);
- await this.taskDo(baseData);
- } else {
- switch (flag) {
- case 'do':
- await this.taskDo(baseData);
- break;
- case 'cond':
- await this.taskCond(baseData);
- break;
- default:
- break;
- }
- }
- } catch (e) {
- const errData =
- { ...baseData.initData, start_time: new Date(), end_time: new Date(),
- type: flag,
- msg: `任务出现错误${baseData.initData.dateString}:${e.message}` };
- this.ctx.logger.info(errData);
- await ctx.model.Local.StatisticsModel.create(errData);
- }
- }
- }
- async sign(ids, baseData, callback) {
- const { ctx } = this;
- const result = await ctx.model.TaskRecordModel.find({ _id: { $in: ids } });
- let flag = false;
- if (result) {
- flag = result.every(item => {
- const time = moment(item.last_task_data).valueOf();
- return time >= baseData.timeRangData.endTime;
- });
- }
- if (flag) {
- // 可以执行
- await callback();
- } else {
- // 不能执行 延迟执行
- const errData =
- { ...baseData.initData, start_time: new Date(), end_time: new Date(),
- type: 'cond',
- msg: `任务出现延时${baseData.initData.dateString}:${ids.join(',')}` };
- this.ctx.logger.info(errData);
- await ctx.model.Local.StatisticsModel.create(errData);
- }
- }
- createBaseData(start, end) {
- const isForceUpdate = false;
- const yesterday = start || this.ctx.helper.yesterday();
- const today = end || this.ctx.helper.today();
- const initData = {
- year: moment(yesterday).year(),
- month: moment(yesterday).month() + 1,
- day: moment(yesterday).date(),
- dateString: moment(yesterday).format('YYYY-MM-DD'),
- create_date: yesterday,
- // start_time: new Date(),
- // end_time: new Date()
- };
- if (yesterday > today) {
- this.ctx.throw('任务开始时间大于结束时间');
- }
- const timeRangData = { startTime: yesterday, endTime: today };// 这边可以做循环处理
- return { timeRangData, initData, isForceUpdate };
- }
- }
- module.exports = StatisticsService;
|