[go: up one dir, main page]

Menu

[r1450]: / mcomix / worker_thread.py  Maximize  Restore  History

Download this file

147 lines (129 with data), 5.3 kB

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
""" Worker thread class. """
from __future__ import with_statement
import threading
import traceback
from mcomix import log
class WorkerThread(object):
def __init__(self, process_order, name=None, max_threads=1,
sort_orders=False, unique_orders=False):
"""Create a new pool of worker threads.
Optional <name> will be added to spawned thread names.
<process_order> will be called to process each work order.
At most <max_threads> will be started for processing.
If <sort_orders> is True, the orders queue will be sorted
after each addition. If <unique_orders> is True, duplicate
orders will not be added to the queue. """
self._name = name
self._process_order = process_order
self._max_threads = max_threads
self._sort_orders = sort_orders
self._unique_orders = unique_orders
self._stop = False
self._threads = []
# Queue of orders waiting for processing.
self._orders_queue = []
if self._unique_orders:
# Track orders.
self._orders_set = set()
self._condition = threading.Condition()
def __enter__(self):
return self._condition.__enter__()
def __exit__(self, exc_type, exc_value, traceback):
return self._condition.__exit__(exc_type, exc_value, traceback)
def _start(self, nb_threads=1):
for n in range(nb_threads):
if len(self._threads) == self._max_threads:
break
thread = threading.Thread(target=self._run)
if self._name is not None:
thread.name += '-' + self._name
thread.setDaemon(False)
thread.start()
self._threads.append(thread)
def _order_uid(self, order):
if isinstance(order, tuple) or isinstance(order, list):
return order[0]
return order
def _run(self):
order_uid = None
while True:
with self._condition:
if order_uid is not None:
self._orders_set.remove(order_uid)
while not self._stop and 0 == len(self._orders_queue):
self._condition.wait()
if self._stop:
return
order = self._orders_queue.pop(0)
if self._unique_orders:
order_uid = self._order_uid(order)
try:
self._process_order(order)
except Exception, e:
log.error(_('! Worker thread processing %(function)r failed: %(error)s'),
{ 'function' : self._process_order, 'error' : e })
log.debug('Traceback:\n%s', traceback.format_exc())
def must_stop(self):
"""Return true if we've been asked to stop processing.
Can be used by the processing function to check if it must abort early.
"""
return self._stop
def clear_orders(self):
"""Clear the current orders queue."""
with self._condition:
if self._unique_orders:
# We can't just clear the set, as some orders
# can be in the process of being processed.
for order in self._orders_queue:
order_uid = self._order_uid(order)
self._orders_set.remove(order_uid)
self._orders_queue = []
def append_order(self, order):
"""Append work order to the thread orders queue."""
with self._condition:
if self._unique_orders:
order_uid = self._order_uid(order)
if order_uid in self._orders_set:
# Duplicate order.
return
self._orders_set.add(order_uid)
self._orders_queue.append(order)
if self._sort_orders:
self._orders_queue.sort()
self._condition.notifyAll()
self._start()
def extend_orders(self, orders_list):
"""Append work orders to the thread orders queue."""
with self._condition:
if self._unique_orders:
nb_added = 0
for order in orders_list:
order_uid = self._order_uid(order)
if order_uid in self._orders_set:
# Duplicate order.
continue
self._orders_set.add(order_uid)
self._orders_queue.append(order)
nb_added += 1
else:
self._orders_queue.extend(orders_list)
nb_added = len(orders_list)
if 0 == nb_added:
return
if self._sort_orders:
self._orders_queue.sort()
self._condition.notifyAll()
self._start(nb_threads=nb_added)
def stop(self):
"""Stop the worker threads and flush the orders queue."""
self._stop = True
with self._condition:
self._condition.notifyAll()
for thread in self._threads:
thread.join()
self._threads = []
self._stop = False
self._orders_queue = []
if self._unique_orders:
self._orders_set.clear()
# vim: expandtab:sw=4:ts=4