Multithreading - targeting a specific thread of a thread group

I want to make a group of threads, each of which gets a set of waves passed as parameters, so that the waves' contents can be modified outside the thread. I want thread 0 to control waveA_0, waveB_0, and waveC_0, thread 1 to control waveA_1, waveB_1, and waveC_1, and so on. Then when, e.g., waveA_0, waveB_0, and waveC_0 are modified, I can signal thread 0 to do its thing.

But I can't seem to signal a single thread among the many. I can only send a datafolder to the whole pool of threads, and any available thread can grab that datafolder from the thread-queue.

My ugly solution was to say in the datafolder which thread I wanted. If the wrong thread got the datafolder, it just threw it back, where it was caught by the main thread and thrown back on the queue. When the traffic back and forth stops, the threads must have their correct data. Not very elegant. But the code below does work, in its own byzantine way.

It seems you can't throw a datafolder back on the thread-queue from a thread, you have to throw it back to the main Igor thread, which then throws it back on the thread-queue. Is this really the case?

Because it would make it a lot easier if you could do that.

The other possibility is to make a separate thread group of a single thread for each set of waves. But that's not very elegant either, as it multiplies the number of thread groups that need to be managed.

function testThrowStart (nThreads)
    variable nThreads
   
    setdatafolder root:
    variable/G threadGrpIDG = ThreadGroupCreate(nThreads)
    variable/G root:numberOfThreads = nThreads
    NVAR threadGrpIDG = root:threadGrpIDG
    variable iThread
    // Make some data specific for each thread and start each thread
    // root:threadDF:threadNum contains the number of the thread, which matches the data passed as parameters to the thread function
    // Threads will get started with the right data because each thread waits
    // for a bit after getting its data, see thread function
    for (iThread =0; iThread < nThreads; iThread +=1)
        newdatafolder/o root:threadDF
        variable/G  root:threadDF:threadGrpID =threadGrpIDG
        variable/G root:threadDF:threadNum = iThread
        make/o/n = (100) $"waveA_" + num2str (iThread), $"waveB_" + num2str (iThread), $"waveC_" + num2str (iThread)
        WAVE WaveA = $"waveA_" + num2str (iThread)
        WAVE WAVEB = $"waveB_" + num2str (iThread)
        WAVE WaveC = $"waveC_" + num2str (iThread)
        ThreadStart threadGrpIDG, iThread, testThrowThread (waveA, waveB, waveC)
        ThreadGroupPutDF threadGrpIDG, root:threadDF
    endfor
       
end
 
// calls all the threads nTimes, plays catch with the thread-queue until each thread has its own data
function testThrowRun (nTimes)
    variable nTimes
   
    setdatafolder root:
    NVAR threadGrpIDG = root:threadGrpIDG
    NVAR nThreads = root:numberOfThreads
    variable iThread, iTime
    for (iTime =1; iTime <= nTimes; iTime += 1)
        for (iThread =0; iThread < nThreads; iThread +=1)
            newdatafolder/o root:threadDF
            // data passed to thread is which thread we want to get the data (iThread)
            // and the number of times we have been called (dataVar)
            // when dataVar is 0, the thread will terminate (see testThrowEnd)
            variable/G root:threadDF:threadNum = iThread
            variable/G  root:threadDF:dataVar = iTime
            ThreadGroupPutDF threadGrpIDG, root:threadDF
        endfor
        // Here we play catch, back and forth till each thread gets its own data
        for (;;)
            // wait for a datafolder from the threads
            DFREF dfr = ThreadGroupGetDFR(threadGrpIDG, 20)
            if (DataFolderRefStatus(dfr ) ==0)
                // if you don't get a datafolder back, everyone is happy
                break
            else
                // take the datafolder, duplicate it, and throw it back on the queue, better luck next time
                DuplicateDataFolder dfr, :notMyFolder
                ThreadGroupPutDF threadGrpIDG, :notMyFolder
                killdatafolder dfr
            endif
        endfor
        sleep/S .1
        doUpdate
    endfor
end
           
// Same strategy as testThrowRun, but passes 0 as dataVar, to signal threads to quit
// then checks that they have quit
function testThrowEnd ()
   
    setdatafolder root:
    NVAR threadGrpIDG = root:threadGrpIDG
    NVAR nThreads = root:numberOfThreads
    variable iThread
    for (iThread =0; iThread < nThreads; iThread +=1)
        newdatafolder/o root:threadDF
        variable/G root:threadDF:threadNum = iThread
        variable/G  root:threadDF:dataVar = 0
        ThreadGroupPutDF threadGrpIDG, root:threadDF
    endfor
     
    for (;;)
        DFREF dfr = ThreadGroupGetDFR(threadGrpIDG, 20)
        if (DataFolderRefStatus(dfr ) ==0)
            break
        else
            DuplicateDataFolder dfr, :notMyFolder
            ThreadGroupPutDF threadGrpIDG, :notMyFolder
            killdatafolder dfr
        endif
    endfor

    variable threadsLeft = ThreadGroupWait(threadGrpIDG, 1000)
    if (threadsLeft > 0)
        print "Threads have not ended"
        variable isReleased = ThreadGroupRelease(threadGrpIDG )
        if (isReleased != 0)
            printf "ThreadGroupRelease returned %d, uh oh.\r", isReleased
        endif
    else
        for (iThread = 0; iThread < nThreads; iThread +=1)
            printf "Thread %d returned %d\r", iThread, ThreadReturnValue(threadGrpIDG, iThread )
        endfor
    endif
end
 
 
ThreadSafe Function testThrowThread (wave1, wave2, wave3)
    WAVE wave1, wave2, wave3
   
    // starter folder tells me who I am
    DFREF dfrInit = ThreadGroupGetDFR(0,inf)
    NVAR myNumber = dfrInit:threadNum
    printf "I am %d.\r", myNumber
    wave1 =0
    wave2 =0
    wave3 =0
    sleep/s .05 // let the other threads get their numbers
    // subsequent folders tell me to process data
    for (;;)
        DFREF dfr = ThreadGroupGetDFR(0,inf)
        NVAR threadNum = dfr:threadNum
        NVAR dataVar = dfr:dataVar
        if (threadNum != myNumber)
            if (!(NVAR_EXISTS (myNumber)))
                print "I don't even know who I am"
            else
                printf "threadNum was %d but I am %d.\r", threadNum, myNumber
                DuplicateDataFolder dfr, :notMyFolder
                ThreadGroupPutDF 0, notMyFolder
            endif
        else
            if (dataVar ==0)
                break
            else
                wave1 +=myNumber + 1
                wave2 +=(myNumber + 1) *2
                wave3 += (myNumber + 1) * 3
            endif
        endif
        killdatafolder dfr
    endfor
    return myNumber
end
Disclaimer: I haven't actually read and understood all of your code :)

So ThreadStart doesn't pay attention to the second (index) input?

Can't you pack up all the data necessary for what needs to be done in a data folder and use ThreadGroupPutDF and ThreadGroupGetDF to dispatch threads? It's hard to see why a given instance has to run on a particular processor. You just need to make sure that the right data gets dispatched all together.
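
A minimal sketch of the dispatch pattern suggested above, assuming each job's data folder carries its own input wave and channel number so that any free thread can process it. The names PoolWorker, PostJob, raw, result, channel, and channelOut are hypothetical, the negative-channel quit convention is an assumption, and the doubling step only stands in for real processing.

```igor
// Hypothetical sketch: any worker can take any job, because the job carries its data.
ThreadSafe Function PoolWorker()
	for (;;)
		DFREF dfrIn = ThreadGroupGetDFR(0, inf)	// wait for a job on the input queue
		NVAR channel = dfrIn:channel
		if (channel < 0)			// assumed convention: negative channel means quit
			KillDataFolder dfrIn
			break
		endif
		WAVE raw = dfrIn:raw
		NewDataFolder/S outDF			// results travel back in their own folder
		Variable/G channelOut = channel		// tells the main thread whose result this is
		Duplicate raw, result
		result *= 2				// placeholder for the real processing
		WAVEClear raw, result			// drop wave references before posting
		ThreadGroupPutDF 0, :			// tgID 0 inside a thread posts to the output queue
		KillDataFolder dfrIn
	endfor
	return 0
End

// Main thread: copy one channel's data into a fresh folder and queue it for the pool.
Function PostJob(tgID, chan, data)
	Variable tgID, chan
	WAVE data

	DFREF saveDFR = GetDataFolderDFR()
	SetDataFolder root:
	NewDataFolder/O/S jobDF
	Variable/G channel = chan
	Duplicate data, raw
	WAVEClear raw				// no live wave references into the posted folder
	ThreadGroupPutDF tgID, :		// hand the current folder to the thread group
	SetDataFolder saveDFR
End
```

The main thread would collect results with ThreadGroupGetDFR(tgID, timeout) and read channelOut to see which channel each result belongs to. The cost is one wave copy in each direction per job.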

John Weeks
WaveMetrics, Inc.
support@wavemetrics.com
johnweeks wrote:
Can't you pack up all the data necessary for what needs to be done in a data folder and use ThreadGroupPutDF and ThreadGroupGetDF to dispatch threads? It's hard to see why a given instance has to run on a particular processor. You just need to make sure that the right data gets dispatched all together.

Each thread, corresponding to a different channel of data acquisition, gets a specific set of waves as parameters. That's the only (?) way you can access the same wave's contents inside a thread and outside a thread. I want to modify the parameter wave's data outside the thread, then signal to the thread to do its business, which involves processing the data in one of the parameter waves, and placing the result in one of the other parameter waves. That way, I can monitor the data as it is collected. I don't want to have to keep duplicating waves and posting them to the thread queue, I just want to post a single variable to signal the thread to process its data. And I don't want to have to wait in the main thread to get the results back and display them. Using parameter waves gets around that problem.

So I want to signal a particular thread because it has a particular set of parameter waves associated with it. I guess I could pass all the waves to each thread as parameters, and then it doesn't matter which thread gets the datafolder, as it will have access to all the waves. Except I might not know ahead of time how many threads I will need, and would have to make a parameter list for the thread function that was "big enough for anyone", and then pass null waves for most of the time. The other solution, as I mentioned before, is to make a separate thread group of a single thread for each channel.

Jamie Boyd, Ph.D.
Sounds like that last one is the right way to do it, without really examining it too deeply. A threadgroup is intended for a group of threads all working on the same problem. You don't really have that; you have separate acquisitions, each with its own parameters, etc., all working independently. They just happen to be working simultaneously.
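
A minimal sketch of the one-thread-group-per-channel idea, assuming two parameter waves per channel and a global wave that stores the thread group IDs. StartChannels, SignalChannel, StopChannels, ChannelWorker, root:channelTGIDs, and root:signalDF are hypothetical names, and the doubling step is a placeholder for the real processing.

```igor
// Hypothetical sketch: each channel gets its own single-thread group,
// so a folder posted to that group can only reach that channel's thread.
ThreadSafe Function ChannelWorker(wIn, wOut)
	WAVE wIn, wOut

	Variable quit
	for (;;)
		DFREF dfr = ThreadGroupGetDFR(0, inf)	// block until this group's queue has a folder
		NVAR cmd = dfr:command
		quit = (cmd == 0)			// assumed convention: 0 = quit, nonzero = process
		KillDataFolder dfr
		if (quit)
			break
		endif
		wOut = wIn * 2				// placeholder processing on the shared parameter waves
	endfor
	return 0
End

Function StartChannels(nChannels)
	Variable nChannels

	Make/O/D/N=(nChannels) root:channelTGIDs
	WAVE tgIDs = root:channelTGIDs
	Variable i, tgID
	String nameA, nameB
	for (i = 0; i < nChannels; i += 1)
		nameA = "root:waveA_" + num2str(i)
		nameB = "root:waveB_" + num2str(i)
		Make/O/N=100 $nameA, $nameB
		WAVE wA = $nameA
		WAVE wB = $nameB
		tgID = ThreadGroupCreate(1)		// a group holding a single thread
		tgIDs[i] = tgID
		ThreadStart tgID, 0, ChannelWorker(wA, wB)
	endfor
End

Function SignalChannel(channel, cmd)
	Variable channel, cmd

	WAVE tgIDs = root:channelTGIDs
	Variable tgID = tgIDs[channel]
	NewDataFolder/O root:signalDF
	Variable/G root:signalDF:command = cmd
	ThreadGroupPutDF tgID, root:signalDF	// only this channel's thread can receive it
End

Function StopChannels()
	WAVE tgIDs = root:channelTGIDs
	Variable i, tgID, dummy
	for (i = 0; i < numpnts(tgIDs); i += 1)
		SignalChannel(i, 0)
		tgID = tgIDs[i]
		if (ThreadGroupWait(tgID, 1000) != 0)
			printf "Channel %d did not stop\r", i
		endif
		dummy = ThreadGroupRelease(tgID)
	endfor
End
```

With this layout, SignalChannel(2, 1) wakes only the thread that owns waveA_2 and waveB_2, and no folder ever has to be thrown back. The bookkeeping cost is one thread group ID per channel.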

John Weeks
WaveMetrics, Inc.
support@wavemetrics.com
I also lean towards the "multiple threadgroups" solution. A threadgroup should have only equal members.

If you want to keep your original approach, why not implement the "thread wakeup signal" by setting a value in one of the passed parameter waves?
Let's say the parameter waves have a field labelled "DoProcessing". The thread busy-waits for that field to become 1. In the main thread you set the field to 1; the thread then does its job and afterwards sets the field back to 0.
The ugliness of that solution is that it is formally not 100% correct, since you access shared data for reading and writing from multiple threads without mutual exclusion. Still, you should be able to get it working; just make sure that you only write to "DoProcessing" when you are sure that the other side is not also attempting to write.
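
A minimal sketch of the flag approach, assuming a small separate control wave with row labels "DoProcessing" and "Quit" is passed alongside the data waves. FlagWorker, StartFlagWorker, SignalFlagWorker, and the root:flag* names are hypothetical, and as noted above the flag access is not formally race-free.

```igor
// Hypothetical sketch: the thread polls a flag in a shared control wave.
ThreadSafe Function FlagWorker(wIn, wOut, ctrl)
	WAVE wIn, wOut, ctrl			// ctrl rows are labelled DoProcessing and Quit

	for (;;)
		if (ctrl[%Quit] == 1)
			break
		endif
		if (ctrl[%DoProcessing] == 1)
			wOut = wIn * 2			// placeholder processing
			ctrl[%DoProcessing] = 0		// tell the main thread we are idle again
		else
			Sleep/S 0.01			// avoid spinning at full speed while idle
		endif
	endfor
	return 0
End

Function StartFlagWorker()
	Make/O/N=100 root:flagIn, root:flagOut
	Make/O/N=2 root:flagCtrl = 0
	WAVE wIn = root:flagIn
	WAVE wOut = root:flagOut
	WAVE ctrl = root:flagCtrl
	SetDimLabel 0, 0, DoProcessing, ctrl
	SetDimLabel 0, 1, Quit, ctrl
	Variable/G root:flagTGID = ThreadGroupCreate(1)
	NVAR tgID = root:flagTGID
	ThreadStart tgID, 0, FlagWorker(wIn, wOut, ctrl)
End

Function SignalFlagWorker()
	WAVE ctrl = root:flagCtrl
	if (ctrl[%DoProcessing] == 0)		// only write while the thread is idle
		ctrl[%DoProcessing] = 1
	endif
End
```

The main thread writes the flag only when it reads 0 and the thread writes it only when it reads 1, which keeps the two sides from writing at the same time in practice. Setting ctrl[%Quit] to 1 and then calling ThreadGroupWait would shut the worker down.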
jamie wrote:
... That way, I can monitor the data as it is collected. ... And I don't want to have to wait in the main thread to get the results back and display them.


I have to wonder rather naively from the sidelines. Why are you implementing this to control what is shipped to/from parallel thread groups versus to control what is shipped to/from background tasks? IOW ...

* Collect data in background tasks. Store all data in background.
* For display, dump out a fraction of the data "real time" to a foreground task.

AFAIK, this approach is a demo in Igor Pro and it fits your demands. So, what problems does your approach solve that are not solved by background tasks?

--
J. J. Weimer
Chemistry / Chemical & Materials Engineering, UAH
jjweimer wrote:

I have to wonder rather naively from the sidelines. Why are you implementing this to control what is shipped to/from parallel thread groups versus to control what is shipped to/from background tasks?


Background tasks run in the main thread and *not* in parallel.
thomas_braun wrote:

Background tasks run in the main thread and *not* in parallel.


Oh. Somehow I had the mistaken impression then that, because background tasks are "shipped out", that means they run in parallel.

I see now the reason to ask for a flag to be able to document where processing is on a specific data set when it is shipped out to thread groups.

--
J. J. Weimer
Chemistry / Chemical & Materials Engineering, UAH