filter {
  if [etltype] == "blocks" {    #[fields][srctype]
    csv {
      columns => [
        "number", "hash", "parent_hash", "nonce", "sha3_uncles", "logs_bloom",
        "transactions_root", "state_root", "receipts_root", "miner", "difficulty",
        "total_difficulty", "size", "extra_data", "gas_limit", "gas_used",
        "timestamp", "transaction_count"
      ]
      separator => ","
      remove_field => ["message"]
      skip_empty_columns => true
      skip_empty_rows => true
    }
  } else if [etltype] == "contracts" {    #[fields][srctype]
    csv {
      columns => [ "address", "bytecode", "function_sighashes", "is_erc20", "is_erc721" ]
      separator => ","
      remove_field => ["message"]
      skip_empty_columns => true
      skip_empty_rows => true
    }
  } else if [etltype] == "logs" {    #[fields][srctype]
    csv {
      columns => [
        "log_index", "transaction_hash", "transaction_index", "block_hash",
        "block_number", "address", "data", "topics"
      ]
      separator => ","
      remove_field => ["message"]
      skip_empty_columns => true
      skip_empty_rows => true
    }
  } else if [etltype] == "receipts" {    #[fields][srctype]
    csv {
      columns => [
        "transaction_hash", "transaction_index", "block_hash", "block_number",
        "cumulative_gas_used", "gas_used", "contract_address", "root", "status"
      ]
      separator => ","
      remove_field => ["message"]
      skip_empty_columns => true
      skip_empty_rows => true
    }
  } else if [etltype] == "token_transfers" {    #[fields][srctype]
    csv {
      columns => [ "" ]
      separator => ","
      remove_field => ["message"]
      skip_empty_columns => true
      skip_empty_rows => true
    }
  } else if [etltype] == "tokens" {    #[fields][srctype]
    csv {
      columns => [ "" ]
      separator => ","
      remove_field => ["message"]
      skip_empty_columns => true
      skip_empty_rows => true
    }
  } else if [etltype] == "transactions" {    #[fields][srctype]
    csv {
      columns => [
        "hash", "nonce", "block_hash", "block_number", "transaction_index",
        "from_address", "to_address", "value", "gas", "gas_price", "inputcontext"
      ]
      separator => ","
      remove_field => ["message"]
      skip_empty_columns => true
      skip_empty_rows => true
    }
  }
}

output {
  if [etltype] == "blocks" {
    elasticsearch {
      hosts => "xxx.
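The branches above test a top-level [etltype] field, which Logstash itself does not set. A minimal Filebeat input sketch that would populate it, following the fields / fields_under_root pattern shown further down (the log path and the "blocks" value are assumptions; one such input per export type):

- type: log
  paths:
    - /var/log/ethereum-etl/blocks.csv    # assumed export path
  fields:
    etltype: "blocks"
  fields_under_root: true    # lets the condition test [etltype] instead of [fields][etltype]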
Using the Filebeat nginx module to send nginx logs to Logstash, or directly to Elasticsearch, always fails with errors!!
But some nginx log records do get sent successfully. Copying those successful records to another VM that also runs Filebeat and Logstash and sending them again still fails!!
https://segmentfault.com/a/1190000002972420
Wildcards
? matches a single character
* matches zero or more characters
kiba?a, el*search
? and * cannot be the first character of a term, e.g. ?text and *text are invalid
====================
Regular expressions
ES supports a subset of regex syntax; performance is relatively poor
name:/joh?n(ath[oa]n)/
====================
Fuzzy search
quikc~ brwn~ foks~
~: appending ~ to a word enables fuzzy search, which can match misspelled words
first~ also matches frist
An edit distance (an integer) can be appended to control how close a match must be
cromm~1 matches from and chrome
The default edit distance is 2; the smaller the value, the closer a match must be to the original term, and a distance of 1 already catches roughly 80% of misspellings
====================
Logical operators
AND
OR
+: the result must contain this term
-: the result must not contain this term
+apache -jakarta test aaa bbb: results must contain apache, must not contain jakarta, and the remaining terms are matched on a best-effort basis
====================
Grouping
(jakarta OR apache) AND jakarta
====================
Escaping special characters
+ - = && || > < ! ( ) { } [ ] ^ " ~ * ?
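For example (an illustrative query, not part of the original notes): to search for the literal text kiba?a instead of treating ? as a wildcard, prefix the special character with a backslash:

kiba\?a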
All output columns handled in one Logstash csv filter
filter {
  if [srctype] == "etl" {    #[fields][srctype]
    csv {
      columns => [
        "number", "hash", "parent_hash", "nonce", "sha3_uncles", "logs_bloom",
        "transactions_root", "state_root", "receipts_root", "timestamp", "extra_data",
        "transaction_count", "gas_limit", "size", "total_difficulty", "difficulty",
        "miner", "block_hash", "block_number", "transaction_index", "from_address",
        "to_address", "value", "gas", "gas_price", "input", "address", "bytecode",
        "function_sighashes", "is_erc20", "is_erc721", "log_index", "transaction_hash",
        "data", "topics", "cumulative_gas_used", "gas_used", "contract_address",
        "root", "status"
      ]
      separator => ","
      remove_field => ["message"]
      #autodetect_column_names => true      #have problems
      #autogenerate_column_names => true    #have problems
      skip_empty_columns => true
      skip_empty_rows => true
    }
  }
}
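The csv filter leaves every column as a string. If numeric fields should be indexed as numbers, a mutate/convert step can follow it; this is only a sketch, using field names from the column list above:

filter {
  if [srctype] == "etl" {
    mutate {
      convert => {
        "block_number" => "integer"
        "gas"          => "integer"
        "gas_used"     => "integer"
      }
      # "value" (wei) is deliberately left as a string: amounts can exceed Elasticsearch's long range
    }
  }
}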
https://sueboy.blogspot.com/2018/11/elk60filebeatdocumenttype.html
filebeat.yml
- type: log
  paths:
    - /var/log/geth.log
  exclude_files: ['.gz$']
  fields:
    srctype: "geth"

Pipeline condition in logstash.conf:

if [fields][srctype] == "geth" {
BUT with fields_under_root: true:

- type: log
  paths:
    - /var/log/geth.log
  exclude_files: ['.gz$']
  fields:
    srctype: "geth"
  fields_under_root: true

and in logstash.conf:

if [srctype] == "geth" {
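Putting the pieces together, a minimal logstash.conf sketch; the beats port, index name and the empty filter body are assumptions, not the original config:

input {
  beats {
    port => 5044
  }
}
filter {
  if [srctype] == "geth" {
    # geth-specific parsing (grok / csv) goes here
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "geth-%{+YYYY.MM.dd}"
  }
}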
Filebeat + Elasticsearch + Kibana: a lightweight log collection and visualization stack
https://wzyboy.im/post/1111.html?utm_source=tuicool&utm_medium=referral
The post mentions that
beat -> logstash -> elk
can become
beat -> Elasticsearch ingest pipeline (Elasticsearch Ingest Node)
Elasticsearch Ingest Node is a feature added in Elasticsearch 5.0. Before Ingest Node existed, people usually put a Logstash Indexer in front of ES to preprocess the data. With Ingest Node, most of what a Logstash Indexer does can be replaced: processors familiar to Logstash users, such as grok and geoip, are available in Ingest Node too. For ES users with small data volumes, saving the cost of a Logstash box is obviously welcome; for users with large data volumes, Ingest Node can, like Master and Data Nodes, be given dedicated nodes and scaled horizontally, so performance bottlenecks are not a concern either.
Ingest Node currently supports dozens of processors, of which the script processor offers the greatest flexibility.
Like /_template, the Ingest API lives under /_ingest. Once a pipeline definition has been submitted, that pipeline can be specified in Beats as the data preprocessor.
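A minimal sketch of that flow; the pipeline name, grok pattern and field names are hypothetical, and the geoip processor requires the ingest-geoip plugin on older Elasticsearch releases:

PUT _ingest/pipeline/my-pipeline
{
  "description": "parse access logs before indexing",
  "processors": [
    { "grok":  { "field": "message", "patterns": ["%{COMBINEDAPACHELOG}"] } },
    { "geoip": { "field": "clientip" } }
  ]
}

Then point Beats at it in filebeat.yml:

output.elasticsearch:
  hosts: ["localhost:9200"]
  pipeline: "my-pipeline"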
1. Filebeat ships /var/log/secure
2. Logstash filter:
filter {
  grok {
    #type => "syslog"
    match => ["message", "%{SYSLOGBASE} Failed password for (invalid user |)%{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:port} ssh2"]
    add_tag => "ssh_brute_force_attack"
  }
  grok {
    #type => "syslog"
    match => ["message", "%{SYSLOGBASE} Accepted password for %{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:port} ssh2"]
    add_tag => "ssh_successful_login"
  }
  geoip {
    source => "src_ip"
    target => "geoip"
    add_tag => [ "ssh-geoip" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    add_field => [ "geoipflag", "true" ]
  }
}
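For reference, a typical /var/log/secure line that the first grok pattern matches (host, user and IP are made up):

Nov  5 09:12:43 myhost sshd[1234]: Failed password for invalid user admin from 203.0.113.7 port 54022 ssh2

This yields username=admin, src_ip=203.0.113.7, port=54022 and adds the ssh_brute_force_attack tag; the geoip filter then looks up src_ip and builds [geoip][coordinates] as a [longitude, latitude] pair, which still needs a geo_point mapping in the index template before Kibana can plot it on a map.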