Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
lemieuxl committed Nov 4, 2015
2 parents 1246cbd + 48b4bb3 commit fce5cd3
Showing 1 changed file with 170 additions and 7 deletions.
177 changes: 170 additions & 7 deletions pyplink/tests/test_pyplink.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,26 @@
from __future__ import print_function

import os
import stat
import random
import shutil
import zipfile
import platform
import unittest
from tempfile import mkdtemp
from distutils.spawn import find_executable
from subprocess import check_call, PIPE, CalledProcessError

try:
from itertools import zip_longest as zip
except ImportError:
from itertools import izip_longest as zip

try:
from urllib.request import urlretrieve
except ImportError:
from urllib import urlretrieve

from pkg_resources import resource_filename

import numpy as np
Expand All @@ -51,7 +61,7 @@ class TestPyPlink(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Creating a temporary directory
cls.tmp_dir = mkdtemp(prefix="beelinetools_test_")
cls.tmp_dir = mkdtemp(prefix="pyplink_test_")

# Getting the BED/BIM/FAM files
cls.bed = resource_filename(
Expand Down Expand Up @@ -548,9 +558,9 @@ def test_write_binary(self):
test_prefix = os.path.join(self.tmp_dir, "test_write")

# Writing the binary file
with PyPlink(test_prefix, "w") as plink:
with PyPlink(test_prefix, "w") as pedfile:
for genotypes in expected_genotypes:
plink.write_marker(genotypes)
pedfile.write_marker(genotypes)

# Checking the file exists
self.assertTrue(os.path.isfile(test_prefix + ".bed"))
Expand All @@ -568,8 +578,8 @@ def test_write_binary(self):
file=o_file)

# Reading the written binary file
plink = PyPlink(test_prefix)
for i, (marker, genotypes) in enumerate(plink):
pedfile = PyPlink(test_prefix)
for i, (marker, genotypes) in enumerate(pedfile):
self.assertEqual("m{}".format(i+1), marker)
self.assertTrue((expected_genotypes[i] == genotypes).all())

Expand All @@ -587,9 +597,9 @@ def test_write_binary_error(self):

# Writing the binary file
with self.assertRaises(ValueError) as cm:
with PyPlink(test_prefix, "w") as plink:
with PyPlink(test_prefix, "w") as pedfile:
for genotypes in expected_genotypes:
plink.write_marker(genotypes)
pedfile.write_marker(genotypes)
self.assertEqual("7 samples expected, got 6", str(cm.exception))

def test_grouper_padding(self):
Expand All @@ -613,3 +623,156 @@ def test_grouper_no_padding(self):
observed_chunks = PyPlink._grouper(range(10), 5)
for expected, observed in zip(expected_chunks, observed_chunks):
self.assertEqual(expected, observed)

@unittest.skipIf(platform.system() not in {"Darwin", "Linux", "Windows"},
"Plink not available for {}".format(platform.system()))
def test_with_plink(self):
"""Tests to read a binary file using Plink."""
# Checking if Plink is in the path
plink_path = "plink"
if platform.system() == "Windows":
plink_path += ".exe"

if find_executable(plink_path) is None:
# The url for each platform
url = "http://pngu.mgh.harvard.edu/~purcell/plink/dist/{filename}"

# Getting the name of the file
filename = ""
if platform.system() == "Windows":
filename = "plink-1.07-dos.zip"
elif platform.system() == "Darwin":
filename = "plink-1.07-mac-intel.zip"
elif platform.system() == "Linux":
if platform.architecture()[0].startswith("32"):
filename = "plink-1.07-i686.zip"
elif platform.architecture()[0].startswith("64"):
filename = "plink-1.07-x86_64.zip"
else:
self.skipTest("System not compatible for Plink")
else:
self.skipTest("System not compatible for Plink")

# Downloading Plink
zip_path = os.path.join(self.tmp_dir, filename)
try:
urlretrieve(
url.format(filename=filename),
zip_path,
)
except:
self.skipTest("Plink's URL is not available")

# Unzipping Plink
plink_dir = os.path.join(self.tmp_dir, )
with zipfile.ZipFile(zip_path, "r") as z:
z.extractall(self.tmp_dir)
plink_path = os.path.join(
self.tmp_dir, os.path.splitext(filename)[0],
plink_path,
)
if not os.path.isfile(plink_path):
self.skipTest("Cannot use Plink")

# Making the script executable
if platform.system() in {"Darwin", "Linux"}:
os.chmod(plink_path, stat.S_IRWXU)

# Testing Plink works
try:
check_call([
plink_path,
"--noweb",
"--help",
"--out", os.path.join(self.tmp_dir, "execution_test")
], stdout=PIPE, stderr=PIPE)
except CalledProcessError:
self.skipTest("Plink cannot be properly used")
except IOError:
self.skipTest("Plink was not properly installed")

# Creating the BED file
all_genotypes = [
[0, 1, 0, 0, -1, 0, 1, 0, 0, 2],
[2, 1, 2, 2, 2, 2, 2, 1, 0, 1],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0],
]
prefix = os.path.join(self.tmp_dir, "test_output")
with PyPlink(prefix, "w") as pedfile:
for genotypes in all_genotypes:
pedfile.write_marker(genotypes)

