from __future__ import absolute_import
import glob
import signal
import numpy
import codecs
import platform
import subprocess
import tempfile
import uuid
import os
import json
import shutil
import psutil as psutil
import requests
import time
import hashlib
from dplus.FileReaders import _handle_infinity_for_json, NumpyHandlingEncoder
from .CalculationResult import GenerateResult, FitResult
# from .Fit import Fitter
import math
class JobRunningException(Exception):
    """Raised by a job's _check_running when the process recorded for the session is still alive."""

    def __init__(self):
        # Human-readable reason; __str__ reports it in repr form (with quotes).
        self.value = "there is a job already running in this session"

    def __str__(self):
        return "%r" % (self.value,)
# True when running on Windows; decides the ".exe" suffix and whether the registry lookup for D+ is possible.
_is_windows = 'Windows' == platform.system()
class Runner(object):
    """abstract class that presents the interface for dplus calls.

    Every method of this base class raises NotImplementedError; concrete runners override all of them.
    (``NotImplemented`` is a comparison sentinel, not an exception, so it must not be raised.)
    """

    def generate(self, calc_data):
        '''
        run sync dplus generate.

        :param calc_data: an instance of a CalculationInput class
        :rtype: an instance of a GenerateResult class
        :raises NotImplementedError: always, in this abstract base class
        '''
        raise NotImplementedError

    def generate_async(self, calc_data):
        '''
        run async dplus generate.

        :param calc_data: an instance of a CalculationInput class
        :rtype: an instance of a RunningJob class
        :raises NotImplementedError: always, in this abstract base class
        '''
        raise NotImplementedError

    def fit(self, calc_data):
        '''
        run sync dplus fit.

        :param calc_data: an instance of a CalculationInput class
        :rtype: an instance of a FitResult class
        :raises NotImplementedError: always, in this abstract base class
        '''
        raise NotImplementedError

    def fit_async(self, calc_data):
        '''
        run async dplus fit.

        :param calc_data: an instance of a CalculationInput class
        :rtype: an instance of a RunningJob class
        :raises NotImplementedError: always, in this abstract base class
        '''
        raise NotImplementedError
class RunningJob(object):
    """abstract class for working with dplus jobs.

    Subclasses implement the file/transport specific methods; the base versions raise NotImplementedError
    (``NotImplemented`` is a comparison sentinel, not an exception, so it must not be raised).
    """

    def __init__(self, calculation_type):
        """
        :param calculation_type: "generate" selects a GenerateResult in get_result; any other value a FitResult
        """
        self._calculation_type = calculation_type

    def _start(self, calc_data):
        '''
        Prepare and launch the job for the given input.

        :param calc_data: an instance of a CalculationInput class
        :raises NotImplementedError: always, in this abstract base class
        '''
        raise NotImplementedError

    def get_status(self):
        '''
        The function get_status returns a json dictionary reporting the job's current status.

        :rtype: a json dictionary.
        :raises NotImplementedError: always, in this abstract base class
        '''
        raise NotImplementedError

    def _get_result(self):
        '''
        Return the raw result data of the finished job.

        :raises NotImplementedError: always, in this abstract base class
        '''
        raise NotImplementedError

    def abort(self):
        '''
        ends the current job.

        :raises NotImplementedError: always, in this abstract base class
        '''
        raise NotImplementedError

    def _get_pdb(self, model_ptr, destination_folder=None):
        '''
        Return the path of the pdb file produced for the given model.

        :param model_ptr: model pointer (formatted as an 8 digit number in file names by subclasses)
        :param destination_folder: optional folder to place the file in
        :raises NotImplementedError: always, in this abstract base class
        '''
        raise NotImplementedError

    def _get_amp(self, model_ptr, destination_folder=None):
        '''
        Return the path of the amplitude file produced for the given model.

        :param model_ptr: model pointer (formatted as an 8 digit number in file names by subclasses)
        :param destination_folder: optional folder to place the file in
        :raises NotImplementedError: always, in this abstract base class
        '''
        raise NotImplementedError

    def get_result(self, calc_data):
        """
        The method get_result requires a copy of the CalculationInput used to create the job.

        Should only be called when the job is completed. It is the user's responsibility to verify job
        completion with get_status before calling.

        :param calc_data: an instance of a CalculationInput class
        :rtype: an instance of a GenerateResult or FitResult class
        """
        if self._calculation_type == "generate":
            return GenerateResult(calc_data, self._get_result(), self)
        return FitResult(calc_data, self._get_result(), self)
def as_job(job_dict):
    '''
    Object hook for json deserialization: turn a dictionary into a LocalRunner.RunningJob.

    All changes made to the __init__ of LocalRunner.RunningJob need to be reflected here.

    :param job_dict: a dictionary holding at least the "session_directory" and "pid" keys
    :rtype: an instance of LocalRunner.RunningJob
    '''
    session_directory = job_dict["session_directory"]
    pid = job_dict["pid"]
    return LocalRunner.RunningJob(session_directory, pid)
class LocalRunner(Runner):
    """The LocalRunner is intended for users who have the D+ executable files installed on their system.

    It takes two optional initialization arguments:

    * `exe_directory` is the folder location of the D+ executables. By default, its value is None - on Windows,
      this will lead to the python interface searching the registry for an installed D+ on its own,
      but on linux the executable directory *must* be specified.
    * `session_directory` is the folder where the arguments for the calculation are stored, as well as the
      output results, amplitude files, and pdb files, from the c++ executable. By default, its value is None,
      and an automatically generated temporary folder will be used.
    """

    class RunningJob(RunningJob):
        """implementation of RunningJob for the LocalRunner class.

        The job exchanges data with the D+ executable through files in ``session_directory``:
        ``args.json`` (written here), ``job.json`` (status) and ``data.json`` (result).
        """

        # get_status makes this many attempts to read a non-empty job.json before reporting an error.
        _STATUS_READ_ATTEMPTS = 4
        # Seconds to wait between failed attempts to read job.json.
        _STATUS_RETRY_DELAY = 0.1

        def __init__(self, session_directory, pid=-1, calculation_type="generate"):
            """
            :param session_directory: folder shared with the D+ executable
            :param pid: process id of the executable, or -1 when no process is associated with the job
            :param calculation_type: "generate" or "fit"
            """
            super().__init__(calculation_type)
            self.session_directory = os.path.abspath(session_directory)
            self.pid = pid

        def _check_running(self):
            """Raise JobRunningException if the process recorded in ``self.pid`` is still alive.

            A zombie process (finished but not yet reaped) is reaped and treated as not running;
            a process that no longer exists is treated as not running.
            """
            if self.pid == -1:
                return
            try:
                process = psutil.Process(self.pid)
                if process.status() == psutil.STATUS_ZOMBIE:
                    process.wait()
                else:
                    raise JobRunningException
            except psutil.NoSuchProcess:
                pass

        def _clean_folder(self):
            """Remove a result file (data.json) left over from a previous calculation, if any."""
            filename = os.path.join(self.session_directory, "data.json")
            if os.path.exists(filename):
                os.remove(filename)

        def _start(self, calc_data):
            """Prepare the session folder for a new calculation.

            :param calc_data: an instance of a CalculationInput class; its ``args`` are written to args.json
            :raises JobRunningException: if this job's process is still alive
            """
            # 1: check that no job is already running in this session folder
            self._check_running()
            # 2: clean out the previous result
            self._clean_folder()
            # 3: save the calc_data as args for the executable
            args_filename = os.path.join(self.session_directory, "args.json")
            with open(args_filename, 'w') as outfile:
                json.dump(_handle_infinity_for_json(calc_data.args), outfile, cls=NumpyHandlingEncoder)
            # 4: save an initial job status file
            status_filename = os.path.join(self.session_directory, "job.json")
            initial_status = {"isRunning": True, "progress": 0.0, "code": 0}
            with open(status_filename, 'w') as outfile:
                json.dump(initial_status, outfile, cls=NumpyHandlingEncoder)

        def get_status(self):
            '''
            The function get_status returns a json dictionary reporting the job's current status.

            job.json is re-read a few times because it may be missing or empty while the executable rewrites it.
            If it cannot be read, or does not contain valid json, an ``{"error": {...}}`` dictionary is returned.

            :rtype: a json dictionary.
            '''
            filename = os.path.join(self.session_directory, "job.json")
            content = ""
            for attempt in range(self._STATUS_READ_ATTEMPTS):
                try:
                    with codecs.open(filename, 'r', encoding='utf8') as f:
                        content = f.read()
                except (FileNotFoundError, BlockingIOError):
                    content = ""
                if content:
                    break
                if attempt < self._STATUS_READ_ATTEMPTS - 1:
                    time.sleep(self._STATUS_RETRY_DELAY)
            else:
                return {"error": {"code": "22", "message": "failed to read job status"}}
            try:
                return json.loads(content)
            except json.JSONDecodeError:
                return {"error": {"code": "22", "message": "json error in job status"}}

        def _get_file_status(self):
            """Return True when the first line of the session's "notrunning" file is exactly "True"."""
            filename = os.path.join(self.session_directory, "notrunning")
            with open(filename, 'r') as f:
                status = f.readline()
            return status == "True"

        def _get_result(self):
            """Load data.json; raise an Exception carrying the backend's message if it reports an error."""
            filename = os.path.join(self.session_directory, "data.json")
            with codecs.open(filename, 'r', encoding='utf8') as f:
                result = json.load(f)
            if isinstance(result, dict) and 'error' in result:
                raise Exception(result['error']['message'])
            return result

        def abort(self):
            '''
            The function abort ends the current localRunner job.

            Sends SIGTERM to the job's process (if one was recorded) and removes the session's
            "isrunning" marker file if it exists.
            '''
            try:
                # pid -1 means "no process recorded". On POSIX, os.kill(-1, sig) signals every process
                # the user may signal, so it must never be passed through.
                if self.pid != -1:
                    os.kill(self.pid, signal.SIGTERM)
            finally:
                filename = os.path.join(self.session_directory, "isrunning")
                try:
                    os.remove(filename)
                except FileNotFoundError:
                    pass

        def _get_session_file(self, subfolder, file_name, destination_folder):
            """Return session_directory/subfolder/file_name, copied into destination_folder when one is given."""
            file_location = os.path.join(self.session_directory, subfolder, file_name)
            if not os.path.isfile(file_location):
                raise FileNotFoundError(file_location)
            if destination_folder:
                shutil.copy2(file_location, destination_folder)
                return os.path.join(destination_folder, file_name)
            return file_location

        def _get_pdb(self, model_ptr, destination_folder=None):
            '''
            Return the path of the pdb file of a model (session_directory/pdb/<8 digit ptr>.pdb).

            :param model_ptr: model pointer
            :param destination_folder: if given, the file is copied there and the copy's path is returned
            :raises FileNotFoundError: if the executable did not produce the file
            '''
            return self._get_session_file('pdb', '%08d.pdb' % int(model_ptr), destination_folder)

        def _get_amp(self, model_ptr, destination_folder=None):
            '''
            Return the path of the amplitude file of a model (session_directory/cache/<8 digit ptr>.amp).

            :param model_ptr: model pointer
            :param destination_folder: if given, the file is copied there and the copy's path is returned
            :raises FileNotFoundError: if the executable did not produce the file
            '''
            return self._get_session_file('cache', '%08d.amp' % int(model_ptr), destination_folder)

    def __init__(self, exe_directory=None, session_directory=None):
        """initialization function, see the class docstring for the arguments.

        :raises ValueError: if exe_directory is not given and D+ cannot be located automatically,
            or if the directory does not contain the D+ executables
        :raises NotADirectoryError: if exe_directory is not a directory
        """
        if not exe_directory:
            try:
                exe_directory = self._find_dplus()
            except Exception as e:
                raise ValueError("Can't locate D+, please specify the D+ backend directory manually", e) from e
        if not session_directory:
            session_directory = tempfile.mkdtemp()
        self._exe_directory = os.path.abspath(exe_directory)
        self._check_exe_directory()
        self._session_directory = os.path.abspath(session_directory)
        self._make_file_directories(session_directory)

    @property
    def session_directory(self):
        return self._session_directory

    @session_directory.setter
    def session_directory(self, session_dir):
        self._session_directory = os.path.abspath(session_dir)
        self._make_file_directories(session_dir)

    def _find_dplus(self):
        """Look up the D+ binary directory in the Windows registry (HKLM\\SOFTWARE\\HUJI\\<key containing "D+">).

        If several subkeys contain "D+", the last one enumerated is used.
        """
        if not _is_windows:
            raise ValueError("Can't find D+ on non Windows machine")
        import winreg
        huji_key_path = r"SOFTWARE\HUJI"
        d_plus = ""
        with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, huji_key_path) as huji_key:
            index = 0
            while True:
                try:
                    subkey_name = winreg.EnumKey(huji_key, index)
                except OSError:  # raised once there are no more subkeys
                    break
                if "D+" in subkey_name:
                    d_plus = subkey_name
                index += 1
        if d_plus == "":
            raise Exception("D+ does not exists")
        dplus_key = os.path.join(huji_key_path, d_plus)
        with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, dplus_key) as key:
            exe_dir = winreg.QueryValueEx(key, "binDir")[0]
        return exe_dir

    def _get_program_path(self, calculation_type):
        """Full path of the executable named after calculation_type (".exe" appended on Windows)."""
        program_path = os.path.join(self._exe_directory, calculation_type)
        if _is_windows:
            program_path += ".exe"
        return program_path

    def _check_exe_directory(self):
        """Raise unless the executable directory contains the generate, fit and getallmetadata programs."""
        if not os.path.isdir(self._exe_directory):
            raise NotADirectoryError("%s is not a directory" % self._exe_directory)
        programs = ['generate', 'fit', 'getallmetadata']
        if not all(os.path.isfile(self._get_program_path(program)) for program in programs):
            raise ValueError(
                "The directory %s does not seem to contain the D+ backend executables" % self._exe_directory)

    def _make_file_directories(self, directory):
        """Create the pdb, amp and cache sub-folders of a session directory (no error if they exist)."""
        for subfolder in ('pdb', 'amp', 'cache'):
            os.makedirs(os.path.join(directory, subfolder), exist_ok=True)

    def generate(self, calc_data):
        """
        The method generate waits until dplus has returned a result.

        :param calc_data: an instance of a CalculationInput class
        :rtype: an instance of a CalculationResult class
        """
        job = self._run(calc_data)
        return GenerateResult(calc_data, job._get_result(), job)

    def generate_async(self, calc_data):
        """
        The method generate_async allows dplus calculations to be run in the background.

        :param calc_data: an instance of a CalculationInput class
        :rtype: an instance of a RunningJob class
        """
        return self._run(calc_data, is_async=True)

    def fit(self, calc_data):
        """
        The method fit waits until dplus has returned a result.

        :param calc_data: an instance of a CalculationInput class
        :rtype: an instance of a CalculationResult class
        """
        job = self._run(calc_data, calculation_type="fit")
        return FitResult(calc_data, job._get_result(), job)

    def fit_async(self, calc_data):
        """
        The method fit_async allows dplus calculations to be run in the background.

        :param calc_data: an instance of a CalculationInput class
        :rtype: an instance of a RunningJob class
        """
        return self._run(calc_data, calculation_type="fit", is_async=True)

    def _run(self, calc_data, calculation_type="generate", is_async=False):
        """Start the executable for calc_data; wait for it to exit unless is_async is True.

        (The flag is not named ``async``: that is a reserved keyword since Python 3.7.)
        """
        # 1: prepare the session folder.
        # NOTE(review): a fresh job always has pid -1, so _check_running inside _start cannot detect a
        # job started by an earlier call in this session folder.
        job = LocalRunner.RunningJob(self._session_directory, calculation_type=calculation_type)
        job._start(calc_data)
        # 2: call the executable process
        process = self._call_exe(calculation_type)
        job.pid = process.pid
        # 3: return immediately when async, otherwise wait for the executable to exit
        if is_async:
            return job
        process.wait()
        return job

    def _call_exe(self, calculation_type):
        """Spawn the executable with the session directory as its argument.

        Its stdout and stderr go to output.txt and error.txt in the session directory.

        :raises FileNotFoundError: if the executable does not exist
        """
        program_path = self._get_program_path(calculation_type)
        if not os.path.isfile(program_path):
            raise FileNotFoundError(program_path)
        # The child gets its own copies of these handles when spawned, so the parent closes them right away.
        with open(os.path.join(self._session_directory, "error.txt"), 'w') as err_file, \
                open(os.path.join(self._session_directory, "output.txt"), 'w') as out_file:
            process = subprocess.Popen([program_path, self._session_directory], cwd=self._exe_directory,
                                       stdout=out_file, stderr=err_file)
        return process

    @staticmethod
    def get_running_job(session, calculation_type="generate"):
        """Return a LocalRunner.RunningJob for an existing session folder (no process id is attached).

        :param session: the session directory
        :param calculation_type: "generate" or "fit", which decides the result class of get_result
        """
        return LocalRunner.RunningJob(session, calculation_type=calculation_type)
class WebRunner(Runner):
    """The WebRunner is intended for users accessing the D+ server.

    It takes two required initialization arguments, with no default values:

    * `url` is the address of the server.
    * `token` is the authentication token granting access to the server.
    """

    class RunningJob(RunningJob):
        """implementation of RunningJob for the WebRunner class."""

        def __init__(self, base_url, token, session, calculation_type="generate"):
            """
            :param base_url: api url that endpoint names ('job', 'files', 'generate', ...) are appended to
            :param token: authentication token, sent as "Authorization: Token <token>"
            :param session: session id, sent as the ?ID= query parameter
            :param calculation_type: "generate" or "fit"
            """
            super().__init__(calculation_type)
            self._url = base_url
            self._header = {'Authorization': "Token " + str(token)}
            self._session_string = "?ID=" + session

        def _start(self, calc_data):
            """Make sure every input file referenced by calc_data is available on the server."""
            self._check_files(calc_data)

        def get_status(self):
            '''
            The function get_status returns a json dictionary reporting the job's current status.

            :rtype: a json dictionary.
            :raises Exception: if the server does not answer with status 200
            '''
            jobstatus = requests.get(url=self._url + 'job' + self._session_string, headers=self._header)
            if jobstatus.status_code == 200:
                return json.loads(jobstatus.text)
            raise Exception("Error getting job status:" + jobstatus.reason)

        def _get_result(self, calculation_type=None):
            """Fetch the raw result dictionary of the job from the server.

            :param calculation_type: endpoint to query; defaults to this job's own calculation type
            :raises Exception: with the server's message when the response reports a non-zero error code
            """
            if calculation_type is None:
                calculation_type = self._calculation_type
            response = requests.get(url=self._url + calculation_type + self._session_string, headers=self._header)
            result = json.loads(response.text)
            if isinstance(result, dict):
                # web always returns an error field, so need to check if that field is OK or an error
                if result["error"]["code"] != 0:
                    raise Exception(result['error']['message'])
            return result

        def get_result(self, calc_data):
            """
            The method get_result requires a copy of the CalculationInput used to create the job.

            Should only be called when the job is completed. The server wraps the calculation result under
            the 'result' key, which is unwrapped here (as WebRunner.generate/fit do).

            :param calc_data: an instance of a CalculationInput class
            :rtype: an instance of a GenerateResult or FitResult class
            """
            result = self._get_result()['result']
            if self._calculation_type == "generate":
                return GenerateResult(calc_data, result, self)
            return FitResult(calc_data, result, self)

        @staticmethod
        def _calc_file_hash(filepath):
            """Upper-case hex SHA-1 digest of a file, read in one-megabyte blocks."""
            sha = hashlib.sha1()
            with open(filepath, 'rb') as f:
                for block in iter(lambda: f.read(2 ** 20), b''):
                    sha.update(block)
            return sha.hexdigest().upper()

        def _check_files(self, calc_data):
            """Send name/size/hash of every input file to the server and upload the ones it reports MISSING."""
            file_entries = [
                {'filename': filename,
                 'size': os.stat(filename).st_size,
                 'hash': self._calc_file_hash(filename)}
                for filename in calc_data._filenames
            ]
            data = json.dumps({'files': file_entries})
            response = requests.post(url=self._url + 'files', data=data, headers=self._header)
            if response.status_code != 200:
                raise Exception("Error checking file status: " + response.reason)
            statuses = json.loads(response.text)
            files_missing = [(filename, statuses[filename]['id'])
                             for filename in calc_data._filenames
                             if statuses[filename]['status'] == 'MISSING']
            self._upload_files(files_missing)

        def _upload_files(self, filenames):
            """Upload each (filename, server file id) pair to files/<id>."""
            for filename, file_id in filenames:
                url = self._url + 'files/' + str(file_id)
                with open(filename, 'rb') as f:
                    response = requests.post(url=url, headers=self._header, files={'file': f})
                if response.status_code != 200:
                    raise Exception("Error uploading files: " + response.reason)

        def _download_model_file(self, endpoint, extension, model_ptr, destination_folder, description):
            """Stream <endpoint>/<model_ptr> into destination_folder/<8 digit ptr>.<extension>; return the path."""
            if not destination_folder:
                destination_folder = tempfile.mkdtemp()
            file_name = '%08d.%s' % (int(model_ptr), extension)
            destination_file = os.path.join(destination_folder, file_name)
            # streaming download, see https://stackoverflow.com/a/16696317/5961793
            response = requests.get(url=self._url + endpoint + "/" + str(model_ptr) + self._session_string,
                                    headers=self._header, stream=True)
            try:
                if response.status_code != 200:
                    raise Exception("Error downloading " + description + ": " + response.reason)
                with open(destination_file, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=1024):
                        if chunk:  # filter out keep-alive new chunks
                            f.write(chunk)
            finally:
                # a streamed response keeps its connection until closed
                response.close()
            return destination_file

        def _get_pdb(self, model_ptr, destination_folder=None):
            '''
            Download the pdb file of a model.

            :param model_ptr: model pointer
            :param destination_folder: target folder; a new temporary folder is used when not given
            :return: file address
            '''
            return self._download_model_file("pdb", "pdb", model_ptr, destination_folder, "pdb")

        def _get_amp(self, model_ptr, destination_folder=None):
            '''
            Download the amplitude file of a model.

            :param model_ptr: model pointer
            :param destination_folder: target folder; a new temporary folder is used when not given
            :return: file address
            '''
            return self._download_model_file("amplitude", "amp", model_ptr, destination_folder, "amp")

    def __init__(self, base_url, token):
        """
        :param base_url: server address; 'api/v1/' is appended to it
        :param token: authentication token granting access to the server
        """
        self._url = base_url + r'api/v1/'
        self._header = {'Authorization': "Token " + str(token)}
        self._session = str(uuid.uuid4())
        self._session_string = "?ID=" + self._session
        self._token = token

    def generate(self, calc_data):
        """
        The method generate waits until dplus has returned a result.

        :param calc_data: an instance of a CalculationInput class
        :rtype: an instance of a CalculationResult class
        """
        job = self._run(calc_data)
        result = job._get_result(calculation_type="generate")
        return GenerateResult(calc_data, result['result'], job)

    def generate_async(self, calc_data):
        """
        The method generate_async allows dplus calculations to be run in the background.

        :param calc_data: an instance of a CalculationInput class
        :rtype: an instance of a RunningJob class
        """
        return self._run(calc_data, is_async=True)

    def fit(self, calc_data):
        """
        The method fit waits until dplus has returned a result.

        :param calc_data: an instance of a CalculationInput class
        :rtype: an instance of a CalculationResult class
        """
        job = self._run(calc_data, calculation_type="fit")
        result = job._get_result(calculation_type="fit")
        return FitResult(calc_data, result['result'], job)

    def fit_async(self, calc_data):
        """
        The method fit_async allows dplus calculations to be run in the background.

        :param calc_data: an instance of a CalculationInput class
        :rtype: an instance of a RunningJob class
        """
        return self._run(calc_data, calculation_type="fit", is_async=True)

    def _run(self, calc_data, calculation_type="generate", is_async=False):
        """Upload missing files, ask the server to start the calculation and, unless is_async, poll until done.

        (The flag is not named ``async``: that is a reserved keyword since Python 3.7.)

        :raises Exception: if the server rejects the start request
        """
        # 1: start the job (checks files)
        job = WebRunner.RunningJob(self._url, self._token, self._session, calculation_type=calculation_type)
        job._start(calc_data)
        # 2: ask the server to start the calculation; a rejected request must not be polled forever
        data = json.dumps(calc_data.args['args'])
        response = requests.put(url=self._url + calculation_type + self._session_string, data=data,
                                headers=self._header)
        if not response.ok:
            raise Exception("Error starting %s job: %s" % (calculation_type, response.reason))
        # 3: return immediately when async, otherwise poll the status once a second until the job stops
        if is_async:
            return job
        while job.get_status()['result']['isRunning']:
            time.sleep(1)
        return job

    @staticmethod
    def get_running_job(url, token, session, calculation_type="generate"):
        """Return a WebRunner.RunningJob for an existing server session.

        NOTE(review): ``url`` is used as-is (no 'api/v1/' is appended, unlike WebRunner.__init__).

        :param calculation_type: "generate" or "fit", which decides the endpoint and the result class
        """
        return WebRunner.RunningJob(url, token, session, calculation_type=calculation_type)