summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog8
-rw-r--r--adbh.py82
-rw-r--r--clcommands.py20
-rw-r--r--db.py130
-rw-r--r--debian/control2
-rw-r--r--deprecation.py4
-rw-r--r--test/unittest_db.py111
7 files changed, 302 insertions, 55 deletions
diff --git a/ChangeLog b/ChangeLog
index 0397a1e..ad58c71 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,6 +1,14 @@
ChangeLog for logilab.common
============================
+ --
+ * adbh: changed backup / restore api (BREAKS COMPAT):
+ - backup_command is now backup_commands (eg return a list of commands)
+ - each command returned in backup_commands/restore_commands may now
+ be list that may be used as argument to subprocess.call, or a string
+ which will the requires a subshell
+ * deprecation: deprecated now takes an optional 'stacklevel' argument, default to 2
+
2009-12-23 -- 0.46.0
* db / adbh: added SQL Server support using Pyodbc
diff --git a/adbh.py b/adbh.py
index 7f66f7c..f76dada 100644
--- a/adbh.py
+++ b/adbh.py
@@ -167,7 +167,7 @@ class _GenericAdvFuncHelper:
"""return the system database for the given driver"""
raise NotImplementedError('not supported by this DBMS')
- def backup_command(self, dbname, dbhost, dbuser, backupfile,
+ def backup_commands(self, dbname, dbhost, dbuser, backupfile,
keepownership=True):
"""return a list of commands to backup the given database.
@@ -326,7 +326,7 @@ class _PGAdvFuncHelper(_GenericAdvFuncHelper):
"""return the system database for the given driver"""
return 'template1'
- def backup_command(self, dbname, dbhost, dbuser, backupfile,
+ def backup_commands(self, dbname, dbhost, dbuser, backupfile,
keepownership=True):
cmd = ['pg_dump', '-Fc']
if dbhost:
@@ -338,7 +338,11 @@ class _PGAdvFuncHelper(_GenericAdvFuncHelper):
cmd.append('--file')
cmd.append(backupfile)
cmd.append(dbname)
+<<<<<<< /home/syt/src/fcubicweb/logilab/common/adbh.py
+ return [cmd]
+=======
return cmd
+>>>>>>> /tmp/adbh.py~other.u7LzGS
def restore_commands(self, dbname, dbhost, dbuser, backupfile,
encoding='utf-8', keepownership=True, drop=True):
@@ -436,9 +440,13 @@ class _SqliteAdvFuncHelper(_GenericAdvFuncHelper):
intersect_all_support = False
alter_column_support = False
- def backup_command(self, dbname, dbhost, dbuser, backupfile,
+ def backup_commands(self, dbname, dbhost, dbuser, backupfile,
keepownership=True):
+<<<<<<< /home/syt/src/fcubicweb/logilab/common/adbh.py
+ return [['gzip', dbname], ['mv', dbname + '.gz', backupfile]]
+=======
return ['gzip', dbname], ['mv', dbname + '.gz', backupfile]
+>>>>>>> /tmp/adbh.py~other.u7LzGS
def restore_commands(self, dbname, dbhost, dbuser, backupfile,
encoding='utf-8', keepownership=True, drop=True):
@@ -492,14 +500,20 @@ class _MyAdvFuncHelper(_GenericAdvFuncHelper):
"""return the system database for the given driver"""
return ''
- def backup_command(self, dbname, dbhost, dbuser, backupfile,
+ def backup_commands(self, dbname, dbhost, dbuser, backupfile,
keepownership=True):
cmd = ['mysqldump']
# XXX compress
if dbhost is not None:
+<<<<<<< /home/syt/src/fcubicweb/logilab/common/adbh.py
+ cmd += ('-h', dbhost)
+ cmd += ('-u', dbuser, '-p', '-r', backupfile, dbname)
+ return [cmd]
+=======
cmd += ('-h', dbhost)
cmd += ['-u', dbuser, '-p', '-r', backupfile, dbname]
return cmd
+>>>>>>> /tmp/adbh.py~other.u7LzGS
def restore_commands(self, dbname, dbhost, dbuser, backupfile,
encoding='utf-8', keepownership=True, drop=True):
@@ -606,6 +620,62 @@ class _SqlServer2005FuncHelper(_GenericAdvFuncHelper):
def binary_value(self, value):
return StringIO.StringIO(value)
+<<<<<<< /home/syt/src/fcubicweb/logilab/common/adbh.py
+
+ def backup_commands(self, dbname, dbhost, dbuser, backupfile,
+ keepownership=True):
+ return [[sys.executable, os.path.normpath(__file__),
+ "_SqlServer2005FuncHelper._do_backup", dbhost, dbname, backupfile]
+ ]
+
+ def restore_commands(self, dbname, dbhost, dbuser, backupfile,
+ encoding='utf-8', keepownership=True, drop=True):
+ return [[sys.executable, os.path.normpath(__file__),
+ "_SqlServer2005FuncHelper._do_restore", dbhost, dbname, backupfile],
+ ]
+
+ @staticmethod
+ def _do_backup():
+ import time
+ from logilab.common.db import get_connection
+ dbhost = sys.argv[2]
+ dbname = sys.argv[3]
+ filename = sys.argv[4]
+ cnx = get_connection(driver='sqlserver2005', host=dbhost, database=dbname, extra_args='autocommit;trusted_connection')
+ cursor = cnx.cursor()
+ cursor.execute("BACKUP DATABASE ? TO DISK= ? ", (dbname, filename,))
+ prev_size = -1
+ err_count = 0
+ same_size_count = 0
+ while err_count < 10 and same_size_count < 10:
+ time.sleep(1)
+ try:
+ size = os.path.getsize(filename)
+ except OSError, exc:
+ err_count +=1
+ print exc
+ if size > prev_size:
+ same_size_count = 0
+ prev_size = size
+ else:
+ same_size_count += 1
+ cnx.close()
+ sys.exit(0)
+
+ @staticmethod
+ def _do_restore():
+ """return the SQL statement to restore a backup of the given database"""
+ from logilab.common.db import get_connection
+ dbhost = sys.argv[2]
+ dbname = sys.argv[3]
+ filename = sys.argv[4]
+ cnx = get_connection(driver='sqlserver2005', host=dbhost, database='master', extra_args='autocommit;trusted_connection')
+ cursor = cnx.cursor()
+ cursor.execute("RESTORE DATABASE ? FROM DISK= ? WITH REPLACE", (dbname, filename,))
+ sys.exit(0)
+
+=======
+>>>>>>> /tmp/adbh.py~other.u7LzGS
ADV_FUNC_HELPER_DIRECTORY = {'postgres': _PGAdvFuncHelper(),
'sqlite': _SqliteAdvFuncHelper(),
'mysql': _MyAdvFuncHelper(),
@@ -627,3 +697,7 @@ def auto_register_function(funcdef):
"""register the function `funcdef` on supported backends"""
for driver in funcdef.supported_backends:
register_function(driver, funcdef)
+
+if __name__ == "__main__": # used to backup sql server db
+ func_call = sys.argv[1]
+ eval(func_call+'()')
diff --git a/clcommands.py b/clcommands.py
index 5683cf8..8fb0ec7 100644
--- a/clcommands.py
+++ b/clcommands.py
@@ -4,7 +4,7 @@ one command.
e.g called as "tool command [options] args..." where <options> and <args> are
command'specific
-:copyright: 2003-2009 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+:copyright: 2003-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: General Public License version 2 - http://www.gnu.org/licenses
"""
@@ -18,7 +18,7 @@ from logilab.common.configuration import Configuration
DEFAULT_COPYRIGHT = '''\
-Copyright (c) 2004-2009 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+Copyright (c) 2004-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
http://www.logilab.fr/ -- mailto:contact@logilab.fr'''
@@ -80,16 +80,15 @@ def register_commands(commands):
_COMMANDS[command_klass.name] = command_klass
-def main_usage(status=0, __doc__=DEFAULT_DOC, copyright=DEFAULT_COPYRIGHT):
+def main_usage(status=0, doc=DEFAULT_DOC, copyright=DEFAULT_COPYRIGHT):
"""display usage for the main program (i.e. when no command supplied)
and exit
"""
commands = _COMMANDS.keys()
commands.sort()
- doc = __doc__
if doc != DEFAULT_DOC:
try:
- doc = __doc__ % ('<command>', '<command arguments>',
+ doc = doc % ('<command>', '<command arguments>',
'''\
Type "%prog <command> --help" for more information about a specific
command. Available commands are :\n''')
@@ -104,7 +103,8 @@ command. Available commands are :\n''')
if not cmd.hidden:
title = cmd.__doc__.split('.')[0]
print ' ', (command+padding)[:max_len], title
- print '\n', copyright
+ if copyright:
+ print '\n', copyright
sys.exit(status)
@@ -124,19 +124,19 @@ def cmd_run(cmdname, *args):
print command.help()
-def main_run(args, doc=DEFAULT_DOC):
+def main_run(args, doc=DEFAULT_DOC, copyright=DEFAULT_COPYRIGHT):
"""command line tool"""
try:
arg = args.pop(0)
except IndexError:
- main_usage(status=1, __doc__=doc)
+ main_usage(status=1, doc=doc, copyright=copyright)
if arg in ('-h', '--help'):
- main_usage(__doc__=doc)
+ main_usage(doc=doc, copyright=copyright)
try:
cmd_run(arg, *args)
except BadCommandUsage, err:
print 'ERROR: ', err
- main_usage(1, doc)
+ main_usage(1, doc=doc, copyright=copyright)
class ListCommandsCommand(Command):
diff --git a/db.py b/db.py
index c67cd21..6607320 100644
--- a/db.py
+++ b/db.py
@@ -16,7 +16,7 @@ Additional helpers are also provided for advanced functionalities such
as listing existing users or databases, creating database... Get the
helper for your database using the `get_adv_func_helper` function.
-:copyright: 2002-2009 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+:copyright: 2002-2010 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
:contact: http://www.logilab.fr/ -- mailto:contact@logilab.fr
:license: General Public License version 2 - http://www.gnu.org/licenses
"""
@@ -25,10 +25,11 @@ __docformat__ = "restructuredtext en"
import sys
import re
from warnings import warn
+import threading
+import datetime
import logilab.common as lgc
from logilab.common.deprecation import obsolete
-import datetime
try:
from mx.DateTime import DateTimeType, DateTimeDeltaType, strptime
@@ -600,10 +601,12 @@ class _MySqlDBAdapter(DBAPIAdapter):
finally:
cursor.execute("DROP TABLE _type_code_test")
-class _PyodbcAdapter(DBAPIAdapter):
+class _BaseSqlServerAdapter(DBAPIAdapter):
driver = 'Override in subclass'
_use_trusted_connection = False
-
+ _use_autocommit = False
+ _fetch_lock = threading.Lock()
+
@classmethod
def use_trusted_connection(klass, use_trusted=False):
"""
@@ -611,11 +614,22 @@ class _PyodbcAdapter(DBAPIAdapter):
Authentication (i.e. passwordless auth)
"""
klass._use_trusted_connection = use_trusted
+
+ @classmethod
+ def use_autocommit(klass, use_autocommit=False):
+ """
+ pass True to this class method to enable autocommit (required
+ for backup and restore)
+ """
+ klass._use_autocommit = use_autocommit
+
@classmethod
def _process_extra_args(klass, arguments):
arguments = arguments.lower().split(';')
if 'trusted_connection' in arguments:
klass.use_trusted_connection(True)
+ if 'autocommit' in arguments:
+ klass.use_autocommit(True)
def connect(self, host='', database='', user='', password='', port=None, extra_args=None):
"""Handles pyodbc connection format
@@ -628,15 +642,14 @@ class _PyodbcAdapter(DBAPIAdapter):
Windows Authentication, and therefore no login/password is
required.
"""
- pyodbc = self._native_module
- if extra_args is not None:
- self._process_extra_args(extra_args)
- class PyodbcCursor(object):
- """cursor adapting usual dict format to pyodbc format
+ lock = self._fetch_lock
+ class SqlServerCursor(object):
+ """cursor adapting usual dict format to pyodbc/adobdapi format
in SQL queries
"""
def __init__(self, cursor):
self._cursor = cursor
+ self._fetch_lock = lock
def _replace_parameters(self, sql, kwargs, _date_class=datetime.date):
if isinstance(kwargs, dict):
new_sql = re.sub(r'%\(([^\)]+)\)s', r'?', sql)
@@ -658,10 +671,7 @@ class _PyodbcAdapter(DBAPIAdapter):
self._cursor.execute(sql)
else:
final_sql, args = self._replace_parameters(sql, kwargs)
- try:
- self._cursor.execute(final_sql , args)
- except:
- raise
+ self._cursor.execute(final_sql , args)
def executemany(self, sql, kwargss):
if not isinstance(kwargss, (list, tuple)):
kwargss = tuple(kwargss)
@@ -676,14 +686,26 @@ class _PyodbcAdapter(DBAPIAdapter):
def fetchone(self):
smalldate_cols = self._get_smalldate_columns()
- row = self._cursor.fetchone()
+ self._fetch_lock.acquire()
+ try:
+ row = self._cursor.fetchone()
+ finally:
+ self._fetch_lock.release()
return self._replace_smalldate(row, smalldate_cols)
def fetchall (self):
smalldate_cols = self._get_smalldate_columns()
rows = []
- for row in self._cursor.fetchall():
- rows.append(self._replace_smalldate(row, smalldate_cols))
+ while True:
+ self._fetch_lock.acquire()
+ try:
+ batch = self._cursor.fetchmany(1024)
+ finally:
+ self._fetch_lock.release()
+ if not batch:
+ break
+ for row in batch:
+ rows.append(self._replace_smalldate(row, smalldate_cols))
return rows
def _replace_smalldate(self, row, smalldate_cols):
@@ -697,25 +719,15 @@ class _PyodbcAdapter(DBAPIAdapter):
def __getattr__(self, attrname):
return getattr(self._cursor, attrname)
- class PyodbcCnxWrapper:
+ class SqlServerCnxWrapper:
def __init__(self, cnx):
self._cnx = cnx
def cursor(self):
- return PyodbcCursor(self._cnx.cursor())
+ return SqlServerCursor(self._cnx.cursor())
def __getattr__(self, attrname):
return getattr(self._cnx, attrname)
-
- cnx_string_bits = ['DRIVER={%(driver)s}']
- variables = {'host' : host,
- 'database' : database,
- 'user' : user, 'password' : password,
- 'driver': self.driver}
- if self._use_trusted_connection:
- variables['Trusted_Connection'] = 'yes'
- del variables['user']
- del variables['password']
- cnx = self._native_module.connect(**variables)
- return self._wrap_if_needed(PyodbcCnxWrapper(cnx), user)
+ cnx = self._connect(host=host, database=database, user=user, password=password, port=port, extra_args=extra_args)
+ return self._wrap_if_needed(SqlServerCnxWrapper(cnx), user)
def process_value(self, value, description, encoding='utf-8', binarywrap=None):
# if the dbapi module isn't supporting type codes, override to return value directly
@@ -737,15 +749,54 @@ class _PyodbcAdapter(DBAPIAdapter):
return value
+class _PyodbcAdapter(_BaseSqlServerAdapter):
+ def _connect(self, host='', database='', user='', password='', port=None, extra_args=None):
+ if extra_args is not None:
+ self._process_extra_args(extra_args)
+ cnx_string_bits = ['DRIVER={%(driver)s}']
+ variables = {'host' : host,
+ 'database' : database,
+ 'user' : user, 'password' : password,
+ 'driver': self.driver}
+ if self._use_trusted_connection:
+ variables['Trusted_Connection'] = 'yes'
+ del variables['user']
+ del variables['password']
+ if self._use_autocommit:
+ variables['autocommit'] = True
+ return self._native_module.connect(**variables)
class _PyodbcSqlServer2000Adapter(_PyodbcAdapter):
driver = "SQL Server"
class _PyodbcSqlServer2005Adapter(_PyodbcAdapter):
- driver = "SQL Native Client"
+ driver = "SQL Server Native Client 10.0"
class _PyodbcSqlServer2008Adapter(_PyodbcAdapter):
- driver = "SQL Native Client 10.0"
+ driver = "SQL Server Native Client 10.0"
+
+class _AdodbapiAdapter(_BaseSqlServerAdapter):
+
+ def _connect(self, host='', database='', user='', password='', port=None, extra_args=None):
+ if extra_args is not None:
+ self._process_extra_args(extra_args)
+ if self._use_trusted_connection:
+ # this will open a MS-SQL table with Windows authentication
+ auth = 'Integrated Security=SSPI'
+ else:
+ # this set opens a MS-SQL table with SQL authentication
+ auth = 'user ID=%s; Password=%s;' % (user, password)
+ constr = r"Initial Catalog=%s; Data Source=%s; Provider=SQLOLEDB.1; %s" %(database, host, auth)
+ return self._native_module.connect(constr)
+
+class _AdodbapiSqlServer2000Adapter(_AdodbapiAdapter):
+ driver = "SQL Server"
+
+class _AdodbapiSqlServer2005Adapter(_AdodbapiAdapter):
+ driver = "SQL Server Native Client 10.0"
+
+class _AdodbapiSqlServer2008Adapter(_AdodbapiAdapter):
+ driver = "SQL Server Native Client 10.0"
## Drivers, Adapters and helpers registries ###################################
@@ -754,9 +805,9 @@ PREFERED_DRIVERS = {
"postgres" : [ 'psycopg2', 'psycopg', 'pgdb', 'pyPgSQL.PgSQL', ],
"mysql" : [ 'MySQLdb', ], # 'pyMySQL.MySQL, ],
"sqlite" : ['pysqlite2.dbapi2', 'sqlite', 'sqlite3',],
- "sqlserver2000" : ['pyodbc'],
- "sqlserver2005" : ['pyodbc'],
- "sqlserver2008" : ['pyodbc'],
+ "sqlserver2000" : ['pyodbc', 'adodbapi', ],
+ "sqlserver2005" : ['pyodbc', 'adodbapi', ],
+ "sqlserver2008" : ['pyodbc', 'adodbapi', ],
}
_ADAPTERS = {
@@ -769,9 +820,12 @@ _ADAPTERS = {
'sqlite' : { 'pysqlite2.dbapi2' : _PySqlite2Adapter,
'sqlite' : _SqliteAdapter,
'sqlite3' : _PySqlite2Adapter, },
- "sqlserver2000" : {'pyodbc': _PyodbcSqlServer2000Adapter},
- "sqlserver2005" : {'pyodbc': _PyodbcSqlServer2005Adapter},
- "sqlserver2008" : {'pyodbc': _PyodbcSqlServer2008Adapter},
+ "sqlserver2000" : {'adodbapi': _AdodbapiSqlServer2000Adapter,
+ 'pyodbc': _PyodbcSqlServer2000Adapter},
+ "sqlserver2005" : {'adodbapi': _AdodbapiSqlServer2005Adapter,
+ 'pyodbc': _PyodbcSqlServer2005Adapter},
+ "sqlserver2008" : {'adodbapi': _AdodbapiSqlServer2008Adapter,
+ 'pyodbc': _PyodbcSqlServer2008Adapter},
}
# _AdapterDirectory could be more generic by adding a 'protocol' parameter
diff --git a/debian/control b/debian/control
index 2472d6c..796a080 100644
--- a/debian/control
+++ b/debian/control
@@ -21,7 +21,7 @@ Architecture: all
Provides: ${python:Provides}
Depends: ${python:Depends}, ${misc:Depends}
Recommends: python-egenix-mxdatetime
-Conflicts: python-constraint ( <= 0.3.0-4), python-logilab-astng ( <= 0.16.0-1), pylint ( << 0.11.0-1), devtools ( <= 0.9.0-1), logilab-doctools ( <= 0.1.6-4), python-logilab-aspects ( <= 0.1.4-2), python2.3-logilab-common, python2.4-logilab-common
+Conflicts: python-constraint ( <= 0.3.0-4), python-logilab-astng ( <= 0.16.0-1), pylint ( << 0.11.0-1), devtools ( <= 0.9.0-1), logilab-doctools ( <= 0.1.6-4), python-logilab-aspects ( <= 0.1.4-2), python2.3-logilab-common, python2.4-logilab-common, cubicweb-server ( << 3.6.0-1)
XB-Python-Version: ${python:Versions}
Description: useful miscellaneous modules used by Logilab projects
logilab-common is a collection of low-level Python packages and modules,
diff --git a/deprecation.py b/deprecation.py
index 154ae7c..d6cd86c 100644
--- a/deprecation.py
+++ b/deprecation.py
@@ -58,7 +58,7 @@ def class_moved(new_class, old_name=None, message=None):
old_name, new_class.__module__, new_class.__name__)
return class_renamed(old_name, new_class, message)
-def deprecated(reason=None):
+def deprecated(reason=None, stacklevel=2):
"""Decorator that raises a DeprecationWarning to print a message
when the decorated function is called.
"""
@@ -67,7 +67,7 @@ def deprecated(reason=None):
if '%s' in message:
message = message % func.func_name
def wrapped(*args, **kwargs):
- warn(message, DeprecationWarning, stacklevel=2)
+ warn(message, DeprecationWarning, stacklevel=stacklevel)
return func(*args, **kwargs)
return wrapped
return deprecated_decorator
diff --git a/test/unittest_db.py b/test/unittest_db.py
index af288ff..c3f67e5 100644
--- a/test/unittest_db.py
+++ b/test/unittest_db.py
@@ -230,6 +230,117 @@ class DBAPIAdaptersTC(TestCase):
self.assertEquals(mshelper.func_sqlname('MYFUNC'), 'MYF')
self.assertEquals(slhelper.func_sqlname('MYFUNC'), 'SQLITE_MYFUNC')
+class BaseSqlServer(TestCase):
+ def tearDown(self):
+ cursor = self.cnx.cursor()
+ cursor.execute('drop table TestBlob')
+ cursor.execute('drop table TestLargeString')
+ self.cnx.commit()
+ cursor.close()
+ self.cnx.close()
+
+ def blob(self):
+ cursor = self.cnx.cursor()
+ data_length = xrange(400*1024-10, 400*1024+10)
+ for length in data_length:
+ data = buffer('\x00'*length)
+ print "inserting string of length", len(data)
+ cursor.execute('insert into TestBlob(id, data) VALUES(%(id)s, %(data)s)',
+ {'id': length, 'data': data})
+ self.cnx.commit()
+ cursor.execute('select count(*) from TestBlob')
+ print '%d rows in table' % (cursor.fetchone()[0])
+ cursor.close()
+
+ def large_string(self):
+ cursor = self.cnx.cursor()
+ data_length = xrange(400*1024-10, 400*1024+10)
+ for length in data_length:
+ data = '1'*length
+ print "inserting string of length", len(data)
+ cursor.execute('insert into TestLargeString(id, data) VALUES(%(id)s, %(data)s)',
+ {'id': length, 'data': data})
+ self.cnx.commit()
+ cursor.execute('select count(*) from TestLargeString')
+ print '%d rows in table' % (cursor.fetchone()[0])
+ cursor.close()
+
+ def varbinary_none(self):
+ cursor = self.cnx.cursor()
+ cursor.execute('insert into TestBlob (id) values (42)')
+ self.cnx.commit()
+ cursor.execute('select * from TestBlob where id=42')
+ print cursor.fetchall()
+ cursor.execute('update TestBlob set id=43, data=NULL where id=42')
+ self.cnx.commit()
+ cursor.execute('select * from TestBlob where id=43')
+ print cursor.fetchall()
+ cursor.execute('update TestBlob set id = %(id)s, data=%(data)s where id=%(old_id)s', {'data': None, 'id': 42, 'old_id': 43})
+ self.cnx.commit()
+ cursor.execute('select * from TestBlob where id=42')
+ print cursor.fetchall()
+ cursor.close()
+
+
+try:
+ import pyodbc
+except ImportError:
+ print "pyodbc tests skipped"
+else:
+ class pyodbcTC(BaseSqlServer):
+ def setUp(self):
+ try:
+ self.cnx = get_connection(driver='sqlserver2005', database='alf',
+ host='localhost', extra_args='Trusted_Connection')
+ except pyodbc.Error, exc:
+ self.skip(str(exc))
+ cursor = self.cnx.cursor()
+ try:
+ cursor.execute('create table TestLargeString (id int, data varchar(max))')
+ cursor.execute('create table TestBlob (id int, data varbinary(max))')
+ except Exception, exc:
+ print exc
+ cursor.close()
+
+ def test_blob(self):
+ self.blob()
+
+ def test_large_string(self):
+ self.large_string()
+
+ def test_varbinary_none(self):
+ self.varbinary_none()
+
+try:
+ import adodbapi as adb
+except ImportError:
+ print "adodbapi tests skipped"
+else:
+ class adodbapiTC(BaseSqlServer):
+ def setUp(self):
+ try:
+ self.cnx = get_connection(driver='sqlserver2005', database='alf',
+ host='localhost', extra_args='Trusted_Connection')
+ except adb.Error, exc:
+ self.skip(str(exc))
+ cursor = self.cnx.cursor()
+ try:
+
+ cursor.execute('create table TestLargeString (id int, data varchar(max))')
+ cursor.execute('create table TestBlob (id int, data varbinary(max))')
+ except Exception, exc:
+ print exc
+ cursor.close()
+
+ def test_blob(self):
+ self.blob()
+
+ def test_large_string(self):
+ self.large_string()
+
+ def test_varbinary_none(self):
+ self.varbinary_none()
+
if __name__ == '__main__':
unittest_main()