transport

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2025 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package transport defines the core interfaces and types for protoflow transports. Each transport implementation (kafka, rabbitmq, aws, etc.) should be in its own sub-package and register itself with the transport registry.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ChannelCapabilities for in-memory Go channel transport.
	ChannelCapabilities = Capabilities{
		Name:              "channel",
		SupportsDelay:     false,
		SupportsNativeDLQ: false,
		SupportsOrdering:  true,
		SupportsTracing:   false,
		SupportsBatching:  false,
		SupportsAck:       true,
		SupportsNack:      true,
		SupportsPriority:  false,
	}

	// KafkaCapabilities for Apache Kafka transport.
	KafkaCapabilities = Capabilities{
		Name:                 "kafka",
		SupportsDelay:        false,
		SupportsNativeDLQ:    false,
		SupportsOrdering:     true,
		SupportsTracing:      true,
		SupportsBatching:     true,
		SupportsAck:          true,
		SupportsNack:         false,
		SupportsPriority:     false,
		SupportsPartitioning: true,
		MaxMessageSize:       1048576,
	}

	// RabbitMQCapabilities for RabbitMQ/AMQP transport.
	RabbitMQCapabilities = Capabilities{
		Name:              "rabbitmq",
		SupportsDelay:     true,
		SupportsNativeDLQ: true,
		SupportsOrdering:  true,
		SupportsTracing:   true,
		SupportsBatching:  false,
		SupportsAck:       true,
		SupportsNack:      true,
		SupportsPriority:  true,
	}

	// NATSCapabilities for NATS Core transport.
	NATSCapabilities = Capabilities{
		Name:              "nats",
		SupportsDelay:     false,
		SupportsNativeDLQ: false,
		SupportsOrdering:  false,
		SupportsTracing:   true,
		SupportsBatching:  false,
		SupportsAck:       false,
		SupportsNack:      false,
		SupportsPriority:  false,
		MaxMessageSize:    1048576,
	}

	// NATSJetStreamCapabilities for NATS JetStream transport.
	NATSJetStreamCapabilities = Capabilities{
		Name:              "nats-jetstream",
		SupportsDelay:     true,
		SupportsNativeDLQ: true,
		SupportsOrdering:  true,
		SupportsTracing:   true,
		SupportsBatching:  true,
		SupportsAck:       true,
		SupportsNack:      true,
		SupportsPriority:  false,
		MaxMessageSize:    1048576,
	}

	// AWSCapabilities for AWS SNS/SQS transport.
	AWSCapabilities = Capabilities{
		Name:              "aws",
		SupportsDelay:     true,
		SupportsNativeDLQ: true,
		SupportsOrdering:  true,
		SupportsTracing:   true,
		SupportsBatching:  true,
		SupportsAck:       true,
		SupportsNack:      true,
		SupportsPriority:  false,
		MaxMessageSize:    262144,
		MaxDelayDuration:  900000,
	}

	// SQLiteCapabilities for SQLite-based transport.
	SQLiteCapabilities = Capabilities{
		Name:              "sqlite",
		SupportsDelay:     true,
		SupportsNativeDLQ: true,
		SupportsOrdering:  true,
		SupportsTracing:   false,
		SupportsBatching:  true,
		SupportsAck:       true,
		SupportsNack:      true,
		SupportsPriority:  false,
	}

	// PostgresCapabilities for PostgreSQL-based transport.
	PostgresCapabilities = Capabilities{
		Name:              "postgres",
		SupportsDelay:     true,
		SupportsNativeDLQ: true,
		SupportsOrdering:  true,
		SupportsTracing:   false,
		SupportsBatching:  true,
		SupportsAck:       true,
		SupportsNack:      true,
		SupportsPriority:  true,
	}

	// HTTPCapabilities for HTTP-based transport.
	HTTPCapabilities = Capabilities{
		Name:              "http",
		SupportsDelay:     false,
		SupportsNativeDLQ: false,
		SupportsOrdering:  false,
		SupportsTracing:   true,
		SupportsBatching:  false,
		SupportsAck:       false,
		SupportsNack:      false,
		SupportsPriority:  false,
	}

	// IOCapabilities for file-based I/O transport.
	IOCapabilities = Capabilities{
		Name:              "io",
		SupportsDelay:     false,
		SupportsNativeDLQ: false,
		SupportsOrdering:  true,
		SupportsTracing:   false,
		SupportsBatching:  false,
		SupportsAck:       false,
		SupportsNack:      false,
		SupportsPriority:  false,
	}
)

