Skip to content

Commit

Permalink
[Fixes #12124] Fix broken tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mattiagiupponi committed Apr 30, 2024
1 parent 47c6c13 commit 80311a5
Showing 1 changed file with 30 additions and 41 deletions.
71 changes: 30 additions & 41 deletions geonode/base/api/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
from rest_framework.test import APITestCase
from rest_framework.renderers import JSONRenderer
from rest_framework.parsers import JSONParser

from geonode.resource.manager import resource_manager
from guardian.shortcuts import get_anonymous_user

from geonode.assets.utils import create_asset_and_link
Expand Down Expand Up @@ -2225,7 +2225,7 @@ def test_manager_can_edit_map(self):
)

def test_resource_service_copy(self):
files = os.path.join(gisdata.GOOD_DATA, "vector/san_andres_y_providencia_water.shp")
files = os.path.join(gisdata.GOOD_DATA, "vector/single_point.shp")
files_as_dict, _ = get_files(files)
resource = Dataset.objects.create(
owner=get_user_model().objects.get(username="admin"),
Expand Down Expand Up @@ -2288,7 +2288,7 @@ def test_resource_service_copy(self):
logger.warning(f"Can't delete test resource {resource}", exc_info=e)

def test_resource_service_copy_with_perms_dataset(self):
files = os.path.join(gisdata.GOOD_DATA, "vector/san_andres_y_providencia_water.shp")
files = os.path.join(gisdata.GOOD_DATA, "vector/single_point.shp")
files_as_dict, _ = get_files(files)
resource = Dataset.objects.create(
owner=get_user_model().objects.get(username="admin"),
Expand All @@ -2305,63 +2305,52 @@ def test_resource_service_copy_with_perms_dataset(self):
self._assertCloningWithPerms(resource)

@patch.dict(os.environ, {"ASYNC_SIGNALS": "False"})
@override_settings(ASYNC_SIGNALS=False, FILE_UPLOAD_DIRECTORY_PERMISSIONS=0o777, FILE_UPLOAD_PERMISSIONS=0o7777)
@patch("importer.celery_tasks.connections")
def test_resource_service_copy_with_perms_dataset_set_default_perms(self, conn):
@override_settings(ASYNC_SIGNALS=False)
def test_resource_service_copy_with_perms_dataset_set_default_perms(self):
with self.settings(ASYNC_SIGNALS=False):
_ = MagicMock()
files_as_dict, resource = self._import_dataset()

_, _ = create_asset_and_link(
resource, get_user_model().objects.get(username="admin"), list(files_as_dict.values())
files = os.path.join(gisdata.GOOD_DATA, "vector/single_point.shp")
files_as_dict, _ = get_files(files)

resource = resource_manager.create(
None,
resource_type=Dataset,
defaults={
"owner": get_user_model().objects.first(),
"title": "test_copy_with_perms",
"name": "test_copy_with_perms",
"is_approved": True,
"store": "geonode_data",
"subtype": "vector",
"resource_type": "dataset",
"files": files_as_dict.values(),
},
)
_perms = {
"users": {"bobby": ["base.add_resourcebase", "base.download_resourcebase"]},
"groups": {"anonymous": ["base.view_resourcebase", "base.download_resourcebae"]},
"groups": {"anonymous": ["base.view_resourcebase", "base.download_resourcebase"]},
}
resource.set_permissions(_perms)
# checking that bobby is in the original dataset perms list
self.assertIn("bobby", [x.username for x in resource.get_all_level_info().get("users", [])])
self.assertTrue("bobby" in "bobby" in [x.username for x in resource.get_all_level_info().get("users", [])])
# copying the resource, should remove the perms for bobby
# only the default perms should be available
copy_url = reverse("importer_resource_copy", kwargs={"pk": resource.pk})

self.assertTrue(self.client.login(username="admin", password="admin"))
payload = QueryDict("", mutable=True)
payload.update({"defaults": '{"title": "' + resource.title + '" }'})
response = self.client.put(copy_url, data=payload)

response = self.client.put(copy_url)
self.assertEqual(response.status_code, 200)

resouce_service_dispatcher.apply((response.json().get("execution_id"),))
# resouce_service_dispatcher.apply((response.json().get("execution_id"),))

self.assertEqual("finished", self.client.get(response.json().get("status_url")).json().get("status"))
_resource = Dataset.objects.filter(title__icontains="test_copy_with_perms").last()
self.assertIsNotNone(_resource)

logger.warning(f"ALL LEVEL INFO {_resource.get_all_level_info()}")
self.assertNotIn("bobby", [x.username for x in _resource.get_all_level_info().get("users", [])])
self.assertIn("admin", [x.username for x in _resource.get_all_level_info().get("users", [])])

def _import_dataset(self):
files_as_dict = {
"base_file": os.path.join(gisdata.GOOD_DATA, "vector/san_andres_y_providencia_location.shp"),
"dbf_file": os.path.join(gisdata.GOOD_DATA, "vector/san_andres_y_providencia_location.dbf"),
"prj_file": os.path.join(gisdata.GOOD_DATA, "vector/san_andres_y_providencia_location.shx"),
"shx_file": os.path.join(gisdata.GOOD_DATA, "vector/san_andres_y_providencia_location.prj"),
}
payload = {_filename: open(_file, "rb") for _filename, _file in files_as_dict.items()}

_url = reverse("importer_upload")
self.client.force_login(get_user_model().objects.get(username="admin"))

response = self.client.post(_url, data=payload)
self.assertEqual(201, response.status_code)

resource = ResourceHandlerInfo.objects.get(execution_request_id=response.json()["execution_id"])
return files_as_dict, resource.resource
self.assertFalse("bobby" in "bobby" in [x.username for x in _resource.get_all_level_info().get("users", [])])
self.assertTrue("admin" in "admin" in [x.username for x in _resource.get_all_level_info().get("users", [])])

def test_resource_service_copy_with_perms_doc(self):
files = os.path.join(gisdata.GOOD_DATA, "vector/san_andres_y_providencia_water.shp")
files = os.path.join(gisdata.GOOD_DATA, "vector/single_point.shp")
files_as_dict, _ = get_files(files)
resource = Document.objects.create(
owner=get_user_model().objects.get(username="admin"),
Expand All @@ -2377,7 +2366,7 @@ def test_resource_service_copy_with_perms_doc(self):

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
def test_resource_service_copy_with_perms_map(self):
files = os.path.join(gisdata.GOOD_DATA, "vector/san_andres_y_providencia_water.shp")
files = os.path.join(gisdata.GOOD_DATA, "vector/single_point.shp")
files_as_dict, _ = get_files(files)
resource = Document.objects.create(
owner=get_user_model().objects.get(username="admin"),
Expand Down

0 comments on commit 80311a5

Please sign in to comment.