# writeDataToRealtimeData.py
  1. import datetime
  2. import psycopg2
  3. import json
  4. import writeDataToCurrentLibrary
  5. def get_ts_time():
  6. current_time = datetime.datetime.now()
  7. timestamp = current_time.timestamp()
  8. return int(timestamp)
  9. def upsert_realtime_data(cursor, data):
  10. writeDataToCurrentLibrary.writeDataToCurrentDay_Library(data)
  11. # 遍历字典中的每个键值对
  12. for column_name, value in data.items():
  13. # 检查是否存在具有特定 nm 值的行
  14. cursor.execute("""
  15. SELECT 1 FROM realtimedata WHERE nm = %s
  16. """, (column_name,))
  17. print('check realtime data')
  18. exists = cursor.fetchone()
  19. if exists:
  20. # 如果存在,更新该行
  21. cursor.execute("""
  22. UPDATE realtimedata
  23. SET v= %s,ts =%s
  24. WHERE nm = %s
  25. """, (value, get_ts_time(),column_name))
  26. print(f'Upsert realtime data,time:{get_ts_time()}')
  27. else:
  28. # 如果不存在,插入新行
  29. ts_time = get_ts_time()
  30. cursor.execute(f"""
  31. INSERT INTO realtimedata (nm, v,ts,createtime,factoryname,devicename,type,gatewaycode)
  32. VALUES (%s, %s,{ts_time},{ts_time},'lfgawqyq','plc3',4,'SN2_30101_002209_00014')
  33. """, (column_name, value))
  34. print('Insert realtime data')
  35. # 提交事务
  36. cursor.connection.commit()
  37. # 示例使用
  38. data = {
  39. "lfgawqyq_plc3_Tag17": 11.111,
  40. "Ifgawqyq_plc3_Tag22": 8.2333,
  41. "Ifgawqyq_plc3_Tag20": 9.222,
  42. "Ifgawqyq_plc10_Tag45": 9.333,
  43. "lfgawqyq_plc10_Tag43": 9.444,
  44. "lfgawqyq_plc10_Tag50": 9.555,
  45. "lfgawqyq_plc10_Tag52": 9.556,
  46. "lfgawqyq_plc10_Tag51": 9.777,
  47. "lfgawqyq_plc10_Tag53": 9.888
  48. }
  49. #
  50. # # 调用函数
  51. # upsert_realtime_data(cursor, data)
  52. #