这是我参与更文挑战的第7天,活动详情查看: 更文挑战
需求背景:
接上一篇文章,《开箱即用|基于BDB实现本地消息队列》,我们在使用的时候会发现,当大量数据进行入队的时候会出现性能问题,内存飙升极快,而且延迟也比较高,所以我们这期做一个优化,不使用AbstractQueue来通过队头队尾的方式来实现本地消息队列,同时对比一下其他几个相同类型的本地磁盘数据的性能及使用方法;
Berkeley DB:
以下简称为BDB (Berkeley DB是Oracle一个开源的KV单机文件数据库)
Level DB :
以下简称为LDB (LevelDB是Google开源的持久化KV单机数据库,具有很高的随机写,顺序读/写性能,但是随机读的性能很一般,也就是说,LevelDB很适合应用在查询较少,而写很多的场景。)
基于BDB指针方式实现本地队列:
通过指针方式遍历BDB队列 有序 但性能不如队列方式 可实现队列方式 同levenDB(见下文)实现方式一致
代码实现:
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import com.sleepycat.je.*;
import lombok.extern.slf4j.Slf4j;
/**
* 封装的bdb操作工具栏 集成了增、删、改、查、关闭、同步操作等方法
*
* @author taoze
* @version 1.0
* @date 4/9/21 15:31
*/
@Slf4j
public class BdbOperator {
// 环境变量的声明
/**
* Environment这个对象,这个对象是bdb的环境
* bdb je 只允许有一个写的进程,可有有多个只读的进程,
* 但是当写的进程更新数据以后,读的进程不能发现数据的改变,
* 只有close这个environment,在开启,
* 所以一个Environment尽量使用一个进程(proccess)操作
* 注意,关闭Environment是很耗时的
* 不是特别需要,尽量不要关闭Environment
*/
private Environment myDbEnvironment = null;
// 数据库操作的对象声明
private Database myDatabase = null;
/**
* bdb操作环境变量和数据库初始化
* setAllowCreate 是否允许创建这个环境,true为是,false为否
* setCacheSize 设置缓存单位为字节,比如设置1M缓存setCacheSize(1000000);
* setTransactional 设置是否启用事务
* setReadOnly 设置是否为只读模式访问,true为只读
* setLocking 设置环境是否为锁定
* 更多的参数设置可以用
* setConfigParam
* 这个方法可设置选项非常多
* envConfig.setConfigParam("je.log.fileMax","20000000");设置日志文件最大为20M,默认是10M
* je.log.bufferSize 设置日志的缓冲 缺省为1048576 (1M)
* je.lock.timeout 锁定时间
*
* @param dbEnvFilePath
* @param databaseName
*/
public BdbOperator(String dbEnvFilePath, String databaseName) {
/**
* 初始化数据库参数
*/
try {
// 初始化数据存储根目录文件夹
File f = new File(dbEnvFilePath);
if (!f.exists()) {
f.mkdirs();
}
// 数据库配置变量初始化
DatabaseConfig dbConfig = new DatabaseConfig();// 打开数据库
dbConfig.setAllowCreate(true);
// 初始化环境配置变量,基于该变量去配置环境变量
EnvironmentConfig envConfig = new EnvironmentConfig();
// 当使用的数据库配置变量不存在的时候,就自动创建
envConfig.setAllowCreate(true);
// 正式初始化数据库的环境
myDbEnvironment = new Environment(f, envConfig);
// 打开一个数据库,如果不存在,则自动创建;第一个参数表示是否是事务
myDatabase = myDbEnvironment.openDatabase(null, databaseName, dbConfig);
} catch (Exception e) {
log.warn("BdbOperator init DBD环境异常", e);
}
}
/**
* 将指定的kv存放到bdb当中,并可以选择是否实时同步到磁盘中
*
* @param key
* @param value
* @param isSync
* @return
*/
public boolean put(String key, String value, boolean isSync) {
// 数据的key
// 数据的value
try {
// 将key和value都封装到DatabaseEntry中
DatabaseEntry theKey = new DatabaseEntry(key.getBytes(StandardCharsets.UTF_8));
DatabaseEntry theData = new DatabaseEntry(value.getBytes(StandardCharsets.UTF_8));
// 写入数据库
myDatabase.put(null, theKey, theData);
if (isSync) {
// 数据同步到磁盘
this.sync();
}
return true;
} catch (Exception e) {
log.warn("BdbOperator put 失败", e);
}
return false;
}
// 删除bdb中指定的key值
public boolean delete(String key) {
DatabaseEntry theKey;
try {
theKey = new DatabaseEntry(key.getBytes(StandardCharsets.UTF_8));
myDatabase.delete(null, theKey);
return true;
} catch (Exception e) {
log.warn("BdbOperator delete 失败", e);
}
return false;
}
/**
* 读取bdb的key对应的数据
*
* @param key
* @return
*/
public String getValue(String key) {
// 要读取数据的key
try {
// 将读取数据的key封装到DatabaseEntry中
DatabaseEntry theKey = new DatabaseEntry(key.getBytes(StandardCharsets.UTF_8));
// 将读取出来的值以二进制形式放到DatabaseEntry中
DatabaseEntry theData = new DatabaseEntry();
// 执行读取操作
myDatabase.get(null, theKey, theData, LockMode.DEFAULT);
if (theData.getData() == null) {
return null;
}
// 将二进制数据转化成字符串值
return new String(theData.getData(), StandardCharsets.UTF_8);
} catch (Exception e) {
log.warn("BdbOperator getValue 失败", e);
}
return null;
}
/**
* 查询所有,可遍历数据
* selectAll(Here describes this method function with a few words)*
* <p>
* void
*/
public List<String> selectAll() {
Cursor cursor = null;
cursor = myDatabase.openCursor(null, null);
DatabaseEntry theKey = null;
DatabaseEntry theData = null;
theKey = new DatabaseEntry();
theData = new DatabaseEntry();
ArrayList<String> list = new ArrayList<>();
while (cursor.getNext(theKey, theData, LockMode.DEFAULT) == OperationStatus.SUCCESS) {
list.add(new String(theData.getData()));
}
cursor.close();
return list;
}
public Database getMyDatabase() {
return myDatabase;
}
/**
* 同步数据到磁盘当中,相当于让数据实时持久化
*
* @return
*/
public boolean sync() {
if (myDbEnvironment != null) {
try {
myDbEnvironment.sync();
} catch (Exception e) {
log.warn("BdbOperator sync 失败", e);
}
return true;
}
return false;
}
/**
* 关闭环境变量数据库
*
* @return
*/
public boolean close() {
try {
if (myDatabase != null) {
myDatabase.close();
}
if (myDbEnvironment != null) {
myDbEnvironment.sync();
myDbEnvironment.cleanLog();
myDbEnvironment.close();
}
return true;
} catch (DatabaseException e) {
log.warn("BdbOperator close 失败", e);
}
return false;
}
}
复制代码
基于LevelDB实现本地队列:
原理:
内存中维护俩个值:head:队头 tail:队尾 同时将这俩个值 插入到levelDB里
InitLevelDB:容器加载时初始化,获取队头队尾的值
@PostConstruct
public void initLevelDB() throws IOException {
if (StringUtils.isNotBlank(dbPath)) {
DBFactory factory = new Iq80DBFactory();
Options options = new Options();
options.createIfMissing(true);
db = factory.open(new File(dbPath), options);
String headString = get(HEAD_KEY);
if (headString != null) {
head = Long.parseLong(headString);
}
String tailString = get(TAIL_KEY);
if (tailString != null) {
tail = Long.parseLong(tailString);
}
//启动时读到末尾,重置队列游标。队头越过队尾重置队头
if (head == tail && head != 0) {
head = 0;
tail = 0;
put(HEAD_KEY, String.valueOf(head));
put(TAIL_KEY, String.valueOf(tail));
} else if (head > tail) {
head = tail;
}
}
}
复制代码
Push:每次push插入到队尾 tail+=1,存储K:V为:(tail+=1,value),更新tail_key队尾的值
/**
* 插入队尾
* @param value
*/
public synchronized void push(String value) {
if (db == null) {
return;
}
if (tail == Long.MAX_VALUE) {
log.error("本地缓存队列已超过系统瓶颈,请重启服务;msg={}", value);
return;
}
tail += 1;
put(TAIL_KEY, String.valueOf(tail));
put(String.valueOf(tail), value);
}
复制代码
Pop:获取当前初始化的held队头+1(初始化后对头=队尾,每次push队尾+1),为空二分法获取最小有值队头,删除当前弹出队头, 更新head_key为head+1
/**
* 弹出队头
* @return
*/
public synchronized String pop() {
if (db == null) {
return null;
}
String find = String.valueOf(head + 1);
String value = get(find);
if (value == null) {
Long hasValueHead = findHasValueHead(head + 1, tail);
if (hasValueHead != null) {
delete(String.valueOf(hasValueHead.longValue()));
head = hasValueHead;
put(HEAD_KEY, String.valueOf(head));
return get(String.valueOf(head));
}
return null;
}
delete(find);
head += 1;
put(HEAD_KEY, String.valueOf(head));
return value;
}
复制代码
二分法查找最小有值对头:
/**
* 二分查找最小有值的队头
* @param head
* @param tail
* @return
*/
private Long findHasValueHead(long head, long tail) {
if (head > tail) {
return null;
}
long mid = (head + tail) / 2;
if (get(String.valueOf(mid)) == null) {
return findHasValueHead(mid + 1, tail);
} else {
if (head == mid) {
return head;
}
return findHasValueHead(head, mid);
}
}
复制代码
具体代码实现:
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBFactory;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.impl.Iq80DBFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.File;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
@Service
@Slf4j
public class LevelDb {
@Value("${levelDB.folder:}")
private String dbPath;
private DB db = null;
private static final String HEAD_KEY = "head_key";
private long head = 0;
private static final String TAIL_KEY = "tail_key";
private long tail = 0;
@PostConstruct
public void initLevelDB() throws IOException {
if (StringUtils.isNotBlank(dbPath)) {
DBFactory factory = new Iq80DBFactory();
Options options = new Options();
options.createIfMissing(true);
db = factory.open(new File(dbPath), options);
String headString = get(HEAD_KEY);
if (headString != null) {
head = Long.parseLong(headString);
}
String tailString = get(TAIL_KEY);
if (tailString != null) {
tail = Long.parseLong(tailString);
}
//启动时读到末尾,重置队列游标。队头越过队尾重置队头
if (head == tail && head != 0) {
head = 0;
tail = 0;
put(HEAD_KEY, String.valueOf(head));
put(TAIL_KEY, String.valueOf(tail));
} else if (head > tail) {
head = tail;
}
}
}
/**
* 插入队尾
* @param value
*/
public synchronized void push(String value) {
if (db == null) {
return;
}
if (tail == Long.MAX_VALUE) {
log.error("本地缓存队列已超过系统瓶颈,请重启服务;msg={}", value);
return;
}
tail += 1;
put(TAIL_KEY, String.valueOf(tail));
put(String.valueOf(tail), value);
}
/**
* 弹出队头
* @return
*/
public synchronized String pop() {
if (db == null) {
return null;
}
String find = String.valueOf(head + 1);
String value = get(find);
if (value == null) {
Long hasValueHead = findHasValueHead(head + 1, tail);
if (hasValueHead != null) {
delete(String.valueOf(hasValueHead.longValue()));
head = hasValueHead;
put(HEAD_KEY, String.valueOf(head));
return get(String.valueOf(head));
}
return null;
}
delete(find);
head += 1;
put(HEAD_KEY, String.valueOf(head));
return value;
}
/**
* 获取缓存的值
* @param key
* @return
*/
public String get(String key) {
if (db == null) {
return null;
}
byte[] bytes = db.get(Iq80DBFactory.bytes(key));
return Iq80DBFactory.asString(bytes);
}
/**
* 游标方式获取列表
* @param max
* @return
*/
public LinkedHashMap<String, String> iteratorDb(int max) {
LinkedHashMap<String, String> linkedHashMap = new LinkedHashMap<>();
if (db == null) {
return linkedHashMap;
}
DBIterator iterator = db.iterator();
int num = 0;
while (iterator.hasNext()) {
if (num ++ > max) {
break;
}
Map.Entry<byte[], byte[]> next = iterator.next();
String key = Iq80DBFactory.asString(next.getKey());
String value = Iq80DBFactory.asString(next.getValue());
linkedHashMap.put(key, value);
}
return linkedHashMap;
}
/**
* 获取队头游标
* @return
*/
public long getHead() {
return head;
}
/**
* 获取队尾游标
* @return
*/
public long getTail() {
return tail;
}
/**
* 获取队列长度
* @return
*/
public long getLength() {
return tail - head;
}
/**
* 写入k-v db
* @param key
* @param value
*/
private void put(String key, String value) {
db.put(Iq80DBFactory.bytes(key), Iq80DBFactory.bytes(value));;
}
/**
* 删除key
* @param key
*/
public void delete(String key) {
db.delete(Iq80DBFactory.bytes(key));;
}
/**
* 二分查找最小有值的队头
* @param head
* @param tail
* @return
*/
private Long findHasValueHead(long head, long tail) {
if (head > tail) {
return null;
}
long mid = (head + tail) / 2;
if (get(String.valueOf(mid)) == null) {
return findHasValueHead(mid + 1, tail);
} else {
if (head == mid) {
return head;
}
return findHasValueHead(head, mid);
}
}
}
复制代码
性能对比结果:
push性能比较:
DBD 填充ms 单条 耗时: 3 500条:548 5000条:2820
LDB 填充ms 单条 耗时: 3 500条:36 5000条:129
LDB吞吐量测试:
吞吐量:
1条:push:2ms pop:2ms
100条:push:10ms pop:11ms
1000条:push:33ms pop:138ms
10000条:push:375ms pop:1213ms
100000条:push:2707ms pop:7293ms
1000000条:push:33601ms pop:159013ms
10000000条:push:494457ms
最大容量 10000000+
并发:200线程/每个线程push1000条消息
java.net.SocketTimeoutException: Read timed out
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END