简单监控akka在spring boot项目中的actor信箱

发表于2020-04-18,长度5029, 1011个单词, 14分钟读完
Flag Counter

做为项目中的新鲜玩意,akka的性能是上线后特别要关注的地方。没有人对这东西很懂,所以不盯紧一点,随时可能演变成大事故 —— 而且是不知道怎么修复的那种事故。

因为我在项目中使用的actor都是单例,也就是所有的请求都会被actor以单线程处理,所以监控actor的信箱大小是很重要,万一业务量使得单线程actor处理不过来怎么办。

既然使用单例actor,为何不使用单线程的线程池呢?从运行机制上看,actor和线程池差别不大,如果没有特别的理由选谁就看爱好了。

除了信箱的大小,如果能监控actor的数量、占用的内存、占用计算资源等方面也是极好的。我在网上找了一阵,针对akka的监控有几个,主要是

  • Lightbend Telemetry: https://developer.lightbend.com/docs/telemetry/current/home.html
  • Kamon: https://kamon.io/docs/v1/instrumentation/akka/metrics/
  • Github gist: https://gist.github.com/patriknw/5946678

前两个是系统的监控,能力强大,监控方面也多。第一个需要注册,我就放弃了;第二个搞了半天,愣是跑不起来。第三个看起来还行。这里就说一下第三个的用法。

package akka.contrib.mailbox;

import com.typesafe.config.Config;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import scala.Option;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Envelope;
import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import akka.dispatch.ProducesMessageQueue;
import akka.dispatch.UnboundedMailbox;
import akka.event.Logging;
import akka.event.LoggingAdapter;

/**
 * Logs the mailbox size when exceeding the configured limit. It logs at most
 * once per second when the messages are enqueued or dequeued.
 *
 * Configuration:
 *
 * <pre>
 * akka.actor.default-mailbox {
 *   mailbox-type = akka.contrib.mailbox.LoggingMailboxType
 *   size-limit = 20
 * }
 * </pre>
 */
public class LoggingMailboxType implements MailboxType, ProducesMessageQueue<UnboundedMailbox.MessageQueue> {
  private final Config config;

  public LoggingMailboxType(ActorSystem.Settings settings, Config config) {
    this.config = config;
  }

  @Override
  public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
    if (owner.isEmpty() || system.isEmpty())
      throw new IllegalArgumentException("no mailbox owner or system given");
    int sizeLimit = config.getInt("size-limit");
    return new LoggingMailbox(owner.get(), system.get(), sizeLimit);
  }

  static class LoggingMailbox implements MessageQueue {

    private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>();

    private final int sizeLimit;
    private final LoggingAdapter log;
    private final long interval = 1000000000L; // 1 s, in nanoseconds
    private final String path;
    volatile private long logTime = System.nanoTime();
    private final AtomicInteger queueSize = new AtomicInteger();
    private final AtomicInteger dequeueCount = new AtomicInteger();

    LoggingMailbox(ActorRef owner, ActorSystem system, int sizeLimit) {
      this.path = owner.path().toString();
      this.sizeLimit = sizeLimit;
      this.log = Logging.getLogger(system, LoggingMailbox.class);
    }

    @Override
    public Envelope dequeue() {
      Envelope x = queue.poll();
      if (x != null) {
        int size = queueSize.decrementAndGet();
        dequeueCount.incrementAndGet();
        logSize(size);
      }
      return x;
    }

    @Override
    public void enqueue(ActorRef receiver, Envelope handle) {
      queue.offer(handle);
      int size = queueSize.incrementAndGet();
      logSize(size);
    }

    private void logSize(int size) {
      if (size >= sizeLimit) {
        long now = System.nanoTime();
        if (now - logTime > interval) {
          double msgPerSecond = ((double) dequeueCount.get()) / (((double) (now - logTime)) / 1000000000L);
          logTime = now;
          dequeueCount.set(0);
          log.info("Mailbox size for [{}] is [{}], processing [{}] msg/s", path, size,
              String.format("%2.2f", msgPerSecond));
        }
      }
    }

    @Override
    public int numberOfMessages() {
      return queueSize.get();
    }

    @Override
    public boolean hasMessages() {
      return !queue.isEmpty();
    }

    @Override
    public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
      for (Envelope handle : queue) {
        deadLetters.enqueue(owner, handle);
      }
    }
  }
}

使用步骤

1. 复制文件

把整个文件复制到项目的合适包中,假设包名是a.b.c。

2. 创建application.conf

在主模块的src/main/resources下面创建文件application.conf,内容:

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  log-config-on-start = false
  logger-startup-timeout = 20s
  actor {
    default-mailbox {
      mailbox-type = a.b.c.LoggingMailboxType
      size-limit = 10
    }
    debug {
      lifecycle = false
    }
  }
  log-dead-letters = 10
  log-dead-letters-during-shutdown = on
}

这里配置了日志类型、邮箱类型(使用a.b.c.LoggingMailboxType)等。你也可以使用最简单的配置:

akka.actor.default-mailbox {
    mailbox-type = a.b.c.LoggingMailboxType
    size-limit = 10
}

配置中的size-limit会在代码中获取到,当大小超过它的时候会打印日志。

修复父类

如果到上面一步就开动项目,我这边一直报日志系统连接超时。这个错误让我十分纳闷:日志有什么超时的?后来引入了包

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-slf4j_2.12</artifactId>
    <version>2.5.26</version>
</dependency>

启动才得到报错:需要使用UnboundedMessageQueueSemantics的实现类才行。

于是将上面代码中的

  static class LoggingMailbox implements MessageQueue {

改成

  static class LoggingMailbox extends UnboundedMailbox.MessageQueue {

才启动成功。

暂且这样吧!

Written on April 18, 2020
分类: dev, 标签: java akka
如果你喜欢,请赞赏! davelet