Commit 9e2500d3 by liangkaiping

copy

parent b94570d0
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-xxl-job-executor</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>com.yizhi</groupId>
<artifactId>wmy-parent</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-site-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 专辑api 依赖 -->
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-album-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 本地依赖 -->
<!-- 课程api 依赖 -->
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-course-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 报表 -->
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-statistics-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 培训项目api 依赖 -->
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-training-project-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 签到api 依赖 -->
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-sign-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 学习日志api 依赖 -->
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-course-studyLog-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 积分api 依赖 -->
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-point-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 考试api 依赖 -->
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-exam-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 调研api 依赖 -->
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-research-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!--微信服务依赖-->
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-wechat-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- 案例api 依赖 -->
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>library-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-system-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-newMessage-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-aliyun-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-suyinbean-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.yizhi</groupId>
<artifactId>cloud-live-api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>wmy4.0</id>
<url>http://mvn.km365.pw/nexus/content/groups/wmy4.0-group/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
</repository>
</repositories>
</project>
\ No newline at end of file
package com.yizhi.xxl.job.executor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.ComponentScan;
/**
* @author xuxueli 2018-10-28 00:38:13
*/
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@EnableFeignClients("com")
@ComponentScan(value = {"com.yizhi", "com.yizhi.xxl"})
public class XxlJobExecutorApplication {
public static void main(String[] args) {
SpringApplication.run(XxlJobExecutorApplication.class, args);
}
}
\ No newline at end of file
package com.yizhi.xxl.job.executor.constant;
/**
* @ClassName ReturnTContent
* @Description TODO
* @Author shengchenglong
* @DATE 2020/9/3 20:12
* @Version 1.0
*/
public interface ErrorMessage {
String FAIL_GET_REDIS_CONNECTION = "redisConnection 获取失败";
}
package com.yizhi.xxl.job.executor.core.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}
\ No newline at end of file
package com.yizhi.xxl.job.executor.mvc.controller;//package com.xxl.job.executor.mvc.controller;
//
//import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
//import org.springframework.stereotype.Controller;
//import org.springframework.web.bind.annotation.RequestMapping;
//import org.springframework.web.bind.annotation.ResponseBody;
//
//@Controller
//@EnableAutoConfiguration
//public class IndexController {
//
// @RequestMapping("/")
// @ResponseBody
// String index() {
// return "xxl job executor running.";
// }
//
//}
\ No newline at end of file
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.aliyun.application.feign.AliyunClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 媒体转码
*/
@Component
public class AddMediaTranscodeJob implements BaseJob {
@Autowired
private AliyunClient aliyunClient;
@Override
public void execute() throws Exception {
XxlJobLogger.log("-------------------------开始 处理媒体转码-----------------------------");
aliyunClient.dealFinishedTranscoding();
XxlJobLogger.log("-------------------------结束 处理媒体转码-----------------------------");
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.alibaba.fastjson.JSONObject;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.statistics.application.feign.StatisticsAlbumClientV2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
public class AlbumJob implements BaseJob {
@Autowired
private StatisticsAlbumClientV2 albumClientV2;
@Override
public void execute() throws Exception {
try{
XxlJobLogger.log("智能陪练任务,启动");
albumClientV2.albumManualImport(null,null,null,null);
XxlJobLogger.log("智能陪练任务,完毕");
} catch(Exception e){
XxlJobLogger.log("智能陪练任务,异常", e);
throw e;
}
}
public void executeParams(String data) {
try{
XxlJobLogger.log("智能陪练任务,启动"+data);
if (!StringUtils.isEmpty(data)){
JSONObject jsonObject = JSONObject.parseObject(data);
String startDate = jsonObject.getString("startDate");
String endDate = jsonObject.getString("endDate");
Long siteId = jsonObject.getLong("siteId");
Long companyId = jsonObject.getLong("companyId");
albumClientV2.albumManualImport(startDate,endDate,siteId,companyId);
}
XxlJobLogger.log("智能陪练任务,完毕");
} catch(Exception e){
XxlJobLogger.log("智能陪练任务,异常", e);
throw e;
}
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.album.application.feign.AlbumJobHandleClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class AlbumUnLockJob implements BaseJob {
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private AlbumJobHandleClient albumJobHandleClient;
@Override
public void execute() throws Exception {
XxlJobLogger.log("定时任务开始执行解锁课程,当前时间:" + format.format(new Date()));
try {
albumJobHandleClient.unLockCourse();
} catch (Exception e) {
e.printStackTrace();
XxlJobLogger.log("解锁课程失败=====================");
XxlJobLogger.log(e);
throw e;
}
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.biz.model.ReturnT;
/**
* @ClassName BaseJob
* @Description TODO
* @Author shengchenglong
* @DATE 2020/9/2 10:44
* @Version 1.0
*/
public interface BaseJob {
void execute() throws Exception;
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.statistics.application.feign.job.PracticeJobClient;
@Component
public class ChatPracticeJob implements BaseJob {
@Autowired
private PracticeJobClient client;
@Override
public void execute() throws Exception {
try{
XxlJobLogger.log("智能陪练任务,启动");
client.syncPracticeData(null, null);
XxlJobLogger.log("智能陪练任务,完毕");
} catch(Exception e){
XxlJobLogger.log("智能陪练任务,异常", e);
throw e;
}
}
public void executeParams(String data) {
try{
XxlJobLogger.log("智能陪练任务,启动"+data);
String[] arr = data.split(",");
client.syncPracticeData(arr[0], arr[1]);
XxlJobLogger.log("智能陪练任务,完毕");
} catch(Exception e){
XxlJobLogger.log("智能陪练任务,异常", e);
throw e;
}
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.live.application.feign.LiveReplayClient;
import com.yizhi.statistics.application.feign.StatisticsCourseClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class CourseHandlerJob implements BaseJob {
@Autowired
private StatisticsCourseClient statisticsCourseClient;
@Override
public void execute() throws Exception {
try{
XxlJobLogger.log("直播初始化回放视频,开始");
XxlJobLogger.log("直播初始化回放视频,完毕");
} catch(Exception e){
XxlJobLogger.log("直播初始化回放视频,异常", e);
throw e;
}
}
public void executeParams(String data) {
String[] params = data.split(",");
Long companyId = null;
Long siteId = null;
try {
companyId = Long.getLong(params[2]);
}catch (Exception e) {
XxlJobLogger.log("companyId is null", e);
}
try {
siteId = Long.getLong(params[3]);
}catch (Exception e) {
XxlJobLogger.log("companyId is null", e);
}
statisticsCourseClient.AsynchronousCourse(params[0],params[1],companyId,siteId);
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.library.application.feign.StatisticStudentCaseClient;
import com.yizhi.statistics.application.feign.StatisticsCourseClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class CourseIntoTableJob implements BaseJob {
@Autowired
private StatisticsCourseClient statisticsCourseClient;
@Autowired
private StatisticStudentCaseClient statisticStudentCaseClient;
@Override
public void execute() throws Exception {
try {
XxlJobLogger.log("在线课程自动入表任务,启动");
statisticsCourseClient.AsynchronousCourse(null, null);
XxlJobLogger.log("在线课程自动入表任务,完毕");
} catch (Exception e) {
e.printStackTrace();
XxlJobLogger.log("在线课程自动入表任务,异常");
throw e;
}
try {
XxlJobLogger.log("学员案例自动入表任务,启动");
statisticStudentCaseClient.insertAll();
XxlJobLogger.log("学员案例自动入表任务,完毕");
} catch (Exception e) {
e.printStackTrace();
XxlJobLogger.log("学员案例自动入表任务,异常");
throw e;
}
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.course.application.feign.CourseStudyHourRankingClient;
import com.yizhi.system.application.system.remote.SiteClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* 课程学习时长排名处理
* <br> 第二步骤START_TIME 时间初始化为0,表示job不会触发,等功能实现完需要填写开始启动的时间戳
* <br> 如果需要在job的corn之前再次启动可以修改第二步的NEXT_FIRE_TIME 字段为下次启动的时间戳
*/
@Component
public class CourseStudyHourRankingJob implements BaseJob {
private final Logger LOG = LoggerFactory.getLogger(CourseStudyHourRankingJob.class);
//feature_867_xiehaijun
@Autowired
private CourseStudyHourRankingClient courseStudyHourRankingClient;
@Autowired
private SiteClient siteClient;
// 参考http://www.ibloger.net/article/2650.html
// 1 在QRTZ_JOB_DETAILS 配置 job执行的处理类,实现 implements BaseJob, InterruptableJob
//SCHED_NAME JOB_NAME JOB_GROUP DESCRIPTION JOB_CLASS_NAME IS_DURABLE IS_NONCONCURRENT IS_UPDATE_DATA REQUESTS_RECOVERY
//scheduler studyHourRanking course 课程学习时间排名 Job定义 com.fulan.application.handle.quartz.job.CourseStudyHourRankingJob 0 0 0 0
// 2 在QRTZ_TRIGGERS(存储已配置的 Trigger 的信息)表添加触发时间点(每次执行完批处理后,根据QRTZ_CRON_TRIGGERS表配置的cron表达式生成新的下一个执行时间)
// NEXT_FIRE_TIME(下次执行时间戳例如1566282600000)和NEXT_FIRE_TIME设置为0;为0表示初始化,这些字段在下个job启动完会更新
// NEXT_FIRE_TIME的更新:在暂停、恢复任务状态或是每次时间轮询时都会判断NEXT_FIRE_TIME值是否小于当前时间,如果小于当前时间将永远不会执行
//SCHED_NAME TRIGGER_NAME TRIGGER_GROUP JOB_NAME JOB_GROUP DESCRIPTION NEXT_FIRE_TIME PREV_FIRE_TIME PRIORITY TRIGGER_STATE TRIGGER_TYPE START_TIME END_TIME CALENDAR_NAME MISFIRE_INSTR
//scheduler studyHourRankingTrig courseTrig studyHourRanking course 课程学习时间排名触发器 0 0 null WAITING CRON 填写开始启动时间戳 0 null null
// 3 在QRTZ_CRON_TRIGGERS 表配置cron表达式
//SCHED_NAME TRIGGER_NAME TRIGGER_GROUP CRON_EXPRESSION TIME_ZONE_ID
//scheduler studyHourRankingTrig courseTrig 0 0 4 * * ? Asia/Shanghai
@Override
public void execute() throws Exception {
LOG.info("######################################################课程学习时长排名生成处理开始");
List<Long> siteIds = siteClient.getAllSiteId();
for (Long id : siteIds) {
XxlJobLogger.log("站点{}开始生成站点排名......", id);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
XxlJobLogger.log("等待发生一次{}", e);
throw e;
}
courseStudyHourRankingClient.rankingGenerated(id);
}
}
}
package com.yizhi.xxl.job.executor.service.job;
import cn.hutool.core.date.DateUtil;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.course.application.feign.RemoteDaIndicatorCourseClient;
import com.yizhi.system.application.system.remote.RemoteDaIndicatorSystemClient;
import com.yizhi.system.application.system.remote.SiteClient;
import com.yizhi.system.application.vo.RemoteDaIndicatorVo;
import com.yizhi.training.application.feign.RemoteDaIndicatorTpClient;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
/**
* 首页dashboard指标job
*/
@Component
public class DashboardJob implements BaseJob {
private static final Logger log = LoggerFactory.getLogger(DashboardJob.class);
@Autowired
private RemoteDaIndicatorSystemClient systemClient;
@Autowired
private RemoteDaIndicatorCourseClient courseClient;
@Autowired
private RemoteDaIndicatorTpClient tpClient;
@Autowired
private SiteClient siteClient;
@Override
public void execute() throws Exception {
List<Long> allSiteIds = siteClient.getAllSiteId();
if (CollectionUtils.isEmpty(allSiteIds)) {
XxlJobLogger.log("-------------------- dashboard 未查询到站点列表,结束执行 ----------------------");
return;
}
Date now = new Date();
Date yesterday = DateUtil.offsetDay(now, -1).toJdkDate();
Date endOfWeek = DateUtil.beginOfDay(DateUtil.endOfWeek(yesterday)).toJdkDate();
Date startOfWeek = DateUtil.beginOfDay(DateUtil.beginOfWeek(yesterday)).toJdkDate();
for (Long siteId : allSiteIds) {
try {
XxlJobLogger.log("-------------------- dashboard 折线图 开始处理 site_id: {} ----------------------", siteId);
systemClient.accountDay(new RemoteDaIndicatorVo(siteId, null, null, yesterday, now));
XxlJobLogger.log("-------------------- dashboard 折线图 处理完成 site_id: {} ----------------------", siteId);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
// 如果是最后一天,则统计dashboard4个指标
if (DateUtil.isSameDay(yesterday, endOfWeek)) {
try {
XxlJobLogger.log("-------------------- dashboard 活跃用户比例 开始处理 site_id: {} ----------------------", siteId);
systemClient.account(new RemoteDaIndicatorVo(siteId, startOfWeek, endOfWeek, null, now));
XxlJobLogger.log("-------------------- dashboard 活跃用户比例 处理完成 site_id: {} ----------------------", siteId);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
try {
XxlJobLogger.log("-------------------- dashboard 课程完成数 开始处理 site_id: {} ----------------------", siteId);
courseClient.courseFinish(new com.yizhi.course.application.vo.RemoteDaIndicatorVo(siteId, startOfWeek, endOfWeek, null, now));
XxlJobLogger.log("-------------------- dashboard 课程完成数 处理完成 site_id: {} ----------------------", siteId);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
try {
XxlJobLogger.log("-------------------- dashboard 课程学习时长 开始处理 site_id: {} ----------------------", siteId);
courseClient.courseTime(new com.yizhi.course.application.vo.RemoteDaIndicatorVo(siteId, startOfWeek, endOfWeek, null, now));
XxlJobLogger.log("-------------------- dashboard 课程学习时长 处理完成 site_id: {} ----------------------", siteId);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
try {
XxlJobLogger.log("-------------------- dashboard 项目通过率 开始处理 site_id: {} ----------------------", siteId);
tpClient.tpFinish(new com.yizhi.training.application.vo.RemoteDaIndicatorVo(siteId, startOfWeek, endOfWeek, null, now));
XxlJobLogger.log("-------------------- dashboard 项目通过率 处理完成 site_id: {} ----------------------", siteId);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
}
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.exam.application.feign.StatisticsExamMetadataClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class ExamInToTableJob implements BaseJob {
@Autowired
private StatisticsExamMetadataClient statisticsExamMetadataClient;
@Override
public void execute() throws Exception {
try {
XxlJobLogger.log("考试报表自动入表任务,启动");
statisticsExamMetadataClient.AsynchronousExam("", "");
XxlJobLogger.log("考试报表自动入表任务,完毕");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
XxlJobLogger.log("考试报表自动入表任务,异常");
throw e;
}
}
}
/**
* FileName: HttpSetjob
* Author: wenjunlong
* Date: 2018/5/26 13:59
* Description: 发送请求job
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改时间 版本号 描述
*/
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.exam.application.feign.ExamQuestionApiClient;
import com.yizhi.exam.application.vo.exam.ExamAnswerVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 定时检查是否有过期的考试,有的话自动提交考试信息
* @author mei
*
*/
@Component
public class ExamJob implements BaseJob {
@Autowired
ExamQuestionApiClient examQuestionApiClient;
@Override
public void execute() throws Exception {
try{
XxlJobLogger.log("考试失效自动提交任务,启动");
ExamAnswerVO answerVo = new ExamAnswerVO();
examQuestionApiClient.jobSubmitExamLose(answerVo);
XxlJobLogger.log("考试失效自动提交任务,完毕");
} catch(Exception e){
e.printStackTrace();
XxlJobLogger.log("考试失效自动提交任务,异常");
throw e;
}
}
}
/**
* FileName: HttpSetjob
* Author: wenjunlong
* Date: 2018/5/26 13:59
* Description: 发送请求job
* History:
* <author> <time> <version> <desc>
* 作者姓名 修改时间 版本号 描述
*/
package com.yizhi.xxl.job.executor.service.job;
import com.alibaba.fastjson.JSON;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.application.feign.StudyLogClient;
import com.yizhi.application.vo.StudyLogVo;
import com.yizhi.core.application.cache.RedisCache;
import com.yizhi.core.application.cache.distributedlock.impl.RedisDistributedLock;
import com.yizhi.course.application.feign.CourseClient;
import com.yizhi.course.application.feign.CourseStudyClient;
import com.yizhi.course.application.feign.RecordeClient;
import com.yizhi.course.application.vo.HeartBeatVo;
import com.yizhi.course.application.vo.StudyLog;
import com.yizhi.course.application.vo.domain.ChapterEntityVo;
import com.yizhi.course.application.vo.domain.CourseEntityVo;
import com.yizhi.course.application.vo.domain.MaterialEntityVo;
import com.yizhi.course.application.vo.domain.RecordeEntityVo;
import com.yizhi.util.application.date.DateUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;
import java.util.stream.Collectors;
/**
* 课程学习心跳处理
* <br> 原处理已经备份到HttpSetjobBack
*
* @author 谢海军
* @create 2019/1/23
* @since 1.0.0
*/
@Component
public class HttpSetjob implements BaseJob {
private final static String REDIS_KEY = "httpSetjob-singleton";
private final static String SPILT = ",";
private final static String REDIS_KEY_MAX = "heartbeat.max.id";
private final static String REDIS_RERTY_MAX = "heartbeat.retry.max";
private final static String COURSE_DISTRIBUTED_LOCK = "distributed.lock:course";
@Autowired
private StudyLogClient studyLogClient;
@Autowired
private RecordeClient recordeClient;
@Autowired
private CourseStudyClient courseStudyClient;
@Autowired
private RedisDistributedLock redisDistributedLock;
@Autowired
private RedisCache redisCache;
@Autowired
private CourseClient courseClient;
@Override
public void execute() throws Exception {
String uuid = UUID.randomUUID().toString();
// 四分分钟的锁等待(重试次数乘等待时间 < 过期时间) 加锁不放在try,免得finally块执行解锁
//boolean lockStatus = redisDistributedLock.lock(REDIS_KEY,250000,24,10000);
boolean lockStatus = false;
Date execDate = Calendar.getInstance().getTime();
//XxlJobLogger.log(uuid+"批次,心跳处理开始时间{},分布式锁的状态={}", DateUtil.format(execDate, "yyyy-MM-dd HH:mm:ss.SSS"),lockStatus);
/*if(lockStatus){
XxlJobLogger.log(uuid+"批次,分布式锁的状态={},加锁成功,业务开始",lockStatus);
}else{
XxlJobLogger.log(uuid+"批次,分布式锁的状态={},加锁失败,业务不进行处理",lockStatus);
return;
}*/
try {
Map<String, List<StudyLogVo>> errorData = new HashMap<>();
// 获取最大的id
Object maxIdObj = redisCache.get(REDIS_KEY_MAX);
XxlJobLogger.log(uuid + "批次,redis存在的最大id={}", maxIdObj);
if (Objects.nonNull(maxIdObj) && "off".equalsIgnoreCase(maxIdObj.toString())) {
XxlJobLogger.log(uuid + "批次,心跳服务开关为关闭,停止心跳处理");
return;
}
// 获取最大的id
Object retryMaxRedisObj = redisCache.get(REDIS_RERTY_MAX);
int retryMaxRedis = 1;
if (Objects.nonNull(retryMaxRedisObj)) {
retryMaxRedis = Integer.valueOf(retryMaxRedisObj.toString());
}
XxlJobLogger.log(uuid + "批次,redis重试次数={}", retryMaxRedisObj);
List<StudyLogVo> studyLogs = studyLogClient.list("default");
//XxlJobLogger.log("原始数据={}",JSON.toJSONString(studyLogs));
if (CollectionUtils.isNotEmpty(studyLogs)) {
if (Objects.nonNull(maxIdObj)) {
long maxIdTmp = Long.valueOf(maxIdObj.toString());
// 获取数据库最小的id
//Long minId = studyLogClient.minId();
Long minId = studyLogs.get(0).getId();
XxlJobLogger.log(uuid + "批次,处理的数据库的最小id={},maxIdTmp={}", minId, maxIdTmp);
//if (minId <= maxIdTmp && retryMaxRedis<2) {
if (minId <= maxIdTmp) {
retryMaxRedis = retryMaxRedis + 1;
if (retryMaxRedis >= 35) {
XxlJobLogger.log(uuid + "!!!!!!!!!!!!!!批次已经重试{}次,现在进行重置归零操作##########################", 30);
redisCache.set(REDIS_KEY_MAX, "0");
retryMaxRedis = 1;
}
redisCache.set(REDIS_RERTY_MAX, retryMaxRedis + "");
XxlJobLogger.log(uuid + "批次,新的批处理本次查询的数据库最小id={}小于上次redis存放的最大id={},说明上次处理的还没处理完成,结束本次处理", minId, maxIdTmp);
return;
}
}
HeartBeatVo heartBeatVo = new HeartBeatVo();// todo 心跳参数设置,目前为vo固定,获取根据接口获取
// 查询scorm课程的心跳记录
List<StudyLogVo> scormStudyLogs = studyLogClient.list("scorm");
if (CollectionUtils.isNotEmpty(scormStudyLogs)) {
Map<String, List<StudyLogVo>> scormStudyData = scormStudyLogs.parallelStream()
.collect(Collectors.groupingBy(scormObjKey -> scormObjKey.getAccountId() + "#" + scormObjKey.getCourseId()));
for (Map.Entry<String, List<StudyLogVo>> scormItem : scormStudyData.entrySet()) {
//Optional<StudyLog> validData = scormItem.getValue().parallelStream().filter(scormItemFilter -> scormItemFilter.getChapterId()>0).findAny();
List<StudyLogVo> dataSort = scormItem.getValue().stream().sorted(Comparator.comparing(StudyLogVo::getNginxTime)).collect(Collectors.toList());
int size = dataSort.size();
StudyLogVo studyLogScorm = dataSort.get(size - 1);
studyLogScorm.setTerminalDevice((size * heartBeatVo.getCount()) + "");
studyLogScorm.setCreateTime(dataSort.get(0).getNginxTime());
StudyLog courseStudentLogVo = new StudyLog();
BeanUtils.copyProperties(studyLogScorm,courseStudentLogVo);
courseClient.saveScormHeartBeat(courseStudentLogVo);
/*if(validData.isPresent()){
// 传输数据进行更新
StudyLog studyLogScorm = validData.get();
List<StudyLog> data = scormItem.getValue();
studyLogScorm.setTerminalDevice((scormItem.getValue().size()*heartBeatVo.getCount())+"");
courseClient.saveScormHeartBeat(studyLogScorm);
}*/
}
boolean flag = studyLogClient.deleteBatch(scormStudyLogs.stream().map(obj -> obj.getId()).collect(Collectors.toList()));
XxlJobLogger.log("scrom课程的时长处理完毕");
} else {
XxlJobLogger.log("没有scrom心跳信息");
}
Long maxId = studyLogs.get(studyLogs.size() - 1).getId();
XxlJobLogger.log(uuid + "批次,处理的数据库的最大id={}", maxId);
redisCache.set(REDIS_KEY_MAX, maxId.toString());
redisCache.set(REDIS_RERTY_MAX, "1");
XxlJobLogger.log("查询到studyLog数据{}条", studyLogs.size());
//studyLogs.stream().parallel().collect(Collectors.groupingBy(obj->obj.getSessionId(),HashMap::new,Collectors.toCollection(LinkedList::new)));
Map<String, List<StudyLogVo>> data = studyLogs.stream().parallel().collect(Collectors.groupingBy(
//obj -> obj.getSessionId() + SPILT + obj.getCourseId() + SPILT + obj.getChapterId()));//,Collectors.counting()
obj -> obj.getSessionId()));
XxlJobLogger.log("分组SessionId以后的数据为{}条", data.size());
int excutIndex = 0;
//Set<String> keySets = data.keySet();
List<String> sessionIds = new ArrayList<>();
Set<Long> courseIds = new HashSet<>();
List<Long> chapterIds = new ArrayList<>();
//for (String keyVal : keySets) {
for (Map.Entry<String, List<StudyLogVo>> entryObj : data.entrySet()) {
//String[] valTemp = keyVal.split(SPILT);
sessionIds.add(entryObj.getKey());
List<StudyLogVo> studyLogsIn = entryObj.getValue();
StudyLogVo studyLogIn = studyLogsIn.get(0);
Long courseIdIn = studyLogIn.getCourseId();
Long chapterId = studyLogIn.getChapterId();
if (Objects.isNull(courseIdIn) || courseIdIn.longValue() <= 0) {
StudyLogVo studyLogIn1 = studyLogsIn.stream().filter(obj -> (Objects.nonNull(obj.getChapterId()) && obj.getChapterId().longValue() > 0))
.findFirst().get();
courseIdIn = studyLogIn1.getCourseId();
chapterId = studyLogIn1.getChapterId();
}
courseIds.add(courseIdIn);
chapterIds.add(chapterId);
}
// 查询已经存在的学习记录
// XxlJobLogger.log("sessionIds={}", sessionIds);
List<RecordeEntityVo> recordes = recordeClient.getExistRecorde(sessionIds);
//XxlJobLogger.log("根据sessionIds={},查询结果={}",JSON.toJSONString(sessionIds),JSON.toJSONString(recordes));
Map<String, RecordeEntityVo> recordeMap = null; // 存在的学习记录
boolean existFlag = false;
if (CollectionUtils.isNotEmpty(recordes)) {
existFlag = true;
try {
recordeMap = recordes.stream().collect(Collectors.toMap(key -> key.getSessionId(), val -> val));
} catch (Exception e) {
XxlJobLogger.log("存在重复的学习课程,进行特殊处理,数据={}", JSON.toJSONString(recordes));
recordeMap = new HashMap<>();
Map<String, List<RecordeEntityVo>> recordesMap = recordes.stream().collect(Collectors.groupingBy(key -> key.getSessionId()));
for (Map.Entry<String, List<RecordeEntityVo>> item : recordesMap.entrySet()) {
List<RecordeEntityVo> recordeListT = item.getValue();
int legt = recordeListT.size();
if (legt == 1) {
recordeMap.put(item.getKey(), recordeListT.get(0));
} else {
recordeListT.sort(new Comparator<RecordeEntityVo>() {
@Override
public int compare(RecordeEntityVo o1, RecordeEntityVo o2) {
return o2.getEndTime().compareTo(o1.getEndTime());
}
});
recordeMap.put(item.getKey(), recordeListT.get(legt - 1));
}
}
}
}
//XxlJobLogger.log("根据sessionIds={},查询结果转成map={}",JSON.toJSONString(sessionIds),JSON.toJSONString(recordeMap));
// 查询课程信息
List<CourseEntityVo> courseList = null;
try {
if (recordeMap != null) {
for (RecordeEntityVo recordeItems : recordeMap.values()) {
courseIds.add(recordeItems.getCourseId());
chapterIds.add(recordeItems.getChapterId());
}
}
courseList = recordeClient.selectExistCourse(courseIds);
} catch (Exception e) {
return;
}
Map<Long, CourseEntityVo> courseMap = courseList.stream().collect(Collectors.toMap(key -> key.getId(), val -> val));
// 查询章节信息
List<ChapterEntityVo> chapterList = recordeClient.selectExistChapter(chapterIds);
Map<Long, ChapterEntityVo> chapterMap = chapterList.stream().collect(Collectors.toMap(key -> key.getId(), val -> val));
// 查询素材信息
List<Long> materials = chapterList.stream().map(obj -> obj.getMaterialId()).collect(Collectors.toList());
List<MaterialEntityVo> materialList = recordeClient.selectExistMaterial(materials);
Map<Long, MaterialEntityVo> materialMap = materialList.stream().collect(Collectors.toMap(key -> key.getId(), val -> val));
// 处理心跳数据
List<RecordeEntityVo> newRecorde = new ArrayList<>();
//List<Recorde> existRecorde = new ArrayList<>();
// 更新的学习记录
List<RecordeEntityVo> upRecorde = new ArrayList<>();
//XxlJobLogger.log("对原始数据进行分组以后的数据={}",JSON.toJSONString(data));
for (Map.Entry<String, List<StudyLogVo>> item : data.entrySet()) {
excutIndex++;
XxlJobLogger.log("分组SessionId以后的开始处理{}条数据", excutIndex);
String keyVal = item.getKey();
String[] keyValArrays = keyVal.split(SPILT);
String sessionId = keyValArrays[0];
if (StringUtils.isBlank(sessionId)) {
//XxlJobLogger.log("sessionId为空={}", keyVal);
List<StudyLogVo> studyLogVoList = item.getValue();
List<StudyLogVo> courseStudyLogVOList = getCourseStudyLogVO(studyLogVoList);
errorData.put(UUID.randomUUID().toString() + ": sessionId为空", courseStudyLogVOList);
XxlJobLogger.log("sessionId为空的数据={}", JSON.toJSONString(item));
continue;
}
List<StudyLogVo> valList = item.getValue();
// 每次处理的批次的数据条数
int studyLogSize = valList.size();
// 学习时间次数(对相同时间的同一个sessionId进行合并)
//int groupSize = valList.stream().collect(Collectors.groupingBy(gkey -> gkey.getNginxTime().getTime())).size();
//sortList(valList);
// 常规逆序 numList.stream().sorted();(常规升序)
//numList = numList.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
// 对象排序 applelist.stream().sorted(Comparator.comparing(Apple::getWight).reversed());(逆序)
//List<Apple> applelist1 = applelist.stream().sorted(Comparator.comparing(Apple::getWight)).collect(Collectors.toList());
/*Collections.sort(valList, (e1, e2) -> {
//return e1.getWight() - e2.getWight();
return e1.getNginxTime().compareTo(e2.getNginxTime());
});*/
valList = valList.stream().sorted(Comparator.comparing(StudyLogVo::getNginxTime)).collect(Collectors.toList());
//XxlJobLogger.log("排序后的数据={}",JSON.toJSONString(valList));
// 合并心跳记录转换成学习记录
RecordeEntityVo recorde = new RecordeEntityVo();
StudyLogVo lastStudy = valList.get(studyLogSize - 1);
// 获取最后的学习时间
//long lastStudyDateVal = lastStudy.getTimestamp();
//Date lastStudyDate = new Date(lastStudyDateVal);
Integer progress = lastStudy.getProgress();
Date lastStudyDate = lastStudy.getNginxTime();
//XxlJobLogger.log("最后学习时间={}",lastStudyDate);
StudyLogVo startStudyLog = valList.get(0);
// 获取开始时间
//long startStudyDateVal = startStudyLog.getTimestamp();
Date firstStartDate = startStudyLog.getNginxTime();
// 获取课程的策略
Long courseIdBycurrStudy = startStudyLog.getCourseId();
CourseEntityVo course = courseMap.get(courseIdBycurrStudy);
if (course == null) {
if (existFlag && recordeMap.containsKey(sessionId)) {
courseIdBycurrStudy = recordeMap.get(sessionId).getCourseId();
course = courseMap.get(courseIdBycurrStudy);
}
if (course == null) {
//XxlJobLogger.log("课程id={}的心跳的课程不存在", startStudyLog.getCourseId());
List<StudyLogVo> studyLogVoList = item.getValue();
List<StudyLogVo> courseStudyLogVOList = getCourseStudyLogVO(studyLogVoList);
errorData.put("课程id=" + startStudyLog.getCourseId() + "的心跳的课程不存在", courseStudyLogVOList);
XxlJobLogger.log("课程id为空的数据={}", JSON.toJSONString(item));
continue;
}
}
int duration = 3;
int overed = 0;
String extedText = "心跳未完成";
Integer overedTmp = lastStudy.getMaterialCompleted();
if (Integer.valueOf(1).equals(overedTmp)) {
extedText = "顺时间心跳完成";
overed = 1;
} else {
Optional<StudyLogVo> dataOpt = valList.stream().parallel().filter(itemStudy -> Integer.valueOf(1).equals(itemStudy.getMaterialCompleted())).findAny();
if (dataOpt.isPresent()) {
//StudyLog headObj = dataOpt.get();
overed = 1;
extedText = "非顺序心跳完成";
}
}
RecordeEntityVo existRecordeItem = null; // 是否是有学习过的记录
if (existFlag && recordeMap.containsKey(sessionId)) {
// 获取存在的学习记录
existRecordeItem = recordeMap.get(sessionId);
duration = existRecordeItem.getDuration();// 获取以前学习时长
// 组装学习记录对象
BeanUtils.copyProperties(existRecordeItem, recorde);
String terminalDevice = existRecordeItem.getTerminalDevice();
if (terminalDevice.indexOf(extedText) < 0) {
terminalDevice += "-->" + startStudyLog.getTerminalDevice() + "[" + DateUtil.toShortSeconds(new Date()) + "]" + recorde.getOvered() + extedText;
recorde.setTerminalDevice(terminalDevice);
} else if (overed == 1) {
terminalDevice += "-->" + "[" + DateUtil.toShortSeconds(new Date()) + "]" + recorde.getOvered() + extedText;
recorde.setTerminalDevice(terminalDevice);
}
String startTerminalType = existRecordeItem.getTerminalType();
if (startTerminalType == null) {
recorde.setTerminalType(startStudyLog.getTerminalType());
}
recorde.setId(existRecordeItem.getId());
} else {
//recorde.setStartTime(new Date(startStudyDateVal - (heartBeatVo.getCount() * 1000)));//开始时间
recorde.setStartTime(firstStartDate);
recorde.setTerminalDevice("心跳新增的问题数据[" + DateUtil.toShortSeconds(new Date()) + recorde.getOvered() + "]" + startStudyLog.getTerminalDevice() + extedText);
String startTerminalType = startStudyLog.getTerminalType();
if (startTerminalType != null) {
recorde.setTerminalType(startTerminalType);
}
recorde.setSessionId(startStudyLog.getSessionId());
Long studyLogChapterId = startStudyLog.getChapterId();
recorde.setCourseId(startStudyLog.getCourseId());
recorde.setChapterId(studyLogChapterId);
if (studyLogChapterId == null || studyLogChapterId.longValue() == 0) {
Optional<StudyLogVo> dataStudyLogChapterIdL = valList.stream().parallel().filter
(studyLogChapterIdL -> (studyLogChapterIdL.getChapterId() != null
&& studyLogChapterIdL.getChapterId().longValue() > 0)).findAny();
if (dataStudyLogChapterIdL.isPresent()) {
StudyLogVo studyLogChapterIdLd = dataStudyLogChapterIdL.get();
recorde.setCourseId(studyLogChapterIdLd.getCourseId());
recorde.setChapterId(studyLogChapterIdLd.getChapterId());
} else {
List<StudyLogVo> studyLogVoList = item.getValue();
List<StudyLogVo> courseStudyLogVOList = getCourseStudyLogVO(studyLogVoList);
errorData.put(sessionId + "章节id为空", courseStudyLogVOList);
XxlJobLogger.log("章节id为空的数据={}", JSON.toJSONString(item));
continue;
}
}
//把学习记录 加上当前课程的企业id 站点id 部门id
String studyAccountId = startStudyLog.getAccountId();
if (StringUtils.isBlank(studyAccountId)) {
List<StudyLogVo> studyLogVoList = item.getValue();
List<StudyLogVo> courseStudyLogVOList = getCourseStudyLogVO(studyLogVoList);
errorData.put(sessionId + "账号id为空", courseStudyLogVOList);
XxlJobLogger.log("账号id为空的数据={}", JSON.toJSONString(item));
continue;
}
recorde.setAccountId(Long.valueOf(studyAccountId));
String studyOrgId = startStudyLog.getOrgId();
if (StringUtils.isBlank(studyOrgId) || studyOrgId.equals("0")) {
recorde.setOrgId(course.getOrgId());
} else {
try {
recorde.setOrgId(Long.valueOf(studyOrgId));
} catch (Exception e) {
recorde.setOrgId(course.getOrgId());
}
}
Long studySiteId = startStudyLog.getSiteId();
if (Objects.isNull(studySiteId) || studySiteId.longValue() == 0) {
recorde.setSiteId(course.getSiteId());
} else {
recorde.setSiteId(studySiteId);
}
String studyCompanId = startStudyLog.getCompanyId();
if (StringUtils.isBlank(studyCompanId) || studyCompanId.equals("0")) {
recorde.setCompanyId(course.getCompanyId());
} else {
try {
recorde.setCompanyId(Long.valueOf(studyCompanId));
} catch (NumberFormatException e) {
recorde.setCompanyId(course.getCompanyId());
}
}
// 判断章节是否已经被用户学习完成过,如果是,该用户后面的学习都为完成
/*int chapterFinished = recordeClient.getCourseChapterFinished(recorde.getAccountId()
, recorde.getChapterId());
if (chapterFinished > 0) {
overed = 1;
}*/
recorde.setValid(1);
recorde.setExitstates(1);
}
recorde.setProgress(progress);
recorde.setOvered(overed); //先设置一次完成状态,如果0,还会第二次赋值
int countDuration = heartBeatVo.getCount() * studyLogSize; // 本次学习时长
//int countDuration = heartBeatVo.getCount() * groupSize;
duration = duration + countDuration;
recorde.setDurationTmp(countDuration);// 设置临时的学习时长,解决培训的时长记录问题
//long diff =recorde.getEndTime().getTime()- recorde.getStartTime().getTime();
recorde.setDuration(duration);// 设置每次学习的学习时长
// 计算章节的学习完成状态
int dbOvered = 0;
if (existRecordeItem != null && existRecordeItem.getOvered().intValue() == 1) {
dbOvered = 1;
}
if (overed == 0 && dbOvered == 0) {
// 获取章节的类型
Long chapterId = startStudyLog.getChapterId();
if (Long.valueOf(0).equals(chapterId)) {
chapterId = item.getValue().parallelStream().map(objVal -> objVal.getChapterId() == null ? 0 : objVal.getChapterId().longValue())
.max((a, b) -> a > b ? 1 : -1).get();
if (existRecordeItem != null &&
(existRecordeItem.getChapterId() == null || existRecordeItem.getChapterId().longValue() == 0)) {
existRecordeItem.setChapterId(chapterId);
}
}
if (chapterId == null || chapterId.intValue() == 0) {
if (existRecordeItem != null) {
chapterId = existRecordeItem.getChapterId();
}
}
ChapterEntityVo chapter = chapterMap.get(chapterId);
if (chapter == null) {
//XxlJobLogger.log("{}该章节已经在课程里已被修改,重新查询", chapterId);
List<StudyLogVo> studyLogVoList = item.getValue();
List<StudyLogVo> courseStudyLogVOList = getCourseStudyLogVO(studyLogVoList);
errorData.put(sessionId + "章节不存在", courseStudyLogVOList);
XxlJobLogger.log("章节不存在的数据={}", JSON.toJSONString(item));
continue;
}
// 素材类型1音频 2 视频 3 文本 4 富媒体 5 Scrom
int materialType = chapter.getMaterialType();
switch (materialType) {
case 1:
int audioComplete = course.getAudioComplete();//音频完成条件 0 播放即完成 1听完整个课程
if (audioComplete == 0) {
overed = 1;
}
break;
case 2:
int videoComplete = course.getVideoComplete();//视频完成条件 0 播放即完成 1听完整个课程
if (videoComplete == 0) {
overed = 1;
}
break;
case 3:
int docComplete = course.getDocComplete();//文档完成条件 0 观看即完成 1停留时长达到学习时长即完成
overed = getOvered(materialMap, countDuration, overed, chapter, docComplete, //duration 修改成 countDuration(当次学习时长)
recorde.getAccountId(), recorde.getSiteId());
break;
case 4:
int richComplete = course.getRichComplete();//多媒体完成条件 0 观看即完成 1停留时长达到学习时长即完成
overed = getOvered(materialMap, countDuration, overed, chapter, richComplete, //duration 修改成 countDuration(当次学习时长)
recorde.getAccountId(), recorde.getSiteId());
break;
default:
}
if (overed == -1) {
List<StudyLogVo> studyLogVoList = item.getValue();
List<StudyLogVo> courseStudyLogVOList = getCourseStudyLogVO(studyLogVoList);
errorData.put(sessionId + "素材不存在", courseStudyLogVOList);
XxlJobLogger.log("素材不存的数据={}", JSON.toJSONString(item));
continue;
}
recorde.setOvered(overed);
}
if (existRecordeItem == null) {
newRecorde.add(recorde);
} else {
recorde.setEndTime(lastStudyDate);
upRecorde.add(recorde);
}
XxlJobLogger.log("分组SessionId以后的完成处理{}条数据", excutIndex);
}
// 同一门课程的章节学习,获取该课程最后一次章节学习记录
List<RecordeEntityVo> allRecordes = new ArrayList<>();
allRecordes.addAll(newRecorde);// 合并学习记录
allRecordes.addAll(upRecorde);
Map<String, RecordeEntityVo> upRecordeMap = new HashMap<>();
if (CollectionUtils.isNotEmpty(upRecorde)) {
upRecordeMap = upRecorde.stream().collect(Collectors.toMap(upKey -> upKey.getSessionId(), upVal -> upVal));
}
// 发放积分和课程完成状态
if (CollectionUtils.isNotEmpty(allRecordes)) {
XxlJobLogger.log("处理课程完成状态的数据{}条", allRecordes.size());
int courseSync = 0;
for (RecordeEntityVo sendPoint : allRecordes) {
courseSync++;
XxlJobLogger.log("处理课程完成状态的数据第{}条", courseSync);
String courseLockKey = COURSE_DISTRIBUTED_LOCK + ":" + sendPoint.getCourseId() + "." + sendPoint.getAccountId();
String sessionId = sendPoint.getSessionId();
int maxRetry = 0;
boolean courseExecLockStatus = false;
do {
courseExecLockStatus = redisDistributedLock.lock(courseLockKey, 5000, 90, 50);
maxRetry++;
} while (!courseExecLockStatus && maxRetry < 3);
try {
try {
//XxlJobLogger.log("发送处理的数据={}",JSON.toJSONString(sendPoint));
courseStudyClient.completionStatus(sendPoint);
} catch (Exception e) {
XxlJobLogger.log("{}发放积分异常{}", JSON.toJSONString(sendPoint), e);
String errKey = sendPoint.getSessionId();
List<StudyLogVo> studyLogVoList = data.get(errKey);
List<StudyLogVo> courseStudyLogVOList = getCourseStudyLogVO(studyLogVoList);
errorData.put(errKey + "发放积分异常", courseStudyLogVOList);
}
if (upRecordeMap.containsKey(sessionId)) {
RecordeEntityVo recordeU = new RecordeEntityVo();
recordeU.setId(sendPoint.getId());
recordeU.setEndTime(sendPoint.getEndTime());
if (Integer.valueOf(1).equals(sendPoint.getOvered())) {
recordeU.setOvered(sendPoint.getOvered());
}
recordeU.setDuration(sendPoint.getDuration());
String deviceInfo = sendPoint.getTerminalDevice();
int lengthStr = deviceInfo.length();
if (lengthStr < 300) {
recordeU.setTerminalDevice(deviceInfo);
} else {
deviceInfo += "->" + sendPoint.getOvered() + "心跳过长" + DateUtil.getSimpleDateFormat("dd日HH:mm:ss").format(new Date());
deviceInfo = deviceInfo.substring(lengthStr - 300);
recordeU.setTerminalDevice(deviceInfo);
}
/*Date lastTime = sendPoint.getEndTime();
Date startTimeTmp = sendPoint.getStartTime();
if(lastTime != null && startTimeTmp != null && lastTime.compareTo(startTimeTmp) < 1){
recordeU.setEndTime(new Date((startTimeTmp.getTime()+
recordeU.getDuration().intValue()*1000)));
}*/
//XxlJobLogger.log("更新数据={}",JSON.toJSONString(recordeU));
recordeClient.update(recordeU);
} else {
recordeClient.save(sendPoint);
}
} finally {
if (courseExecLockStatus) {
redisDistributedLock.releaseLock(courseLockKey);
}
}
}
}
// 对于存在的学历记录,更新数据
/* if (CollectionUtils.isNotEmpty(upRecorde)) {
XxlJobLogger.log("更新学习记录{}条",upRecorde.size());
List<Recorde> customUp = new ArrayList<>();
for(Recorde itemUp:upRecorde){
Recorde recordeU = new Recorde();
recordeU.setId(itemUp.getId());
recordeU.setEndTime(itemUp.getEndTime());
recordeU.setOvered(itemUp.getOvered());
recordeU.setDuration(itemUp.getDuration());
customUp.add(recordeU);
}
boolean retUp = recordeClient.batchUpdate(customUp);
}
// 新的学习记录新增
if (CollectionUtils.isNotEmpty(newRecorde)) {
XxlJobLogger.log("新增学习记录{}条",newRecorde.size());
recordeClient.saveList(newRecorde);
}*/
// 删除本次解析过的学习日志
boolean flag = studyLogClient.deleteBatch(studyLogs.stream().map(obj -> obj.getId()).collect(Collectors.toList()));
XxlJobLogger.log("删除本批处studyLog记录状态={},条数={}", flag, studyLogs.size());
redisCache.set(REDIS_KEY_MAX, "0");
} else {
XxlJobLogger.log("无学习记录");
}
if (errorData.size() > 0) {
try {
XxlJobLogger.log("写入错误心跳日志数据");
//String fileName = "studyHeatError" + (System.currentTimeMillis()) + ".txt";
String fileName = "studyHeatError" + (DateUtil.format(new Date(), "yyyy-MM-dd-HH-mm-ss")) + ".txt";
//Files.write(Paths.get("/app/logs/"+fileName), JSON.toJSONString(errorData).getBytes());
String lineStr = System.getProperty("line.separator");
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, List<StudyLogVo>> errItem : errorData.entrySet()) {
sb.append(JSON.toJSONString(errItem)).append(lineStr);
}
Files.write(Paths.get("/app/logs/" + fileName), sb.toString().getBytes());
} catch (IOException e) {
XxlJobLogger.log("写入错误心跳日志数据异常");
}
}
Date endDate = new Date();
XxlJobLogger.log("心跳处理结束时间=" + DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss.SSS"));
XxlJobLogger.log("心跳总耗时{}毫秒", endDate.getTime() - execDate.getTime());
} finally {
if (lockStatus) {
if (redisDistributedLock.releaseLock(REDIS_KEY)) {
XxlJobLogger.log(uuid + "批次,解锁成功");
} else {
XxlJobLogger.log(uuid + "批次,解锁失败");
}
}
}
XxlJobLogger.log(uuid + "批次,心跳处理结束时间{}", DateUtil.format(Calendar.getInstance().getTime(), "yyyy-MM-dd HH:mm:ss.SSS"));
}
private List<StudyLogVo> getCourseStudyLogVO(List<StudyLogVo> studyLogVoList) {
List<StudyLogVo> list = new ArrayList<>();
if (null != studyLogVoList && studyLogVoList.size() > 0){
studyLogVoList.forEach(studyLogVo -> {
StudyLogVo courseVO = new StudyLogVo();
BeanUtils.copyProperties(studyLogVo,courseVO);
list.add(courseVO);
});
}
return list;
}
private int getOvered(Map<Long, MaterialEntityVo> materialMap, int duration,
int overed, ChapterEntityVo chapter, int complete, long accountId, long siteId) {
if (complete == 0) {
overed = 1;
} else {
// 查询之前学习的时长
//XxlJobLogger.log("查询历史时长参数为accountId={},siteId={},chapterId="+chapter.getId(),accountId,siteId);
Long countDuration = recordeClient.countStudyDuration(accountId, siteId, chapter.getId());
if (Objects.nonNull(countDuration)) {
duration += countDuration.longValue();
}
MaterialEntityVo material = materialMap.get(chapter.getMaterialId());
if (material == null) {
return -1;
}
int timeLong = 0;
timeLong = getTimeLong(material, timeLong);
if (duration >= timeLong) {
overed = 1;
}
}
return overed;
}
private static int getTimeLong(MaterialEntityVo material, int timeLong) {
Integer textSecond = material.getTextSecond();
if (Objects.nonNull(textSecond)) {
timeLong += textSecond.intValue();
}
Integer textMinute = material.getTextMinute();
if (Objects.nonNull(textMinute)) {
timeLong += textMinute.intValue() * 60;
}
Integer textHour = material.getTextHour();
if (Objects.nonNull(textHour)) {
timeLong += textHour.intValue() * 3600;
}
return timeLong;
}
private static void sortList(List<StudyLogVo> valList) {
valList.sort(new Comparator<StudyLogVo>() {
@Override
public int compare(StudyLogVo o1, StudyLogVo o2) {
/*Long timestamp1 = o1.getTimestamp();
Long timestamp2 = o2.getTimestamp();*/
return o1.getNginxTime().compareTo(o2.getNginxTime());
}
});
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.live.application.feign.LiveReplayClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class LiveJob implements BaseJob {
@Autowired
private LiveReplayClient replayClient;
@Override
public void execute() throws Exception {
try{
XxlJobLogger.log("直播初始化回放视频,开始");
replayClient.insertLiveReplay();
XxlJobLogger.log("直播初始化回放视频,完毕");
} catch(Exception e){
XxlJobLogger.log("直播初始化回放视频,异常", e);
throw e;
}
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.system.application.system.remote.OrganizationClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
/**
* @ClassName OrganizationJob
* @Description 定时重置部门索引
* @Author shengchenglong
* @DATE 2019-04-10 14:59
* @Version 1.0
*/
@Component
public class OrganizationJob implements BaseJob {
private static final Logger LOGGER = LoggerFactory.getLogger(OrganizationJob.class);
@Autowired
private OrganizationClient organizationClient;
@Override
public void execute() throws Exception {
// 重置所有企业部门索引
XxlJobLogger.log("----------------开始 重建部门索引---------------------");
organizationClient.rebuildOrgIndex(new ArrayList<>());
XxlJobLogger.log("----------------完成 重建部门索引---------------------");
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.core.application.cache.CacheNamespace;
import com.yizhi.system.application.system.remote.OrganizationClient;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Set;
/**
* @ClassName OrganizationRedisJob
* @Description 定时重置部门索引
* @Author shengchenglong
* @DATE 2019-04-10 14:59
* @Version 1.0
*/
@Component
public class OrganizationRedisJob implements BaseJob {
@Autowired
private OrganizationClient organizationClient;
@Autowired
private RedisTemplate redisTemplate;
@Override
public void execute() throws Exception {
XxlJobLogger.log("----------------开始 redis 队列 重建部门索引 ---------------------");
boolean continueFlag = true;
Set set = null;
Long companyId = null;
try {
while (continueFlag) {
set = redisTemplate.opsForZSet().range(CacheNamespace.ORG_INDEX_INIT_KEY, 0, 10);
if (CollectionUtils.isEmpty(set)) {
continueFlag = false;
} else {
for (Object o : set) {
if (o == null) {
continue;
}
companyId = Long.valueOf(o.toString());
XxlJobLogger.log("----------------开始 redis 队列 重建部门索引 companyId: ---------------------" + companyId);
organizationClient.rebuildOrgIndexJob(companyId);
XxlJobLogger.log("----------------完成 redis 队列 重建部门索引 companyId: ---------------------" + companyId);
redisTemplate.opsForZSet().remove(CacheNamespace.ORG_INDEX_INIT_KEY, o);
}
}
}
} catch (Exception e) {
e.printStackTrace();
XxlJobLogger.log(e);
throw e;
}
XxlJobLogger.log("----------------结束 redis 队列 重建部门索引 ---------------------");
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.alibaba.fastjson.JSONObject;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.point.application.feign.PointManageFeignClients;
import com.yizhi.statistics.application.feign.StatisticsPointClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.List;
@Component
public class PointDetailsJob implements BaseJob {
@Autowired
private PointManageFeignClients pointManageFeignClients;
@Override
public void execute() throws Exception {
}
public void executeParams(String data) {
XxlJobLogger.log("params:{}", data);
if (data == null) {
XxlJobLogger.log("更新所有用户");
pointManageFeignClients.updatePointDetailsChangeAfter(null,null);
return;
}
if (data.contains(",")) {
String[] params = data.split(",");
Long siteId = Long.valueOf(params[0]);
Long accountId = Long.valueOf(params[1]);
XxlJobLogger.log("更新指定站点:{}/指定用户:{}",siteId,accountId);
pointManageFeignClients.updatePointDetailsChangeAfter(siteId,accountId);
return;
}
Long siteId = Long.valueOf(data);
XxlJobLogger.log("更新指定站点:{}",siteId);
pointManageFeignClients.updatePointDetailsChangeAfter(siteId,null);
return;
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.alibaba.fastjson.JSONObject;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.statistics.application.feign.StatisticsAlbumClientV2;
import com.yizhi.statistics.application.feign.StatisticsPointClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
public class PointJob implements BaseJob {
@Autowired
private StatisticsPointClient statisticsPointClient;
@Override
public void execute() throws Exception {
try{
XxlJobLogger.log("积分任务,启动");
statisticsPointClient.syncPointDetails(null,null,null,null);
XxlJobLogger.log("积分任务,完毕");
} catch(Exception e){
XxlJobLogger.log("积分任务,异常", e);
throw e;
}
}
public void executeParams(String data) {
try{
XxlJobLogger.log("积分任务,启动"+data);
if (!StringUtils.isEmpty(data)){
JSONObject jsonObject = JSONObject.parseObject(data);
String startDate = jsonObject.getString("startDate");
String endDate = jsonObject.getString("endDate");
Long siteId = jsonObject.getLong("siteId");
Long companyId = jsonObject.getLong("companyId");
statisticsPointClient.syncPointDetails(startDate,endDate,companyId,siteId);
}
XxlJobLogger.log("积分任务,完毕");
} catch(Exception e){
XxlJobLogger.log("积分任务,异常", e);
throw e;
}
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.research.application.feign.ResearchReportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class ResearchIntoTable implements BaseJob {
private final Logger LOG = LoggerFactory.getLogger(ResearchIntoTable.class);
@Autowired
private ResearchReportClient researchReportClient;
@Override
public void execute() throws Exception {
try {
XxlJobLogger.log("调研自动入表任务,启动");
researchReportClient.asynchronousResearch("", "");
XxlJobLogger.log("调研自动入表任务,完毕");
} catch (Exception e) {
e.printStackTrace();
XxlJobLogger.log("调研自动入表任务,异常");
throw e;
}
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.message.application.feign.remote.RemoteJobHandleClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @author hutao
* @create
* @since 1.0.0
*/
@Component
public class SendNewMessageJob implements BaseJob {
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private RemoteJobHandleClient remoteJobHandleClient;
@Override
public void execute() throws Exception {
XxlJobLogger.log("定时任务开始执行发送消息,当前时间:" + format.format(new Date()));
remoteJobHandleClient.sendMessageHandle();
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.sign.application.feign.ReportSignClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SignInToTable implements BaseJob {
private final Logger LOG = LoggerFactory.getLogger(SignInToTable.class);
@Autowired
private ReportSignClient reportSignClient;
@Override
public void execute() throws Exception {
try {
XxlJobLogger.log("签到自动入表任务,启动");
reportSignClient.asynchronousSign("", "");
XxlJobLogger.log("签到自动入表任务,完毕");
} catch (Exception e) {
e.printStackTrace();
XxlJobLogger.log("签到自动入表任务,异常");
throw e;
}
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.statistics.application.feign.StatisticsCourseClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class StatisticsCourseStudyTimeUpdateJob implements BaseJob {
@Autowired
private StatisticsCourseClient statisticsCourseClient;
public void executeParam(String params) throws Exception {
try {
String[] strings = params.split(",");
XxlJobLogger.log("课程学习时长指定时间段跑批自动入表任务中" + "params: " + params
+ " strings[] " + strings);
XxlJobLogger.log("课程学习时长指定时间段跑批自动入表任务,启动");
// statisticsCourseClient.AsynchronousCourseTraining(strings[0], strings[1]);
statisticsCourseClient.courseOrgStudyDashboard(strings[0],strings[1]);
XxlJobLogger.log("课程学习时长指定时间段跑批自动入表任务,完毕" + "params: " + params
+ " strings[] " + strings);
} catch (Exception e) {
e.printStackTrace();
XxlJobLogger.log("课程学习时长指定时间段跑批自动入表任务,异常");
throw e;
}
}
@Override
public void execute() throws Exception {
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.statistics.application.feign.StatisticsCourseClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @ClassName: StatisticsSystemAccountMakeUpJob
* @author: zjl
* @date: 2020/12/24 11:36
*/
@Component
public class StatisticsSystemAccountMakeUpJob implements BaseJob{
@Autowired
private StatisticsCourseClient statisticsCourseClient;
public void executeParam(String param) throws Exception {
try {
XxlJobLogger.log("公司下用户 手动跑批" + "公司id: " + param);
XxlJobLogger.log("公司下用户 手动跑批,启动");
statisticsCourseClient.assignSiteSystemAccount(Long.valueOf(param));
XxlJobLogger.log("公司下用户 手动跑批,完毕" + "公司id: " + param);
} catch (Exception e) {
e.printStackTrace();
XxlJobLogger.log("公司下用户 手动跑批,异常");
throw e;
}
}
@Override
public void execute() throws Exception {
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.statistics.application.feign.StatisticsCourseClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
*
* 处理发版培训项目数据库 tp_plan_activity,tp_student_project_record,tp_student_plan_record 数据丢失,手动跑批
* @ClassName: StatisticsTrainingMakeUpJob
* @author: zjl
* @date: 2020/11/4 15:59
*/
@Component
public class StatisticsTrainingMakeUpJob implements BaseJob {
@Autowired
private StatisticsCourseClient statisticsCourseClient;
public void executeParam(String params) throws Exception {
try {
String[] strings = params.split(",");
XxlJobLogger.log("培训项目指定时间段跑批自动入表任务中" + "params: " + params
+ " strings[] " + strings);
XxlJobLogger.log("培训项目指定时间段跑批自动入表任务,启动");
statisticsCourseClient.AsynchronousCourseTraining(strings[0], strings[1]);
XxlJobLogger.log("培训项目指定时间段跑批自动入表任务,完毕" + "params: " + params
+ " strings[] " + strings);
} catch (Exception e) {
e.printStackTrace();
XxlJobLogger.log("培训项目指定时间段跑批自动入表任务,异常");
throw e;
}
}
@Override
public void execute() throws Exception {
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.training.application.feign.TrainingProjectReportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class StatisticsTrainingProjectJob implements BaseJob {
private final Logger LOG = LoggerFactory.getLogger(StatisticsTrainingProjectJob.class);
@Autowired
private TrainingProjectReportClient trainingProjectReportClient;
@Override
public void execute() throws Exception {
try {
XxlJobLogger.log("项目自动入表任务,启动");
trainingProjectReportClient.AsynchronousCourse(null,null);
XxlJobLogger.log("项目自动入表任务,完毕");
} catch (Exception e) {
e.printStackTrace();
XxlJobLogger.log("项目自动入表任务,异常");
throw e;
}
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.statistics.application.feign.StatisticsCourseClient;
import com.yizhi.statistics.application.feign.WorkClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class StatisticsWorkJob implements BaseJob {
@Autowired
private WorkClient workClient;
public void executeParam(String params) {
try {
// 3给内存放可参加人数
Integer type = Integer.valueOf(params);
XxlJobLogger.log("StatisticsWorkJob params:" + params + "======type:" +type);
workClient.dealSystemAndRole(type);
} catch (Exception e) {
e.printStackTrace();
XxlJobLogger.log("ERROR StatisticsWorkJob",e);
}
}
@Override
public void execute() throws Exception {
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.suyinbean.application.feign.SydPointAcctranClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class SydPointAcctranJob implements BaseJob {
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Autowired
private SydPointAcctranClient sydPointAcctranClient;
@Override
public void execute() throws Exception {
XxlJobLogger.log("定时任务开始处理江苏银行苏银豆积分,当前时间:" + format.format(new Date()));
try {
sydPointAcctranClient.AsynchronousPointAcctran(null, null, null, null);
} catch (Exception e) {
e.printStackTrace();
XxlJobLogger.log("处理江苏银行苏银豆积分异常!!!", e);
throw e;
}
}
}
package com.yizhi.xxl.job.executor.service.job;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.training.application.feign.TrainingProjectClient;
import com.yizhi.wechat.application.feign.WeiXinClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
*
* @author lilingye
* @create 2018-6-5 13:32:45
* @since 1.0.0
*/
@Component
public class WechatJob implements BaseJob {
@Autowired
WeiXinClient weiXinClient;
@Autowired
TrainingProjectClient trainingProjectClient;
@Override
public void execute() throws Exception {
XxlJobLogger.log("执行微信access_token和jsapiticket的新增/更新");
weiXinClient.saveAccessTokenToRedis();
XxlJobLogger.log("执行培训项目已学习人数的更新");
trainingProjectClient.getJoinNumber();
}
}
package com.yizhi.xxl.job.executor.service.jobhandler;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import com.yizhi.xxl.job.executor.service.job.*;
import io.swagger.models.auth.In;
import org.apache.axis.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @ClassName BeanJobCenter
* @Description TODO
* @Author shengchenglong
* @DATE 2020/9/1 18:21
* @Version 1.0
*/
@Component
public class BeanJobCenter {
@Autowired
private OrganizationRedisJob organizationRedisJob;
@Autowired
private WechatJob wechatJob;
@Autowired
private HttpSetjob httpSetjob;
@Autowired
private DashboardJob dashboardJob;
@Autowired
private AddMediaTranscodeJob addMediaTranscodeJob;
@Autowired
private OrganizationJob organizationJob;
@Autowired
private CourseStudyHourRankingJob courseStudyHourRankingJob;
@Autowired
private CourseIntoTableJob courseIntoTableJob;
@Autowired
private SydPointAcctranJob sydPointAcctranJob;
@Autowired
private AlbumUnLockJob albumUnLockJob;
@Autowired
private SendNewMessageJob sendNewMessageJob;
@Autowired
private ExamJob examJob;
@Autowired
private ExamInToTableJob examInToTableJob;
@Autowired
private SignInToTable signInToTable;
@Autowired
private StatisticsTrainingMakeUpJob statisticsTrainingMakeUpJob;
@Autowired
private StatisticsCourseStudyTimeUpdateJob statisticsCourseStudyTimeUpdateJob;
@Autowired
private CourseHandlerJob courseHandlerJob;
@Autowired
private StatisticsWorkJob statisticsWorkJob;
@Autowired
private ChatPracticeJob chatPracticeJob;
@Autowired
private LiveJob liveJob;
@Autowired
private StatisticsSystemAccountMakeUpJob statisticsSystemAccountMakeUpJob;
@Autowired
private AlbumJob albumJob;
@Autowired
private PointJob pointJob;
@Autowired
private PointDetailsJob pointDetailsJob;
// 0 0/5 * * * ?
@XxlJob("organizationRedisJob")
public ReturnT<String> organizationRedisJob(String param) {
try {
organizationRedisJob.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0/30 * * * ?
@XxlJob("wechatJob")
public ReturnT<String> wechatJob(String param) {
try {
wechatJob.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0/1 * * * ?
@XxlJob("httpSetjob")
public ReturnT<String> httpSetjob(String param) {
try {
httpSetjob.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0 2 * * ?
@XxlJob("dashboardJob")
public ReturnT<String> dashboardJob(String param) {
try {
dashboardJob.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0/1 * * * ?
@XxlJob("addMediaTranscodeJob")
public ReturnT<String> addMediaTranscodeJob(String param) {
try {
addMediaTranscodeJob.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0 1 * * ?
@XxlJob("organizationJob")
public ReturnT<String> organizationJob(String param) {
try {
organizationJob.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0 4 * * ?
@XxlJob("courseStudyHourRankingJob")
public ReturnT<String> courseStudyHourRankingJob(String param) {
try {
courseStudyHourRankingJob.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0 2 1/1 * ?
@XxlJob("courseIntoTableJob")
public ReturnT<String> courseIntoTableJob(String param) {
try {
courseIntoTableJob.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0 2 1/1 * ? *
@XxlJob("sydPointAcctranJob")
public ReturnT<String> sydPointAcctranJob(String param) {
try {
sydPointAcctranJob.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0 0/5 * * ? *
@XxlJob("albumUnLockJob")
public ReturnT<String> albumUnLockJob(String param) {
try {
albumUnLockJob.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0/5 * * * ? *
@XxlJob("sendNewMessageJob")
public ReturnT<String> sendNewMessageJob(String param) {
try {
sendNewMessageJob.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 */5 * * * ?
@XxlJob("examJob")
public ReturnT<String> examJob(String param) {
try {
examJob.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0 2 1/1 * ?
@XxlJob("examInToTableJob")
public ReturnT<String> examInToTableJob(String param) {
try {
examInToTableJob.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0 2 1/1 * ?
@XxlJob("signInToTableJob")
public ReturnT<String> signInToTableJob(String param) {
try {
signInToTable.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0 2 1/1 * ?
@XxlJob("statisticsTrainingMakeUpJob")
public ReturnT<String> statisticsTrainingMakeUpJob(String param) {
try {
statisticsTrainingMakeUpJob.executeParam(param);
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0 2 1/1 * ?
@XxlJob("statisticsCourseStudyTimeUpdateJob")
public ReturnT<String> statisticsCourseStudyTimeUpdateJob(String param) {
try {
statisticsCourseStudyTimeUpdateJob.executeParam(param);
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
@XxlJob("chatPracticeJob")
public ReturnT<String> chatPracticeJob(String param) {
try {
if (StringUtils.isEmpty(param)) {
chatPracticeJob.execute();
}else {
chatPracticeJob.executeParams(param);
}
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
// 0 0 2 1/1 * ?
@XxlJob("StatisticsSystemAccountMakeUpJob")
public ReturnT<String> statisticsSystemAccountMakeUpJob(String param) {
try {
statisticsSystemAccountMakeUpJob.executeParam(param);
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
@XxlJob("liveJob")
public ReturnT<String> liveJob(String param) {
try {
liveJob.execute();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
@XxlJob("albumJob")
public ReturnT<String> albumJob(String param) {
try {
if (org.springframework.util.StringUtils.isEmpty(param)){
albumJob.execute();
}else {
albumJob.executeParams(param);
}
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
@XxlJob("courseHandler")
public ReturnT<String> courseHandler(String param) {
courseHandlerJob.executeParams(param);
return ReturnT.SUCCESS;
}
@XxlJob("statisticsWorkHandler")
public ReturnT<String> statisticsWorkHandler(String param) {
statisticsWorkJob.executeParam(param);
return ReturnT.SUCCESS;
}
@XxlJob("pointJob")
public ReturnT<String> pointJob(String param) {
try {
if (org.springframework.util.StringUtils.isEmpty(param)){
pointJob.execute();
}else {
pointJob.executeParams(param);
}
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<>(ReturnT.FAIL_CODE, e.getMessage());
}
return ReturnT.SUCCESS;
}
@XxlJob("pointDetailsAfter")
public ReturnT<String> pointDetails(String param) {
try {
pointDetailsJob.executeParams(param);
}catch (Exception e) {
XxlJobLogger.log(e);
return ReturnT.FAIL;
}
return ReturnT.SUCCESS;
}
}
package com.yizhi.xxl.job.executor.service.jobhandler;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.util.ShardingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
* XxlJob开发示例(Bean模式)
*
* 开发步骤:
* 1、在Spring Bean实例中,开发Job方法,方式格式要求为 "public ReturnT<String> execute(String param)"
* 2、为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
* 3、执行日志:需要通过 "XxlJobLogger.log" 打印执行日志;
*
* @author xuxueli 2019-12-11 21:52:51
*/
//@Component
public class SampleXxlJob {
private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public ReturnT<String> demoJobHandler(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobLogger.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
return ReturnT.SUCCESS;
}
/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public ReturnT<String> shardingJobHandler(String param) throws Exception {
// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
XxlJobLogger.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
// 业务逻辑
for (int i = 0; i < shardingVO.getTotal(); i++) {
if (i == shardingVO.getIndex()) {
XxlJobLogger.log("第 {} 片, 命中分片开始处理", i);
} else {
XxlJobLogger.log("第 {} 片, 忽略", i);
}
}
return ReturnT.SUCCESS;
}
/**
* 3、命令行任务
*/
@XxlJob("commandJobHandler")
public ReturnT<String> commandJobHandler(String param) throws Exception {
String command = param;
int exitValue = -1;
BufferedReader bufferedReader = null;
try {
// command process
Process process = Runtime.getRuntime().exec(command);
BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));
// command log
String line;
while ((line = bufferedReader.readLine()) != null) {
XxlJobLogger.log(line);
}
// command exit
process.waitFor();
exitValue = process.exitValue();
} catch (Exception e) {
XxlJobLogger.log(e);
} finally {
if (bufferedReader != null) {
bufferedReader.close();
}
}
if (exitValue == 0) {
return IJobHandler.SUCCESS;
} else {
return new ReturnT<String>(IJobHandler.FAIL.getCode(), "command exit value("+exitValue+") is failed");
}
}
/**
* 4、跨平台Http任务
* 参数示例:
* "url: http://www.baidu.com\n" +
* "method: get\n" +
* "data: content\n";
*/
@XxlJob("httpJobHandler")
public ReturnT<String> httpJobHandler(String param) throws Exception {
// param parse
if (param==null || param.trim().length()==0) {
XxlJobLogger.log("param["+ param +"] invalid.");
return ReturnT.FAIL;
}
String[] httpParams = param.split("\n");
String url = null;
String method = null;
String data = null;
for (String httpParam: httpParams) {
if (httpParam.startsWith("url:")) {
url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
}
if (httpParam.startsWith("method:")) {
method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
}
if (httpParam.startsWith("data:")) {
data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
}
}
// param valid
if (url==null || url.trim().length()==0) {
XxlJobLogger.log("url["+ url +"] invalid.");
return ReturnT.FAIL;
}
if (method==null || !Arrays.asList("GET", "POST").contains(method)) {
XxlJobLogger.log("method["+ method +"] invalid.");
return ReturnT.FAIL;
}
// request
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// connection
URL realUrl = new URL(url);
connection = (HttpURLConnection) realUrl.openConnection();
// connection setting
connection.setRequestMethod(method);
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(5 * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
// do connection
connection.connect();
// data
if (data!=null && data.trim().length()>0) {
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
dataOutputStream.write(data.getBytes("UTF-8"));
dataOutputStream.flush();
dataOutputStream.close();
}
// valid StatusCode
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
}
// result
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
StringBuilder result = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
result.append(line);
}
String responseMsg = result.toString();
XxlJobLogger.log(responseMsg);
return ReturnT.SUCCESS;
} catch (Exception e) {
XxlJobLogger.log(e);
return ReturnT.FAIL;
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e2) {
XxlJobLogger.log(e2);
}
}
}
/**
* 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑;
*/
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
public ReturnT<String> demoJobHandler2(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World.");
return ReturnT.SUCCESS;
}
public void init(){
logger.info("init");
}
public void destroy(){
logger.info("destory");
}
}
spring.application.name=cloud-xxl-job-executor
server.port=80
ACTIVE=${spring.profiles.active}
spring.profiles.active=dev
# nacos
spring.cloud.nacos.config.shared-dataids=common-${spring.profiles.active}.properties
spring.cloud.nacos.config.namespace=${spring.profiles.active}
spring.cloud.nacos.config.prefix=${spring.application.name}
spring.cloud.nacos.config.file-extension=properties
spring.cloud.nacos.config.server-addr=192.168.1.22:3333,192.168.1.22:4444,192.168.1.22:5555
\ No newline at end of file
package com.yizhi.xxl.job.executor;
import com.yizhi.core.application.cache.CacheNamespace;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringRunner;
import redis.clients.jedis.util.SafeEncoder;
import java.util.Set;
/**
* @ClassName Applicationtest
* @Description TODO
* @Author shengchenglong
* @DATE 2020/9/3 19:29
* @Version 1.0
*/
//@SpringBootTest
//@RunWith(SpringRunner.class)
//@Ignore
public class Applicationtest {
// @Autowired
// private RedisTemplate redisTemplate;
@Test
public void test() {
String params ="3";
System.out.println(Integer.valueOf(params));
// int i = 1;
// while (i < 100) {
// boolean f = redisTemplate.opsForZSet().add(CacheNamespace.ORG_INDEX_INIT_KEY, i, System.currentTimeMillis());
// System.out.println(f);
// i++;
// }
//
// Set set;
//
// int j = 1;
// i = 0;
// while (j < 100) {
// set = redisTemplate.opsForZSet().range(CacheNamespace.ORG_INDEX_INIT_KEY, i, i + 1);
// System.out.println(set);
// i++;
// j++;
// }
//
// i = 1;
// while (i < 100) {
// long res = redisTemplate.opsForZSet().remove(CacheNamespace.ORG_INDEX_INIT_KEY, i);
// System.out.println(res);
// i++;
// }
//
//
// set = redisTemplate.getConnectionFactory().getConnection().zRange(SafeEncoder.encode(CacheNamespace.ORG_INDEX_INIT_KEY), 0, 10);
//
// System.out.println(set);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment