博客
关于我
openresty lua集成kafka
阅读量:664 次
发布时间:2019-03-15

本文共 4113 字,大约阅读时间需要 13 分钟。

前提

1、 安装openresty,记得安装nginx的监控模块

2、 安装kafka
3、 下载lua+kafka插件:
4、 解压插件,将lua-resty-kafka-master\lib\resty\kafka文件夹放到openresty/lualib/resty下

首先修改openresty的配置文件中localtion位置,引入外部lua文件,这样修改lua文件会比较方便

location / {           default_type text/html;        content_by_lua_file /usr/local/openresty/tmp.lua;    }

案例

1 先获取kafka的实例

2 通过实例获取连接
3设置分区发送策略
4调用send方法发送数据
5启动一个kafka消费测试,验证是否发送成功

lua代码

------ Generated by EmmyLua(https://github.com/EmmyLua)--- Created by NH55.--- DateTime: 2020/12/11 11:48------ 数据采集运行线程阈值监控,如果超过了我们设置的最大阈值,那么就等待不send数据,下个批次再次执行local DEFAULT_THRESH = 100-- 编写kafka相关配置-- 配置broker地址local BROKER_LIST = {       {    host = "192.168.xx.101", port = 9092 },    {    host = "192.168.xx.102", port = 9092 },    {    host = "192.168.xx.103", port = 9092 }}-- kafka分区数local PARTITION_NUM = 3-- kafka的topiclocal TOPIC = "csdn"-- producerConfiglocal CONNECT_PARAMS = {       producer_type = "async", socket_timeout = 30000,    flush_time = 10000, request_timeout = 20000}-- 默认分区local function default_partitioner(key, num, correlation_id)    local id = key and crc32(key) or correlation_id    -- partition_id is continuous and start from 0    return id % numend--- 我们为了让数据均匀分布到每一个分区内,这里我们使用轮询方式发送消息至Kafka分区中--- 相当于自定义分区的模式,当然你也可以不用这种方式,使用默认的分区也行-- 获取共享内存数据local shared_data = ngx.shared.shared_data-- 设置共享内存的变量(Key)local sharedKey = "shared_Key"local key_val = shared_data:get(sharedKey)if not key_val then    key_val = 1    shared_data:set(sharedKey,key_val)end-- 计算消息发送分区local partition_id = ""..tonumber(key_val%PARTITION_NUM)--每个key的value要自增shared_data:incr(sharedKey,1)-- 变量监控local isGone = true-- 进行阈值判断if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESH) then    isGone =falseend-- 满足条件true执行,反之不执行if isGone then    -- 获取页面数据信息    local time_local = ngx.var.time_local    if time_local == nil then        time_local = ""    end    local request = ngx.var.request    if request == nil then        request = ""    end    local request_method = ngx.var.request_method    if request_method == nil then        request_method = ""    end    local content_type = ngx.var.content_type    if content_type == nil then        content_type = ""    end    ngx.req.read_body()    local request_body = ngx.var.request_body    if request_body == nil then        request_body = ""    end    local http_referer = ngx.var.http_referer    if http_referer == nil then        http_referer = ""    end    local remote_addr = ngx.var.remote_addr    if remote_addr == nil then        remote_addr = ""    end    local http_user_agent = ngx.var.http_user_agent    if http_user_agent == nil then        http_user_agent = ""    end    local time_iso8601 = ngx.var.time_iso8601    if time_iso8601 == nil then        time_iso8601 = ""    end    local server_addr = ngx.var.server_addr    if server_addr == nil then        server_addr = ""    end    local http_cookie = ngx.var.http_cookie    if http_cookie == nil then        http_cookie = ""    end    --封装数据    local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#"..            content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#"..            remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#"..            server_addr .."#CS#".. http_cookie;    -- 引入生产者模块创建实例    local producerDic = require "resty.kafka.producer"    -- 创建实例    local producer = producerDic:new(BROKER_LIST,CONNECT_PARAMS)    -- 调用发送方法send    local ok,err = producer:send(TOPIC,partition_id,message)    -- 判断发送消息是否成功打印日志    if not ok then        ngx.log("kafka send message err:",err)    endend

之后打开消费者

kafka-console-consumer.sh \--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \--topic csdn

刷新nginx监听的网页,在消费者端就可以收到内容

tt

11/Dec/2020:19:53:58 +0800#CS#GET / HTTP/1.1#CS#GET#CS##CS##CS##CS#192.168.xx.1#CS#Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36 Edg/87.0.664.57#CS#2020-12-11T19:53:58+08:00#CS#192.168.xx.101#CS#11/Dec/2020:19:54:01 +0800#CS#GET /

注意事项

kafka server.properties 需开启如下选项

集群的每台机器都需要打开

advertised.listeners=PLAINTEXT://192.168.xx.103:9092

转载地址:http://kasmz.baihongyu.com/

你可能感兴趣的文章
mysql 多字段删除重复数据,保留最小id数据
查看>>
MySQL 多表联合查询:UNION 和 JOIN 分析
查看>>
MySQL 大数据量快速插入方法和语句优化
查看>>
mysql 如何给SQL添加索引
查看>>
mysql 字段区分大小写
查看>>
mysql 字段合并问题(group_concat)
查看>>
mysql 字段类型类型
查看>>
MySQL 字符串截取函数,字段截取,字符串截取
查看>>
MySQL 存储引擎
查看>>
mysql 存储过程 注入_mysql 视图 事务 存储过程 SQL注入
查看>>
MySQL 存储过程参数:in、out、inout
查看>>
mysql 存储过程每隔一段时间执行一次
查看>>
mysql 存在update不存在insert
查看>>
Mysql 学习总结(86)—— Mysql 的 JSON 数据类型正确使用姿势
查看>>
Mysql 学习总结(87)—— Mysql 执行计划(Explain)再总结
查看>>
Mysql 学习总结(88)—— Mysql 官方为什么不推荐用雪花 id 和 uuid 做 MySQL 主键
查看>>
Mysql 学习总结(89)—— Mysql 库表容量统计
查看>>
mysql 实现主从复制/主从同步
查看>>
mysql 审核_审核MySQL数据库上的登录
查看>>
mysql 导入 sql 文件时 ERROR 1046 (3D000) no database selected 错误的解决
查看>>