Documentation
¶
Index ¶
- Constants
- func IsRateLimitError(err error) bool
- type CrawlerInterface
- type DbQueueInterface
- type DbQueueProvider
- type DomainLimiter
- func (dl *DomainLimiter) Acquire(ctx context.Context, req DomainRequest) (*DomainPermit, error)
- func (dl *DomainLimiter) EstimatedWait(domain string) time.Duration
- func (dl *DomainLimiter) GetEffectiveConcurrency(jobID string, domain string) int
- func (dl *DomainLimiter) Seed(domain string, baseDelaySeconds int, adaptiveDelaySeconds int, ...)
- func (dl *DomainLimiter) UpdateRobotsDelay(domain string, delaySeconds int)
- type DomainLimiterConfig
- type DomainPermit
- type DomainRequest
- type Job
- type JobInfo
- type JobManager
- func (jm *JobManager) CalculateJobProgress(job *Job) float64
- func (jm *JobManager) CancelJob(ctx context.Context, jobID string) error
- func (jm *JobManager) CreateJob(ctx context.Context, options *JobOptions) (*Job, error)
- func (jm *JobManager) EnqueueJobURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, ...) error
- func (jm *JobManager) GetJob(ctx context.Context, jobID string) (*Job, error)
- func (jm *JobManager) GetJobStatus(ctx context.Context, jobID string) (*Job, error)
- func (jm *JobManager) IsJobComplete(job *Job) bool
- func (jm *JobManager) UpdateJobStatus(ctx context.Context, jobID string, status JobStatus) error
- func (jm *JobManager) ValidateStatusTransition(from, to JobStatus) error
- type JobManagerInterface
- type JobOptions
- type JobPerformance
- type JobStatus
- type QuotaExceededError
- type Task
- type TaskStatus
- type WaitingReason
- type WorkerPool
- func (wp *WorkerPool) AddJob(jobID string, options *JobOptions)
- func (wp *WorkerPool) CleanupStuckJobs(ctx context.Context) error
- func (wp *WorkerPool) EnqueueURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, ...) error
- func (wp *WorkerPool) NotifyNewTasks()
- func (wp *WorkerPool) RemoveJob(jobID string)
- func (wp *WorkerPool) SetJobManager(jm *JobManager)
- func (wp *WorkerPool) Start(ctx context.Context)
- func (wp *WorkerPool) StartCleanupMonitor(ctx context.Context)
- func (wp *WorkerPool) StartQuotaPromotionMonitor(ctx context.Context)
- func (wp *WorkerPool) StartTaskMonitor(ctx context.Context)
- func (wp *WorkerPool) Stop()
- func (wp *WorkerPool) WaitForJobs()
Constants ¶
const ( TaskStaleTimeout = 3 * time.Minute MaxTaskRetries = 5 )
TaskStaleTimeout is the maximum time a task can be "in progress" before it is considered stale.
Variables ¶
This section is empty.
Functions ¶
func IsRateLimitError ¶
IsRateLimitError returns true when the error indicates an HTTP 429/403/503 blocking response.
Types ¶
type CrawlerInterface ¶
type CrawlerInterface interface {
WarmURL(ctx context.Context, url string, findLinks bool) (*crawler.CrawlResult, error)
DiscoverSitemapsAndRobots(ctx context.Context, domain string) (*crawler.SitemapDiscoveryResult, error)
ParseSitemap(ctx context.Context, sitemapURL string) ([]string, error)
FilterURLs(urls []string, includePaths, excludePaths []string) []string
GetUserAgent() string
}
CrawlerInterface defines the methods we need from the crawler
type DbQueueInterface ¶
type DbQueueInterface interface {
GetNextTask(ctx context.Context, jobID string) (*db.Task, error)
UpdateTaskStatus(ctx context.Context, task *db.Task) error
DecrementRunningTasks(ctx context.Context, jobID string) error
DecrementRunningTasksBy(ctx context.Context, jobID string, count int) error
Execute(ctx context.Context, fn func(*sql.Tx) error) error
ExecuteWithContext(ctx context.Context, fn func(context.Context, *sql.Tx) error) error
ExecuteMaintenance(ctx context.Context, fn func(*sql.Tx) error) error
SetConcurrencyOverride(fn db.ConcurrencyOverrideFunc)
UpdateDomainTechnologies(ctx context.Context, domainID int, technologies, headers []byte, htmlPath string) error
}
DbQueueInterface defines the database queue operations needed by WorkerPool
type DbQueueProvider ¶
type DbQueueProvider interface {
Execute(ctx context.Context, fn func(*sql.Tx) error) error
EnqueueURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, sourceURL string) error
CleanupStuckJobs(ctx context.Context) error
}
DbQueueProvider defines the interface for database operations
type DomainLimiter ¶
type DomainLimiter struct {
// contains filtered or unexported fields
}
DomainLimiter coordinates request pacing across workers for each domain.
func (*DomainLimiter) Acquire ¶
func (dl *DomainLimiter) Acquire(ctx context.Context, req DomainRequest) (*DomainPermit, error)
Acquire waits until the caller is allowed to perform a request against the domain.
func (*DomainLimiter) EstimatedWait ¶
func (dl *DomainLimiter) EstimatedWait(domain string) time.Duration
EstimatedWait returns the estimated time until the domain is available for requests. Returns 0 if the domain is available immediately or unknown.
func (*DomainLimiter) GetEffectiveConcurrency ¶
func (dl *DomainLimiter) GetEffectiveConcurrency(jobID string, domain string) int
GetEffectiveConcurrency returns the current effective concurrency for a job on a domain. Returns 0 if no override exists (use configured concurrency).
func (*DomainLimiter) Seed ¶
func (dl *DomainLimiter) Seed(domain string, baseDelaySeconds int, adaptiveDelaySeconds int, floorSeconds int)
Seed initialises limiter state for a domain with persisted values.
func (*DomainLimiter) UpdateRobotsDelay ¶
func (dl *DomainLimiter) UpdateRobotsDelay(domain string, delaySeconds int)
UpdateRobotsDelay allows adjusting the base delay when robots.txt changes.
type DomainLimiterConfig ¶
type DomainLimiterConfig struct {
BaseDelay time.Duration
DelayStep time.Duration
SuccessProbeThreshold int
MaxAdaptiveDelay time.Duration
ConcurrencyStep time.Duration
PersistInterval time.Duration
MaxBlockingRetries int
CancelRateLimitJobs bool
CancelStreakThreshold int
CancelDelayThreshold time.Duration
RobotsDelayMultiplier float64
}
DomainLimiterConfig controls adaptive throttling behaviour.
type DomainPermit ¶
type DomainPermit struct {
// contains filtered or unexported fields
}
DomainPermit is returned by Acquire and must be released after the request completes.
func (*DomainPermit) Release ¶
func (p *DomainPermit) Release(success bool, rateLimited bool)
Release notifies the limiter about the outcome of a request.
type DomainRequest ¶
type DomainRequest struct {
Domain string
JobID string
RobotsDelay time.Duration
JobConcurrency int
}
DomainRequest describes a request against a domain that needs throttling.
type Job ¶
type Job struct {
ID string `json:"id"`
Domain string `json:"domain"`
UserID *string `json:"user_id,omitempty"`
OrganisationID *string `json:"organisation_id,omitempty"`
Status JobStatus `json:"status"`
Progress float64 `json:"progress"`
TotalTasks int `json:"total_tasks"`
CompletedTasks int `json:"completed_tasks"`
FailedTasks int `json:"failed_tasks"`
SkippedTasks int `json:"skipped_tasks"`
FoundTasks int `json:"found_tasks"`
SitemapTasks int `json:"sitemap_tasks"`
CreatedAt time.Time `json:"created_at"`
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at"`
Concurrency int `json:"concurrency"`
FindLinks bool `json:"find_links"`
MaxPages int `json:"max_pages"`
IncludePaths []string `json:"include_paths,omitempty"`
ExcludePaths []string `json:"exclude_paths,omitempty"`
RequiredWorkers int `json:"required_workers"`
SourceType *string `json:"source_type,omitempty"`
SourceDetail *string `json:"source_detail,omitempty"`
SourceInfo *string `json:"source_info,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
SchedulerID *string `json:"scheduler_id,omitempty"`
// Calculated fields from database
DurationSeconds *int `json:"duration_seconds,omitempty"`
AvgTimePerTaskSeconds *float64 `json:"avg_time_per_task_seconds,omitempty"`
}
Job represents a crawling job for a domain. TODO: verify that all of these fields are currently utilised somewhere in the app.
type JobInfo ¶
type JobInfo struct {
DomainID int
DomainName string
FindLinks bool
CrawlDelay int
Concurrency int
AdaptiveDelay int
AdaptiveDelayFloor int
RobotsRules *crawler.RobotsRules // Cached robots.txt rules for URL filtering
}
JobInfo caches job-specific data that doesn't change during execution
type JobManager ¶
type JobManager struct {
// contains filtered or unexported fields
}
JobManager handles job creation and lifecycle management
func NewJobManager ¶
func NewJobManager(db *sql.DB, dbQueue DbQueueProvider, crawler CrawlerInterface, workerPool *WorkerPool) *JobManager
NewJobManager creates a new job manager
func (*JobManager) CalculateJobProgress ¶
func (jm *JobManager) CalculateJobProgress(job *Job) float64
CalculateJobProgress calculates the progress percentage of a job
func (*JobManager) CancelJob ¶
func (jm *JobManager) CancelJob(ctx context.Context, jobID string) error
CancelJob cancels a running job
func (*JobManager) CreateJob ¶
func (jm *JobManager) CreateJob(ctx context.Context, options *JobOptions) (*Job, error)
CreateJob creates a new job with the given options
func (*JobManager) EnqueueJobURLs ¶
func (jm *JobManager) EnqueueJobURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, sourceURL string) error
EnqueueJobURLs is a wrapper around dbQueue.EnqueueURLs that adds duplicate detection
func (*JobManager) GetJobStatus ¶
GetJobStatus gets the current status of a job
func (*JobManager) IsJobComplete ¶
func (jm *JobManager) IsJobComplete(job *Job) bool
IsJobComplete checks if all tasks in a job are processed
func (*JobManager) UpdateJobStatus ¶
UpdateJobStatus updates the status of a job with appropriate timestamps
func (*JobManager) ValidateStatusTransition ¶
func (jm *JobManager) ValidateStatusTransition(from, to JobStatus) error
ValidateStatusTransition checks if a status transition is valid
type JobManagerInterface ¶
type JobManagerInterface interface {
// Core job operations used by API layer
CreateJob(ctx context.Context, options *JobOptions) (*Job, error)
CancelJob(ctx context.Context, jobID string) error
GetJobStatus(ctx context.Context, jobID string) (*Job, error)
// Additional job operations
GetJob(ctx context.Context, jobID string) (*Job, error)
EnqueueJobURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, sourceURL string) error
// Job utility methods
IsJobComplete(job *Job) bool
CalculateJobProgress(job *Job) float64
ValidateStatusTransition(from, to JobStatus) error
UpdateJobStatus(ctx context.Context, jobID string, status JobStatus) error
}
JobManagerInterface defines the interface for job management operations
type JobOptions ¶
type JobOptions struct {
Domain string `json:"domain"`
UserID *string `json:"user_id,omitempty"`
OrganisationID *string `json:"organisation_id,omitempty"`
UseSitemap bool `json:"use_sitemap"`
Concurrency int `json:"concurrency"`
FindLinks bool `json:"find_links"`
MaxPages int `json:"max_pages"`
IncludePaths []string `json:"include_paths,omitempty"`
ExcludePaths []string `json:"exclude_paths,omitempty"`
RequiredWorkers int `json:"required_workers"`
SourceType *string `json:"source_type,omitempty"`
SourceDetail *string `json:"source_detail,omitempty"`
SourceInfo *string `json:"source_info,omitempty"`
SchedulerID *string `json:"scheduler_id,omitempty"`
}
JobOptions defines configuration options for a crawl job
type JobPerformance ¶
type JobPerformance struct {
RecentTasks []int64 // Last 5 task response times for this job
CurrentBoost int // Current performance boost workers for this job
LastCheck time.Time // When we last evaluated this job
// LastConcurrencyBlock captures when this job last hit a concurrency cap.
LastConcurrencyBlock time.Time
}
JobPerformance tracks performance metrics for a specific job
type QuotaExceededError ¶
type QuotaExceededError struct {
Used int `json:"used"`
Limit int `json:"limit"`
ResetsAt time.Time `json:"resets_at"`
PlanName string `json:"plan_name"`
}
QuotaExceededError represents when an org has exceeded their daily quota
func (*QuotaExceededError) Error ¶
func (e *QuotaExceededError) Error() string
type Task ¶
type Task struct {
ID string `json:"id"`
JobID string `json:"job_id"`
PageID int `json:"page_id"`
Path string `json:"path"`
DomainID int `json:"domain_id"`
DomainName string `json:"domain_name"`
Status TaskStatus `json:"status"`
CreatedAt time.Time `json:"created_at"`
StartedAt time.Time `json:"started_at"`
CompletedAt time.Time `json:"completed_at"`
RetryCount int `json:"retry_count"`
Error string `json:"error,omitempty"`
// Source information
SourceType string `json:"source_type"` // "sitemap", "link", "manual"
SourceURL string `json:"source_url,omitempty"` // URL where this was discovered (for find_links)
// Result data
StatusCode int `json:"status_code,omitempty"`
ResponseTime int64 `json:"response_time,omitempty"`
CacheStatus string `json:"cache_status,omitempty"`
ContentType string `json:"content_type,omitempty"`
SecondResponseTime int64 `json:"second_response_time,omitempty"`
SecondCacheStatus string `json:"second_cache_status,omitempty"`
// Priority
PriorityScore float64 `json:"priority_score"`
// Job configuration that affects processing
FindLinks bool `json:"-"`
CrawlDelay int `json:"-"` // Crawl delay in seconds from robots.txt
JobConcurrency int `json:"-"`
AdaptiveDelay int `json:"-"`
AdaptiveDelayFloor int `json:"-"`
}
Task represents a single URL to be crawled within a job
type TaskStatus ¶
type TaskStatus string
TaskStatus represents the current status of a task
const ( TaskStatusWaiting TaskStatus = "waiting" TaskStatusPending TaskStatus = "pending" TaskStatusRunning TaskStatus = "running" TaskStatusCompleted TaskStatus = "completed" TaskStatusFailed TaskStatus = "failed" TaskStatusSkipped TaskStatus = "skipped" )
type WaitingReason ¶
type WaitingReason string
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
func NewWorkerPool ¶
func NewWorkerPool(sqlDB *sql.DB, dbQueue DbQueueInterface, crawler CrawlerInterface, numWorkers int, workerConcurrency int, dbConfig *db.Config) *WorkerPool
func (*WorkerPool) AddJob ¶
func (wp *WorkerPool) AddJob(jobID string, options *JobOptions)
func (*WorkerPool) CleanupStuckJobs ¶
func (wp *WorkerPool) CleanupStuckJobs(ctx context.Context) error
CleanupStuckJobs finds and fixes jobs that are stuck in pending/running state despite having all their tasks completed
func (*WorkerPool) EnqueueURLs ¶
func (wp *WorkerPool) EnqueueURLs(ctx context.Context, jobID string, pages []db.Page, sourceType string, sourceURL string) error
EnqueueURLs is a wrapper that ensures all task enqueuing goes through the JobManager. This allows for centralised logic, such as duplicate checking, to be applied.
func (*WorkerPool) NotifyNewTasks ¶
func (wp *WorkerPool) NotifyNewTasks()
NotifyNewTasks wakes workers to check for new tasks immediately instead of waiting for the next task monitor tick (30 seconds).
func (*WorkerPool) RemoveJob ¶
func (wp *WorkerPool) RemoveJob(jobID string)
func (*WorkerPool) SetJobManager ¶
func (wp *WorkerPool) SetJobManager(jm *JobManager)
SetJobManager sets the JobManager reference for duplicate task checking
func (*WorkerPool) Start ¶
func (wp *WorkerPool) Start(ctx context.Context)
func (*WorkerPool) StartCleanupMonitor ¶
func (wp *WorkerPool) StartCleanupMonitor(ctx context.Context)
StartCleanupMonitor starts the cleanup monitor goroutine
func (*WorkerPool) StartQuotaPromotionMonitor ¶
func (wp *WorkerPool) StartQuotaPromotionMonitor(ctx context.Context)
StartQuotaPromotionMonitor checks for waiting tasks that can be promoted when quota becomes available
func (*WorkerPool) StartTaskMonitor ¶
func (wp *WorkerPool) StartTaskMonitor(ctx context.Context)
StartTaskMonitor starts a background process that monitors for pending tasks
func (*WorkerPool) Stop ¶
func (wp *WorkerPool) Stop()
func (*WorkerPool) WaitForJobs ¶
func (wp *WorkerPool) WaitForJobs()
WaitForJobs waits for all active jobs to complete