1 module radosd.ioctx; 2 3 import deimos.rados; 4 5 import std.string; 6 import std.exception; 7 import core.stdc.stdlib; 8 import std.traits; 9 import core.sync.mutex; 10 public import core.stdc.time; 11 import core.stdc.string; 12 13 public import radosd.exception; 14 15 alias iocBack = void delegate(ref IoCompletion ioc); 16 17 struct IoCompletion 18 { 19 ~this() 20 { 21 release(); 22 } 23 24 void waitForComplete() 25 { 26 if(_c is null) return; 27 int err = rados_aio_wait_for_complete(_c); 28 enforce(err >= 0,new IoCompletionException(format("rados_aio_wait_for_complete rados_ioctx_t erro!: %s",strerror(-err)))); 29 } 30 31 void waitForSafe() 32 { 33 if(_c is null) return; 34 int err = rados_aio_wait_for_safe(_c); 35 enforce(err >= 0,new IoCompletionException(format("rados_aio_wait_for_safe rados_ioctx_t erro!: %s",strerror(-err)))); 36 } 37 38 void waitForCompleteAndCb() 39 { 40 if(_c is null) return; 41 int err = rados_aio_wait_for_complete_and_cb(_c); 42 enforce(err >= 0,new IoCompletionException(format("rados_aio_wait_for_complete rados_ioctx_t erro!: %s",strerror(-err)))); 43 } 44 45 void waitForSafeAndCb() 46 { 47 if(_c is null) return; 48 int err = rados_aio_wait_for_safe_and_cb(_c); 49 enforce(err >= 0,new IoCompletionException(format("rados_aio_wait_for_safe rados_ioctx_t erro!: %s",strerror(-err)))); 50 } 51 52 bool isComplete() 53 { 54 if(_c is null) return true; 55 int err = rados_aio_is_complete(_c); 56 return err != 0; 57 } 58 59 bool isCompleteAndCb() 60 { 61 if(_c is null) return true; 62 int err = rados_aio_is_complete_and_cb(_c); 63 return err != 0; 64 } 65 66 bool isSafe() 67 { 68 if(_c is null) return true; 69 int err = rados_aio_is_safe(_c); 70 return err != 0; 71 } 72 73 bool isSafeAndCb() 74 { 75 if(_c is null) return true; 76 int err = rados_aio_wait_for_safe_and_cb(_c); 77 return err != 0; 78 } 79 80 void cancel(IoCompletion com) 81 { 82 if(_c is null) return; 83 int err = rados_aio_cancel(_io.ctx, _c); 84 enforce(err >= 0,new IoCtxException(format("rados_aio_cancel data erro : %s",strerror(-err)))); 85 } 86 87 void release() 88 { 89 if(_c) 90 rados_aio_release(_c); 91 _c = null; 92 } 93 94 @property ctx(){return _io;} 95 @property name(){return _name;} 96 @property readData(){return _data;} 97 @property statPsize(){return _psize;} 98 @property statPmtime(){return _pmtime;} 99 private: 100 this(IoCtx io, const(char) * name) 101 { 102 _io = io; 103 _name = name; 104 int err = rados_aio_create_completion((&this),&doComplate,&doSafe,&_c); 105 enforce(err >= 0,new IoCtxException(format("rados_aio_create_completion data erro : %s",strerror(-err)))); 106 } 107 108 void do_completion() 109 { 110 if(_completion) 111 _completion(this); 112 } 113 114 void do_safe() 115 { 116 if(_safe) 117 _safe(this); 118 } 119 120 iocBack _completion = null; 121 iocBack _safe = null; 122 rados_completion_t _c; 123 IoCtx _io; 124 const(char) * _name; 125 char[] _data; 126 size_t _psize; 127 time_t _pmtime; 128 } 129 130 class IoCtx 131 { 132 alias IoCompletionPtr = IoCompletion *; 133 134 this(rados_t cluster, string poolname) 135 { 136 _cluster = cluster; 137 _poolname = cast(char *)poolname.toStringz; 138 int err = rados_ioctx_create(_cluster,_poolname, &_io); 139 enforce(err >= 0,new IoCtxException(format("create rados_ioctx_t erro!: %s",strerror(-err)))); 140 _mutex = new Mutex(); 141 //_cbacks = new RedBlackTree!(IoCompletionPtr)(); 142 } 143 144 ~this() 145 { 146 if(_io) { 147 rados_aio_flush(_io); 148 rados_ioctx_destroy(_io); 149 } 150 } 151 152 @property ctx(){return _io;} 153 154 @property poolName(){return _poolname;} 155 156 void write(T)(string name,T[] data, ulong offset = 0) if(isCharByte!T) 157 { 158 write(name.toStringz,data,offset); 159 } 160 161 void write(T)(const(char) * name,in T[] data, ulong offset) if(isCharByte!T) 162 { 163 int err = rados_write(_io, name,cast(const(char) *)data.ptr, data.length, offset); 164 enforce(err >= 0,new IoCtxWriteException(format("rados_write data erro : %s",strerror(-err)))); 165 } 166 167 void writeFull(T)(string name,T[] data, ulong offset = 0) if(isCharByte!T) 168 { 169 writeFull(name.toStringz,data,offset); 170 } 171 172 void writeFull(T)(const(char) * name,in T[] data, ulong offset) if(isCharByte!T) 173 { 174 int err = rados_write_full(_io, name,cast(const(char) *)data.ptr, data.length, offset); 175 enforce(err >= 0,new IoCtxWriteException(format("rados_write_full data erro : %s",strerror(-err)))); 176 } 177 178 void writeSame(T)(const(char) * name,in T[] data, size_t writelen, ulong offset) if(isCharByte!T) 179 { 180 int err = rados_write_full(_io, name,cast(const(char) *)data.ptr, data.length,writelen, offset); 181 enforce(err >= 0,new IoCtxWriteException(format("rados_writesame data erro : %s",strerror(-err)))); 182 } 183 184 void cloneRange(const(char) * dst, size_t dstOffset, const(char) * src, size_t srcOffset, size_t len) 185 { 186 int err = rados_clone_range(_io, dst,dstOffset, src,srcOffset, len); 187 enforce(err >= 0,new IoCtxCloneException(format("rados_clone_range data erro : %s",strerror(-err)))); 188 } 189 190 void append(T)(const(char) * name,in T[] data)if(isCharByte!T) 191 { 192 int err = rados_append(_io, name,cast(const(char) *)data.ptr, data.length,writelen); 193 enforce(err >= 0,new IoCtxCloneException(format("rados_append data erro : %s",strerror(-err)))); 194 } 195 196 void read(T)(const(char) * name,ref T[] data, ulong offset = 0) if(isCharByte!T.MutilCharByte) 197 in{assert(data.length > 0);} 198 body{ 199 int err = rados_read(_io, name,cast(char*)data.ptr, data.length, offset); 200 enforce(err >= 0,new IoCtxReadException(format("rados_write data erro : %s",strerror(-err)))); 201 } 202 203 char[] read(const(char) * name,size_t readlen, ulong offset = 0) 204 { 205 char[] data = new char[readlen]; 206 int err = rados_read(_io, name,data.ptr, readlen, offset); 207 enforce(err >= 0,new IoCtxReadException(format("rados_write data erro : %s",strerror(-err)))); 208 return data; 209 } 210 211 void remove(const(char) * name) 212 { 213 int err = rados_remove(_io, name); 214 enforce(err >= 0,new IoCtxException(format("rados_remove data erro : %s",strerror(-err)))); 215 } 216 217 void trunc(const(char) * name,ulong size) 218 { 219 int err = rados_trunc(_io, name,size); 220 enforce(err >= 0,new IoCtxException(format("rados_trunc data erro : %s",strerror(-err)))); 221 } 222 alias resize = trunc; 223 224 void state(const(char) * name, ref ulong psize, ref time_t pmtime) 225 { 226 int err = rados_stat(_io, name,&psize,&pmtime); 227 enforce(err >= 0,new IoCtxException(format("rados_stat data erro : %s",strerror(-err)))); 228 } 229 230 void setxattr(T)(const(char) * name, const(char) * key, T[] value) if(isCharByte!T) 231 { 232 int err = rados_setxattr(_io, name,key,cast(const(char) *)value.ptr,value.length); 233 enforce(err >= 0,new IoCtxException(format("rados_setxattr data erro : %s",strerror(-err)))); 234 } 235 236 void getxattr(T)(const(char) * name, const(char) * key,ref T[] value) if(isCharByte!T.MutilCharByte) 237 { 238 int err = rados_getxattr(_io, name,key,cast(char *)value.ptr,value.length); 239 enforce(err >= 0,new IoCtxException(format("rados_getxattr data erro : %s",strerror(-err)))); 240 } 241 242 void rmxattr(const(char) * name, const(char) * key) 243 { 244 int err = rados_rmxattr(_io, name,key); 245 enforce(err >= 0,new IoCtxException(format("rados_rmxattr data erro : %s",strerror(-err)))); 246 } 247 248 void getxattrs(const(char) * name, void delegate(string key, char[] value) cback) 249 { 250 rados_xattrs_iter_t iter; 251 int err = rados_getxattrs(_io, name, &iter); 252 enforce(err >= 0,new IoCtxException(format("rados_rmxattr data erro : %s",strerror(-err)))); 253 scope(exit)rados_getxattrs_end(iter); 254 char * key = null; 255 char * value = null; 256 size_t len = 0; 257 bool getNext() { 258 len = 0; 259 key = null; 260 value = null; 261 err = rados_getxattrs_next(iter,&key,&value,&len); 262 if( err != 0 || len <= 0 || key is null || value is null) 263 return false; 264 return true; 265 } 266 while(getNext()) 267 { 268 cback(fromStringz(key).dup,value[0..len].dup); 269 } 270 } 271 272 void asyncWrite(T)(const(char) * name,T[] data,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0) 273 { 274 IoCompletion * com = newIoCompletion(name); 275 scope(failure)removeIoCompletion(com); 276 com._completion = thecomplate; 277 com._safe = thesafe; 278 int err = rados_aio_write(_io, name,com._c,cast(const(char) *)data.ptr,data.length,offset); 279 enforce(err >= 0,new IoCtxWriteException(format("rados_rmxattr data erro : %s",strerror(-err)))); 280 } 281 282 void asyncWriteFull(T)(const(char) * name,T[] data,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0) 283 { 284 IoCompletion * com = newIoCompletion(name); 285 scope(failure)removeIoCompletion(com); 286 com._completion = thecomplate; 287 com._safe = thesafe; 288 int err = rados_aio_write_full(_io, name,com._c,cast(const(char) *)data.ptr,data.length,offset); 289 enforce(err >= 0,new IoCtxWriteException(format("rados_aio_write_full data erro : %s",strerror(-err)))); 290 } 291 292 void asyncAppend(T)(const(char) * name,T[] data,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0) 293 { 294 IoCompletion * com = newIoCompletion(name); 295 scope(failure)removeIoCompletion(com); 296 com._completion = thecomplate; 297 com._safe = thesafe; 298 int err = rados_aio_append(_io, name,com._c,cast(const(char) *)data.ptr,data.length,offset); 299 enforce(err >= 0,new IoCtxWriteException(format("rados_aio_append data erro : %s",strerror(-err)))); 300 } 301 302 void asyncWriteSame(T)(const(char) * name,T[] data,size_t wlen,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0) 303 { 304 IoCompletion * com = newIoCompletion(name); 305 scope(failure)removeIoCompletion(com); 306 com._completion = thecomplate; 307 com._safe = thesafe; 308 int err = rados_aio_writesame(_io, name,com._c,cast(const(char) *)data.ptr,data.length,wlen,offset); 309 enforce(err >= 0,new IoCtxWriteException(format("rados_aio_append data erro : %s",strerror(-err)))); 310 } 311 312 void asyncRemove(const(char) * name,iocBack thesafe, iocBack thecomplate = null) 313 { 314 IoCompletion * com = newIoCompletion(name); 315 scope(failure)removeIoCompletion(com); 316 com._completion = thecomplate; 317 com._safe = thesafe; 318 int err = rados_aio_remove(_io, name,com._c); 319 enforce(err >= 0,new IoCtxWriteException(format("rados_aio_remove data erro : %s",strerror(-err)))); 320 } 321 322 void asyncRead(const(char) * name,size_t readLen,iocBack thesafe, iocBack thecomplate = null, ulong offset = 0) 323 { 324 IoCompletion * com = newIoCompletion(name); 325 scope(failure)removeIoCompletion(com); 326 com._completion = thecomplate; 327 com._safe = thesafe; 328 com._data = new char[readLen]; 329 int err = rados_aio_read(_io, name,com._c,com._data.ptr,readLen,offset); 330 enforce(err >= 0,new IoCtxWriteException(format("rados_aio_remove data erro : %s",strerror(-err)))); 331 } 332 333 void asyncStat(const(char) * name,iocBack thecomplate) 334 { 335 IoCompletion * com = newIoCompletion(name); 336 scope(failure)removeIoCompletion(com); 337 com._completion = thecomplate; 338 int err = rados_aio_stat(_io,name, com._c, &com._psize, &com._pmtime); 339 enforce(err >= 0,new IoCtxException(format("rados_aio_cancel data erro : %s",strerror(-err)))); 340 } 341 342 protected: 343 IoCompletion * newIoCompletion(const(char) * name) 344 { 345 IoCompletion * com = new IoCompletion(this,name); 346 synchronized(_mutex){ 347 _cbacks[com] = 0; 348 } 349 return com; 350 } 351 352 void removeIoCompletion(IoCompletion * com) 353 { 354 if(com is null) return; 355 synchronized(_mutex){ 356 _cbacks.remove(com); 357 } 358 359 } 360 361 private: 362 rados_ioctx_t _io; 363 rados_t _cluster; 364 char * _poolname; 365 int[IoCompletionPtr] _cbacks; 366 Mutex _mutex; 367 } 368 369 370 template isCharByte(T) 371 { 372 enum isCharByte = is(Unqual!T == byte) || is(Unqual!T == ubyte) || is(Unqual!T == char) ; 373 enum MutilCharByte = is(T == byte) || is(T == ubyte) || is(T == char) ; 374 } 375 376 private: 377 import std.experimental.logger; 378 379 extern(C) void doComplate(rados_completion_t cb, void* arg) 380 { 381 trace("doComplate doComplate"); 382 IoCompletion * com = cast(IoCompletion *) arg; 383 com.do_completion(); 384 } 385 386 extern(C) void doSafe(rados_completion_t cb, void* arg) 387 { 388 trace("doSafe doSafe"); 389 IoCompletion * com = cast(IoCompletion *) arg; 390 com.do_safe(); 391 com._io.removeIoCompletion(com); 392 }