1  ''' 
  2  Globus Online based File Transfer 
  3  ''' 
  4  import urlparse 
  5  import pdb 
  6  import glob 
  7  import errno 
  8  import sys 
  9  import os 
 10  import stat 
 11  import logging 
 12  import traceback 
 13  import time 
 14   
 15  sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../..")) 
 16   
 17  from pilot.api import State 
 18  from bigjob import logger 
 19   
 20  from globusonline.transfer import api_client 
 21   
 22   
 23  from globusonline.transfer.api_client.goauth import get_access_token 
 24   
 26      """ BigData Coordination File Management for Pilot Data """ 
 27       
 28      URL_PREFIX="go://" 
 29       
 30       
 32          self.service_url = service_url 
 33          result = urlparse.urlparse(service_url) 
 34          self.host = result.netloc 
 35          self.query = result.path       
 36          self.ep = self.__get_ep(self.query) 
 37          self.path = self.__get_path(self.query) 
 38          self.user = result.username   
 39          self.password = result.password 
 40           
 41           
 42          result = get_access_token(ca_certs=None, username=self.user, password=self.password) 
 43           
 44           
 45          saml_cookie = result.token 
 46           
 47          self.api = api_client.TransferAPIClient(username=self.user, 
 48                                                  goauth=saml_cookie 
 49                                                  ) 
 50          status_code, status_message, data = self.api.task_list() 
 51           
 52           
 53          self.__state=State.New 
  54   
 55   
 57          """ Returns security context that needs to be available on the distributed 
 58              node in order to access this Pilot Data """ 
 59          return None 
  60       
 61           
 63           
 64          try: 
 65              self.api.endpoint_mkdir(self.ep, self.path)             
 66          except: 
 67              pass 
 68          self.__state=State.Running 
  69           
 70           
 74       
 75       
 77          self.api.endpoint_delete(self.ep, self.path) 
 78          self.__state=State.Done 
  79       
 80           
 82          if self.__client.get_transport().is_active()==True: 
 83              return self.__state 
 84          else: 
 85              self.__state=State.Failed 
 86              return self.__state             
  87               
 89          du_dir = os.path.join(self.path, str(du_id)) 
 90          logger.debug("mkdir: " + du_dir) 
 91          self.api.endpoint_mkdir(self.ep, du_dir) 
  92           
 93           
 95          logging.debug("Copy DU using Globus Online") 
 96          du_items = du.list() 
 97          for i in du_items.keys():   
 98              local_filename=du_items[i]["local"] 
 99              remote_path = os.path.join(self.path, str(du.id), os.path.basename(local_filename)) 
100              logging.debug("Put file: %s to %s"%(local_filename, remote_path))                         
101              if local_filename.startswith("ssh://"): 
102                   
103                  if self.__is_remote_directory(local_filename): 
104                      logging.warning("Path %s is a directory. Ignored."%local_filename)                 
105                      continue 
106                  result = urlparse.urlparse(local_filename) 
107                  source_host = result.netloc 
108                  source_path = result.path 
109                  logger.debug(str((source_host, source_path, self.host, remote_path))) 
110                  if source_host == "" or source_host==None: 
111                      cmd = "scp "+ source_path + " " + self.host + ":" + remote_path 
112                  else: 
113                      cmd = "scp "+ source_host+":"+source_path + " " + self.host + ":" + remote_path 
114                  logger.debug("Command: %s"%cmd) 
115                  os.system(cmd)                    
116              elif(local_filename.startswith("go://")): 
117                  self.__third_party_transfer_host(local_filename, self.service_url + "/" + str(du.id)) 
 118   
119                   
120       
122          base_dir = self.__get_path_for_du(du) 
123          logger.debug("copy_du_to_url, source: %s remote: %s"%(base_dir, remote_url)) 
124          if remote_url.startswith("/") and os.path.exists(base_dir): 
125              target_path = remote_url 
126              source_path = base_dir 
127              logger.debug("Target and source host are localhost. Processing: %s" %(source_path)) 
128              expanded_path = glob.glob(source_path + "/*") 
129              logger.debug("Expanded path: " + str(expanded_path)) 
130              for path in expanded_path: 
131                  if os.path.isdir(path): 
132                      logger.debug("Source path %s is directory"%path) 
133                      files = os.listdir(path) 
134                      for i in files: 
135                          try: 
136                              os.symlink(os.path.join(files, i), target_path) 
137                          except: 
138                              self.__print_traceback() 
139                  else: 
140                      try: 
141                          os.symlink(path, os.path.join(target_path, os.path.basename(path))) 
142                      except: 
143                          self.__print_traceback() 
144          else: 
145              self.create_remote_directory(remote_url)   
146              for filename in self.__sftp.listdir(base_dir): 
147                  file_url = local_url + "/" + filename 
148                  file_remote_url = remote_url + "/" + filename 
149                  logger.debug("Copy " + file_url + " to " + file_remote_url) 
150                  self.__third_party_transfer_host(file_url, file_remote_url) 
 151   
152           
153   
158           
159       
160 -    def get_du(self, du, target_url): 
 164           
165           
167          self.__remove_directory(os.path.join(self.path, du.id)) 
 168       
169       
170       
171       
172           
173 -    def transfer(self, source_url, target_url): 
 174          self.__third_party_transfer_host(source_url, target_url)     
 175       
176       
178          if not self.__is_remote_directory(target_url): 
179              result = urlparse.urlparse(target_url) 
180              target_query = result.path 
181              target_ep = self.__get_ep(target_query) 
182              target_path = self.__get_path(target_query) 
183              result = self.api.endpoint_mkdir(target_ep, target_path)       
184              logger.debug("GO EP: %s Directory: %s Creation Result: %s"%(target_ep, target_path, str(result)))       
185               
186               
187               
188              return True 
189          return True 
 190       
191       
193          result = urlparse.urlparse(target_url) 
194          target_query = result.path 
195          target_path = self.__get_path(target_query) 
196          return target_path 
 197           
198       
199       
201          return os.path.join(self.path, str(du.id)) 
 202       
203   
205          """Remove remote directory that may contain files.         
206          """ 
207          if self.__exists(path): 
208              for filename in self.__sftp.listdir(path): 
209                  filepath = os.path.join(path, filename) 
210                  logging.debug("Delete %s"%filepath) 
211                  if stat.S_ISDIR(self.__sftp.stat(filepath).st_mode): 
212                      [self.__remove_directory(filepath)] 
213                  else: 
214                      self.__sftp.remove(filepath) 
215              self.__sftp.rmdir(path) 
 216               
217           
219          try: 
220              result = urlparse.urlparse(url) 
221              target_query = result.path 
222              target_ep = self.__get_ep(target_query) 
223              target_path = self.__get_path(target_query) 
224              result = self.api.endpoint_ls(target_ep, target_path)       
225              logger.debug("GO EP: %s Directory: %s Creation Result: %s"%(target_ep, target_path, str(result))) 
226              return True   
227          except: 
228              pass        
229          return False 
 230           
232          ssh_client = paramiko.SSHClient() 
233          ssh_client.load_system_host_keys() 
234          ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) 
235          ssh_client.connect(self.host) 
236          sftp_client = ssh_client.open_sftp() 
237          sftp_client.chdir(self.path) 
238          return ssh_client, sftp_client 
 239           
240           
242          """ 
243              Transfers from source URL to machine to target_url 
244          """ 
245          transfer_start = time.time() 
246          result = urlparse.urlparse(source_url) 
247          source_query = result.path 
248          source_ep = self.__get_ep(source_query) 
249          source_path = self.__get_path(source_query) 
250           
251          result = urlparse.urlparse(target_url) 
252          target_query = result.path 
253          target_ep = self.__get_ep(target_query) 
254          target_path = self.__get_path(target_query) 
255   
256   
257          target_path = os.path.join(target_path, os.path.basename(source_path)) 
258          logger.debug("transfer from %s:%s to %s:%s"%(source_ep, source_path, target_ep, target_path)) 
259   
260          if os.path.exists(os.path.dirname(source_path)) and os.path.exists(target_path): 
261              logger.debug("Target and source host are localhost. Processing: %s" %(source_path)) 
262              expanded_path = glob.glob(source_path) 
263              logger.debug("Expanded path: " + str(expanded_path)) 
264              for path in expanded_path: 
265                  if os.path.isdir(path): 
266                      logger.debug("Source path %s is directory"%path) 
267                      files = os.listdir(path) 
268                      for i in files: 
269                          try: 
270                              os.symlink(os.path.join(files, i), target_path) 
271                          except: 
272                              self.__print_traceback() 
273                  else: 
274                      try: 
275                          os.symlink(path, os.path.join(target_path, os.path.basename(path))) 
276                      except: 
277                          self.__print_traceback() 
278           
279          transfer_id = self.api.submission_id()[2]["value"]     
280          logger.debug("Transfer ID: %s"%transfer_id)     
281          transfer = api_client.Transfer(transfer_id, source_ep, target_ep, 
282                                         deadline=None, sync_level=None, label=None) 
283          transfer.add_item(source_path=source_path, destination_path=target_path, recursive=False )         
284          result = self.api.transfer(transfer) 
285          task_id = result[2]["task_id"] 
286          logger.debug("Transfer Request Result: %s Task ID: %s"%(str(result), task_id)) 
287          self.__wait_for_task(task_id) 
288          logger.debug("Task ID: %s Time: %d sec"%(transfer_id, (time.time()-transfer_start))) 
 289       
290       
292          if query_string.startswith("?"): 
293              query_string = query_string[1:]       
294          comp = query_string.split("&") 
295          for i in comp: 
296              part = i.split("=") 
297              if part[0]=="ep": 
298                  return part[1]                 
299       
301          if query_string.startswith("?"): 
302              query_string = query_string[1:]       
303          comp = query_string.split("&") 
304          for i in comp: 
305              part = i.split("=") 
306              if part[0]=="path": 
307                  return part[1]       
 308       
310          status = "ACTIVE" 
311          while (timeout==None or timeout > 0) and status == "ACTIVE": 
312              code, reason, data = self.api.task(task_id, fields="status") 
313              status = data["status"] 
314              time.sleep(1) 
315              if timeout!=None: 
316                  timeout -= 1 
317   
318          if status != "ACTIVE": 
319              print "Task %s complete!" % task_id 
320              return True 
321          else: 
322              print "Task still not complete after %d seconds" % timeout 
323              return False 
 324       
325       
327          """Return True if the remote path exists 
328          """ 
329          try: 
330              self.__sftp.stat(path) 
331          except IOError, e: 
332              if e.errno == errno.ENOENT: 
333                  return False 
334              raise 
335          else: 
336              return True 
 337      
338      
340          exc_type, exc_value, exc_traceback = sys.exc_info() 
341          print "*** print_tb:" 
342          traceback.print_tb(exc_traceback, limit=1, file=sys.stdout) 
343          print "*** print_exception:" 
344          traceback.print_exception(exc_type, exc_value, exc_traceback, 
345                                limit=2, file=sys.stdout) 
 346       
347       
if __name__ == "__main__":
    # Manual smoke test: build an adaptor from a service URL and run a single
    # transfer between two go:// URLs.
    # NOTE(review): the service URL below embeds what looks like a username
    # and password in source -- confirm these are throwaway test credentials.
    adaptor_url = "http://drelu:bigjob@cli.globusonline.org?ep=drelu#egi&path=/ho"
    source_url = "go://cli.globusonline.org?ep=drelu#MacBook&path=/~/cert.tar.gz"
    target_url = "go://cli.globusonline.org?ep=xsede#kraken&path=/~/"

    go = GlobusOnlineFileAdaptor(adaptor_url)
    go.transfer(source_url, target_url)
351