前言
使用池化资源时,如果不归还资源,会导致资源泄露,如数据库连接。对于Netty内存池,如果使用内存块后不归还则会导致内存泄露。Netty为了方便用户排查内存泄露问题,提供了泄露检测服务。本章从几个角度学习一下:
- 如何使用Netty泄露检测
- Netty泄露检测等级如何配置更合理
- Netty泄露检测如何实现
一、泄露检测使用
public class MyLeakTest {
public static final int _1MB = 1024 * 1024;
public static final int _17MB = 17 * 1024 * 1024;
@Before
public void init() {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(() -> {
long memory = PlatformDependent.usedDirectMemory()/ _1MB;
System.out.println("使用直接内存:" + memory + "MB");
}, 0, 1, TimeUnit.SECONDS);
}
@Test
public void testLeak01() throws InterruptedException {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
for (int i = 0; i < 3; i++) {
allocate17();
System.gc();
Thread.sleep(1000);
}
}
private void allocate17() {
System.out.println("开始分配");
PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT;
allocator.newDirectBuffer(_17MB, Integer.MAX_VALUE)/*.release()*/;
System.out.println("成功分配");
}
}
复制代码
init方法启动一个线程负责打印当前使用的直接内存。testLeak01方法模拟内存泄露。
testLeak01方法首先通过ResourceLeakDetector.setLevel方法设置Netty泄露检测的级别为PARANOID,这一步也可以通过**-Dio.netty.leakDetection.level**=paranoid设置。然后循环分配17MB内存,不调用ByteBuf的release方法,调用System.gc通知进行垃圾回收。
控制台输出:
开始分配
成功分配
使用直接内存:17MB
开始分配
14:12:51.719 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:372)
io.netty.buffer.MyLeakTest.allocate17(MyLeakTest.java:65)
io.netty.buffer.MyLeakTest.testLeak01(MyLeakTest.java:40)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
成功分配
使用直接内存:34MB
开始分配
成功分配
使用直接内存:51MB
复制代码
如果我们使用SIMPLE级别或ADVANCED级别,则需要经过多次内存分配后,才会打印出这个异常日志。
@Test
public void testLeak03() throws InterruptedException {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
for (int i = 0; i < 300; i++) {
allocate1KB(); // 分配1KB
System.gc();
Thread.sleep(100);
}
}
private void allocate1KB(int count) {
System.out.println("开始分配,第" + count + "次");
PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT;
allocator.newDirectBuffer(1024, Integer.MAX_VALUE)/*.release()*/;
System.out.println("成功分配,第" + count + "次");
}
复制代码
控制台输出:
成功分配,第243次
使用直接内存:16MB
开始分配,第244次
成功分配,第244次
// ...
开始分配,第251次
成功分配,第251次
开始分配,第252次
14:21:33.063 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:372)
io.netty.buffer.MyLeakTest.allocate1KB(MyLeakTest.java:75)
io.netty.buffer.MyLeakTest.testLeak03(MyLeakTest.java:66)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
成功分配,第252次
使用直接内存:16MB
开始分配,第253次
复制代码
为什么这里直接内存一直是16MB?可以回忆一下Netty内存分配的过程。Netty一次性向系统申请的内存大小就是16MB,后续分配都是走Chunk分配或Subpage分配,不会再向系统申请。
二、泄露检测等级
泄露检测等级对应枚举类ResourceLeakDetector.Level。
public enum Level {
/**
* Disables resource leak detection.
*/
DISABLED,
/**
* Enables simplistic sampling resource leak detection which reports there is a leak or not,
* at the cost of small overhead (default).
*/
SIMPLE,
/**
* Enables advanced sampling resource leak detection which reports where the leaked object was accessed
* recently at the cost of high overhead.
*/
ADVANCED,
/**
* Enables paranoid resource leak detection which reports where the leaked object was accessed recently,
* at the cost of the highest possible overhead (for testing purposes only).
*/
PARANOID;
}
复制代码
不同泄露检测等级,对应的泄露检测行为不同。
等级 | 开启检测几率 | 对应ByteBuf实现类 |
---|---|---|
DISABLED | 0% | – |
SIMPLE | 默认1/128 | SimpleLeakAwareByteBuf |
ADVANCED | 默认1/128 | AdvancedLeakAwareByteBuf |
PARANOID | 100% | AdvancedLeakAwareByteBuf |
默认情况下,泄露检测等级是SIMPLE,SIMPLE和ADVANCED的采样率是1/128,对于单个泄露地方的Record记录最大值是4。
private static final Level DEFAULT_LEVEL = Level.SIMPLE;
private static final int DEFAULT_TARGET_RECORDS = 4;
private static final int DEFAULT_SAMPLING_INTERVAL = 128;
private static Level level;
static {
final boolean disabled;
if (SystemPropertyUtil.get("io.netty.noResourceLeakDetection") != null) {
disabled = SystemPropertyUtil.getBoolean("io.netty.noResourceLeakDetection", false);
} else {
disabled = false;
}
// io.netty.noResourceLeakDetection默认为false,默认等级是SIMPLE
Level defaultLevel = disabled? Level.DISABLED : DEFAULT_LEVEL;
// First read old property name
String levelStr = SystemPropertyUtil.get(PROP_LEVEL_OLD, defaultLevel.name());
// If new property name is present, use it
levelStr = SystemPropertyUtil.get(PROP_LEVEL, levelStr);
Level level = Level.parseLevel(levelStr);
// 对于一个地方的泄露,最多记录几个RECORD,默认 io.netty.leakDetection.targetRecords = 4
TARGET_RECORDS = SystemPropertyUtil.getInt(PROP_TARGET_RECORDS, DEFAULT_TARGET_RECORDS);
// 泄露检测采样率 默认 io.netty.leakDetection.samplingInterval = 128
SAMPLING_INTERVAL = SystemPropertyUtil.getInt(PROP_SAMPLING_INTERVAL, DEFAULT_SAMPLING_INTERVAL);
ResourceLeakDetector.level = level;
}
复制代码
泄露检测实现类都是WrappedByteBuf的子类,WrappedByteBuf包装了一个ByteBuf,所有ByteBuf的接口都委托里面这个ByteBuf实现。对于SIMPLE等级,选择SimpleLeakAwareByteBuf包装泄露检测,对于SIMPLE以上等级,选择AdvancedLeakAwareByteBuf包装泄露检测。
具体逻辑见PooledByteBufAllocator#newDirectBuffer、PooledByteBufAllocator#newHeapBuffer、UnpooledByteBufAllocator#newDirectBuffer等入口。这里以PooledByteBufAllocator#newDirectBuffer为例。
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
// 1. 获得当前线程缓存 和 线程缓存对应的PoolArena
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
// 2. 选择池化
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
// 选择非池化
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
// 3. 如果配置了内存泄露检测,将ByteBuf包装一下
return toLeakAwareBuffer(buf);
}
复制代码
newDirectBuffer在最后一步,将PooledByteBuf包装为LeakAware的ByteBuf。根据ResourceLeakDetector类的level字段,确定实例化哪个LeakAwareByteBuf,同时构造时传入了一个ResourceLeakTracker实例。
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
ResourceLeakTracker<ByteBuf> leak;
switch (ResourceLeakDetector.getLevel()) {
case SIMPLE:
leak = AbstractByteBuf.leakDetector.track(buf);
if (leak != null) {
buf = new SimpleLeakAwareByteBuf(buf, leak);
}
break;
case ADVANCED:
case PARANOID:
leak = AbstractByteBuf.leakDetector.track(buf);
if (leak != null) {
buf = new AdvancedLeakAwareByteBuf(buf, leak);
}
break;
default:
break;
}
return buf;
}
复制代码
注意ResourceLeakTracker实例是由AbstractByteBuf类变量leakDetector的track方法创建的。
public abstract class AbstractByteBuf extends ByteBuf {
static final ResourceLeakDetector<ByteBuf> leakDetector = ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ByteBuf.class);
}
复制代码
SimpleLeakAwareByteBuf简单泄露检测,不记录内存泄露的额外信息。特点是只有release方法调用时,才会使用ResourceLeakTracker的close方法关闭泄露检测。
class SimpleLeakAwareByteBuf extends WrappedByteBuf {
// 参与内存泄露检测的ByteBuf
private final ByteBuf trackedByteBuf;
final ResourceLeakTracker<ByteBuf> leak;
SimpleLeakAwareByteBuf(ByteBuf wrapped, ByteBuf trackedByteBuf, ResourceLeakTracker<ByteBuf> leak) {
super(wrapped);
this.trackedByteBuf = ObjectUtil.checkNotNull(trackedByteBuf, "trackedByteBuf");
this.leak = ObjectUtil.checkNotNull(leak, "leak");
}
@Override
public boolean release() {
if (super.release()) {
closeLeak();
return true;
}
return false;
}
@Override
public boolean release(int decrement) {
if (super.release(decrement)) {
closeLeak();
return true;
}
return false;
}
// 当正确调用release方法后,关闭泄露检测
private void closeLeak() {
boolean closed = leak.close(trackedByteBuf);
assert closed;
}
}
复制代码
AdvancedLeakAwareByteBuf继承SimpleLeakAwareByteBuf,在执行ByteBuf的API时会记录调用记录Record信息,方便后续排查。ResourceLeakTracker的record方法之后再说。
final class AdvancedLeakAwareByteBuf extends SimpleLeakAwareByteBuf {
private static final String PROP_ACQUIRE_AND_RELEASE_ONLY = "io.netty.leakDetection.acquireAndReleaseOnly";
// 是否只有获取和释放的时候记录record,默认false
private static final boolean ACQUIRE_AND_RELEASE_ONLY;
static {
ACQUIRE_AND_RELEASE_ONLY = SystemPropertyUtil.getBoolean(PROP_ACQUIRE_AND_RELEASE_ONLY, false);
}
AdvancedLeakAwareByteBuf(ByteBuf buf, ResourceLeakTracker<ByteBuf> leak) {
super(buf, leak);
}
AdvancedLeakAwareByteBuf(ByteBuf wrapped, ByteBuf trackedByteBuf, ResourceLeakTracker<ByteBuf> leak) {
super(wrapped, trackedByteBuf, leak);
}
// ACQUIRE_AND_RELEASE_ONLY 为 false 才记录泄露Record
static void recordLeakNonRefCountingOperation(ResourceLeakTracker<ByteBuf> leak) {
if (!ACQUIRE_AND_RELEASE_ONLY) {
leak.record();
}
}
// read & write 都要判断ACQUIRE_AND_RELEASE_ONLY为false 才执行leak.record
@Override
public byte getByte(int index) {
recordLeakNonRefCountingOperation(leak);
return super.getByte(index);
}
@Override
public ByteBuf writeByte(int value) {
recordLeakNonRefCountingOperation(leak);
return super.writeByte(value);
}
// retain(增加引用计数)/release(减少引用计数)/touch(专门用于记录一次record)
// 直接执行leak.record
@Override
public ByteBuf retain() {
leak.record();
return super.retain();
}
@Override
public boolean release() {
leak.record();
return super.release();
}
@Override
public ByteBuf touch() {
leak.record();
return this;
}
}
复制代码
三、Record
Record代表一个记录,链表数据结构,是ResourceLeakDetector的内部类。
private static final class Record extends Throwable {
// Record链表的尾节点
private static final Record BOTTOM = new Record();
// 外部传入的hint.toString,为了记录内存泄露的额外信息
private final String hintString;
// 指向下一个Record
private final Record next;
// 当前Record所处链表位置0-n,只有尾节点BOTTOM是-1
private final int pos;
Record(Record next, Object hint) {
hintString = hint.toString();
this.next = next;
this.pos = next.pos + 1;
}
Record(Record next) {
hintString = null;
this.next = next;
this.pos = next.pos + 1;
}
private Record() {
hintString = null;
next = null;
pos = -1;
}
}
复制代码
Record继承Throwable,省略部分代码,可以看到这里继承Throwable就是为了获取调用栈(getStackTrace)。
@Override
public String toString() {
StringBuilder buf = new StringBuilder(2048);
if (hintString != null) {
buf.append("\tHint: ").append(hintString).append(NEWLINE);
}
// 继承Throwable,可以打印StackTrace
StackTraceElement[] array = getStackTrace();
// 跳过前3个栈
for (int i = 3; i < array.length; i++) {
StackTraceElement element = array[i];
buf.append('\t');
buf.append(element.toString());
buf.append(NEWLINE);
}
return buf.toString();
}
复制代码
通过一个单元测试,看看Record的作用是什么。
@Test
public void testRecord() throws InterruptedException {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
PooledByteBufAllocator allocator = (PooledByteBufAllocator) ByteBufAllocator.DEFAULT;
// Created at
ByteBuf byteBuf = allocator.newDirectBuffer(1024, Integer.MAX_VALUE);
// #2
byteBuf.touch();
// #1
ByteBuf slice = byteBuf.slice();
// 释放强引用,通知gc
slice = null;
byteBuf = null;
System.gc();
Thread.sleep(1000);
// 为了触发泄露检测日志打印
byteBuf = allocator.newDirectBuffer(1024, Integer.MAX_VALUE);
}
复制代码
控制台输出:
使用直接内存:0MB
使用直接内存:16MB
14:18:34.888 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
io.netty.buffer.AdvancedLeakAwareByteBuf.slice(AdvancedLeakAwareByteBuf.java:76)
io.netty.buffer.MyLeakTest.testRecord(MyLeakTest.java:80)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
#2:
io.netty.buffer.MyLeakTest.testRecord(MyLeakTest.java:79)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:372)
io.netty.buffer.MyLeakTest.testRecord(MyLeakTest.java:78)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
复制代码
可以看到输出的records包含3段
- #1:slice方法调用的堆栈信息
- #2:touch方法调用的堆栈信息
- Created at:Bytebuf刚创建的堆栈信息
四、DefaultResourceLeak
ResourceLeakTracker接口暴露给客户端使用,一方面提供记录Record,另一方面支持关闭针对某个对象的泄露检测。
public interface ResourceLeakTracker<T> {
// 记录一个Record
void record();
// 记录一个Record,附带一些hint信息帮助排查
void record(Object hint);
// 关闭trackedObject的内存泄露检测,往往是个ByteBuf实例
boolean close(T trackedObject);
}
复制代码
ResourceLeakTracker默认实现是DefaultResourceLeak。
private static final class DefaultResourceLeak<T>
extends WeakReference<Object> implements ResourceLeakTracker<T>, ResourceLeak {
// 针对head指针的原子更新器
private static final AtomicReferenceFieldUpdater<DefaultResourceLeak<?>, Record> headUpdater =
(AtomicReferenceFieldUpdater)
AtomicReferenceFieldUpdater.newUpdater(DefaultResourceLeak.class, Record.class, "head");
// 针对droppedRecords丢弃Record数量的原子更新器
private static final AtomicIntegerFieldUpdater<DefaultResourceLeak<?>> droppedRecordsUpdater =
(AtomicIntegerFieldUpdater)
AtomicIntegerFieldUpdater.newUpdater(DefaultResourceLeak.class, "droppedRecords");
// 针对T泄露检测的Record链表
private volatile Record head;
// 丢弃Record数量
private volatile int droppedRecords;
// 包含所有DefaultResourceLeak实例,由外部传入
private final Set<DefaultResourceLeak<?>> allLeaks;
// T实例的哈希值
private final int trackedHash;
DefaultResourceLeak(Object referent,
ReferenceQueue<Object> refQueue,
Set<DefaultResourceLeak<?>> allLeaks) {
// 调用WeakReference的构造方法,传入引用对象和引用队列
super(referent, refQueue);
// 计算引用对象的哈希值
trackedHash = System.identityHashCode(referent);
// 把当前实例加入allLeaks
allLeaks.add(this);
// Record链表的头节点设置为Record.BOTTOM
headUpdater.set(this, new Record(Record.BOTTOM));
this.allLeaks = allLeaks;
}
}
复制代码
ResourceLeak接口已经废弃了,可以忽略。关键点是DefaultResourceLeak继承了WeakReference,将外部传入的引用对象和引用队列通过WeakReference构造传入,这个引用对象对于Netty内存泄露检测来说就是监测的ByteBuf实例。
1、record0
record0方法负责记录一个Record。实现ResourceLeakTracker接口的record()和record(T)抽象方法。
private void record0(Object hint) {
if (TARGET_RECORDS > 0) {
Record oldHead;
Record prevHead;
Record newHead;
boolean dropped;
do {
// 如果头节点为空,表示close方法已经被调用,资源已经释放,不需要继续
if ((prevHead = oldHead = headUpdater.get(this)) == null) {
return;
}
// 当前记录数
final int numElements = oldHead.pos + 1;
// 如果当前记录数大于io.netty.leakDetection.targetRecords(默认4),可能丢弃链表头部的一个Record
if (numElements >= TARGET_RECORDS) {
// 丢弃几率 = 1 / 2 ^ (numElements - targetRecords)
final int backOffFactor = Math.min(numElements - TARGET_RECORDS, 30);
if (dropped = PlatformDependent.threadLocalRandom().nextInt(1 << backOffFactor) != 0) {
prevHead = oldHead.next;
}
} else {
dropped = false;
}
// cas设置链表头节点为新创建的Record
newHead = hint != null ? new Record(prevHead, hint) : new Record(prevHead);
} while (!headUpdater.compareAndSet(this, oldHead, newHead));
if (dropped) {
droppedRecordsUpdater.incrementAndGet(this);
}
}
}
复制代码
record0方法主要就是构造一个新的Record记录,用CAS头插进Record链表。值得注意的是,当超出4个Record之后,每次新插入Record都可能导致链表头节点被丢弃。
2、close
close方法负责关闭对于当前引用对象的泄露检测工作。实现ResourceLeakTracker接口的close(T)抽象方法。
private static final AtomicReferenceFieldUpdater<DefaultResourceLeak<?>, Record> headUpdater =
(AtomicReferenceFieldUpdater)
AtomicReferenceFieldUpdater.newUpdater(DefaultResourceLeak.class, Record.class, "head");
private final Set<DefaultResourceLeak<?>> allLeaks;
private final int trackedHash;
@Override
public boolean close() {
// 从Set中移除当前DefaultResourceLeak实例
if (allLeaks.remove(this)) {
// 调用WeakReference.clear方法移除referent
clear();
// 设置Record链表的头节点为空
headUpdater.set(this, null);
return true;
}
return false;
}
@Override
public boolean close(T trackedObject) {
// 确保传入实例和监控实例是一个实例
assert trackedHash == System.identityHashCode(trackedObject);
try {
return close();
} finally {
reachabilityFence0(trackedObject);
}
}
复制代码
close方法做了三件事情:
- 从allLeaks中移除当前实例
- 调用父类WeakReference的clear方法,设置引用对象为空
- 设置Record的头节点为空
看一下reachabilityFence0这个方法,这个方法是为了防止当前DefaultResourceLeak实例被提前回收。因为JDK9之前可能会导致提前判定对象不可达,导致提前回收,这里通过finally+synchronized保证对象不会被提前回收,JDK9提供了Reference.reachabilityFence方法解决这个问题。
/**
* Recent versions of the JDK have a nasty habit of prematurely deciding objects are unreachable.
* see: https://stackoverflow.com/questions/26642153/finalize-called-on-strongly-reachable-object-in-java-8
* The Java 9 method Reference.reachabilityFence offers a solution to this problem.
*/
private static void reachabilityFence0(Object ref) {
if (ref != null) {
synchronized (ref) {
// Empty synchronized is ok: https://stackoverflow.com/a/31933260/1151521
}
}
}
复制代码
3、dispose
dipose方法是非public方法,给Netty自己用的,目的就是判断DefaultResourceLeak的close方法有没有被客户端调用过,如果调用过代表监控的资源被正常回收(ByteBuf的release方法被正常调用),allLeaks集合中应该不存在当前DefaultResourceLeak实例。
/**
* @return true 没有执行过close,即存在内存泄露
* false 执行过close,即对象被正常关闭,不存在内存泄露
*/
boolean dispose() {
clear();
return allLeaks.remove(this);
}
复制代码
至于为什么这里用一个Set集合来判断是否DefaultResourceLeak实例被正常调用close,而不是通过一个布尔值来判断,是因为要保证DefaultResourceLeak被强引用,不会被GC,这个后面再看。
五、ResourceLeakDetector
ResourceLeakDetector负责管理针对某类实例(泛型T)的泄露检测。
public class ResourceLeakDetector<T> {
// 所有由当前Detector管理的泄露检测任务的集合
private final Set<DefaultResourceLeak<?>> allLeaks =
Collections.newSetFromMap(new ConcurrentHashMap<DefaultResourceLeak<?>, Boolean>());
// 引用队列,当DefaultResourceLeak包裹的检测对象被回收时,这里可以收到DefaultResourceLeak实例
private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
// 已经做过内存泄露报告输出的报告String集合
private final Set<String> reportedLeaks =
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
// io.netty.leakDetection.samplingInterval 默认 128
private final int samplingInterval;
}
复制代码
ResourceLeakDetector主要要做下面三件事情:
- 建立检测对象与ResourceLeakTracker的关系,开启泄露检测
- 检测泄露
- 泄露报告
这三件事情都在track方法中执行了,对于Netty内存泄露检测而言,就是创建ByteBuf的时候把这三件事情全做了。
这也是为什么第三节关于Record的单元测试最后一行要分配一次内存,因为只有分配ByteBuf时,会调用toLeakAwareBuffer方法,toLeakAwareBuffer方法会调用track方法,这才能做一次泄露检测,打印泄露报告。
private DefaultResourceLeak track0(T obj) {
Level level = ResourceLeakDetector.level;
if (level == Level.DISABLED) {
return null;
}
// 如果level小于PARANOID,不一定会返回DefaultResourceLeak
// 取决于随机PlatformDependent.threadLocalRandom().nextInt(samplingInterval)
if (level.ordinal() < Level.PARANOID.ordinal()) {
// 1/128的几率 做一次泄露检测 并 返回DefaultResourceLeak
if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
reportLeak();
return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
return null;
}
// level 等于 PARANOID 立即做一次泄露检测 并返回DefaultResourceLeak
reportLeak();
return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
复制代码
track方法根据检测等级的不同,行为稍有不同。区别是小于PARANOID级别的情况下,不一定会同步做一次泄露检测+泄露报告+开启obj的泄露检测。
track方法里,reportLeak方法 = 泄露检测+泄露报告;new DefaultResourceLeak = 开启针对obj的泄露检测。DefaultResourceLeak的构造方法在第四节看过了,这里重点看reportLeak方法。
private void reportLeak() {
// 是否开启泄露检测(目前是通过日志级别是否允许error控制的)
if (!needReport()) {
// 清空refQueue
clearRefQueue();
return;
}
for (;;) {
// 从ReferenceQueue获取DefaultResourceLeak
// DefaultResourceLeak作为虚引用包裹的obj(ByteBuf实例)此时已经被回收
DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
if (ref == null) {
break;
}
// 判断是否发生泄露
if (!ref.dispose()) {
continue;
}
// 打印泄露日志
String records = ref.toString();
// 如果records已经输出过了,不会再次输出
if (reportedLeaks.add(records)) {
if (records.isEmpty()) {
reportUntracedLeak(resourceType);
} else {
reportTracedLeak(resourceType, records);
}
}
}
}
复制代码
首先reportLeak方法由日志级别控制,如果日志级别小于Error是不会做泄露检测的。
protected boolean needReport() {
return logger.isErrorEnabled();
}
复制代码
接下来循环拉取引用队列中的DefaultResourceLeak虚引用实例。
如果获取到DefaultResourceLeak,代表DefaultResourceLeak实例监控的引用实例已经被垃圾回收,通过dispose方法判断监控引用实例是否正常调用过close关闭方法。
如果发生泄露,判断DefaultResourceLeak的toString方法输出是否在reportedLeaks集合内,如果存在表示已经输出过本次泄露了,不会再次输出泄露报告,否则输出泄露报告。
六、疑问
为什么要用一个allLeaks的Set通过remove方法来判断正常释放资源,而不用一个boolean判断?
private final Set<DefaultResourceLeak<?>> allLeaks;
private final int trackedHash;
DefaultResourceLeak(
Object referent,
ReferenceQueue<Object> refQueue,
Set<DefaultResourceLeak<?>> allLeaks) {
super(referent, refQueue);
trackedHash = System.identityHashCode(referent);
// 注意,这个allLeaks是外部传入的
// 就是ResourceLeakDetector的allLeaks集合
allLeaks.add(this);
this.allLeaks = allLeaks;
}
/**
* Netty调用这个方法用于判断检测对象实例是否正常调用过close方法。
* @return true 没有执行过close,即存在内存泄露
* false 执行过close,即对象被正常关闭,不存在内存泄露
*/
boolean dispose() {
clear();
return allLeaks.remove(this);
}
/**
* 暴露给客户端调用,当监测对象正常释放时被调用
*/
@Override
public boolean close() {
// 从Set中移除当前DefaultResourceLeak实例
if (allLeaks.remove(this)) {
// Call clear so the reference is not even enqueued.
// 调用WeakReference.clear方法移除referent
clear();
// 设置Record链表的头节点为空
headUpdater.set(this, null);
return true;
}
return false;
}
复制代码
如果这里不通过Set强引用DefaultResourceLeak实例,当ByteBuf不存在强引用时,DefaultResourceLeak会被直接GC。
如SimpleLeakAwareByteBuf持有DefaultResourceLeak实例,当SimpleLeakAwareByteBuf不被强引用,这里DefaultResourceLeak实例也会被回收,就无法检测内存泄露了。
class SimpleLeakAwareByteBuf extends WrappedByteBuf {
// 被检测的ByteBuf
private final ByteBuf trackedByteBuf;
// DefaultResourceLeak实例
final ResourceLeakTracker<ByteBuf> leak;
SimpleLeakAwareByteBuf(ByteBuf wrapped, ByteBuf trackedByteBuf, ResourceLeakTracker<ByteBuf> leak) {
super(wrapped);
this.trackedByteBuf = ObjectUtil.checkNotNull(trackedByteBuf, "trackedByteBuf");
this.leak = ObjectUtil.checkNotNull(leak, "leak");
}
}
复制代码
总结
-
如何使用Netty泄露检测?
- 编码: ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
- 配置:-Dio.netty.leakDetection.level=paranoid
-
Netty泄露检测等级如何配置更合理?
不同泄露检测等级,对应的泄露检测行为不同。
等级 开启检测几率 对应ByteBuf实现类 DISABLED 0% – SIMPLE 默认1/128 SimpleLeakAwareByteBuf ADVANCED 默认1/128 AdvancedLeakAwareByteBuf PARANOID 100% AdvancedLeakAwareByteBuf 默认情况下,泄露检测等级是SIMPLE,SIMPLE和ADVANCED的采样率是1/128,对于单个泄露地方的Record记录最大值是4。
-
Netty泄露检测如何实现?
- 当创建ByteBuf时,会开启泄露检测;当ByteBuf引用计数为0,执行release方法时,会关闭泄露检测。
- 只有当创建ByteBuf时,会触发泄露检测,打印泄露报告。