Skip to content

jobs.py

JobTable

Bases: Table

A base relation with no definition. Allows reserving jobs.

Source code in datajoint/jobs.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class JobTable(Table):
    """
    Job reservation table (``~jobs``) of a single database.

    The table has no user-supplied definition: the definition is generated in
    ``__init__`` and the table is declared there if it is not declared yet.
    """

    def __init__(self, conn, database):
        """
        :param conn: connection object used to reach the database server
        :param database: name of the database that holds the jobs table
        """
        self.database = database
        self._connection = conn
        heading_info = {
            "conn": conn,
            "database": database,
            "table_name": self.table_name,
            "context": None,
        }
        self._heading = Heading(table_info=heading_info)
        self._support = [self.full_table_name]

        # The template text is kept verbatim; only the two placeholders vary.
        template = """    # job reservation table for `{database}`
        table_name  :varchar(255)  # className of the table
        key_hash  :char(32)  # key hash
        ---
        status  :enum('reserved','error','ignore')  # if tuple is missing, the job is available
        key=null  :blob  # structure containing the key
        error_message=""  :varchar({error_message_length})  # error message returned if failed
        error_stack=null  :mediumblob  # error stack if failed
        user="" :varchar(255) # database user
        host=""  :varchar(255)  # system hostname
        pid=0  :int unsigned  # system process id
        connection_id = 0  : bigint unsigned          # connection_id()
        timestamp=CURRENT_TIMESTAMP  :timestamp   # automatic timestamp
        """
        self._definition = template.format(
            error_message_length=ERROR_MESSAGE_LENGTH, database=database
        )
        if not self.is_declared:
            self.declare()
        # Looked up once here and reused by reserve() and error().
        self._user = self.connection.get_user()

    @property
    def definition(self):
        """The table definition string generated in ``__init__``."""
        return self._definition

    @property
    def table_name(self):
        """The fixed name of the jobs table, ``~jobs``."""
        return "~jobs"

    def delete(self):
        """
        Bypass interactive prompts and dependencies.

        Delegates directly to ``delete_quick``; its result is not returned.
        """
        self.delete_quick()

    def drop(self):
        """
        Bypass interactive prompts and dependencies.

        Delegates directly to ``drop_quick``; its result is not returned.
        """
        self.drop_quick()

    def reserve(self, table_name, key):
        """
        Try to reserve a job for computation by inserting a ``reserved`` entry.

        The entry is identified by the table name and the hash of the job key,
        and records the host, process id, connection id and user making the
        reservation. ``complete`` removes the entry again.

        :param table_name: `database`.`table_name`
        :param key: the dict of the job's primary key
        :return: True if the job was reserved; False if the insert raised
            ``DuplicateError``, i.e. the job is already taken
        """
        reservation = {
            "table_name": table_name,
            "key_hash": key_hash(key),
            "status": "reserved",
            "host": platform.node(),
            "pid": os.getpid(),
            "connection_id": self.connection.connection_id,
            "key": key,
            "user": self._user,
        }
        try:
            with config(enable_python_native_blobs=True):
                self.insert1(reservation, ignore_extra_fields=True)
        except DuplicateError:
            return False
        else:
            return True

    def complete(self, table_name, key):
        """
        Mark a job as completed by deleting its reservation entry.

        :param table_name: `database`.`table_name`
        :param key: the dict of the job's primary key
        """
        finished = {"table_name": table_name, "key_hash": key_hash(key)}
        matching = self & finished
        matching.delete_quick()

    def error(self, table_name, key, error_message, error_stack=None):
        """
        Record a failed job: the job's entry is replaced with an ``error`` entry.

        A message longer than ``ERROR_MESSAGE_LENGTH`` is cut short and ends
        with ``TRUNCATION_APPENDIX``.

        :param table_name: `database`.`table_name`
        :param key: the dict of the job's primary key
        :param error_message: string error message
        :param error_stack: stack trace
        """
        if len(error_message) > ERROR_MESSAGE_LENGTH:
            kept_length = ERROR_MESSAGE_LENGTH - len(TRUNCATION_APPENDIX)
            error_message = error_message[:kept_length] + TRUNCATION_APPENDIX
        with config(enable_python_native_blobs=True):
            failure = {
                "table_name": table_name,
                "key_hash": key_hash(key),
                "status": "error",
                "host": platform.node(),
                "pid": os.getpid(),
                "connection_id": self.connection.connection_id,
                "user": self._user,
                "key": key,
                "error_message": error_message,
                "error_stack": error_stack,
            }
            self.insert1(failure, replace=True, ignore_extra_fields=True)

