traffic status - aggregate by group

This commit is contained in:
Mengyi Zhou 2015-06-25 16:04:17 +08:00
parent 7fba692751
commit 9058efed0e
6 changed files with 146 additions and 53 deletions

View file

@ -126,6 +126,8 @@ public class RollingTrafficStatus {
if (hostUpstream.length > 1)
groupName = hostUpstream[1].replaceFirst("backend_", "");
}
if (groupName.equals(""))
return;
Long upRequests = data[ReqStatusOffset.UpstreamReq.ordinal()];
double upResponseTime = (upRequests == null || upRequests == 0) ? 0 : (double) data[ReqStatusOffset.UpstreamRt.ordinal()] / upRequests;
Long requests = data[ReqStatusOffset.ReqTotal.ordinal()];

View file

@ -0,0 +1,26 @@
package com.ctrip.zeus.nginx;
import com.ctrip.zeus.nginx.entity.ReqStatus;
/**
* Created by zhoumy on 2015/6/25.
*/
public class TrafficStatusHelper {
public static ReqStatus add(ReqStatus origin, ReqStatus delta, String groupName, String hostname) {
if (origin == null)
return delta.setGroupName(groupName).setHostName(hostname);
return new ReqStatus().setHostName(hostname).setGroupName(groupName)
.setBytesInTotal(origin.getBytesInTotal() + delta.getBytesInTotal())
.setBytesOutTotal(origin.getBytesOutTotal() + delta.getBytesOutTotal())
.setSuccessCount(origin.getSuccessCount() + delta.getSuccessCount())
.setRedirectionCount(origin.getRedirectionCount() + delta.getRedirectionCount())
.setClientErrCount(origin.getClientErrCount() + delta.getClientErrCount())
.setServerErrCount(origin.getServerErrCount() + delta.getServerErrCount())
.setResponseTime(origin.getResponseTime() + delta.getResponseTime())
.setTotalRequests(origin.getTotalRequests() + delta.getTotalRequests())
.setUpRequests(origin.getUpRequests() + delta.getUpRequests())
.setUpResponseTime(origin.getUpResponseTime() + delta.getUpResponseTime())
.setUpTries(origin.getUpTries() + delta.getUpTries())
.setTime(origin.getTime());
}
}

View file

