2026/4/6 9:20:14
网站建设
项目流程
移动网站系统,深圳装修设计生产厂家,四川哪家网站做的最好,北京专业网站设计推荐1. 前言
在分布式系统中#xff0c;RocketMQ 不仅仅是一个消息传输管道#xff0c;它还提供了强大的插件化扩展能力。RocketMQ Hook#xff08;钩子#xff09; 机制类似于 Spring 的 AOP#xff08;面向切面编程#xff09;或 Servlet Filter。它允许开发者在消息发送前…1. 前言在分布式系统中RocketMQ 不仅仅是一个消息传输管道它还提供了强大的插件化扩展能力。RocketMQ Hook钩子机制类似于 Spring 的 AOP面向切面编程或 Servlet Filter。它允许开发者在消息发送前、发送后、消费前、消费后这四个关键时间点插入自定义逻辑。Hook 的典型应用场景包括全链路追踪集成 SkyWalking 或 OpenTelemetry在消息头中注入 TraceID。消息审计与监控统计消息发送耗时、消费成功率。数据隔离与上下文透传在多租户场景下隐式传递租户身份信息。本文将重点介绍 RocketMQ 的SendMessageHook和ConsumeMessageHook接口并结合 Spring Boot演示如何实现 SaaS 场景下的租户上下文透传。2. 核心接口定义2.1 SendMessageHook (发送方钩子)该接口位于生产者端用于拦截消息发送过程。publicinterfaceSendMessageHook{StringhookName();// 发送消息之前执行// 可以在这里修改消息内容、添加 Header 属性、获取 ThreadLocal 上下文voidsendMessageBefore(SendMessageContextcontext);// 发送消息之后执行// 可以在这里统计耗时、记录发送结果voidsendMessageAfter(SendMessageContextcontext);}2.2 ConsumeMessageHook (消费方钩子)该接口位于消费者端用于拦截消息消费过程。publicinterfaceConsumeMessageHook{StringhookName();// 消费消息之前执行// 可以在这里从 Header 提取属性、设置 ThreadLocal 上下文voidconsumeMessageBefore(ConsumeMessageContextcontext);// 消费消息之后执行// 务必在这里清理 ThreadLocal防止线程池污染voidconsumeMessageAfter(ConsumeMessageContextcontext);}3. 实战案例SaaS 多租户上下文透传3.1 业务背景与问题在 SaaS 架构中所有租户共享同一套基础设施应用服务、MQ 集群、数据库。数据隔离通常依赖于代码中的tenant_id。现状HTTP 请求进入 Controller 时拦截器会解析 Token 并将tenant_id存入当前线程的ThreadLocal例如TenantContextHolder。问题当业务逻辑触发 MQ 消息发送时消息会经过网络传输给消费者可能是另一台服务器。网络传输会导致 ThreadLocal 信息丢失。消费者线程在处理消息时无法知道这条消息属于哪个租户从而导致数据库操作缺少租户条件。解决方案利用 RocketMQ Hook在发送前将 ThreadLocal 中的tenant_id注入到消息的 UserPropertyHeader中在消费前读取 Header 并还原到 ThreadLocal。3.2 代码实现(1) 发送方钩子注入租户标识实现SendMessageHook将TenantContextHolder中的 ID 放入消息属性。publicclassTenantRocketMQSendMessageHookimplementsSendMessageHook{publicstaticfinalStringHEADER_TENANT_IDtenant-id;OverridepublicStringhookName(){returngetClass().getSimpleName();}OverridepublicvoidsendMessageBefore(SendMessageContextsendMessageContext){// 1. 获取当前线程的租户上下文LongtenantIdTenantContextHolder.getTenantId();// 2. 如果不存在租户信息则跳过if(tenantIdnull){return;}// 3. 将租户 ID 放入消息的 UserProperty (Header) 中// 这样该信息就会随网络传输到 BrokersendMessageContext.getMessage().putUserProperty(HEADER_TENANT_ID,tenantId.toString());}OverridepublicvoidsendMessageAfter(SendMessageContextsendMessageContext){// 发送后无需特殊处理}}(2) 消费方钩子还原租户上下文实现ConsumeMessageHook从消息属性中提取 ID 并恢复到 ThreadLocal。publicclassTenantRocketMQConsumeMessageHookimplementsConsumeMessageHook{publicstaticfinalStringHEADER_TENANT_IDtenant-id;OverridepublicStringhookName(){returngetClass().getSimpleName();}OverridepublicvoidconsumeMessageBefore(ConsumeMessageContextcontext){ListMessageExtmessagescontext.getMsgList();// 校验多租户透传场景下建议设置消费者为单条消费模式// 否则一批消息可能包含不同租户的数据ThreadLocal 无法处理Assert.isTrue(messages.size()1,消息条数({})不正确仅支持单条消费,messages.size());// 1. 从消息 Header 中读取租户 IDStringtenantIdStrmessages.get(0).getUserProperty(HEADER_TENANT_ID);// 2. 如果 Header 中存在租户信息则恢复到当前消费者线程的 ThreadLocal 中if(StrUtil.isNotEmpty(tenantIdStr)){TenantContextHolder.setTenantId(Long.parseLong(tenantIdStr));}}OverridepublicvoidconsumeMessageAfter(ConsumeMessageContextcontext){// 3. 【关键】消费结束后必须清理 ThreadLocal// RocketMQ 消费者使用的是线程池如果不清理该线程复用时会导致租户数据污染TenantContextHolder.clear();}}(3) 自动装配注册 Hook 到 RocketMQ在 Spring Boot 环境中我们需要通过BeanPostProcessor拦截 RocketMQ 的 Bean 初始化过程将上述两个 Hook 注册到底层的DefaultMQProducer和DefaultMQPushConsumer对象中。ComponentpublicclassTenantRocketMQInitializerimplementsBeanPostProcessor{OverridepublicObjectpostProcessAfterInitialization(Objectbean,StringbeanName)throwsBeansException{// 1. 拦截消费者容器注册消费钩子if(beaninstanceofDefaultRocketMQListenerContainer){DefaultRocketMQListenerContainercontainer(DefaultRocketMQListenerContainer)bean;initTenantConsumer(container.getConsumer());}// 2. 拦截生产者模板注册发送钩子elseif(beaninstanceofRocketMQTemplate){RocketMQTemplatetemplate(RocketMQTemplate)bean;initTenantProducer(template.getProducer());}returnbean;}privatevoidinitTenantProducer(DefaultMQProducerproducer){if(producernull||producer.getDefaultMQProducerImpl()null){return;}// 注册发送钩子producer.getDefaultMQProducerImpl().registerSendMessageHook(newTenantRocketMQSendMessageHook());}privatevoidinitTenantConsumer(DefaultMQPushConsumerconsumer){if(consumernull||consumer.getDefaultMQPushConsumerImpl()null){return;}// 注册消费钩子consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(newTenantRocketMQConsumeMessageHook());}}