runtime

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2025 License: MIT Imports: 31 Imported by: 0

Documentation

Overview

Package runtime provides the core event processing infrastructure for protoflow.

Architecture Overview

The runtime package implements a message-driven architecture built on top of Watermill. It provides typed handlers for Protocol Buffers and JSON messages, along with a middleware chain for cross-cutting concerns.

Package Structure

The runtime package is organized into the following components:

## Core Service (service.go)

The Service struct is the central orchestrator that wires together:

  • Message router (Watermill)
  • Publisher and subscriber connections
  • Middleware chain
  • HTTP servers for metrics and WebUI
  • Proto message registry for validation

## Handler Registration (registration*.go)

Handler registration files provide typed wrappers for message handlers:

  • registration.go: Raw Watermill handlers and base registration logic
  • registration_json.go: Typed JSON message handlers
  • registration_proto.go: Typed Protocol Buffer message handlers

## Middleware (middleware.go)

The middleware system provides composable message processing stages:

  • CorrelationID: Ensures message traceability
  • LogMessages: Debug logging of message payloads
  • ProtoValidate: Schema validation for protobuf messages
  • Outbox: Transactional outbox pattern support
  • Tracer: OpenTelemetry distributed tracing
  • Metrics: Prometheus metrics collection
  • Retry: Exponential backoff retry logic
  • PoisonQueue: Dead letter queue for failed messages
  • Recoverer: Panic recovery

## Stats & Monitoring (models.go, resources.go)

Extended metrics collection for handler performance:

  • Latency percentiles (p50, p95, p99)
  • Throughput tracking
  • Error categorization
  • Resource usage sampling
  • Backlog estimation

## Publishing (publisher.go)

Utilities for emitting proto-based events with proper metadata.

## WebUI (webui.go)

HTTP API for introspecting handler state and statistics.

Sub-packages

  • config/: Service configuration with validation
  • errors/: Sentinel errors and error types
  • handlers/: Message context types and handler building
  • ids/: ULID generation for message IDs
  • jsoncodec/: JSON marshaling utilities
  • logging/: Logger interface and adapters
  • metadata/: Message metadata utilities
  • transport/: Pub/sub transport implementations (Kafka, RabbitMQ, AWS, NATS, etc.)

Usage Example

cfg := &protoflow.Config{
	PubSubSystem:   "kafka",
	KafkaBrokers:   []string{"localhost:9092"},
	MetricsEnabled: true,
	MetricsPort:    9090,
}

svc := protoflow.NewService(cfg, logger, ctx, protoflow.ServiceDependencies{})

protoflow.RegisterProtoHandler(svc, protoflow.ProtoHandlerRegistration[*pb.OrderCreated]{
	Name:         "order-processor",
	ConsumeQueue: "orders.created",
	PublishQueue: "orders.processed",
	Handler:      processOrder,
})

svc.Start(ctx)

Index

Constants

View Source
const (
	DependencyStatusUnknown  = "unknown"
	DependencyStatusHealthy  = "healthy"
	DependencyStatusDegraded = "degraded"
)

Variables

This section is empty.

Functions

func MustProtoMessage

func MustProtoMessage[T proto.Message]() T

MustProtoMessage instantiates the protobuf message and panics if the type cannot be created.

func NewEventID added in v0.4.1

func NewEventID() string

NewEventID generates a new event ID.

func NewMessageFromProto

func NewMessageFromProto(event proto.Message, metadata metadatapkg.Metadata) (*message.Message, error)

NewMessageFromProto converts the provided proto message into a Watermill message with the standard metadata required by the event pipeline.

func NewProtoMessage

func NewProtoMessage[T proto.Message]() (T, error)

NewProtoMessage instantiates a zero-value protobuf message for the provided generic type.

func PublishProto

func PublishProto(ctx context.Context, publisher message.Publisher, topic string, event proto.Message, metadata metadatapkg.Metadata) error

PublishProto marshals the proto payload and publishes it to the provided topic.

func RegisterCloudEventsHandler added in v0.4.1

func RegisterCloudEventsHandler(s *Service, reg CloudEventsHandlerRegistration) error

RegisterCloudEventsHandler registers a CloudEvents handler with full configuration.

