jobs

package
v0.27.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: MIT Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TaskStaleTimeout = 3 * time.Minute
	MaxTaskRetries   = 5
)

TaskStaleTimeout is the maximum time a task can be "in progress" before being considered stale.

Variables

This section is empty.

Functions

func IsRateLimitError

func IsRateLimitError(err error) bool

IsRateLimitError returns true when the error indicates an HTTP 429/403/503 blocking response.

Types

type CrawlerInterface

type CrawlerInterface interface {
	WarmURL(ctx context.Context, url string, findLinks bool) (*crawler.CrawlResult, error)
	DiscoverSitemapsAndRobots(ctx context.Context, domain string) (*crawler.SitemapDiscoveryResult, error)
	ParseSitemap(ctx context.Context, sitemapURL string) ([]string, error)
	FilterURLs(urls []string, includePaths, excludePaths []string) []string
	GetUserAgent() string
}

CrawlerInterface defines the methods we need from the crawler

type DbQueueInterface

type DbQueueInterface interface {
	GetNextTask(ctx context.Context, jobID string) (*db.Task, error)
	UpdateTaskStatus(ctx context.Context, task *db.Task) error
	DecrementRunningTasks(ctx context.Context, jobID string) error
	DecrementRunningTasksBy(ctx context.Context, jobID string, count int) error
	Execute(ctx context.Context, fn func(*sql.Tx) error) error
	ExecuteWithContext(ctx context.Context, fn func(context.Context, *sql.Tx) error) error
	ExecuteMaintenance(ctx context.Context, fn func(*sql.Tx) error) error
	SetConcurrencyOverride(fn db.ConcurrencyOverrideFunc)
	UpdateDomainTechnologies(ctx context.Context, domainID int, technologies, headers []byte, htmlPath string) error
}

DbQueueInterface defines the database queue operations needed by WorkerPool

type DbQueueProvider

type DbQueueProvider interface {
	Execute(ctx context.Context, fn func(*sql.Tx) error) error
	EnqueueURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, sourceURL string) error
	CleanupStuckJobs(ctx context.Context) error
}

DbQueueProvider defines the interface for database operations

type DomainLimiter

type DomainLimiter struct {
	// contains filtered or unexported fields
}

DomainLimiter coordinates request pacing across workers for each domain.

func (*DomainLimiter) Acquire

func (dl *DomainLimiter) Acquire(ctx context.Context, req DomainRequest) (*DomainPermit, error)

Acquire waits until the caller is allowed to perform a request against the domain.

func (*DomainLimiter) EstimatedWait

func (dl *DomainLimiter) EstimatedWait(domain string) time.Duration

EstimatedWait returns the estimated time until the domain is available for requests. Returns 0 if the domain is available immediately or unknown.

func (*DomainLimiter) GetEffectiveConcurrency

func (dl *DomainLimiter) GetEffectiveConcurrency(jobID string, domain string) int

GetEffectiveConcurrency returns the current effective concurrency for a job on a domain. Returns 0 if no override exists (use the configured concurrency).

func (*DomainLimiter) Seed

func (dl *DomainLimiter) Seed(domain string, baseDelaySeconds int, adaptiveDelaySeconds int, floorSeconds int)

Seed initialises limiter state for a domain with persisted values.

func (*DomainLimiter) UpdateRobotsDelay

func (dl *DomainLimiter) UpdateRobotsDelay(domain string, delaySeconds int)

UpdateRobotsDelay allows adjusting the base delay when robots.txt changes.

type DomainLimiterConfig

type DomainLimiterConfig struct {
	BaseDelay             time.Duration
	DelayStep             time.Duration
	SuccessProbeThreshold int
	MaxAdaptiveDelay      time.Duration
	ConcurrencyStep       time.Duration
	PersistInterval       time.Duration
	MaxBlockingRetries    int
	CancelRateLimitJobs   bool
	CancelStreakThreshold int
	CancelDelayThreshold  time.Duration
	RobotsDelayMultiplier float64
}

