ELK
ElasticSearch
Features
- Distributed search and analytics
- Full-text and structured search
- Near-real-time processing of massive data volumes
Core concepts
- Index (~ database): a collection of documents of the same kind
- Type (~ table): deprecated in 7.x, removed in 8.0
- Document (~ row): one record, like a row in a relational database
- Shard: a slice of an index's data
- Replica: a copy of a shard
- primary shard -> replica shard
Installation and configuration
# Start the ElasticSearch service
bin/elasticsearch
# Configure elasticsearch.yml
cluster.name: es-nx
node.name: es1
path.data: /home/es/elk/elasticsearch/data
path.logs: /home/es/elk/elasticsearch/logs
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
network.host: hadoop02
discovery.zen.ping.unicast.hosts: ["hadoop02", "hadoop03", "hadoop04"]
# Start the Kibana service
bin/kibana
# Configure kibana.yml
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://hadoop02:9200", "http://hadoop03:9200", "http://hadoop04:9200"]
ElasticSearch: port 9200 for external (HTTP) communication, 9300 for internal (transport) communication
Kibana: 5601
Operating ElasticSearch from Kibana
- Cluster health
GET /_cat/health?v
- green: every index's primary and replica shards are all active
- yellow: every index's primary shards are active, but some replica shards are not (they are unavailable)
- red: not all primary shards are active; some indices have lost data
- List the indices in the cluster
GET /_cat/indices?v
- Basic document operations
Create a document:
PUT /test_index/test_type/1
{ "name": "a" }
Get a document:
GET /test_index/test_type/1
Delete a document:
DELETE /test_index/test_type/1
Replace (full overwrite):
PUT /test_index/test_type/1
{ "name": "ab" }
Partial update:
POST /test_index/test_type/1/_update
{ "doc": { "name": "aab" } }
Search:
GET /test_index/test_type/_search
GET /test_index/_search
GET /_search
GET /test_index1,test_index2/_search
GET /t*/_search
DSL
# the query clause
## match_all, match, match_phrase
GET /test_index/test_type/_search
{
  "query": { "match_all": {} }
}
GET /test_index/test_type/_search
{
  "query": { "match": { "address": "zx" } }
}
GET /test_index/test_type/_search
{
  "query": { "match_phrase": { "like": "adbc asdas" } }
}
## bool: combining conditions
### must, must_not, should
GET /test_index/test_type/_search
{
  "query": {
    "bool": {
      "must": { "match": { "address": "beijing" } },
      "must_not": [{ "match": { "like": "basketball" } }],
      "should": [{ "match": { "age": 19 } }],
      "minimum_should_match": 0
    }
  }
}
## filter
GET /test_index/test_type/_search
{
  "query": {
    "bool": {
      "filter": {
        "range": { "age": { "gt": 19, "lte": 20 } }
      }
    }
  }
}
## sort
GET /test_index/test_type/_search
{
  "query": { "match_all": {} },
  "sort": [{ "age": { "order": "desc" } }]
}
## from, size, _source, highlight
GET /test_index/test_type/_search
{
  "query": { "match": { "like": "running" } },
  "highlight": {
    "fields": { "like": {} }
  },
  "from": 0,
  "size": 10,
  "_source": ["a", "b", "c"]
}
## aggregation
GET /test_index/test_type/_search
{
  "aggs": {
    "group_by_age": {
      "terms": { "field": "age" }
    }
  }
}
## search plus aggregation
GET /test_index/test_type/_search
{
  "query": { "match": { "address": "beijing" } },
  "aggs": {
    "group_by_age": {
      "terms": { "field": "age" }
    }
  }
}
### sub-aggregations: avg, sum (aggregating on a text field requires fielddata)
PUT /test_index/_mapping/test_type
{
  "properties": {
    "age": { "type": "text", "fielddata": true }
  }
}
GET /test_index/test_type/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": { "range": { "age": { "gt": 19, "lte": 20 } } }
    }
  },
  "aggs": {
    "group_by_age": {
      "terms": { "field": "age" },
      "aggs": {
        "avg_salary": { "avg": { "field": "salary" } }
      }
    }
  }
}
Scaling
Vertical scaling: upgrade the configuration of existing nodes
Horizontal scaling: add new nodes
Scaling limit: with 3 primary shards and 1 replica each (6 shards in total), the cluster can grow to at most 6 nodes, one shard per node; to go beyond that, increase number_of_replicas (see the sketch below).
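A minimal sketch with the Java high-level REST client used later in this article (assuming the shared client field from the CRUD section): number_of_replicas can be raised on a live index, while number_of_shards is fixed at creation.
private static void addReplicas() throws IOException {
    // 3 primaries * (1 primary + 2 replica copies each) = 9 shards -> room for up to 9 nodes
    UpdateSettingsRequest request = new UpdateSettingsRequest("test_index")
            .settings(Settings.builder().put("index.number_of_replicas", 2));
    client.indices().putSettings(request, RequestOptions.DEFAULT);
}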
Load balancing
ES automatically spreads shards across the nodes to balance load; when a node joins, shards are automatically relocated onto it as well.
Primary and replica shards
- A primary shard holds a slice of the index's data; each index consists of multiple primary shards
- A replica shard is a copy of a primary shard and provides fault tolerance
- A primary shard and its replicas are never placed on the same node
- As nodes are added or removed, shards are automatically rebalanced across the nodes
- The number of primary shards is fixed at index creation and cannot be changed
- ES round-robins read requests across a shard's primary and replica copies, keeping query load balanced
PUT /test_index
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  }
}
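The equivalent index creation from the Java client (a sketch; assumes the shared client field from the CRUD section):
private static void createIndex() throws IOException {
    CreateIndexRequest request = new CreateIndexRequest("test_index")
            .settings(Settings.builder()
                    .put("index.number_of_shards", 3)
                    .put("index.number_of_replicas", 1));
    client.indices().create(request, RequestOptions.DEFAULT);
}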
Peer-style architecture
- The master node manages the cluster metadata and handles index creation, modification, and deletion
- The master does not carry all the traffic: any node can accept a request and act as its coordinating node, which avoids a single-point bottleneck
Fault tolerance
red: a node goes down and its primary shards are lost; the cluster status turns red
yellow: the master promotes a replica of each lost primary shard, on a surviving node, to primary; the status becomes yellow
green: when the failed node restarts, the master copies the missing replica data back to it and the node applies the changes made while it was down; once every primary and replica shard is active again, the status is green
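A sketch of reading the same status from the Java client (assumes the shared client field from the CRUD section):
private static void clusterHealth() throws IOException {
    ClusterHealthRequest request = new ClusterHealthRequest();
    ClusterHealthResponse health = client.cluster().health(request, RequestOptions.DEFAULT);
    System.out.println(health.getStatus()); // GREEN / YELLOW / RED, as in _cat/health
}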
Concurrency control
ES uses optimistic locking, based on a per-document _version.
external version: instead of the internally maintained version number, concurrency can be controlled with your own version numbers. The difference: with the internal _version, a write succeeds only when the supplied version matches the current one, while with an external version the write succeeds only when the supplied version is greater than the stored one (the external_gte variant below also accepts an equal version).
PUT /test_index/test_type/1?version=1&version_type=external_gte
{
  "name": "zser"
}
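The same write from the Java client (a sketch; VersionType.EXTERNAL_GTE mirrors version_type=external_gte):
private static void externalVersionWrite() throws IOException {
    IndexRequest request = new IndexRequest("test_index", "test_type", "1")
            .source("name", "zser")
            .version(1) // caller-supplied external version
            .versionType(VersionType.EXTERNAL_GTE);
    client.index(request, RequestOptions.DEFAULT);
}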
Routing
Every index in ES is stored as shards. A document's target shard is computed as:
shard = hash(routing) % number_of_primary_shards
By default the routing value is the document's _id; it can also be specified manually.
# Specify routing manually
PUT /test_index/test_type/1?routing=123
{
  "name": "setnx"
}
# Inspect an index's shards
GET /test_index/_search_shards
# See which shard a given routing value maps to
GET /test_index/_search_shards?routing=<id>
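Manual routing from the Java client (a sketch): documents indexed with the same routing value always land on the same primary shard.
private static void indexWithRouting() throws IOException {
    IndexRequest request = new IndexRequest("test_index", "test_type", "1")
            .source("name", "setnx")
            .routing("123"); // overrides the default routing (the document _id)
    client.index(request, RequestOptions.DEFAULT);
}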
Cluster deployment
# node1-node3 run ElasticSearch, node4 runs Kibana
# ElasticSearch must not be started as root; everything else matches the configuration above
Quorum mechanism
Write consistency is controlled by adding a consistency parameter to create/update/delete requests:
PUT /test_index/test_type/1?consistency=one
{
  "name": "str"
}
consistency accepts:
- one: the write succeeds as long as one primary shard is active
- all: every shard copy (primary and replica) must be active
- quorum: a majority of the shard copies must be active:
quorum = int((primary + number_of_replicas) / 2) + 1
e.g. int((3 + 2) / 2) + 1 = 3
So that a single node can still operate on its own, the quorum check is not enforced when number_of_replicas is set to 1.
If too few shard copies are active, the request waits (1 minute by default) and then times out.
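The same formula as plain Java arithmetic (illustrative only):
static int quorum(int primaryShards, int numberOfReplicas) {
    return (primaryShards + numberOfReplicas) / 2 + 1; // integer division
}
// quorum(3, 2) == (3 + 2) / 2 + 1 == 3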
Inverted index
At write time every record is analyzed into terms, and an inverted index maps each term back to the documents that contain it; searches then look terms up in that index.
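A toy inverted index in Java as a sketch (illustrative only; not how ES implements it internally):
// Each term maps to the set of ids of the documents containing it.
Map<String, Set<Integer>> inverted = new HashMap<>();
String[][] docs = { {"quick", "brown", "fox"}, {"quick", "dog"} };
for (int docId = 0; docId < docs.length; docId++) {
    for (String term : docs[docId]) {
        inverted.computeIfAbsent(term, t -> new HashSet<>()).add(docId);
    }
}
// inverted.get("quick") -> [0, 1]: the documents a search for "quick" must return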
Analyzers
- _mapping
Field types can be declared before data is written; ES also supports dynamic mapping.
PUT /test_index/_mapping/test_type
{
  "properties": {
    "name": { "type": "text" },
    "desc": { "type": "text" }
  }
}
- Analyzers
Analysis is the process of turning full text into a stream of terms; it is performed by an analyzer.
Building the inverted index means running each document through an analyzer and pointing every resulting term at the set of documents that contain it.
An analyzer is composed of:
- character filters: pre-process the characters before tokenization, e.g. stripping HTML tags
- tokenizers: split the text into individual tokens
- token filters: post-process the tokens, e.g. lowercasing, removing stop words (the, a, an), normalizing singular/plural, dropping meaningless tokens
ES ships with many built-in analyzers; the default is standard, which handles English well. For Chinese, the IK analyzer is a popular choice.
GET /_analyze
{ "text": "中华人民共和国", "analyzer": "standard" }
GET /_analyze
{ "text": "中华人民共和国", "analyzer": "ik_smart" }
GET /_analyze
{ "text": "中华人民共和国", "analyzer": "ik_max_word" }
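Running an analyzer from the Java client (a sketch; assumes the shared client field from the CRUD section, and an installed IK plugin for ik_smart):
private static void analyze() throws IOException {
    AnalyzeRequest request = AnalyzeRequest.withGlobalAnalyzer("ik_smart", "中华人民共和国");
    AnalyzeResponse response = client.indices().analyze(request, RequestOptions.DEFAULT);
    for (AnalyzeResponse.AnalyzeToken token : response.getTokens()) {
        System.out.println(token.getTerm());
    }
}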
Java
<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.14.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.14.1</version>
</dependency>
</dependencies>
CRUD
public static void main(String[] args) throws IOException {
connect();
// indexStudent();
// getStudent();
// deleteStudent();
updateStudent();
close();
}
/**
* Update a document
* @throws IOException
*/
private static void updateStudent() throws IOException {
// IndexRequest request = new IndexRequest("person", "student", "2");
// request.source("name", "bom", "age", 18);
// IndexResponse index = client.index(request, RequestOptions.DEFAULT);
// System.out.println(index);
UpdateRequest request = new UpdateRequest("person", "student", "2");
request.doc("name","pom");
UpdateResponse update = client.update(request, RequestOptions.DEFAULT);
System.out.println(update);
}
/**
* Get a document
*/
private static void getStudent() throws IOException {
GetRequest request = new GetRequest("person", "student", "1");
GetResponse response = client.get(request, RequestOptions.DEFAULT);
System.out.println(response);
Map<String, Object> source = response.getSource();
for (Map.Entry<String, Object> entry:source.entrySet()) {
System.out.println(entry.getKey()+"=="+entry.getValue());
}
}
/**
* Delete a document
*/
private static void deleteStudent() throws IOException {
DeleteRequest request = new DeleteRequest("person", "student", "1");
DeleteResponse delete = client.delete(request, RequestOptions.DEFAULT);
System.out.println(delete);
}
/**
* Index (insert) documents
* @throws IOException
*/
private static void indexStudent() throws IOException {
IndexRequest indexRequest = new IndexRequest("person", "student", "1");
indexRequest.source("name", "bom", "age", 18);
// client.indexAsync(indexRequest, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
// public void onResponse(IndexResponse indexResponse) {
// System.out.println(indexResponse.toString());
// }
//
// public void onFailure(Exception e) {
//
// }
// });
IndexResponse index = client.index(indexRequest, RequestOptions.DEFAULT);
System.out.println(index.toString());
IndexRequest indexRequest2 = new IndexRequest("person", "student", "2");
indexRequest2.source(XContentFactory.jsonBuilder()
.startObject()
.field("name", "TOM")
.field( "age", 12)
.endObject());
IndexResponse response2 = client.index(indexRequest2, RequestOptions.DEFAULT);
System.out.println(response2);
IndexRequest indexRequest3 = new IndexRequest("person", "student", "3");
Map<String, Object> map = new HashMap<String, Object>();
map.put("name", "JACK");
map.put("age", 20);
indexRequest3.source(map);
IndexResponse response3 = client.index(indexRequest3, RequestOptions.DEFAULT);
System.out.println(response3);
//source(jsonString, XContentType.JSON) also works
}
//shared client instance
private static RestHighLevelClient client;
//build the client
private static void connect(){
client = new RestHighLevelClient(RestClient.builder(new HttpHost(
"localhost", 9200, "http")));
}
//close the client
private static void close(){
if(client != null){
try{
client.close();
}catch (IOException e){
e.printStackTrace();
}
}
}
Search
/**
* Basic search
*/
private static void search() throws IOException {
SearchRequest request = new SearchRequest("person");
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());
}
}
/**
* match_all
*/
private static void matchAll() throws IOException {
SearchRequest request = new SearchRequest("person");
request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());
}
}
/**
* match
*
* @throws IOException
*/
private static void match() throws IOException {
SearchRequest request = new SearchRequest("person");
request.source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("name", "pom")));
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());
}
}
/**
* match_phrase
*
* @throws IOException
*/
private static void matchPhrase() throws IOException {
SearchRequest request = new SearchRequest("person");
request.source(new SearchSourceBuilder().query(QueryBuilders.matchPhraseQuery("name", "p om")));
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());
}
}
/**
* Bool query with multiple conditions
*
* @throws IOException
*/
private static void boolSearch() throws IOException {
SearchRequest request = new SearchRequest("person");
SearchSourceBuilder search = new SearchSourceBuilder();
search.query(new BoolQueryBuilder()
.must(QueryBuilders.matchQuery("name", "pom"))
.mustNot(QueryBuilders.matchQuery("age", 200))
.should(QueryBuilders.matchQuery("age", 18))
.should(QueryBuilders.matchQuery("age", 20))
.filter(QueryBuilders.rangeQuery("age").gt(17).lt(30)));
request.source(search);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());
}
}
/**
* search_sort_from_size_fetchSource_highlight
*
* @throws IOException
*/
private static void search1() throws IOException {
SearchRequest request = new SearchRequest("person");
SearchSourceBuilder search = new SearchSourceBuilder();
search.query(QueryBuilders.matchQuery("name", "pom"))
.sort("age", SortOrder.ASC)
.from(0)
.size(2)
.fetchSource("age", "name")
.highlighter(new HighlightBuilder().field("age"));
request.source(search);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());
}
}
/**
* Bulk insert
* @throws IOException
*/
private static void bulkApi() throws IOException {
BulkRequest request = new BulkRequest();
request.add(new IndexRequest("company", "employee","1")
.source("name","jack", "age", 10))
.add(new IndexRequest("company", "employee","2")
.source("name","tom", "age", 23))
.add(new IndexRequest("company", "employee","3")
.source("name","to m", "age", 23))
.add(new IndexRequest("company", "employee","4")
.source("name","t om", "age", 23))
.add(new IndexRequest("company", "employee","5")
.source("name","to m t", "age", 10));
BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
BulkItemResponse[] items = response.getItems();
for (BulkItemResponse item: items) {
System.out.println(item.getResponse().getResult());
}
}
/**
* putMapping
* @throws IOException
*/
private static void putMapping() throws IOException {
PutMappingRequest request = new PutMappingRequest("company");
request.source(XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
.startObject("name")
.field("type", "text")
.field("fielddata", true)
.endObject()
.endObject()
.endObject());
AcknowledgedResponse acknowledgedResponse = client.indices().putMapping(request, RequestOptions.DEFAULT);
System.out.println(acknowledgedResponse.isAcknowledged());
}
Aggregations
/**
* Terms aggregation (group by age)
* @throws IOException
*/
private static void groupBy() throws IOException {
SearchRequest request = new SearchRequest("person");
SearchSourceBuilder search = new SearchSourceBuilder();
search.query(QueryBuilders.matchQuery("name", "pom"))
.size(0)
.aggregation(AggregationBuilders.terms("group_by_age").field("age"));
request.source(search);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
Terms group_by_age = response.getAggregations().get("group_by_age");
List<? extends Terms.Bucket> buckets = group_by_age.getBuckets();
for (Terms.Bucket bucket :buckets) {
System.out.println(bucket.getKey()+"..key"+bucket.getDocCount());
}
}
/**
* Terms aggregation with an avg sub-aggregation
* @throws IOException
*/
private static void avgAggs() throws IOException {
SearchRequest request = new SearchRequest("person");
SearchSourceBuilder search = new SearchSourceBuilder();
search.query(QueryBuilders.rangeQuery("age").gte(1))
.size(0)
.aggregation(AggregationBuilders.terms("group_by_age").field("age")
.subAggregation(AggregationBuilders.avg("avg_age").field("age")));
request.source(search);
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
Terms group_by_age = response.getAggregations().get("group_by_age");
List<? extends Terms.Bucket> buckets = group_by_age.getBuckets();
for (Terms.Bucket bucket :buckets) {
Avg avg_age = bucket.getAggregations().get("avg_age");
System.out.println(avg_age.getValue());
}
}
Filebeat
log_console
touch log_console.yml
filebeat.inputs:
- type: log
paths:
- /u01/log/*.log
output.console:
pretty: true
nohup ./filebeat -c log_console.yml >/dev/null 2>&1 &
redis
touch log_redis.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /u01/log/*.log
output.redis:
hosts: ["node1"]
password:
key: "filebeat"
db: 0
timeout: 5
./filebeat -c log_redis.yml
Logstash
A Logstash pipeline has two required components, input and output, and one optional component, filter.
input: consumes data from a source
filter: modifies the data as needed
output: writes the data to a destination
bin/logstash -e "input{stdin{}} output{stdout{}}"
touch stdin_stdout.yml
bin/logstash -f config/stdin_stdout.yml
input{
stdin{}
}
output{
stdout{}
}
touch file_stdout.yml
input{
file{
path => "/root/test/*.log"
}
}
output{
stdout{}
}
bin/logstash -f config/file_stdout.yml
path is a required setting.
Optional settings:
start_position: where to begin reading, beginning or end (the default).
start_position only takes effect the first time a file is watched; for files already seen, the read offset is persisted under logstash/data/plugins/file and reading resumes from there.
ignore_older: ignore files older than the given age.
input{
redis{
host => "redis1"
port => 6379
data_type => "list"
key => "filebeat"
}
}
output{
elasticsearch{
hosts =>["node1:9200"]
index => "applogs"
}
}
The grok plugin
input{
stdin{}
}
filter{
grok{
# e.g. the line "1 tom" yields num => "1", name => "tom"
match => { "message" => "%{NUMBER:num} %{WORD:name}" }
}
}
output{
stdout{}
}
input{
stdin{}
}
filter{
grok{
# the same fields captured with inline named regex groups
match => { "message" => "(?<num>[0-9]) (?<name>[\w]+)" }
}
}
output{
stdout{}
}
Multiple data sources
filebeat.inputs:
- type: log
paths:
- /root/test/*.log
fields:
log_type: app
fields_under_root: true
- type: log
paths:
- /root/test1/*.log
fields:
log_type: tomcat
fields_under_root: true
output.redis:
hosts: ["node1"]
password:
key: "filebeat"
db: 0
timeout: 5
input{
redis{
host => "redis1"
port => 6379
data_type => "list"
key => "filebeat"
}
}
filter{
if [log_type] == "app" {
grok{
match=>{"message" => "(?<num>[0-9]) (?<name> [\w]+)"}
}
} else if [log_type] == "tomcat" {
grok{
match=>{"message" => "(?<num>[0-9]) (?<name> [\w]+)"}
}
}
}
output{
elasticsearch{
hosts =>["node1:9200"]
index => "applogs"
}
}