Netty源码(七)内存泄露检测

前言

使用池化资源时,如果不归还资源,会导致资源泄露,如数据库连接。对于Netty内存池,如果使用内存块后不归还则会导致内存泄露。Netty为了方便用户排查内存泄露问题,提供了泄露检测服务。本章从几个角度学习一下:

  • 如何使用Netty泄露检测
  • Netty泄露检测等级如何配置更合理
  • Netty泄露检测如何实现

一、泄露检测使用

public class MyLeakTest {
    public static final int _1MB = 1024 * 1024;
    public static final int _17MB = 17 * 1024 * 1024;
    @Before
    public void init() {
        ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            long memory = PlatformDependent.usedDirectMemory()/ _1MB;
            System.out.println("使用直接内存:" + memory + "MB");
        }, 0, 1, TimeUnit.SECONDS);
    }

    @Test
    public void testLeak01() throws InterruptedException {
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
        for (int i = 0; i < 3; i++) {
            allocate17();
            System.gc();
            Thread.sleep(1000);
        }
    }

    private void allocate17() {
        System.out.println("开始分配");
        PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT;
        allocator.newDirectBuffer(_17MB, Integer.MAX_VALUE)/*.release()*/;
        System.out.println("成功分配");
    }
}
复制代码

init方法启动一个线程负责打印当前使用的直接内存。testLeak01方法模拟内存泄露。

testLeak01方法首先通过ResourceLeakDetector.setLevel方法设置Netty泄露检测的级别为PARANOID,这一步也可以通过**-Dio.netty.leakDetection.level**=paranoid设置。然后循环分配17MB内存,不调用ByteBuf的release方法,调用System.gc通知进行垃圾回收。

控制台输出:

开始分配
成功分配
使用直接内存:17MB
开始分配
14:12:51.719 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:372)
	io.netty.buffer.MyLeakTest.allocate17(MyLeakTest.java:65)
	io.netty.buffer.MyLeakTest.testLeak01(MyLeakTest.java:40)
	sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
成功分配
使用直接内存:34MB
开始分配
成功分配
使用直接内存:51MB
复制代码

如果我们使用SIMPLE级别或ADVANCED级别,则需要经过多次内存分配后,才会打印出这个异常日志。

@Test
public void testLeak03() throws InterruptedException {
    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
    for (int i = 0; i < 300; i++) {
        allocate1KB(); // 分配1KB
        System.gc();
        Thread.sleep(100);
    }
}
private void allocate1KB(int count) {
    System.out.println("开始分配,第" + count + "次");
    PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT;
    allocator.newDirectBuffer(1024, Integer.MAX_VALUE)/*.release()*/;
    System.out.println("成功分配,第" + count + "次");
}
复制代码

控制台输出:

成功分配,第243次
使用直接内存:16MB
开始分配,第244次
成功分配,第244次
// ...
开始分配,第251次
成功分配,第251次
开始分配,第252次
14:21:33.063 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:372)
	io.netty.buffer.MyLeakTest.allocate1KB(MyLeakTest.java:75)
	io.netty.buffer.MyLeakTest.testLeak03(MyLeakTest.java:66)
	sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	
成功分配,第252次
使用直接内存:16MB
开始分配,第253次
复制代码

为什么这里直接内存一直是16MB?可以回忆一下Netty内存分配的过程。Netty一次性向系统申请的内存大小就是16MB,后续分配都是走Chunk分配或Subpage分配,不会再向系统申请。

二、泄露检测等级

泄露检测等级对应枚举类ResourceLeakDetector.Level。

public enum Level {
    /**
    * Disables resource leak detection.
    */
    DISABLED,
    /**
    * Enables simplistic sampling resource leak detection which reports there is a leak or not,
    * at the cost of small overhead (default).
    */
    SIMPLE,
    /**
    * Enables advanced sampling resource leak detection which reports where the leaked object was accessed
    * recently at the cost of high overhead.
    */
    ADVANCED,
    /**
    * Enables paranoid resource leak detection which reports where the leaked object was accessed recently,
    * at the cost of the highest possible overhead (for testing purposes only).
    */
    PARANOID;
}
复制代码

不同泄露检测等级,对应的泄露检测行为不同。

等级 开启检测几率 对应ByteBuf实现类
DISABLED 0%
SIMPLE 默认1/128 SimpleLeakAwareByteBuf
ADVANCED 默认1/128 AdvancedLeakAwareByteBuf
PARANOID 100% AdvancedLeakAwareByteBuf

默认情况下,泄露检测等级是SIMPLE,SIMPLE和ADVANCED的采样率是1/128,对于单个泄露地方的Record记录最大值是4。

private static final Level DEFAULT_LEVEL = Level.SIMPLE;
private static final int DEFAULT_TARGET_RECORDS = 4;
private static final int DEFAULT_SAMPLING_INTERVAL = 128;
private static Level level;
static {
    final boolean disabled;
    if (SystemPropertyUtil.get("io.netty.noResourceLeakDetection") != null) {
        disabled = SystemPropertyUtil.getBoolean("io.netty.noResourceLeakDetection", false);
    } else {
        disabled = false;
    }
    // io.netty.noResourceLeakDetection默认为false,默认等级是SIMPLE
    Level defaultLevel = disabled? Level.DISABLED : DEFAULT_LEVEL;

    // First read old property name
    String levelStr = SystemPropertyUtil.get(PROP_LEVEL_OLD, defaultLevel.name());

    // If new property name is present, use it
    levelStr = SystemPropertyUtil.get(PROP_LEVEL, levelStr);
    Level level = Level.parseLevel(levelStr);
    // 对于一个地方的泄露,最多记录几个RECORD,默认 io.netty.leakDetection.targetRecords = 4
    TARGET_RECORDS = SystemPropertyUtil.getInt(PROP_TARGET_RECORDS, DEFAULT_TARGET_RECORDS);
    // 泄露检测采样率 默认 io.netty.leakDetection.samplingInterval = 128
    SAMPLING_INTERVAL = SystemPropertyUtil.getInt(PROP_SAMPLING_INTERVAL, DEFAULT_SAMPLING_INTERVAL);
    ResourceLeakDetector.level = level;
}
复制代码

泄露检测实现类都是WrappedByteBuf的子类,WrappedByteBuf包装了一个ByteBuf,所有ByteBuf的接口都委托里面这个ByteBuf实现。对于SIMPLE等级,选择SimpleLeakAwareByteBuf包装泄露检测,对于SIMPLE以上等级,选择AdvancedLeakAwareByteBuf包装泄露检测。

具体逻辑见PooledByteBufAllocator#newDirectBuffer、PooledByteBufAllocator#newHeapBuffer、UnpooledByteBufAllocator#newDirectBuffer等入口。这里以PooledByteBufAllocator#newDirectBuffer为例。

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    // 1. 获得当前线程缓存 和 线程缓存对应的PoolArena
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;
    final ByteBuf buf;
    if (directArena != null) {
        // 2. 选择池化
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        // 选择非池化
        buf = PlatformDependent.hasUnsafe() ?
            UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
        new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    // 3. 如果配置了内存泄露检测,将ByteBuf包装一下
    return toLeakAwareBuffer(buf);
}
复制代码

newDirectBuffer在最后一步,将PooledByteBuf包装为LeakAware的ByteBuf。根据ResourceLeakDetector类的level字段,确定实例化哪个LeakAwareByteBuf,同时构造时传入了一个ResourceLeakTracker实例。

protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
    ResourceLeakTracker<ByteBuf> leak;
    switch (ResourceLeakDetector.getLevel()) {
        case SIMPLE:
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new SimpleLeakAwareByteBuf(buf, leak);
            }
            break;
        case ADVANCED:
        case PARANOID:
            leak = AbstractByteBuf.leakDetector.track(buf);
            if (leak != null) {
                buf = new AdvancedLeakAwareByteBuf(buf, leak);
            }
            break;
        default:
            break;
    }
    return buf;
}
复制代码

注意ResourceLeakTracker实例是由AbstractByteBuf类变量leakDetector的track方法创建的。

public abstract class AbstractByteBuf extends ByteBuf {
    static final ResourceLeakDetector<ByteBuf> leakDetector = ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ByteBuf.class);
}
复制代码

SimpleLeakAwareByteBuf简单泄露检测,不记录内存泄露的额外信息。特点是只有release方法调用时,才会使用ResourceLeakTracker的close方法关闭泄露检测

class SimpleLeakAwareByteBuf extends WrappedByteBuf {
	// 参与内存泄露检测的ByteBuf
    private final ByteBuf trackedByteBuf;
    final ResourceLeakTracker<ByteBuf> leak;

    SimpleLeakAwareByteBuf(ByteBuf wrapped, ByteBuf trackedByteBuf, ResourceLeakTracker<ByteBuf> leak) {
        super(wrapped);
        this.trackedByteBuf = ObjectUtil.checkNotNull(trackedByteBuf, "trackedByteBuf");
        this.leak = ObjectUtil.checkNotNull(leak, "leak");
    }
    @Override
    public boolean release() {
        if (super.release()) {
            closeLeak();
            return true;
        }
        return false;
    }

    @Override
    public boolean release(int decrement) {
        if (super.release(decrement)) {
            closeLeak();
            return true;
        }
        return false;
    }
    // 当正确调用release方法后,关闭泄露检测
    private void closeLeak() {
        boolean closed = leak.close(trackedByteBuf);
        assert closed;
    }
}
复制代码

AdvancedLeakAwareByteBuf继承SimpleLeakAwareByteBuf,在执行ByteBuf的API时会记录调用记录Record信息,方便后续排查。ResourceLeakTracker的record方法之后再说。

final class AdvancedLeakAwareByteBuf extends SimpleLeakAwareByteBuf {

    private static final String PROP_ACQUIRE_AND_RELEASE_ONLY = "io.netty.leakDetection.acquireAndReleaseOnly";
    // 是否只有获取和释放的时候记录record,默认false
    private static final boolean ACQUIRE_AND_RELEASE_ONLY;

    static {
        ACQUIRE_AND_RELEASE_ONLY = SystemPropertyUtil.getBoolean(PROP_ACQUIRE_AND_RELEASE_ONLY, false);
    }

    AdvancedLeakAwareByteBuf(ByteBuf buf, ResourceLeakTracker<ByteBuf> leak) {
        super(buf, leak);
    }

    AdvancedLeakAwareByteBuf(ByteBuf wrapped, ByteBuf trackedByteBuf, ResourceLeakTracker<ByteBuf> leak) {
        super(wrapped, trackedByteBuf, leak);
    }
	// ACQUIRE_AND_RELEASE_ONLY 为 false 才记录泄露Record
    static void recordLeakNonRefCountingOperation(ResourceLeakTracker<ByteBuf> leak) {
        if (!ACQUIRE_AND_RELEASE_ONLY) {
            leak.record();
        }
    }
    // read & write 都要判断ACQUIRE_AND_RELEASE_ONLY为false 才执行leak.record
    @Override
    public byte getByte(int index) {
        recordLeakNonRefCountingOperation(leak);
        return super.getByte(index);
    }
    
    @Override
    public ByteBuf writeByte(int value) {
        recordLeakNonRefCountingOperation(leak);
        return super.writeByte(value);
    }

    // retain(增加引用计数)/release(减少引用计数)/touch(专门用于记录一次record)
    // 直接执行leak.record
    @Override
    public ByteBuf retain() {
        leak.record();
        return super.retain();
    }

    @Override
    public boolean release() {
        leak.record();
        return super.release();
    }

    @Override
    public ByteBuf touch() {
        leak.record();
        return this;
    }
}
复制代码

三、Record

Record代表一个记录,链表数据结构,是ResourceLeakDetector的内部类。

private static final class Record extends Throwable {
    // Record链表的尾节点
    private static final Record BOTTOM = new Record();
    // 外部传入的hint.toString,为了记录内存泄露的额外信息
    private final String hintString;
    // 指向下一个Record
    private final Record next;
    // 当前Record所处链表位置0-n,只有尾节点BOTTOM是-1
    private final int pos;

    Record(Record next, Object hint) {
        hintString = hint.toString();
        this.next = next;
        this.pos = next.pos + 1;
    }

