1 module radosd.ioctx;
2 
3 import deimos.rados;
4 
5 import std.string;
6 import std.exception;
7 import core.stdc.stdlib;
8 import std.traits;
9 import core.sync.mutex;
10 public import core.stdc.time;
11 import core.stdc.string;
12 
13 public import radosd.exception;
14 
/// Callback signature used for completion notifications: it receives, by
/// reference, the IoCompletion whose operation triggered the call.
alias iocBack =  void delegate(ref IoCompletion ioc);

/**
 * Wrapper around a librados `rados_completion_t` created for one
 * asynchronous operation issued through an IoCtx.
 *
 * Instances are only constructed inside this module (the constructor is
 * private). The destructor releases the underlying handle, and the struct
 * has no postblit, so a by-value copy going out of scope releases the
 * handle that the original still refers to. Pass it by `ref` or by pointer.
 */
struct IoCompletion
{
	/// Releases the underlying completion handle if it is still held.
	~this()
	{
		release();
	}

	/**
	 * Calls rados_aio_wait_for_complete on the handle.
	 * Does nothing when the handle has already been released.
	 * Throws: IoCompletionException when librados returns a negative value.
	 */
	void waitForComplete()
	{
		if(_c is null) return;
		int err = rados_aio_wait_for_complete(_c);
		enforce(err >= 0,new IoCompletionException(format("rados_aio_wait_for_complete rados_ioctx_t erro!: %s",errText(err))));
	}

	/**
	 * Calls rados_aio_wait_for_safe on the handle.
	 * Does nothing when the handle has already been released.
	 * Throws: IoCompletionException when librados returns a negative value.
	 */
	void waitForSafe()
	{
		if(_c is null) return;
		int err = rados_aio_wait_for_safe(_c);
		enforce(err >= 0,new IoCompletionException(format("rados_aio_wait_for_safe rados_ioctx_t erro!: %s",errText(err))));
	}

	/**
	 * Calls rados_aio_wait_for_complete_and_cb on the handle.
	 * Does nothing when the handle has already been released.
	 * Throws: IoCompletionException when librados returns a negative value.
	 */
	void waitForCompleteAndCb()
	{
		if(_c is null) return;
		int err = rados_aio_wait_for_complete_and_cb(_c);
		enforce(err >= 0,new IoCompletionException(format("rados_aio_wait_for_complete_and_cb rados_ioctx_t erro!: %s",errText(err))));
	}

	/**
	 * Calls rados_aio_wait_for_safe_and_cb on the handle.
	 * Does nothing when the handle has already been released.
	 * Throws: IoCompletionException when librados returns a negative value.
	 */
	void waitForSafeAndCb()
	{
		if(_c is null) return;
		int err = rados_aio_wait_for_safe_and_cb(_c);
		enforce(err >= 0,new IoCompletionException(format("rados_aio_wait_for_safe_and_cb rados_ioctx_t erro!: %s",errText(err))));
	}

	/// Non-blocking poll via rados_aio_is_complete; a released handle reports true.
	bool isComplete()
	{
		if(_c is null) return true;
		return rados_aio_is_complete(_c) != 0;
	}

	/// Non-blocking poll via rados_aio_is_complete_and_cb; a released handle reports true.
	bool isCompleteAndCb()
	{
		if(_c is null) return true;
		return rados_aio_is_complete_and_cb(_c) != 0;
	}

	/// Non-blocking poll via rados_aio_is_safe; a released handle reports true.
	bool isSafe()
	{
		if(_c is null) return true;
		return rados_aio_is_safe(_c) != 0;
	}

	/// Non-blocking poll via rados_aio_is_safe_and_cb; a released handle reports true.
	bool isSafeAndCb()
	{
		if(_c is null) return true;
		// Must be the polling call: the wait_for_* variant used previously blocks.
		return rados_aio_is_safe_and_cb(_c) != 0;
	}