# Creating the FAM file
fam_content = [
["F0", "S0", "0", "0", "1", "-9"],
["F1", "S1", "0", "0", "1", "-9"],
["F2", "S2", "0", "0", "2", "-9"],
["F3", "S3", "0", "0", "1", "-9"],
["F4", "S4", "0", "0", "1", "-9"],
["F5", "S5", "0", "0", "2", "-9"],
["F6", "S6", "0", "0", "1", "-9"],
["F7", "S7", "0", "0", "0", "-9"],
["F8", "S8", "0", "0", "1", "-9"],
["F9", "S9", "0", "0", "2", "-9"],
]
with open(prefix + ".fam", "w") as o_file:
for sample in fam_content:
print(*sample, sep=" ", file=o_file)

# Creating the BIM file
bim_content = [
["1", "M0", "0", "123", "A", "G"],
["1", "M1", "0", "124", "C", "T"],
["2", "M2", "0", "117", "G", "C"],
]
with open(prefix + ".bim", "w") as o_file:
for marker in bim_content:
print(*marker, sep="\t", file=o_file)

# Creating a transposed pedfile using Plink
out_prefix = prefix + "_transposed"
try:
check_call([
plink_path,
"--noweb",
"--bfile", prefix,
"--recode", "--transpose", "--tab",
"--out", out_prefix,
], stdout=PIPE, stderr=PIPE)
except CalledProcessError:
self.fail("Plink could not recode file")

# Checking the two files exists
self.assertTrue(os.path.isfile(out_prefix + ".tped"))
self.assertTrue(os.path.isfile(out_prefix + ".tfam"))

# Checking the content of the TFAM file
expected = "\n".join("\t".join(sample) for sample in fam_content)
with open(out_prefix + ".tfam", "r") as i_file:
self.assertEqual(expected + "\n", i_file.read())

# Checking the content of the TPED file
with open(out_prefix + ".tped", "r") as i_file:
# Checking the first marker
marker_1 = i_file.readline().rstrip("\r\n").split("\t")
self.assertEqual(["1", "M0", "0", "123"], marker_1[:4])
self.assertEqual(["G G", "A G", "G G", "G G", "0 0", "G G", "A G",
"G G", "G G", "A A"],
marker_1[4:])

# Checking the second marker
marker_2 = i_file.readline().rstrip("\r\n").split("\t")
self.assertEqual(["1", "M1", "0", "124"], marker_2[:4])
self.assertEqual(["C C", "T C", "C C", "C C", "C C", "C C", "C C",
"T C", "T T", "T C"],
marker_2[4:])

# Checking the third marker
marker_3 = i_file.readline().rstrip("\r\n").split("\t")
self.assertEqual(["2", "M2", "0", "117"], marker_3[:4])
self.assertEqual(["C C", "C C", "C C", "C C", "C C", "G C", "C C",
"C C", "C C", "C C"],
marker_3[4:])

# Checking this is the end of the file
self.assertEqual("", i_file.readline())

0 comments on commit fce5cd3

Please sign in to comment.