From 7aa7c452a5e66e5a825e7a53947ef6a17c7b86ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?vfqq=E6=A8=8A=E7=90=AA=E7=90=A6?= Date: Fri, 11 Nov 2016 14:49:54 +0800 Subject: [PATCH] OperationResource update and fix group healthy consumer --- .../zeus/executor/impl/TaskExecutorImpl.java | 2 +- .../restful/resource/OperationResource.java | 341 +++++------------- .../queue/consumers/GroupHealthyConsumer.java | 20 + .../service/status/GroupStatusService.java | 3 +- .../status/impl/GroupStatusServiceImpl.java | 13 +- 5 files changed, 114 insertions(+), 265 deletions(-) diff --git a/src/main/java/com/ctrip/zeus/executor/impl/TaskExecutorImpl.java b/src/main/java/com/ctrip/zeus/executor/impl/TaskExecutorImpl.java index 6f7d3e0c..9dda10bd 100644 --- a/src/main/java/com/ctrip/zeus/executor/impl/TaskExecutorImpl.java +++ b/src/main/java/com/ctrip/zeus/executor/impl/TaskExecutorImpl.java @@ -703,7 +703,7 @@ public class TaskExecutorImpl implements TaskExecutor { } } - boolean healthyActivateFlag = healthyOpsActivate.get() && configHandler.getEnable("healthy.operation.active", slbId, null, null, false); + boolean healthyActivateFlag = healthyOpsActivate.get(); Set result = new HashSet<>(); for (String key : memberStatus.keySet()) { List status = memberStatus.get(key); diff --git a/src/main/java/com/ctrip/zeus/restful/resource/OperationResource.java b/src/main/java/com/ctrip/zeus/restful/resource/OperationResource.java index 9c40a9c5..248cf92c 100644 --- a/src/main/java/com/ctrip/zeus/restful/resource/OperationResource.java +++ b/src/main/java/com/ctrip/zeus/restful/resource/OperationResource.java @@ -10,6 +10,8 @@ import com.ctrip.zeus.service.message.queue.MessageQueue; import com.ctrip.zeus.service.message.queue.MessageType; import com.ctrip.zeus.service.model.*; import com.ctrip.zeus.service.query.GroupCriteriaQuery; +import com.ctrip.zeus.service.query.SlbCriteriaQuery; +import com.ctrip.zeus.service.query.VirtualServerCriteriaQuery; import com.ctrip.zeus.service.status.GroupStatusService; import com.ctrip.zeus.service.status.StatusOffset; import com.ctrip.zeus.service.status.StatusService; @@ -69,6 +71,10 @@ public class OperationResource { @Resource private PropertyBox propertyBox; @Resource + private VirtualServerCriteriaQuery virtualServerCriteriaQuery; + @Resource + private SlbCriteriaQuery slbCriteriaQuery; + @Resource private MessageQueue messageQueue; @@ -147,15 +153,6 @@ public class OperationResource { Long[] gids = entityFactory.getGroupIdsByGroupServerIp(serverip, SelectionMode.ONLINE_FIRST); - Set groupIdSet = new HashSet<>(); - groupIdSet.addAll(Arrays.asList(gids)); - List statuses = groupStatusService.getOfflineGroupsStatus(groupIdSet); - - for (GroupStatus gs : statuses) { - addHealthyProperty(gs); - } - - List groups = groupRepository.list(gids); if (groups != null) { @@ -163,10 +160,11 @@ public class OperationResource { ss.addGroupName(group.getName()); } } + String slbMessageData = MessageUtil.getMessageData(request, groups.toArray(new Group[]{}), null, null, new String[]{serverip}, true); if (configHandler.getEnable("use.new,message.queue.producer", false)) { - messageQueue.produceMessage(request.getRequestURI(), null, serverip); + messageQueue.produceMessage(request.getRequestURI(), null, slbMessageData); } else { - messageQueue.produceMessage(MessageType.OpsServer, null, serverip); + messageQueue.produceMessage(MessageType.OpsServer, null, slbMessageData); } return responseHandler.handle(ss, hh.getMediaType()); @@ -189,35 +187,9 @@ public class OperationResource { groupId = groupCriteriaQuery.queryByName(groupName); } } + batch = batch == null ? false : batch; - Group gp = groupRepository.getById(groupId); - if (null != batch && batch.equals(true)) { - List servers = gp.getGroupServers(); - for (GroupServer gs : servers) { - _ips.add(gs.getIp()); - } - } else if (ips != null) { - List servers = gp.getGroupServers(); - for (GroupServer gs : servers) { - if (ips.contains(gs.getIp())) { - _ips.add(gs.getIp()); - } - } - if (!_ips.containsAll(ips)) { - IdVersion[] key = groupCriteriaQuery.queryByIdAndMode(groupId, SelectionMode.ONLINE_EXCLUSIVE); - if (key.length != 0) { - Group online = groupRepository.getByKey(key[0]); - if (online != null && online.getGroupServers() != null) { - for (GroupServer gs : online.getGroupServers()) { - if (ips.contains(gs.getIp())) { - _ips.add(gs.getIp()); - } - } - } - } - } - } - return memberOps(request, hh, groupId, _ips, true, TaskOpsType.MEMBER_OPS); + return memberOps(request, hh, groupId, _ips, batch, true, TaskOpsType.MEMBER_OPS); } @GET @@ -238,35 +210,9 @@ public class OperationResource { } } - Group gp = groupRepository.getById(groupId); - if (null != batch && batch.equals(true)) { - List servers = gp.getGroupServers(); - for (GroupServer gs : servers) { - _ips.add(gs.getIp()); - } - } else if (ips != null) { - List servers = gp.getGroupServers(); - for (GroupServer gs : servers) { - if (ips.contains(gs.getIp())) { - _ips.add(gs.getIp()); - } - } - if (!_ips.containsAll(ips)) { - IdVersion[] key = groupCriteriaQuery.queryByIdAndMode(groupId, SelectionMode.ONLINE_EXCLUSIVE); - if (key.length != 0) { - Group online = groupRepository.getByKey(key[0]); - if (online != null && online.getGroupServers() != null) { - for (GroupServer gs : online.getGroupServers()) { - if (ips.contains(gs.getIp())) { - _ips.add(gs.getIp()); - } - } - } - } - } - } + batch = batch == null ? false : batch; - return memberOps(request, hh, groupId, _ips, false, TaskOpsType.MEMBER_OPS); + return memberOps(request, hh, groupId, _ips, batch, false, TaskOpsType.MEMBER_OPS); } @GET @@ -286,34 +232,8 @@ public class OperationResource { groupId = groupCriteriaQuery.queryByName(groupName); } } - Group gp = groupRepository.getById(groupId); - if (null != batch && batch.equals(true)) { - List servers = gp.getGroupServers(); - for (GroupServer gs : servers) { - _ips.add(gs.getIp()); - } - } else if (ips != null) { - List servers = gp.getGroupServers(); - for (GroupServer gs : servers) { - if (ips.contains(gs.getIp())) { - _ips.add(gs.getIp()); - } - } - if (!_ips.containsAll(ips)) { - IdVersion[] key = groupCriteriaQuery.queryByIdAndMode(groupId, SelectionMode.ONLINE_EXCLUSIVE); - if (key.length != 0) { - Group online = groupRepository.getByKey(key[0]); - if (online != null && online.getGroupServers() != null) { - for (GroupServer gs : online.getGroupServers()) { - if (ips.contains(gs.getIp())) { - _ips.add(gs.getIp()); - } - } - } - } - } - } - return memberOps(request, hh, groupId, _ips, true, TaskOpsType.PULL_MEMBER_OPS); + batch = batch == null ? false : batch; + return memberOps(request, hh, groupId, _ips, batch, true, TaskOpsType.PULL_MEMBER_OPS); } @GET @@ -333,34 +253,8 @@ public class OperationResource { groupId = groupCriteriaQuery.queryByName(groupName); } } - Group gp = groupRepository.getById(groupId); - if (null != batch && batch.equals(true)) { - List servers = gp.getGroupServers(); - for (GroupServer gs : servers) { - _ips.add(gs.getIp()); - } - } else if (ips != null) { - List servers = gp.getGroupServers(); - for (GroupServer gs : servers) { - if (ips.contains(gs.getIp())) { - _ips.add(gs.getIp()); - } - } - if (!_ips.containsAll(ips)) { - IdVersion[] key = groupCriteriaQuery.queryByIdAndMode(groupId, SelectionMode.ONLINE_EXCLUSIVE); - if (key.length != 0) { - Group online = groupRepository.getByKey(key[0]); - if (online != null && online.getGroupServers() != null) { - for (GroupServer gs : online.getGroupServers()) { - if (ips.contains(gs.getIp())) { - _ips.add(gs.getIp()); - } - } - } - } - } - } - return memberOps(request, hh, groupId, _ips, false, TaskOpsType.PULL_MEMBER_OPS); + batch = batch == null ? false : batch; + return memberOps(request, hh, groupId, _ips, batch, false, TaskOpsType.PULL_MEMBER_OPS); } @GET @@ -384,38 +278,10 @@ public class OperationResource { if (gp == null) { throw new ValidationException("Group Id or Name not found!"); } - if (null != batch && batch.equals(true)) { - List servers = gp.getGroupServers(); - for (GroupServer gs : servers) { - _ips.add(gs.getIp()); - } - } else if (ips != null) { - List servers = gp.getGroupServers(); - for (GroupServer gs : servers) { - if (ips.contains(gs.getIp())) { - _ips.add(gs.getIp()); - } - } - if (!_ips.containsAll(ips)) { - IdVersion[] key = groupCriteriaQuery.queryByIdAndMode(groupId, SelectionMode.ONLINE_EXCLUSIVE); - if (key.length != 0) { - Group online = groupRepository.getByKey(key[0]); - if (online != null && online.getGroupServers() != null) { - for (GroupServer gs : online.getGroupServers()) { - if (ips.contains(gs.getIp())) { - _ips.add(gs.getIp()); - } - } - } - } - } - } - if (_ips.size() == 0) { - throw new ValidationException("Not found ip in group.GroupId:" + groupId + " ip:" + ips.toString()); - } + batch = batch == null ? false : batch; if (healthyOpsActivate.get()) { - return memberOps(request, hh, groupId, _ips, true, TaskOpsType.HEALTHY_OPS); + return memberOps(request, hh, groupId, _ips, batch, true, TaskOpsType.HEALTHY_OPS); } else { return healthyOps(hh, groupId, _ips, true); } @@ -438,42 +304,9 @@ public class OperationResource { groupId = groupCriteriaQuery.queryByName(groupName); } } - Group gp = groupRepository.getById(groupId); - if (gp == null) { - throw new ValidationException("Group Id or Name not found!"); - } - if (null != batch && batch.equals(true)) { - - List servers = gp.getGroupServers(); - for (GroupServer gs : servers) { - _ips.add(gs.getIp()); - } - } else if (ips != null) { - List servers = gp.getGroupServers(); - for (GroupServer gs : servers) { - if (ips.contains(gs.getIp())) { - _ips.add(gs.getIp()); - } - } - if (!_ips.containsAll(ips)) { - IdVersion[] key = groupCriteriaQuery.queryByIdAndMode(groupId, SelectionMode.ONLINE_EXCLUSIVE); - if (key.length != 0) { - Group online = groupRepository.getByKey(key[0]); - if (online != null && online.getGroupServers() != null) { - for (GroupServer gs : online.getGroupServers()) { - if (ips.contains(gs.getIp())) { - _ips.add(gs.getIp()); - } - } - } - } - } - } - if (_ips.size() == 0) { - throw new ValidationException("Not found ip in group.GroupId:" + groupId + " ip:" + ips.toString()); - } + batch = batch == null ? false : batch; if (healthyOpsActivate.get()) { - return memberOps(request, hh, groupId, _ips, false, TaskOpsType.HEALTHY_OPS); + return memberOps(request, hh, groupId, _ips, batch, false, TaskOpsType.HEALTHY_OPS); } else { return healthyOps(hh, groupId, _ips, false); } @@ -499,37 +332,78 @@ public class OperationResource { return responseHandler.handle(groupStatusService.getOfflineGroupStatus(groupId), hh.getMediaType()); } - private Response memberOps(HttpServletRequest request, HttpHeaders hh, Long groupId, List ips, boolean up, String type) throws Exception { - Map> status = statusService.fetchGroupServerStatus(new Long[]{groupId}); - boolean skipOps = true; - for (String ip : ips) { - int index = 0; - if (type.equals(TaskOpsType.HEALTHY_OPS)) index = StatusOffset.HEALTHY; - if (type.equals(TaskOpsType.PULL_MEMBER_OPS)) index = StatusOffset.PULL_OPS; - if (type.equals(TaskOpsType.MEMBER_OPS)) index = StatusOffset.MEMBER_OPS; - boolean preStatus = status.get(groupId.toString() + "_" + ip).get(index); - if (preStatus != up) { - skipOps = false; + private Response memberOps(HttpServletRequest request, HttpHeaders hh, Long groupId, List memberIps, boolean batch, boolean up, String type) throws Exception { + + ModelStatusMapping groupMap = entityFactory.getGroupsByIds(new Long[]{groupId}); + if (groupMap.getOfflineMapping() == null || groupMap.getOfflineMapping().size() == 0) { + throw new ValidationException("Not Found Group By Id."); + } + + Set groupMemberIps = new HashSet<>(); + for (GroupServer gs : groupMap.getOfflineMapping().get(groupId).getGroupServers()) { + groupMemberIps.add(gs.getIp()); + } + if (groupMap.getOnlineMapping().get(groupId) != null) { + for (GroupServer gs : groupMap.getOnlineMapping().get(groupId).getGroupServers()) { + groupMemberIps.add(gs.getIp()); } } + + List ips = new ArrayList<>(); + if (batch) { + Group gp = groupMap.getOfflineMapping().get(groupId); + List servers = gp.getGroupServers(); + for (GroupServer gs : servers) { + ips.add(gs.getIp()); + } + } else if (memberIps != null && memberIps.size() > 0) { + for (String ip : memberIps) { + if (groupMemberIps.contains(ip)) { + ips.addAll(memberIps); + } + } + } + if (ips.size() == 0) { + throw new ValidationException("Ip Param Is Null Or Invalidate Ip Param."); + } + + List statuses = groupStatusService.getOfflineGroupsStatus(groupMap); + GroupStatus status = null; + if (statuses.size() > 0) { + status = statuses.get(0); + } + + boolean skipOps = false; + if (status != null) { + skipOps = true; + for (GroupServerStatus gss : status.getGroupServerStatuses()) { + if (ips.contains(gss.getIp())) { + if (type.equals(TaskOpsType.HEALTHY_OPS) && gss.getHealthy() != up) { + skipOps = false; + } else if (type.equals(TaskOpsType.PULL_MEMBER_OPS) && gss.getPull() != up) { + skipOps = false; + } else if (type.equals(TaskOpsType.MEMBER_OPS) && gss.getMember() != up) { + skipOps = false; + } + } + } + } + if (skipOps) { GroupStatus groupStatus = groupStatusService.getOfflineGroupStatus(groupId); logger.info("Group status equals the desired value.Do not need execute task.GroupId:" + groupId + " ips:" + ips.toString() + " up:" + up + " type:" + type); return responseHandler.handle(groupStatus, hh.getMediaType()); } + StringBuilder sb = new StringBuilder(); for (String ip : ips) { sb.append(ip).append(";"); } - ModelStatusMapping mapping = entityFactory.getGroupsByIds(new Long[]{groupId}); - if (mapping.getOfflineMapping() == null || mapping.getOfflineMapping().size() == 0) { - throw new ValidationException("Not Found Group By Id."); - } - Group onlineGroup = mapping.getOnlineMapping().get(groupId); - Group offlineGroup = mapping.getOfflineMapping().get(groupId); + + Group onlineGroup = groupMap.getOnlineMapping().get(groupId); + Group offlineGroup = groupMap.getOfflineMapping().get(groupId); Set vsIds = new HashSet<>(); - Set slbIds = new HashSet<>(); if (onlineGroup != null) { for (GroupVirtualServer gvs : onlineGroup.getGroupVirtualServers()) { vsIds.add(gvs.getVirtualServer().getId()); @@ -539,25 +413,8 @@ public class OperationResource { vsIds.add(gvs.getVirtualServer().getId()); } - ModelStatusMapping vsMaping = entityFactory.getVsesByIds(vsIds.toArray(new Long[]{})); - - VirtualServer tmp; - for (Long vsId : vsIds) { - tmp = vsMaping.getOnlineMapping().get(vsId); - if (tmp == null) { - tmp = vsMaping.getOfflineMapping().get(vsId); - } - slbIds.addAll(tmp.getSlbIds()); - } - //TODO flag for Healthy ops - if (type.equals(TaskOpsType.HEALTHY_OPS)) { - for (Long slbId : slbIds) { - if (!configHandler.getEnable("healthy.operation.active", slbId, null, null, false)) { - logger.info("healthy.operation.active is false. slbId:" + slbId); - return healthyOps(hh, groupId, ips, up); - } - } - } + Set vsIdVersions = virtualServerCriteriaQuery.queryByIdsAndMode(vsIds.toArray(new Long[]{}), SelectionMode.ONLINE_FIRST); + Set slbIds = slbCriteriaQuery.queryByVses(vsIdVersions.toArray(new IdVersion[]{})); List tasks = new ArrayList<>(); for (Long slbId : slbIds) { @@ -577,7 +434,6 @@ public class OperationResource { } } GroupStatus groupStatus = groupStatusService.getOfflineGroupStatus(groupId); - addHealthyProperty(groupStatus); String slbMessageData = MessageUtil.getMessageData(request, new Group[]{offlineGroup}, null, null, ips.toArray(new String[ips.size()]), true); if (configHandler.getEnable("use.new,message.queue.producer", false)) { @@ -591,40 +447,9 @@ public class OperationResource { messageQueue.produceMessage(MessageType.OpsMember, groupId, slbMessageData); } } - - return responseHandler.handle(groupStatus, hh.getMediaType()); } - private void addHealthyProperty(GroupStatus gs) throws Exception { - boolean health = true; - boolean unhealth = true; - for (GroupServerStatus gss : gs.getGroupServerStatuses()) { - if (gss.getServer() && gss.getHealthy() && gss.getPull() && gss.getMember()) { - unhealth = false; - } else { - health = false; - } - } - if (health) { - propertyBox.set("healthy", "health", "group", gs.getGroupId()); - } else if (unhealth) { - propertyBox.set("healthy", "unhealth", "group", gs.getGroupId()); - } else { - propertyBox.set("healthy", "sub-health", "group", gs.getGroupId()); - } - } - - @GET - @Path("/health/fillData") - public Response healthFillData(@Context HttpServletRequest request, - @Context HttpHeaders hh) throws Exception { - List list = groupStatusService.getAllOfflineGroupsStatus(); - for (GroupStatus gs : list) { - addHealthyProperty(gs); - } - return responseHandler.handle("Fill Data Success.", hh.getMediaType()); - } - } + diff --git a/src/main/java/com/ctrip/zeus/service/message/queue/consumers/GroupHealthyConsumer.java b/src/main/java/com/ctrip/zeus/service/message/queue/consumers/GroupHealthyConsumer.java index d2fa6a9d..f4e01fdc 100644 --- a/src/main/java/com/ctrip/zeus/service/message/queue/consumers/GroupHealthyConsumer.java +++ b/src/main/java/com/ctrip/zeus/service/message/queue/consumers/GroupHealthyConsumer.java @@ -39,6 +39,26 @@ public class GroupHealthyConsumer extends AbstractConsumer { addHealthProperty(messages); } + @Override + public void onOpsPull(List messages) { + addHealthProperty(messages); + } + + @Override + public void onOpsMember(List messages) { + addHealthProperty(messages); + } + + @Override + public void onOpsServer(List messages) { + addHealthProperty(messages); + } + + @Override + public void onOpsHealthy(List messages) { + addHealthProperty(messages); + } + protected void addHealthProperty(List messages) { try { Set groupIds = new HashSet<>(); diff --git a/src/main/java/com/ctrip/zeus/service/status/GroupStatusService.java b/src/main/java/com/ctrip/zeus/service/status/GroupStatusService.java index fe5ae289..12b7260e 100644 --- a/src/main/java/com/ctrip/zeus/service/status/GroupStatusService.java +++ b/src/main/java/com/ctrip/zeus/service/status/GroupStatusService.java @@ -2,6 +2,7 @@ package com.ctrip.zeus.service.status; import com.ctrip.zeus.model.entity.Group; import com.ctrip.zeus.service.model.IdVersion; +import com.ctrip.zeus.service.model.ModelStatusMapping; import com.ctrip.zeus.status.entity.GroupServerStatus; import com.ctrip.zeus.status.entity.GroupStatus; import com.ctrip.zeus.status.entity.GroupStatusList; @@ -92,5 +93,5 @@ public interface GroupStatusService { * @return status list * @throws Exception */ - List getOfflineGroupsStatus(Map groups, Map onlineGroups) throws Exception; + List getOfflineGroupsStatus(ModelStatusMapping groupMap) throws Exception; } diff --git a/src/main/java/com/ctrip/zeus/service/status/impl/GroupStatusServiceImpl.java b/src/main/java/com/ctrip/zeus/service/status/impl/GroupStatusServiceImpl.java index 67dcef36..3dacbc7a 100644 --- a/src/main/java/com/ctrip/zeus/service/status/impl/GroupStatusServiceImpl.java +++ b/src/main/java/com/ctrip/zeus/service/status/impl/GroupStatusServiceImpl.java @@ -89,7 +89,7 @@ public class GroupStatusServiceImpl implements GroupStatusService { if (groups.getOfflineMapping() == null || groups.getOfflineMapping().size() == 0) { return result; } - result = getOfflineGroupsStatus(groups.getOfflineMapping(), groups.getOnlineMapping()); + result = getOfflineGroupsStatus(groups); return result; } @@ -115,7 +115,7 @@ public class GroupStatusServiceImpl implements GroupStatusService { return result; } - List list = getOfflineGroupsStatus(groupMap.getOfflineMapping(), groupMap.getOnlineMapping()); + List list = getOfflineGroupsStatus(groupMap); if (list.size() > 0) { result = list.get(0); } @@ -129,7 +129,7 @@ public class GroupStatusServiceImpl implements GroupStatusService { if (map.getOfflineMapping() == null || map.getOfflineMapping().size() == 0) { return result; } - result = getOfflineGroupsStatus(map.getOfflineMapping(), map.getOnlineMapping()); + result = getOfflineGroupsStatus(map); return result; } @@ -160,7 +160,7 @@ public class GroupStatusServiceImpl implements GroupStatusService { boolean serverUp = !allDownServers.contains(gs.getIp()); boolean pullIn = memberStatus.get(key).get(StatusOffset.PULL_OPS); boolean raise = memberStatus.get(key).get(StatusOffset.HEALTHY); - boolean up = memberUp && serverUp && pullIn && raise; + boolean up = memberUp && serverUp && pullIn && raise; groupServerStatus.setServer(serverUp); groupServerStatus.setMember(memberUp); @@ -176,9 +176,12 @@ public class GroupStatusServiceImpl implements GroupStatusService { } @Override - public List getOfflineGroupsStatus(Map groups, Map onlineGroups) throws Exception { + public List getOfflineGroupsStatus(ModelStatusMapping groupMap) throws Exception { List res = new ArrayList<>(); + Map groups = groupMap.getOfflineMapping(); + Map onlineGroups = groupMap.getOnlineMapping(); + Map> memberStatus = statusService.fetchGroupServerStatus(groups.keySet().toArray(new Long[]{})); Set allDownServers = statusService.findAllDownServers();