这是我参与8月更文挑战的第17天,活动详情查看:8月更文挑战
简介
elasticsearch是一个分布式,RESTful风格的搜索和数据分析引擎,非常的快,能承载PB级数据,并且可靠性非常强,单个集群可同时运行几百个节点
可以做什么?
elasticsearch 可以做什么了?官方给了以下解决方案的完整列表
在实际业务中,大多用来做日志服务器,搜索服务
任务
这里主要是使用go语言调用sdk来实现es的增删改查,熟悉es api的搜索操作
安装相关工具
为了可以快速写代码,这里全部使用docker来安装,除了安装elasticsearch外,还会安装监控工具和查询工具,来辅助高效开发
centos快速安装docker传送门: juejin.cn/post/684490…
安装elasticsearch
#创建一个网络,同一个网络的内的容器,可以通过localhost:port 通信,方便kibana访问es
docker network create somenetwork
#安装elasticsearch
docker run -d --name elasticsearch --net somenetwork -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.6.2
复制代码
安装es查询工具dejavu
docker run -p 1358:1358 -d --name dejavu appbaseio/dejavu
复制代码
安装es监控工具
docker run -d --name cerebro -p 9000:9000 lmenezes/cerebro
复制代码
安装kibana查询工具
docker run -d --name kibana --net somenetwork -p 5601:5601 kibana:7.6.2
复制代码
API例子
go语言的es客户端其实有2个,一个是官方出的,另一个是olivere出口的,目前olivere比官方的还要流行,下面的例子使用olivere客户端
olivere客户端地址: github.com/olivere/ela…
olivere文档地址: olivere.github.io/elastic/
es官方客户端地址: github.com/elastic/go-…
因为我们使用的es7版本,引入客户端时指定一下
import "github.com/olivere/elastic/v7"
复制代码
全局变量
后续方法中多次使用的对象直接设置全局 变量,减少代码冗余
var (
client *elastic.Client
url = "http://es-ip:9200"
ctx = context.Background()
)
复制代码
创建客户端
SetSniff这个要关闭,因为我们用的docker启动的es,如果是rpm安装没有这个问题
写个init函数,将客户端创建和状态检测在 工程启动时执行,后面就可以专心写业务了
func init() {
client, _ = elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(url))
//测试es连接状态
_, _, err := client.Ping(url).Do(ctx)
if err != nil {
log.Println("连接es失败", err)
}
//查看es当前版本
version, err := client.ElasticsearchVersion(url)
if err != nil {
log.Println("查询es版本错误", err)
}
log.Println("Elasticsearch version: ", version)
}
复制代码
创建索引和结构
es的索引mapping是创建时指定,后面无法更新的(有法子比较麻烦),最好在设计之初定好
mapping结构
该结构在官方文档中有mappings.tweet.properties 这里直接把tweet去掉。因为es7不能指定类型了
const mapping = `
{
"settings":{
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings":{
"properties":{
"user":{
"type":"keyword"
},
"message":{
"type":"text",
"store": true,
"fielddata": true
},
"image":{
"type":"keyword"
},
"created":{
"type":"date"
},
"tags":{
"type":"keyword"
},
"location":{
"type":"geo_point"
},
"suggest_field":{
"type":"completion"
}
}
}
}`
复制代码
创建索引函数
es7版本之没有就没有type,默认就是_doc,不管是现在创建方法,还是查询等都不需要引入.Type方法了,直接调用后续方法既可
func indexCreate(action string) {
//判断索引是否存在
exists, err := client.IndexExists("twitter").Do(ctx)
if err != nil {
log.Fatal(err)
}
//不存在继续创建索引
if !exists {
indexCreate, err := client.CreateIndex("twitter").BodyString(mapping).Do(ctx)
if err != nil {
log.Println("创建index失败", err)
}
if !indexCreate.Acknowledged {
log.Println("创建不成功")
}
}
}
复制代码
写入数据
在写数据之前,我们需要一个结构体
type Tweet struct {
User string `json:"user"`
Message string `json:"message"`
Retweets int `json:"retweets"`
Image string `json:"image,omitempty"`
Created time.Time `json:"created,omitempty"`
Tags []string `json:"tags,omitempty"`
Location string `json:"location,omitempty"`
Suggest *elastic.SuggestField `json:"suggest_field,omitempty"`
}
复制代码
为了测试方便,直接循环100条数据,更多也可以,不是每个字段都会写数据,视情况而定
func createTweet() {
//直接定义1条死数据
tweet := Tweet{User: "olivere", Message: "Take Five", Retweets: 0, Image: "icon", Tags: []string{"music", "book"}}
res, err := client.Index().Index("twitter").Id("1").BodyJson(tweet).Do(ctx)
if err != nil {
log.Println("写入数据失败", err)
}
fmt.Printf("Indexed tweet %s to index s%s, type %s\n", res.Id, res.Index, res.Type)
//循环100条数据
for i := 1; i < 100; i++ {
tweet := Tweet{User: "olivere", Message: "Take Five", Retweets: i, Image: "icon", Created: time.Now(), Tags: []string{"music", "book"}}
_, _ = client.Index().Index("twitter").Id(strconv.Itoa(i)).BodyJson(tweet).Do(context.Background())
}
//刷盘
_, _ = client.Flush().Index("twitter").Do(ctx)
}
复制代码
获取数据
指定数据ID,获取一条数据,数据存在Source中,将数据解码成结构体输出
func getTweet() {
res, _ := client.Get().Index("twitter").Id("3").Do(ctx)
if res.Found {
fmt.Printf("Got document %s in version %d from index %s, type %s\n", res.Id, res.Version, res.Index, res.Type)
var tweet Tweet
_ = json.Unmarshal(res.Source, &tweet)
fmt.Println("数据内容: ",tweet)
}
}
复制代码
删除数据
指定数据ID,删除一条数据
func deleteTweet() {
result, _ := client.Delete().Index("twitter").Id("3").Do(ctx)
fmt.Println(result.Result)
}
复制代码
更新数据
更新某一个字段的数据,指定ID,更新一条
func updateTweet() {
result, _ := client.Update().Index("twitter").Id("99").Doc(map[string]interface{}{"image": "image"}).Do(ctx)
fmt.Println(result.Result)
}
复制代码
一些简单的CURD例子这里就演示完成了,但是es是个搜索服务器,接下来看看搜索方面的api
搜索API
这里同样演示时,避免代码冗余,将返回值err直接不要了,生产环境记得要加上,错误处理是流程中必要一环
通用方法
因为调用搜索方法后,值都需要经常过处理,抽出来写个函数,使代码更简洁一些
该函数调用Each方法实现,Each使用反射将结构体字段信息提出来,然后使用interface转struct (item.(Struct)),避免了用字段一个一个的接
func searchValue(res *elastic.SearchResult) {
var tweet Tweet
//使用反射获取Struct字段信息
for _, item := range res.Each(reflect.TypeOf(tweet)) {
//强制将interface类型转为struct
t := item.(Tweet)
fmt.Printf("%#v\n", t)
}
}
复制代码
搜索全部
注意,虽然是搜的全部,但是res.Hits.Hits默认只返回了10条,如果想要显示更多结果,需要在调用search方法接链式操作: Search(“index”).size(100).Do
func searchAll() {
res, _ := client.Search("twitter").Do(ctx)
searchValue(res)
}
复制代码
字段搜索
搜索指定字段的某值
func searchField() {
stringQuery := elastic.NewQueryStringQuery("image:icon")
res, _ := client.Search("twitter").Query(stringQuery).Do(ctx)
searchValue(res)
}
复制代码
条件搜索
设定了两个条件进行搜索,一个是字段值image/icon,另一个retweets大于33
func searchCondition() {
boolQuery := elastic.NewBoolQuery()
boolQuery.Must(elastic.NewMatchQuery("image", "icon"))
boolQuery.Filter(elastic.NewRangeQuery("retweets").Gt(33))
res, _ := client.Search("twitter").Query(boolQuery).Do(ctx)
searchValue(res)
}
复制代码
短语搜索
短语搜索是指在一大段语中搜索单个词或多个连在一起的词
func searchPhrase() {
phraseQuery := elastic.NewMatchPhraseQuery("message", "Take")
res, _ := client.Search("twitter").Query(phraseQuery).Do(ctx)
searchValue(res)
}
复制代码
组合搜索
在生产环境常用的是组合搜索,因为用户只是输入一个或一组字,我们需要做的就是从多个字段中 找到用户所需的内容
func searchGroup() {
phraseGroup := elastic.NewMatchPhraseQuery("message", "Five")
stringGroup := elastic.NewQueryStringQuery("image:icon")
res, _ := client.Search("twitter").Query(phraseGroup).Query(stringGroup).Do(ctx)
searchValue(res)
}
复制代码
总结
以上的例子跑下来,可以感受到olivere客户端还是很强大的,基本上能满足我们日常使用。且在数据结构映射上方法也实现好了,不需要我们用字段一个一个的接
olivere官网列出已经实现的api,非常的多 传送门: github.com/olivere/ela…