DomainLimiterConfig controls adaptive throttling behaviour.

type DomainPermit

type DomainPermit struct {
	// contains filtered or unexported fields
}

DomainPermit is returned by Acquire and must be released after the request completes.

func (*DomainPermit) Release

func (p *DomainPermit) Release(success bool, rateLimited bool)

Release notifies the limiter about the outcome of a request.

type DomainRequest

type DomainRequest struct {
	Domain         string
	JobID          string
	RobotsDelay    time.Duration
	JobConcurrency int
}

DomainRequest describes a request against a domain that needs throttling.

type Job

type Job struct {
	ID              string    `json:"id"`
	Domain          string    `json:"domain"`
	UserID          *string   `json:"user_id,omitempty"`
	OrganisationID  *string   `json:"organisation_id,omitempty"`
	Status          JobStatus `json:"status"`
	Progress        float64   `json:"progress"`
	TotalTasks      int       `json:"total_tasks"`
	CompletedTasks  int       `json:"completed_tasks"`
	FailedTasks     int       `json:"failed_tasks"`
	SkippedTasks    int       `json:"skipped_tasks"`
	FoundTasks      int       `json:"found_tasks"`
	SitemapTasks    int       `json:"sitemap_tasks"`
	CreatedAt       time.Time `json:"created_at"`
	StartedAt       time.Time `json:"started_at"`
	CompletedAt     time.Time `json:"completed_at"`
	Concurrency     int       `json:"concurrency"`
	FindLinks       bool      `json:"find_links"`
	MaxPages        int       `json:"max_pages"`
	IncludePaths    []string  `json:"include_paths,omitempty"`
	ExcludePaths    []string  `json:"exclude_paths,omitempty"`
	RequiredWorkers int       `json:"required_workers"`
	SourceType      *string   `json:"source_type,omitempty"`
	SourceDetail    *string   `json:"source_detail,omitempty"`
	SourceInfo      *string   `json:"source_info,omitempty"`
	ErrorMessage    string    `json:"error_message,omitempty"`
	SchedulerID     *string   `json:"scheduler_id,omitempty"`
	// Calculated fields from database
	DurationSeconds       *int     `json:"duration_seconds,omitempty"`
	AvgTimePerTaskSeconds *float64 `json:"avg_time_per_task_seconds,omitempty"`
}

Job represents a crawling job for a domain. CHECK: Do all of these currently get utilised somewhere in the app?

type JobInfo

type JobInfo struct {
	DomainID           int
	DomainName         string
	FindLinks          bool
	CrawlDelay         int
	Concurrency        int
	AdaptiveDelay      int
	AdaptiveDelayFloor int
	RobotsRules        *crawler.RobotsRules // Cached robots.txt rules for URL filtering
}

JobInfo caches job-specific data that doesn't change during execution

type JobManager

type JobManager struct {
	// contains filtered or unexported fields
}

JobManager handles job creation and lifecycle management

func NewJobManager

func NewJobManager(db *sql.DB, dbQueue DbQueueProvider, crawler CrawlerInterface, workerPool *WorkerPool) *JobManager

NewJobManager creates a new job manager

func (*JobManager) CalculateJobProgress

func (jm *JobManager) CalculateJobProgress(job *Job) float64

CalculateJobProgress calculates the progress percentage of a job

func (*JobManager) CancelJob

func (jm *JobManager) CancelJob(ctx context.Context, jobID string) error

CancelJob cancels a running job

func (*JobManager) CreateJob

func (jm *JobManager) CreateJob(ctx context.Context, options *JobOptions) (*Job, error)

CreateJob creates a new job with the given options

func (*JobManager) EnqueueJobURLs

func (jm *JobManager) EnqueueJobURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, sourceURL string) error

EnqueueJobURLs is a wrapper around dbQueue.EnqueueURLs that adds duplicate detection

func (*JobManager) GetJob

func (jm *JobManager) GetJob(ctx context.Context, jobID string) (*Job, error)

GetJob retrieves a job by ID

func (*JobManager) GetJobStatus

