Easy-DotNET Easy-DotNET
🏠首页
  • 知识地图
  • 源码脑图

    • 总览
    • Program
    • WebApplication
    • Host主机
    • WebHost主机
    • 依赖注入
    • Autofac
    • Middleware中间件
    • RateLimiter限制速率
    • 响应缓存、请求解压缩
  • 设计初衷
  • 克隆
  • 类型转换
  • 日期时间
  • IO流
  • 工具类
  • 语言特性
  • 集合类
  • Codec编码
  • 文本操作
  • 数学
  • 图片
  • 网络
  • Emoji表情
  • C# 12
  • C# 11
  • C# 10
  • C# 9.0
  • C# 8.0
  • C# 早期版本
  • C# 教程
  • ORM
  • 定时任务
  • 日志
  • 认证与授权
  • Swagger
  • 对象映射
  • 熔断重试限流
  • 缓存
  • 注册发现
  • 网关
  • GraphQL
  • 更多
  • Docker

    • 简介
    • Adminer
    • Apisix
    • Apollo
    • Cassandra
    • Cerebro
    • ClickHouse
    • Consul
    • EasyMock
    • Elasticsearch
    • Emqx
    • FastDFS
    • Flink
    • Gitlab
    • Jenkins
    • Jrebel
    • MariaDB
    • MySQL
    • Percona
    • Phpmyadmin
    • PostgreSQL
    • Redis
  • Linux

    • 查看Linux系统信息
    • CentOS7调整磁盘分区
    • IO压测
    • 图形化监控工具Cockpit
  • 总览
  • 列表

    • 算法数据结构
    • API
    • 应用框架
    • 应用模板
    • 操作系统
    • 工作流
    • 入门套件
    • 示例
    • 人工智能
    • 程序集
    • Assets
    • 认证授权
    • Blazor
    • 区块链
    • 书籍
    • 自动构建
    • 报表
    • 缓存
    • 日历
    • 聊天
    • CLI
    • CLR
    • CMS
    • 代码分析和指标
    • 代码片段
    • 压缩
    • 持续集成
    • 密码学
    • 数据库
    • 数据库驱动
    • 日期时间
    • 反编译
    • 部署
    • DirectX
    • 分布式计算
    • DLR
    • 文档
    • 电商支付
    • 模拟器
    • 环境管理
    • ETL
    • 事件消息
    • Exception
    • 扩展
    • 函数式编程
    • 游戏引擎
    • GIS
    • Git工具
    • 绘图
    • GraphQL
    • GUI
    • HTML-CSS
    • HTTP
    • IDE
    • 图片处理
    • 安装工具
    • 交互式编程
    • 国际化
    • 互操作性
    • IoC
    • JS引擎
    • 日志
    • 机器学习和数据科学
    • Markdown
    • 邮件
    • 数学
    • 媒体
    • 指标
    • 微型框架
    • 最小化器
    • MVVM
    • 网络
    • 对象映射
    • Office
    • OpenAI
    • ORM
    • 包管理器
    • PDF
    • 性能分析工具
    • 协议
    • 推送通知
    • SQL构建器
    • 消息队列
    • RPC
    • 响应式编程
    • 实时通信
    • 正则表达式
    • 任务调度
    • SDK和API
    • 搜索引擎
    • 序列化
    • SMS
    • 状态机
    • 静态站点生成
    • 强命名
    • 风格指南
    • 模板引擎
    • 测试
    • 工具
    • 交易
    • UI自动测试
    • Visual Studio 插件
    • Web浏览器
    • Web框架
    • WebServers
    • WebSocket
    • Windows服务
    • WPF
    • 解析器
    • 源码生成
    • 其他
    • 资源
  • AspNetCore面试题
  • Elasticsearch面试题
  • MongoDB面试题
  • MySql面试题
  • Nginx面试题
  • RabbitMQ面试题
  • Redis面试题
  • 设计模式
  • 微服务
