1
0
Fork 0

add transaction.py

This commit is contained in:
Harald Hoyer 2023-10-27 13:56:04 +02:00
parent 4ceaef7522
commit dc85629865
2 changed files with 657 additions and 0 deletions

203
static/files/transaction.py Normal file
View file

@ -0,0 +1,203 @@
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.
"""
Transactions of attributes by inheriting the Transaction class
Basic Usage:
class Test(Transaction):
pass
a = Test()
a.test = "old state"
a.commit()
a.test = "bad state, roll me back"
a.rollback()
assert(a.test == "old state")
See also: https://harald.hoyer.xyz/2015/10/13/a-python-transaction-class/
Copyright (C) 2008 Harald Hoyer <harald@redhat.com>
Copyright (C) 2008 Red Hat, Inc.
"""
import copy
def _checksetseen(what, seen):
"checks and sets the obj id in seen"
if what in seen:
return True
seen.add(what)
return False
class Transaction(object):
"""
This class allows sub-classes to commit changes to an instance to a
history, and rollback to previous states.
Because the class only stores attributes in self.__dict__ sub-classes
need to use the methods __getstate__ and __setstate__ to provide additional
state information. See the Transactionlist below for an example usage.
"""
def commit(self, **kwargs):
"""
Commit the object state.
If the optional argument "deep" is set to False,
objects of class Transaction stored in this object will
not be committed.
"""
seen = kwargs.get("_commit_seen", set())
if _checksetseen(id(self), seen):
return
deep = kwargs.get("deep", True)
# Do not deepcopy the Transaction objects. We want to keep the
# reference. Instead commit() them.
state = dict()
for key, val in self.__dict__.items():
if isinstance(val, Transaction):
state[key] = val
if deep:
val.commit(_commit_seen=seen)
elif key == "__l":
# do not deepcopy our old state
state[key] = val
else:
state[key] = copy.deepcopy(val)
if hasattr(self, "__getstate__"):
state = (state, getattr(self, "__getstate__")())
self.__dict__["__l"] = state
def rollback(self, **kwargs):
"""
Rollback the last committed object state.
If the optional argument "deep" is set to False,
objects of class Transaction stored in this object will
not be rolled back.
"""
seen = kwargs.get("_rollback_seen", set())
if _checksetseen(id(self), seen):
return
deep = kwargs.get("deep", True)
state = None
extrastate = None
gotstate = False
gotextrastate = False
if "__l" in self.__dict__:
state = self.__dict__["__l"]
gotstate = True
if type(state) is tuple:
gotextrastate = True
(state, extrastate) = state
# rollback our childs, then ourselves
for child in self.__dict__.values():
if isinstance(child, Transaction):
if deep:
child.rollback(_rollback_seen=seen)
if gotstate:
self.__dict__.clear()
self.__dict__.update(state)
if gotextrastate and hasattr(self, "__setstate__"):
getattr(self, "__setstate__")(extrastate)
class Transactionlist(list, Transaction):
"""
An example subclass of list, which inherits transactions.
Due to the special list implementation, we need the
__getstate__ and __setstate__ methods.
See the code for the implementation.
"""
def commit(self, **kwargs):
"""
Commit the object state.
If the optional argument "deep" is set to False,
objects of class Transaction stored in this object will
not be committed.
"""
# make a local copy of the recursive marker
seen = set(kwargs.get("_commit_seen", set()))
super(Transactionlist, self).commit(**kwargs)
if _checksetseen(id(self), seen):
return
deep = kwargs.get("deep", True)
if deep:
for val in self:
if isinstance(val, Transaction):
val.commit()
def rollback(self, **kwargs):
"""
Rollback the last committed object state.
If the optional argument "deep" is set to False,
objects of class Transaction stored in this object will
not be rolled back.
"""
# make a local copy of the recursive marker
seen = set(kwargs.get("_rollback_seen", set()))
super(Transactionlist, self).rollback(**kwargs)
if _checksetseen(id(self), seen):
return
deep = kwargs.get("deep", True)
if deep:
for val in self:
if isinstance(val, Transaction):
val.rollback()
def __getstate__(self):
"""
return a deepcopy of all non Transaction class objects in our list,
and a reference for the committed Transaction objects.
"""
state = []
for val in self:
if isinstance(val, Transaction):
state.append(val)
else:
state.append(copy.deepcopy(val))
return state
def __setstate__(self, state):
"clear the list and restore all objects from the state"
del self[:]
self.extend(state)
__author__ = "Harald Hoyer <harald@redhat.com>"

