fix: 解决远程数据库恢复失败的问题 (#2481)

This commit is contained in:
ssongliu 2023-10-09 17:56:27 +08:00 committed by GitHub
parent f61345c18d
commit 0496b282b1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -109,7 +109,7 @@ func Source(dns string, reader io.Reader, opts ...SourceOption) error {
} }
for { for {
line, err := r.ReadString(';') line, err := readLine(r)
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
break break
@ -118,29 +118,25 @@ func Source(dns string, reader io.Reader, opts ...SourceOption) error {
return err return err
} }
ssql := string(line) ssql, err := trim(line)
ssql, err = trim(ssql)
if err != nil { if err != nil {
global.LOG.Errorf("trim sql failed, err: %v", err) global.LOG.Errorf("trim sql failed, err: %v", err)
return err return err
} }
afterInsertSql := ""
if o.mergeInsert > 1 && strings.HasPrefix(ssql, "INSERT INTO") { if o.mergeInsert > 1 && strings.HasPrefix(ssql, "INSERT INTO") {
var insertSQLs []string var insertSQLs []string
insertSQLs = append(insertSQLs, ssql) insertSQLs = append(insertSQLs, ssql)
for i := 0; i < o.mergeInsert-1; i++ { for i := 0; i < o.mergeInsert-1; i++ {
line, err := r.ReadString(';') line, err := readLine(r)
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
break break
} }
global.LOG.Errorf("read merge insert sql failed, err: %v", err)
return err return err
} }
ssql2, err := trim(line)
ssql2 := string(line)
ssql2, err = trim(ssql2)
if err != nil { if err != nil {
global.LOG.Errorf("trim merge insert sql failed, err: %v", err) global.LOG.Errorf("trim merge insert sql failed, err: %v", err)
return err return err
@ -149,7 +145,7 @@ func Source(dns string, reader io.Reader, opts ...SourceOption) error {
insertSQLs = append(insertSQLs, ssql2) insertSQLs = append(insertSQLs, ssql2)
continue continue
} }
afterInsertSql = ssql2
break break
} }
ssql, err = mergeInsert(insertSQLs) ssql, err = mergeInsert(insertSQLs)
@ -164,6 +160,13 @@ func Source(dns string, reader io.Reader, opts ...SourceOption) error {
global.LOG.Errorf("exec sql failed, err: %v", err) global.LOG.Errorf("exec sql failed, err: %v", err)
return err return err
} }
if len(afterInsertSql) != 0 {
_, err = dbWrapper.Exec(afterInsertSql)
if err != nil {
global.LOG.Errorf("exec sql failed, err: %v", err)
return err
}
}
} }
_, err = dbWrapper.Exec("COMMIT;") _, err = dbWrapper.Exec("COMMIT;")
@ -226,3 +229,27 @@ func getDBNameFromDNS(dns string) (string, error) {
return "", fmt.Errorf("dns error: %s", dns) return "", fmt.Errorf("dns error: %s", dns)
} }
func readLine(r *bufio.Reader) (string, error) {
lineItem, err := r.ReadString('\n')
if err != nil {
if err == io.EOF {
return lineItem, err
}
global.LOG.Errorf("read merge insert sql failed, err: %v", err)
return "", err
}
if strings.HasSuffix(lineItem, ";\n") {
return lineItem, nil
}
lineAppend, err := readLine(r)
if err != nil {
if err == io.EOF {
return lineItem, err
}
global.LOG.Errorf("read merge insert sql failed, err: %v", err)
return "", err
}
return lineItem + lineAppend, nil
}