
Source Code for Module pilot.filemanagement.webhdfs_adaptor

import sys, os
import stat
import urlparse

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/../../../webhdfs-py/")

from pilot.api import State
from bigjob import logger
logger.debug(str(sys.path))

from webhdfs.webhdfs import WebHDFS


class WebHDFSFileAdaptor(object):

    HDFS_USER_NAME = "luckow"
    HDFS_SERVICE_HOST = "192.168.2.108"
    HDFS_SERVICE_PORT = 50070

    def __init__(self, service_url):
        self.service_url = service_url

        try:
            result = urlparse.urlparse(service_url)
            self.host = result.netloc
            self.path = result.path
        except:
            logger.error("Error parsing URL.")

        self.__state = State.New
        self.__webhdfs = WebHDFS(self.HDFS_SERVICE_HOST,
                                 self.HDFS_SERVICE_PORT,
                                 self.HDFS_USER_NAME)

    def get_security_context(self):
        """ Returns security context that needs to be available on the distributed
            node in order to access this Pilot Data """
        return None

    def initialize_pilotstore(self):
        self.__webhdfs.mkdir(self.path)

    def get_pilotstore_size(self):
        return 0

    def delete_pilotstore(self):
        self.__webhdfs.rmdir(self.path)

    def get_state(self):
        return self.__state

    def create_pd(self, pd_id):
        pd_dir = self.__get_pd_path(pd_id)
        logger.debug("mkdir: " + pd_dir)
        self.__webhdfs.mkdir(pd_dir)

    def put_pd(self, pd):
        for i in pd.list_data_units():
            remote_path = os.path.join(self.__get_pd_path(pd.id), os.path.basename(i.local_url))
            logger.debug("Put file: %s to %s" % (i.local_url, remote_path))

            if i.local_url.startswith("file://") or i.local_url.startswith("/"):
                if stat.S_ISDIR(os.stat(i.local_url).st_mode):
                    logger.warning("Path %s is a directory. Ignored." % i.local_url)
                    continue
                self.__webhdfs.copyFromLocal(i.local_url, remote_path)
            else:
                logger.error("File URLs: %s not supported" % i.local_url)

    def copy_pd_to_url(self, pd, local_url, remote_url):
        if not remote_url.startswith("file://") and not remote_url.startswith("/"):
            logger.error("Only local URLs supported")
            return

        result = urlparse.urlparse(remote_url)
        path = result.path
        # create directory
        try:
            os.makedirs(path)
        except:
            logger.debug("Directory: %s already exists." % path)

        base_dir = self.__get_pd_path(pd.id)
        for filename in self.__webhdfs.listdir(base_dir):
            file_url = local_url + "/" + filename
            file_remote_url = remote_url + "/" + filename
            logger.debug("GET " + file_url + " to " + file_remote_url)
            self.__webhdfs.copyToLocal(file_url, file_remote_url)

    def copy_pd(self, pd, ps_new):
        pass

    def get_pd(self, pd, target_url):
        remote_url = target_url
        local_url = self.__get_pd_path(pd.id)
        self.copy_pd_to_url(pd, local_url, remote_url)

    def remove_pd(self, pd):
        self.__webhdfs.rmdir(self.__get_pd_path(pd.id))

    ###########################################################################
    # Internal methods
    def __get_pd_path(self, pd_id):
        return os.path.join(self.path, str(pd_id))
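
For orientation, below is a minimal usage sketch of the adaptor. The example service URL and the Pilot Data handle ("pd", obtained through the pilot.api interfaces) are assumptions for illustration only. Note that, as the constructor above shows, the WebHDFS connection is made with the hard-coded HDFS_* class attributes; only the path component of the service URL is used, as the base directory on HDFS.

    # Minimal usage sketch (assumptions: a reachable WebHDFS service and a
    # Pilot Data handle "pd" created via pilot.api -- neither is defined here).
    from pilot.filemanagement.webhdfs_adaptor import WebHDFSFileAdaptor

    adaptor = WebHDFSFileAdaptor("webhdfs://192.168.2.108:50070/pilot-store")
    adaptor.initialize_pilotstore()           # mkdir of the base path on HDFS
    adaptor.create_pd("pd-1234")              # per-Pilot-Data subdirectory
    # adaptor.put_pd(pd)                      # upload local data units of "pd"
    # adaptor.get_pd(pd, "/tmp/pd-staging")   # stage files back to a local directory
    adaptor.delete_pilotstore()               # remove the base directory again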