Skip to Content

Contributors

queue_job commit

Hello,

I would like to commit fix for queue_job but am not able to do that with git. Is it enough if I send it to you in email so I don't break something in the repo itself? The description and code itself is below.
Thanks!

queue_job: split autovacuum execution

Sometimes autovacuum method reach thread max execution time and fail (for us it is 120 seconds). Therefore stop its execution if takes long time and run it again as a new job.

@@ -282,13 +282,19 @@ class QueueJob(models.Model):
         """Delete all jobs done based on the removal interval defined on the
            channel
 
         Called from a cron.
         """
+        start = datetime.now()
+        time_exceeded = False
         for channel in self.env["queue.job.channel"].search([]):
             deadline = datetime.now() - timedelta(days=int(channel.removal_interval))
             while True:
+                if (datetime.now() - start).total_seconds() > 60:
+                    # we don't want to have thread max execution time exceeded exception
+                    time_exceeded = True
+                    break
                 jobs = self.search(
                     [
                         ("date_done", "<=", deadline),
                         ("channel", "=", channel.complete_name),
                     ],
@@ -296,10 +302,15 @@ class QueueJob(models.Model):
                 )
                 if jobs:
                     jobs.unlink()
                 else:
                     break
+            if time_exceeded:
+                break
+        if time_exceeded:
+            # there is probably still something to cleanup so run it again
+            self.env.ref("queue_job.ir_cron_autovacuum_queue_jobs").with_delay().method_direct_trigger()
         return True
 
     def requeue_stuck_jobs(self, enqueued_delta=5, started_delta=0):
         """Fix jobs that are in a bad states
 

Code of the autovacuum method:

    def autovacuum(self):
        """Delete all jobs done based on the removal interval defined on the
           channel

        Called from a cron.
        """
        start = datetime.now()
        time_exceeded = False
        for channel in self.env["queue.job.channel"].search([]):
            deadline = datetime.now() - timedelta(days=int(channel.removal_interval))
            while True:
                if (datetime.now() - start).total_seconds() > 60:
                    # we don't want to have thread max execution time exceeded exception
                    time_exceeded = True
                    break
                jobs = self.search(
                    [
                        ("date_done", "<=", deadline),
                        ("channel", "=", channel.complete_name),
                    ],
                    limit=1000,
                )
                if jobs:
                    jobs.unlink()
                else:
                    break
            if time_exceeded:
                break
        if time_exceeded:
            # there is probably still something to cleanup so run it again
            self.env.ref("queue_job.ir_cron_autovacuum_queue_jobs").with_delay().method_direct_trigger()
        return True

Kind regards,
Martin

image.png

by "Martin Fraňo" <waky007@gmail.com> - 10:06 - 9 Jun 2022

Follow-Ups

  • Re: queue_job commit
    Hi,

    There is already a pending PR there : https://github.com/OCA/queue/pull/417

    You can maybe review it ?

    Thanks

    On Thu, Jun 9, 2022 at 10:07 AM Martin Fraňo <waky007@gmail.com> wrote:
    Hello,

    I would like to commit fix for queue_job but am not able to do that with git. Is it enough if I send it to you in email so I don't break something in the repo itself? The description and code itself is below.
    Thanks!

    queue_job: split autovacuum execution

    Sometimes autovacuum method reach thread max execution time and fail (for us it is 120 seconds). Therefore stop its execution if takes long time and run it again as a new job.

    @@ -282,13 +282,19 @@ class QueueJob(models.Model):
             """Delete all jobs done based on the removal interval defined on the
                channel
     
             Called from a cron.
             """
    +        start = datetime.now()
    +        time_exceeded = False
             for channel in self.env["queue.job.channel"].search([]):
                 deadline = datetime.now() - timedelta(days=int(channel.removal_interval))
                 while True:
    +                if (datetime.now() - start).total_seconds() > 60:
    +                    # we don't want to have thread max execution time exceeded exception
    +                    time_exceeded = True
    +                    break
                     jobs = self.search(
                         [
                             ("date_done", "<=", deadline),
                             ("channel", "=", channel.complete_name),
                         ],
    @@ -296,10 +302,15 @@ class QueueJob(models.Model):
                     )
                     if jobs:
                         jobs.unlink()
                     else:
                         break
    +            if time_exceeded:
    +                break
    +        if time_exceeded:
    +            # there is probably still something to cleanup so run it again
    +            self.env.ref("queue_job.ir_cron_autovacuum_queue_jobs").with_delay().method_direct_trigger()
             return True
     
         def requeue_stuck_jobs(self, enqueued_delta=5, started_delta=0):
             """Fix jobs that are in a bad states
     

    Code of the autovacuum method:

        def autovacuum(self):
            """Delete all jobs done based on the removal interval defined on the
               channel

            Called from a cron.
            """
            start = datetime.now()
            time_exceeded = False
            for channel in self.env["queue.job.channel"].search([]):
                deadline = datetime.now() - timedelta(days=int(channel.removal_interval))
                while True:
                    if (datetime.now() - start).total_seconds() > 60:
                        # we don't want to have thread max execution time exceeded exception
                        time_exceeded = True
                        break
                    jobs = self.search(
                        [
                            ("date_done", "<=", deadline),
                            ("channel", "=", channel.complete_name),
                        ],
                        limit=1000,
                    )
                    if jobs:
                        jobs.unlink()
                    else:
                        break
                if time_exceeded:
                    break
            if time_exceeded:
                # there is probably still something to cleanup so run it again
                self.env.ref("queue_job.ir_cron_autovacuum_queue_jobs").with_delay().method_direct_trigger()
            return True

    Kind regards,
    Martin

    image.png

    _______________________________________________
    Mailing-List: https://odoo-community.org/groups/contributors-15
    Post to: mailto:contributors@odoo-community.org
    Unsubscribe: https://odoo-community.org/groups?unsubscribe



    --

    Denis Roussel
    Software Engineer
    T    : +32 2 888 31 49
    M : +32 472 22 00 57


    Val Benoit, Quai Banning 6 | B-4000 Liège | Belgium
    Atrium Building, Drève Richelle 167 | B-1410 Waterloo | Belgium
    Zone industrielle 22 | L-8287 Kehlen | Luxembourg

    by Denis Roussel - 10:31 - 9 Jun 2022