Documentation ¶
Overview ¶
Package transport defines the core interfaces and types for protoflow transports. Each transport implementation (kafka, rabbitmq, aws, etc.) should be in its own sub-package and register itself with the transport registry.
Index ¶
- Variables
- func Register(name string, builder Builder)
- func RegisterWithCapabilities(name string, builder Builder, caps Capabilities)
- type Builder
- type Capabilities
- type CapabilitiesProvider
- type Config
- type DLQLister
- type DLQManager
- type DLQMessage
- type DelayedPublisher
- type QueueIntrospector
- type Registry
- func (r *Registry) Build(ctx context.Context, cfg Config, logger watermill.LoggerAdapter) (Transport, error)
- func (r *Registry) GetCapabilities(name string) Capabilities
- func (r *Registry) Has(name string) bool
- func (r *Registry) Names() []string
- func (r *Registry) Register(name string, builder Builder)
- func (r *Registry) RegisterWithCapabilities(name string, builder Builder, caps Capabilities)
- type Transport
Constants ¶
This section is empty.
Variables ¶
var (
	// ChannelCapabilities for in-memory Go channel transport.
	ChannelCapabilities = Capabilities{
		Name:              "channel",
		SupportsDelay:     false,
		SupportsNativeDLQ: false,
		SupportsOrdering:  true,
		SupportsTracing:   false,
		SupportsBatching:  false,
		SupportsAck:       true,
		SupportsNack:      true,
		SupportsPriority:  false,
	}
	// KafkaCapabilities for Apache Kafka transport.
	KafkaCapabilities = Capabilities{
		Name:                 "kafka",
		SupportsDelay:        false,
		SupportsNativeDLQ:    false,
		SupportsOrdering:     true,
		SupportsTracing:      true,
		SupportsBatching:     true,
		SupportsAck:          true,
		SupportsNack:         false,
		SupportsPriority:     false,
		SupportsPartitioning: true,
		MaxMessageSize:       1048576,
	}
	// RabbitMQCapabilities for RabbitMQ/AMQP transport.
	RabbitMQCapabilities = Capabilities{
		Name:              "rabbitmq",
		SupportsDelay:     true,
		SupportsNativeDLQ: true,
		SupportsOrdering:  true,
		SupportsTracing:   true,
		SupportsBatching:  false,
		SupportsAck:       true,
		SupportsNack:      true,
		SupportsPriority:  true,
	}
	// NATSCapabilities for NATS Core transport.
	NATSCapabilities = Capabilities{
		Name:              "nats",
		SupportsDelay:     false,
		SupportsNativeDLQ: false,
		SupportsOrdering:  false,
		SupportsTracing:   true,
		SupportsBatching:  false,
		SupportsAck:       false,
		SupportsNack:      false,
		SupportsPriority:  false,
		MaxMessageSize:    1048576,
	}
	// NATSJetStreamCapabilities for NATS JetStream transport.
	NATSJetStreamCapabilities = Capabilities{
		Name:              "nats-jetstream",
		SupportsDelay:     true,
		SupportsNativeDLQ: true,
		SupportsOrdering:  true,
		SupportsTracing:   true,
		SupportsBatching:  true,
		SupportsAck:       true,
		SupportsNack:      true,
		SupportsPriority:  false,
		MaxMessageSize:    1048576,
	}
	// AWSCapabilities for AWS SNS/SQS transport.
	AWSCapabilities = Capabilities{
		Name:              "aws",
		SupportsDelay:     true,
		SupportsNativeDLQ: true,
		SupportsOrdering:  true,
		SupportsTracing:   true,
		SupportsBatching:  true,
		SupportsAck:       true,
		SupportsNack:      true,
		SupportsPriority:  false,
		MaxMessageSize:    262144,
		MaxDelayDuration:  900000,
	}
	// SQLiteCapabilities for SQLite-based transport.
	SQLiteCapabilities = Capabilities{
		Name:              "sqlite",
		SupportsDelay:     true,
		SupportsNativeDLQ: true,
		SupportsOrdering:  true,
		SupportsTracing:   false,
		SupportsBatching:  true,
		SupportsAck:       true,
		SupportsNack:      true,
		SupportsPriority:  false,
	}
	// PostgresCapabilities for PostgreSQL-based transport.
	PostgresCapabilities = Capabilities{
		Name:              "postgres",
		SupportsDelay:     true,
		SupportsNativeDLQ: true,
		SupportsOrdering:  true,
		SupportsTracing:   false,
		SupportsBatching:  true,
		SupportsAck:       true,
		SupportsNack:      true,
		SupportsPriority:  true,
	}
	// HTTPCapabilities for HTTP-based transport.
	HTTPCapabilities = Capabilities{
		Name:              "http",
		SupportsDelay:     false,
		SupportsNativeDLQ: false,
		SupportsOrdering:  false,
		SupportsTracing:   true,
		SupportsBatching:  false,
		SupportsAck:       false,
		SupportsNack:      false,
		SupportsPriority:  false,
	}
	// IOCapabilities for file-based I/O transport.
	IOCapabilities = Capabilities{
		Name:              "io",
		SupportsDelay:     false,
		SupportsNativeDLQ: false,
		SupportsOrdering:  true,
		SupportsTracing:   false,
		SupportsBatching:  false,
		SupportsAck:       false,
		SupportsNack:      false,
		SupportsPriority:  false,
	}
)
Predefined capability sets for common transports.
var DefaultRegistry = NewRegistry()
DefaultRegistry is the global transport registry.
Functions ¶
func RegisterWithCapabilities ¶
func RegisterWithCapabilities(name string, builder Builder, caps Capabilities)
RegisterWithCapabilities adds a transport builder and its capabilities to the default registry.
Types ¶
type Builder ¶
type Builder func(ctx context.Context, cfg Config, logger watermill.LoggerAdapter) (Transport, error)
Builder is the function signature for creating a transport from config. Each transport package should provide a Builder function that can be registered.
type Capabilities ¶
type Capabilities struct {
// SupportsDelay indicates the transport can natively delay message delivery.
// When false, delayed delivery must be emulated by the application.
SupportsDelay bool
// SupportsNativeDLQ indicates the transport has built-in dead letter queue support.
// When false, protoflow will handle DLQ routing at the application level.
SupportsNativeDLQ bool
// SupportsOrdering indicates the transport guarantees message ordering.
// When true, messages within a partition/stream are delivered in order.
SupportsOrdering bool
// SupportsTracing indicates the transport propagates tracing headers natively.
SupportsTracing bool
// SupportsBatching indicates the transport can batch multiple messages.
SupportsBatching bool
// SupportsAck indicates the transport supports explicit message acknowledgment.
SupportsAck bool
// SupportsNack indicates the transport supports negative acknowledgment (redelivery).
SupportsNack bool
// SupportsPriority indicates the transport supports message priority queues.
SupportsPriority bool
// SupportsPartitioning indicates the transport supports message partitioning.
SupportsPartitioning bool
// MaxMessageSize is the maximum message size in bytes (0 = unlimited/unknown).
MaxMessageSize int64
// MaxDelayDuration is the maximum delay duration supported (0 = unlimited/unknown).
MaxDelayDuration int64
// Name is the human-readable name of the transport.
Name string
// Version is the transport/driver version.
Version string
}
Capabilities describes the features supported by a transport backend. Use this to introspect what operations are available at runtime.
func GetCapabilities ¶
func GetCapabilities(transportName string) Capabilities
GetCapabilities returns the capabilities for a transport by name. Uses the registry to look up capabilities registered by each transport package. Returns a zero Capabilities struct if the transport is unknown.
func (Capabilities) RequiresDLQEmulation ¶
func (c Capabilities) RequiresDLQEmulation() bool
RequiresDLQEmulation returns true if the transport needs application-level DLQ routing because it doesn't support native dead letter queues.
func (Capabilities) RequiresDelayEmulation ¶
func (c Capabilities) RequiresDelayEmulation() bool
RequiresDelayEmulation returns true if the transport needs application-level delay handling because it doesn't support native delayed delivery.
func (Capabilities) SupportsReliableDelivery ¶
func (c Capabilities) SupportsReliableDelivery() bool
SupportsReliableDelivery returns true if the transport supports at-least-once delivery semantics (ack + nack).
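The three helper methods can plausibly be read as simple derivations from the boolean fields. The sketch below is an assumption inferred from the doc comments, not the package's source: it uses a trimmed copy of `Capabilities` and field values copied from the predefined `KafkaCapabilities` and `RabbitMQCapabilities`.

```go
package main

import "fmt"

// Capabilities is a trimmed copy of the documented struct, keeping only the
// fields the three helpers read.
type Capabilities struct {
	Name              string
	SupportsDelay     bool
	SupportsNativeDLQ bool
	SupportsAck       bool
	SupportsNack      bool
}

// Assumed implementations, inferred from the doc comments.

// RequiresDLQEmulation is true when there is no native dead letter queue.
func (c Capabilities) RequiresDLQEmulation() bool { return !c.SupportsNativeDLQ }

// RequiresDelayEmulation is true when there is no native delayed delivery.
func (c Capabilities) RequiresDelayEmulation() bool { return !c.SupportsDelay }

// SupportsReliableDelivery is true when both ack and nack are available.
func (c Capabilities) SupportsReliableDelivery() bool { return c.SupportsAck && c.SupportsNack }

// Values copied from the predefined capability sets (unset fields are false).
var (
	kafka    = Capabilities{Name: "kafka", SupportsAck: true}
	rabbitmq = Capabilities{
		Name:              "rabbitmq",
		SupportsDelay:     true,
		SupportsNativeDLQ: true,
		SupportsAck:       true,
		SupportsNack:      true,
	}
)

func main() {
	for _, c := range []Capabilities{kafka, rabbitmq} {
		fmt.Printf("%s: dlqEmulation=%t delayEmulation=%t reliable=%t\n",
			c.Name, c.RequiresDLQEmulation(), c.RequiresDelayEmulation(), c.SupportsReliableDelivery())
	}
}
```

Under this reading, the predefined Kafka set would report `SupportsReliableDelivery() == false`, because it declares `SupportsNack: false`.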
type CapabilitiesProvider ¶
type CapabilitiesProvider interface {
Capabilities() Capabilities
}
CapabilitiesProvider is implemented by transports that can report their capabilities.
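Because `CapabilitiesProvider` is optional, callers would normally discover it with a type assertion. The following is a minimal sketch of that pattern; `channelTransport`, `opaqueTransport`, and `capabilitiesOf` are hypothetical names, and `Capabilities` is trimmed to two fields.

```go
package main

import "fmt"

// Capabilities is trimmed to two fields for this sketch.
type Capabilities struct {
	Name        string
	SupportsAck bool
}

// CapabilitiesProvider is copied from the declaration above.
type CapabilitiesProvider interface {
	Capabilities() Capabilities
}

// channelTransport is a hypothetical transport that reports its capabilities.
type channelTransport struct{}

func (channelTransport) Capabilities() Capabilities {
	return Capabilities{Name: "channel", SupportsAck: true}
}

// opaqueTransport is a hypothetical transport that does not implement
// CapabilitiesProvider.
type opaqueTransport struct{}

// capabilitiesOf returns the reported capabilities and true when t implements
// CapabilitiesProvider, or the zero value and false otherwise.
func capabilitiesOf(t interface{}) (Capabilities, bool) {
	if p, ok := t.(CapabilitiesProvider); ok {
		return p.Capabilities(), true
	}
	return Capabilities{}, false
}

func main() {
	fmt.Println(capabilitiesOf(channelTransport{}))
	fmt.Println(capabilitiesOf(opaqueTransport{}))
}
```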
type Config ¶
type Config interface {
// GetPubSubSystem returns the transport type name.
GetPubSubSystem() string
// Kafka
GetKafkaBrokers() []string
GetKafkaConsumerGroup() string
// RabbitMQ
GetRabbitMQURL() string
// NATS
GetNATSURL() string
// HTTP
GetHTTPServerAddress() string
GetHTTPPublisherURL() string
// IO
GetIOFile() string
// SQLite
GetSQLiteFile() string
// PostgreSQL
GetPostgresURL() string
// AWS
GetAWSRegion() string
GetAWSAccountID() string
GetAWSAccessKeyID() string
GetAWSSecretAccessKey() string
GetAWSEndpoint() string
}
Config provides the configuration values needed by transports. This interface allows transports to access only the config they need without depending on the full config package.
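One way a transport could limit itself to the settings it needs is to accept a narrower interface that any `Config` value also satisfies. This is only a sketch of that idea, not something the package is known to do: `kafkaSettings`, `validateKafka`, and `stubConfig` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// kafkaSettings is a hypothetical narrow interface listing only the getters a
// Kafka builder reads. Any value that satisfies the full Config interface
// satisfies it as well.
type kafkaSettings interface {
	GetKafkaBrokers() []string
	GetKafkaConsumerGroup() string
}

// validateKafka reports the first missing Kafka setting, if any.
func validateKafka(cfg kafkaSettings) error {
	if len(cfg.GetKafkaBrokers()) == 0 {
		return errors.New("kafka: at least one broker is required")
	}
	if cfg.GetKafkaConsumerGroup() == "" {
		return errors.New("kafka: consumer group is required")
	}
	return nil
}

// stubConfig is a hypothetical test double that only carries Kafka values.
type stubConfig struct {
	brokers []string
	group   string
}

func (c stubConfig) GetKafkaBrokers() []string     { return c.brokers }
func (c stubConfig) GetKafkaConsumerGroup() string { return c.group }

func main() {
	fmt.Println(validateKafka(stubConfig{}))
	fmt.Println(validateKafka(stubConfig{brokers: []string{"localhost:9092"}, group: "orders"}))
}
```

A narrow interface like this also keeps test doubles small, since a stub only has to implement the getters under test.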
type DLQLister ¶
type DLQLister interface {
ListDLQMessages(topic string, limit, offset int) ([]DLQMessage, error)
}
DLQLister is implemented by transports that can list DLQ messages.
type DLQManager ¶
type DLQManager interface {
GetDLQCount(topic string) (int64, error)
ReplayDLQMessage(dlqID int64) error
ReplayAllDLQ(topic string) (int64, error)
PurgeDLQ(topic string) (int64, error)
}
DLQManager is implemented by transports that support DLQ management.
type DLQMessage ¶
type DLQMessage struct {
ID int64 `json:"id"`
UUID string `json:"uuid"`
OriginalTopic string `json:"original_topic"`
Payload []byte `json:"payload"`
Metadata map[string]string `json:"metadata"`
ErrorMessage string `json:"error_message"`
FailedAt time.Time `json:"failed_at"`
RetryCount int `json:"retry_count"`
}
DLQMessage represents a message in the dead letter queue. This is used by transports that implement DLQLister.
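To make the listing and JSON shape concrete, here is a small sketch with a hypothetical in-memory `memoryDLQ` that implements `DLQLister`. The paging semantics (skip `offset` matches, then return up to `limit`) are an assumption, since the documentation does not spell them out. The sketch also shows a detail of Go's `encoding/json`: a `[]byte` field such as `Payload` is encoded as a base64 string.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// DLQMessage and DLQLister are copied from the declarations above.
type DLQMessage struct {
	ID            int64             `json:"id"`
	UUID          string            `json:"uuid"`
	OriginalTopic string            `json:"original_topic"`
	Payload       []byte            `json:"payload"`
	Metadata      map[string]string `json:"metadata"`
	ErrorMessage  string            `json:"error_message"`
	FailedAt      time.Time         `json:"failed_at"`
	RetryCount    int               `json:"retry_count"`
}

type DLQLister interface {
	ListDLQMessages(topic string, limit, offset int) ([]DLQMessage, error)
}

// memoryDLQ is a hypothetical in-memory DLQLister.
type memoryDLQ struct{ messages []DLQMessage }

// ListDLQMessages skips the first offset messages for topic and returns up to
// limit of the rest (assumed semantics).
func (m *memoryDLQ) ListDLQMessages(topic string, limit, offset int) ([]DLQMessage, error) {
	var page []DLQMessage
	for _, msg := range m.messages {
		if msg.OriginalTopic != topic {
			continue
		}
		if offset > 0 {
			offset--
			continue
		}
		if len(page) >= limit {
			break
		}
		page = append(page, msg)
	}
	return page, nil
}

// payloadInJSON marshals msg and returns the "payload" field exactly as it
// appears in the JSON document.
func payloadInJSON(msg DLQMessage) (string, error) {
	raw, err := json.Marshal(msg)
	if err != nil {
		return "", err
	}
	var fields map[string]interface{}
	if err := json.Unmarshal(raw, &fields); err != nil {
		return "", err
	}
	s, _ := fields["payload"].(string)
	return s, nil
}

// sampleDLQ returns a store with three "orders" failures and one "billing" failure.
func sampleDLQ() *memoryDLQ {
	now := time.Now()
	return &memoryDLQ{messages: []DLQMessage{
		{ID: 1, OriginalTopic: "orders", Payload: []byte("hi"), FailedAt: now},
		{ID: 2, OriginalTopic: "billing", Payload: []byte("hi"), FailedAt: now},
		{ID: 3, OriginalTopic: "orders", Payload: []byte("hi"), FailedAt: now},
		{ID: 4, OriginalTopic: "orders", Payload: []byte("hi"), FailedAt: now},
	}}
}

func main() {
	var lister DLQLister = sampleDLQ()
	page, _ := lister.ListDLQMessages("orders", 2, 1)
	for _, m := range page {
		fmt.Println(m.ID, m.OriginalTopic)
	}
	encoded, _ := payloadInJSON(page[0])
	fmt.Println(encoded) // base64 of "hi"
}
```

Consumers that read this JSON outside Go need to base64-decode `payload` to recover the original bytes.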
type DelayedPublisher ¶
type DelayedPublisher interface {
PublishWithDelay(topic string, delay int64, messages ...*message.Message) error
}
DelayedPublisher is implemented by transports that support delayed message delivery.
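A caller might use `DelayedPublisher` by type-asserting on the transport and falling back when native delay is unavailable. The sketch below assumes several things that are not in the documentation: `Message` stands in for watermill's `*message.Message`, `Publisher` is a hypothetical minimal publishing interface, and `publishLater`, `plainTransport`, and `delayingTransport` are illustrative names. The unit of `delay` is not stated in the source, so the value is passed through untouched.

```go
package main

import (
	"errors"
	"fmt"
)

// Message stands in for watermill's *message.Message.
type Message struct{ Payload []byte }

// Publisher is a hypothetical minimal publishing interface for this sketch.
type Publisher interface {
	Publish(topic string, messages ...*Message) error
}

// DelayedPublisher mirrors the documented interface, using the stand-in Message.
type DelayedPublisher interface {
	PublishWithDelay(topic string, delay int64, messages ...*Message) error
}

var errDelayUnsupported = errors.New("transport has no native delay; emulate it in the application")

// publishLater publishes immediately when delay is not positive, uses native
// delay when the transport offers it, and otherwise reports that emulation is
// needed.
func publishLater(p Publisher, topic string, delay int64, msgs ...*Message) error {
	if delay <= 0 {
		return p.Publish(topic, msgs...)
	}
	if dp, ok := p.(DelayedPublisher); ok {
		return dp.PublishWithDelay(topic, delay, msgs...)
	}
	return errDelayUnsupported
}

// plainTransport is a hypothetical transport without native delay.
type plainTransport struct{ published int }

func (t *plainTransport) Publish(topic string, messages ...*Message) error {
	t.published += len(messages)
	return nil
}

// delayingTransport is a hypothetical transport with native delay.
type delayingTransport struct {
	plainTransport
	delayed   int
	lastDelay int64
}

func (t *delayingTransport) PublishWithDelay(topic string, delay int64, messages ...*Message) error {
	t.delayed += len(messages)
	t.lastDelay = delay
	return nil
}

func main() {
	msg := &Message{Payload: []byte("hello")}
	fmt.Println(publishLater(&plainTransport{}, "orders", 30, msg))
	fmt.Println(publishLater(&delayingTransport{}, "orders", 30, msg))
}
```

Returning a sentinel error instead of sleeping leaves the emulation strategy to the application, which matches the wording of the `SupportsDelay` field comment.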
type QueueIntrospector ¶
QueueIntrospector is implemented by transports that can report queue statistics.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry maintains a mapping of transport names to their builders and capabilities. Transport packages should register themselves using Register.
func (*Registry) Build ¶
func (r *Registry) Build(ctx context.Context, cfg Config, logger watermill.LoggerAdapter) (Transport, error)
Build creates a transport using the registered builder for the config's PubSubSystem.
func (*Registry) GetCapabilities ¶
func (r *Registry) GetCapabilities(name string) Capabilities
GetCapabilities returns the capabilities for a registered transport. Returns a zero Capabilities struct if the transport is unknown.
func (*Registry) Register ¶
func (r *Registry) Register(name string, builder Builder)
Register adds a transport builder to the registry. The name should match the PubSubSystem config value (e.g., "kafka", "rabbitmq").
func (*Registry) RegisterWithCapabilities ¶
func (r *Registry) RegisterWithCapabilities(name string, builder Builder, caps Capabilities)
RegisterWithCapabilities adds a transport builder and its capabilities to the registry.
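The documented Registry behaviour can be approximated with two maps keyed by transport name. The sketch below is an assumption about the internals, which are unexported: it uses stand-in types, omits any concurrency control, sorts `Names()` by choice, and the error text returned by `Build` is invented. `namedConfig`, `demoRegistry`, and `buildNamed` are hypothetical helpers.

```go
package main

import (
	"context"
	"fmt"
	"sort"
)

// Stand-ins for the package's Transport and Config types and for
// watermill.LoggerAdapter; Capabilities is trimmed to two fields.
type (
	Transport    interface{}
	Logger       interface{}
	Config       interface{ GetPubSubSystem() string }
	Capabilities struct {
		Name        string
		SupportsAck bool
	}
	Builder func(ctx context.Context, cfg Config, logger Logger) (Transport, error)
)

// Registry is an assumed layout; the real fields are unexported.
type Registry struct {
	builders map[string]Builder
	caps     map[string]Capabilities
}

func NewRegistry() *Registry {
	return &Registry{builders: map[string]Builder{}, caps: map[string]Capabilities{}}
}

func (r *Registry) Register(name string, b Builder) { r.builders[name] = b }

func (r *Registry) RegisterWithCapabilities(name string, b Builder, caps Capabilities) {
	r.builders[name] = b
	r.caps[name] = caps
}

func (r *Registry) Has(name string) bool {
	_, ok := r.builders[name]
	return ok
}

// Names returns the registered names; sorting is a choice made for this sketch.
func (r *Registry) Names() []string {
	names := make([]string, 0, len(r.builders))
	for name := range r.builders {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// GetCapabilities returns the zero Capabilities for unknown names, because a
// missing map key yields the zero value.
func (r *Registry) GetCapabilities(name string) Capabilities { return r.caps[name] }

// Build looks up the builder named by cfg.GetPubSubSystem() and calls it.
func (r *Registry) Build(ctx context.Context, cfg Config, logger Logger) (Transport, error) {
	name := cfg.GetPubSubSystem()
	b, ok := r.builders[name]
	if !ok {
		return nil, fmt.Errorf("transport %q is not registered", name)
	}
	return b(ctx, cfg, logger)
}

// namedConfig is a hypothetical Config that only carries the transport name.
type namedConfig string

func (c namedConfig) GetPubSubSystem() string { return string(c) }

// demoRegistry registers one hypothetical "channel" builder.
func demoRegistry() *Registry {
	r := NewRegistry()
	r.RegisterWithCapabilities("channel",
		func(ctx context.Context, cfg Config, logger Logger) (Transport, error) {
			return struct{}{}, nil
		},
		Capabilities{Name: "channel", SupportsAck: true})
	return r
}

// buildNamed builds the transport registered under name.
func buildNamed(r *Registry, name string) (Transport, error) {
	return r.Build(context.Background(), namedConfig(name), nil)
}

func main() {
	r := demoRegistry()
	fmt.Println(r.Names(), r.Has("kafka"), r.GetCapabilities("kafka").Name == "")
	_, err := buildNamed(r, "kafka")
	fmt.Println(err)
}
```

Since an unknown name yields a zero `Capabilities`, callers that need to tell "unknown" from "supports nothing" can check `Has(name)` first.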
Directories ¶
| Package | Synopsis |
|---|---|
| aws | Package aws provides an AWS SNS/SQS transport for protoflow. |
| channel | Package channel provides an in-memory Go channel transport for protoflow. |
| http | Package http provides an HTTP transport for protoflow. |
| io | Package io provides a file-based I/O transport for protoflow. |
| jetstream | Package jetstream provides a NATS JetStream transport for protoflow. |
| kafka | Package kafka provides a Kafka transport for protoflow. |
| nats | Package nats provides a NATS Core transport for protoflow. |
| postgres | Package postgres provides a PostgreSQL-based transport for protoflow. |
| rabbitmq | Package rabbitmq provides a RabbitMQ/AMQP transport for protoflow. |
| sqlite | Package sqlite provides a SQLite-based transport for protoflow. |
| transports | Package transports imports all built-in transports for auto-registration. |