diff --git a/backend/utils/mysql/helper/source.go b/backend/utils/mysql/helper/source.go index a5890aafb..91fa75e97 100644 --- a/backend/utils/mysql/helper/source.go +++ b/backend/utils/mysql/helper/source.go @@ -109,7 +109,7 @@ func Source(dns string, reader io.Reader, opts ...SourceOption) error { } for { - line, err := r.ReadString(';') + line, err := readLine(r) if err != nil { if err == io.EOF { break @@ -118,29 +118,25 @@ func Source(dns string, reader io.Reader, opts ...SourceOption) error { return err } - ssql := string(line) - - ssql, err = trim(ssql) + ssql, err := trim(line) if err != nil { global.LOG.Errorf("trim sql failed, err: %v", err) return err } + afterInsertSql := "" if o.mergeInsert > 1 && strings.HasPrefix(ssql, "INSERT INTO") { var insertSQLs []string insertSQLs = append(insertSQLs, ssql) for i := 0; i < o.mergeInsert-1; i++ { - line, err := r.ReadString(';') + line, err := readLine(r) if err != nil { if err == io.EOF { break } - global.LOG.Errorf("read merge insert sql failed, err: %v", err) return err } - - ssql2 := string(line) - ssql2, err = trim(ssql2) + ssql2, err := trim(line) if err != nil { global.LOG.Errorf("trim merge insert sql failed, err: %v", err) return err @@ -149,7 +145,7 @@ func Source(dns string, reader io.Reader, opts ...SourceOption) error { insertSQLs = append(insertSQLs, ssql2) continue } - + afterInsertSql = ssql2 break } ssql, err = mergeInsert(insertSQLs) @@ -164,6 +160,13 @@ func Source(dns string, reader io.Reader, opts ...SourceOption) error { global.LOG.Errorf("exec sql failed, err: %v", err) return err } + if len(afterInsertSql) != 0 { + _, err = dbWrapper.Exec(afterInsertSql) + if err != nil { + global.LOG.Errorf("exec sql failed, err: %v", err) + return err + } + } } _, err = dbWrapper.Exec("COMMIT;") @@ -226,3 +229,27 @@ func getDBNameFromDNS(dns string) (string, error) { return "", fmt.Errorf("dns error: %s", dns) } + +func readLine(r *bufio.Reader) (string, error) { + lineItem, err := r.ReadString('\n') + if err != nil { + if err == io.EOF { + return lineItem, err + } + global.LOG.Errorf("read merge insert sql failed, err: %v", err) + return "", err + } + if strings.HasSuffix(lineItem, ";\n") { + return lineItem, nil + } + lineAppend, err := readLine(r) + if err != nil { + if err == io.EOF { + return lineItem, err + } + global.LOG.Errorf("read merge insert sql failed, err: %v", err) + return "", err + } + + return lineItem + lineAppend, nil +}