OperationResource update and fix group healthy consumer

This commit is contained in:
vfqq樊琪琦 2016-11-11 14:49:54 +08:00
parent a9f95980e0
commit 7aa7c452a5
5 changed files with 114 additions and 265 deletions

View file

@ -703,7 +703,7 @@ public class TaskExecutorImpl implements TaskExecutor {
}
}
boolean healthyActivateFlag = healthyOpsActivate.get() && configHandler.getEnable("healthy.operation.active", slbId, null, null, false);
boolean healthyActivateFlag = healthyOpsActivate.get();
Set<String> result = new HashSet<>();
for (String key : memberStatus.keySet()) {
List<Boolean> status = memberStatus.get(key);

View file

@ -10,6 +10,8 @@ import com.ctrip.zeus.service.message.queue.MessageQueue;
import com.ctrip.zeus.service.message.queue.MessageType;
import com.ctrip.zeus.service.model.*;
import com.ctrip.zeus.service.query.GroupCriteriaQuery;
import com.ctrip.zeus.service.query.SlbCriteriaQuery;
import com.ctrip.zeus.service.query.VirtualServerCriteriaQuery;
import com.ctrip.zeus.service.status.GroupStatusService;
import com.ctrip.zeus.service.status.StatusOffset;
import com.ctrip.zeus.service.status.StatusService;
@ -69,6 +71,10 @@ public class OperationResource {
@Resource
private PropertyBox propertyBox;
@Resource
private VirtualServerCriteriaQuery virtualServerCriteriaQuery;
@Resource
private SlbCriteriaQuery slbCriteriaQuery;
@Resource
private MessageQueue messageQueue;
@ -147,15 +153,6 @@ public class OperationResource {
Long[] gids = entityFactory.getGroupIdsByGroupServerIp(serverip, SelectionMode.ONLINE_FIRST);
Set<Long> groupIdSet = new HashSet<>();
groupIdSet.addAll(Arrays.asList(gids));
List<GroupStatus> statuses = groupStatusService.getOfflineGroupsStatus(groupIdSet);
for (GroupStatus gs : statuses) {
addHealthyProperty(gs);
}
List<Group> groups = groupRepository.list(gids);
if (groups != null) {
@ -163,10 +160,11 @@ public class OperationResource {
ss.addGroupName(group.getName());
}
}
String slbMessageData = MessageUtil.getMessageData(request, groups.toArray(new Group[]{}), null, null, new String[]{serverip}, true);
if (configHandler.getEnable("use.new,message.queue.producer", false)) {
messageQueue.produceMessage(request.getRequestURI(), null, serverip);
messageQueue.produceMessage(request.getRequestURI(), null, slbMessageData);
} else {
messageQueue.produceMessage(MessageType.OpsServer, null, serverip);
messageQueue.produceMessage(MessageType.OpsServer, null, slbMessageData);
}
return responseHandler.handle(ss, hh.getMediaType());
@ -189,35 +187,9 @@ public class OperationResource {
groupId = groupCriteriaQuery.queryByName(groupName);
}
}
batch = batch == null ? false : batch;
Group gp = groupRepository.getById(groupId);
if (null != batch && batch.equals(true)) {
List<GroupServer> servers = gp.getGroupServers();
for (GroupServer gs : servers) {
_ips.add(gs.getIp());
}
} else if (ips != null) {
List<GroupServer> servers = gp.getGroupServers();
for (GroupServer gs : servers) {
if (ips.contains(gs.getIp())) {
_ips.add(gs.getIp());
}
}
if (!_ips.containsAll(ips)) {
IdVersion[] key = groupCriteriaQuery.queryByIdAndMode(groupId, SelectionMode.ONLINE_EXCLUSIVE);
if (key.length != 0) {
Group online = groupRepository.getByKey(key[0]);
if (online != null && online.getGroupServers() != null) {
for (GroupServer gs : online.getGroupServers()) {
if (ips.contains(gs.getIp())) {
_ips.add(gs.getIp());
}
}
}
}
}
}
return memberOps(request, hh, groupId, _ips, true, TaskOpsType.MEMBER_OPS);
return memberOps(request, hh, groupId, _ips, batch, true, TaskOpsType.MEMBER_OPS);
}
@GET
@ -238,35 +210,9 @@ public class OperationResource {
}
}
Group gp = groupRepository.getById(groupId);
if (null != batch && batch.equals(true)) {
List<GroupServer> servers = gp.getGroupServers();
for (GroupServer gs : servers) {
_ips.add(gs.getIp());
}
} else if (ips != null) {
List<GroupServer> servers = gp.getGroupServers();
for (GroupServer gs : servers) {
if (ips.contains(gs.getIp())) {
_ips.add(gs.getIp());
}
}
if (!_ips.containsAll(ips)) {
IdVersion[] key = groupCriteriaQuery.queryByIdAndMode(groupId, SelectionMode.ONLINE_EXCLUSIVE);
if (key.length != 0) {
Group online = groupRepository.getByKey(key[0]);
if (online != null && online.getGroupServers() != null) {
for (GroupServer gs : online.getGroupServers()) {
if (ips.contains(gs.getIp())) {
_ips.add(gs.getIp());
}
}
}
}
}
}
batch = batch == null ? false : batch;
return memberOps(request, hh, groupId, _ips, false, TaskOpsType.MEMBER_OPS);
return memberOps(request, hh, groupId, _ips, batch, false, TaskOpsType.MEMBER_OPS);
}
@GET
@ -286,34 +232,8 @@ public class OperationResource {
groupId = groupCriteriaQuery.queryByName(groupName);
}
}
Group gp = groupRepository.getById(groupId);
if (null != batch && batch.equals(true)) {
List<GroupServer> servers = gp.getGroupServers();
for (GroupServer gs : servers) {
_ips.add(gs.getIp());
}
} else if (ips != null) {
List<GroupServer> servers = gp.getGroupServers();
for (GroupServer gs : servers) {
if (ips.contains(gs.getIp())) {
_ips.add(gs.getIp());
}
}
if (!_ips.containsAll(ips)) {
IdVersion[] key = groupCriteriaQuery.queryByIdAndMode(groupId, SelectionMode.ONLINE_EXCLUSIVE);
if (key.length != 0) {
Group online = groupRepository.getByKey(key[0]);
if (online != null && online.getGroupServers() != null) {
for (GroupServer gs : online.getGroupServers()) {
if (ips.contains(gs.getIp())) {
_ips.add(gs.getIp());
}
}
}
}
}
}
return memberOps(request, hh, groupId, _ips, true, TaskOpsType.PULL_MEMBER_OPS);
batch = batch == null ? false : batch;
return memberOps(request, hh, groupId, _ips, batch, true, TaskOpsType.PULL_MEMBER_OPS);
}
@GET
@ -333,34 +253,8 @@ public class OperationResource {
groupId = groupCriteriaQuery.queryByName(groupName);
}
}
Group gp = groupRepository.getById(groupId);
if (null != batch && batch.equals(true)) {
List<GroupServer> servers = gp.getGroupServers();
for (GroupServer gs : servers) {
_ips.add(gs.getIp());
}
} else if (ips != null) {
List<GroupServer> servers = gp.getGroupServers();
for (GroupServer gs : servers) {
if (ips.contains(gs.getIp())) {
_ips.add(gs.getIp());
}
}
if (!_ips.containsAll(ips)) {
IdVersion[] key = groupCriteriaQuery.queryByIdAndMode(groupId, SelectionMode.ONLINE_EXCLUSIVE);
if (key.length != 0) {
Group online = groupRepository.getByKey(key[0]);
if (online != null && online.getGroupServers() != null) {
for (GroupServer gs : online.getGroupServers()) {
if (ips.contains(gs.getIp())) {
_ips.add(gs.getIp());
}
}
}
}
}
}
return memberOps(request, hh, groupId, _ips, false, TaskOpsType.PULL_MEMBER_OPS);
batch = batch == null ? false : batch;
return memberOps(request, hh, groupId, _ips, batch, false, TaskOpsType.PULL_MEMBER_OPS);
}
@GET
@ -384,38 +278,10 @@ public class OperationResource {
if (gp == null) {
throw new ValidationException("Group Id or Name not found!");
}
if (null != batch && batch.equals(true)) {
List<GroupServer> servers = gp.getGroupServers();
for (GroupServer gs : servers) {
_ips.add(gs.getIp());
}
} else if (ips != null) {
List<GroupServer> servers = gp.getGroupServers();
for (GroupServer gs : servers) {
if (ips.contains(gs.getIp())) {
_ips.add(gs.getIp());
}
}
if (!_ips.containsAll(ips)) {
IdVersion[] key = groupCriteriaQuery.queryByIdAndMode(groupId, SelectionMode.ONLINE_EXCLUSIVE);
if (key.length != 0) {
Group online = groupRepository.getByKey(key[0]);
if (online != null && online.getGroupServers() != null) {
for (GroupServer gs : online.getGroupServers()) {
if (ips.contains(gs.getIp())) {
_ips.add(gs.getIp());
}
}
}
}
}
}
if (_ips.size() == 0) {
throw new ValidationException("Not found ip in group.GroupId:" + groupId + " ip:" + ips.toString());
}
batch = batch == null ? false : batch;
if (healthyOpsActivate.get()) {
return memberOps(request, hh, groupId, _ips, true, TaskOpsType.HEALTHY_OPS);
return memberOps(request, hh, groupId, _ips, batch, true, TaskOpsType.HEALTHY_OPS);
} else {
return healthyOps(hh, groupId, _ips, true);
}
@ -438,42 +304,9 @@ public class OperationResource {
groupId = groupCriteriaQuery.queryByName(groupName);
}
}
Group gp = groupRepository.getById(groupId);
if (gp == null) {
throw new ValidationException("Group Id or Name not found!");
}
if (null != batch && batch.equals(true)) {
List<GroupServer> servers = gp.getGroupServers();
for (GroupServer gs : servers) {
_ips.add(gs.getIp());
}
} else if (ips != null) {
List<GroupServer> servers = gp.getGroupServers();
for (GroupServer gs : servers) {
if (ips.contains(gs.getIp())) {
_ips.add(gs.getIp());
}
}
if (!_ips.containsAll(ips)) {
IdVersion[] key = groupCriteriaQuery.queryByIdAndMode(groupId, SelectionMode.ONLINE_EXCLUSIVE);
if (key.length != 0) {
Group online = groupRepository.getByKey(key[0]);
if (online != null && online.getGroupServers() != null) {
for (GroupServer gs : online.getGroupServers()) {
if (ips.contains(gs.getIp())) {
_ips.add(gs.getIp());
}
}
}
}
}
}
if (_ips.size() == 0) {
throw new ValidationException("Not found ip in group.GroupId:" + groupId + " ip:" + ips.toString());
}
batch = batch == null ? false : batch;
if (healthyOpsActivate.get()) {
return memberOps(request, hh, groupId, _ips, false, TaskOpsType.HEALTHY_OPS);
return memberOps(request, hh, groupId, _ips, batch, false, TaskOpsType.HEALTHY_OPS);
} else {
return healthyOps(hh, groupId, _ips, false);
}
@ -499,37 +332,78 @@ public class OperationResource {
return responseHandler.handle(groupStatusService.getOfflineGroupStatus(groupId), hh.getMediaType());
}
private Response memberOps(HttpServletRequest request, HttpHeaders hh, Long groupId, List<String> ips, boolean up, String type) throws Exception {
Map<String, List<Boolean>> status = statusService.fetchGroupServerStatus(new Long[]{groupId});
boolean skipOps = true;
for (String ip : ips) {
int index = 0;
if (type.equals(TaskOpsType.HEALTHY_OPS)) index = StatusOffset.HEALTHY;
if (type.equals(TaskOpsType.PULL_MEMBER_OPS)) index = StatusOffset.PULL_OPS;
if (type.equals(TaskOpsType.MEMBER_OPS)) index = StatusOffset.MEMBER_OPS;
boolean preStatus = status.get(groupId.toString() + "_" + ip).get(index);
if (preStatus != up) {
skipOps = false;
private Response memberOps(HttpServletRequest request, HttpHeaders hh, Long groupId, List<String> memberIps, boolean batch, boolean up, String type) throws Exception {
ModelStatusMapping<Group> groupMap = entityFactory.getGroupsByIds(new Long[]{groupId});
if (groupMap.getOfflineMapping() == null || groupMap.getOfflineMapping().size() == 0) {
throw new ValidationException("Not Found Group By Id.");
}
Set<String> groupMemberIps = new HashSet<>();
for (GroupServer gs : groupMap.getOfflineMapping().get(groupId).getGroupServers()) {
groupMemberIps.add(gs.getIp());
}
if (groupMap.getOnlineMapping().get(groupId) != null) {
for (GroupServer gs : groupMap.getOnlineMapping().get(groupId).getGroupServers()) {
groupMemberIps.add(gs.getIp());
}
}
List<String> ips = new ArrayList<>();
if (batch) {
Group gp = groupMap.getOfflineMapping().get(groupId);
List<GroupServer> servers = gp.getGroupServers();
for (GroupServer gs : servers) {
ips.add(gs.getIp());
}
} else if (memberIps != null && memberIps.size() > 0) {
for (String ip : memberIps) {
if (groupMemberIps.contains(ip)) {
ips.addAll(memberIps);
}
}
}
if (ips.size() == 0) {
throw new ValidationException("Ip Param Is Null Or Invalidate Ip Param.");
}
List<GroupStatus> statuses = groupStatusService.getOfflineGroupsStatus(groupMap);
GroupStatus status = null;
if (statuses.size() > 0) {
status = statuses.get(0);
}
boolean skipOps = false;
if (status != null) {
skipOps = true;
for (GroupServerStatus gss : status.getGroupServerStatuses()) {
if (ips.contains(gss.getIp())) {
if (type.equals(TaskOpsType.HEALTHY_OPS) && gss.getHealthy() != up) {
skipOps = false;
} else if (type.equals(TaskOpsType.PULL_MEMBER_OPS) && gss.getPull() != up) {
skipOps = false;
} else if (type.equals(TaskOpsType.MEMBER_OPS) && gss.getMember() != up) {
skipOps = false;
}
}
}
}
if (skipOps) {
GroupStatus groupStatus = groupStatusService.getOfflineGroupStatus(groupId);
logger.info("Group status equals the desired value.Do not need execute task.GroupId:" + groupId + " ips:"
+ ips.toString() + " up:" + up + " type:" + type);
return responseHandler.handle(groupStatus, hh.getMediaType());
}
StringBuilder sb = new StringBuilder();
for (String ip : ips) {
sb.append(ip).append(";");
}
ModelStatusMapping<Group> mapping = entityFactory.getGroupsByIds(new Long[]{groupId});
if (mapping.getOfflineMapping() == null || mapping.getOfflineMapping().size() == 0) {
throw new ValidationException("Not Found Group By Id.");
}
Group onlineGroup = mapping.getOnlineMapping().get(groupId);
Group offlineGroup = mapping.getOfflineMapping().get(groupId);
Group onlineGroup = groupMap.getOnlineMapping().get(groupId);
Group offlineGroup = groupMap.getOfflineMapping().get(groupId);
Set<Long> vsIds = new HashSet<>();
Set<Long> slbIds = new HashSet<>();
if (onlineGroup != null) {
for (GroupVirtualServer gvs : onlineGroup.getGroupVirtualServers()) {
vsIds.add(gvs.getVirtualServer().getId());
@ -539,25 +413,8 @@ public class OperationResource {
vsIds.add(gvs.getVirtualServer().getId());
}
ModelStatusMapping<VirtualServer> vsMaping = entityFactory.getVsesByIds(vsIds.toArray(new Long[]{}));
VirtualServer tmp;
for (Long vsId : vsIds) {
tmp = vsMaping.getOnlineMapping().get(vsId);
if (tmp == null) {
tmp = vsMaping.getOfflineMapping().get(vsId);
}
slbIds.addAll(tmp.getSlbIds());
}
//TODO flag for Healthy ops
if (type.equals(TaskOpsType.HEALTHY_OPS)) {
for (Long slbId : slbIds) {
if (!configHandler.getEnable("healthy.operation.active", slbId, null, null, false)) {
logger.info("healthy.operation.active is false. slbId:" + slbId);
return healthyOps(hh, groupId, ips, up);
}
}
}
Set<IdVersion> vsIdVersions = virtualServerCriteriaQuery.queryByIdsAndMode(vsIds.toArray(new Long[]{}), SelectionMode.ONLINE_FIRST);
Set<Long> slbIds = slbCriteriaQuery.queryByVses(vsIdVersions.toArray(new IdVersion[]{}));
List<OpsTask> tasks = new ArrayList<>();
for (Long slbId : slbIds) {
@ -577,7 +434,6 @@ public class OperationResource {
}
}
GroupStatus groupStatus = groupStatusService.getOfflineGroupStatus(groupId);
addHealthyProperty(groupStatus);
String slbMessageData = MessageUtil.getMessageData(request, new Group[]{offlineGroup}, null, null, ips.toArray(new String[ips.size()]), true);
if (configHandler.getEnable("use.new,message.queue.producer", false)) {
@ -591,40 +447,9 @@ public class OperationResource {
messageQueue.produceMessage(MessageType.OpsMember, groupId, slbMessageData);
}
}
return responseHandler.handle(groupStatus, hh.getMediaType());
}
private void addHealthyProperty(GroupStatus gs) throws Exception {
boolean health = true;
boolean unhealth = true;
for (GroupServerStatus gss : gs.getGroupServerStatuses()) {
if (gss.getServer() && gss.getHealthy() && gss.getPull() && gss.getMember()) {
unhealth = false;
} else {
health = false;
}
}
if (health) {
propertyBox.set("healthy", "health", "group", gs.getGroupId());
} else if (unhealth) {
propertyBox.set("healthy", "unhealth", "group", gs.getGroupId());
} else {
propertyBox.set("healthy", "sub-health", "group", gs.getGroupId());
}
}
@GET
@Path("/health/fillData")
public Response healthFillData(@Context HttpServletRequest request,
@Context HttpHeaders hh) throws Exception {
List<GroupStatus> list = groupStatusService.getAllOfflineGroupsStatus();
for (GroupStatus gs : list) {
addHealthyProperty(gs);
}
return responseHandler.handle("Fill Data Success.", hh.getMediaType());
}
}

View file

@ -39,6 +39,26 @@ public class GroupHealthyConsumer extends AbstractConsumer {
addHealthProperty(messages);
}
@Override
public void onOpsPull(List<Message> messages) {
addHealthProperty(messages);
}
@Override
public void onOpsMember(List<Message> messages) {
addHealthProperty(messages);
}
@Override
public void onOpsServer(List<Message> messages) {
addHealthProperty(messages);
}
@Override
public void onOpsHealthy(List<Message> messages) {
addHealthProperty(messages);
}
protected void addHealthProperty(List<Message> messages) {
try {
Set<Long> groupIds = new HashSet<>();

View file

@ -2,6 +2,7 @@ package com.ctrip.zeus.service.status;
import com.ctrip.zeus.model.entity.Group;
import com.ctrip.zeus.service.model.IdVersion;
import com.ctrip.zeus.service.model.ModelStatusMapping;
import com.ctrip.zeus.status.entity.GroupServerStatus;
import com.ctrip.zeus.status.entity.GroupStatus;
import com.ctrip.zeus.status.entity.GroupStatusList;
@ -92,5 +93,5 @@ public interface GroupStatusService {
* @return status list
* @throws Exception
*/
List<GroupStatus> getOfflineGroupsStatus(Map<Long, Group> groups, Map<Long, Group> onlineGroups) throws Exception;
List<GroupStatus> getOfflineGroupsStatus(ModelStatusMapping<Group> groupMap) throws Exception;
}

View file

@ -89,7 +89,7 @@ public class GroupStatusServiceImpl implements GroupStatusService {
if (groups.getOfflineMapping() == null || groups.getOfflineMapping().size() == 0) {
return result;
}
result = getOfflineGroupsStatus(groups.getOfflineMapping(), groups.getOnlineMapping());
result = getOfflineGroupsStatus(groups);
return result;
}
@ -115,7 +115,7 @@ public class GroupStatusServiceImpl implements GroupStatusService {
return result;
}
List<GroupStatus> list = getOfflineGroupsStatus(groupMap.getOfflineMapping(), groupMap.getOnlineMapping());
List<GroupStatus> list = getOfflineGroupsStatus(groupMap);
if (list.size() > 0) {
result = list.get(0);
}
@ -129,7 +129,7 @@ public class GroupStatusServiceImpl implements GroupStatusService {
if (map.getOfflineMapping() == null || map.getOfflineMapping().size() == 0) {
return result;
}
result = getOfflineGroupsStatus(map.getOfflineMapping(), map.getOnlineMapping());
result = getOfflineGroupsStatus(map);
return result;
}
@ -160,7 +160,7 @@ public class GroupStatusServiceImpl implements GroupStatusService {
boolean serverUp = !allDownServers.contains(gs.getIp());
boolean pullIn = memberStatus.get(key).get(StatusOffset.PULL_OPS);
boolean raise = memberStatus.get(key).get(StatusOffset.HEALTHY);
boolean up = memberUp && serverUp && pullIn && raise;
boolean up = memberUp && serverUp && pullIn && raise;
groupServerStatus.setServer(serverUp);
groupServerStatus.setMember(memberUp);
@ -176,9 +176,12 @@ public class GroupStatusServiceImpl implements GroupStatusService {
}
@Override
public List<GroupStatus> getOfflineGroupsStatus(Map<Long, Group> groups, Map<Long, Group> onlineGroups) throws Exception {
public List<GroupStatus> getOfflineGroupsStatus(ModelStatusMapping<Group> groupMap) throws Exception {
List<GroupStatus> res = new ArrayList<>();
Map<Long, Group> groups = groupMap.getOfflineMapping();
Map<Long, Group> onlineGroups = groupMap.getOnlineMapping();
Map<String, List<Boolean>> memberStatus = statusService.fetchGroupServerStatus(groups.keySet().toArray(new Long[]{}));
Set<String> allDownServers = statusService.findAllDownServers();