🧑‍💻.NET Blog
GitHub (opens new window)
🏠首页
  • 知识地图
  • 源码脑图

    • 总览
    • Program
    • WebApplication
    • Host主机
    • WebHost主机
    • 依赖注入
    • Autofac
    • Middleware中间件
    • RateLimiter限制速率
    • 响应缓存、请求解压缩
  • 设计初衷
  • 克隆
  • 类型转换
  • 日期时间
  • IO流
  • 工具类
  • 语言特性
  • 集合类
  • Codec编码
  • 文本操作
  • 数学
  • 图片
  • 网络
  • Emoji表情
  • C# 12
  • C# 11
  • C# 10
  • C# 9.0
  • C# 8.0
  • C# 早期版本
  • C# 教程
  • ORM
  • 定时任务
  • 日志
  • 认证与授权
  • Swagger
  • 对象映射
  • 熔断重试限流
  • 缓存
  • 注册发现
  • 网关
  • GraphQL
  • 更多
  • Docker

    • 简介
    • Adminer
    • Apisix
    • Apollo
    • Cassandra
    • Cerebro
    • ClickHouse
    • Consul
    • EasyMock
    • Elasticsearch
    • Emqx
    • FastDFS
    • Flink
    • Gitlab
    • Jenkins
    • Jrebel
    • MariaDB
    • MySQL
    • Percona
    • Phpmyadmin
    • PostgreSQL
    • Redis
  • Linux

    • 查看Linux系统信息
    • CentOS7调整磁盘分区
    • IO压测
    • 图形化监控工具Cockpit
  • 总览
  • 列表

    • 算法数据结构
    • API
    • 应用框架
    • 应用模板
    • 操作系统
    • 工作流
    • 入门套件
    • 示例
    • 人工智能
    • 程序集
    • Assets
    • 认证授权
    • Blazor
    • 区块链
    • 书籍
    • 自动构建
    • 报表
    • 缓存
    • 日历
    • 聊天
    • CLI
    • CLR
    • CMS
    • 代码分析和指标
    • 代码片段
    • 压缩
    • 持续集成
    • 密码学
    • 数据库
    • 数据库驱动
    • 日期时间
    • 反编译
    • 部署
    • DirectX
    • 分布式计算
    • DLR
    • 文档
    • 电商支付
    • 模拟器
    • 环境管理
    • ETL
    • 事件消息
    • Exception
    • 扩展
    • 函数式编程
    • 游戏引擎
    • GIS
    • Git工具
    • 绘图
    • GraphQL
    • GUI
    • HTML-CSS
    • HTTP
    • IDE
    • 图片处理
    • 安装工具
    • 交互式编程
    • 国际化
    • 互操作性
    • IoC
    • JS引擎
    • 日志
    • 机器学习和数据科学
    • Markdown
    • 邮件
    • 数学
    • 媒体
    • 指标
    • 微型框架
    • 最小化器
    • MVVM
    • 网络
    • 对象映射
    • Office
    • OpenAI
    • ORM
    • 包管理器
    • PDF
    • 性能分析工具
    • 协议
    • 推送通知
    • SQL构建器
    • 消息队列
    • RPC
    • 响应式编程
    • 实时通信
    • 正则表达式
    • 任务调度
    • SDK和API
    • 搜索引擎
    • 序列化
    • SMS
    • 状态机
    • 静态站点生成
    • 强命名
    • 风格指南
    • 模板引擎
    • 测试
    • 工具
    • 交易
    • UI自动测试
    • Visual Studio 插件
    • Web浏览器
    • Web框架
    • WebServers
    • WebSocket
    • Windows服务
    • WPF
    • 解析器
    • 源码生成
    • 其他
    • 资源
  • AspNetCore面试题
  • Elasticsearch面试题
  • MongoDB面试题
  • MySql面试题
  • Nginx面试题
  • RabbitMQ面试题
  • Redis面试题
  • 设计模式
  • 微服务
🧑‍💻.NET Blog
GitHub (opens new window)
npm
  • 简介
  • ORM

    • EFCore
    • Dapper
    • FreeSql
    • SqlSugar
  • 任务调度

    • Hangfire
    • Quartz
    • FluentScheduler
    • Coravel
    • Quartzmin
  • 日志

    • Serilog
    • NLog
    • Log4Net
    • Stackdriver
    • ExceptionLess
  • 身份认证与授权

    • IdentityServer4
    • Identity
    • JWTBearer
    • Auth0
    • OpenIddict
  • Swagger文档

    • Swagger
    • Swashbuckle
    • NSwag
    • ReDoc
  • 对象映射

    • AutoMapper
    • EmitMapper
    • AgileMapper
    • Mapster
  • 消息传递

    • MediatR
    • MassTransit
    • Rebus
      • 安装
      • 发送和接收消息
        • 发送消息
        • 接收消息
      • 消息的持久化和重试
        • 消息的持久化
        • 消息的重试
      • 消息的处理
      • 消息的路由
      • 使用总线
      • 并发控制
      • 总结
    • NServiceBus
  • 熔断重试限流

    • Polly
    • Resilience4j
    • AkkaNET
  • 缓存

    • CsRedis
    • FreeRedis
    • EasyCaching
    • StackExchangeRedis
    • CacheCow
    • NCache
    • Memory
  • 注册发现

    • Consul
    • Nacos
    • Apollo
  • 网关

    • Ocelot
    • Kong
    • Traefik
    • Zuul
  • GraphQL

    • GraphQLPlatform
    • GraphQLdotnet
  • 更多

    • NodaTime
    • FluentAssertions
    • Humanizer
    • 爬虫-AngleSharp
    • 邮件-MailKit
  • NET微服务
  • 消息传递
一个大西瓜
2023-04-24
目录

Rebus

开源地址

Github:https://github.com/rebus-org/Rebus (opens new window)

Rebus是一个轻量级的.NET消息总线,用于在分布式系统中发送消息。Rebus支持多种传输方式,包括本地传输、MSMQ、RabbitMQ、Azure Service Bus和Amazon SQS等。它还提供了许多高级功能,如重试机制、事务支持、并发控制和批处理等。

Rebus的核心思想是使用简单的消息模型来解耦应用程序之间的通信,从而使应用程序更加可维护和可扩展。它通过将消息作为对象传递来实现这一点,而不是通过低级别的通信协议,例如TCP或HTTP。这使得开发人员可以更容易地构建高度可靠的系统,而无需了解复杂的通信协议和网络拓扑。

# 安装

在使用Rebus之前,我们需要将其添加到我们的.NET项目中。Rebus提供了一组NuGet软件包,可以通过NuGet包管理器进行安装。以下是安装Rebus的步骤:

  1. 打开Visual Studio,创建一个.NET Core或.NET Framework项目。
  2. 在解决方案资源管理器中,右键单击项目名称,然后选择“管理NuGet程序包”菜单项。
  3. 在NuGet包管理器中,搜索“Rebus”并安装以下软件包:
    • Rebus
    • Rebus.RabbitMQ
    • Rebus.Microsoft.Extensions.DependencyInjection
  4. 在应用程序的启动代码中配置Rebus。例如,如果我们使用的是ASP.NET Core,我们可以在Startup.cs文件中添加以下代码:

services.AddRebus(configure => configure
    .Transport(t => t.UseRabbitMq("amqp://guest:guest@localhost", "my-queue"))
    .Routing(r => r.TypeBased().MapAssemblyOf<MyMessage>("my-queue")));

# 发送和接收消息

一旦我们安装了Rebus并将其配置到我们的应用程序中,我们就可以开始发送和接收消息了。下面是一个简单的示例,演示如何使用Rebus发送和接收消息。

# 发送消息


using (var activator = new BuiltinHandlerActivator())
{
    Configure.With(activator)
        .Transport(t => t.UseRabbitMq("amqp://guest:guest@localhost", "my-queue"))
        .Start();

    var message = new MyMessage { Text = "Hello, world!" };
    activator.Bus.Send(message);
}

在这个例子中,我们使用Rebus的BuiltinHandlerActivator创建一个处理程序激活器,并使用RabbitMQ传输配置Rebus。然后,我们创建一个MyMessage对象,将其发送到消息总线。

# 接收消息


using (var activator = new BuiltinHandlerActivator())
{
    Configure.With(activator)
        .Transport(t => t.UseRabbitMq("amqp://guest:guest@localhost", "my-queue"))
        .Routing(r => r.Typeased().MapAssemblyOf<MyMessage>("my-queue"))
        .Start();

    activator.Handle<MyMessage>(async message =>
    {
        Console.WriteLine($"Received message: {message.Text}");
    });

    Console.WriteLine("Press any key to exit...");
    Console.ReadKey();
}

