前言
InfluxDB的官方文档很齐全,文章的写不好或遗漏的地方还请自行查阅。
安装
docker一键安装简单快捷
docker run -dit --restart always \
--name influxdb \
-p 8086:8086 \
-e TZ=Asia/Shanghai \
-v /www/InfluxDB:/var/lib/influxdb2 \
influxdb:2.7.0
初始化数据库
浏览器访问 http://[ip]:8086 按照提示输入信息,注意记录下生成的token
python3中使用
- 安装
influxdb-client
pip3 install influxdb-client
- 读写数据库
import influxdb_client, os, time from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.write_api import SYNCHRONOUS token = 'my_token' # token org = 'nyy' # 组织名称 bucket = 'tmip' url = 'http://[ip]:8086' # 替换为你的数据库地址 if __name__ == '__main__': client = influxdb_client.InfluxDBClient(url=url, token=token, org=org) # 写入api write_api = client.write_api(write_options=SYNCHRONOUS) for value in range(5): point = ( Point("measurement1") .tag("tagname1", "tagvalue1") .field("field1", value) ) write_api.write(bucket=bucket, org="nyy", record=point) # 写入数据 time.sleep(1) # separate points by 1 second # 查询api query_api = client.query_api() # 计算平均值 query = f'''from(bucket: "{bucket}") |> range(start: -10m) |> filter(fn: (r) => r._measurement == "measurement1") |> mean()''' tables = query_api.query(query, org=org) for table in tables: for record in table.records: print(record)
对接Modbus
我们借助中间件Telegraf来实现从modbus设备寄存器中读取数据,当然你也可以采用其他方式,比如借助python的pymodbus模块
生成配置文件
- 直接再
sources
中搜索modbus
- 创建一个新的配置
- 填写modbus的信息
- 完成配置, 记录下生成的token
- 最后我们把配置文件下载下来
安装Telegraf
仍然是用Docker安装,简单省事
docker run -dit --restart always \
--name telegraf \
-e INFLUX_TOKEN=my_token \
-e TZ=Asia/Shanghai \
-v /www/Telegraf/test.conf:/etc/telegraf/telegraf.conf \
telegraf:1.26.1-alpine
这边的test.conf
即是上一步最后下载到的配置文件,注意替换token
以下是简单配置
# Configuration for telegraf agent
[agent]
interval = "10s"
round_interval = true
metric_batch_size = 1000
metric_buffer_limit = 10000
collection_jitter = "0s"
flush_interval = "10s"
flush_jitter = "0s"
precision = ""
hostname = ""
omit_hostname = false
[[outputs.influxdb_v2]]
urls = ["http://172.100.1.91:8086"]
token = "$INFLUX_TOKEN"
organization = "nyy"
bucket = "tmip"
# Retrieve data from MODBUS slave devices
[[inputs.modbus]]
name = "烘干机1"
slave_id = 1
timeout = "1s"
controller = "tcp://172.100.1.80:502"
configuration_type = "register"
holding_registers = [
{ name = "tempature", byte_order = "AB", data_type = "UINT16", scale=0.1, address = [5000]},
{ name = "fan", byte_order = "AB", data_type = "UINT16", scale=0.1, address = [5060]}
]
[[inputs.modbus]]
name = "烘干机2"
slave_id = 2
timeout = "1s"
controller = "tcp://172.100.1.83:502"
configuration_type = "register"
holding_registers = [
{ name = "tempature", byte_order = "AB", data_type = "UINT16", scale=0.1, address = [5000]},
{ name = "fanA", byte_order = "AB", data_type = "UINT16", scale=0.1, address = [5060]}
]
InfluxDB2语法
from(bucket: "test")
|> range(start: -30s)
|> filter(fn: (r) => r._measurement == "modbus")
|> filter(fn: (r) => r.name == "烘干机1")
|> last()
- 查最大最小值
from(bucket: "nyy") |> range(start: -48h) |> group(columns: ["_measurement", "_field"]) |> reduce( identity: {max: 0.0, min: 0.0}, fn: (r, accumulator) => ({ max: if r._value > accumulator.max then r._value else accumulator.max, min: if r._value < accumulator.min then r._value else accumulator.min }) ) |> map(fn: (r) => ({ r with diff: r.max - r.min })) |> filter(fn: (r) => r.diff == 0)