diff --git a/src/main/java/com/ctrip/zeus/service/activate/impl/ActivateServiceImpl.java b/src/main/java/com/ctrip/zeus/service/activate/impl/ActivateServiceImpl.java index aad51ba6..074ce5bc 100644 --- a/src/main/java/com/ctrip/zeus/service/activate/impl/ActivateServiceImpl.java +++ b/src/main/java/com/ctrip/zeus/service/activate/impl/ActivateServiceImpl.java @@ -3,7 +3,7 @@ package com.ctrip.zeus.service.activate.impl; import com.ctrip.zeus.dal.core.*; import com.ctrip.zeus.model.entity.Archive; import com.ctrip.zeus.model.entity.Group; -import com.ctrip.zeus.model.entity.GroupSlb; +import com.ctrip.zeus.model.entity.GroupVirtualServer; import com.ctrip.zeus.model.transform.DefaultSaxParser; import com.ctrip.zeus.service.activate.ActivateService; import com.ctrip.zeus.service.model.ArchiveService; @@ -77,11 +77,11 @@ public class ActivateServiceImpl implements ActivateService { confGroupSlbActiveDao.deleteByGroupId(new ConfGroupSlbActiveDo().setGroupId(groupId)); - for (GroupSlb groupSlb:group.getGroupSlbs()) + for (GroupVirtualServer groupSlb:group.getGroupVirtualServers()) { confGroupSlbActiveDao.insert(new ConfGroupSlbActiveDo().setGroupId(groupId) .setPriority(groupSlb.getPriority()) - .setSlbId(groupSlb.getSlbId()).setDataChangeLastTime(new Date()) + .setSlbId(groupSlb.getVirtualServer().getSlbId()).setDataChangeLastTime(new Date()) .setSlbVirtualServerId(groupSlb.getVirtualServer().getId())); } diff --git a/src/main/java/com/ctrip/zeus/service/task/impl/TaskServiceImpl.java b/src/main/java/com/ctrip/zeus/service/task/impl/TaskServiceImpl.java index 6c329b91..491e3799 100644 --- a/src/main/java/com/ctrip/zeus/service/task/impl/TaskServiceImpl.java +++ b/src/main/java/com/ctrip/zeus/service/task/impl/TaskServiceImpl.java @@ -3,6 +3,8 @@ package com.ctrip.zeus.service.task.impl; import com.ctrip.zeus.dal.core.TaskDao; import com.ctrip.zeus.dal.core.TaskDo; import com.ctrip.zeus.dal.core.TaskEntity; +import com.ctrip.zeus.lock.DbLockFactory; +import com.ctrip.zeus.lock.DistLock; import com.ctrip.zeus.service.task.TaskService; import com.ctrip.zeus.support.C; import com.ctrip.zeus.task.entity.OpsTask; @@ -23,15 +25,32 @@ import java.util.List; public class TaskServiceImpl implements TaskService { private static final DynamicIntProperty taskCheckStatusInterval = DynamicPropertyFactory.getInstance().getIntProperty("task.check.status.interval", 200); + private static DynamicIntProperty lockTimeout = DynamicPropertyFactory.getInstance().getIntProperty("lock.timeout", 5000); @Resource private TaskDao taskDao; + @Resource + private DbLockFactory dbLockFactory; @Override public Long add(OpsTask task) throws Exception { TaskDo taskDo = C.toTaskDo(task); taskDo.setStatus("Pending"); - taskDao.insert(taskDo); + String lockName = null; + if ( taskDo.getOpsType().equals("ActivateSlb")){ + lockName = "AddTask_" + taskDo.getOpsType() + taskDo.getSlbId(); + }else if (taskDo.getOpsType().equals("ServerOps")){ + lockName = "AddTask_" + taskDo.getOpsType(); + }else { + lockName = "AddTask_" + taskDo.getOpsType() + taskDo.getGroupId(); + } + DistLock buildLock = dbLockFactory.newLock( lockName ); + try { + buildLock.lock(lockTimeout.get()); + taskDao.insert(taskDo); + }finally { + buildLock.unlock(); + } return taskDo.getId(); } diff --git a/src/main/java/com/ctrip/zeus/service/woker/TaskExecutor.java b/src/main/java/com/ctrip/zeus/service/woker/TaskExecutor.java new file mode 100644 index 00000000..b6b759b6 --- /dev/null +++ b/src/main/java/com/ctrip/zeus/service/woker/TaskExecutor.java @@ -0,0 +1,8 @@ +package com.ctrip.zeus.service.woker; + +/** + * Created by fanqq on 2015/7/29. + */ +public interface TaskExecutor { + void execute(Long slbId); +} diff --git a/src/main/java/com/ctrip/zeus/service/woker/impl/TaskExecutorImpl.java b/src/main/java/com/ctrip/zeus/service/woker/impl/TaskExecutorImpl.java new file mode 100644 index 00000000..2ae3f429 --- /dev/null +++ b/src/main/java/com/ctrip/zeus/service/woker/impl/TaskExecutorImpl.java @@ -0,0 +1,60 @@ +package com.ctrip.zeus.service.woker.impl; + +import com.ctrip.zeus.lock.DbLockFactory; +import com.ctrip.zeus.lock.DistLock; +import com.ctrip.zeus.service.task.TaskService; +import com.ctrip.zeus.service.woker.TaskExecutor; +import com.ctrip.zeus.task.entity.OpsTask; +import com.netflix.config.DynamicIntProperty; +import com.netflix.config.DynamicPropertyFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.List; + +/** + * Created by fanqq on 2015/7/29. + */ +@Component("taskExecutor") +public class TaskExecutorImpl implements TaskExecutor { + + @Resource + private DbLockFactory dbLockFactory; + @Resource + private TaskService taskService; + + private static DynamicIntProperty lockTimeout = DynamicPropertyFactory.getInstance().getIntProperty("lock.timeout", 5000); + Logger logger = LoggerFactory.getLogger(this.getClass()); + + + @Override + public void execute(Long slbId) { + DistLock buildLock = dbLockFactory.newLock( "TaskWorker_" + slbId ); + try { + buildLock.lock(lockTimeout.get()); + executeJob(slbId); + }catch (Exception e){ + logger.warn("TaskWorker get lock failed! TaskWorker: "+slbId); + } finally{ + buildLock.unlock(); + } + } + + private void executeJob(Long slbId){ + //1. get pending tasks , if size == 0 return + List tasks = null; + try { + tasks = taskService.getPendingTasks(slbId); + }catch (Exception e){ + logger.warn("Task Executor get pending tasks failed! ", e); + return; + } + if (tasks.size()==0) return; + + //2. get all tasks datas + + + } +} diff --git a/src/main/java/com/ctrip/zeus/service/woker/impl/TaskWorkerImpl.java b/src/main/java/com/ctrip/zeus/service/woker/impl/TaskWorkerImpl.java index 935e4a9f..003b86aa 100644 --- a/src/main/java/com/ctrip/zeus/service/woker/impl/TaskWorkerImpl.java +++ b/src/main/java/com/ctrip/zeus/service/woker/impl/TaskWorkerImpl.java @@ -2,8 +2,11 @@ package com.ctrip.zeus.service.woker.impl; import com.ctrip.zeus.model.entity.Slb; import com.ctrip.zeus.service.model.SlbRepository; +import com.ctrip.zeus.service.woker.TaskExecutor; import com.ctrip.zeus.service.woker.TaskWorker; import com.ctrip.zeus.util.S; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -16,19 +19,31 @@ public class TaskWorkerImpl implements TaskWorker { private static Long workerSlbId = null; @Resource SlbRepository slbRepository; + @Resource + TaskExecutor taskExecutor; + Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public void execute() { try { + //1. init init(); - }catch (Exception e ){ + //2. execute + taskExecutor.execute(workerSlbId); + + }catch (Exception e ){ + logger.error("Task Worker Execute Fail."+e.getMessage(),e); } } private void init()throws Exception{ - + if (workerSlbId != null) return; Slb slb = slbRepository.getBySlbServer(S.getIp()); + if (slb == null){ + logger.error("Can Not Found Slb by Local Ip. TaskExecutor is not working!"); + return; + } workerSlbId = slb.getId(); } }