Merge branch 'github_dev' of https://github.com/ctripcorp/zeus into github_dev

This commit is contained in:
fanqq 2015-12-03 13:32:16 +08:00
commit bcd2671267
18 changed files with 629 additions and 150 deletions

View file

@ -17,7 +17,7 @@
<dependency>
<groupId>com.dianping.cat</groupId>
<artifactId>cat-client</artifactId>
<version>1.1.5</version>
<version>1.3.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
@ -189,11 +189,6 @@
<artifactId>logback-access</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>com.ctrip.framework.clogging</groupId>
<artifactId>clogging-agent</artifactId>
<version>3.3.2</version>
</dependency>
<!-- Log End -->
<!-- Jetty -->

View file

@ -1,37 +0,0 @@
package com.ctrip.zeus.logstats;
import com.ctrip.zeus.logstats.analyzer.AccessLogStatsAnalyzer;
import com.ctrip.zeus.logstats.analyzer.LogStatsAnalyzer;
import java.io.IOException;
/**
* Created by zhoumy on 2015/11/17.
*/
public class LogStatsReportWorker implements LogStatsWorker {
private static final int SwitchInterval = 60 * 1000;
private final LogStatsAnalyzer analyzer;
private final StatsDelegate reporter;
private int nextStartTime;
public LogStatsReportWorker() throws IOException {
analyzer = new AccessLogStatsAnalyzer();
reporter = new StatsDelegate<String>() {
@Override
public void delegate(String input) {
}
};
}
@Override
public void doJob() {
if (nextStartTime + SwitchInterval < System.currentTimeMillis()) {
try {
analyzer.analyze(reporter);
} catch (IOException e) {
e.printStackTrace();
}
}
nextStartTime += SwitchInterval;
}
}

View file

@ -1,11 +0,0 @@
package com.ctrip.zeus.logstats;
import java.io.IOException;
/**
* Created by zhoumy on 2015/11/17.
*/
public interface LogStatsWorker {
void doJob() throws IOException;
}

View file

