Issues encountered when migrating to Elasticsearch

Docker configuration

  elasticsearch:
    image: xxxx
    network_mode: host
    privileged: true
    container_name: elasticsearch
    environment:
      ES_JAVA_OPTS: "-Xmx10g -Xms10g"
      discovery.type: single-node
      bootstrap.memory_lock: "true"
    volumes:
      - /etc/localtime:/etc/localtime
      - ./elasticsearch/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
      - ./data/es:/usr/share/elasticsearch/data
    ulimits:
      memlock:
        soft: -1
        hard: -1
    restart: on-failure

data directory permissions

AccessDeniedException[/usr/share/elasticsearch/data/nodes]

Fix: the directory must be 777. Setting privileged: true does not help, because the official image runs Elasticsearch as a non-root user (uid 1000), so the bind-mounted data directory has to be writable by that user (chown -R 1000:1000 works too).

mkdir -p data/es/nodes && chmod -R 777 data/es

volume data migration

Don't use the official named-volume configuration: with a bind mount instead, managing data capacity, customizing the directory layout, and migrating the data are all much easier.

# list volume names
docker volume ls

# host data directory of a volume; the data can be copied straight into the new bind-mounted directory
# e.g. cp -a /var/lib/docker/volumes/VOLUME_NAME/_data/. ./data/es/
/var/lib/docker/volumes/VOLUME_NAME/_data

Notes on ES 7.9

sql to dsl

sql-es
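
ES 7.x can also translate SQL into query DSL itself via the x-pack SQL translate API (available under the basic license); a minimal sketch with requests, against a hypothetical alert index:

import requests

# Ask ES to translate a SQL statement into the equivalent query DSL.
resp = requests.post(
    'http://127.0.0.1:9200/_sql/translate',
    json={'query': "SELECT object, level FROM alert WHERE level = 3 LIMIT 5"},
)
print(resp.json())  # the generated DSL, ready to adapt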

index settings

elasticsearch.yml must not contain any index-level settings:

elasticsearch    | "stacktrace": ["org.elasticsearch.bootstrap.StartupException: java.lang.IllegalArgumentException: node settings must not contain any index level settings"

Index settings can only be applied via curl, which raises a problem: when starting with Docker, who knows when ES is actually up.

#!/bin/bash

set -e
# wait until ES is actually up
while [[ "$(curl -s -o /dev/null -w '%{http_code}' 127.0.0.1:9200)" != "200" ]]; do sleep 1; done

curl -X PUT '127.0.0.1:9200/_settings' -H 'Content-Type: application/json' -d '{
    "index.merge.scheduler.max_thread_count" : 1
}'

curl -X PUT "127.0.0.1:9200/alert" -H 'Content-Type: application/json' -d'
xxxx
'
curl -X PUT "127.0.0.1:9200/alert/_settings" -H 'Content-Type: application/json' -d'
{"index":{"refresh_interval":"3s"}}
'

Data types

  • ES integers go up to long (int64); some of our fields were uint64, which does not fit.

  • Some string-field queries returned no matches even though the values were saved: by default a string is dynamically mapped to both text and keyword:

{
    "foo": {
        "type": "text",
        "fields": {
            "keyword": {
                "type": "keyword",
                "ignore_above": 256
            }
        }
    }
}

text: analyzed (tokenized) and then indexed; supports fuzzy and exact queries; does not support aggregations (match etc.)

keyword: not analyzed, indexed as-is; supports fuzzy and exact queries; supports aggregations (term, agg)
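
A quick contrast in elasticsearch_dsl, assuming the default dual text + keyword mapping on a hypothetical foo field:

from elasticsearch_dsl import Search
from elasticsearch_dsl.query import Match, Term

s = Search(index='my_index')
s = s.query(Match(foo='hello world'))                  # analyzed match on the text field
s = s.filter(Term(**{'foo.keyword': 'hello world'}))   # exact match on the keyword subfield
s.aggs.bucket('by_foo', 'terms', field='foo.keyword')  # aggregate on keyword, never on text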

ignore_above

ignore_above applies to keyword fields.

If a string is longer than ignore_above, it is not indexed (you can't search or aggregate on that value), but it is still stored.

For long strings, consider a string array (convenient for analysis; ignore_above applies to each array element, not to the array's total length).

Dynamic mapping defaults to ignore_above: 256.

A keyword value can be at most 32766 bytes of UTF-8 (Lucene's term length limit).
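
To pick ignore_above yourself rather than rely on dynamic mapping, declare it in the document class; a sketch with hypothetical fields:

from elasticsearch_dsl import Document, Keyword, Text

class Article(Document):
    title = Text(fields={'keyword': Keyword(ignore_above=256)})  # dual mapping, explicit limit
    tags = Keyword(ignore_above=512)  # for an array, the limit applies per element

    class Index:
        name = 'articles'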

size

If size is not set, the first 10 hits are returned by default.

Defaults are from=0 and size=10, and from + size may not exceed 10000 (1w).

To return more than 10,000 hits, raise max_result_window:

curl -X PUT '127.0.0.1:9200/_settings' -H 'Content-Type: application/json' -d '{
 "index" : { "max_result_window" : value}
}'

After that you will hit another problem: hits.total.value is not accurate.

   "hits" : {
          "total" : {
            "value" : 1000,
           "relation" : "eq"
         },

When relation = eq the total is exact; once the hit count passes max_result_window, or when aggs are involved, relation becomes gte and value stays pinned at 10000.

If you need the exact number of matching documents, set track_total_hits=true (at the same level as query). When the hit count is large this hurts query performance and consumes a lot of memory, even risking an out-of-memory error.
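
With elasticsearch_dsl, track_total_hits is just an extra body parameter; a minimal sketch, assuming the cls.search() helper used throughout this post:

s = cls.search().extra(track_total_hits=True)
resp = s.execute()
total = resp.hits.total.value  # exact; relation will be 'eq', no 10000 cap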

agg size

When using aggs, set the hit size=0 to avoid returning documents you don't need; typically you only parse the aggregations part of the response.

To have an agg return all buckets, you can use the default max_result_window of 10000 as the agg size.

For nested agg results (e.g. top_hits inside an agg), the default cap is max_inner_result_window = 100.
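
Both caps are dynamic per-index settings; a sketch raising them with elasticsearch-py, using the alert index from earlier as an example:

from elasticsearch import Elasticsearch

es = Elasticsearch(['127.0.0.1:9200'])
es.indices.put_settings(index='alert', body={
    'index.max_result_window': 100000,      # hit window, default 10000
    'index.max_inner_result_window': 1000,  # top_hits inside aggs, default 100
})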

Python

Using elasticsearch_dsl

Combining AND / OR

    # needs: from elasticsearch_dsl.query import Bool, Range, Script
    @classmethod
    def _risk_machine_by_time(cls, start_time: int = -1) -> typing.List:
        q = [Script(**esutil.script("doc['update_time'].value >= doc['ignore_time'].value"))]

        if start_time > 0:
            or_field = ['risk_file_time', 'access_rule_time', 'attack_event_time', 'threat_access_time']
            or_cond = Bool()
            for f in or_field:
                or_cond |= Range(**{f: {'gte': start_time}})  # |= ORs the ranges into one should clause
            q.append(or_cond)
        return q

count

count(*): get only the result count

curl http://server:9200/index/_search?filter_path=hits.total
def count() -> typing.Dict:
    """
    Fetch only the total count (no hits body)
    Search(
            using=ElasticSearch(...),
            index='my_index',
            doc_type='document',
        ).params(
            filter_path=['hits.total']
        )
    :return:
    """
    d = {
        'filter_path': ['hits.total']
    }
    return d
  
  s = cls.search().params(**esutil.count())
  s.query = Bool(must=cls._risk_machine_by_time(start_time))
  resp = s.execute().to_dict()
  if resp:
    count = resp['hits']['total']['value']
    return count

count(distinct)

cardinality

s = cls.search().filter(
    Term(level=AlertLevel.HIGH) & Range(first_time=esutil.es_range(gte=start_ts))
).extra(size=0)
s.aggs.bucket('count', aggs.Cardinality(field='id.keyword'))
resp = s.execute()
return resp.aggregations.count.value
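
Note that cardinality is an approximate count (HyperLogLog++), not an exact distinct. If the estimate matters, precision_threshold trades memory for exactness up to its value (max 40000); a sketch against the same field:

s.aggs.bucket('count', aggs.Cardinality(field='id.keyword', precision_threshold=40000))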

select

select a, b, a > c (computed field)

script_fields

s = cls.search().extra(size=1) \
    .script_fields(ignore=esutil.script("doc['ignore_time'].value > doc['update_time'].value")) \
    .source(
    [
        'risk_file_time',
        'access_rule_time',
        'attack_event_time',
        'threat_access_time',
    ]
)
s.query = Bool(must=[Term(inner_machine_id=machine['id'])])

select xx where ... order by ... limit

s = cls.search().extra(size=5).filter(
    Term(**{'alert_id.keyword': alert_id})
).sort(
    esutil.es_sort('count', True)
).source(['object', 'count'])


cond = Range(first_time=esutil.es_range(gte=start_time, lte=end_time))
if object:
    cond &= Term(**{"object.keyword": object})
if type1s:
    cond &= Terms(type1=type1s)
if levels:
    cond &= Terms(level=levels)

s = cls.search().extra(
    **esutil.extra_page(page, page_size)
).filter(
    cond
).source(
    ['id', 'type1', 'type2', 'level', 'object', 'count1', 'count2', 'first_time']
).sort(esutil.es_sort('first_time', True))

resp = s.execute()
count = resp.hits.total.value
results = []
for hit in resp.hits:
  results.append(hit.to_dict())

agg

Plain GROUP BY

select count(distinct(a)) from table group by xx
s = cls.search().extra(size=0).filter(
    Range(date=esutil.es_range(gte=start_ts, lte=end_ts))
)
s.aggs.bucket('kid', 'terms', field="kid", size=esutil.MAX_SIZE). \
    metric('signature_id', 'cardinality', field='signature_id')
resp = s.execute()

result = []
for k in resp.aggregations.kid:
    result.append({
        'kid': int(k.key),
        'count': k.signature_id.value,
    })

return result

# advanced agg functions
s = cls.search().extra(size=0).filter(
    Term(kid=kid) & Range(date=esutil.es_range(gte=start_ts, lte=end_ts))
)
s.aggs.bucket(
    'categories', aggs.Terms(size=esutil.MAX_SIZE, field="category.keyword")
).metric(
    'ips', aggs.Terms(size=esutil.MAX_SIZE, field="dst_ips.keyword")
).metric('count', aggs.Sum(field="count"))

Composite-Aggregation

agg pagination

pagination + sorting + search + agg

select a, max(b) as b, min(c) as c, sum(d) as d from table group by xxx order by c limit page, page_size

bucket_sort can sort and paginate, but it only works on the parent agg, and sorting only works on aggregated fields; for example, order by a (a non-aggregated field) throws an error.

top_hits performs a secondary pass over the agg results (among other uses); here it is used just to select the fields we want.

# query
s = cls.search().extra(size=0)
s.query = q
order_select = esutil.es_sort('count')
s.aggs.bucket(
    'signature_id', 'terms', size=esutil.MAX_SIZE, field="signature_id"
).metric(
    'count', 'sum', field='count'
).metric(
    'first_time', 'min', field='first_time'
).metric(
    'last_time', 'max', field='last_time'
).metric(
    'src_ips', 'terms', field='src_ips.keyword'
).metric(
    'dst_ips', 'terms', field='dst_ips.keyword'
).metric(
    'source', 'top_hits', _source=['kid', 'level', 'signature', 'category'],
    size=1,
).metric(
    'bucket_sort', 'bucket_sort', sort=[order_select] if order_select else [], **esutil.bucket_page(page, page_size)
)

resp = s.execute()

result = []
for k in resp.aggregations.signature_id:
    r = k.source.hits.hits[0]._source.to_dict()
    r['signature_id'] = k.key
    r['last_time'] = int(k.last_time.value)
    r['first_time'] = int(k.first_time.value)
    r['count'] = int(k.count.value)
    r['src_ips'] = ",".join([b.key for b in k.src_ips.buckets])
    r['dst_ips'] = ",".join([b.key for b in k.dst_ips.buckets])
    result.append(r)
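
bucket_sort only pages within one response, so every bucket still has to fit into the parent terms agg. The composite aggregation linked above instead pages on the server side and resumes from after_key; a minimal sketch over the same signature_id field:

from elasticsearch_dsl import aggs

s = cls.search().extra(size=0)
s.aggs.bucket('by_sig', aggs.Composite(
    size=100,  # one page of buckets
    sources=[{'signature_id': {'terms': {'field': 'signature_id'}}}],
))
resp = s.execute()
page = [b.key.signature_id for b in resp.aggregations.by_sig.buckets]
after = resp.aggregations.by_sig.after_key  # pass back as after= to fetch the next page

Composite buckets come back in key order, so it pairs with client-side sorting; ordering by a metric still needs bucket_sort.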

random function

s = cls.search().extra(size=top).source(['inner_machine_id'])
q = cls._risk_machine_by_time()
q.append(FunctionScore(functions=[{'random_score': {'seed': timeutil.nowts()}}]))
s.query = Bool(must=q)
resp = s.execute()

update by query

class BaseDocument(Document):
    @classmethod
    def update_by_query(cls) -> UpdateByQuery:
        return UpdateByQuery().index(
            cls.Index.name
        )

def update_kw(**kwargs) -> typing.Dict:
    """
    Build the concatenated update-script string.
    Note: values are interpolated directly into the script; prefer script params for untrusted input.
    :param kwargs:
    :return: e.g. ctx._source['ignore_time']= 'new_name'; ctx._source['item_price']= 10000;
    """
    s = ""
    for k, v in kwargs.items():
        if isinstance(v, str):
            s += f"ctx._source['{k}']= '{v}';"
        else:
            s += f"ctx._source['{k}']= {v};"
    return {'source': s}


udp = cls.update_by_query().filter(
            Term(inner_machine_id=inner_machine_id)
        ).script(**esutil.update_kw(ignore_time=ignore_time))
udp.execute()

# increment: "source":"ctx._source['confirmed_count']++"
# script(**{'source': "ctx._source.confirmed_count+=1"})

delete by query

cls.search().filter(Terms(inner_machine_id=inner_machine_id)).delete()

Go

Using github.com/olivere/elastic/v7

bulk

bulk := p.DB.Bulk()
// insert
bulk = bulk.Add(elastic.NewBulkIndexRequest().Index(AlertTable).Id(alert.ID).Doc(alert))
bulk = bulk.Add(elastic.NewBulkIndexRequest().Index(AlertDetailTable).Id(alertDetail.ID).Doc(alertDetail))
// update
script := elastic.NewScript(fmt.Sprintf("ctx._source.count1=ctx._source.count1+%d;ctx._source.count2+=1;ctx._source.last_time=%d;", count, now))
bulk = bulk.Add(elastic.NewBulkUpdateRequest().Index(AlertTable).Id(existAlert.ID).Script(script))

_, err = bulk.Do(context.Background())

Parsing a single hit

query := elastic.NewBoolQuery().Must(
  elastic.NewTermQuery("hash.keyword", alert.Hash),
  elastic.NewTermQuery("type1", alert.Type1),
  elastic.NewTermQuery("type2.keyword", alert.Type2),
  elastic.NewTermQuery("object.keyword", alert.Object),
  elastic.NewRangeQuery("first_time").Gt(anHourAgo),
)
existAlertResult, err := p.DB.Search().Index(AlertTable).Query(query).
  Size(1).Sort("first_time", false).Do(context.Background())
if err != nil && !NoSuchIndexError(err) {
  log.Error(err)
  return nil, err
}

if existAlertResult != nil && len(existAlertResult.Hits.Hits) == 1 {
  target := existAlertResult.Hits.Hits[0]
  err = json.Unmarshal(target.Source, &existAlert)
  if err != nil {
    log.Error(err)
    return nil, err
  }
  existAlert.ID = target.Id
}

Parsing selected _source fields

	query := elastic.NewBoolQuery().Must(
		elastic.NewTermQuery("date", events[0].Date),
	)
	source := elastic.NewFetchSourceContext(true).Include("signature_id")

	existAttackEventResults, err := p.DB.Search().Index(AttackEventOverallTable).Query(query).
		Size(MaxSize).FetchSourceContext(source).Do(context.Background())
	if err != nil && !NoSuchIndexError(err) {
		log.Error(err)
		return
	}
	idMap := make(map[uint32]string)
	if existAttackEventResults != nil && len(existAttackEventResults.Hits.Hits) > 0 {
		for _, attackEvent := range existAttackEventResults.Hits.Hits {
			var id *ID
			err = json.Unmarshal(attackEvent.Source, &id)
			if err != nil {
				log.Error(err)
				return err
			}
			idMap[id.SID] = attackEvent.Id
		}
	}

agg

	query := elastic.NewBoolQuery().Must(
		elastic.NewRangeQuery("timestamp").Gte(startTime),
	)
	agg := elastic.NewTermsAggregation().Field("threat_target.keyword").Size(MaxSize)
	searchResult, err := p.DB.Search().Index(ThreatTrafficTable).Query(query).Size(0).Aggregation("threat_target", agg).Do(context.Background())
	if err != nil && !NoSuchIndexError(err) {
		log.Error(err)
		return // searchResult would be nil below
	}

	threatTargetAgg, found := searchResult.Aggregations.Terms("threat_target")
	if found && threatTargetAgg != nil {
		for _, t := range threatTargetAgg.Buckets {
			existTargets = append(existTargets, t.Key.(string))
		}
	}



	source := elastic.NewFetchSourceContext(true).Include("dst_ip", "src_ip", "src_port", "dst_port", "timestamp")
	queryAgg := elastic.NewTermsAggregation().Field("threat_target.keyword").Size(999).
		SubAggregation("source", elastic.NewTopHitsAggregation().FetchSourceContext(source).Size(100))

	builder := p.DB.Search().Index(ThreatTrafficTable).Size(0)
	builder = builder.Aggregation("threat_target", queryAgg)
	searchResult, err := builder.Do(context.Background())
	if err != nil && !NoSuchIndexError(err) {
		log.Error(err)
		return
	}
	threatTrafficMap = map[string][]*model.ThreatTraffic{}
	agg := searchResult.Aggregations
	if targetAgg, found := agg.Terms("threat_target"); found {
		for _, target := range targetAgg.Buckets {
			if sourceAgg, found := target.TopHits("source"); found {
				for _, rawThreatTraffic := range sourceAgg.Hits.Hits {
					var threatTraffic model.ThreatTraffic
					err = json.Unmarshal(rawThreatTraffic.Source, &threatTraffic)
					if err != nil {
						continue // skip hits that fail to decode
					}
					threatTraffic.ThreatTarget = target.Key.(string)
					threatTrafficMap[threatTraffic.ThreatTarget] = append(threatTrafficMap[threatTraffic.ThreatTarget], &threatTraffic)
				}
			}
		}
	}

delete by query

query = elastic.NewBoolQuery().Must(
	elastic.NewRangeQuery("create_time").Gte(startTime).Lte(endTime),
)
_, err = p.DB.DeleteByQuery().Index(AlertDetailTable).Query(query).Do(context.Background())
if err != nil {
	log.Error(err)
	return
}


// batched (ScrollSize) and parallelized (Slices) delete by query
p.DB.DeleteByQuery().Index(EDRRelationship).Query(query).ScrollSize(3000).Slices("auto").ProceedOnVersionConflict().Do(context.Background())

insert struct model

_, err := p.DB.Index().Index(RiskInnerMachineTable).BodyJson(newRiskInnerMachine).Do(context.Background())
if err != nil {
 return err
}

func (p *DB) NewInnerMachine(bulk *elastic.BulkService, m *model.ESInnerMachine, now time.Time) (uuid string, err error) {
	uuid = GenESID()
	m.ID = uuid
	m.CreatedTime = uint32(now.Unix())
	m.CreatedDate = now.Format("2006-01-02")
	m.OnlineTime = uint32(now.Unix())
	bulk = bulk.Add(elastic.NewBulkIndexRequest().Index(InnerMachineTable).Id(m.ID).Doc(m))

	return
}

update by query

_, err := p.DB.Update().Index(RiskInnerMachineTable).Id(
	oldRiskMachine.ID).Doc(map[string]interface{}{
	"threat_access":    newRiskInnerMachine.ThreatAccess,
	"access_rule_time": newRiskInnerMachine.AccessRuleTime,
	"update_time":      now,
}).Do(context.Background())
if err != nil {
	return err
}

update by script

updateScript := elastic.NewScript(strings.Join([]string{
	"ctx._source.potential_status_count += params.potential_status_count",
	"ctx._source.potential_status_update_time = (long)Math.max(ctx._source.potential_status_update_time, params.potential_status_update_time)",
	......
	"ctx._source.ip_range_id = params.ip_range_id",
}, ";"))
var params map[string]interface{}
paramsData, err := json.Marshal(riskMachine)
if err != nil {
	return err
}
err = json.Unmarshal(paramsData, &params)
if err != nil {
	return err
}
updateScript = updateScript.Params(params)
bulk = bulk.Add(elastic.NewBulkUpdateRequest().Index(RiskInnerMachineTable).Id(oldRiskMachine.ID).Script(updateScript))


// update by query with script; ProceedOnVersionConflict avoids aborting on version conflicts
	query := elastic.NewBoolQuery().Filter(
		elastic.NewTermsQuery("alert_id", termsHelper(alertIDs)...),
	)
	script := elastic.NewScript(
		"ctx._source.status=params.status", // constant script; no fmt.Sprintf needed
	).Params(map[string]interface{}{
		"status": model.ThreatActionStatusProcessed,
	})
	_, err = p.DB.UpdateByQuery(ThreatActionTable).Query(query).ProceedOnVersionConflict().Script(script).Do(context.Background())

// another update by query with script, again tolerating version conflicts
query := elastic.NewBoolQuery().Filter(
		elastic.NewTermQuery("ip", ip),
		elastic.NewTermQuery("tracer_installed", false),
	)

	script := elastic.NewScript(strings.Join([]string{
		"ctx._source.host_name=params.host_name",
	}, ";")).Params(map[string]interface{}{
		"host_name": hostName,
	})

	_, err = p.DB.UpdateByQuery().Index(InnerMachineTable).Query(query).Script(script).ProceedOnVersionConflict().Do(context.Background())
	if err != nil {
		log.Error(err)
		return
	}

scroll pagesize

	ctx := context.Background()
	scroll := p.DB.Scroll(RiskInnerMachineTable).Size(scrollPageSize)
	for {
		results, err := scroll.Do(ctx)
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}

		for _, hit := range results.Hits.Hits {
			var riskMachine model.RiskInnerMachine
			err := json.Unmarshal(hit.Source, &riskMachine)
			if err != nil {
				continue
			}
			riskMachine.ID = hit.Id

			err = p.updateRiskMachineStatusViaThreatActions(riskMachine)
			if err != nil {
				continue
			}
		}
	}