1 module radosd.ioctx;
2 
3 import deimos.rados;
4 
5 import std..string;
6 import std.exception;
7 import core.stdc.stdlib;
8 import std.traits;
9 import core.sync.mutex;
10 public import core.stdc.time;
11 import core.stdc..string;
12 
13 public import radosd.exception;
14 
/// Signature of the user callbacks attached to an asynchronous operation:
/// a `nothrow` delegate that receives the finished `IoCompletion` by reference.
alias iocBack = void delegate(ref IoCompletion ioc) nothrow;
16 
/**
 * Handle for one asynchronous librados operation.
 *
 * Bundles the underlying `rados_completion_t` with the user callbacks and the
 * buffers an asynchronous call fills in (read data, stat size / mtime).
 * The constructor is private: instances are created by `IoCtx` and handed to
 * user callbacks by reference.
 *
 * NOTE(review): the struct owns `_c` (released in the destructor) but copying
 * is not disabled, so a by-value copy shares the handle with the original.
 * Pass it by `ref` or by pointer.
 */
struct IoCompletion
{
	/// Releases the underlying completion handle, if still held.
	~this()
	{
		release();
	}

	/**
	 * Calls `rados_aio_wait_for_complete` on the handle.
	 * Does nothing once the handle has been released.
	 * Throws: IoCompletionException if librados returns a negative code.
	 */
	void waitForComplete()
	{
		if(_c is null) return;
		checkWait(rados_aio_wait_for_complete(_c), "rados_aio_wait_for_complete");
	}

	/**
	 * Calls `rados_aio_wait_for_safe` on the handle.
	 * Does nothing once the handle has been released.
	 * Throws: IoCompletionException if librados returns a negative code.
	 */
	void waitForSafe()
	{
		if(_c is null) return;
		checkWait(rados_aio_wait_for_safe(_c), "rados_aio_wait_for_safe");
	}

	/**
	 * Calls `rados_aio_wait_for_complete_and_cb` on the handle.
	 * Does nothing once the handle has been released.
	 * Throws: IoCompletionException if librados returns a negative code.
	 */
	void waitForCompleteAndCb()
	{
		if(_c is null) return;
		// The message now names the function that was actually called.
		checkWait(rados_aio_wait_for_complete_and_cb(_c), "rados_aio_wait_for_complete_and_cb");
	}

	/**
	 * Calls `rados_aio_wait_for_safe_and_cb` on the handle.
	 * Does nothing once the handle has been released.
	 * Throws: IoCompletionException if librados returns a negative code.
	 */
	void waitForSafeAndCb()
	{
		if(_c is null) return;
		// The message now names the function that was actually called.
		checkWait(rados_aio_wait_for_safe_and_cb(_c), "rados_aio_wait_for_safe_and_cb");
	}

	/// Non-blocking query via `rados_aio_is_complete`; `true` once released.
	bool isComplete()
	{
		if(_c is null) return true;
		return rados_aio_is_complete(_c) != 0;
	}

	/// Non-blocking query via `rados_aio_is_complete_and_cb`; `true` once released.
	bool isCompleteAndCb()
	{
		if(_c is null) return true;
		return rados_aio_is_complete_and_cb(_c) != 0;
	}

	/// Non-blocking query via `rados_aio_is_safe`; `true` once released.
	bool isSafe()
	{
		if(_c is null) return true;
		return rados_aio_is_safe(_c) != 0;
	}

	/// Non-blocking query via `rados_aio_is_safe_and_cb`; `true` once released.
	bool isSafeAndCb()
	{
		if(_c is null) return true;
		// This used to call rados_aio_wait_for_safe_and_cb, which blocks and
		// returns 0 on success, so the query both stalled the caller and
		// reported `false` for a finished operation.
		return rados_aio_is_safe_and_cb(_c) != 0;
	}