	/**
	 * Asks librados to cancel the operation attached to this completion.
	 * Does nothing when the handle has already been released.
	 * Throws: IoCtxException when rados_aio_cancel returns a negative value.
	 */
	void cancel()
	{
		if(_c is null) return;
		int err = rados_aio_cancel(_io.ctx, _c);
		enforce(err >= 0,new IoCtxException(format("rados_aio_cancel data erro : %s",errText(err))));
	}

	/**
	 * Compatibility overload: the argument is ignored and the call cancels
	 * this completion. It is taken by `ref` because a by-value copy would run
	 * the destructor on exit and release the handle shared with the original.
	 */
	void cancel(ref IoCompletion com)
	{
		cancel();
	}

	/// Releases the librados handle (if any) and clears it, so repeated calls are harmless.
	void release()
	{
		if(_c)
			rados_aio_release(_c);
		_c = null;
	}

	/// The IoCtx this completion was created for.
	@property ctx(){return _io;}
	/// The object-name pointer given at construction (stored as-is, not copied).
	@property name(){return _name;}
	/// Buffer attached to this completion; IoCtx.asyncRead stores its read buffer here.
	@property readData(){return _data;}
	/// Size slot whose address IoCtx.asyncStat hands to rados_aio_stat.
	@property statPsize(){return _psize;}
	/// Modification-time slot whose address IoCtx.asyncStat hands to rados_aio_stat.
	@property statPmtime(){return _pmtime;}
private:
	/**
	 * Creates the librados completion and registers this object's address as
	 * the callback argument, so the object must not move afterwards.
	 * Throws: IoCtxException when rados_aio_create_completion fails.
	 */
	this(IoCtx io, const(char) * name)
	{
		_io = io;
		_name = name;
		int err =  rados_aio_create_completion((&this),&doComplate,&doSafe,&_c);
		enforce(err >= 0,new IoCtxException(format("rados_aio_create_completion data erro : %s",errText(err))));
	}

	/// Invokes the user's completion delegate, if one was set.
	void do_completion()
	{
		if(_completion)
			_completion(this);
	}

	/// Invokes the user's safe delegate, if one was set.
	void do_safe()
	{
		if(_safe)
			_safe(this);
	}

	/// Text for a negative errno-style return value. Formatting strerror()'s
	/// char* with %s would print the pointer value, not the message.
	static string errText(int err)
	{
		return fromStringz(strerror(-err)).idup;
	}

	iocBack _completion = null;
	iocBack _safe = null;
	rados_completion_t _c;
	IoCtx _io;
	const(char) * _name;
	char[] _data;
	// ulong rather than size_t: its address is passed to rados_aio_stat.
	// NOTE(review): assumes the binding declares that parameter as ulong* (C uint64_t*) — confirm.
	ulong _psize;
	time_t _pmtime;
}
129 
/**
 * Wrapper around a librados I/O context (`rados_ioctx_t`) for one pool.
 *
 * Synchronous methods call the matching `rados_*` function and throw when it
 * returns a negative value. Asynchronous methods create an IoCompletion,
 * keep it in an internal table guarded by a mutex, and submit the operation;
 * the completion is dropped from the table by removeIoCompletion.
 */
class IoCtx
{
	alias IoCompletionPtr = IoCompletion *;

	/**
	 * Creates the I/O context for `poolname` on `cluster`.
	 * Throws: IoCtxException when rados_ioctx_create fails.
	 */
	this(rados_t cluster, string poolname)
	{
		_cluster = cluster;
		_poolname = cast(char *)poolname.toStringz;
		int err = rados_ioctx_create(_cluster,_poolname,  &_io);
		enforce(err >= 0,new IoCtxException(format("create rados_ioctx_t erro!: %s",errText(err))));
		_mutex = new Mutex();
		//_cbacks = new RedBlackTree!(IoCompletionPtr)();
	}

	/// Flushes pending asynchronous I/O and destroys the context.
	~this()
	{
		if(_io) {
			rados_aio_flush(_io);
			rados_ioctx_destroy(_io);
			// Do not leave a dangling handle behind.
			_io = null;
		}
	}

	/// The raw librados I/O context handle.
	@property ctx(){return _io;}

	/// The zero-terminated pool name passed to rados_ioctx_create.
	@property poolName(){return _poolname;}