func (jm *JobManager) GetJobStatus(ctx context.Context, jobID string) (*Job, error)

GetJobStatus gets the current status of a job

func (*JobManager) IsJobComplete

func (jm *JobManager) IsJobComplete(job *Job) bool

IsJobComplete checks if all tasks in a job are processed

func (*JobManager) UpdateJobStatus

func (jm *JobManager) UpdateJobStatus(ctx context.Context, jobID string, status JobStatus) error

UpdateJobStatus updates the status of a job with appropriate timestamps

func (*JobManager) ValidateStatusTransition

func (jm *JobManager) ValidateStatusTransition(from, to JobStatus) error

ValidateStatusTransition checks if a status transition is valid

type JobManagerInterface

type JobManagerInterface interface {
	// Core job operations used by API layer
	CreateJob(ctx context.Context, options *JobOptions) (*Job, error)
	CancelJob(ctx context.Context, jobID string) error
	GetJobStatus(ctx context.Context, jobID string) (*Job, error)

	// Additional job operations
	GetJob(ctx context.Context, jobID string) (*Job, error)
	EnqueueJobURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, sourceURL string) error

	// Job utility methods
	IsJobComplete(job *Job) bool
	CalculateJobProgress(job *Job) float64
	ValidateStatusTransition(from, to JobStatus) error
	UpdateJobStatus(ctx context.Context, jobID string, status JobStatus) error
}

JobManagerInterface defines the interface for job management operations

type JobOptions

type JobOptions struct {
	Domain          string   `json:"domain"`
	UserID          *string  `json:"user_id,omitempty"`
	OrganisationID  *string  `json:"organisation_id,omitempty"`
	UseSitemap      bool     `json:"use_sitemap"`
	Concurrency     int      `json:"concurrency"`
	FindLinks       bool     `json:"find_links"`
	MaxPages        int      `json:"max_pages"`
	IncludePaths    []string `json:"include_paths,omitempty"`
	ExcludePaths    []string `json:"exclude_paths,omitempty"`
	RequiredWorkers int      `json:"required_workers"`
	SourceType      *string  `json:"source_type,omitempty"`
	SourceDetail    *string  `json:"source_detail,omitempty"`
	SourceInfo      *string  `json:"source_info,omitempty"`
	SchedulerID     *string  `json:"scheduler_id,omitempty"`
}

JobOptions defines configuration options for a crawl job

type JobPerformance

type JobPerformance struct {
	RecentTasks  []int64   // Last 5 task response times for this job
	CurrentBoost int       // Current performance boost workers for this job
	LastCheck    time.Time // When we last evaluated this job
	// LastConcurrencyBlock captures when this job last hit a concurrency cap.
	LastConcurrencyBlock time.Time
}

JobPerformance tracks performance metrics for a specific job

type JobStatus

type JobStatus string

JobStatus represents the current status of a job

const (
	JobStatusPending      JobStatus = "pending"
	JobStatusInitialising JobStatus = "initializing"
	JobStatusRunning      JobStatus = "running"
	JobStatusPaused       JobStatus = "paused"
	JobStatusCompleted    JobStatus = "completed"
	JobStatusFailed       JobStatus = "failed"
	JobStatusCancelled    JobStatus = "cancelled"
)

type QuotaExceededError

type QuotaExceededError struct {
	Used     int       `json:"used"`
	Limit    int       `json:"limit"`
	ResetsAt time.Time `json:"resets_at"`
	PlanName string    `json:"plan_name"`
}

QuotaExceededError represents when an org has exceeded their daily quota

func (*QuotaExceededError) Error

func (e *QuotaExceededError) Error() string

type Task