在这个例子中,我们同样使用BuiltinHandlerActivator创建一个处理程序激活器,并配置RabbitMQ传输。然后,我们注册一个处理程序,用于处理MyMessage类型的消息。当消息总线收到MyMessage消息时,处理程序将打印一条消息到控制台。

# 消息的持久化和重试

Rebus提供了一些高级功能,以确保消息能够被可靠地传递并正确地处理。其中包括消息的持久化和重试机制。

# 消息的持久化

Rebus提供了一种简单的方式来持久化消息,以便在系统崩溃或其他故障发生时可以恢复它们。要启用消息的持久化,我们可以使用InMemorySagaRepository或SqlServerSagaRepository等提供程序之一。

下面是一个例子,演示如何使用SqlServerSagaRepository持久化消息:

services.AddRebus(configure => configure
    .Transport(t => t.UseRabbitMq("amqp://guest:guest@localhost", "my-queue"))
    .Routing(r => r.TypeBased().MapAssemblyOf<MyMessage>("my-queue"))
    .Sagas(s => s.StoreInSqlServer("Data Source=(local);Initial Catalog=RebusSample;Integrated Security=True;", "Sagas"))
    .Options(o => o.EnableMessageTracing()))
    .AddSqlServer();

在这个例子中,我们使用了SqlServerSagaRepository来持久化消息。我们还启用了消息跟踪功能,以便在需要时可以更轻松地调试和诊断消息。

# 消息的重试

在分布式系统中,消息的传递可能会因为各种原因而失败,例如网络问题、服务崩溃等。为了确保消息能够被正确地处理,Rebus提供了一种重试机制。默认情况下,Rebus将在5秒钟后重试失败的消息,并将重试次数限制为10次。

我们可以通过以下代码配置重试机制:


services.AddRebus(configure => configure
    .Transport(t => t.UseRabbitMq("amqp://guest:guest@localhost", "my-queue"))
    .Routing(r => r.TypeBased().MapAssemblyOf<MyMessage>("my-queue"))
    .Options(o => o.SimpleRetryStrategy(errorQueueAddress: "error"))
    .Options(o => o.EnableMessageTracing()))
    .AddSqlServer();

在这个例子中,我们使用了SimpleRetryStrategy来配置Rebus的重试机制,并将错误消息路由到一个名为“error”的队列中。

# 消息的处理

在Rebus中,消息的处理是通过处理程序来完成的。处理程序是一个实现IHandleMessages<TMessage>接口的类,其中TMessage是我们要处理的消息类型。

下面是一个例子,演示如何创建一个处理程序来处理MyMessage消息:


public class MyMessageHandler : IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message)
    {
        Console.WriteLine($"Received message: {message.Text}");
    }
}

在这个例子中,我们创建了一个名为MyMessageHandler的类,它实现了IHandleMessages<MyMessage>接口。当消息总线收到MyMessage消息时,处理程序将打印一条消息到控制台。

我们可以使用activator.Handle方法将处理程序注册到处理程序激活器中:


activator.Handle<MyMessage>(new MyMessageHandler());

在这个例子中,我们将MyMessageHandler注册到处理程序激活器中,以便在消息总线收到MyMessage消息时调用它。

# 消息的路由

在Rebus中,消息的路由是通过一个路由表来完成的。路由表将消息类型映射到队列名称,以便消息可以正确地路由到接收者。

我们可以使用以下代码配置路由表:


services.AddRebus(configure => configure
    .Transport(t => t.UseRabbitMq("amqp://guest:guest@localhost", "my-queue"))
    .Routing(r => r.TypeBased().MapAssemblyOf<MyMessage>("my-queue")))
    .AddSqlServer();

在这个例子中,我们使用TypeBased方法将消息类型映射到队列名称。MapAssemblyOf<MyMessage>将会扫描包含MyMessage类型的程序集,并将它们映射到my-queue队列。

我们还可以使用Endpoint方法将队列名称映射到端点名称。例如:


services.AddRebus(configure => configure
    .Transport(t => t.UseRabbitMq("amqp://guest:guest@localhost", "my-queue"))
    .Routing(r => r.TypeBased().MapAssemblyOf<MyMessage>("my-queue"))
    .Options(o => o.Endpoint("my-endpoint")))
    .AddSqlServer();