	/// Convenience overload of write taking the object name as a D string.
	void write(T)(string name,T[] data, ulong offset = 0) if(isCharByte!T)
	{
		write(name.toStringz,data,offset);
	}

	/**
	 * Writes `data` to object `name` at `offset` via rados_write.
	 * Throws: IoCtxWriteException on a negative return value.
	 */
	void write(T)(const(char) * name,in T[] data, ulong offset) if(isCharByte!T)
	{
		int err = rados_write(_io, name,cast(const(char) *)data.ptr, data.length, offset);
		enforce(err >= 0,new IoCtxWriteException(format("rados_write data erro : %s",errText(err))));
	}

	/// Convenience overload of writeFull taking the object name as a D string.
	void writeFull(T)(string name,T[] data, ulong offset = 0) if(isCharByte!T)
	{
		writeFull(name.toStringz,data,offset);
	}

	/**
	 * Replaces the whole content of object `name` with `data` via rados_write_full.
	 * `offset` is kept for signature compatibility only: rados_write_full
	 * takes no offset, so the value is ignored.
	 * Throws: IoCtxWriteException on a negative return value.
	 */
	void writeFull(T)(const(char) * name,in T[] data, ulong offset) if(isCharByte!T)
	{
		int err = rados_write_full(_io, name,cast(const(char) *)data.ptr, data.length);
		enforce(err >= 0,new IoCtxWriteException(format("rados_write_full data erro : %s",errText(err))));
	}

	/**
	 * Calls rados_writesame with `data` as the pattern buffer, `writelen` as
	 * the total length to write and `offset` as the start position.
	 * Throws: IoCtxWriteException on a negative return value.
	 */
	void writeSame(T)(const(char) * name,in T[] data, size_t writelen, ulong offset) if(isCharByte!T)
	{
		int err = rados_writesame(_io, name,cast(const(char) *)data.ptr, data.length,writelen, offset);
		enforce(err >= 0,new IoCtxWriteException(format("rados_writesame data erro : %s",errText(err))));
	}

	/**
	 * Calls rados_clone_range to copy `len` bytes from `src` at `srcOffset`
	 * into `dst` at `dstOffset`.
	 * Throws: IoCtxCloneException on a negative return value.
	 */
	void cloneRange(const(char) * dst, size_t dstOffset, const(char) * src, size_t srcOffset, size_t len)
	{
		int err = rados_clone_range(_io, dst,dstOffset, src,srcOffset, len);
		enforce(err >= 0,new IoCtxCloneException(format("rados_clone_range data erro : %s",errText(err))));
	}

	/**
	 * Appends `data` to object `name` via rados_append.
	 * Throws: IoCtxWriteException on a negative return value.
	 */
	void append(T)(const(char) * name,in T[] data)if(isCharByte!T)
	{
		int err = rados_append(_io, name,cast(const(char) *)data.ptr, data.length);
		enforce(err >= 0,new IoCtxWriteException(format("rados_append data erro : %s",errText(err))));
	}

	/**
	 * Reads up to `data.length` bytes of object `name`, starting at `offset`,
	 * into `data`. On return `data` is shrunk to the byte count reported by
	 * rados_read. T must be an unqualified byte, ubyte or char.
	 * Throws: IoCtxReadException on a negative return value.
	 */
	void read(T)(const(char) * name,ref T[] data, ulong offset = 0) if(is(T == byte) || is(T == ubyte) || is(T == char))
	in{assert(data.length > 0);}
	body{
		int err = rados_read(_io, name,cast(char*)data.ptr, data.length, offset);
		enforce(err >= 0,new IoCtxReadException(format("rados_read data erro : %s",errText(err))));
		// A non-negative result is the number of bytes actually read.
		data = data[0 .. err];
	}

	/**
	 * Reads up to `readlen` bytes of object `name`, starting at `offset`.
	 * Returns: a new buffer holding only the bytes rados_read reported.
	 * Throws: IoCtxReadException on a negative return value.
	 */
	char[] read(const(char) * name,size_t readlen, ulong offset = 0)
	{
		char[] data = new char[readlen];
		int err = rados_read(_io, name,data.ptr, readlen, offset);
		enforce(err >= 0,new IoCtxReadException(format("rados_read data erro : %s",errText(err))));
		// Trim so callers can tell a short read from a full one.
		return data[0 .. err];
	}