complete(table_name, key)

Log a completed job. When a job is completed, its reservation entry is deleted.

:param table_name: `database`.`table_name`
:param key: the dict of the job's primary key

Source code in datajoint/jobs.py
90
91
92
93
94
95
96
97
98
def complete(self, table_name, key):
    """
    Mark a job as completed by deleting its reservation entry.

    :param table_name: `database`.`table_name`
    :param key: the dict of the job's primary key
    """
    finished = {"table_name": table_name, "key_hash": key_hash(key)}
    matching = self & finished
    matching.delete_quick()

delete()

bypass interactive prompts and dependencies

Source code in datajoint/jobs.py
56
57
58
def delete(self):
    """
    Bypass interactive prompts and dependencies.

    Delegates directly to ``delete_quick``; its result is not returned.
    """
    self.delete_quick()

drop()

bypass interactive prompts and dependencies

Source code in datajoint/jobs.py
60
61
62
def drop(self):
    """
    Bypass interactive prompts and dependencies.

    Delegates directly to ``drop_quick``; its result is not returned.
    """
    self.drop_quick()

error(table_name, key, error_message, error_stack=None)

Log an error message. The job reservation is replaced with an error entry. If an error occurs, leave an entry describing the problem.

:param table_name: `database`.`table_name`
:param key: the dict of the job's primary key
:param error_message: string error message
:param error_stack: stack trace

Source code in datajoint/jobs.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def error(self, table_name, key, error_message, error_stack=None):
    """
    Record a failed job: the job's entry is replaced with an ``error`` entry.

    A message longer than ``ERROR_MESSAGE_LENGTH`` is cut short and ends
    with ``TRUNCATION_APPENDIX``.

    :param table_name: `database`.`table_name`
    :param key: the dict of the job's primary key
    :param error_message: string error message
    :param error_stack: stack trace
    """
    if len(error_message) > ERROR_MESSAGE_LENGTH:
        kept_length = ERROR_MESSAGE_LENGTH - len(TRUNCATION_APPENDIX)
        error_message = error_message[:kept_length] + TRUNCATION_APPENDIX
    with config(enable_python_native_blobs=True):
        failure = {
            "table_name": table_name,
            "key_hash": key_hash(key),
            "status": "error",
            "host": platform.node(),
            "pid": os.getpid(),
            "connection_id": self.connection.connection_id,
            "user": self._user,
            "key": key,
            "error_message": error_message,
            "error_stack": error_stack,
        }
        self.insert1(failure, replace=True, ignore_extra_fields=True)

reserve(table_name, key)

Reserve a job for computation. When a job is reserved, the job table contains an entry for the job key, identified by its hash. When jobs are completed, the entry is removed.

:param table_name: `database`.`table_name`
:param key: the dict of the job's primary key
:return: True if the job was reserved successfully; False if the job is already taken

Source code in datajoint/jobs.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def reserve(self, table_name, key):
    """
    Try to reserve a job for computation by inserting a ``reserved`` entry.

    The entry is identified by the table name and the hash of the job key,
    and records the host, process id, connection id and user making the
    reservation.

    :param table_name: `database`.`table_name`
    :param key: the dict of the job's primary key
    :return: True if the job was reserved; False if the insert raised
        ``DuplicateError``, i.e. the job is already taken
    """
    reservation = {
        "table_name": table_name,
        "key_hash": key_hash(key),
        "status": "reserved",
        "host": platform.node(),
        "pid": os.getpid(),
        "connection_id": self.connection.connection_id,
        "key": key,
        "user": self._user,
    }
    try:
        with config(enable_python_native_blobs=True):
            self.insert1(reservation, ignore_extra_fields=True)
    except DuplicateError:
        return False
    else:
        return True