func RegisterJSONHandler

func RegisterJSONHandler[T any, O any](svc *Service, cfg handlerpkg.JSONHandlerRegistration[T, O]) error

RegisterJSONHandler converts the typed JSON handler into a Watermill handler and registers it.

func RegisterMessageHandler

func RegisterMessageHandler(svc *Service, cfg MessageHandlerRegistration) error

RegisterMessageHandler attaches the provided handler to the service router.

func RegisterProtoHandler

func RegisterProtoHandler[T proto.Message](svc *Service, cfg handlerpkg.ProtoHandlerRegistration[T]) error

RegisterProtoHandler converts the typed handler into a Watermill handler and registers it on the Service router.

Types

type BacklogMetrics added in v0.3.4

type BacklogMetrics struct {
	InFlight           uint64 `json:"in_flight"`
	MaxInFlight        uint64 `json:"max_in_flight"`
	LastQueueDepth     int64  `json:"last_queue_depth"`
	EstimatedLagMillis int64  `json:"estimated_lag_millis"`
}

type CloudEventsHandlerRegistration added in v0.4.1

type CloudEventsHandlerRegistration struct {
	// Name is a unique identifier for this handler.
	Name string

	// EventType is the CloudEvents type to consume.
	EventType string

	// Handler processes incoming CloudEvents.
	Handler EventHandler

	// MaxAttempts overrides the default max retry attempts.
	MaxAttempts int
}

CloudEventsHandlerRegistration configures a CloudEvents message handler.

type DLQMetrics added in v0.4.1

type DLQMetrics struct {
	// contains filtered or unexported fields
}

DLQMetrics tracks dead letter queue statistics.

func NewDLQMetrics added in v0.4.1

func NewDLQMetrics(registerer prometheus.Registerer) *DLQMetrics

NewDLQMetrics creates a new DLQ metrics collector.

func (*DLQMetrics) GetSnapshot added in v0.4.1

func (m *DLQMetrics) GetSnapshot() DLQMetricsSnapshot

GetSnapshot returns a point-in-time snapshot of all DLQ metrics.

func (*DLQMetrics) GetTopicMetrics added in v0.4.1

func (m *DLQMetrics) GetTopicMetrics(topic string) *DLQTopicMetrics

GetTopicMetrics returns metrics for a specific topic.

func (*DLQMetrics) RecordMessageReplayed added in v0.4.1

func (m *DLQMetrics) RecordMessageReplayed(topic string)

RecordMessageReplayed records a message being replayed from the DLQ.

func (*DLQMetrics) RecordMessageToDLQ added in v0.4.1

func (m *DLQMetrics) RecordMessageToDLQ(topic, handler string, retryCount int, messageAge time.Duration)

RecordMessageToDLQ records a message being added to the DLQ.

func (*DLQMetrics) RecordMessagesPurged added in v0.4.1

func (m *DLQMetrics) RecordMessagesPurged(topic string, count int64)

RecordMessagesPurged records messages being purged from the DLQ.

func (*DLQMetrics) Register added in v0.4.1

func (m *DLQMetrics) Register() error

Register registers the Prometheus collectors. Safe to call multiple times.

func (*DLQMetrics) Reset added in v0.4.1

func (m *DLQMetrics) Reset()

Reset resets all metrics (useful for testing).

func (*DLQMetrics) SetCurrentCount added in v0.4.1

func (m *DLQMetrics) SetCurrentCount(topic string, count uint64)

SetCurrentCount directly sets the current message count (for sync with external systems).

type DLQMetricsSnapshot added in v0.4.1

type DLQMetricsSnapshot struct {
	TotalMessages uint64                      `json:"total_messages"`
	TotalReplayed uint64                      `json:"total_replayed"`
	TotalPurged   uint64                      `json:"total_purged"`
	TopicMetrics  map[string]*DLQTopicMetrics `json:"topic_metrics"`
	CollectedAt   time.Time                   `json:"collected_at"`
}

DLQMetricsSnapshot provides a point-in-time view of DLQ metrics.

type DLQTopicMetrics added in v0.4.1