	/// Removes object `name` via rados_remove. Throws: IoCtxException on failure.
	void remove(const(char) * name)
	{
		int err = rados_remove(_io, name);
		enforce(err >= 0,new IoCtxException(format("rados_remove data erro : %s",errText(err))));
	}

	/// Resizes object `name` to `size` via rados_trunc. Throws: IoCtxException on failure.
	void trunc(const(char) * name,ulong size)
	{
		int err = rados_trunc(_io, name,size);
		enforce(err >= 0,new IoCtxException(format("rados_trunc data erro : %s",errText(err))));
	}
	alias resize = trunc;

	/// Fills `psize` and `pmtime` for object `name` via rados_stat. Throws: IoCtxException on failure.
	void state(const(char) * name, ref ulong psize, ref time_t pmtime)
	{
		int err = rados_stat(_io, name,&psize,&pmtime);
		enforce(err >= 0,new IoCtxException(format("rados_stat data erro : %s",errText(err))));
	}

	/// Sets extended attribute `key` of object `name` to `value`. Throws: IoCtxException on failure.
	void setxattr(T)(const(char) * name, const(char) * key, T[] value) if(isCharByte!T)
	{
		int err = rados_setxattr(_io, name,key,cast(const(char) *)value.ptr,value.length);
		enforce(err >= 0,new IoCtxException(format("rados_setxattr data erro : %s",errText(err))));
	}

	/**
	 * Reads extended attribute `key` of object `name` into `value`.
	 * On return `value` is shrunk to the length reported by rados_getxattr.
	 * T must be an unqualified byte, ubyte or char.
	 * Throws: IoCtxException on a negative return value.
	 */
	void getxattr(T)(const(char) * name, const(char) * key,ref T[] value) if(is(T == byte) || is(T == ubyte) || is(T == char))
	{
		int err = rados_getxattr(_io, name,key,cast(char *)value.ptr,value.length);
		enforce(err >= 0,new IoCtxException(format("rados_getxattr data erro : %s",errText(err))));
		// A non-negative result is the attribute's length.
		value = value[0 .. err];
	}

	/// Removes extended attribute `key` of object `name`. Throws: IoCtxException on failure.
	void rmxattr(const(char) * name, const(char) * key)
	{
		int err = rados_rmxattr(_io, name,key);
		enforce(err >= 0,new IoCtxException(format("rados_rmxattr data erro : %s",errText(err))));
	}

	/**
	 * Iterates over all extended attributes of object `name`, calling `cback`
	 * with a copy of each key and value. The iterator is always closed with
	 * rados_getxattrs_end.
	 * Throws: IoCtxException when opening the iterator or advancing it fails.
	 */
	void getxattrs(const(char) * name, void delegate(string key, char[] value) cback)
	{
		rados_xattrs_iter_t iter;
		int err = rados_getxattrs(_io, name, &iter);
		enforce(err >= 0,new IoCtxException(format("rados_getxattrs data erro : %s",errText(err))));
		scope(exit)rados_getxattrs_end(iter);
		for(;;)
		{
			char * key = null;
			char * value = null;
			size_t len = 0;
			err = rados_getxattrs_next(iter,&key,&value,&len);
			enforce(err >= 0,new IoCtxException(format("rados_getxattrs_next data erro : %s",errText(err))));
			// librados signals the end of the list with a null name; an
			// attribute with an empty value must not end the iteration.
			if(key is null)
				break;
			char[] copy = (value is null) ? null : value[0..len].dup;
			cback(fromStringz(key).idup,copy);
		}
	}

