wrpc

package module
v0.0.0-...-d8756de Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 16, 2024 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StatusBreakerClosed int32 = iota + 1
	StatusBreakerOpen
	StatusBreakerHalfOpen
)

Variables

View Source
var DefaultDeadlineDuration = 10 * time.Second

DefaultDeadlineDuration IO超时

View Source
var DefaultDelayOffDuration = 5 * time.Second

DefaultDelayOffDuration 延迟关闭时间

View Source
var DefaultHeartbeatDuration time.Duration = time.Second * 5

DefaultHeartbeatDuration 心跳包

View Source
var DefaultHeartbeatPeriodDuration time.Duration = (DefaultHeartbeatDuration * 9) / 10

DefaultHeartbeatPeriodDuration 心跳包间距

View Source
var SnowFlakeStartupTime int64 = time.Date(2024, time.January, 1, 0, 0, 0, 0, time.UTC).UnixNano()

SnowFlakeStartupTime

View Source
var TCPBufferSize = 4 * 1024

TCPBufferSize 缓存大小

Functions

func AllocRPCResponse

func AllocRPCResponse() *rpcResponse

AllocRPCResponse 从池里取一个

func FrameCtxCancelFunc

func FrameCtxCancelFunc(id int64) []byte

func FrameEncode

func FrameEncode(f Frame, metadata utils.MetaDict[string], payload any, w io.Writer, encoder func(any, io.Writer) error) error

func FreeRPCResponse

func FreeRPCResponse(r *rpcResponse)

FreeRPCResponse 还一个到池里

func GetMeta

func GetMeta(ctx context.Context) utils.MetaDict[string]

GetMeta 取

func GetMetadata

func GetMetadata(data []byte) (utils.MetaDict[string], error)

GetMetadata

func GetPayload

func GetPayload(data []byte, unmarshal func([]byte, any) error) (any, error)

GetPayload

func SetMeta

SetMeta

Types

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker 回路:SRE 过载保护算法。降低 K 值会使自适应限流算法更加激进(允许客户端在算法启动时拒绝更多本地请求);增加 K 值会使自适应限流算法不再那么激进(允许服务端在算法启动时尝试接收更多的请求,与上面相反)。

func NewCircuitBreaker

func NewCircuitBreaker(request int64, k float64) *CircuitBreaker

NewCircuitBreaker 新加回路

func (*CircuitBreaker) AllowRequest

func (c *CircuitBreaker) AllowRequest() error

func (*CircuitBreaker) Breaker

func (c *CircuitBreaker) Breaker(proto func(context.Context, Frame, any) error) func(context.Context, Frame, any) error

func (*CircuitBreaker) MarkFailed

func (c *CircuitBreaker) MarkFailed()

func (*CircuitBreaker) MarkSuccess

func (c *CircuitBreaker) MarkSuccess()

type Client

type Client struct {
	Options

	Conn

	Close func()
	// contains filtered or unexported fields
}

Client 连接

func NewTCPClient

func NewTCPClient(url string, o *Options) (*Client, error)

NewTCPClient 新建

func (*Client) Call

func (c *Client) Call(ctx context.Context, serviceMethod string, args, reply any) error

Call 调用指定的服务,方法,等待调用返回,将结果写入reply,然后返回执行的错误状态 request and response/请求-响应

func (*Client) CloseStream

func (c *Client) CloseStream(s *Stream)

func (*Client) NewStream

func (c *Client) NewStream(ctx context.Context, serviceMethod string) (*Stream, error)

NewStream

func (*Client) Subscribe

func (c *Client) Subscribe(topic string, handler func([]byte) error) error

Subscribe 订阅主题

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(topic string) error

Unsubscribe 退订主题

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn 连接

func (Conn) Equal

func (c Conn) Equal(obj any) bool

func (Conn) WriteTo

func (c Conn) WriteTo(p net.Buffers)

WriteTo

type Frame

type Frame struct {
	Status uint16
	Seq    int64
	Method uint64
}

Frame 帧

func FrameUnmarshal

func FrameUnmarshal(data []byte, unmarshal func([]byte, any) error) (Frame, utils.MetaDict[string], any, error)

FrameUnmarshal 解码

func GetFrame

func GetFrame(data []byte) (Frame, error)

