Implementation of Job Monitoring
To stream standard out in real time from jobs that Galaxy runs through Pulsar, changes were needed in both Galaxy and Pulsar. The general idea is that Pulsar sends the standard out text file to Galaxy through an API endpoint that populates job files. When Galaxy receives a request for job status, it reads the file and returns its contents alongside the rest of the job status. In the Galaxy client, changes were made to request this status (standard out included), render it, and automatically scroll as the standard out grows. The following sections describe the changes in more detail.
Pulsar
The bulk of the changes in Pulsar are in pulsar/managers/stateful.py. Here, when a job is kicked off, alongside the thread that checks the status every so often, another thread is run that periodically sends the standard out to Galaxy. The thread is started in get_status() when the status is changed to "to_running". The stdout_update() method starts the thread and contains the code that calls the post_remote_output(), _prepare_file_output(), and _post_file() methods, which do the work of actually sending the standard out stored in Pulsar's job directory to the Galaxy file endpoint.
To avoid resending standard out that has already been sent, a map stores the standard out file position for each job. It is instantiated in the _handling_of_preprocessing_state() method. Reading the standard out moves the file pointer to the end of the current standard out file, so when new standard out is appended, the pointer is already in position to read only the new text. When the job finishes, __handle_postprocessing() makes sure to send any standard out that has not been sent yet.
Because Pulsar by default sends the standard out in a RabbitMQ status message when the job has finished, we also removed the standard out from this message, which is created in pulsar/manager_endpoint_util.py. In order to check if standard out has already been sent or not, is_live_stdout_update() was added to the ManagerProxy class located in pulsar/managers/__init__.py. It is then overridden in stateful.py, mentioned earlier.
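The position-tracking idea can be sketched as follows. This is a minimal illustration, not Pulsar's actual code: the StdoutTracker class and its method names are hypothetical, and only the idea of a per-job map of file positions comes from the description above.

```python
import os


class StdoutTracker:
    """Remembers, per job, how far into the stdout file has already been read.

    Hypothetical helper for illustration; not part of Pulsar.
    """

    def __init__(self):
        # Maps job_id -> byte offset of the first unread byte.
        self._positions = {}

    def register(self, job_id):
        """Start tracking a job from the beginning of its stdout file."""
        self._positions[job_id] = 0

    def read_new(self, job_id, stdout_path):
        """Return only the output appended since the previous call."""
        if not os.path.exists(stdout_path):
            return ""  # the job has not produced any output yet
        with open(stdout_path, "rb") as handle:
            handle.seek(self._positions.get(job_id, 0))
            chunk = handle.read()
            # Remember where we stopped so the next call skips this chunk.
            self._positions[job_id] = handle.tell()
        # A multi-byte character split across two reads would be replaced
        # here; a production version would need to handle that case.
        return chunk.decode("utf-8", errors="replace")
```

Calling read_new() once more after the job has finished sends whatever was written since the last periodic update, which mirrors the final send described above.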
All of this functionality is enabled only through two added configuration options; without them, Pulsar acts exactly as it did before the changes. The options are send_stdout_update: [boolean] and stdout_update_interval: [int]. Setting the first to true enables live standard out reporting. The second sets the interval between updates, in seconds.
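As a rough sketch of how these two options could gate a periodic update thread: the StdoutUpdater class, the send_update callback, and the 5-second fallback interval below are all assumptions made for illustration; only the option names come from the text.

```python
import threading


class StdoutUpdater:
    """Calls send_update() every stdout_update_interval seconds.

    Hypothetical helper for illustration; not part of Pulsar.
    """

    def __init__(self, config, send_update):
        # With the option absent or false, nothing is ever started.
        self.enabled = bool(config.get("send_stdout_update", False))
        # The fallback of 5 seconds is an assumption, not Pulsar's default.
        self.interval = float(config.get("stdout_update_interval", 5))
        self._send_update = send_update
        self._stop = threading.Event()
        self._thread = None

    def start(self):
        """Start the background thread; return False when disabled."""
        if not self.enabled:
            return False
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()
        return True

    def _loop(self):
        # Event.wait() returns False on timeout and True once stop() is called.
        while not self._stop.wait(self.interval):
            self._send_update()

    def stop(self):
        """Stop the thread, then send once more to flush remaining output."""
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
            self._send_update()
```

With a configuration equivalent to {"send_stdout_update": True, "stdout_update_interval": 3}, start() would trigger an update every three seconds until stop() is called; with the first option missing or false, start() returns False and no thread is created.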
Galaxy
The changes in Galaxy are more spread out, but are broadly grouped into two categories: API changes, and client changes.
The API changes mostly affect the GET /api/jobs/{job_id} endpoint. Two optional query parameters are added, stdout_start_pos and stdout_count. The former specifies the character index at which to start reading the standard out; the latter specifies how many characters to read. These are added to the show() methods in both lib/galaxy/webapps/galaxy/api/jobs.py and lib/galaxy/webapps/galaxy/services/jobs.py. These methods eventually call get_accessible_job(), which is located in lib/galaxy/managers/jobs.py. The parameters are passed through to this method, where the standard out file is read from the current job working directory and sent as part of the return value (an instance of model.Job).
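The meaning of the two parameters can be illustrated with a small sketch. The read_stdout_slice function is hypothetical and is not the code in get_accessible_job(); it only shows how a start position and a count, both measured in characters, select part of the file.

```python
def read_stdout_slice(path, stdout_start_pos=0, stdout_count=None):
    """Return up to stdout_count characters, starting at stdout_start_pos.

    Hypothetical helper for illustration; not part of Galaxy.
    """
    try:
        with open(path, "r", encoding="utf-8", errors="replace") as handle:
            # Reading the whole file keeps the sketch simple; a real
            # implementation might avoid this for large outputs.
            text = handle.read()
    except FileNotFoundError:
        return ""  # no standard out has been written yet
    # Omitting the count means "everything from the start position on".
    end = None if stdout_count is None else stdout_start_pos + stdout_count
    return text[stdout_start_pos:end]
```

Under this reading, a request such as GET /api/jobs/{job_id}?stdout_start_pos=6&stdout_count=5 would return the five characters of standard out beginning at index 6; the values here are illustrative only.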
The one other backend change is in the finish_job() method in lib/galaxy/jobs/runners/pulsar.py, where the standard out file is also read (from the same location that get_accessible_job() uses) and added to the job. This is necessary because Pulsar no longer sends the standard output as part of the complete status message, so no standard out is read here by default. This change allows us to populate the standard out at the conclusion of the job (there are still checks in place to preserve previous functionality as well).
At a broad level, the frontend changes update the existing code to call the modified API endpoint correctly in order to retrieve the standard out, make the standard out window scrollable, and implement auto-scroll. The scrolling changes are in client/src/components/JobInformation/CodeRow.vue and consist of some CSS and a few lines of JavaScript (mostly just setting scrollTop to scrollHeight if the user is scrolled all the way down). In client/src/components/JobInformation/JobInformation.vue and client/src/components/providers/JobProvider.js, the jobDetails async function is modified to include the new stdout_start_pos and stdout_count parameters in the call to the endpoint discussed above. The standard out text is now stored in memory, instead of requesting the entire standard out and re-rendering it on every call.
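The client logic is written in JavaScript, but the incremental idea can be sketched in Python to match the backend examples. Everything here is an illustrative assumption: the StdoutBuffer class, the fetch_job callback, the "stdout" response field, and the should_auto_scroll helper are hypothetical names that model the behaviour described above, not the Vue code itself.

```python
class StdoutBuffer:
    """Keeps already-received standard out in memory and requests only the rest.

    Hypothetical model of the client behaviour; not the actual client code.
    """

    def __init__(self, fetch_job, chunk_size=1000):
        # fetch_job stands in for the call to GET /api/jobs/{job_id}.
        self._fetch_job = fetch_job
        self.chunk_size = chunk_size
        self.text = ""

    def poll(self):
        """Request the next chunk, starting where the stored text ends."""
        details = self._fetch_job(
            stdout_start_pos=len(self.text),
            stdout_count=self.chunk_size,
        )
        # The "stdout" field name is an assumption about the response shape.
        new_text = details.get("stdout") or ""
        self.text += new_text
        return new_text


def should_auto_scroll(scroll_top, client_height, scroll_height, tolerance=1):
    """True when the viewer is scrolled to the bottom, within a small tolerance.

    Mirrors the rule of only setting scrollTop to scrollHeight when the user
    has not scrolled up to read earlier output.
    """
    return scroll_top + client_height >= scroll_height - tolerance
```

Because each poll starts at len(self.text), the amount of data transferred per request depends on how much new output exists rather than on the total size of the standard out.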
Conclusion
These are the broad changes made in developing the live standard out feature. The code itself should be fairly easy to follow, and no files or features other than those listed here should be affected by these changes.