Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

Expand All @@ -44,6 +45,7 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/owner"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/table"
tidbutil "github.com/pingcap/tidb/pkg/util"
Expand Down Expand Up @@ -465,6 +467,143 @@ func (s *jobScheduler) mustReloadSchemas() {
}
}

var tableStruct sync.Map

func compareStruct(old, new *model.TableInfo) bool {
if len(old.Columns) != len(new.Columns) {
return false
}
for i, oldCol := range old.Columns {
newCol := new.Columns[i]
if !oldCol.FieldType.Equal(&newCol.FieldType) {
return false
}
if newCol.ChangingFieldType != nil {
return false
}
if oldCol.Name.L != newCol.Name.L ||
oldCol.GetFlag() != newCol.GetFlag() ||
oldCol.ID != newCol.ID ||
oldCol.Offset != newCol.Offset ||
oldCol.OriginDefaultValue != newCol.OriginDefaultValue {
return false
}
}
if len(old.Indices) != len(new.Indices) {
return false
}
for i, oldIdx := range old.Indices {
newIdx := new.Indices[i]
if len(oldIdx.Columns) != len(newIdx.Columns) {
return false
}
for j, oldIdxCol := range oldIdx.Columns {
newIdxCol := newIdx.Columns[j]
if oldIdxCol.Name.L != newIdxCol.Name.L {
return false
}
if oldIdxCol.Length != newIdxCol.Length {
return false
}
if oldIdxCol.Offset != newIdxCol.Offset {
return false
}
if newIdxCol.UseChangingType {
return false
}
}
}
return true
}

func runAdminCheck(w *worker, ctx context.Context, dbName, tblName string) bool {
var sctx sessionctx.Context
sctx, err := w.sessPool.Get()
if err != nil {
return true
}
defer w.sessPool.Put(sctx)

sql := fmt.Sprintf("ADMIN CHECK TABLE `%s`.`%s`", dbName, tblName)

_, _, err = sctx.GetRestrictedSQLExecutor().ExecRestrictedSQL(ctx, nil, sql)
return err == nil
}

func getTableInfoForCheck(wk *worker, tableID, schemaID int64) (*model.TableInfo, error) {
err := wk.sess.Begin(wk.workCtx)
if err != nil {
return nil, err
}

txn, err := wk.sess.Txn()
if err != nil {
return nil, err
}
metaMut := meta.NewMutator(txn)
defer func() {
wk.sess.Commit(wk.workCtx)
}()

return getTableInfo(metaMut, tableID, schemaID)
}

func storeTableInfoBeforeExecution(wk *worker, jobW *model.JobW) bool {
store := false
switch jobW.Type {
case model.ActionModifyColumn:
store = true
case model.ActionMultiSchemaChange:
for _, sub := range jobW.MultiSchemaInfo.SubJobs {
if sub.Type == model.ActionModifyColumn {
store = true
}
}
}

if store {
tblInfo, err := getTableInfoForCheck(wk, jobW.TableID, jobW.SchemaID)
if err == nil {
tableStruct.Store(jobW.ID, tblInfo)
return true
}
}

return false
}

func checkTableInfoAfterExecution(wk *worker, jobW *model.JobW) {
v, ok := tableStruct.Load(jobW.ID)
if !ok {
return
}
prevTblInfo := v.(*model.TableInfo)

if jobW.State == model.JobStateDone ||
jobW.State == model.JobStateRollbackDone ||
jobW.State == model.JobStateCancelled {
if !runAdminCheck(wk, context.Background(), jobW.SchemaName, jobW.TableName) {
logutil.DDLLogger().Info("[debug log] admin check failed after ddl",
zap.Int64("job ID", jobW.ID),
zap.String("query", jobW.Query),
)
}
}
if jobW.State == model.JobStateRollbackDone || jobW.State == model.JobStateCancelled {
tblInfo, err := getTableInfoForCheck(wk, jobW.TableID, jobW.SchemaID)
if err == nil {
if !compareStruct(prevTblInfo, tblInfo) {
logutil.DDLLogger().Info("[debug log] the table structure has been changed after modify column",
zap.Int64("job ID", jobW.ID),
zap.String("query", jobW.Query),
)
}

}
tableStruct.Delete(jobW.ID)
}
}

// deliveryJob deliver the job to the worker to run it asynchronously.
// the worker will run the job until it's finished, paused or another owner takes
// over and finished it.
Expand All @@ -476,6 +615,8 @@ func (s *jobScheduler) deliveryJob(wk *worker, pool *workerPool, jobW *model.Job
metrics.DDLRunningJobCount.WithLabelValues(pool.tp().String()).Inc()
jobCtx := s.getJobRunCtx(jobW.ID, jobW.TraceInfo)

storeTableInfoBeforeExecution(wk, jobW)

s.wg.Run(func() {
start := time.Now()
defer func() {
Expand All @@ -498,6 +639,8 @@ func (s *jobScheduler) deliveryJob(wk *worker, pool *workerPool, jobW *model.Job
}()
for {
err := s.transitOneJobStepAndWaitSync(wk, jobCtx, jobW)
checkTableInfoAfterExecution(wk, jobW)

if err != nil {
logutil.DDLLogger().Info("transit one job step and wait sync failed", zap.Error(err), zap.Stringer("job", jobW))
} else if jobW.InFinalState() {
Expand Down
6 changes: 6 additions & 0 deletions pkg/ddl/modify_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,12 @@ func checkModifyColumnData(
return false, errors.Trace(err)
}
if len(rows) != 0 {
if changingCol.GetType() != mysql.TypeTiny {
logutil.DDLLogger().Info("[debug log] modify column data check failed unexpectedly",
zap.String("from", oldCol.FieldType.String()),
zap.String("to", changingCol.FieldType.String()),
)
}
if rows[0].IsNull(0) {
return true, dbterror.ErrInvalidUseOfNull
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/meta/model"
mmodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
Expand Down Expand Up @@ -532,6 +533,7 @@ func (e *UpdateExec) composeNewRow(rowIdx int, oldRow []types.Datum, cols []*tab
if err = handleUpdateError(e.Ctx(), assign.ColName, colInfo, rowIdx, err); err != nil {
return nil, err
}
val = model.MockValue(nil, colInfo, val)
}

val.Copy(&newRowData[assign.Col.Index])
Expand Down
22 changes: 22 additions & 0 deletions pkg/meta/model/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package model
import (
"bytes"
"fmt"
"math/rand"
"slices"
"strconv"
"strings"
Expand Down Expand Up @@ -592,6 +593,27 @@ func ColumnNeedRestoredData(idxCol *IndexColumn, colInfos []*ColumnInfo) bool {
return types.NeedRestoredData(colTp)
}

var randStr = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-!@#$^&*()_+=[]{}|<>"

// MockValue mock a value for the column.
func MockValue(_ *TableInfo, col *ColumnInfo, oldValue types.Datum) types.Datum {
if col.ChangingFieldType != nil || col.IsGenerated() {
return oldValue
}
if col.State != StatePublic {
return oldValue
}
// if len(t.meta.Name.L) < 2 || t.meta.Name.L[1] < '0' || t.meta.Name.L[1] > '9' {
// return oldValue
// }
if types.IsTypeInteger(col.GetType()) {
return types.NewDatum(rand.Intn(127))
} else if types.IsString(col.GetType()) {
return types.NewStringDatum(string(randStr[rand.Intn(len(randStr))]))
}
return oldValue
}

// TableNameInfo provides meta data describing a table name info.
type TableNameInfo struct {
ID int64 `json:"id"`
Expand Down
6 changes: 6 additions & 0 deletions pkg/table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,12 @@ func (t *TableCommon) addRecord(sctx table.MutateContext, txn kv.Transaction, r
sh := memBuffer.Staging()
defer memBuffer.Cleanup(sh)

for _, col := range t.Columns {
if col.Offset < len(r) {
r[col.Offset] = model.MockValue(t.meta, col.ColumnInfo, r[col.Offset])
}
}

for _, col := range t.Columns {
if err := checkDataForModifyColumn(r, col); err != nil {
return nil, err
Expand Down