OPCUA-Python
pip install opcua
Server
# Example OPC UA server built on python-opcua (FreeOpcUa).
#
# It registers a namespace, defines a custom object type, populates the
# address space with objects / variables / properties / methods, starts the
# server, fires one event and then drops into an interactive shell.
from threading import Thread
import copy
import logging
from datetime import datetime
import time
from math import sin
import sys

sys.path.insert(0, "..")

try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        """Fallback interactive shell used when IPython is not installed."""
        myvars = globals()
        myvars.update(locals())
        shell = code.InteractiveConsole(myvars)
        shell.interact()


from opcua import ua, uamethod, Server


class SubHandler(object):
    """Subscription handler that prints notifications received for a subscription."""

    def datachange_notification(self, node, val, data):
        """Print the node and its new value when a data change is reported."""
        print("Python: New data change event", node, val)

    def event_notification(self, event):
        """Print an event delivered to the subscription."""
        print("Python: New event", event)


# method to be exposed through server
def func(parent, variant):
    """Return a one-element list holding a Boolean variant: True when the input is even.

    ``variant`` is the raw ua.Variant passed by the caller; its ``Value`` is
    tested with ``% 2``.
    """
    ret = variant.Value % 2 == 0
    return [ua.Variant(ret, ua.VariantType.Boolean)]


# method to be exposed through server
# uses a decorator to automatically convert to and from variants
@uamethod
def multiply(parent, x, y):
    """Print both arguments and return their product."""
    print("multiply method call with parameters: ", x, y)
    return x * y


class VarUpdater(Thread):
    """Background thread that keeps writing sin(time / 10) into a variable node."""

    def __init__(self, var):
        super().__init__()
        self._stopev = False  # set to True by stop() to end the run() loop
        self.var = var

    def stop(self):
        """Ask the update loop to finish after its current iteration."""
        self._stopev = True

    def run(self):
        """Write a new sine sample to the variable every 0.1 s until stopped."""
        while not self._stopev:
            v = sin(time.time() / 10)
            self.var.set_value(v)
            time.sleep(0.1)


if __name__ == "__main__":
    # NOTE: everything below deliberately stays at module level: the fallback
    # embed() above builds its shell namespace from globals(), so these names
    # are what the interactive session can see.

    # optional: setup logging
    logging.basicConfig(level=logging.WARN)
    # logger = logging.getLogger("opcua.address_space")
    # logger.setLevel(logging.DEBUG)
    # logger = logging.getLogger("opcua.internal_server")
    # logger.setLevel(logging.DEBUG)
    # logger = logging.getLogger("opcua.binary_server_asyncio")
    # logger.setLevel(logging.DEBUG)
    # logger = logging.getLogger("opcua.uaprocessor")
    # logger.setLevel(logging.DEBUG)

    # now setup our server
    server = Server()
    # server.disable_clock()
    # server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
    server.set_endpoint("opc.tcp://0.0.0.0:4841/freeopcua/server/")
    server.set_server_name("FreeOpcUa Example Server")
    # set all possible endpoint policies for clients to connect through
    server.set_security_policy([
        ua.SecurityPolicyType.NoSecurity,
        # ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
        # ua.SecurityPolicyType.Basic256Sha256_Sign
    ])

    # setup our own namespace
    # The namespace URI was never defined in the original listing (NameError);
    # any unique URI works, this one follows the FreeOpcUa examples.
    uri = "http://examples.freeopcua.github.io"
    idx = server.register_namespace(uri)

    # create a new node type we can instantiate in our address space
    dev = server.nodes.base_object_type.add_object_type(0, "MyDevice")
    dev.add_variable(0, "sensor1", 1.0).set_modelling_rule(True)
    dev.add_property(0, "device_id", "0340").set_modelling_rule(True)
    ctrl = dev.add_object(0, "controller")
    ctrl.set_modelling_rule(True)
    ctrl.add_property(0, "state", "Idle").set_modelling_rule(True)

    # populating our address space

    # First a folder to organise our nodes
    myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder")
    # instantiate one instance of our device
    mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)
    # get proxy to our device state variable
    mydevice_var = mydevice.get_child(["0:controller", "0:state"])
    # create directly some objects and variables
    myobj = server.nodes.objects.add_object(idx, "MyObject")
    myvar = myobj.add_variable(idx, "MyVariable", 6.7)
    mysin = myobj.add_variable(idx, "MySin", 0, ua.VariantType.Float)
    myvar.set_writable()  # Set MyVariable to be writable by clients
    mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")
    mystringvar.set_writable()  # Set MyStringVariable to be writable by clients
    mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow())
    mydtvar.set_writable()  # Set MyDateTimeVar to be writable by clients
    myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
    # Kept under its own name: the original rebound `myarrayvar` here, so the
    # float 9.3 appended further down ended up in this UInt32-typed array.
    mytypedarrayvar = myobj.add_variable(
        idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
    myprop = myobj.add_property(idx, "myproperty", "I am a property")
    mymethod = myobj.add_method(
        idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
    multiply_node = myobj.add_method(
        idx, "multiply", multiply,
        [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])

    # import some nodes from xml
    # server.import_xml("custom_nodes.xml")

    # creating a default event object
    # The event object automatically will have members for all events properties
    # you probably want to create a custom event type, see other examples
    myevgen = server.get_event_generator()
    myevgen.event.Severity = 300

    # starting!
    server.start()
    print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
    vup = VarUpdater(mysin)  # simple helper thread that keeps updating a variable
    vup.start()
    try:
        # enable following if you want to subscribe to nodes on server side
        # handler = SubHandler()
        # sub = server.create_subscription(500, handler)
        # handle = sub.subscribe_data_change(myvar)

        # trigger event, all subscribed clients will receive it
        var = myarrayvar.get_value()  # returns a ref to the value in the server-side db, not a copy!
        # WARNING: we need to copy before writing again,
        # otherwise no data change event will be generated
        var = copy.copy(var)
        var.append(9.3)
        myarrayvar.set_value(var)
        mydevice_var.set_value("Running")
        myevgen.trigger(message="This is BaseEvent")
        # Server side write method which is a bit faster than using set_value
        server.set_attribute_value(myvar.nodeid, ua.DataValue(9.9))

        embed()
    finally:
        vup.stop()
        server.stop()
这个服务程序演示了 OPC UA 服务端几乎所有的功能;其中 Event 部分只触发了一次,没有持续发送,所以仅供参考。
下图展示了server端的对象结构
客户端
from IPython import embed
from opcua import Client


class SubHandler(object):
    """Subscription handler that prints selected fields of each received event."""

    def event_notification(self, event):
        # NOTE(review): `proper_random` is not set anywhere in the server
        # example of this article; presumably a custom event property — confirm.
        print("Event:", event.EventId, event.Time, event.proper_random, event.Message.Text)


def main_c():
    """Connect to the example server and open an interactive shell.

    Exceptions are reported with a "Client Exception:" message instead of
    propagating. The session is only disconnected when connect() succeeded.
    """
    url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"
    c = Client(url)
    # Connect outside the try/finally below: the original called disconnect()
    # even when connect() had failed, i.e. on a session that was never opened.
    try:
        c.connect()
    except Exception as e:
        print("Client Exception:", e)
        return
    try:
        root = c.get_root_node()  # not used below; handy inside the shell
        embed()
    except Exception as e:
        print("Client Exception:", e)
    finally:
        c.disconnect()


if __name__ == "__main__":
    main_c()
客户端遍历流程
- 1. 获取 `root` 下的 `Objects` 节点的所有子节点,一般类型都为 `Object`,取一代表为 A。
- 2. 获取 A 的 `NodeClass`(一般为 Object)和 `TypeDefinition`,其中 NodeId 需要与路径 `["0:Types","0:ObjectTypes","0:BaseObjectType"]` 下的类型对照(自定义类型也在内)。
- 3. 继续以 A 为根,遍历子节点:如果是 Object,继续第 2 步;如果是 `Variable`,NodeId 对照 `["0:Types","0:VariableTypes","0:BaseVariableType"]`,可以判断是变量(variable,63)还是属性(property,65)。
- 4. 对于 `Method`,使用 `a_root.call_method(a, arg1)` 调用;对于 `Variable`,使用 `get_value()` / `get_data_value()` 获取存储的值;如果是 `Object`,继续第 2 步。
- 5. 对于 `Variable`,使用 `access_level` 获取是否有写权限;如果有,可以用 `set_value` 设置值。
一个比较完美的遍历客户端
from opcua import Client, ua


def brower_child(root):
    """Recursively print ``root`` and everything below it.

    Output formatting is crude: the indentation does not track the real
    nesting depth.
    """
    name = root.get_node_class().name
    if name == "Object":
        brower_obj(root)
        for c in root.get_children():
            print(" ", end='')
            brower_child(c)
    elif name == 'Variable':
        brower_var(root)
    else:
        brower_method(root)


class CurState:
    """One pending entry of the traversal stack used by brower_child2."""

    def __init__(self, parent=None, p=None, d=0):
        self.parent = parent  # unused
        self.p = p  # node to visit
        self.d = d  # depth of the node below the traversal root


def brower_child2(root, max_d=-1, ignore=None):
    """Print the tree below ``root`` depth-first using an explicit stack.

    Args:
        root: node to start from.
        max_d: when greater than 0, children of objects at this depth or
            deeper are not expanded; any other value means no limit.
        ignore: browse names to skip together with their subtrees.
            ``None`` (the default) skips nothing.
    """
    # `None` replaces the original mutable `[]` default.
    skipped = () if ignore is None else ignore
    stack = [CurState(None, root, 0)]
    while stack:
        cur = stack.pop()
        node = cur.p
        # Decide about skipping before printing anything; the original printed
        # the indentation first and left stray whitespace for ignored nodes.
        if node.get_browse_name().Name in skipped:
            continue
        print(' '.join([' '] * cur.d), end="")
        name = node.get_node_class().name
        if name == "Object":
            brower_obj(node)
            if max_d > 0 and cur.d >= max_d:
                continue
            # Push in reverse so siblings are popped, and therefore printed,
            # in the order get_children() returned them, as brower_child does.
            for c in reversed(node.get_children()):
                stack.append(CurState(node, c, cur.d + 1))
        elif name == 'Variable':
            brower_var(node)
        else:
            brower_method(node)


def brower_obj(v):
    """Print one line describing an Object node."""
    rw = 'R '
    bname = v.get_browse_name()
    print("*%2d:%-30s (%-2s, %-23s)" % (bname.NamespaceIndex, bname.Name, rw, "Object"))


def brower_var(v):
    """Print one line describing a Variable node: access, variant type and value."""
    rw = 'R '
    if ua.AccessLevel.CurrentWrite in v.get_access_level():
        rw = "RW"
    bname = v.get_browse_name()
    tv = v.get_data_value().Value
    v_show = tv.Value
    text = str(v_show)
    if len(text) > 1024:
        # Truncate the rendered text, not the raw value: slicing the value
        # fails for non-sliceable types and does not bound the output length.
        v_show = text[:56] + "..."
    print("-%2d:%-30s (%-2s, %-23s) =>" % (bname.NamespaceIndex, bname.Name, rw, tv.VariantType), v_show)


def brower_method(v):
    """Print one line describing a Method node (any non-Object, non-Variable node)."""
    rw = 'C '
    bname = v.get_browse_name()
    # args = []
    # for a in v.get_properties():
    #     dt = a.get_data_type().NodeIdType.name
    #     args.append(dt)
    print("@%2d:%-30s (%-2s, %-23s)" % (bname.NamespaceIndex, bname.Name, rw, "Method"))


def main_c():
    """Connect to the example server and print the tree below ``Objects``.

    The standard ``Server`` object is skipped. Exceptions are reported with a
    "Client Exception:" message instead of propagating.
    """
    url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"
    c = Client(url)
    # Connect outside the try/finally below so that disconnect() is only
    # called on a session that was actually opened.
    try:
        c.connect()
    except Exception as e:
        print("Client Exception:", e)
        return
    try:
        root = c.get_root_node()
        # The listing showed "rnBrower:", i.e. "\r\n" with its backslashes lost.
        print("\r\nBrower:")
        brower_child2(root.get_child(["0:Objects"]), -1, ["Server"])
    except Exception as e:
        print("Client Exception:", e)
    finally:
        c.disconnect()


if __name__ == "__main__":
    main_c()
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持IT俱乐部。