This commit is contained in:
fanqq 2015-07-30 10:24:49 +08:00
parent 84974edb87
commit e4e0c779c3
5 changed files with 108 additions and 6 deletions

View file

@ -3,7 +3,7 @@ package com.ctrip.zeus.service.activate.impl;
import com.ctrip.zeus.dal.core.*;
import com.ctrip.zeus.model.entity.Archive;
import com.ctrip.zeus.model.entity.Group;
import com.ctrip.zeus.model.entity.GroupSlb;
import com.ctrip.zeus.model.entity.GroupVirtualServer;
import com.ctrip.zeus.model.transform.DefaultSaxParser;
import com.ctrip.zeus.service.activate.ActivateService;
import com.ctrip.zeus.service.model.ArchiveService;
@ -77,11 +77,11 @@ public class ActivateServiceImpl implements ActivateService {
confGroupSlbActiveDao.deleteByGroupId(new ConfGroupSlbActiveDo().setGroupId(groupId));
for (GroupSlb groupSlb:group.getGroupSlbs())
for (GroupVirtualServer groupSlb:group.getGroupVirtualServers())
{
confGroupSlbActiveDao.insert(new ConfGroupSlbActiveDo().setGroupId(groupId)
.setPriority(groupSlb.getPriority())
.setSlbId(groupSlb.getSlbId()).setDataChangeLastTime(new Date())
.setSlbId(groupSlb.getVirtualServer().getSlbId()).setDataChangeLastTime(new Date())
.setSlbVirtualServerId(groupSlb.getVirtualServer().getId()));
}

View file

@ -3,6 +3,8 @@ package com.ctrip.zeus.service.task.impl;
import com.ctrip.zeus.dal.core.TaskDao;
import com.ctrip.zeus.dal.core.TaskDo;
import com.ctrip.zeus.dal.core.TaskEntity;
import com.ctrip.zeus.lock.DbLockFactory;
import com.ctrip.zeus.lock.DistLock;
import com.ctrip.zeus.service.task.TaskService;
import com.ctrip.zeus.support.C;
import com.ctrip.zeus.task.entity.OpsTask;
@ -23,15 +25,32 @@ import java.util.List;
public class TaskServiceImpl implements TaskService {
private static final DynamicIntProperty taskCheckStatusInterval = DynamicPropertyFactory.getInstance().getIntProperty("task.check.status.interval", 200);
private static DynamicIntProperty lockTimeout = DynamicPropertyFactory.getInstance().getIntProperty("lock.timeout", 5000);
@Resource
private TaskDao taskDao;
@Resource
private DbLockFactory dbLockFactory;
@Override
public Long add(OpsTask task) throws Exception {
TaskDo taskDo = C.toTaskDo(task);
taskDo.setStatus("Pending");
taskDao.insert(taskDo);
String lockName = null;
if ( taskDo.getOpsType().equals("ActivateSlb")){
lockName = "AddTask_" + taskDo.getOpsType() + taskDo.getSlbId();
}else if (taskDo.getOpsType().equals("ServerOps")){
lockName = "AddTask_" + taskDo.getOpsType();
}else {
lockName = "AddTask_" + taskDo.getOpsType() + taskDo.getGroupId();
}
DistLock buildLock = dbLockFactory.newLock( lockName );
try {
buildLock.lock(lockTimeout.get());
taskDao.insert(taskDo);
}finally {
buildLock.unlock();
}
return taskDo.getId();
}

View file

@ -0,0 +1,8 @@
package com.ctrip.zeus.service.woker;
/**
* Created by fanqq on 2015/7/29.
*/
public interface TaskExecutor {
void execute(Long slbId);
}

View file

@ -0,0 +1,60 @@
package com.ctrip.zeus.service.woker.impl;
import com.ctrip.zeus.lock.DbLockFactory;
import com.ctrip.zeus.lock.DistLock;
import com.ctrip.zeus.service.task.TaskService;
import com.ctrip.zeus.service.woker.TaskExecutor;
import com.ctrip.zeus.task.entity.OpsTask;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* Created by fanqq on 2015/7/29.
*/
@Component("taskExecutor")
public class TaskExecutorImpl implements TaskExecutor {
@Resource
private DbLockFactory dbLockFactory;
@Resource
private TaskService taskService;
private static DynamicIntProperty lockTimeout = DynamicPropertyFactory.getInstance().getIntProperty("lock.timeout", 5000);
Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void execute(Long slbId) {
DistLock buildLock = dbLockFactory.newLock( "TaskWorker_" + slbId );
try {
buildLock.lock(lockTimeout.get());
executeJob(slbId);
}catch (Exception e){
logger.warn("TaskWorker get lock failed! TaskWorker: "+slbId);
} finally{
buildLock.unlock();
}
}
private void executeJob(Long slbId){
//1. get pending tasks , if size == 0 return
List<OpsTask> tasks = null;
try {
tasks = taskService.getPendingTasks(slbId);
}catch (Exception e){
logger.warn("Task Executor get pending tasks failed! ", e);
return;
}
if (tasks.size()==0) return;
//2. get all tasks datas
}
}

View file

@ -2,8 +2,11 @@ package com.ctrip.zeus.service.woker.impl;
import com.ctrip.zeus.model.entity.Slb;
import com.ctrip.zeus.service.model.SlbRepository;
import com.ctrip.zeus.service.woker.TaskExecutor;
import com.ctrip.zeus.service.woker.TaskWorker;
import com.ctrip.zeus.util.S;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@ -16,19 +19,31 @@ public class TaskWorkerImpl implements TaskWorker {
private static Long workerSlbId = null;
@Resource
SlbRepository slbRepository;
@Resource
TaskExecutor taskExecutor;
Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void execute() {
try {
//1. init
init();
}catch (Exception e ){
//2. execute
taskExecutor.execute(workerSlbId);
}catch (Exception e ){
logger.error("Task Worker Execute Fail."+e.getMessage(),e);
}
}
private void init()throws Exception{
if (workerSlbId != null) return;
Slb slb = slbRepository.getBySlbServer(S.getIp());
if (slb == null){
logger.error("Can Not Found Slb by Local Ip. TaskExecutor is not working!");
return;
}
workerSlbId = slb.getId();
}
}