OPCUA-Python实例

OPCUA-Python

1
pip install opcua

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
from threading import Thread
import copy
import logging
from datetime import datetime
import time
from math import sin
import sys
sys.path.insert(0, "..")
 
try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Open a plain interactive console as a stand-in for IPython's ``embed``.

        The console runs directly on this module's global namespace, so
        module-level names are visible in the shell and anything assigned in
        the shell lands in the module globals.
        """
        # The function's own locals are not merged in: at this point they hold
        # nothing useful, and merging them only planted a self-referential
        # helper name in the module globals.
        shell = code.InteractiveConsole(globals())
        shell.interact()
 
 
from opcua import ua, uamethod, Server
 
 
class SubHandler:
    """Callback sink for subscription notifications; it prints what arrives."""

    def event_notification(self, event):
        """Print the received event."""
        print("Python: New event", event)

    def datachange_notification(self, node, val, data):
        """Print the node and its new value; ``data`` is accepted but unused."""
        print("Python: New data change event", node, val)
 
 
# method to be exposed through server
 
def func(parent, variant):
    """Method callback: report whether the incoming value is even.

    ``parent`` is accepted but not used. ``variant.Value`` is tested with
    ``% 2`` and the outcome is returned as a one-element list holding a
    Boolean ``ua.Variant``.
    """
    is_even = True if variant.Value % 2 == 0 else False
    return [ua.Variant(is_even, ua.VariantType.Boolean)]
 
 
# method to be exposed through server
# uses a decorator to automatically convert to and from variants
 
@uamethod
def multiply(parent, x, y):
    """Log both operands and return their product.

    ``parent`` is accepted but not used. The function is wrapped by the
    ``uamethod`` decorator, so it works with plain values instead of variants.
    """
    print("multiply method call with parameters: ", x, y)
    product = x * y
    return product
 
 
class VarUpdater(Thread):
    """Background thread that keeps writing a slow sine wave into ``var``."""

    def __init__(self, var):
        super().__init__()
        self.var = var
        self._stopev = False  # flipped to True once stop() has been requested

    def stop(self):
        """Ask the loop in run() to finish; it is noticed on the next check."""
        self._stopev = True

    def run(self):
        """Write sin(now / 10) to the variable, pausing 0.1 s, until stopped."""
        while True:
            if self._stopev:
                break
            self.var.set_value(sin(time.time() / 10))
            time.sleep(0.1)
 
 
 
if __name__ == "__main__":
    # optional: setup logging
    logging.basicConfig(level=logging.WARN)
    # Uncomment any of these to get debug output from one part of the library:
    # logging.getLogger("opcua.address_space").setLevel(logging.DEBUG)
    # logging.getLogger("opcua.internal_server").setLevel(logging.DEBUG)
    # logging.getLogger("opcua.binary_server_asyncio").setLevel(logging.DEBUG)
    # logging.getLogger("opcua.uaprocessor").setLevel(logging.DEBUG)

    # now setup our server
    server = Server()
    # server.disable_clock()
    # server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
    server.set_endpoint("opc.tcp://0.0.0.0:4841/freeopcua/server/")
    server.set_server_name("FreeOpcUa Example Server")
    # set all possible endpoint policies for clients to connect through
    server.set_security_policy([
        ua.SecurityPolicyType.NoSecurity,
        # ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
        # ua.SecurityPolicyType.Basic256Sha256_Sign
    ])

    # setup our own namespace; the URI has to exist before it can be
    # registered (it was referenced without ever being defined)
    uri = "http://examples.freeopcua.github.io"
    idx = server.register_namespace(uri)

    # create a new node type we can instantiate in our address space
    dev = server.nodes.base_object_type.add_object_type(0, "MyDevice")
    dev.add_variable(0, "sensor1", 1.0).set_modelling_rule(True)
    dev.add_property(0, "device_id", "0340").set_modelling_rule(True)
    ctrl = dev.add_object(0, "controller")
    ctrl.set_modelling_rule(True)
    ctrl.add_property(0, "state", "Idle").set_modelling_rule(True)

    # populating our address space

    # First a folder to organise our nodes
    myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder")
    # instantiate one instance of our device
    mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)
    mydevice_var = mydevice.get_child(["0:controller", "0:state"])  # get proxy to our device state variable
    # create directly some objects and variables
    myobj = server.nodes.objects.add_object(idx, "MyObject")
    myvar = myobj.add_variable(idx, "MyVariable", 6.7)
    mysin = myobj.add_variable(idx, "MySin", 0, ua.VariantType.Float)
    myvar.set_writable()    # Set MyVariable to be writable by clients
    mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")
    mystringvar.set_writable()    # Set MyStringVariable to be writable by clients
    mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow())
    mydtvar.set_writable()    # Set MyDateTimeVar to be writable by clients
    myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
    # Kept under its own name so that ``myarrayvar`` keeps pointing at the
    # float array that is extended further down.
    mytypedarrayvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
    myprop = myobj.add_property(idx, "myproperty", "I am a property")
    mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
    multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])

    # import some nodes from xml
    # server.import_xml("custom_nodes.xml")

    # creating a default event object
    # The event object automatically will have members for all events properties
    # you probably want to create a custom event type, see other examples
    myevgen = server.get_event_generator()
    myevgen.event.Severity = 300

    # starting!
    server.start()
    print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
    vup = VarUpdater(mysin)  # just a simple class that keeps updating a variable
    vup.start()
    try:
        # enable following if you want to subscribe to nodes on server side
        # handler = SubHandler()
        # sub = server.create_subscription(500, handler)
        # handle = sub.subscribe_data_change(myvar)
        var = myarrayvar.get_value()  # return a ref to value in db server side! not a copy!
        var = copy.copy(var)  # WARNING: we need to copy before writing again otherwise no data change event will be generated
        var.append(9.3)
        myarrayvar.set_value(var)
        mydevice_var.set_value("Running")
        # trigger event, all subscribed clients will receive it
        myevgen.trigger(message="This is BaseEvent")
        server.set_attribute_value(myvar.nodeid, ua.DataValue(9.9))  # Server side write method which is a bit faster than using set_value

        embed()
    finally:
        # Always stop the updater thread and the server, even if the
        # interactive session above ends with an error.
        vup.stop()
        server.stop()

这个服务程序演示了opcua服务端几乎所有的功能;其中Event部分只触发了一次,没有持续发送,所以仅供参考。

下图展示了server端的对象结构

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from IPython import embed
from opcua import Client
 
class SubHandler:
    """Subscription callback that prints a one-line summary of each event."""

    def event_notification(self, event):
        """Print the event id, time, ``proper_random`` field and message text."""
        fields = (event.EventId, event.Time, event.proper_random, event.Message.Text)
        print("Event:", *fields)
 
def main_c():
    """Connect to the example server and drop into an interactive shell.

    Failures are printed instead of raised. The client is disconnected on
    the way out, but only if the connection was actually established.
    """
    url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"
    c = Client(url)
    try:
        c.connect()
    except Exception as e:
        # Nothing to tear down: the connection never came up, and calling
        # disconnect() on it can raise a second, misleading error.
        print("Client Exception:", e)
        return
    try:
        # ``root`` is not used below; presumably it is kept as a starting
        # point for exploring from the interactive shell.
        root = c.get_root_node()
        embed()
    except Exception as e:
        print("Client Exception:", e)
    finally:
        c.disconnect()


if __name__ == "__main__":
    main_c()

客户端遍历流程

  • 1.获取root下的Objects节点的所有子节点,一般类型都为Object,取一代表为A
  • 2.获取A的NodeClass(一般为Object)和TypeDefinition,其中NodeId需要与路径["0:Types","0:ObjectTypes","0:BaseObjectType"]下的类型对照(自定义类型也在内)。
  • 3.继续以A为根,遍历子节点,如果是Object继续2,如果是Variable,NodeId对照["0:Types","0:VariableTypes","0:BaseVariableType"],可以判断是变量(variable,63)还是属性(property,65)。
  • 4.对于Method使用a_root.call_method(a, arg1)调用。对于Variable使用get_value/get_data_value()获取存储的值。如果是Object继续2.
  • 5.对于Variable使用access_level获取是否有写权限,如果有set_value可以设置值。

一个比较完美的遍历客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from opcua import Client, ua
 
 
def brower_child(root):
    """
    Recursive traversal. Formatting is hard to get right here because the
    current depth is not tracked.
    """
    kind = root.get_node_class().name
    if kind == 'Variable':
        brower_var(root)
        return
    if kind != "Object":
        # Anything that is neither an object nor a variable is shown as a method.
        brower_method(root)
        return
    brower_obj(root)
    for child in root.get_children():
        print("  ", end='')
        brower_child(child)
 
 
class CurState:
    """One pending traversal entry: a node ``p`` and its depth ``d``."""

    def __init__(self, parent=None, p=None, d=0):
        # ``parent`` is stored for completeness; the original author marked it unused.
        self.parent, self.p, self.d = parent, p, d
 
 
def brower_child2(root, max_d=-1, ignore=()):
    """Print the tree below ``root`` depth-first using an explicit stack.

    Args:
        root: node to start from; it is printed at depth 0.
        max_d: when greater than 0, objects at this depth or deeper are
            printed but not expanded; any other value means no depth limit.
        ignore: browse names to skip; a skipped node is neither printed nor
            expanded.
    """
    stack = [CurState(None, root, 0)]
    while stack:
        cur = stack.pop()

        # Filter before printing anything, otherwise the indent of a skipped
        # node stays on the line and shifts the next entry to the right.
        if cur.p.get_browse_name().Name in ignore:
            continue

        name = cur.p.get_node_class().name
        print('  ' * cur.d, end="")

        if name == "Object":
            brower_obj(cur.p)
            if max_d > 0 and cur.d >= max_d:
                continue
            # Push in reverse so that children are popped, and therefore
            # printed, in the order get_children() returned them.
            for c in reversed(cur.p.get_children()):
                stack.append(CurState(cur.p, c, cur.d + 1))

        elif name == 'Variable':
            brower_var(cur.p)
        else:
            brower_method(cur.p)
 
 
def brower_obj(v):
    """Print one line for an object node: '*', namespace index and name."""
    browse_name = v.get_browse_name()
    columns = (browse_name.NamespaceIndex, browse_name.Name, 'R ', "Object")
    print("*%2d:%-30s (%-2s, %-23s)" % columns)
 
 
def brower_var(v):
    """Print one line for a variable node: '-', name, access, type and value.

    The access column shows "RW" when ``ua.AccessLevel.CurrentWrite`` is in
    the node's access level and "R " otherwise. A value whose text form is
    longer than 1024 characters is shown as its first 56 characters followed
    by "...".
    """
    rw = "RW" if ua.AccessLevel.CurrentWrite in v.get_access_level() else 'R '
    bname = v.get_browse_name()
    tv = v.get_data_value().Value
    v_show = tv.Value
    # Truncate the text form, not the raw value: slicing the value itself
    # fails for non-sliceable types and does not bound the printed length
    # for sequences.
    text = str(v_show)
    if len(text) > 1024:
        v_show = text[:56] + "..."
    print("-%2d:%-30s (%-2s, %-23s) =>" %
          (bname.NamespaceIndex, bname.Name, rw, tv.VariantType), v_show)
 
 
def brower_method(v):
    """Print one line for a method node: '@', namespace index and name."""
    browse_name = v.get_browse_name()
    # Argument types are not shown; the original author sketched reading them
    # from v.get_properties() but left that disabled.
    columns = (browse_name.NamespaceIndex, browse_name.Name, 'C ', "Method")
    print("@%2d:%-30s (%-2s, %-23s)" % columns)
 
 
def main_c():
    """Connect to the example server and print the tree below ``Objects``.

    The ``Server`` node is skipped. Failures are printed instead of raised,
    and the client is disconnected only if the connection was established.
    """
    url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"
    c = Client(url)
    try:
        c.connect()
    except Exception as e:
        # Nothing to tear down: the connection never came up, and calling
        # disconnect() on it can raise a second, misleading error.
        print("Client Exception:", e)
        return
    try:
        root = c.get_root_node()
        # The heading starts on a fresh line; the escape sequence had lost
        # its backslashes and was printed literally as "rn".
        print("\r\nBrower:")
        brower_child2(root.get_child(["0:Objects"]), -1, ["Server"])
    except Exception as e:
        print("Client Exception:", e)
    finally:
        c.disconnect()


if __name__ == "__main__":
    main_c()

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持IT俱乐部。

本文收集自网络,不代表IT俱乐部立场,转载请注明出处。https://www.2it.club/code/python/9721.html
上一篇
下一篇
联系我们

联系我们

在线咨询: QQ交谈

邮箱: 1120393934@qq.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

返回顶部