import datetime
import json

import psycopg2

import writeDataToCurrentLibrary

# Default metadata written to newly inserted realtimedata rows.
_DEFAULT_FACTORY_NAME = 'lfgawqyq'
_DEFAULT_DEVICE_NAME = 'plc3'
_DEFAULT_ROW_TYPE = 4
_DEFAULT_GATEWAY_CODE = 'SN2_30101_002209_00014'


def get_ts_time() -> int:
    """Return the current time as whole seconds since the Unix epoch."""
    current_time = datetime.datetime.now()
    return int(current_time.timestamp())


def upsert_realtime_data(
    cursor,
    data,
    *,
    factoryname=_DEFAULT_FACTORY_NAME,
    devicename=_DEFAULT_DEVICE_NAME,
    row_type=_DEFAULT_ROW_TYPE,
    gatewaycode=_DEFAULT_GATEWAY_CODE,
) -> None:
    """Update or insert one ``realtimedata`` row per entry in *data*.

    *data* is first passed to
    ``writeDataToCurrentLibrary.writeDataToCurrentDay_Library``; what that
    helper does is not visible from this file.

    For every ``name -> value`` pair, the row whose ``nm`` equals the name
    gets its ``v`` and ``ts`` updated. When no such row exists a new one is
    inserted, with ``ts`` and ``createtime`` set to the same timestamp and
    the metadata columns filled from the keyword arguments. The transaction
    is committed once, after all entries have been processed.

    Args:
        cursor: Open DB-API cursor (psycopg2) whose ``connection`` is
            committed at the end.
        data: Mapping of ``nm`` value to the reading to store in ``v``.
        factoryname: ``factoryname`` column for newly inserted rows.
        devicename: ``devicename`` column for newly inserted rows.
        row_type: ``type`` column for newly inserted rows.
        gatewaycode: ``gatewaycode`` column for newly inserted rows.
    """
    writeDataToCurrentLibrary.writeDataToCurrentDay_Library(data)

    # Walk every key/value pair of the mapping.
    for column_name, value in data.items():
        # Check whether a row with this nm value already exists.
        cursor.execute(
            """
            SELECT 1 FROM realtimedata WHERE nm = %s
            """,
            (column_name,),
        )
        print('check realtime data')
        exists = cursor.fetchone()

        # Take the timestamp once per row so the stored value and the
        # logged value cannot disagree.
        ts_time = get_ts_time()

        if exists:
            # Row exists: refresh its value and timestamp.
            cursor.execute(
                """
                UPDATE realtimedata
                SET v = %s, ts = %s
                WHERE nm = %s
                """,
                (value, ts_time, column_name),
            )
            print(f'Upsert realtime data,time:{ts_time}')
        else:
            # Row is missing: insert it. Every value is a bound parameter;
            # nothing is interpolated into the SQL text.
            cursor.execute(
                """
                INSERT INTO realtimedata
                    (nm, v, ts, createtime,
                     factoryname, devicename, type, gatewaycode)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """,
                (
                    column_name,
                    value,
                    ts_time,
                    ts_time,
                    factoryname,
                    devicename,
                    row_type,
                    gatewaycode,
                ),
            )
            print('Insert realtime data')

    # Commit the transaction.
    cursor.connection.commit()


# Example usage.
# NOTE(review): some keys below start with a capital "I" ("Ifgawqyq") while
# the others start with a lowercase "l" ("lfgawqyq") -- confirm which
# spelling is intended. Several keys also mention "plc10" although inserted
# rows default to devicename 'plc3' -- verify against the data source.
data = {
    "lfgawqyq_plc3_Tag17": 11.111,
    "Ifgawqyq_plc3_Tag22": 8.2333,
    "Ifgawqyq_plc3_Tag20": 9.222,
    "Ifgawqyq_plc10_Tag45": 9.333,
    "lfgawqyq_plc10_Tag43": 9.444,
    "lfgawqyq_plc10_Tag50": 9.555,
    "lfgawqyq_plc10_Tag52": 9.556,
    "lfgawqyq_plc10_Tag51": 9.777,
    "lfgawqyq_plc10_Tag53": 9.888
}

# Call the function:
# upsert_realtime_data(cursor, data)