This commit is contained in:
Mengyi Zhou 2016-12-12 15:42:22 +08:00
parent cd7a54896f
commit f4f6b6d5ae
4 changed files with 78 additions and 42 deletions

View file

@@ -135,6 +135,19 @@ public class StatusResource {
return responseHandler.handle(list, hh.getMediaType());
}
@GET
@Path("/check/refresh")
@Produces({MediaType.APPLICATION_JSON})
public Response getSlbCheckFailures(@Context HttpServletRequest request, @Context HttpHeaders hh,
                                    @QueryParam("slbId") Long slbId) throws Exception {
    // The slb id is mandatory for this endpoint; reject the request up front when absent.
    if (slbId == null) {
        throw new ValidationException("Query param slbId is required.");
    }
    // Rebuild the slb's rolling check counters from the current offline group statuses.
    List<GroupStatus> offlineStatuses = groupStatusService.getOfflineGroupsStatusBySlbId(slbId);
    slbCheckStatusRollingMachine.refresh(slbId, offlineStatuses);
    return responseHandler.handle("Successfully refreshed slb check count data.", hh.getMediaType());
}
@GET
@Path("/job/unlock")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})

View file

@@ -3,10 +3,7 @@ package com.ctrip.zeus.service.message.queue.consumers;
import com.ctrip.zeus.model.entity.Group;
import com.ctrip.zeus.model.entity.GroupVirtualServer;
import com.ctrip.zeus.model.entity.VirtualServer;
import com.ctrip.zeus.queue.entity.GroupData;
import com.ctrip.zeus.queue.entity.Message;
import com.ctrip.zeus.queue.entity.SlbMessageData;
import com.ctrip.zeus.queue.entity.VsData;
import com.ctrip.zeus.queue.entity.*;
import com.ctrip.zeus.service.build.ConfigHandler;
import com.ctrip.zeus.service.message.queue.AbstractConsumer;
import com.ctrip.zeus.service.model.*;

View file