	/**
	 * Asks librados to cancel this operation (`rados_aio_cancel`).
	 * Does nothing once the handle has been released.
	 * Throws: IoCtxException if librados returns a negative code.
	 */
	void cancel()
	{
		if(_c is null) return;
		int err = rados_aio_cancel(_io.ctx, _c);
		enforce(err >= 0,new IoCtxException(format("rados_aio_cancel data erro : %s",errnoText(err))));
	}

	/**
	 * Compatibility overload: cancels *this* operation; `com` is ignored,
	 * exactly as before.
	 *
	 * Params:
	 *   com = unused; kept so existing callers still compile.
	 */
	void cancel(IoCompletion com)
	{
		// `com` is a by-value copy that shares the caller's handle. Detach it
		// so the copy's destructor, which runs when this function returns,
		// does not release a handle the caller still owns.
		com._c = null;
		cancel();
	}

	/// Releases the completion handle (`rados_aio_release`); safe to call repeatedly.
	void release()
	{
		if(_c)
			rados_aio_release(_c);
		_c = null;
	}

	/**
	 * Returns the result of the operation (`rados_aio_get_return_value`).
	 * Throws: IoCompletionException if the handle has already been released,
	 *         instead of handing a null handle to librados.
	 */
	int getReturnValue()
	{
		enforce(_c !is null,new IoCompletionException("rados_aio_get_return_value: completion has been released"));
		return rados_aio_get_return_value(_c);
	}

	/// The `IoCtx` that created this completion.
	@property ctx(){return _io;}
	/// Object name this operation was issued for (C string supplied by the caller).
	@property name(){return _name;}
	/// Buffer attached by the asynchronous read calls.
	@property readData(){return _data;}
	/// Object size slot written by an asynchronous stat.
	@property statPsize(){return _psize;}
	/// Modification time slot written by an asynchronous stat.
	@property statPmtime(){return _pmtime;}

private:
	/**
	 * Creates the librados completion and registers the C trampolines,
	 * passing the address of this struct as the callback argument.
	 *
	 * Params:
	 *   io      = owning context.
	 *   name    = object name, stored as given (not copied).
	 *   onlyCom = when true, `doSafe` is registered as the *complete*
	 *             callback and no safe callback is registered; otherwise
	 *             `doComplate` / `doSafe` are registered as complete / safe.
	 * Throws: IoCtxException if `rados_aio_create_completion` fails.
	 */
	this(IoCtx io, const(char) * name,bool onlyCom)
	{
		_io = io;
		_name = name;
		int err = 0;
		if(onlyCom)
			err = rados_aio_create_completion((&this),&doSafe,null,&_c);
		else
			err = rados_aio_create_completion((&this),&doComplate,&doSafe,&_c);
		enforce(err >= 0,new IoCtxException(format("rados_aio_create_completion data erro : %s",errnoText(err))));
	}

	/// Invokes the user's completion callback, if one was set.
	void do_completion() nothrow
	{
		if(_completion)
			_completion(this);
	}

	/// Invokes the user's safe callback, if one was set.
	void do_safe() nothrow
	{
		if(_safe)
			_safe(this);
	}

	/// Text for a negative librados return code. `strerror` yields a C
	/// string; formatting the raw pointer with "%s" would print an address.
	static string errnoText(int err)
	{
		return fromStringz(strerror(-err)).idup;
	}

	/// Shared error check for the wait* family.
	static void checkWait(int err, string fn)
	{
		enforce(err >= 0,new IoCompletionException(format("%s rados_ioctx_t erro!: %s",fn,errnoText(err))));
	}

	iocBack _completion = null;
	iocBack _safe = null;
	rados_completion_t _c;
	IoCtx _io;
	const(char) * _name;
	char[] _data;
	// 64-bit on every target: the stat call writes this through a uint64_t*.
	ulong _psize;
	time_t _pmtime;
}
139 
/**
 * I/O context bound to one pool of a RADOS cluster (wraps `rados_ioctx_t`).
 *
 * Offers synchronous object operations (write / read / remove / xattrs ...)
 * and asynchronous variants that report through `iocBack` callbacks.
 * Every failing librados call is turned into an exception whose message
 * carries the `strerror` text of the returned code.
 *
 * Pending asynchronous operations are tracked in `_cbacks`, guarded by
 * `_mutex`, and removed again by `removeIoCompletion`.
 *
 * NOTE(review): `removeIoCompletion` is invoked from the librados callback
 * trampoline; which thread that runs on is not visible here. Confirm it is
 * attached to the D runtime before relying on the GC calls made there.
 */
