Package pilot :: Package filemanagement :: Module globusonline_adaptor

Source Code for Module pilot.filemanagement.globusonline_adaptor

  1  ''' 
  2  Globus Online based File Transfer 
  3  ''' 
  4  import urlparse 
  5  import pdb 
  6  import glob 
  7  import errno 
  8  import sys 
  9  import os 
 10  import stat 
 11  import logging 
 12  import traceback 
 13  import time 
 14   
 15  sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) 
 16  #sys.path.append(os.path.join(os.path.dirname(__file__), "..")) 
 17  from pilot.api import State 
 18  from bigjob import logger 
 19   
 20  from globusonline.transfer import api_client 
 21   
 22  #from globusonline.transfer.api_client.get_go_cookie import get_go_auth 
 23  from globusonline.transfer.api_client.goauth import get_access_token 
 24   
class GlobusOnlineFileAdaptor(object):
    """ BigData Coordination File Management for Pilot Data

    Adaptor that creates, copies and removes Pilot Data / Data Unit
    directories through the Globus Online transfer API client.
    """

    # URL scheme prefix of the Globus Online locations used with this adaptor.
    URL_PREFIX="go://"

31 - def __init__(self, service_url):
32 self.service_url = service_url 33 result = urlparse.urlparse(service_url) 34 self.host = result.netloc 35 self.query = result.path 36 self.ep = self.__get_ep(self.query) 37 self.path = self.__get_path(self.query) 38 self.user = result.username 39 self.password = result.password 40 41 #result = get_go_auth(ca_certs=None, username=self.user, password=self.password) 42 result = get_access_token(ca_certs=None, username=self.user, password=self.password) 43 44 #saml_cookie = result.cookie 45 saml_cookie = result.token 46 47 self.api = api_client.TransferAPIClient(username=self.user, 48 goauth=saml_cookie 49 ) 50 status_code, status_message, data = self.api.task_list() 51 52 # initialize ssh client 53 self.__state=State.New
54 55
56 - def get_security_context(self):
57 """ Returns security context that needs to be available on the distributed 58 node in order to access this Pilot Data """ 59 return None
60 61
62 - def initialize_pilotdata(self):
63 # check whether directory exists 64 try: 65 self.api.endpoint_mkdir(self.ep, self.path) 66 except: 67 pass 68 self.__state=State.Running
69 70
71 - def get_pilotdata_size(self):
72 # check size 73 return None
74 75
76 - def delete_pilotdata(self):
77 self.api.endpoint_delete(self.ep, self.path) 78 self.__state=State.Done
79 80
81 - def get_state(self):
82 if self.__client.get_transport().is_active()==True: 83 return self.__state 84 else: 85 self.__state=State.Failed 86 return self.__state
87
88 - def create_du(self, du_id):
89 du_dir = os.path.join(self.path, str(du_id)) 90 logger.debug("mkdir: " + du_dir) 91 self.api.endpoint_mkdir(self.ep, du_dir)
92 93
94 - def put_du(self, du):
95 logging.debug("Copy DU using Globus Online") 96 du_items = du.list() 97 for i in du_items.keys(): 98 local_filename=du_items[i]["local"] 99 remote_path = os.path.join(self.path, str(du.id), os.path.basename(local_filename)) 100 logging.debug("Put file: %s to %s"%(local_filename, remote_path)) 101 if local_filename.startswith("ssh://"): 102 # check if remote path is directory 103 if self.__is_remote_directory(local_filename): 104 logging.warning("Path %s is a directory. Ignored."%local_filename) 105 continue 106 result = urlparse.urlparse(local_filename) 107 source_host = result.netloc 108 source_path = result.path 109 logger.debug(str((source_host, source_path, self.host, remote_path))) 110 if source_host == "" or source_host==None: 111 cmd = "scp "+ source_path + " " + self.host + ":" + remote_path 112 else: 113 cmd = "scp "+ source_host+":"+source_path + " " + self.host + ":" + remote_path 114 logger.debug("Command: %s"%cmd) 115 os.system(cmd) 116 elif(local_filename.startswith("go://")): 117 self.__third_party_transfer_host(local_filename, self.service_url + "/" + str(du.id))
118 119 120
121 - def copy_du_to_url(self, du, local_url, remote_url):
122 base_dir = self.__get_path_for_du(du) 123 logger.debug("copy_du_to_url, source: %s remote: %s"%(base_dir, remote_url)) 124 if remote_url.startswith("/") and os.path.exists(base_dir): 125 target_path = remote_url 126 source_path = base_dir 127 logger.debug("Target and source host are localhost. Processing: %s" %(source_path)) 128 expanded_path = glob.glob(source_path + "/*") 129 logger.debug("Expanded path: " + str(expanded_path)) 130 for path in expanded_path: 131 if os.path.isdir(path): 132 logger.debug("Source path %s is directory"%path) 133 files = os.listdir(path) 134 for i in files: 135 try: 136 os.symlink(os.path.join(files, i), target_path) 137 except: 138 self.__print_traceback() 139 else: 140 try: 141 os.symlink(path, os.path.join(target_path, os.path.basename(path))) 142 except: 143 self.__print_traceback() 144 else: 145 self.create_remote_directory(remote_url) 146 for filename in self.__sftp.listdir(base_dir): 147 file_url = local_url + "/" + filename 148 file_remote_url = remote_url + "/" + filename 149 logger.debug("Copy " + file_url + " to " + file_remote_url) 150 self.__third_party_transfer_host(file_url, file_remote_url)
151 152 153
154 - def copy_du(self, du, pd_new):
155 remote_url = pd_new.service_url + "/" + str(du.id) 156 local_url = self.service_url + "/" + str(du.id) 157 self.copy_du_to_url(du, local_url, remote_url)
158 159
160 - def get_du(self, du, target_url):
161 remote_url = target_url 162 local_url = self.service_url + "/" + str(du.id) 163 self.copy_du_to_url(du, local_url, remote_url)
164 165
166 - def remove_du(self, du):
167 self.__remove_directory(os.path.join(self.path, du.id))
168 169 170 ########################################################################### 171 # Pure File Management APIs 172
173 - def transfer(self, source_url, target_url):
174 self.__third_party_transfer_host(source_url, target_url)
175 176
177 - def create_remote_directory(self, target_url):
178 if not self.__is_remote_directory(target_url): 179 result = urlparse.urlparse(target_url) 180 target_query = result.path 181 target_ep = self.__get_ep(target_query) 182 target_path = self.__get_path(target_query) 183 result = self.api.endpoint_mkdir(target_ep, target_path) 184 logger.debug("GO EP: %s Directory: %s Creation Result: %s"%(target_ep, target_path, str(result))) 185 #task_id = result[2]["task_id"] 186 #logger.debug("Transfer Request Result: %s Task ID: %s"%(str(result), task_id)) 187 #self.__wait_for_task(task_id) 188 return True 189 return True
190 191
192 - def get_path(self, target_url):
193 result = urlparse.urlparse(target_url) 194 target_query = result.path 195 target_path = self.__get_path(target_query) 196 return target_path
197 198 ########################################################################### 199 # Private support methods
200 - def __get_path_for_du(self, du):
201 return os.path.join(self.path, str(du.id))
202 203
204 - def __remove_directory(self, path):
205 """Remove remote directory that may contain files. 206 """ 207 if self.__exists(path): 208 for filename in self.__sftp.listdir(path): 209 filepath = os.path.join(path, filename) 210 logging.debug("Delete %s"%filepath) 211 if stat.S_ISDIR(self.__sftp.stat(filepath).st_mode): 212 [self.__remove_directory(filepath)] 213 else: 214 self.__sftp.remove(filepath) 215 self.__sftp.rmdir(path)
216 217
218 - def __is_remote_directory(self, url):
219 try: 220 result = urlparse.urlparse(url) 221 target_query = result.path 222 target_ep = self.__get_ep(target_query) 223 target_path = self.__get_path(target_query) 224 result = self.api.endpoint_ls(target_ep, target_path) 225 logger.debug("GO EP: %s Directory: %s Creation Result: %s"%(target_ep, target_path, str(result))) 226 return True 227 except: 228 pass 229 return False
230
231 - def __create_sftp_client(self):
232 ssh_client = paramiko.SSHClient() 233 ssh_client.load_system_host_keys() 234 ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 235 ssh_client.connect(self.host) 236 sftp_client = ssh_client.open_sftp() 237 sftp_client.chdir(self.path) 238 return ssh_client, sftp_client
239 240
241 - def __third_party_transfer_host(self, source_url, target_url):
242 """ 243 Transfers from source URL to machine to target_url 244 """ 245 transfer_start = time.time() 246 result = urlparse.urlparse(source_url) 247 source_query = result.path 248 source_ep = self.__get_ep(source_query) 249 source_path = self.__get_path(source_query) 250 251 result = urlparse.urlparse(target_url) 252 target_query = result.path 253 target_ep = self.__get_ep(target_query) 254 target_path = self.__get_path(target_query) 255 256 257 target_path = os.path.join(target_path, os.path.basename(source_path)) 258 logger.debug("transfer from %s:%s to %s:%s"%(source_ep, source_path, target_ep, target_path)) 259 260 if os.path.exists(os.path.dirname(source_path)) and os.path.exists(target_path): 261 logger.debug("Target and source host are localhost. Processing: %s" %(source_path)) 262 expanded_path = glob.glob(source_path) 263 logger.debug("Expanded path: " + str(expanded_path)) 264 for path in expanded_path: 265 if os.path.isdir(path): 266 logger.debug("Source path %s is directory"%path) 267 files = os.listdir(path) 268 for i in files: 269 try: 270 os.symlink(os.path.join(files, i), target_path) 271 except: 272 self.__print_traceback() 273 else: 274 try: 275 os.symlink(path, os.path.join(target_path, os.path.basename(path))) 276 except: 277 self.__print_traceback() 278 279 transfer_id = self.api.submission_id()[2]["value"] 280 logger.debug("Transfer ID: %s"%transfer_id) 281 transfer = api_client.Transfer(transfer_id, source_ep, target_ep, 282 deadline=None, sync_level=None, label=None) 283 transfer.add_item(source_path=source_path, destination_path=target_path, recursive=False ) 284 result = self.api.transfer(transfer) 285 task_id = result[2]["task_id"] 286 logger.debug("Transfer Request Result: %s Task ID: %s"%(str(result), task_id)) 287 self.__wait_for_task(task_id) 288 logger.debug("Task ID: %s Time: %d sec"%(transfer_id, (time.time()-transfer_start)))
289 290
291 - def __get_ep(self, query_string):
292 if query_string.startswith("?"): 293 query_string = query_string[1:] 294 comp = query_string.split("&") 295 for i in comp: 296 part = i.split("=") 297 if part[0]=="ep": 298 return part[1] 299
300 - def __get_path(self, query_string):
301 if query_string.startswith("?"): 302 query_string = query_string[1:] 303 comp = query_string.split("&") 304 for i in comp: 305 part = i.split("=") 306 if part[0]=="path": 307 return part[1]
308
309 - def __wait_for_task(self, task_id, timeout=None):
310 status = "ACTIVE" 311 while (timeout==None or timeout > 0) and status == "ACTIVE": 312 code, reason, data = self.api.task(task_id, fields="status") 313 status = data["status"] 314 time.sleep(1) 315 if timeout!=None: 316 timeout -= 1 317 318 if status != "ACTIVE": 319 print "Task %s complete!" % task_id 320 return True 321 else: 322 print "Task still not complete after %d seconds" % timeout 323 return False
324 325
326 - def __exists(self, path):
327 """Return True if the remote path exists 328 """ 329 try: 330 self.__sftp.stat(path) 331 except IOError, e: 332 if e.errno == errno.ENOENT: 333 return False 334 raise 335 else: 336 return True
337 338
339 - def __print_traceback(self):
340 exc_type, exc_value, exc_traceback = sys.exc_info() 341 print "*** print_tb:" 342 traceback.print_tb(exc_traceback, limit=1, file=sys.stdout) 343 print "*** print_exception:" 344 traceback.print_exception(exc_type, exc_value, exc_traceback, 345 limit=2, file=sys.stdout)
if __name__ == "__main__":
    # Manual example: log in to Globus Online and transfer one file between
    # two endpoints.
    # NOTE(review): the service URL embeds a user name and password - replace
    # them with your own credentials before running.
    go = GlobusOnlineFileAdaptor("http://drelu:bigjob@cli.globusonline.org?ep=drelu#egi&path=/ho")
    source_url = "go://cli.globusonline.org?ep=drelu#MacBook&path=/~/cert.tar.gz"
    target_url = "go://cli.globusonline.org?ep=xsede#kraken&path=/~/"
    go.transfer(source_url, target_url)