    Record(Record next) {
        hintString = null;
        this.next = next;
        this.pos = next.pos + 1;
    }

    private Record() {
        hintString = null;
        next = null;
        pos = -1;
    }
}
复制代码

Record继承Throwable,省略部分代码,可以看到这里继承Throwable就是为了获取调用栈(getStackTrace)。

@Override
public String toString() {
    StringBuilder buf = new StringBuilder(2048);
    if (hintString != null) {
        buf.append("\tHint: ").append(hintString).append(NEWLINE);
    }
    // 继承Throwable,可以打印StackTrace
    StackTraceElement[] array = getStackTrace();
    // 跳过前3个栈
    for (int i = 3; i < array.length; i++) {
        StackTraceElement element = array[i];
        buf.append('\t');
        buf.append(element.toString());
        buf.append(NEWLINE);
    }
    return buf.toString();
}
复制代码

通过一个单元测试,看看Record的作用是什么。

@Test
public void testRecord() throws InterruptedException {
    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
    PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT;
    // Created at
    ByteBuf byteBuf = allocator.newDirectBuffer(1024, Integer.MAX_VALUE); 
    // #2
    byteBuf.touch();
    // #1
    ByteBuf slice = byteBuf.slice();
    // 释放强引用,通知gc
    slice = null;
    byteBuf = null;
    System.gc();
    Thread.sleep(1000);
    // 为了触发泄露检测日志打印
    byteBuf = allocator.newDirectBuffer(1024, Integer.MAX_VALUE);
}
复制代码

控制台输出:

使用直接内存:0MB
使用直接内存:16MB
14:18:34.888 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	io.netty.buffer.AdvancedLeakAwareByteBuf.slice(AdvancedLeakAwareByteBuf.java:76)
	io.netty.buffer.MyLeakTest.testRecord(MyLeakTest.java:80)
	sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	
#2:
	io.netty.buffer.MyLeakTest.testRecord(MyLeakTest.java:79)
	sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:372)
	io.netty.buffer.MyLeakTest.testRecord(MyLeakTest.java:78)
	sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	
复制代码

可以看到输出的records包含3段

  • #1:slice方法调用的堆栈信息
  • #2:touch方法调用的堆栈信息
  • Created at:Bytebuf刚创建的堆栈信息

四、DefaultResourceLeak

ResourceLeakTracker接口暴露给客户端使用,一方面提供记录Record,另一方面支持关闭针对某个对象的泄露检测。

public interface ResourceLeakTracker<T>  {
    // 记录一个Record
    void record();
    // 记录一个Record,附带一些hint信息帮助排查
    void record(Object hint);
    // 关闭trackedObject的内存泄露检测,往往是个ByteBuf实例
    boolean close(T trackedObject);
}
复制代码

ResourceLeakTracker默认实现是DefaultResourceLeak

private static final class DefaultResourceLeak<T>
            extends WeakReference<Object> implements ResourceLeakTracker<T>, ResourceLeak {
		// 针对head指针的原子更新器
        private static final AtomicReferenceFieldUpdater<DefaultResourceLeak<?>, Record> headUpdater =
                (AtomicReferenceFieldUpdater)
                        AtomicReferenceFieldUpdater.newUpdater(DefaultResourceLeak.class, Record.class, "head");
        // 针对droppedRecords丢弃Record数量的原子更新器
        private static final AtomicIntegerFieldUpdater<DefaultResourceLeak<?>> droppedRecordsUpdater =
                (AtomicIntegerFieldUpdater)
                        AtomicIntegerFieldUpdater.newUpdater(DefaultResourceLeak.class, "droppedRecords");

		// 针对T泄露检测的Record链表
        private volatile Record head;
        // 丢弃Record数量
        private volatile int droppedRecords;
		// 包含所有DefaultResourceLeak实例,由外部传入
        private final Set<DefaultResourceLeak<?>> allLeaks;
        // T实例的哈希值
        private final int trackedHash;

        DefaultResourceLeak(Object referent,
                ReferenceQueue<Object> refQueue,
                Set<DefaultResourceLeak<?>> allLeaks) {
            // 调用WeakReference的构造方法,传入引用对象和引用队列
            super(referent, refQueue);
            // 计算引用对象的哈希值
            trackedHash = System.identityHashCode(referent);
            // 把当前实例加入allLeaks
            allLeaks.add(this);
            // Record链表的头节点设置为Record.BOTTOM
            headUpdater.set(this, new Record(Record.BOTTOM));
            this.allLeaks = allLeaks;
        }
}
复制代码

