Filebeat to Kafka
https://www.elastic.co/guide/en/beats/filebeat/current/kafka-output.html
Kafka Connect JDBC sink connector to MySQL
https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/index.html
filter {
  if [etltype] == "blocks" {  #[fields][srctype]
    csv {
      columns => [ "number", "hash", "parent_hash", "nonce", "sha3_uncles", "logs_bloom", "transactions_root", "state_root", "receipts_root", "miner", "difficulty", "total_difficulty", "size", "extra_data", "gas_limit", "gas_used", "timestamp", "transaction_count" ]
      separator => ","
      remove_field => ["message"]
      skip_empty_columns => true
      skip_empty_rows => true
    }
  } else if [etltype] == "contracts" {  #[fields][srctype]
    csv {
      columns => [ "address", "bytecode", "function_sighashes", "is_erc20", "is_erc721" ]
      separator => ","
      remove_field => ["message"]
      skip_empty_columns => true
      skip_empty_rows => true
    }
  } else if [etltype] == "logs" {  #[fields][srctype]
    csv {
      columns => [ "log_index", "transaction_hash", "transaction_index", "block_hash", "block_number", "address", "data", "topics" ]
      separator => ","
      remove_field => ["message"]
      skip_empty_columns => true
      skip_empty_rows => true
    }
  } else if [etltype] == "receipts" {  #[fields][srctype]
    csv {
      columns => [ "transaction_hash", "transaction_index", "block_hash", "block_number", "cumulative_gas_used", "gas_used", "contract_address", "root", "status" ]
      separator => ","
      remove_field => ["message"]
      skip_empty_columns => true
      skip_empty_rows => true
    }
  } else if [etltype] == "token_transfers" {  #[fields][srctype]
    csv {
      columns => [ "" ]
      separator => ","
      remove_field => ["message"]
      skip_empty_columns => true
      skip_empty_rows => true
    }
  } else if [etltype] == "tokens" {  #[fields][srctype]
    csv {
      columns => [ "" ]
      separator => ","
      remove_field => ["message"]
      skip_empty_columns => true
      skip_empty_rows => true
    }
  } else if [etltype] == "transactions" {  #[fields][srctype]
    csv {
      columns => [ "hash", "nonce", "block_hash", "block_number", "transaction_index", "from_address", "to_address", "value", "gas", "gas_price", "inputcontext" ]
      separator => ","
      remove_field => ["message"]
      skip_empty_columns => true
      skip_empty_rows => true
    }
  }
}
output {
  if [etltype] == "blocks" {
    elasticsearch {
      hosts => "xxx.
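To sanity-check the column lists offline, here is a rough Python approximation of what one csv branch above does to a single line. It only carries the contracts and receipts lists, treats skip_empty_rows and skip_empty_columns loosely, and parse_line is a hypothetical helper, not Logstash code.

```python
import csv
import io

# Column lists copied from two of the etltype branches above; the other
# branches would be added the same way.
COLUMNS = {
    "contracts": ["address", "bytecode", "function_sighashes", "is_erc20", "is_erc721"],
    "receipts": ["transaction_hash", "transaction_index", "block_hash", "block_number",
                 "cumulative_gas_used", "gas_used", "contract_address", "root", "status"],
}


def parse_line(etltype, line):
    """Roughly mimic one csv branch: split a line on ',' and name the values.

    Hypothetical helper for offline checks, not Logstash code.
    Returns None if etltype has no column list or the line is empty.
    """
    columns = COLUMNS.get(etltype)
    if columns is None:
        return None  # no branch matched: Logstash would leave the event unparsed
    if line == "":
        return None  # skip_empty_rows => true: an empty line yields no fields
    values = next(csv.reader(io.StringIO(line), delimiter=","))
    event = {}
    # zip() ignores values beyond the column list; by default Logstash
    # would auto-generate names for those instead.
    for name, value in zip(columns, values):
        if value != "":  # skip_empty_columns => true
            event[name] = value
    return event


print(parse_line("contracts", "0xabc,0x6080,,True,False"))
```

Quoted fields such as `"0x1,0x2"` stay in one column, which is the main thing a plain split on commas would get wrong.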
Using the Filebeat nginx module to send nginx logs to Logstash, or directly to Elasticsearch, fails with errors either way.
However, some nginx log records are sent successfully. When those successful records are copied to another VM that runs Filebeat and Logstash and sent again, they all fail with errors.
https://stackoverflow.com/questions/41703689/how-do-i-force-rebuild-logs-data-in-filebeat-5
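To force Filebeat to re-read files it has already shipped, stop it, move its registry aside, and start it again:
sudo service filebeat stop
sudo mv /var/lib/filebeat/registry /var/lib/filebeat/registry.old
sudo service filebeat start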
Filebeat + Elasticsearch + Kibana: a lightweight log collection and visualization system
https://wzyboy.im/post/1111.html?utm_source=tuicool&utm_medium=referral
The article notes that instead of
Beats -> Logstash -> Elasticsearch
you can use
Beats -> Elasticsearch ingest pipelines (Elasticsearch Ingest Node)
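Elasticsearch Ingest Node is a feature added in Elasticsearch 5.0. Before Ingest Node, people usually put a Logstash Indexer in front of ES to preprocess data. With Ingest Node, most of what the Logstash Indexer does can be replaced: processors familiar to Logstash users, such as grok and geoip, are available in Ingest Node too. For ES users with small data volumes, saving the cost of a Logstash machine is welcome; for users with large data volumes, ingest nodes, like master nodes and data nodes, can run as dedicated nodes and scale out horizontally, so performance bottlenecks are not a concern.
Ingest Node currently supports dozens of processors, of which the script processor is the most flexible.
Like /_template, the Ingest API lives under /_ingest. Once a pipeline definition has been submitted, a Beat can be configured to use that pipeline as its preprocessor.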
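As a sketch of the /_ingest workflow, the snippet below builds (without sending) a PUT _ingest/pipeline/<id> request for a pipeline that reuses the two sshd grok patterns from the Logstash filter in these notes, plus a geoip processor. Assumptions: Elasticsearch at http://localhost:9200 with no authentication, and a hypothetical pipeline id "ssh-login". Unlike the Logstash version, it adds no tags.

```python
import json
import urllib.request

# Assumptions: local ES without auth, hypothetical pipeline id.
ES_URL = "http://localhost:9200"
PIPELINE_ID = "ssh-login"

pipeline = {
    "description": "Parse sshd password logins (sketch)",
    "processors": [
        {"grok": {
            "field": "message",
            # Patterns are tried in order; the first match wins.
            "patterns": [
                "%{SYSLOGBASE} Failed password for (invalid user |)%{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:port} ssh2",
                "%{SYSLOGBASE} Accepted password for %{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:port} ssh2",
            ],
            "ignore_failure": True,  # keep non-matching lines instead of rejecting them
        }},
        # Skip the lookup when grok did not extract src_ip.
        {"geoip": {"field": "src_ip", "target_field": "geoip", "ignore_missing": True}},
    ],
}


def build_put_request(base_url, pipeline_id, body):
    """Build (but do not send) the PUT _ingest/pipeline/<id> request."""
    return urllib.request.Request(
        f"{base_url}/_ingest/pipeline/{pipeline_id}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


req = build_put_request(ES_URL, PIPELINE_ID, pipeline)
print(req.get_method(), req.full_url)
```

Sending it would be `urllib.request.urlopen(req)`; Filebeat then routes events through the pipeline when filebeat.yml sets `output.elasticsearch.pipeline: ssh-login`.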
1. Filebeat: collect /var/log/secure
2. Logstash filter:
filter {
  grok {
    #type => "syslog"
    match => ["message", "%{SYSLOGBASE} Failed password for (invalid user |)%{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:port} ssh2"]
    add_tag => "ssh_brute_force_attack"
    # a line matches at most one of the two groks, so don't tag every event with _grokparsefailure
    tag_on_failure => []
  }
  grok {
    #type => "syslog"
    match => ["message", "%{SYSLOGBASE} Accepted password for %{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:port} ssh2"]
    add_tag => "ssh_sucessful_login"
    tag_on_failure => []
  }
  # only look up addresses that one of the groks actually extracted
  if [src_ip] {
    geoip {
      source => "src_ip"
      target => "geoip"
      add_tag => [ "ssh-geoip" ]
      # build [geoip][coordinates] as [longitude, latitude]
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
      add_field => [ "geoipflag", "true" ]
    }
  }
}
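A rough offline check of which /var/log/secure lines the two grok patterns would tag, using Python's re module. The regexes are hand-written approximations of SYSLOGBASE, USERNAME, IP and BASE10NUM, not the real grok definitions; classify is a hypothetical helper and the sample log lines are made up.

```python
import re

# Approximation of %{SYSLOGBASE}: "Mon dd hh:mm:ss host program[pid]:"
SYSLOGBASE = (r"\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}\s+\S+\s+"
              r"[\w./-]+(?:\[\d+\])?:")
USERNAME = r"(?P<username>[a-zA-Z0-9._-]+)"  # same character class as grok's USERNAME
SRC_IP = r"(?P<src_ip>[0-9A-Fa-f:.]+)"       # loose IPv4/IPv6 match, not grok's IP
PORT = r"(?P<port>\d+)"                      # simplified BASE10NUM

# Tag strings are copied verbatim from the Logstash config.
RULES = [
    ("ssh_brute_force_attack",
     re.compile(SYSLOGBASE + r" Failed password for (?:invalid user |)" + USERNAME
                + " from " + SRC_IP + " port " + PORT + " ssh2")),
    ("ssh_sucessful_login",
     re.compile(SYSLOGBASE + r" Accepted password for " + USERNAME
                + " from " + SRC_IP + " port " + PORT + " ssh2")),
]


def classify(line):
    """Return (tag, fields) for the first matching rule, else None."""
    for tag, pattern in RULES:
        m = pattern.search(line)  # grok matches are unanchored, like re.search
        if m:
            return tag, m.groupdict()
    return None


print(classify("Jan  5 10:00:01 web1 sshd[1234]: Failed password for invalid user admin "
               "from 203.0.113.5 port 52211 ssh2"))
```

Lines that match neither rule return None, which corresponds to events passing through both groks untagged once `tag_on_failure => []` is set.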