GetFrame 解码帧,返回帧及错误

func NewFrame

func NewFrame(status uint16, seq int64, method string) (f Frame)

type Limiter

type Limiter struct {
	utils.TokenBucketLimiter
}

func (*Limiter) Ratelimit

func (l *Limiter) Ratelimit(proto func([]byte, io.Writer) error) func([]byte, io.Writer) error

type MethodInfo

type MethodInfo struct {
	// contains filtered or unexported fields
}

MethodInfo 方法

type Option

type Option func(*Options)

Option 选项赋值

func WarpSend

func WarpSend(s func(func(context.Context, Frame, any) error) func(context.Context, Frame, any) error) Option

WarpSend 客户端包装发送

func WithCodec

func WithCodec(m func(any) ([]byte, error), e func(any, io.Writer) error, um func([]byte, any) error) Option

WithCodec 编码

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger 日志

func WithProtocolMagicNumber

func WithProtocolMagicNumber(pm uint32) Option

func WithWarpHandler

func WithWarpHandler(w func(func([]byte, io.Writer) error) func([]byte, io.Writer) error) Option

WithWarpHandler 服务端拦截器

type Options

type Options struct {
	ProtocolMagicNumber uint32

	//编码器
	Marshal func(any) ([]byte, error)
	Encoder func(any, io.Writer) error
	//解码器
	Unmarshal func([]byte, any) error
	//服务端拦截器
	WarpHandler func(func([]byte, io.Writer) error) func([]byte, io.Writer) error
	//客户端包装发送
	WarpSend func(func(context.Context, Frame, any) error) func(context.Context, Frame, any) error
	//日志
	Logger *slog.Logger
	// contains filtered or unexported fields
}

Options 配置

func NewOptions

func NewOptions(opts ...Option) *Options

NewOptions 创建并返回一个配置:接收 Option 函数类型的不定长参数列表

type RPCContext

type RPCContext struct {
	Metadata utils.MetaDict[string]
}

RPCContext 上下文

type Service

type Service struct {
	Options
	// contains filtered or unexported fields
}

Service 服务端应答

func NewService

func NewService(o *Options) *Service

NewService 新建

func (*Service) RegisterRPC

func (sh *Service) RegisterRPC(target string, rcvr any) error

RegisterRPC 函数必须是导出的,即首字母为大写。函数阻塞会打断网络 IO。

func (*Service) RegisterTopic

func (sh *Service) RegisterTopic(name string) *Topic

RegisterTopic 注册主题

func (*Service) RemoveTopic

func (sh *Service) RemoveTopic(name string)

RemoveTopic 移除主题

func (*Service) Stop

func (sh *Service) Stop()

Stop

func (*Service) TCPServer

func (sh *Service) TCPServer(port string) error

TCPServer tcp服务

type Stream

type Stream struct {
	Conn
	// contains filtered or unexported fields
}

Stream 流

func (*Stream) Recv

func (s *Stream) Recv() (any, error)

Recv 非顺序接收数据

func (*Stream) Send

func (s *Stream) Send(data any) error

type TCPServer

type TCPServer struct {
	ProtocolMagicNumber uint32

	Handler func([]byte, io.Writer) error

	Logger *slog.Logger
	// contains filtered or unexported fields
}

TCPServer TCP服务

func NewTCPServer

func NewTCPServer(snowFlakeID *utils.SnowFlakeID, port string, handler func([]byte, io.Writer) error, logger *slog.Logger) *TCPServer

NewTCPServer 新建

func (*TCPServer) Run

func (s *TCPServer) Run()

Run 运行

func (*TCPServer) Stop

func (s *TCPServer) Stop()

Stop 关闭

type TCPSession

type TCPSession struct {
	Handler func([]byte, io.Writer) error

	Logger *slog.Logger
	// contains filtered or unexported fields
}

TCPSession 会话

func TCPDial

func TCPDial(url string, protocolMagicNumber uint32, handler func([]byte, io.Writer) error, logger *slog.Logger) (*TCPSession, error)

TCPDial 连接

type Topic

type Topic struct {
	*Service
	Name string
	// contains filtered or unexported fields
}

Topic 主题

func (*Topic) Broadcast

func (t *Topic) Broadcast(data []byte) error

Broadcast 广播

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL