import sys, os
import stat
import urlparse

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/../../../webhdfs-py/")

from pilot.api import State
from bigjob import logger
logger.debug(str(sys.path))

from webhdfs.webhdfs import WebHDFS
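
# The webhdfs-py client imported above is assumed to provide the operations
# used in this adaptor (directory create/remove, listing, file transfer).
# A minimal sketch of that assumed interface, for reference only:
#
#   class WebHDFS(object):
#       def __init__(self, namenode_host, namenode_port, hdfs_username): ...
#       def mkdir(self, path): ...
#       def rmdir(self, path): ...
#       def listdir(self, path): ...
#       def copyFromLocal(self, source_path, target_path): ...
#       def copyToLocal(self, source_path, target_path): ...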


class WebHDFSFileAdaptor(object):
    """ WebHDFS (Hadoop) based file management for Pilot Data. """

    # NOTE: The class header, the constructor, and several "def" lines are
    # missing from this fragment; they are reconstructed from the method
    # bodies. Defaults marked as placeholders are assumptions.

    def __init__(self, service_url, hdfs_user="hdfs"):
        self.service_url = service_url
        result = urlparse.urlparse(service_url)
        self.host = result.netloc
        self.path = result.path
        self.__state = State.New
        # Placeholder: connect to the WebHDFS namenode given in the service URL
        self.__webhdfs = WebHDFS(result.hostname,
                                 result.port if result.port is not None else 50070,
                                 hdfs_user)

    def get_security_context(self):
        """ Returns security context that needs to be available on the distributed
            node in order to access this Pilot Data """
        return None

    def initialize_pilotdata(self):
        # Create the base directory of this Pilot Data on HDFS
        self.__webhdfs.mkdir(self.path)

    def delete_pilotdata(self):
        # Remove the base directory of this Pilot Data from HDFS
        self.__webhdfs.rmdir(self.path)

    def create_pd(self, pd_id):
        # Create a sub-directory for the given Pilot Data id
        pd_dir = self.__get_pd_path(pd_id)
        logger.debug("mkdir: " + pd_dir)
        self.__webhdfs.mkdir(pd_dir)

    def put_pd(self, pd):
        # Upload the files of all data units of the Pilot Data to HDFS
        for i in pd.list_data_units():
            remote_path = os.path.join(self.__get_pd_path(pd.id), os.path.basename(i.local_url))
            logger.debug("Put file: %s to %s" % (i.local_url, remote_path))

            if i.local_url.startswith("file://") or i.local_url.startswith("/"):
                # strip an optional file:// prefix before touching the local filesystem
                local_path = urlparse.urlparse(i.local_url).path
                if stat.S_ISDIR(os.stat(local_path).st_mode):
                    logger.warning("Path %s is a directory. Ignored." % i.local_url)
                    continue
                self.__webhdfs.copyFromLocal(local_path, remote_path)
            else:
                logger.error("File URL %s not supported" % i.local_url)

    def copy_pd_to_url(self, pd, local_url, remote_url):
        # Download all files of the Pilot Data from HDFS to a local directory
        if not remote_url.startswith("file://") and not remote_url.startswith("/"):
            logger.error("Only local URLs supported")
            return

        result = urlparse.urlparse(remote_url)
        path = result.path

        # create the local target directory if it does not exist yet
        try:
            os.makedirs(path)
        except OSError:
            logger.debug("Directory: %s already exists." % path)

        base_dir = self.__get_pd_path(pd.id)
        for filename in self.__webhdfs.listdir(base_dir):
            file_url = local_url + "/" + filename
            # use the parsed filesystem path as the target so that a file://
            # prefix in remote_url is handled correctly
            file_remote_url = os.path.join(path, filename)
            logger.debug("GET " + file_url + " to " + file_remote_url)
            self.__webhdfs.copyToLocal(file_url, file_remote_url)


    def get_pd(self, pd, target_url):
        # Export the files of the Pilot Data to the given (local) target URL
        remote_url = target_url
        local_url = self.__get_pd_path(pd.id)
        self.copy_pd_to_url(pd, local_url, remote_url)

    def remove_pd(self, pd):
        # Delete the Pilot Data's directory from HDFS
        self.__webhdfs.rmdir(self.__get_pd_path(pd.id))

    def __get_pd_path(self, pd_id):
        # Internal helper: HDFS path of the directory for the given Pilot Data id
        return os.path.join(self.path, str(pd_id))
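

# A minimal usage sketch, assuming a reachable WebHDFS service and a Pilot Data
# object ("pd") that provides "id" and "list_data_units()" as used above. The
# service URL and paths below are placeholders, not values from the original code.
if __name__ == "__main__":
    adaptor = WebHDFSFileAdaptor("webhdfs://localhost:50070/user/pilot-data")
    adaptor.initialize_pilotdata()           # create the base directory on HDFS
    # adaptor.create_pd(pd.id)               # create a sub-directory for a pd
    # adaptor.put_pd(pd)                     # upload the pd's files
    # adaptor.get_pd(pd, "/tmp/pd-export")   # download them to a local directory
    # adaptor.delete_pilotdata()             # remove the base directory again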