Implementing CorrelationID In Kafka Stream
今天在实现 Logging Correlation ID 的功能。理想状态下,我是期望能够在不影响业务逻辑代码的情况下,参照AOP的理念,给Topology的每个processor的增加如下行为:
- 从header提取
CorrelationID - 把
CorrelationID设置到 Mapped Diagnositc Context(其底层用ThreadLocal实现) - 在logger的 pattern增加
CorrelationID - 清理Mapped Diagnositc Context
这样业务代码不需要做任何改动就可以获得打印correlationID的能力。 然而由于KafakStream的dsl不暴露ConsumerRecord给我们,操作header并不是特别方便。 参见Kafka Record Header的背景以及使用场景。
Zipkin中对KafkaStream的tracing的实现方式与我在前一个项目中做的SafeKafkaStream的实现方式非常类似:通过一个wrapper,实现KafkaStream接口,把各种operation delegate到wrapper例的delegatee并添加额外的行为。
最后采取的思路如下:
- 使用原始消息的payload中的id作为correlation id,使用一个“知道如何从各种类型的Record中提取correlation id”的
CorrelationIDExtractor提取 correlationId - 把各个operator的参数, 大多为一个 function,使用
withContext装饰起来,在装饰后的function里进行setup和cleanup的操作。
这种折衷的方案仍然有如下优点:
- CorrelationID的获取集中在CorrelationIDExtractor这个一个地方,后续如果KafkaStream有更新对header的支持很容易切换到新的方案。
withContext尽量减少了对业务代码的侵入。