这是我参与更文挑战的第6天,活动详情查看: 更文挑战
需求背景:
如果你和小编一样在工作中遇到这样的场景,大量的mq消息推送场景,当mq,redis,DB等全都宕机 只剩下Java实例时 要保证消息投递的一定可用 提高系统的容灾能力那么你就可以继续看下去了;
本文主要实现技术:BDB(Berkeley DB)+Java消息AbstractQueue
简介:
队列很常见,但大部分的队列是将数据放入到内存.如果数据过多,就有内存溢出危险,而且长久占据着内存,也会影响性能.如果大家看过Heritrix的源码的话就会发现,Heritrix是基于Bdb实现了一个持久化队列,同时数据已经持久化,相比放在内存的一次性,可以循环累加使用.
BDB(Berkeley DB):
定义:
BDB是一个开源的kv类型数据库文件数据库
,介于关系数据库与内存数据库之间,使用方式与内存数据库类似,它提供的是一系列直接访问数据库的函数,而不是像关系数据库那样需要网络通讯、SQL解析等步骤。
优点:
- 部署以及发布简单,使用内嵌在应用程序中。
- 嵌入式数据库,提供一系列API,调用简单。
- DB库和应用程序可一起编译成为可执行程序
缺点:
- 数据库打开时,文件会被加载到内存,因为数据库不宜过大。
- DB库和应用程序在同一个地址空间,所以DB库无网络通信模块。
- 不支持对SQL代码解码,可以直接访问数据。后期支持部分SQL
大家也知道BDB的高性能和嵌入式.但这个持久化队列我觉得比较适合单机.如果涉及到分布式,就不大适合了.毕竟分布式要通信,负载均衡,冗余等.可以用其他的数据库等替代.
实现原理:
这里大概先说下实现原理,BDB是Key-Value型数据库,而队列是FIFO.所以这个持久化队列以位置作为BDB的Key,数据作为BDB的Value.然后用两个变量,分别记录队列两头的位置,也就是头部和尾部.当有数据插入的时候,就以尾部的位置为这个数据的Key.而当要取出数据时,以头部位置作为Key,获取这个Key的数据.原理大概如此,这个类也继承AbstractQueue;
代码实现:
代码POM主要依赖:
<dependency>
<groupId>de.sciss</groupId>
<artifactId>bdb-je</artifactId>
<version>7.5.11</version>
</dependency>
复制代码
BDB数据库环境配置config:
import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.je.*;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
/**
* BDB数据库环境
*
* @author taoze
* @version 1.0
* @date 4/6/21 9:32 PM
*/
@Slf4j
public class BdbEnvironmentConfig extends Environment {
StoredClassCatalog classCatalog;
Database classCatalogDB;
/**
* Constructor
*
* @param envHome 数据库环境目录
* @param envConfig config options 数据库换纪念馆配置
* @throws DatabaseException
*/
public BdbEnvironmentConfig(File envHome, EnvironmentConfig envConfig) throws DatabaseException {
super(envHome, envConfig);
}
/**
* 返回StoredClassCatalog
*
* @return the cached class catalog
*/
public StoredClassCatalog getClassCatalog() {
if (classCatalog == null) {
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setAllowCreate(true);
try {
classCatalogDB = openDatabase(null, "classCatalog", dbConfig);
classCatalog = new StoredClassCatalog(classCatalogDB);
} catch (DatabaseException e) {
// TODO Auto-generated catch block
log.error("DatabaseException ",e);
}
}
return classCatalog;
}
@Override
public synchronized void close() throws DatabaseException {
if (classCatalogDB != null) {
classCatalogDB.close();
}
super.close();
}
}
复制代码
基于BDB进行持久化队列:
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import com.sleepycat.je.*;
import org.apache.commons.io.FileUtils;
import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.serial.SerialBinding;
import com.sleepycat.bind.serial.StoredClassCatalog;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.collections.StoredMap;
import com.sleepycat.collections.StoredSortedMap;
/**
* 持久化队列,基于BDB实现
*
* @author taoze
* @version 1.0
* @date 4/6/21 9:31 PM
*/
public class BdbPersistentQueue<E extends Serializable> extends AbstractQueue<E> implements Serializable {
private static final long serialVersionUID = 3427799316155220967L;
private transient BdbEnvironmentConfig dbEnv; // 数据库环境,无需序列化
private transient Database queueDb; // 数据库,用于保存值,使得支持队列持久化,无需序列化
private transient StoredMap<Long, E> queueMap; // 持久化Map,Key为指针位置,Value为值,无需序列化
private transient String dbDir; // 数据库所在目录
private transient String dbName; // 数据库名字
private AtomicLong headIndex; // 头部指针
private AtomicLong tailIndex; // 尾部指针
private transient E peekItem = null; // 当前获取的值
/**
* 构造函数,传入BDB数据库
*
* @param db
* @param valueClass
* @param classCatalog
*/
public BdbPersistentQueue(Database db, Class<E> valueClass, StoredClassCatalog classCatalog) {
this.queueDb = db;
this.dbName = db.getDatabaseName();
headIndex = new AtomicLong(0);
tailIndex = new AtomicLong(0);
bindDatabase(queueDb, valueClass, classCatalog);
}
/**
* 构造函数,传入BDB数据库位置和名字,自己创建数据库
*
* @param dbDir
* @param dbName
* @param valueClass
*/
public BdbPersistentQueue(String dbDir, String dbName, Class<E> valueClass) {
headIndex = new AtomicLong(0);
tailIndex = new AtomicLong(0);
this.dbDir = dbDir;
this.dbName = dbName;
createAndBindDatabase(dbDir, dbName, valueClass);
}
/**
* 绑定数据库
*
* @param db
* @param valueClass
* @param classCatalog
*/
public void bindDatabase(Database db, Class<E> valueClass, StoredClassCatalog classCatalog) {
EntryBinding<E> valueBinding = TupleBinding.getPrimitiveBinding(valueClass);
if (valueBinding == null) {
// 序列化绑定
valueBinding = new SerialBinding<E>(classCatalog, valueClass);
}
queueDb = db;
// db,key,value,allow write
queueMap = new StoredSortedMap<Long, E>(db,TupleBinding.getPrimitiveBinding(Long.class), valueBinding,true);
}
/**
* 创建以及绑定数据库
*
* @param dbDir
* @param dbName
* @param valueClass
* @throws DatabaseNotFoundException
* @throws DatabaseExistsException
* @throws DatabaseException
* @throws IllegalArgumentException
*/
private void createAndBindDatabase(String dbDir, String dbName, Class<E> valueClass)
throws DatabaseNotFoundException, DatabaseExistsException, DatabaseException, IllegalArgumentException {
File envFile = null;
EnvironmentConfig envConfig = null;
DatabaseConfig dbConfig = null;
Database db = null;
try {
// 数据库位置
envFile = new File(dbDir);
// 数据库环境配置
envConfig = new EnvironmentConfig();
envConfig.setAllowCreate(true);
envConfig.setTransactional(false);
// 数据库配置
dbConfig = new DatabaseConfig();
dbConfig.setAllowCreate(true);
// 创建环境
dbEnv = new BdbEnvironmentConfig(envFile, envConfig);
// 打开数据库
db = dbEnv.openDatabase(null, dbName, dbConfig);
// 绑定数据库
bindDatabase(db, valueClass, dbEnv.getClassCatalog());
} catch (DatabaseNotFoundException e) {
throw e;
} catch (DatabaseExistsException e) {
throw e;
} catch (DatabaseException e) {
throw e;
} catch (IllegalArgumentException e) {
throw e;
}
}
public List<String> selectAll() {
Cursor cursor = null;
cursor=queueDb.openCursor(null, null);
DatabaseEntry theKey=null;
DatabaseEntry theData=null;
theKey = new DatabaseEntry();
theData = new DatabaseEntry();
ArrayList<String> list = new ArrayList<>();
while (cursor.getNext(theKey, theData, LockMode.DEFAULT) == OperationStatus.SUCCESS) {
list.add(new String(theData.getData()));
System.out.println(new String(theData.getData()));
}
cursor.close();
return list;
}
/**
* 值遍历器
*/
@Override
public Iterator<E> iterator() {
return queueMap.values().iterator();
}
/**
* 大小
*/
@Override
public int size() {
synchronized (tailIndex) {
synchronized (headIndex) {
return (int)(tailIndex.get() - headIndex.get());
}
}
}
/**
* 插入值
*/
@Override
public boolean offer(E e) {
synchronized (tailIndex) {
queueMap.put(tailIndex.getAndIncrement(), e); // 从尾部插入
dbEnv.sync();
}
return true;
}
public boolean init(E e) {
synchronized (tailIndex) {
queueMap.put(tailIndex.getAndIncrement(), e); // 从尾部插入
}
return true;
}
/**
* 获取值,从头部获取
*/
@Override
public E peek() {
synchronized (headIndex) {
if (peekItem != null) {
return peekItem;
}
E headItem = null;
while (headItem == null && headIndex.get() < tailIndex.get()) { // 没有超出范围
headItem = queueMap.get(headIndex.get());
if (headItem != null) {
peekItem = headItem;
continue;
}
headIndex.incrementAndGet(); // 头部指针后移
}
return headItem;
}
}
/**
* 移出元素,移出头部元素
*/
@Override
public E poll() {
synchronized (headIndex) {
E headItem = peek();
if (headItem != null) {
queueMap.remove(headIndex.getAndIncrement());
peekItem = null;
dbEnv.sync();
return headItem;
}
}
return null;
}
/**
* 同步数据到磁盘当中,相当于让数据实时持久化
*
* @return
*/
public boolean sync() {
if (dbEnv != null) {
try {
dbEnv.sync();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return true;
}
return false;
}
/**
* 关闭,也就是关闭所是用的BDB数据库但不关闭数据库环境
*/
public void close() {
try {
if (queueDb != null) {
queueDb.sync();
queueDb.close();
}
} catch (DatabaseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (UnsupportedOperationException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 清理,会清空数据库,并且删掉数据库所在目录,慎用.如果想保留数据,请调用close()
*/
@Override
public void clear() {
try {
close();
if (dbEnv != null && queueDb != null) {
dbEnv.removeDatabase(null, dbName == null ? queueDb.getDatabaseName() : dbName);
dbEnv.close();
}
} catch (DatabaseNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (DatabaseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
try {
if (this.dbDir != null) {
FileUtils.deleteDirectory(new File(this.dbDir));
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
复制代码
业务实现:
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.io.File;
import java.util.*;
/**
* BdbQueue
*
* @author taozeR
* @version 1.0
* @date 4/7/21 5:48 PM
*/
@Slf4j
@Service
@EnableScheduling
public class BdbQueueService {
Queue<String> persistentQueue;
@Value("${bdb.queue.dbDir:logs/dbtest}")
private String dbDir;
@Value("${bdb.queue.databaseName:pq}")
private String databaseName;
//mq操作类 需替换
@Autowired
private ProductMessage productMessage;
/**
* 填充队列
*
* @param topic
* @param msg
*/
public void addQueue(String topic, String tags, Object msg) {
log.info("BDB 填充 start");
File file = new File(dbDir);
if (!file.exists() || !file.isDirectory()) {
file.mkdirs();
}
persistentQueue = new BdbPersistentQueue(dbDir, databaseName, String.class);
Map<String, Object> map = new HashMap<>(2);
map.put("topic", topic);
map.put("tags", tags);
map.put("msg", msg);
String queueStr = JSONObject.toJSONString(map);
persistentQueue.offer(queueStr);
log.info("BDB 填充 end msg = {}", queueStr);
}
/**
* 排放队列
*/
@Scheduled(initialDelay=60000,fixedDelay=60000)
public void drainQueue() {
log.info("BDB队列 排放 start");
persistentQueue = new BdbPersistentQueue(dbDir, databaseName, String.class);
Iterator<String> iterator = persistentQueue.iterator();
while (iterator.hasNext()) {
try {
String next = iterator.next();
persistentQueue.add(next);
String queueStr = persistentQueue.poll();
log.info("BDB 排放 msg = {}", queueStr);
sendMessage(queueStr);
} catch (Exception e) {
log.error("BDB 队列 排放异常", e);
}
}
}
private void sendMessage(String queueStr) {
JSONObject jsonObject = JSONObject.parseObject(queueStr);
String topic = jsonObject.getString("topic");
Object msg = jsonObject.get("msg");
String tags = jsonObject.getString("tags");
if (StringUtils.isBlank(tags)) {
productMessage.produceMessage(topic, msg);
} else {
productMessage.produceMessage(topic, tags, msg);
}
}
/**
* 获取bdb 全部消息
*
* @return
*/
public List<String> selectQueueAll() {
BdbPersistentQueue pq = new BdbPersistentQueue(dbDir, databaseName, String.class);
return pq.selectAll();
}
}
复制代码
再给大家放一个操作BDB的工具类,其他操作也可以通过封装的工具类进行操作
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import com.sleepycat.je.*;
/**
* 封装的bdb操作工具栏 集成了增、删、改、查、关闭、同步操作等方法
*/
public class BDBOperatorUtil {
private String dbEnvFilePath;
private String databaseName;
// 环境变量的声明
private Environment myDbEnvironment = null;
// 数据库操作的对象声明
private Database myDatabase = null;
/**
* bdb操作环境变量和数据库初始化
*
* @param dbEnvFilePath
* @param databaseName
*/
public BDBOperatorUtil(String dbEnvFilePath, String databaseName) {
this.dbEnvFilePath = dbEnvFilePath;
this.databaseName = databaseName;
/**
* 初始化数据库参数
*/
try {
// 初始化数据存储根目录文件夹
File f = new File(dbEnvFilePath);
if (!f.exists()) {
f.mkdirs();
}
// 数据库配置变量初始化
DatabaseConfig dbConfig = new DatabaseConfig();// 打开数据库
dbConfig.setAllowCreate(true);
// 初始化环境配置变量,基于该变量去配置环境变量
EnvironmentConfig envConfig = new EnvironmentConfig();
// 当使用的数据库配置变量不存在的时候,就自动创建
envConfig.setAllowCreate(true);
// 正式初始化数据库的环境
myDbEnvironment = new Environment(f, envConfig);
// 打开一个数据库,如果不存在,则自动创建;第一个参数表示是否是事务
myDatabase = myDbEnvironment.openDatabase(null, databaseName,
dbConfig);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 将指定的kv存放到bdb当中,并可以选择是否实时同步到磁盘中
*
* @param key
* @param value
* @param isSync
* @return
*/
public boolean put(String key, String value, boolean isSync) {
// 数据的key
// 数据的value
try {
// 将key和value都封装到DatabaseEntry中
DatabaseEntry theKey = new DatabaseEntry(key.getBytes("UTF-8"));
DatabaseEntry theData = new DatabaseEntry(value.getBytes("UTF-8"));
// 写入数据库
myDatabase.put(null, theKey, theData);
if (isSync) {
// 数据同步到磁盘
this.sync();
}
// 对该库进行count操作,查看有多少条数据
System.out.println(myDatabase.count());
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// 删除bdb中指定的key值
public boolean delete(String key) {
DatabaseEntry theKey;
try {
theKey = new DatabaseEntry(key.getBytes("UTF-8"));
myDatabase.delete(null, theKey);
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* 读取bdb的key对应的数据
*
* @param key
* @return
*/
public String getValue(String key) {
// 要读取数据的key
try {
// 将读取数据的key封装到DatabaseEntry中
DatabaseEntry theKey = new DatabaseEntry(key.getBytes("UTF-8"));
// 将读取出来的值以二进制形式放到DatabaseEntry中
DatabaseEntry theData = new DatabaseEntry();
// 执行读取操作
myDatabase.get(null, theKey, theData, LockMode.DEFAULT);
if (theData.getData() == null) {
return null;
}
// 将二进制数据转化成字符串值
String result = new String(theData.getData(), "utf-8");
// 打印之
return result;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 查询所有,可遍历数据
* selectAll(Here describes this method function with a few words)*
*
* void
*/
public List<String> selectAll() {
Cursor cursor = null;
cursor=myDatabase.openCursor(null, null);
DatabaseEntry theKey=null;
DatabaseEntry theData=null;
theKey = new DatabaseEntry();
theData = new DatabaseEntry();
ArrayList<String> list = new ArrayList<>();
while (cursor.getNext(theKey, theData, LockMode.DEFAULT) == OperationStatus.SUCCESS) {
list.add(new String(theData.getData()));
System.out.println(new String(theData.getData()));
}
cursor.close();
return list;
}
/**
* 同步数据到磁盘当中,相当于让数据实时持久化
*
* @return
*/
public boolean sync() {
if (myDbEnvironment != null) {
try {
myDbEnvironment.sync();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return true;
}
return false;
}
/**
* 关闭环境变量数据库
*
* @return
*/
public boolean close() {
try {
if (myDatabase != null) {
myDatabase.close();
}
if (myDbEnvironment != null) {
myDbEnvironment.sync();
myDbEnvironment.cleanLog();
myDbEnvironment.close();
}
return true;
} catch (DatabaseException e) {
e.printStackTrace();
}
return false;
}
public static void main(String[] args) {
String dbEnvFilePath = "/Users/taoze/logs/dbtest";
String databaseName = "pq";
String key ="self_key_";
String value="工具类操作实例";
BDBOperatorUtil bdUtil=new BDBOperatorUtil(dbEnvFilePath, databaseName);
for (int i = 0; i < 5; i++) {
//bdUtil.put(key+i, value, true);
//bdUtil.delete(key+i);
}
//bdUtil.sync();
bdUtil.selectAll();
System.out.println(bdUtil.getValue(key));
}
}
复制代码
ok!文章到此就结束了,希望可以对大家有帮助,有不对的地方希望大家可以提出来的,共同成长;
整洁成就卓越代码,细节之中只有天地