在这个例子中,我们使用Endpoint方法将队列名称my-queue映射到端点名称my-endpoint。这使得我们可以使用my-endpoint来发送消息,而不是使用my-queue。

# 使用总线

Rebus提供了一个IBus接口,它允许我们在应用程序中发送和接收消息。我们可以在处理程序中使用IBus接口来发送和接收消息。

下面是一个例子,演示如何使用IBus接口发送和接收消息:


public class MyMessageHandler : IHandleMessages<MyMessage>
{
    private readonly IBus _bus;

    public MyMessageHandler(IBus bus)
    {
        _bus = bus;
    }

    public async Task Handle(MyMessage message)
    {
        Console.WriteLine($"Received message: {message.Text}");

        var replyMessage = new MyReplyMessage { Text = "Hello, back!" };
        await _bus.Reply(replyMessage);
    }
}

在这个例子中,我们注入了一个IBus接口,以便在处理程序中使用它来发送和接收消息。当处理程序收到MyMessage消息时,它将打印一条消息到控制台,并发送一个MyReplyMessage回复消息。

我们可以使用activator.Bus.Send方法来发送消息:


var message = new MyMessage { Text = "Hello, world!" };
activator.Bus.Send(message);

在这个例子中,我们使用处理程序激活器的Bus属性来发送MyMessage消息。

我们还可以使用activator.Bus.Publish方法来发布消息:


var message = new MyMessage { Text = "Hello, world!" };
activator.Bus.Publish(message);

在这个例子中,我们使用处理程序激活器的Bus属性来发布MyMessage消息。发布消息与发送消息不同,因为它可以传递给多个接收者。

# 并发控制

在分布式系统中,消息的处理可能会发生在多个线程中。为了确保系统的正确性和可靠性,我们需要进行并发控制。

Rebus提供了一些选项,以帮助我们控制消息的并发处理。例如,我们可以使用LimitConsumers选项限制同时处理消息的处理程序数量:


services.AddRebus(configure => configure
    .Transport(t => t.UseRabbitMq("amqp://guest:guest@localhost", "my-queue"))
    .Routing(r => r.TypeBased().MapAssemblyOf<MyMessage>("my-queue"))
    .Options(o => o.LimitConsumers(5)))
    .AddSqlServer();

在这个例子中,我们使用LimitConsumers选项将消息处理程序的数量限制为5个。这意味着,在任何给定的时间,只有5个处理程序可以同时处理消息。

我们还可以使用MaxDegreeOfParallelism选项限制消息处理程序可以使用的最大线程数:


services.AddRebus(configure => configure
    .Transport(t => t.UseRabbitMq("amqp://guest:guest@localhost", "my-queue"))
    .Routing(r => r.TypeBased().MapAssemblyOf<MyMessage>("my-queue"))
    .Options(o => o.MaxDegreeOfParallelism(10)))
    .AddSqlServer();

在这个例子中,我们使用MaxDegreeOfParallelism选项将消息处理程序可以使用的最大线程数限制为10个。

# 总结

Rebus是一个轻量级的.NET消息总线,用于在分布式系统中发送消息。它提供了许多高级功能,如重试机制、事务支持、并发控制和批处理等。

在本文中,我们介绍了如何在.NET中使用Rebus。我们学习了如何安装Rebus、发送和接收消息、处理消息、路由消息以及进行并发控制。我们还讨论了消息的持久化和重试机制,以确保消息能够被可靠地传递并正确地处理。

Rebus是一个非常强大和灵活的消息总线,可以帮助我们构建可靠、可扩展的分布式系统。如果您正在开发.NET应用程序,并且需要在应用程序之间传递消息,那么Rebus是一个值得考虑的选择。

如果您想要深入了解Rebus的更多功能和用法,可以查阅Rebus的官方文档,或参考其GitHub存储库中的示例代码和文档。

上次更新: 2023/04/26, 22:10:06
MassTransit
NServiceBus

← MassTransit NServiceBus→

Theme by Vdoing | Copyright © 2019-2024 一个大西瓜 | MIT License | 苏ICP备2023013501号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式