View file

@ -0,0 +1,454 @@
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, write to the
# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.
"""\
UnitTests for the Transaction class
See also: https://harald.hoyer.xyz/2015/10/13/a-python-transaction-class/
Copyright (C) 2008 Harald Hoyer <harald@redhat.com>
Copyright (C) 2008 Red Hat, Inc.
"""
import unittest
import sys
import copy
from transaction import Transaction
class TransactionOld1(object):
"""\
Old Transaction class from
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/284677
"""
def __init__(self):
self.log = []
def commit(self, **kwargs):
self.log.append(self.__dict__.copy())
def rollback(self, **kwargs):
try:
self.__dict__.update(self.log.pop(-1))
except IndexError:
pass
def __repr__(self):
return "'self.__dict__ = %s'" % self.__dict__
class TransactionOld2(TransactionOld1):
def commit(self, **kwargs):
self.log.append(copy.deepcopy(self.__dict__))
class TransactionOld3(TransactionOld2):
def rollback(self, **kwargs):
try:
state = self.log.pop(-1)
self.__dict__.clear()
self.__dict__.update(state)
except IndexError:
pass
class TransactionNew1(object):
def _docommit(self):
if "log" not in self.__dict__:
self.__dict__["log"] = list()
self.__dict__["log"].append(copy.deepcopy(self.__dict__))
def _dorollback(self):
if "log" not in self.__dict__:
return
try:
state = self.__dict__["log"].pop(-1)
self.__dict__.clear()
self.__dict__.update(state)
except IndexError:
pass
def commit(self, **kwargs):
# commit ourselves, then our childs
self._docommit()
if kwargs.get("deep", True):
for child in self.__dict__.values():
if isinstance(child, self.__class__):
child.commit()
def rollback(self, **kwargs):
# rollback our childs, then ourselves
if kwargs.get("deep", True):
for child in self.__dict__.values():
if isinstance(child, self.__class__):
child.rollback()
self._dorollback()
def __repr__(self):
return "'self.__dict__ = %s'" % self.__dict__
class TransactionNew2(TransactionNew1):
def _docommit(self):
if "log" in self.__dict__:
oldstate = self.__dict__.pop("log")
else:
oldstate = None
state = copy.deepcopy(self.__dict__)
if oldstate:
state["log"] = oldstate
self.__dict__["log"] = state
def _dorollback(self):
if "log" not in self.__dict__:
return
try:
state = self.__dict__["log"]
self.__dict__.clear()
self.__dict__.update(state)
except IndexError:
pass
def __repr__(self):
return "'self.__dict__ = %s'" % self.__dict__
class TransactionNew3(TransactionNew2):
def _checksetseen(self, seen):
if id(self) in seen:
sys.stderr.write("Recursion detected... ")
return True
seen.add(id(self))
return False
def commit(self, **kwargs): # pylint: disable-msg=W0613
seen = kwargs.get("_commit_seen", set())
if self._checksetseen(seen):
return
# commit ourselves, then our childs
self._docommit()
if kwargs.get("deep", True):
for child in self.__dict__.values():
if isinstance(child, self.__class__):
child.commit(_commit_seen=seen)
def rollback(self, **kwargs):
seen = kwargs.get("_rollback_seen", set())
if self._checksetseen(seen):
return
# rollback our childs, then ourselves
if kwargs.get("deep", True):
for child in self.__dict__.values():
if isinstance(child, self.__class__):
child.rollback(_rollback_seen=seen)
self._dorollback()
class TransactionImproved(Transaction):
def __repr__(self):
return "'self.__dict__ = %s'" % self.__dict__
class TestTransaction(unittest.TestCase):
doRecursion = False
def test01(self):
"simple rollback"
a = TestClass()
a.test = "correct"
a.commit()
a.test = "roll me back"
a.rollback()
self.assertEqual(a.test, "correct")
def test02(self):
"""double rollback
demonstrates how you can commit / rollback several times
"""
a = TestClass()
a.test = "correct"
a.commit()
a.test = "roll me back second"
a.commit()
a.test = "roll me back first"
a.rollback()
a.rollback()
self.assertEqual(a.test, "correct")
def test03(self):
"""\
test list rollback
showing the side effects of a non deep rollback
"""
a = TestClass()
a.ls = [0, 1, 2]
a.commit(deep=False)
a.ls.append(3)
a.rollback(deep=False)
self.assertEqual(a.ls, [0, 1, 2])
def test031(self):
"""\
non deep rollback
showing the side effects of a non deep rollback
"""
a = TestClass()
a.ckls = TestClass()
b = a.ckls
a.ckls.newvar = "correct"
a.commit(deep=False)
a.ckls.newvar = "roll me back"
a.rollback(deep=False)
self.assertEqual(b.newvar, "roll me back")
self.assertEqual(a.ckls.newvar, "roll me back")
def test04(self):
"""commit not deep, rollback deep
showing the side effects of a non deep commit
"""
a = TestClass()
a.ckls = TestClass()
a.ckls.newvar = TestClass()
a.ckls.newvar.text = "correct"
b = a.ckls
a.commit(deep=False)
a.ckls.newvar.text = "roll me back"
a.rollback(deep=True)
self.assertEqual(b.newvar.text, "roll me back")
self.assertEqual(a.ckls.newvar.text, "roll me back")
def test041(self):
"""check for leftover attributes"""
a = TestClass()
a.newvar = "correct"
a.commit()
a.shouldnotbethere = True
a.rollback()
self.failIf(hasattr(a, "shouldnotbethere"), a)
def test05(self):
"""commit and rollback deep
no more side effects
"""
a = TestClass()
a.ckls = TestClass()
a.ckls.newvar = "correct"
b = a.ckls
a.commit()
a.ckls.newvar = "roll me back"
a.rollback(deep=True)
self.assertEqual(a.ckls.newvar, "correct")
self.assertEqual(b.newvar, "correct")
self.assertEqual(id(a.ckls), id(b))
def test06(self):
"""commit only a sub object
though we committed only an attribute, the deep rollback
will roll it back.
"""
a = TestClass()
a.ckls = TestClass()
a.newvar = "correct"
a.ckls.newvar = "correct"
a.ckls.commit()
a.newvar = "will not be rolled back"
a.ckls.newvar = "roll me back"
a.rollback()
self.assertEqual(a.newvar, "will not be rolled back")
self.assertEqual(a.ckls.newvar, "correct")
def test07(self):
"""commit only a sub object, rollback with deep=false
we committed only an attribute and the non deep rollback
will not roll it back.
"""
a = TestClass()
a.ckls = TestClass()
a.newvar = "correct"
a.ckls.newvar = "correct"
a.ckls.commit()
a.newvar = "will not be rolled back"
a.ckls.newvar = "will not be rolled back"
a.rollback(deep=False)
self.assertEqual(a.newvar, "will not be rolled back")
self.assertEqual(a.ckls.newvar, "will not be rolled back")
def test10(self):
"""check for the commit/rollback recursion"""
if not TestTransaction.doRecursion:
sys.stderr.write("skipped .. ")
return
a = TestClass()
b = a
for i in xrange(10):
b.newvar = TestClass()
b.test = "test" + str(i)
b = b.newvar
b.newvar = a
# would raise a recursion maximum exception
a.commit(deep=True)
def test11(self):
"""check for swapping Transaction objects"""
a = TestClass()
a.t1 = TestClass()
a.t2 = TestClass()
a.t1.text = "test1"
a.t2.text = "test2"
a.commit()
b = a.t1
a.t1 = a.t2
a.t2 = b
a.rollback()
self.assertEqual(a.t1.text, "test1")
self.assertEqual(a.t2.text, "test2")
def count_dict(self, seen, what):
if id(what) in seen:
return 1
seen.add(id(what))
if not hasattr(what, "__dict__"):
return 1
i = 1
for val in what.__dict__.values():
# print "Counting ", id(val), val
i = i + self.count_dict(seen, val)
return i
def test91(self):
"""check for the stack length (deep=False)"""
a = TestClass()
b = a
for i in xrange(10):
b.newvar = TestClass()
b.test = "test" + str(i)
b = b.newvar
a.commit(deep=False)
a.commit(deep=False)
a.commit(deep=False)
seen = set()
count = self.count_dict(seen, a)
sys.stderr.write("%d objects in %d places .. " % (len(seen), count))
def test92(self):
"""check for the stack length (deep=True)"""
a = TestClass()
b = a
for i in xrange(10):
b.newvar = TestClass()
b.test = "test" + str(i)
b.test2 = "test2" + str(i)
b = b.newvar
a.commit(deep=True)
a.commit()
a.commit()
a.commit()
seen = set()
count = self.count_dict(seen, a)
sys.stderr.write("%d objects in %d places .. " % (len(seen), count))
def suite():
_suite = unittest.TestSuite()
_suite = unittest.makeSuite(TestTransaction, "test")
return _suite
if __name__ == "__main__":
global TestClass
# print >> sys.stderr, """\
# **********************************************************************
# Old Transaction Class (original)
# **********************************************************************
# """
# TestClass = TransactionOld1
# testrunner = unittest.TextTestRunner(verbosity=2)
# result = testrunner.run(suite())
# print >> sys.stderr, """\
# **********************************************************************
# Old Transaction Class (copy.deepcopy)
# **********************************************************************
# """
# TestClass = TransactionOld2
# testrunner = unittest.TextTestRunner(verbosity=2)
# result = testrunner.run(suite())
#
# print >> sys.stderr, """\
# **********************************************************************
# Old Transaction Class (copy.deepcopy + __dict__.clear)
# **********************************************************************
# """
# TestClass = TransactionOld3
# testrunner = unittest.TextTestRunner(verbosity=2)
# result = testrunner.run(suite())
#
# print >> sys.stderr, """\
# **********************************************************************
# New Transaction Class (deep commit)
# **********************************************************************
# """
# TestClass = TransactionNew1
# testrunner = unittest.TextTestRunner(verbosity=2)
# result = testrunner.run(suite())
#
# print >> sys.stderr, """\
# **********************************************************************
# New Transaction Class (deep commit + stack improvement)
# **********************************************************************
# """
# TestClass = TransactionNew2
# testrunner = unittest.TextTestRunner(verbosity=2)
# result = testrunner.run(suite())
#
# print >> sys.stderr, """\
# **********************************************************************
# New Transaction Class (deep commit + stack improvement + recursion check)
# **********************************************************************
# """
# TestClass = TransactionNew3
# TestTransaction.doRecursion = True
# testrunner = unittest.TextTestRunner(verbosity=2)
# result = testrunner.run(suite())
print >> sys.stderr, """\
**********************************************************************
New Transaction Class (final)
**********************************************************************
"""
TestClass = TransactionImproved
TestTransaction.doRecursion = True
testrunner = unittest.TextTestRunner(verbosity=2)
result = testrunner.run(suite())
sys.exit(not result.wasSuccessful())