class IoCtx
{
	alias IoCompletionPtr = IoCompletion *;

	/**
	 * Opens an I/O context for `poolname` on `cluster`.
	 * Throws: IoCtxException if `rados_ioctx_create` fails.
	 */
	this(rados_t cluster, string poolname)
	{
		_cluster = cluster;
		_poolname = cast(char *)poolname.toStringz;
		int err = rados_ioctx_create(_cluster,_poolname,  &_io);
		enforce(err >= 0,new IoCtxException(format("create rados_ioctx_t erro!: %s",errnoText(err))));
		_mutex = new Mutex();
	}

	/// Flushes outstanding asynchronous writes and destroys the context.
	~this()
	{
		if(_io) {
			rados_aio_flush(_io);
			rados_ioctx_destroy(_io);
			_io = null;
		}
	}

	/// Raw librados handle.
	@property ctx(){return _io;}

	/// Pool name as a C string.
	@property poolName(){return _poolname;}

	/// Convenience overload taking a D string object name; see the pointer overload.
	void write(T)(string name,T[] data, ulong offset = 0) if(isCharByte!T)
	{
		write(name.toStringz,data,offset);
	}

	/**
	 * Writes `data` to object `name` starting at `offset` (`rados_write`).
	 * Throws: IoCtxWriteException on failure.
	 */
	void write(T)(const(char) * name,in T[] data, ulong offset) if(isCharByte!T)
	{
		int err = rados_write(_io, name,cast(const(char) *)data.ptr, data.length, offset);
		enforce(err >= 0,new IoCtxWriteException(format("rados_write data erro : %s",errnoText(err))));
	}

	/// Convenience overload taking a D string object name; see the pointer overload.
	void writeFull(T)(string name,T[] data, ulong offset = 0) if(isCharByte!T)
	{
		writeFull(name.toStringz,data,offset);
	}

	/**
	 * Replaces the whole object `name` with `data` (`rados_write_full`).
	 *
	 * Params:
	 *   offset = ignored; `rados_write_full` has no offset argument. The
	 *            parameter is kept only for source compatibility.
	 * Throws: IoCtxWriteException on failure.
	 */
	void writeFull(T)(const(char) * name,in T[] data, ulong offset) if(isCharByte!T)
	{
		// The offset was previously forwarded, but rados_write_full takes
		// only (io, oid, buf, len).
		int err = rados_write_full(_io, name,cast(const(char) *)data.ptr, data.length);
		enforce(err >= 0,new IoCtxWriteException(format("rados_write_full data erro : %s",errnoText(err))));
	}

	/**
	 * Writes `data` repeatedly until `writelen` bytes are covered, starting
	 * at `offset` (`rados_writesame`).
	 * Throws: IoCtxWriteException on failure.
	 */
	void writeSame(T)(const(char) * name,in T[] data, size_t writelen, ulong offset) if(isCharByte!T)
	{
		// This previously called rados_write_full with writesame's arguments.
		int err = rados_writesame(_io, name,cast(const(char) *)data.ptr, data.length,writelen, offset);
		enforce(err >= 0,new IoCtxWriteException(format("rados_writesame data erro : %s",errnoText(err))));
	}

	/**
	 * Copies `len` bytes from `src` at `srcOffset` into `dst` at `dstOffset`
	 * (`rados_clone_range`).
	 * Throws: IoCtxCloneException on failure.
	 */
	void cloneRange(const(char) * dst, size_t dstOffset, const(char) * src, size_t srcOffset, size_t len)
	{
		int err = rados_clone_range(_io, dst,dstOffset, src,srcOffset, len);
		enforce(err >= 0,new IoCtxCloneException(format("rados_clone_range data erro : %s",errnoText(err))));
	}

	/**
	 * Appends `data` to object `name` (`rados_append`).
	 * Throws: IoCtxWriteException on failure.
	 */
	void append(T)(const(char) * name,in T[] data)if(isCharByte!T)
	{
		// The old body passed an undefined `writelen` and could never be
		// instantiated; rados_append takes only (io, oid, buf, len).
		int err = rados_append(_io, name,cast(const(char) *)data.ptr, data.length);
		enforce(err >= 0,new IoCtxWriteException(format("rados_append data erro : %s",errnoText(err))));
	}

	/**
	 * Reads up to `data.length` bytes of object `name` from `offset` into
	 * the caller's buffer (`rados_read`).
	 * Returns: the value returned by `rados_read` (number of bytes read).
	 * Throws: IoCtxReadException on failure.
	 */
	int read(T)(const(char) * name,ref T[] data, ulong offset = 0) if(isMutilCharByte!T)
	in{assert(data.length > 0);}
	body{
		int err = rados_read(_io, name,cast(char*)data.ptr, data.length, offset);
		enforce(err >= 0,new IoCtxReadException(format("rados_read data erro : %s",errnoText(err))));
		return err;
	}

	/**
	 * Reads up to `readlen` bytes of object `name` from `offset` into a
	 * freshly allocated buffer.
	 * Returns: the slice of the buffer that was actually filled.
	 * Throws: IoCtxReadException on failure.
	 */
	char[] read(const(char) * name,size_t readlen, ulong offset = 0)
	{
		char[] data = new char[readlen];
		int err = rados_read(_io, name,data.ptr, readlen, offset);
		enforce(err >= 0,new IoCtxReadException(format("rados_read data erro : %s",errnoText(err))));
		return data[0..err];
	}

	/// Deletes object `name` (`rados_remove`). Throws: IoCtxException on failure.
	void remove(const(char) * name)
	{
		int err = rados_remove(_io, name);
		enforce(err >= 0,new IoCtxException(format("rados_remove data erro : %s",errnoText(err))));
	}

	/// Resizes object `name` to `size` bytes (`rados_trunc`). Throws: IoCtxException on failure.
	void trunc(const(char) * name,ulong size)
	{
		int err = rados_trunc(_io, name,size);
		enforce(err >= 0,new IoCtxException(format("rados_trunc data erro : %s",errnoText(err))));
	}
	alias resize = trunc;

	/**
	 * Stats object `name` (`rados_stat`), storing size and modification
	 * time in `psize` / `pmtime`.
	 * Throws: IoCtxException on failure.
	 */
	void state(const(char) * name, ref ulong psize, ref time_t pmtime)
	{
		int err = rados_stat(_io, name,&psize,&pmtime);
		enforce(err >= 0,new IoCtxException(format("rados_stat data erro : %s",errnoText(err))));
	}

	/// Sets extended attribute `key` of object `name` to `value`. Throws: IoCtxAttrException on failure.
	void setxattr(T)(const(char) * name, const(char) * key, T[] value) if(isCharByte!T)
	{
		int err = rados_setxattr(_io, name,key,cast(const(char) *)value.ptr,value.length);
		enforce(err >= 0,new IoCtxAttrException(format("rados_setxattr data erro : %s",errnoText(err))));
	}

	/**
	 * Reads extended attribute `key` of object `name` into `value`.
	 * Returns: the value returned by `rados_getxattr` (length of the attribute).
	 * Throws: IoCtxAttrException on failure.
	 */
	int getxattr(T)(const(char) * name, const(char) * key,ref T[] value) if(isMutilCharByte!T)
	{
		int err = rados_getxattr(_io, name,key,cast(char *)value.ptr,value.length);
		enforce(err >= 0,new IoCtxAttrException(format("rados_getxattr data erro : %s",errnoText(err))));
		return err;
	}

