db

package
v0.27.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// MaxBatchSize is the maximum number of tasks to batch before forcing a flush
	MaxBatchSize = 100
	// MaxBatchInterval is the maximum time to wait before flushing a batch
	MaxBatchInterval = 2 * time.Second
	// BatchChannelSize is the buffer size for the update channel
	BatchChannelSize = 2000
	// MaxConsecutiveFailures before falling back to individual updates
	MaxConsecutiveFailures = 3
	// MaxShutdownRetries for final flush attempts
	MaxShutdownRetries = 5
	// ShutdownRetryDelay between retry attempts
	ShutdownRetryDelay = 500 * time.Millisecond
)
View Source
var ErrConcurrencyBlocked = errors.New("tasks exist but blocked by concurrency limits")

ErrConcurrencyBlocked is returned when pending tasks exist but all are blocked by job concurrency limits. Workers should back off when receiving this error.

View Source
var ErrGoogleAccountNotFound = errors.New("google analytics account not found")

ErrGoogleAccountNotFound is returned when a Google Analytics account is not found

View Source
var ErrGoogleConnectionNotFound = errors.New("google analytics connection not found")

ErrGoogleConnectionNotFound is returned when a Google Analytics connection is not found

View Source
var ErrGoogleTokenNotFound = errors.New("google analytics token not found")

ErrGoogleTokenNotFound is returned when a Google Analytics token is not found in vault

View Source
var ErrPlatformOrgMappingNotFound = errors.New("platform org mapping not found")

ErrPlatformOrgMappingNotFound is returned when a platform mapping does not exist.

View Source
var ErrPoolSaturated = errors.New("database connection pool saturated")

ErrPoolSaturated is returned when the database connection pool cannot provide a connection before the caller's context expires.

View Source
var ErrSchedulerNotFound = errors.New("scheduler not found")

ErrSchedulerNotFound is returned when a scheduler is not found

View Source
var ErrSchedulerStateConflict = errors.New("scheduler state conflict")
View Source
var ErrSlackConnectionNotFound = errors.New("slack connection not found")

ErrSlackConnectionNotFound is returned when a slack connection is not found

View Source
var ErrSlackUserLinkNotFound = errors.New("slack user link not found")

ErrSlackUserLinkNotFound is returned when a slack user link is not found

View Source
var ErrWebflowConnectionNotFound = errors.New("webflow connection not found")

ErrWebflowConnectionNotFound is returned when a webflow connection is not found

View Source
var ErrWebflowSiteSettingNotFound = errors.New("webflow site setting not found")

ErrWebflowSiteSettingNotFound is returned when a site setting is not found

Functions

func AugmentDSNWithTimeout

func AugmentDSNWithTimeout(dsn string, timeoutMs int) string

