简易的cachVerticle设计思路
作为一个简易的cache,首先应该具有的字段就是一个Map<String,Any>用于代表键值对形式的一个缓存
为了不付出额外的线程同步去操作缓存代价我们可以把他部署在一个worker模型下面的Verticle里面,对外以eventbus为基础暴露缓存操作api,比如增加,删除等
现在就可以写出这样的伪代码
class CacheVerticle:AbstractVerticle(){
private val cache = mutableMapOf<String,Any>()
override fun start() {
val eventbus = vertx.eventBus()
eventbus.localConsumer(GET_CACHE_ADDRESS){//todo}
eventbus.localConsumer(ADD_CACHE_ADDRESS){//todo}
eventbus.localConsumer(REMOVE_CACHE_ADDRESS){//todo}
}
}
复制代码
下一步就是完成localConsumer对应的handler内容了
add方法
因为kotlin包含一个 to
的中缀函数,用起来更加直观,所以我决定把对外暴露的api设计为
fun <T> addCache(pair: Pair<String,T>) = Vertx.currentContext().owner().eventBus().send(ADD_CACHE_ADDRESS,pair)
复制代码
只要addCache(“key” to 012)
这种就可以使用了
然后我们就可以为Pair
写一个eventbus MessageCodec
就可以把它传递过去了
在视频里面我在这里设计了一个class AddWrapper<T>(val addPair:Pair<String,T>)
的包装类,其实不需要包装,只要直接写一个Pair
的MessageCodec
然后注册到evenbus实例上就可以了
所以只要这样实现就可以了
eventbus.localConsumer<AddWrapper<Any>>(ADD_CACHE_ADDRESS){
val add = it.body()
cache[add.first] = add.second
}
复制代码
remove方法
因为本身Vertx就提供了String
的MessageCodec
所以直接传递即可
eventbus.localConsumer<String>(REMOVE_CACHE_ADDRESS){
val key = it.body()
cache.remove(key)
}
复制代码
get方法
由于放入缓存的对象不一定有对应的MessageCodec
,为他们每一个都定制一个MessageCodec
也不现实。
对于Event创建Message对象的逻辑来看,对于没有指定codec名字且不是内置那几种类型的对象会简单地用Object::getClass
作为键获取到注册的codec,如果获取不到就抛出异常,而无论是java lambda还是kt lambda本质上都是一个运行时接口的子类对象,所以没有办法提供lambda的MessageCodec
因此我的解决办法就是在外面包裹一层,把各种可变的作为其内容,外部包裹类的getClass
方法会固定返回同一个Class
,因而只要注册这个包裹类的MessageCodec
就可以了
由于我的设计思路是不只是保证对整个缓存(map实例)读写删除是线程安全的,而且对放入其中的对象操作(比如修改属性)也是线程安全的的,所以我设定的其中内容是键,和一个对值的处理函数(其可以使用Pair<String,(T?) -> Unit
在eventbus上面传递)
这个处理函数会直接跑在cache verticle所在的线程上,因此不用担心并发修改同一个缓存内部的值会发生线程不安全问题,单纯的修改缓存内容是原子化的,同时也要注意如果含有耗时操作则会影响缓存性能
对外暴露的api则是
fun <T> getCache(key:String, handler: (T?) -> Unit){
Vertx.currentContext().owner().eventBus().send(GET_CACHE_ADDRESS,key to handler)
}
复制代码
使用T?类型原因是应对key不存在的情况
eventbus.localConsumer<Pair<String,(Any?)->Unit>>(GET_CACHE_ADDRESS){
val body = it.body()
body.second(cache[body.first])
}
复制代码
附上全部实现
class CacheVerticle: AbstractVerticle(){
companion object{
private const val GET_CACHE_ADDRESS = "get_cache_address"
private const val ADD_CACHE_ADDRESS = "add_cache_address"
private const val REMOVE_CACHE_ADDRESS = "remove_cache_address"
fun <T> addCache(pair: Pair<String,T>) = Vertx.currentContext().owner().eventBus().send(ADD_CACHE_ADDRESS,pair)
fun <T> getCache(key:String, handler: (T?) -> Unit){
Vertx.currentContext().owner().eventBus().send(GET_CACHE_ADDRESS,key to handler)
}
fun remove(key:String){
Vertx.currentContext().owner().eventBus().send(REMOVE_CACHE_ADDRESS,key)
}
}
private val cache = mutableMapOf<String,Any>()
override fun start() {
val eventbus = vertx.eventBus()
eventbus.registerDefaultCodec(Pair::class.java,UnModifiableObjectCodec(Pair::class.java))
eventbus.localConsumer<Pair<String,(Any?)->Unit>>(GET_CACHE_ADDRESS){
val body = it.body()
body.second(cache[body.first])
}
eventbus.localConsumer<Pair<String,Any>>(ADD_CACHE_ADDRESS){
val add = it.body()
cache[add.first] = add.second
}
eventbus.localConsumer<String>(REMOVE_CACHE_ADDRESS){
val key = it.body()
cache.remove(key)
}
}
class UnModifiableObjectCodec<T>(private val msgClass:Class<T>):MessageCodec<T,T>{
override fun encodeToWire(buffer: Buffer?, s: T) {
TODO("Not yet implemented")
}
override fun decodeFromWire(pos: Int, buffer: Buffer?): T {
TODO("Not yet implemented")
}
override fun transform(s: T)=s
override fun name()=msgClass.name
override fun systemCodecID(): Byte {
return -1
}
}
}
复制代码
使用方法
class NormalVerticle:AbstractVerticle(){
override fun start() {
CacheVerticle.addCache("string" to UUID.randomUUID())
CacheVerticle.getCache<UUID>("string") {
it?.let {
println(it)
}
}
CacheVerticle.remove("string")
}
}
复制代码