diff --git a/src/main/java/com/ctrip/zeus/nginx/RollingTrafficStatus.java b/src/main/java/com/ctrip/zeus/nginx/RollingTrafficStatus.java index b0bbfc80..3b77a10b 100644 --- a/src/main/java/com/ctrip/zeus/nginx/RollingTrafficStatus.java +++ b/src/main/java/com/ctrip/zeus/nginx/RollingTrafficStatus.java @@ -126,6 +126,8 @@ public class RollingTrafficStatus { if (hostUpstream.length > 1) groupName = hostUpstream[1].replaceFirst("backend_", ""); } + if (groupName.equals("")) + return; Long upRequests = data[ReqStatusOffset.UpstreamReq.ordinal()]; double upResponseTime = (upRequests == null || upRequests == 0) ? 0 : (double) data[ReqStatusOffset.UpstreamRt.ordinal()] / upRequests; Long requests = data[ReqStatusOffset.ReqTotal.ordinal()]; diff --git a/src/main/java/com/ctrip/zeus/nginx/TrafficStatusHelper.java b/src/main/java/com/ctrip/zeus/nginx/TrafficStatusHelper.java new file mode 100644 index 00000000..9dbcc898 --- /dev/null +++ b/src/main/java/com/ctrip/zeus/nginx/TrafficStatusHelper.java @@ -0,0 +1,26 @@ +package com.ctrip.zeus.nginx; + +import com.ctrip.zeus.nginx.entity.ReqStatus; + +/** + * Created by zhoumy on 2015/6/25. + */ +public class TrafficStatusHelper { + public static ReqStatus add(ReqStatus origin, ReqStatus delta, String groupName, String hostname) { + if (origin == null) + return delta.setGroupName(groupName).setHostName(hostname); + return new ReqStatus().setHostName(hostname).setGroupName(groupName) + .setBytesInTotal(origin.getBytesInTotal() + delta.getBytesInTotal()) + .setBytesOutTotal(origin.getBytesOutTotal() + delta.getBytesOutTotal()) + .setSuccessCount(origin.getSuccessCount() + delta.getSuccessCount()) + .setRedirectionCount(origin.getRedirectionCount() + delta.getRedirectionCount()) + .setClientErrCount(origin.getClientErrCount() + delta.getClientErrCount()) + .setServerErrCount(origin.getServerErrCount() + delta.getServerErrCount()) + .setResponseTime(origin.getResponseTime() + delta.getResponseTime()) + .setTotalRequests(origin.getTotalRequests() + delta.getTotalRequests()) + .setUpRequests(origin.getUpRequests() + delta.getUpRequests()) + .setUpResponseTime(origin.getUpResponseTime() + delta.getUpResponseTime()) + .setUpTries(origin.getUpTries() + delta.getUpTries()) + .setTime(origin.getTime()); + } +} diff --git a/src/main/java/com/ctrip/zeus/restful/resource/StatusResource.java b/src/main/java/com/ctrip/zeus/restful/resource/StatusResource.java index 7acd26b1..63f216ac 100644 --- a/src/main/java/com/ctrip/zeus/restful/resource/StatusResource.java +++ b/src/main/java/com/ctrip/zeus/restful/resource/StatusResource.java @@ -1,7 +1,7 @@ package com.ctrip.zeus.restful.resource; import com.ctrip.zeus.auth.Authorize; -import com.ctrip.zeus.exceptions.ValidationException; +import com.ctrip.zeus.exceptions.*; import com.ctrip.zeus.model.entity.*; import com.ctrip.zeus.nginx.entity.ReqStatus; import com.ctrip.zeus.nginx.entity.TrafficStatus; @@ -162,12 +162,18 @@ public class StatusResource { @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) public Response getTrafficStatusBySlb(@Context HttpServletRequest request, @Context HttpHeaders hh, @QueryParam("slbId") Long slbId, + @QueryParam("group") Boolean group, + @QueryParam("member") Boolean memeber, @QueryParam("count") int count) throws Exception { if (slbId == null) { throw new ValidationException("Missing parameters."); } + boolean aggregatedByGroup = group == null ? false : group.booleanValue(); + boolean aggregatedByMember = memeber == null ? false : memeber.booleanValue(); + if (group == null && memeber == null) + aggregatedByGroup = aggregatedByMember = true; count = count == 0 ? 1 : count; - List statuses = nginxService.getTrafficStatusBySlb(slbId, count); + List statuses = nginxService.getTrafficStatusBySlb(slbId, count, aggregatedByGroup, aggregatedByMember); TrafficStatusList list = new TrafficStatusList().setTotal(statuses.size()); for (ReqStatus rs : statuses) { list.addReqStatus(rs); @@ -179,14 +185,21 @@ public class StatusResource { @Path("/traffic/group") @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) public Response getTrafficStatusByGroupAndSlb(@Context HttpServletRequest request, @Context HttpHeaders hh, + @QueryParam("groupId") Long groupId, @QueryParam("groupName") String groupName, @QueryParam("slbId") Long slbId, @QueryParam("count") int count) throws Exception { - if (slbId == null || groupName == null) { + if (slbId == null || (groupName == null && groupId == null)) { throw new ValidationException("Missing parameters."); } count = count == 0 ? 1 : count; + if (groupId != null) { + Group g = groupRepository.getById(groupId); + if (g == null) + throw new com.ctrip.zeus.exceptions.NotFoundException("Group cannot be found by Id."); + groupName = g.getName(); + } List statuses = nginxService.getTrafficStatusBySlb(groupName, slbId, count); TrafficStatusList list = new TrafficStatusList().setTotal(statuses.size()); for (ReqStatus rs : statuses) { diff --git a/src/main/java/com/ctrip/zeus/service/nginx/NginxService.java b/src/main/java/com/ctrip/zeus/service/nginx/NginxService.java index 311799db..47bece8b 100644 --- a/src/main/java/com/ctrip/zeus/service/nginx/NginxService.java +++ b/src/main/java/com/ctrip/zeus/service/nginx/NginxService.java @@ -87,7 +87,7 @@ public interface NginxService { * @param slbId the slb name * @return the traffic statuses */ - List getTrafficStatusBySlb(Long slbId, int count) throws Exception; + List getTrafficStatusBySlb(Long slbId, int count, boolean aggregatedByGroup, boolean aggregatedByMember) throws Exception; List getTrafficStatusBySlb(String groupName, Long slbId, int count) throws Exception; diff --git a/src/main/java/com/ctrip/zeus/service/nginx/impl/NginxServiceImpl.java b/src/main/java/com/ctrip/zeus/service/nginx/impl/NginxServiceImpl.java index aaa10bb0..36aa0969 100644 --- a/src/main/java/com/ctrip/zeus/service/nginx/impl/NginxServiceImpl.java +++ b/src/main/java/com/ctrip/zeus/service/nginx/impl/NginxServiceImpl.java @@ -6,6 +6,7 @@ import com.ctrip.zeus.dal.core.NginxServerDo; import com.ctrip.zeus.dal.core.NginxServerEntity; import com.ctrip.zeus.model.entity.*; import com.ctrip.zeus.nginx.NginxOperator; +import com.ctrip.zeus.nginx.TrafficStatusHelper; import com.ctrip.zeus.nginx.entity.NginxResponse; import com.ctrip.zeus.nginx.entity.NginxServerStatus; import com.ctrip.zeus.nginx.entity.ReqStatus; @@ -22,9 +23,10 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; /** * @author:xingchaowang @@ -55,12 +57,11 @@ public class NginxServiceImpl implements NginxService { Long slbId = slb.getId(); int version = nginxConfService.getCurrentVersion(slbId); - NginxServerDo nginxServerDo =nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL); - if (nginxServerDo!=null&&nginxServerDo.getVersion()>=version) - { + NginxServerDo nginxServerDo = nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL); + if (nginxServerDo != null && nginxServerDo.getVersion() >= version) { NginxResponse res = new NginxResponse(); res.setServerIp(ip).setSucceed(true).setOutMsg("current version is lower then or equal the version used!current version [" - +version+"],used version ["+nginxServerDo.getVersion()+"]"); + + version + "],used version [" + nginxServerDo.getVersion() + "]"); return res; } @@ -78,23 +79,22 @@ public class NginxServiceImpl implements NginxService { @Override public boolean writeALLToDisk(Long slbId) throws Exception { - return writeALLToDisk(slbId,null); + return writeALLToDisk(slbId, null); } @Override public List writeALLToDiskListResult(Long slbId) throws Exception { List result = new ArrayList<>(); - writeALLToDisk(slbId,result); + writeALLToDisk(slbId, result); return result; } public boolean writeALLToDisk(Long slbId, List responses) throws Exception { List result = null; boolean sucess = true; - if (responses!=null) - { + if (responses != null) { result = responses; - }else { + } else { result = new ArrayList<>(); } @@ -103,7 +103,7 @@ public class NginxServiceImpl implements NginxService { List slbServers = slb.getSlbServers(); for (SlbServer slbServer : slbServers) { - logger.info("[ writeAllToDisk ]: start write to server : " + slbServer.getIp() ); + logger.info("[ writeAllToDisk ]: start write to server : " + slbServer.getIp()); if (ip.equals(slbServer.getIp())) { result.add(writeToDisk()); continue; @@ -112,16 +112,15 @@ public class NginxServiceImpl implements NginxService { NginxResponse response = nginxClient.write(); result.add(response); - logger.info("[ writeAllToDisk ]: write to server finished : " + slbServer.getIp() ); + logger.info("[ writeAllToDisk ]: write to server finished : " + slbServer.getIp()); } - if (result.size()==0){ + if (result.size() == 0) { sucess = false; } - for (NginxResponse res : result) - { - sucess=sucess&&res.getSucceed(); + for (NginxResponse res : result) { + sucess = sucess && res.getSucceed(); } return sucess; @@ -134,32 +133,29 @@ public class NginxServiceImpl implements NginxService { Long slbId = slb.getId(); int version = nginxConfService.getCurrentVersion(slbId); - NginxServerDo nginxServer =nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL); - if (nginxServer!=null&&nginxServer.getVersion()>=version) - { + NginxServerDo nginxServer = nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL); + if (nginxServer != null && nginxServer.getVersion() >= version) { NginxResponse res = new NginxResponse(); res.setServerIp(ip).setSucceed(true).setOutMsg("current version is lower then or equal the version used,Don't update!current version [" - +version+"],used version ["+nginxServer.getVersion()+"]"); + + version + "],used version [" + nginxServer.getVersion() + "]"); return res; } NginxOperator nginxOperator = new NginxOperator(slb.getNginxConf(), slb.getNginxBin()); // reload configuration - NginxResponse response = nginxOperator.reloadConf(); + NginxResponse response = nginxOperator.reloadConf(); response.setServerIp(ip); - if (response.getSucceed()) - { + if (response.getSucceed()) { // update the used version in the db - NginxServerDo nginxServerDo =nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL); + NginxServerDo nginxServerDo = nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL); nginxServerDao.updateByPK(nginxServerDo.setVersion(version), NginxServerEntity.UPDATESET_FULL); } return response; } - @Override public List loadAll(Long slbId) throws Exception { List result = new ArrayList<>(); @@ -183,24 +179,23 @@ public class NginxServiceImpl implements NginxService { @Override public List writeAllAndLoadAll(Long slbId) throws Exception { List result = new ArrayList<>(); - if(!writeALLToDisk(slbId,result)){ + if (!writeALLToDisk(slbId, result)) { LOGGER.error("Write All To Disk Failed!"); StringBuilder sb = new StringBuilder(128); sb.append("["); - for (NginxResponse res : result) - { - sb.append(String.format(NginxResponse.JSON,res)).append(",\n"); + for (NginxResponse res : result) { + sb.append(String.format(NginxResponse.JSON, res)).append(",\n"); } sb.append("]"); - throw new Exception("Write All To Disk Failed!\nDetail:\n"+sb.toString()); + throw new Exception("Write All To Disk Failed!\nDetail:\n" + sb.toString()); } result = loadAll(slbId); return result; } @Override - public NginxResponse dyopsLocal(String upsName,String upsCommands) throws Exception { - return new NginxOperator().dyupsLocal( upsName, upsCommands); + public NginxResponse dyopsLocal(String upsName, String upsCommands) throws Exception { + return new NginxOperator().dyupsLocal(upsName, upsCommands); } @Override @@ -208,15 +203,14 @@ public class NginxServiceImpl implements NginxService { List result = new ArrayList<>(); Slb slb = slbRepository.getById(slbId); int version = nginxConfService.getCurrentVersion(slbId); - boolean flag=false; + boolean flag = false; String ip = S.getIp(); - NginxServerDo nginxServer =nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL); - if (nginxServer!=null&&nginxServer.getVersion()>=version) - { + NginxServerDo nginxServer = nginxServerDao.findByIp(ip, NginxServerEntity.READSET_FULL); + if (nginxServer != null && nginxServer.getVersion() >= version) { NginxResponse res = new NginxResponse(); res.setServerIp(ip).setSucceed(true).setOutMsg("current version is lower then or equal the version used!current version [" - +version+"],used version ["+nginxServer.getVersion()+"]"); + + version + "],used version [" + nginxServer.getVersion() + "]"); List responses = new ArrayList<>(); responses.add(res); return responses; @@ -226,15 +220,15 @@ public class NginxServiceImpl implements NginxService { for (SlbServer slbServer : slbServers) { flag = true; NginxClient nginxClient = NginxClient.getClient("http://" + slbServer.getIp() + ":" + adminServerPort.get()); - for (DyUpstreamOpsData dyup : dyups){ - NginxResponse response = nginxClient.dyups(dyup.getUpstreamName(),dyup.getUpstreamCommands()); + for (DyUpstreamOpsData dyup : dyups) { + NginxResponse response = nginxClient.dyups(dyup.getUpstreamName(), dyup.getUpstreamCommands()); response.setServerIp(slbServer.getIp()); result.add(response); - flag=flag&&response.getSucceed(); + flag = flag && response.getSucceed(); } - if (flag){ + if (flag) { // update the used version in the db - NginxServerDo nginxServerDo =nginxServerDao.findByIp(slbServer.getIp(), NginxServerEntity.READSET_FULL); + NginxServerDo nginxServerDo = nginxServerDao.findByIp(slbServer.getIp(), NginxServerEntity.READSET_FULL); nginxServerDao.updateByPK(nginxServerDo.setVersion(version), NginxServerEntity.UPDATESET_FULL); } } @@ -269,7 +263,15 @@ public class NginxServiceImpl implements NginxService { } @Override - public List getTrafficStatusBySlb(Long slbId, int count) throws Exception { + public List getTrafficStatusBySlb(Long slbId, int count, boolean aggregatedByGroup, boolean aggregatedByMember) throws Exception { + List result = getTrafficStatusBySlb(slbId, count); + if (aggregatedByGroup && aggregatedByMember) { + return result; + } + return aggregateByKey(result, aggregatedByGroup, aggregatedByMember); + } + + private List getTrafficStatusBySlb(Long slbId, int count) throws Exception { Slb slb = slbRepository.getById(slbId); List list = new ArrayList<>(); for (SlbServer slbServer : slb.getSlbServers()) { @@ -283,6 +285,34 @@ public class NginxServiceImpl implements NginxService { return list; } + private List aggregateByKey(List raw, boolean group, boolean member) { + Map result = new ConcurrentHashMap<>(); + for (ReqStatus reqStatus : raw) { + String key = genKey(reqStatus, group, member); + ReqStatus value = result.get(key); + if (group) { + result.put(key, TrafficStatusHelper.add(value, reqStatus, reqStatus.getGroupName(), null)); + continue; + } + if (member) { + result.put(key, TrafficStatusHelper.add(value, reqStatus, null, reqStatus.getHostName())); + continue; + } + result.put(key, TrafficStatusHelper.add(value, reqStatus, null, null)); + } + return new LinkedList<>(result.values()); + } + + private String genKey(ReqStatus reqStatus, boolean group, boolean member) { + final DateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm"); + String time = formatter.format(reqStatus.getTime()); + if (group) + return time + "-" + reqStatus.getGroupName(); + if (member) + return time + "-" + reqStatus.getHostName(); + return time + ""; + } + @Override public List getTrafficStatusBySlb(String groupName, Long slbId, int count) throws Exception { Slb slb = slbRepository.getById(slbId); @@ -340,7 +370,7 @@ public class NginxServiceImpl implements NginxService { private void writeNginxConf(Long slbId, int version, NginxOperator nginxOperator) throws Exception { String nginxConf = nginxConfService.getNginxConf(slbId, version); - if (nginxConf == null || nginxConf.isEmpty()){ + if (nginxConf == null || nginxConf.isEmpty()) { throw new IllegalStateException("the nginx conf must not be empty!"); } nginxOperator.writeNginxConf(nginxConf); @@ -348,7 +378,7 @@ public class NginxServiceImpl implements NginxService { private void writeServerConf(Long slbId, int version, NginxOperator nginxOperator) throws Exception { List nginxConfServerDataList = nginxConfService.getNginxConfServer(slbId, version); - for (NginxConfServerData d : nginxConfServerDataList) { + for (NginxConfServerData d : nginxConfServerDataList) { nginxOperator.writeServerConf(d.getVsId(), d.getContent()); } } @@ -359,7 +389,8 @@ public class NginxServiceImpl implements NginxService { nginxOperator.writeUpstreamsConf(d.getVsId(), d.getContent()); } } - private void cleanConfOnDisk (Long slbId, int version, NginxOperator nginxOperator) throws Exception { + + private void cleanConfOnDisk(Long slbId, int version, NginxOperator nginxOperator) throws Exception { List nginxConfServerDataList = nginxConfService.getNginxConfServer(slbId, version); List vslist = new ArrayList<>(); for (NginxConfServerData d : nginxConfServerDataList) { diff --git a/src/test/java/com/ctrip/zeus/nginx/TrafficStatusTest.java b/src/test/java/com/ctrip/zeus/nginx/TrafficStatusTest.java index e7c26e17..6e3eac7a 100644 --- a/src/test/java/com/ctrip/zeus/nginx/TrafficStatusTest.java +++ b/src/test/java/com/ctrip/zeus/nginx/TrafficStatusTest.java @@ -1,11 +1,11 @@ package com.ctrip.zeus.nginx; -import com.ctrip.zeus.nginx.RollingTrafficStatus; import com.ctrip.zeus.nginx.entity.ReqStatus; import com.ctrip.zeus.nginx.entity.TrafficStatus; import org.junit.Assert; import org.junit.Test; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.Random; @@ -133,6 +133,27 @@ public class TrafficStatusTest { assertReqStatusEquals(reqref2, result.get(2).getReqStatuses().get(0)); } + @Test + public void testAggregateByGroup() { + Date now = new Date(); + ReqStatus origin = new ReqStatus().setHostName("123").setGroupName("group") + .setBytesInTotal(8348L).setBytesOutTotal(3738L) + .setTotalRequests(21L).setResponseTime((double)14/21).setSuccessCount(21L) + .setRedirectionCount(1L).setClientErrCount(2L).setServerErrCount(3L).setUpRequests(21L) + .setUpResponseTime((double)14/21).setUpTries(21L).setTime(now); + ReqStatus delta = new ReqStatus().setHostName("456").setGroupName("group") + .setBytesInTotal(2501999L).setBytesOutTotal(3760318L) + .setTotalRequests(13106L).setResponseTime((double) 13104/13106).setSuccessCount(13104L) + .setRedirectionCount(0L).setClientErrCount(2L).setServerErrCount(0L).setUpRequests(0L) + .setUpResponseTime(0.0).setUpTries(0L).setTime(now); + ReqStatus answer = new ReqStatus().setHostName(null).setGroupName("group") + .setBytesInTotal(8348L + 2501999L).setBytesOutTotal(3738L + 3760318L) + .setTotalRequests(21L + 13106L).setResponseTime((double) 14 / 21 + (double) 13104 / 13106).setSuccessCount(21L + 13104L) + .setRedirectionCount(1L).setClientErrCount(4L).setServerErrCount(3L).setUpRequests(21L) + .setUpResponseTime((double) 14 / 21 + 0.0).setUpTries(21L).setTime(now); + assertReqStatusEquals(answer, TrafficStatusHelper.add(origin, delta, "group", null)); + } + private void testGetDelta(int length, int round) { for (int i = 0; i < round; i++) { AtomicLongArray ref = new AtomicLongArray(length);