今天在实现 Logging Correlation ID 的功能。理想状态下,我是期望能够在不影响业务逻辑代码的情况下,参照AOP的理念,给Topology的每个processor的增加如下行为:

  • 从header提取CorrelationID
  • CorrelationID 设置到 Mapped Diagnositc Context(其底层用ThreadLocal实现)
  • 在logger的 pattern增加 CorrelationID
  • 清理Mapped Diagnositc Context

这样业务代码不需要做任何改动就可以获得打印correlationID的能力。 然而由于KafakStream的dsl不暴露ConsumerRecord给我们,操作header并不是特别方便。 参见Kafka Record Header的背景以及使用场景。

Zipkin中对KafkaStream的tracing的实现方式与我在前一个项目中做的SafeKafkaStream的实现方式非常类似:通过一个wrapper,实现KafkaStream接口,把各种operation delegate到wrapper例的delegatee并添加额外的行为。

最后采取的思路如下:

  • 使用原始消息的payload中的id作为correlation id,使用一个“知道如何从各种类型的Record中提取correlation id”的CorrelationIDExtractor提取 correlationId
  • 把各个operator的参数, 大多为一个 function,使用 withContext 装饰起来,在装饰后的function里进行 setupcleanup的操作。

这种折衷的方案仍然有如下优点:

  • CorrelationID的获取集中在CorrelationIDExtractor这个一个地方,后续如果KafkaStream有更新对header的支持很容易切换到新的方案。
  • withContext尽量减少了对业务代码的侵入。

using pyinvoke for task automation

The main activities for developers to work in a codebase are the following: make some changes, run tests, package and upload some artifac...… Continue reading