Skip to content
Snippets Groups Projects
Commit c5d8bf9e authored by dadjavon's avatar dadjavon
Browse files

Begin testing suite

parent 6b57f23b
No related branches found
No related tags found
No related merge requests found
...@@ -2,7 +2,26 @@ ...@@ -2,7 +2,26 @@
The core classes and methods for the python microfluidics, microscopy, and analysis pypline pipeline. The core classes and methods for the python microfluidics, microscopy, and analysis pypline pipeline.
References: ## References
* OMERO python bindings : https://docs.openmicroscopy.org/omero/5.4.0/developers/Python.html * [OMERO python bindings](https://docs.openmicroscopy.org/omero/5.4.0/developers/Python.html)
* Ice3.5 : https://zeroc.com/downloads/ice/3.5#macos * [Zeroc-ice python](https://pypi.org/project/zeroc-ice/3.6.5/)
## Installation
How to set up access to OMERO from python:
* Install openssl headers version 1.0.2: `sudo apt-get install libssl1.0-dev`
* Install the corresponding openssl (as default is 1.1.1): `conda install openssl==1.0.2k`
* Install bzip headers : `sudo apt-get install libbz2-dev`
* Install zeroc-ice from PyPI, which includes Ice: `pip install zeroc-ice==3.6.0`
* Run `connect_to_omero.py` as a test (TODO TESTS)
## Disclaimers
TLDR: Most of this stuff is depcrecated.
Using OMERO 5.2.5 means that we need to use depreacted python 2.7 (EOL 2020.01.01),
and we need to use `zeroc-ice` version 3.6 which is also dropped in the newer version of OMERO.
The local version of `openssl` (`conda`) needs to fit the headers of `libssl-dev` (`apt-get`).
By default conda will install OpenSSL version 1.1.1 as all the others are no longer maintained.
However, using the headers of verions 1.0.2 means that we have to downgrade OpenSSL to version 1.0.2 also.
It is highly recommended that we upgrade OMERO to 5.6 in order to use Python 3, in which case it will even
be possible to get OMERO.py directly from PyPI with easy installation, [omero-py](https://pypi.org/project/omero-py/)
{"host": "sce-bio-c04287.bio.ed.ac.uk", "password": "***REMOVED***", "port": 4064, "user": "upload"}
import omero_py
...@@ -5,68 +5,56 @@ ...@@ -5,68 +5,56 @@
# Copyright (C) 2014 University of Dundee & Open Microscopy Environment. # Copyright (C) 2014 University of Dundee & Open Microscopy Environment.
# All Rights Reserved. # All Rights Reserved.
# Use is subject to license terms supplied in LICENSE.txt # Use is subject to license terms supplied in LICENSE.txt
import itertools
USERNAME = 'upload'
PASSWORD = '***REMOVED***'
HOST = 'sce-bio-c04287.bio.ed.ac.uk'
PORT = 4064
from omero.gateway import BlitzGateway from omero.gateway import BlitzGateway
from utils import repr_obj from utils import repr_obj
class Database: class Database:
def __init__(self): def __init__(self, username, password, host, port):
self.conn = BlitzGateway(USERNAME, PASSWORD, host=HOST, port=PORT) self.conn = BlitzGateway(username, password, host=host, port=port)
def __repr__(self): def __repr__(self):
return repr_obj(self.conn) return repr_obj(self.conn)
_ _
def connect(self): def connect(self, secure=False):
connected = self.conn.connect() connected = self.conn.connect()
self.conn.setSecure(secure)
# Check if you are connected return connected
# ==========================
if not connected:
import sys
sys.stderr.write(
"Error: Connection not available, please check your user name and"
" password.\n")
sys.exit(1)
# Using secure connection
# =======================
# By default, once we have logged in, data transfer is not encrypted
# (faster)
# To use a secure connection, call setSecure(True):
#self.conn.setSecure(True) # <--------- Uncomment this
def disconnect(self): def disconnect(self):
self.conn.seppuku() self.conn.seppuku()
@property @property
def user(self): def user(self):
# TODO cache
user = self.conn.getUser() user = self.conn.getUser()
return dict(ID=user.getId(), Username=user.getName()) return dict(ID=user.getId(), Username=user.getName())
@property @property
def groups(self): def groups(self):
# TODO cache
return [dict(ID=g.getId(), Name=g.getName()) for g in return [dict(ID=g.getId(), Name=g.getName()) for g in
self.conn.getGroupsMemberOf()] self.conn.getGroupsMemberOf()]
@property @property
def current_group(self): def current_group(self):
# TODO cache
g = self.conn.getGroupFromContext() g = self.conn.getGroupFromContext()
return dict(ID=g.getId(), Name=g.getName()) return dict(ID=g.getId(), Name=g.getName())
def isAdmin(self): def isAdmin(self):
# TODO cache
return self.conn.isAdmin() return self.conn.isAdmin()
def getDataset(self, dataset_id): def getDataset(self, dataset_id):
ds = self.conn.getObject("Dataset", dataset_id) ds = self.conn.getObject("Dataset", dataset_id)
return Dataset(ds) return Dataset(ds)
def getDatasets(self, n):
top_n = itertools.islice(self.conn.getObjects("Dataset"), n)
return [Dataset(ds) for ds in top_n]
class Dataset: class Dataset:
def __init__(self, dataset_wrapper): def __init__(self, dataset_wrapper):
...@@ -75,6 +63,10 @@ class Dataset: ...@@ -75,6 +63,10 @@ class Dataset:
def __repr__(self): def __repr__(self):
return repr_obj(self.dataset) return repr_obj(self.dataset)
def getImages(self, n):
top_n = itertools.islice(self.dataset.listChildren(), n)
return [Image(im) for im in top_n]
class Image: class Image:
def __init__(self, image_wrapper): def __init__(self, image_wrapper):
self.image = image_wrapper self.image = image_wrapper
......
sudo apt-get install libbz2-dev # Install bzip headers
sudo apt-get install libssl1.0-dev # Install openssl headers
conda install openssl==1.0.2n # Install local openssl to match
pip install --no-cache-dir zeroc-ice==3.6.5
python connect_to_omero.py
import os
THISPATH = os.path.dirname(os.path.abspath(__file__))
import sys
sys.path.insert(0, THISPATH)
...@@ -7,13 +7,13 @@ ...@@ -7,13 +7,13 @@
# All Rights Reserved. # All Rights Reserved.
# Use is subject to license terms supplied in LICENSE.txt # Use is subject to license terms supplied in LICENSE.txt
# #
from __future__ import print_function
""" # TODO remove and use unittest to run tests
FOR TRAINING PURPOSES ONLY! import os
""" import sys
import sys sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
sys.path.insert(0, './omero_py') import omero_py
USERNAME = 'upload' USERNAME = 'upload'
PASSWORD = '***REMOVED***' PASSWORD = '***REMOVED***'
...@@ -26,12 +26,12 @@ def print_obj(obj, indent=0): ...@@ -26,12 +26,12 @@ def print_obj(obj, indent=0):
Helper method to display info about OMERO objects. Helper method to display info about OMERO objects.
Not all objects will have a "name" or owner field. Not all objects will have a "name" or owner field.
""" """
print """%s%s:%s Name:"%s" (owner=%s)""" % ( print("""%s%s:%s Name:"%s" (owner=%s)""" % (
" " * indent, " " * indent,
obj.OMERO_CLASS, obj.OMERO_CLASS,
obj.getId(), obj.getId(),
obj.getName(), obj.getName(),
obj.getAnnotation()) obj.getAnnotation()))
if __name__ == '__main__': if __name__ == '__main__':
...@@ -74,40 +74,40 @@ if __name__ == '__main__': ...@@ -74,40 +74,40 @@ if __name__ == '__main__':
# clients. # clients.
user = conn.getUser() user = conn.getUser()
print "Current user:" print( "Current user:")
print " ID:", user.getId() print( " ID:", user.getId())
print " Username:", user.getName() print( " Username:", user.getName())
print " Full Name:", user.getFullName() print( " Full Name:", user.getFullName())
# Check if you are an Administrator # Check if you are an Administrator
print " Is Admin:", conn.isAdmin() print( " Is Admin:", conn.isAdmin())
print "Member of:" print( "Member of:")
for g in conn.getGroupsMemberOf(): for g in conn.getGroupsMemberOf():
print " ID:", g.getId(), " Name:", g.getName() print( " ID:", g.getId(), " Name:", g.getName())
group = conn.getGroupFromContext() group = conn.getGroupFromContext()
print "Current group: ", group.getName() print( "Current group: ", group.getName())
# List the group owners and other members # List the group owners and other members
owners, members = group.groupSummary() owners, members = group.groupSummary()
print " Group owners:" print( " Group owners:")
for o in owners: for o in owners:
print " ID: %s UserName: %s Name: %s" % ( print( " ID: %s UserName: %s Name: %s" % (
o.getId(), o.getOmeName(), o.getFullName()) o.getId(), o.getOmeName(), o.getFullName()))
print " Group members:" print( " Group members:")
for m in members: for m in members:
print " ID: %s UserName: %s Name: %s" % ( print( " ID: %s UserName: %s Name: %s" % (
m.getId(), m.getOmeName(), m.getFullName()) m.getId(), m.getOmeName(), m.getFullName()))
print "Owner of:" print( "Owner of:")
for g in conn.listOwnedGroups(): for g in conn.listOwnedGroups():
print " ID: ", g.getId(), " Name:", g.getName() print( " ID: ", g.getId(), " Name:", g.getName())
# New in OMERO 5 # New in OMERO 5
print "Admins:" print( "Admins:")
for exp in conn.getAdministrators(): for exp in conn.getAdministrators():
print " ID: %s UserName: %s Name: %s" % ( print( " ID: %s UserName: %s Name: %s" % (
exp.getId(), exp.getOmeName(), exp.getFullName()) exp.getId(), exp.getOmeName(), exp.getFullName()))
# The 'context' of our current session # The 'context' of our current session
ctx = conn.getEventContext() ctx = conn.getEventContext()
......
"""
Testing suite for connection to OMERO
"""
# TODO use omero.gateway.scripts.testdb_create module
from __future__ import print_function
import sys
print(sys.path)
import unittest
import json
import omero_py
from core.connect import Database, Dataset
class TestConnections(unittest.TestCase):
@classmethod
def setUpClass(cls):
with open('config.json', 'r') as fd:
config = json.load(fd)
cls._config = config
cls._db = Database(cls._config['user'],
cls._config['password'],
cls._config['host'],
cls._config['port'])
@classmethod
def tearDownClass(cls):
cls._db.disconnect()
def testConnection(self):
self.assertTrue(self._db.connect())
def testUser(self):
self.assertEquals(self._db.user['Username'], self._config['user'])
def testDataset(self):
dataset = self._db.getDatasets(1)
self.assertTrue(dataset is not None)
# FIXME cannot do this as it is instanciated anew.
# self._ds = dataset
def testImage(self):
dataset = self._db.getDatasets(1)
self.assertTrue(dataset is not None)
#FIXME return single dataset
image = dataset[0].getImages(1)
self.assertTrue(image is not None)
# FIXME cannot do this as it is instanciated anew.
self._im = image
def testRenderImage(self):
pass
if __name__ == "__main__":
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment