diff --git a/README.mkd b/README.mkd index e95d229..f533b20 100644 --- a/README.mkd +++ b/README.mkd @@ -69,9 +69,9 @@ To test the module, just perform the following command: ```python >>> import pyplink >>> pyplink.test() -....................... +............................................ ---------------------------------------------------------------------- -Ran 23 tests in 0.149s +Ran 44 tests in 0.468s OK ``` diff --git a/demo/PyPlink Demo.ipynb b/demo/PyPlink Demo.ipynb index e1b9694..8e2f0bb 100644 --- a/demo/PyPlink Demo.ipynb +++ b/demo/PyPlink Demo.ipynb @@ -55,7 +55,9 @@ " * [*Counting the allele frequency of markers*](#Counting-the-allele-frequency-of-markers)\n", "\n", "\n", - "* [**Writing binary pedfile**](#Writing-binary-pedfile)" + "* [**Writing binary pedfile**](#Writing-binary-pedfile)\n", + " * [SNP-major format](#SNP-major-format)\n", + " * [INDIVIDUAL-major-format](#INDIVIDUAL-major-format)" ] }, { @@ -823,7 +825,9 @@ "source": [ "## Writing binary pedfile\n", "\n", - "The following examples shows how to write a binary file using the `PyPlink` module.\n", + "### *SNP-major* format\n", + "\n", + "The following examples shows how to write a binary file using the `PyPlink` module. The *SNP-major* format is the default. It means that the binary file is written one marker at a time.\n", "\n", "> Note that `PyPlink` only writes the `BED` file. The user is required to create the `FAM` and `BIM` files." ] @@ -832,7 +836,7 @@ "cell_type": "code", "execution_count": 17, "metadata": { - "collapsed": true + "collapsed": false }, "outputs": [], "source": [ @@ -846,7 +850,7 @@ "# Writing the BED file using PyPlink\n", "with PyPlink(\"test_output\", \"w\") as pedfile:\n", " for genotypes in all_genotypes:\n", - " pedfile.write_marker(genotypes)\n", + " pedfile.write_genotypes(genotypes)\n", "\n", "# Writing a dummy FAM file\n", "with open(\"test_output.fam\", \"w\") as fam_file:\n", @@ -1142,7 +1146,7 @@ "\n", "Skipping web check... [ --noweb ] \n", "Writing this text to log file [ plink.log ]\n", - "Analysis started: Tue Nov 3 11:12:07 2015\n", + "Analysis started: Wed Nov 4 14:50:58 2015\n", "\n", "Options in effect:\n", "\t--noweb\n", @@ -1167,7 +1171,7 @@ "10 founders and 0 non-founders found\n", "Writing allele frequencies (founders-only) to [ plink.frq ] \n", "\n", - "Analysis finished: Tue Nov 3 11:12:10 2015\n", + "Analysis finished: Wed Nov 4 14:50:58 2015\n", "\n" ] } @@ -1205,6 +1209,149 @@ "with open(\"plink.frq\", \"r\") as i_file:\n", " print(i_file.read(), end=\"\")" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### *INDIVIDUAL-major* format\n", + "\n", + "The following examples shows how to write a binary file using the `PyPlink` module. The *INDIVIDUAL-major* format means that the binary file is written one sample at a time.\n", + "\n", + "**Files in *INDIVIDUAL-major* format is not readable by `PyPlink`.** You need to convert it using *Plink*.\n", + "\n", + "> Note that `PyPlink` only writes the `BED` file. The user is required to create the `FAM` and `BIM` files." + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "# The genotypes for 3 markers and 10 samples (INDIVIDUAL-major)\n", + "all_genotypes = [\n", + " [ 0, 0, 0],\n", + " [ 0, 0, 0],\n", + " [ 0, 1, 0],\n", + " [ 1, 1, 0],\n", + " [ 0, 0, 1],\n", + " [ 0, 0, 1],\n", + " [-1, 0, 0],\n", + " [ 2, 1, 0],\n", + " [ 1, 2, 0],\n", + " [ 0, 0, 1],\n", + "]\n", + "\n", + "# Writing the BED file using PyPlink\n", + "with PyPlink(\"test_output_2\", \"w\", bed_format=\"INDIVIDUAL-major\") as pedfile:\n", + " for genotypes in all_genotypes:\n", + " pedfile.write_genotypes(genotypes)\n", + "\n", + "# Writing a dummy FAM file\n", + "with open(\"test_output_2.fam\", \"w\") as fam_file:\n", + " for i in range(10):\n", + " print(\"family_{}\".format(i+1), \"sample_{}\".format(i+1), \"0\", \"0\", \"0\", \"-9\",\n", + " sep=\" \", file=fam_file)\n", + "\n", + "# Writing a dummy BIM file\n", + "with open(\"test_output_2.bim\", \"w\") as bim_file:\n", + " for i in range(3):\n", + " print(\"1\", \"marker_{}\".format(i+1), \"0\", i+1, \"A\", \"T\",\n", + " sep=\"\\t\", file=bim_file)" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": { + "collapsed": false, + "scrolled": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "@----------------------------------------------------------@\n", + "| PLINK! | v1.07 | 10/Aug/2009 |\n", + "|----------------------------------------------------------|\n", + "| (C) 2009 Shaun Purcell, GNU General Public License, v2 |\n", + "|----------------------------------------------------------|\n", + "| For documentation, citation & bug-report instructions: |\n", + "| http://pngu.mgh.harvard.edu/purcell/plink/ |\n", + "@----------------------------------------------------------@\n", + "\n", + "Skipping web check... [ --noweb ] \n", + "Writing this text to log file [ plink_2.log ]\n", + "Analysis started: Wed Nov 4 14:50:58 2015\n", + "\n", + "Options in effect:\n", + "\t--noweb\n", + "\t--bfile test_output_2\n", + "\t--freq\n", + "\t--out plink_2\n", + "\n", + "Reading map (extended format) from [ test_output_2.bim ] \n", + "3 markers to be included from [ test_output_2.bim ]\n", + "Reading pedigree information from [ test_output_2.fam ] \n", + "10 individuals read from [ test_output_2.fam ] \n", + "0 individuals with nonmissing phenotypes\n", + "Assuming a disease phenotype (1=unaff, 2=aff, 0=miss)\n", + "Missing phenotype value is also -9\n", + "0 cases, 0 controls and 10 missing\n", + "0 males, 0 females, and 10 of unspecified sex\n", + "Warning, found 10 individuals with ambiguous sex codes\n", + "These individuals will be set to missing ( or use --allow-no-sex )\n", + "Writing list of these individuals to [ plink_2.nosex ]\n", + "Reading genotype bitfile from [ test_output_2.bed ] \n", + "Detected that binary PED file is v1.00 individual-major mode\n", + "Before frequency and genotyping pruning, there are 3 SNPs\n", + "Converting data to SNP-major format\n", + "10 founders and 0 non-founders found\n", + "Writing allele frequencies (founders-only) to [ plink_2.frq ] \n", + "\n", + "Analysis finished: Wed Nov 4 14:50:58 2015\n", + "\n" + ] + } + ], + "source": [ + "from subprocess import Popen, PIPE\n", + "\n", + "# Computing frequencies\n", + "proc = Popen([\"plink\", \"--noweb\", \"--bfile\", \"test_output_2\", \"--freq\", \"--out\", \"plink_2\"],\n", + " stdout=PIPE, stderr=PIPE)\n", + "outs, errs = proc.communicate()\n", + "print(outs.decode(), end=\"\")" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": { + "collapsed": false, + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " CHR SNP A1 A2 MAF NCHROBS\n", + " 1 marker_1 A T 0.2222 18\n", + " 1 marker_2 A T 0.25 20\n", + " 1 marker_3 A T 0.15 20\n" + ] + } + ], + "source": [ + "with open(\"plink_2.frq\", \"r\") as i_file:\n", + " print(i_file.read(), end=\"\")" + ] } ], "metadata": { diff --git a/pyplink/pyplink.py b/pyplink/pyplink.py index be6db36..ba52dc9 100644 --- a/pyplink/pyplink.py +++ b/pyplink/pyplink.py @@ -26,6 +26,8 @@ import os +import warnings +from io import UnsupportedOperation try: from itertools import zip_longest as zip_longest @@ -46,6 +48,10 @@ __all__ = ["PyPlink"] +# Allowing for warnings +warnings.simplefilter("once", DeprecationWarning) + + # The recoding values _geno_recode = {1: -1, # Unknown genotype 2: 1, # Heterozygous genotype @@ -66,13 +72,14 @@ class PyPlink(object): dtype=np.int8, ) - def __init__(self, i_prefix, mode="r"): + def __init__(self, i_prefix, mode="r", bed_format="SNP-major"): """Initializes a new PyPlink object. Args: i_prefix (str): the prefix of the binary Plink files mode (str): the open mode for the binary Plink file nb_samples (int): the number of samples + bed_format (str): the type of bed (SNP-major or INDIVIDUAL-major) Reads or write binary Plink files (BED, BIM and FAM). @@ -80,14 +87,22 @@ def __init__(self, i_prefix, mode="r"): # The mode self._mode = mode + # The bed format + if bed_format not in {"SNP-major", "INDIVIDUAL-major"}: + raise ValueError("invalid bed format: {}".format(bed_format)) + self._bed_format = bed_format + # These are the name of the files self.bed_filename = "{}.bed".format(i_prefix) self.bim_filename = "{}.bim".format(i_prefix) self.fam_filename = "{}.fam".format(i_prefix) if self._mode == "r": - # Checking that all the files exists (otherwise, there's nothing to - # do) + if self._bed_format != "SNP-major": + raise ValueError("only SNP-major format is supported " + "with mode 'r'") + + # Checking that all the files exists (otherwise, error...) for filename in (self.bed_filename, self.bim_filename, self.fam_filename): if not os.path.isfile(filename): @@ -107,7 +122,7 @@ def __init__(self, i_prefix, mode="r"): elif self._mode == "w": # The dummy number of samples and bytes - self._nb_samples = None + self._nb_values = None # Opening the output BED file self._bed_file = open(self.bed_filename, "wb") @@ -118,13 +133,19 @@ def __init__(self, i_prefix, mode="r"): def __repr__(self): """The representation of the PyPlink object.""" - return "PyPlink({:,d} samples; {:,d} markers)".format( - self.get_nb_samples(), - self.get_nb_markers(), - ) + if self._mode == "r": + return "PyPlink({:,d} samples; {:,d} markers)".format( + self.get_nb_samples(), + self.get_nb_markers(), + ) + + return 'PyPlink(mode="w")' def __iter__(self): """The __iter__ function.""" + if self._mode != "r": + raise UnsupportedOperation("not available in 'w' mode") + return self def __next__(self): @@ -137,12 +158,19 @@ def __enter__(self): def __exit__(self, *args): """Exiting the context manager.""" + self.close() + + def close(self): + """Closes the BED file if required.""" if self._mode == "w": # Closing the BED file self._bed_file.close() def next(self): """The next function.""" + if self._mode != "r": + raise UnsupportedOperation("not available in 'w' mode") + if self._n < self._nb_markers: self._n += 1 @@ -154,6 +182,9 @@ def next(self): def seek(self, n): """Gets to a certain position in the BED file when iterating.""" + if self._mode != "r": + raise UnsupportedOperation("not available in 'w' mode") + if 0 <= n < len(self._bed): self._n = n @@ -193,10 +224,16 @@ def _read_bim(self): def get_bim(self): """Returns the BIM file.""" + if self._mode != "r": + raise UnsupportedOperation("not available in 'w' mode") + return self._bim.drop("i", axis=1) def get_nb_markers(self): """Returns the number of markers.""" + if self._mode != "r": + raise UnsupportedOperation("not available in 'w' mode") + return self._nb_markers def _read_fam(self): @@ -223,10 +260,16 @@ def _read_fam(self): def get_fam(self): """Returns the FAM file.""" + if self._mode != "r": + raise UnsupportedOperation("not available in 'w' mode") + return self._fam.drop(["byte", "bit"], axis=1) def get_nb_samples(self): """Returns the number of samples.""" + if self._mode != "r": + raise UnsupportedOperation("not available in 'w' mode") + return self._nb_samples def _read_bed(self): @@ -244,7 +287,7 @@ def _read_bed(self): # Checking that the format is SNP-major if ord(bed_file.read(1)) != 1: - raise ValueError("not in SNP-major format: " + raise ValueError("not in SNP-major format (please recode): " "{}".format(self.bed_filename)) # The number of bytes per marker @@ -265,16 +308,23 @@ def _read_bed(self): def _write_bed_header(self): """Writes the BED first 3 bytes.""" # Writing the first three bytes - self._bed_file.write(bytearray((108, 27, 1))) + final_byte = 1 if self._bed_format == "SNP-major" else 0 + self._bed_file.write(bytearray((108, 27, final_byte))) def iter_geno(self): """Iterates over genotypes.""" + if self._mode != "r": + raise UnsupportedOperation("not available in 'w' mode") + for i in range(len(self._bed)): geno = self._geno_values[self._bed[i]].flatten(order="C") yield self._markers[i], geno[:self._nb_samples] def iter_acgt_geno(self): """Iterates over genotypes (ACGT format).""" + if self._mode != "r": + raise UnsupportedOperation("not available in 'w' mode") + for i in range(len(self._bed)): geno = self._geno_values[self._bed[i]].flatten(order="C") yield (self._markers[i], @@ -282,6 +332,9 @@ def iter_acgt_geno(self): def iter_geno_marker(self, markers): """Iterates over genotypes for given markers.""" + if self._mode != "r": + raise UnsupportedOperation("not available in 'w' mode") + # If string, we change to list if isinstance(markers, str): markers = [markers] @@ -303,6 +356,9 @@ def iter_geno_marker(self, markers): def get_geno_marker(self, marker): """Gets the genotypes for a given marker.""" + if self._mode != "r": + raise UnsupportedOperation("not available in 'w' mode") + # Check if the marker exists if marker not in set(self._bim.index): raise ValueError("{}: marker not in BIM".format(marker)) @@ -315,6 +371,9 @@ def get_geno_marker(self, marker): def iter_acgt_geno_marker(self, markers): """Iterates over genotypes for given markers (ACGT format).""" + if self._mode != "r": + raise UnsupportedOperation("not available in 'w' mode") + # If string, we change to list if isinstance(markers, str): markers = [markers] @@ -336,6 +395,9 @@ def iter_acgt_geno_marker(self, markers): def get_acgt_geno_marker(self, marker): """Gets the genotypes for a given marker (ACGT format).""" + if self._mode != "r": + raise UnsupportedOperation("not available in 'w' mode") + # Check if the marker exists if marker not in set(self._bim.index): raise ValueError("{}: marker not in BIM".format(marker)) @@ -347,15 +409,23 @@ def get_acgt_geno_marker(self, marker): return self._allele_encoding[i][geno[:self._nb_samples]] def write_marker(self, genotypes): + """Deprecated function.""" + warnings.warn("deprecated: use 'write_genotypes'", DeprecationWarning) + self.write_genotypes(genotypes) + + def write_genotypes(self, genotypes): """Write genotypes to binary file.""" + if self._mode != "w": + raise UnsupportedOperation("not available in 'r' mode") + # Initializing the number of samples if required - if self._nb_samples is None: - self._nb_samples = len(genotypes) + if self._nb_values is None: + self._nb_values = len(genotypes) # Checking the expected number of samples - if self._nb_samples != len(genotypes): + if self._nb_values != len(genotypes): raise ValueError("{:,d} samples expected, got {:,d}".format( - self._nb_samples, + self._nb_values, len(genotypes), )) diff --git a/pyplink/tests/test_pyplink.py b/pyplink/tests/test_pyplink.py index 23a798a..48c6514 100644 --- a/pyplink/tests/test_pyplink.py +++ b/pyplink/tests/test_pyplink.py @@ -32,7 +32,9 @@ import zipfile import platform import unittest +import warnings from tempfile import mkdtemp +from io import UnsupportedOperation from distutils.spawn import find_executable from subprocess import check_call, PIPE, CalledProcessError @@ -56,6 +58,74 @@ from ..pyplink import PyPlink +def get_plink(tmp_dir): + """Gets the Plink binary, if required.""" + # Checking if Plink is in the path + plink_path = "plink" + if platform.system() == "Windows": + plink_path += ".exe" + + if find_executable(plink_path) is None: + print("Downloading Plink") + + # The url for each platform + url = "http://pngu.mgh.harvard.edu/~purcell/plink/dist/{filename}" + + # Getting the name of the file + filename = "" + if platform.system() == "Windows": + filename = "plink-1.07-dos.zip" + elif platform.system() == "Darwin": + filename = "plink-1.07-mac-intel.zip" + elif platform.system() == "Linux": + if platform.architecture()[0].startswith("32"): + filename = "plink-1.07-i686.zip" + elif platform.architecture()[0].startswith("64"): + filename = "plink-1.07-x86_64.zip" + else: + return None, "System not compatible for Plink" + else: + return None, "System not compatible for Plink" + + # Downloading Plink + zip_path = os.path.join(tmp_dir, filename) + try: + urlretrieve( + url.format(filename=filename), + zip_path, + ) + except: + return None, "Plink's URL is not available" + + # Unzipping Plink + plink_dir = os.path.join(tmp_dir, ) + with zipfile.ZipFile(zip_path, "r") as z: + z.extractall(tmp_dir) + plink_path = os.path.join(tmp_dir, os.path.splitext(filename)[0], + plink_path) + if not os.path.isfile(plink_path): + return None, "Cannot use Plink" + + # Making the script executable + if platform.system() in {"Darwin", "Linux"}: + os.chmod(plink_path, stat.S_IRWXU) + + # Testing Plink works + try: + check_call([ + plink_path, + "--noweb", + "--help", + "--out", os.path.join(tmp_dir, "execution_test") + ], stdout=PIPE, stderr=PIPE) + except CalledProcessError: + return None, "Plink cannot be properly used" + except IOError: + return None, "Plink was not properly installed" + + return plink_path, "OK" + + class TestPyPlink(unittest.TestCase): @classmethod @@ -95,8 +165,12 @@ def setUpClass(cls): ["AA", "GA", "GG"], ["TT", "TT", "TT"], ["GC", "CC", "CC"], ["GG", "AG", "GG"]] - # Reading the files - cls.pedfile = PyPlink(cls.prefix) + # Getting Plink + cls.plink_path, cls.plink_message = get_plink(cls.tmp_dir) + + def setUp(self): + # Reading the plink binary file + self.pedfile = PyPlink(self.prefix) @classmethod def tearDownClass(cls): @@ -206,8 +280,10 @@ def test_pyplink_bad_bed(self): # This should raise an exception with self.assertRaises(ValueError) as cm: PyPlink(new_prefix) - self.assertEqual("not in SNP-major format: {}".format(new_bed), - str(cm.exception)) + self.assertEqual( + "not in SNP-major format (please recode): {}".format(new_bed), + str(cm.exception), + ) def test_missing_files(self): """Checks that an exception is raised when an input file is missing.""" @@ -231,10 +307,26 @@ def test_get_nb_markers(self): """Tests that the correct number of markers is returned.""" self.assertEqual(self.pedfile.get_nb_markers(), 10) + def test_get_nb_markers_w_mode(self): + """Tests that an exception is raised if in write mode.""" + with self.assertRaises(UnsupportedOperation) as cm: + # Creating the dummy PyPlink object + with PyPlink(os.path.join(self.tmp_dir, "test_error"), "w") as p: + p.get_nb_markers() + self.assertEqual("not available in 'w' mode", str(cm.exception)) + def test_get_nb_samples(self): """Tests that the correct number of samples is returned.""" self.assertEqual(self.pedfile.get_nb_samples(), 3) + def test_get_nb_samples_w_mode(self): + """Tests that an exception is raised if in write mode.""" + with self.assertRaises(UnsupportedOperation) as cm: + # Creating the dummy PyPlink object + with PyPlink(os.path.join(self.tmp_dir, "test_error"), "w") as p: + p.get_nb_samples() + self.assertEqual("not available in 'w' mode", str(cm.exception)) + def test_get_bim(self): """Tests the 'get_bim' function.""" # The original BIM file (with the 'i' column) @@ -281,6 +373,14 @@ def test_get_bim(self): self.assertFalse(comparison.all().cm) self.assertTrue(comparison.all()[["pos", "a1", "a2"]].all()) + def test_get_bim_w_mode(self): + """Tests that an exception is raised if in write mode.""" + with self.assertRaises(UnsupportedOperation) as cm: + # Creating the dummy PyPlink object + with PyPlink(os.path.join(self.tmp_dir, "test_error"), "w") as p: + p.get_bim() + self.assertEqual("not available in 'w' mode", str(cm.exception)) + def test_get_fam(self): """Tests the 'get_fam' function.""" # The original FAM file (with the 'byte' and 'bit' columns) @@ -330,6 +430,14 @@ def test_get_fam(self): comparison.all()[["fid", "iid", "mother", "gender"]].all() ) + def test_get_fam_w_mode(self): + """Tests that an exception is raised if in write mode.""" + with self.assertRaises(UnsupportedOperation) as cm: + # Creating the dummy PyPlink object + with PyPlink(os.path.join(self.tmp_dir, "test_error"), "w") as p: + p.get_fam() + self.assertEqual("not available in 'w' mode", str(cm.exception)) + def test_generator(self): """Testing the class as a generator.""" # Zipping and checking @@ -345,8 +453,30 @@ def test_generator(self): remaining = [(marker, geno) for marker, geno in self.pedfile] self.assertEqual(0, len(remaining)) - # Just to be sure, we seek at the beginning of the file - self.pedfile.seek(0) + def test_generator_w_mode(self): + """Tests that an exception is raised if in write mode.""" + with self.assertRaises(UnsupportedOperation) as cm: + # Creating the dummy PyPlink object + with PyPlink(os.path.join(self.tmp_dir, "test_error"), "w") as p: + for marker, genotypes in p: + pass + self.assertEqual("not available in 'w' mode", str(cm.exception)) + + def test_next(self): + """Tests that an exception is raised when calling next in w mode.""" + marker, genotypes = self.pedfile.next() + + # Comparing + self.assertEqual(self.markers[0], marker) + self.assertTrue((self.genotypes[0] == genotypes).all()) + + def test_next_w_mode(self): + """Tests that an exception is raised when calling next in w mode.""" + with self.assertRaises(UnsupportedOperation) as cm: + # Creating the dummy PyPlink object + with PyPlink(os.path.join(self.tmp_dir, "test_error"), "w") as p: + p.next() + self.assertEqual("not available in 'w' mode", str(cm.exception)) def test_seek(self): """Testing the seeking (for the generator).""" @@ -387,8 +517,13 @@ def test_seek(self): self.pedfile.seek(100) self.assertEqual("invalid position in BED: 100", str(cm.exception)) - # Just to be sure, we seek at the beginning of the file - self.pedfile.seek(0) + def test_seek_w_mode(self): + """Tests that an exception is raised if in write mode.""" + with self.assertRaises(UnsupportedOperation) as cm: + # Creating the dummy PyPlink object + with PyPlink(os.path.join(self.tmp_dir, "test_error"), "w") as p: + p.seek(100) + self.assertEqual("not available in 'w' mode", str(cm.exception)) def test_iter_geno(self): """Tests the 'iter_geno' function.""" @@ -400,6 +535,15 @@ def test_iter_geno(self): self.assertEqual(e_marker, marker) self.assertTrue((e_geno == geno).all()) + def test_iter_geno_w_mode(self): + """Tests that an exception is raised if in write mode.""" + with self.assertRaises(UnsupportedOperation) as cm: + # Creating the dummy PyPlink object + with PyPlink(os.path.join(self.tmp_dir, "test_error"), "w") as p: + for marker, genotypes in p.iter_geno(): + pass + self.assertEqual("not available in 'w' mode", str(cm.exception)) + def test_iter_acgt_geno(self): """Tests the 'iter_acgt_geno" function.""" zipped = zip( @@ -410,6 +554,15 @@ def test_iter_acgt_geno(self): self.assertEqual(e_marker, marker) self.assertTrue((e_geno == geno).all()) + def test_iter_acgt_geno_w_mode(self): + """Tests that an exception is raised if in write mode.""" + with self.assertRaises(UnsupportedOperation) as cm: + # Creating the dummy PyPlink object + with PyPlink(os.path.join(self.tmp_dir, "test_error"), "w") as p: + for marker, genotypes in p.iter_acgt_geno(): + pass + self.assertEqual("not available in 'w' mode", str(cm.exception)) + def test_iter_geno_marker(self): """Tests the 'iter_geno_marker' function.""" # Getting a subset of indexes @@ -443,6 +596,15 @@ def test_iter_geno_marker(self): self.assertEqual("['unknown_1', 'unknown_2']: marker not in BIM", str(cm.exception)) + def test_iter_geno_marker_w_mode(self): + """Tests that an exception is raised if in write mode.""" + with self.assertRaises(UnsupportedOperation) as cm: + # Creating the dummy PyPlink object + with PyPlink(os.path.join(self.tmp_dir, "test_error"), "w") as p: + for marker, genotypes in p.iter_geno_marker(["M1", "M2"]): + pass + self.assertEqual("not available in 'w' mode", str(cm.exception)) + def test_iter_acgt_geno_marker(self): """Tests the 'iter_acgt_geno_marker' function.""" # Getting a subset of indexes @@ -476,8 +638,17 @@ def test_iter_acgt_geno_marker(self): self.assertEqual("['unknown_3', 'unknown_4']: marker not in BIM", str(cm.exception)) - def test_repr(self): - """Tests the object representation of the string.""" + def test_iter_acgt_geno_marker_w_mode(self): + """Tests that an exception is raised if in write mode.""" + with self.assertRaises(UnsupportedOperation) as cm: + # Creating the dummy PyPlink object + with PyPlink(os.path.join(self.tmp_dir, "test_error"), "w") as p: + for marker, genotypes in p.iter_acgt_geno_marker(["M1", "M2"]): + pass + self.assertEqual("not available in 'w' mode", str(cm.exception)) + + def test_repr_r_mode(self): + """Tests the object representation of the string (r mode).""" # Counting the number of samples nb_samples = None with open(self.fam, "r") as i_file: @@ -498,6 +669,21 @@ def test_repr(self): # Comparing self.assertEqual(e_repr, o_repr) + def test_repr_w_mode(self): + """Tests the object representation of the string (w mode).""" + # The expected representation + e_repr = 'PyPlink(mode="w")' + + # Creating the dummy PyPlink object + pedfile = PyPlink(os.path.join(self.tmp_dir, "test_repr"), "w") + + # Comparing the expected with the observed representation + o_repr = str(pedfile) + self.assertEqual(e_repr, o_repr) + + # Closing the file + pedfile.close() + def test_get_geno_marker(self): """Tests the 'get_geno_marker' function.""" # Getting a random marker to test @@ -517,6 +703,14 @@ def test_get_geno_marker(self): str(cm.exception), ) + def test_get_geno_marker_w_mode(self): + """Tests that an exception is raised if in write mode.""" + with self.assertRaises(UnsupportedOperation) as cm: + # Creating the dummy PyPlink object + with PyPlink(os.path.join(self.tmp_dir, "test_error"), "w") as p: + genotypes = p.get_geno_marker("M1") + self.assertEqual("not available in 'w' mode", str(cm.exception)) + def test_get_acgt_geno_marker(self): """Tests the 'get_acgt_geno_marker' function.""" # Getting a random marker to test @@ -533,6 +727,14 @@ def test_get_acgt_geno_marker(self): self.pedfile.get_acgt_geno_marker("dummy_marker") self.assertEqual("dummy_marker: marker not in BIM", str(cm.exception)) + def test_get_acgt_geno_marker_w_mode(self): + """Tests that an exception is raised if in write mode.""" + with self.assertRaises(UnsupportedOperation) as cm: + # Creating the dummy PyPlink object + with PyPlink(os.path.join(self.tmp_dir, "test_error"), "w") as p: + genotypes = p.get_acgt_geno_marker("M1") + self.assertEqual("not available in 'w' mode", str(cm.exception)) + def test_get_context_read_mode(self): """Tests the PyPlink object as context manager.""" with PyPlink(self.prefix) as genotypes: @@ -549,9 +751,9 @@ def test_write_binary(self): """Tests writing a Plink binary file.""" # The expected genotypes expected_genotypes = [ - np.array([0, 0, 0, 1, 0, 1, 2], dtype=int), - np.array([0, 0, 0, 0, -1, 0, 1], dtype=int), - np.array([0, -1, -1, 2, 0, 0, 0], dtype=int), + np.array([0, 0, 0, 1, 0, 1, 2], dtype=int), + np.array([0, 0, 0, 0, -1, 0, 1], dtype=int), + np.array([0, -1, -1, 2, 0, 0, 0], dtype=int), ] # The prefix @@ -560,7 +762,7 @@ def test_write_binary(self): # Writing the binary file with PyPlink(test_prefix, "w") as pedfile: for genotypes in expected_genotypes: - pedfile.write_marker(genotypes) + pedfile.write_genotypes(genotypes) # Checking the file exists self.assertTrue(os.path.isfile(test_prefix + ".bed")) @@ -599,7 +801,7 @@ def test_write_binary_error(self): with self.assertRaises(ValueError) as cm: with PyPlink(test_prefix, "w") as pedfile: for genotypes in expected_genotypes: - pedfile.write_marker(genotypes) + pedfile.write_genotypes(genotypes) self.assertEqual("7 samples expected, got 6", str(cm.exception)) def test_grouper_padding(self): @@ -628,79 +830,120 @@ def test_grouper_no_padding(self): "Plink not available for {}".format(platform.system())) def test_with_plink(self): """Tests to read a binary file using Plink.""" - # Checking if Plink is in the path - plink_path = "plink" - if platform.system() == "Windows": - plink_path += ".exe" - - if find_executable(plink_path) is None: - # The url for each platform - url = "http://pngu.mgh.harvard.edu/~purcell/plink/dist/{filename}" - - # Getting the name of the file - filename = "" - if platform.system() == "Windows": - filename = "plink-1.07-dos.zip" - elif platform.system() == "Darwin": - filename = "plink-1.07-mac-intel.zip" - elif platform.system() == "Linux": - if platform.architecture()[0].startswith("32"): - filename = "plink-1.07-i686.zip" - elif platform.architecture()[0].startswith("64"): - filename = "plink-1.07-x86_64.zip" - else: - self.skipTest("System not compatible for Plink") - else: - self.skipTest("System not compatible for Plink") - - # Downloading Plink - zip_path = os.path.join(self.tmp_dir, filename) - try: - urlretrieve( - url.format(filename=filename), - zip_path, - ) - except: - self.skipTest("Plink's URL is not available") - - # Unzipping Plink - plink_dir = os.path.join(self.tmp_dir, ) - with zipfile.ZipFile(zip_path, "r") as z: - z.extractall(self.tmp_dir) - plink_path = os.path.join( - self.tmp_dir, os.path.splitext(filename)[0], - plink_path, - ) - if not os.path.isfile(plink_path): - self.skipTest("Cannot use Plink") + # Checking if we need to skip + if self.plink_path is None: + self.skipTest(self.plink_message) + + # Creating the BED file + all_genotypes = [ + [0, 1, 0, 0, -1, 0, 1, 0, 0, 2], + [2, 1, 2, 2, 2, 2, 2, 1, 0, 1], + [0, 0, 0, 0, 0, 1, 0, 0, 0, 0], + ] + prefix = os.path.join(self.tmp_dir, "test_output") + with PyPlink(prefix, "w") as pedfile: + for genotypes in all_genotypes: + pedfile.write_genotypes(genotypes) + + # Creating the FAM file + fam_content = [ + ["F0", "S0", "0", "0", "1", "-9"], + ["F1", "S1", "0", "0", "1", "-9"], + ["F2", "S2", "0", "0", "2", "-9"], + ["F3", "S3", "0", "0", "1", "-9"], + ["F4", "S4", "0", "0", "1", "-9"], + ["F5", "S5", "0", "0", "2", "-9"], + ["F6", "S6", "0", "0", "1", "-9"], + ["F7", "S7", "0", "0", "0", "-9"], + ["F8", "S8", "0", "0", "1", "-9"], + ["F9", "S9", "0", "0", "2", "-9"], + ] + with open(prefix + ".fam", "w") as o_file: + for sample in fam_content: + print(*sample, sep=" ", file=o_file) - # Making the script executable - if platform.system() in {"Darwin", "Linux"}: - os.chmod(plink_path, stat.S_IRWXU) + # Creating the BIM file + bim_content = [ + ["1", "M0", "0", "123", "A", "G"], + ["1", "M1", "0", "124", "C", "T"], + ["2", "M2", "0", "117", "G", "C"], + ] + with open(prefix + ".bim", "w") as o_file: + for marker in bim_content: + print(*marker, sep="\t", file=o_file) - # Testing Plink works + # Creating a transposed pedfile using Plink + out_prefix = prefix + "_transposed" try: check_call([ - plink_path, + self.plink_path, "--noweb", - "--help", - "--out", os.path.join(self.tmp_dir, "execution_test") + "--bfile", prefix, + "--recode", "--transpose", "--tab", + "--out", out_prefix, ], stdout=PIPE, stderr=PIPE) except CalledProcessError: - self.skipTest("Plink cannot be properly used") - except IOError: - self.skipTest("Plink was not properly installed") + self.fail("Plink could not recode file") - # Creating the BED file + # Checking the two files exists + self.assertTrue(os.path.isfile(out_prefix + ".tped")) + self.assertTrue(os.path.isfile(out_prefix + ".tfam")) + + # Checking the content of the TFAM file + expected = "\n".join("\t".join(sample) for sample in fam_content) + with open(out_prefix + ".tfam", "r") as i_file: + self.assertEqual(expected + "\n", i_file.read()) + + # Checking the content of the TPED file + with open(out_prefix + ".tped", "r") as i_file: + # Checking the first marker + marker_1 = i_file.readline().rstrip("\r\n").split("\t") + self.assertEqual(["1", "M0", "0", "123"], marker_1[:4]) + self.assertEqual(["G G", "A G", "G G", "G G", "0 0", "G G", "A G", + "G G", "G G", "A A"], + marker_1[4:]) + + # Checking the second marker + marker_2 = i_file.readline().rstrip("\r\n").split("\t") + self.assertEqual(["1", "M1", "0", "124"], marker_2[:4]) + self.assertEqual(["C C", "T C", "C C", "C C", "C C", "C C", "C C", + "T C", "T T", "T C"], + marker_2[4:]) + + # Checking the third marker + marker_3 = i_file.readline().rstrip("\r\n").split("\t") + self.assertEqual(["2", "M2", "0", "117"], marker_3[:4]) + self.assertEqual(["C C", "C C", "C C", "C C", "C C", "G C", "C C", + "C C", "C C", "C C"], + marker_3[4:]) + + # Checking this is the end of the file + self.assertEqual("", i_file.readline()) + + @unittest.skipIf(platform.system() not in {"Darwin", "Linux", "Windows"}, + "Plink not available for {}".format(platform.system())) + def test_with_plink_individual_major(self): + """Tests to read a binary file (INDIVIDUAL-major) using Plink.""" + # Checking if we need to skip + if self.plink_path is None: + self.skipTest(self.plink_message) + + # The genotypes all_genotypes = [ [0, 1, 0, 0, -1, 0, 1, 0, 0, 2], [2, 1, 2, 2, 2, 2, 2, 1, 0, 1], [0, 0, 0, 0, 0, 1, 0, 0, 0, 0], ] + transposed_genotypes = [ + [row[i] for row in all_genotypes] + for i in range(len(all_genotypes[0])) + ] + + # Creating the BED file (INDIVIDUAL-major) prefix = os.path.join(self.tmp_dir, "test_output") - with PyPlink(prefix, "w") as pedfile: - for genotypes in all_genotypes: - pedfile.write_marker(genotypes) + with PyPlink(prefix, "w", "INDIVIDUAL-major") as pedfile: + for genotypes in transposed_genotypes: + pedfile.write_genotypes(genotypes) # Creating the FAM file fam_content = [ @@ -733,7 +976,7 @@ def test_with_plink(self): out_prefix = prefix + "_transposed" try: check_call([ - plink_path, + self.plink_path, "--noweb", "--bfile", prefix, "--recode", "--transpose", "--tab", @@ -776,3 +1019,39 @@ def test_with_plink(self): # Checking this is the end of the file self.assertEqual("", i_file.readline()) + + def test_wrong_bed_format(self): + """Tests opening a BED file with unknown format.""" + with self.assertRaises(ValueError) as cm: + PyPlink(self.prefix, bed_format="UNKNOWN-major") + self.assertEqual( + "invalid bed format: UNKNOWN-major", + str(cm.exception), + ) + + def test_invalid_bed_format_with_r_mode(self): + """Tests an invalid BED format with r mode.""" + with self.assertRaises(ValueError) as cm: + PyPlink(self.prefix, bed_format="INDIVIDUAL-major") + self.assertEqual( + "only SNP-major format is supported with mode 'r'", + str(cm.exception), + ) + + def test_write_genotypes_in_r_mode(self): + """Tests to use the 'write_genotypes' function in read mode.""" + with self.assertRaises(UnsupportedOperation) as cm: + self.pedfile.write_genotypes([0, 0, 0]) + self.assertEqual("not available in 'r' mode", str(cm.exception)) + + def test_write_marker_deprecation_warning(self): + """Tests that a deprecation warning is triggered.""" + with warnings.catch_warnings(record=True) as w: + with PyPlink(os.path.join(self.tmp_dir, "test_warns"), "w") as p: + p.write_marker([0, 0, 0]) + self.assertEqual(1, len(w)) + self.assertTrue(issubclass(w[0].category, DeprecationWarning)) + self.assertEqual( + "deprecated: use 'write_genotypes'", + str(w[0].message), + ) diff --git a/setup.py b/setup.py index 990aef8..eed0f10 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ MAJOR = 1 -MINOR = 1 +MINOR = 2 MICRO = 0 VERSION = "{}.{}.{}".format(MAJOR, MINOR, MICRO)