	/**
	 * Submits an asynchronous write of `data` to object `name` at `offset`.
	 * `thesafe` / `thecomplate` are stored on the completion as its callbacks.
	 * NOTE(review): no reference to `data` is kept here, so the caller
	 * presumably has to keep the buffer alive until the operation finishes.
	 * Throws: IoCtxWriteException when rados_aio_write returns a negative value.
	 */
	void asyncWrite(T)(const(char) * name,T[] data,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0)
	{
		IoCompletion * com = newIoCompletion(name);
		scope(failure)removeIoCompletion(com);
		com._completion = thecomplate;
		com._safe = thesafe;
		int err = rados_aio_write(_io, name,com._c,cast(const(char) *)data.ptr,data.length,offset);
		enforce(err >= 0,new IoCtxWriteException(format("rados_aio_write data erro : %s",errText(err))));
	}

	/**
	 * Submits an asynchronous full-object write of `data` to object `name`.
	 * `offset` is kept for signature compatibility only: rados_aio_write_full
	 * takes no offset, so the value is ignored.
	 * Throws: IoCtxWriteException on a negative return value.
	 */
	void asyncWriteFull(T)(const(char) * name,T[] data,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0)
	{
		IoCompletion * com = newIoCompletion(name);
		scope(failure)removeIoCompletion(com);
		com._completion = thecomplate;
		com._safe = thesafe;
		int err = rados_aio_write_full(_io, name,com._c,cast(const(char) *)data.ptr,data.length);
		enforce(err >= 0,new IoCtxWriteException(format("rados_aio_write_full data erro : %s",errText(err))));
	}

	/**
	 * Submits an asynchronous append of `data` to object `name`.
	 * `offset` is kept for signature compatibility only: rados_aio_append
	 * takes no offset, so the value is ignored.
	 * Throws: IoCtxWriteException on a negative return value.
	 */
	void asyncAppend(T)(const(char) * name,T[] data,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0)
	{
		IoCompletion * com = newIoCompletion(name);
		scope(failure)removeIoCompletion(com);
		com._completion = thecomplate;
		com._safe = thesafe;
		int err = rados_aio_append(_io, name,com._c,cast(const(char) *)data.ptr,data.length);
		enforce(err >= 0,new IoCtxWriteException(format("rados_aio_append data erro : %s",errText(err))));
	}

	/**
	 * Submits an asynchronous rados_aio_writesame with `data` as the pattern
	 * buffer, `wlen` as the total length and `offset` as the start position.
	 * Throws: IoCtxWriteException on a negative return value.
	 */
	void asyncWriteSame(T)(const(char) * name,T[] data,size_t wlen,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0)
	{
		IoCompletion * com = newIoCompletion(name);
		scope(failure)removeIoCompletion(com);
		com._completion = thecomplate;
		com._safe = thesafe;
		int err = rados_aio_writesame(_io, name,com._c,cast(const(char) *)data.ptr,data.length,wlen,offset);
		enforce(err >= 0,new IoCtxWriteException(format("rados_aio_writesame data erro : %s",errText(err))));
	}

	/**
	 * Submits an asynchronous removal of object `name`.
	 * Throws: IoCtxWriteException on a negative return value.
	 */
	void asyncRemove(const(char) * name,iocBack thesafe, iocBack thecomplate = null)
	{
		IoCompletion * com = newIoCompletion(name);
		scope(failure)removeIoCompletion(com);
		com._completion = thecomplate;
		com._safe = thesafe;
		int err = rados_aio_remove(_io, name,com._c);
		enforce(err >= 0,new IoCtxWriteException(format("rados_aio_remove data erro : %s",errText(err))));
	}

	/**
	 * Submits an asynchronous read of up to `readLen` bytes from object `name`
	 * at `offset`. The buffer is allocated here and stored on the completion,
	 * where the callbacks can reach it through `readData`.
	 * Throws: IoCtxWriteException on a negative return value (exception type
	 * kept as it was so existing catch sites keep working).
	 */
	void asyncRead(const(char) * name,size_t readLen,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0)
	{
		IoCompletion * com = newIoCompletion(name);
		scope(failure)removeIoCompletion(com);
		com._completion = thecomplate;
		com._safe = thesafe;
		com._data = new char[readLen];
		int err = rados_aio_read(_io, name,com._c,com._data.ptr,readLen,offset);
		enforce(err >= 0,new IoCtxWriteException(format("rados_aio_read data erro : %s",errText(err))));
	}