	/// Removes extended attribute `key` of object `name`. Throws: IoCtxAttrException on failure.
	void rmxattr(const(char) * name, const(char) * key)
	{
		int err = rados_rmxattr(_io, name,key);
		enforce(err >= 0,new IoCtxAttrException(format("rados_rmxattr data erro : %s",errnoText(err))));
	}

	/**
	 * Enumerates all extended attributes of object `name`, calling `cback`
	 * once per attribute with private copies of the key and the value.
	 *
	 * Throws: IoCtxAttrException if opening or advancing the iterator fails.
	 */
	void getxattrs(const(char) * name, void delegate(string key, char[] value) cback)
	{
		rados_xattrs_iter_t iter;
		int err = rados_getxattrs(_io, name, &iter);
		enforce(err >= 0,new IoCtxAttrException(format("rados_getxattrs data erro : %s",errnoText(err))));
		scope(exit)rados_getxattrs_end(iter);
		for(;;)
		{
			char * key = null;
			char * value = null;
			size_t len = 0;
			err = rados_getxattrs_next(iter,&key,&value,&len);
			// An iterator error used to end the loop silently, which looked
			// like a short attribute list to the caller.
			enforce(err >= 0,new IoCtxAttrException(format("rados_getxattrs_next data erro : %s",errnoText(err))));
			// librados signals the end of the list with a null name. The old
			// `len <= 0` test also stopped at the first attribute with an
			// empty value, dropping it and everything after it.
			if(key is null)
				break;
			char[] copy = (value is null) ? null : value[0..len].dup;
			cback(fromStringz(key).idup,copy);
		}
	}

	/**
	 * Starts an asynchronous write of `data` to `name` at `offset`
	 * (`rados_aio_write`). `thesafe` / `thecomplate` are stored on the
	 * completion for the callback trampolines.
	 * Throws: IoCtxWriteException if the operation cannot be queued; the
	 *         completion is unregistered and freed in that case.
	 */
	void asyncWrite(T)(const(char) * name,T[] data,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0)
	{
		IoCompletion * com = newIoCompletion(name);
		scope(failure)removeIoCompletion(com);
		com._completion = thecomplate;
		com._safe = thesafe;
		int err = rados_aio_write(_io, name,com._c,cast(const(char) *)data.ptr,data.length,offset);
		enforce(err >= 0,new IoCtxWriteException(format("rados_aio_write data erro : %s",errnoText(err))));
	}

	/**
	 * Starts an asynchronous whole-object write (`rados_aio_write_full`).
	 * NOTE(review): `offset` is forwarded as in the original; confirm the
	 * binding's `rados_aio_write_full` actually accepts it.
	 * Throws: IoCtxWriteException if the operation cannot be queued.
	 */
	void asyncWriteFull(T)(const(char) * name,T[] data,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0)
	{
		IoCompletion * com = newIoCompletion(name);
		scope(failure)removeIoCompletion(com);
		com._completion = thecomplate;
		com._safe = thesafe;
		int err = rados_aio_write_full(_io, name,com._c,cast(const(char) *)data.ptr,data.length,offset);
		enforce(err >= 0,new IoCtxWriteException(format("rados_aio_write_full data erro : %s",errnoText(err))));
	}

	/**
	 * Starts an asynchronous append (`rados_aio_append`).
	 *
	 * Params:
	 *   offset = ignored; `rados_aio_append` has no offset argument. The
	 *            parameter is kept only for source compatibility.
	 * Throws: IoCtxWriteException if the operation cannot be queued.
	 */
	void asyncAppend(T)(const(char) * name,T[] data,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0)
	{
		IoCompletion * com = newIoCompletion(name);
		scope(failure)removeIoCompletion(com);
		com._completion = thecomplate;
		com._safe = thesafe;
		// rados_aio_append takes (io, oid, completion, buf, len) only.
		int err = rados_aio_append(_io, name,com._c,cast(const(char) *)data.ptr,data.length);
		enforce(err >= 0,new IoCtxWriteException(format("rados_aio_append data erro : %s",errnoText(err))));
	}

	/**
	 * Starts an asynchronous repeated write of `data` covering `wlen` bytes
	 * from `offset` (`rados_aio_writesame`).
	 * Throws: IoCtxWriteException if the operation cannot be queued.
	 */
	void asyncWriteSame(T)(const(char) * name,T[] data,size_t wlen,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0)
	{
		IoCompletion * com = newIoCompletion(name);
		scope(failure)removeIoCompletion(com);
		com._completion = thecomplate;
		com._safe = thesafe;
		int err = rados_aio_writesame(_io, name,com._c,cast(const(char) *)data.ptr,data.length,wlen,offset);
		enforce(err >= 0,new IoCtxWriteException(format("rados_aio_writesame data erro : %s",errnoText(err))));
	}

	/**
	 * Starts an asynchronous delete of `name` (`rados_aio_remove`).
	 * Throws: IoCtxWriteException if the operation cannot be queued.
	 */
	void asyncRemove(const(char) * name,iocBack thesafe, iocBack thecomplate = null)
	{
		IoCompletion * com = newIoCompletion(name);
		scope(failure)removeIoCompletion(com);
		com._completion = thecomplate;
		com._safe = thesafe;
		int err = rados_aio_remove(_io, name,com._c);
		enforce(err >= 0,new IoCtxWriteException(format("rados_aio_remove data erro : %s",errnoText(err))));
	}

	/**
	 * Starts an asynchronous read into the caller's buffer `data`
	 * (`rados_aio_read`); the buffer is exposed to the callbacks through
	 * `IoCompletion.readData`.
	 * Throws: IoCtxWriteException if the operation cannot be queued.
	 *         NOTE(review): a read exception type would fit better; kept
	 *         because callers may already catch this one.
	 */
	void asyncRead(T)(const(char) * name,T[] data,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0) if(isMutilCharByte!T)
	{
		IoCompletion * com = newIoCompletion(name);
		scope(failure)removeIoCompletion(com);
		com._completion = thecomplate;
		com._safe = thesafe;
		com._data = cast(char[])data;
		int err = rados_aio_read(_io, name,com._c,com._data.ptr,data.length,offset);
		enforce(err >= 0,new IoCtxWriteException(format("rados_aio_read data erro : %s",errnoText(err))));
	}

	/**
	 * Starts an asynchronous read of up to `readLen` bytes into a freshly
	 * allocated buffer, exposed through `IoCompletion.readData`.
	 * Throws: IoCtxWriteException if the operation cannot be queued
	 *         (exception type kept for compatibility).
	 */
	void asyncRead(const(char) * name,size_t readLen,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0)
	{
		IoCompletion * com = newIoCompletion(name);
		scope(failure)removeIoCompletion(com);
		com._completion = thecomplate;
		com._safe = thesafe;
		com._data = new char[readLen];
		int err = rados_aio_read(_io, name,com._c,com._data.ptr,readLen,offset);
		enforce(err >= 0,new IoCtxWriteException(format("rados_aio_read data erro : %s",errnoText(err))));
	}

	/**
	 * Starts an asynchronous stat of `name` (`rados_aio_stat`). The results
	 * land in the completion's stat slots (`statPsize` / `statPmtime`) and
	 * `thecomplate` is stored as the callback to run.
	 * Throws: IoCtxException if the operation cannot be queued.
	 */
	void asyncStat(const(char) * name,iocBack thecomplate)
	{
		// onlyCom = true: a single callback is registered, so the user's
		// delegate goes into the `_safe` slot that the trampoline invokes.
		IoCompletion * com = newIoCompletion(name,true);
		scope(failure)removeIoCompletion(com);
		com._safe = thecomplate;
		int err = rados_aio_stat(_io,name, com._c, &com._psize, &com._pmtime);
		enforce(err >= 0,new IoCtxException(format("rados_aio_stat data erro : %s",errnoText(err))));
	}

protected:
	/// Allocates a completion on the GC heap and registers it in `_cbacks`.
	IoCompletion * newIoCompletion(const(char) * name, bool onlyCom = false)
	{
		IoCompletion * com = new IoCompletion(this,name,onlyCom);
		synchronized(_mutex){
			_cbacks[com] = 0;
		}
		return com;
	}

	/**
	 * Unregisters `com`, runs its destructor and frees its memory.
	 * A null pointer, or one that is not (or no longer) registered, is
	 * ignored so the same completion is never destroyed and freed twice.
	 */
	void removeIoCompletion(IoCompletion * com)
	{
		if(com is null) return;
		bool wasRegistered;
		synchronized(_mutex){
			wasRegistered = _cbacks.remove(com);
		}
		if(!wasRegistered) return;
		import core.memory;
		destroy(*com);
		GC.free(com);
	}

private:
	/// Text for a negative librados return code. `strerror` yields a C
	/// string; formatting the raw pointer with "%s" would print an address.
	static string errnoText(int err)
	{
		return fromStringz(strerror(-err)).idup;
	}

	rados_ioctx_t _io;
	rados_t _cluster;
	char * _poolname;
	int[IoCompletionPtr] _cbacks;
	Mutex _mutex;
}
393 
/// True only when `T` is exactly `char`, `byte` or `ubyte`. Qualified
/// variants (`const`, `immutable`, ...) do not match, so a `T[]` accepted
/// through this trait is a writable one-byte-element buffer.
enum bool isMutilCharByte(T) = is(T == char) || is(T == byte) || is(T == ubyte);
398 
/// True when `T`, with its type qualifiers stripped by `Unqual`, is `char`,
/// `byte` or `ubyte`; i.e. any one-byte element type, readable or writable.
template isCharByte(T)
{
	static if (is(Unqual!T == char))
		enum bool isCharByte = true;
	else static if (is(Unqual!T == byte))
		enum bool isCharByte = true;
	else
		enum bool isCharByte = is(Unqual!T == ubyte);
}
403 
404 private:
405 import std.experimental.logger;
406 
/**
 * C trampoline registered with librados as the "complete" callback.
 *
 * Params:
 *   cb  = the librados completion (unused).
 *   arg = the `IoCompletion*` that was passed as callback argument when the
 *         completion was created.
 *
 * The function is called from C code, so no D exception may leave it:
 * logging failures are swallowed here because there is no caller that
 * could handle them.
 */
extern(C) void doComplate(rados_completion_t cb, void* arg) nothrow
{
	try
	{
		trace("doComplate doComplate");
	}
	catch(Exception e)
	{
		// Logging is best effort; never unwind into librados.
	}
	IoCompletion * com = cast(IoCompletion *) arg;
	if(com is null) return;
	com.do_completion();
}
413 
/**
 * C trampoline registered with librados as the "safe" callback (and as the
 * only callback for completions created with `onlyCom`).
 *
 * Runs the user's safe delegate and then asks the owning `IoCtx` to
 * unregister and free the completion, so `arg` must not be used afterwards.
 *
 * Params:
 *   cb  = the librados completion (unused).
 *   arg = the `IoCompletion*` that was passed as callback argument when the
 *         completion was created.
 *
 * The function is called from C code, so no D exception may leave it.
 */
extern(C) void doSafe(rados_completion_t cb, void* arg) nothrow
{
	try
	{
		trace("doSafe doSafe");
	}
	catch(Exception e)
	{
		// Logging is best effort; never unwind into librados.
	}
	IoCompletion * com = cast(IoCompletion *) arg;
	if(com is null) return;
	com.do_safe();
	IoCtx owner = com._io;
	if(owner is null) return;
	try
	{
		owner.removeIoCompletion(com);
	}
	catch(Exception e)
	{
		// Nothing can be reported from here. If the cleanup threw, the
		// completion is left for the owning context and the GC.
	}
}