@@ -4,7 +4,6 @@ package com.ctrip.zeus.task.check;
import com.ctrip.zeus.dal.core.StatusCheckCountSlbDao;
import com.ctrip.zeus.dal.core.StatusCheckCountSlbDo;
import com.ctrip.zeus.dal.core.StatusCheckCountSlbEntity;
import com.ctrip.zeus.lock.DbLockFactory;
import com.ctrip.zeus.status.entity.GroupServerStatus;
import com.ctrip.zeus.status.entity.GroupStatus;
import com.ctrip.zeus.task.AbstractTask;
@@ -28,13 +27,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class SlbCheckStatusRollingMachine extends AbstractTask {
@Resource
private StatusCheckCountSlbDao statusCheckCountSlbDao;
@Resource
private DbLockFactory dbLockFactory;
private final static Logger logger = LoggerFactory.getLogger(SlbCheckStatusRollingMachine.class);
private final Map<Long, CircularArray<Integer>> slbCheckFailureRollingCounter = new HashMap<>();
private final Map<Long, TimestampWrapper> groupCacheBySlb = new HashMap<>();
private final Map<Long, DataSetTimeWrapper> groupCacheBySlb = new HashMap<>();
private final int bitmask;
private final int failureId;
@@ -137,18 +134,18 @@ public class SlbCheckStatusRollingMachine extends AbstractTask {
}
Set<Long> obj = new HashSet<>();
Map<Long, Set<Long>> updatedCache = new HashMap<>();
Map<Long, Set<Long>> tmpGroupCache = new HashMap<>();
for (Long slbId : prevSlbIds) {
updatedCache.put(slbId, obj);
tmpGroupCache.put(slbId, obj);
}
for (Long slbId : currSlbIds) {
updatedCache.put(slbId, obj);
tmpGroupCache.put(slbId, obj);
}
try {
List<StatusCheckCountSlbDo> toUpdate = new ArrayList<>();
updateIfExpired(updatedCache.keySet(), groupCacheBySlb);
for (Long slbId : updatedCache.keySet()) {
TimestampWrapper v = groupCacheBySlb.get(slbId);
updateIfExpired(tmpGroupCache.keySet(), groupCacheBySlb);
for (Long slbId : tmpGroupCache.keySet()) {
DataSetTimeWrapper v = groupCacheBySlb.get(slbId);
Set<Long> dataSet = v.unmodifiableDataSet();
if (prevSlbIds.contains(slbId)) {
dataSet.removeAll(totalGroupIds);
@@ -157,20 +154,21 @@ public class SlbCheckStatusRollingMachine extends AbstractTask {
dataSet.addAll(failureGroupIds);
}
if (!dataSet.equals(v.groupIds)) {
updatedCache.put(slbId, dataSet);
toUpdate.add(new StatusCheckCountSlbDo().setSlbId(slbId).setCount(dataSet.size()).setDataChangeLastTime(v.timestamp)
tmpGroupCache.put(slbId, dataSet);
toUpdate.add(new StatusCheckCountSlbDo().setSlbId(slbId).setCount(dataSet.size())
.setDataSet(new String(CompressUtils.compress(Joiner.on(",").join(dataSet)))));
}
}
int[] rc = statusCheckCountSlbDao.update((StatusCheckCountSlbDo[]) toUpdate.toArray(), StatusCheckCountSlbEntity.UPDATESET_FULL);
for (int i = 0; i < rc.length; i++) {
if (rc[i] == 1) {
long slbId = toUpdate.get(i).getSlbId();
groupCacheBySlb.get(slbId).update(updatedCache.get(slbId));
}
statusCheckCountSlbDao.update((StatusCheckCountSlbDo[]) toUpdate.toArray(), StatusCheckCountSlbEntity.UPDATESET_FULL);
for (StatusCheckCountSlbDo e : toUpdate) {
groupCacheBySlb.get(e.getSlbId()).update(tmpGroupCache.get(e.getSlbId()));
}
} catch (DalException e) {
logger.error("An unexpected error occurred when migrating group status count.", e);
} catch (IOException e) {
logger.error("An unexpected error occurred when migrating group status count.", e);
}
}
@@ -188,23 +186,24 @@ public class SlbCheckStatusRollingMachine extends AbstractTask {
try {
updateIfExpired(slbIds, groupCacheBySlb);
for (Long slbId : slbIds) {
int rc = 0;
TimestampWrapper v = groupCacheBySlb.get(slbId);
DataSetTimeWrapper v = groupCacheBySlb.get(slbId);
Set<Long> dataSet = v.unmodifiableDataSet();
if (failed && dataSet.add(groupId)) {
rc = statusCheckCountSlbDao.countIncrement(new StatusCheckCountSlbDo().setSlbId(slbId).setNum(1).setDataChangeLastTime(v.timestamp)
statusCheckCountSlbDao.countIncrement(new StatusCheckCountSlbDo().setSlbId(slbId).setNum(1)
.setDataSet(new String(CompressUtils.compress(Joiner.on(",").join(dataSet)))), StatusCheckCountSlbEntity.UPDATESET_FULL);
}
if (!failed && dataSet.remove(groupId)) {
rc = statusCheckCountSlbDao.countDecrement(new StatusCheckCountSlbDo().setSlbId(slbId).setNum(1).setDataChangeLastTime(v.timestamp)
statusCheckCountSlbDao.countDecrement(new StatusCheckCountSlbDo().setSlbId(slbId).setNum(1)
.setDataSet(new String(CompressUtils.compress(Joiner.on(",").join(dataSet)))), StatusCheckCountSlbEntity.UPDATESET_FULL);
}
if (rc == 1) {
groupCacheBySlb.get(slbId).update(dataSet);
}
v.update(dataSet);
}
} catch (DalException e) {
logger.error("An unexpected error occurred when updating group status count.", e);
} catch (IOException e) {
logger.error("An unexpected error occurred when updating group status count.", e);
}
}
@@ -212,26 +211,28 @@ public class SlbCheckStatusRollingMachine extends AbstractTask {
try {
updateIfExpired(slbIds, groupCacheBySlb);
for (Long slbId : slbIds) {
TimestampWrapper v = groupCacheBySlb.get(slbId);
DataSetTimeWrapper v = groupCacheBySlb.get(slbId);
Set<Long> dataSet = v.unmodifiableDataSet();
if (dataSet.remove(groupId)) {
int rc = statusCheckCountSlbDao.countDecrement(new StatusCheckCountSlbDo().setSlbId(slbId).setNum(1).setDataChangeLastTime(v.timestamp)
statusCheckCountSlbDao.countDecrement(new StatusCheckCountSlbDo().setSlbId(slbId).setNum(1)
.setDataSet(new String(CompressUtils.compress(Joiner.on(",").join(dataSet)))), StatusCheckCountSlbEntity.UPDATESET_FULL);
if (rc == 1) {
v.update(dataSet);
}
v.update(dataSet);
}
}
} catch (DalException e) {
logger.error("An unexpected error occurred when clearing group from slbs.", e);
} catch (IOException e) {
logger.error("An unexpected error occurred when clearing group from slbs.", e);
}
}
private void updateIfExpired(Set<Long> slbIds, Map<Long, TimestampWrapper> cache) {
private void updateIfExpired(Set<Long> slbIds, Map<Long, DataSetTimeWrapper> cache) {
List<Long> expired = new ArrayList<>();
try {
for (StatusCheckCountSlbDo e : statusCheckCountSlbDao.findAllBySlb(slbIds.toArray((Long[]) slbIds.toArray()), StatusCheckCountSlbEntity.READSET_DATASET_EXCLUDED)) {
TimestampWrapper v = cache.get(e.getSlbId());
DataSetTimeWrapper v = cache.get(e.getSlbId());
if (v == null) expired.add(e.getSlbId());
if (e.getDataChangeLastTime().compareTo(v.timestamp) > 0) expired.add(e.getSlbId());
}
@@ -244,17 +245,42 @@ public class SlbCheckStatusRollingMachine extends AbstractTask {
groupSet.add(Long.parseLong(s));
}
}
cache.put(e.getSlbId(), new DataSetTimeWrapper(groupSet, e.getDataChangeLastTime()));
}
if (!cache.keySet().containsAll(slbIds)) {
HashSet<Long> diff = new HashSet<>(slbIds);
diff.removeAll(cache.keySet());
for (Long slbId : diff) {
refresh(slbId, new ArrayList<GroupStatus>());
}
}
} catch (DalException e) {
} catch (IOException e) {
}
}
private class TimestampWrapper {
/**
 * Recomputes this slb's check-failure record from scratch: scans the given group
 * statuses, persists the failed-group count and compressed id set through the DAO,
 * and replaces the in-memory cache entry with a freshly timestamped wrapper.
 */
public void refresh(Long slbId, List<GroupStatus> groupStatuses) throws IOException, DalException {
    // Ids of groups with at least one member whose bitwise status matches the failure pattern.
    Set<Long> failedGroups = new HashSet<>();
    for (GroupStatus gs : groupStatuses) {
        boolean anyFailure = false;
        for (GroupServerStatus ss : gs.getGroupServerStatuses()) {
            // One failing server is enough to mark the whole group as failed.
            if ((bitmask & bitwiseStatus(ss)) == failureId) {
                anyFailure = true;
                break;
            }
        }
        if (anyFailure) {
            failedGroups.add(gs.getGroupId());
        }
    }
    // Persist count + comma-joined, compressed id set.
    // NOTE(review): new String(byte[]) uses the platform default charset — confirm it matches the decode side.
    statusCheckCountSlbDao.insertOrUpdate(new StatusCheckCountSlbDo().setSlbId(slbId).setCount(failedGroups.size())
            .setDataSet(new String(CompressUtils.compress(Joiner.on(",").join(failedGroups)))));
    // Replace the cached entry so subsequent deltas start from this snapshot.
    groupCacheBySlb.put(slbId, new DataSetTimeWrapper(failedGroups, new Date()));
}
private class DataSetTimeWrapper {
Set<Long> groupIds;
Date timestamp;
public TimestampWrapper(Set<Long> groupIds, Date timestamp) {
public DataSetTimeWrapper(Set<Long> groupIds, Date timestamp) {
this.groupIds = groupIds;
this.timestamp = timestamp;
}

View file

@@ -341,11 +341,14 @@
]]>
</statement>
</query>
<query name="insert" type="INSERT" batch="true">
<query name="insert-or-update" type="INSERT" batch="true">
<statement>
<![CDATA[
INSERT INTO <TABLE/>(<FIELDS/>)
VALUES(<VALUES/>)
ON DUPLICATE KEY UPDATE
count = values(count),
data_set = ${data-set}
]]>
</statement>
</query>
@@ -356,7 +359,6 @@
SET count = values(count),
data_set = ${data-set}
WHERE <FIELD name='slb-id'/> = ${slb-id}
AND <FIELD name='data-change-last-time'/> <= ${data-change-last-time}
]]>
</statement>
</query>
@@ -367,7 +369,6 @@
SET count = count + ${num},
data_set = ${data-set}
WHERE <FIELD name='slb-id'/> = ${slb-id}
AND <FIELD name='data-change-last-time'/> <= ${data-change-last-time}
]]>
</statement>
</query>
@@ -378,7 +379,6 @@
SET count = count - ${num},
data_set = ${data-set}
WHERE <FIELD name='slb-id'/> = ${slb-id}
AND <FIELD name='data-change-last-time'/> <= ${data-change-last-time}
]]>
</statement>
</query>