ResourceLeak接口已经废弃了,可以忽略。关键点是DefaultResourceLeak继承了WeakReference,将外部传入的引用对象和引用队列通过WeakReference构造传入,这个引用对象对于Netty内存泄露检测来说就是监测的ByteBuf实例。

1、record0

record0方法负责记录一个Record。实现ResourceLeakTracker接口的record()和record(T)抽象方法。

private void record0(Object hint) {
    if (TARGET_RECORDS > 0) {
        Record oldHead;
        Record prevHead;
        Record newHead;
        boolean dropped;
        do {
            // 如果头节点为空,表示close方法已经被调用,资源已经释放,不需要继续
            if ((prevHead = oldHead = headUpdater.get(this)) == null) {
                return;
            }
            // 当前记录数
            final int numElements = oldHead.pos + 1;
            // 如果当前记录数大于io.netty.leakDetection.targetRecords(默认4),可能丢弃链表头部的一个Record
            if (numElements >= TARGET_RECORDS) {
                // 丢弃几率 = 1 / 2 ^ (numElements - targetRecords)
                final int backOffFactor = Math.min(numElements - TARGET_RECORDS, 30);
                if (dropped = PlatformDependent.threadLocalRandom().nextInt(1 << backOffFactor) != 0) {
                    prevHead = oldHead.next;
                }
            } else {
                dropped = false;
            }
            // cas设置链表头节点为新创建的Record
            newHead = hint != null ? new Record(prevHead, hint) : new Record(prevHead);
        } while (!headUpdater.compareAndSet(this, oldHead, newHead));
        if (dropped) {
            droppedRecordsUpdater.incrementAndGet(this);
        }
    }
}
复制代码

record0方法主要就是构造一个新的Record记录,用CAS头插进Record链表。值得注意的是,当超出4个Record之后,每次新插入Record都可能导致链表头节点被丢弃。

2、close

close方法负责关闭对于当前引用对象的泄露检测工作。实现ResourceLeakTracker接口的close(T)抽象方法。

 private static final AtomicReferenceFieldUpdater<DefaultResourceLeak<?>, Record> headUpdater =
                (AtomicReferenceFieldUpdater)
                        AtomicReferenceFieldUpdater.newUpdater(DefaultResourceLeak.class, Record.class, "head");
private final Set<DefaultResourceLeak<?>> allLeaks;
private final int trackedHash;

@Override
public boolean close() {
    // 从Set中移除当前DefaultResourceLeak实例
    if (allLeaks.remove(this)) {
        // 调用WeakReference.clear方法移除referent
        clear();
        // 设置Record链表的头节点为空
        headUpdater.set(this, null);
        return true;
    }
    return false;
}

@Override
public boolean close(T trackedObject) {
    // 确保传入实例和监控实例是一个实例
    assert trackedHash == System.identityHashCode(trackedObject);
    try {
        return close();
    } finally {
        reachabilityFence0(trackedObject);
    }
}
复制代码

close方法做了三件事情:

  • 从allLeaks中移除当前实例
  • 调用父类WeakReference的clear方法,设置引用对象为空
  • 设置Record的头节点为空

看一下reachabilityFence0这个方法,这个方法是为了防止当前DefaultResourceLeak实例被提前回收。因为JDK9之前可能会导致提前判定对象不可达,导致提前回收,这里通过finally+synchronized保证对象不会被提前回收,JDK9提供了Reference.reachabilityFence方法解决这个问题。

/**
* Recent versions of the JDK have a nasty habit of prematurely deciding objects are unreachable.
* see: https://stackoverflow.com/questions/26642153/finalize-called-on-strongly-reachable-object-in-java-8
* The Java 9 method Reference.reachabilityFence offers a solution to this problem.
*/
private static void reachabilityFence0(Object ref) {
    if (ref != null) {
        synchronized (ref) {
            // Empty synchronized is ok: https://stackoverflow.com/a/31933260/1151521
        }
    }
}
复制代码

