"""
Copy data from the source to the destination, performing inserts and updates
as necessary to make the destination match the source. Note that deletes are
not performed, however.
"""
import cx_Logging
import cx_LoggingOptions
import cx_OptionParser
import cx_OracleUtils
import os
import Exceptions
import Options
# Describe the command line: connection schemas and roles for both ends of the
# copy, switches controlling how existing rows are detected and compared,
# batching/progress options, and the positional source/destination arguments.
parser = cx_OptionParser.OptionParser()
addOption = parser.AddOption
for schemaOptionName in ("source-schema", "dest-schema"):
    addOption(cx_OracleUtils.SchemaOption(schemaOptionName))
addOption("--key-columns", metavar="COLS",
        help="comma separated list of columns to use for checking if the "
             "row exists")
addOption("--no-check-exists", default=1, action="store_false",
        dest="checkExists",
        help="do not check to see if the row exists in the target")
addOption("--no-check-modified", default=1, action="store_false",
        dest="checkModified",
        help="do not check to see if the row is identical to the row in the "
             "destination")
addOption("--skip", metavar="N", type="int",
        help="number of rows to skip before starting the copy")
addOption("--row-limit", metavar="N", type="int",
        help="number of rows at which the copy will stop")
# commit point, report point, array size and max long size come predefined
# from the Options module
for sharedOption in (Options.COMMIT_POINT, Options.REPORT_POINT,
        Options.ARRAY_SIZE, Options.MAX_LONG_SIZE):
    addOption(sharedOption)
addOption("--source-role", metavar="ROLE",
        help="enable this <role> [identified by <password>] in the source "
             "database immediately after connecting by calling "
             "dbms_session.set_role")
addOption("--dest-role", metavar="ROLE",
        help="enable this <role> [identified by <password>] in the target "
             "database immediately after connecting by calling "
             "dbms_session.set_role")
cx_LoggingOptions.AddOptions(parser)
parser.AddArgument("source", required=True,
        help="a select statement or the name of the table to query")
parser.AddArgument("destination",
        help="the name of a table or view to perform the insert and update "
             "statements against")
options = parser.Parse()
cx_LoggingOptions.ProcessOptions(options)
# Connect to the source first and tune its cursor for bulk fetching: the
# array size controls how many rows each fetch returns, and the output size
# bounds how much of a LONG column is retrieved.
sourceConnection = cx_OracleUtils.Connect(options.sourceSchema,
        options.sourceRole)
sourceCursor = sourceConnection.cursor()
requestedArraySize, requestedLongSize = options.arraySize, options.maxLongSize
if requestedArraySize:
    sourceCursor.arraysize = requestedArraySize
if requestedLongSize:
    sourceCursor.setoutputsize(requestedLongSize)
# Then connect to the destination.  This general-purpose cursor serves the
# metadata queries below and is later reused for the insert statement.
destConnection = cx_OracleUtils.Connect(options.destSchema, options.destRole)
cursor = destConnection.cursor()
# Determine the query to execute.  The source argument may be:
#   * a select statement, used as is;
#   * the path of a file containing the query;
#   * a bare object name (no spaces), resolved in the source schema and
#     turned into "select * from owner.name".  Unless a destination was given
#     explicitly, that name also names the destination.
sourceSQL = options.source.strip()
destinationTable = options.destination
if not sourceSQL.lower().startswith("select ") and os.path.isfile(sourceSQL):
    # close the file deterministically rather than leaving the handle for the
    # garbage collector
    sqlFile = open(sourceSQL)
    try:
        sourceSQL = sqlFile.read().strip()
    finally:
        sqlFile.close()
elif " " not in sourceSQL:
    if destinationTable is None:
        destinationTable = sourceSQL
    sourceInfo = cx_OracleUtils.GetObjectInfo(sourceConnection, sourceSQL)
    if sourceInfo is None:
        raise Exceptions.SourceTableNotFound(tableName = sourceSQL)
    sourceTableOwner, sourceTableName, sourceTableType = sourceInfo
    sourceSQL = "select * from %s.%s" % \
            (cx_OracleUtils.IdentifierRepr(sourceTableOwner),
             cx_OracleUtils.IdentifierRepr(sourceTableName))