Predefined capability sets for common transports.

View Source
var DefaultRegistry = NewRegistry()

DefaultRegistry is the global transport registry.

Functions

func Register

func Register(name string, builder Builder)

Register adds a transport builder to the default registry.

func RegisterWithCapabilities

func RegisterWithCapabilities(name string, builder Builder, caps Capabilities)

RegisterWithCapabilities adds a transport builder and its capabilities to the default registry.

Types

type Builder

type Builder func(ctx context.Context, cfg Config, logger watermill.LoggerAdapter) (Transport, error)

Builder is the function signature for creating a transport from config. Each transport package should provide a Builder function that can be registered.

type Capabilities

type Capabilities struct {
	// SupportsDelay indicates the transport can natively delay message delivery.
	// When false, delayed delivery must be emulated by the application.
	SupportsDelay bool

	// SupportsNativeDLQ indicates the transport has built-in dead letter queue support.
	// When false, protoflow will handle DLQ routing at the application level.
	SupportsNativeDLQ bool

	// SupportsOrdering indicates the transport guarantees message ordering.
	// When true, messages within a partition/stream are delivered in order.
	SupportsOrdering bool

	// SupportsTracing indicates the transport propagates tracing headers natively.
	SupportsTracing bool

	// SupportsBatching indicates the transport can batch multiple messages.
	SupportsBatching bool

	// SupportsAck indicates the transport supports explicit message acknowledgment.
	SupportsAck bool

	// SupportsNack indicates the transport supports negative acknowledgment (redelivery).
	SupportsNack bool

	// SupportsPriority indicates the transport supports message priority queues.
	SupportsPriority bool

	// SupportsPartitioning indicates the transport supports message partitioning.
	SupportsPartitioning bool

	// MaxMessageSize is the maximum message size in bytes (0 = unlimited/unknown).
	MaxMessageSize int64

	// MaxDelayDuration is the maximum delay duration supported (0 = unlimited/unknown).
	MaxDelayDuration int64

	// Name is the human-readable name of the transport.
	Name string

	// Version is the transport/driver version.
	Version string
}

Capabilities describes the features supported by a transport backend. Use this to introspect what operations are available at runtime.

func GetCapabilities

func GetCapabilities(transportName string) Capabilities

GetCapabilities returns the capabilities for a transport by name. Uses the registry to look up capabilities registered by each transport package. Returns a zero Capabilities struct if the transport is unknown.

func (Capabilities) RequiresDLQEmulation

func (c Capabilities) RequiresDLQEmulation() bool

RequiresDLQEmulation returns true if the transport needs application-level DLQ routing because it doesn't support native dead letter queues.

func (Capabilities) RequiresDelayEmulation

func (c Capabilities) RequiresDelayEmulation() bool

RequiresDelayEmulation returns true if the transport needs application-level delay handling because it doesn't support native delayed delivery.

func (Capabilities) SupportsReliableDelivery

func (c Capabilities) SupportsReliableDelivery() bool

SupportsReliableDelivery returns true if the transport supports at-least-once delivery semantics (ack + nack).

type CapabilitiesProvider

type CapabilitiesProvider interface {
	Capabilities() Capabilities
}

CapabilitiesProvider is implemented by transports that can report their capabilities.

type Config

type Config interface {
	// GetPubSubSystem returns the transport type name.
	GetPubSubSystem() string

	// Kafka
	GetKafkaBrokers() []string
	GetKafkaConsumerGroup() string

	// RabbitMQ
	GetRabbitMQURL() string

	// NATS
	GetNATSURL() string

	// HTTP
	GetHTTPServerAddress() string
	GetHTTPPublisherURL() string

	// IO
	GetIOFile() string

	// SQLite
	GetSQLiteFile() string

	// PostgreSQL
	GetPostgresURL() string

	// AWS
	GetAWSRegion() string
	GetAWSAccountID() string
	GetAWSAccessKeyID() string
	GetAWSSecretAccessKey() string
	GetAWSEndpoint() string
}

Config provides the configuration values needed by transports. This interface allows transports to access only the config they need without depending on the full config package.