type DLQTopicMetrics struct {
	MessagesReceived uint64    `json:"messages_received"`
	MessagesCurrent  uint64    `json:"messages_current"`
	MessagesReplayed uint64    `json:"messages_replayed"`
	MessagesPurged   uint64    `json:"messages_purged"`
	OldestMessageAt  time.Time `json:"oldest_message_at,omitempty"`
	NewestMessageAt  time.Time `json:"newest_message_at,omitempty"`
	AvgRetryCount    float64   `json:"avg_retry_count"`
	LastUpdatedAt    time.Time `json:"last_updated_at"`
}

DLQTopicMetrics holds metrics for a specific topic's DLQ.

type DependencyHealth added in v0.3.4

type DependencyHealth struct {
	Name        string    `json:"name"`
	Status      string    `json:"status"`
	LastChecked time.Time `json:"last_checked"`
	Details     string    `json:"details,omitempty"`
}

type ErrorBreakdown added in v0.3.4

type ErrorBreakdown struct {
	Validation uint64 `json:"validation"`
	Transport  uint64 `json:"transport"`
	Downstream uint64 `json:"downstream"`
	Other      uint64 `json:"other"`
	LastError  string `json:"last_error,omitempty"`
}

func (*ErrorBreakdown) Record added in v0.3.4

func (e *ErrorBreakdown) Record(category ErrorCategory, err error)

type ErrorCategory added in v0.3.4

type ErrorCategory string
const (
	ErrorCategoryNone       ErrorCategory = "none"
	ErrorCategoryValidation ErrorCategory = "validation"
	ErrorCategoryTransport  ErrorCategory = "transport"
	ErrorCategoryDownstream ErrorCategory = "downstream"
	ErrorCategoryOther      ErrorCategory = "other"
)

type ErrorClassifier added in v0.3.4

type ErrorClassifier func(error) ErrorCategory

type EventContext added in v0.4.1

type EventContext struct {
	Event     ce.Event
	RawData   json.RawMessage
	Publisher *Service
	Logger    interface {
		Info(msg string, fields map[string]any)
		Error(msg string, err error, fields map[string]any)
	}
}

EventContext provides helpers for working with CloudEvents in handlers.

func (*EventContext) Publish added in v0.4.1

func (ec *EventContext) Publish(ctx context.Context, eventType, source string, data any) error

Publish publishes a response event, copying tracing context.

func (*EventContext) UnmarshalData added in v0.4.1

func (ec *EventContext) UnmarshalData(v any) error

UnmarshalData unmarshals the event data into the provided struct.

type EventHandler added in v0.4.1

type EventHandler func(ctx context.Context, evt ce.Event) error

EventHandler is the callback signature for CloudEvents handlers. Return nil to acknowledge, or an error to control retry/DLQ behavior:

  • nil: acknowledge the message
  • ErrRetry: retry with default delay
  • ErrRetryAfter(d): retry after specific delay
  • ErrDeadLetter: send to dead letter queue
  • any other error: treated as ErrRetry

type HandlerInfo added in v0.3.3

type HandlerInfo struct {
	Name         string        `json:"name"`
	ConsumeQueue string        `json:"consume_queue"`
	PublishQueue string        `json:"publish_queue"`
	Stats        *HandlerStats `json:"stats"`
}

type HandlerStats added in v0.3.3

type HandlerStats struct {
	MessagesProcessed   uint64    `json:"messages_processed"`
	MessagesFailed      uint64    `json:"messages_failed"`
	TotalProcessingTime int64     `json:"total_processing_time_ns"`
	LastProcessedAt     time.Time `json:"last_processed_at"`

	Latency      LatencyMetrics     `json:"latency"`
	Throughput   ThroughputMetrics  `json:"throughput"`
	Errors       ErrorBreakdown     `json:"errors"`
	Resource     ResourceUsage      `json:"resource"`
	Backlog      BacklogMetrics     `json:"backlog"`
	Dependencies []DependencyHealth `json:"dependencies"`
	// contains filtered or unexported fields
}

func (*HandlerStats) MarshalJSON added in v0.3.3

func (h *HandlerStats) MarshalJSON() ([]byte, error)

type JobContext added in v0.4.1

