基本概念
- Measurement:度量,相当于“表”
- DataPoint:数据点,相当于“一条数据”
- Time:时间戳,代表数据点产生的时间。
- Field:不带索引的字段
- Tag:带索引的字段。Measurement+Tag 可以用于唯一索引一部分数据
1. 准备连接influxdb
首先查看能否连接上数据库:
from datetime import datetime
from influxdb_client import InfluxDBClient
bucket = "manager_test_bucket"
influxdb_token = "SkeHprHCgmvtX3LXluMUlgyl5nzwM4zdMtsCuT7BQXsaJlhFPMJizKj0nX3ugr9vRfY7Ak4rIhu-wx-aIqNFig=="
influxdb_org = "manager"
client = InfluxDBClient(url="http://localhost:8086", token=influxdb_token, org=influxdb_org)
然后新建一个bucket用于测试:
buckets_api = client.buckets_api()
created_bucket = buckets_api.create_bucket(bucket_name=bucket_name, org=influxdb_org)
2. 新增数据
方法1:使用Point
通常的新增数据的方法如下:
from influxdb_client import Point
from datetime import datetime
from influxdb_client.client.write_api import SYNCHRONOUS
add_data1 = Point("measurement_1").field("open", 1.1).field("close", 1.1).time(datetime(2023, 3, 14, 12, 1, 1))
add_data2 = Point("measurement_1").field("open", 1.2).field("close", 1.2).time(datetime(2023, 3, 13, 13, 2, 1))
add_data3 = Point("measurement_1").field("open", 1.3).field("close", 1.3).time(datetime(2023, 3, 12, 14, 3, 1))
write_api = client.write_api(write_options=SYNCHRONOUS)
write_api.write(bucket=bucket_name, record=[add_data1, add_data2, add_data3])
结果如下图所示:
方法2:字典dict方式
write_api = client.write_api(write_options=SYNCHRONOUS)
add_data1_new = {
"measurement": "measurement_2",
"fields": {"open": 1.1, "close": 1.1},
"time": datetime(2023, 3, 15, 12, 1, 1),
}
add_data2_new = {
"measurement": "measurement_2",
"fields": {"open": 1.2, "close": 1.2},
"time": datetime(2023, 3, 14, 12, 1, 1),
}
write_api.write(bucket=bucket_name, org=influxdb_org, record=[add_data1_new, add_data2_new])
方法3:带有Tag索引的数据
add_data1_new = {
"measurement": "measurement_2",
"tags": {"stock": "examp_stock"},
"fields": {"open": 1.1, "close": 1.1},
"time": datetime(2023, 3, 15, 12, 1, 1),
}
add_data2_new = {
"measurement": "measurement_2",
"tags": {"stock": "examp_stock"},
"fields": {"open": 1.2, "close": 1.2},
"time": datetime(2023, 3, 14, 12, 1, 1),
}
add_data3_new = {
"measurement": "measurement_2",
"fields": {"open": 1.3, "close": 1.3},
"time": datetime(2023, 3, 13, 12, 1, 1),
}
write_api.write(bucket=bucket_name, org=influxdb_org, record=[add_data1_new, add_data2_new, add_data3_new])
效果图如下:
可以看到,此时在数据库中,使用measurement + tags
,可以唯一索引一部分数据,而如果没有指定tags
,那么measurement
,就会唯一的索引一部分数据
3. 修改数据
修改数据的程序与新增数据类似,如果对应的_measurement
与field
一致,则值是会覆盖的,如果不一致,则是追加数据
new_data = Point("measurement_1").field("open", 11.1).field("high", 21.1).time(datetime(2023, 3, 14, 12, 2, 3))
write_api.write(bucket=bucket_name, record=[new_data])
4. 查询数据
query_api = client.query_api()
query_tables = query_api.query("""
from(bucket: "manager_test_bucket")
|> range(start: 0, stop: now())
|> filter(fn: (r) => r["_measurement"] == "measurement_1")
""")
for _table in query_tables:
for record in _table.records:
print(record.values)
我们会得到这样的结果:
{'result': '_result', 'table': 0, '_start': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), '_stop': datetime.datetime(2023, 3, 16, 5, 27, 6, 567016, tzinfo=datetime.timezone.utc), '_time': datetime.datetime(2023, 3, 12, 5, 3, 1, tzinfo=datetime.timezone.utc), '_value': 1.3, '_field': 'close', '_measurement': 'measurement_1'}
{'result': '_result', 'table': 0, '_start': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), '_stop': datetime.datetime(2023, 3, 16, 5, 27, 6, 567016, tzinfo=datetime.timezone.utc), '_time': datetime.datetime(2023, 3, 13, 13, 2, 1, tzinfo=datetime.timezone.utc), '_value': 1.2, '_field': 'close', '_measurement': 'measurement_1'}
{'result': '_result', 'table': 0, '_start': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), '_stop': datetime.datetime(2023, 3, 16, 5, 27, 6, 567016, tzinfo=datetime.timezone.utc), '_time': datetime.datetime(2023, 3, 14, 12, 1, 1, tzinfo=datetime.timezone.utc), '_value': 1.1, '_field': 'close', '_measurement': 'measurement_1'}
{'result': '_result', 'table': 1, '_start': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), '_stop': datetime.datetime(2023, 3, 16, 5, 27, 6, 567016, tzinfo=datetime.timezone.utc), '_time': datetime.datetime(2023, 3, 12, 5, 3, 1, tzinfo=datetime.timezone.utc), '_value': 1.3, '_field': 'open', '_measurement': 'measurement_1'}
{'result': '_result', 'table': 1, '_start': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), '_stop': datetime.datetime(2023, 3, 16, 5, 27, 6, 567016, tzinfo=datetime.timezone.utc), '_time': datetime.datetime(2023, 3, 13, 13, 2, 1, tzinfo=datetime.timezone.utc), '_value': 1.2, '_field': 'open', '_measurement': 'measurement_1'}
{'result': '_result', 'table': 1, '_start': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), '_stop': datetime.datetime(2023, 3, 16, 5, 27, 6, 567016, tzinfo=datetime.timezone.utc), '_time': datetime.datetime(2023, 3, 14, 12, 1, 1, tzinfo=datetime.timezone.utc), '_value': 1.1, '_field': 'open', '_measurement': 'measurement_1'}
5. 删除数据
start = "2022-03-13T00:00:00Z"
stop = "2023-05-30T00:00:00Z"
delete_api = client.delete_api()
delete_api.delete(start, stop,
predicate='_field=open', # 删除的规则
bucket=bucket_name, org=influxdb_org)
注意:删除数据不能使用_time
,_field
,_value
,不会报错但会导致删除代码无效
完整示例代码
注意事项
time 相当于表的主键,当一条数据的time和tags完全相同时候,新数据会替换掉旧数据,旧数据则丢失(线上环境尤其要注意)。
fields和tags的字段类型是由存入的第一条记录值决定的,建议只包含浮点型与字符串类型