diff --git a/slb-agent/src/main/java/com/dianping/platform/slb/agent/core/processor/AbstractProcessor.java b/slb-agent/src/main/java/com/dianping/platform/slb/agent/core/processor/AbstractProcessor.java new file mode 100644 index 0000000..00a9b80 --- /dev/null +++ b/slb-agent/src/main/java/com/dianping/platform/slb/agent/core/processor/AbstractProcessor.java @@ -0,0 +1,73 @@ +package com.dianping.platform.slb.agent.core.processor; + +import com.dianping.platform.slb.agent.core.transaction.Transaction; +import com.dianping.platform.slb.agent.core.transaction.TransactionManager; +import com.dianping.platform.slb.agent.core.workflow.Context; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * dianping.com @2015 + * slb - soft load balance + *

+ * Created by leon.li(Li Yang) + */ +public abstract class AbstractProcessor implements Processor { + + @Autowired + TransactionManager m_transactionManager; + + protected AtomicReference m_currentTransaction = new AtomicReference(); + + protected AtomicReference m_currentContext = new AtomicReference(); + + protected final Logger m_logger = LoggerFactory.getLogger(getClass()); + + @Override + public boolean isTransactionCurrent(int id) { + return m_currentTransaction.get().getId() == id; + } + + protected void runTransaction(Transaction transaction) { + try { + startTransaction(transaction); + + try { + transaction.setStatus(doTransaction(transaction)); + } catch (Exception ex) { + transaction.setStatus(Transaction.Status.FAILED); + m_logger.error("[processor][exec transaction error]" + transaction.getId(), ex); + } + } catch (Exception ex) { + m_logger.error("[processor][init transaction error]" + transaction.getId(), ex); + transaction.setStatus(Transaction.Status.FAILED); + } finally { + endTransaction(transaction); + } + } + + private void startTransaction(Transaction transaction) throws IOException { + transaction.setStatus(Transaction.Status.PROGRESSING); + m_transactionManager.saveTransaction(transaction); + } + + private void endTransaction(Transaction transaction) { + try { + m_currentTransaction = new AtomicReference(); + m_currentContext = new AtomicReference(); + + m_transactionManager.saveTransaction(transaction); + } catch (Exception ex) { + m_logger.error("[processor][end transaction error]" + transaction.getId(), ex); + } finally { + m_logger.info("[process transaction]" + transaction.getId() + " " + transaction.getStatus()); + } + } + + protected abstract Transaction.Status doTransaction(Transaction transaction) throws Exception; + +} diff --git a/slb-agent/src/main/java/com/dianping/platform/slb/agent/core/processor/NginxConfDeployProcessor.java b/slb-agent/src/main/java/com/dianping/platform/slb/agent/core/processor/NginxConfDeployProcessor.java index 31b33ba..8e90918 100644 --- a/slb-agent/src/main/java/com/dianping/platform/slb/agent/core/processor/NginxConfDeployProcessor.java +++ b/slb-agent/src/main/java/com/dianping/platform/slb/agent/core/processor/NginxConfDeployProcessor.java @@ -1,12 +1,18 @@ package com.dianping.platform.slb.agent.core.processor; import com.dianping.platform.slb.agent.core.transaction.Transaction; -import com.dianping.platform.slb.agent.core.transaction.TransactionManager; +import com.dianping.platform.slb.agent.core.workflow.Context; +import com.dianping.platform.slb.agent.core.workflow.Engine; +import com.dianping.platform.slb.agent.core.workflow.LogFormatter; +import com.dianping.platform.slb.agent.core.workflow.deploy.DeployContext; +import com.dianping.platform.slb.agent.core.workflow.deploy.DeployStep; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; /** * dianping.com @2015 @@ -15,10 +21,13 @@ import java.util.concurrent.LinkedBlockingDeque; * Created by leon.li(Li Yang) */ @Service -public class NginxConfDeployProcessor implements Processor { +public class NginxConfDeployProcessor extends AbstractProcessor { @Autowired - TransactionManager m_transactionManager; + LogFormatter m_logFormatter; + + @Autowired + Engine m_engine; private BlockingQueue m_transactionQueue = new LinkedBlockingDeque(); @@ -28,28 +37,31 @@ public class NginxConfDeployProcessor implements Processor { public SubmitResult submitTransaction(Transaction transaction) { SubmitResult submitResult = new SubmitResult(); - if (m_transactionManager.startTransaction(transaction)) { - if (m_transactionQueue.offer(transaction)) { - submitResult.setResult(true); - startProcessorTransaction(); + try { + if (m_transactionManager.startTransaction(transaction)) { + if (m_transactionQueue.offer(transaction)) { + submitResult.setResult(true); + startProcessorTransaction(); + } else { + submitResult.setResult(false); + submitResult.setMessage("[error]add queue fail"); + } } else { submitResult.setResult(false); - submitResult.setMessage("[error]add queue fail"); + submitResult.setMessage("[error]transaction already exists!"); } - } else { + } catch (IOException e) { submitResult.setResult(false); + submitResult.setMessage(e.getMessage()); } return submitResult; } - @Override - public Transaction getCurrentTranasction() { - return null; - } - @Override public void cancel(int id) { - + if (m_currentTransaction != null && m_currentTransaction.get().getId() == id) { + m_engine.kill(m_currentContext.get()); + } } private void startProcessorTransaction() { @@ -57,17 +69,43 @@ public class NginxConfDeployProcessor implements Processor { new Thread(new InnerTask()).start(); m_processing = true; } + } + @Override + protected Transaction.Status doTransaction(Transaction transaction) throws Exception { + m_currentTransaction.set(transaction); + + Context context = new DeployContext(); + + context.setTask(transaction.getTask()); + context.setOutput(m_transactionManager.getLogOut(transaction)); + m_currentContext.set(context); + + if (m_engine.start(DeployStep.START, context) == 0) { + return Transaction.Status.SUCCESS; + } else { + return Transaction.Status.FAILED; + } } private class InnerTask implements Runnable { - @Override public void run() { + try { + while (true) { + Transaction transaction = m_transactionQueue.poll(30, TimeUnit.SECONDS); + if (transaction == null) { + break; + } + runTransaction(transaction); + } + } catch (InterruptedException e) { + m_logger.error("[processor error][NginxConfDeployProcessor]", e); + } finally { + m_processing = false; + } } } -} - - +} \ No newline at end of file diff --git a/slb-agent/src/main/java/com/dianping/platform/slb/agent/core/processor/Processor.java b/slb-agent/src/main/java/com/dianping/platform/slb/agent/core/processor/Processor.java index 594ba17..03efec4 100644 --- a/slb-agent/src/main/java/com/dianping/platform/slb/agent/core/processor/Processor.java +++ b/slb-agent/src/main/java/com/dianping/platform/slb/agent/core/processor/Processor.java @@ -14,7 +14,7 @@ public interface Processor { SubmitResult submitTransaction(Transaction transaction); - Transaction getCurrentTranasction(); + boolean isTransactionCurrent(int id); void cancel(int id);