@ -9,30 +9,30 @@ import com.ctrip.zeus.logstats.parser.LogParser;
import com.ctrip.zeus.logstats.tracker.AccessLogTracker;
import com.ctrip.zeus.logstats.tracker.LogTracker;
import com.ctrip.zeus.logstats.tracker.LogTrackerStrategy;
import com.ctrip.zeus.service.build.conf.LogFormat;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
/**
* Created by zhoumy on 2015/11/16.
*/
public class AccessLogStatsAnalyzer implements LogStatsAnalyzer {
private static final String AccessLogFormat =
"[$time_local] $host $hostname $server_addr $request_method $uri " +
"\"$query_string\" $server_port $remote_user $remote_addr $http_x_forwarded_for " +
"$server_protocol \"$http_user_agent\" \"$cookie_COOKIE\" \"$http_referer\" " +
"$host $status $body_bytes_sent $request_time $upstream_response_time ";
private static final int TrackerReadSize = 2048;
private static final String AccessLogFormat = LogFormat.getMainCompactString();
private static final DynamicIntProperty TrackerReadSize = DynamicPropertyFactory.getInstance().getIntProperty("accesslog.tracker.readsize", 1024 * 5);
private final LogStatsAnalyzerConfig config;
private final LogTracker logTracker;
private final LogParser logParser;
public AccessLogStatsAnalyzer() throws IOException {
this(new LogStatsAnalyzerConfigBuilder()
.setLogFormat(AccessLogFormat)
.setLogFilename("/opt/app/nginx/access.log")
.setTrackerReadSize(TrackerReadSize)
.allowTracking("access-foot-print.log")
.setLogFormat(new AccessLogLineFormat(AccessLogFormat).generate())
.setLogFilename("/opt/logs/nginx/access.log")
.setTrackerReadSize(TrackerReadSize.get())
.allowTracking("access-track.log")
.build());
}
@ -57,6 +57,11 @@ public class AccessLogStatsAnalyzer implements LogStatsAnalyzer {
logTracker.stop();
}
@Override
public boolean reachFileEnd() throws IOException {
return logTracker.reachFileEnd();
}
@Override
public String analyze() throws IOException {
String raw = logTracker.move();
@ -74,7 +79,7 @@ public class AccessLogStatsAnalyzer implements LogStatsAnalyzer {
}
public static class LogStatsAnalyzerConfigBuilder {
private String logFormat;
private LineFormat logFormat;
private String logFilename;
private String trackingFilename;
private boolean allowTracking;
@ -85,7 +90,7 @@ public class AccessLogStatsAnalyzer implements LogStatsAnalyzer {
return this;
}
public LogStatsAnalyzerConfigBuilder setLogFormat(String logFormat) {
public LogStatsAnalyzerConfigBuilder setLogFormat(LineFormat logFormat) {
this.logFormat = logFormat;
return this;
}
@ -102,11 +107,9 @@ public class AccessLogStatsAnalyzer implements LogStatsAnalyzer {
}
public LogStatsAnalyzerConfig build() throws IOException {
LineFormat format = new AccessLogLineFormat();
format.setFormat(logFormat);
File f = new File(logFilename);
if (f.exists() && f.isFile()) {
String rootDir = f.getParentFile().getAbsolutePath();
String rootDir = f.getAbsoluteFile().getParentFile().getAbsolutePath();
LogTrackerStrategy strategy = new LogTrackerStrategy()
.setAllowLogRotate(true)
.setAllowTrackerMemo(allowTracking)
@ -115,7 +118,7 @@ public class AccessLogStatsAnalyzer implements LogStatsAnalyzer {
.setLogFilename(logFilename)
.setReadSize(trackerReadSize);
return new LogStatsAnalyzerConfig()
.addFormat(format)
.addFormat(logFormat)
.setLogTracker(new AccessLogTracker(strategy));
} else {
throw new IOException(logFilename + " is not a file or does not exist.");

View file

@ -15,6 +15,15 @@ public interface LogStatsAnalyzer {
void stop() throws IOException;
/**
* This method check if the reading cursor has reached the end of the file at the time
* when it is called. It might return a wrong value if file keeps growing.
* This method is recommended to use iff reading a read-only file.
* @return true if file cursor has reached the end of the file.
* @throws IOException
*/
boolean reachFileEnd() throws IOException;
String analyze() throws IOException;
void analyze(StatsDelegate<String> delegate) throws IOException;

View file

@ -23,7 +23,8 @@ public class JsonStringWriter {
.append(",");
}
}
sb.deleteCharAt(sb.length() - 1);
if (keyValues.size() > 0)
sb.deleteCharAt(sb.length() - 1);
sb.append("}");
return sb.toString();
}

View file

@ -1,7 +1,9 @@
package com.ctrip.zeus.logstats.common;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
/**
@ -12,6 +14,15 @@ public class AccessLogLineFormat implements LineFormat {
private String patternString;
private Pattern pattern;
private String[] keys;
private final Map<String, String> patternRegistry = new HashMap<>();
public AccessLogLineFormat() {
}
public AccessLogLineFormat(String format) {
setFormat(format);
registerPatternForKey("http_x_forwarded_for", "(-|(?:[0-9.]+(?:, [0-9.]+)*))");
}
@Override
public String getFormat() {
@ -34,12 +45,24 @@ public class AccessLogLineFormat implements LineFormat {
}
@Override
public void setFormat(String format) {
public LineFormat setFormat(String format) {
this.format = format;
parsePattern(format);
return this;
}
protected void parsePattern(String format) {
@Override
public LineFormat registerPatternForKey(String key, String pattern) {
patternRegistry.put(key, pattern);
return this;
}
@Override
public LineFormat generate() {
parsePattern();
return this;
}
protected void parsePattern() {
List<String> keyList = new ArrayList<>();
StringBuilder sb = new StringBuilder();
char[] formatChars = format.toCharArray();
@ -48,10 +71,14 @@ public class AccessLogLineFormat implements LineFormat {
if (formatChars[i] == '$') {
int start = i;
// word characters
sb.append(depth % 2 == 0 ? "(\\S+)" : "(.+)");
while (++i < formatChars.length && (Character.isLetterOrDigit(formatChars[i]) || formatChars[i] == '_')) {
}
keyList.add(i - start == 0 ? "key" + i : new String(formatChars, start + 1, i - start - 1));
String k = i - start == 0 ? "key" + i : new String(formatChars, start + 1, i - start - 1);
keyList.add(k);
if (patternRegistry.containsKey(k))
sb.append(patternRegistry.get(k));
else
sb.append(depth % 2 == 0 ? "(\\S+)" : "(.*)");
}
if (i >= formatChars.length)
break;

View file

@ -20,5 +20,9 @@ public interface LineFormat {
String[] getKeys();
void setFormat(String format);
LineFormat setFormat(String format);
LineFormat registerPatternForKey(String key, String pattern);
LineFormat generate();
}

View file

@ -13,13 +13,16 @@ import java.util.Scanner;
* Created by zhoumy on 2015/11/13.
*/
public class AccessLogTracker implements LogTracker {
private final int TrackLatch = 10;
private final LogTrackerStrategy strategy;
private final String logFilename;
private final int size;
private final ByteBuffer buffer;
private RandomAccessFile raf;
private FileChannel fileChannel;
private int offset;
private long offset;
private String offsetValue = "";
private int rollingLogCounter;
private File trackingFile;
private boolean allowTracking;
@ -58,6 +61,11 @@ public class AccessLogTracker implements LogTracker {
return strategy;
}
@Override
public boolean reachFileEnd() throws IOException {
return offset == fileChannel.size();
}
@Override
public void start() throws IOException {
raf = new RandomAccessFile(getLogFilename(), "r");
@ -74,7 +82,7 @@ public class AccessLogTracker implements LogTracker {
offset = 0;
else {
// check if log rotate has been done
fileChannel.position(curr.rOffset - curr.rValue.getBytes().length - 1);
fileChannel.position(curr.rOffset - curr.rValue.getBytes().length - 2);
String rafline = raf.readLine();
if (rafline.equals(curr.rValue))
offset = curr.rOffset;
@ -85,6 +93,8 @@ public class AccessLogTracker implements LogTracker {
@Override
public void stop() throws IOException {
rollingLogCounter = TrackLatch;
tryLog();
if (fileChannel != null)
fileChannel.close();
if (raf != null)
@ -100,62 +110,79 @@ public class AccessLogTracker implements LogTracker {
@Override
public void fastMove(final StatsDelegate<String> delegator) throws IOException {
if (offset > fileChannel.position()) {
offset = 0;
}
buffer.clear();
try {
if (fileChannel.read(buffer) == -1)
return;
} catch (IOException ex) {
stop();
}
buffer.flip();
boolean eol = false;
int colOffset = 0;
byte[] line = new byte[size];
String fresh = "";
if (offset > fileChannel.size()) {
offset = 0;
fileChannel.position(offset);
}
buffer.clear();
try {
if (fileChannel.read(buffer) == -1)
return;
} catch (IOException ex) {
stop();
}
buffer.flip();
boolean eol = false;
int colOffset = 0;
byte[] line = new byte[size];
while (buffer.hasRemaining()) {
while (!eol && buffer.hasRemaining()) {
byte b;
switch (b = buffer.get()) {
case -1:
case '\n':
eol = true;
fresh = new String(line, 0, colOffset);
delegator.delegate(fresh);
offset += ++colOffset;
break;
case '\r':
eol = true;
if ((buffer.get()) != '\n')
buffer.position(colOffset);
else
colOffset++;
fresh = new String(line, 0, colOffset);
delegator.delegate(fresh);
offset += ++colOffset;
break;
default:
line[colOffset] = b;
++colOffset;
break;
} // end of switch
}// end of while !eol
colOffset = 0;
eol = false;
while (buffer.hasRemaining()) {
while (!eol && buffer.hasRemaining()) {
byte b;
switch (b = buffer.get()) {
case -1:
case '\n':
eol = true;
offsetValue = new String(line, 0, colOffset);
delegator.delegate(offsetValue);
offset += ++colOffset;
break;
case '\r':
eol = true;
offsetValue = new String(line, 0, colOffset);
if ((buffer.get()) != '\n')
buffer.position(colOffset);
else
colOffset++;
delegator.delegate(offsetValue);
offset += ++colOffset;
break;
default:
line[colOffset] = b;
++colOffset;
break;
} // end of switch
}// end of while !eol
colOffset = 0;
eol = false;
}
fileChannel.position(offset);
tryLog();
} catch (IOException ex) {
logger.error("Some error occurred when tracking access.log.", ex);
hotfix();
}
fileChannel.position(offset);
tryLog(offset, fresh);
}
private void tryLog(Integer offset, String value) {
if (allowTracking) {
private void hotfix() throws IOException {
if (fileChannel != null)
fileChannel.close();
if (raf != null)
raf.close();
fileChannel = null;
raf = null;
start();
fileChannel.position(offset);
}
private void tryLog() {
rollingLogCounter++;
if (allowTracking && (TrackLatch <= rollingLogCounter)) {
OutputStream os = null;
try {
os = new FileOutputStream(trackingFile);
String output = offset.toString() + "\n" + value;
String output = Long.valueOf(offset).toString() + "\n" + offsetValue;
os.write(output.getBytes());
} catch (FileNotFoundException e) {
e.printStackTrace();
@ -169,6 +196,7 @@ public class AccessLogTracker implements LogTracker {
e.printStackTrace();
}
}
rollingLogCounter = 0;
}
}
@ -195,7 +223,7 @@ public class AccessLogTracker implements LogTracker {
}
private class RecordOffset {
int rOffset;
long rOffset;
String rValue;
}
}

View file

@ -15,6 +15,15 @@ public interface LogTracker {
LogTrackerStrategy getStrategy();
/**
* This method check if the reading cursor has reached the end of the file at the time
* when it is called. It might return a wrong value if file keeps growing.
* This method is recommended to use iff reading a read-only file.
* @return true if file cursor has reached the end of the file.
* @throws IOException
*/
boolean reachFileEnd() throws IOException;
void start() throws IOException;
void stop() throws IOException;

View file

@ -0,0 +1,25 @@
package com.ctrip.zeus.service.build.conf;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.config.DynamicStringProperty;
/**
* Created by zhoumy on 2015/11/20.
*/
public class LogFormat {
private static DynamicStringProperty logFormat = DynamicPropertyFactory.getInstance().getStringProperty("slb.nginx.log-format",
"'[$time_local] $host $hostname $server_addr $request_method $uri '\n" +
"'\"$query_string\" $server_port $remote_user $remote_addr $http_x_forwarded_for '\n" +
"'$server_protocol \"$http_user_agent\" \"$cookie_COOKIE\" \"$http_referer\" '\n" +
"'$host $status $body_bytes_sent $request_time $upstream_response_time '\n" +
"'$upstream_addr $upstream_status $proxy_host'"
);
public static String getMain() {
return logFormat.get();
}
public static String getMainCompactString() {
return logFormat.get().replaceAll("'", "").replaceAll("\\n", "");
}
}

View file

@ -19,15 +19,6 @@ public class NginxConf {
private static DynamicIntProperty checkShmSize = DynamicPropertyFactory.getInstance().getIntProperty("slb.nginx.checkShmSize", 32);
private static DynamicStringProperty logLevel = DynamicPropertyFactory.getInstance().getStringProperty("slb.nginx.logLevel", "");
private static DynamicStringProperty logFormat = DynamicPropertyFactory.getInstance().getStringProperty("slb.nginx.log-format",
"log_format main '[$time_local] $host $hostname $server_addr $request_method $uri '\n" +
"'\"$query_string\" $server_port $remote_user $remote_addr $http_x_forwarded_for '\n" +
"'$server_protocol \"$http_user_agent\" \"$cookie_COOKIE\" \"$http_referer\" '\n" +
"'$host $status $body_bytes_sent $request_time $upstream_response_time '\n" +
"'$upstream_addr $upstream_status $proxy_host';\n"
);
private static DynamicIntProperty dyupsPort = DynamicPropertyFactory.getInstance().getIntProperty("dyups.port", 8081);
public static String generate(Slb slb) {
@ -54,7 +45,7 @@ public class NginxConf {
b.append("default_type application/octet-stream;\n");
b.append("keepalive_timeout 65;\n");
b.append(logFormat.get());
b.append("log_format main " + LogFormat.getMain() + ";\n");
b.append("access_log /opt/logs/nginx/access.log main;\n");
b.append("server_names_hash_max_size ").append(serverNamesHashMaxSize.get()).append(";\n");
b.append("server_names_hash_bucket_size ").append(serverNamesHashBucketSize.get()).append(";\n");

View file

@ -0,0 +1,102 @@
package com.ctrip.zeus.service.report.stats;
import com.ctriposs.tools.reqmetrics.StatsKey;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import java.util.Stack;
/**
* Created by zhoumy on 2015/11/20.
*/
public class AccessLogRecord {
private final String costKey = "request_time";
private final String statusKey = "status";
private final String upCostKey = "upstream_response_time";
private final Long slbId;
private final Set<String> keys = ImmutableSet.of("host", "uri", "server_port", statusKey, costKey,
upCostKey, "upstream_addr", "upstream_status", "proxy_host");
private final StatsKey statsKey;
public AccessLogRecord(Long slbId, String value) {
this.slbId = slbId;
this.statsKey = parse(value);
}
public StatsKey getStatsKey() {
return statsKey;
}
public long getResponseSize() {
return 0L;
}
public long getCost() {
return Double.valueOf(statsKey.getTags().get(costKey)).longValue();
}
public String getStatus() {
return statsKey.getTags().get(statusKey);
}
private StatsKey parse(String value) {
StatsKey result = new StatsKey("slb.request")
.addTag("slbId", slbId.toString())
.reportCount(false)
.reportStatus(false)
.reportRequestSize(false)
.reportResponseSize(false);
Stack<String> grammarChecker = new Stack<>();
StringBuilder sb = new StringBuilder();
for (char c : value.toCharArray()) {
switch (c) {
case '{':
if (grammarChecker.empty())
grammarChecker.push("{");
else
sb.append(c);
break;
case '"':
if (grammarChecker.peek().equals("\"")) {
grammarChecker.pop();
String tmp = sb.toString();
grammarChecker.push(tmp);
sb.setLength(0);
} else {
grammarChecker.push("\"");
}
break;
case ',':
case '}':
if (grammarChecker.peek().equals("\"")) {
sb.append(c);
} else {
String v = grammarChecker.pop();
String k = grammarChecker.pop();
if (keys.contains(k)) {
if (k.equals(costKey) || k.equals(upCostKey)) {
if (v.equals("-"))
result.addTag(k, "0");
else
result.addTag(k, Double.valueOf(Double.parseDouble(v) * 1000.0).toString());
} else {
result.addTag(k, v);
}
}
}
break;
case ':':
if (grammarChecker.peek().equals("\"")) {
sb.append(c);
}
break;
default:
sb.append(c);
}
}
grammarChecker.pop();
if (!grammarChecker.isEmpty())
return null;
return result;
}
}

View file

@ -41,7 +41,6 @@
<property name="startDelay" value="1000"/>
<property name="repeatInterval" value="3600000"/>
</bean>
<bean id="taskCleanDetail" class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<property name="jobClass" value="com.ctrip.zeus.service.task.TaskCleanJob"/>
<property name="jobDataMap">

View file

@ -3,6 +3,9 @@ package com.ctrip.zeus.logstats;
import com.ctrip.zeus.logstats.analyzer.AccessLogStatsAnalyzer;
import com.ctrip.zeus.logstats.analyzer.LogStatsAnalyzer;
import com.ctrip.zeus.logstats.analyzer.LogStatsAnalyzerConfig;
import com.ctrip.zeus.logstats.common.AccessLogLineFormat;
import com.ctrip.zeus.logstats.common.LineFormat;
import com.ctrip.zeus.logstats.tracker.LogTracker;
import org.junit.Assert;
import org.junit.Test;
@ -10,6 +13,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -17,12 +21,13 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class AnalyzerTest {
private static final String AccessLogFormat =
private static final String AccessLogFormatString =
"[$time_local] $host $hostname $server_addr $request_method $uri " +
"\"$query_string\" $server_port $remote_user $remote_addr $http_x_forwarded_for " +
"$server_protocol \"$http_user_agent\" \"$cookie_COOKIE\" \"$http_referer\" " +
"$host $status $body_bytes_sent $request_time $upstream_response_time " +
"$upstream_addr $upstream_status";
private static final LineFormat AccessLogFormat = new AccessLogLineFormat(AccessLogFormatString).generate();
private static final int TrackerReadSize = 2048;
private final URL accessLogUrl = this.getClass().getClassLoader().getResource("com.ctrip.zeus.service/access.log");
@ -97,5 +102,161 @@ public class AnalyzerTest {
s.close();
}
Assert.assertEquals(14, count.get());
if (f.exists())
f.delete();
}
@Test
public void testTrackerWhenLogRotating() throws Exception {
final String logRotateFilename = "log-rotate-access.log";
final String logRotateTrackingFilename = "log-rotate-tracker.log";
File lf = new File(logRotateFilename);
if (lf.exists())
lf.delete();
File trf = new File(logRotateTrackingFilename);
if (trf.exists())
trf.delete();
final long endTime = System.currentTimeMillis() + 60 * 1000L;
final AtomicInteger writerCount = new AtomicInteger();
final AtomicInteger trackerCount = new AtomicInteger();
final CountDownLatch writerLatch = new CountDownLatch(1);
final CountDownLatch trackerLatch = new CountDownLatch(1);
Thread writer = new Thread() {
@Override
public void run() {
TestLogWriter writer = new TestLogWriter(logRotateFilename, 10 * 1000L);
try {
writer.run(endTime);
writer.stop();
} catch (Exception e) {
e.printStackTrace();
}
writerCount.set(writer.getCount());
writerLatch.countDown();
}
};
Thread reader = new Thread() {
@Override
public void run() {
final AccessLogStatsAnalyzer.LogStatsAnalyzerConfigBuilder builder;
try {
Thread.sleep(30L);
builder = new AccessLogStatsAnalyzer.LogStatsAnalyzerConfigBuilder()
.setLogFormat(AccessLogFormat)
.setLogFilename(logRotateFilename)
.setTrackerReadSize(TrackerReadSize)
.allowTracking(logRotateTrackingFilename);
File f = new File(logRotateTrackingFilename);
if (f.exists())
f.delete();
StatsDelegate reporter = new StatsDelegate<String>() {
@Override
public void delegate(String input) {
trackerCount.incrementAndGet();
}
};
LogTracker tracker = builder.build().getLogTracker();
tracker.start();
while (System.currentTimeMillis() < endTime + 30L) {
tracker.fastMove(reporter);
}
trackerLatch.countDown();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
writer.start();
reader.start();
writerLatch.await();
trackerLatch.await();
Assert.assertEquals(writerCount.get(), trackerCount.get());
if (lf.exists())
lf.delete();
if (trf.exists())
trf.delete();
}
@Test
public void testAnalyzerPerformanceWhenLogRotating() throws Exception {
final String logRotateFilename = "log-rotate-perf-access.log";
final String logRotateTrackingFilename = "log-rotate-perf-tracker.log";
File lf = new File(logRotateFilename);
if (lf.exists())
lf.delete();
File trf = new File(logRotateTrackingFilename);
if (trf.exists())
trf.delete();
final long endTime = System.currentTimeMillis() + 60 * 1000L;
final AtomicInteger writerCount = new AtomicInteger();
final AtomicInteger readerCount = new AtomicInteger();
final CountDownLatch writerLatch = new CountDownLatch(1);
final CountDownLatch readerLatch = new CountDownLatch(1);
Thread writer = new Thread() {
@Override
public void run() {
TestLogWriter writer = new TestLogWriter(logRotateFilename, 10 * 1000L);
try {
writer.run(endTime);
writer.stop();
} catch (Exception e) {
e.printStackTrace();
}
writerCount.set(writer.getCount());
writerLatch.countDown();
}
};
Thread reader = new Thread() {
@Override
public void run() {
final AccessLogStatsAnalyzer.LogStatsAnalyzerConfigBuilder builder;
try {
builder = new AccessLogStatsAnalyzer.LogStatsAnalyzerConfigBuilder()
.setLogFormat(AccessLogFormat)
.setLogFilename(logRotateFilename)
.setTrackerReadSize(TrackerReadSize)
.allowTracking(logRotateTrackingFilename);
File f = new File(logRotateTrackingFilename);
if (f.exists())
f.delete();
StatsDelegate reporter = new StatsDelegate<String>() {
@Override
public void delegate(String input) {
readerCount.incrementAndGet();
}
};
LogStatsAnalyzer analyzer = new AccessLogStatsAnalyzer(builder.build());
analyzer.start();
while (System.currentTimeMillis() < endTime + 100L) {
analyzer.analyze(reporter);
}
readerLatch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
};
writer.start();
reader.start();
writerLatch.await();
readerLatch.await();
System.out.println("writer count: " + writerCount.get());
System.out.println("reader count: " + readerCount.get());
Assert.assertTrue((readerCount.get() / 60) > 20000);
if (lf.exists())
lf.delete();
if (trf.exists())
trf.delete();
}
}

View file

@ -23,7 +23,7 @@ public class LogParsingTest {
"$server_protocol \"$http_user_agent\" \"$cookie_COOKIE\" \"$http_referer\" " +
"$host $status $body_bytes_sent $request_time $upstream_response_time " +
"$upstream_addr $upstream_status";
private final String log = "[17/Nov/2015:15:10:44 +0800] ws.you.ctripcorp.com vms09191 10.8.95.27 POST /gsapi/api/xml/GetRecmdProduct \"-\" 80 - 10.8.106.66 - HTTP/1.1 \"-\" \"-\" \"-\" ws.you.ctripcorp.com 200 521 0.042 0.039 10.8.168.228:80 200";
@Test
public void testFormatParsing() {
String[] expectedKeys = {"time_local", "host", "hostname", "server_addr", "request_method", "uri",
@ -31,21 +31,29 @@ public class LogParsingTest {
"server_protocol", "http_user_agent", "cookie_COOKIE", "http_referer",
"host", "status", "body_bytes_sent", "request_time", "upstream_response_time",
"upstream_addr", "upstream_status"};
String expectedPatternString = "\\[(.+)\\]\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+\\\"(.+)\\\"\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+\\\"(.+)\\\"\\s+\\\"(.+)\\\"\\s+\\\"(.+)\\\"\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)";
String expectedPatternString = "\\[(.*)\\]\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+\\\"(.*)\\\"\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+\\\"(.*)\\\"\\s+\\\"(.*)\\\"\\s+\\\"(.*)\\\"\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)";
LineFormat lineFormat = new AccessLogLineFormat();
lineFormat.setFormat(AccessLogFormat);
LineFormat lineFormat = new AccessLogLineFormat().setFormat(AccessLogFormat).generate();
String[] actualKeys = lineFormat.getKeys();
Assert.assertArrayEquals(expectedKeys, actualKeys);
Assert.assertEquals(expectedPatternString, lineFormat.getPatternString());
}
@Test
public void testParser() {
String[] expectedValues = {"17/Nov/2015:15:10:44 +0800", "ws.you.ctripcorp.com", "vms09191", "10.8.95.27", "POST", "/gsapi/api/xml/GetRecmdProduct", "-", "80", "-", "10.8.106.66", "-", "HTTP/1.1", "-", "-", "-", "ws.you.ctripcorp.com", "200", "521", "0.042", "0.039", "10.8.168.228:80", "200"};
public void testFormatRegistry() {
String expectedPatternString = "\\[(.*)\\]\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+\\\"(.*)\\\"\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(-|(?:[0-9.]+(?:, [0-9.]+)*))\\s+(\\S+)\\s+\\\"(.*)\\\"\\s+\\\"(.*)\\\"\\s+\\\"(.*)\\\"\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)";
LineFormat lineFormat = new AccessLogLineFormat()
.setFormat(AccessLogFormat)
.registerPatternForKey("http_x_forwarded_for", "(-|(?:[0-9.]+(?:, [0-9.]+)*))")
.generate();
Assert.assertEquals(expectedPatternString, lineFormat.getPatternString());
}
LineFormat lineFormat = new AccessLogLineFormat();
lineFormat.setFormat(AccessLogFormat);
@Test
public void testParser() {
final String log = "[17/Nov/2015:15:10:44 +0800] ws.you.ctripcorp.com vms09191 10.8.95.27 POST /gsapi/api/xml/GetRecmdProduct \"-\" 80 - 10.8.106.66 - HTTP/1.1 \"-\" \"-\" \"-\" ws.you.ctripcorp.com 200 521 0.042 0.039 10.8.168.228:80 200";
String[] expectedValues = {"17/Nov/2015:15:10:44 +0800", "ws.you.ctripcorp.com", "vms09191", "10.8.95.27", "POST", "/gsapi/api/xml/GetRecmdProduct", "-", "80", "-", "10.8.106.66", "-", "HTTP/1.1", "-", "-", "-", "ws.you.ctripcorp.com", "200", "521", "0.042", "0.039", "10.8.168.228:80", "200"};
LineFormat lineFormat = new AccessLogLineFormat().setFormat(AccessLogFormat).generate();
List<LineFormat> formats = new ArrayList<>();
formats.add(lineFormat);
final LogParser parser = new AccessLogParser(formats);
@ -56,11 +64,42 @@ public class LogParsingTest {
Assert.assertArrayEquals(expectedValues, actualValues.toArray(new String[actualValues.size()]));
}
@Test
public void testParser2() {
String log1 = "[17/Nov/2015:15:10:44 +0800] ws.you.ctripcorp.com vms09191 10.8.95.27 POST /gsapi/api/xml/GetRecmdProduct \"-\" 80 - 10.8.106.66 - HTTP/1.1 \"-\" \"-\" \"-\" ws.you.ctripcorp.com 200 521 0.042 0.039 10.8.168.228:80 200";
String log2 = "[02/Dec/2015:13:02:19 +0800] ws.util.you.ctripcorp.com vms09191 10.8.95.27 POST /bgmgmt/api/json/ExecUpdateContentProcess \"-\" 80 - 10.15.114.31 10.32.65.134, 10.15.202.207 HTTP/1.1 \"python-requests/2.2.0 CPython/2.7.6 Windows/7\" \"-\" \"-\" ws.util.you.ctripcorp.com 200 143 0.005 0.005 10.8.24.101:80 200";
String log3 = "[02/Dec/2015:13:43:03 +0800] ws.mobile.qiche.ctripcorp.com vms09191 10.8.95.27 POST /app/index.php \"param=/api/home&method=config.getAppConfig&_fxpcqlniredt=09031130410105805720\" 80 - 10.15.138.65 117.136.75.139 HTTP/1.1 \"\" \"-\" \"http://m.ctrip.com/webapp/train/?allianceid=106334&sid=728666&ouid=4&sourceid=2377\" ws.mobile.qiche.ctripcorp.com 200 99 0.017 0.017 10.8.119.73:80 200";
String log4 = "[02/Dec/2015:13:00:10 +0800] ws.schedule.ctripcorp.com vms09191 10.8.95.27 POST /UbtPushApi/UserActionReceiveHandler.ashx \"-\" 80 - 10.8.91.104 - HTTP/1.1 \"Java/THttpClient/HC\" \"-\" \"-\" ws.schedule.ctripcorp.com 200 24 0.007 0.007 10.8.168.238:80 200";
LineFormat lineFormat = new AccessLogLineFormat()
.setFormat(AccessLogFormat)
.registerPatternForKey("http_x_forwarded_for", "(-|(?:[0-9.]+(?:, [0-9.]+)*))")
.generate();
List<LineFormat> formats = new ArrayList<>();
formats.add(lineFormat);
final LogParser parser = new AccessLogParser(formats);
Assert.assertTrue(parser.parse(log1).size() > 0);
Assert.assertTrue(parser.parse(log2).size() > 0);
Assert.assertTrue(parser.parse(log3).size() > 0);
Assert.assertTrue(parser.parse(log4).size() > 0);
for (KeyValue keyValue : parser.parse(log1)) {
Assert.assertNotNull(keyValue.getValue());
}
for (KeyValue keyValue : parser.parse(log2)) {
Assert.assertNotNull(keyValue.getValue());
}
for (KeyValue keyValue : parser.parse(log3)) {
Assert.assertNotNull(keyValue.getValue());
}
for (KeyValue keyValue : parser.parse(log4)) {
Assert.assertNotNull(keyValue.getValue());
}
}
@Test
public void testJsonSerializer() {
String log = "[17/Nov/2015:15:10:44 +0800] ws.you.ctripcorp.com vms09191 10.8.95.27 POST /gsapi/api/xml/GetRecmdProduct \"-\" 80 - 10.8.106.66 - HTTP/1.1 \"-\" \"-\" \"-\" ws.you.ctripcorp.com 200 521 0.042 0.039 10.8.168.228:80 200";
String expectedJsonValue = "{\"time_local\":\"17/Nov/2015:15:10:44 +0800\",\"host\":\"ws.you.ctripcorp.com\",\"hostname\":\"vms09191\",\"server_addr\":\"10.8.95.27\",\"request_method\":\"POST\",\"uri\":\"/gsapi/api/xml/GetRecmdProduct\",\"query_string\":\"-\",\"server_port\":\"80\",\"remote_user\":\"-\",\"remote_addr\":\"10.8.106.66\",\"http_x_forwarded_for\":\"-\",\"server_protocol\":\"HTTP/1.1\",\"http_user_agent\":\"-\",\"cookie_COOKIE\":\"-\",\"http_referer\":\"-\",\"status\":\"200\",\"body_bytes_sent\":\"521\",\"request_time\":\"0.042\",\"upstream_response_time\":\"0.039\",\"upstream_addr\":\"10.8.168.228:80\",\"upstream_status\":\"200\"}";
LineFormat lineFormat = new AccessLogLineFormat();
lineFormat.setFormat(AccessLogFormat);
LineFormat lineFormat = new AccessLogLineFormat().setFormat(AccessLogFormat).generate();
List<LineFormat> formats = new ArrayList<>();
formats.add(lineFormat);
final LogParser parser = new AccessLogParser(formats);

View file

@ -0,0 +1,23 @@
package com.ctrip.zeus.logstats;
import com.ctrip.zeus.service.report.stats.AccessLogRecord;
import org.junit.Assert;
import org.junit.Test;
/**
* Created by zhoumy on 2015/11/24.
*/
public class LogStatsReportTest {
@Test
public void testGenerateAccessLogRecordGen() {
final String value = "{\"time_local\":\"18/Nov/2015:17:10:41 +0800\",\"host\":\"contents.ctrip.com\",\"hostname\":\"vms09922\",\"server_addr\":\"10.15.150.37\",\"request_method\":\"GET\",\"uri\":\"/market-channel-apppromotion/data.aspx\",\"query_string\":\"source=gdt&appid=379395415&app_type=ios&click_id=ovaeyvq4aaaayjvps43q&click_time=1447837841&muid=c37d9a4c30cb02b9dd491f23bc8b8d3c&advertiser_id=956419\",\"server_port\":\"80\",\"remote_user\":\"-\",\"remote_addr\":\"14.17.33.36\",\"http_x_forwarded_for\":\"-\",\"server_protocol\":\"HTTP/1.1\",\"http_user_agent\":\"-\",\"cookie_COOKIE\":\"-\",\"http_referer\":\"-\",\"status\":\"200\",\"body_bytes_sent\":\"36\",\"request_time\":\"0.016\",\"upstream_response_time\":\"0.016\",\"upstream_addr\":\"10.15.133.34:80\",\"upstream_status\":\"200\"}";
final String value2 = "{\"time_local\":\"02/Dec/2015:13:00:12 +0800\",\"host\":\"ws.connect.qiche.ctripcorp.com\",\"hostname\":\"vms09191\",\"server_addr\":\"10.8.95.27\",\"request_method\":\"GET\",\"uri\":\"/index.php\",\"query_string\":\"param=/data/ticketStatus&date=2015-12-04&strJson=[{\\x22fromCity\\x22:\\x22\\x5Cu6c88\\x5Cu9633\\x22,\\x22toCity\\x22:\\x22\\x5Cu519c\\x5Cu5b89\\x22,\\x22website\\x22:\\x22www.84100.com\\x22,\\x22date\\x22:\\x222015-12-04\\x22}]\",\"server_port\":\"80\",\"remote_user\":\"-\",\"remote_addr\":\"10.8.112.232\",\"http_x_forwarded_for\":\"-\",\"server_protocol\":\"HTTP/1.1\",\"http_user_agent\":\"PHP/5.3.17\",\"cookie_COOKIE\":\"-\",\"http_referer\":\"-\",\"status\":\"200\",\"body_bytes_sent\":\"123\",\"request_time\":\"0.015\",\"upstream_response_time\":\"0.015\",\"upstream_addr\":\"10.8.91.167:80\",\"upstream_status\":\"200\"}";
AccessLogRecord r = new AccessLogRecord(1L, value);
Assert.assertNotNull(r);
Assert.assertEquals(16, r.getCost());
r = new AccessLogRecord(1L, value2);
Assert.assertNotNull(r);
}
}

View file

@ -0,0 +1,111 @@
package com.ctrip.zeus.logstats;
import com.google.common.base.Joiner;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Random;
import java.util.UUID;
/**
* Created by zhoumy on 2015/11/30.
*/
public class TestLogWriter {
private final String logFilename;
private final long logRotateInterval;
private RandomAccessFile raf;
private FileChannel fileChannel;
private int count;
public TestLogWriter(String logFilename, long logRotateInterval) {
this.logFilename = logFilename;
this.logRotateInterval = logRotateInterval;
File f = new File(logFilename);
try {
if (!f.exists())
f.createNewFile();
raf = new RandomAccessFile(logFilename, "rw");
fileChannel = raf.getChannel();
} catch (IOException e) {
e.printStackTrace();
}
}
private static String randomGenRec() {
Random rand = new Random();
StringBuilder sb = new StringBuilder();
Integer[] ipComponents1 = {rand.nextInt(255), rand.nextInt(255), rand.nextInt(255), rand.nextInt(255)};
Integer[] ipComponents2 = {rand.nextInt(255), rand.nextInt(255), rand.nextInt(255), rand.nextInt(255)};
Integer[] ipComponents3 = {rand.nextInt(255), rand.nextInt(255), rand.nextInt(255), rand.nextInt(255)};
Integer[] ipComponents4 = {rand.nextInt(255), rand.nextInt(255), rand.nextInt(255), rand.nextInt(255)};
sb.append("[").append(new Date().toString()).append("]")
.append(" ").append("zeus.ctrip.com." + rand.nextInt(10))
.append(" ").append("vms" + rand.nextLong())
.append(" ").append(Joiner.on(".").join(ipComponents1))
.append(" ").append("GET")
.append(" ").append(randomGenUri(rand.nextInt(10)))
.append(" \"-\" 80 -")
.append(" ").append(Joiner.on(".").join(ipComponents2))
.append(" ").append(Joiner.on(".").join(ipComponents3))
.append(" HTTP/1.1 \"-\" \"-\" \"-\"")
.append(" ").append("zeus.ctrip.com." + rand.nextInt(10))
.append(" 200").append(" " + rand.nextLong())
.append(" " + rand.nextDouble()).append(" " + rand.nextDouble())
.append(" ").append(Joiner.on(".").join(ipComponents4) + ":80")
.append(" 200\n");
return sb.toString();
}
public void run(long endTime) throws Exception {
if (raf == null || fileChannel == null)
return;
while (System.currentTimeMillis() <= endTime) {
long nextRotateTime = System.currentTimeMillis() + logRotateInterval;
long now;
while ((now = System.currentTimeMillis()) <= endTime && now <= nextRotateTime) {
fileChannel.write(ByteBuffer.wrap(randomGenRec().getBytes(Charset.forName("UTF-8"))));
count++;
}
if (now > nextRotateTime) {
Thread.sleep(500L);
try {
fileChannel.truncate(0);
fileChannel.close();
raf.close();
fileChannel = null;
raf = null;
} finally {
raf = new RandomAccessFile(logFilename, "rw");
fileChannel = raf.getChannel();
System.out.println("Log rotate has been done.");
}
}
}
}
public void stop() throws IOException {
fileChannel.close();
raf.close();
fileChannel = null;
raf = null;
}
public int getCount() {
return count;
}
private static String randomGenUri(int loop) {
loop = loop >= 1 ? loop : 1;
StringBuilder sb = new StringBuilder();
for (int i = 0; i < loop; i++) {
sb.append(UUID.randomUUID().toString().replaceAll("-", "/"));
}
return "/" + sb.toString();
}
}