1 module radosd.ioctx; 2 3 import deimos.rados; 4 5 import std..string; 6 import std.exception; 7 import core.stdc.stdlib; 8 import std.traits; 9 import core.sync.mutex; 10 public import core.stdc.time; 11 import core.stdc..string; 12 13 public import radosd.exception; 14 15 alias iocBack = void delegate(ref IoCompletion ioc) nothrow; 16 17 struct IoCompletion 18 { 19 ~this() 20 { 21 release(); 22 } 23 24 void waitForComplete() 25 { 26 if(_c is null) return; 27 int err = rados_aio_wait_for_complete(_c); 28 enforce(err >= 0,new IoCompletionException(format("rados_aio_wait_for_complete rados_ioctx_t erro!: %s",strerror(-err)))); 29 } 30 31 void waitForSafe() 32 { 33 if(_c is null) return; 34 int err = rados_aio_wait_for_safe(_c); 35 enforce(err >= 0,new IoCompletionException(format("rados_aio_wait_for_safe rados_ioctx_t erro!: %s",strerror(-err)))); 36 } 37 38 void waitForCompleteAndCb() 39 { 40 if(_c is null) return; 41 int err = rados_aio_wait_for_complete_and_cb(_c); 42 enforce(err >= 0,new IoCompletionException(format("rados_aio_wait_for_complete rados_ioctx_t erro!: %s",strerror(-err)))); 43 } 44 45 void waitForSafeAndCb() 46 { 47 if(_c is null) return; 48 int err = rados_aio_wait_for_safe_and_cb(_c); 49 enforce(err >= 0,new IoCompletionException(format("rados_aio_wait_for_safe rados_ioctx_t erro!: %s",strerror(-err)))); 50 } 51 52 bool isComplete() 53 { 54 if(_c is null) return true; 55 int err = rados_aio_is_complete(_c); 56 return err != 0; 57 } 58 59 bool isCompleteAndCb() 60 { 61 if(_c is null) return true; 62 int err = rados_aio_is_complete_and_cb(_c); 63 return err != 0; 64 } 65 66 bool isSafe() 67 { 68 if(_c is null) return true; 69 int err = rados_aio_is_safe(_c); 70 return err != 0; 71 } 72 73 bool isSafeAndCb() 74 { 75 if(_c is null) return true; 76 int err = rados_aio_wait_for_safe_and_cb(_c); 77 return err != 0; 78 } 79 80 void cancel(IoCompletion com) 81 { 82 if(_c is null) return; 83 int err = rados_aio_cancel(_io.ctx, _c); 84 enforce(err >= 0,new IoCtxException(format("rados_aio_cancel data erro : %s",strerror(-err)))); 85 } 86 87 void release() 88 { 89 if(_c) 90 rados_aio_release(_c); 91 _c = null; 92 } 93 94 int getReturnValue() 95 { 96 return rados_aio_get_return_value(_c); 97 } 98 99 @property ctx(){return _io;} 100 @property name(){return _name;} 101 @property readData(){return _data;} 102 @property statPsize(){return _psize;} 103 @property statPmtime(){return _pmtime;} 104 105 private: 106 this(IoCtx io, const(char) * name,bool onlyCom) 107 { 108 _io = io; 109 _name = name; 110 int err = 0; 111 if(onlyCom) 112 err = rados_aio_create_completion((&this),&doSafe,null,&_c); 113 else 114 err = rados_aio_create_completion((&this),&doComplate,&doSafe,&_c); 115 enforce(err >= 0,new IoCtxException(format("rados_aio_create_completion data erro : %s",strerror(-err)))); 116 } 117 118 void do_completion() nothrow 119 { 120 if(_completion) 121 _completion(this); 122 } 123 124 void do_safe() nothrow 125 { 126 if(_safe) 127 _safe(this); 128 } 129 130 iocBack _completion = null; 131 iocBack _safe = null; 132 rados_completion_t _c; 133 IoCtx _io; 134 const(char) * _name; 135 char[] _data; 136 size_t _psize; 137 time_t _pmtime; 138 } 139 140 class IoCtx 141 { 142 alias IoCompletionPtr = IoCompletion *; 143 144 this(rados_t cluster, string poolname) 145 { 146 _cluster = cluster; 147 _poolname = cast(char *)poolname.toStringz; 148 int err = rados_ioctx_create(_cluster,_poolname, &_io); 149 enforce(err >= 0,new IoCtxException(format("create rados_ioctx_t erro!: %s",strerror(-err)))); 150 _mutex = new Mutex(); 151 //_cbacks = new RedBlackTree!(IoCompletionPtr)(); 152 } 153 154 ~this() 155 { 156 if(_io) { 157 rados_aio_flush(_io); 158 rados_ioctx_destroy(_io); 159 } 160 } 161 162 @property ctx(){return _io;} 163 164 @property poolName(){return _poolname;} 165 166 void write(T)(string name,T[] data, ulong offset = 0) if(isCharByte!T) 167 { 168 write(name.toStringz,data,offset); 169 } 170 171 void write(T)(const(char) * name,in T[] data, ulong offset) if(isCharByte!T) 172 { 173 int err = rados_write(_io, name,cast(const(char) *)data.ptr, data.length, offset); 174 enforce(err >= 0,new IoCtxWriteException(format("rados_write data erro : %s",strerror(-err)))); 175 } 176 177 void writeFull(T)(string name,T[] data, ulong offset = 0) if(isCharByte!T) 178 { 179 writeFull(name.toStringz,data,offset); 180 } 181 182 void writeFull(T)(const(char) * name,in T[] data, ulong offset) if(isCharByte!T) 183 { 184 int err = rados_write_full(_io, name,cast(const(char) *)data.ptr, data.length, offset); 185 enforce(err >= 0,new IoCtxWriteException(format("rados_write_full data erro : %s",strerror(-err)))); 186 } 187 188 void writeSame(T)(const(char) * name,in T[] data, size_t writelen, ulong offset) if(isCharByte!T) 189 { 190 int err = rados_write_full(_io, name,cast(const(char) *)data.ptr, data.length,writelen, offset); 191 enforce(err >= 0,new IoCtxWriteException(format("rados_writesame data erro : %s",strerror(-err)))); 192 } 193 194 void cloneRange(const(char) * dst, size_t dstOffset, const(char) * src, size_t srcOffset, size_t len) 195 { 196 int err = rados_clone_range(_io, dst,dstOffset, src,srcOffset, len); 197 enforce(err >= 0,new IoCtxCloneException(format("rados_clone_range data erro : %s",strerror(-err)))); 198 } 199 200 void append(T)(const(char) * name,in T[] data)if(isCharByte!T) 201 { 202 int err = rados_append(_io, name,cast(const(char) *)data.ptr, data.length,writelen); 203 enforce(err >= 0,new IoCtxCloneException(format("rados_append data erro : %s",strerror(-err)))); 204 } 205 206 int read(T)(const(char) * name,ref T[] data, ulong offset = 0) if(isMutilCharByte!T) 207 in{assert(data.length > 0);} 208 body{ 209 int err = rados_read(_io, name,cast(char*)data.ptr, data.length, offset); 210 enforce(err >= 0,new IoCtxReadException(format("rados_write data erro : %s",strerror(-err)))); 211 return err; 212 } 213 214 char[] read(const(char) * name,size_t readlen, ulong offset = 0) 215 { 216 char[] data = new char[readlen]; 217 int err = rados_read(_io, name,data.ptr, readlen, offset); 218 enforce(err >= 0,new IoCtxReadException(format("rados_write data erro : %s",strerror(-err)))); 219 return data[0..err]; 220 } 221 222 void remove(const(char) * name) 223 { 224 int err = rados_remove(_io, name); 225 enforce(err >= 0,new IoCtxException(format("rados_remove data erro : %s",strerror(-err)))); 226 } 227 228 void trunc(const(char) * name,ulong size) 229 { 230 int err = rados_trunc(_io, name,size); 231 enforce(err >= 0,new IoCtxException(format("rados_trunc data erro : %s",strerror(-err)))); 232 } 233 alias resize = trunc; 234 235 void state(const(char) * name, ref ulong psize, ref time_t pmtime) 236 { 237 int err = rados_stat(_io, name,&psize,&pmtime); 238 enforce(err >= 0,new IoCtxException(format("rados_stat data erro : %s",strerror(-err)))); 239 } 240 241 void setxattr(T)(const(char) * name, const(char) * key, T[] value) if(isCharByte!T) 242 { 243 int err = rados_setxattr(_io, name,key,cast(const(char) *)value.ptr,value.length); 244 enforce(err >= 0,new IoCtxAttrException(format("rados_setxattr data erro : %s",strerror(-err)))); 245 } 246 247 int getxattr(T)(const(char) * name, const(char) * key,ref T[] value) if(isMutilCharByte!T) 248 { 249 int err = rados_getxattr(_io, name,key,cast(char *)value.ptr,value.length); 250 enforce(err >= 0,new IoCtxAttrException(format("rados_getxattr data erro : %s",strerror(-err)))); 251 return err; 252 } 253 254 void rmxattr(const(char) * name, const(char) * key) 255 { 256 int err = rados_rmxattr(_io, name,key); 257 enforce(err >= 0,new IoCtxAttrException(format("rados_rmxattr data erro : %s",strerror(-err)))); 258 } 259 260 void getxattrs(const(char) * name, void delegate(string key, char[] value) cback) 261 { 262 rados_xattrs_iter_t iter; 263 int err = rados_getxattrs(_io, name, &iter); 264 enforce(err >= 0,new IoCtxAttrException(format("rados_rmxattr data erro : %s",strerror(-err)))); 265 scope(exit)rados_getxattrs_end(iter); 266 char * key = null; 267 char * value = null; 268 size_t len = 0; 269 bool getNext() { 270 len = 0; 271 key = null; 272 value = null; 273 err = rados_getxattrs_next(iter,&key,&value,&len); 274 if( err != 0 || len <= 0 || key is null || value is null) 275 return false; 276 return true; 277 } 278 while(getNext()) 279 { 280 cback(fromStringz(key).dup,value[0..len].dup); 281 } 282 } 283 284 void asyncWrite(T)(const(char) * name,T[] data,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0) 285 { 286 IoCompletion * com = newIoCompletion(name); 287 scope(failure)removeIoCompletion(com); 288 com._completion = thecomplate; 289 com._safe = thesafe; 290 int err = rados_aio_write(_io, name,com._c,cast(const(char) *)data.ptr,data.length,offset); 291 enforce(err >= 0,new IoCtxWriteException(format("rados_rmxattr data erro : %s",strerror(-err)))); 292 } 293 294 void asyncWriteFull(T)(const(char) * name,T[] data,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0) 295 { 296 IoCompletion * com = newIoCompletion(name); 297 scope(failure)removeIoCompletion(com); 298 com._completion = thecomplate; 299 com._safe = thesafe; 300 int err = rados_aio_write_full(_io, name,com._c,cast(const(char) *)data.ptr,data.length,offset); 301 enforce(err >= 0,new IoCtxWriteException(format("rados_aio_write_full data erro : %s",strerror(-err)))); 302 } 303 304 void asyncAppend(T)(const(char) * name,T[] data,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0) 305 { 306 IoCompletion * com = newIoCompletion(name); 307 scope(failure)removeIoCompletion(com); 308 com._completion = thecomplate; 309 com._safe = thesafe; 310 int err = rados_aio_append(_io, name,com._c,cast(const(char) *)data.ptr,data.length,offset); 311 enforce(err >= 0,new IoCtxWriteException(format("rados_aio_append data erro : %s",strerror(-err)))); 312 } 313 314 void asyncWriteSame(T)(const(char) * name,T[] data,size_t wlen,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0) 315 { 316 IoCompletion * com = newIoCompletion(name); 317 scope(failure)removeIoCompletion(com); 318 com._completion = thecomplate; 319 com._safe = thesafe; 320 int err = rados_aio_writesame(_io, name,com._c,cast(const(char) *)data.ptr,data.length,wlen,offset); 321 enforce(err >= 0,new IoCtxWriteException(format("rados_aio_append data erro : %s",strerror(-err)))); 322 } 323 324 void asyncRemove(const(char) * name,iocBack thesafe, iocBack thecomplate = null) 325 { 326 IoCompletion * com = newIoCompletion(name); 327 scope(failure)removeIoCompletion(com); 328 com._completion = thecomplate; 329 com._safe = thesafe; 330 int err = rados_aio_remove(_io, name,com._c); 331 enforce(err >= 0,new IoCtxWriteException(format("rados_aio_remove data erro : %s",strerror(-err)))); 332 } 333 334 void asyncRead(T)(const(char) * name,T[] data,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0) if(isMutilCharByte!T) 335 { 336 IoCompletion * com = newIoCompletion(name); 337 scope(failure)removeIoCompletion(com); 338 com._completion = thecomplate; 339 com._safe = thesafe; 340 com._data = cast(char[])data; 341 int err = rados_aio_read(_io, name,com._c,com._data.ptr,data.length,offset); 342 enforce(err >= 0,new IoCtxWriteException(format("rados_aio_remove data erro : %s",strerror(-err)))); 343 } 344 345 void asyncRead(const(char) * name,size_t readLen,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0) 346 { 347 IoCompletion * com = newIoCompletion(name); 348 scope(failure)removeIoCompletion(com); 349 com._completion = thecomplate; 350 com._safe = thesafe; 351 com._data = new char[readLen]; 352 int err = rados_aio_read(_io, name,com._c,com._data.ptr,readLen,offset); 353 enforce(err >= 0,new IoCtxWriteException(format("rados_aio_remove data erro : %s",strerror(-err)))); 354 } 355 356 void asyncStat(const(char) * name,iocBack thecomplate) 357 { 358 IoCompletion * com = newIoCompletion(name,true); 359 scope(failure)removeIoCompletion(com); 360 com._safe = thecomplate; 361 int err = rados_aio_stat(_io,name, com._c, &com._psize, &com._pmtime); 362 enforce(err >= 0,new IoCtxException(format("rados_aio_cancel data erro : %s",strerror(-err)))); 363 } 364 365 protected: 366 IoCompletion * newIoCompletion(const(char) * name, bool onlyCom = false) 367 { 368 IoCompletion * com = new IoCompletion(this,name,onlyCom); 369 synchronized(_mutex){ 370 _cbacks[com] = 0; 371 } 372 return com; 373 } 374 375 void removeIoCompletion(IoCompletion * com) 376 { 377 if(com is null) return; 378 synchronized(_mutex){ 379 _cbacks.remove(com); 380 } 381 import core.memory; 382 destroy(*com); 383 GC.free(com); 384 } 385 386 private: 387 rados_ioctx_t _io; 388 rados_t _cluster; 389 char * _poolname; 390 int[IoCompletionPtr] _cbacks; 391 Mutex _mutex; 392 } 393 394 template isMutilCharByte(T) 395 { 396 enum bool isMutilCharByte = is(T == byte) || is(T == ubyte) || is(T == char) ; 397 } 398 399 template isCharByte(T) 400 { 401 enum bool isCharByte = is(Unqual!T == byte) || is(Unqual!T == ubyte) || is(Unqual!T == char) ; 402 } 403 404 private: 405 import std.experimental.logger; 406 407 extern(C) void doComplate(rados_completion_t cb, void* arg) 408 { 409 trace("doComplate doComplate"); 410 IoCompletion * com = cast(IoCompletion *) arg; 411 com.do_completion(); 412 } 413 414 extern(C) void doSafe(rados_completion_t cb, void* arg) 415 { 416 trace("doSafe doSafe"); 417 IoCompletion * com = cast(IoCompletion *) arg; 418 com.do_safe(); 419 com._io.removeIoCompletion(com); 420 }