if not destinationTable:
    raise Exceptions.DestinationTableNotSpecified()
# Resolve the destination object to its owner, name and type, aborting when
# it cannot be found.
destInfo = cx_OracleUtils.GetObjectInfo(destConnection, destinationTable)
if destInfo is not None:
    destTableOwner, destTableName, destTableType = destInfo
else:
    raise Exceptions.TargetTableNotFound(tableName=destinationTable)
# Run the source query and map each selected column name to a tuple of
#   (position in the select list, column type, whether it is a CLOB/BLOB).
# LOB columns are flagged because their contents must be read explicitly
# when copied.
sourceColumns = {}
sourceCursor.execute(sourceSQL)
sourceVars = sourceCursor.fetchvars
for colPos, (colName, colType, colDisplaySize, colInternalSize,
        colPrecision, colScale, colNullOk) in \
        enumerate(sourceCursor.description):
    isLob = colType in (sourceConnection.CLOB, sourceConnection.BLOB)
    sourceColumns[colName] = (colPos, colType, isLob)
# Map every column of the destination object to True when the data
# dictionary reports it as nullable ("Y"), False otherwise.
cursor.execute("""
select
column_name,
nullable
from all_tab_columns
where owner = :owner
and table_name = :name""",
        owner = destTableOwner,
        name = destTableName)
destColumns = dict((columnName, isNullable == "Y")
        for columnName, isNullable in cursor)
# Determine the key columns used to detect whether a row already exists in
# the destination.  They come from --key-columns when given.  Otherwise they
# come from the destination's primary key, falling back to a unique
# constraint.  Every key column must also be selected by the source query.
keyColumns = []
if options.checkExists:
    if options.keyColumns:
        # tolerate whitespace around names, e.g. --key-columns "a, b"
        keyColumns = [n.strip()
                for n in options.keyColumns.upper().split(",")]
    else:
        # 'P' sorts before 'U', so a primary key wins; constraint_name makes
        # the choice among several unique constraints deterministic
        cursor.execute("""
                select constraint_name
                from all_constraints
                where owner = :owner
                  and table_name = :name
                  and constraint_type in ('P', 'U')
                order by constraint_type, constraint_name""",
                owner = destTableOwner,
                name = destTableName)
        row = cursor.fetchone()
        if not row:
            raise Exceptions.NoPrimaryOrUniqueConstraintOnTable()
        constraintName, = row
        # order by position so key columns come back in constraint order
        # rather than an arbitrary one
        cursor.execute("""
                select column_name
                from all_cons_columns
                where owner = :owner
                  and constraint_name = :name
                order by position""",
                owner = destTableOwner,
                name = constraintName)
        keyColumns = [n for n, in cursor]
    for name in keyColumns:
        if name not in sourceColumns:
            raise Exceptions.KeyColumnNotInSourceQuery(name = name)
# Pair source and destination columns by name.  The copy is valid only when
# every source column or every destination column found a partner.
matchingColumns = []
for columnName in sourceColumns:
    if columnName in destColumns:
        matchingColumns.append(columnName)
numMatched = len(matchingColumns)
if numMatched != len(sourceColumns) and numMatched != len(destColumns):
    raise Exceptions.NotAllColumnsMatchByName()
# Prepare the array insert into the destination.  Matching columns are bound
# positionally (:1, :2, ...).  When no existence check is done, non-LOB
# columns are bound straight to the source fetch variables.  Otherwise a
# separate variable is created, and insertVars records which
# (source, target, isLob) pairs must be copied row by row.
insertNames = [cx_OracleUtils.IdentifierRepr(n) for n in matchingColumns]
bindPlaceholders = [":%s" % (i + 1) for i in range(len(matchingColumns))]
statement = "insert into %s.%s (%s) values (%s)" % \
        (cx_OracleUtils.IdentifierRepr(destTableOwner),
         cx_OracleUtils.IdentifierRepr(destTableName),
         ",".join(insertNames), ",".join(bindPlaceholders))
insertCursor = cursor
insertCursor.bindarraysize = sourceCursor.arraysize
insertCursor.prepare(statement)
insertVars = []
insertBindVars = []
for name in matchingColumns:
    colPos, colType, isLob = sourceColumns[name]
    sourceVar = sourceVars[colPos]
    if not options.checkExists and not isLob:
        insertBindVars.append(sourceVar)
        continue
    targetVar = insertCursor.var(colType, sourceVar.maxlength)
    insertVars.append((sourceVar, targetVar, isLob))
    insertBindVars.append(targetVar)
insertCursor.setinputsizes(*insertBindVars)
# When checking for existing rows, prepare a "select count(*)" restricted on
# the key columns.  Its bind variables are filled one row at a time from the
# source fetch variables.  "method" stays bound at module level for any
# later code that refers to it.
if options.checkExists:
    method = cx_OracleUtils.WhereClause
    whereClauses = []
    for keyPos, keyName in enumerate(keyColumns):
        whereClauses.append(method(keyName, ":%s" % (keyPos + 1),
                destColumns[keyName], True))
    statement = "select count(*) from %s.%s where %s" % \
            (cx_OracleUtils.IdentifierRepr(destTableOwner),
             cx_OracleUtils.IdentifierRepr(destTableName),
             " and ".join(whereClauses))
    existsCursor = destConnection.cursor()
    existsCursor.prepare(statement)
    existsVars = []
    existsBindVars = []
    for name in keyColumns:
        colPos, colType, isLob = sourceColumns[name]
        sourceVar = sourceVars[colPos]
        targetVar = existsCursor.var(colType, sourceVar.maxlength)
        existsBindVars.append(targetVar)
        existsVars.append((sourceVar, targetVar, isLob))
    existsCursor.setinputsizes(*existsBindVars)
# When existence is checked and some matching columns are not key columns,
# prepare an array update.  It sets the non-key columns and locates the row
# by its key columns.  Bind positions follow updateColumns: the non-key
# columns first, then the key columns.
updateCursor = None
if options.checkExists and len(keyColumns) != len(matchingColumns):
    whereClauseFor = cx_OracleUtils.WhereClause
    updateColumns = [n for n in matchingColumns if n not in keyColumns]
    updateColumns.extend(keyColumns)
    setClauses = []
    for bindPos, columnName in enumerate(updateColumns):
        if columnName not in keyColumns:
            setClauses.append("%s = :%s" %
                    (cx_OracleUtils.IdentifierRepr(columnName), bindPos + 1))
    whereClauses = []
    for bindPos, columnName in enumerate(updateColumns):
        if columnName in keyColumns:
            whereClauses.append(whereClauseFor(columnName,
                    ":%s" % (bindPos + 1), destColumns[columnName], True))
    statement = "update %s.%s set %s where %s" % \
            (cx_OracleUtils.IdentifierRepr(destTableOwner),
             cx_OracleUtils.IdentifierRepr(destTableName),
             ",".join(setClauses), " and ".join(whereClauses))
    if options.checkModified:
        # NOTE(review): WhereClause(..., False) presumably yields a "value
        # differs" test, so rows that are already identical are not updated
        # (the statistics count them as unmodified) -- confirm in
        # cx_OracleUtils
        changedClauses = []
        for bindPos, columnName in enumerate(updateColumns):
            if columnName not in keyColumns:
                changedClauses.append(whereClauseFor(columnName,
                        ":%s" % (bindPos + 1), destColumns[columnName],
                        False))
        statement += " and (%s)" % " or ".join(changedClauses)
    updateCursor = destConnection.cursor()
    updateCursor.bindarraysize = sourceCursor.arraysize
    updateCursor.prepare(statement)
    updateVars = []
    updateBindVars = []
    for name in updateColumns:
        colPos, colType, isLob = sourceColumns[name]
        sourceVar = sourceVars[colPos]
        targetVar = updateCursor.var(colType, sourceVar.maxlength)
        updateVars.append((sourceVar, targetVar, isLob))
        updateBindVars.append(targetVar)
    updateCursor.setinputsizes(*updateBindVars)
# Tell the user what is happening.
cx_Logging.Trace("Copying data...")
cx_Logging.Trace(" Source: %s", sourceSQL)
cx_Logging.Trace(" Destination: %s", destinationTable)
# Discard the first --skip rows, at most one array fetch at a time.
while options.skip:
    cx_Logging.Trace(" Rows left to skip: %s", options.skip)
    rowsToFetch = min(sourceCursor.arraysize, options.skip)
    rowsSkipped = sourceCursor.fetchraw(rowsToFetch)
    if not rowsSkipped:
        # The source ran out before enough rows were skipped.  Without this
        # check, options.skip never decreases and the loop spins forever.
        break
    options.skip -= rowsSkipped
# Initialize the counters used in performing the copy.
insertedRows = 0
modifiedRows = 0
unmodifiedRows = 0
insertPos = 0
updatePos = 0
lastCommitted = 0
lastReported = 0
totalRowsFetched = 0
reportPoint = options.reportPoint
commitPoint = options.commitPoint
rowLimit = options.rowLimit
if reportPoint is None and commitPoint is not None:
    # without an explicit report point, report whenever a commit is due
    reportPoint = commitPoint

# Perform the copy, one array fetch per iteration.
while True:
    if rowLimit is not None and totalRowsFetched >= rowLimit:
        # limit reached; do not fetch (and then discard) another batch
        break
    rowsFetched = sourceCursor.fetchraw()
    if rowLimit is not None and totalRowsFetched + rowsFetched > rowLimit:
        rowsFetched = rowLimit - totalRowsFetched
    if not rowsFetched:
        break
    totalRowsFetched += rowsFetched
    if not insertVars:
        # No existence check and no LOBs: the insert is bound directly to the
        # fetch variables, so every fetched row is inserted as is.
        insertPos = rowsFetched
    else:
        for pos in range(rowsFetched):
            exists = 0
            if options.checkExists:
                for sourceVar, targetVar, isLob in existsVars:
                    targetVar.copy(sourceVar, pos, 0)
                existsCursor.execute(None, [])
                exists, = existsCursor.fetchone()
            if not exists:
                targetPos = insertPos
                targetVars = insertVars
                insertPos += 1
            elif updateCursor:
                targetPos = updatePos
                targetVars = updateVars
                updatePos += 1
            else:
                # row exists and there are no non-key columns to update
                unmodifiedRows += 1
                targetVars = []
            for sourceVar, targetVar, isLob in targetVars:
                if isLob:
                    # A LOB value must be read explicitly.  A null LOB comes
                    # back as None and has nothing to read.
                    lobValue = sourceVar.getvalue(pos)
                    if lobValue is not None:
                        lobValue = lobValue.read()
                    targetVar.setvalue(targetPos, lobValue)
                else:
                    targetVar.copy(sourceVar, pos, targetPos)
    if insertPos:
        insertCursor.executemanyprepared(insertPos)
        insertedRows += insertPos
        insertPos = 0
    if updatePos:
        updateCursor.executemanyprepared(updatePos)
        # rows matched by key but left untouched by the update are counted
        # as unmodified
        modifiedRows += updateCursor.rowcount
        unmodifiedRows += (updatePos - updateCursor.rowcount)
        updatePos = 0
    if reportPoint and totalRowsFetched - lastReported >= reportPoint:
        lastReported = totalRowsFetched
        cx_Logging.Trace(" %s rows processed", totalRowsFetched)
    if commitPoint and totalRowsFetched - lastCommitted >= commitPoint:
        lastCommitted = totalRowsFetched
        destConnection.commit()
destConnection.commit()

# Print out final statistics.
cx_Logging.Trace("%s rows retrieved from source.", totalRowsFetched)
cx_Logging.Trace("%s rows created in destination.", insertedRows)
cx_Logging.Trace("%s rows modified in destination.", modifiedRows)
cx_Logging.Trace("%s rows unmodified in destination.", unmodifiedRows)