Source code for serge.blocks.worker
"""Some helper classes to implement various parallel processing workers"""
import multiprocessing
import pygame
import Queue
import new
import serge.engine
#
# The pipeline function
[docs]def pipelineProcessor(qin, qout, target):
"""Implements the surface processing pipeline"""
try:
while True:
#
# Get the next job
job = qin.get()
if job is None:
qout.put(None)
break
#
surface, args = unmarshallSurface(*job[0]), job[1:]
#
# Process it
results = target(surface, *args)
if isinstance(results, tuple):
new_surface, other = results[0], results[1:]
else:
new_surface, other = results, []
#
# Package back
qout.put_nowait([marshallSurface(new_surface)] + list(other))
except Exception, err:
print '**Worker failed with %s' % err
print '**Stopping now'
qout.put(None)
[docs]def getSurfaceProcessingPipeline(target, start=True):
"""Return a pair of queues to implement a surface processing pipeline
An input and output queue are returned. The queues are passed a tuple of items
and the first one is a surface which is marshalled to the target function.
The function must also return a tuple, the first of which is assumed to be a surface
which will be marshalled.
"""
#
# Create queues
todo = SkippableQueue()
result = SkippableQueue()
#
# Create the worker and start it up
worker = multiprocessing.Process(target=pipelineProcessor, args=(todo, result, target))
worker.daemon = True
if start:
worker.start()
#
# Make sure we go away
def stoppingNow(obj, arg):
"""The engine is stopping"""
engine.log.info('Cleaning up worker - sent quit signal')
todo.put(None)
engine.log.info('Waiting for worker to finish')
while True:
answer = result.get()
if answer is None:
engine.log.info('Worker seems to have finished')
break
#
engine = serge.engine.CurrentEngine()
if engine:
engine.linkEvent(serge.events.E_BEFORE_STOP, stoppingNow)
#
return (todo, result, worker)
FORMAT = 'RGBA'
[docs]def marshallSurface(surface):
"""Return a surface that can be passed from one process to another"""
return surface.get_width(), surface.get_height(), FORMAT, pygame.image.tostring(surface, FORMAT)
[docs]def unmarshallSurface(width, height, fmt, string):
"""Return a surface returned from another process"""
return pygame.image.fromstring(string, (width, height), fmt)
[docs]def SkippableQueue():
"""Return A queue where only one item is retained"""
def replace(self, job):
"""Replace all items in the queue with this one"""
#
# Drain queue
while True:
try:
_ = self.get_nowait()
except Queue.Empty:
break
else:
pass
#
# Put item
self.put(job)
#
# Get a queue and then add a method to it
queue = multiprocessing.Queue()
queue.replace = new.instancemethod(replace, queue, queue.__class__)
#
return queue