type JobContext struct {
	// HandlerName is the name of the handler processing the job.
	HandlerName string
	// Topic is the topic/queue the message was received from.
	Topic string
	// MessageUUID is the unique identifier of the message.
	MessageUUID string
	// Metadata contains the message metadata.
	Metadata message.Metadata
	// Context is the context associated with the message.
	Context context.Context
	// StartedAt is when the job started processing.
	StartedAt time.Time
	// Duration is how long the job took (only set in OnJobDone and OnJobError).
	Duration time.Duration
	// RetryCount is the number of times this message has been retried.
	RetryCount int
}

JobContext provides information about a job execution to hooks.

type JobHooks added in v0.4.1

type JobHooks struct {
	// OnJobStart is called when a handler begins processing a message.
	// This is called before the handler function is invoked.
	OnJobStart func(ctx JobContext)

	// OnJobDone is called when a handler successfully completes processing.
	// Duration will be set to how long the handler took.
	OnJobDone func(ctx JobContext)

	// OnJobError is called when a handler returns an error.
	// The error is passed as the second argument.
	// Duration will be set to how long the handler took before failing.
	OnJobError func(ctx JobContext, err error)
}

JobHooks defines callbacks for job lifecycle events. All hooks are optional - nil hooks are simply not called.

func AlertingHooks added in v0.4.1

func AlertingHooks(alertFunc func(ctx JobContext, err error)) JobHooks

AlertingHooks returns pre-built hooks that trigger alerts on job errors.

func LoggingHooks added in v0.4.1

func LoggingHooks(logger interface {
	Info(msg string, fields map[string]interface{})
	Error(msg string, err error, fields map[string]interface{})
}) JobHooks

LoggingHooks returns pre-built hooks that log job lifecycle events.

func MetricsHooks added in v0.4.1

func MetricsHooks(onStart, onDone, onError func(handlerName, topic string)) JobHooks

MetricsHooks returns pre-built hooks that record job metrics.

func (JobHooks) Merge added in v0.4.1

func (h JobHooks) Merge(other JobHooks) JobHooks

Merge combines two JobHooks, creating a new JobHooks that calls both. The hooks from 'other' are called after the hooks from 'h'.

type LatencyMetrics added in v0.3.4

type LatencyMetrics struct {
	AverageNs  int64 `json:"average_ns"`
	P50Ns      int64 `json:"p50_ns"`
	P95Ns      int64 `json:"p95_ns"`
	P99Ns      int64 `json:"p99_ns"`
	LastNs     int64 `json:"last_ns"`
	SampleSize int   `json:"sample_size"`
}

type MessageHandlerRegistration

type MessageHandlerRegistration struct {
	Name         string
	ConsumeQueue string
	PublishQueue string
	Handler      message.HandlerFunc
	Subscriber   message.Subscriber
	Publisher    message.Publisher
}

MessageHandlerRegistration wires a raw Watermill handler without typed helpers.

type MiddlewareBuilder

type MiddlewareBuilder func(*Service) (message.HandlerMiddleware, error)

MiddlewareBuilder constructs a handler middleware using the provided service instance.

type MiddlewareRegistration

type MiddlewareRegistration struct {
	Name       string
	Middleware message.HandlerMiddleware
	Builder    MiddlewareBuilder
}

MiddlewareRegistration captures how a middleware should be registered on a Service router.

func CorrelationIDMiddleware

func CorrelationIDMiddleware() MiddlewareRegistration

CorrelationIDMiddleware ensures each processed message carries a correlation identifier.

func DefaultMiddlewares

func DefaultMiddlewares() []MiddlewareRegistration

DefaultMiddlewares returns the standard middleware chain used by the Service constructor.

func JobHooksMiddleware added in v0.4.1

func JobHooksMiddleware(hooks JobHooks) MiddlewareRegistration

JobHooksMiddleware creates a middleware that invokes the provided hooks at appropriate points in the message lifecycle.

func LogMessagesMiddleware

func LogMessagesMiddleware(logger loggingpkg.ServiceLogger) MiddlewareRegistration

LogMessagesMiddleware logs the full payload and metadata of handled messages.

func MetricsMiddleware added in v0.3.2

func MetricsMiddleware() MiddlewareRegistration

MetricsMiddleware adds Prometheus metrics to the handler.

func OutboxMiddleware

func OutboxMiddleware() MiddlewareRegistration

OutboxMiddleware persists outgoing messages when an OutboxStore is configured.

func PoisonQueueMiddleware

func PoisonQueueMiddleware(filter func(error) bool) MiddlewareRegistration

PoisonQueueMiddleware publishes messages that match the supplied filter to the configured poison queue.

func ProtoValidateMiddleware

func ProtoValidateMiddleware() MiddlewareRegistration

ProtoValidateMiddleware unmarshals and validates protobuf payloads when possible.

func RecovererMiddleware

func RecovererMiddleware() MiddlewareRegistration

RecovererMiddleware converts panics into handler errors so they can be retried or sent to the poison queue.

func RetryMiddleware

func RetryMiddleware(cfg RetryMiddlewareConfig) MiddlewareRegistration

RetryMiddleware retries handler execution using the provided configuration (defaults applied to zero values).

func TracerMiddleware

func TracerMiddleware() MiddlewareRegistration

TracerMiddleware wraps handler execution in an OpenTelemetry span.

type OutboxStore

type OutboxStore interface {
	StoreOutgoingMessage(ctx context.Context, eventType, uuid, payload string) error
}

OutboxStore persists processed messages so they can be forwarded reliably.

type Producer

type Producer interface {
	PublishProto(ctx context.Context, topic string, event proto.Message, metadata metadatapkg.Metadata) error
}

Producer emits proto-based events onto the configured transport.

type ProtoValidator

type ProtoValidator interface {
	Validate(value any) error
}

ProtoValidator validates unmarshalled payloads. Implementations typically forward to protovalidate or a custom struct validator.

type PublishOption added in v0.4.1

type PublishOption func(*publishOptions)

PublishOption configures event publishing behavior.

func WithCorrelationID added in v0.4.1

func WithCorrelationID(correlationID string) PublishOption

WithCorrelationID sets the correlation ID for request tracing.

func WithDataContentType added in v0.4.1

func WithDataContentType(contentType string) PublishOption

WithDataContentType sets the data content type (e.g., "application/json").

func WithDataSchema added in v0.4.1

func WithDataSchema(schema string) PublishOption

WithDataSchema sets the data schema URI.

func WithExtension added in v0.4.1

func WithExtension(key string, value any) PublishOption

WithExtension adds a CloudEvents extension attribute.

func WithMaxAttempts added in v0.4.1

func WithMaxAttempts(max int) PublishOption

WithMaxAttempts sets the maximum retry attempts for the event.

func WithSubject added in v0.4.1

func WithSubject(subject string) PublishOption

WithSubject sets the CloudEvents subject attribute.

func WithTracing added in v0.4.1

func WithTracing(traceID, parentID string) PublishOption

WithTracing sets tracing context for the event.

type ResourceUsage added in v0.3.4

type ResourceUsage struct {
	CPUPercent  float64 `json:"cpu_percent"`
	MemoryBytes uint64  `json:"memory_bytes"`
	Goroutines  int     `json:"goroutines"`
}

type RetryMiddlewareConfig

type RetryMiddlewareConfig struct {
	MaxRetries      int
	InitialInterval time.Duration
	MaxInterval     time.Duration
	RetryIf         func(error) bool
}

RetryMiddlewareConfig customises the retry middleware behaviour.

type Service

type Service struct {
	Conf   *configpkg.Config
	Logger loggingpkg.ServiceLogger
	// contains filtered or unexported fields
}

Service wires a Watermill router, publisher, subscriber, and middleware chain.

func NewService

NewService constructs a Service for the supplied configuration. Register handlers on the returned Service before calling Start. Panics if configuration is invalid or transport cannot be built. Use TryNewService for error-returning variant.

func TryNewService added in v0.4.1

func TryNewService(conf *configpkg.Config, log loggingpkg.ServiceLogger, ctx context.Context, deps ServiceDependencies) (*Service, error)

TryNewService constructs a Service for the supplied configuration. Returns an error instead of panicking if configuration is invalid or transport cannot be built.

func (*Service) ConsumeEvents added in v0.4.1

func (s *Service) ConsumeEvents(eventType string, handler EventHandler) error

ConsumeEvents registers a CloudEvents handler for the specified event type. The handler receives CloudEvents and returns errors to control message lifecycle.

func (*Service) GetTransportCapabilities added in v0.4.1

func (s *Service) GetTransportCapabilities() transportpkg.Capabilities

GetTransportCapabilities returns the capabilities of the configured transport.

func (*Service) Publish added in v0.4.1

func (s *Service) Publish(ctx context.Context, topic string, msgs ...*message.Message) error

Publish sends a raw Watermill message to the specified topic. Use PublishProto for type-safe proto message publishing.

func (*Service) PublishData added in v0.4.1

func (s *Service) PublishData(ctx context.Context, eventType, source string, data any, opts ...PublishOption) error

PublishData publishes data as a CloudEvent. This is a convenience method that constructs the event for you.

func (*Service) PublishDataAfter added in v0.4.1

func (s *Service) PublishDataAfter(ctx context.Context, eventType, source string, data any, delay time.Duration, opts ...PublishOption) error

PublishDataAfter publishes data as a CloudEvent with a delay.

func (*Service) PublishEvent added in v0.4.1

func (s *Service) PublishEvent(ctx context.Context, evt ce.Event) error

PublishEvent publishes a CloudEvent to the specified event type topic.

func (*Service) PublishEventAfter added in v0.4.1

func (s *Service) PublishEventAfter(ctx context.Context, evt ce.Event, delay time.Duration) error

PublishEventAfter publishes a CloudEvent with a delay before processing.

func (*Service) PublishProto

func (s *Service) PublishProto(ctx context.Context, topic string, event proto.Message, metadata metadatapkg.Metadata) error

PublishProto emits the event using the Service publisher so HTTP handlers can create events without touching the internal Watermill APIs directly.

func (*Service) RegisterHTTPHandler added in v0.3.3

func (s *Service) RegisterHTTPHandler(port int, pattern string, handler http.Handler)

func (*Service) RegisterMiddleware

func (s *Service) RegisterMiddleware(cfg MiddlewareRegistration) error

RegisterMiddleware attaches the supplied middleware to the router.

func (*Service) RegisterProtoMessage

func (s *Service) RegisterProtoMessage(msg proto.Message)

RegisterProtoMessage exposes a proto message type for validation without registering a handler.

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

Start runs the underlying Watermill router until the provided context is cancelled.

func (*Service) StartWebUIServer added in v0.3.3

func (s *Service) StartWebUIServer()

func (*Service) Stop added in v0.4.1

func (s *Service) Stop()

Stop gracefully shuts down HTTP servers. Called automatically when the context passed to Start is cancelled.

type ServiceDependencies

type ServiceDependencies struct {
	Outbox                    OutboxStore
	Validator                 ProtoValidator
	Middlewares               []MiddlewareRegistration // Appended after the default middleware chain.
	DisableDefaultMiddlewares bool                     // Skips registering the default middleware chain when true.
	TransportFactory          transportpkg.Factory
	ErrorClassifier           ErrorClassifier
}

ServiceDependencies holds the optional collaborators that the Service can use. Leave fields nil to skip the related middleware.

type ThroughputMetrics added in v0.3.4

type ThroughputMetrics struct {
	CurrentRPS       float64 `json:"current_rps"`
	WindowSeconds    float64 `json:"window_seconds"`
	MessagesInWindow uint64  `json:"messages_in_window"`
	TotalMessages    uint64  `json:"total_messages"`
}

type UnprocessableEventError

type UnprocessableEventError struct {
	// contains filtered or unexported fields
}

UnprocessableEventError wraps payloads that failed validation or unmarshalling.

func (*UnprocessableEventError) Error

func (e *UnprocessableEventError) Error() string

Directories

Path Synopsis
Package cloudevents provides CloudEvents v1.0 compatible event types and utilities for use within protoflow.
Package cloudevents provides CloudEvents v1.0 compatible event types and utilities for use within protoflow.
Package transport provides transport types and interfaces for the internal runtime.
Package transport provides transport types and interfaces for the internal runtime.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL