summaryrefslogtreecommitdiff
path: root/numpy/lib/tests/test_io.py
diff options
context:
space:
mode:
Diffstat (limited to 'numpy/lib/tests/test_io.py')
-rw-r--r--numpy/lib/tests/test_io.py590
1 files changed, 488 insertions, 102 deletions
diff --git a/numpy/lib/tests/test_io.py b/numpy/lib/tests/test_io.py
index 83fca5b91..f58c9e33d 100644
--- a/numpy/lib/tests/test_io.py
+++ b/numpy/lib/tests/test_io.py
@@ -4,12 +4,16 @@ import sys
import gzip
import os
import threading
-from tempfile import NamedTemporaryFile
import time
import warnings
import gc
-from io import BytesIO
+import io
+import re
+import pytest
+from tempfile import NamedTemporaryFile
+from io import BytesIO, StringIO
from datetime import datetime
+import locale
import numpy as np
import numpy.ma as ma
@@ -17,10 +21,10 @@ from numpy.lib._iotools import ConverterError, ConversionWarning
from numpy.compat import asbytes, bytes, unicode, Path
from numpy.ma.testutils import assert_equal
from numpy.testing import (
- TestCase, run_module_suite, assert_warns, assert_,
- assert_raises_regex, assert_raises, assert_allclose,
- assert_array_equal, temppath, dec, IS_PYPY, suppress_warnings
-)
+ assert_warns, assert_, SkipTest, assert_raises_regex, assert_raises,
+ assert_allclose, assert_array_equal, temppath, tempdir, IS_PYPY,
+ HAS_REFCOUNT, suppress_warnings, assert_no_gc_cycles,
+ )
class TextIO(BytesIO):
@@ -44,6 +48,16 @@ class TextIO(BytesIO):
MAJVER, MINVER = sys.version_info[:2]
IS_64BIT = sys.maxsize > 2**32
+try:
+ import bz2
+ HAS_BZ2 = True
+except ImportError:
+ HAS_BZ2 = False
+try:
+ import lzma
+ HAS_LZMA = True
+except ImportError:
+ HAS_LZMA = False
def strptime(s, fmt=None):
@@ -52,10 +66,9 @@ def strptime(s, fmt=None):
2.5.
"""
- if sys.version_info[0] >= 3:
- return datetime(*time.strptime(s.decode('latin1'), fmt)[:3])
- else:
- return datetime(*time.strptime(s, fmt)[:3])
+ if type(s) == bytes:
+ s = s.decode("latin1")
+ return datetime(*time.strptime(s, fmt)[:3])
class RoundtripTest(object):
@@ -103,8 +116,9 @@ class RoundtripTest(object):
if not isinstance(target_file, BytesIO):
target_file.close()
# holds an open file descriptor so it can't be deleted on win
- if not isinstance(arr_reloaded, np.lib.npyio.NpzFile):
- os.remove(target_file.name)
+ if 'arr_reloaded' in locals():
+ if not isinstance(arr_reloaded, np.lib.npyio.NpzFile):
+ os.remove(target_file.name)
def check_roundtrips(self, a):
self.roundtrip(a)
@@ -143,7 +157,7 @@ class RoundtripTest(object):
a = np.array([1, 2, 3, 4], int)
self.roundtrip(a)
- @np.testing.dec.knownfailureif(sys.platform == 'win32', "Fail on Win32")
+ @pytest.mark.skipif(sys.platform == 'win32', reason="Fails on Win32")
def test_mmap(self):
a = np.array([[1, 2.5], [4, 7.3]])
self.roundtrip(a, file_on_disk=True, load_kwds={'mmap_mode': 'r'})
@@ -155,7 +169,7 @@ class RoundtripTest(object):
a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')])
self.check_roundtrips(a)
- @dec.slow
+ @pytest.mark.slow
def test_format_2_0(self):
dt = [(("%d" % i) * 100, float) for i in range(500)]
a = np.ones(1000, dtype=dt)
@@ -164,7 +178,7 @@ class RoundtripTest(object):
self.check_roundtrips(a)
-class TestSaveLoad(RoundtripTest, TestCase):
+class TestSaveLoad(RoundtripTest):
def roundtrip(self, *args, **kwargs):
RoundtripTest.roundtrip(self, np.save, *args, **kwargs)
assert_equal(self.arr[0], self.arr_reloaded)
@@ -172,7 +186,7 @@ class TestSaveLoad(RoundtripTest, TestCase):
assert_equal(self.arr[0].flags.fnc, self.arr_reloaded.flags.fnc)
-class TestSavezLoad(RoundtripTest, TestCase):
+class TestSavezLoad(RoundtripTest):
def roundtrip(self, *args, **kwargs):
RoundtripTest.roundtrip(self, np.savez, *args, **kwargs)
try:
@@ -187,8 +201,8 @@ class TestSavezLoad(RoundtripTest, TestCase):
self.arr_reloaded.fid.close()
os.remove(self.arr_reloaded.fid.name)
- @np.testing.dec.skipif(not IS_64BIT, "Works only with 64bit systems")
- @np.testing.dec.slow
+ @pytest.mark.skipif(not IS_64BIT, reason="Needs 64bit platform")
+ @pytest.mark.slow
def test_big_arrays(self):
L = (1 << 31) + 100000
a = np.empty(L, dtype=np.uint8)
@@ -264,7 +278,8 @@ class TestSavezLoad(RoundtripTest, TestCase):
fp.seek(0)
assert_(not fp.closed)
- @np.testing.dec.skipif(IS_PYPY, "context manager required on PyPy")
+ #FIXME: Is this still true?
+ @pytest.mark.skipif(IS_PYPY, reason="Missing context manager on PyPy")
def test_closing_fid(self):
# Test that issue #1517 (too many opened files) remains closed
# It might be a "weak" test since failed to get triggered on
@@ -303,7 +318,7 @@ class TestSavezLoad(RoundtripTest, TestCase):
assert_(fp.closed)
-class TestSaveTxt(TestCase):
+class TestSaveTxt(object):
def test_array(self):
a = np.array([[1, 2], [3, 4]], float)
fmt = "%.18e"
@@ -328,6 +343,12 @@ class TestSaveTxt(TestCase):
lines = c.readlines()
assert_equal(lines, [b'1\n', b'2\n', b'3\n', b'4\n'])
+ def test_0D_3D(self):
+ c = BytesIO()
+ assert_raises(ValueError, np.savetxt, c, np.array(1))
+ assert_raises(ValueError, np.savetxt, c, np.array([[[1], [2]]]))
+
+
def test_record(self):
a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')])
c = BytesIO()
@@ -357,7 +378,7 @@ class TestSaveTxt(TestCase):
lines = c.readlines()
assert_equal(lines, [b'01 : 2.0\n', b'03 : 4.0\n'])
- # Specify delimiter, should be overiden
+ # Specify delimiter, should be overridden
c = BytesIO()
np.savetxt(c, a, fmt='%02d : %3.1f', delimiter=',')
c.seek(0)
@@ -372,7 +393,7 @@ class TestSaveTxt(TestCase):
# Test the functionality of the header and footer keyword argument.
c = BytesIO()
- a = np.array([(1, 2), (3, 4)], dtype=np.int)
+ a = np.array([(1, 2), (3, 4)], dtype=int)
test_header_footer = 'Test header / footer'
# Test the header keyword argument
np.savetxt(c, a, fmt='%1d', header=test_header_footer)
@@ -447,6 +468,26 @@ class TestSaveTxt(TestCase):
[b'(3.142e+00+2.718e+00j) (3.142e+00+2.718e+00j)\n',
b'(3.142e+00+2.718e+00j) (3.142e+00+2.718e+00j)\n'])
+ def test_complex_negative_exponent(self):
+ # Previous to 1.15, some formats generated x+-yj, gh 7895
+ ncols = 2
+ nrows = 2
+ a = np.zeros((ncols, nrows), dtype=np.complex128)
+ re = np.pi
+ im = np.e
+ a[:] = re - 1.0j * im
+ c = BytesIO()
+ np.savetxt(c, a, fmt='%.3e')
+ c.seek(0)
+ lines = c.readlines()
+ assert_equal(
+ lines,
+ [b' (3.142e+00-2.718e+00j) (3.142e+00-2.718e+00j)\n',
+ b' (3.142e+00-2.718e+00j) (3.142e+00-2.718e+00j)\n'])
+
+
+
+
def test_custom_writer(self):
class CustomWriter(list):
@@ -459,8 +500,136 @@ class TestSaveTxt(TestCase):
b = np.loadtxt(w)
assert_array_equal(a, b)
+ def test_unicode(self):
+ utf8 = b'\xcf\x96'.decode('UTF-8')
+ a = np.array([utf8], dtype=np.unicode)
+ with tempdir() as tmpdir:
+ # set encoding as on windows it may not be unicode even on py3
+ np.savetxt(os.path.join(tmpdir, 'test.csv'), a, fmt=['%s'],
+ encoding='UTF-8')
+
+ def test_unicode_roundtrip(self):
+ utf8 = b'\xcf\x96'.decode('UTF-8')
+ a = np.array([utf8], dtype=np.unicode)
+ # our gz wrapper support encoding
+ suffixes = ['', '.gz']
+ # stdlib 2 versions do not support encoding
+ if MAJVER > 2:
+ if HAS_BZ2:
+ suffixes.append('.bz2')
+ if HAS_LZMA:
+ suffixes.extend(['.xz', '.lzma'])
+ with tempdir() as tmpdir:
+ for suffix in suffixes:
+ np.savetxt(os.path.join(tmpdir, 'test.csv' + suffix), a,
+ fmt=['%s'], encoding='UTF-16-LE')
+ b = np.loadtxt(os.path.join(tmpdir, 'test.csv' + suffix),
+ encoding='UTF-16-LE', dtype=np.unicode)
+ assert_array_equal(a, b)
+
+ def test_unicode_bytestream(self):
+ utf8 = b'\xcf\x96'.decode('UTF-8')
+ a = np.array([utf8], dtype=np.unicode)
+ s = BytesIO()
+ np.savetxt(s, a, fmt=['%s'], encoding='UTF-8')
+ s.seek(0)
+ assert_equal(s.read().decode('UTF-8'), utf8 + '\n')
+
+ def test_unicode_stringstream(self):
+ utf8 = b'\xcf\x96'.decode('UTF-8')
+ a = np.array([utf8], dtype=np.unicode)
+ s = StringIO()
+ np.savetxt(s, a, fmt=['%s'], encoding='UTF-8')
+ s.seek(0)
+ assert_equal(s.read(), utf8 + '\n')
+
+
+class LoadTxtBase(object):
+ def check_compressed(self, fopen, suffixes):
+ # Test that we can load data from a compressed file
+ wanted = np.arange(6).reshape((2, 3))
+ linesep = ('\n', '\r\n', '\r')
+ for sep in linesep:
+ data = '0 1 2' + sep + '3 4 5'
+ for suffix in suffixes:
+ with temppath(suffix=suffix) as name:
+ with fopen(name, mode='wt', encoding='UTF-32-LE') as f:
+ f.write(data)
+ res = self.loadfunc(name, encoding='UTF-32-LE')
+ assert_array_equal(res, wanted)
+ with fopen(name, "rt", encoding='UTF-32-LE') as f:
+ res = self.loadfunc(f)
+ assert_array_equal(res, wanted)
+
+ # Python2 .open does not support encoding
+ @pytest.mark.skipif(MAJVER == 2, reason="Needs Python version >= 3")
+ def test_compressed_gzip(self):
+ self.check_compressed(gzip.open, ('.gz',))
+
+ @pytest.mark.skipif(not HAS_BZ2, reason="Needs bz2")
+ @pytest.mark.skipif(MAJVER == 2, reason="Needs Python version >= 3")
+ def test_compressed_gzip(self):
+ self.check_compressed(bz2.open, ('.bz2',))
+
+ @pytest.mark.skipif(not HAS_LZMA, reason="Needs lzma")
+ @pytest.mark.skipif(MAJVER == 2, reason="Needs Python version >= 3")
+ def test_compressed_gzip(self):
+ self.check_compressed(lzma.open, ('.xz', '.lzma'))
+
+ def test_encoding(self):
+ with temppath() as path:
+ with open(path, "wb") as f:
+ f.write('0.\n1.\n2.'.encode("UTF-16"))
+ x = self.loadfunc(path, encoding="UTF-16")
+ assert_array_equal(x, [0., 1., 2.])
+
+ def test_stringload(self):
+ # umlaute
+ nonascii = b'\xc3\xb6\xc3\xbc\xc3\xb6'.decode("UTF-8")
+ with temppath() as path:
+ with open(path, "wb") as f:
+ f.write(nonascii.encode("UTF-16"))
+ x = self.loadfunc(path, encoding="UTF-16", dtype=np.unicode)
+ assert_array_equal(x, nonascii)
+
+ def test_binary_decode(self):
+ utf16 = b'\xff\xfeh\x04 \x00i\x04 \x00j\x04'
+ v = self.loadfunc(BytesIO(utf16), dtype=np.unicode, encoding='UTF-16')
+ assert_array_equal(v, np.array(utf16.decode('UTF-16').split()))
+
+ def test_converters_decode(self):
+ # test converters that decode strings
+ c = TextIO()
+ c.write(b'\xcf\x96')
+ c.seek(0)
+ x = self.loadfunc(c, dtype=np.unicode,
+ converters={0: lambda x: x.decode('UTF-8')})
+ a = np.array([b'\xcf\x96'.decode('UTF-8')])
+ assert_array_equal(x, a)
+
+ def test_converters_nodecode(self):
+ # test native string converters enabled by setting an encoding
+ utf8 = b'\xcf\x96'.decode('UTF-8')
+ with temppath() as path:
+ with io.open(path, 'wt', encoding='UTF-8') as f:
+ f.write(utf8)
+ x = self.loadfunc(path, dtype=np.unicode,
+ converters={0: lambda x: x + 't'},
+ encoding='UTF-8')
+ a = np.array([utf8 + 't'])
+ assert_array_equal(x, a)
+
+
+class TestLoadTxt(LoadTxtBase):
+ loadfunc = staticmethod(np.loadtxt)
+
+ def setup(self):
+ # lower chunksize for testing
+ self.orig_chunk = np.lib.npyio._loadtxt_chunksize
+ np.lib.npyio._loadtxt_chunksize = 1
+ def teardown(self):
+ np.lib.npyio._loadtxt_chunksize = self.orig_chunk
-class TestLoadTxt(TestCase):
def test_record(self):
c = TextIO()
c.write('1 2\n3 4')
@@ -484,7 +653,7 @@ class TestLoadTxt(TestCase):
c.write('1 2\n3 4')
c.seek(0)
- x = np.loadtxt(c, dtype=np.int)
+ x = np.loadtxt(c, dtype=int)
a = np.array([[1, 2], [3, 4]], int)
assert_array_equal(x, a)
@@ -532,7 +701,7 @@ class TestLoadTxt(TestCase):
c.write('# comment\n1,2,3,5\n')
c.seek(0)
x = np.loadtxt(c, dtype=int, delimiter=',',
- comments=unicode('#'))
+ comments=u'#')
a = np.array([1, 2, 3, 5], int)
assert_array_equal(x, a)
@@ -720,7 +889,7 @@ class TestLoadTxt(TestCase):
# Test using an explicit dtype with an object
data = """ 1; 2001-01-01
2; 2002-01-31 """
- ndtype = [('idx', int), ('code', np.object)]
+ ndtype = [('idx', int), ('code', object)]
func = lambda s: strptime(s.strip(), "%Y-%m-%d")
converters = {1: func}
test = np.loadtxt(TextIO(data), delimiter=";", dtype=ndtype,
@@ -750,11 +919,11 @@ class TestLoadTxt(TestCase):
# IEEE doubles and floats only, otherwise the float32
# conversion may fail.
tgt = np.logspace(-10, 10, 5).astype(np.float32)
- tgt = np.hstack((tgt, -tgt)).astype(np.float)
+ tgt = np.hstack((tgt, -tgt)).astype(float)
inp = '\n'.join(map(float.hex, tgt))
c = TextIO()
c.write(inp)
- for dt in [np.float, np.float32]:
+ for dt in [float, np.float32]:
c.seek(0)
res = np.loadtxt(c, dtype=dt)
assert_equal(res, tgt, err_msg="%s" % dt)
@@ -764,9 +933,29 @@ class TestLoadTxt(TestCase):
c = TextIO()
c.write("%s %s" % tgt)
c.seek(0)
- res = np.loadtxt(c, dtype=np.complex)
+ res = np.loadtxt(c, dtype=complex)
assert_equal(res, tgt)
+ def test_complex_misformatted(self):
+ # test for backward compatibility
+ # some complex formats used to generate x+-yj
+ a = np.zeros((2, 2), dtype=np.complex128)
+ re = np.pi
+ im = np.e
+ a[:] = re - 1.0j * im
+ c = BytesIO()
+ np.savetxt(c, a, fmt='%.16e')
+ c.seek(0)
+ txt = c.read()
+ c.seek(0)
+ # misformat the sign on the imaginary part, gh 7895
+ txt_bad = txt.replace(b'e+00-', b'e00+-')
+ assert_(txt_bad != txt)
+ c.write(txt_bad)
+ c.seek(0)
+ res = np.loadtxt(c, dtype=complex)
+ assert_equal(res, a)
+
def test_universal_newline(self):
with temppath() as name:
with open(name, 'w') as f:
@@ -862,9 +1051,25 @@ class TestLoadTxt(TestCase):
dt = np.dtype([('x', int), ('a', 'S10'), ('y', int)])
np.loadtxt(c, delimiter=',', dtype=dt, comments=None) # Should succeed
-
-class Testfromregex(TestCase):
- # np.fromregex expects files opened in binary mode.
+ @pytest.mark.skipif(locale.getpreferredencoding() == 'ANSI_X3.4-1968',
+ reason="Wrong preferred encoding")
+ def test_binary_load(self):
+ butf8 = b"5,6,7,\xc3\x95scarscar\n\r15,2,3,hello\n\r"\
+ b"20,2,3,\xc3\x95scar\n\r"
+ sutf8 = butf8.decode("UTF-8").replace("\r", "").splitlines()
+ with temppath() as path:
+ with open(path, "wb") as f:
+ f.write(butf8)
+ with open(path, "rb") as f:
+ x = np.loadtxt(f, encoding="UTF-8", dtype=np.unicode)
+ assert_array_equal(x, sutf8)
+ # test broken latin1 conversion people now rely on
+ with open(path, "rb") as f:
+ x = np.loadtxt(f, encoding="UTF-8", dtype="S")
+ x = [b'5,6,7,\xc3\x95scarscar', b'15,2,3,hello', b'20,2,3,\xc3\x95scar']
+ assert_array_equal(x, np.array(x, dtype="S"))
+
+class Testfromregex(object):
def test_record(self):
c = TextIO()
c.write('1.312 foo\n1.534 bar\n4.444 qux')
@@ -897,12 +1102,36 @@ class Testfromregex(TestCase):
a = np.array([(1312,), (1534,), (4444,)], dtype=dt)
assert_array_equal(x, a)
+ def test_record_unicode(self):
+ utf8 = b'\xcf\x96'
+ with temppath() as path:
+ with open(path, 'wb') as f:
+ f.write(b'1.312 foo' + utf8 + b' \n1.534 bar\n4.444 qux')
+
+ dt = [('num', np.float64), ('val', 'U4')]
+ x = np.fromregex(path, r"(?u)([0-9.]+)\s+(\w+)", dt, encoding='UTF-8')
+ a = np.array([(1.312, 'foo' + utf8.decode('UTF-8')), (1.534, 'bar'),
+ (4.444, 'qux')], dtype=dt)
+ assert_array_equal(x, a)
+
+ regexp = re.compile(r"([0-9.]+)\s+(\w+)", re.UNICODE)
+ x = np.fromregex(path, regexp, dt, encoding='UTF-8')
+ assert_array_equal(x, a)
+
+ def test_compiled_bytes(self):
+ regexp = re.compile(b'(\\d)')
+ c = BytesIO(b'123')
+ dt = [('num', np.float64)]
+ a = np.array([1, 2, 3], dtype=dt)
+ x = np.fromregex(c, regexp, dt)
+ assert_array_equal(x, a)
#####--------------------------------------------------------------------------
-class TestFromTxt(TestCase):
- #
+class TestFromTxt(LoadTxtBase):
+ loadfunc = staticmethod(np.genfromtxt)
+
def test_record(self):
# Test w/ explicit dtype
data = TextIO('1 2\n3 4')
@@ -919,7 +1148,7 @@ class TestFromTxt(TestCase):
assert_equal(test, control)
def test_array(self):
- # Test outputing a standard ndarray
+ # Test outputting a standard ndarray
data = TextIO('1 2\n3 4')
control = np.array([[1, 2], [3, 4]], dtype=int)
test = np.ndfromtxt(data, dtype=int)
@@ -1005,7 +1234,10 @@ class TestFromTxt(TestCase):
def test_header(self):
# Test retrieving a header
data = TextIO('gender age weight\nM 64.0 75.0\nF 25.0 60.0')
- test = np.ndfromtxt(data, dtype=None, names=True)
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
+ test = np.ndfromtxt(data, dtype=None, names=True)
+ assert_(w[0].category is np.VisibleDeprecationWarning)
control = {'gender': np.array([b'M', b'F']),
'age': np.array([64.0, 25.0]),
'weight': np.array([75.0, 60.0])}
@@ -1016,7 +1248,10 @@ class TestFromTxt(TestCase):
def test_auto_dtype(self):
# Test the automatic definition of the output dtype
data = TextIO('A 64 75.0 3+4j True\nBCD 25 60.0 5+6j False')
- test = np.ndfromtxt(data, dtype=None)
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
+ test = np.ndfromtxt(data, dtype=None)
+ assert_(w[0].category is np.VisibleDeprecationWarning)
control = [np.array([b'A', b'BCD']),
np.array([64, 25]),
np.array([75.0, 60.0]),
@@ -1062,7 +1297,10 @@ F 35 58.330000
M 33 21.99
""")
# The # is part of the first name and should be deleted automatically.
- test = np.genfromtxt(data, names=True, dtype=None)
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
+ test = np.genfromtxt(data, names=True, dtype=None)
+ assert_(w[0].category is np.VisibleDeprecationWarning)
ctrl = np.array([('M', 21, 72.1), ('F', 35, 58.33), ('M', 33, 21.99)],
dtype=[('gender', '|S1'), ('age', int), ('weight', float)])
assert_equal(test, ctrl)
@@ -1073,14 +1311,27 @@ M 21 72.100000
F 35 58.330000
M 33 21.99
""")
- test = np.genfromtxt(data, names=True, dtype=None)
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
+ test = np.genfromtxt(data, names=True, dtype=None)
+ assert_(w[0].category is np.VisibleDeprecationWarning)
assert_equal(test, ctrl)
+ def test_names_and_comments_none(self):
+ # Tests case when names is true but comments is None (gh-10780)
+ data = TextIO('col1 col2\n 1 2\n 3 4')
+ test = np.genfromtxt(data, dtype=(int, int), comments=None, names=True)
+ control = np.array([(1, 2), (3, 4)], dtype=[('col1', int), ('col2', int)])
+ assert_equal(test, control)
+
def test_autonames_and_usecols(self):
# Tests names and usecols
data = TextIO('A B C D\n aaaa 121 45 9.1')
- test = np.ndfromtxt(data, usecols=('A', 'C', 'D'),
- names=True, dtype=None)
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
+ test = np.ndfromtxt(data, usecols=('A', 'C', 'D'),
+ names=True, dtype=None)
+ assert_(w[0].category is np.VisibleDeprecationWarning)
control = np.array(('aaaa', 45, 9.1),
dtype=[('A', '|S4'), ('C', int), ('D', float)])
assert_equal(test, control)
@@ -1097,8 +1348,12 @@ M 33 21.99
def test_converters_with_usecols_and_names(self):
# Tests names and usecols
data = TextIO('A B C D\n aaaa 121 45 9.1')
- test = np.ndfromtxt(data, usecols=('A', 'C', 'D'), names=True,
- dtype=None, converters={'C': lambda s: 2 * int(s)})
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
+ test = np.ndfromtxt(data, usecols=('A', 'C', 'D'), names=True,
+ dtype=None,
+ converters={'C': lambda s: 2 * int(s)})
+ assert_(w[0].category is np.VisibleDeprecationWarning)
control = np.array(('aaaa', 90, 9.1),
dtype=[('A', '|S4'), ('C', int), ('D', float)])
assert_equal(test, control)
@@ -1177,19 +1432,19 @@ M 33 21.99
conv = {0: int, 1: int, 2: int, 3: lambda r: dmap[r.decode()]}
test = np.recfromcsv(TextIO(dstr,), dtype=dtyp, delimiter=',',
names=None, converters=conv)
- control = np.rec.array([[1,5,-1,0], [2,8,-1,1], [3,3,-2,3]], dtype=dtyp)
+ control = np.rec.array([(1,5,-1,0), (2,8,-1,1), (3,3,-2,3)], dtype=dtyp)
assert_equal(test, control)
dtyp = [('e1','i4'),('e2','i4'),('n', 'i1')]
test = np.recfromcsv(TextIO(dstr,), dtype=dtyp, delimiter=',',
usecols=(0,1,3), names=None, converters=conv)
- control = np.rec.array([[1,5,0], [2,8,1], [3,3,3]], dtype=dtyp)
+ control = np.rec.array([(1,5,0), (2,8,1), (3,3,3)], dtype=dtyp)
assert_equal(test, control)
def test_dtype_with_object(self):
# Test using an explicit dtype with an object
data = """ 1; 2001-01-01
2; 2002-01-31 """
- ndtype = [('idx', int), ('code', np.object)]
+ ndtype = [('idx', int), ('code', object)]
func = lambda s: strptime(s.strip(), "%Y-%m-%d")
converters = {1: func}
test = np.genfromtxt(TextIO(data), delimiter=";", dtype=ndtype,
@@ -1199,7 +1454,7 @@ M 33 21.99
dtype=ndtype)
assert_equal(test, control)
- ndtype = [('nest', [('idx', int), ('code', np.object)])]
+ ndtype = [('nest', [('idx', int), ('code', object)])]
try:
test = np.genfromtxt(TextIO(data), delimiter=";",
dtype=ndtype, converters=converters)
@@ -1218,6 +1473,18 @@ M 33 21.99
dtype=[('', '|S10'), ('', float)])
assert_equal(test, control)
+ def test_utf8_userconverters_with_explicit_dtype(self):
+ utf8 = b'\xcf\x96'
+ with temppath() as path:
+ with open(path, 'wb') as f:
+ f.write(b'skip,skip,2001-01-01' + utf8 + b',1.0,skip')
+ test = np.genfromtxt(path, delimiter=",", names=None, dtype=float,
+ usecols=(2, 3), converters={2: np.unicode},
+ encoding='UTF-8')
+ control = np.array([('2001-01-01' + utf8.decode('UTF-8'), 1.)],
+ dtype=[('', '|U11'), ('', float)])
+ assert_equal(test, control)
+
def test_spacedelimiter(self):
# Test space delimiter
data = TextIO("1 2 3 4 5\n6 7 8 9 10")
@@ -1336,7 +1603,7 @@ M 33 21.99
test = np.mafromtxt(data, dtype=None, **kwargs)
control = ma.array([(0, 1), (2, -1)],
mask=[(False, False), (False, True)],
- dtype=[('A', np.int), ('B', np.int)])
+ dtype=[('A', int), ('B', int)])
assert_equal(test, control)
assert_equal(test.mask, control.mask)
#
@@ -1344,7 +1611,7 @@ M 33 21.99
test = np.mafromtxt(data, **kwargs)
control = ma.array([(0, 1), (2, -1)],
mask=[(False, False), (False, True)],
- dtype=[('A', np.float), ('B', np.float)])
+ dtype=[('A', float), ('B', float)])
assert_equal(test, control)
assert_equal(test.mask, control.mask)
@@ -1413,7 +1680,7 @@ M 33 21.99
missing_values='-999.0', names=True,)
control = ma.array([(0, 1.5), (2, -1.)],
mask=[(False, False), (False, True)],
- dtype=[('A', np.int), ('B', np.float)])
+ dtype=[('A', int), ('B', float)])
assert_equal(test, control)
assert_equal(test.mask, control.mask)
@@ -1544,11 +1811,17 @@ M 33 21.99
# Test autostrip
data = "01/01/2003 , 1.3, abcde"
kwargs = dict(delimiter=",", dtype=None)
- mtest = np.ndfromtxt(TextIO(data), **kwargs)
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
+ mtest = np.ndfromtxt(TextIO(data), **kwargs)
+ assert_(w[0].category is np.VisibleDeprecationWarning)
ctrl = np.array([('01/01/2003 ', 1.3, ' abcde')],
dtype=[('f0', '|S12'), ('f1', float), ('f2', '|S8')])
assert_equal(mtest, ctrl)
- mtest = np.ndfromtxt(TextIO(data), autostrip=True, **kwargs)
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
+ mtest = np.ndfromtxt(TextIO(data), autostrip=True, **kwargs)
+ assert_(w[0].category is np.VisibleDeprecationWarning)
ctrl = np.array([('01/01/2003', 1.3, 'abcde')],
dtype=[('f0', '|S10'), ('f1', float), ('f2', '|S5')])
assert_equal(mtest, ctrl)
@@ -1668,28 +1941,141 @@ M 33 21.99
def test_comments_is_none(self):
# Github issue 329 (None was previously being converted to 'None').
- test = np.genfromtxt(TextIO("test1,testNonetherestofthedata"),
- dtype=None, comments=None, delimiter=',')
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
+ test = np.genfromtxt(TextIO("test1,testNonetherestofthedata"),
+ dtype=None, comments=None, delimiter=',')
+ assert_(w[0].category is np.VisibleDeprecationWarning)
assert_equal(test[1], b'testNonetherestofthedata')
- test = np.genfromtxt(TextIO("test1, testNonetherestofthedata"),
- dtype=None, comments=None, delimiter=',')
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
+ test = np.genfromtxt(TextIO("test1, testNonetherestofthedata"),
+ dtype=None, comments=None, delimiter=',')
+ assert_(w[0].category is np.VisibleDeprecationWarning)
assert_equal(test[1], b' testNonetherestofthedata')
+ def test_latin1(self):
+ latin1 = b'\xf6\xfc\xf6'
+ norm = b"norm1,norm2,norm3\n"
+ enc = b"test1,testNonethe" + latin1 + b",test3\n"
+ s = norm + enc + norm
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
+ test = np.genfromtxt(TextIO(s),
+ dtype=None, comments=None, delimiter=',')
+ assert_(w[0].category is np.VisibleDeprecationWarning)
+ assert_equal(test[1, 0], b"test1")
+ assert_equal(test[1, 1], b"testNonethe" + latin1)
+ assert_equal(test[1, 2], b"test3")
+ test = np.genfromtxt(TextIO(s),
+ dtype=None, comments=None, delimiter=',',
+ encoding='latin1')
+ assert_equal(test[1, 0], u"test1")
+ assert_equal(test[1, 1], u"testNonethe" + latin1.decode('latin1'))
+ assert_equal(test[1, 2], u"test3")
+
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
+ test = np.genfromtxt(TextIO(b"0,testNonethe" + latin1),
+ dtype=None, comments=None, delimiter=',')
+ assert_(w[0].category is np.VisibleDeprecationWarning)
+ assert_equal(test['f0'], 0)
+ assert_equal(test['f1'], b"testNonethe" + latin1)
+
+ def test_binary_decode_autodtype(self):
+ utf16 = b'\xff\xfeh\x04 \x00i\x04 \x00j\x04'
+ v = self.loadfunc(BytesIO(utf16), dtype=None, encoding='UTF-16')
+ assert_array_equal(v, np.array(utf16.decode('UTF-16').split()))
+
+ def test_utf8_byte_encoding(self):
+ utf8 = b"\xcf\x96"
+ norm = b"norm1,norm2,norm3\n"
+ enc = b"test1,testNonethe" + utf8 + b",test3\n"
+ s = norm + enc + norm
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', '', np.VisibleDeprecationWarning)
+ test = np.genfromtxt(TextIO(s),
+ dtype=None, comments=None, delimiter=',')
+ assert_(w[0].category is np.VisibleDeprecationWarning)
+ ctl = np.array([
+ [b'norm1', b'norm2', b'norm3'],
+ [b'test1', b'testNonethe' + utf8, b'test3'],
+ [b'norm1', b'norm2', b'norm3']])
+ assert_array_equal(test, ctl)
+
+ def test_utf8_file(self):
+ utf8 = b"\xcf\x96"
+ latin1 = b"\xf6\xfc\xf6"
+ with temppath() as path:
+ with open(path, "wb") as f:
+ f.write((b"test1,testNonethe" + utf8 + b",test3\n") * 2)
+ test = np.genfromtxt(path, dtype=None, comments=None,
+ delimiter=',', encoding="UTF-8")
+ ctl = np.array([
+ ["test1", "testNonethe" + utf8.decode("UTF-8"), "test3"],
+ ["test1", "testNonethe" + utf8.decode("UTF-8"), "test3"]],
+ dtype=np.unicode)
+ assert_array_equal(test, ctl)
+
+ # test a mixed dtype
+ with open(path, "wb") as f:
+ f.write(b"0,testNonethe" + utf8)
+ test = np.genfromtxt(path, dtype=None, comments=None,
+ delimiter=',', encoding="UTF-8")
+ assert_equal(test['f0'], 0)
+ assert_equal(test['f1'], "testNonethe" + utf8.decode("UTF-8"))
+
+
+ def test_utf8_file_nodtype_unicode(self):
+ # bytes encoding with non-latin1 -> unicode upcast
+ utf8 = u'\u03d6'
+ latin1 = u'\xf6\xfc\xf6'
+
+ # skip test if cannot encode utf8 test string with preferred
+ # encoding. The preferred encoding is assumed to be the default
+ # encoding of io.open. Will need to change this for PyTest, maybe
+ # using pytest.mark.xfail(raises=***).
+ try:
+ encoding = locale.getpreferredencoding()
+ utf8.encode(encoding)
+ except (UnicodeError, ImportError):
+ raise SkipTest('Skipping test_utf8_file_nodtype_unicode, '
+ 'unable to encode utf8 in preferred encoding')
+
+ with temppath() as path:
+ with io.open(path, "wt") as f:
+ f.write(u"norm1,norm2,norm3\n")
+ f.write(u"norm1," + latin1 + u",norm3\n")
+ f.write(u"test1,testNonethe" + utf8 + u",test3\n")
+ with warnings.catch_warnings(record=True) as w:
+ warnings.filterwarnings('always', '',
+ np.VisibleDeprecationWarning)
+ test = np.genfromtxt(path, dtype=None, comments=None,
+ delimiter=',')
+ # Check for warning when encoding not specified.
+ assert_(w[0].category is np.VisibleDeprecationWarning)
+ ctl = np.array([
+ ["norm1", "norm2", "norm3"],
+ ["norm1", latin1, "norm3"],
+ ["test1", "testNonethe" + utf8, "test3"]],
+ dtype=np.unicode)
+ assert_array_equal(test, ctl)
+
def test_recfromtxt(self):
#
data = TextIO('A,B\n0,1\n2,3')
kwargs = dict(delimiter=",", missing_values="N/A", names=True)
test = np.recfromtxt(data, **kwargs)
control = np.array([(0, 1), (2, 3)],
- dtype=[('A', np.int), ('B', np.int)])
- self.assertTrue(isinstance(test, np.recarray))
+ dtype=[('A', int), ('B', int)])
+ assert_(isinstance(test, np.recarray))
assert_equal(test, control)
#
data = TextIO('A,B\n0,1\n2,N/A')
test = np.recfromtxt(data, dtype=None, usemask=True, **kwargs)
control = ma.array([(0, 1), (2, -1)],
mask=[(False, False), (False, True)],
- dtype=[('A', np.int), ('B', np.int)])
+ dtype=[('A', int), ('B', int)])
assert_equal(test, control)
assert_equal(test.mask, control.mask)
assert_equal(test.A, [0, 2])
@@ -1700,15 +2086,15 @@ M 33 21.99
kwargs = dict(missing_values="N/A", names=True, case_sensitive=True)
test = np.recfromcsv(data, dtype=None, **kwargs)
control = np.array([(0, 1), (2, 3)],
- dtype=[('A', np.int), ('B', np.int)])
- self.assertTrue(isinstance(test, np.recarray))
+ dtype=[('A', int), ('B', int)])
+ assert_(isinstance(test, np.recarray))
assert_equal(test, control)
#
data = TextIO('A,B\n0,1\n2,N/A')
test = np.recfromcsv(data, dtype=None, usemask=True, **kwargs)
control = ma.array([(0, 1), (2, -1)],
mask=[(False, False), (False, True)],
- dtype=[('A', np.int), ('B', np.int)])
+ dtype=[('A', int), ('B', int)])
assert_equal(test, control)
assert_equal(test.mask, control.mask)
assert_equal(test.A, [0, 2])
@@ -1716,16 +2102,23 @@ M 33 21.99
data = TextIO('A,B\n0,1\n2,3')
test = np.recfromcsv(data, missing_values='N/A',)
control = np.array([(0, 1), (2, 3)],
- dtype=[('a', np.int), ('b', np.int)])
- self.assertTrue(isinstance(test, np.recarray))
+ dtype=[('a', int), ('b', int)])
+ assert_(isinstance(test, np.recarray))
assert_equal(test, control)
#
data = TextIO('A,B\n0,1\n2,3')
- dtype = [('a', np.int), ('b', np.float)]
+ dtype = [('a', int), ('b', float)]
test = np.recfromcsv(data, missing_values='N/A', dtype=dtype)
control = np.array([(0, 1), (2, 3)],
dtype=dtype)
- self.assertTrue(isinstance(test, np.recarray))
+ assert_(isinstance(test, np.recarray))
+ assert_equal(test, control)
+
+ #gh-10394
+ data = TextIO('color\n"red"\n"blue"')
+ test = np.recfromcsv(data, converters={0: lambda x: x.strip(b'\"')})
+ control = np.array([('red',), ('blue',)], dtype=[('color', (bytes, 4))])
+ assert_equal(test.dtype, control.dtype)
assert_equal(test, control)
def test_max_rows(self):
@@ -1786,11 +2179,7 @@ M 33 21.99
# Test that we can load data from a filename as well as a file
# object
tgt = np.arange(6).reshape((2, 3))
- if sys.version_info[0] >= 3:
- # python 3k is known to fail for '\r'
- linesep = ('\n', '\r\n')
- else:
- linesep = ('\n', '\r\n', '\r')
+ linesep = ('\n', '\r\n', '\r')
for sep in linesep:
data = '0 1 2' + sep + '3 4 5'
@@ -1800,6 +2189,22 @@ M 33 21.99
res = np.genfromtxt(name)
assert_array_equal(res, tgt)
+ def test_gft_from_gzip(self):
+ # Test that we can load data from a gzipped file
+ wanted = np.arange(6).reshape((2, 3))
+ linesep = ('\n', '\r\n', '\r')
+
+ for sep in linesep:
+ data = '0 1 2' + sep + '3 4 5'
+ s = BytesIO()
+ with gzip.GzipFile(fileobj=s, mode='w') as g:
+ g.write(asbytes(data))
+
+ with temppath(suffix='.gz2') as name:
+ with open(name, 'w') as f:
+ f.write(data)
+ assert_array_equal(np.genfromtxt(name), wanted)
+
def test_gft_using_generator(self):
# gft doesn't work with unicode.
def count():
@@ -1826,7 +2231,7 @@ M 33 21.99
assert_equal(test.dtype.names, ['f0', 'f1', 'f2'])
- assert_(test.dtype['f0'] == np.float)
+ assert_(test.dtype['f0'] == float)
assert_(test.dtype['f1'] == np.int64)
assert_(test.dtype['f2'] == np.integer)
@@ -1835,9 +2240,9 @@ M 33 21.99
assert_equal(test['f2'], 1024)
-class TestPathUsage(TestCase):
+@pytest.mark.skipif(Path is None, reason="No pathlib.Path")
+class TestPathUsage(object):
# Test that pathlib.Path can be used
- @np.testing.dec.skipif(Path is None, "No pathlib.Path")
def test_loadtxt(self):
with temppath(suffix='.txt') as path:
path = Path(path)
@@ -1846,7 +2251,6 @@ class TestPathUsage(TestCase):
x = np.loadtxt(path)
assert_array_equal(x, a)
- @np.testing.dec.skipif(Path is None, "No pathlib.Path")
def test_save_load(self):
# Test that pathlib.Path instances can be used with savez.
with temppath(suffix='.npy') as path:
@@ -1856,7 +2260,6 @@ class TestPathUsage(TestCase):
data = np.load(path)
assert_array_equal(data, a)
- @np.testing.dec.skipif(Path is None, "No pathlib.Path")
def test_savez_load(self):
# Test that pathlib.Path instances can be used with savez.
with temppath(suffix='.npz') as path:
@@ -1864,8 +2267,7 @@ class TestPathUsage(TestCase):
np.savez(path, lab='place holder')
with np.load(path) as data:
assert_array_equal(data['lab'], 'place holder')
-
- @np.testing.dec.skipif(Path is None, "No pathlib.Path")
+
def test_savez_compressed_load(self):
# Test that pathlib.Path instances can be used with savez.
with temppath(suffix='.npz') as path:
@@ -1875,7 +2277,6 @@ class TestPathUsage(TestCase):
assert_array_equal(data['lab'], 'place holder')
data.close()
- @np.testing.dec.skipif(Path is None, "No pathlib.Path")
def test_genfromtxt(self):
with temppath(suffix='.txt') as path:
path = Path(path)
@@ -1884,9 +2285,8 @@ class TestPathUsage(TestCase):
data = np.genfromtxt(path)
assert_array_equal(a, data)
- @np.testing.dec.skipif(Path is None, "No pathlib.Path")
def test_ndfromtxt(self):
- # Test outputing a standard ndarray
+ # Test outputting a standard ndarray
with temppath(suffix='.txt') as path:
path = Path(path)
with path.open('w') as f:
@@ -1896,7 +2296,6 @@ class TestPathUsage(TestCase):
test = np.ndfromtxt(path, dtype=int)
assert_array_equal(test, control)
- @np.testing.dec.skipif(Path is None, "No pathlib.Path")
def test_mafromtxt(self):
# From `test_fancy_dtype_alt` above
with temppath(suffix='.txt') as path:
@@ -1908,7 +2307,6 @@ class TestPathUsage(TestCase):
control = ma.array([(1.0, 2.0, 3.0), (4.0, 5.0, 6.0)])
assert_equal(test, control)
- @np.testing.dec.skipif(Path is None, "No pathlib.Path")
def test_recfromtxt(self):
with temppath(suffix='.txt') as path:
path = Path(path)
@@ -1918,11 +2316,10 @@ class TestPathUsage(TestCase):
kwargs = dict(delimiter=",", missing_values="N/A", names=True)
test = np.recfromtxt(path, **kwargs)
control = np.array([(0, 1), (2, 3)],
- dtype=[('A', np.int), ('B', np.int)])
- self.assertTrue(isinstance(test, np.recarray))
+ dtype=[('A', int), ('B', int)])
+ assert_(isinstance(test, np.recarray))
assert_equal(test, control)
- @np.testing.dec.skipif(Path is None, "No pathlib.Path")
def test_recfromcsv(self):
with temppath(suffix='.txt') as path:
path = Path(path)
@@ -1932,8 +2329,8 @@ class TestPathUsage(TestCase):
kwargs = dict(missing_values="N/A", names=True, case_sensitive=True)
test = np.recfromcsv(path, dtype=None, **kwargs)
control = np.array([(0, 1), (2, 3)],
- dtype=[('A', np.int), ('B', np.int)])
- self.assertTrue(isinstance(test, np.recarray))
+ dtype=[('A', int), ('B', int)])
+ assert_(isinstance(test, np.recarray))
assert_equal(test, control)
@@ -1952,7 +2349,7 @@ def test_gzip_load():
def test_gzip_loadtxt():
- # Thanks to another windows brokeness, we can't use
+ # Thanks to another windows brokenness, we can't use
# NamedTemporaryFile: a file created from this function cannot be
# reopened by another open call. So we first put the gzipped string
# of the test reference array, write it to a securely opened file,
@@ -2010,6 +2407,7 @@ def test_npzfile_dict():
assert_('x' in z.keys())
+@pytest.mark.skipif(not HAS_REFCOUNT, reason="Python lacks refcounts")
def test_load_refcount():
# Check that objects returned by np.load are directly freed based on
# their refcount, rather than needing the gc to collect them.
@@ -2018,17 +2416,5 @@ def test_load_refcount():
np.savez(f, [1, 2, 3])
f.seek(0)
- assert_(gc.isenabled())
- gc.disable()
- try:
- gc.collect()
+ with assert_no_gc_cycles():
np.load(f)
- # gc.collect returns the number of unreachable objects in cycles that
- # were found -- we are checking that no cycles were created by np.load
- n_objects_in_cycles = gc.collect()
- finally:
- gc.enable()
- assert_equal(n_objects_in_cycles, 0)
-
-if __name__ == "__main__":
- run_module_suite()