3、dispose

dipose方法是非public方法,给Netty自己用的,目的就是判断DefaultResourceLeak的close方法有没有被客户端调用过,如果调用过代表监控的资源被正常回收(ByteBuf的release方法被正常调用),allLeaks集合中应该不存在当前DefaultResourceLeak实例。

 /**
 * @return true 没有执行过close,即存在内存泄露
 *         false 执行过close,即对象被正常关闭,不存在内存泄露
 */
 boolean dispose() {
 	clear();
	return allLeaks.remove(this);
 }
复制代码

至于为什么这里用一个Set集合来判断是否DefaultResourceLeak实例被正常调用close,而不是通过一个布尔值来判断,是因为要保证DefaultResourceLeak被强引用,不会被GC,这个后面再看。

五、ResourceLeakDetector

ResourceLeakDetector负责管理针对某类实例(泛型T)的泄露检测。

public class ResourceLeakDetector<T> {
    // 所有由当前Detector管理的泄露检测任务的集合
    private final Set<DefaultResourceLeak<?>> allLeaks =
            Collections.newSetFromMap(new ConcurrentHashMap<DefaultResourceLeak<?>, Boolean>());
    // 引用队列,当DefaultResourceLeak包裹的检测对象被回收时,这里可以收到DefaultResourceLeak实例
    private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
    // 已经做过内存泄露报告输出的报告String集合
    private final Set<String> reportedLeaks =
            Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

    // io.netty.leakDetection.samplingInterval 默认 128
    private final int samplingInterval;
}
复制代码

ResourceLeakDetector主要要做下面三件事情:

  • 建立检测对象与ResourceLeakTracker的关系,开启泄露检测
  • 检测泄露
  • 泄露报告

这三件事情都在track方法中执行了,对于Netty内存泄露检测而言,就是创建ByteBuf的时候把这三件事情全做了

这也是为什么第三节关于Record的单元测试最后一行要分配一次内存,因为只有分配ByteBuf时,会调用toLeakAwareBuffer方法,toLeakAwareBuffer方法会调用track方法,这才能做一次泄露检测,打印泄露报告

private DefaultResourceLeak track0(T obj) {
    Level level = ResourceLeakDetector.level;
    if (level == Level.DISABLED) {
        return null;
    }
    // 如果level小于PARANOID,不一定会返回DefaultResourceLeak
    // 取决于随机PlatformDependent.threadLocalRandom().nextInt(samplingInterval)
    if (level.ordinal() < Level.PARANOID.ordinal()) {
        // 1/128的几率 做一次泄露检测 并 返回DefaultResourceLeak
        if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
            reportLeak();
            return new DefaultResourceLeak(obj, refQueue, allLeaks);
        }
        return null;
    }
    // level 等于 PARANOID 立即做一次泄露检测 并返回DefaultResourceLeak
    reportLeak();
    return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
复制代码

track方法根据检测等级的不同,行为稍有不同。区别是小于PARANOID级别的情况下,不一定会同步做一次泄露检测+泄露报告+开启obj的泄露检测

track方法里,reportLeak方法 = 泄露检测+泄露报告;new DefaultResourceLeak = 开启针对obj的泄露检测。DefaultResourceLeak的构造方法在第四节看过了,这里重点看reportLeak方法。

private void reportLeak() {
    // 是否开启泄露检测(目前是通过日志级别是否允许error控制的)
    if (!needReport()) {
        // 清空refQueue
        clearRefQueue();
        return;
    }

    for (;;) {
        // 从ReferenceQueue获取DefaultResourceLeak
        // DefaultResourceLeak作为虚引用包裹的obj(ByteBuf实例)此时已经被回收
        DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
        if (ref == null) {
            break;
        }
        // 判断是否发生泄露
        if (!ref.dispose()) {
            continue;
        }
        // 打印泄露日志
        String records = ref.toString();
        // 如果records已经输出过了,不会再次输出
        if (reportedLeaks.add(records)) {
            if (records.isEmpty()) {
                reportUntracedLeak(resourceType);
            } else {
                reportTracedLeak(resourceType, records);
            }
        }
    }
}
复制代码

