Documentation
¶
Overview ¶
Package runtime provides the core event processing infrastructure for protoflow.
Architecture Overview ¶
The runtime package implements a message-driven architecture built on top of Watermill. It provides typed handlers for Protocol Buffers and JSON messages, along with a middleware chain for cross-cutting concerns.
Package Structure ¶
The runtime package is organized into the following components:
## Core Service (service.go)
The Service struct is the central orchestrator that wires together:
- Message router (Watermill)
- Publisher and subscriber connections
- Middleware chain
- HTTP servers for metrics and WebUI
- Proto message registry for validation
## Handler Registration (registration*.go)
Handler registration files provide typed wrappers for message handlers:
- registration.go: Raw Watermill handlers and base registration logic
- registration_json.go: Typed JSON message handlers
- registration_proto.go: Typed Protocol Buffer message handlers
## Middleware (middleware.go)
The middleware system provides composable message processing stages:
- CorrelationID: Ensures message traceability
- LogMessages: Debug logging of message payloads
- ProtoValidate: Schema validation for protobuf messages
- Outbox: Transactional outbox pattern support
- Tracer: OpenTelemetry distributed tracing
- Metrics: Prometheus metrics collection
- Retry: Exponential backoff retry logic
- PoisonQueue: Dead letter queue for failed messages
- Recoverer: Panic recovery
## Stats & Monitoring (models.go, resources.go)
Extended metrics collection for handler performance:
- Latency percentiles (p50, p95, p99)
- Throughput tracking
- Error categorization
- Resource usage sampling
- Backlog estimation
## Publishing (publisher.go)
Utilities for emitting proto-based events with proper metadata.
## WebUI (webui.go)
HTTP API for introspecting handler state and statistics.
Sub-packages ¶
- config/: Service configuration with validation
- errors/: Sentinel errors and error types
- handlers/: Message context types and handler building
- ids/: ULID generation for message IDs
- jsoncodec/: JSON marshaling utilities
- logging/: Logger interface and adapters
- metadata/: Message metadata utilities
- transport/: Pub/sub transport implementations (Kafka, RabbitMQ, AWS, NATS, etc.)
Usage Example ¶
cfg := &protoflow.Config{
PubSubSystem: "kafka",
KafkaBrokers: []string{"localhost:9092"},
MetricsEnabled: true,
MetricsPort: 9090,
}
svc := protoflow.NewService(cfg, logger, ctx, protoflow.ServiceDependencies{})
protoflow.RegisterProtoHandler(svc, protoflow.ProtoHandlerRegistration[*pb.OrderCreated]{
Name: "order-processor",
ConsumeQueue: "orders.created",
PublishQueue: "orders.processed",
Handler: processOrder,
})
svc.Start(ctx)
Index ¶
- Constants
- func MustProtoMessage[T proto.Message]() T
- func NewEventID() string
- func NewMessageFromProto(event proto.Message, metadata metadatapkg.Metadata) (*message.Message, error)
- func NewProtoMessage[T proto.Message]() (T, error)
- func PublishProto(ctx context.Context, publisher message.Publisher, topic string, ...) error
- func RegisterCloudEventsHandler(s *Service, reg CloudEventsHandlerRegistration) error
- func RegisterJSONHandler[T any, O any](svc *Service, cfg handlerpkg.JSONHandlerRegistration[T, O]) error
- func RegisterMessageHandler(svc *Service, cfg MessageHandlerRegistration) error
- func RegisterProtoHandler[T proto.Message](svc *Service, cfg handlerpkg.ProtoHandlerRegistration[T]) error
- type BacklogMetrics
- type CloudEventsHandlerRegistration
- type DLQMetrics
- func (m *DLQMetrics) GetSnapshot() DLQMetricsSnapshot
- func (m *DLQMetrics) GetTopicMetrics(topic string) *DLQTopicMetrics
- func (m *DLQMetrics) RecordMessageReplayed(topic string)
- func (m *DLQMetrics) RecordMessageToDLQ(topic, handler string, retryCount int, messageAge time.Duration)
- func (m *DLQMetrics) RecordMessagesPurged(topic string, count int64)
- func (m *DLQMetrics) Register() error
- func (m *DLQMetrics) Reset()
- func (m *DLQMetrics) SetCurrentCount(topic string, count uint64)
- type DLQMetricsSnapshot
- type DLQTopicMetrics
- type DependencyHealth
- type ErrorBreakdown
- type ErrorCategory
- type ErrorClassifier
- type EventContext
- type EventHandler
- type HandlerInfo
- type HandlerStats
- type JobContext
- type JobHooks
- type LatencyMetrics
- type MessageHandlerRegistration
- type MiddlewareBuilder
- type MiddlewareRegistration
- func CorrelationIDMiddleware() MiddlewareRegistration
- func DefaultMiddlewares() []MiddlewareRegistration
- func JobHooksMiddleware(hooks JobHooks) MiddlewareRegistration
- func LogMessagesMiddleware(logger loggingpkg.ServiceLogger) MiddlewareRegistration
- func MetricsMiddleware() MiddlewareRegistration
- func OutboxMiddleware() MiddlewareRegistration
- func PoisonQueueMiddleware(filter func(error) bool) MiddlewareRegistration
- func ProtoValidateMiddleware() MiddlewareRegistration
- func RecovererMiddleware() MiddlewareRegistration
- func RetryMiddleware(cfg RetryMiddlewareConfig) MiddlewareRegistration
- func TracerMiddleware() MiddlewareRegistration
- type OutboxStore
- type Producer
- type ProtoValidator
- type PublishOption
- func WithCorrelationID(correlationID string) PublishOption
- func WithDataContentType(contentType string) PublishOption
- func WithDataSchema(schema string) PublishOption
- func WithExtension(key string, value any) PublishOption
- func WithMaxAttempts(max int) PublishOption
- func WithSubject(subject string) PublishOption
- func WithTracing(traceID, parentID string) PublishOption
- type ResourceUsage
- type RetryMiddlewareConfig
- type Service
- func (s *Service) ConsumeEvents(eventType string, handler EventHandler) error
- func (s *Service) GetTransportCapabilities() transportpkg.Capabilities
- func (s *Service) Publish(ctx context.Context, topic string, msgs ...*message.Message) error
- func (s *Service) PublishData(ctx context.Context, eventType, source string, data any, opts ...PublishOption) error
- func (s *Service) PublishDataAfter(ctx context.Context, eventType, source string, data any, delay time.Duration, ...) error
- func (s *Service) PublishEvent(ctx context.Context, evt ce.Event) error
- func (s *Service) PublishEventAfter(ctx context.Context, evt ce.Event, delay time.Duration) error
- func (s *Service) PublishProto(ctx context.Context, topic string, event proto.Message, ...) error
- func (s *Service) RegisterHTTPHandler(port int, pattern string, handler http.Handler)
- func (s *Service) RegisterMiddleware(cfg MiddlewareRegistration) error
- func (s *Service) RegisterProtoMessage(msg proto.Message)
- func (s *Service) Start(ctx context.Context) error
- func (s *Service) StartWebUIServer()
- func (s *Service) Stop()
- type ServiceDependencies
- type ThroughputMetrics
- type UnprocessableEventError
Constants ¶
const ( DependencyStatusUnknown = "unknown" DependencyStatusHealthy = "healthy" DependencyStatusDegraded = "degraded" )
Variables ¶
This section is empty.
Functions ¶
func MustProtoMessage ¶
MustProtoMessage instantiates the protobuf message and panics if the type cannot be created.
func NewMessageFromProto ¶
func NewMessageFromProto(event proto.Message, metadata metadatapkg.Metadata) (*message.Message, error)
NewMessageFromProto converts the provided proto message into a Watermill message with the standard metadata required by the event pipeline.
func NewProtoMessage ¶
NewProtoMessage instantiates a zero-value protobuf message for the provided generic type.
func PublishProto ¶
func PublishProto(ctx context.Context, publisher message.Publisher, topic string, event proto.Message, metadata metadatapkg.Metadata) error
PublishProto marshals the proto payload and publishes it to the provided topic.
func RegisterCloudEventsHandler ¶ added in v0.4.1
func RegisterCloudEventsHandler(s *Service, reg CloudEventsHandlerRegistration) error
RegisterCloudEventsHandler registers a CloudEvents handler with full configuration.
func RegisterJSONHandler ¶
func RegisterJSONHandler[T any, O any](svc *Service, cfg handlerpkg.JSONHandlerRegistration[T, O]) error
RegisterJSONHandler converts the typed JSON handler into a Watermill handler and registers it.
func RegisterMessageHandler ¶
func RegisterMessageHandler(svc *Service, cfg MessageHandlerRegistration) error
RegisterMessageHandler attaches the provided handler to the service router.
func RegisterProtoHandler ¶
func RegisterProtoHandler[T proto.Message](svc *Service, cfg handlerpkg.ProtoHandlerRegistration[T]) error
RegisterProtoHandler converts the typed handler into a Watermill handler and registers it on the Service router.
Types ¶
type BacklogMetrics ¶ added in v0.3.4
type CloudEventsHandlerRegistration ¶ added in v0.4.1
type CloudEventsHandlerRegistration struct {
// Name is a unique identifier for this handler.
Name string
// EventType is the CloudEvents type to consume.
EventType string
// Handler processes incoming CloudEvents.
Handler EventHandler
// MaxAttempts overrides the default max retry attempts.
MaxAttempts int
}
CloudEventsHandlerRegistration configures a CloudEvents message handler.
type DLQMetrics ¶ added in v0.4.1
type DLQMetrics struct {
// contains filtered or unexported fields
}
DLQMetrics tracks dead letter queue statistics.
func NewDLQMetrics ¶ added in v0.4.1
func NewDLQMetrics(registerer prometheus.Registerer) *DLQMetrics
NewDLQMetrics creates a new DLQ metrics collector.
func (*DLQMetrics) GetSnapshot ¶ added in v0.4.1
func (m *DLQMetrics) GetSnapshot() DLQMetricsSnapshot
GetSnapshot returns a point-in-time snapshot of all DLQ metrics.
func (*DLQMetrics) GetTopicMetrics ¶ added in v0.4.1
func (m *DLQMetrics) GetTopicMetrics(topic string) *DLQTopicMetrics
GetTopicMetrics returns metrics for a specific topic.
func (*DLQMetrics) RecordMessageReplayed ¶ added in v0.4.1
func (m *DLQMetrics) RecordMessageReplayed(topic string)
RecordMessageReplayed records a message being replayed from the DLQ.
func (*DLQMetrics) RecordMessageToDLQ ¶ added in v0.4.1
func (m *DLQMetrics) RecordMessageToDLQ(topic, handler string, retryCount int, messageAge time.Duration)
RecordMessageToDLQ records a message being added to the DLQ.
func (*DLQMetrics) RecordMessagesPurged ¶ added in v0.4.1
func (m *DLQMetrics) RecordMessagesPurged(topic string, count int64)
RecordMessagesPurged records messages being purged from the DLQ.
func (*DLQMetrics) Register ¶ added in v0.4.1
func (m *DLQMetrics) Register() error
Register registers the Prometheus collectors. Safe to call multiple times.
func (*DLQMetrics) Reset ¶ added in v0.4.1
func (m *DLQMetrics) Reset()
Reset resets all metrics (useful for testing).
func (*DLQMetrics) SetCurrentCount ¶ added in v0.4.1
func (m *DLQMetrics) SetCurrentCount(topic string, count uint64)
SetCurrentCount directly sets the current message count (for sync with external systems).
type DLQMetricsSnapshot ¶ added in v0.4.1
type DLQMetricsSnapshot struct {
TotalMessages uint64 `json:"total_messages"`
TotalReplayed uint64 `json:"total_replayed"`
TotalPurged uint64 `json:"total_purged"`
TopicMetrics map[string]*DLQTopicMetrics `json:"topic_metrics"`
CollectedAt time.Time `json:"collected_at"`
}
DLQMetricsSnapshot provides a point-in-time view of DLQ metrics.
type DLQTopicMetrics ¶ added in v0.4.1
type DLQTopicMetrics struct {
MessagesReceived uint64 `json:"messages_received"`
MessagesCurrent uint64 `json:"messages_current"`
MessagesReplayed uint64 `json:"messages_replayed"`
MessagesPurged uint64 `json:"messages_purged"`
OldestMessageAt time.Time `json:"oldest_message_at,omitempty"`
NewestMessageAt time.Time `json:"newest_message_at,omitempty"`
AvgRetryCount float64 `json:"avg_retry_count"`
LastUpdatedAt time.Time `json:"last_updated_at"`
}
DLQTopicMetrics holds metrics for a specific topic's DLQ.
type DependencyHealth ¶ added in v0.3.4
type ErrorBreakdown ¶ added in v0.3.4
type ErrorBreakdown struct {
Validation uint64 `json:"validation"`
Transport uint64 `json:"transport"`
Downstream uint64 `json:"downstream"`
Other uint64 `json:"other"`
LastError string `json:"last_error,omitempty"`
}
func (*ErrorBreakdown) Record ¶ added in v0.3.4
func (e *ErrorBreakdown) Record(category ErrorCategory, err error)
type ErrorCategory ¶ added in v0.3.4
type ErrorCategory string
const ( ErrorCategoryNone ErrorCategory = "none" ErrorCategoryValidation ErrorCategory = "validation" ErrorCategoryTransport ErrorCategory = "transport" ErrorCategoryDownstream ErrorCategory = "downstream" ErrorCategoryOther ErrorCategory = "other" )
type ErrorClassifier ¶ added in v0.3.4
type ErrorClassifier func(error) ErrorCategory
type EventContext ¶ added in v0.4.1
type EventContext struct {
Event ce.Event
RawData json.RawMessage
Publisher *Service
Logger interface {
Info(msg string, fields map[string]any)
Error(msg string, err error, fields map[string]any)
}
}
EventContext provides helpers for working with CloudEvents in handlers.
func (*EventContext) Publish ¶ added in v0.4.1
Publish publishes a response event, copying tracing context.
func (*EventContext) UnmarshalData ¶ added in v0.4.1
func (ec *EventContext) UnmarshalData(v any) error
UnmarshalData unmarshals the event data into the provided struct.
type EventHandler ¶ added in v0.4.1
EventHandler is the callback signature for CloudEvents handlers. Return nil to acknowledge, or an error to control retry/DLQ behavior:
- nil: acknowledge the message
- ErrRetry: retry with default delay
- ErrRetryAfter(d): retry after specific delay
- ErrDeadLetter: send to dead letter queue
- any other error: treated as ErrRetry
type HandlerInfo ¶ added in v0.3.3
type HandlerInfo struct {
Name string `json:"name"`
ConsumeQueue string `json:"consume_queue"`
PublishQueue string `json:"publish_queue"`
Stats *HandlerStats `json:"stats"`
}
type HandlerStats ¶ added in v0.3.3
type HandlerStats struct {
MessagesProcessed uint64 `json:"messages_processed"`
MessagesFailed uint64 `json:"messages_failed"`
TotalProcessingTime int64 `json:"total_processing_time_ns"`
LastProcessedAt time.Time `json:"last_processed_at"`
Latency LatencyMetrics `json:"latency"`
Throughput ThroughputMetrics `json:"throughput"`
Errors ErrorBreakdown `json:"errors"`
Resource ResourceUsage `json:"resource"`
Backlog BacklogMetrics `json:"backlog"`
Dependencies []DependencyHealth `json:"dependencies"`
// contains filtered or unexported fields
}
func (*HandlerStats) MarshalJSON ¶ added in v0.3.3
func (h *HandlerStats) MarshalJSON() ([]byte, error)
type JobContext ¶ added in v0.4.1
type JobContext struct {
// HandlerName is the name of the handler processing the job.
HandlerName string
// Topic is the topic/queue the message was received from.
Topic string
// MessageUUID is the unique identifier of the message.
MessageUUID string
// Metadata contains the message metadata.
Metadata message.Metadata
// Context is the context associated with the message.
Context context.Context
// StartedAt is when the job started processing.
StartedAt time.Time
// Duration is how long the job took (only set in OnJobDone and OnJobError).
Duration time.Duration
// RetryCount is the number of times this message has been retried.
RetryCount int
}
JobContext provides information about a job execution to hooks.
type JobHooks ¶ added in v0.4.1
type JobHooks struct {
// OnJobStart is called when a handler begins processing a message.
// This is called before the handler function is invoked.
OnJobStart func(ctx JobContext)
// OnJobDone is called when a handler successfully completes processing.
// Duration will be set to how long the handler took.
OnJobDone func(ctx JobContext)
// OnJobError is called when a handler returns an error.
// The error is passed as the second argument.
// Duration will be set to how long the handler took before failing.
OnJobError func(ctx JobContext, err error)
}
JobHooks defines callbacks for job lifecycle events. All hooks are optional - nil hooks are simply not called.
func AlertingHooks ¶ added in v0.4.1
func AlertingHooks(alertFunc func(ctx JobContext, err error)) JobHooks
AlertingHooks returns pre-built hooks that trigger alerts on job errors.
func LoggingHooks ¶ added in v0.4.1
func LoggingHooks(logger interface {
Info(msg string, fields map[string]interface{})
Error(msg string, err error, fields map[string]interface{})
}) JobHooks
LoggingHooks returns pre-built hooks that log job lifecycle events.
func MetricsHooks ¶ added in v0.4.1
MetricsHooks returns pre-built hooks that record job metrics.
type LatencyMetrics ¶ added in v0.3.4
type MessageHandlerRegistration ¶
type MessageHandlerRegistration struct {
Name string
ConsumeQueue string
PublishQueue string
Handler message.HandlerFunc
Subscriber message.Subscriber
Publisher message.Publisher
}
MessageHandlerRegistration wires a raw Watermill handler without typed helpers.
type MiddlewareBuilder ¶
type MiddlewareBuilder func(*Service) (message.HandlerMiddleware, error)
MiddlewareBuilder constructs a handler middleware using the provided service instance.
type MiddlewareRegistration ¶
type MiddlewareRegistration struct {
Name string
Middleware message.HandlerMiddleware
Builder MiddlewareBuilder
}
MiddlewareRegistration captures how a middleware should be registered on a Service router.
func CorrelationIDMiddleware ¶
func CorrelationIDMiddleware() MiddlewareRegistration
CorrelationIDMiddleware ensures each processed message carries a correlation identifier.
func DefaultMiddlewares ¶
func DefaultMiddlewares() []MiddlewareRegistration
DefaultMiddlewares returns the standard middleware chain used by the Service constructor.
func JobHooksMiddleware ¶ added in v0.4.1
func JobHooksMiddleware(hooks JobHooks) MiddlewareRegistration
JobHooksMiddleware creates a middleware that invokes the provided hooks at appropriate points in the message lifecycle.
func LogMessagesMiddleware ¶
func LogMessagesMiddleware(logger loggingpkg.ServiceLogger) MiddlewareRegistration
LogMessagesMiddleware logs the full payload and metadata of handled messages.
func MetricsMiddleware ¶ added in v0.3.2
func MetricsMiddleware() MiddlewareRegistration
MetricsMiddleware adds Prometheus metrics to the handler.
func OutboxMiddleware ¶
func OutboxMiddleware() MiddlewareRegistration
OutboxMiddleware persists outgoing messages when an OutboxStore is configured.
func PoisonQueueMiddleware ¶
func PoisonQueueMiddleware(filter func(error) bool) MiddlewareRegistration
PoisonQueueMiddleware publishes messages that match the supplied filter to the configured poison queue.
func ProtoValidateMiddleware ¶
func ProtoValidateMiddleware() MiddlewareRegistration
ProtoValidateMiddleware unmarshals and validates protobuf payloads when possible.
func RecovererMiddleware ¶
func RecovererMiddleware() MiddlewareRegistration
RecovererMiddleware converts panics into handler errors so they can be retried or sent to the poison queue.
func RetryMiddleware ¶
func RetryMiddleware(cfg RetryMiddlewareConfig) MiddlewareRegistration
RetryMiddleware retries handler execution using the provided configuration (defaults applied to zero values).
func TracerMiddleware ¶
func TracerMiddleware() MiddlewareRegistration
TracerMiddleware wraps handler execution in an OpenTelemetry span.
type OutboxStore ¶
type OutboxStore interface {
StoreOutgoingMessage(ctx context.Context, eventType, uuid, payload string) error
}
OutboxStore persists processed messages so they can be forwarded reliably.
type Producer ¶
type Producer interface {
PublishProto(ctx context.Context, topic string, event proto.Message, metadata metadatapkg.Metadata) error
}
Producer emits proto-based events onto the configured transport.
type ProtoValidator ¶
ProtoValidator validates unmarshalled payloads. Implementations typically forward to protovalidate or a custom struct validator.
type PublishOption ¶ added in v0.4.1
type PublishOption func(*publishOptions)
PublishOption configures event publishing behavior.
func WithCorrelationID ¶ added in v0.4.1
func WithCorrelationID(correlationID string) PublishOption
WithCorrelationID sets the correlation ID for request tracing.
func WithDataContentType ¶ added in v0.4.1
func WithDataContentType(contentType string) PublishOption
WithDataContentType sets the data content type (e.g., "application/json").
func WithDataSchema ¶ added in v0.4.1
func WithDataSchema(schema string) PublishOption
WithDataSchema sets the data schema URI.
func WithExtension ¶ added in v0.4.1
func WithExtension(key string, value any) PublishOption
WithExtension adds a CloudEvents extension attribute.
func WithMaxAttempts ¶ added in v0.4.1
func WithMaxAttempts(max int) PublishOption
WithMaxAttempts sets the maximum retry attempts for the event.
func WithSubject ¶ added in v0.4.1
func WithSubject(subject string) PublishOption
WithSubject sets the CloudEvents subject attribute.
func WithTracing ¶ added in v0.4.1
func WithTracing(traceID, parentID string) PublishOption
WithTracing sets tracing context for the event.
type ResourceUsage ¶ added in v0.3.4
type RetryMiddlewareConfig ¶
type RetryMiddlewareConfig struct {
MaxRetries int
InitialInterval time.Duration
MaxInterval time.Duration
RetryIf func(error) bool
}
RetryMiddlewareConfig customises the retry middleware behaviour.
type Service ¶
type Service struct {
Conf *configpkg.Config
Logger loggingpkg.ServiceLogger
// contains filtered or unexported fields
}
Service wires a Watermill router, publisher, subscriber, and middleware chain.
func NewService ¶
func NewService(conf *configpkg.Config, log loggingpkg.ServiceLogger, ctx context.Context, deps ServiceDependencies) *Service
NewService constructs a Service for the supplied configuration. Register handlers on the returned Service before calling Start. Panics if configuration is invalid or transport cannot be built. Use TryNewService for error-returning variant.
func TryNewService ¶ added in v0.4.1
func TryNewService(conf *configpkg.Config, log loggingpkg.ServiceLogger, ctx context.Context, deps ServiceDependencies) (*Service, error)
TryNewService constructs a Service for the supplied configuration. Returns an error instead of panicking if configuration is invalid or transport cannot be built.
func (*Service) ConsumeEvents ¶ added in v0.4.1
func (s *Service) ConsumeEvents(eventType string, handler EventHandler) error
ConsumeEvents registers a CloudEvents handler for the specified event type. The handler receives CloudEvents and returns errors to control message lifecycle.
func (*Service) GetTransportCapabilities ¶ added in v0.4.1
func (s *Service) GetTransportCapabilities() transportpkg.Capabilities
GetTransportCapabilities returns the capabilities of the configured transport.
func (*Service) Publish ¶ added in v0.4.1
Publish sends a raw Watermill message to the specified topic. Use PublishProto for type-safe proto message publishing.
func (*Service) PublishData ¶ added in v0.4.1
func (s *Service) PublishData(ctx context.Context, eventType, source string, data any, opts ...PublishOption) error
PublishData publishes data as a CloudEvent. This is a convenience method that constructs the event for you.
func (*Service) PublishDataAfter ¶ added in v0.4.1
func (s *Service) PublishDataAfter(ctx context.Context, eventType, source string, data any, delay time.Duration, opts ...PublishOption) error
PublishDataAfter publishes data as a CloudEvent with a delay.
func (*Service) PublishEvent ¶ added in v0.4.1
PublishEvent publishes a CloudEvent to the specified event type topic.
func (*Service) PublishEventAfter ¶ added in v0.4.1
PublishEventAfter publishes a CloudEvent with a delay before processing.
func (*Service) PublishProto ¶
func (s *Service) PublishProto(ctx context.Context, topic string, event proto.Message, metadata metadatapkg.Metadata) error
PublishProto emits the event using the Service publisher so HTTP handlers can create events without touching the internal Watermill APIs directly.
func (*Service) RegisterHTTPHandler ¶ added in v0.3.3
func (*Service) RegisterMiddleware ¶
func (s *Service) RegisterMiddleware(cfg MiddlewareRegistration) error
RegisterMiddleware attaches the supplied middleware to the router.
func (*Service) RegisterProtoMessage ¶
RegisterProtoMessage exposes a proto message type for validation without registering a handler.
func (*Service) Start ¶
Start runs the underlying Watermill router until the provided context is cancelled.
func (*Service) StartWebUIServer ¶ added in v0.3.3
func (s *Service) StartWebUIServer()
type ServiceDependencies ¶
type ServiceDependencies struct {
Outbox OutboxStore
Validator ProtoValidator
Middlewares []MiddlewareRegistration // Appended after the default middleware chain.
DisableDefaultMiddlewares bool // Skips registering the default middleware chain when true.
TransportFactory transportpkg.Factory
ErrorClassifier ErrorClassifier
}
ServiceDependencies holds the optional collaborators that the Service can use. Leave fields nil to skip the related middleware.
type ThroughputMetrics ¶ added in v0.3.4
type UnprocessableEventError ¶
type UnprocessableEventError struct {
// contains filtered or unexported fields
}
UnprocessableEventError wraps payloads that failed validation or unmarshalling.
func (*UnprocessableEventError) Error ¶
func (e *UnprocessableEventError) Error() string
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package cloudevents provides CloudEvents v1.0 compatible event types and utilities for use within protoflow.
|
Package cloudevents provides CloudEvents v1.0 compatible event types and utilities for use within protoflow. |
|
Package transport provides transport types and interfaces for the internal runtime.
|
Package transport provides transport types and interfaces for the internal runtime. |