1 import sys, os
2 import stat
3 import urlparse
4
5 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/../../../webhdfs-py/")
6
7 from pilot.api import State
8 from bigjob import logger
9 logger.debug(str(sys.path))
10
11 from webhdfs.webhdfs import WebHDFS
12
13
35
37 """ Returns security context that needs to be available on the distributed
38 node in order to access this Pilot Data """
39 return None
40
41
43 self.__webhdfs.mkdir(self.path)
44
45
48
49
51 self.__webhdfs.rmdir(self.path)
52
55
56
58 pd_dir = self.__get_pd_path(pd_id)
59 logger.debug("mkdir: " + pd_dir)
60 self.__webhdfs.mkdir(pd_dir)
61
62
64 for i in pd.list_data_units():
65 remote_path = os.path.join(self.__get_pd_path(pd.id), os.path.basename(i.local_url))
66 logger.debug("Put file: %s to %s"%(i.local_url, remote_path))
67
68 if i.local_url.startswith("file://") or i.local_url.startswith("/"):
69 if stat.S_ISDIR(os.stat(i.local_url).st_mode):
70 logger.warning("Path %s is a directory. Ignored."%i.local_url)
71 continue
72 self.__webhdfs.copyFromLocal(i.local_url, remote_path)
73 else:
74 logger.error("File URLs: %s not supported"%i.local_url)
75
76
78
79 if not remote_url.startswith("file://") and not remote_url.startswith("/"):
80 logger.error("Only local URLs supported")
81 return
82
83 result = urlparse.urlparse(remote_url)
84 path = result.path
85
86 try:
87 os.makedirs(path)
88 except:
89 logger.debug("Directory: %s already exists."%path)
90
91 base_dir = self.__get_pd_path(pd.id)
92 for filename in self.__webhdfs.listdir(base_dir):
93 file_url = local_url + "/" + filename
94 file_remote_url = remote_url + "/" + filename
95 logger.debug("GET " + file_url + " to " + file_remote_url)
96 self.__webhdfs.copyToLocal(file_url, file_remote_url)
97
98
99
102
103
104 - def get_pd(self, pd, target_url):
105 remote_url = target_url
106 local_url = self.__get_pd_path(pd.id)
107 self.copy_pd_to_url(pd, local_url, remote_url)
108
109
111 self.__webhdfs.rmdir(self.__get_pd_path(pd.id))
112
113
114
115
117 return os.path.join(self.path, str(pd_id))
118