mirror of
https://github.com/ctripcorp/zeus.git
synced 2024-09-20 15:46:01 +08:00
hc failure rolling check
This commit is contained in:
parent
232946563a
commit
cd7a54896f
|
@ -4,6 +4,8 @@ import com.ctrip.zeus.auth.Authorize;
|
|||
import com.ctrip.zeus.dal.core.GlobalJobDao;
|
||||
import com.ctrip.zeus.dal.core.GlobalJobDo;
|
||||
import com.ctrip.zeus.exceptions.ValidationException;
|
||||
import com.ctrip.zeus.model.entity.SlbGroupCheckFailureEntity;
|
||||
import com.ctrip.zeus.model.entity.SlbGroupCheckFailureEntityList;
|
||||
import com.ctrip.zeus.restful.message.QueryParamRender;
|
||||
import com.ctrip.zeus.restful.message.ResponseHandler;
|
||||
import com.ctrip.zeus.restful.message.TrimmedQueryParam;
|
||||
|
@ -15,6 +17,8 @@ import com.ctrip.zeus.service.query.QueryEngine;
|
|||
import com.ctrip.zeus.service.status.GroupStatusService;
|
||||
import com.ctrip.zeus.status.entity.GroupStatus;
|
||||
import com.ctrip.zeus.status.entity.GroupStatusList;
|
||||
import com.ctrip.zeus.task.check.SlbCheckStatusRollingMachine;
|
||||
import com.ctrip.zeus.util.CircularArray;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
@ -26,6 +30,7 @@ import javax.ws.rs.QueryParam;
|
|||
import javax.ws.rs.core.*;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
|
@ -46,6 +51,8 @@ public class StatusResource {
|
|||
private GlobalJobDao globalJobDao;
|
||||
@Resource
|
||||
private CriteriaQueryFactory criteriaQueryFactory;
|
||||
@Resource
|
||||
private SlbCheckStatusRollingMachine slbCheckStatusRollingMachine;
|
||||
|
||||
@GET
|
||||
@Path("/groups")
|
||||
|
@ -96,6 +103,38 @@ public class StatusResource {
|
|||
return responseHandler.handle(status, hh.getMediaType());
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/check/slb")
|
||||
@Produces({MediaType.APPLICATION_JSON})
|
||||
public Response getSingleSlbCheckFailures(@Context HttpServletRequest request, @Context HttpHeaders hh,
|
||||
@QueryParam("slbId") Long slbId) throws Exception {
|
||||
CircularArray<Integer> count = slbCheckStatusRollingMachine.getCheckFailureCount(slbId);
|
||||
if (count == null) {
|
||||
throw new ValidationException("Cannot find check result count of slb " + slbId + ".");
|
||||
}
|
||||
SlbGroupCheckFailureEntity entity = new SlbGroupCheckFailureEntity().setSlbId(slbId);
|
||||
for (Integer c : count) {
|
||||
entity.addFailureCount(c);
|
||||
}
|
||||
return responseHandler.handle(entity, hh.getMediaType());
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/check/slbs")
|
||||
@Produces({MediaType.APPLICATION_JSON})
|
||||
public Response getSlbCheckFailures(@Context HttpServletRequest request, @Context HttpHeaders hh) throws Exception {
|
||||
SlbGroupCheckFailureEntityList list = new SlbGroupCheckFailureEntityList();
|
||||
for (Map.Entry<Long, CircularArray<Integer>> e : slbCheckStatusRollingMachine.getCheckFailureCount().entrySet()) {
|
||||
SlbGroupCheckFailureEntity entity = new SlbGroupCheckFailureEntity().setSlbId(e.getKey());
|
||||
for (Integer c : e.getValue()) {
|
||||
entity.addFailureCount(c);
|
||||
}
|
||||
list.addSlbGroupCheckFailureEntity(entity);
|
||||
}
|
||||
list.setTotal(list.getSlbGroupCheckFailureEntities().size());
|
||||
return responseHandler.handle(list, hh.getMediaType());
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/job/unlock")
|
||||
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
|
||||
|
|
|
@ -0,0 +1,194 @@
|
|||
package com.ctrip.zeus.service.message.queue.consumers;
|
||||
|
||||
import com.ctrip.zeus.model.entity.Group;
|
||||
import com.ctrip.zeus.model.entity.GroupVirtualServer;
|
||||
import com.ctrip.zeus.model.entity.VirtualServer;
|
||||
import com.ctrip.zeus.queue.entity.GroupData;
|
||||
import com.ctrip.zeus.queue.entity.Message;
|
||||
import com.ctrip.zeus.queue.entity.SlbMessageData;
|
||||
import com.ctrip.zeus.queue.entity.VsData;
|
||||
import com.ctrip.zeus.service.build.ConfigHandler;
|
||||
import com.ctrip.zeus.service.message.queue.AbstractConsumer;
|
||||
import com.ctrip.zeus.service.model.*;
|
||||
import com.ctrip.zeus.service.query.GroupCriteriaQuery;
|
||||
import com.ctrip.zeus.service.status.GroupStatusService;
|
||||
import com.ctrip.zeus.status.entity.GroupStatus;
|
||||
import com.ctrip.zeus.task.check.SlbCheckStatusRollingMachine;
|
||||
import com.ctrip.zeus.util.MessageUtil;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* Created by zhoumy on 2016/12/5.
|
||||
*/
|
||||
@Service("slbCheckStatusConsumer")
|
||||
public class SlbCheckStatusConsumer extends AbstractConsumer {
|
||||
|
||||
@Resource
|
||||
private SlbCheckStatusRollingMachine slbCheckStatusRollingMachine;
|
||||
@Resource
|
||||
private GroupStatusService groupStatusService;
|
||||
@Resource
|
||||
private ConfigHandler configHandler;
|
||||
@Resource
|
||||
private ArchiveRepository archiveRepository;
|
||||
@Resource
|
||||
private GroupCriteriaQuery groupCriteriaQuery;
|
||||
@Resource
|
||||
private VirtualServerRepository virtualServerRepository;
|
||||
@Resource
|
||||
private GroupRepository groupRepository;
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(SlbCheckStatusConsumer.class);
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
try {
|
||||
if (configHandler.getEnable("message.queue", null, null, null, true)) {
|
||||
slbCheckStatusRollingMachine.enable(true);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNewGroup(List<Message> messages) {
|
||||
for (Message m : messages) {
|
||||
if (m.getTargetId() != null && m.getTargetId() > 0L) {
|
||||
Long groupId = m.getTargetId();
|
||||
try {
|
||||
Group g = groupRepository.getById(groupId);
|
||||
Set<Long> slbIds = new HashSet<>();
|
||||
for (GroupVirtualServer gvs : g.getGroupVirtualServers()) {
|
||||
slbIds.addAll(gvs.getVirtualServer().getSlbIds());
|
||||
}
|
||||
GroupStatus groupStatus = groupStatusService.getOfflineGroupStatus(groupId);
|
||||
slbCheckStatusRollingMachine.update(slbIds, groupStatus);
|
||||
} catch (Exception e) {
|
||||
logger.error("Fail to get offline group status of group " + groupId + ".");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdateGroup(List<Message> messages) {
|
||||
for (Message m : messages) {
|
||||
SlbMessageData d = MessageUtil.parserSlbMessageData(m.getTargetData());
|
||||
if (d != null && d.getSuccess()) {
|
||||
GroupData g = d.getGroupDatas().get(0);
|
||||
try {
|
||||
Group curr = archiveRepository.getGroupArchive(g.getId(), g.getVersion());
|
||||
Group prev = archiveRepository.getGroupArchive(g.getId(), g.getVersion() - 1);
|
||||
|
||||
Long[] currVsIds = new Long[curr.getGroupVirtualServers().size()];
|
||||
Long[] prevVsIds = new Long[prev.getGroupVirtualServers().size()];
|
||||
for (int i = 0; i < curr.getGroupVirtualServers().size(); i++) {
|
||||
currVsIds[i] = curr.getGroupVirtualServers().get(i).getVirtualServer().getId();
|
||||
}
|
||||
for (int i = 0; i < prev.getGroupVirtualServers().size(); i++) {
|
||||
prevVsIds[i] = prev.getGroupVirtualServers().get(i).getVirtualServer().getId();
|
||||
}
|
||||
|
||||
Arrays.sort(currVsIds);
|
||||
Arrays.sort(prevVsIds);
|
||||
if (Arrays.equals(currVsIds, prevVsIds)) return;
|
||||
|
||||
Set<Long> currSlbIds = new HashSet<>();
|
||||
Set<Long> prevSlbIds = new HashSet<>();
|
||||
for (VirtualServer vs : virtualServerRepository.listAll(currVsIds)) {
|
||||
currSlbIds.addAll(vs.getSlbIds());
|
||||
}
|
||||
for (VirtualServer vs : virtualServerRepository.listAll(prevVsIds)) {
|
||||
prevSlbIds.addAll(vs.getSlbIds());
|
||||
}
|
||||
if (currSlbIds.equals(prevSlbIds)) return;
|
||||
|
||||
List<GroupStatus> list = new ArrayList<>();
|
||||
list.add(groupStatusService.getOfflineGroupStatus(g.getId()));
|
||||
slbCheckStatusRollingMachine.migrate(prevSlbIds, currSlbIds, Sets.newHashSet(m.getTargetId()), list);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDeleteGroup(List<Message> messages) {
|
||||
for (Message m : messages) {
|
||||
SlbMessageData d = MessageUtil.parserSlbMessageData(m.getTargetData());
|
||||
if (d != null && d.getSuccess()) {
|
||||
try {
|
||||
Set<Long> slbIds = new HashSet<>();
|
||||
Group g = archiveRepository.getGroupArchive(m.getTargetId(), 0);
|
||||
for (GroupVirtualServer gvs : g.getGroupVirtualServers()) {
|
||||
slbIds.addAll(gvs.getVirtualServer().getSlbIds());
|
||||
}
|
||||
slbCheckStatusRollingMachine.clear(slbIds, m.getTargetId());
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdateVs(List<Message> messages) {
|
||||
for (Message m : messages) {
|
||||
SlbMessageData d = MessageUtil.parserSlbMessageData(m.getTargetData());
|
||||
if (d != null && d.getSuccess()) {
|
||||
VsData vs = d.getVsDatas().get(0);
|
||||
try {
|
||||
VirtualServer curr = archiveRepository.getVsArchive(vs.getId(), vs.getVersion());
|
||||
VirtualServer prev = archiveRepository.getVsArchive(vs.getId(), vs.getVersion() - 1);
|
||||
|
||||
Set<Long> currSlbIdArray = new HashSet<>(curr.getSlbIds().size());
|
||||
Set<Long> prevSlbIdArray = new HashSet<>(prev.getSlbIds().size());
|
||||
|
||||
if (currSlbIdArray.equals(prevSlbIdArray)) return;
|
||||
|
||||
Set<Long> groupIds = new HashSet<>();
|
||||
for (IdVersion k : groupCriteriaQuery.queryByVsId(vs.getId())) {
|
||||
groupIds.add(k.getId());
|
||||
}
|
||||
List<GroupStatus> groupStatuses = groupStatusService.getOfflineGroupsStatus(groupIds);
|
||||
slbCheckStatusRollingMachine.migrate(prevSlbIdArray, currSlbIdArray, groupIds, groupStatuses);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpsHealthy(List<Message> messages) {
|
||||
Set<Long> groupIds = new HashSet<>();
|
||||
for (Message m : messages) {
|
||||
SlbMessageData d = MessageUtil.parserSlbMessageData(m.getTargetData());
|
||||
if (d != null && d.getSuccess()) {
|
||||
groupIds.add(m.getTargetId());
|
||||
}
|
||||
}
|
||||
Map<Long, Set<Long>> groups = new HashMap<>();
|
||||
try {
|
||||
for (Group g : groupRepository.list(groupIds.toArray(new Long[groupIds.size()]))) {
|
||||
Set<Long> slbIds = new HashSet<>();
|
||||
for (GroupVirtualServer gvs : g.getGroupVirtualServers()) {
|
||||
slbIds.addAll(gvs.getVirtualServer().getSlbIds());
|
||||
}
|
||||
groups.put(g.getId(), slbIds);
|
||||
}
|
||||
for (GroupStatus gs : groupStatusService.getOfflineGroupsStatus(groupIds)) {
|
||||
slbCheckStatusRollingMachine.update(groups.get(gs.getGroupId()), gs);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Fail to get offline groups status of groups " + Joiner.on(",").join(groupIds) + ".");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -305,7 +305,7 @@ public class MessageQueueImpl implements MessageQueue {
|
|||
return;
|
||||
}
|
||||
try {
|
||||
if (configHandler.getEnable("message.queue", null, null, null, false)) {
|
||||
if (configHandler.getEnable("message.queue", null, null, null, true)) {
|
||||
fetchMessage();
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
|
|
|
@ -0,0 +1,271 @@
|
|||
package com.ctrip.zeus.task.check;
|
||||
|
||||
|
||||
import com.ctrip.zeus.dal.core.StatusCheckCountSlbDao;
|
||||
import com.ctrip.zeus.dal.core.StatusCheckCountSlbDo;
|
||||
import com.ctrip.zeus.dal.core.StatusCheckCountSlbEntity;
|
||||
import com.ctrip.zeus.lock.DbLockFactory;
|
||||
import com.ctrip.zeus.status.entity.GroupServerStatus;
|
||||
import com.ctrip.zeus.status.entity.GroupStatus;
|
||||
import com.ctrip.zeus.task.AbstractTask;
|
||||
import com.ctrip.zeus.util.CircularArray;
|
||||
import com.ctrip.zeus.util.CompressUtils;
|
||||
import com.google.common.base.Joiner;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.unidal.dal.jdbc.DalException;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* Created by zhoumy on 2016/12/5.
|
||||
*/
|
||||
@Component("slbCheckStatusRollingMachine")
|
||||
public class SlbCheckStatusRollingMachine extends AbstractTask {
|
||||
@Resource
|
||||
private StatusCheckCountSlbDao statusCheckCountSlbDao;
|
||||
@Resource
|
||||
private DbLockFactory dbLockFactory;
|
||||
|
||||
private final static Logger logger = LoggerFactory.getLogger(SlbCheckStatusRollingMachine.class);
|
||||
|
||||
private final Map<Long, CircularArray<Integer>> slbCheckFailureRollingCounter = new HashMap<>();
|
||||
private final Map<Long, TimestampWrapper> groupCacheBySlb = new HashMap<>();
|
||||
|
||||
private final int bitmask;
|
||||
private final int failureId;
|
||||
|
||||
private final static int HC = 1;
|
||||
|
||||
private final AtomicBoolean initFlag = new AtomicBoolean(false);
|
||||
private boolean enabled;
|
||||
|
||||
public SlbCheckStatusRollingMachine() {
|
||||
this(0x000F, HC, false);
|
||||
}
|
||||
|
||||
public SlbCheckStatusRollingMachine(int bitmask, int failureId, boolean enabled) {
|
||||
this.bitmask = bitmask;
|
||||
this.failureId = failureId;
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
private void init() throws Exception {
|
||||
slbCheckFailureRollingCounter.clear();
|
||||
for (StatusCheckCountSlbDo e : statusCheckCountSlbDao.findAll(StatusCheckCountSlbEntity.READSET_DATASET_EXCLUDED)) {
|
||||
CircularArray<Integer> v = new CircularArray<>(10);
|
||||
slbCheckFailureRollingCounter.put(e.getSlbId(), v);
|
||||
v.add(e.getCount());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getInterval() {
|
||||
return 10000;
|
||||
}
|
||||
|
||||
public CircularArray<Integer> getCheckFailureCount(Long slbId) {
|
||||
return slbCheckFailureRollingCounter.get(slbId);
|
||||
}
|
||||
|
||||
public Map<Long, CircularArray<Integer>> getCheckFailureCount() {
|
||||
return slbCheckFailureRollingCounter;
|
||||
}
|
||||
|
||||
public void enable(boolean flag) {
|
||||
this.enabled = flag;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() throws Exception {
|
||||
if (!enabled) return;
|
||||
|
||||
if (initFlag.compareAndSet(false, true)) {
|
||||
try {
|
||||
init();
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
initFlag.compareAndSet(true, false);
|
||||
logger.error("Fail to initialize group check status by slb.", e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
for (StatusCheckCountSlbDo e : statusCheckCountSlbDao.findAll(StatusCheckCountSlbEntity.READSET_DATASET_EXCLUDED)) {
|
||||
CircularArray<Integer> v = slbCheckFailureRollingCounter.get(e.getSlbId());
|
||||
if (v == null) {
|
||||
v = new CircularArray<>(10);
|
||||
slbCheckFailureRollingCounter.put(e.getSlbId(), v);
|
||||
}
|
||||
v.add(e.getCount());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
slbCheckFailureRollingCounter.clear();
|
||||
}
|
||||
|
||||
protected int bitwiseStatus(GroupServerStatus ss) {
|
||||
int val3 = ss.getPull() ? 0 : 1;
|
||||
int val2 = ss.getServer() ? 0 : 1;
|
||||
int val1 = ss.getMember() ? 0 : 1;
|
||||
int val0 = ss.getHealthy() ? 0 : 1;
|
||||
return val0 | (val1 << 1) | (val2 << 2) | (val3 << 3);
|
||||
}
|
||||
|
||||
public void migrate(Set<Long> prevSlbIds, Set<Long> currSlbIds, Set<Long> totalGroupIds, List<GroupStatus> groupStatuses) {
|
||||
Set<Long> failureGroupIds = new HashSet<>();
|
||||
for (GroupStatus groupStatus : groupStatuses) {
|
||||
totalGroupIds.add(groupStatus.getGroupId());
|
||||
for (GroupServerStatus ss : groupStatus.getGroupServerStatuses()) {
|
||||
int val = bitwiseStatus(ss);
|
||||
if ((bitmask & val) == failureId) {
|
||||
failureGroupIds.add(groupStatus.getGroupId());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Set<Long> obj = new HashSet<>();
|
||||
Map<Long, Set<Long>> updatedCache = new HashMap<>();
|
||||
for (Long slbId : prevSlbIds) {
|
||||
updatedCache.put(slbId, obj);
|
||||
}
|
||||
for (Long slbId : currSlbIds) {
|
||||
updatedCache.put(slbId, obj);
|
||||
}
|
||||
try {
|
||||
List<StatusCheckCountSlbDo> toUpdate = new ArrayList<>();
|
||||
updateIfExpired(updatedCache.keySet(), groupCacheBySlb);
|
||||
for (Long slbId : updatedCache.keySet()) {
|
||||
TimestampWrapper v = groupCacheBySlb.get(slbId);
|
||||
Set<Long> dataSet = v.unmodifiableDataSet();
|
||||
if (prevSlbIds.contains(slbId)) {
|
||||
dataSet.removeAll(totalGroupIds);
|
||||
}
|
||||
if (currSlbIds.contains(slbId)) {
|
||||
dataSet.addAll(failureGroupIds);
|
||||
}
|
||||
if (!dataSet.equals(v.groupIds)) {
|
||||
updatedCache.put(slbId, dataSet);
|
||||
toUpdate.add(new StatusCheckCountSlbDo().setSlbId(slbId).setCount(dataSet.size()).setDataChangeLastTime(v.timestamp)
|
||||
.setDataSet(new String(CompressUtils.compress(Joiner.on(",").join(dataSet)))));
|
||||
}
|
||||
}
|
||||
int[] rc = statusCheckCountSlbDao.update((StatusCheckCountSlbDo[]) toUpdate.toArray(), StatusCheckCountSlbEntity.UPDATESET_FULL);
|
||||
for (int i = 0; i < rc.length; i++) {
|
||||
if (rc[i] == 1) {
|
||||
long slbId = toUpdate.get(i).getSlbId();
|
||||
groupCacheBySlb.get(slbId).update(updatedCache.get(slbId));
|
||||
}
|
||||
}
|
||||
} catch (DalException e) {
|
||||
} catch (IOException e) {
|
||||
}
|
||||
}
|
||||
|
||||
public void update(Set<Long> slbIds, GroupStatus groupStatus) {
|
||||
Long groupId = groupStatus.getGroupId();
|
||||
boolean failed = false;
|
||||
for (GroupServerStatus ss : groupStatus.getGroupServerStatuses()) {
|
||||
int val = bitwiseStatus(ss);
|
||||
if ((bitmask & val) == failureId) {
|
||||
failed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
updateIfExpired(slbIds, groupCacheBySlb);
|
||||
for (Long slbId : slbIds) {
|
||||
int rc = 0;
|
||||
TimestampWrapper v = groupCacheBySlb.get(slbId);
|
||||
Set<Long> dataSet = v.unmodifiableDataSet();
|
||||
if (failed && dataSet.add(groupId)) {
|
||||
rc = statusCheckCountSlbDao.countIncrement(new StatusCheckCountSlbDo().setSlbId(slbId).setNum(1).setDataChangeLastTime(v.timestamp)
|
||||
.setDataSet(new String(CompressUtils.compress(Joiner.on(",").join(dataSet)))), StatusCheckCountSlbEntity.UPDATESET_FULL);
|
||||
}
|
||||
if (!failed && dataSet.remove(groupId)) {
|
||||
rc = statusCheckCountSlbDao.countDecrement(new StatusCheckCountSlbDo().setSlbId(slbId).setNum(1).setDataChangeLastTime(v.timestamp)
|
||||
.setDataSet(new String(CompressUtils.compress(Joiner.on(",").join(dataSet)))), StatusCheckCountSlbEntity.UPDATESET_FULL);
|
||||
}
|
||||
if (rc == 1) {
|
||||
groupCacheBySlb.get(slbId).update(dataSet);
|
||||
}
|
||||
}
|
||||
} catch (DalException e) {
|
||||
} catch (IOException e) {
|
||||
}
|
||||
}
|
||||
|
||||
public void clear(Set<Long> slbIds, Long groupId) {
|
||||
try {
|
||||
updateIfExpired(slbIds, groupCacheBySlb);
|
||||
for (Long slbId : slbIds) {
|
||||
TimestampWrapper v = groupCacheBySlb.get(slbId);
|
||||
Set<Long> dataSet = v.unmodifiableDataSet();
|
||||
if (dataSet.remove(groupId)) {
|
||||
int rc = statusCheckCountSlbDao.countDecrement(new StatusCheckCountSlbDo().setSlbId(slbId).setNum(1).setDataChangeLastTime(v.timestamp)
|
||||
.setDataSet(new String(CompressUtils.compress(Joiner.on(",").join(dataSet)))), StatusCheckCountSlbEntity.UPDATESET_FULL);
|
||||
if (rc == 1) {
|
||||
v.update(dataSet);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (DalException e) {
|
||||
} catch (IOException e) {
|
||||
}
|
||||
}
|
||||
|
||||
private void updateIfExpired(Set<Long> slbIds, Map<Long, TimestampWrapper> cache) {
|
||||
List<Long> expired = new ArrayList<>();
|
||||
try {
|
||||
for (StatusCheckCountSlbDo e : statusCheckCountSlbDao.findAllBySlb(slbIds.toArray((Long[]) slbIds.toArray()), StatusCheckCountSlbEntity.READSET_DATASET_EXCLUDED)) {
|
||||
TimestampWrapper v = cache.get(e.getSlbId());
|
||||
if (v == null) expired.add(e.getSlbId());
|
||||
if (e.getDataChangeLastTime().compareTo(v.timestamp) > 0) expired.add(e.getSlbId());
|
||||
}
|
||||
|
||||
for (StatusCheckCountSlbDo e : statusCheckCountSlbDao.findAllBySlb((Long[]) expired.toArray(), StatusCheckCountSlbEntity.READSET_FULL)) {
|
||||
Set<Long> groupSet = new HashSet<>();
|
||||
if (e.getDataSet() != null) {
|
||||
String dataset = new String(CompressUtils.decompress(e.getDataSet().getBytes()));
|
||||
for (String s : dataset.split(",")) {
|
||||
groupSet.add(Long.parseLong(s));
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (DalException e) {
|
||||
} catch (IOException e) {
|
||||
}
|
||||
}
|
||||
|
||||
private class TimestampWrapper {
|
||||
Set<Long> groupIds;
|
||||
Date timestamp;
|
||||
|
||||
public TimestampWrapper(Set<Long> groupIds, Date timestamp) {
|
||||
this.groupIds = groupIds;
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
Set<Long> unmodifiableDataSet() {
|
||||
return new HashSet<>(groupIds);
|
||||
}
|
||||
|
||||
void update(Set<Long> groupIds) {
|
||||
this.groupIds = groupIds;
|
||||
timestamp = new Date();
|
||||
}
|
||||
}
|
||||
}
|
44
src/main/java/com/ctrip/zeus/util/CircularArray.java
Normal file
44
src/main/java/com/ctrip/zeus/util/CircularArray.java
Normal file
|
@ -0,0 +1,44 @@
|
|||
package com.ctrip.zeus.util;
|
||||
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.*;
|
||||
|
||||
/**
 * Created by zhoumy on 2016/12/5.
 * <p>
 * Fixed-capacity buffer that keeps the most recently added entries: once the capacity is
 * exceeded, the oldest entry is dropped. Iteration runs from the oldest entry to the newest.
 * <p>
 * This class does no synchronization of its own.
 *
 * @param <T> type of the stored entries
 */
public class CircularArray<T> implements Iterable<T> {
    // LinkedList is kept (rather than ArrayDeque) so that null entries stay permitted.
    private final LinkedList<T> buckets;
    // Capacity + 1: add() appends first and evicts when the size reaches this value.
    private final int length;

    /**
     * @param length maximum number of entries retained
     */
    public CircularArray(int length) {
        buckets = new LinkedList<>();
        this.length = length + 1;
    }

    /**
     * Appends an entry, evicting the oldest one when the capacity is exceeded.
     *
     * @param entry entry to append
     */
    public void add(T entry) {
        buckets.add(entry);
        if (buckets.size() == length) {
            buckets.removeFirst();
        }
    }

    /**
     * Returns a snapshot of the retained entries, oldest first.
     * <p>
     * The element type is erased at runtime, so the returned array is an {@code Object[]}.
     * Callers that need a typed array should iterate instead of assigning the result to a
     * {@code T[]} of a concrete type.
     *
     * @return a new array with {@link #size()} elements
     */
    @SuppressWarnings("unchecked")
    public T[] getAll() {
        // Unchecked by necessity: a T[] cannot be created without a class token.
        return (T[]) buckets.toArray();
    }

    /**
     * @return number of entries currently retained
     */
    public int size() {
        return buckets.size();
    }

    /** Removes all entries. */
    public void clear() {
        buckets.clear();
    }

    /**
     * @return a read-only iterator over the retained entries, oldest first
     */
    @Override
    public Iterator<T> iterator() {
        return Collections.unmodifiableList(buckets).iterator();
    }
}
|
|
@ -8,6 +8,9 @@ import com.ctrip.zeus.queue.entity.SlbData;
|
|||
import com.ctrip.zeus.queue.entity.SlbMessageData;
|
||||
import com.ctrip.zeus.queue.entity.VsData;
|
||||
import com.ctrip.zeus.queue.transform.DefaultJsonParser;
|
||||
import com.ctrip.zeus.support.ObjectJsonParser;
|
||||
import com.ctrip.zeus.support.ObjectJsonWriter;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -46,14 +49,19 @@ public class MessageUtil {
|
|||
res.addIp(ip);
|
||||
}
|
||||
}
|
||||
return String.format(SlbMessageData.JSON, res);
|
||||
try {
|
||||
return ObjectJsonWriter.write(res);
|
||||
} catch (Exception e) {
|
||||
logger.warn("Write Message Data Fail." + res.toString(), e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public static SlbMessageData parserSlbMessageData(String res) {
|
||||
try {
|
||||
if (res == null) return null;
|
||||
return DefaultJsonParser.parse(SlbMessageData.class, res);
|
||||
} catch (IOException e) {
|
||||
return ObjectJsonParser.parse(res, SlbMessageData.class);
|
||||
} catch (Exception e) {
|
||||
logger.warn("Parser Slb Message Data Failed. Message:" + res, e);
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -498,7 +498,7 @@
|
|||
<member name="version" field="version" value-type="long" length="19" />
|
||||
<member name="slb-id" field="slb_id" value-type="long" length="19" />
|
||||
<member name="vs-ids" field="vs_ids" value-type="String" length="4096" />
|
||||
<member name="group-ids" field="group_ids" value-type="String" length="4096" />
|
||||
<member name="group-ids" field="group_ids" value-type="String" length="10240" />
|
||||
<member name="task-ids" field="task_ids" value-type="String" length="4096" />
|
||||
<member name="cleanvs-ids" field="cleanvs_ids" value-type="String" length="4096" />
|
||||
<member name="type" field="type" value-type="String" length="45" />
|
||||
|
@ -1515,6 +1515,11 @@
|
|||
<index name="DataChange_LastTime" members="DataChange_LastTime ASC" />
|
||||
<index name="idx_datetime" members="datetime ASC" />
|
||||
<index name="type_target_id_operation_user_name_client_ip_datetime_success" members="type ASC, target_id ASC, operation ASC, user_name ASC, client_ip ASC, datetime ASC, success ASC" />
|
||||
<index name="target_id" members="target_id ASC" />
|
||||
<index name="operation" members="operation ASC" />
|
||||
<index name="user_name" members="user_name ASC" />
|
||||
<index name="client_ip" members="client_ip ASC" />
|
||||
<index name="success" members="success ASC" />
|
||||
<readsets>
|
||||
<readset name="FULL" all="true" />
|
||||
</readsets>
|
||||
|
@ -2449,6 +2454,46 @@
|
|||
</query>
|
||||
</query-defs>
|
||||
</entity>
|
||||
<entity name="status-check-count-slb" table="status_check_count_slb" alias="sccs">
|
||||
<member name="id" field="id" value-type="long" length="19" nullable="false" key="true" auto-increment="true" />
|
||||
<member name="slb-id" field="slb_id" value-type="long" length="19" nullable="false" />
|
||||
<member name="count" field="count" value-type="int" length="10" nullable="false" />
|
||||
<member name="data-set" field="data_set" value-type="String" length="255" nullable="false" />
|
||||
<member name="data-change-last-time" field="DataChange_LastTime" value-type="Date" nullable="false" />
|
||||
<var name="key-id" value-type="long" key-member="id" />
|
||||
<primary-key name="PRIMARY" members="id" />
|
||||
<index name="slb_id" unique="true" members="slb_id ASC" />
|
||||
<index name="DataChange_LastTime" members="DataChange_LastTime ASC" />
|
||||
<readsets>
|
||||
<readset name="FULL" all="true" />
|
||||
</readsets>
|
||||
<updatesets>
|
||||
<updateset name="FULL" all="true" />
|
||||
</updatesets>
|
||||
<query-defs>
|
||||
<query name="find-by-PK" type="SELECT">
|
||||
<param name="key-id" />
|
||||
<statement><![CDATA[SELECT <FIELDS/>
|
||||
FROM <TABLE/>
|
||||
WHERE <FIELD name='id'/> = ${key-id}]]></statement>
|
||||
</query>
|
||||
<query name="insert" type="INSERT">
|
||||
<statement><![CDATA[INSERT INTO <TABLE/>(<FIELDS/>)
|
||||
VALUES(<VALUES/>)]]></statement>
|
||||
</query>
|
||||
<query name="update-by-PK" type="UPDATE">
|
||||
<param name="key-id" />
|
||||
<statement><![CDATA[UPDATE <TABLE/>
|
||||
SET <FIELDS/>
|
||||
WHERE <FIELD name='id'/> = ${key-id}]]></statement>
|
||||
</query>
|
||||
<query name="delete-by-PK" type="DELETE">
|
||||
<param name="key-id" />
|
||||
<statement><![CDATA[DELETE FROM <TABLE/>
|
||||
WHERE <FIELD name='id'/> = ${key-id}]]></statement>
|
||||
</query>
|
||||
</query-defs>
|
||||
</entity>
|
||||
<entity name="status-group-server" table="status_group_server" alias="sgs">
|
||||
<member name="id" field="id" value-type="long" length="19" nullable="false" key="true" auto-increment="true" />
|
||||
<member name="slb-id" field="slb_id" value-type="long" length="19" nullable="false" />
|
||||
|
|
|
@ -311,5 +311,78 @@
|
|||
</query>
|
||||
</query-defs>
|
||||
</entity>
|
||||
<entity name="status-check-count-slb" table="status_check_count_slb" alias="sccs" do-class="StatusCheckCountSlbDo">
|
||||
<readsets>
|
||||
<readset name="DATASET_EXCLUDED">
|
||||
<member name="id"/>
|
||||
<member name="slb-id"/>
|
||||
<member name="count"/>
|
||||
<member name="data-change-last-time"/>
|
||||
</readset>
|
||||
</readsets>
|
||||
<var name="num" value-type="Integer"/>
|
||||
<var name="ids" value-type="Long[]"/>
|
||||
<query-defs>
|
||||
<query name="find-all" type="SELECT" multiple="true">
|
||||
<statement>
|
||||
<![CDATA[
|
||||
SELECT <FIELDS/>
|
||||
FROM <TABLE/>
|
||||
]]>
|
||||
</statement>
|
||||
</query>
|
||||
<!-- Selects the rows of the SLBs listed in the "ids" var. The IN token must be closed with </IN>. -->
<query name="find-all-by-slb" type="SELECT" multiple="true">
    <param name="ids"/>
    <statement>
        <![CDATA[
        SELECT <FIELDS/>
        FROM <TABLE/>
        WHERE <FIELD name='slb-id'/> IN <IN>${ids}</IN>
        ]]>
    </statement>
</query>
|
||||
<query name="insert" type="INSERT" batch="true">
|
||||
<statement>
|
||||
<![CDATA[
|
||||
INSERT INTO <TABLE/>(<FIELDS/>)
|
||||
VALUES(<VALUES/>)
|
||||
]]>
|
||||
</statement>
|
||||
</query>
|
||||
<!--
    Overwrites count and data_set of one SLB's row, provided the row has not changed
    after the timestamp supplied by the caller.
    count is bound to the DO's count member; MySQL's VALUES() is only meaningful inside
    INSERT ... ON DUPLICATE KEY UPDATE and would not carry the new count here.
-->
<query name="update" type="UPDATE" batch="true">
    <statement>
        <![CDATA[
        UPDATE <TABLE/>
        SET count = ${count},
            data_set = ${data-set}
        WHERE <FIELD name='slb-id'/> = ${slb-id}
            AND <FIELD name='data-change-last-time'/> <= ${data-change-last-time}
        ]]>
    </statement>
</query>
|
||||
<query name="count-increment" type="UPDATE" batch="true">
|
||||
<statement>
|
||||
<![CDATA[
|
||||
UPDATE <TABLE/>
|
||||
SET count = count + ${num},
|
||||
data_set = ${data-set}
|
||||
WHERE <FIELD name='slb-id'/> = ${slb-id}
|
||||
AND <FIELD name='data-change-last-time'/> <= ${data-change-last-time}
|
||||
]]>
|
||||
</statement>
|
||||
</query>
|
||||
<query name="count-decrement" type="UPDATE" batch="true">
|
||||
<statement>
|
||||
<![CDATA[
|
||||
UPDATE <TABLE/>
|
||||
SET count = count - ${num},
|
||||
data_set = ${data-set}
|
||||
WHERE <FIELD name='slb-id'/> = ${slb-id}
|
||||
AND <FIELD name='data-change-last-time'/> <= ${data-change-last-time}
|
||||
]]>
|
||||
</statement>
|
||||
</query>
|
||||
</query-defs>
|
||||
</entity>
|
||||
</entities>
|
||||
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
<entity-ref name="group" />
|
||||
<entity-ref name="group-list" />
|
||||
<entity-ref name="archive" />
|
||||
<entity-ref name="slb-group-check-failure-entity-list" />
|
||||
<entity-ref name="slb-group-check-failure-entity" />
|
||||
<entity-ref name="member-action" />
|
||||
<entity-ref name="server-action" />
|
||||
<entity-ref name="conf-req" />
|
||||
|
@ -149,6 +151,14 @@
|
|||
<element name="content" value-type="String" />
|
||||
<element name="version" value-type="int" />
|
||||
</entity>
|
||||
<entity name="slb-group-check-failure-entity-list">
|
||||
<element name="total" value-type="int" />
|
||||
<entity-ref name="slb-group-check-failure-entity" type="list" names="slb-group-check-failure-entities" />
|
||||
</entity>
|
||||
<entity name="slb-group-check-failure-entity">
|
||||
<element name="slb-id" value-type="long" />
|
||||
<element name="failure-count" value-type="int" type="list" names="failure-counts" />
|
||||
</entity>
|
||||
<entity name="member-action">
|
||||
<element name="group-name" value-type="String" />
|
||||
<element name="ip" value-type="String" type="list" names="ips" />
|
||||
|
|
|
@ -539,6 +539,15 @@
|
|||
<data-source-name>zeus</data-source-name>
|
||||
</configuration>
|
||||
</component>
|
||||
<component>
|
||||
<role>org.unidal.dal.jdbc.mapping.TableProvider</role>
|
||||
<role-hint>status-check-count-slb</role-hint>
|
||||
<implementation>org.unidal.dal.jdbc.mapping.SimpleTableProvider</implementation>
|
||||
<configuration>
|
||||
<physical-table-name>status_check_count_slb</physical-table-name>
|
||||
<data-source-name>zeus</data-source-name>
|
||||
</configuration>
|
||||
</component>
|
||||
<component>
|
||||
<role>org.unidal.dal.jdbc.mapping.TableProvider</role>
|
||||
<role-hint>status-group-server</role-hint>
|
||||
|
@ -1124,6 +1133,15 @@
|
|||
</requirement>
|
||||
</requirements>
|
||||
</component>
|
||||
<component>
|
||||
<role>com.ctrip.zeus.dal.core.StatusCheckCountSlbDao</role>
|
||||
<implementation>com.ctrip.zeus.dal.core.StatusCheckCountSlbDao</implementation>
|
||||
<requirements>
|
||||
<requirement>
|
||||
<role>org.unidal.dal.jdbc.QueryEngine</role>
|
||||
</requirement>
|
||||
</requirements>
|
||||
</component>
|
||||
<component>
|
||||
<role>com.ctrip.zeus.dal.core.StatusGroupServerDao</role>
|
||||
<implementation>com.ctrip.zeus.dal.core.StatusGroupServerDao</implementation>
|
||||
|
|
|
@ -74,6 +74,7 @@
|
|||
<table name="default_page_active"/>
|
||||
<table name="default_page_file"/>
|
||||
<table name="default_page_server_active"/>
|
||||
<table name="status_check_count_slb"/>
|
||||
</group>
|
||||
</jdbc>
|
||||
</wizard>
|
||||
|
|
|
@ -244,6 +244,10 @@
|
|||
<constructor-arg type="java.lang.Class" value="com.ctrip.zeus.dal.core.SnapServerGroupDao"/>
|
||||
</bean>
|
||||
|
||||
<bean id="statusCheckCountSlbDao" factory-bean="daoFactory" factory-method="getDao">
|
||||
<constructor-arg type="java.lang.Class" value="com.ctrip.zeus.dal.core.StatusCheckCountSlbDao"/>
|
||||
</bean>
|
||||
|
||||
<bean id="statusGroupServerDao" factory-bean="daoFactory" factory-method="getDao">
|
||||
<constructor-arg type="java.lang.Class" value="com.ctrip.zeus.dal.core.StatusGroupServerDao"/>
|
||||
</bean>
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
package com.ctrip.zeus.task.check;
|
||||
|
||||
import com.ctrip.zeus.status.entity.GroupServerStatus;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Created by zhoumy on 2016/12/6.
|
||||
*/
|
||||
public class SlbCheckStatusRollingMachineTest {
|
||||
@Test
|
||||
public void bitwiseStatus() throws Exception {
|
||||
SlbCheckStatusRollingMachine m = new SlbCheckStatusRollingMachine();
|
||||
Assert.assertEquals(0, m.bitwiseStatus(new GroupServerStatus().setHealthy(true).setMember(true).setPull(true).setServer(true)));
|
||||
Assert.assertEquals(1, m.bitwiseStatus(new GroupServerStatus().setHealthy(false).setMember(true).setPull(true).setServer(true)));
|
||||
Assert.assertEquals(1, m.bitwiseStatus(new GroupServerStatus().setHealthy(false).setMember(false).setPull(true).setServer(true)));
|
||||
}
|
||||
|
||||
}
|
|
@ -123,6 +123,19 @@
|
|||
<version>1</version>
|
||||
</archive>
|
||||
|
||||
<slb-group-check-failure-entity-list>
|
||||
<total>3</total>
|
||||
<slb-group-check-failure-entity/>
|
||||
<slb-group-check-failure-entity/>
|
||||
<slb-group-check-failure-entity/>
|
||||
</slb-group-check-failure-entity-list>
|
||||
<slb-group-check-failure-entity>
|
||||
<slb-id>12345678912</slb-id>
|
||||
<failure-count>1</failure-count>
|
||||
<failure-count>2</failure-count>
|
||||
<failure-count>3</failure-count>
|
||||
</slb-group-check-failure-entity>
|
||||
|
||||
<member-action>
|
||||
<group-name>group002</group-name>
|
||||
<ip>192.68.1</ip>
|
||||
|
|
Loading…
Reference in a new issue