type DLQLister

type DLQLister interface {
	ListDLQMessages(topic string, limit, offset int) ([]DLQMessage, error)
}

DLQLister is implemented by transports that can list DLQ messages.

type DLQManager

type DLQManager interface {
	GetDLQCount(topic string) (int64, error)
	ReplayDLQMessage(dlqID int64) error
	ReplayAllDLQ(topic string) (int64, error)
	PurgeDLQ(topic string) (int64, error)
}

DLQManager is implemented by transports that support DLQ management.

type DLQMessage

type DLQMessage struct {
	ID            int64             `json:"id"`
	UUID          string            `json:"uuid"`
	OriginalTopic string            `json:"original_topic"`
	Payload       []byte            `json:"payload"`
	Metadata      map[string]string `json:"metadata"`
	ErrorMessage  string            `json:"error_message"`
	FailedAt      time.Time         `json:"failed_at"`
	RetryCount    int               `json:"retry_count"`
}

DLQMessage represents a message in the dead letter queue. This is used by transports that implement DLQLister.

type DelayedPublisher

type DelayedPublisher interface {
	PublishWithDelay(topic string, delay int64, messages ...*message.Message) error
}

DelayedPublisher is implemented by transports that support delayed message delivery.

type QueueIntrospector

type QueueIntrospector interface {
	GetPendingCount(topic string) (int64, error)
}

QueueIntrospector is implemented by transports that can report queue statistics.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry maintains a mapping of transport names to their builders and capabilities. Transport packages should register themselves using Register.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new transport registry.

func (*Registry) Build

func (r *Registry) Build(ctx context.Context, cfg Config, logger watermill.LoggerAdapter) (Transport, error)

Build creates a transport using the registered builder for the config's PubSubSystem.

func (*Registry) GetCapabilities

func (r *Registry) GetCapabilities(name string) Capabilities

GetCapabilities returns the capabilities for a registered transport. Returns a zero Capabilities struct if the transport is unknown.

func (*Registry) Has

func (r *Registry) Has(name string) bool

Has returns true if a transport is registered with the given name.

func (*Registry) Names

func (r *Registry) Names() []string

Names returns the list of registered transport names.

func (*Registry) Register

func (r *Registry) Register(name string, builder Builder)

Register adds a transport builder to the registry. The name should match the PubSubSystem config value (e.g., "kafka", "rabbitmq").

func (*Registry) RegisterWithCapabilities

func (r *Registry) RegisterWithCapabilities(name string, builder Builder, caps Capabilities)

RegisterWithCapabilities adds a transport builder and its capabilities to the registry.

type Transport

type Transport struct {
	Publisher  message.Publisher
	Subscriber message.Subscriber
}

Transport combines a publisher and subscriber pair produced by a factory.

func Build

func Build(ctx context.Context, cfg Config, logger watermill.LoggerAdapter) (Transport, error)

Build creates a transport using the default registry.

Directories

Path Synopsis
Package aws provides an AWS SNS/SQS transport for protoflow.
Package aws provides an AWS SNS/SQS transport for protoflow.
Package channel provides an in-memory Go channel transport for protoflow.
Package channel provides an in-memory Go channel transport for protoflow.
Package http provides an HTTP transport for protoflow.
Package http provides an HTTP transport for protoflow.
Package io provides a file-based I/O transport for protoflow.
Package io provides a file-based I/O transport for protoflow.
Package jetstream provides a NATS JetStream transport for protoflow.
Package jetstream provides a NATS JetStream transport for protoflow.
Package kafka provides a Kafka transport for protoflow.
Package kafka provides a Kafka transport for protoflow.
Package nats provides a NATS Core transport for protoflow.
Package nats provides a NATS Core transport for protoflow.
Package postgres provides a PostgreSQL-based transport for protoflow.
Package postgres provides a PostgreSQL-based transport for protoflow.
Package rabbitmq provides a RabbitMQ/AMQP transport for protoflow.
Package rabbitmq provides a RabbitMQ/AMQP transport for protoflow.
Package sqlite provides a SQLite-based transport for protoflow.
Package sqlite provides a SQLite-based transport for protoflow.
Package transports imports all built-in transports for auto-registration.
Package transports imports all built-in transports for auto-registration.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL