1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859 |
- import datetime
- import psycopg2
- import json
- import writeDataToCurrentLibrary
def get_ts_time():
    """Return the current Unix time as a whole number of seconds.

    Returns:
        int: seconds since the Unix epoch, truncated toward zero.
    """
    # Read the clock as an aware UTC datetime so the epoch conversion does not
    # depend on the process's local time zone settings.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    return int(now_utc.timestamp())
def upsert_realtime_data(
    cursor,
    data,
    *,
    factoryname='lfgawqyq',
    devicename='plc3',
    record_type=4,
    gatewaycode='SN2_30101_002209_00014',
):
    """Update or insert one ``realtimedata`` row per entry of *data*.

    *data* is first handed to
    ``writeDataToCurrentLibrary.writeDataToCurrentDay_Library`` (a project
    helper whose behavior is not visible from this function). Then, for each
    ``(nm, v)`` pair, the row whose ``nm`` matches is updated with the new
    value and timestamp; if no such row exists a new one is inserted. The
    transaction is committed once all entries have been processed.

    Args:
        cursor: DB-API cursor using the ``%s`` parameter style and exposing
            a ``connection`` attribute (e.g. a psycopg2 cursor).
        data: mapping whose keys are written to column ``nm`` and whose
            values are written to column ``v``.
        factoryname: value stored in ``factoryname`` for newly inserted rows.
        devicename: value stored in ``devicename`` for newly inserted rows.
        record_type: value stored in the ``type`` column for new rows.
        gatewaycode: value stored in ``gatewaycode`` for newly inserted rows.

    Raises:
        Whatever the database driver raises from ``execute``/``commit``;
        nothing is caught here.
    """
    writeDataToCurrentLibrary.writeDataToCurrentDay_Library(data)

    for column_name, value in data.items():
        # Take the timestamp once per row so the stored value and the logged
        # value are guaranteed to be the same.
        ts_time = get_ts_time()

        # Check whether a row with this nm already exists.
        cursor.execute(
            """
            SELECT 1 FROM realtimedata WHERE nm = %s
            """,
            (column_name,),
        )
        print('check realtime data')

        if cursor.fetchone():
            # Row exists: refresh its value and timestamp.
            cursor.execute(
                """
                UPDATE realtimedata
                SET v = %s, ts = %s
                WHERE nm = %s
                """,
                (value, ts_time, column_name),
            )
            print(f'Upsert realtime data,time:{ts_time}')
        else:
            # Row is missing: insert it. Every value goes through bind
            # parameters rather than being formatted into the SQL text.
            cursor.execute(
                """
                INSERT INTO realtimedata
                    (nm, v, ts, createtime, factoryname, devicename, type, gatewaycode)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """,
                (
                    column_name,
                    value,
                    ts_time,
                    ts_time,
                    factoryname,
                    devicename,
                    record_type,
                    gatewaycode,
                ),
            )
            print('Insert realtime data')

    # Commit once so the whole batch is applied in a single transaction.
    cursor.connection.commit()
# Example usage
# NOTE(review): three keys below begin with an uppercase "I" ("Ifgawqyq")
# while the others begin with a lowercase "l" ("lfgawqyq") — confirm whether
# that difference is intentional before relying on these names.
data = {
    "lfgawqyq_plc3_Tag17": 11.111,
    "Ifgawqyq_plc3_Tag22": 8.2333,
    "Ifgawqyq_plc3_Tag20": 9.222,
    "Ifgawqyq_plc10_Tag45": 9.333,
    "lfgawqyq_plc10_Tag43": 9.444,
    "lfgawqyq_plc10_Tag50": 9.555,
    "lfgawqyq_plc10_Tag52": 9.556,
    "lfgawqyq_plc10_Tag51": 9.777,
    "lfgawqyq_plc10_Tag53": 9.888,
}

# Call the function (needs an open database cursor):
# upsert_realtime_data(cursor, data)
|