############################################################################
#
# Copyright (c) 2004 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
############################################################################
import logging
import sys
import traceback
import warnings
import weakref
from zope.interface import implementer
from transaction import interfaces
from transaction._compat import StringIO
from transaction._compat import get_thread_ident
from transaction._compat import reraise
from transaction._compat import text_type
from transaction.interfaces import TransactionFailedError
from transaction.weakset import WeakSet
_marker = object()
_TB_BUFFER = None # unittests may hook
def _makeTracebackBuffer(): # pragma NO COVER
if _TB_BUFFER is not None:
return _TB_BUFFER
return StringIO()
_LOGGER = None # unittests may hook
def _makeLogger(): # pragma NO COVER
if _LOGGER is not None:
return _LOGGER
return logging.getLogger("txn.%d" % get_thread_ident())
class Status(object):
# ACTIVE is the initial state.
ACTIVE = "Active"
COMMITTING = "Committing"
COMMITTED = "Committed"
DOOMED = "Doomed"
# commit() or commit(True) raised an exception. All further attempts
# to commit or join this transaction will raise TransactionFailedError.
COMMITFAILED = "Commit failed"
class _NoSynchronizers(object):
@staticmethod
def map(_f):
"""Do nothing."""
[docs]@implementer(interfaces.ITransaction)
class Transaction(object):
"""Default implementation of `~transaction.interfaces.ITransaction`."""
# Assign an index to each savepoint so we can invalidate later savepoints
# on rollback. The first index assigned is 1, and it goes up by 1 each
# time.
_savepoint_index = 0
# If savepoints are used, keep a weak key dict of them. This maps a
# savepoint to its index (see above).
_savepoint2index = None
# Meta data. extended_info is also metadata, but is initialized to an
# empty dict in __init__.
_user = u""
_description = u""
def __init__(self, synchronizers=None, manager=None):
self.status = Status.ACTIVE
# List of resource managers, e.g. MultiObjectResourceAdapters.
self._resources = []
# Weak set of synchronizer objects to call.
if synchronizers is None:
synchronizers = WeakSet()
self._synchronizers = synchronizers
self._manager = manager
# _adapters: Connection/_p_jar -> MultiObjectResourceAdapter[Sub]
self._adapters = {}
self._voted = {} # id(Connection) -> boolean, True if voted
# _voted and other dictionaries use the id() of the resource
# manager as a key, because we can't guess whether the actual
# resource managers will be safe to use as dict keys.
# The user, description, and extension attributes are accessed
# directly by storages, leading underscore notwithstanding.
self.extension = {}
self.log = _makeLogger()
self.log.debug("new transaction")
# If a commit fails, the traceback is saved in _failure_traceback.
# If another attempt is made to commit, TransactionFailedError is
# raised, incorporating this traceback.
self._failure_traceback = None
# List of (hook, args, kws) tuples added by addBeforeCommitHook().
self._before_commit = []
# List of (hook, args, kws) tuples added by addAfterCommitHook().
self._after_commit = []
# List of (hook, args, kws) tuples added by addBeforeAbortHook().
self._before_abort = []
# List of (hook, args, kws) tuples added by addAfterAbortHook().
self._after_abort = []
@property
def _extension(self):
# for backward compatibility, since most clients used this
# absent any formal API.
return self.extension
@_extension.setter
def _extension(self, v):
self.extension = v
@property
def user(self):
return self._user
@user.setter
def user(self, v):
if v is None:
raise ValueError("user must not be None")
self._user = text_or_warn(v)
@property
def description(self):
return self._description
@description.setter
def description(self, v):
if v is not None:
self._description = text_or_warn(v)
[docs] def isDoomed(self):
"""See `~transaction.interfaces.ITransaction`."""
return self.status is Status.DOOMED
[docs] def doom(self):
"""See `~transaction.interfaces.ITransaction`."""
if self.status is not Status.DOOMED:
if self.status is not Status.ACTIVE:
# should not doom transactions in the middle,
# or after, a commit
raise ValueError('non-doomable')
self.status = Status.DOOMED
# Raise TransactionFailedError, due to commit()/join()/register()
# getting called when the current transaction has already suffered
# a commit/savepoint failure.
def _prior_operation_failed(self):
assert self._failure_traceback is not None
raise TransactionFailedError(
"An operation previously failed, with traceback:\n\n%s" %
self._failure_traceback.getvalue())
[docs] def join(self, resource):
"""See `~transaction.interfaces.ITransaction`."""
if self.status is Status.COMMITFAILED:
self._prior_operation_failed() # doesn't return
if (self.status is not Status.ACTIVE and
self.status is not Status.DOOMED):
# TODO: Should it be possible to join a committing transaction?
# I think some users want it.
raise ValueError("expected txn status %r or %r, but it's %r" % (
Status.ACTIVE, Status.DOOMED, self.status))
self._resources.append(resource)
if self._savepoint2index:
# A data manager has joined a transaction *after* a savepoint
# was created. A couple of things are different in this case:
#
# 1. We need to add its savepoint to all previous savepoints.
# so that if they are rolled back, we roll this one back too.
#
# 2. We don't actually need to ask the data manager for a
# savepoint: because it's just joining, we can just abort it to
# roll back to the current state, so we simply use an
# AbortSavepoint.
datamanager_savepoint = AbortSavepoint(resource, self)
for transaction_savepoint in self._savepoint2index.keys():
transaction_savepoint._savepoints.append(
datamanager_savepoint)
def _unjoin(self, resource):
# Leave a transaction because a savepoint was rolled back on a resource
# that joined later.
# Don't use remove. We don't want to assume anything about __eq__.
self._resources = [r for r in self._resources if r is not resource]
[docs] def savepoint(self, optimistic=False):
"""See `~transaction.interfaces.ITransaction`."""
if self.status is Status.COMMITFAILED:
self._prior_operation_failed() # doesn't return, it raises
try:
savepoint = Savepoint(self, optimistic, *self._resources)
except: # noqa: E722 do not use bare 'except'
self._cleanup(self._resources)
self._saveAndRaiseCommitishError() # reraises!
if self._savepoint2index is None:
self._savepoint2index = weakref.WeakKeyDictionary()
self._savepoint_index += 1
self._savepoint2index[savepoint] = self._savepoint_index
return savepoint
# Remove and invalidate all savepoints we know about with an index
# larger than `savepoint`'s. This is what's needed when a rollback
# _to_ `savepoint` is done.
def _remove_and_invalidate_after(self, savepoint):
savepoint2index = self._savepoint2index
index = savepoint2index[savepoint]
# use list(items()) to make copy to avoid mutating while iterating
for savepoint, i in list(savepoint2index.items()):
if i > index:
savepoint.transaction = None # invalidate
del savepoint2index[savepoint]
# Invalidate and forget about all savepoints.
def _invalidate_all_savepoints(self):
for savepoint in self._savepoint2index.keys():
savepoint.transaction = None # invalidate
self._savepoint2index.clear()
[docs] def commit(self):
"""See `~transaction.interfaces.ITransaction`."""
if self.status is Status.DOOMED:
raise interfaces.DoomedTransaction(
'transaction doomed, cannot commit')
if self._savepoint2index:
self._invalidate_all_savepoints()
if self.status is Status.COMMITFAILED:
self._prior_operation_failed() # doesn't return
self._callBeforeCommitHooks()
self._synchronizers.map(lambda s: s.beforeCompletion(self))
self.status = Status.COMMITTING
try:
self._commitResources()
self.status = Status.COMMITTED
except: # noqa: E722 do not use bare 'except'
t = None
v = None
tb = None
try:
t, v, tb = self._saveAndGetCommitishError()
self._callAfterCommitHooks(status=False)
reraise(t, v, tb)
finally:
del t, v, tb
else:
self._synchronizers.map(lambda s: s.afterCompletion(self))
self._callAfterCommitHooks(status=True)
self._free()
self.log.debug("commit")
def _saveAndGetCommitishError(self):
self.status = Status.COMMITFAILED
# Save the traceback for TransactionFailedError.
ft = self._failure_traceback = _makeTracebackBuffer()
t = None
v = None
tb = None
try:
t, v, tb = sys.exc_info()
# Record how we got into commit().
traceback.print_stack(sys._getframe(1), None, ft)
# Append the stack entries from here down to the exception.
traceback.print_tb(tb, None, ft)
# Append the exception type and value.
ft.writelines(traceback.format_exception_only(t, v))
return t, v, tb
finally:
del t, v, tb
def _saveAndRaiseCommitishError(self):
t = None
v = None
tb = None
try:
t, v, tb = self._saveAndGetCommitishError()
reraise(t, v, tb)
finally:
del t, v, tb
[docs] def getBeforeCommitHooks(self):
"""See `~transaction.interfaces.ITransaction`."""
return iter(self._before_commit)
[docs] def addBeforeCommitHook(self, hook, args=(), kws=None):
"""See `~transaction.interfaces.ITransaction`."""
if kws is None:
kws = {}
self._before_commit.append((hook, tuple(args), kws))
def _callBeforeCommitHooks(self):
# Call all hooks registered, allowing further registrations
# during processing.
self._call_hooks(self._before_commit)
[docs] def getAfterCommitHooks(self):
"""See `~transaction.interfaces.ITransaction`."""
return iter(self._after_commit)
[docs] def addAfterCommitHook(self, hook, args=(), kws=None):
"""See `~transaction.interfaces.ITransaction`."""
if kws is None:
kws = {}
self._after_commit.append((hook, tuple(args), kws))
def _callAfterCommitHooks(self, status=True):
self._call_hooks(self._after_commit,
exc=False, clean=True, prefix_args=(status,))
def _call_hooks(self, hooks, exc=True, clean=False, prefix_args=()):
"""Call *hooks*.
If *exc* is true, fail on the first exception; otherwise
log the exception and continue.
If *clean* is true, abort all resources. This is to ensure
a clean state should a (after) hook has affected one
of the resources.
*prefix_args* defines additional arguments prefixed
to the arguments provided by the hook definition.
``_call_hooks`` supports that a hook adds new hooks.
"""
# Avoid to abort anything at the end if no hooks are registered.
if not hooks:
return
try:
# Call all hooks registered, allowing further registrations
# during processing
for hook, args, kws in hooks:
try:
hook(*(prefix_args + args), **kws)
except: # noqa: E722 do not use bare 'except'
if exc:
raise
# We should not fail
self.log.error("Error in hook exec in %s ",
hook, exc_info=sys.exc_info())
finally:
del hooks[:] # clear hooks
if clean:
# The primary operation has already been performed.
# But the hooks execution might have left the resources
# in an unclean state. Clean up
for rm in self._resources:
try:
rm.abort(self)
except: # noqa: E722 do not use bare 'except'
# XXX should we take further actions here ?
self.log.error("Error in abort() on manager %s",
rm, exc_info=sys.exc_info())
[docs] def getBeforeAbortHooks(self):
"""See `~transaction.interfaces.ITransaction`."""
return iter(self._before_abort)
[docs] def addBeforeAbortHook(self, hook, args=(), kws=None):
"""See `~transaction.interfaces.ITransaction`."""
if kws is None:
kws = {}
self._before_abort.append((hook, tuple(args), kws))
def _callBeforeAbortHooks(self):
# Call all hooks registered, allowing further registrations
# during processing.
self._call_hooks(self._before_abort, exc=False)
[docs] def getAfterAbortHooks(self):
"""See `~transaction.interfaces.ITransaction`."""
return iter(self._after_abort)
[docs] def addAfterAbortHook(self, hook, args=(), kws=None):
"""See `~transaction.interfaces.ITransaction`."""
if kws is None:
kws = {}
self._after_abort.append((hook, tuple(args), kws))
def _callAfterAbortHooks(self):
self._call_hooks(self._after_abort, clean=True)
def _commitResources(self):
# Execute the two-phase commit protocol.
L = list(self._resources)
L.sort(key=rm_key)
try:
for rm in L:
rm.tpc_begin(self)
for rm in L:
rm.commit(self)
self.log.debug("commit %r", rm)
for rm in L:
rm.tpc_vote(self)
self._voted[id(rm)] = True
try:
for rm in L:
rm.tpc_finish(self)
except: # noqa: E722 do not use bare 'except'
# TODO: do we need to make this warning stronger?
# TODO: It would be nice if the system could be configured
# to stop committing transactions at this point.
self.log.critical("A storage error occurred during the second "
"phase of the two-phase commit. Resources "
"may be in an inconsistent state.")
raise
except: # noqa: E722 do not use bare 'except'
# If an error occurs committing a transaction, we try
# to revert the changes in each of the resource managers.
t, v, tb = sys.exc_info()
try:
try:
self._cleanup(L)
finally:
self._synchronizers.map(lambda s: s.afterCompletion(self))
reraise(t, v, tb)
finally:
del t, v, tb
def _cleanup(self, L):
# Called when an exception occurs during tpc_vote or tpc_finish.
for rm in L:
if id(rm) not in self._voted:
try:
rm.abort(self)
except Exception:
self.log.error("Error in abort() on manager %s",
rm, exc_info=sys.exc_info())
for rm in L:
try:
rm.tpc_abort(self)
except Exception:
self.log.error("Error in tpc_abort() on manager %s",
rm, exc_info=sys.exc_info())
def _free_manager(self):
try:
if self._manager:
self._manager.free(self)
finally:
# If we try to abort a transaction and fail, the manager
# may have begun a new transaction, and will raise a
# ValueError from free(); we don't want that to happen
# again in _free(), which abort() always calls, so be sure
# to clear out the manager.
self._manager = None
def _free(self):
# Called when the transaction has been committed or aborted
# to break references---this transaction object will not be returned
# as the current transaction from its manager after this, and all
# IDatamanager objects joined to it will forgotten
# All hooks and data are forgotten.
self._free_manager()
if hasattr(self, '_data'):
delattr(self, '_data')
del self._resources[:]
del self._before_commit[:]
del self._after_commit[:]
del self._before_abort[:]
del self._after_abort[:]
# self._synchronizers might be shared, we can't mutate it
self._synchronizers = _NoSynchronizers
self._adapters = None
self._voted = None
self.extension = None
def data(self, ob):
try:
data = self._data
except AttributeError:
raise KeyError(ob)
try:
return data[id(ob)]
except KeyError:
raise KeyError(ob)
def set_data(self, ob, ob_data):
try:
data = self._data
except AttributeError:
data = self._data = {}
data[id(ob)] = ob_data
[docs] def abort(self):
"""See `~transaction.interfaces.ITransaction`."""
try:
t = None
v = None
tb = None
self._callBeforeAbortHooks()
if self._savepoint2index:
self._invalidate_all_savepoints()
try:
self._synchronizers.map(lambda s: s.beforeCompletion(self))
except: # noqa: E722 do not use bare 'except'
t, v, tb = sys.exc_info()
self.log.error(
"Failed to call synchronizers", exc_info=sys.exc_info())
for rm in self._resources:
try:
rm.abort(self)
except: # noqa: E722 do not use bare 'except'
if tb is None:
t, v, tb = sys.exc_info()
self.log.error("Failed to abort resource manager: %s",
rm, exc_info=sys.exc_info())
self._callAfterAbortHooks()
# Unlike in commit(), we are no longer the current transaction
# when we call afterCompletion(). But we can't be completely
# _free(): the synchronizer might want to access some data it set
# before.
self._free_manager()
self._synchronizers.map(lambda s: s.afterCompletion(self))
self.log.debug("abort")
if tb is not None:
reraise(t, v, tb)
finally:
self._free()
del t, v, tb
[docs] def note(self, text):
"""See `~transaction.interfaces.ITransaction`."""
if text is not None:
text = text_or_warn(text).strip()
if self.description:
self.description += u"\n" + text
else:
self.description = text
[docs] def setUser(self, user_name, path=u"/"):
"""See `~transaction.interfaces.ITransaction`."""
self.user = u"%s %s" % (text_or_warn(path), text_or_warn(user_name))
[docs] def setExtendedInfo(self, name, value):
"""See `~transaction.interfaces.ITransaction`."""
self.extension[name] = value
def isRetryableError(self, error):
return self._manager._retryable(type(error), error)
# TODO: We need a better name for the adapters.
def rm_key(rm):
func = getattr(rm, 'sortKey', None)
if func is not None:
return func()
[docs]@implementer(interfaces.ISavepoint)
class Savepoint(object):
"""Implementation of `~transaction.interfaces.ISavepoint`, a transaction
savepoint.
Transaction savepoints coordinate savepoints for data managers
participating in a transaction.
"""
def __init__(self, transaction, optimistic, *resources):
self.transaction = transaction
self._savepoints = savepoints = []
for datamanager in resources:
try:
savepoint = datamanager.savepoint
except AttributeError:
if not optimistic:
raise TypeError("Savepoints unsupported", datamanager)
savepoint = NoRollbackSavepoint(datamanager)
else:
savepoint = savepoint()
savepoints.append(savepoint)
@property
def valid(self):
return self.transaction is not None
[docs] def rollback(self):
"""See `~transaction.interfaces.ISavepoint`."""
transaction = self.transaction
if transaction is None:
raise interfaces.InvalidSavepointRollbackError(
'invalidated by a later savepoint')
transaction._remove_and_invalidate_after(self)
try:
for savepoint in self._savepoints:
savepoint.rollback()
except: # noqa: E722 do not use bare 'except'
# Mark the transaction as failed.
transaction._saveAndRaiseCommitishError() # reraises!
class AbortSavepoint(object):
def __init__(self, datamanager, transaction):
self.datamanager = datamanager
self.transaction = transaction
def rollback(self):
self.datamanager.abort(self.transaction)
self.transaction._unjoin(self.datamanager)
class NoRollbackSavepoint(object):
def __init__(self, datamanager):
self.datamanager = datamanager
def rollback(self):
raise TypeError("Savepoints unsupported", self.datamanager)
def text_or_warn(s):
if isinstance(s, text_type):
return s
warnings.warn("Expected text", DeprecationWarning, stacklevel=3)
if isinstance(s, bytes):
return s.decode('utf-8', 'replace')
else:
return text_type(s)