首先reportLeak方法由日志级别控制,如果日志级别小于Error是不会做泄露检测的。

protected boolean needReport() {
    return logger.isErrorEnabled();
}
复制代码

接下来循环拉取引用队列中的DefaultResourceLeak虚引用实例。

如果获取到DefaultResourceLeak,代表DefaultResourceLeak实例监控的引用实例已经被垃圾回收,通过dispose方法判断监控引用实例是否正常调用过close关闭方法

如果发生泄露,判断DefaultResourceLeak的toString方法输出是否在reportedLeaks集合内,如果存在表示已经输出过本次泄露了,不会再次输出泄露报告,否则输出泄露报告。

六、疑问

为什么要用一个allLeaks的Set通过remove方法来判断正常释放资源,而不用一个boolean判断?

private final Set<DefaultResourceLeak<?>> allLeaks;
private final int trackedHash;

DefaultResourceLeak(
    Object referent,
    ReferenceQueue<Object> refQueue,
    Set<DefaultResourceLeak<?>> allLeaks) {
    super(referent, refQueue);
    trackedHash = System.identityHashCode(referent);
    // 注意,这个allLeaks是外部传入的
    // 就是ResourceLeakDetector的allLeaks集合
    allLeaks.add(this);
    this.allLeaks = allLeaks;
}
/**
* Netty调用这个方法用于判断检测对象实例是否正常调用过close方法。
* @return true 没有执行过close,即存在内存泄露
*         false 执行过close,即对象被正常关闭,不存在内存泄露
*/
boolean dispose() {
    clear();
    return allLeaks.remove(this);
}

/**
* 暴露给客户端调用,当监测对象正常释放时被调用
*/
@Override
public boolean close() {
    // 从Set中移除当前DefaultResourceLeak实例
    if (allLeaks.remove(this)) {
        // Call clear so the reference is not even enqueued.
        // 调用WeakReference.clear方法移除referent
        clear();
        // 设置Record链表的头节点为空
        headUpdater.set(this, null);
        return true;
    }
    return false;
}
复制代码

如果这里不通过Set强引用DefaultResourceLeak实例,当ByteBuf不存在强引用时,DefaultResourceLeak会被直接GC

如SimpleLeakAwareByteBuf持有DefaultResourceLeak实例,当SimpleLeakAwareByteBuf不被强引用,这里DefaultResourceLeak实例也会被回收,就无法检测内存泄露了。

class SimpleLeakAwareByteBuf extends WrappedByteBuf {
	// 被检测的ByteBuf
    private final ByteBuf trackedByteBuf;
    // DefaultResourceLeak实例
    final ResourceLeakTracker<ByteBuf> leak;

    SimpleLeakAwareByteBuf(ByteBuf wrapped, ByteBuf trackedByteBuf, ResourceLeakTracker<ByteBuf> leak) {
        super(wrapped);
        this.trackedByteBuf = ObjectUtil.checkNotNull(trackedByteBuf, "trackedByteBuf");
        this.leak = ObjectUtil.checkNotNull(leak, "leak");
    }
}
复制代码

总结

  • 如何使用Netty泄露检测?

    • 编码: ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
    • 配置:-Dio.netty.leakDetection.level=paranoid
  • Netty泄露检测等级如何配置更合理?

    不同泄露检测等级,对应的泄露检测行为不同。

    等级 开启检测几率 对应ByteBuf实现类
    DISABLED 0%
    SIMPLE 默认1/128 SimpleLeakAwareByteBuf
    ADVANCED 默认1/128 AdvancedLeakAwareByteBuf
    PARANOID 100% AdvancedLeakAwareByteBuf

    默认情况下,泄露检测等级是SIMPLE,SIMPLE和ADVANCED的采样率是1/128,对于单个泄露地方的Record记录最大值是4。

  • Netty泄露检测如何实现?

    • 当创建ByteBuf时,会开启泄露检测;当ByteBuf引用计数为0,执行release方法时,会关闭泄露检测。
    • 只有当创建ByteBuf时,会触发泄露检测,打印泄露报告。
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享