type Task struct {
	ID          string     `json:"id"`
	JobID       string     `json:"job_id"`
	PageID      int        `json:"page_id"`
	Path        string     `json:"path"`
	DomainID    int        `json:"domain_id"`
	DomainName  string     `json:"domain_name"`
	Status      TaskStatus `json:"status"`
	CreatedAt   time.Time  `json:"created_at"`
	StartedAt   time.Time  `json:"started_at"`
	CompletedAt time.Time  `json:"completed_at"`
	RetryCount  int        `json:"retry_count"`
	Error       string     `json:"error,omitempty"`

	// Source information
	SourceType string `json:"source_type"`          // "sitemap", "link", "manual"
	SourceURL  string `json:"source_url,omitempty"` // URL where this was discovered (for find_links)

	// Result data
	StatusCode         int    `json:"status_code,omitempty"`
	ResponseTime       int64  `json:"response_time,omitempty"`
	CacheStatus        string `json:"cache_status,omitempty"`
	ContentType        string `json:"content_type,omitempty"`
	SecondResponseTime int64  `json:"second_response_time,omitempty"`
	SecondCacheStatus  string `json:"second_cache_status,omitempty"`

	// Priority
	PriorityScore float64 `json:"priority_score"`

	// Job configuration that affects processing
	FindLinks          bool `json:"-"`
	CrawlDelay         int  `json:"-"` // Crawl delay in seconds from robots.txt
	JobConcurrency     int  `json:"-"`
	AdaptiveDelay      int  `json:"-"`
	AdaptiveDelayFloor int  `json:"-"`
}

Task represents a single URL to be crawled within a job

type TaskStatus

type TaskStatus string

TaskStatus represents the current status of a task

const (
	TaskStatusWaiting   TaskStatus = "waiting"
	TaskStatusPending   TaskStatus = "pending"
	TaskStatusRunning   TaskStatus = "running"
	TaskStatusCompleted TaskStatus = "completed"
	TaskStatusFailed    TaskStatus = "failed"
	TaskStatusSkipped   TaskStatus = "skipped"
)

type WaitingReason

type WaitingReason string

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

func NewWorkerPool

func NewWorkerPool(sqlDB *sql.DB, dbQueue DbQueueInterface, crawler CrawlerInterface, numWorkers int, workerConcurrency int, dbConfig *db.Config) *WorkerPool

func (*WorkerPool) AddJob

func (wp *WorkerPool) AddJob(jobID string, options *JobOptions)

func (*WorkerPool) CleanupStuckJobs

func (wp *WorkerPool) CleanupStuckJobs(ctx context.Context) error

CleanupStuckJobs finds and fixes jobs that are stuck in a pending/running state despite having all their tasks completed.

func (*WorkerPool) EnqueueURLs

func (wp *WorkerPool) EnqueueURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, sourceURL string) error

EnqueueURLs is a wrapper that ensures all task enqueuing goes through the JobManager. This allows for centralised logic, such as duplicate checking, to be applied.

func (*WorkerPool) NotifyNewTasks

func (wp *WorkerPool) NotifyNewTasks()

NotifyNewTasks wakes workers to check for new tasks immediately instead of waiting for the next task monitor tick (30 seconds).

func (*WorkerPool) RemoveJob

func (wp *WorkerPool) RemoveJob(jobID string)

func (*WorkerPool) SetJobManager

func (wp *WorkerPool) SetJobManager(jm *JobManager)

SetJobManager sets the JobManager reference for duplicate task checking

func (*WorkerPool) Start

func (wp *WorkerPool) Start(ctx context.Context)

func (*WorkerPool) StartCleanupMonitor

func (wp *WorkerPool) StartCleanupMonitor(ctx context.Context)

StartCleanupMonitor starts the cleanup monitor goroutine

func (*WorkerPool) StartQuotaPromotionMonitor

func (wp *WorkerPool) StartQuotaPromotionMonitor(ctx context.Context)

StartQuotaPromotionMonitor checks for waiting tasks that can be promoted when quota becomes available

func (*WorkerPool) StartTaskMonitor

func (wp *WorkerPool) StartTaskMonitor(ctx context.Context)

StartTaskMonitor starts a background process that monitors for pending tasks

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

func (*WorkerPool) WaitForJobs

func (wp *WorkerPool) WaitForJobs()

WaitForJobs waits for all active jobs to complete

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL