Multiprocessing/Multithreading

Using clients with multiprocessing

Since version 9.3.0 PyTango provides cleanup() which resets CORBA connection. This static function is needed when you want to use tango with multiprocessing in your client code.

In the case when both your parent process and your child process create DeviceProxy, Database or/and AttributeProxy your child process inherits the context from your parent process, i.e. open file descriptors, the TANGO and the CORBA state. Sharing the above objects between the processes may cause unpredictable errors, e.g., TRANSIENT_CallTimedout, unidentifiable C++ exception. Therefore, when you start a new process you must reset CORBA connection:

import time
import tango

from multiprocessing import Process


class Worker(Process):

def __init__(self):
    Process.__init__(self)

def run(self):
        # reset CORBA connection
        tango.ApiUtil.cleanup()

    proxy = tango.DeviceProxy('test/tserver/1')

    stime = time.time()
    etime = stime
    while etime - stime < 1.:
    try:
        proxy.read_attribute("Value")
    except Exception as e:
        print(str(e))
    etime = time.time()


def runworkers():
workers = [Worker() for _ in range(6)]
for wk in workers:
    wk.start()
for wk in workers:
    wk.join()


db = tango.Database()
dp = tango.DeviceProxy('test/tserver/1')

for i in range(4):
runworkers()

After cleanup() all references to DeviceProxy, AttributeProxy or Database objects in the current process become invalid and these objects need to be reconstructed.

Multithreading - clients and servers

When performing Tango I/O from user-created threads, there can be problems. This is often more noticeable with event subscription/unsubscription, and when pushing events, but it could affect any Tango I/O.

A client subscribing and unsubscribing to events via a user thread may see a crash, a deadlock, or Event channel is not responding anymore errors.

A device server pushing events from a user-created thread (including asyncio callbacks) might see Not able to acquire serialization (dev, class or process) monitor errors, if it is using the default green mode tango.GreenMode.Synchronous.

If the device server is using an asynchronous green mode, i.e., tango.GreenMode.Gevent or tango.GreenMode.Asyncio, then Tango’s device server serialisation is disabled - see the green mode warning. This means you are likely to see a crash when pushing events from a user thread, especially if an attribute is read around the same time. The method described below WILL NOT help for this. There is no solution (at least with cppTango 9.5.0 and PyTango 9.5.0, and earlier).

As PyTango wraps the cppTango library, we need to consider how cppTango’s threads work. cppTango was originally developed at a time where C++ didn’t have standard threads. All the threads currently created in cppTango are “omni threads”, since this is what the omniORB library is using to create threads and since this implementation is available for free with omniORB.

In C++, users used to create omni threads in the past so there was no issue. Since C++11, C++ comes with an implementation of standard threads. cppTango is currently (version 9.4.1) not directly thread safe when a user is using C++11 standard threads or threads different than omni threads. This lack of thread safety includes threads created from Python’s threading module.

In an ideal future cppTango should protect itself, regardless of what type of threads are used. In the meantime, we need a work-around.

The work-around when using threads which are not omni threads is to create an object of the C++ class omni_thread::ensure_self in the user thread, just after the thread creation, and to delete this object only when the thread has finished its job. This omni_thread::ensure_self object provides a dummy omniORB ID for the thread. This ID is used when accessing thread locks within cppTango, so the ID must remain the same for the lifetime of the thread. Also note that this object MUST be released before the thread has exited, otherwise omniORB will throw an exception.

A Pythonic way to implement this work-around for multithreaded applications is available via the EnsureOmniThread class. It was added in PyTango version 9.3.2. This class is best used as a context handler to wrap the target method of the user thread. An example is shown below:

import tango
from threading import Thread
from time import sleep


def thread_task():
    with tango.EnsureOmniThread():
        eid = dp.subscribe_event(
            "double_scalar", tango.EventType.PERIODIC_EVENT, cb)
        while running:
            print(f"num events stored {len(cb.get_events())}")
            sleep(1)
        dp.unsubscribe_event(eid)


cb = tango.utils.EventCallback()  # print events to stdout
dp = tango.DeviceProxy("sys/tg_test/1")
dp.poll_attribute("double_scalar", 1000)
thread = Thread(target=thread_task)
running = True
thread.start()
sleep(5)
running = False
thread.join()

Another way to create threads in Python is the concurrent.futures.ThreadPoolExecutor. The problem with this is that the API does not provide an easy way for the context handler to cover the lifetime of the threads, which are created as daemons. One option is to at least use the context handler for the functions that are submitted to the executor. I.e., executor.submit(thread_task). This is not guaranteed to work. A second option to investigate (if using at least Python 3.7) is the initializer argument which could be used to ensure a call to the __enter__() method for a thread-specific instance of EnsureOmniThread. However, calling the __exit__() method on the corresponding object at shutdown is a problem. Maybe it could be submitted as work.