	/**
	 * Submits an asynchronous stat of object `name`; the size and mtime slots
	 * of the completion are handed to rados_aio_stat as output locations.
	 * Throws: IoCtxException on a negative return value.
	 */
	void asyncStat(const(char) * name,iocBack thecomplate)
	{
		IoCompletion * com = newIoCompletion(name);
		scope(failure)removeIoCompletion(com);
		com._completion = thecomplate;
		int err = rados_aio_stat(_io,name, com._c, &com._psize, &com._pmtime);
		enforce(err >= 0,new IoCtxException(format("rados_aio_stat data erro : %s",errText(err))));
	}

protected:
	/// Heap-allocates a completion for `name` and records it in the table of
	/// outstanding completions under the mutex.
	IoCompletion * newIoCompletion(const(char) * name)
	{
		IoCompletion * com = new IoCompletion(this,name);
		synchronized(_mutex){
			_cbacks[com] = 0;
		}
		return com;
	}

	/// Drops `com` from the table of outstanding completions; null is ignored.
	void removeIoCompletion(IoCompletion * com)
	{
		if(com is null) return;
		synchronized(_mutex){
			_cbacks.remove(com);
		}

	}

private:
	/// Text for a negative errno-style return value. Formatting strerror()'s
	/// char* with %s would print the pointer value, not the message.
	static string errText(int err)
	{
		return fromStringz(strerror(-err)).idup;
	}

	rados_ioctx_t _io;
	rados_t _cluster;
	char * _poolname;
	// Outstanding completions, keyed by pointer; the int value is unused.
	int[IoCompletionPtr] _cbacks;
	Mutex _mutex;
}
368 
369 
/**
 * True when T, with its type qualifiers stripped, is byte, ubyte or char.
 */
template isCharByte(T)
{
	enum isCharByte = is(Unqual!T == byte) || is(Unqual!T == ubyte) || is(Unqual!T == char) ;
	// Kept for source compatibility. Because this template is eponymous,
	// `isCharByte!T` resolves to the bool above and this member cannot be
	// reached as `isCharByte!T.MutilCharByte`; use isMutableCharByte!T instead.
	enum MutilCharByte = is(T == byte) || is(T == ubyte) || is(T == char) ;
}

/**
 * True only when T is exactly byte, ubyte or char with no qualifier, i.e.
 * an element type that a writable buffer can have.
 */
template isMutableCharByte(T)
{
	enum isMutableCharByte = is(T == byte) || is(T == ubyte) || is(T == char);
}
375 
376 private:
377 import std.experimental.logger;
378 
/**
 * C callback registered with rados_aio_create_completion for the "complete"
 * event. `arg` is the IoCompletion pointer given at registration; the call
 * is forwarded to that object's completion delegate.
 */
extern(C) void doComplate(rados_completion_t cb, void* arg)
{
	trace("doComplate doComplate");
	IoCompletion * com = cast(IoCompletion *) arg;
	if(com is null) return;
	try
	{
		com.do_completion();
	}
	catch(Exception e)
	{
		// The caller is C code: a D exception must not unwind into it.
		error("doComplate: completion callback threw: ", e.msg);
	}
}
385 
/**
 * C callback registered with rados_aio_create_completion for the "safe"
 * event. `arg` is the IoCompletion pointer given at registration; the call
 * is forwarded to that object's safe delegate, after which the completion
 * is dropped from its IoCtx's table of outstanding completions.
 */
extern(C) void doSafe(rados_completion_t cb, void* arg)
{
	trace("doSafe doSafe");
	IoCompletion * com = cast(IoCompletion *) arg;
	if(com is null) return;
	// Unregister even when the user delegate throws, otherwise the
	// completion would stay in the table forever.
	scope(exit) com._io.removeIoCompletion(com);
	try
	{
		com.do_safe();
	}
	catch(Exception e)
	{
		// The caller is C code: a D exception must not unwind into it.
		error("doSafe: safe callback threw: ", e.msg);
	}
}