데이터/Modbus

pymodbus

쿠와와 2021. 3. 18. 13:47

python에서 thread를 사용해서 모드버스 구동하여 LS 산전(modbus)모드를 구축해보았다.

 

0. 초기 설정

이미 설정되어 있는 부분은 LS 산전 PLC에서 아이피, modbus 버전, 회로이다. 간단한 회로를 구성했으며 아래에 이미지를 추가하겠다.

아래는 PLC 셋팅을 위한 XG5000의 설정부분이다.

modbus server로 셋팅했고 bit단위는 PLC 메모리의 M부분에 word단위는 D부분을 통해 통신을 하겠다는 소리이다.(이말은 그 외의 메모리는 PC에서 접근할 수 없다는 소리이다.)

뭐 회로는 간단하에 온도 모듈인 XGF-RD4A가 준비가 되었다면 온도 측정을 하여서 D메모리에 10초 단위로 저장을 해주는 회로로 만들었다.  -> 나중에는 회로도 바꿀 예정이지만 지금은 간단하게 구축했다. 

 

1. Import

import sys
import logging
from pymodbus.client.sync import ModbusTcpClient as ModbusClient
import pymongo
from time import sleep
import socket
from scipy.spatial import distance
import datetime
from _thread import *
import threading

내가 사용한 임폴드부분이다. 

 

밑의 코드는 pymodbus 공식 예제에 있었는데 plc와 통신할때 계속 log를 띄워주는 것이다. 귀잖아서 수정해주지 않았다

# modbus 공식 예제에 있엇음
FORMAT = ('%(asctime)-15s %(threadName)-15s '
          '%(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s')
logging.basicConfig(format=FORMAT)
log = logging.getLogger()
log.setLevel(logging.DEBUG)

 

나는 plc에서 데이터를 DB에 저장할려고 DB를 설정해주었다. 

# DB 설정
myclient = pymongo.MongoClient("mongodb://localhost:27017/")  # 이곳은 보통 똑같음 IP add
mydb = myclient["plc_data"]
mycol = mydb["temp"]

 

메인부분 - 쓰레드를 이용해서 통신을 하기위해 간단하게 만들어봤다. args에 들어가있는 HOST1은 PLC IP이다.

# 메인
if __name__ == "__main__":
    print('Thread start')
    # 클라이언트가 접속하면 accept 함수에서 새로운 소켓을 리턴.

    # Thread 1 설정
    modbus_th1 = threading.Thread(target=Modbus_LS_PLC, args=(HOST1, ))

    # 새로운 쓰레드에서 해당 소켓을 사용하여 통신.
    modbus_th1.start()  # 쓰레드 실행
    modbus_th1.join()   # 쓰레드 끝날 때까지 대기

 

그리고 쓰레드에 들어갈 function 코드이다.

# 모드 버스 모드로 실행된 ls plc
def Modbus_LS_PLC(ip, port=502):
    cgac = ModbusCGAC(ip, port)
    # cgac._get_device_info()
    # cgac._getversions()
    print('*' * 30)

    # 60초마다 활성화
    while True:
        # cgac.get_word_data()
        # cgac.set_bit_data(0)
        # cgac.set_word_data(63, start=3)
        cgac.set_bits_data([1, 1, 0, 0, 1, 1])
        sleep(60)

여러가지 테스트를 위해 이것저것 실행해보고 있다ㅣ ModbusCGAC은 내가 만든 클래스이다. 클래스 안에서 pymodbus를 이용하여 데이터 송수신이 이루어진다. 

 

class ModbusCGAC:
    # bit = M 메모리로 설정
    # word = D 메모리로 설정
    def __init__(self, ip_address, port):
        # modbus를 이용해서 연결
        self.ip_address = ip_address
        self.port = port
        self.word_data = []
        try:
            self._cgac = ModbusClient(ip_address, port)
            log.debug('plc connect - ip is {}'.format(self.ip_address))
        except Exception as e:
            print(e)

    # connect 된 device 버전
    def _getversions(self):
        data = self._cgac.read_input_registers(17500, 6)
        self.os_version = '.'.join(map(str, data.registers[0:3]))
        self.boot_version = '.'.join(map(str, data.registers[3:6]))
        print(self.os_version)
        print(self.boot_version)

    # connect 된 device 정보
    def _get_device_info(self):
        data = self._cgac.read_input_registers(17510, 19)
        # data = self._cgac.read_device_information(unit=1)
        self.device_version = data.registers[0]
        self.family = data.registers[1]
        print(data.registers)

    # 0x01 read_coils (data_table, start_address, data_length)
    def get_bit_data(self, my_db_table=None, start=0, end=16):...

    # 0x02 read_discrete_inputs (data_table, start_address, data_length)
    def get_discrete_input_bit_data(self, my_db_table=None, start=0, end=16):...

    # 0x03 read_holding_registers (data_table, start_address, data_length)
    def get_word_data(self, my_db_table=None, start=0, end=16):...
    
    # 0x04 read_input_registers (data_table, start_address, data_length)
    def get_input_word_data(self, my_db_table=None, start=0, end=16):...

    # 0x05 write_single_coil (input_value, data_table, start_address)
    # flag 느낌이라 어떤 수를 집어 넣어도 0, 1로 바뀜
    def set_bit_data(self, input_value, start=0):...

    # 0x06 write_single_Register (input_value, data_table, start_address)
    # 0~63까지 바뀜 더 높은 수를 집어 넣어도 63
    def set_word_data(self, input_value, start=0):...

    # 0x0F write_coils
    def set_bits_data(self, input_values, start=0):
        data = self._cgac.write_coil(start, input_values)
        end = len(input_values)
        log.debug('PLC bit date M{} to {} is changed : {}'.format(start, end, data))

    def memory_bits(self):
        # self._cgac.
        pass

    # 연결 종료
    def disconnect(self):
        self._cgac.close()

 

더 많은 기능들이 지금 추가되고 있지만 간단하게 pymodbus를 어떻게 쓰는지 볼 수 있는 코드이니 참고했으면 좋겠다. 

-> write할 때 시작 주소와 덮어 씌워지는 버퍼들을 약간 손봐야한다. 

 

나중에 추가적으로 지멘스, 미쓰비시 통신을 PC에 구축할 것이다. 

'데이터 > Modbus' 카테고리의 다른 글

modbus 자주쓰는 function code 정리  (0) 2021.03.08
Modbus Protocol  (0) 2021.03.05
산전, 미쓰비시 plc IP,Port 설정 후 python 연결 (pymodbus)  (1) 2021.02.28
Modbus란  (0) 2021.02.02