@ -1,7 +1,7 @@
package com.ctrip.zeus.restful.resource;
import com.ctrip.zeus.auth.Authorize;
import com.ctrip.zeus.exceptions.ValidationException;
import com.ctrip.zeus.exceptions.*;
import com.ctrip.zeus.model.entity.*;
import com.ctrip.zeus.nginx.entity.ReqStatus;
import com.ctrip.zeus.nginx.entity.TrafficStatus;
@ -162,12 +162,18 @@ public class StatusResource {
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Response getTrafficStatusBySlb(@Context HttpServletRequest request, @Context HttpHeaders hh,
@QueryParam("slbId") Long slbId,
@QueryParam("group") Boolean group,
@QueryParam("member") Boolean memeber,
@QueryParam("count") int count) throws Exception {
if (slbId == null) {
throw new ValidationException("Missing parameters.");
}
boolean aggregatedByGroup = group == null ? false : group.booleanValue();
boolean aggregatedByMember = memeber == null ? false : memeber.booleanValue();
if (group == null && memeber == null)
aggregatedByGroup = aggregatedByMember = true;
count = count == 0 ? 1 : count;
List<ReqStatus> statuses = nginxService.getTrafficStatusBySlb(slbId, count);
List<ReqStatus> statuses = nginxService.getTrafficStatusBySlb(slbId, count, aggregatedByGroup, aggregatedByMember);
TrafficStatusList list = new TrafficStatusList().setTotal(statuses.size());
for (ReqStatus rs : statuses) {
list.addReqStatus(rs);
@ -179,14 +185,21 @@ public class StatusResource {
@Path("/traffic/group")
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Response getTrafficStatusByGroupAndSlb(@Context HttpServletRequest request, @Context HttpHeaders hh,
@QueryParam("groupId") Long groupId,
@QueryParam("groupName") String groupName,
@QueryParam("slbId") Long slbId,
@QueryParam("count") int count) throws Exception {
if (slbId == null || groupName == null) {
if (slbId == null || (groupName == null && groupId == null)) {
throw new ValidationException("Missing parameters.");
}
count = count == 0 ? 1 : count;
if (groupId != null) {
Group g = groupRepository.getById(groupId);
if (g == null)
throw new com.ctrip.zeus.exceptions.NotFoundException("Group cannot be found by Id.");
groupName = g.getName();
}
List<ReqStatus> statuses = nginxService.getTrafficStatusBySlb(groupName, slbId, count);
TrafficStatusList list = new TrafficStatusList().setTotal(statuses.size());
for (ReqStatus rs : statuses) {

View file

@ -87,7 +87,7 @@ public interface NginxService {
* @param slbId the slb name
* @return the traffic statuses
*/
List<ReqStatus> getTrafficStatusBySlb(Long slbId, int count) throws Exception;
List<ReqStatus> getTrafficStatusBySlb(Long slbId, int count, boolean aggregatedByGroup, boolean aggregatedByMember) throws Exception;
List<ReqStatus> getTrafficStatusBySlb(String groupName, Long slbId, int count) throws Exception;

View file

@ -6,6 +6,7 @@ import com.ctrip.zeus.dal.core.NginxServerDo;
import com.ctrip.zeus.dal.core.NginxServerEntity;
import com.ctrip.zeus.model.entity.*;
import com.ctrip.zeus.nginx.NginxOperator;
import com.ctrip.zeus.nginx.TrafficStatusHelper;
import com.ctrip.zeus.nginx.entity.NginxResponse;
import com.ctrip.zeus.nginx.entity.NginxServerStatus;
import com.ctrip.zeus.nginx.entity.ReqStatus;
@ -22,9 +23,10 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author:xingchaowang
@ -55,12 +57,11 @@ public class NginxServiceImpl implements NginxService {
Long slbId = slb.getId();
int version = nginxConfService.getCurrentVersion(slbId);
NginxServerDo nginxServerDo =nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL);
if (nginxServerDo!=null&&nginxServerDo.getVersion()>=version)
{
NginxServerDo nginxServerDo = nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL);
if (nginxServerDo != null && nginxServerDo.getVersion() >= version) {
NginxResponse res = new NginxResponse();
res.setServerIp(ip).setSucceed(true).setOutMsg("current version is lower then or equal the version used!current version ["
+version+"],used version ["+nginxServerDo.getVersion()+"]");
+ version + "],used version [" + nginxServerDo.getVersion() + "]");
return res;
}
@ -78,23 +79,22 @@ public class NginxServiceImpl implements NginxService {
@Override
public boolean writeALLToDisk(Long slbId) throws Exception {
return writeALLToDisk(slbId,null);
return writeALLToDisk(slbId, null);
}
@Override
public List<NginxResponse> writeALLToDiskListResult(Long slbId) throws Exception {
List<NginxResponse> result = new ArrayList<>();
writeALLToDisk(slbId,result);
writeALLToDisk(slbId, result);
return result;
}
public boolean writeALLToDisk(Long slbId, List<NginxResponse> responses) throws Exception {
List<NginxResponse> result = null;
boolean sucess = true;
if (responses!=null)
{
if (responses != null) {
result = responses;
}else {
} else {
result = new ArrayList<>();
}
@ -103,7 +103,7 @@ public class NginxServiceImpl implements NginxService {
List<SlbServer> slbServers = slb.getSlbServers();
for (SlbServer slbServer : slbServers) {
logger.info("[ writeAllToDisk ]: start write to server : " + slbServer.getIp() );
logger.info("[ writeAllToDisk ]: start write to server : " + slbServer.getIp());
if (ip.equals(slbServer.getIp())) {
result.add(writeToDisk());
continue;
@ -112,16 +112,15 @@ public class NginxServiceImpl implements NginxService {
NginxResponse response = nginxClient.write();
result.add(response);
logger.info("[ writeAllToDisk ]: write to server finished : " + slbServer.getIp() );
logger.info("[ writeAllToDisk ]: write to server finished : " + slbServer.getIp());
}
if (result.size()==0){
if (result.size() == 0) {
sucess = false;
}
for (NginxResponse res : result)
{
sucess=sucess&&res.getSucceed();
for (NginxResponse res : result) {
sucess = sucess && res.getSucceed();
}
return sucess;
@ -134,32 +133,29 @@ public class NginxServiceImpl implements NginxService {
Long slbId = slb.getId();
int version = nginxConfService.getCurrentVersion(slbId);
NginxServerDo nginxServer =nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL);
if (nginxServer!=null&&nginxServer.getVersion()>=version)
{
NginxServerDo nginxServer = nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL);
if (nginxServer != null && nginxServer.getVersion() >= version) {
NginxResponse res = new NginxResponse();
res.setServerIp(ip).setSucceed(true).setOutMsg("current version is lower then or equal the version used,Don't update!current version ["
+version+"],used version ["+nginxServer.getVersion()+"]");
+ version + "],used version [" + nginxServer.getVersion() + "]");
return res;
}
NginxOperator nginxOperator = new NginxOperator(slb.getNginxConf(), slb.getNginxBin());
// reload configuration
NginxResponse response = nginxOperator.reloadConf();
NginxResponse response = nginxOperator.reloadConf();
response.setServerIp(ip);
if (response.getSucceed())
{
if (response.getSucceed()) {
// update the used version in the db
NginxServerDo nginxServerDo =nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL);
NginxServerDo nginxServerDo = nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL);
nginxServerDao.updateByPK(nginxServerDo.setVersion(version), NginxServerEntity.UPDATESET_FULL);
}
return response;
}
@Override
public List<NginxResponse> loadAll(Long slbId) throws Exception {
List<NginxResponse> result = new ArrayList<>();
@ -183,24 +179,23 @@ public class NginxServiceImpl implements NginxService {
@Override
public List<NginxResponse> writeAllAndLoadAll(Long slbId) throws Exception {
List<NginxResponse> result = new ArrayList<>();
if(!writeALLToDisk(slbId,result)){
if (!writeALLToDisk(slbId, result)) {
LOGGER.error("Write All To Disk Failed!");
StringBuilder sb = new StringBuilder(128);
sb.append("[");
for (NginxResponse res : result)
{
sb.append(String.format(NginxResponse.JSON,res)).append(",\n");
for (NginxResponse res : result) {
sb.append(String.format(NginxResponse.JSON, res)).append(",\n");
}
sb.append("]");
throw new Exception("Write All To Disk Failed!\nDetail:\n"+sb.toString());
throw new Exception("Write All To Disk Failed!\nDetail:\n" + sb.toString());
}
result = loadAll(slbId);
return result;
}
@Override
public NginxResponse dyopsLocal(String upsName,String upsCommands) throws Exception {
return new NginxOperator().dyupsLocal( upsName, upsCommands);
public NginxResponse dyopsLocal(String upsName, String upsCommands) throws Exception {
return new NginxOperator().dyupsLocal(upsName, upsCommands);
}
@Override
@ -208,15 +203,14 @@ public class NginxServiceImpl implements NginxService {
List<NginxResponse> result = new ArrayList<>();
Slb slb = slbRepository.getById(slbId);
int version = nginxConfService.getCurrentVersion(slbId);
boolean flag=false;
boolean flag = false;
String ip = S.getIp();
NginxServerDo nginxServer =nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL);
if (nginxServer!=null&&nginxServer.getVersion()>=version)
{
NginxServerDo nginxServer = nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL);
if (nginxServer != null && nginxServer.getVersion() >= version) {
NginxResponse res = new NginxResponse();
res.setServerIp(ip).setSucceed(true).setOutMsg("current version is lower then or equal the version used!current version ["
+version+"],used version ["+nginxServer.getVersion()+"]");
+ version + "],used version [" + nginxServer.getVersion() + "]");
List<NginxResponse> responses = new ArrayList<>();
responses.add(res);
return responses;
@ -226,15 +220,15 @@ public class NginxServiceImpl implements NginxService {
for (SlbServer slbServer : slbServers) {
flag = true;
NginxClient nginxClient = NginxClient.getClient("http://" + slbServer.getIp() + ":" + adminServerPort.get());
for (DyUpstreamOpsData dyup : dyups){
NginxResponse response = nginxClient.dyups(dyup.getUpstreamName(),dyup.getUpstreamCommands());
for (DyUpstreamOpsData dyup : dyups) {
NginxResponse response = nginxClient.dyups(dyup.getUpstreamName(), dyup.getUpstreamCommands());
response.setServerIp(slbServer.getIp());
result.add(response);
flag=flag&&response.getSucceed();
flag = flag && response.getSucceed();
}
if (flag){
if (flag) {
// update the used version in the db
NginxServerDo nginxServerDo =nginxServerDao.findByIp(slbServer.getIp(), NginxServerEntity.READSET_FULL);
NginxServerDo nginxServerDo = nginxServerDao.findByIp(slbServer.getIp(), NginxServerEntity.READSET_FULL);
nginxServerDao.updateByPK(nginxServerDo.setVersion(version), NginxServerEntity.UPDATESET_FULL);
}
}
@ -269,7 +263,15 @@ public class NginxServiceImpl implements NginxService {
}
@Override
public List<ReqStatus> getTrafficStatusBySlb(Long slbId, int count) throws Exception {
public List<ReqStatus> getTrafficStatusBySlb(Long slbId, int count, boolean aggregatedByGroup, boolean aggregatedByMember) throws Exception {
List<ReqStatus> result = getTrafficStatusBySlb(slbId, count);
if (aggregatedByGroup && aggregatedByMember) {
return result;
}
return aggregateByKey(result, aggregatedByGroup, aggregatedByMember);
}
private List<ReqStatus> getTrafficStatusBySlb(Long slbId, int count) throws Exception {
Slb slb = slbRepository.getById(slbId);
List<ReqStatus> list = new ArrayList<>();
for (SlbServer slbServer : slb.getSlbServers()) {
@ -283,6 +285,34 @@ public class NginxServiceImpl implements NginxService {
return list;
}
private List<ReqStatus> aggregateByKey(List<ReqStatus> raw, boolean group, boolean member) {
Map<String, ReqStatus> result = new ConcurrentHashMap<>();
for (ReqStatus reqStatus : raw) {
String key = genKey(reqStatus, group, member);
ReqStatus value = result.get(key);
if (group) {
result.put(key, TrafficStatusHelper.add(value, reqStatus, reqStatus.getGroupName(), null));
continue;
}
if (member) {
result.put(key, TrafficStatusHelper.add(value, reqStatus, null, reqStatus.getHostName()));
continue;
}
result.put(key, TrafficStatusHelper.add(value, reqStatus, null, null));
}
return new LinkedList<>(result.values());
}
private String genKey(ReqStatus reqStatus, boolean group, boolean member) {
final DateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
String time = formatter.format(reqStatus.getTime());
if (group)
return time + "-" + reqStatus.getGroupName();
if (member)
return time + "-" + reqStatus.getHostName();
return time + "";
}
@Override
public List<ReqStatus> getTrafficStatusBySlb(String groupName, Long slbId, int count) throws Exception {
Slb slb = slbRepository.getById(slbId);
@ -340,7 +370,7 @@ public class NginxServiceImpl implements NginxService {
private void writeNginxConf(Long slbId, int version, NginxOperator nginxOperator) throws Exception {
String nginxConf = nginxConfService.getNginxConf(slbId, version);
if (nginxConf == null || nginxConf.isEmpty()){
if (nginxConf == null || nginxConf.isEmpty()) {
throw new IllegalStateException("the nginx conf must not be empty!");
}
nginxOperator.writeNginxConf(nginxConf);
@ -348,7 +378,7 @@ public class NginxServiceImpl implements NginxService {
private void writeServerConf(Long slbId, int version, NginxOperator nginxOperator) throws Exception {
List<NginxConfServerData> nginxConfServerDataList = nginxConfService.getNginxConfServer(slbId, version);
for (NginxConfServerData d : nginxConfServerDataList) {
for (NginxConfServerData d : nginxConfServerDataList) {
nginxOperator.writeServerConf(d.getVsId(), d.getContent());
}
}
@ -359,7 +389,8 @@ public class NginxServiceImpl implements NginxService {
nginxOperator.writeUpstreamsConf(d.getVsId(), d.getContent());
}
}
private void cleanConfOnDisk (Long slbId, int version, NginxOperator nginxOperator) throws Exception {
private void cleanConfOnDisk(Long slbId, int version, NginxOperator nginxOperator) throws Exception {
List<NginxConfServerData> nginxConfServerDataList = nginxConfService.getNginxConfServer(slbId, version);
List<Long> vslist = new ArrayList<>();
for (NginxConfServerData d : nginxConfServerDataList) {

View file

@ -1,11 +1,11 @@
package com.ctrip.zeus.nginx;
import com.ctrip.zeus.nginx.RollingTrafficStatus;
import com.ctrip.zeus.nginx.entity.ReqStatus;
import com.ctrip.zeus.nginx.entity.TrafficStatus;
import org.junit.Assert;
import org.junit.Test;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -133,6 +133,27 @@ public class TrafficStatusTest {
assertReqStatusEquals(reqref2, result.get(2).getReqStatuses().get(0));
}
@Test
public void testAggregateByGroup() {
Date now = new Date();
ReqStatus origin = new ReqStatus().setHostName("123").setGroupName("group")
.setBytesInTotal(8348L).setBytesOutTotal(3738L)
.setTotalRequests(21L).setResponseTime((double)14/21).setSuccessCount(21L)
.setRedirectionCount(1L).setClientErrCount(2L).setServerErrCount(3L).setUpRequests(21L)
.setUpResponseTime((double)14/21).setUpTries(21L).setTime(now);
ReqStatus delta = new ReqStatus().setHostName("456").setGroupName("group")
.setBytesInTotal(2501999L).setBytesOutTotal(3760318L)
.setTotalRequests(13106L).setResponseTime((double) 13104/13106).setSuccessCount(13104L)
.setRedirectionCount(0L).setClientErrCount(2L).setServerErrCount(0L).setUpRequests(0L)
.setUpResponseTime(0.0).setUpTries(0L).setTime(now);
ReqStatus answer = new ReqStatus().setHostName(null).setGroupName("group")
.setBytesInTotal(8348L + 2501999L).setBytesOutTotal(3738L + 3760318L)
.setTotalRequests(21L + 13106L).setResponseTime((double) 14 / 21 + (double) 13104 / 13106).setSuccessCount(21L + 13104L)
.setRedirectionCount(1L).setClientErrCount(4L).setServerErrCount(3L).setUpRequests(21L)
.setUpResponseTime((double) 14 / 21 + 0.0).setUpTries(21L).setTime(now);
assertReqStatusEquals(answer, TrafficStatusHelper.add(origin, delta, "group", null));
}
private void testGetDelta(int length, int round) {
for (int i = 0; i < round; i++) {
AtomicLongArray ref = new AtomicLongArray(length);