ELK

Elasticsearch

Features

  • Distributed search and analytics
  • Full-text search and structured search
  • Near-real-time processing of massive data volumes

Core concepts

Index: analogous to a database; a collection of documents of one kind

Type: analogous to a table; removed in 8.0

Document: analogous to a row in a relational database; one record

Shard: a slice of an index's data

Replica: a copy of a shard

primary shard -> replica shard

Installation and configuration

# Start the Elasticsearch service
bin/elasticsearch

# Configure elasticsearch.yml
cluster.name: es-nx
node.name: es1
path.data: /home/es/elk/elasticsearch/data
path.logs: /home/es/elk/elasticsearch/logs
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
network.host: hadoop02
discovery.zen.ping.unicast.hosts: ["hadoop02","hadoop03","hadoop04"]

# Start Kibana
bin/kibana

# Configure kibana.yml
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://hadoop02:9200","http://hadoop03:9200","http://hadoop04:9200"]

Elasticsearch: port 9200 for external (HTTP) traffic, port 9300 for internal node-to-node communication

Kibana: 5601
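
As a quick sanity check, here is a minimal Java sketch (using the REST high-level client introduced in the Java section below; the hadoop02 host comes from the config above, adjust to your environment) that connects on 9200 and prints the cluster status:

import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class HealthCheck {
    public static void main(String[] args) throws Exception {
        // HTTP clients use 9200; 9300 is only for node-to-node transport
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("hadoop02", 9200, "http")));
        ClusterHealthResponse health = client.cluster()
                .health(new ClusterHealthRequest(), RequestOptions.DEFAULT);
        System.out.println(health.getStatus()); // GREEN / YELLOW / RED, see below
        client.close();
    }
}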

Operating Elasticsearch from Kibana

  • Cluster health

    GET /_cat/health?v

    • green: all primary and replica shards of every index are active
    • yellow: all primary shards of every index are active, but some replica shards are not active (unavailable)
    • red: not all primary shards are active; some indices have lost data
  • Quickly list the indices in the cluster

    GET /_cat/indices?v
  • Basic document operations

    Index a document (the index and type are created implicitly if absent)

    PUT /test_index/test_type/1
    {
    "name":"a"
    }

    Get a document

    GET /test_index/test_type/1

    Delete a document

    DELETE /test_index/test_type/1

    Overwrite (full replace)

    PUT /test_index/test_type/1
    {
    "name":"ab"
    }

    Partial update

    POST /test_index/test_type/1/_update
    {
      "doc":{
    	"name":"aab"
       }
    }

    Search

    GET /test_index/test_type/_search
    GET /test_index/_search
    GET /_search
    GET /test_index1,test_index2/_search
    GET /t*/_search

    DSL

    # The query clause
    ## match_all match match_phrase
    GET /test_index/test_type/_search
    {
       "query":{
            "match_all":{}
       }
    }
    
    GET /test_index/test_type/_search
    {
       "query":{
            "match":{
               "address": "zx"
            }
       }
    }
    
    GET /test_index/test_type/_search
    {
       "query":{
            "match_phrase":{
            	"like": "adbc  asdas"
            }
       }
    }
    
    ## bool: combining multiple conditions
    ### must must_not should
    GET /test_index/test_type/_search
    {
       "query":{
           "bool":{
              "must":{
                  "match": {
                     "address": "beijing"
                  }
              },
              "must_not":[{
                  "match":{
                      "like": "baseketball"
                  }
              }],
              "should":[{
                "match":{
                   "age": 19
                }
              }],
              "minimum_should_match":0
           }
       }
    }
    
    ## filter: non-scoring filtering
    GET /test_index/test_type/_search
    {
       "query":{
          "bool":{
              "filter":{
                 "range":{
                    "age":{
                        "gt":19,
                        "lte":20
                    }
                 }
              }
          }
       }
    }
    
    ## sort
    GET /test_index/test_type/_search
    {
       "query":{
          "match_all":{} 
       },
       "sort":[{
          "age": {
            "order": "desc"
          }
       }]
    }
    ## from size _source highlight
    GET /test_index/test_type/_search
    {
        "query":{
             "match":{
                "like": "running"
             }
        },
        "highlight":{
          "field":{
             "like":{}
          }
        },
        "from": 0,
        "size":10,
        "_source":["a","b","c"]
    }
    ## Aggregations
    GET /test_index/test_type/_search
    {
       "aggs":{
          "group_by_age":{
              "terms":{
                 "field": "age"
              }
          }
       }
    }
    
    ## Search plus aggregation
    GET /test_index/test_type/_search
    {
       "query":{
           "match":{
              "address": "beijing"
           }
       },
       "aggs":{
          "group_by_age":{
              "terms":{
                 "field": "age"
              }
          }
       }
    }
    
    ### Metric aggregations: avg, sum (the mapping below enables fielddata so the field can be aggregated)
    PUT /test_index/_mapping/test_type
    {
       "properties":{
          "age":{
              "type":"text",
              "fielddata": true
          }
       }
    }
    
    GET /test_index/test_type/_search
    {
       "size": 0,
       "query":{
          "bool":{
              "filter":{
                 "range":{
                    "age":{
                        "gt":19,
                        "lte":20
                    }
                 }
              }
          }
       }, 
       "aggs":{
          "group_by_age":{
              "terms":{
                 "field": "age"
              }, 
              "aggs":{
                  "avg_salary": {
                      "avg":{
                         "field": "salary"
                      }
                  }
              }
          }
       }
    }

Scaling

Vertical scaling: upgrade the hardware/configuration of existing nodes

Horizontal scaling: add new nodes

Scaling limit: with 3 primary shards and 1 replica each there are 6 shards in total, so scaling out tops out at 6 nodes (one shard per node); to go beyond that, increase number_of_replicas.
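
number_of_replicas can be raised on a live index to allow further scaling out. A minimal Java sketch (assumes the RestHighLevelClient built in the Java section below, and the test_index from the earlier examples):

import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.settings.Settings;

// Raise the replica count so additional nodes can each take a shard
UpdateSettingsRequest request = new UpdateSettingsRequest("test_index")
        .settings(Settings.builder().put("index.number_of_replicas", 2));
client.indices().putSettings(request, RequestOptions.DEFAULT);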

Load balancing

ES automatically distributes shards across the nodes to balance load; when a node is added, shards are automatically relocated onto it as well.

Primary Replica Shard

  • A primary shard holds a slice of the index's data; each index consists of multiple primary shards
  • A replica shard is a copy of a primary shard and provides fault tolerance
  • A primary shard and its own replica are never placed on the same node
  • As nodes are added or removed, shards are automatically rebalanced across the nodes
  • The number of primary shards is fixed at index creation and cannot be changed
  • ES round-robins read requests across a shard group's primary and replicas, balancing query load
PUT /test_index
{
   "settings":{
        "number_of_shards":3,
        "number_of_replicas": 1
   }
}

Peer-to-peer architecture

  • The master node manages ES metadata and handles index creation, modification, and deletion
  • The master does not carry all requests; any node can accept a request and act as the coordinating node for it, avoiding a single-point bottleneck

Fault tolerance

red: when a node goes down, its primary shards are lost, and the cluster status turns red

yellow: for each lost primary shard, a replica shard on another node is promoted to primary; the cluster is usable but under-replicated, so the status is yellow

green: when the downed node restarts, the master copies the missing replica data back onto it, the node applies the writes made during the outage, and once all primary and replica shards are active the status returns to green

Concurrency control

ES uses optimistic locking.

external version: instead of ES's internal version number, you can supply your own. With the internal _version, a write succeeds only when the supplied version matches the current one; with an external version, the write succeeds only when the supplied version is greater than the stored one (or greater-or-equal with external_gte, as below).

PUT /test_index/test_type/1?version=1&version_type=external_gte
{
   "name": "zser"
}
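
The same external-version write through the Java client, as a sketch (client as in the Java section below):

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.index.VersionType;

IndexRequest request = new IndexRequest("test_index", "test_type", "1")
        .source("name", "zser")
        .version(1)                              // caller-supplied version number
        .versionType(VersionType.EXTERNAL_GTE);  // write succeeds when version >= stored
client.index(request, RequestOptions.DEFAULT);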

Routing

An index in ES is stored across its shards. A document is routed to a shard by:

shard = hash(routing) % number_of_primary_shards

By default the routing value is the document's _id, but it can also be specified manually.

# Specify routing manually
PUT /test_index/test_type/1?routing=123
{
  "name":"setnx"
}

# View an index's shards
GET /test_index/_search_shards
# See which shard a given routing value maps to
GET /test_index/_search_shards?routing=<id>
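
A toy Java rendering of the formula (illustrative only: real ES hashes the routing value with Murmur3, not String.hashCode(), but the modulo step is the same):

public class RoutingDemo {
    // Maps a routing value to a primary shard number
    static int shardFor(String routing, int numberOfPrimaryShards) {
        return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
    }

    public static void main(String[] args) {
        System.out.println(shardFor("123", 3)); // same routing value always hits the same shard
    }
}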

Cluster setup

# node1-node3: Elasticsearch; node4: Kibana
# ES must never run as root; everything else is configured as above

The quorum mechanism

Write consistency is controlled by adding the consistency parameter to write (create/update/delete) operations:

PUT /test_index/test_type/1?consistency=one
{
   "name": "str"
}

consistency accepts the following values:

  • one: the write can proceed as long as the primary shard is active

  • all: all shards (primary and replicas) must be active for the write to proceed

  • quorum: a majority of shard copies must be active for the write to succeed

    int((primary + num_of_replica) / 2) + 1

    e.g. int((3 + 2) / 2) + 1 = 3

    To keep a single-node setup usable, the quorum check does not take effect when num_of_replica is set to 1.

    If too few shard copies are active, the request waits (1 minute by default) and then times out.
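
The quorum arithmetic as a tiny Java sketch (illustrative only):

public class QuorumDemo {
    // Integer division implements the int(...) truncation from the formula
    static int quorum(int primaries, int numOfReplicas) {
        return (primaries + numOfReplicas) / 2 + 1;
    }

    public static void main(String[] args) {
        System.out.println(quorum(3, 2)); // 3, matching the worked example above
    }
}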

Inverted index

At write time every record is analyzed into terms, and an inverted index is built from them: each term points to the documents that contain it.
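
A minimal sketch of the idea in Java (a toy whitespace tokenizer; real ES analysis and index structures are far richer):

import java.util.*;

public class InvertedIndexDemo {
    public static void main(String[] args) {
        String[] docs = {"quick brown fox", "quick red fox", "slow brown dog"};
        Map<String, Set<Integer>> index = new HashMap<>();
        for (int id = 0; id < docs.length; id++) {            // "analyze" each document
            for (String term : docs[id].split("\\s+")) {      // naive whitespace tokenizer
                index.computeIfAbsent(term, t -> new TreeSet<>()).add(id);
            }
        }
        System.out.println(index.get("fox"));  // [0, 1]: term -> posting list of doc ids
    }
}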

Analyzers

  • _mapping

    A mapping tells ES how each field should be stored and indexed before data is written; without one, ES falls back to dynamic mapping.

    PUT /test_index/_mapping/test_type
    {
        "properties":{
           "name":{
               "type": "text"
           },
           "desc":{
                "type": "text"
           }
        }
    }
  • Analyzers

    Analysis is the process of turning full text into a stream of terms; it is carried out by an analyzer.

    Building the inverted index means running each document through the analyzer and pointing every resulting term at the documents that contain it.

    Analyzer components:

    • character filters

      Preprocess the raw text before tokenization, e.g. stripping HTML tags

    • tokenizers

      Split the text into individual tokens

    • token filters

      Post-process tokens: lowercasing, removing stop words (the, a, an), singular/plural stemming, dropping meaningless tokens

    ES ships with many built-in analyzers; the default is standard, which works well for English. For Chinese, the IK analyzer is the most popular:

    github.com/medcl/elast…

    GET /_analyze
    {
      "text": "中华人民共和国",
      "analyzer":"standard"
    }
    
    GET /_analyze
    {
      "text": "中华人民共和国",
      "analyzer":"ik_smart"
    }
    
    GET /_analyze
    {
      "text": "中华人民共和国",
      "analyzer":"ik_max_word"
    }

Java

<dependencies>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.14.1</version>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.14.1</version>
    </dependency>
</dependencies>

CRUD operations

public static void main(String[] args) throws IOException {
    connect();
    //        indexStudent();
    //        getStudent();
    //        deleteStudent();
    updateStudent();
    close();
}

    /**
     * Update a document
     * @throws IOException
     */
    private static void updateStudent() throws IOException {
//        IndexRequest request = new IndexRequest("person", "student", "2");
//        request.source("name", "bom", "age", 18);
//        IndexResponse index = client.index(request, RequestOptions.DEFAULT);
//        System.out.println(index);
        UpdateRequest request = new UpdateRequest("person", "student", "2");
        request.doc("name","pom");
        UpdateResponse update = client.update(request, RequestOptions.DEFAULT);
        System.out.println(update);
    }

    /**
     * Get a document
     */
    private static void getStudent() throws IOException {
        GetRequest request = new GetRequest("person", "student", "1");
        GetResponse response = client.get(request, RequestOptions.DEFAULT);
        System.out.println(response);
        Map<String, Object> source = response.getSource();
        for (Map.Entry<String, Object> entry:source.entrySet()) {
            System.out.println(entry.getKey()+"=="+entry.getValue());
        }
    }


    /**
     * Delete a document
     */
    private static void deleteStudent() throws IOException {
        DeleteRequest request = new DeleteRequest("person", "student", "1");
        DeleteResponse delete = client.delete(request, RequestOptions.DEFAULT);
        System.out.println(delete);
    }

    /**
     * Index (insert) documents
     * @throws IOException
     */
    private static void indexStudent() throws IOException {
        IndexRequest indexRequest = new IndexRequest("person", "student", "1");
        indexRequest.source("name", "bom", "age", 18);
//        client.indexAsync(indexRequest, RequestOptions.DEFAULT, new ActionListener<IndexResponse>() {
//            public void onResponse(IndexResponse indexResponse) {
//                System.out.println(indexResponse.toString());
//            }
//
//            public void onFailure(Exception e) {
//
//            }
//        });
        IndexResponse index = client.index(indexRequest, RequestOptions.DEFAULT);
        System.out.println(index.toString());

        IndexRequest indexRequest2 = new IndexRequest("person", "student", "2");
        indexRequest2.source(XContentFactory.jsonBuilder()
                .startObject()
                .field("name", "TOM")
                .field( "age", 12)
                .endObject());
        IndexResponse response2 = client.index(indexRequest2, RequestOptions.DEFAULT);
        System.out.println(response2);

        IndexRequest indexRequest3 = new IndexRequest("person", "student", "3");
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("name", "JACK");
        map.put("age", 20);
        indexRequest3.source(map);
        IndexResponse response3 = client.index(indexRequest3, RequestOptions.DEFAULT);
        System.out.println(response3);

        // source(String json, XContentType.JSON) also works
    }


    // Build the client
    private static void connect(){
        client = new RestHighLevelClient(RestClient.builder(new HttpHost(
                "localhost", 9200, "http")));
    }

    // Close the client
    private static void close(){
        if(client != null){
            try{
                client.close();
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }

Search

    /**
     * Basic search
     */
    private static void search() throws IOException {
        SearchRequest request = new SearchRequest("person");
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        SearchHit[] hits = response.getHits().getHits();
        for (SearchHit hit : hits) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * match_all
     */
    private static void matchAll() throws IOException {
        SearchRequest request = new SearchRequest("person");
        request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        SearchHit[] hits = response.getHits().getHits();
        for (SearchHit hit : hits) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * match
     *
     * @throws IOException
     */
    private static void match() throws IOException {
        SearchRequest request = new SearchRequest("person");
        request.source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("name", "pom")));
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        SearchHit[] hits = response.getHits().getHits();
        for (SearchHit hit : hits) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * match_phrase
     *
     * @throws IOException
     */
    private static void matchPhrase() throws IOException {
        SearchRequest request = new SearchRequest("person");
        request.source(new SearchSourceBuilder().query(QueryBuilders.matchPhraseQuery("name", "p  om")));
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        SearchHit[] hits = response.getHits().getHits();
        for (SearchHit hit : hits) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * Compound bool query
     *
     * @throws IOException
     */
    private static void boolSearch() throws IOException {
        SearchRequest request = new SearchRequest("person");
        SearchSourceBuilder search = new SearchSourceBuilder();
        search.query(new BoolQueryBuilder()
                .must(QueryBuilders.matchQuery("name", "pom"))
                .mustNot(QueryBuilders.matchQuery("age", 200))
                .should(QueryBuilders.matchQuery("age", 18))
                .should(QueryBuilders.matchQuery("age", 20))
                .filter(QueryBuilders.rangeQuery("age").gt(17).lt(30)));
        request.source(search);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        SearchHit[] hits = response.getHits().getHits();
        for (SearchHit hit : hits) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * search_sort_from_size_fetchSource_highlight
     *
     * @throws IOException
     */
    private static void search1() throws IOException {
        SearchRequest request = new SearchRequest("person");
        SearchSourceBuilder search = new SearchSourceBuilder();
        search.query(QueryBuilders.matchQuery("name", "pom"))
                .sort("age", SortOrder.ASC)
                .from(0)
                .size(2)
                .fetchSource("age", "name")
                .highlighter(new HighlightBuilder().field("age"));
        request.source(search);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        SearchHit[] hits = response.getHits().getHits();
        for (SearchHit hit : hits) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * Bulk indexing
     * @throws IOException
     */
    private static void bulkApi() throws IOException {
        BulkRequest request = new BulkRequest();
        request.add(new IndexRequest("company", "employee","1")
                .source("name","jack", "age", 10))
                .add(new IndexRequest("company", "employee","2")
                        .source("name","tom", "age", 23))
                .add(new IndexRequest("company", "employee","3")
                        .source("name","to m", "age", 23))
                .add(new IndexRequest("company", "employee","4")
                        .source("name","t om", "age", 23))
                .add(new IndexRequest("company", "employee","5")
                        .source("name","to m t", "age", 10));
        BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
        BulkItemResponse[] items = response.getItems();
        for (BulkItemResponse item: items) {
            System.out.println(item.getResponse().getResult());
        }
    }

    /**
     * putMapping
     * @throws IOException
     */
    private  static void putMapping() throws IOException {
        PutMappingRequest request = new PutMappingRequest("company");
        request.source(XContentFactory.jsonBuilder()
                .startObject()
                .startObject("properties")
                .startObject("name")
                .field("type", "text")
                .field("fielddata", true)
                .endObject()
                .endObject()
                .endObject());
        AcknowledgedResponse acknowledgedResponse = client.indices().putMapping(request, RequestOptions.DEFAULT);
        System.out.println(acknowledgedResponse.isAcknowledged());
    }

Aggregations

    /**
     * Terms aggregation
     * @throws IOException
     */
    private static void groupBy() throws IOException {
        SearchRequest request = new SearchRequest("person");
        SearchSourceBuilder search = new SearchSourceBuilder();
        search.query(QueryBuilders.matchQuery("name", "pom"))
                .size(0)
                .aggregation(AggregationBuilders.terms("group_by_age").field("age"));
        request.source(search);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        Terms group_by_age = response.getAggregations().get("group_by_age");
        List<? extends Terms.Bucket> buckets = group_by_age.getBuckets();
        for (Terms.Bucket bucket :buckets) {
            System.out.println(bucket.getKey() + " -> " + bucket.getDocCount());
        }
    }



    /**
     * Terms aggregation with an avg sub-aggregation
     * @throws IOException
     */
    private static void avgAggs() throws IOException {
        SearchRequest request = new SearchRequest("person");
        SearchSourceBuilder search = new SearchSourceBuilder();
        search.query(QueryBuilders.rangeQuery("age").gte(1))
                .size(0)
                .aggregation(AggregationBuilders.terms("group_by_age").field("age")
                        .subAggregation(AggregationBuilders.avg("avg_age").field("age")));
        request.source(search);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        Terms group_by_age = response.getAggregations().get("group_by_age");
        List<? extends Terms.Bucket> buckets = group_by_age.getBuckets();
        for (Terms.Bucket bucket :buckets) {
            Avg avg_age = bucket.getAggregations().get("avg_age");
            System.out.println(avg_age.getValue());
        }
    }

Filebeat

log_console

www.elastic.co/cn

touch log_console.yml

filebeat.inputs:
- type: log
  paths:
    - /u01/log/*.log
output.console:
  pretty: true

nohup ./filebeat -c log_console.yml >/dev/null 2>&1 &

redis

touch log_redis.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /u01/log/*.log
output.redis:
  hosts: ["node1"]
  password: 
  key: "filebeat"
  db: 0
  timeout: 5
./filebeat -c log_redis.yml

Logstash

A Logstash pipeline has two required components, input and output, plus one optional component, filter.

input: consumes data from the source

filter: modifies the data as needed

output: writes the data to the destination

bin/logstash -e "input{stdin{}} output{stdout{}}"
touch stdin_stdout.yml
bin/logstash -f config/stdin_stdout.yml
input{
   stdin{}
}
output{
   stdout{}
}
touch file_stdout.yml
input{
  file{
  	path => "/root/test/*.log"
  }
}
output{
  stdout{}
}
bin/logstash -f config/file_stdout.yml

path is a required option.

Optional options:

start_position: where to begin reading, "beginning" or "end" (the default)

start_position only takes effect the first time a file is watched; for files seen before, the read offset already persisted under logstash/data/plugins/file is used instead.

ignore_older: ignore files older than the given age.

input{
   redis{
      host => "redis1"
      port => 6379
      data_type => "list"
      key => "filebeat"
   }
}
output{
   elasticsearch{
      hosts => ["node1:9200"]
      index => "applogs"
   }
}

The grok plugin

grok parses unstructured text into structured fields; the first config below uses built-in patterns, the second uses inline named regexes.

input{
   stdin{}
}
filter{
   grok{
      match=>{"message" => "%{NUMBER:num} %{WORD:name}"}
   }
}
output{
  stdout{}
}
input{
   stdin{}
}
filter{
   grok{
      match=>{"message" => "(?<num>[0-9]) (?<name> [\w]+)"}
   }
}
output{
  stdout{}
}
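
With either filter, an input line like "7 foo" yields the fields num => "7" and name => "foo" on the event, alongside the original message field.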

Multiple data sources

filebeat.inputs:
- type: log
  paths:
     - /root/test/*.log
  fields:
     log_type: app
  fields_under_root: true
- type: log
  paths:
     - /root/test1/*.log
  fields:
     log_type: tomcat
  fields_under_root: true

output.redis:
  hosts: ["node1"]
  password: 
  key: "filebeat"
  db: 0
  timeout: 5
input{
   redis{
      host=> "redis1"
      port=> 6379
      data_type =>list
      key =>"filebeat"
   }
}
filter{
   if [log_type] == "app" {
        grok{
          match=>{"message" => "(?<num>[0-9]) (?<name>[\w]+)"}
       }
   } else if [log_type] == "tomcat" {
        grok{
          match=>{"message" => "(?<num>[0-9]) (?<name>[\w]+)"}
       }
   }
}
output{
   elasticsearch{
      hosts =>["node1:9200"]
      index => "applogs"
   }
}