refactor processor: add abstract processor

This commit is contained in:
leon.li 2015-12-04 13:43:06 +08:00
parent dc94b3d584
commit ebe91c3ffb
3 changed files with 131 additions and 20 deletions

View file

@ -0,0 +1,73 @@
package com.dianping.platform.slb.agent.core.processor;
import com.dianping.platform.slb.agent.core.transaction.Transaction;
import com.dianping.platform.slb.agent.core.transaction.TransactionManager;
import com.dianping.platform.slb.agent.core.workflow.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
/**
* dianping.com @2015
* slb - soft load balance
* <p/>
* Created by leon.li(Li Yang)
*/
public abstract class AbstractProcessor implements Processor {
@Autowired
TransactionManager m_transactionManager;
protected AtomicReference<Transaction> m_currentTransaction = new AtomicReference<Transaction>();
protected AtomicReference<Context> m_currentContext = new AtomicReference<Context>();
protected final Logger m_logger = LoggerFactory.getLogger(getClass());
@Override
public boolean isTransactionCurrent(int id) {
return m_currentTransaction.get().getId() == id;
}
protected void runTransaction(Transaction transaction) {
try {
startTransaction(transaction);
try {
transaction.setStatus(doTransaction(transaction));
} catch (Exception ex) {
transaction.setStatus(Transaction.Status.FAILED);
m_logger.error("[processor][exec transaction error]" + transaction.getId(), ex);
}
} catch (Exception ex) {
m_logger.error("[processor][init transaction error]" + transaction.getId(), ex);
transaction.setStatus(Transaction.Status.FAILED);
} finally {
endTransaction(transaction);
}
}
private void startTransaction(Transaction transaction) throws IOException {
transaction.setStatus(Transaction.Status.PROGRESSING);
m_transactionManager.saveTransaction(transaction);
}
private void endTransaction(Transaction transaction) {
try {
m_currentTransaction = new AtomicReference<Transaction>();
m_currentContext = new AtomicReference<Context>();
m_transactionManager.saveTransaction(transaction);
} catch (Exception ex) {
m_logger.error("[processor][end transaction error]" + transaction.getId(), ex);
} finally {
m_logger.info("[process transaction]" + transaction.getId() + " " + transaction.getStatus());
}
}
protected abstract Transaction.Status doTransaction(Transaction transaction) throws Exception;
}

View file

@ -1,12 +1,18 @@
package com.dianping.platform.slb.agent.core.processor; package com.dianping.platform.slb.agent.core.processor;
import com.dianping.platform.slb.agent.core.transaction.Transaction; import com.dianping.platform.slb.agent.core.transaction.Transaction;
import com.dianping.platform.slb.agent.core.transaction.TransactionManager; import com.dianping.platform.slb.agent.core.workflow.Context;
import com.dianping.platform.slb.agent.core.workflow.Engine;
import com.dianping.platform.slb.agent.core.workflow.LogFormatter;
import com.dianping.platform.slb.agent.core.workflow.deploy.DeployContext;
import com.dianping.platform.slb.agent.core.workflow.deploy.DeployStep;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
/** /**
* dianping.com @2015 * dianping.com @2015
@ -15,10 +21,13 @@ import java.util.concurrent.LinkedBlockingDeque;
* Created by leon.li(Li Yang) * Created by leon.li(Li Yang)
*/ */
@Service @Service
public class NginxConfDeployProcessor implements Processor { public class NginxConfDeployProcessor extends AbstractProcessor {
@Autowired @Autowired
TransactionManager m_transactionManager; LogFormatter m_logFormatter;
@Autowired
Engine m_engine;
private BlockingQueue<Transaction> m_transactionQueue = new LinkedBlockingDeque<Transaction>(); private BlockingQueue<Transaction> m_transactionQueue = new LinkedBlockingDeque<Transaction>();
@ -28,28 +37,31 @@ public class NginxConfDeployProcessor implements Processor {
public SubmitResult submitTransaction(Transaction transaction) { public SubmitResult submitTransaction(Transaction transaction) {
SubmitResult submitResult = new SubmitResult(); SubmitResult submitResult = new SubmitResult();
if (m_transactionManager.startTransaction(transaction)) { try {
if (m_transactionQueue.offer(transaction)) { if (m_transactionManager.startTransaction(transaction)) {
submitResult.setResult(true); if (m_transactionQueue.offer(transaction)) {
startProcessorTransaction(); submitResult.setResult(true);
startProcessorTransaction();
} else {
submitResult.setResult(false);
submitResult.setMessage("[error]add queue fail");
}
} else { } else {
submitResult.setResult(false); submitResult.setResult(false);
submitResult.setMessage("[error]add queue fail"); submitResult.setMessage("[error]transaction already exists!");
} }
} else { } catch (IOException e) {
submitResult.setResult(false); submitResult.setResult(false);
submitResult.setMessage(e.getMessage());
} }
return submitResult; return submitResult;
} }
@Override
public Transaction getCurrentTranasction() {
return null;
}
@Override @Override
public void cancel(int id) { public void cancel(int id) {
if (m_currentTransaction != null && m_currentTransaction.get().getId() == id) {
m_engine.kill(m_currentContext.get());
}
} }
private void startProcessorTransaction() { private void startProcessorTransaction() {
@ -57,17 +69,43 @@ public class NginxConfDeployProcessor implements Processor {
new Thread(new InnerTask()).start(); new Thread(new InnerTask()).start();
m_processing = true; m_processing = true;
} }
}
@Override
protected Transaction.Status doTransaction(Transaction transaction) throws Exception {
m_currentTransaction.set(transaction);
Context context = new DeployContext();
context.setTask(transaction.getTask());
context.setOutput(m_transactionManager.getLogOut(transaction));
m_currentContext.set(context);
if (m_engine.start(DeployStep.START, context) == 0) {
return Transaction.Status.SUCCESS;
} else {
return Transaction.Status.FAILED;
}
} }
private class InnerTask implements Runnable { private class InnerTask implements Runnable {
@Override @Override
public void run() { public void run() {
try {
while (true) {
Transaction transaction = m_transactionQueue.poll(30, TimeUnit.SECONDS);
if (transaction == null) {
break;
}
runTransaction(transaction);
}
} catch (InterruptedException e) {
m_logger.error("[processor error][NginxConfDeployProcessor]", e);
} finally {
m_processing = false;
}
} }
} }
}
}

View file

@ -14,7 +14,7 @@ public interface Processor {
SubmitResult submitTransaction(Transaction transaction); SubmitResult submitTransaction(Transaction transaction);
Transaction getCurrentTranasction(); boolean isTransactionCurrent(int id);
void cancel(int id); void cancel(int id);