AugmentDSNWithTimeout adds statement_timeout to a DSN if not already present. Supports both URL format (postgresql://...) and key=value format.

func CreatePageRecords

func CreatePageRecords(ctx context.Context, q TransactionExecutor, domainID int, domain string, urls []string) ([]int, []string, error)

CreatePageRecords finds existing pages or creates new ones for the given URLs. It returns the page IDs and their corresponding paths.

func Serialise

func Serialise(v any) string

Serialise converts data to JSON string representation. It is named with British English spelling for consistency.

Types

type ActivityPoint

type ActivityPoint struct {
	Timestamp  string `json:"timestamp"`
	JobsCount  int    `json:"jobs_count"`
	TasksCount int    `json:"tasks_count"`
}

ActivityPoint represents a data point for activity charts

type BatchManager

type BatchManager struct {
	// contains filtered or unexported fields
}

BatchManager coordinates batching of database operations

func NewBatchManager

func NewBatchManager(queue QueueExecutor) *BatchManager

NewBatchManager creates a new batch manager

func (*BatchManager) QueueTaskUpdate

func (bm *BatchManager) QueueTaskUpdate(task *Task)

QueueTaskUpdate adds a task update to the batch queue

func (*BatchManager) Stop

func (bm *BatchManager) Stop()

Stop gracefully shuts down the batch manager, flushing remaining updates

type ConcurrencyOverrideFunc

type ConcurrencyOverrideFunc func(jobID string, domain string) int

ConcurrencyOverrideFunc is a callback to get the effective concurrency for a job. Returns the effective concurrency limit, or 0 if no override exists.

type Config

type Config struct {
	Host            string        // Database host
	Port            string        // Database port
	User            string        // Database user
	Password        string        // Database password
	Database        string        // Database name
	SSLMode         string        // SSL mode (disable, require, verify-ca, verify-full)
	MaxIdleConns    int           // Maximum number of idle connections
	MaxOpenConns    int           // Maximum number of open connections
	MaxLifetime     time.Duration // Maximum lifetime of a connection
	DatabaseURL     string        // Original DATABASE_URL if used
	ApplicationName string        // Identifier for this application instance
}

Config holds PostgreSQL connection configuration

func (*Config) ConnectionString

func (c *Config) ConnectionString() string

ConnectionString returns the PostgreSQL connection string

func (*Config) Validate

func (c *Config) Validate() error

Validate checks if the configuration is valid

type DB

type DB struct {
	Cache *cache.InMemoryCache
	// contains filtered or unexported fields
}

DB represents a PostgreSQL database connection

func InitFromEnv

func InitFromEnv() (*DB, error)

InitFromEnv creates a PostgreSQL connection using environment variables

func InitFromEnvWithRetry

func InitFromEnvWithRetry(ctx context.Context) (*DB, error)

InitFromEnvWithRetry creates a PostgreSQL connection using environment variables with automatic retry on connection failures

func InitFromEnvWithRetryConfig

func InitFromEnvWithRetryConfig(ctx context.Context, retryConfig RetryConfig) (*DB, error)

InitFromEnvWithRetryConfig creates a PostgreSQL connection with custom retry configuration

func InitFromURLWithSuffix

func InitFromURLWithSuffix(databaseURL string, appEnv string, appNameSuffix string) (*DB, error)

InitFromURLWithSuffix creates a PostgreSQL connection using the provided URL and optional application name suffix. It applies the same environment-based pooling limits as InitFromEnv.

func InitFromURLWithSuffixRetry

func InitFromURLWithSuffixRetry(ctx context.Context, databaseURL string, appEnv string, appNameSuffix string) (*DB, error)

InitFromURLWithSuffixRetry creates a PostgreSQL connection using the provided URL with automatic retry on connection failures

func New

func New(config *Config) (*DB, error)

New creates a new PostgreSQL database connection

func WaitForDatabase

func WaitForDatabase(ctx context.Context, maxWait time.Duration) (*DB, error)

WaitForDatabase blocks until the database connection is established or the context is cancelled. This is useful during application startup to gracefully wait for database availability.

func (*DB) AcceptOrganisationInvite

func (db *DB) AcceptOrganisationInvite(ctx context.Context, token, userID string) (*OrganisationInvite, error)

AcceptOrganisationInvite marks an invite as accepted and adds the user to the organisation.

func (*DB) AddOrganisationMember

func (db *DB) AddOrganisationMember(userID, organisationID, role string) error

AddOrganisationMember adds a user as a member of an organisation

func (*DB) ApplyTrafficScoresToTasks

func (db *DB) ApplyTrafficScoresToTasks(ctx context.Context, organisationID string, domainID int) error

ApplyTrafficScoresToTasks updates pending tasks using traffic scores for a domain. This ensures existing tasks are reprioritised once analytics are available.

func (*DB) CalculateTrafficScores

func (db *DB) CalculateTrafficScores(ctx context.Context, organisationID string, domainID int) error

CalculateTrafficScores calculates and stores traffic scores for all pages in a domain using a log-scaled view curve (28-day). This emphasises high-traffic pages while keeping the long tail near the floor. Score range: 0 (0-1 views) then 0.10 (floor) to 0.99 (ceiling).

func (*DB) CheckHealth

func (db *DB) CheckHealth(ctx context.Context) HealthCheck

CheckHealth tests the database connection and returns health information

func (*DB) Close

func (db *DB) Close() error

Close closes the database connection

func (*DB) CountOrganisationAdmins

func (db *DB) CountOrganisationAdmins(ctx context.Context, organisationID string) (int, error)

CountOrganisationAdmins returns the number of admins in an organisation.

func (*DB) CreateGoogleConnection

func (db *DB) CreateGoogleConnection(ctx context.Context, conn *GoogleAnalyticsConnection) error

CreateGoogleConnection creates a new Google Analytics connection for an organisation. Note: Use StoreGoogleToken after creating the connection to store the refresh token in Vault.

func (*DB) CreateOrUpdateSiteSetting

func (db *DB) CreateOrUpdateSiteSetting(ctx context.Context, setting *WebflowSiteSetting) error

CreateOrUpdateSiteSetting creates or updates a Webflow site setting

func (*DB) CreateOrganisation

func (db *DB) CreateOrganisation(name string) (*Organisation, error)

CreateOrganisation creates a new organisation

func (*DB) CreateOrganisationInvite

func (db *DB) CreateOrganisationInvite(ctx context.Context, invite *OrganisationInvite) (*OrganisationInvite, error)

CreateOrganisationInvite inserts a new invite record.

func (*DB) CreateScheduler

func (db *DB) CreateScheduler(ctx context.Context, scheduler *Scheduler) error

CreateScheduler creates a new scheduler

func (*DB) CreateSlackConnection

func (db *DB) CreateSlackConnection(ctx context.Context, conn *SlackConnection) error

CreateSlackConnection creates a new Slack connection for an organisation. Note: Use StoreSlackToken after creating the connection to store the access token in Vault.

func (db *DB) CreateSlackUserLink(ctx context.Context, link *SlackUserLink) error

CreateSlackUserLink creates a link between a BBB user and their Slack identity

func (*DB) CreateUser

func (db *DB) CreateUser(userID, email string, firstName, lastName, fullName *string, orgName string) (*User, *Organisation, error)

CreateUser creates a new user. If the user already exists, it returns the existing user and their organisation.

func (*DB) CreateWebflowConnection

func (db *DB) CreateWebflowConnection(ctx context.Context, conn *WebflowConnection) error

CreateWebflowConnection creates a new Webflow connection for an organisation. Note: Use StoreWebflowToken after creating the connection to store the access token in Vault.

func (*DB) DeleteGoogleConnection

func (db *DB) DeleteGoogleConnection(ctx context.Context, connectionID, organisationID string) error

DeleteGoogleConnection deletes a Google Analytics connection

func (*DB) DeleteScheduler

func (db *DB) DeleteScheduler(ctx context.Context, schedulerID string) error

DeleteScheduler deletes a scheduler

func (*DB) DeleteSiteSetting

func (db *DB) DeleteSiteSetting(ctx context.Context, organisationID, webflowSiteID string) error

DeleteSiteSetting deletes a site setting

func (*DB) DeleteSiteSettingsByConnection

func (db *DB) DeleteSiteSettingsByConnection(ctx context.Context, connectionID string) error

DeleteSiteSettingsByConnection deletes all site settings for a connection. This is called when a Webflow connection is disconnected.

func (*DB) DeleteSlackConnection

func (db *DB) DeleteSlackConnection(ctx context.Context, connectionID, organisationID string) error

DeleteSlackConnection deletes a Slack connection

func (db *DB) DeleteSlackUserLink(ctx context.Context, userID, connectionID string) error

DeleteSlackUserLink deletes a user's Slack link

func (*DB) DeleteWebflowConnection

func (db *DB) DeleteWebflowConnection(ctx context.Context, connectionID, organisationID string) error

DeleteWebflowConnection deletes a Webflow connection

func (*DB) GetActiveGAConnectionForDomain

func (db *DB) GetActiveGAConnectionForDomain(ctx context.Context, organisationID string, domainID int) (*GoogleAnalyticsConnection, error)

GetActiveGAConnectionForDomain retrieves the active GA4 connection for a specific domain. Returns nil if there is no active connection for this domain (not an error).

func (*DB) GetActiveGAConnectionForOrganisation

func (db *DB) GetActiveGAConnectionForOrganisation(ctx context.Context, orgID string) (*GoogleAnalyticsConnection, error)

GetActiveGAConnectionForOrganisation retrieves the active GA4 connection for an organisation. Returns nil if there is no active connection (not an error). Deprecated: Use GetActiveGAConnectionForDomain for domain-specific lookups.

func (*DB) GetActivePlans

func (db *DB) GetActivePlans(ctx context.Context) ([]Plan, error)

GetActivePlans returns all active subscription plans

func (*DB) GetConfig

func (d *DB) GetConfig() *Config

GetConfig returns the original DB connection settings

func (*DB) GetDB

func (db *DB) GetDB() *sql.DB

GetDB returns the underlying database connection

func (*DB) GetDomainNameByID

func (db *DB) GetDomainNameByID(ctx context.Context, domainID int) (string, error)

GetDomainNameByID retrieves a single domain name by ID

func (*DB) GetDomainNames

func (db *DB) GetDomainNames(ctx context.Context, domainIDs []int) (map[int]string, error)

GetDomainNames retrieves domain names for multiple domain IDs in a single query. Returns a map of domainID -> domainName.

func (*DB) GetDomainsForOrganisation

func (db *DB) GetDomainsForOrganisation(ctx context.Context, organisationID string) ([]OrganisationDomain, error)

GetDomainsForOrganisation returns all domains that belong to the organisation. This includes domains from jobs created by any user in the organisation.

func (*DB) GetEffectiveOrganisationID

func (db *DB) GetEffectiveOrganisationID(user *User) string

GetEffectiveOrganisationID returns the user's effective organisation ID (active_organisation_id if set, otherwise organisation_id for backward compatibility)

func (*DB) GetEnabledUserLinksForConnection

func (db *DB) GetEnabledUserLinksForConnection(ctx context.Context, connectionID string) ([]*SlackUserLink, error)

GetEnabledUserLinksForConnection returns user links with DM notifications enabled

func (*DB) GetExternalRedirects

func (db *DB) GetExternalRedirects(organisationID string, startDate, endDate *time.Time) ([]ExternalRedirect, error)

GetExternalRedirects retrieves pages that redirect to external domains

func (*DB) GetGA4Account

func (db *DB) GetGA4Account(ctx context.Context, accountID string) (*GoogleAnalyticsAccount, error)

GetGA4Account retrieves a Google Analytics account by ID

func (*DB) GetGA4AccountByGoogleID

func (db *DB) GetGA4AccountByGoogleID(ctx context.Context, organisationID, googleAccountID string) (*GoogleAnalyticsAccount, error)

GetGA4AccountByGoogleID retrieves a Google Analytics account by its Google account ID

func (*DB) GetGA4AccountToken

func (db *DB) GetGA4AccountToken(ctx context.Context, accountID string) (string, error)

GetGA4AccountToken retrieves a Google Analytics refresh token for an account from Supabase Vault

func (*DB) GetGA4AccountWithToken

func (db *DB) GetGA4AccountWithToken(ctx context.Context, organisationID string) (*GoogleAnalyticsAccount, error)

GetGA4AccountWithToken retrieves a GA4 account that has a valid token stored. Returns the first account found with a token for the organisation.

func (*DB) GetGAConnectionWithToken

func (db *DB) GetGAConnectionWithToken(ctx context.Context, organisationID string) (*GoogleAnalyticsConnection, error)

GetGAConnectionWithToken retrieves a GA4 connection that has a valid token stored. Returns the most recently updated connection with a token for the organisation.

func (*DB) GetGoogleConnection

func (db *DB) GetGoogleConnection(ctx context.Context, connectionID string) (*GoogleAnalyticsConnection, error)

GetGoogleConnection retrieves a Google Analytics connection by ID

func (*DB) GetGoogleToken

func (db *DB) GetGoogleToken(ctx context.Context, connectionID string) (string, error)

GetGoogleToken retrieves a Google Analytics refresh token from Supabase Vault

func (*DB) GetJobActivity

func (db *DB) GetJobActivity(organisationID string, startDate, endDate *time.Time) ([]ActivityPoint, error)

GetJobActivity retrieves job activity data for charts

func (*DB) GetJobStats

func (db *DB) GetJobStats(organisationID string, startDate, endDate *time.Time) (*JobStats, error)

GetJobStats retrieves job statistics for the dashboard

func (*DB) GetLastJobStartTimeForScheduler

func (db *DB) GetLastJobStartTimeForScheduler(ctx context.Context, schedulerID string) (*time.Time, error)

GetLastJobStartTimeForScheduler retrieves the most recent started_at time for jobs created by a scheduler

func (*DB) GetNotification

func (db *DB) GetNotification(ctx context.Context, notificationID string) (*Notification, error)

GetNotification retrieves a notification by ID

func (*DB) GetOrCreateDomainID

func (db *DB) GetOrCreateDomainID(ctx context.Context, domain string) (int, error)

GetOrCreateDomainID retrieves or creates a domain ID for a given domain name. Uses INSERT ... ON CONFLICT to handle concurrent creation atomically.

func (*DB) GetOrCreateUser

func (db *DB) GetOrCreateUser(userID, email string, fullName *string) (*User, error)

GetOrCreateUser retrieves a user by ID, creating them if they don't exist. This is used for auto-creating users from valid JWT tokens.

func (*DB) GetOrganisation

func (db *DB) GetOrganisation(organisationID string) (*Organisation, error)

GetOrganisation retrieves an organisation by ID

func (*DB) GetOrganisationByName

func (db *DB) GetOrganisationByName(name string) (*Organisation, error)

GetOrganisationByName retrieves an organisation by name (case-insensitive)

func (*DB) GetOrganisationInviteByToken

func (db *DB) GetOrganisationInviteByToken(ctx context.Context, token string) (*OrganisationInvite, error)

GetOrganisationInviteByToken returns an invite by token.

func (*DB) GetOrganisationMemberRole

func (db *DB) GetOrganisationMemberRole(ctx context.Context, userID, organisationID string) (string, error)

GetOrganisationMemberRole returns the role for a user in an organisation.

func (*DB) GetOrganisationMembers

func (db *DB) GetOrganisationMembers(organisationID string) ([]*User, error)

func (*DB) GetOrganisationPlanID

func (db *DB) GetOrganisationPlanID(ctx context.Context, organisationID string) (string, error)

GetOrganisationPlanID returns the current plan ID for an organisation.

func (*DB) GetOrganisationUsageStats

func (db *DB) GetOrganisationUsageStats(ctx context.Context, orgID string) (*UsageStats, error)

GetOrganisationUsageStats returns current usage statistics for an organisation

func (*DB) GetPendingSlackNotifications

func (db *DB) GetPendingSlackNotifications(ctx context.Context, limit int) ([]*Notification, error)

GetPendingSlackNotifications retrieves notifications not yet delivered to Slack

func (*DB) GetPlatformOrgMapping

func (db *DB) GetPlatformOrgMapping(ctx context.Context, platform, platformID string) (*PlatformOrgMapping, error)

GetPlatformOrgMapping returns the mapping for a platform identity.

func (*DB) GetScheduler

func (db *DB) GetScheduler(ctx context.Context, schedulerID string) (*Scheduler, error)

GetScheduler retrieves a scheduler by ID

func (*DB) GetSchedulersReadyToRun

func (db *DB) GetSchedulersReadyToRun(ctx context.Context, limit int) ([]*Scheduler, error)

GetSchedulersReadyToRun retrieves schedulers that are ready to run

func (*DB) GetSiteSetting

func (db *DB) GetSiteSetting(ctx context.Context, organisationID, webflowSiteID string) (*WebflowSiteSetting, error)

GetSiteSetting retrieves a site setting by organisation and Webflow site ID

func (*DB) GetSiteSettingByID

func (db *DB) GetSiteSettingByID(ctx context.Context, id string) (*WebflowSiteSetting, error)

GetSiteSettingByID retrieves a site setting by its ID

func (*DB) GetSlackConnection

func (db *DB) GetSlackConnection(ctx context.Context, connectionID string) (*SlackConnection, error)

GetSlackConnection retrieves a Slack connection by ID

func (*DB) GetSlackConnectionsForOrg

func (db *DB) GetSlackConnectionsForOrg(ctx context.Context, organisationID string) ([]*SlackConnection, error)

GetSlackConnectionsForOrg returns all Slack connections for an organisation

func (*DB) GetSlackToken

func (db *DB) GetSlackToken(ctx context.Context, connectionID string) (string, error)

GetSlackToken retrieves a Slack access token from Supabase Vault

func (db *DB) GetSlackUserLink(ctx context.Context, userID, connectionID string) (*SlackUserLink, error)

GetSlackUserLink retrieves a user's Slack link for a specific connection

func (*DB) GetSlowPages

func (db *DB) GetSlowPages(organisationID string, startDate, endDate *time.Time) ([]SlowPage, error)

GetSlowPages retrieves the slowest pages after cache retry attempts. Returns the top 10 absolute slowest and the 10% slowest from the user's organisation.

func (*DB) GetUnreadNotificationCount

func (db *DB) GetUnreadNotificationCount(ctx context.Context, organisationID string) (int, error)

GetUnreadNotificationCount returns the count of unread notifications for an organisation

func (*DB) GetUser

func (db *DB) GetUser(userID string) (*User, error)

GetUser retrieves a user by ID

func (*DB) GetUserByWebhookToken

func (db *DB) GetUserByWebhookToken(webhookToken string) (*User, error)

GetUserByWebhookToken retrieves a user by their webhook token

func (*DB) GetWebflowConnection

func (db *DB) GetWebflowConnection(ctx context.Context, connectionID string) (*WebflowConnection, error)

GetWebflowConnection retrieves a Webflow connection by ID

func (*DB) GetWebflowToken

func (db *DB) GetWebflowToken(ctx context.Context, connectionID string) (string, error)

GetWebflowToken retrieves a Webflow access token from Supabase Vault

func (*DB) IsOrganisationMemberEmail

func (db *DB) IsOrganisationMemberEmail(ctx context.Context, organisationID, email string) (bool, error)

IsOrganisationMemberEmail checks whether an email belongs to a member of the organisation.

func (*DB) ListAllSiteSettings

func (db *DB) ListAllSiteSettings(ctx context.Context, organisationID string) ([]*WebflowSiteSetting, error)

ListAllSiteSettings lists all site settings for an organisation (including unconfigured)

func (*DB) ListConfiguredSiteSettings

func (db *DB) ListConfiguredSiteSettings(ctx context.Context, organisationID string) ([]*WebflowSiteSetting, error)

ListConfiguredSiteSettings lists all site settings that have configuration (schedule or auto-publish) for an organisation

func (*DB) ListDailyUsage

func (db *DB) ListDailyUsage(ctx context.Context, organisationID string, startDate, endDate time.Time) ([]DailyUsageEntry, error)

ListDailyUsage returns daily usage rows for an organisation within a date range.

func (*DB) ListGA4Accounts

func (db *DB) ListGA4Accounts(ctx context.Context, organisationID string) ([]*GoogleAnalyticsAccount, error)

ListGA4Accounts lists all Google Analytics accounts for an organisation

func (*DB) ListGoogleConnections

func (db *DB) ListGoogleConnections(ctx context.Context, organisationID string) ([]*GoogleAnalyticsConnection, error)

ListGoogleConnections lists all Google Analytics connections for an organisation

func (*DB) ListJobs

func (db *DB) ListJobs(organisationID string, limit, offset int, status, dateRange, timezone string) ([]JobWithDomain, int, error)

ListJobs retrieves a paginated list of jobs for an organisation

func (*DB) ListJobsWithOffset

func (db *DB) ListJobsWithOffset(organisationID string, limit, offset int, status, dateRange string, tzOffsetMinutes int, includeStats bool) ([]JobWithDomain, int, error)

ListJobsWithOffset lists jobs for an organisation with timezone offset-based date filtering

func (*DB) ListNotifications

func (db *DB) ListNotifications(ctx context.Context, organisationID string, limit, offset int, unreadOnly bool) ([]*Notification, int, error)

ListNotifications retrieves notifications for an organisation

func (*DB) ListOrganisationInvites

func (db *DB) ListOrganisationInvites(ctx context.Context, organisationID string) ([]OrganisationInvite, error)

ListOrganisationInvites returns pending invites for an organisation.

func (*DB) ListOrganisationMembers

func (db *DB) ListOrganisationMembers(ctx context.Context, organisationID string) ([]OrganisationMember, error)

ListOrganisationMembers returns all members for an organisation.

func (*DB) ListSchedulers

func (db *DB) ListSchedulers(ctx context.Context, organisationID string) ([]*Scheduler, error)

ListSchedulers retrieves all schedulers for an organisation

func (*DB) ListSiteSettingsByConnection

func (db *DB) ListSiteSettingsByConnection(ctx context.Context, connectionID string) ([]*WebflowSiteSetting, error)

ListSiteSettingsByConnection lists all site settings for a specific connection

func (*DB) ListSlackConnections

func (db *DB) ListSlackConnections(ctx context.Context, organisationID string) ([]*SlackConnection, error)

ListSlackConnections lists all Slack connections for an organisation

func (*DB) ListSlackUserLinksForConnection

func (db *DB) ListSlackUserLinksForConnection(ctx context.Context, connectionID string) ([]*SlackUserLink, error)

ListSlackUserLinksForConnection lists all user links for a Slack connection

func (*DB) ListUserOrganisations

func (db *DB) ListUserOrganisations(userID string) ([]UserOrganisation, error)

ListUserOrganisations returns all organisations a user is a member of

func (*DB) ListWebflowConnections

func (db *DB) ListWebflowConnections(ctx context.Context, organisationID string) ([]*WebflowConnection, error)

ListWebflowConnections lists all Webflow connections for an organisation

func (*DB) MarkAllNotificationsRead

func (db *DB) MarkAllNotificationsRead(ctx context.Context, organisationID string) error

MarkAllNotificationsRead marks all notifications as read for an organisation

func (*DB) MarkConnectionInactive

func (db *DB) MarkConnectionInactive(ctx context.Context, connectionID, reason string) error

MarkConnectionInactive sets a connection status to inactive with a reason logged

func (*DB) MarkNotificationDelivered

func (db *DB) MarkNotificationDelivered(ctx context.Context, notificationID, channel string) error

MarkNotificationDelivered marks a notification as delivered to a channel

func (*DB) MarkNotificationRead

func (db *DB) MarkNotificationRead(ctx context.Context, notificationID, organisationID string) error

MarkNotificationRead marks a notification as read

func (*DB) RecalculateJobStats

func (db *DB) RecalculateJobStats(ctx context.Context, jobID string) error

RecalculateJobStats recalculates all statistics for a job based on actual task records

func (*DB) RemoveOrganisationMember

func (db *DB) RemoveOrganisationMember(ctx context.Context, userID, organisationID string) error

RemoveOrganisationMember deletes a membership from an organisation.

func (*DB) ResetDataOnly

func (db *DB) ResetDataOnly() error

ResetDataOnly clears all data from tables but preserves the schema. This is the safe option for clearing test data without triggering schema changes.

func (*DB) ResetSchema

func (db *DB) ResetSchema() error

ResetSchema performs a database reset by dropping job-related tables, clearing migration history, and re-running all migrations. Users and organisations are preserved.

func (*DB) RevokeOrganisationInvite

func (db *DB) RevokeOrganisationInvite(ctx context.Context, inviteID, organisationID string) error

RevokeOrganisationInvite marks an invite as revoked.

func (*DB) SetActiveOrganisation

func (db *DB) SetActiveOrganisation(userID, organisationID string) error

SetActiveOrganisation sets the user's active organisation

func (*DB) SetOrganisationPlan

func (db *DB) SetOrganisationPlan(ctx context.Context, organisationID, planID string) error

SetOrganisationPlan updates the organisation's plan.

func (*DB) StoreGA4AccountToken

func (db *DB) StoreGA4AccountToken(ctx context.Context, accountID, refreshToken string) error

StoreGA4AccountToken stores a Google Analytics refresh token for an account in Supabase Vault

func (*DB) StoreGoogleToken

func (db *DB) StoreGoogleToken(ctx context.Context, connectionID, refreshToken string) error

StoreGoogleToken stores a Google Analytics refresh token in Supabase Vault

func (*DB) StoreSlackToken

func (db *DB) StoreSlackToken(ctx context.Context, connectionID, token string) error

StoreSlackToken stores a Slack access token in Supabase Vault

func (*DB) StoreWebflowToken

func (db *DB) StoreWebflowToken(ctx context.Context, connectionID, token string) error

StoreWebflowToken stores a Webflow access token in Supabase Vault

func (*DB) UpdateConnectionDomains

func (db *DB) UpdateConnectionDomains(ctx context.Context, connectionID string, domainIDs []int) error

UpdateConnectionDomains updates the domain_ids for an existing connection

func (*DB) UpdateConnectionLastSync

func (db *DB) UpdateConnectionLastSync(ctx context.Context, connectionID string) error

UpdateConnectionLastSync updates the last_synced_at timestamp for a connection

func (*DB) UpdateDomainTechnologies

func (db *DB) UpdateDomainTechnologies(ctx context.Context, domainID int, technologies, headers []byte, htmlPath string) error

UpdateDomainTechnologies updates the detected technologies for a domain. Called after first successful task crawl in a job to store tech detection results.

func (*DB) UpdateGoogleConnectionStatus

func (db *DB) UpdateGoogleConnectionStatus(ctx context.Context, connectionID, organisationID, status string) error

UpdateGoogleConnectionStatus updates the status of a Google Analytics connection

func (*DB) UpdateOrganisationMemberRole

func (db *DB) UpdateOrganisationMemberRole(ctx context.Context, userID, organisationID, role string) error

UpdateOrganisationMemberRole updates a member's role in an organisation.

func (*DB) UpdateScheduler

func (db *DB) UpdateScheduler(ctx context.Context, schedulerID string, updates *Scheduler, expectedIsEnabled *bool) error

UpdateScheduler updates a scheduler's configuration. If expectedIsEnabled is non-nil, the update is conditional on current is_enabled matching the expected value (optimistic concurrency).

func (*DB) UpdateSchedulerNextRun

func (db *DB) UpdateSchedulerNextRun(ctx context.Context, schedulerID string, nextRun time.Time) error

UpdateSchedulerNextRun updates only the next_run_at timestamp

func (*DB) UpdateSiteAutoPublish

func (db *DB) UpdateSiteAutoPublish(ctx context.Context, organisationID, webflowSiteID string, enabled bool, webhookID string) error

UpdateSiteAutoPublish updates only the auto-publish-related fields for a site setting

func (*DB) UpdateSiteSchedule

func (db *DB) UpdateSiteSchedule(ctx context.Context, organisationID, webflowSiteID string, scheduleIntervalHours *int, schedulerID string) error

UpdateSiteSchedule updates only the schedule-related fields for a site setting

func (*DB) UpdateSlackUserLinkNotifications

func (db *DB) UpdateSlackUserLinkNotifications(ctx context.Context, userID, connectionID string, dmNotifications bool) error

UpdateSlackUserLinkNotifications updates the DM notification preference for a user link

func (*DB) UpdateUserNames

func (db *DB) UpdateUserNames(userID string, firstName, lastName, fullName *string) error

UpdateUserNames updates the user's name fields.

func (*DB) UpsertGA4Account

func (db *DB) UpsertGA4Account(ctx context.Context, account *GoogleAnalyticsAccount) error

UpsertGA4Account creates or updates a Google Analytics account record

func (*DB) UpsertOrganisationDomain

func (db *DB) UpsertOrganisationDomain(ctx context.Context, organisationID string, domainID int) error

UpsertOrganisationDomain ensures a domain is associated with an organisation.

func (*DB) UpsertPageWithAnalytics

func (db *DB) UpsertPageWithAnalytics(
	ctx context.Context,
	organisationID string,
	domainID int,
	path string,
	pageViews map[string]int64,
	connectionID string,
) (int, error)

UpsertPageWithAnalytics creates or updates a page and its org-scoped analytics data. It stores GA4 page view data in the page_analytics table, tied to the organisation.

func (*DB) UpsertPlatformOrgMapping

func (db *DB) UpsertPlatformOrgMapping(ctx context.Context, mapping *PlatformOrgMapping) error

UpsertPlatformOrgMapping creates or updates a platform mapping.

func (*DB) ValidateOrganisationMembership

func (db *DB) ValidateOrganisationMembership(userID, organisationID string) (bool, error)

ValidateOrganisationMembership checks if a user is a member of an organisation

type DailyUsageEntry

// DailyUsageEntry represents historical daily usage.
//
// It pairs a date with two integer counters. No struct tags are declared,
// so encoding/json would use the Go field names as keys.
type DailyUsageEntry struct {
	UsageDate      time.Time
	PagesProcessed int
	JobsCreated    int
}

DailyUsageEntry represents historical daily usage.

type DbQueue

// DbQueue is a PostgreSQL implementation of a job queue.
//
// All of its fields are unexported; obtain a value through NewDbQueue.
type DbQueue struct {
	// contains filtered or unexported fields
}

DbQueue is a PostgreSQL implementation of a job queue

func NewDbQueue

func NewDbQueue(db *DB) *DbQueue

NewDbQueue creates a PostgreSQL job queue

func (*DbQueue) CleanupStuckJobs

func (q *DbQueue) CleanupStuckJobs(ctx context.Context) error

CleanupStuckJobs finds and fixes jobs that are stuck in pending/running state despite having all their tasks completed

func (*DbQueue) DecrementRunningTasks

func (q *DbQueue) DecrementRunningTasks(ctx context.Context, jobID string) error

DecrementRunningTasks immediately decrements the running_tasks counter for a job. This is called when a task completes to free up concurrency slots without waiting for batch flush. Also promotes one waiting task to pending if job still has capacity. The actual task field updates are still handled by the batch manager for efficiency.

func (*DbQueue) DecrementRunningTasksBy

func (q *DbQueue) DecrementRunningTasksBy(ctx context.Context, jobID string, count int) error

DecrementRunningTasksBy releases multiple running task slots for a job in one trip.

func (*DbQueue) EnqueueURLs

func (q *DbQueue) EnqueueURLs(ctx context.Context, jobID string, pages []Page, sourceType string, sourceURL string) error

EnqueueURLs adds multiple URLs as tasks for a job

func (*DbQueue) Execute

func (q *DbQueue) Execute(ctx context.Context, fn func(*sql.Tx) error) error

Execute runs a database operation in a transaction

func (*DbQueue) ExecuteMaintenance

func (q *DbQueue) ExecuteMaintenance(ctx context.Context, fn func(*sql.Tx) error) error

ExecuteMaintenance runs a low-impact transaction that bypasses pool saturation guards. This is intended for housekeeping tasks that must always run, even when the pool is busy.

func (*DbQueue) ExecuteWithContext

func (q *DbQueue) ExecuteWithContext(ctx context.Context, fn func(context.Context, *sql.Tx) error) error

ExecuteWithContext runs a transactional operation with full context propagation. The callback receives both the context and transaction, ensuring SQL statements can respect timeouts. This should be preferred over Execute for all new code.

func (*DbQueue) GetNextTask

func (q *DbQueue) GetNextTask(ctx context.Context, jobID string) (*Task, error)

GetNextTask gets a pending task using row-level locking. It uses FOR UPDATE SKIP LOCKED to prevent lock contention between workers, and combines the SELECT and UPDATE in a CTE so that claiming a task is atomic.

func (*DbQueue) SetConcurrencyOverride

func (q *DbQueue) SetConcurrencyOverride(fn ConcurrencyOverrideFunc)

SetConcurrencyOverride sets a callback to retrieve effective concurrency from the domain limiter

func (*DbQueue) UpdateDomainTechnologies

func (q *DbQueue) UpdateDomainTechnologies(ctx context.Context, domainID int, technologies, headers []byte, htmlPath string) error

UpdateDomainTechnologies updates the detected technologies for a domain. Delegates to the underlying DB implementation.

func (*DbQueue) UpdateTaskStatus

func (q *DbQueue) UpdateTaskStatus(ctx context.Context, task *Task) error

UpdateTaskStatus updates a task's status and associated metadata in a single function. This provides a unified way to handle various task state transitions.

type Domain

// Domain represents the domain information for jobs.
//
// It serialises to a JSON object with a single "name" key.
type Domain struct {
	Name string `json:"name"`
}

Domain represents the domain information for jobs

type ExternalRedirect

// ExternalRedirect represents a page that redirects to an external domain.
//
// Every field is a string, including CompletedAt, and every field is always
// emitted in JSON (no omitempty).
// NOTE(review): the timestamp format used for CompletedAt is not visible
// here — confirm against the query that populates it.
type ExternalRedirect struct {
	URL         string `json:"url"`
	Domain      string `json:"domain"`
	Path        string `json:"path"`
	RedirectURL string `json:"redirect_url"`
	JobID       string `json:"job_id"`
	CompletedAt string `json:"completed_at"`
}

ExternalRedirect represents a page that redirects to an external domain

type GoogleAnalyticsAccount

// GoogleAnalyticsAccount represents an organisation's linked Google Analytics
// account. Accounts are synced from the Google API and stored in the DB for
// persistent display.
//
// VaultSecretName holds only a reference to the token; the token value is
// not a field of this struct.
type GoogleAnalyticsAccount struct {
	ID                string // UUID
	OrganisationID    string // Organisation this account belongs to
	GoogleAccountID   string // GA account ID (e.g., "accounts/123456")
	GoogleAccountName string // Display name of the account
	GoogleUserID      string // Google user ID who authorised
	GoogleEmail       string // Google email for display
	VaultSecretName   string // Token stored in Vault
	InstallingUserID  string // Our user who installed
	CreatedAt         time.Time
	UpdatedAt         time.Time
}

GoogleAnalyticsAccount represents an organisation's linked Google Analytics account. Accounts are synced from the Google API and stored in the DB for persistent display.

type GoogleAnalyticsConnection

// GoogleAnalyticsConnection represents an organisation's connection to a GA4
// property.
//
// DomainIDs uses pq.Int64Array, so domain IDs are 64-bit here even though
// other APIs in this package pass a domain ID as a plain int.
// NOTE(review): LastSyncedAt is a non-pointer time.Time, so a connection that
// has never synced cannot be represented as nil; presumably it is the zero
// time — confirm against the scanning code.
type GoogleAnalyticsConnection struct {
	ID                string
	OrganisationID    string
	GA4PropertyID     string        // GA4 property ID (e.g., "123456789")
	GA4PropertyName   string        // Display name of the property
	GoogleAccountID   string        // GA account ID (e.g., "accounts/123456")
	GoogleAccountName string        // Display name of the account
	GoogleUserID      string        // Google user ID who authorised
	GoogleEmail       string        // Google email for display
	VaultSecretName   string        // Name of the secret in Supabase Vault
	InstallingUserID  string        // Our user who installed
	Status            string        // "active" or "inactive"
	DomainIDs         pq.Int64Array // Array of domain IDs associated with this property
	LastSyncedAt      time.Time     // When analytics data was last synced
	CreatedAt         time.Time
	UpdatedAt         time.Time
}

GoogleAnalyticsConnection represents an organisation's connection to a GA4 property

type HealthCheck

// HealthCheck contains database health information.
//
// Tables and Error are omitted from JSON when empty; the other fields are
// always emitted.
//
// NOTE(review): Latency is a time.Duration, which encoding/json writes as an
// integer count of nanoseconds, yet its JSON key is "latency_ms". Confirm
// whether consumers expect nanoseconds or whether the value is converted to
// milliseconds before it is assigned.
type HealthCheck struct {
	Connected   bool          `json:"connected"`
	Latency     time.Duration `json:"latency_ms"`
	Tables      []string      `json:"tables,omitempty"`
	TablesCount int           `json:"tables_count"`
	Error       string        `json:"error,omitempty"`
}

HealthCheck contains database health information

type JobListItem

// JobListItem represents a job in the list view.
//
// Pointer fields and Stats are tagged omitempty, so they are left out of the
// JSON output when nil (Stats also when empty). The Domain field is
// published under the plural key "domains".
// Timestamps (CreatedAt, StartedAt, CompletedAt) are carried as strings
// rather than time.Time.
// NOTE(review): the string timestamp format and the scale of Progress
// (fraction vs. percentage) are not visible here — confirm with the producer.
type JobListItem struct {
	ID                    string         `json:"id"`
	Status                string         `json:"status"`
	Progress              float64        `json:"progress"`
	TotalTasks            int            `json:"total_tasks"`
	CompletedTasks        int            `json:"completed_tasks"`
	FailedTasks           int            `json:"failed_tasks"`
	SkippedTasks          int            `json:"skipped_tasks"`
	SitemapTasks          int            `json:"sitemap_tasks"`
	FoundTasks            int            `json:"found_tasks"`
	CreatedAt             string         `json:"created_at"`
	StartedAt             *string        `json:"started_at,omitempty"`
	CompletedAt           *string        `json:"completed_at,omitempty"`
	Domain                *string        `json:"domains,omitempty"` // For compatibility with frontend
	DurationSeconds       *int           `json:"duration_seconds,omitempty"`
	AvgTimePerTaskSeconds *float64       `json:"avg_time_per_task_seconds,omitempty"`
	Stats                 map[string]any `json:"stats,omitempty"`
}

JobListItem represents a job in the list view

type JobStats

// JobStats represents job statistics for the dashboard.
//
// All fields are always present in the JSON output (no omitempty).
// NOTE(review): the unit of AvgCompletionTime is not stated here — confirm
// against the query that computes it.
type JobStats struct {
	TotalJobs         int     `json:"total_jobs"`
	RunningJobs       int     `json:"running_jobs"`
	CompletedJobs     int     `json:"completed_jobs"`
	FailedJobs        int     `json:"failed_jobs"`
	TotalTasks        int     `json:"total_tasks"`
	AvgCompletionTime float64 `json:"avg_completion_time"`
}

JobStats represents job statistics for the dashboard

type JobWithDomain

// JobWithDomain represents a job with domain information.
//
// It embeds JobListItem, so all of that type's fields are promoted into the
// same JSON object. Both Domains here and the embedded JobListItem.Domain use
// the JSON key "domains"; under encoding/json's rules the less deeply nested
// field wins, so Domains is the one that is marshalled and the embedded
// Domain string is dropped from the output. Domains has no omitempty, so a
// nil pointer is written as null.
type JobWithDomain struct {
	JobListItem
	Domains *Domain `json:"domains"`
}

JobWithDomain represents a job with domain information

type Notification

// Notification represents a notification record.
//
// UserID, ReadAt, SlackDeliveredAt and EmailDeliveredAt are pointers and may
// be nil.
// NOTE(review): a nil ReadAt / SlackDeliveredAt / EmailDeliveredAt presumably
// means "not yet read / delivered" — confirm against the code that sets them.
type Notification struct {
	ID               string
	OrganisationID   string
	UserID           *string // nil for org-wide notifications
	Type             NotificationType
	Subject          string         // Main heading (e.g., "✅ Job completed: example.com")
	Preview          string         // Short summary for previews/toasts
	Message          string         // Full details (optional)
	Link             string         // URL path to view details (e.g., "/jobs/abc-123")
	Data             map[string]any // Additional structured data
	ReadAt           *time.Time
	SlackDeliveredAt *time.Time
	EmailDeliveredAt *time.Time
	CreatedAt        time.Time
}

Notification represents a notification record

type NotificationData

// NotificationData is the structured payload for different notification
// types.
//
// Every field is tagged omitempty, so zero values are left out of the JSON
// output. That includes the integer counters: a CompletedTasks or FailedTasks
// of 0 is omitted rather than written as 0, so readers must treat a missing
// key as zero.
type NotificationData struct {
	JobID          string `json:"job_id,omitempty"`
	Domain         string `json:"domain,omitempty"`
	CompletedTasks int    `json:"completed_tasks,omitempty"`
	FailedTasks    int    `json:"failed_tasks,omitempty"`
	Duration       string `json:"duration,omitempty"`
	ErrorMessage   string `json:"error_message,omitempty"`
	SchedulerID    string `json:"scheduler_id,omitempty"`
}

NotificationData is the structured payload for different notification types

type NotificationType

// NotificationType defines the types of notifications. It is a string-based
// type; the values declared by this package are the Notification* constants.
type NotificationType string

NotificationType defines the types of notifications

// Notification type values declared by this package: two for job outcomes
// and two for scheduler events. Each constant's string value is its
// snake_case form.
const (
	NotificationJobComplete    NotificationType = "job_complete"
	NotificationJobFailed      NotificationType = "job_failed"
	NotificationSchedulerRun   NotificationType = "scheduler_run"
	NotificationSchedulerError NotificationType = "scheduler_error"
)

type Organisation

// Organisation represents an organisation in the system.
//
// All four fields are always emitted in JSON, under snake_case keys.
type Organisation struct {
	ID        string    `json:"id"`
	Name      string    `json:"name"`
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

Organisation represents an organisation in the system

type OrganisationDomain

// OrganisationDomain represents a domain belonging to an organisation.
//
// ID is an int, the same type as the domainID parameter taken by
// UpsertOrganisationDomain.
type OrganisationDomain struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

OrganisationDomain represents a domain belonging to an organisation

type OrganisationInvite

// OrganisationInvite represents a pending invite for an organisation.
//
// AcceptedAt and RevokedAt are pointers and may be nil; ExpiresAt is always
// set to some time value.
// NOTE(review): presumably an invite counts as pending only while both
// AcceptedAt and RevokedAt are nil and ExpiresAt is in the future — confirm
// against the queries that read this type.
type OrganisationInvite struct {
	ID             string
	OrganisationID string
	Email          string
	Role           string
	Token          string
	CreatedBy      string
	CreatedAt      time.Time
	ExpiresAt      time.Time
	AcceptedAt     *time.Time
	RevokedAt      *time.Time
}

OrganisationInvite represents a pending invite for an organisation.

type OrganisationMember

// OrganisationMember represents a user membership within an organisation.
//
// The three name fields are pointers and may be nil; they have the same
// types as the name parameters of UpdateUserNames.
type OrganisationMember struct {
	UserID    string
	Email     string
	FirstName *string
	LastName  *string
	FullName  *string
	Role      string
	CreatedAt time.Time
}

OrganisationMember represents a user membership within an organisation.

type Page

// Page represents a page to be enqueued with its priority.
//
// A slice of Page is what DbQueue.EnqueueURLs accepts.
// NOTE(review): the valid range of Priority is not stated here — confirm.
type Page struct {
	ID       int
	Path     string
	Priority float64
}

Page represents a page to be enqueued with its priority

type Plan

// Plan represents a subscription tier.
//
// MonthlyPriceCents is an integer number of cents, as its name and JSON key
// state. All fields are always emitted in JSON (no omitempty).
type Plan struct {
	ID                string    `json:"id"`
	Name              string    `json:"name"`
	DisplayName       string    `json:"display_name"`
	DailyPageLimit    int       `json:"daily_page_limit"`
	MonthlyPriceCents int       `json:"monthly_price_cents"`
	IsActive          bool      `json:"is_active"`
	SortOrder         int       `json:"sort_order"`
	CreatedAt         time.Time `json:"created_at"`
}

Plan represents a subscription tier

type PlatformOrgMapping

// PlatformOrgMapping links an external platform identity to an organisation.
//
// PlatformName and CreatedBy are pointers and may be nil; the remaining
// fields are plain values.
type PlatformOrgMapping struct {
	ID             string
	Platform       string
	PlatformID     string
	PlatformName   *string
	OrganisationID string
	CreatedAt      time.Time
	UpdatedAt      time.Time
	CreatedBy      *string
}

PlatformOrgMapping links an external platform identity to an organisation.

type QueueExecutor

// QueueExecutor defines the minimal interface needed for batch operations.
//
// Both methods run fn against a *sql.Tx; ExecuteWithContext additionally
// passes a context to the callback. *DbQueue declares methods with both of
// these signatures.
type QueueExecutor interface {
	Execute(ctx context.Context, fn func(*sql.Tx) error) error
	ExecuteWithContext(ctx context.Context, fn func(context.Context, *sql.Tx) error) error
}

QueueExecutor defines the minimal interface needed for batch operations

type RetryConfig

// RetryConfig holds configuration for connection retry behaviour.
//
// DefaultRetryConfig returns a value of this type; the default values
// themselves are not shown in this reference.
type RetryConfig struct {
	MaxAttempts     int           // Maximum number of connection attempts
	InitialInterval time.Duration // Initial retry interval
	MaxInterval     time.Duration // Maximum retry interval (cap for exponential backoff)
	Multiplier      float64       // Backoff multiplier (typically 2.0)
	Jitter          bool          // Add randomness to prevent thundering herd
}

RetryConfig holds configuration for connection retry behaviour

func DefaultRetryConfig

func DefaultRetryConfig() RetryConfig

DefaultRetryConfig returns sensible defaults for database connection retries

type Scheduler

// Scheduler represents a recurring job schedule.
//
// UpdateScheduler takes a *Scheduler as its set of updates, and
// UpdateSchedulerNextRun writes only the timestamp that NextRunAt holds.
// The struct declares no JSON tags.
// NOTE(review): whether zero values (e.g. MaxPages == 0, empty IncludePaths)
// mean "unlimited"/"no filter" is not visible here — confirm.
type Scheduler struct {
	ID                    string
	DomainID              int
	OrganisationID        string
	ScheduleIntervalHours int
	NextRunAt             time.Time
	IsEnabled             bool
	Concurrency           int
	FindLinks             bool
	MaxPages              int
	IncludePaths          []string
	ExcludePaths          []string
	RequiredWorkers       int
	CreatedAt             time.Time
	UpdatedAt             time.Time
}

Scheduler represents a recurring job schedule

type SlackConnection

// SlackConnection represents an organisation's connection to a Slack
// workspace.
//
// InstallingUserID is a *string here and may be nil, unlike the plain string
// field of the same name on GoogleAnalyticsAccount, GoogleAnalyticsConnection
// and WebflowConnection.
type SlackConnection struct {
	ID               string
	OrganisationID   string
	WorkspaceID      string
	WorkspaceName    string
	VaultSecretName  string // Name of the secret in Supabase Vault
	BotUserID        string
	InstallingUserID *string
	CreatedAt        time.Time
	UpdatedAt        time.Time
}

SlackConnection represents an organisation's connection to a Slack workspace

// SlackUserLink represents a BBB user linked to their Slack identity.
//
// DMNotifications is the boolean preference that
// UpdateSlackUserLinkNotifications updates.
type SlackUserLink struct {
	ID                string
	UserID            string
	SlackConnectionID string
	SlackUserID       string
	DMNotifications   bool
	CreatedAt         time.Time
}

SlackUserLink represents a BBB user linked to their Slack identity

type SlowPage

// SlowPage represents a slow-loading page for dashboard analysis.
//
// CompletedAt is carried as a string rather than time.Time.
// NOTE(review): the timestamp format of CompletedAt is not visible here —
// confirm against the query that populates it.
type SlowPage struct {
	URL                string `json:"url"`
	Domain             string `json:"domain"`
	Path               string `json:"path"`
	SecondResponseTime int64  `json:"second_response_time"` // milliseconds after cache retry
	JobID              string `json:"job_id"`
	CompletedAt        string `json:"completed_at"`
}

SlowPage represents a slow-loading page for dashboard analysis

type Task

// Task represents a task in the queue.
//
// The fields fall into four groups: identity and lifecycle, result data from
// the first request, data from the second request, and the priority score.
// The struct declares no JSON or DB tags. Headers, SecondHeaders and
// CacheCheckAttempts hold raw bytes that are stored as JSONB.
//
// StartedAt and CompletedAt are plain time.Time values, so they cannot be
// nil.
// NOTE(review): presumably they are the zero time until the task starts or
// completes — confirm against the scanning code.
// NOTE(review): the units of the int64 timing fields are not stated here
// (SlowPage documents its SecondResponseTime as milliseconds) — confirm.
type Task struct {
	ID          string
	JobID       string
	PageID      int
	Path        string
	Status      string
	CreatedAt   time.Time
	StartedAt   time.Time
	CompletedAt time.Time
	RetryCount  int
	Error       string
	SourceType  string
	SourceURL   string

	// Result data
	StatusCode          int
	ResponseTime        int64
	CacheStatus         string
	ContentType         string
	ContentLength       int64
	Headers             []byte // Stored as JSONB
	RedirectURL         string
	DNSLookupTime       int64
	TCPConnectionTime   int64
	TLSHandshakeTime    int64
	TTFB                int64
	ContentTransferTime int64

	// Second request data
	SecondResponseTime        int64
	SecondCacheStatus         string
	SecondContentLength       int64
	SecondHeaders             []byte // Stored as JSONB
	SecondDNSLookupTime       int64
	SecondTCPConnectionTime   int64
	SecondTLSHandshakeTime    int64
	SecondTTFB                int64
	SecondContentTransferTime int64
	CacheCheckAttempts        []byte // Stored as JSONB

	// Priority
	PriorityScore float64
}

Task represents a task in the queue

type TaskUpdate

// TaskUpdate represents a pending task status update.
//
// Task is a pointer, so the update shares the Task value with whoever
// created it rather than holding a copy.
type TaskUpdate struct {
	Task      *Task
	UpdatedAt time.Time
}

TaskUpdate represents a pending task status update

type TransactionExecutor

// TransactionExecutor is the interface for types that can execute
// transactions.
//
// Its single method has the same signature as QueueExecutor.Execute, so any
// QueueExecutor also satisfies TransactionExecutor.
type TransactionExecutor interface {
	Execute(ctx context.Context, fn func(*sql.Tx) error) error
}

TransactionExecutor interface for types that can execute transactions

type UsageStats

// UsageStats represents current usage statistics for an organisation.
//
// All fields are always emitted in JSON (no omitempty).
// NOTE(review): whether UsagePercentage is on a 0-100 or 0-1 scale, and the
// time zone of ResetsAt, are not visible here — confirm with the producer.
type UsageStats struct {
	DailyLimit      int       `json:"daily_limit"`
	DailyUsed       int       `json:"daily_used"`
	DailyRemaining  int       `json:"daily_remaining"`
	UsagePercentage float64   `json:"usage_percentage"`
	PlanID          string    `json:"plan_id"`
	PlanName        string    `json:"plan_name"`
	PlanDisplayName string    `json:"plan_display_name"`
	ResetsAt        time.Time `json:"resets_at"`
}

UsageStats represents current usage statistics for an organisation

type User

// User represents a user in the system.
//
// Optional attributes are *string fields tagged omitempty, so they are left
// out of the JSON output when nil. WebhookToken is tagged `json:"-"` and is
// therefore never marshalled or unmarshalled by encoding/json.
type User struct {
	ID                   string    `json:"id"`
	Email                string    `json:"email"`
	FirstName            *string   `json:"first_name,omitempty"`
	LastName             *string   `json:"last_name,omitempty"`
	FullName             *string   `json:"full_name,omitempty"`
	OrganisationID       *string   `json:"organisation_id,omitempty"`
	ActiveOrganisationID *string   `json:"active_organisation_id,omitempty"`
	SlackUserID          *string   `json:"slack_user_id,omitempty"`
	WebhookToken         *string   `json:"-"` // Excluded from JSON - sensitive credential
	CreatedAt            time.Time `json:"created_at"`
	UpdatedAt            time.Time `json:"updated_at"`
}

User represents a user in the system

type UserOrganisation

// UserOrganisation represents an organisation a user belongs to.
//
// It has the same fields and JSON keys as Organisation, minus UpdatedAt.
type UserOrganisation struct {
	ID        string    `json:"id"`
	Name      string    `json:"name"`
	CreatedAt time.Time `json:"created_at"`
}

UserOrganisation represents an organisation a user belongs to

type WebflowConnection

// WebflowConnection represents an organisation's connection to a Webflow
// workspace/user.
//
// Note: while OAuth is user-workspace scoped, the connection is mapped to an
// Organisation in this system.
type WebflowConnection struct {
	ID                 string
	OrganisationID     string
	WebflowWorkspaceID string
	WorkspaceName      string // Display name of the Webflow workspace
	AuthedUserID       string
	VaultSecretName    string // Name of the secret in Supabase Vault
	InstallingUserID   string
	CreatedAt          time.Time
	UpdatedAt          time.Time
}

WebflowConnection represents an organisation's connection to a Webflow workspace/user. Note: while OAuth is user-workspace scoped, we map it to an Organisation in our system.

type WebflowSiteSetting

// WebflowSiteSetting represents per-site configuration for a Webflow site.
//
// ScheduleIntervalHours is an *int, the same type UpdateSiteSchedule takes,
// and WebhookRegisteredAt is a *time.Time; both may be nil.
// NOTE(review): a nil ScheduleIntervalHours presumably means the site has no
// schedule, and an empty WebhookID / SchedulerID presumably means "none" —
// confirm against UpdateSiteSchedule and UpdateSiteAutoPublish.
type WebflowSiteSetting struct {
	ID                    string
	ConnectionID          string
	OrganisationID        string
	WebflowSiteID         string
	SiteName              string
	PrimaryDomain         string
	ScheduleIntervalHours *int
	AutoPublishEnabled    bool
	WebhookID             string
	WebhookRegisteredAt   *time.Time
	SchedulerID           string
	CreatedAt             time.Time
	UpdatedAt             time.Time
}

WebflowSiteSetting represents per-site configuration for a Webflow site

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL