"Fossies" - the Fresh Open Source Software Archive 
Member "pyzor-1.0.0/pyzor/engines/gdbm_.py" (10 Dec 2014, 6613 Bytes) of package /linux/privat/pyzor-1.0.0.tar.gz:
As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Python source code syntax highlighting (style:
standard) with prefixed line numbers.
Alternatively, you can view or download the uninterpreted source code file here.
For more information about "gdbm_.py" see the
Fossies "Dox" file reference documentation and the latest
Fossies "Diffs" side-by-side code changes report:
0.9.0_vs_1.0.0.
1 """Gdbm database engine."""
2
# Locate a gdbm binding: first the top-level ``gdbm`` module, then
# ``dbm.gnu``.  ``_has_gdbm`` records whether either import succeeded.
_has_gdbm = True
try:
    import gdbm
except ImportError:
    try:
        import dbm.gnu as gdbm
    except ImportError:
        _has_gdbm = False
12
13 import time
14 import logging
15 import datetime
16 import threading
17
18 from pyzor.engines.common import Record, DBHandle, BaseEngine
19
20
def _dt_decode(datetime_str):
    """Turn a stored timestamp string back into a datetime object.

    The literal string 'None' yields None.  Any other value is parsed
    as "YYYY-MM-DD HH:MM:SS.ffffff"; when that fails with ValueError the
    same layout without the fractional seconds is tried instead.
    """
    if datetime_str == 'None':
        return None
    parse = datetime.datetime.strptime
    try:
        return parse(datetime_str, "%Y-%m-%d %H:%M:%S.%f")
    except ValueError:
        # No fractional part present; a second failure propagates.
        return parse(datetime_str, "%Y-%m-%d %H:%M:%S")
29
30
class GdbmDBHandle(BaseEngine):
    """Database engine that keeps records in a gdbm file.

    Each record is stored as a comma-separated string (see
    ``encode_record`` / ``decode_record``).  Creating a handle opens the
    database and starts two self-rescheduling daemon timers: one that
    syncs the database every ``sync_period`` seconds and, when
    ``max_age`` is set, one that purges old records every
    ``reorganize_period`` seconds.
    """
    absolute_source = True
    handles_one_step = False

    sync_period = 60
    reorganize_period = 3600 * 24  # 1 day
    # Record attributes in the order they are serialized.
    fields = ('r_count', 'r_entered', 'r_updated',
              'wl_count', 'wl_entered', 'wl_updated')
    # Same attributes paired with the callable that decodes each one.
    _fields = [('r_count', int),
               ('r_entered', _dt_decode),
               ('r_updated', _dt_decode),
               ('wl_count', int),
               ('wl_entered', _dt_decode),
               ('wl_updated', _dt_decode)]
    # Version tag written as the first element of every encoded record.
    this_version = '1'
    log = logging.getLogger("pyzord")

    def __init__(self, fn, mode, max_age=None):
        """Open the database and start the periodic maintenance timers.

        :param fn: database file name, passed to ``gdbm.open``.
        :param mode: open mode, passed to ``gdbm.open``.
        :param max_age: maximum record age in seconds; when falsy no
            periodic reorganization takes place.
        """
        self.max_age = max_age
        self.db = gdbm.open(fn, mode)
        self.reorganize_timer = None
        self.sync_timer = None
        self.start_reorganizing()
        self.start_syncing()

    def __iter__(self):
        """Yield every key of the database using firstkey()/nextkey()."""
        k = self.db.firstkey()
        while k is not None:
            yield k
            k = self.db.nextkey(k)

    def _iteritems(self):
        """Yield ``(key, record)`` pairs, skipping undecodable records.

        A record that cannot be read or decoded is logged as a warning
        and omitted, so one bad entry does not stop the iteration.
        """
        for k in self:
            try:
                yield k, self._really_getitem(k)
            except Exception as e:
                self.log.warning("Invalid record %s: %s", k, e)

    def iteritems(self):
        """Return a generator over ``(key, record)`` pairs."""
        return self._iteritems()

    def items(self):
        """Return a list of all ``(key, record)`` pairs."""
        return list(self._iteritems())

    def apply_method(self, method, varargs=(), kwargs=None):
        """Call ``method(*varargs, **kwargs)`` and return its result.

        Every database operation is routed through this hook so that
        subclasses can wrap the call (for example with a lock).
        """
        if kwargs is None:
            kwargs = {}
        # Direct call instead of the apply() builtin, which only exists
        # on Python 2.
        return method(*varargs, **kwargs)

    def __getitem__(self, key):
        """Return the decoded record stored under ``key``."""
        return self.apply_method(self._really_getitem, (key,))

    def _really_getitem(self, key):
        return GdbmDBHandle.decode_record(self.db[key])

    def __setitem__(self, key, value):
        """Encode ``value`` and store it under ``key``."""
        self.apply_method(self._really_setitem, (key, value))

    def _really_setitem(self, key, value):
        self.db[key] = GdbmDBHandle.encode_record(value)

    def __delitem__(self, key):
        """Remove the record stored under ``key``."""
        self.apply_method(self._really_delitem, (key,))

    def _really_delitem(self, key):
        del self.db[key]

    def start_syncing(self):
        """Sync the database now and schedule the next sync.

        The timer is a daemon thread that calls this method again after
        ``sync_period`` seconds.
        """
        if self.db:
            self.apply_method(self._really_sync)
        self.sync_timer = threading.Timer(self.sync_period,
                                          self.start_syncing)
        # Attribute form of the deprecated setDaemon(True).
        self.sync_timer.daemon = True
        self.sync_timer.start()

    def _really_sync(self):
        self.db.sync()

    def start_reorganizing(self):
        """Purge old records now and schedule the next purge.

        Does nothing when ``max_age`` is falsy.  Otherwise the timer is a
        daemon thread that calls this method again after
        ``reorganize_period`` seconds.
        """
        if not self.max_age:
            return
        if self.db:
            self.apply_method(self._really_reorganize)
        self.reorganize_timer = threading.Timer(self.reorganize_period,
                                                self.start_reorganizing)
        # Attribute form of the deprecated setDaemon(True).
        self.reorganize_timer.daemon = True
        self.reorganize_timer.start()

    def _really_reorganize(self):
        """Delete records not updated within ``max_age`` seconds.

        After the pass over all keys the database's own ``reorganize()``
        is called.
        """
        self.log.debug("reorganizing the database")
        key = self.db.firstkey()
        # Records last updated before this timestamp are removed.
        cutoff = time.time() - self.max_age
        while key is not None:
            rec = self._really_getitem(key)
            delkey = None
            if int(time.mktime(rec.r_updated.timetuple())) < cutoff:
                self.log.debug("deleting key %s", key)
                delkey = key
            # Fetch the successor before deleting, so the traversal never
            # asks for the key following one that was just removed.
            key = self.db.nextkey(key)
            if delkey:
                self._really_delitem(delkey)
        self.db.reorganize()

    @classmethod
    def encode_record(cls, value):
        """Serialize a record to a comma-separated string.

        The result is ``this_version`` followed by the "%s" rendering of
        each attribute named in ``fields``.
        """
        values = [cls.this_version]
        values.extend(["%s" % getattr(value, x) for x in cls.fields])
        return ",".join(values)

    @classmethod
    def decode_record(cls, s):
        """Decode a raw database value into a record.

        A value with exactly three comma-separated parts is treated as
        the version-0 layout; otherwise the first part must be the
        version tag '1'.

        :raises ValueError: if the value is not valid UTF-8 or has an
            unknown layout.  ValueError is a StandardError subclass on
            Python 2, so existing handlers keep working, and it also
            exists on Python 3 where StandardError does not.
        """
        try:
            s = s.decode("utf8")
        except UnicodeError:
            raise ValueError("don't know how to handle db value %s" %
                             repr(s))
        parts = s.split(',')
        version = parts[0]
        if len(parts) == 3:
            dispatch = cls.decode_record_0
        elif version == '1':
            dispatch = cls.decode_record_1
        else:
            raise ValueError("don't know how to handle db value %s" %
                             repr(s))
        return dispatch(s)

    @staticmethod
    def decode_record_0(s):
        """Decode the version-0 layout: three comma-separated integers.

        The integers populate ``r_count``, ``r_entered`` and
        ``r_updated`` in that order.
        """
        r = Record()
        parts = s.split(',')
        fields = ('r_count', 'r_entered', 'r_updated')
        assert len(parts) == len(fields)
        for field, part in zip(fields, parts):
            setattr(r, field, int(part))
        return r

    @classmethod
    def decode_record_1(cls, s):
        """Decode the version-1 layout.

        The leading version tag is dropped and each remaining part is
        converted with the decoder paired with its attribute in
        ``_fields``.
        """
        r = Record()
        parts = s.split(',')[1:]
        assert len(parts) == len(cls.fields)
        for part, field in zip(parts, cls._fields):
            f, decode = field
            setattr(r, f, decode(part))
        return r
177
178
class ThreadedGdbmDBHandle(GdbmDBHandle):
    """GdbmDBHandle variant whose database operations run under a lock."""

    def __init__(self, fn, mode, max_age=None, bound=None):
        """Create the lock, then initialise the underlying handle.

        ``bound`` is accepted but not used by this class.
        """
        # The base initialiser already goes through apply_method(), which
        # needs the lock, so the lock has to exist first.
        self.db_lock = threading.Lock()
        GdbmDBHandle.__init__(self, fn, mode, max_age=max_age)

    def apply_method(self, method, varargs=(), kwargs=None):
        """Run the base ``apply_method`` while holding ``db_lock``."""
        call_kwargs = {} if kwargs is None else kwargs
        with self.db_lock:
            return GdbmDBHandle.apply_method(
                self, method, varargs=varargs, kwargs=call_kwargs)
192
193 # This won't work because the gdbm object needs to be in shared memory of the
194 # spawned processes.
195 # class ProcessGdbmDBHandle(ThreadedGdbmDBHandle):
196 # def __init__(self, fn, mode, max_age=None, bound=None):
197 # ThreadedGdbmDBHandle.__init__(self, fn, mode, max_age=max_age,
198 # bound=bound)
199 # self.db_lock = multiprocessing.Lock()
200
# Expose the engine's handle classes only when a gdbm binding could be
# imported; without one every slot is None.  The multi-processing and
# prefork slots are None in both cases.
if _has_gdbm:
    handle = DBHandle(single_threaded=GdbmDBHandle,
                      multi_threaded=ThreadedGdbmDBHandle,
                      multi_processing=None,
                      prefork=None)
else:
    handle = DBHandle(single_threaded=None,
                      multi_threaded=None,
                      multi_processing=None,
                      prefork=None)