Source code for clickpoints.DataFile

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# DataFile.py

# Copyright (c) 2015-2022, Richard Gerum, Sebastian Richter, Alexander Winterl
#
# This file is part of ClickPoints.
#
# ClickPoints is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ClickPoints is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ClickPoints. If not, see <http://www.gnu.org/licenses/>

import numpy as np
import os
import peewee
import imageio
import sys
import platform
import PIL
import zlib
try:
    from cStringIO import StringIO
except ImportError:
    try:
        from StringIO import StringIO
    except ImportError:
        import io

PY3 = sys.version_info[0] == 3
if PY3:
    basestring = str

from .addons.imageio_plugin.imageio_plugin_BOOL import *

# to get query results as dictionaries
def dict_factory(cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
        d[idx] = row[idx]
    return d


class ImageField(peewee.BlobField):
    """ A database field, that """
    def db_value(self, value):
        # if the maximal value is 1, we can save it as a 1bit PNG
        if np.max(value) == 1:
            value = imageio.imwrite(imageio.RETURN_BYTES, value, format=".bool")
        else:
            value = imageio.imwrite(imageio.RETURN_BYTES, value, format=".png")
        if PY3:
            return value
        return peewee.binary_construct(value)

    def python_value(self, value):
        if not PY3:
            stream = StringIO(str(value))
        else:
            stream = io.BytesIO(value)
        if stream.read(4)==b"BOOL":
            stream.seek(0)
            return imageio.imread(stream, format=".bool")
        else:
            stream.seek(0)
            return imageio.imread(stream, format=".png")

def CheckValidColor(color):
    class NoValidColor(Exception):
        pass

    if isinstance(color, basestring):
        if color[0] == "#":
            color = color[1:]
        for c in color:
            if not "0" <= c.upper() <= "F":
                raise NoValidColor(color + " is no valid color")
        if len(color) != 6 and len(color) != 8:
            raise NoValidColor(color + " is no valid color")
        return "#" + color
    color_string = ""
    for value in color:
        if not 0 <= value <= 255:
            raise NoValidColor(str(color) + " is no valid color")
        color_string += "%02x" % value
    if len(color_string) != 6 and len(color_string) != 8:
        raise NoValidColor(str(color) + " is no valid color")
    return "#" + color_string

def NormalizeColor(color):
    # if color is a string
    if isinstance(color, basestring):
        color = str(color).upper()

        CheckValidColor(color)
        return color

    # if color is a list
    color = [CheckValidColor(col.upper()) for col in color]
    return color

def HTMLColorToRGB(colorstring):
    """ convert #RRGGBB to an (R, G, B) tuple """
    colorstring = str(colorstring).strip()
    if colorstring[0] == '#': colorstring = colorstring[1:]
    if len(colorstring) != 6 and len(colorstring) != 8:
        raise (ValueError, "input #%s is not in #RRGGBB format" % colorstring)
    return [int(colorstring[i*2:i*2+2], 16) for i in range(int(len(colorstring)/2))]

def addFilter(query, parameter, field):
    if parameter is None:
        return query
    if isinstance(parameter, (tuple, list)):
        return query.where(field << parameter)
    if isinstance(parameter, slice):
        if parameter.start is None:
            return query.where(field < parameter.stop)
        elif parameter.stop is None:
            return query.where(field >= parameter.start)
        else:
            return query.where( field.between(parameter.start, parameter.stop))
    else:
        return query.where(field == parameter)

def noNoneDict(**kwargs):
    new_dict = {}
    for key in kwargs:
        if kwargs[key] is not None:
            new_dict[key] = kwargs[key]
    return new_dict

def setFields(entry, dict):
    for key in dict:
        if dict[key] is not None:
            setattr(entry, key, dict[key])

def packToDictList(table, **kwargs):
    import itertools
    max_len = 0
    singles = {}
    def WrapSingle(key, i):
        return kwargs[key]
    def WrapMultiple(key, i):
        return kwargs[key][i]
    def WrapNoneID(key, i):
        field = getattr(table, key)
        if field.default is not None:
            result = table.select(peewee.fn.COALESCE(peewee.fn.MAX(field)).where(table.id == singles["id"](i)))
        else:
            result = table.select(field).where(table.id == singles["id"](i))
        return result
    def WrapNoneImageTrack(key, i):
        field = getattr(table, key)
        if field.default is not None:
            # if the field has no default value, the SELECT query would return an empty list if the element does not exist
            # this would throw a not-null constraint exception and it would ignore the default value
            # therefore we have to use the default value, if no entry is found
            # MAX "convertes" the empy query to a NULL and COALESCE converts the NULL to the default value
            result = table.select(peewee.fn.COALESCE(peewee.fn.MAX(field), field.default)).where(table.image == singles["image"](i), table.track == singles["track"](i))
        else:
            result = table.select(field).where(table.image == singles["image"](i), table.track == singles["track"](i))
        return result
    for key in list(kwargs.keys()):
        if kwargs[key] is None:
            if "id" in kwargs and kwargs["id"] is not None:
                singles[key] = lambda i, key=key: WrapNoneID(key, i)
            elif ("image" in kwargs and kwargs["image"] is not None) and ("track" in kwargs and kwargs["track"] is not None):
                singles[key] = lambda i, key=key: WrapNoneImageTrack(key, i)
            else:
                del kwargs[key]
            continue
        if isinstance(kwargs[key], (tuple, list, np.ndarray)):
            if max_len > 1 and max_len != len(kwargs[key]):
                raise IndexError()
            max_len = max(max_len, len(kwargs[key]))
            singles[key] = lambda i, key=key: WrapMultiple(key, i)
        else:
            max_len = max(max_len, 1)
            singles[key] = lambda i, key=key: WrapSingle(key, i)
    dict_list = []
    for i in range(max_len):
        dict_list.append({key: singles[key](i) for key in kwargs})
    return dict_list

class Option:
    key = ""
    display_name = ""
    value = None
    default = ""
    value_type = ""
    value_count = 1
    min_value = None
    max_value = None
    decimals = None
    unit = None
    category = ""
    hidden = False
    tooltip = ""

    def __init__(self, **kwargs):
        for key in kwargs:
            setattr(self, key, kwargs[key])

class OptionAccess(object):
    def __init__(self, data_file):
        self.data_file = data_file

    def __getattr__(self, key):
        if key != "data_file":
            return self.data_file.getOption(key)
        return object.__getattr__(self, key)

    def __setattr__(self, key, value):
        if key != "data_file":
            return self.data_file.setOption(key, value)
        return object.__setattr__(self, key, value)

def VerboseDict(dictionary):
    return " and ".join("%s=%s" % (key, dictionary[key]) for key in dictionary)


class DoesNotExist(peewee.DoesNotExist):
    pass


class ImageDoesNotExist(DoesNotExist):
    pass


class MaskDimensionMismatch(DoesNotExist):
    pass


class MaskDtypeMismatch(DoesNotExist):
    pass


class MaskDimensionUnknown(DoesNotExist):
    pass


class MarkerTypeDoesNotExist(DoesNotExist):
    pass


class TrackDoesNotExist(DoesNotExist):
    pass


def GetCommandLineArgs():
    """
    Parse the command line arguments for the information provided by ClickPoints, if the script is invoked from within
    ClickPoints. The arguments are --start_frame --database and --port.

    Returns
    -------
    start_frame : int
        the frame ClickPoints was in when invoking the script. Probably the evaluation should start here
    database : string
        the filename of the database where the current ClickPoints project is stored. Should be used with
        clickpoints.DataFile
    port : int
        the port of the socket connection to communicate with the ClickPoints instance. Should be used with
        clickpoints.Commands
    """
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--database", dest='database', help='specify which database file to use')
    parser.add_argument("--start_frame", type=int, default=0, dest='start_frame',
                        help='specify at which frame to start')
    parser.add_argument("--port", type=int, dest='port', help='from which port to communicate with ClickPoints')
    args, unknown = parser.parse_known_args()
    return args.start_frame, args.database, args.port


def getLine(image, line, width=None):
    # get the start and end position of the line
    line = np.array(line)
    x1, y1 = line[0]
    x2, y2 = line[1]

    # the width and height of the line
    w = x2 - x1
    h = y2 - y1
    # the length
    length = np.sqrt(w ** 2 + h ** 2)
    # and the normed normal vector
    w2 = h / length
    h2 = -w / length

    # apply an optional offset if the image is a ClickPoints image with an offset
    offset = getattr(image, "offset", None)
    if offset is not None:
        offx, offy = offset.x, offset.y
    else:
        offx, offy = 0, 0
    x1 -= offx
    y1 -= offy

    # get the image data (if the image is a ClickPoints image, if not it is a numpy array)
    data = getattr(image, "data", None)
    image = data if data is not None else image

    # width None is a line with width of 1 and the result is returned as a 1D array
    if width is None:
        width = 1
        return_1d = True
    else:
        return_1d = False

    datas = []
    # iterate over the different image slices to get the width of the cut
    for j in np.arange(0, width) - width / 2. + 0.5:
        data = []
        # iterate over all pixels of the length
        for i in np.linspace(0, 1, np.ceil(length)):
            # get the position along the line
            x = x1 + w * i + w2 * j
            y = y1 + h * i + h2 * j
            # get the rounding percentage
            xp = x - np.floor(x)
            yp = y - np.floor(y)
            x, y = int(x), int(y)
            # and interpolate the 4 surrounding pixels according to the rounding percentage
            v = np.array([[1 - yp, yp]]).T @ np.array([[1 - xp, xp]])
            # for multi channel images (color images)
            if len(image.shape) == 3:
                data.append(np.sum(image[y:y + 2, x:x + 2, :] * v[:, :, None], axis=(0, 1),
                                   dtype=image.dtype))
            # and for single channel images
            else:
                data.append(np.sum(image[y:y + 2, x:x + 2] * v, dtype=image.dtype))
        datas.append(data)

    if return_1d:
        return np.array(datas[0])
    return np.array(datas)[::-1, :]

[docs]class DataFile: """ The DataFile class provides access to the .cdb file format in which ClickPoints stores the data for a project. Parameters ---------- database_filename : string the filename to open mode : string, optional can be 'r' (default) to open an existing database and append data to it or 'w' to create a new database. If the mode is 'w' and the database already exists, it will be deleted and a new database will be created. """ db = None _reader = None _current_version = "22" _database_filename = None _next_sort_index = 0 _SQLITE_MAX_VARIABLE_NUMBER = None _config = None _buffer = None """ Enumerations """ TYPE_Normal = 0 # old synonym for TYPE_Marker TYPE_Marker = 0 TYPE_Rect = 1 TYPE_Line = 2 TYPE_Track = 4 TYPE_Ellipse = 8 TYPE_Polygon = 16
[docs] def max_sql_variables(self): """Get the maximum number of arguments allowed in a query by the current sqlite3 implementation. Returns ------- int inferred SQLITE_MAX_VARIABLE_NUMBER """ import sqlite3 db = sqlite3.connect(':memory:') cur = db.cursor() cur.execute('CREATE TABLE t (test)') low, high = 0, 100000 while (high - 1) > low: guess = (high + low) // 2 query = 'INSERT INTO t VALUES ' + ','.join(['(?)' for _ in range(guess)]) args = [str(i) for i in range(guess)] try: cur.execute(query, args) except sqlite3.OperationalError as e: if "too many SQL variables" in str(e) or "too many terms in compound SELECT" in str(e): high = guess else: raise else: low = guess cur.close() db.close() return low
def saveReplaceMany(self, table, data): if self._SQLITE_MAX_VARIABLE_NUMBER is None: self._SQLITE_MAX_VARIABLE_NUMBER = self.max_sql_variables() chunk_size = ((self._SQLITE_MAX_VARIABLE_NUMBER // len(data[0])) - 1) // 2 with self.db.atomic(): for idx in range(0, len(data), chunk_size): table.replace_many(data[idx:idx + chunk_size]).execute() def saveInsertMany(self, table, data, fields=None): if self._SQLITE_MAX_VARIABLE_NUMBER is None: self._SQLITE_MAX_VARIABLE_NUMBER = self.max_sql_variables() chunk_size = ((self._SQLITE_MAX_VARIABLE_NUMBER // len(data[0])) - 1) // 2 with self.db.atomic(): for idx in range(0, len(data), chunk_size): table.insert_many(data[idx:idx + chunk_size], fields=fields).execute() def __init__(self, database_filename=None, mode='r'): if database_filename is None: raise TypeError("No database filename supplied.") self._database_filename = database_filename version = self._current_version new_database = True # Create a new database if mode == "w": if os.path.exists(self._database_filename): os.remove(self._database_filename) self.db = peewee.SqliteDatabase(database_filename) self.db.connect() else: # or read an existing one if not os.path.exists(self._database_filename) and mode != "r+": raise Exception("DB %s does not exist!" % os.path.abspath(self._database_filename)) exists = os.path.exists(self._database_filename) self.db = peewee.SqliteDatabase(database_filename) self.db.connect() if exists: version = self._CheckVersion() self._next_sort_index = None new_database = False """ Basic Tables """ class BaseModel(peewee.Model): class Meta: database = self.db database_class = self class Meta(BaseModel): key = peewee.CharField(unique=True) value = peewee.CharField() class Option(BaseModel): key = peewee.CharField(unique=True) value = peewee.CharField(null=True) class Path(BaseModel): path = peewee.CharField(unique=True) def __str__(self): return "PathObject id%s: path=%s" % (self.id, self.path) """ Layer Table """ class Layer(BaseModel): name = peewee.CharField(unique=True) base_layer = peewee.ForeignKeyField('self', related_name='dependent_layers') def __str__(self): return "LayerObject id%s:\tname=%s" \ % (self.id, self.name) def print_details(self): print("LayerObject:\n" "id:\t\t{0}\n" "name:\t{1}\n" .format(self.id, self.name)) class Image(BaseModel): filename = peewee.CharField() ext = peewee.CharField(max_length=10) frame = peewee.IntegerField(default=0) external_id = peewee.IntegerField(null=True) timestamp = peewee.DateTimeField(null=True) sort_index = peewee.IntegerField(default=0) width = peewee.IntegerField(null=True) height = peewee.IntegerField(null=True) path = peewee.ForeignKeyField(Path, backref="images", on_delete='CASCADE') layer = peewee.ForeignKeyField(Layer, backref="images", on_delete='CASCADE') class Meta: # image and path in combination have to be unique indexes = ((('filename', 'path', 'frame'), True),) def __array__(self): return self.get_data() def get_data(self): # if we have a buffer if self.database_class._buffer is not None: # we try to get the image from the buffer to speed up the loading image = self.database_class.get_image_data(self.sort_index, self.layer) # if for some reason the buffer returns None, we have to load the image from the file if image is not None: return image # only if we don't have the file already open (which in case for videos is important) if self.database_class._reader is None or self.database_class._reader.filename != self.filename: # compose the path if platform.system() == 'Linux' and self.path.path.startswith("\\\\"): # replace samba path for linux path = os.path.join("/mnt", self.path.path[2:], self.filename).replace("\\", "/") else: path = os.path.join(os.path.dirname(self.database_class._database_filename), self.path.path, self.filename) # get the reader (open the file) self.database_class._reader = imageio.get_reader(path) self.database_class._reader.filename = self.filename # return the image return self.database_class._reader.get_data(self.frame) def get_full_filename(self): filename = os.path.join(self.path.path, self.filename) # replace samba path for linux if platform.system() == 'Linux' and filename.startswith("\\\\"): filename = "/mnt/" + filename[2:].replace("\\", "/") # apply replace pattern if self.database_class.replace is not None: filename = filename.replace(self.database_class.replace[0], self.database_class.replace[1]) return filename @property def mask(self): try: return self.masks[0] except IndexError: return None @property def annotation(self): try: return self.annotations[0] except IndexError: return None @property def offset(self): try: return self.offsets[0] except IndexError: return None @property def data(self): return self.get_data() @property def data8(self): data = self.get_data().copy() if data.dtype == np.uint16: if data.max() < 2 ** 12: data >>= 4 return data.astype(np.uint8) data >>= 8 return data.astype(np.uint8) return data def __array__(self): return self.get_data() def getShape(self): if self.width is not None and self.height is not None: return (self.height, self.width) else: try: return self.data.shape[:2] except: raise IOError("Can't retrieve image dimensions for %s" % self.filename) def __str__(self): return f"ImageObject id{self.id}:\tfilename={self.filename}\text={self.ext}\tframe={self.frame}" \ f"texternal_id={self.external_id}\ttimestamp={self.timestamp}\tsort_index={self.sort_index}," \ f" width={self.width}\theight={self.height}\tpath={self.path}\tlayer={self.layer}" def print_details(self): print("ImageObject:\n" f"id:\t\t{self.id}\n" f"filename:\t{self.filename}\n" f"ext:\t{self.ext}\n" f"frame:\t{self.frame}\n" f"external_id:\t{self.external_id}\n" f"timestamp:\t{self.timestamp}\n" f"sort_index:\t{self.sort_index}\n" f"widht:\t{self.width}\n" f"height:\t{self.height}\n" f"path:\t{self.path}\n" f"layer:\t{self.layer}") self.base_model = BaseModel self.table_meta = Meta self.table_path = Path self.table_layer = Layer self.table_image = Image self.table_option = Option self._tables = [Meta, Path, Layer, Image, Option] """ Offset Table """ class Offset(BaseModel): image = peewee.ForeignKeyField(Image, unique=True, backref="offsets", on_delete='CASCADE') x = peewee.FloatField() y = peewee.FloatField() def __str__(self): return "OffsetObject id%s:\tx=%s\ty=%s\timage=%s" \ % (self.id, self.x, self.y, self.image) def print_details(self): print("OffsetObject:\n" "id:\t\t{0}\n" "x:\t{1}\n" "y:\t{2}\n" "image:\t{3}\n" .format(self.id, self.x, self.y, self.image)) def __add__(self, other): try: return np.array([self.x, self.y]) + other except TypeError: return np.array([self.x, self.y]) + np.array([other.x, other.y]) def __sub__(self, other): try: return np.array([self.x, self.y]) - other except TypeError: return np.array([self.x, self.y]) - np.array([other.x, other.y]) def __array__(self): return np.array([self.x, self.y]) self.table_offset = Offset self._tables.extend([Offset]) """ Marker Tables """ class MarkerType(BaseModel): name = peewee.CharField(unique=True) color = peewee.CharField() mode = peewee.IntegerField(default=0) style = peewee.CharField(null=True) text = peewee.CharField(null=True) hidden = peewee.BooleanField(default=False) def __str__(self): return "MarkerTypeObject id%s:\tname=%s\tcolor=%s\tmode=%s\tstyle=%s\ttext=%s\thidden=%s" \ % (self.id, self.name, self.color, self.mode, self.style, self.text, self.hidden) def print_details(self): print("MarkerTypeObject:\n" "id:\t\t{0}\n" "name:\t{1}\n" "color:\t{2}\n" "mode:\t{3}\n" "style:\t{4}\n" "text:\t{5}\n" "hidden:\t{6}\n" .format(self.id, self.name, self.color, self.mode, self.style, self.text, self.hidden)) def getColorRGB(self): return HTMLColorToRGB(self.color) class Track(BaseModel): style = peewee.CharField(null=True) text = peewee.CharField(null=True) type = peewee.ForeignKeyField(MarkerType, backref="tracks", on_delete='CASCADE') hidden = peewee.BooleanField(default=False) def __getattribute__(self, item): if item == "points": return np.array([[point.x, point.y] for point in self.markers]) if item == "points_corrected": return np.array([point.correctedXY() for point in self.markers]) if item == "markers": return self.track_markers.join(Image).order_by(Image.sort_index) if item == "times": return np.array([point.image.timestamp for point in self.markers]) if item == "frames": return np.array([point.image.sort_index for point in self.markers]) if item == "image_ids": return np.array([point.image.id for point in self.markers]) return BaseModel.__getattribute__(self, item) def __str__(self): return "TrackObject id%s:\ttype=%s\ttext=%s\tstyle=%s\thidden=%s" \ % (self.id, self.type, self.style, self.text, self.hidden) def print_details(self): print("TrackObject:\n" "id:\t\t{0}\n" "type:\t{1}\n" "style:\t{2}\n" "text:\t{3}\n" "hidden:\t{4}\n" .format(self.id, self.type, self.style, self.text, self.hidden)) def split(self, marker): # if we are not given a marker entry.. if not isinstance(marker, Marker): # we try to get it over its id marker = Marker.get(id=marker) # if not complain if marker is None: raise ValueError("No valid marker given.") # get the markers after the given marker markers = self.markers.where(Image.sort_index > marker.image.sort_index) # create a new track as a copy of this track new_track = Track(style=self.style, text=self.text, type=self.type, hidden=self.hidden) new_track.save(force_insert=True) # move the markers after the given marker to the new track Marker.update(track=new_track.id).where(Marker.id << markers).execute() # return the new track return new_track def removeAfter(self, marker): # if we are not given a marker entry.. if not isinstance(marker, Marker): # we try to get it over its id marker = Marker.get(id=marker) # if not complain if marker is None: raise ValueError("No valid marker given.") # get the markers after the given marker markers = self.markers.where(Image.sort_index > marker.image.sort_index) # and delete them return Marker.delete().where(Marker.id << markers).execute() def changeType(self, new_type): # if we are not given a MarkerType entry.. if not isinstance(new_type, MarkerType): # we try to get it by its id if isinstance(new_type, int): new_type = MarkerType.get(id=new_type) # or by its name else: new_type = MarkerType.get(name=new_type) # if we don't find anything, complain if new_type is None: raise ValueError("No valid marker type given.") # ensure that the mode is correct if new_type.mode != self.database_class.TYPE_Track: raise ValueError("Given type has not the mode TYPE_Track") # change the type and save self.type = new_type self.save() # and change the type of the markers #TODO: figure out why peewee ignores the where condition on some systems - works with raw query # q = self.database_class.table_marker.update(type=new_type).where(self.database_class.table_marker.track_id == self.id) count = self.database_class.db.execute_sql("UPDATE marker SET type_id = %d WHERE track_id = %d" % (new_type.id, self.id)) return count def merge(self, track, mode=None): # if we are not given a track.. if not isinstance(track, Track): # interpret it as a track id and get the track entry track = Track.get(id=track) # if we don't get it, complain if track is None: raise ValueError("No valid track given.") # find the image ids from this track and the other track my_image_ids = [m.image_id for m in self.markers] other_image_ids = [m.image_id for m in track.markers] # test if they share any image ids if set(my_image_ids) & set(other_image_ids): if mode == "average": # the temporary table generates the averaged track, every point of this temporary table is # upserted in the original track self.database_class.db.execute_sql("WITH bla as (SELECT id, image_id, AVG(x) as x, AVG(y) as y, ? as track_id, type_id, processed, style, text FROM marker WHERE track_id in (?, ?) GROUP BY image_id);" "INSERT OR REPLACE INTO marker(image_id, x, y, track_id, type_id, processed, style, text) SELECT image_id, x, y, track_id, type_id, processed, style, text from bla", [self.id, self.id, track.id]) # than we can delete the second track track.delete_instance() else: # they are not allowed to share any images image_list = set(my_image_ids) & set(other_image_ids) # list first 10 images with a conflict if len(image_list) < 10: image_list = ", ".join("#%d" % i for i in image_list) else: image_list = ", ".join(["#%d" % i for i in image_list][:10]) + ", ..." # raise an exception raise ValueError( "Can't merge track #%d with #%d, because they have markers in the same images.\n(images %s)" % ( self.id, track.id, image_list)) else: # elif set(my_image_ids) & set(other_image_ids) and force: # if len(set(my_image_ids) & set(other_image_ids))>1: # self.database_class.db.execute_sql('update marker set track_id = ? where id in (select min(id) as id from marker where track_id in (?,?) group by image_id)',[self.id, self.id, track.id]) # self.database_class.db.execute_sql('delete from marker where track_id=?',[track.id]) # move the markers from the other track to this track count = Marker.update(track=self.id, type=self.type).where(Marker.id << track.markers).execute() # and delete the other track track.delete_instance() return count class Marker(BaseModel): image = peewee.ForeignKeyField(Image, backref="markers", on_delete='CASCADE') x = peewee.FloatField() y = peewee.FloatField() type = peewee.ForeignKeyField(MarkerType, backref="markers", null=True, on_delete='CASCADE') processed = peewee.IntegerField(default=0) track = peewee.ForeignKeyField(Track, null=True, backref='track_markers', on_delete='CASCADE') style = peewee.CharField(null=True) text = peewee.CharField(null=True) class Meta: indexes = ((('image', 'track'), True),) def __str__(self): return "Marker Object: id=%s\timage=#%s\tx=%s\tx=%s\ttype=%s\tprocessed=%s\ttrack=#%s\tstyle=%s\ttext=%s" \ % (self.id, self.image_id, self.x, self.y, self.type, self.processed, self.track_id, self.style, self.text) def details(self): print("Marker Object:\n" "id:\t\t{0}\n" "image:\t{1}\n" "x:\t{2}\n" "y:\t{3}\n" "type:\t{4}\n" "processed:\t{5}\n" "track:\t{5}\n" "style:\t{5}\n" "text:\t{5}\n" .format(self.id, self.image, self.x, self.y, self.type, self.processed, self.track, self.style, self.text)) def correctedXY(self): return np.array(self.database_class.db.execute_sql( "SELECT m.x - IFNULL(o.x, 0), m.y - IFNULL(o.y, 0) FROM marker m LEFT JOIN image i ON m.image_id == i.id LEFT JOIN offset o ON i.id == o.image_id WHERE m.id == ?", [self.id]).fetchone()) def pos(self): return np.array([self.x, self.y]) def __add__(self, other): try: return np.array([self.x, self.y]) + other except TypeError: return np.array([self.x, self.y]) + np.array([other.x, other.y]) def __sub__(self, other): try: return np.array([self.x, self.y]) - other except TypeError: return np.array([self.x, self.y]) - np.array([other.x, other.y]) def __array__(self): return np.array([self.x, self.y]) def changeType(self, new_type): # if we are not given a MarkerType entry.. if not isinstance(new_type, MarkerType): # we try to get it by its id if isinstance(new_type, int): new_type = MarkerType.get(id=new_type) # or by its name else: new_type = MarkerType.get(name=new_type) # if we don't find anything, complain if new_type is None: raise ValueError("No valid marker type given.") if self.type.mode == self.database_class.TYPE_Normal: # ensure that the mode is correct if new_type.mode != self.database_class.TYPE_Normal: raise ValueError("Given type has not the mode TYPE_Normal") elif self.type.mode == self.database_class.TYPE_Track: # ensure that the mode is correct if new_type.mode != self.database_class.TYPE_Track: raise ValueError("Given type has not the mode TYPE_Track") # change the type and save self.type = new_type return self.save() def getPixels(self, shape=None, perimeter=False): import skimage.draw if shape is None: shape = self.image.data.shape if 0 <= self.x < shape[1] and 0 <= self.y < shape[0]: return (int(self.y+0.5), int(self.x+0.5)) return (), () class Line(BaseModel): image = peewee.ForeignKeyField(Image, backref="lines", on_delete='CASCADE') x1 = peewee.FloatField() y1 = peewee.FloatField() x2 = peewee.FloatField() y2 = peewee.FloatField() type = peewee.ForeignKeyField(MarkerType, backref="lines", null=True, on_delete='CASCADE') processed = peewee.IntegerField(default=0) style = peewee.CharField(null=True) text = peewee.CharField(null=True) def setPos1(self, x, y): self.x1 = x self.y1 = y def setPos2(self, x, y): self.x2 = x self.y2 = y def getPos(self): return [self.x1, self.y1, self.x2, self.y2] def getPos1(self): return [self.x1, self.y1] def getPos2(self): return [self.x2, self.y2] @property def points(self): return np.array([[self.x1, self.y1], [self.x2, self.y2], ]) @property def center(self): # the center is the mean of all points return np.array([(self.x1 + self.x2) / 2, (self.y1 + self.y2) / 2]) @property def perimeter(self): return np.sqrt((self.x1-self.x2)**2 + (self.y1-self.y2)**2) # def __getattribute__(self, item): # if item == "correctedXY": # return self.correctedXY() # if item == "pos": # return self.pos() # if item == "length": # return self.length() # return BaseModel.__getattribute__(self, item) def correctedXY(self): join_condition = (Marker.image == Offset.image) querry = Marker.select(Marker.x, Marker.y, Offset.x, Offset.y) \ .join(Offset, peewee.JOIN_LEFT_OUTER, on=(join_condition).alias('offset')) \ .where(Marker.id == self.id) for q in querry: if not (q.offset.x is None) or not (q.offset.y is None): pt = [q.x + q.offset.x, q.y + q.offset.y] else: pt = [q.x, q.y] return pt def pos(self): return np.array([self.x, self.y]) def length(self): return np.sqrt((self.x1-self.x2)**2 + (self.y1-self.y2)**2) def angle(self): return np.arctan2(self.y1 - self.y2, self.x1 - self.x2) def cropImage(self, image=None, width=None): # if no image is given take the image of the line if image is None: image = self.image return getLine(image, self, width) def __str__(self): return "LineObject id%s:\timage=%s\tx1=%s\ty1=%s\tx2=%s\ty2=%s\ttype=%s\tprocessed=%s\tstyle=%s\ttext=%s" \ % (self.id, self.image, self.x1, self.y1, self.x2, self.y2, self.type, self.processed, self.style, self.text) def print_details(self): print("LineObject:\n" "id:\t\t{0}\n" "image:\t{1}\n" "x1:\t{2}\n" "y1:\t{3}\n" "x2:\t{4}\n" "y2:\t{5}\n" "type:\t{6}\n" "processed:\t{7}\n" "style:\t{8}\n" "text:\t{9}" .format(self.id, self.image, self.x1, self.y1, self.x2, self.y2, self.type, self.processed, self.style, self.text)) def changeType(self, new_type): # if we are not given a MarkerType entry.. if not isinstance(new_type, MarkerType): # we try to get it by its id if isinstance(new_type, int): new_type = MarkerType.get(id=new_type) # or by its name else: new_type = MarkerType.get(name=new_type) # if we don't find anything, complain if new_type is None: raise ValueError("No valid marker type given.") # ensure that the mode is correct if new_type.mode != self.database_class.TYPE_Line: raise ValueError("Given type has not the mode TYPE_Line") # change the type and save self.type = new_type return self.save() def __array__(self): return np.array([[self.x1, self.y1], [self.x2, self.y2]]) def getPixels(self, shape=None, perimeter=False): import skimage.draw if shape is None: shape = self.image.data.shape rr, cc = skimage.draw.line(int(self.y1+0.5), int(self.x1+0.5), int(self.y2+0.5), int(self.x2+0.5)) inside = (rr>0) & (rr<shape[0]) & (cc>0) & (cc<shape[1]) return rr[inside], cc[inside] class Rectangle(BaseModel): image = peewee.ForeignKeyField(Image, backref="rectangles", on_delete='CASCADE') x = peewee.FloatField() y = peewee.FloatField() width = peewee.FloatField() height = peewee.FloatField() type = peewee.ForeignKeyField(MarkerType, backref="rectangles", null=True, on_delete='CASCADE') processed = peewee.IntegerField(default=0) style = peewee.CharField(null=True) text = peewee.CharField(null=True) def setPos1(self, x, y): self.x = x self.y = y #def setPos2(self, x, y): # self.x = x # self.y = y @property def points(self): return np.array([[self.x, self.y], [self.x+self.width, self.y], [self.x+self.width, self.y+self.height], [self.x, self.y+self.height]]) @property def center(self): # the center is the mean of all points return np.array([self.x+self.width/2, self.y+self.height/2]) @property def perimeter(self): return self.width*2 + self.height*2 def getRect(self): return [self.x, self.y, self.width, self.height] def getPos1(self): return [self.x, self.y] def getPos2(self): return [self.x+self.width, self.y] def getPos3(self): return [self.x+self.width, self.y+self.height] def getPos4(self): return [self.x, self.y+self.height] def correctedXY(self): return np.array(self.database_class.db.execute_sql( "SELECT r.x - IFNULL(o.x, 0), r.y - IFNULL(o.y, 0) FROM rectangle r LEFT JOIN image i ON r.image_id == i.id LEFT JOIN offset o ON i.id == o.image_id WHERE r.id == ?", [self.id]).fetchone()) join_condition = (Marker.image == Offset.image) querry = Marker.select(Marker.x, Marker.y, Offset.x, Offset.y) \ .join(Offset, peewee.JOIN_LEFT_OUTER, on=(join_condition).alias('offset')) \ .where(Marker.id == self.id) for q in querry: if not (q.offset.x is None) or not (q.offset.y is None): pt = [q.x + q.offset.x, q.y + q.offset.y] else: pt = [q.x, q.y] return pt def pos(self): return np.array([self.x, self.y]) def slice_x(self, border=0): if self.width < 0: return slice(int(self.x+self.width-border), int(self.x+border)) return slice(int(self.x-border), int(self.x+self.width+border)) def slice_y(self, border=0): if self.height < 0: return slice(int(self.y+self.height-border), int(self.y + border)) return slice(int(self.y-border), int(self.y + self.height + border)) def slice(self, border=0): try: border_y, border_x = border except TypeError: border_y = border border_x = border return (self.slice_y(border_y), self.slice_x(border_x)) def cropImage(self, image=None, with_offset=True, with_subpixel=False, border=0): # if no image is given take the image of the rectangle if image is None: image = self.image # get the date from the image if it is a database entry if isinstance(image, Image): image_data = image.data # if not, assume it is a numpy array else: image_data = image try: border_y, border_x = border except TypeError: border_y = border border_x = border # the start of the rectangle start = np.array([self.x-border_x, self.y-border_y]) # the dimensions of the rectangle (needs to be integers) extent = np.array([self.width+border_x*2, self.height+border_y*2]).astype("int") # check if some dimensions are negative if self.width < 0: extent[0] = -extent[0] start[0] = start[0] - extent[0] if self.height < 0: extent[1] = -extent[1] start[1] = start[1] - extent[1] # subtract the offset from the image if with_offset: # try to get the offset from the image (fails if image is already a numpy array) try: # get the offset from the rectangle's image offset0 = self.image.offset # get the offset from the target image offset = image.offset # try to calculate the difference if offset0 is not None: if offset is not None: offset = offset - offset0 else: offset = -np.array([offset0.x, offset0.y]) # image is a numpy array, it has no offset information except AttributeError: offset = None # apply the offset, if we found one if offset is not None: start -= offset # split offsets in integer and decimal part start_int = start.astype("int") start_float = start - start_int # get the cropped image crop = image_data[start_int[1]:start_int[1]+extent[1], start_int[0]:start_int[0]+extent[0]] # apply the subpixel decimal shift if with_subpixel and (start_float[0] or start_float[1]): from scipy.ndimage import shift if len(crop.shape) == 2: # bw image crop = shift(crop, [start_float[1], start_float[0]]) else: # color image crop = shift(crop, [start_float[1], start_float[0], 0]) # return the cropped image return crop def area(self): return self.width * self.height def __str__(self): return "RectangleObject id%s:\timage=%s\tx=%s\ty=%s\twidth=%s\theight=%s\ttype=%s\tprocessed=%s\tstyle=%s\ttext=%s" \ % ( self.id, self.image, self.x, self.y, self.width, self.height, self.type, self.processed, self.style, self.text) def print_details(self): print("RectangleObject:\n" "id:\t\t{0}\n" "image:\t{1}\n" "x:\t{2}\n" "y:\t{3}\n" "width:\t{4}\n" "height:\t{5}\n" "type:\t{6}\n" "processed:\t{7}\n" "style:\t{8}\n" "text:\t{9}" .format(self.id, self.image, self.x, self.y, self.width, self.height, self.type, self.processed, self.style, self.text)) def changeType(self, new_type): # if we are not given a MarkerType entry.. if not isinstance(new_type, MarkerType): # we try to get it by its id if isinstance(new_type, int): new_type = MarkerType.get(id=new_type) # or by its name else: new_type = MarkerType.get(name=new_type) # if we don't find anything, complain if new_type is None: raise ValueError("No valid marker type given.") # ensure that the mode is correct if new_type.mode != self.database_class.TYPE_Rect: raise ValueError("Given type has not the mode TYPE_Rect") # change the type and save self.type = new_type return self.save() def getPixels(self, shape=None, perimeter=False): import skimage.draw if shape is None: shape = self.image.data.shape x1, y1 = self.getPos1() x2, y2 = self.getPos3() w, h = self.width, self.height if perimeter is True: rr, cc = skimage.draw.rectangle_perimeter((int(y1+0.5), int(x1+0.5)), (int(y2+0.5), int(x2+0.5)), shape=shape) else: rr, cc = skimage.draw.rectangle((int(y1+0.5), int(x1+0.5)), (int(y2+0.5), int(x2+0.5)), shape=shape) return rr, cc class Ellipse(BaseModel): image = peewee.ForeignKeyField(Image, backref="ellipses", on_delete='CASCADE') x = peewee.FloatField() y = peewee.FloatField() width = peewee.FloatField() height = peewee.FloatField() angle = peewee.FloatField() type = peewee.ForeignKeyField(MarkerType, backref="ellipses", null=True, on_delete='CASCADE') processed = peewee.IntegerField(default=0) style = peewee.CharField(null=True) text = peewee.CharField(null=True) @property def center(self): return np.array([self.x, self.y]) @property def area(self): return np.pi * self.width / 2 * self.height / 2 def __str__(self): return f"EllipseObject id{self.id}:\timage={self.image}\tx={self.x}\ty={self.y}\twidth={self.width}\theight={self.height}\tangle={self.angle}\ttype={self.type}\tprocessed={self.processed}\tstyle={self.style}\ttext={self.text}" def print_details(self): print("EllipseObject:\n" f"id:\t\t{self.id}\n" f"image:\t{self.image}\n" f"x:\t{self.x}\n" f"y:\t{self.y}\n" f"width:\t{self.width}\n" f"height:\t{self.height}\n" f"angle:\t{self.angle}\n" f"type:\t{self.type}\n" f"processed:\t{self.processed}\n" f"style:\t{self.style}\n" f"text:\t{self.text}") def changeType(self, new_type): # if we are not given a MarkerType entry.. if not isinstance(new_type, MarkerType): # we try to get it by its id if isinstance(new_type, int): new_type = MarkerType.get(id=new_type) # or by its name else: new_type = MarkerType.get(name=new_type) # if we don't find anything, complain if new_type is None: raise ValueError("No valid marker type given.") # ensure that the mode is correct if new_type.mode != self.database_class.TYPE_Ellipse: raise ValueError("Given type has not the mode TYPE_Ellipse") # change the type and save self.type = new_type return self.save() def getPixels(self, shape=None, perimeter=False): import skimage.draw if shape is None: shape = self.image.data.shape if perimeter is True: rr, cc = skimage.draw.ellipse_perimeter(int(self.y + 0.5), int(self.x + 0.5), int(self.width / 2), int(self.height / 2), np.pi / 2 + np.deg2rad(self.angle), shape) else: rr, cc = skimage.draw.ellipse(self.y, self.x, self.width / 2, self.height / 2, shape, np.pi / 2 - np.deg2rad(self.angle)) return rr, cc this = self class Polygon(BaseModel): image = peewee.ForeignKeyField(Image, backref="polygons", on_delete='CASCADE') type = peewee.ForeignKeyField(MarkerType, backref="polygons", null=True, on_delete='CASCADE') closed = peewee.BooleanField(default=0) processed = peewee.IntegerField(default=0) style = peewee.CharField(null=True) text = peewee.CharField(null=True) def __array__(self): return self.points @property def points(self): if getattr(self, "cached_points", None) is None: self.cached_points = np.array(self.points_raw.select(this.table_polygon_point.x, this.table_polygon_point.y) .order_by(this.table_polygon_point.index).tuples(), dtype=float).ravel().reshape(-1, 2) return self.cached_points @points.setter def points(self, points): # store the points self.cached_points = np.asarray(points) # remember that this "points" "field" is dirty (e.g. is not synchronous with the database) setattr(self, "points_dirty", True) @property def area(self): x, y = self.points.T # the shoelace formula for the area of a polygon return 0.5 * np.abs(x @ np.roll(y, 1) - y @ np.roll(x, 1)) @property def center(self): # the center is the mean of all points return np.mean(np.asarray(self.points), axis=0) @property def perimeter(self): p = np.asarray(self.points) # if it is closed, include the distance form the last to the first if self.closed: return np.sum(np.linalg.norm(p - np.roll(p, shift=1, axis=0), axis=1)) else: # if not, it is just the sum of the distances between subsequent points return np.sum(np.linalg.norm(p[:-1] - p[1:], axis=1)) def save(self, *args, **kwargs): BaseModel.save(self, *args, **kwargs) if getattr(self, "points_dirty", False) is True: # remove unnecessary points this.db.execute_sql(f"DELETE FROM polygonpoint WHERE polygon_id = {self.id} AND 'index' >= {len(self.cached_points)};") # update the points data = [] for index, point in enumerate(self.cached_points): data.append(dict(polygon=self.id, x=point[0], y=point[1], index=index)) this.saveReplaceMany(this.table_polygon_point, data) def is_dirty(self): return BaseModel.is_dirty(self) or getattr(self, "points_dirty", False) def __str__(self): return f"PolygonObject id{self.id}:\timage={self.image}\ttype={self.type}\tprocessed={self.processed}\tstyle={self.style}\ttext={self.text}" def print_details(self): print("PolygonObject:\n" f"id:\t\t{self.id}\n" f"image:\t{self.image}\n" f"type:\t{self.type}\n" f"processed:\t{self.processed}\n" f"style:\t{self.style}\n" f"text:\t{self.text}") def changeType(self, new_type): # if we are not given a MarkerType entry.. if not isinstance(new_type, MarkerType): # we try to get it by its id if isinstance(new_type, int): new_type = MarkerType.get(id=new_type) # or by its name else: new_type = MarkerType.get(name=new_type) # if we don't find anything, complain if new_type is None: raise ValueError("No valid marker type given.") # ensure that the mode is correct if new_type.mode != self.database_class.TYPE_Polygon: raise ValueError("Given type has not the mode TYPE_Polygon") # change the type and save self.type = new_type return self.save() def getPixels(self, shape=None, perimeter=False): import skimage.draw if shape is None: shape = self.image.data.shape x, y = self.__array__().T if perimeter is True: rr, cc = skimage.draw.polygon_perimeter(y, x, shape) else: rr, cc = skimage.draw.polygon(y, x, shape) return rr, cc class PolygonPoint(BaseModel): polygon = peewee.ForeignKeyField(Polygon, backref="points_raw", on_delete='CASCADE') x = peewee.FloatField() y = peewee.FloatField() index = peewee.IntegerField() class Meta: # image and path in combination have to be unique indexes = ((('polygon', 'index'), True),) def pos(self): return np.array([self.x, self.y]) self.table_marker = Marker self.table_line = Line self.table_rectangle = Rectangle self.table_ellipse = Ellipse self.table_polygon = Polygon self.table_polygon_point = PolygonPoint self.table_track = Track self.table_markertype = MarkerType self._tables.extend([Marker, Line, Rectangle, Ellipse, Polygon, PolygonPoint, Track, MarkerType]) """ Mask Tables """ class Mask(BaseModel): image = peewee.ForeignKeyField(Image, backref="masks", on_delete='CASCADE') data = ImageField() def __array__(self): return self.data def __str__(self): return "MaskObject id%s: image=%s, data=%s" % (self.id, self.image, self.data) class MaskType(BaseModel): name = peewee.CharField(unique=True) color = peewee.CharField() index = peewee.IntegerField(unique=True) def __str__(self): return "MasktypeObject id%s: name=%s, color=%s, index=%s" % (self.id, self.name, self.color, self.index) def getColorRGB(self): return HTMLColorToRGB(self.color) self.table_mask = Mask self.table_masktype = MaskType self._tables.extend([Mask, MaskType]) """ Annotation Tables """ class Annotation(BaseModel): image = peewee.ForeignKeyField(Image, unique=True, backref="annotations", on_delete='CASCADE') timestamp = peewee.DateTimeField(null=True) comment = peewee.TextField(default="") rating = peewee.IntegerField(default=0) def __getattribute__(self, item): if item == "tags": return [tagassociations.tag for tagassociations in self.tagassociations] return BaseModel.__getattribute__(self, item) def __str__(self): return "AnnotationObject id%s:\timage=%s\ttimestamp=%s\tcomment=%s\trating=%s" \ % (self.id, self.image, self.timestamp, self.comment, self.rating) def print_details(self): print("AnnotationObject:\n" "id:\t\t{0}\n" "image:\t{1}\n" "timestamp:\t{2}\n" "comment:\t{3}\n" "rating:\t{4}\n" .format(self.id, self.image, self.timestamp, self.comment, self.rating)) class Tag(BaseModel): name = peewee.CharField() def __getattribute__(self, item): if item == "annotations": return [tagassociations.annotation for tagassociations in self.tagassociations] return BaseModel.__getattribute__(self, item) def __str__(self): return "TagObject id%s:\timage=%s" \ % (self.id, self.name) def print_details(self): print("TagObject:\n" "id:\t\t{0}\n" "name:\t{1}\n" .format(self.id, self.name)) class TagAssociation(BaseModel): annotation = peewee.ForeignKeyField(Annotation, backref="tagassociations", on_delete='CASCADE') tag = peewee.ForeignKeyField(Tag, backref="tagassociations", on_delete='CASCADE') def __str__(self): return "TagAssociationObject id%s:\tannotation=%s\ttag=%s" \ % (self.id, self.annotation, self.tag) def print_details(self): print("TagAssociationObject:\n" "id:\t\t{0}\n" "annotation:\t{1}\n" "tag:\t{2}\n" .format(self.id, self.annotation, self.tag)) self.table_annotation = Annotation self.table_tag = Tag self.table_tagassociation = TagAssociation self._tables.extend([Annotation, Tag, TagAssociation]) """ Connect """ try: self.db.connect() except peewee.OperationalError: pass self._CreateTables() self.db.execute_sql("PRAGMA foreign_keys = ON") self.db.execute_sql("PRAGMA journal_mode = WAL") if new_database: self.db.execute_sql("CREATE TRIGGER no_empty_tracks\ AFTER DELETE ON marker\ BEGIN\ DELETE FROM track WHERE id = OLD.track_id AND (SELECT COUNT(marker.id) FROM marker WHERE marker.track_id = track.id) = 0;\ END;") if new_database: self.table_meta(key="version", value=self._current_version).save() # second migration part which needs the peewee model if version is not None and int(version) < int(self._current_version): self._migrateDBFrom2(version) self._InitOptions() def __del__(self): if self.db: self.db.close() def _InitOptions(self): self._options = {} self._options_by_key = {} self._last_category = "General" self._AddOption(key="jumps", display_name="Frame Jumps", default=[-1, +1, -10, +10, -100, +100, -1000, +1000], value_type="int", value_count=8, tooltip="How many frames to jump\n" "for the keys on the numpad:\n" "2, 3, 5, 6, 8, 9, /, *") self._AddOption(key="auto_contrast", display_name="Auto Contrast", default=False, value_type="bool") self._AddOption(key="rotation", default=0, value_type="int", hidden=True) self._AddOption(key="rotation_steps", default=90, value_type="int", hidden=True) self._AddOption(key="hide_interfaces", default=True, value_type="bool", hidden=True) self._AddOption(key="timestamp_formats", default="['%Y%m%d-%H%M%S-%f', '%Y%m%d-%H%M%S']", value_type="string", hidden=True) self._AddOption(key="timestamp_formats2", default="['%Y%m%d-%H%M%S_%Y%m%d-%H%M%S']", value_type="string", hidden=True) self._AddOption(key="max_image_size", default=2**14, value_type="int", hidden=True) self._AddOption(key="threaded_image_load", display_name="Thread image load", default=True, value_type="bool", tooltip="Whether to do image loading\n" "in a separate thread.\n" "Should only be altered if threading\n" "causes issues.") self._AddOption(key="threaded_image_display", display_name="Thread image display", default=True, value_type="bool", tooltip="Whether to do image display\n" "preparation in a separate thread.") self._AddOption(key="buffer_mode", display_name="Buffer Mode", default=2, value_type="choice", values=["No Buffer", "Limit Buffer by Frames", "Limit Buffer by Memory"]) self._AddOption(key="buffer_size", display_name="Buffer Frame Count", default=300, value_type="int", min_value=1, tooltip="How many frames to keep in buffer.\n" "The buffer should be only as big as the\n" "RAM has space to prevent swapping.") self._AddOption(key="buffer_memory", display_name="Buffer Memory Amount", default=500, value_type="int", min_value=1, unit="MB", tooltip="How big the buffer is allowed to grow in MB.\n" "This is no hard limit, the buffer can grow\n" "one image bigger than the allowed memory size.\n" "The buffer should be only as big as the\n" "RAM has space to prevent swapping.") self._last_category = "Script Launcher" self._AddOption(key="scripts", hidden=True, default=[], value_type="array") self._last_category = "Contrast Adjust" self._AddOption(key="contrast_interface_hidden", default=True, value_type="bool", hidden=True) self._AddOption(key="contrast", default=None, value_type="dict", hidden=True) self._last_category = "Marker" self._AddOption(key="types", default={0: ["marker", [255, 0, 0], self.TYPE_Normal]}, value_type="dict", hidden=True) self._AddOption(key="selected_marker_type", default=-1, value_type="int", hidden=True) self._AddOption(key="marker_interface_hidden", default=True, value_type="bool", hidden=True) self._AddOption(key="tracking_connect_nearest", display_name="Track Auto-Connect", default=False, value_type="bool", tooltip="When Auto-Connect is turned on,\n" "clicking in the image will always\n" "move the current point of the nearest track\n" "instead of starting a new track.\n" "To start a new track while Auto-Connect\n" "is turned on, hold down the 'alt' key") self._AddOption(key="tracking_show_trailing", display_name="Track show trailing", default=20, value_type="int", min_value=-1, tooltip="Nr of track markers displayed\n" "before the current frame (past).\n" "-1 for all.") self._AddOption(key="tracking_show_leading", display_name="Track show leading", default=0, value_type="int", min_value=-1, tooltip="Nr of track markers displayed\n" "after the current frame (future).\n" "-1 for all.") self._AddOption(key="tracking_hide_trailing", display_name="Track hide trailing", default=2, value_type="int", min_value=0, tooltip="Nr of frames before the first track marker\n" "until which the track is hidden.") self._AddOption(key="tracking_hide_leading", display_name="Track hide leading", default=2, value_type="int", min_value=0, tooltip="Nr of frames after the last track marker\n" "until the the track is hidden") self._last_category = "Mask" self._AddOption(key="draw_types", default=[[1, [124, 124, 255], "mask"]], value_type="list", hidden=True) self._AddOption(key="selected_draw_type", default=-1, value_type="int", hidden=True) self._AddOption(key="mask_opacity", default=0.5, value_type="float", hidden=True) self._AddOption(key="mask_brush_size", default=10, value_type="int", hidden=True) self._AddOption(key="mask_interface_hidden", default=True, value_type="bool", hidden=True) self._AddOption(key="auto_mask_update", display_name="Auto Mask Update", default=True, value_type="bool", tooltip="When turned on, mask changes\n" "are directly displayed as the mask\n" "if not, it is first displayed\n" "separately to increase speed.") self._last_category = "Info Hud" self._AddOption(key="info_hud_string", display_name="Info Text", default="", value_type="string", tooltip="Can display extra information of the image.\n" "Supports the following types:\n" "exif[] exit information from jpeg files.\n" "regex[] information from the filename.\n" "meta[] meta information from tiff images.") self._AddOption(key="filename_data_regex", display_name="Filename Regex", default="", value_type="string", tooltip="Can display extra information of the image.\n" "Supports the following types:\n" "exif[] exit information from jpeg files.\n" "regex[] information from the filename.\n" "meta[] meta information from tiff images.") self._AddOption(key="infohud_interface_hidden", default=True, value_type="bool", hidden=True) self._last_category = "Timeline" self._AddOption(key="fps", default=0, value_type="float", hidden=True) self._AddOption(key="skip", default=1, value_type="int", hidden=True) self._AddOption(key="play_start", default=0.0, value_type="float", hidden=True) self._AddOption(key="play_end", default=1.0, value_type="float", hidden=True) self._AddOption(key="playing", default=False, value_type="bool", hidden=True) self._AddOption(key="timeline_hide", default=False, value_type="bool", hidden=True) self._AddOption(key="datetimeline_show", display_name="Show Datetimeline", default=True, value_type="bool", tooltip="Whether to display the slider with dates.\n" "Changes are only displayed after restart.") self._AddOption(key="display_timeformat", display_name="Timeformat for Display", default=r'%Y-%m-%d %H:%M:%S.%2f', value_type="string", tooltip="How the time of the current frame\n" "should be displayed.\n" "Use %Y for year\n" "%m for month\n" "%d for day\n" "%H for hour\n" "%M for minute\n" "%S for second\n" "%2f for milliseconds\n" "%6f for nanoseconds") self._last_category = "Video Exporter" self._AddOption(key="export_video_filename", default="export/export.mp4", value_type="string", hidden=True) self._AddOption(key="export_image_filename", default="export/images%d.jpg", value_type="string", hidden=True) self._AddOption(key="export_single_image_filename", default="export/images%d.jpg", value_type="string", hidden=True) self._AddOption(key="export_gif_filename", default="export/export.gif", value_type="string", hidden=True) self._AddOption(key="export_type", default=0, value_type="int", hidden=True) self._AddOption(key="video_codec", default="libx264", value_type="string", hidden=True) self._AddOption(key="video_quality", default=5, value_type="int", hidden=True) self._AddOption(key="export_display_time", default=True, value_type="bool", hidden=True) self._AddOption(key="export_time_from_zero", default=True, value_type="bool", hidden=True) self._AddOption(key="export_time_font_size", default=50, value_type="int", hidden=True) self._AddOption(key="export_time_font_color", default="#FFFFFF", value_type="string", hidden=True) self._AddOption(key="export_time_format", default="%Y-%m-%d %H:%M:%S", value_type="string", hidden=True) self._AddOption(key="export_timedelta_format", default="%H:%M:%S", value_type="string", hidden=True) self._AddOption(key="export_custom_time", default=False, value_type="bool", hidden=True) self._AddOption(key="export_custom_time_delta", default=1.0, value_type="float", hidden=True) self._AddOption(key="export_image_scale", default=1.0, value_type="float", hidden=True) self._AddOption(key="export_marker_scale", default=1.0, value_type="float", hidden=True) self._last_category = "Annotations" self._AddOption(key="server_annotations", default=False, value_type="bool", hidden=True) self._AddOption(key="sql_dbname", default='', value_type="string", hidden=True) self._AddOption(key="sql_host", default='', value_type="string", hidden=True) self._AddOption(key="sql_port", default=3306, value_type="int", hidden=True) self._AddOption(key="sql_user", default='', value_type="string", hidden=True) self._AddOption(key="sql_pwd", default='', value_type="string", hidden=True) def _AddOption(self, **kwargs): category = kwargs["category"] if "category" in kwargs else self._last_category if "display_name" not in kwargs: kwargs["display_name"] = kwargs["key"] option = Option(**kwargs) if category not in self._options: self._options[category] = [] try: entry = self.table_option.get(key=option.key) entry_found = True if option.value_type == "int": if option.value_count > 1: option.value = self._stringToList(entry.value) else: option.value = int(entry.value) if option.value_type == "dict" or option.value_type == "list": import ast option.value = ast.literal_eval(entry.value) if option.value_type == "choice": option.value = int(entry.value) if option.value_type == "choice_string": option.value = str(entry.value) if option.value_type == "float": option.value = float(entry.value) if option.value_type == "bool": option.value = (entry.value == "True") or (entry.value == True) if option.value_type == "string": option.value = str(entry.value) if option.value_type == "color": option.value = str(entry.value) if option.value_type == "array": option.value = [value.strip()[1:-1] if value.strip()[0] != "u" else value.strip()[2:-1] for value in entry.value[1:-1].split(",")] except peewee.DoesNotExist: entry_found = False self._options[category].append(option) self._options_by_key[option.key] = option if self._config is not None and option.key in self._config and not entry_found: self.setOption(option.key, self._config[option.key]) #print("Config", option.key, self._config[option.key]) #print("Config", option.key, self._config[option.key]) def _stringToList(self, value): value = value.strip() if (value.startswith("(") and value.endswith(")")) or (value.startswith("[") and value.endswith("]")): value = value[1:-1].strip() if value.endswith(","): value[:-1].strip() try: value = [int(v) for v in value.split(",")] except ValueError: raise ValueError() return value def setOption(self, key, value): option = self._options_by_key[key] option.value = value value = str(value) if str(option.default) == value: try: self.table_option.get(key=option.key).delete_instance() except peewee.DoesNotExist: return # write protected except peewee.OperationalError: pass else: try: entry = self.table_option.get(key=option.key) entry.value = value entry.save() except peewee.DoesNotExist: try: self.table_option(key=option.key, value=value).save(force_insert=True) # write protected except peewee.OperationalError: pass # write protected except peewee.OperationalError: pass def getOption(self, key): option = self._options_by_key[key] if option.value is None: if isinstance(option.default, (dict, list)): return option.default.copy() return option.default return option.value def getOptionAccess(self): return OptionAccess(self) def _CheckVersion(self): try: version = self.db.execute_sql('SELECT value FROM meta WHERE key = "version"').fetchone()[0] except (KeyError, peewee.DoesNotExist): version = "0" if int(version) < int(self._current_version): print("Open database with version", version) self._migrateDBFrom(version) elif int(version) > int(self._current_version): print("Warning Database version %d is newer than ClickPoints version %d " "- please get an updated Version!" % (int(version), int(self._current_version))) print("Proceeding on own risk!") return version def _migrateDBFrom(self, version): # migrate database from an older version print("Migrating DB from version %s" % version) nr_version = int(version) self.db.connection().row_factory = dict_factory if nr_version < 3: print("\tto 3") with self.db.transaction(): # Add text fields for Marker self.db.execute_sql("ALTER TABLE marker ADD COLUMN text varchar(255)") self._SetVersion(3) if nr_version < 4: print("\tto 4") with self.db.transaction(): # Add text fields for Tracks self.db.execute_sql("ALTER TABLE tracks ADD COLUMN text varchar(255)") # Add text fields for Types self.db.execute_sql("ALTER TABLE types ADD COLUMN text varchar(255)") self._SetVersion(4) if nr_version < 5: print("\tto 5") with self.db.transaction(): # Add text fields for Tracks self.db.execute_sql("ALTER TABLE images ADD COLUMN frame int DEFAULT 0") self.db.execute_sql("ALTER TABLE images ADD COLUMN sort_index int") with self.db.transaction(): self.db.execute_sql( "CREATE TEMPORARY TABLE NewIDs (sort_index INTEGER PRIMARY KEY AUTOINCREMENT, id INT UNSIGNED)") self.db.execute_sql("INSERT INTO NewIDs (id) SELECT id FROM images ORDER BY filename ASC") self.db.execute_sql( "UPDATE images SET sort_index = (SELECT sort_index FROM NewIDs WHERE images.id = NewIDs.id)-1") self.db.execute_sql("DROP TABLE NewIDs") self._SetVersion(5) if nr_version < 6: print("\tto 6") with self.db.transaction(): # Add text fields for Tracks self.db.execute_sql("ALTER TABLE images ADD COLUMN path_id int") self.db.execute_sql("ALTER TABLE images ADD COLUMN width int NULL") self.db.execute_sql("ALTER TABLE images ADD COLUMN height int NULL") self._SetVersion(6) if nr_version < 7: print("\tto 7") # fix migration for old branched databases if nr_version < 4: # version before start of migration try: # Add text fields for Tracks self.db.execute_sql("ALTER TABLE tracks ADD COLUMN text varchar(255)") except peewee.OperationalError: pass try: # Add text fields for Types self.db.execute_sql("ALTER TABLE types ADD COLUMN text varchar(255)") except peewee.OperationalError: pass self._SetVersion(7) if nr_version < 8: print("\tto 8") with self.db.transaction(): # fix for DB migration with missing paths table self.db.execute_sql('CREATE TABLE IF NOT EXISTS "paths" ("id" INTEGER NOT NULL PRIMARY KEY, "path" VARCHAR (255) NOT NULL);') self.db.execute_sql("ALTER TABLE paths RENAME TO path") self.db.execute_sql("ALTER TABLE images RENAME TO image") self.db.execute_sql('INSERT INTO path (id, path) VALUES(1, "")') self.db.execute_sql("UPDATE image SET path_id = 1") # fix for DB migration with missing paths table self.db.execute_sql('CREATE TABLE IF NOT EXISTS "offsets" ("id" INTEGER NOT NULL PRIMARY KEY, "image_id" INTEGER NOT NULL,"x" REAL NOT NULL,"y" REAL NOT NULL, FOREIGN KEY ("image_id") REFERENCES "image" ("id") ON DELETE CASCADE);') self.db.execute_sql("ALTER TABLE offsets RENAME TO offset") self.db.execute_sql("ALTER TABLE tracks RENAME TO track") self.db.execute_sql("ALTER TABLE types RENAME TO markertype") self.db.execute_sql("ALTER TABLE masktypes RENAME TO masktype") self.db.execute_sql("ALTER TABLE tags RENAME TO tag") self._SetVersion(8) if nr_version < 9: print("\tto 9") with self.db.transaction(): # Add type fields for Track self.db.execute_sql("ALTER TABLE track ADD COLUMN type_id int") self.db.execute_sql("UPDATE track SET type_id = (SELECT type_id FROM marker WHERE track_id = track.id LIMIT 1)") self.db.execute_sql("DELETE FROM track WHERE type_id IS NULL") self._SetVersion(9) if nr_version < 10: print("\tto 10") with self.db.transaction(): # store mask_path and all masks self.db.execute_sql("PRAGMA foreign_keys = OFF") try: mask_path = self.db.execute_sql("SELECT * FROM meta WHERE key = 'mask_path'").fetchone()[2] except TypeError: mask_path = "" masks = self.db.execute_sql("SELECT id, image_id, filename FROM mask").fetchall() self.migrate_to_10_mask_path = mask_path self.migrate_to_10_masks = masks self.db.execute_sql("CREATE TABLE `mask_tmp` (`id` INTEGER NOT NULL, `image_id` INTEGER NOT NULL, `data` BLOB NOT NULL, PRIMARY KEY(id), FOREIGN KEY(`image_id`) REFERENCES 'image' ( 'id' ) ON DELETE CASCADE)") for mask in masks: tmp_maskpath = os.path.join(self.migrate_to_10_mask_path, mask[2]) if os.path.exists(tmp_maskpath): from PIL import Image as PILImage im = np.asarray(PILImage.open(tmp_maskpath)) value = imageio.imwrite(imageio.RETURN_BYTES, im, format=".png") value = peewee.binary_construct(value) self.db.execute_sql("INSERT INTO mask_tmp VALUES (?, ?, ?)", [mask[0], mask[1], value]) self.db.execute_sql("DROP TABLE mask") self.db.execute_sql("ALTER TABLE mask_tmp RENAME TO mask") self.db.execute_sql("PRAGMA foreign_keys = ON") self._SetVersion(10) if nr_version < 11: print("\tto 11") with self.db.transaction(): self.db.execute_sql("PRAGMA foreign_keys = OFF") self.db.execute_sql('CREATE TABLE "annotation_tmp" ("id" INTEGER NOT NULL PRIMARY KEY, "image_id" INTEGER NOT NULL, "timestamp" DATETIME, "comment" TEXT NOT NULL, "rating" INTEGER NOT NULL, FOREIGN KEY ("image_id") REFERENCES "image" ("id") ON DELETE CASCADE)') self.db.execute_sql('INSERT INTO annotation_tmp SELECT id, image_id, timestamp, comment, rating FROM annotation') self.db.execute_sql("DROP TABLE annotation") self.db.execute_sql("ALTER TABLE annotation_tmp RENAME TO annotation") self.db.execute_sql('CREATE TABLE "tagassociation_tmp" ("id" INTEGER NOT NULL PRIMARY KEY, "annotation_id" INTEGER NOT NULL, "tag_id" INTEGER NOT NULL, FOREIGN KEY ("annotation_id") REFERENCES "annotation" ("id") ON DELETE CASCADE, FOREIGN KEY ("tag_id") REFERENCES "tag" ("id") ON DELETE CASCADE);') self.db.execute_sql('INSERT INTO tagassociation_tmp SELECT id, annotation_id, tag_id FROM tagassociation') self.db.execute_sql("DROP TABLE tagassociation") self.db.execute_sql("ALTER TABLE tagassociation_tmp RENAME TO tagassociation") self.db.execute_sql("DROP TABLE IF EXISTS basemodel") self.db.execute_sql('CREATE TABLE "image_tmp" ("id" INTEGER NOT NULL PRIMARY KEY, "filename" VARCHAR(255) NOT NULL, "ext" VARCHAR(10) NOT NULL, "frame" INTEGER, "external_id" INTEGER, "timestamp" DATETIME, "sort_index" INTEGER, "width" INTEGER, "height" INTEGER, "path_id" INTEGER, FOREIGN KEY ("path_id") REFERENCES "path" ("id") ON DELETE CASCADE);') self.db.execute_sql('INSERT INTO image_tmp SELECT id, filename, ext, frame, external_id, timestamp, sort_index, width, height, path_id FROM image') self.db.execute_sql("DROP TABLE image") self.db.execute_sql("ALTER TABLE image_tmp RENAME TO image") self.db.execute_sql('CREATE TABLE "marker_tmp" ("id" INTEGER NOT NULL PRIMARY KEY, "image_id" INTEGER NOT NULL, "x" REAL NOT NULL, "y" REAL NOT NULL, "type_id" INTEGER, "processed" INTEGER NOT NULL, "partner_id" INTEGER, "track_id" INTEGER, "style" VARCHAR(255), "text" VARCHAR(255), FOREIGN KEY ("image_id") REFERENCES "image" ("id") ON DELETE CASCADE, FOREIGN KEY ("type_id") REFERENCES "markertype" ("id") ON DELETE CASCADE, FOREIGN KEY ("partner_id") REFERENCES "marker" ("id") ON DELETE SET NULL, FOREIGN KEY ("track_id") REFERENCES "track" ("id"));') self.db.execute_sql('INSERT INTO marker_tmp SELECT id, image_id, x, y, type_id, processed, partner_id, track_id, style, text FROM marker') self.db.execute_sql("DROP TABLE marker") self.db.execute_sql("ALTER TABLE marker_tmp RENAME TO marker") self.db.execute_sql('CREATE TABLE "offset_tmp" ("id" INTEGER NOT NULL PRIMARY KEY, "image_id" INTEGER NOT NULL, "x" REAL NOT NULL, "y" REAL NOT NULL, FOREIGN KEY ("image_id") REFERENCES "image" ("id") ON DELETE CASCADE);') self.db.execute_sql('INSERT INTO offset_tmp SELECT id, image_id, x, y FROM offset') self.db.execute_sql("DROP TABLE offset") self.db.execute_sql("ALTER TABLE offset_tmp RENAME TO offset") self.db.execute_sql('CREATE TABLE "track_tmp" ("id" INTEGER NOT NULL PRIMARY KEY, "uid" VARCHAR(255) NOT NULL, "style" VARCHAR(255), "text" VARCHAR(255), "type_id" INTEGER NOT NULL, FOREIGN KEY ("type_id") REFERENCES "markertype" ("id") ON DELETE CASCADE);') self.db.execute_sql('INSERT INTO track_tmp SELECT id, uid, style, text, type_id FROM track') self.db.execute_sql("DROP TABLE track") self.db.execute_sql("ALTER TABLE track_tmp RENAME TO track") self._SetVersion(11) if nr_version < 12: print("\tto 12") with self.db.transaction(): self.db.execute_sql("DELETE FROM meta WHERE key = 'version'") indexes = ['CREATE UNIQUE INDEX IF NOT EXISTS "image_filename_path_id_frame" ON "image" ("filename", "path_id", "frame");', 'CREATE UNIQUE INDEX IF NOT EXISTS "marker_image_id_track_id" ON "marker" ("image_id", "track_id");', 'CREATE INDEX IF NOT EXISTS "track_type_id" ON "track" ("type_id");', 'CREATE INDEX IF NOT EXISTS "marker_track_id" ON "marker" ("track_id");', 'CREATE INDEX IF NOT EXISTS "marker_type_id" ON "marker" ("type_id");', 'CREATE UNIQUE INDEX IF NOT EXISTS "markertype_name" ON "markertype" ("name");', 'CREATE UNIQUE INDEX IF NOT EXISTS "path_path" ON "path" ("path");', 'CREATE INDEX IF NOT EXISTS "image_path_id" ON "image" ("path_id");', 'CREATE INDEX IF NOT EXISTS "marker_image_id" ON "marker" ("image_id");', 'CREATE INDEX IF NOT EXISTS "tagassociation_tag_id" ON "tagassociation" ("tag_id");', 'CREATE UNIQUE INDEX IF NOT EXISTS "meta_key" ON "meta" ("key");', 'CREATE INDEX IF NOT EXISTS "marker_partner_id" ON "marker" ("partner_id");', 'CREATE INDEX IF NOT EXISTS "mask_image_id" ON "mask" ("image_id");', 'CREATE INDEX IF NOT EXISTS "tagassociation_annotation_id" ON "tagassociation" ("annotation_id");', 'CREATE UNIQUE INDEX IF NOT EXISTS "masktype_index" ON "masktype" ("index");', 'CREATE UNIQUE INDEX IF NOT EXISTS "offset_image_id" ON "offset" ("image_id");', 'CREATE INDEX IF NOT EXISTS "annotation_image_id" ON "annotation" ("image_id");'] for index in indexes: self.db.execute_sql(index) self._SetVersion(12) if nr_version < 13: print("\tto 13") with self.db.transaction(): self.db.execute_sql('CREATE TABLE "marker_tmp" ("id" INTEGER NOT NULL PRIMARY KEY, "image_id" INTEGER NOT NULL, "x" REAL NOT NULL, "y" REAL NOT NULL, "type_id" INTEGER, "processed" INTEGER NOT NULL, "partner_id" INTEGER, "track_id" INTEGER, "style" VARCHAR(255), "text" VARCHAR(255), FOREIGN KEY ("image_id") REFERENCES "image" ("id") ON DELETE CASCADE, FOREIGN KEY ("type_id") REFERENCES "markertype" ("id") ON DELETE CASCADE, FOREIGN KEY ("partner_id") REFERENCES "marker" ("id") ON DELETE SET NULL, FOREIGN KEY ("track_id") REFERENCES "track" ("id") ON DELETE CASCADE);') self.db.execute_sql('INSERT INTO marker_tmp SELECT id, image_id, x, y, type_id, processed, partner_id, track_id, style, text FROM marker') self.db.execute_sql("DROP TABLE marker") self.db.execute_sql("ALTER TABLE marker_tmp RENAME TO marker") self.db.execute_sql('CREATE INDEX "marker_image_id" ON "marker" ("image_id")') self.db.execute_sql('CREATE UNIQUE INDEX "marker_image_id_track_id" ON "marker" ("image_id", "track_id")') self.db.execute_sql('CREATE INDEX "marker_partner_id" ON "marker" ("partner_id")') self.db.execute_sql('CREATE INDEX "marker_track_id" ON "marker" ("track_id")') self.db.execute_sql('CREATE INDEX "marker_type_id" ON "marker" ("type_id")') self.db.execute_sql('CREATE TRIGGER no_empty_tracks\ AFTER DELETE ON marker\ BEGIN\ DELETE FROM track WHERE id = OLD.track_id AND (SELECT COUNT(marker.id) FROM marker WHERE marker.track_id = track.id) = 0;\ END;') self._SetVersion(13) if nr_version < 14: print("\tto 14") with self.db.transaction(): # create new table line self.db.execute_sql('CREATE TABLE "line" ("id" INTEGER NOT NULL PRIMARY KEY, "image_id" INTEGER NOT NULL, "x1" REAL NOT NULL, "y1" REAL NOT NULL, "x2" REAL NOT NULL, "y2" REAL NOT NULL, "type_id" INTEGER, "processed" INTEGER NOT NULL, "style" VARCHAR(255), "text" VARCHAR(255), FOREIGN KEY ("image_id") REFERENCES "image" ("id") ON DELETE CASCADE, FOREIGN KEY ("type_id") REFERENCES "markertype" ("id") ON DELETE CASCADE);') self.db.execute_sql('CREATE INDEX "line_image_id" ON "line" ("image_id");') self.db.execute_sql('CREATE INDEX "line_type_id" ON "line" ("type_id");') # migrate line marker to line self.db.execute_sql('INSERT INTO line SELECT m1.id, m1.image_id, m1.x AS x1, m1.y AS y1, m2.x AS x2, m2.y AS y2, m1.type_id, m1.processed, m1.style, m1.text FROM marker AS m1 JOIN markertype ON m1.type_id = markertype.id JOIN marker AS m2 ON m1.partner_id = m2.id WHERE m1.partner_id > m1.id AND mode == 2') self.db.execute_sql('DELETE FROM marker WHERE marker.id IN (SELECT marker.id FROM marker JOIN markertype ON marker.type_id = markertype.id WHERE mode == 2)') # create table rectangle self.db.execute_sql('CREATE TABLE "rectangle" ("id" INTEGER NOT NULL PRIMARY KEY, "image_id" INTEGER NOT NULL, "x" REAL NOT NULL, "y" REAL NOT NULL, "width" REAL NOT NULL, "height" REAL NOT NULL, "type_id" INTEGER, "processed" INTEGER NOT NULL, "style" VARCHAR(255), "text" VARCHAR(255), FOREIGN KEY ("image_id") REFERENCES "image" ("id") ON DELETE CASCADE, FOREIGN KEY ("type_id") REFERENCES "markertype" ("id") ON DELETE CASCADE);') self.db.execute_sql('CREATE INDEX "rectangle_image_id" ON "rectangle" ("image_id");') self.db.execute_sql('CREATE INDEX "rectangle_type_id" ON "rectangle" ("type_id");') # migrate rectangle marker to rectangle self.db.execute_sql('INSERT INTO rectangle SELECT m1.id, m1.image_id, m1.x, m1.y, (m2.x-m1.x) AS width, (m2.y-m1.y) AS height, m1.type_id, m1.processed, m1.style, m1.text FROM marker AS m1 JOIN markertype ON m1.type_id = markertype.id JOIN marker AS m2 ON m1.partner_id = m2.id WHERE m1.partner_id > m1.id AND mode == 1') self.db.execute_sql('DELETE FROM marker WHERE marker.id IN (SELECT marker.id FROM marker JOIN markertype ON marker.type_id = markertype.id WHERE mode == 1)') self._SetVersion(14) if nr_version < 15: print("\tto 15") with self.db.transaction(): # remove uid from tracks self.db.execute_sql('CREATE TABLE "track_tmp" ("id" INTEGER NOT NULL PRIMARY KEY, "style" VARCHAR(255), "text" VARCHAR(255), "type_id" INTEGER NOT NULL, FOREIGN KEY ("type_id") REFERENCES "markertype" ("id") ON DELETE CASCADE);') self.db.execute_sql('INSERT INTO track_tmp SELECT id, style, text, type_id FROM track') self.db.execute_sql("DROP TABLE track") self.db.execute_sql("ALTER TABLE track_tmp RENAME TO track") self.db.execute_sql('CREATE INDEX "track_type_id" ON "track" ("type_id")') # make masktype unique self.db.execute_sql('CREATE TABLE "masktype_tmp" ("id" INTEGER NOT NULL PRIMARY KEY, "name" VARCHAR(255) NOT NULL, "color" VARCHAR(255) NOT NULL, "index" INTEGER NOT NULL)') self.db.execute_sql('CREATE UNIQUE INDEX "masktype_tmp_index" ON "masktype_tmp" ("index");') self.db.execute_sql('CREATE UNIQUE INDEX "masktype_tmp_name" ON "masktype_tmp" ("name");') mask_types = self.db.execute_sql('SELECT * from masktype').fetchall() for mask_type in mask_types: i = 1 name = mask_type["name"] while True: try: self.db.execute_sql('INSERT INTO masktype_tmp ("id", "name", "color", "index") VALUES(?, ?, ?, ?)', [mask_type["id"], name, mask_type["color"], mask_type["index"]]) break except peewee.IntegrityError: i += 1 name = "%s%d" % (mask_type["name"], i) self.db.execute_sql("DROP TABLE masktype") self.db.execute_sql("ALTER TABLE masktype_tmp RENAME TO masktype") self.db.execute_sql('DROP INDEX "masktype_tmp_index"') self.db.execute_sql('DROP INDEX "masktype_tmp_name"') self.db.execute_sql('CREATE UNIQUE INDEX "masktype_index" ON "masktype" ("index");') self.db.execute_sql('CREATE UNIQUE INDEX "masktype_name" ON "masktype" ("name");') # make annotations unique self.db.execute_sql('DROP INDEX IF EXISTS "annotation_image_id"') self.db.execute_sql('CREATE UNIQUE INDEX "annotation_image_id" ON "annotation" ("image_id");') self._SetVersion(15) if nr_version < 16: print("\tto 16") with self.db.transaction(): try: self.db.execute_sql( 'CREATE TABLE "option" ("id" INTEGER NOT NULL PRIMARY KEY, "key" VARCHAR(255) NOT NULL, "value" VARCHAR(255))') self.db.execute_sql('CREATE UNIQUE INDEX "option_key" ON "option" ("key")') except peewee.OperationalError: pass self._SetVersion(16) if nr_version < 17: print("\tto 17") with self.db.transaction(): try: self.db.execute_sql( 'CREATE TABLE "markertype_tmp" ("id" INTEGER NOT NULL PRIMARY KEY, "name" VARCHAR(255) NOT NULL, "color" VARCHAR(255) NOT NULL, "mode" INTEGER NOT NULL, "style" VARCHAR(255), "text" VARCHAR(255), "hidden" INTEGER NOT NULL);') self.db.execute_sql( 'INSERT INTO markertype_tmp SELECT id, name, color, mode, style, text, "0" FROM markertype') self.db.execute_sql("DROP TABLE markertype") self.db.execute_sql("ALTER TABLE markertype_tmp RENAME TO markertype") self.db.execute_sql('CREATE UNIQUE INDEX "markertype_name" ON "markertype" ("name");') self.db.execute_sql( 'CREATE TABLE "track_tmp" ("id" INTEGER NOT NULL PRIMARY KEY, "style" VARCHAR(255), "text" VARCHAR(255), "type_id" INTEGER NOT NULL, "hidden" INTEGER NOT NULL, FOREIGN KEY ("type_id") REFERENCES "markertype" ("id") ON DELETE CASCADE);') self.db.execute_sql( 'INSERT INTO track_tmp SELECT id, style, text, type_id, "0" FROM track') self.db.execute_sql("DROP TABLE track") self.db.execute_sql("ALTER TABLE track_tmp RENAME TO track") self.db.execute_sql('CREATE INDEX "track_type_id" ON "track" ("type_id");') except peewee.OperationalError: raise pass self._SetVersion(17) if nr_version < 18: print("\tto 18") with self.db.transaction(): try: self.db.execute_sql( 'CREATE TABLE "image_tmp" ("id" INTEGER NOT NULL PRIMARY KEY, "filename" VARCHAR(255) NOT NULL, "ext" VARCHAR(10) NOT NULL, "frame" INTEGER DEFAULT 0, "external_id" INTEGER, "timestamp" DATETIME, "sort_index" INTEGER NOT NULL, "width" INTEGER, "height" INTEGER, "path_id" INTEGER NOT NULL, "layer" INTEGER NOT NULL, FOREIGN KEY ("path_id") REFERENCES "path" ("id") ON DELETE CASCADE);') self.db.execute_sql( 'INSERT INTO image_tmp SELECT id, filename, ext, frame, external_id, timestamp, sort_index, width, height, path_id, "0" FROM image') self.db.execute_sql('DROP TABLE image') self.db.execute_sql('ALTER TABLE image_tmp RENAME TO image') self.db.execute_sql('CREATE INDEX "image_path_id" ON "image" ("path_id");') except peewee.OperationalError: raise pass self._SetVersion(18) if nr_version < 19: print("\tto 19") with self.db.transaction(): # crete a new table for the layers self.db.execute_sql( 'CREATE TABLE "layer" ("id" INTEGER NOT NULL PRIMARY KEY, "name" VARCHAR(255) NOT NULL);') self.db.execute_sql('CREATE UNIQUE INDEX "layer_name" ON "layer" ("name")') # increase the layer index of every image (should now start with layer 1 instead of layer 0) self.db.execute_sql('UPDATE image SET layer = layer+1') # add layer entries for each layer referenced in the image table self.db.execute_sql('INSERT INTO layer SELECT DISTINCT layer, "layer " || layer FROM image') # convert the layer field to a foreign key field self.db.execute_sql('CREATE TABLE "image_tmp" ("id" INTEGER NOT NULL PRIMARY KEY, "filename" VARCHAR(255) NOT NULL, "ext" VARCHAR(10) NOT NULL, "frame" INTEGER NOT NULL, "external_id" INTEGER, "timestamp" DATETIME, "sort_index" INTEGER NOT NULL, "width" INTEGER, "height" INTEGER, "path_id" INTEGER NOT NULL, "layer_id" INTEGER, FOREIGN KEY ("path_id") REFERENCES "path" ("id") ON DELETE CASCADE, FOREIGN KEY ("layer_id") REFERENCES "layer" ("id") ON DELETE CASCADE);') self.db.execute_sql('INSERT INTO image_tmp SELECT * FROM image') self.db.execute_sql("DROP TABLE image") self.db.execute_sql("ALTER TABLE image_tmp RENAME TO image") # rename the first layer to "default" self.db.execute_sql('UPDATE layer SET name="default" WHERE id = 1') self._SetVersion(19) if nr_version < 20: print("\tto 20") with self.db.transaction(): # create a new tmp table with base_layer_id added as a foreign key field self.db.execute_sql('CREATE TABLE "layer_tmp" ("id" INTEGER NOT NULL PRIMARY KEY, "name" VARCHAR(255) NOT NULL, "base_layer_id" INTEGER NOT NULL, FOREIGN KEY ("base_layer_id") REFERENCES "layer" ("id"));') self.db.execute_sql('INSERT INTO layer_tmp SELECT id, name, (SELECT id FROM layer ORDER BY id LIMIT 1) FROM layer') self.db.execute_sql("DROP TABLE layer") self.db.execute_sql("ALTER TABLE layer_tmp RENAME TO layer") self.db.execute_sql('CREATE UNIQUE INDEX "layer_name" ON "layer" ("name");') self._SetVersion(20) if nr_version < 21: print("\tto 21") with self.db.transaction(): # create a new table for the ellipses self.db.execute_sql("""CREATE TABLE ellipse ( id INTEGER NOT NULL PRIMARY KEY, image_id INTEGER NOT NULL, x REAL NOT NULL, y REAL NOT NULL, width REAL NOT NULL, height REAL NOT NULL, angle REAL NOT NULL, type_id INTEGER, processed INTEGER NOT NULL, style VARCHAR (255), text VARCHAR (255), FOREIGN KEY ( image_id ) REFERENCES image (id) ON DELETE CASCADE, FOREIGN KEY ( type_id ) REFERENCES markertype (id) ON DELETE CASCADE ); """) self._SetVersion(21) if nr_version < 22: print("\tto 22") with self.db.transaction(): # create a new table for the polygons self.db.execute_sql("""CREATE TABLE polygon ( id INTEGER NOT NULL PRIMARY KEY, image_id INTEGER NOT NULL, type_id INTEGER, closed INTEGER NOT NULL, processed INTEGER NOT NULL, style VARCHAR (255), text VARCHAR (255), FOREIGN KEY ( image_id ) REFERENCES image (id) ON DELETE CASCADE, FOREIGN KEY ( type_id ) REFERENCES markertype (id) ON DELETE CASCADE ); """) self.db.execute_sql("""CREATE TABLE polygonpoint ( id INTEGER NOT NULL PRIMARY KEY, polygon_id INTEGER NOT NULL, x REAL NOT NULL, y REAL NOT NULL, [index] INTEGER NOT NULL, FOREIGN KEY ( polygon_id ) REFERENCES polygon (id) ON DELETE CASCADE ); """) self.db.execute_sql("""CREATE UNIQUE INDEX polygonpoint_polygon_id_index ON polygonpoint ( polygon_id, "index" ); """) self._SetVersion(22) self.db.connection().row_factory = None def _SetVersion(self, nr_new_version): self.db.execute_sql("INSERT OR REPLACE INTO meta (id,key,value) VALUES ( \ (SELECT id FROM meta WHERE key='version'),'version',%s)" % str( nr_new_version)) def _migrateDBFrom2(self, nr_version): nr_version = int(nr_version) if nr_version < 5: print("second migration step to 5") images = self.table_image.select().order_by(self.table_image.filename) for index, image in enumerate(images): image.sort_index = index image.frame = 0 image.save() if nr_version < 6: print("second migration step to 6") images = self.table_image.select().order_by(self.table_image.filename) for image in images: path = None if os.path.exists(image.filename): path = "" else: for root, dirs, files in os.walk("."): if image.filename in files: path = root break if path is None: print("ERROR: image not found", image.filename) continue try: path_entry = self.table_path.get(path=path) except peewee.DoesNotExist: path_entry = self.table_path(path=path) path_entry.save() image.path = path_entry image.save() def _CreateTables(self): for table in self._tables: table.create_table(fail_silently=True) def _checkTrackField(self, tracks): if not isinstance(tracks, (tuple, list)): tracks = [tracks] tracks = list(set([t for t in tracks if t is not None])) if len(tracks) == 0: return if self._SQLITE_MAX_VARIABLE_NUMBER is None: self._SQLITE_MAX_VARIABLE_NUMBER = self.max_sql_variables() chunk_size = (self._SQLITE_MAX_VARIABLE_NUMBER - 1) // 2 c=0 with self.db.atomic(): for idx in range(0, len(tracks), chunk_size): c += self.table_track.select().where(self.table_track.id << tracks[idx:idx + chunk_size]).count() if c != len(set(tracks)): raise TrackDoesNotExist("One or more tracks from the list {0} does not exist.".format(tracks)) def _processesTypeNameField(self, types, modes): mode_list = [] for mode in modes: mode_list.append(getattr(self, mode)) def CheckType(type): if isinstance(type, basestring): type_name = type type = self.getMarkerType(type) if type is None: raise MarkerTypeDoesNotExist("No marker type with the name \"%s\" exists." % type_name) if type is not None and type.mode not in mode_list: raise ValueError("Marker type \"%s\" is not a marker type with mode (allowed modes here: %s)" % (type.name, ", ".join(modes))) return type if isinstance(types, (tuple, list)): types = [CheckType(type) for type in types] else: types = CheckType(types) return types def _processLayerNameField(self, layers): def CheckLayer(layer): if isinstance(layer, basestring): layer_entry = self.getLayer(layer) if layer_entry is None: raise peewee.DoesNotExist("Layer with name \"%s\" does not exist" % layer) return layer_entry return layer if isinstance(layers, (tuple, list)): layers = [CheckLayer(layer) for layer in layers] else: layers = CheckLayer(layers) return layers def _processPathNameField(self, paths): def CheckPath(path): if isinstance(path, basestring): return self.getPath(path) return path if isinstance(paths, (tuple, list)): paths = [CheckPath(path) for path in paths] else: paths = CheckPath(paths) return paths def _processImagesField(self, images, frames, filenames, layer): if images is not None: if not isinstance(frames, (tuple, list)): # if a number is provided, than it is the id of an image in the database if isinstance(images, int): return self.getImage(id=images) # if not, it should be an image entry object return self.getImage(frame=images.sort_index, layer=images.layer.base_layer) new_images = [] for image in images: new_images.append(self.getImage(frame=image.sort_index, layer=image.layer.base_layer)) return new_images def CheckImageFrame(frame, layer): image = self.getImage(frame=frame, layer=layer) if image is None: raise ImageDoesNotExist("No image with the frame number %s exists." % frame) return image def CheckImageFilename(filename): image = self.getImage(filename=filename) if image is None: raise ImageDoesNotExist("No image with the filename \"%s\" exists." % filename) return image if frames is not None: if isinstance(frames, (tuple, list)): images = [CheckImageFrame(frame, layer) for frame in frames] else: images = CheckImageFrame(frames, layer) elif filenames is not None: if isinstance(filenames, (tuple, list)): images = [CheckImageFilename(filename) for filename in filenames] else: images = CheckImageFilename(filenames) return images
[docs] def getDbVersion(self): """ Returns the version of the currently opened database file. Returns ------- version : string the version of the database """ return self._current_version
[docs] def getPath(self, path_string=None, id=None, create=False, absolute=False): """ Get a :py:class:`Path` entry from the database. See also: :py:meth:`~.DataFile.getPaths`, :py:meth:`~.DataFile.setPath`, :py:meth:`~.DataFile.deletePaths` Parameters ---------- path_string: string, optional the string specifying the path. id: int, optional the id of the path. create: bool, optional whether the path should be created if it does not exist. (default: False) absolute: bool, optional whether the created path should be absolute or relative to the database file. (default: False) Returns ------- path : :py:class:`Path` the created/requested :py:class:`Path` entry. """ # check input assert any(e is not None for e in [id, path_string]), "Path and ID may not be both None" # collect arguments kwargs = {} # normalize the path, making it relative to the database file if path_string is not None: if not absolute: if self._database_filename: try: path_string = os.path.relpath(path_string, os.path.dirname(self._database_filename)) except ValueError: path_string = os.path.abspath(path_string) path_string = os.path.normpath(path_string) kwargs["path"] = path_string # add the id if id: kwargs["id"] = id # try to get the path try: path = self.table_path.get(**kwargs) # if not create it except peewee.DoesNotExist as err: if create: path = self.table_path(**kwargs) path.save() else: return None # return the path return path
[docs] def getPaths(self, path_string=None, base_path=None, id=None): """ Get all :py:class:`Path` entries from the database, which match the given criteria. If no critera a given, return all paths. See also: :py:meth:`~.DataFile.getPath`, :py:meth:`~.DataFile.setPath`, :py:meth:`~.DataFile.deletePaths` Parameters ---------- path_string : string, path_string, optional the string/s specifying the path/s. base_path : string, optional return only paths starting with the base_path string. id: int, array_like, optional the id/s of the path/s. Returns ------- entries : array_like a query object containing all the matching :py:class:`Path` entries in the database file. """ query = self.table_path.select() query = addFilter(query, id, self.table_path.id) query = addFilter(query, path_string, self.table_path.path) if base_path is not None: query = query.where(self.table_path.path.startswith(base_path)) return query
[docs] def setPath(self, path_string=None, id=None): """ Update or create a new :py:class:`Path` entry with the given parameters. See also: :py:meth:`~.DataFile.getPath`, :py:meth:`~.DataFile.getPaths`, :py:meth:`~.DataFile.deletePaths` Parameters ---------- path_string: string, optional the string specifying the path. id: int, optional the id of the paths. Returns ------- entries : :py:class:`Path` the changed or created :py:class:`Path` entry. """ try: path = self.table_path.get(**noNoneDict(id=id, path=path_string)) except peewee.DoesNotExist: path = self.table_path() setFields(path, dict(path=path_string)) path.save() return path
[docs] def deletePaths(self, path_string=None, base_path=None, id=None): """ Delete all :py:class:`Path` entries with the given criteria. See also: :py:meth:`~.DataFile.getPath`, :py:meth:`~.DataFile.getPaths`, :py:meth:`~.DataFile.setPath` Parameters ---------- path_string: string, optional the string/s specifying the paths. base_path: string, optional return only paths starting with the base_path string. id: int, optional the id/s of the paths. Returns ------- rows : int the number of affected rows. """ query = self.table_path.delete() query = addFilter(query, id, self.table_path.id) query = addFilter(query, path_string, self.table_path.path) if base_path is not None: query = query.where(self.table_path.path.startswith(base_path)) return query.execute()
[docs] def getLayer(self, layer_name=None, base_layer=None, id=None, create=False): """ Get a :py:class:`Layer` entry from the database. See also: :py:meth:`~.DataFile.getLayers`, :py:meth:`~.DataFile.setLayer`, :py:meth:`~.DataFile.deleteLayers` Parameters ---------- layer_name: string, optional the string specifying the layers name. base_layer : int, :py:class:`Layer`, optional the base layer to which this layer should reference. id: int, optional the id of the layer. create: bool, optional whether the layer should be created if it does not exist. (default: False) Returns ------- path : :py:class:`Layer` the created/requested :py:class:`Layer` entry. """ # check input assert any(e is not None for e in [id, layer_name]), "Name and ID may not be both None" # collect arguments kwargs = {} # normalize the path, making it relative to the database file if layer_name is not None: kwargs["name"] = layer_name # add the id if id: kwargs["id"] = id if base_layer is not None: kwargs["base_layer"] = base_layer # try to get the path try: layer = self.table_layer.get(**kwargs) # if not create it except peewee.DoesNotExist as err: if create: layer = self.table_layer(**kwargs) # if the base_layer is None, we want to create a self-referential entry, but as we don't know the id # of the new entry, we have to assign it later. So for now we just guess a valid id for a base_layer # reference if base_layer is None: # it is not possible to try_base_layer_id = 1 while True: try: layer.base_layer = try_base_layer_id except peewee.IntegrityError: pass else: break try_base_layer_id += 1 layer.save() # now the layer has been created and we can assign the self reference if base_layer is None: layer.base_layer = layer layer.save() else: return None # return the path return layer
[docs] def getLayers(self, layer_name=None, base_layer=None, id=None): """ Get all :py:class:`Layer` entries from the database, which match the given criteria. If no critera a given, return all layers. See also: :py:meth:`~.DataFile.getLayer`, :py:meth:`~.DataFile.setLayer`, :py:meth:`~.DataFile.deleteLayers` Parameters ---------- layer_name : string, optional the string/s specifying the layer name/s. base_layer : int, :py:class:`Layer`, optional the base layer to which this layer should reference. id: int, array_like, optional the id/s of the layer/s. Returns ------- entries : array_like a query object containing all the matching :py:class:`Layer` entries in the database file. """ query = self.table_layer.select() query = addFilter(query, id, self.table_layer.id) query = addFilter(query, layer_name, self.table_layer.name) query = addFilter(query, base_layer, self.table_layer.base_layer) return query
[docs] def setLayer(self, layer_name=None, base_layer=None, id=None): """ Update or create a new :py:class:`Layer` entry with the given parameters. See also: :py:meth:`~.DataFile.getLayer`, :py:meth:`~.DataFile.getLayers`, :py:meth:`~.DataFile.deleteLayers` Parameters ---------- layer_name: string, optional the string specifying the name of the layer. base_layer: int, :py:class:`Layer`, optional the base layer to which this layer should reference. id: int, optional the id of the layers. Returns ------- entries : :py:class:`Layer` the changed or created :py:class:`Layer` entry. """ base_layer = self._processLayerNameField(base_layer) try: layer = self.table_layer.get(**noNoneDict(id=id, name=layer_name, base_layer=base_layer)) except peewee.DoesNotExist: return self.getLayer(layer_name=layer_name, base_layer=base_layer, id=id, create=True) setFields(layer, dict(name=layer_name, base_layer=base_layer)) layer.save() return layer
[docs] def deleteLayers(self, layer_name=None, base_layer=None, id=None): """ Delete all :py:class:`Layer` entries with the given criteria. See also: :py:meth:`~.DataFile.getLayer`, :py:meth:`~.DataFile.getLayers`, :py:meth:`~.DataFile.setLayer` Parameters ---------- layer_name: string, optional the string/s specifying the name/s of the layer/s. base_layer: int, :py:class:`Layer`, optional the base layer to which this layer should reference. id: int, optional the id/s of the layers. Returns ------- rows : int the number of affected rows. """ query = self.table_layer.delete() query = addFilter(query, id, self.table_layer.id) query = addFilter(query, layer_name, self.table_layer.name) query = addFilter(query, base_layer, self.table_layer.base_layer) return query.execute()
[docs] def getImageCount(self): """ Returns the number of images in the database. Returns ------- count : int the number of images. """ return self.db.execute_sql("SELECT MAX(sort_index) FROM image LIMIT 1;").fetchone()[0] + 1
[docs] def getImage(self, frame=None, filename=None, id=None, layer=None): """ Returns the :py:class:`Image` entry with the given frame number and layer. See also: :py:meth:`~.DataFile.getImages`, :py:meth:`~.DataFile.getImageIterator`, :py:meth:`~.DataFile.setImage`, :py:meth:`~.DataFile.deleteImages`. Parameters ---------- frame : int, optional the frame number of the desired image, as displayed in ClickPoints. filename : string, optional the filename of the desired image. id : int, optional the id of the image. layer : int, string, optional the layer_id or name of the layer of the image. Returns ------- image : :py:class:`Image` the image entry. """ layer = self._processLayerNameField(layer) kwargs = noNoneDict(sort_index=frame, filename=filename, id=id, layer=layer) try: return self.table_image.get(**kwargs) except peewee.DoesNotExist: KeyError("No image with %s found." % VerboseDict(kwargs))
[docs] def getImages(self, frame=None, filename=None, ext=None, external_id=None, timestamp=None, width=None, height=None, path=None, layer=None, order_by="sort_index"): """ Get all :py:class:`Image` entries sorted by sort index. For large databases :py:meth:`~.DataFile.getImageIterator`, should be used as it doesn't load all frames at once. See also: :py:meth:`~.DataFile.getImage`, :py:meth:`~.DataFile.getImageIterator`, :py:meth:`~.DataFile.setImage`, :py:meth:`~.DataFile.deleteImages`. Parameters ---------- frame : int, array_like, optional the frame number/s of the image/s as displayed in ClickPoints (sort_index in the database). filename : string, array_like, optional the filename/s of the image/s. ext : string, array_like, optional the extension/s of the image/s. external_id : int, array_like, optional the external id/s of the image/s. timestamp : datetime, array_like, optional the timestamp/s of the image/s. width : int, array_like, optional the width/s of the image/s. height : int, array_like, optional the height/s of the image/s path : int, :py:class:`Path`, array_like, optional the path/s (or path id/s) of the image/s layer : int, string, array_like, optional the layer/s of the image/s order_by : string, optional sort by either 'sort_index' (default) or 'timestamp'. Returns ------- entries : array_like a query object containing all the :py:class:`Image` entries in the database file. """ layer = self._processLayerNameField(layer) query = self.table_image.select() query = addFilter(query, frame, self.table_image.sort_index) query = addFilter(query, filename, self.table_image.filename) query = addFilter(query, ext, self.table_image.ext) query = addFilter(query, external_id, self.table_image.external_id) query = addFilter(query, timestamp, self.table_image.timestamp) query = addFilter(query, width, self.table_image.width) query = addFilter(query, height, self.table_image.height) query = addFilter(query, path, self.table_image.path) query = addFilter(query, layer, self.table_image.layer) if order_by == "sort_index": query = query.order_by(self.table_image.sort_index) elif order_by == "timestamp": query = query.order_by(self.table_image.timestamp) else: raise Exception("Unknown order_by parameter - use sort_index or timestamp") return query
[docs] def getImageIterator(self, start_frame=0, end_frame=None, skip=1, layer=1): """ Get an iterator to iterate over all :py:class:`Image` entries starting from start_frame. See also: :py:meth:`~.DataFile.getImage`, :py:meth:`~.DataFile.getImages`, :py:meth:`~.DataFile.setImage`, :py:meth:`~.DataFile.deleteImages`. Parameters ---------- start_frame : int, optional start at the image with the number start_frame. Default is 0 end_frame : int, optional the last frame of the iteration (excluded). Default is None, the iteration stops when no more images are present. skip : int, optional how many frames to jump. Default is 1 layer : int, string, optional layer of frames, over which should be iterated. Returns ------- image_iterator : iterator an iterator object to iterate over :py:class:`Image` entries. Examples -------- .. code-block:: python :linenos: import clickpoints # open the database "data.cdb" db = clickpoints.DataFile("data.cdb") # iterate over all images and print the filename for image in db.GetImageIterator(): print(image.filename) """ layer = self._processLayerNameField(layer) frame = start_frame while True: if frame == end_frame: break try: image = self.table_image.get(self.table_image.sort_index == frame, self.table_image.layer == layer) yield image except peewee.DoesNotExist: break frame += skip
[docs] def setImage(self, filename=None, path=None, frame=None, external_id=None, timestamp=None, width=None, height=None, id=None, layer="default", sort_index=None): """ Update or create new :py:class:`Image` entry with the given parameters. See also: :py:meth:`~.DataFile.getImage`, :py:meth:`~.DataFile.getImages`, :py:meth:`~.DataFile.getImageIterator`, :py:meth:`~.DataFile.deleteImages`. Parameters ---------- filename : string, optional the filename of the image (including the extension) path : string, int, :py:class:`Path`, optional the path string, id or entry of the image to insert frame : int, optional the frame number if the image is part of a video external_id : int, optional an external id for the image. Only necessary if the annotation server is used timestamp : datetime object, optional the timestamp of the image width : int, optional the width of the image height : int, optional the height of the image id : int, optional the id of the image layer : int, string, optional the layer_id of the image, always use with sort_index sort_index: int, only use with layer the sort index (position in the time line) if not in layer 0 Returns ------- image : :py:class:`Image` the changed or created :py:class:`Image` entry """ try: item = self.table_image.get(id=id, filename=filename) new_image = False except peewee.DoesNotExist: item = self.table_image() new_image = True if isinstance(layer, basestring): layer = self.getLayer(layer, create=True) if filename is not None: item.filename = os.path.split(filename)[1] item.ext = os.path.splitext(filename)[1] if path is None: item.path = self.getPath(path_string=os.path.split(filename)[0], create=True) if isinstance(path, basestring): path = self.getPath(path) setFields(item, noNoneDict(frame=frame, path=path, external_id=external_id, timestamp=timestamp, width=width, height=height, layer=layer, sort_index=sort_index)) if new_image: if sort_index is None: if self._next_sort_index is None: try: self._next_sort_index = self.db.execute_sql("SELECT MAX(sort_index) FROM image LIMIT 1;").fetchone()[0] + 1 except IndexError: self._next_sort_index = 0 item.sort_index = self._next_sort_index self._next_sort_index += 1 item.save() return item
[docs] def deleteImages(self, filename=None, path=None, frame=None, external_id=None, timestamp=None, width=None, height=None, id=None, layer=None): """ Delete all :py:class:`Image` entries with the given criteria. See also: :py:meth:`~.DataFile.getImage`, :py:meth:`~.DataFile.getImages`, :py:meth:`~.DataFile.getImageIterator`, :py:meth:`~.DataFile.setImage`. Parameters ---------- filename : string, array_like, optional the filename/filenames of the image (including the extension) path : string, int, :py:class:`Path`, array_like optional the path string, id or entry of the image to insert frame : int, array_like, optional the number/numbers of frames the images have external_id : int, array_like, optional an external id/ids for the images. Only necessary if the annotation server is used timestamp : datetime object, array_like, optional the timestamp/timestamps of the images width : int, array_like, optional the width/widths of the images height : int, optional the height/heights of the images id : int, array_like, optional the id/ids of the images layer: int, array_like, optional the layer/layers of the images Returns ------- rows : int the number of affected rows. """ query = self.table_image.delete() layer = self._processLayerNameField(layer) path = self._processPathNameField(path) query = addFilter(query, id, self.table_image.id) query = addFilter(query, path, self.table_image.path) query = addFilter(query, filename, self.table_image.filename) query = addFilter(query, frame, self.table_image.frame) query = addFilter(query, external_id, self.table_image.external_id) query = addFilter(query, timestamp, self.table_image.timestamp) query = addFilter(query, width, self.table_image.width) query = addFilter(query, height, self.table_image.height) query = addFilter(query, layer, self.table_image.layer) return query.execute()
[docs] def getTracks(self, type=None, text=None, hidden=None, id=None): """ Get all :py:class:`Track` entries, optional filter by type See also: :py:meth:`~.DataFile.getTrack`, :py:meth:`~.DataFile.setTrack`, :py:meth:`~.DataFile.deleteTracks`, :py:meth:`~.DataFile.getTracksNanPadded`. Parameters ---------- type: :py:class:`MarkerType`, str, array_like, optional the marker type/types or name of the marker type for the track. text : str, array_like, optional the :py:class:`Track` specific text entry hidden : bool, array_like, optional whether the tracks should be displayed in ClickPoints id : int, array_like, optional the :py:class:`Track` ID Returns ------- entries : array_like a query object which contains the requested :py:class:`Track`. """ type = self._processesTypeNameField(type, ["TYPE_Track"]) query = self.table_track.select() query = addFilter(query, type, self.table_track.type) query = addFilter(query, text, self.table_track.text) query = addFilter(query, hidden, self.table_track.hidden) query = addFilter(query, id, self.table_track.id) return query
[docs] def getTrack(self, id): """ Get a specific :py:class:`Track` entry by its database ID. See also: :py:meth:`~.DataFile.getTracks`, :py:meth:`~.DataFile.deleteTracks`, :py:meth:`~.DataFile.getTracksNanPadded`. Parameters ---------- id: int id of the track Returns ------- entries : :py:class:`Track` requested object of class :py:class:`Track` or None """ try: return self.table_track.get(id=id) except peewee.DoesNotExist: return None
[docs] def setTrack(self, type, style=None, text=None, hidden=None, id=None, uid=None): """ Insert or update a :py:class:`Track` object. See also: :py:meth:`~.DataFile.getTrack`, :py:meth:`~.DataFile.getTracks`, :py:meth:`~.DataFile.deleteTracks`, :py:meth:`~.DataFile.getTracksNanPadded`. Parameters ---------- type: :py:class:`MarkerType`, str the marker type or name of the marker type for the track. style: the :py:class:`Track` specific style entry text : the :py:class:`Track` specific text entry hidden : wether the track should be displayed in ClickPoints id : int, array_like the :py:class:`Track` ID Returns ------- track : track object a new :py:class:`Track` object """ type = self._processesTypeNameField(type, ["TYPE_Track"]) # gather all the parameters that are not none parameters = locals() parameters = {key: parameters[key] for key in ["id", "type", "style", "text", "hidden"] if parameters[key] is not None} # insert and item with the given parameters item = self.table_track.replace(**parameters).execute() item = self.table_track.get(id=item) return item
[docs] def deleteTracks(self, type=None, text=None, hidden=None, id=None): """ Delete a single :py:class:`Track` object specified by id or all :py:class:`Track` object of an type See also: :py:meth:`~.DataFile.getTrack`, :py:meth:`~.DataFile.getTracks`, :py:meth:`~.DataFile.setTrack`, :py:meth:`~.DataFile.getTracksNanPadded`. Parameters ---------- type: :py:class:`MarkerType`, str, array_like, optional the marker type or name of the marker type text : str, array_like, optional the :py:class:`Track` specific text entry hidden : bool, array_like, optional whether the tracks should be displayed in ClickPoints id : int, array_like, array_like, optional the :py:class:`Track` ID Returns ------- rows : int the number of affected rows. """ type = self._processesTypeNameField(type, ["TYPE_Track"]) query = self.table_track.delete() query = addFilter(query, id, self.table_track.id) query = addFilter(query, text, self.table_track.text) query = addFilter(query, hidden, self.table_track.hidden) query = addFilter(query, type, self.table_track.type) return query.execute()
[docs] def getMarkerTypes(self, name=None, color=None, mode=None, text=None, hidden=None, id=None): """ Retreive all :py:class:`MarkerType` objects in the database. See also: :py:meth:`~.DataFile.getMarkerType`, :py:meth:`~.DataFile.setMarkerType`, :py:meth:`~.DataFile.deleteMarkerTypes`. Parameters ---------- name: str, array_like, optional the name of the type color: str, array_like, optional hex code string for rgb color of style "#00ff3f" mode: int, array_like, optional mode of the marker type (marker 0, rect 1, line 2, track 4) text: str, array_like, optional display text hidden: bool, array_like, optional whether the types should be displayed in ClickPoints id: int, array_like, optional id of the :py:class:`MarkerType` object Returns ------- entries : array_like a query object which contains all :py:class:`MarkerType` entries. """ query = self.table_markertype.select() query = addFilter(query, name, self.table_markertype.name) query = addFilter(query, color, self.table_markertype.color) query = addFilter(query, mode, self.table_markertype.mode) query = addFilter(query, text, self.table_markertype.text) query = addFilter(query, hidden, self.table_markertype.hidden) query = addFilter(query, id, self.table_markertype.id) return query
[docs] def getMarkerType(self, name=None, id=None): """ Retrieve an :py:class:`MarkerType` object from the database. See also: :py:meth:`~.DataFile.getMarkerTypes`, :py:meth:`~.DataFile.setMarkerType`, :py:meth:`~.DataFile.deleteMarkerTypes`. Parameters ---------- name: str, optional the name of the desired type id: int, optional id of the :py:class:`MarkerType` object Returns ------- entries : array_like the :py:class:`MarkerType` with the desired name or None. """ try: return self.table_markertype.get(**noNoneDict(name=name, id=id)) except peewee.DoesNotExist: return None
[docs] def setMarkerType(self, name=None, color=None, mode=None, style=None, text=None, hidden=None, id=None): """ Insert or update an :py:class:`MarkerType` object in the database. See also: :py:meth:`~.DataFile.getMarkerType`, :py:meth:`~.DataFile.getMarkerTypes`, :py:meth:`~.DataFile.deleteMarkerTypes`. Parameters ---------- name: str, optional the name of the type color: str, optional hex code string for rgb color of style "#00ff3f" mode: int, optional mode of the marker type (marker 0, rect 1, line 2, track 4) style: str, optional style string text: str, optional display text hidden: bool, optional whether the type should be displayed in ClickPoints id: int, optional id of the :py:class:`MarkerType` object Returns ------- entries : object the created :py:class:`MarkerType` with the desired name or None. """ try: item = self.table_markertype.get(**noNoneDict(id=id, name=name)) except peewee.DoesNotExist: item = self.table_markertype() if color is not None: color = CheckValidColor(color) setFields(item, dict(name=name, color=color, mode=mode, style=style, text=text, hidden=hidden)) item.save() return item
[docs] def deleteMarkerTypes(self, name=None, color=None, mode=None, text=None, hidden=None, id=None): """ Delete all :py:class:`MarkerType` entries from the database, which match the given criteria. See also: :py:meth:`~.DataFile.getMarkerType`, :py:meth:`~.DataFile.getMarkerTypes`, :py:meth:`~.DataFile.setMarkerType`. Parameters ---------- name: str, array_like, optional the name of the type color: str, array_like, optional hex code string for rgb color of style "#00ff3f" mode: int, array_like, optional mode of the marker type (marker 0, rect 1, line 2, track 4) text: str, array_like, optional display text hidden: bool, array_like, optional whether the types should be displayed in ClickPoints id: int, array_like, optional id of the :py:class:`MarkerType` object Returns ------- entries : int nr of deleted entries """ query = self.table_markertype.delete() query = addFilter(query, name, self.table_markertype.name) query = addFilter(query, color, self.table_markertype.color) query = addFilter(query, mode, self.table_markertype.mode) query = addFilter(query, text, self.table_markertype.text) query = addFilter(query, hidden, self.table_markertype.hidden) query = addFilter(query, id, self.table_markertype.id) return query.execute()
[docs] def getMaskType(self, name=None, color=None, index=None, id=None): """ Get a :py:class:`MaskType` from the database. See also: :py:meth:`~.DataFile.getMaskTypes`, :py:meth:`~.DataFile.setMaskType`, :py:meth:`~.DataFile.deleteMaskTypes`. Parameters ---------- name : string, optional the name of the mask type. color : string, optional the color of the mask type. index : int, optional the index of the mask type, which is used for painting this mask type. id : int, optional the id of the mask type. Returns ------- entries : :py:class:`MaskType` the created/requested :py:class:`MaskType` entry. """ # check input assert any(e is not None for e in [id, name, color, index]), "Path, ID, color and index may not be all None" if color: color = NormalizeColor(color) # try to get the path try: return self.table_masktype.get(**noNoneDict(id=id, name=name, color=color, index=index)) # if not create it except peewee.DoesNotExist: return None
[docs] def getMaskTypes(self, name=None, color=None, index=None, id=None): """ Get all :py:class:`MaskType` entries from the database, which match the given criteria. If no criteria a given, return all mask types. See also: :py:meth:`~.DataFile.getMaskType`, :py:meth:`~.DataFile.setMaskType`, :py:meth:`~.DataFile.deleteMaskTypes`. Parameters ---------- name : string, array_like, optional the name/names of the mask types. color : string, array_like, optional the color/colors of the mask types. index : int, array_like, optional the index/indices of the mask types, which is used for painting this mask types. id : int, array_like, optional the id/ids of the mask types. Returns ------- entries : array_like a query object containing all the matching :py:class:`MaskType` entries in the database file. """ query = self.table_masktype.select() if color: color = NormalizeColor(color) query = addFilter(query, id, self.table_masktype.id) query = addFilter(query, name, self.table_masktype.name) query = addFilter(query, color, self.table_masktype.color) query = addFilter(query, index, self.table_masktype.index) return query
[docs] def setMaskType(self, name=None, color=None, index=None, id=None): """ Update or create a new a :py:class:`MaskType` entry with the given parameters. See also: :py:meth:`~.DataFile.getMaskType`, :py:meth:`~.DataFile.getMaskTypes`, :py:meth:`~.DataFile.setMaskType`, :py:meth:`~.DataFile.deleteMaskTypes`. Parameters ---------- name : string, optional the name of the mask type. color : string, optional the color of the mask type. index : int, optional the index of the mask type, which is used for painting this mask type. id : int, optional the id of the mask type. Returns ------- entries : :py:class:`MaskType` the changed or created :py:class:`MaskType` entry. """ # normalizer and check color values if color: color = NormalizeColor(color) # get lowest free index is not specified by user if not index: index_list = [l.index for l in self.table_masktype.select().order_by(self.table_masktype.index)] free_idxs = list(set(range(1, 254)) - set(index_list)) new_index = free_idxs[0] else: new_index = index try: # only use id if multiple unique fields are specified if id: mask_type = self.table_masktype.get(id=id) elif name: mask_type = self.table_masktype.get(name=name) else: raise peewee.DoesNotExist # if no desired index is provided keep the existing index if index is None or mask_type.index is not None: new_index = mask_type.index except peewee.DoesNotExist: mask_type = self.table_masktype() setFields(mask_type, dict(name=name, color=color, index=new_index)) mask_type.save() return mask_type
[docs] def deleteMaskTypes(self, name=None, color=None, index=None, id=None): """ Delete all :py:class:`MaskType` entries from the database, which match the given criteria. See also: :py:meth:`~.DataFile.getMaskType`, :py:meth:`~.DataFile.getMaskTypes`, :py:meth:`~.DataFile.setMaskType`. Parameters ---------- name : string, array_like, optional the name/names of the mask types. color : string, array_like, optional the color/colors of the mask types. index : int, array_like, optional the index/indices of the mask types, which is used for painting this mask types. id : int, array_like, optional the id/ids of the mask types. """ query = self.table_masktype.delete() # normalize and check color values if color: color = NormalizeColor(color) query = addFilter(query, id, self.table_masktype.id) query = addFilter(query, name, self.table_masktype.name) query = addFilter(query, color, self.table_masktype.color) query = addFilter(query, index, self.table_masktype.index) query.execute()
""" Masks """
[docs] def getMask(self, image=None, frame=None, filename=None, id=None, layer=None, create=False): """ Get the :py:class:`Mask` entry for the given image frame number or filename. See also: :py:meth:`~.DataFile.getMasks`, :py:meth:`~.DataFile.setMask`, :py:meth:`~.DataFile.deleteMasks`. Parameters ---------- image : int, :py:class:`Image`, optional the image for which the mask should be retrieved. If omitted, frame number or filename should be specified instead. frame : int, optional frame number of the image, which mask should be returned. If omitted, image or filename should be specified instead. filename : string, optional filename of the image, which mask should be returned. If omitted, image or frame number should be specified instead. id : int, optional id of the mask entry. create : bool, optional whether the mask should be created if it does not exist. (default: False) layer: int, optional layer of the image, which mask should be returned. Always use with frame. Returns ------- mask : :py:class:`Mask` the desired :py:class:`Mask` entry. """ # check input # assert sum(e is not None for e in [id, image, frame, filename]) == 1, \ # "Exactly one of image, frame or filename should be specified or should be referenced by it's id." assert sum(e is not None for e in [id, image, frame, filename]) == 1, \ "Image, frame (with layer) or filename should be specified or should be referenced by it's id." image = self._processImagesField(image, frame, filename, layer) query = self.table_mask.select(self.table_mask, self.table_image).join(self.table_image) query = addFilter(query, id, self.table_mask.id) query = addFilter(query, image, self.table_mask.image) query = addFilter(query, frame, self.table_image.sort_index) query = addFilter(query, filename, self.table_image.filename) query = addFilter(query, layer, self.table_image.layer) query.limit(1) try: return query[0] except IndexError: if create is True: if not image: image = self.getImage(frame=frame, filename=filename, layer=layer) if not image: raise ImageDoesNotExist("No parent image found ") try: data = np.zeros(image.getShape()) except IOError: raise MaskDimensionUnknown("Can't retrieve dimensions for mask from image %s " % image.filename) mask = self.table_mask(image=image, data=data) mask.save() return mask return None
[docs] def getMasks(self, image=None, frame=None, filename=None, id=None, layer=None, order_by="sort_index"): """ Get all :py:class:`Mask` entries from the database, which match the given criteria. If no criteria a given, return all masks. See also: :py:meth:`~.DataFile.getMask`, :py:meth:`~.DataFile.setMask`, :py:meth:`~.DataFile.deleteMasks`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/images for which the mask should be retrieved. If omitted, frame numbers or filenames should be specified instead. frame: int, array_like, optional frame number/numbers of the images, which masks should be returned. If omitted, images or filenames should be specified instead. filename: string, array_like, optional filename of the image/images, which masks should be returned. If omitted, images or frame numbers should be specified instead. id : int, array_like, optional id/ids of the masks. layer : int, optional layer of the images, which masks should be returned. Always use with frame. order_by: string, optional sorts the result according to sort paramter ('sort_index' or 'timestamp') Returns ------- entries : :py:class:`Mask` a query object containing all the matching :py:class:`Mask` entries in the database file. """ # check input assert sum(e is not None for e in [image, frame, filename]) <= 1, \ "Exactly one of images, frames or filenames should be specified" if layer is not None: assert frame is not None, \ "Frame should be specified, if layer is given." image = self._processImagesField(image, frame, filename, layer) query = self.table_mask.select(self.table_mask, self.table_image).join(self.table_image) query = addFilter(query, id, self.table_mask.id) query = addFilter(query, image, self.table_mask.image) query = addFilter(query, frame, self.table_image.sort_index) query = addFilter(query, filename, self.table_image.filename) query = addFilter(query, layer, self.table_image.layer) if order_by == "sort_index": query = query.order_by(self.table_image.sort_index) elif order_by == "timestamp": query = query.order_by(self.table_image.timestamp) else: raise Exception("Unknown order_by parameter - use sort_index or timestamp") class QuerySelector(type(query)): def __iter__(self): return self.iterator() query.__class__ = QuerySelector return query
[docs] def setMask(self, image=None, frame=None, filename=None, data=None, id=None, layer=None, checkShape=False): """ Update or create new :py:class:`Mask` entry with the given parameters. See also: :py:meth:`~.DataFile.getMask`, :py:meth:`~.DataFile.getMasks`, :py:meth:`~.DataFile.deleteMasks`. Parameters ---------- image : int, :py:class:`Image`, optional the image for which the mask should be set. If omitted, frame number or filename should be specified instead. frame: int, optional frame number of the images, which masks should be set. If omitted, image or filename should be specified instead. filename: string, optional filename of the image, which masks should be set. If omitted, image or frame number should be specified instead. data: ndarray, optional the mask data of the mask to set. Must have the same dimensions as the corresponding image, but only one channel, and it should be using the data type uint8. id : int, optional id of the mask entry. layer: int, optional the layer of the image, which masks should be set. always use with frame. checkShape: bool, optional check if the maks and image have the same shape Returns ------- mask : :py:class:`Mask` the changed or created :py:class:`Mask` entry. """ # check input assert sum(e is not None for e in [id, image, frame, filename]) == 1, \ "Exactly one of image, frame or filename should be specified or an id" if layer is not None: assert frame is not None, \ "Frame should be specified, if layer is given." image = self._processImagesField(image, frame, filename, layer) mask = self.getMask(image=image, filename=filename, id=id) # get image object if not image: image = self.getImage(frame=frame, filename=filename, layer=layer) if not image: raise ImageDoesNotExist("No matching parent image found (%s)" % filename) # verify data if data is not None: if not data.dtype == np.uint8: raise MaskDtypeMismatch("mask.data dtype is not of type uint8") if checkShape: try: if not tuple(data.shape) == image.getShape(): raise MaskDimensionMismatch("mask.data shape doesn't match image dimensions!") except IOError: UserWarning("Couldn't retrieve image dimension - shape verification not possible ") # create mask element if not mask: # create and verify data if data is None: try: data = np.zeros(image.getShape()) except IOError: raise MaskDimensionUnknown("Can't retreive dimensions for mask from image %s " % image.filename) else: if not data.dtype == np.uint8: raise MaskDtypeMismatch("mask.data dtype is not of type uint8") if checkShape: try: if not tuple(data.shape) == image.getShape(): raise MaskDimensionMismatch("mask.data shape doesn't match image dimensions!") except IOError: UserWarning("Couldn't retrieve image dimension - shape verification not possible ") mask = self.table_mask(image=image, data=data) setFields(mask, dict(data=data, image=image)) if frame is not None or filename is not None: mask.image = self.getImage(frame=frame, filename=filename, layer=layer) mask.save() return mask
[docs] def deleteMasks(self, image=None, frame=None, filename=None, id=None, layer=None): """ Delete all :py:class:`Mask` entries with the given criteria. See also: :py:meth:`~.DataFile.getMask`, :py:meth:`~.DataFile.getMasks`, :py:meth:`~.DataFile.setMask`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/images for which the mask should be deleted. If omitted, frame numbers or filenames should be specified instead. frame: int, array_like, optional frame number/numbers of the images, which masks should be deleted. If omitted, images or filenames should be specified instead. filename: string, array_like, optional filename of the image/images, which masks should be deleted. If omitted, images or frame numbers should be specified instead. id : int, array_like, optional id/ids of the masks. layer: int, array_like, optional layer/layers of the images, which masks should be deleted. Always use with frame! """ # check input assert sum(e is not None for e in [image, frame, filename]) <= 1, \ "Exactly one of images, frames or filenames should be specified" if layer is not None: assert frame is not None, \ "Frame should be specified, if layer is given." query = self.table_mask.delete() if image is None: images = self.table_image.select() images = addFilter(images, frame, self.table_image.sort_index) images = addFilter(images, filename, self.table_image.filename) images = addFilter(images, layer, self.table_image.layer) query = query.where(self.table_mask.image.in_(images)) else: query = addFilter(query, image, self.table_mask.image) query = addFilter(query, id, self.table_mask.id) query.execute()
""" Markers """
[docs] def getMarker(self, id): """ Retrieve an :py:class:`Marker` object from the database. See also: :py:meth:`~.DataFile.getMarkers`, :py:meth:`~.DataFile.setMarker`, :py:meth:`~.DataFile.setMarkers`, :py:meth:`~.DataFile.deleteMarkers`. Parameters ---------- id: int the id of the marker Returns ------- marker : :py:class:`Marker` the :py:class:`Marker` with the desired id or None. """ try: return self.table_marker.get(id=id) except peewee.DoesNotExist: return None
[docs] def getMarkers(self, image=None, frame=None, filename=None, x=None, y=None, type=None, processed=None, track=None, text=None, id=None, layer=None): """ Get all :py:class:`Marker` entries with the given criteria. See also: :py:meth:`~.DataFile.getMarker`, :py:meth:`~.DataFile.getMarkers`, :py:meth:`~.DataFile.setMarker`, :py:meth:`~.DataFile.setMarkers`, :py:meth:`~.DataFile.deleteMarkers`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s of the markers. frame : int, array_like, optional the frame/s of the images of the markers. filename : string, array_like, optional the filename/s of the images of the markers. x : int, array_like, optional the x coordinate/s of the markers. y : int, array_like, optional the y coordinate/s of the markers. type : string, :py:class:`MarkerType`, array_like, optional the marker type/s (or name/s) of the markers. processed : int, array_like, optional the processed flag/s of the markers. track : int, :py:class:`Track`, array_like, optional the track id/s or instance/s of the markers. text : string, array_like, optional the text/s of the markers. id : int, array_like, optional the id/s of the markers. layer : int, optional the layer of the markers Returns ------- entries : array_like a query object which contains all :py:class:`Marker` entries. """ type = self._processesTypeNameField(type, ["TYPE_Normal", "TYPE_Track"]) query = self.table_marker.select(self.table_marker, self.table_image).join(self.table_image) image = self._processImagesField(image, frame, filename, layer) query = addFilter(query, id, self.table_marker.id) query = addFilter(query, image, self.table_marker.image) query = addFilter(query, frame, self.table_image.sort_index) query = addFilter(query, layer, self.table_image.layer) query = addFilter(query, filename, self.table_image.filename) query = addFilter(query, x, self.table_marker.x) query = addFilter(query, y, self.table_marker.y) query = addFilter(query, type, self.table_marker.type) query = addFilter(query, processed, self.table_marker.processed) query = addFilter(query, track, self.table_marker.track) query = addFilter(query, text, self.table_marker.text) # define the __array__ method of the query to make np.array(db.getMarkers()) possible query.__array__ = lambda: np.array([p.pos() for p in query]) return query
[docs] def setMarker(self, image=None, frame=None, filename=None, x=None, y=None, type=None, processed=None, track=None, style=None, text=None, id=None, layer=None): """ Insert or update an :py:class:`Marker` object in the database. See also: :py:meth:`~.DataFile.getMarker`, :py:meth:`~.DataFile.getMarkers`, :py:meth:`~.DataFile.setMarkers`, :py:meth:`~.DataFile.deleteMarkers`. Parameters ---------- image : int, :py:class:`Image`, optional the image of the marker. frame : int, optional the frame of the images of the marker. filename : string, optional the filename of the image of the marker. x : int, optional the x coordinate of the marker. y : int, optional the y coordinate of the marker. type : string, :py:class:`MarkerType`, optional the marker type (or name) of the marker. processed : int, optional the processed flag of the marker. track : int, :py:class:`Track`, optional the track id or instance of the marker. text : string, optional the text of the marker. id : int, optional the id of the marker. layer: int, optional the layer of the image of the marker Returns ------- marker : :py:class:`Marker` the created or changed :py:class:`Marker` item. """ assert not (id is None and type is None and track is None), "Marker must either have a type or a track or be referenced by it's id." assert not (id is None and image is None and frame is None and filename is None), "Marker must have an image, frame or filename given or be referenced by it's id." try: item = self.table_marker.get(id=id) except peewee.DoesNotExist: item = self.table_marker() type = self._processesTypeNameField(type, ["TYPE_Normal", "TYPE_Track"]) if track is not None: self._checkTrackField(track) image = self._processImagesField(image, frame, filename, layer) setFields(item, dict(image=image, x=x, y=y, type=type, processed=processed, track=track, style=style, text=text)) item.save() return item
[docs] def setMarkers(self, image=None, frame=None, filename=None, x=None, y=None, type=None, processed=None, track=None, style=None, text=None, id=None, layer=None): """ Insert or update multiple :py:class:`Marker` objects in the database. See also: :py:meth:`~.DataFile.getMarker`, :py:meth:`~.DataFile.getMarkers`, :py:meth:`~.DataFile.setMarker`, :py:meth:`~.DataFile.deleteMarkers`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s of the markers. frame : int, array_like, optional the frame/s of the images of the markers. filename : string, array_like, optional the filename/s of the images of the markers. x : int, array_like, optional the x coordinate/s of the markers. y : int, array_like, optional the y coordinate/s of the markers. type : string, :py:class:`MarkerType`, array_like, optional the marker type/s (or name/s) of the markers. processed : int, array_like, optional the processed flag/s of the markers. track : int, :py:class:`Track`, array_like, optional the track id/s or instance/s of the markers. text : string, array_like, optional the text/s of the markers. id : int, array_like, optional the id/s of the markers. layer: int, optional the layer of the images Returns ------- success : bool it the inserting was successful. """ type = self._processesTypeNameField(type, ["TYPE_Normal", "TYPE_Track"]) if track is not None: self._checkTrackField(track) image = self._processImagesField(image, frame, filename, layer) data = packToDictList(self.table_marker, id=id, image=image, x=x, y=y, processed=processed, type=type, track=track, style=style, text=text) return self.saveReplaceMany(self.table_marker, data)
[docs] def deleteMarkers(self, image=None, frame=None, filename=None, x=None, y=None, type=None, processed=None, track=None, text=None, id=None, layer=None): """ Delete all :py:class:`Marker` entries with the given criteria. See also: :py:meth:`~.DataFile.getMarker`, :py:meth:`~.DataFile.getMarkers`, :py:meth:`~.DataFile.setMarker`, :py:meth:`~.DataFile.setMarkers`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s of the markers. frame : int, array_like, optional the frame/s of the images of the markers. filename : string, array_like, optional the filename/s of the images of the markers. x : int, array_like, optional the x coordinate/s of the markers. y : int, array_like, optional the y coordinate/s of the markers. type : string, :py:class:`MarkerType`, array_like, optional the marker type/s (or name/s) of the markers. processed : int, array_like, optional the processed flag/s of the markers. track : int, :py:class:`Track`, array_like, optional the track id/s or instance/s of the markers. text : string, array_like, optional the text/s of the markers. id : int, array_like, optional the id/s of the markers. Returns ------- rows : int the number of affected rows. """ type = self._processesTypeNameField(type, ["TYPE_Normal", "TYPE_Track"]) query = self.table_marker.delete() image = self._processImagesField(image, frame, filename, layer) if image is None and (frame is not None or filename is not None): images = self.table_image.select(self.table_image.id) images = addFilter(images, frame, self.table_image.sort_index) images = addFilter(images, filename, self.table_image.filename) query = query.where(self.table_marker.image.in_(images)) else: query = addFilter(query, image, self.table_marker.image) query = addFilter(query, id, self.table_marker.id) query = addFilter(query, x, self.table_marker.x) query = addFilter(query, y, self.table_marker.y) query = addFilter(query, type, self.table_marker.type) query = addFilter(query, processed, self.table_marker.processed) query = addFilter(query, track, self.table_marker.track) query = addFilter(query, text, self.table_marker.text) return query.execute()
""" Lines """
[docs] def getLine(self, id): """ Retrieve an :py:class:`Line` object from the database. See also: :py:meth:`~.DataFile.getLines`, :py:meth:`~.DataFile.setLine`, :py:meth:`~.DataFile.setLines`, :py:meth:`~.DataFile.deleteLines`. Parameters ---------- id: int the id of the line Returns ------- line : :py:class:`Line` the :py:class:`Line` with the desired id or None. """ try: return self.table_line.get(id=id) except peewee.DoesNotExist: return None
[docs] def getLines(self, image=None, frame=None, filename=None, x1=None, y1=None, x2=None, y2=None, type=None, processed=None, text=None, id=None): """ Get all :py:class:`Line` entries with the given criteria. See also: :py:meth:`~.DataFile.getLine`, :py:meth:`~.DataFile.setLine`, :py:meth:`~.DataFile.setLines`, :py:meth:`~.DataFile.deleteLines`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s of the lines. frame : int, array_like, optional the frame/s of the images of the lines. filename : string, array_like, optional the filename/s of the images of the lines. x1 : int, array_like, optional the x coordinate/s of the lines start. y1 : int, array_like, optional the y coordinate/s of the lines start. x2 : int, array_like, optional the x coordinate/s of the lines end. y2 : int, array_like, optional the y coordinate/s of the lines end. type : string, :py:class:`MarkerType`, array_like, optional the marker type/s (or name/s) of the lines. processed : int, array_like, optional the processed flag/s of the lines. text : string, array_like, optional the text/s of the lines. id : int, array_like, optional the id/s of the lines. Returns ------- entries : array_like a query object which contains all :py:class:`Line` entries. """ type = self._processesTypeNameField(type, ["TYPE_Line"]) query = self.table_line.select(self.table_line, self.table_image).join(self.table_image) query = addFilter(query, id, self.table_line.id) query = addFilter(query, image, self.table_line.image) query = addFilter(query, frame, self.table_image.sort_index) query = addFilter(query, filename, self.table_image.filename) query = addFilter(query, x1, self.table_line.x1) query = addFilter(query, y1, self.table_line.y1) query = addFilter(query, x2, self.table_line.x2) query = addFilter(query, y2, self.table_line.y2) query = addFilter(query, type, self.table_line.type) query = addFilter(query, processed, self.table_line.processed) query = addFilter(query, text, self.table_line.text) # define the __array__ method of the query to make np.array(db.getLines()) possible query.__array__ = lambda: np.array([[[l.x1, l.y1], [l.x2, l.y2]] for l in query]) return query
[docs] def setLine(self, image=None, frame=None, filename=None, x1=None, y1=None, x2=None, y2=None, type=None, processed=None, style=None, text=None, id=None, layer=None): """ Insert or update an :py:class:`Line` object in the database. See also: :py:meth:`~.DataFile.getLine`, :py:meth:`~.DataFile.getLines`, :py:meth:`~.DataFile.setLines`, :py:meth:`~.DataFile.deleteLines`. Parameters ---------- image : int, :py:class:`Image`, optional the image of the line. frame : int, optional the frame of the images of the line. filename : string, optional the filename of the image of the line. x1 : int, optional the x coordinate of the start of the line. y1 : int, optional the y coordinate of the start of the line. x2 : int, optional the x coordinate of the end of the line. y2 : int, optional the y coordinate of the end of the line. type : string, :py:class:`MarkerType`, optional the marker type (or name) of the line. processed : int, optional the processed flag of the line. text : string, optional the text of the line. id : int, optional the id of the line. layer : int, optional the layer of the image of the line Returns ------- line : :py:class:`Line` the created or changed :py:class:`Line` item. """ assert not (id is None and type is None), "Line must either have a type or be referenced by it's id." assert not (id is None and image is None and frame is None and filename is None), "Line must have an image, frame or filename given or be referenced by it's id." try: item = self.table_line.get(id=id) except peewee.DoesNotExist: item = self.table_line() type = self._processesTypeNameField(type, ["TYPE_Line"]) image = self._processImagesField(image, frame, filename, layer) setFields(item, dict(image=image, x1=x1, y1=y1, x2=x2, y2=y2, type=type, processed=processed, style=style, text=text)) item.save() return item
[docs] def setLines(self, image=None, frame=None, filename=None, x1=None, y1=None, x2=None, y2=None, type=None, processed=None, style=None, text=None, id=None, layer=None): """ Insert or update multiple :py:class:`Line` objects in the database. See also: :py:meth:`~.DataFile.getLine`, :py:meth:`~.DataFile.getLines`, :py:meth:`~.DataFile.setLine`, :py:meth:`~.DataFile.deleteLines`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s of the lines. frame : int, array_like, optional the frame/s of the images of the lines. filename : string, array_like, optional the filename/s of the images of the lines. x1 : int, array_like, optional the x coordinate/s of the start of the lines. y1 : int, array_like, optional the y coordinate/s of the start of the lines. x2 : int, array_like, optional the x coordinate/s of the end of the lines. y2 : int, array_like, optional the y coordinate/s of the end of the lines. type : string, :py:class:`MarkerType`, array_like, optional the marker type/s (or name/s) of the lines. processed : int, array_like, optional the processed flag/s of the lines. track : int, :py:class:`Track`, array_like, optional the track id/s or instance/s of the lines. text : string, array_like, optional the text/s of the lines. id : int, array_like, optional the id/s of the lines. layer: int, optional the layer of the images of the lines Returns ------- success : bool it the inserting was successful. """ type = self._processesTypeNameField(type, ["TYPE_Line"]) image = self._processImagesField(image, frame, filename, layer) data = packToDictList(self.table_line, id=id, image=image, x1=x1, y1=y1, x2=x2, y2=y2, processed=processed, type=type, style=style, text=text) return self.saveReplaceMany(self.table_line, data)
[docs] def deleteLines(self, image=None, frame=None, filename=None, x1=None, y1=None, x2=None, y2=None, type=None, processed=None, text=None, id=None): """ Delete all :py:class:`Line` entries with the given criteria. See also: :py:meth:`~.DataFile.getLine`, :py:meth:`~.DataFile.getLines`, :py:meth:`~.DataFile.setLine`, :py:meth:`~.DataFile.setLines`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s of the lines. frame : int, array_like, optional the frame/s of the images of the lines. filename : string, array_like, optional the filename/s of the images of the lines. x1 : int, array_like, optional the x coordinate/s of the start of the lines. y1 : int, array_like, optional the y coordinate/s of the start of the lines. x2 : int, array_like, optional the x coordinate/s of the end of the lines. y2 : int, array_like, optional the y coordinate/s of the end of the lines. type : string, :py:class:`MarkerType`, array_like, optional the marker type/s (or name/s) of the lines. processed : int, array_like, optional the processed flag/s of the lines. text : string, array_like, optional the text/s of the lines. id : int, array_like, optional the id/s of the lines. Returns ------- rows : int the number of affected rows. """ type = self._processesTypeNameField(type, ["TYPE_Line"]) query = self.table_line.delete() if image is None and (frame is not None or filename is not None): images = self.table_image.select(self.table_image.id) images = addFilter(images, frame, self.table_image.sort_index) images = addFilter(images, filename, self.table_image.filename) query = query.where(self.table_line.image.in_(images)) else: query = addFilter(query, image, self.table_line.image) query = addFilter(query, id, self.table_line.id) query = addFilter(query, x1, self.table_line.x1) query = addFilter(query, y1, self.table_line.y1) query = addFilter(query, x2, self.table_line.x2) query = addFilter(query, y2, self.table_line.y2) query = addFilter(query, type, self.table_line.type) query = addFilter(query, processed, self.table_line.processed) query = addFilter(query, text, self.table_line.text) return query.execute()
""" Rectangles """
[docs] def getRectangle(self, id): """ Retrieve an :py:class:`Rectangle` object from the database. See also: :py:meth:`~.DataFile.getRectangles`, :py:meth:`~.DataFile.setRectangle`, :py:meth:`~.DataFile.setRectangles`, :py:meth:`~.DataFile.deleteRectangles`. Parameters ---------- id: int the id of the rectangle. Returns ------- rectangle : :py:class:`Rectangle` the :py:class:`Rectangle` with the desired id or None. """ try: return self.table_rectangle.get(id=id) except peewee.DoesNotExist: return None
[docs] def getRectangles(self, image=None, frame=None, filename=None, x=None, y=None, width=None, height=None, type=None, processed=None, text=None, id=None, layer=None): """ Get all :py:class:`Rectangle` entries with the given criteria. See also: :py:meth:`~.DataFile.getRectangle`, :py:meth:`~.DataFile.setRectangle`, :py:meth:`~.DataFile.setRectangles`, :py:meth:`~.DataFile.deleteRectangles`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s of the rectangles. frame : int, array_like, optional the frame/s of the images of the rectangles. filename : string, array_like, optional the filename/s of the images of the rectangles. x : int, array_like, optional the x coordinate/s of the upper left corner/s of the rectangles. y : int, array_like, optional the y coordinate/s of the upper left corner/s of the rectangles. width : int, array_like, optional the width/s of the rectangles. height : int, array_like, optional the height/s of the rectangles. type : string, :py:class:`MarkerType`, array_like, optional the marker type/s (or name/s) of the rectangles. processed : int, array_like, optional the processed flag/s of the rectangles. text : string, array_like, optional the text/s of the rectangles. id : int, array_like, optional the id/s of the rectangles. layer : int, optional the id of the image of the rectangle Returns ------- entries : array_like a query object which contains all :py:class:`Rectangle` entries. """ type = self._processesTypeNameField(type, ["TYPE_Rect"]) query = self.table_rectangle.select(self.table_rectangle, self.table_image).join(self.table_image) image = self._processImagesField(image, frame, filename, layer) query = addFilter(query, id, self.table_rectangle.id) query = addFilter(query, image, self.table_rectangle.image) query = addFilter(query, frame, self.table_image.sort_index) query = addFilter(query, filename, self.table_image.filename) query = addFilter(query, x, self.table_rectangle.x) query = addFilter(query, y, self.table_rectangle.y) query = addFilter(query, height, self.table_rectangle.height) query = addFilter(query, width, self.table_rectangle.width) query = addFilter(query, type, self.table_rectangle.type) query = addFilter(query, processed, self.table_rectangle.processed) query = addFilter(query, text, self.table_rectangle.text) return query
[docs] def setRectangle(self, image=None, frame=None, filename=None, x=None, y=None, width=None, height=None, type=None, processed=None, style=None, text=None, id=None, layer=None): """ Insert or update an :py:class:`Rectangle` object in the database. See also: :py:meth:`~.DataFile.getRectangle`, :py:meth:`~.DataFile.getRectangles`, :py:meth:`~.DataFile.setRectangles`, :py:meth:`~.DataFile.deleteRectangles`. Parameters ---------- image : int, :py:class:`Image`, optional the image of the rectangle. frame : int, optional the frame of the images of the rectangle. filename : string, optional the filename of the image of the rectangle. x : int, optional the x coordinate of the upper left corner of the rectangle. y : int, optional the y coordinate of the upper left corner of the rectangle. width : int, optional the width of the rectangle. height : int, optional the height of the rectangle. type : string, :py:class:`MarkerType`, optional the marker type (or name) of the rectangle. processed : int, optional the processed flag of the rectangle. text : string, optional the text of the rectangle. id : int, optional the id of the rectangle. layer : int, optional the id of the image of the rectangle Returns ------- rectangle : :py:class:`Rectangle` the created or changed :py:class:`Rectangle` item. """ assert not (id is None and type is None), "Rectangle must either have a type or be referenced by it's id." assert not (id is None and image is None and frame is None and filename is None), "Rectangle must have an image, frame or filename given or be referenced by it's id." try: item = self.table_rectangle.get(id=id) except peewee.DoesNotExist: item = self.table_rectangle() type = self._processesTypeNameField(type, ["TYPE_Rect"]) image = self._processImagesField(image, frame, filename, layer) setFields(item, dict(image=image, x=x, y=y, width=width, height=height, type=type, processed=processed, style=style, text=text)) item.save() return item
[docs] def setRectangles(self, image=None, frame=None, filename=None, x=None, y=None, width=None, height=None, type=None, processed=None, style=None, text=None, id=None, layer=None): """ Insert or update multiple :py:class:`Rectangle` objects in the database. See also: :py:meth:`~.DataFile.getRectangle`, :py:meth:`~.DataFile.getRectangles`, :py:meth:`~.DataFile.setRectangle`, :py:meth:`~.DataFile.deleteRectangles`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s of the rectangles. frame : int, array_like, optional the frame/s of the images of the rectangles. filename : string, array_like, optional the filename/s of the images of the rectangles. x : int, array_like, optional the x coordinate/s of the upper left corner/s of the rectangles. y : int, array_like, optional the y coordinate/s of the upper left corner/s of the rectangles. width : int, array_like, optional the width/s of the rectangles. height : int, array_like, optional the height/s of the rectangles. type : string, :py:class:`MarkerType`, array_like, optional the marker type/s (or name/s) of the rectangles. processed : int, array_like, optional the processed flag/s of the rectangles. text : string, array_like, optional the text/s of the rectangles. id : int, array_like, optional the id/s of the rectangles. layer : int, optional the layer of the images of the rectangles Returns ------- success : bool it the inserting was successful. """ type = self._processesTypeNameField(type, ["TYPE_Rect"]) image = self._processImagesField(image, frame, filename, layer) data = packToDictList(self.table_rectangle, id=id, image=image, x=x, y=y, width=width, height=height, processed=processed, type=type, style=style, text=text) return self.saveReplaceMany(self.table_rectangle, data)
[docs] def deleteRectangles(self, image=None, frame=None, filename=None, x=None, y=None, width=None, height=None, type=None, processed=None, text=None, id=None, layer=None): """ Delete all :py:class:`Rectangle` entries with the given criteria. See also: :py:meth:`~.DataFile.getRectangle`, :py:meth:`~.DataFile.getRectangles`, :py:meth:`~.DataFile.setRectangle`, :py:meth:`~.DataFile.setRectangles`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s of the rectangles. frame : int, array_like, optional the frame/s of the images of the rectangles. filename : string, array_like, optional the filename/s of the images of the rectangles. x : int, array_like, optional the x coordinate/s of the upper left corner/s of the rectangles. y : int, array_like, optional the y coordinate/s of the upper left corner/s of the rectangles. width : int, array_like, optional the width/s of the rectangles. height : int, array_like, optional the height/s of the rectangles. type : string, :py:class:`MarkerType`, array_like, optional the marker type/s (or name/s) of the rectangles. processed : int, array_like, optional the processed flag/s of the rectangles. text : string, array_like, optional the text/s of the rectangles. id : int, array_like, optional the id/s of the rectangles. Returns ------- rows : int the number of affected rows. """ type = self._processesTypeNameField(type, ["TYPE_Rect"]) image = self._processImagesField(image, frame, filename, layer) query = self.table_rectangle.delete() if image is None and (frame is not None or filename is not None): images = self.table_image.select(self.table_image.id) images = addFilter(images, frame, self.table_image.sort_index) images = addFilter(images, filename, self.table_image.filename) query = query.where(self.table_rectangle.image.in_(images)) else: query = addFilter(query, image, self.table_rectangle.image) query = addFilter(query, id, self.table_rectangle.id) query = addFilter(query, x, self.table_rectangle.x) query = addFilter(query, y, self.table_rectangle.y) query = addFilter(query, width, self.table_rectangle.width) query = addFilter(query, height, self.table_rectangle.height) query = addFilter(query, type, self.table_rectangle.type) query = addFilter(query, processed, self.table_rectangle.processed) query = addFilter(query, text, self.table_rectangle.text) return query.execute()
""" Ellipses """
[docs] def getEllipse(self, id): """ Retrieve an :py:class:`Ellipse` object from the database. See also: :py:meth:`~.DataFile.getEllipses`, :py:meth:`~.DataFile.setEllipse`, :py:meth:`~.DataFile.setEllipses`, :py:meth:`~.DataFile.deleteEllipses`. Parameters ---------- id: int the id of the ellipse. Returns ------- ellipse : :py:class:`Ellipse` the :py:class:`Ellipse` with the desired id or None. """ try: return self.table_ellipse.get(id=id) except peewee.DoesNotExist: return None
[docs] def getEllipses(self, image=None, frame=None, filename=None, x=None, y=None, width=None, height=None, angle=None, type=None, processed=None, text=None, id=None, layer=None): """ Get all :py:class:`Ellipse` entries with the given criteria. See also: :py:meth:`~.DataFile.getEllipse`, :py:meth:`~.DataFile.setEllipse`, :py:meth:`~.DataFile.setEllipses`, :py:meth:`~.DataFile.deleteEllipses`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s of the ellipses. frame : int, array_like, optional the frame/s of the images of the ellipses. filename : string, array_like, optional the filename/s of the images of the ellipses. x : int, array_like, optional the x coordinate/s of the center/s of the ellipses. y : int, array_like, optional the y coordinate/s of the center/s of the ellipses. width : int, array_like, optional the width/s of the ellipses. height : int, array_like, optional the height/s of the ellipses. angle : float, array_like, optional the angle/s of the rectangles in ellipses. type : string, :py:class:`MarkerType`, array_like, optional the marker type/s (or name/s) of the ellipses. processed : int, array_like, optional the processed flag/s of the ellipses. text : string, array_like, optional the text/s of the ellipses. id : int, array_like, optional the id/s of the ellipses. layer : int, optional the id of the image of the ellipses. Returns ------- entries : array_like a query object which contains all :py:class:`Ellipse` entries. """ type = self._processesTypeNameField(type, ["TYPE_Ellipse"]) query = self.table_ellipse.select(self.table_ellipse, self.table_image).join(self.table_image) image = self._processImagesField(image, frame, filename, layer) query = addFilter(query, id, self.table_ellipse.id) query = addFilter(query, image, self.table_ellipse.image) query = addFilter(query, frame, self.table_image.sort_index) query = addFilter(query, filename, self.table_image.filename) query = addFilter(query, x, self.table_ellipse.x) query = addFilter(query, y, self.table_ellipse.y) query = addFilter(query, height, self.table_ellipse.height) query = addFilter(query, width, self.table_ellipse.width) query = addFilter(query, angle, self.table_ellipse.angle) query = addFilter(query, type, self.table_ellipse.type) query = addFilter(query, processed, self.table_ellipse.processed) query = addFilter(query, text, self.table_ellipse.text) return query
[docs] def setEllipse(self, image=None, frame=None, filename=None, x=None, y=None, width=None, height=None, angle=None, type=None, processed=None, style=None, text=None, id=None, layer=None): """ Insert or update an :py:class:`Ellipse` object in the database. See also: :py:meth:`~.DataFile.getEllipse`, :py:meth:`~.DataFile.getEllipses`, :py:meth:`~.DataFile.setEllipses`, :py:meth:`~.DataFile.deleteEllipses`. Parameters ---------- image : int, :py:class:`Image`, optional the image of the ellipse. frame : int, optional the frame of the images of the ellipse. filename : string, optional the filename of the image of the ellipse. x : int, optional the x coordinate of the center of the ellipse. y : int, optional the y coordinate of the center of the ellipse. width : int, optional the width of the ellipse. height : int, optional the height of the ellipse. angle : float, optional the angle of the ellipse in degrees. type : string, :py:class:`MarkerType`, optional the marker type (or name) of the ellipse. processed : int, optional the processed flag of the ellipse. text : string, optional the text of the ellipse. id : int, optional the id of the ellipse. layer : int, optional the id of the image of the ellipse. Returns ------- ellipse : :py:class:`Ellipse` the created or changed :py:class:`Ellipse` item. """ assert not (id is None and type is None), "Ellipse must either have a type or be referenced by it's id." assert not (id is None and image is None and frame is None and filename is None), "Ellipse must have an image, frame or filename given or be referenced by it's id." try: item = self.table_ellipse.get(id=id) except peewee.DoesNotExist: item = self.table_ellipse() type = self._processesTypeNameField(type, ["TYPE_Ellipse"]) image = self._processImagesField(image, frame, filename, layer) setFields(item, dict(image=image, x=x, y=y, width=width, height=height, angle=angle, type=type, processed=processed, style=style, text=text)) item.save() return item
[docs] def setEllipses(self, image=None, frame=None, filename=None, x=None, y=None, width=None, height=None, angle=None, type=None, processed=None, style=None, text=None, id=None, layer=None): """ Insert or update multiple :py:class:`Ellipse` objects in the database. See also: :py:meth:`~.DataFile.getEllipse`, :py:meth:`~.DataFile.getEllipses`, :py:meth:`~.DataFile.setEllipse`, :py:meth:`~.DataFile.deleteEllipses`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s of the ellipses. frame : int, array_like, optional the frame/s of the images of the ellipses. filename : string, array_like, optional the filename/s of the images of the ellipses. x : int, array_like, optional the x coordinate/s of the center/s of the ellipses. y : int, array_like, optional the y coordinate/s of the center/s of the ellipses. width : int, array_like, optional the width/s of the ellipses. height : int, array_like, optional the height/s of the ellipses. angle : int, array_like, optional the angle/s of the ellipses. type : string, :py:class:`MarkerType`, array_like, optional the marker type/s (or name/s) of the ellipses. processed : int, array_like, optional the processed flag/s of the ellipses. text : string, array_like, optional the text/s of the ellipses. id : int, array_like, optional the id/s of the ellipses. layer : int, optional the layer of the images of the ellipses. Returns ------- success : bool it the inserting was successful. """ type = self._processesTypeNameField(type, ["TYPE_Ellipse"]) image = self._processImagesField(image, frame, filename, layer) data = packToDictList(self.table_ellipse, id=id, image=image, x=x, y=y, width=width, height=height, angle=angle, processed=processed, type=type, style=style, text=text) return self.saveReplaceMany(self.table_ellipse, data)
[docs] def deleteEllipses(self, image=None, frame=None, filename=None, x=None, y=None, width=None, height=None, angle=None, type=None, processed=None, text=None, id=None, layer=None): """ Delete all :py:class:`Ellipse` entries with the given criteria. See also: :py:meth:`~.DataFile.getEllipse`, :py:meth:`~.DataFile.getEllipses`, :py:meth:`~.DataFile.setEllipse`, :py:meth:`~.DataFile.setEllipses`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s of the ellipses. frame : int, array_like, optional the frame/s of the images of the ellipses. filename : string, array_like, optional the filename/s of the images of the ellipses. x : int, array_like, optional the x coordinate/s of the center/s of the ellipses. y : int, array_like, optional the y coordinate/s of the center/s of the ellipses. width : int, array_like, optional the width/s of the ellipses. height : int, array_like, optional the height/s of the ellipses. angle : int, array_like, optional the angle/s of the ellipses in degrees. type : string, :py:class:`MarkerType`, array_like, optional the marker type/s (or name/s) of the ellipses. processed : int, array_like, optional the processed flag/s of the ellipses. text : string, array_like, optional the text/s of the ellipses. id : int, array_like, optional the id/s of the ellipses. Returns ------- rows : int the number of affected rows. """ type = self._processesTypeNameField(type, ["TYPE_Ellipse"]) image = self._processImagesField(image, frame, filename, layer) query = self.table_ellipse.delete() if image is None and (frame is not None or filename is not None): images = self.table_image.select(self.table_image.id) images = addFilter(images, frame, self.table_image.sort_index) images = addFilter(images, filename, self.table_image.filename) query = query.where(self.table_ellipse.image.in_(images)) else: query = addFilter(query, image, self.table_ellipse.image) query = addFilter(query, id, self.table_ellipse.id) query = addFilter(query, x, self.table_ellipse.x) query = addFilter(query, y, self.table_ellipse.y) query = addFilter(query, width, self.table_ellipse.width) query = addFilter(query, height, self.table_ellipse.height) query = addFilter(query, angle, self.table_ellipse.angle) query = addFilter(query, type, self.table_ellipse.type) query = addFilter(query, processed, self.table_ellipse.processed) query = addFilter(query, text, self.table_ellipse.text) return query.execute()
""" Polygons """
[docs] def getPolygon(self, id): """ Retrieve an :py:class:`Polygon` object from the database. See also: :py:meth:`~.DataFile.getPolygons`, :py:meth:`~.DataFile.setPolygon`, :py:meth:`~.DataFile.setPolygons`, :py:meth:`~.DataFile.deletePolygons`. Parameters ---------- id: int the id of the polygon. Returns ------- polygon : :py:class:`Polygon` the :py:class:`Polygon` with the desired id or None. """ try: return self.table_polygon.get(id=id) except peewee.DoesNotExist: return None
[docs] def getPolygons(self, image=None, frame=None, filename=None, type=None, processed=None, text=None, id=None, layer=None): """ Get all :py:class:`Polygon` entries with the given criteria. See also: :py:meth:`~.DataFile.getPolygon`, :py:meth:`~.DataFile.setPolygon`, :py:meth:`~.DataFile.setPolygons`, :py:meth:`~.DataFile.deletePolygons`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s of the polygons. frame : int, array_like, optional the frame/s of the images of the polygons. filename : string, array_like, optional the filename/s of the images of the polygons. type : string, :py:class:`MarkerType`, array_like, optional the marker type/s (or name/s) of the polygons. processed : int, array_like, optional the processed flag/s of the polygons. text : string, array_like, optional the text/s of the polygons. id : int, array_like, optional the id/s of the polygons. layer : int, optional the id of the image of the polygons. Returns ------- entries : array_like a query object which contains all :py:class:`Polygon` entries. """ type = self._processesTypeNameField(type, ["TYPE_Polygon"]) query = self.table_polygon.select(self.table_polygon, self.table_image).join(self.table_image) image = self._processImagesField(image, frame, filename, layer) query = addFilter(query, id, self.table_polygon.id) query = addFilter(query, image, self.table_polygon.image) query = addFilter(query, frame, self.table_image.sort_index) query = addFilter(query, filename, self.table_image.filename) query = addFilter(query, type, self.table_polygon.type) query = addFilter(query, processed, self.table_polygon.processed) query = addFilter(query, text, self.table_polygon.text) return query
[docs] def setPolygon(self, image=None, frame=None, filename=None, points=None, type=None, closed=None, processed=None, style=None, text=None, id=None, layer=None): """ Insert or update an :py:class:`Polygon` object in the database. See also: :py:meth:`~.DataFile.getPolygon`, :py:meth:`~.DataFile.getPolygons`, :py:meth:`~.DataFile.setPolygons`, :py:meth:`~.DataFile.deletePolygons`. Parameters ---------- image : int, :py:class:`Image`, optional the image of the polygon. frame : int, optional the frame of the images of the polygon. filename : string, optional the filename of the image of the polygon. points : array, optional the points of the vertices of the polygon. type : string, :py:class:`MarkerType`, optional the marker type (or name) of the polygon. closed : int, optional makes the polygon a closed shape. processed : int, optional the processed flag of the polygon. text : string, optional the text of the polygon. id : int, optional the id of the polygon. layer : int, optional the id of the image of the polygon. Returns ------- polygon : :py:class:`Polygon` the created or changed :py:class:`Polygon` item. """ assert not (id is None and type is None), "Polygon must either have a type or be referenced by it's id." assert not (id is None and image is None and frame is None and filename is None), "Polygon must have an image, frame or filename given or be referenced by it's id." try: item = self.table_polygon.get(id=id) except peewee.DoesNotExist: item = self.table_polygon() type = self._processesTypeNameField(type, ["TYPE_Polygon"]) image = self._processImagesField(image, frame, filename, layer) setFields(item, dict(image=image, points=points, type=type, closed=closed, processed=processed, style=style, text=text)) item.save() return item
def setPolygons(self, image=None, frame=None, filename=None, points=None, type=None, processed=None, style=None, text=None, id=None, layer=None): raise NotImplemented("Use multiple calls to setPolygon() instead.")
[docs] def deletePolygons(self, image=None, frame=None, filename=None, type=None, processed=None, text=None, id=None, layer=None): """ Delete all :py:class:`Polygon` entries with the given criteria. See also: :py:meth:`~.DataFile.getPolygon`, :py:meth:`~.DataFile.getPolygons`, :py:meth:`~.DataFile.setPolygon`, :py:meth:`~.DataFile.setPolygons`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s of the polygons. frame : int, array_like, optional the frame/s of the images of the polygons. filename : string, array_like, optional the filename/s of the images of the polygons. type : string, :py:class:`MarkerType`, array_like, optional the marker type/s (or name/s) of the polygons. processed : int, array_like, optional the processed flag/s of the polygons. text : string, array_like, optional the text/s of the polygons. id : int, array_like, optional the id/s of the polygons. Returns ------- rows : int the number of affected rows. """ type = self._processesTypeNameField(type, ["TYPE_Polygon"]) image = self._processImagesField(image, frame, filename, layer) query = self.table_polygon.delete() if image is None and (frame is not None or filename is not None): images = self.table_image.select(self.table_image.id) images = addFilter(images, frame, self.table_image.sort_index) images = addFilter(images, filename, self.table_image.filename) query = query.where(self.table_polygon.image.in_(images)) else: query = addFilter(query, image, self.table_polygon.image) query = addFilter(query, id, self.table_polygon.id) query = addFilter(query, type, self.table_polygon.type) query = addFilter(query, processed, self.table_polygon.processed) query = addFilter(query, text, self.table_polygon.text) return query.execute()
""" Offset """
[docs] def setOffset(self, image, x, y): """ Set an :py:class:`Offset` entry for a given image. See also: :py:meth:`~.DataFile.deleteOffsets`. Parameters ---------- image : int, :py:class:`Image` the image for which the offset should be given. x : int the x coordinate of the offset. y : int the y coordinate of the offset. Returns ------- entries : :py:class:`Offset` object of class :py:class:`Offset` """ try: offset = self.table_offset.get(image=image) except peewee.DoesNotExist: offset = self.table_offset() setFields(offset, dict(image=image, x=x, y=y)) offset.save() return offset
[docs] def deleteOffsets(self, image=None): """ Delete all :py:class:`Offset` entries from the database, which match the given criteria. If no criteria a given, delete all. See also: :py:meth:`~.DataFile.setOffset`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/s for which the offset should be deleted. Returns ------- rows : int number of rows deleted """ query = self.table_offset.delete() query = addFilter(query, image, self.table_offset.image) return query.execute()
[docs] def setTag(self, name=None, id=None): """ Set a specific :py:class:`Tag` entry by its name or database ID See also: :py:meth:`~.DataFile.getTag`, :py:meth:`~.DataFile.getTags`, :py:meth:`~.DataFile.deleteTags`. Parameters ---------- name: str name of the tag id: int id of :py:class:`Tag` entry Returns ------- entries : :py:class:`Tag` object of class :py:class:`Tag` """ # check input assert any(e is not None for e in [id, name]), "Name and ID may not be all None" try: if id: tag = self.table_tag.get(id=id) else: tag = self.table_tag.get(name=name) except peewee.DoesNotExist: tag = self.table_tag() setFields(tag, dict(name=name)) tag.save() return tag
[docs] def getTag(self, name=None, id=None): """ Get a specific :py:class:`Tag` entry by its name or database ID See also: :py:meth:`~.DataFile.getTags`, :py:meth:`~.DataFile.setTag`, :py:meth:`~.DataFile.deleteTags`. Parameters ---------- name: str name of the tag id: int id of :py:class:`Tag` entry Returns ------- entries : :py:class:`Tag` requested object of class :py:class:`Tag` or None """ # check input assert any(e is not None for e in [id, name]), "Name and ID may not be all None" try: return self.table_tag.get(**noNoneDict(name=name, id=id)) except: return None
[docs] def getTags(self,name=None,id=None): """ Get all :py:class:`Tag` entries from the database, which match the given criteria. If no criteria a given, return all. See also: :py:meth:`~.DataFile.getTag`, :py:meth:`~.DataFile.setTag`, :py:meth:`~.DataFile.deleteTags`. Parameters ---------- name : string, array_like, optional the name/names of the :py:class:`Tag`. id : int, array_like, optional the id/ids of the :py:class:`Tag`. Returns ------- entries : array_like a query object containing all the matching :py:class:`Tag` entries in the database file. """ query = self.table_tag.select() query = addFilter(query, name, self.table_tag.name) query = addFilter(query, id, self.table_tag.id) return query
[docs] def deleteTags(self, name=None, id=None): """ Delete all :py:class:`Tag` entries from the database, which match the given criteria. If no criteria a given, delete all. See also: :py:meth:`~.DataFile.getTag`, :py:meth:`~.DataFile.getTags`, :py:meth:`~.DataFile.setTag`. Parameters ---------- name : string, array_like, optional the name/names of the :py:class:`Tag`. id : int, array_like, optional the id/ids of the :py:class:`Tag`. Returns ------- rows : int number of rows deleted """ query = self.table_tag.delete() query = addFilter(query, name, self.table_tag.name) query = addFilter(query, id, self.table_tag.id) return query.execute()
[docs] def getAnnotation(self, image=None, frame=None, filename=None, id=None, create=False): """ Get the :py:class:`Annotation` entry for the given image frame number or filename. See also: :py:meth:`~.DataFile.getAnnotations`, :py:meth:`~.DataFile.setAnnotation`, :py:meth:`~.DataFile.deleteAnnotations`. Parameters ---------- image : int, :py:class:`Image`, optional the image for which the annotation should be retrieved. If omitted, frame number or filename should be specified instead. frame : int, optional frame number of the image, which annotation should be returned. If omitted, image or filename should be specified instead. filename : string, optional filename of the image, which annotation should be returned. If omitted, image or frame number should be specified instead. id : int, optional id of the annotation entry. create : bool, optional whether the annotation should be created if it does not exist. (default: False) Returns ------- annotation : :py:class:`Annotation` the desired :py:class:`Annotation` entry. """ # check input assert sum(e is not None for e in [id, image, frame, filename]) == 1, \ "Exactly one of image, frame or filename should be specified or should be referenced by it's id." query = self.table_annotation.select(self.table_annotation, self.table_image).join(self.table_image) query = addFilter(query, id, self.table_annotation.id) query = addFilter(query, image, self.table_annotation.image) query = addFilter(query, frame, self.table_image.sort_index) query = addFilter(query, filename, self.table_image.filename) query.limit(1) try: return query[0] except IndexError: if create is True: if not image: image = self.getImage(frame, filename) if not image: raise ImageDoesNotExist("No parent image found ") annotation = self.table_annotation(image=image, timestamp=image.timestamp, comment="", rating=None) annotation.save() return annotation return None
[docs] def getAnnotations(self, image=None, frame=None, filename=None, timestamp=None, tag=None, comment=None, rating=None, id=None): """ Get all :py:class:`Annotation` entries from the database, which match the given criteria. If no criteria a given, return all masks. See also: :py:meth:`~.DataFile.getAnnotation`, :py:meth:`~.DataFile.setAnnotation`, :py:meth:`~.DataFile.deleteAnnotations`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/images for which the annotations should be retrieved. If omitted, frame numbers or filenames should be specified instead. frame: int, array_like, optional frame number/numbers of the images, which annotations should be returned. If omitted, images or filenames should be specified instead. filename: string, array_like, optional filename of the image/images, which annotations should be returned. If omitted, images or frame numbers should be specified instead. timestamp : datetime, array_like, optional timestamp/s of the annotations. tag : string, array_like, optional the tag/s of the annotations to load. comment : string, array_like, optional the comment/s of the annotations. rating : int, array_like, optional the rating/s of the annotations. id : int, array_like, optional id/ids of the annotations. Returns ------- entries : :py:class:`Annotation` a query object containing all the matching :py:class:`Annotation` entries in the database file. """ # check input assert sum(e is not None for e in [image, frame, filename]) <= 1, \ "Exactly one of images, frames or filenames should be specified" query = self.table_annotation.select(self.table_annotation, self.table_image).join(self.table_image) if tag is not None: query = query.switch(self.table_annotation).join(self.table_tagassociation).join(self.table_tag) query = addFilter(query, id, self.table_annotation.id) query = addFilter(query, image, self.table_annotation.image) query = addFilter(query, frame, self.table_image.sort_index) query = addFilter(query, filename, self.table_image.filename) query = addFilter(query, timestamp, self.table_annotation.timestamp) query = addFilter(query, comment, self.table_annotation.comment) query = addFilter(query, rating, self.table_annotation.rating) if tag: query = addFilter(query, tag, self.table_tag.name) query = query.group_by(self.table_annotation.id) return query
[docs] def setAnnotation(self, image=None, frame=None, filename=None, timestamp=None, comment=None, rating=None, id=None, layer=None): """ Insert or update an :py:class:`Annotation` object in the database. See also: :py:meth:`~.DataFile.getAnnotation`, :py:meth:`~.DataFile.getAnnotations`, :py:meth:`~.DataFile.deleteAnnotations`. Parameters ---------- image : int, :py:class:`Image`, optional the image of the annotation. frame : int, optional the frame of the images of the annotation. filename : string, optional the filename of the image of the annotation. timestamp : datetime, optional the timestamp of the annotation. comment : string, optional the text of the annotation. rating : int, optional the rating of the annotation. id : int, optional the id of the annotation. Returns ------- annotation : :py:class:`Annotation` the created or changed :py:class:`Annotation` item. """ assert not (id is None and image is None and frame is None and filename is None), "Annotations must have an image, frame or filename given or be referenced by it's id." image = self._processImagesField(image, frame, filename, layer) try: item = self.table_annotation.get(**noNoneDict(id=id, image=image)) except peewee.DoesNotExist: item = self.table_annotation() setFields(item, dict(image=image, timestamp=timestamp, comment=comment, rating=rating)) item.save() return item
[docs] def deleteAnnotations(self, image=None, frame=None, filename=None, timestamp=None, comment=None, rating=None, id=None): """ Delete all :py:class:`Annotation` entries with the given criteria. See also: :py:meth:`~.DataFile.getAnnotation`, :py:meth:`~.DataFile.getAnnotations`, :py:meth:`~.DataFile.setAnnotation`. Parameters ---------- image : int, :py:class:`Image`, array_like, optional the image/images for which the annotations should be retrieved. If omitted, frame numbers or filenames should be specified instead. frame: int, array_like, optional frame number/numbers of the images, which annotations should be returned. If omitted, images or filenames should be specified instead. filename: string, array_like, optional filename of the image/images, which annotations should be returned. If omitted, images or frame numbers should be specified instead. timestamp : datetime, array_like, optional timestamp/s of the annotations. comment : string, array_like, optional the comment/s of the annotations. rating : int, array_like, optional the rating/s of the annotations. id : int, array_like, optional id/ids of the annotations. Returns ------- rows : int the number of affected rows. """ query = self.table_annotation.delete() if image is None and (frame is not None or filename is not None): images = self.table_image.select(self.table_image.id) images = addFilter(images, frame, self.table_image.sort_index) images = addFilter(images, filename, self.table_image.filename) query = query.where(self.table_annotation.image.in_(images)) else: query = addFilter(query, image, self.table_annotation.image) query = addFilter(query, id, self.table_annotation.id) query = addFilter(query, image, self.table_annotation.image) query = addFilter(query, timestamp, self.table_annotation.timestamp) query = addFilter(query, comment, self.table_annotation.comment) query = addFilter(query, rating, self.table_annotation.rating) return query.execute()
def mergeWith(self, other_db): my_marker_types = {t.name: t.id for t in self.getMarkerTypes()} other_marker_types = {t.name: t.id for t in other_db.getMarkerTypes()} marker_translation = {} marker_type_attributes = ["name", "color", "mode", "style", "text", "hidden"] for other_marker_type in other_db.getMarkerTypes(): if other_marker_type.name in my_marker_types: marker_translation[other_marker_type.id] = my_marker_types[other_marker_type.name] else: kwargs = {name: getattr(other_marker_type, name) for name in marker_type_attributes} marker_type = self.setMarkerType(**kwargs) marker_translation[other_marker_type.id] = marker_type.id image_attributes = ["filename", "frame", "external_id", "timestamp", "width", "height", "layer"] marker_attributes = ["image", "x", "y", "type", "processed", "track", "style", "text"] line_attributes = ["image", "x1", "x2", "y1", "y2", "type", "processed", "style", "text"] annotation_attributes = ["image", "timestamp", "comment", "rating"] for image_other in other_db.getImages(): kwargs = {name: getattr(image_other, name) for name in image_attributes} image = self.setImage(**kwargs) for marker in image_other.markers: kwargs = {name: getattr(marker, name) for name in marker_attributes} kwargs["type"] = kwargs["type"].name kwargs["image"] = image self.setMarker(**kwargs) for line in image_other.lines: kwargs = {name: getattr(line, name) for name in line_attributes} kwargs["type"] = kwargs["type"].name kwargs["image"] = image self.setLine(**kwargs) for annotation in image_other.annotations: kwargs = {name: getattr(annotation, name) for name in annotation_attributes} kwargs["image"] = image self.setAnnotation(**kwargs)
[docs] def getTracksNanPadded(self, type=None, id=None, start_frame=None, end_frame=None, skip=None, layer=None, apply_offset=False): """ Return an array of all track points with the given filters. The array has the shape of [n_tracks, n_images, pos], where pos is the 2D position of the markers. See also: :py:meth:`~.DataFile.getTrack`, :py:meth:`~.DataFile.setTrack`, :py:meth:`~.DataFile.deleteTracks`, :py:meth:`~.DataFile.getTracks`. Parameters ---------- type: :py:class:`MarkerType`, str, array_like, optional the marker type/types or name of the marker type for the track. id : int, array_like, optional the :py:class:`Track` ID start_frame : int, optional the frame where to begin the array. Default: first frame. end_frame : int, optional the frame where to end the array. Default: last frame. skip : int, optional skip every nth frame. Default: don't skip frames. layer : int, optional which layer to use for the images. apply_offset : bool, optional whether to apply the image offsets to the marker positions. Default: False. Returns ------- nan_padded_array : ndarray the array which contains all the track marker positions. """ """ image conditions """ where_condition_image = [] # get the filter condition (only filter if it is necessary, e.g. if we have more than one layer) if layer is not None: where_condition_image.append("i.layer_id = %d" % layer) # if a start frame is given, only export marker from images >= the given frame if start_frame is not None: where_condition_image.append("i.sort_index >= %d" % start_frame) # if a end frame is given, only export marker from images < the given frame if end_frame is not None: where_condition_image.append("i.sort_index < %d" % end_frame) # skip every nth frame if skip is not None: where_condition_image.append("i.sort_index %% %d = 0" % skip) # append sorting by sort index if len(where_condition_image): where_condition_image = " AND ".join(where_condition_image) else: where_condition_image = "" """ track conditions """ where_condition_tracks = [] if type is not None: type = self._processesTypeNameField(type, ["TYPE_Track"]) if not isinstance(type, list): where_condition_tracks.append("t.type_id = %d" % type.id) else: where_condition_tracks.append("t.type_id in " % str([t.id for t in type])) if id is not None: where_condition_tracks.append("t.id = %d" % id) # append sorting by sort index if len(where_condition_tracks): where_condition_tracks = " AND ".join(where_condition_tracks) else: where_condition_tracks = "" where_condition = " AND ".join([cond for cond in [where_condition_image, where_condition_tracks] if cond != ""]) if where_condition != "": where_condition = "WHERE " + where_condition if where_condition_tracks != "": where_condition_tracks = "WHERE " + where_condition_tracks if where_condition_image != "": where_condition_image = "WHERE " + where_condition_image # get the image ids according to the conditions image_ids = self.db.execute_sql( "SELECT id FROM image i " + where_condition_image + " ORDER BY sort_index;").fetchall() image_count = len(image_ids) track_ids = self.db.execute_sql("SELECT id FROM track t " + where_condition_tracks + ";").fetchall() track_count = len(track_ids) # ---- fast (but a little komplex) query algorithm ---- # this will be faster than iterating as long as track_ids and image ids are not extreeeeemly sparse # (e.g 1, 1002, 678194083, ...), because that will create mindnumbingly big LUTs # start with an array of the correct size for all the data, but flattened shape pos = np.zeros((track_count * image_count, 2), "float") # make the array NaN pos[:] = np.nan # now query all positions at once, but include track id and image id -> shape (N, 4) nonNanPos = np.asarray(self.db.execute_sql( "SELECT t.id, image_id, x, y FROM track t LEFT JOIN marker m ON m.track_id = t.id LEFT JOIN image i ON i.id = m.image_id "+\ where_condition ).fetchall()) # now we built a little LUT/encoding scheme, that tells us which positions in the "pos" array encodes which image/track # start with what track ids could possibly be in nonNanPos allowedTracksIds = np.array(track_ids, dtype=int) # now built a look up table, that maps these track ids to a an id in [0, track_count] -> column id of that track allowedTracksLUT = np.cumsum(np.isin(np.arange(max(allowedTracksIds)+1), allowedTracksIds)) # now we renumber the tracks from our query to fit into [0, track_count] encTracksId = allowedTracksLUT[nonNanPos[:,0].astype(int)] - allowedTracksIds.min() # repeat the same with the image ids # start with what image ids could possibly be in nonNanPos allowedImageIds = np.array(image_ids, dtype=int) # now built a look up table, that maps these image ids to a an id in [0, image_count] -> column id of that image allowedImageLUT = np.cumsum(np.isin(np.arange(max(allowedImageIds)+1), allowedImageIds)) # now we renumber the iamges from our query to fit into [0, image_count] encImageId = allowedImageLUT[nonNanPos[:,1].astype(int)] - allowedImageIds.min() # now we stack our encoding scheme together -> this now identifies all entries in pos encId = encImageId + image_count*encTracksId # write the entries from our nonNanPos to the correct entries # note that we do implicit ordering of track/image id here, because of how the LUT above are constructed pos[encId] = nonNanPos[:,2:] # a simple reshape brings us to the final nanPaddedTrackArray form pos = pos.reshape((track_count, image_count, 2)) # if the offset is required, get the offsets for all images and add them to the marker positions if apply_offset: query_offset = "SELECT IFNULL(o.x, 0) AS x, IFNULL(o.y, 0) AS y FROM image AS i LEFT JOIN offset o ON i.id = o.image_id" offsets = np.array(self.db.execute_sql(query_offset + where_condition_image + " ORDER BY sort_index;").fetchall()).astype(float) pos += offsets return pos
def setTracksNanPadded(self, nan_padded, track_type, layer=None, start_frame=0, clear_tracks_before=True): # get the type type = self._processesTypeNameField(track_type, ["TYPE_Normal", "TYPE_Track"]) # get the layer if layer is not None: layer_id = self.db.execute_sql(f"SELECT id FROM layer WHERE layer.name = {layer} LIMIT 1").fetchall()[0][0] else: try: layer_id = self.db.execute_sql(f"SELECT id FROM layer WHERE layer.base_layer_id is layer.id LIMIT 1").fetchall()[0][0] except IndexError: layer_id = None if clear_tracks_before is True: # remove previous tracks (deletes also their markers) self.deleteTracks(track_type) # create the new tracks data = [[type] for i in range(nan_padded.shape[0])] self.saveInsertMany(self.table_track, data, ["type"]) # get the ids of the new tracks, as we deleted all tracks before, we can just get all ids track_ids = np.array(self.db.execute_sql(f"SELECT id FROM track WHERE track.type_id = {type.id}").fetchall()).astype(np.int).ravel() else: # if we do not delete the tracks before, we have to insert them one after the other to obtain their ids with self.db.atomic(): track_ids = [] for i in range(nan_padded.shape[0]): track_ids.append(self.table_track.insert(dict(type=track_type)).execute()) # get the image ids if layer_id is None: image_ids = np.array([self.db.execute_sql(f"SELECT id FROM image WHERE image.sort_index = {i+start_frame} LIMIT 1").fetchall()[0][0] for i in range(nan_padded.shape[1])]).astype(np.int)#.ravel() else: image_ids = np.array([self.db.execute_sql(f"SELECT id FROM image WHERE image.sort_index = {i+start_frame} AND image.layer_id = {layer_id} LIMIT 1").fetchall()[0][0] for i in range(nan_padded.shape[1])]).astype(np.int) # .ravel() # prepare the data data = [] isnan = np.isnan(nan_padded[:, :, 0]) for i in range(nan_padded.shape[1]): for t in range(nan_padded.shape[0]): if not isnan[t, i]: data.append([track_ids[t], type, nan_padded[t, i, 0], nan_padded[t, i, 1], image_ids[i]]) # add the data to the database return self.saveInsertMany(self.table_marker, data, ["track_id", "type", "x", "y", "image_id"]) def __enter__(self): self.db.connect(reuse_if_open=True) return self def __exit__(self, exc_type, exc_val, exc_tb): self.db.close() def __call__(self, *args, **kwargs): return self