Python Polling Script using minimalmodbus

This was replaced with a more comprehensive C program. See: Home Solar Project

#!/usr/bin/env python3
#
# powermeter.py
#
# I don't like python very much
#
import minimalmodbus
import serial
import time
from datetime import datetime
import csv
import psycopg2
import sys

#----------------------------------------------------------------------
# ADD_TABLE_ENTRY
#----------------------------------------------------------------------
def add_table_entry(sensor_id,stamp,volts,amps,watts,energy,frequency,power_factor,alarm):
    """Insert one reading into the power_monitor table and commit it.

    The values are handed to the database driver as bound parameters
    rather than being formatted into the SQL text, so quoting and type
    adaptation are done by psycopg2 and a stray quote in any value cannot
    alter the statement.

    Uses the module-level ``cursor`` and ``connection`` objects, and the
    module-level ``verbose`` flag to decide whether to echo the statement.

    Parameters map one-to-one onto the pwm_* columns, in this order:
    sensor_id, stamp, volts, amps, watts, energy, frequency, power_factor,
    alarm.
    """
    sql = (
        "INSERT INTO power_monitor\n"
        "  ( pwm_sensor, pwm_stamp, pwm_volts, pwm_amps, pwm_watts, pwm_energy, pwm_frequency, pwm_power_factor, pwm_alarm\n"
        "  ) VALUES (\n"
        "    %s,%s,%s,%s,%s,%s,%s,%s,%s\n"
        "  )"
    )
    params = (sensor_id, stamp, volts, amps, watts, energy, frequency, power_factor, alarm)

    if verbose > 0:
        # Show both the statement template and the values bound to it.
        print("--")
        print(sql)
        print(params)
        print("--")
    cursor.execute(sql, params)
    connection.commit()


#----------------------------------------------------------------------
# CSV_ADD_ENTRY
#----------------------------------------------------------------------
def csv_add_entry(csv_log,csv_out,sensor_id,stamp,amps,watts,volts,frequency):
    """Append one reading as a CSV row and flush it to the log file.

    csv_log is the open file object, csv_out the csv writer wrapping it.
    Column order written: sensor_id, stamp, amps, watts, volts, frequency.
    """
    row = (sensor_id, stamp, amps, watts, volts, frequency)
    csv_out.writerow(row)
    # Flush after every row so the row is not left sitting in the buffer.
    csv_log.flush()

#----------------------------------------------------------------------
# CSV_OPEN_FILE
#----------------------------------------------------------------------
def csv_open_file(csv_log,now,filename,csv_out=None,log_dir='/var/log/USER/powermeter.py'):
    """Open (or keep open) the CSV log file for the day given by ``now``.

    The log file is named ``<log_dir>/YYYYMMDD``. When that name differs
    from ``filename`` (the name currently open, or "" when nothing is open
    yet) the old file is closed and the new one is opened in append mode.

    Parameters:
        csv_log  -- currently open file object (ignored when filename is "").
        now      -- object with year/month/day attributes, e.g. a datetime.
        filename -- name of the file currently open, "" for none.
        csv_out  -- csv writer currently in use; returned unchanged when no
                    rollover happens. Optional for backward compatibility.
        log_dir  -- directory holding the daily files.

    Returns:
        (csv_log, csv_out, filename) for the file that is open after the
        call. The returned filename is the new name, so passing it back on
        the next call avoids reopening the same file.
    """
    new_filename = '{}/{}{:02d}{:02d}'.format(log_dir, now.year, now.month, now.day)
    if new_filename != filename:
        if filename != "" and csv_log is not None:
            csv_log.close()
        csv_log = open(new_filename, 'a', encoding='UTF8', newline='')
        csv_out = csv.writer(csv_log)
    elif csv_out is None:
        # Same file as before but the caller did not hand the writer back:
        # build a fresh writer on the already-open file.
        csv_out = csv.writer(csv_log)
    return csv_log, csv_out, new_filename

#----------------------------------------------------------------------
# NOW_TO_STAMP
#----------------------------------------------------------------------
def now_to_stamp(now):
    """Format ``now`` as a 'YYYY-MM-DD HH:MM:SS' string.

    ``now`` needs integer year, month, day, hour, minute and second
    attributes; every field is zero-padded to a fixed width.
    """
    date_part = f'{now.year:04d}-{now.month:02d}-{now.day:02d}'
    time_part = f'{now.hour:02d}:{now.minute:02d}:{now.second:02d}'
    return date_part + ' ' + time_part

#----------------------------------------------------------------------
# PSENSOR_READ
# Read all 10 registers, in one pass . . . Parse them out
#----------------------------------------------------------------------
def psensor_read(pwm):
    """Read ten registers from ``pwm`` in one request and decode them.

    Register layout as decoded here:
        [0]        volts        (divided by 10)
        [1],[2]    amps         (low word, high word; divided by 1000)
        [3],[4]    watts        (low word, high word; divided by 10)
        [5],[6]    energy       (low word, high word; unscaled)
        [7]        frequency    (divided by 10)
        [8]        power_factor (divided by 100)
        [9]        alarm        (any positive value is reported as 1)

    Returns the tuple
    (volts, amps, watts, energy, frequency, power_factor, alarm).
    If an IOError is raised, every measurement is returned as 0 and
    alarm as 1.
    """
    try:
        regs = pwm.read_registers(0,10,4)

        def wide(low_ix, high_ix):
            # Combine two 16-bit registers; the high word sits in the
            # register that follows the low word.
            return (regs[high_ix] << 16) + regs[low_ix]

        raw_alarm = regs[9]
        reading = (
            regs[0] / 10,
            wide(1, 2) / 1000,
            wide(3, 4) / 10,
            wide(5, 6),
            regs[7] / 10,
            regs[8] / 100,
            1 if raw_alarm > 0 else raw_alarm,
        )
        if verbose > 0:
            print(regs)
        return reading
    except IOError:
        # Failed read: zeroed measurements with the alarm flag raised.
        return 0, 0, 0, 0, 0, 0, 1

#----------------------------------------------------------------------
# PSENSOR_OPEN
# Parm 4 of Instrument() = close the serial port after each query
#----------------------------------------------------------------------
def psensor_open(psd_id, port='/dev/ttyUSB0'):
    """Create and configure a minimalmodbus Instrument for one sensor.

    Parameters:
        psd_id -- Modbus slave address of the sensor.
        port   -- serial device the sensor bus is attached to.

    Returns the configured Instrument.

    The serial settings are applied for every sensor rather than for one
    particular address only, so the result no longer depends on which
    sensor happens to be opened first.
    NOTE(review): minimalmodbus is understood to share one serial object
    between Instruments on the same port, which makes re-applying the same
    settings harmless — confirm against the installed version.
    """
    # Fourth argument True: close the port after each call.
    psd = minimalmodbus.Instrument(port, psd_id, 'rtu', True)
    psd.mode = minimalmodbus.MODE_RTU
    psd.serial.baudrate = 9600
    psd.serial.bytesize = 8
    psd.serial.parity = serial.PARITY_NONE
    psd.serial.stopbits = 1
    psd.serial.timeout = 0.2
    if verbose > 0:
        print('Details of the meter are:')
        print(psd)
    return psd

#----------------------------------------------------------------------
# MAIN
#----------------------------------------------------------------------
# Verbose output is enabled only when "-v" is the first command-line argument.
verbose = 0
argc = len(sys.argv)
if argc > 1 and sys.argv[1] == "-v":
    verbose = 1

# Modbus slave addresses of the sensors to poll.
sensor_ids = [ 11, 12 ]

# Global: connection and cursor are used by add_table_entry().
# NOTE(review): credentials are hard-coded here; consider moving them to
# configuration or the environment.
connection = psycopg2.connect(user="myhouse",
  password="myhouse",
  host="127.0.0.1",
  port="5432",
  database="myhouse")
cursor = connection.cursor()

# One instrument per sensor id, kept in the same order as sensor_ids.
psd_ar = []
for sensor_id in sensor_ids:
    psd_ar.append(psensor_open(sensor_id))

# CSV logging is currently disabled:
# csv_log,csv_out,filename = csv_open_file(csv_log,now,"")

now = datetime.now()
stamp = now_to_stamp(now)
cur_stamp = stamp


# Poll every sensor, store the reading, then wait five seconds; repeat
# until the process is interrupted.
try:
    while True:
        # Pair each id with its instrument instead of using fixed indices,
        # so changing sensor_ids is enough to add or remove a sensor.
        for sensor_id, psensor in zip(sensor_ids, psd_ar):
            volts,amps,watts,energy,frequency,power_factor,alarm = psensor_read(psensor)
            # psensor_read() reports a failed read as all zeros, so a zero
            # current is treated as "no data" and not stored.
            if amps != 0:
                add_table_entry(sensor_id,stamp,volts,amps,watts,energy,frequency,power_factor,alarm)
                # csv_add_entry(csv_log,csv_out,sensor_id,stamp,amps,watts,volts,frequency)
            elif verbose == 1:
                print ("Read Failed\n")
        time.sleep(5)
        now = datetime.now()
        stamp = now_to_stamp(now)
finally:
    # Release the database resources when the loop is interrupted or fails.
    cursor.close()
    connection.close()
code/old_sensor_poll_python.txt · Last modified: 2022-12-21 19:47 by keith
CC Attribution-Noncommercial-Share Alike 4.0 International
Driven by DokuWiki Recent changes RSS feed Valid CSS Valid XHTML 1.0