Documentation
¶
Index ¶
- Variables
- func ExecSQL(ctx context.Context, se sessionctx.Context, sql string, args ...interface{}) ([]chunk.Row, error)
- func GetSubtasksByTaskIDForTest(ctx context.Context, stm *TaskManager, taskID int64) ([]*proto.Subtask, error)
- func GetSubtasksFromHistoryByTaskIDForTest(ctx context.Context, stm *TaskManager, taskID int64) (int, error)
- func GetSubtasksFromHistoryForTest(ctx context.Context, stm *TaskManager) (int, error)
- func GetTasksFromHistoryForTest(ctx context.Context, stm *TaskManager) (int, error)
- func SetTaskManager(is *TaskManager)
- type SessionExecutor
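- type TaskManager
- func GetTaskManager() (*TaskManager, error)
- func NewTaskManager(sePool sessionPool) *TaskManager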
- func (*TaskManager) AddGlobalTaskWithSession(ctx context.Context, se sessionctx.Context, key string, tp proto.TaskType, ...) (taskID int64, err error)
- func (stm *TaskManager) AddNewGlobalTask(ctx context.Context, key string, tp proto.TaskType, concurrency int, ...) (taskID int64, err error)
- func (stm *TaskManager) AddNewSubTask(ctx context.Context, globalTaskID int64, step proto.Step, ...) error
- func (stm *TaskManager) CancelGlobalTask(ctx context.Context, taskID int64) error
- func (*TaskManager) CancelGlobalTaskByKeySession(ctx context.Context, se sessionctx.Context, taskKey string) error
- func (stm *TaskManager) CleanUpMeta(ctx context.Context, nodes []string) error
- func (stm *TaskManager) CollectSubTaskError(ctx context.Context, taskID int64) ([]error, error)
- func (stm *TaskManager) DeleteSubtasksByTaskID(ctx context.Context, taskID int64) error
- func (stm *TaskManager) FinishSubtask(ctx context.Context, tidbID string, id int64, meta []byte) error
- func (stm *TaskManager) GCSubtasks(ctx context.Context) error
- func (stm *TaskManager) GetAllNodes(ctx context.Context) ([]string, error)
- func (stm *TaskManager) GetFirstSubtaskInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, ...) (*proto.Subtask, error)
- func (stm *TaskManager) GetGlobalTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
- func (stm *TaskManager) GetGlobalTaskByKey(ctx context.Context, key string) (task *proto.Task, err error)
- func (stm *TaskManager) GetGlobalTaskByKeyWithHistory(ctx context.Context, key string) (task *proto.Task, err error)
- func (stm *TaskManager) GetGlobalTasksFromHistoryInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error)
- func (stm *TaskManager) GetGlobalTasksInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error)
- func (stm *TaskManager) GetNewGlobalTask(ctx context.Context) (task *proto.Task, err error)
- func (stm *TaskManager) GetNodesByRole(ctx context.Context, role string) (map[string]bool, error)
- func (stm *TaskManager) GetSchedulerIDsByTaskID(ctx context.Context, taskID int64) ([]string, error)
- func (stm *TaskManager) GetSchedulerIDsByTaskIDAndStep(ctx context.Context, taskID int64, step proto.Step) ([]string, error)
- func (stm *TaskManager) GetSubtaskCntGroupByStates(ctx context.Context, taskID int64, step proto.Step) (map[proto.TaskState]int64, error)
- func (stm *TaskManager) GetSubtaskInStatesCnt(ctx context.Context, taskID int64, states ...interface{}) (int64, error)
- func (stm *TaskManager) GetSubtaskRowCount(ctx context.Context, taskID int64, step proto.Step) (int64, error)
- func (stm *TaskManager) GetSubtasksForImportInto(ctx context.Context, taskID int64, step proto.Step) ([]*proto.Subtask, error)
- func (stm *TaskManager) GetSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, ...) ([]*proto.Subtask, error)
- func (stm *TaskManager) GetSucceedSubtasksByStep(ctx context.Context, taskID int64, step proto.Step) ([]*proto.Subtask, error)
- func (stm *TaskManager) GetTaskByIDWithHistory(ctx context.Context, taskID int64) (task *proto.Task, err error)
- func (stm *TaskManager) HasSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, ...) (bool, error)
- func (stm *TaskManager) IsGlobalTaskCancelling(ctx context.Context, taskID int64) (bool, error)
- func (stm *TaskManager) IsSchedulerCanceled(ctx context.Context, execID string, taskID int64) (bool, error)
- func (stm *TaskManager) PauseSubtasks(ctx context.Context, tidbID string, taskID int64) error
- func (stm *TaskManager) PauseTask(ctx context.Context, taskKey string) (bool, error)
- func (stm *TaskManager) PrintSubtaskInfo(ctx context.Context, taskID int64)
- func (stm *TaskManager) ResumeSubtasks(ctx context.Context, taskID int64) error
- func (stm *TaskManager) ResumeTask(ctx context.Context, taskKey string) (bool, error)
- func (stm *TaskManager) StartManager(ctx context.Context, tidbID string, role string) error
- func (stm *TaskManager) StartSubtask(ctx context.Context, subtaskID int64) error
- func (stm *TaskManager) TransferSubTasks2History(ctx context.Context, taskID int64) error
- func (stm *TaskManager) TransferTasks2History(ctx context.Context, tasks []*proto.Task) error
- func (stm *TaskManager) UpdateErrorToSubtask(ctx context.Context, tidbID string, taskID int64, err error) error
- func (stm *TaskManager) UpdateFailedSchedulerIDs(ctx context.Context, taskID int64, replaceNodes map[string]string) error
- func (stm *TaskManager) UpdateGlobalTaskAndAddSubTasks(ctx context.Context, gTask *proto.Task, subtasks []*proto.Subtask, ...) (bool, error)
- func (stm *TaskManager) UpdateSubtaskExecID(ctx context.Context, tidbID string, subtaskID int64) error
- func (stm *TaskManager) UpdateSubtaskRowCount(ctx context.Context, subtaskID int64, rowCount int64) error
- func (stm *TaskManager) UpdateSubtaskStateAndError(ctx context.Context, tidbID string, id int64, state proto.TaskState, ...) error
- func (stm *TaskManager) WithNewSession(fn func(se sessionctx.Context) error) error
- func (stm *TaskManager) WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error
Constants ¶
This section is empty.
Variables ¶
var (
	// TestLastTaskID is used for test to set the last task ID.
	TestLastTaskID atomic.Int64
)
Functions ¶
func ExecSQL ¶
func ExecSQL(ctx context.Context, se sessionctx.Context, sql string, args ...interface{}) ([]chunk.Row, error)
ExecSQL executes the sql and returns the result. TODO: consider retry.
func GetSubtasksByTaskIDForTest ¶
func GetSubtasksByTaskIDForTest(ctx context.Context, stm *TaskManager, taskID int64) ([]*proto.Subtask, error)
GetSubtasksByTaskIDForTest gets subtasks by taskID for test.
func GetSubtasksFromHistoryByTaskIDForTest ¶
func GetSubtasksFromHistoryByTaskIDForTest(ctx context.Context, stm *TaskManager, taskID int64) (int, error)
GetSubtasksFromHistoryByTaskIDForTest gets subtasks by taskID from history table for test.
func GetSubtasksFromHistoryForTest ¶
func GetSubtasksFromHistoryForTest(ctx context.Context, stm *TaskManager) (int, error)
GetSubtasksFromHistoryForTest gets subtasks from history table for test.
func GetTasksFromHistoryForTest ¶
func GetTasksFromHistoryForTest(ctx context.Context, stm *TaskManager) (int, error)
GetTasksFromHistoryForTest gets tasks from history table for test.
Types ¶
type SessionExecutor ¶
type SessionExecutor interface {
// WithNewSession executes the function with a new session.
WithNewSession(fn func(se sessionctx.Context) error) error
// WithNewTxn executes the fn in a new transaction.
WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error
}
SessionExecutor defines the interface for executing SQLs in a session.
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
TaskManager is the manager of global/sub task.
func GetTaskManager ¶
func GetTaskManager() (*TaskManager, error)
GetTaskManager gets the task manager.
func NewTaskManager ¶
func NewTaskManager(sePool sessionPool) *TaskManager
NewTaskManager creates a new task manager.
func (*TaskManager) AddGlobalTaskWithSession ¶
func (*TaskManager) AddGlobalTaskWithSession(ctx context.Context, se sessionctx.Context, key string, tp proto.TaskType, concurrency int, meta []byte) (taskID int64, err error)
AddGlobalTaskWithSession adds a new task to global task table with session.
func (*TaskManager) AddNewGlobalTask ¶
func (stm *TaskManager) AddNewGlobalTask(ctx context.Context, key string, tp proto.TaskType, concurrency int, meta []byte) (taskID int64, err error)
AddNewGlobalTask adds a new task to global task table.
func (*TaskManager) AddNewSubTask ¶
func (stm *TaskManager) AddNewSubTask(ctx context.Context, globalTaskID int64, step proto.Step, designatedTiDBID string, meta []byte, tp proto.TaskType, isRevert bool) error
AddNewSubTask adds a new subtask to the subtask table.
func (*TaskManager) CancelGlobalTask ¶
func (stm *TaskManager) CancelGlobalTask(ctx context.Context, taskID int64) error
CancelGlobalTask cancels global task.
func (*TaskManager) CancelGlobalTaskByKeySession ¶
func (*TaskManager) CancelGlobalTaskByKeySession(ctx context.Context, se sessionctx.Context, taskKey string) error
CancelGlobalTaskByKeySession cancels global task by key using input session.
func (*TaskManager) CleanUpMeta ¶
func (stm *TaskManager) CleanUpMeta(ctx context.Context, nodes []string) error
CleanUpMeta cleans up the outdated rows in dist_framework_meta when some TiDB nodes are down.
func (*TaskManager) CollectSubTaskError ¶
func (stm *TaskManager) CollectSubTaskError(ctx context.Context, taskID int64) ([]error, error)
CollectSubTaskError collects the subtask error.
func (*TaskManager) DeleteSubtasksByTaskID ¶
func (stm *TaskManager) DeleteSubtasksByTaskID(ctx context.Context, taskID int64) error
DeleteSubtasksByTaskID deletes the subtask of the given global task ID.
func (*TaskManager) FinishSubtask ¶
func (stm *TaskManager) FinishSubtask(ctx context.Context, tidbID string, id int64, meta []byte) error
FinishSubtask updates the subtask meta and marks its state as succeeded.
func (*TaskManager) GCSubtasks ¶
func (stm *TaskManager) GCSubtasks(ctx context.Context) error
GCSubtasks deletes the history subtask which is older than the given days.
func (*TaskManager) GetAllNodes ¶
func (stm *TaskManager) GetAllNodes(ctx context.Context) ([]string, error)
GetAllNodes gets nodes in dist_framework_meta.
func (*TaskManager) GetFirstSubtaskInStates ¶
func (stm *TaskManager) GetFirstSubtaskInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) (*proto.Subtask, error)
GetFirstSubtaskInStates gets the first subtask by given states.
func (*TaskManager) GetGlobalTaskByID ¶
func (stm *TaskManager) GetGlobalTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
GetGlobalTaskByID gets the task by the global task ID.
func (*TaskManager) GetGlobalTaskByKey ¶
func (stm *TaskManager) GetGlobalTaskByKey(ctx context.Context, key string) (task *proto.Task, err error)
GetGlobalTaskByKey gets the task by the task key.
func (*TaskManager) GetGlobalTaskByKeyWithHistory ¶
func (stm *TaskManager) GetGlobalTaskByKeyWithHistory(ctx context.Context, key string) (task *proto.Task, err error)
GetGlobalTaskByKeyWithHistory gets the task from history table by the task key.
func (*TaskManager) GetGlobalTasksFromHistoryInStates ¶
func (stm *TaskManager) GetGlobalTasksFromHistoryInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error)
GetGlobalTasksFromHistoryInStates gets the tasks in history table in the states.
func (*TaskManager) GetGlobalTasksInStates ¶
func (stm *TaskManager) GetGlobalTasksInStates(ctx context.Context, states ...interface{}) (task []*proto.Task, err error)
GetGlobalTasksInStates gets the tasks in the states.
func (*TaskManager) GetNewGlobalTask ¶
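func (stm *TaskManager) GetNewGlobalTask(ctx context.Context) (task *proto.Task, err error)
GetNewGlobalTask gets a new task from the global task table. It is used by the dispatcher only.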
func (*TaskManager) GetNodesByRole ¶
func (stm *TaskManager) GetNodesByRole(ctx context.Context, role string) (map[string]bool, error)
GetNodesByRole gets nodes map from dist_framework_meta by role.
func (*TaskManager) GetSchedulerIDsByTaskID ¶
func (stm *TaskManager) GetSchedulerIDsByTaskID(ctx context.Context, taskID int64) ([]string, error)
GetSchedulerIDsByTaskID gets the scheduler IDs of the given global task ID.
func (*TaskManager) GetSchedulerIDsByTaskIDAndStep ¶
func (stm *TaskManager) GetSchedulerIDsByTaskIDAndStep(ctx context.Context, taskID int64, step proto.Step) ([]string, error)
GetSchedulerIDsByTaskIDAndStep gets the scheduler IDs of the given global task ID and step.
func (*TaskManager) GetSubtaskCntGroupByStates ¶
func (stm *TaskManager) GetSubtaskCntGroupByStates(ctx context.Context, taskID int64, step proto.Step) (map[proto.TaskState]int64, error)
GetSubtaskCntGroupByStates gets the subtask count by states.
func (*TaskManager) GetSubtaskInStatesCnt ¶
func (stm *TaskManager) GetSubtaskInStatesCnt(ctx context.Context, taskID int64, states ...interface{}) (int64, error)
GetSubtaskInStatesCnt gets the subtask count in the states.
func (*TaskManager) GetSubtaskRowCount ¶
func (stm *TaskManager) GetSubtaskRowCount(ctx context.Context, taskID int64, step proto.Step) (int64, error)
GetSubtaskRowCount gets the subtask row count.
func (*TaskManager) GetSubtasksForImportInto ¶
func (stm *TaskManager) GetSubtasksForImportInto(ctx context.Context, taskID int64, step proto.Step) ([]*proto.Subtask, error)
GetSubtasksForImportInto gets the subtasks for import into(show import jobs).
func (*TaskManager) GetSubtasksInStates ¶
func (stm *TaskManager) GetSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) ([]*proto.Subtask, error)
GetSubtasksInStates gets all subtasks by given states.
func (*TaskManager) GetSucceedSubtasksByStep ¶
func (stm *TaskManager) GetSucceedSubtasksByStep(ctx context.Context, taskID int64, step proto.Step) ([]*proto.Subtask, error)
GetSucceedSubtasksByStep gets the subtask in the success state.
func (*TaskManager) GetTaskByIDWithHistory ¶
func (stm *TaskManager) GetTaskByIDWithHistory(ctx context.Context, taskID int64) (task *proto.Task, err error)
GetTaskByIDWithHistory gets the task by the global task ID from both tidb_global_task and tidb_global_task_history.
func (*TaskManager) HasSubtasksInStates ¶
func (stm *TaskManager) HasSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...interface{}) (bool, error)
HasSubtasksInStates checks if there are subtasks in the states.
func (*TaskManager) IsGlobalTaskCancelling ¶
func (stm *TaskManager) IsGlobalTaskCancelling(ctx context.Context, taskID int64) (bool, error)
IsGlobalTaskCancelling checks whether the task state is cancelling.
func (*TaskManager) IsSchedulerCanceled ¶
func (stm *TaskManager) IsSchedulerCanceled(ctx context.Context, execID string, taskID int64) (bool, error)
IsSchedulerCanceled checks if subtask 'execID' of task 'taskID' has been canceled somehow.
func (*TaskManager) PauseSubtasks ¶
func (stm *TaskManager) PauseSubtasks(ctx context.Context, tidbID string, taskID int64) error
PauseSubtasks updates all running/pending subtasks to paused state.
func (*TaskManager) PrintSubtaskInfo ¶
func (stm *TaskManager) PrintSubtaskInfo(ctx context.Context, taskID int64)
PrintSubtaskInfo logs the subtask info by taskID. Only used for unit tests.
func (*TaskManager) ResumeSubtasks ¶
func (stm *TaskManager) ResumeSubtasks(ctx context.Context, taskID int64) error
ResumeSubtasks updates all paused subtasks to pending state.
func (*TaskManager) ResumeTask ¶
func (stm *TaskManager) ResumeTask(ctx context.Context, taskKey string) (bool, error)
ResumeTask resumes the task.
func (*TaskManager) StartManager ¶
func (stm *TaskManager) StartManager(ctx context.Context, tidbID string, role string) error
StartManager inserts the manager information into dist_framework_meta.
func (*TaskManager) StartSubtask ¶
func (stm *TaskManager) StartSubtask(ctx context.Context, subtaskID int64) error
StartSubtask updates the subtask state to running.
func (*TaskManager) TransferSubTasks2History ¶
func (stm *TaskManager) TransferSubTasks2History(ctx context.Context, taskID int64) error
TransferSubTasks2History moves all the finished subtasks to tidb_background_subtask_history by taskID.
func (*TaskManager) TransferTasks2History ¶
func (stm *TaskManager) TransferTasks2History(ctx context.Context, tasks []*proto.Task) error
TransferTasks2History transfers the selected tasks into the tidb_global_task_history table by taskIDs.
func (*TaskManager) UpdateErrorToSubtask ¶
func (stm *TaskManager) UpdateErrorToSubtask(ctx context.Context, tidbID string, taskID int64, err error) error
UpdateErrorToSubtask updates the error to subtask.
func (*TaskManager) UpdateFailedSchedulerIDs ¶
func (stm *TaskManager) UpdateFailedSchedulerIDs(ctx context.Context, taskID int64, replaceNodes map[string]string) error
UpdateFailedSchedulerIDs replace failed scheduler nodes with alive nodes.
func (*TaskManager) UpdateGlobalTaskAndAddSubTasks ¶
func (stm *TaskManager) UpdateGlobalTaskAndAddSubTasks(ctx context.Context, gTask *proto.Task, subtasks []*proto.Subtask, prevState proto.TaskState) (bool, error)
UpdateGlobalTaskAndAddSubTasks updates the global task and adds new subtasks.
func (*TaskManager) UpdateSubtaskExecID ¶
func (stm *TaskManager) UpdateSubtaskExecID(ctx context.Context, tidbID string, subtaskID int64) error
UpdateSubtaskExecID updates the subtask's exec_id, used for testing now.
func (*TaskManager) UpdateSubtaskRowCount ¶
func (stm *TaskManager) UpdateSubtaskRowCount(ctx context.Context, subtaskID int64, rowCount int64) error
UpdateSubtaskRowCount updates the subtask row count.
func (*TaskManager) UpdateSubtaskStateAndError ¶
func (stm *TaskManager) UpdateSubtaskStateAndError(ctx context.Context, tidbID string, id int64, state proto.TaskState, subTaskErr error) error
UpdateSubtaskStateAndError updates the subtask state.
func (*TaskManager) WithNewSession ¶
func (stm *TaskManager) WithNewSession(fn func(se sessionctx.Context) error) error
WithNewSession executes the function with a new session.
func (*TaskManager) WithNewTxn ¶
func (stm *TaskManager) WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error
WithNewTxn executes the fn in a new transaction.