Volksdata 1.0b7
RDF library
Loading...
Searching...
No Matches
store_mdb.c
Go to the documentation of this file.
2
6#define N_DB 10
7
/*
 * Default LMDB map size, in bytes.
 *
 * Each value is a fully parenthesized size_t expression so that it can be
 * used safely inside larger expressions (e.g. DEFAULT_MAPSIZE / 1024) and so
 * that the shift is never evaluated in a type narrower than size_t.
 */
#if (defined DEBUG || defined TESTING)
#define DEFAULT_MAPSIZE ((size_t) 1 << 24) // 16Mb (limit for Valgrind)
#elif !(defined __LP64__ || defined __LLP64__) || \
    (defined _WIN32 && !defined _WIN64)
#define DEFAULT_MAPSIZE ((size_t) 1 << 31) // 2Gb (limit for 32-bit systems)
#else
#define DEFAULT_MAPSIZE ((size_t) 1 << 40) // 1Tb
#endif
19
20#define ENV_DIR_MODE 0750
21#define ENV_FILE_MODE 0640
22
23/*
24 * Data types.
25 */
26
27typedef char DbLabel[8];
28typedef struct mdbstore_iter_t MDBIterator;
29
31typedef enum {
32 LSSTORE_OPEN = 1<<0,
34
36typedef enum {
47} IterFlags;
48
49typedef enum {
52} StoreOp;
53
54typedef struct mdbstore_t {
55 MDB_env * env;
56 MDB_dbi dbi[N_DB];
58} MDBStore;
59
70typedef void (*iter_op_fn_t)(MDBIterator *it);
71
72
74typedef struct mdbstore_iter_t {
77 MDB_txn * txn;
78 MDB_cursor * cur;
79 MDB_cursor * ctx_cur;
80 MDB_val key;
81 MDB_val data;
88 const uint8_t * term_order;
91 size_t i;
92 size_t ct;
94 int rc;
96
97
98/*
99 * Static variables.
100 */
101
/*
 * LMDB database flag combinations used by the DB tables below.
 *
 * Parenthesized so that the `|' cannot mis-bind when a mask is combined with
 * operators of higher precedence (`&', `~', `==').
 */
#define DUPSORT_MASK (MDB_DUPSORT)
#define DUPFIXED_MASK (MDB_DUPSORT | MDB_DUPFIXED)
104
110#define MAIN_TABLE \
111/* #ID pfx #DB label #Flags */ \
112 ENTRY( T_ST, "t:st", 0 ) /* Key to ser. term */ \
113 ENTRY( SPO_C, "spo:c", DUPFIXED_MASK ) /* Triple to context */ \
114 ENTRY( IDK_ID, "idk:id", 0 ) /* ID key to ID */ \
115
116
119#define LOOKUP_TABLE \
120/* #ID pfx #DB label #Flags */ \
121 ENTRY( S_PO, "s:po", DUPFIXED_MASK ) /* 1-bound lookup */ \
122 ENTRY( P_SO, "p:so", DUPFIXED_MASK ) /* 1-bound lookup */ \
123 ENTRY( O_SP, "o:sp", DUPFIXED_MASK ) /* 1-bound lookup */ \
124 ENTRY( PO_S, "po:s", DUPFIXED_MASK ) /* 2-bound lookup */ \
125 ENTRY( SO_P, "so:p", DUPFIXED_MASK ) /* 2-bound lookup */ \
126 ENTRY( SP_O, "sp:o", DUPFIXED_MASK ) /* 2-bound lookup */ \
127 ENTRY( C_SPO, "c:spo", DUPFIXED_MASK ) /* Context lookup */ \
128
129
132#define ENTRY(a, b, c) static const DbLabel DB_##a = b;
135#undef ENTRY
136
137/*
138 * Numeric index of each DB. Prefixed with IDX_
139 *
140 * These index numbers are referred to in all the arrays defined below. They
141 * are independent from the LMDB dbi values which are considered opaque here.
142 */
143typedef enum {
144#define ENTRY(a, b, c) IDX_##a,
147#undef ENTRY
148} DBIdx;
149
153static const char *db_labels[N_DB] = {
154#define ENTRY(a, b, c) DB_##a,
157#undef ENTRY
158};
159
160/*
161 * DB flags. These are aligned with the db_labels array.
162 */
163static const unsigned int db_flags[N_DB] = {
164#define ENTRY(a, b, c) c,
167#undef ENTRY
168};
169
170/*
171 * 1-bound and 2-bound lookup indices.
172 *
173 * N.B. Only the first 6 (1-bound and 2-bound term lookup) are used.
174 * The others are added just because they belong logically to the lookup table.
175 */
176static DBIdx lookup_indices[9] = {
177#define ENTRY(a, b, c) IDX_##a,
179#undef ENTRY
180};
181
/*
 * Term position tables (0 = s, 1 = p, 2 = o).
 *
 * Each row gives the term positions in the order used by the index named in
 * the trailing comment.
 */
static const uint8_t lookup_ordering_1bound[3][3] = {
    [0] = {0, 1, 2},    // s:po
    [1] = {1, 0, 2},    // p:so
    [2] = {2, 0, 1},    // o:sp
};

static const uint8_t lookup_ordering_2bound[3][3] = {
    [0] = {1, 2, 0},    // po:s
    [1] = {0, 2, 1},    // so:p
    [2] = {0, 1, 2},    // sp:o
};
193
194
195/*
196 * Static prototypes.
197 */
198static int index_triple(
199 MDBStore *store, StoreOp op, VOLK_TripleKey spok, VOLK_Key ck,
200 MDB_txn *txn);
201static VOLK_rc mdbstore_add_term (void *h, const VOLK_Buffer *sterm, void *th);
202
203inline static VOLK_rc lookup_0bound (MDBIterator *it, size_t *ct);
204inline static VOLK_rc lookup_1bound (
205 uint8_t idx0, MDBIterator *it, size_t *ct);
206inline static VOLK_rc lookup_2bound (
207 uint8_t idx0, uint8_t idx1, MDBIterator *it, size_t *ct);
208inline static VOLK_rc lookup_3bound(MDBIterator *it, size_t *ct);
209
210
/*
 * Resolve the filesystem path of a store from its ID.
 *
 * If `id` is NULL, the `VOLK_MDB_STORE_URN` environment variable is used
 * instead. A non-NULL ID must start with "file://"; the returned pointer is
 * `id + 7`, i.e. it points inside the ID string (nothing is allocated).
 *
 * Returns NULL, after logging an error, if the ID lacks the "file://" prefix.
 *
 * NOTE(review): listing line 217 (first statement of the `!id` branch) is
 * not visible here; presumably it assigns a default URN to `id`, since the
 * log message below prints `id` and the function then returns `id + 7`.
 * Confirm against the full source.
 */
211static const char *
212mdbstore_path_from_id (const char *id)
213{
214 // Set environment path.
215 if (!id) id = getenv ("VOLK_MDB_STORE_URN");
216 if (!id) {
218 log_info (
219 "`VOLK_MDB_STORE_URN' environment variable is not "
220 "set. The default URN %s has been set as the store ID.", id
221 );
222 } else if (strncmp ("file://", id, 7) != 0) {
223 log_error ("MDB store ID must be in the `file://<abs_path>` format.");
224
225 return NULL;
226 }
227
/* Skip the 7-character "file://" scheme prefix. */
228 return id + 7;
229}
230
231
232/*
233 * Inliners.
234 */
235
236static inline VOLK_rc
237txn_begin (MDB_env *env, MDB_txn *p, unsigned int f, MDB_txn **tp) {
238 VOLK_rc rc = mdb_txn_begin (env, p, f, tp);
239 const char *path;
240 mdb_env_get_path (env, &path);
241 LOG_DEBUG (
242 "BEGIN %s transaction %p child of %p in env %s",
243 f == 0 ? "RW" : "RO", *tp, p, path);
244 return rc;
245}
246
247
248static inline VOLK_rc
249txn_commit (MDB_txn *t) {
250 LOG_DEBUG ("COMMIT transaction %p", t);
251 return mdb_txn_commit (t);
252}
253
254
258
/*
 * Create the on-disk layout for a store.
 *
 * Resolves the store path from `id`, optionally wipes it (`clear`), creates
 * the directory, then opens a temporary LMDB environment in which all N_DB
 * databases are created. If the t:st database is empty, the default context
 * term (VOLK_default_ctx_buf) is inserted under key `k`.
 *
 * Returns VOLK_ENV_ERR if the library environment is not initialized,
 * VOLK_VALUE_ERR if the path cannot be resolved, and on success VOLK_OK when
 * `clear` is set, otherwise the return code of mkdir_p().
 *
 * NOTE(review): the declaration of `k` (listing line 308) is not visible in
 * this listing; presumably it is the hash of VOLK_default_ctx_buf. Confirm.
 * NOTE(review): `env` is only closed on the success path. The `fail` label
 * aborts the transaction but never calls mdb_env_close(), and the RCCK calls
 * after mdb_env_create() presumably return early without closing it either.
 * NOTE(review): if txn_commit() fails, the `fail` label calls
 * mdb_txn_abort() on the same handle; check the LMDB documentation on using
 * a transaction handle after mdb_txn_commit().
 */
265static VOLK_rc
266mdbstore_setup (const char *id, bool clear)
267{
268 if (!VOLK_env_is_init) return VOLK_ENV_ERR;
269
270 const char *path = mdbstore_path_from_id (id);
271 if (!path) return VOLK_VALUE_ERR;
272
273 // If the directory exists (unless clear == true), do nothing.
274 if (clear) rm_r (path);
275 VOLK_rc rc = mkdir_p (path, ENV_DIR_MODE);
276 log_info ("Create dir rc: %s", VOLK_strerror (rc));
277 PRCCK (rc);
278
279 // Open a temporary environment and txn to create the DBs.
280 MDB_env *env;
281 RCCK (mdb_env_create (&env));
282
283 RCCK (mdb_env_set_maxdbs (env, N_DB));
284 RCCK (mdb_env_open (env, path, 0, ENV_FILE_MODE));
285 LOG_DEBUG("Environment opened at %s.", path);
286
287 MDB_txn *txn;
288 RCCK (txn_begin (env, NULL, 0, &txn));
289 MDB_dbi dbi;
/* `dbi` is overwritten on every iteration; the handles are not kept. */
290 for (int i = 0; i < N_DB; i++) {
291 LOG_TRACE("Creating DB %s", db_labels[i]);
292 RCCK (
293 mdb_dbi_open (txn, db_labels[i], db_flags[i] | MDB_CREATE, &dbi)
294 );
295 }
296
297 // Bootstrap the permanent store with initial data.
298 MDB_stat stat;
299 CHECK (mdb_dbi_open (
300 txn, db_labels[IDX_T_ST], db_flags[IDX_T_ST], &dbi), fail);
301 CHECK (mdb_stat (txn, dbi, &stat), fail);
302
/* Only seed the term table when it holds no entries yet. */
303 if (stat.ms_entries == 0) {
304 LOG_DEBUG ("Loading initial data into %s", path);
305 // Index default context.
306 MDB_cursor *cur;
307 CHECK (mdb_cursor_open (txn, dbi, &cur), fail);
309 MDB_val key, data;
310 key.mv_data = &k;
311 key.mv_size = sizeof (k);
312
313 data.mv_data = VOLK_default_ctx_buf->addr;
314 data.mv_size = VOLK_default_ctx_buf->size;
315
316 VOLK_rc db_rc = mdb_cursor_put (cur, &key, &data, 0);
317 CHECK (db_rc, fail);
318 }
319
320 CHECK (txn_commit (txn), fail);
321 mdb_env_close (env);
322
323 return clear ? VOLK_OK : rc;
324
325fail:
/* `rc` still holds the mkdir_p() result here; force it to an error code. */
326 if (rc >= 0) rc = VOLK_DB_ERR;
327 mdb_txn_abort (txn);
328 return rc;
329}
330
331
342static void *
343mdbstore_new (const char *id, size_t _unused)
344{
345 if (!VOLK_env_is_init) {
346 log_error (VOLK_strerror (VOLK_ENV_ERR));
347 return NULL;
348 }
349
350 (void) _unused;
351 const char *path = mdbstore_path_from_id (id);
352 if (!path) return NULL;
353
354 MDBStore *store;
355 CALLOC_GUARD (store, NULL);
356
357 RCNL (mdb_env_create (&store->env));
358 MDB_txn *txn = NULL;
359
360 // Set map size.
361 size_t mapsize;
362 char *env_mapsize = getenv ("VOLK_MDB_MAPSIZE");
363 if (env_mapsize == NULL) mapsize = DEFAULT_MAPSIZE;
364 else sscanf (env_mapsize, "%zu", &mapsize);
365 log_info (
366 "Setting environment map size at %s to %zu Mb.",
367 path, mapsize / 1024 / 1024);
368 CHECK (mdb_env_set_mapsize (store->env, mapsize), fail);
369 CHECK (mdb_env_set_maxdbs (store->env, N_DB), fail);
370 CHECK (mdb_env_open (store->env, path, 0, ENV_FILE_MODE), fail);
371
372 // Assign DB handles to store->dbi.
373 CHECK (txn_begin (store->env, NULL, 0, &txn), fail);
374 for (int i = 0; i < N_DB; i++)
375 CHECK (mdb_dbi_open (
376 txn, db_labels[i], db_flags[i], store->dbi + i), fail);
377
378 store->flags |= LSSTORE_OPEN;
379 CHECK (txn_commit(txn), fail);
380 txn = NULL;
381
382 log_info ("Created environment at %s", path);
383
384 return store;
385
386fail:
387 if (txn) mdb_txn_abort (txn);
388 mdb_env_close (store->env);
389
390 return NULL;
391}
392
393
394static void
395mdbstore_free (void *h)
396{
397 MDBStore *store = h;
398 if (store->flags & LSSTORE_OPEN) {
399 const char *path;
400 mdb_env_get_path (store->env, &path);
401 log_info ("Closing MDB env at %s.", path);
402 mdb_env_close (store->env);
403 }
404
405 free (store);
406}
407
408
409static VOLK_rc
410mdbstore_stat (const MDBStore *store, MDB_stat *stat)
411{
412 if (!(store->flags & LSSTORE_OPEN)) return 0;
413
414 MDB_txn *txn;
415 RCCK (txn_begin (store->env, NULL, MDB_RDONLY, &txn));
416
417 if (mdb_stat (txn, store->dbi[IDX_SPO_C], stat) != MDB_SUCCESS)
418 return VOLK_DB_ERR;
419 mdb_txn_abort (txn);
420
421 return VOLK_OK;
422}
423
424
425static size_t
426mdbstore_size (const void *h)
427{
428 const MDBStore *store = h;
429 // Size is calculated outside of any pending write txn.
430
431 MDB_stat stat;
432 if (mdbstore_stat (store, &stat) != VOLK_OK) return 0;
433
434 return stat.ms_entries;
435}
436
437
438char *
439mdbstore_id (const void *h)
440{
441 const MDBStore *store = h;
442 const char *path;
443
444 RCNL (mdb_env_get_path (store->env, &path));
445
446 char *id = malloc (strlen (path) + 8);
447 sprintf (id, "file://%s", path);
448
449 return id;
450}
451
452
453static VOLK_rc
454mdbstore_txn_begin (void *h, int flags, void **th)
455{
456 MDBStore *store = h;
457
458 RCCK (txn_begin (store->env, NULL, flags, (MDB_txn **) th));
459
460 return VOLK_OK;
461}
462
463
464static VOLK_rc
465mdbstore_txn_commit (void *th)
466{
467 RCCK (txn_commit ((MDB_txn *) th));
468
469 return VOLK_OK;
470}
471
472
473static void
474mdbstore_txn_abort (void *th)
475{ mdb_txn_abort ((MDB_txn *) th); }
476
477
478static void *
479mdbiter_txn (void *h)
480{ return ((MDBIterator *) h)->txn; }
481
482
/*
 * Start an "add" session on a store.
 *
 * Allocates an MDBIterator that carries the session state, begins a write
 * transaction (a child of `th` when `th` is not NULL) and, if a context
 * buffer `sc` is given, stores its hash in `it->luc` and inserts the
 * serialized context term into the t:st database (an already existing key
 * is not an error).
 *
 * Returns the iterator handle, or NULL on failure.
 *
 * NOTE(review): listing line 529 (in the `else` branch) is not visible here;
 * presumably it assigns `it->luc` for the no-context case, since `it` comes
 * from MALLOC_GUARD and `luc` is read later by mdbstore_add_iter(). Confirm.
 * NOTE(review): when mdb_put() fails, the transaction is aborted but `it` is
 * not freed before returning NULL.
 */
493static void *
494mdbstore_add_init (void *h, const VOLK_Buffer *sc, void *th)
495{
496 MDBStore *store = h;
497 /* An iterator is used here. Some members are a bit misused but it does
498 * its job without having to define a very similar struct.
499 */
500 MDBIterator *it;
501 MALLOC_GUARD (it, NULL);
502
503 it->store = store;
/* `i` counts the triples added so far (see mdbstore_add_iter()). */
504 it->i = 0;
505
506 CHECK (txn_begin (store->env, (MDB_txn *) th, 0, &it->txn), fail);
507
508 if (sc) {
509 // Store context if it's not the default one.
510 it->luc = VOLK_buffer_hash (sc);
511
512 // Insert t:st for context.
513 //LOG_DEBUG("Adding context: %s", sc);
514 it->key.mv_data = &it->luc;
515 it->key.mv_size = KLEN;
516 it->data.mv_data = sc->addr;
517 it->data.mv_size = sc->size;
518
519 int db_rc = mdb_put (
520 it->txn, it->store->dbi[IDX_T_ST],
521 &it->key, &it->data, MDB_NOOVERWRITE);
/* MDB_KEYEXIST only means the term is already stored. */
522 if (db_rc != MDB_SUCCESS && db_rc != MDB_KEYEXIST) {
523 log_error (VOLK_strerror (db_rc));
524 mdb_txn_abort (it->txn);
525 return NULL;
526 }
527 } else {
528 LOG_DEBUG("No context passed to iterator, using default.");
530 }
531
532 return it;
533
534fail:
535 free (it);
536
537 return NULL;
538}
539
540
541/*
542 * NOTE: at the moment #mdbstore_remove() or another
543 * #mdbstore_init() cannot be called between #mdbstore_add_init and
544 * #mdbstore_add_abort or #mdbstore_add_done. FIXME
545 *
546 */
/*
 * Add one serialized triple within an "add" session.
 *
 * h:    iterator handle returned by mdbstore_add_init().
 * sspo: serialized triple; its three terms are read with
 *       VOLK_btriple_pos().
 *
 * Each term is hashed and stored in t:st (existing keys are left alone), the
 * triple key is stored in spo:c with the session context `it->luc` as data,
 * and the triple is then indexed with index_triple().
 *
 * Returns VOLK_VALUE_ERR if `h` is NULL, VOLK_NOACTION if the spo:c entry
 * already exists, VOLK_DB_ERR on an LMDB error, otherwise the return code of
 * index_triple(). `it->i` is incremented when indexing returns VOLK_OK.
 *
 * NOTE(review): the declaration of `spok` (listing line 554) is not visible
 * here; presumably a local VOLK_TripleKey. Confirm.
 */
547static VOLK_rc
548mdbstore_add_iter (void *h, const VOLK_BufferTriple *sspo)
549{
550 if (UNLIKELY (!h)) return VOLK_VALUE_ERR;
551
552 MDBIterator *it = h;
553 int db_rc = VOLK_NOACTION;
555
556 // Add triple terms.
557 for (int i = 0; i < 3; i++) {
558 VOLK_Buffer *st = VOLK_btriple_pos (sspo, i);
559
560 spok[i] = VOLK_buffer_hash (st);
561
562 it->key.mv_data = spok + i;
563 it->key.mv_size = KLEN;
564 it->data.mv_data = st->addr;
565 it->data.mv_size = st->size;
566
567 db_rc = mdb_put(
568 it->txn, it->store->dbi[IDX_T_ST],
569 &it->key, &it->data, MDB_NOOVERWRITE);
/* An already stored term (MDB_KEYEXIST) is not an error. */
570 if (db_rc != MDB_SUCCESS && db_rc != MDB_KEYEXIST) {
571 LOG_RC (db_rc);
572 return VOLK_DB_ERR;
573 }
574 }
575
576 LOG_TRACE("Inserting spok: {%lx, %lx, %lx}", spok[0], spok[1], spok[2]);
577 LOG_TRACE("Into context: %lx", it->luc);
578
579 // Insert spo:c.
580 it->key.mv_data = spok;
581 it->key.mv_size = TRP_KLEN;
582
583 // In triple mode, data is empty (= NULL_KEY).
584 it->data.mv_data = &it->luc;
585 it->data.mv_size = it->luc == NULL_KEY ? 0 : KLEN;
586
587 db_rc = mdb_put(
588 it->txn, it->store->dbi[IDX_SPO_C],
589 &it->key, &it->data, MDB_NODUPDATA);
590
/* The exact triple/context pair is already present: nothing to do. */
591 if (db_rc == MDB_KEYEXIST) return VOLK_NOACTION;
592 if (db_rc != MDB_SUCCESS) {
593 log_error (
594 "MDB error while inserting triple: %s", VOLK_strerror(db_rc));
595 return VOLK_DB_ERR;
596 }
597
598 // Index.
599 VOLK_rc rc = index_triple (it->store, OP_ADD, spok, it->luc, it->txn);
600 if (rc == VOLK_OK) it->i++;
601
602 return rc;
603}
604
605
606static VOLK_rc
607mdbstore_add_done (void *h)
608{
609 MDBIterator *it = h;
610 VOLK_rc rc = VOLK_OK;
611 log_debug ("Committing add transaction.");
612
613 if (txn_commit (it->txn) != MDB_SUCCESS) {
614 mdb_txn_abort (it->txn);
615 rc = VOLK_TXN_ERR;
616 }
617
618 free (it);
619
620 RCCK (rc);
621 return rc;
622}
623
624
625static void
626mdbstore_add_abort (void *h)
627{
628 MDBIterator *it = h;
629 mdb_txn_abort (it->txn);
630
631 free (it);
632}
633
634
/*
 * Look up the serialized term stored under a key in the t:st database.
 *
 * store: store handle.
 * txn:   transaction to read in.
 * key:   term key.
 * sterm: buffer to fill. It is always flagged VOLK_BUF_BORROWED; on success
 *        `addr` and `size` are set to the data returned by mdb_get() (not a
 *        copy), and on MDB_NOTFOUND they are set to NULL and 0.
 *
 * Returns VOLK_OK if the key is found, VOLK_DB_ERR on any LMDB error other
 * than MDB_NOTFOUND.
 *
 * NOTE(review): the declaration of `rc` (listing line 639) is not visible
 * here, so the value returned in the MDB_NOTFOUND case (where `rc` is not
 * assigned) cannot be determined from this listing.
 */
635static VOLK_rc
636key_to_sterm (
637 MDBStore *store, MDB_txn *txn, const VOLK_Key key, VOLK_Buffer *sterm)
638{
640 int db_rc;
641
642 MDB_val key_v, data_v;
643 key_v.mv_data = (void*)&key;
644 key_v.mv_size = KLEN;
645
646 db_rc = mdb_get (txn, store->dbi[IDX_T_ST], &key_v, &data_v);
647
/* `flags` is OR-ed, so it must have been initialized by the caller. */
648 sterm->flags |= VOLK_BUF_BORROWED;
649 if (db_rc == MDB_SUCCESS) {
650 sterm->addr = data_v.mv_data;
651 sterm->size = data_v.mv_size;
652 rc = VOLK_OK;
653 } else if (db_rc == MDB_NOTFOUND) {
654 sterm->addr = NULL;
655 sterm->size = 0;
656 } else rc = VOLK_DB_ERR;
657
658 return rc;
659}
660
661
662static void *
663mdbstore_lookup (
664 void *h, const VOLK_Buffer *ss, const VOLK_Buffer *sp,
665 const VOLK_Buffer *so, const VOLK_Buffer *sc, void *th, size_t *ct)
666{
667 VOLK_TripleKey spok = {
668 VOLK_buffer_hash (ss),
669 VOLK_buffer_hash (sp),
670 VOLK_buffer_hash (so),
671 };
672
673 MDBIterator *it;
674 CALLOC_GUARD (it, NULL);
675
676 it->store = h;
677 it->luc = VOLK_buffer_hash (sc);
678 LOG_DEBUG("Lookup context: %lx", it->luc);
679
680 if (ct) *ct = 0;
681
682 uint8_t idx0, idx1;
683
684 if (th) it->txn = th;
685 else if (!it->txn) {
686 // Start RO transaction if not in a write txn already.
687 it->rc = txn_begin (it->store->env, NULL, MDB_RDONLY, &it->txn);
688 if (it->rc != MDB_SUCCESS) {
689 log_error ("Database error in lookup: %s", VOLK_strerror (it->rc));
690 return NULL;
691 }
692 LOG_TRACE ("Opening new MDB transaction @%p", it->txn);
693 it->flags |= ITER_OPEN_TXN;
694 }
695
696 // Context index loop.
697 if (UNLIKELY (mdb_cursor_open (
698 it->txn, it->store->dbi[IDX_SPO_C], &it->ctx_cur) != MDB_SUCCESS))
699 return NULL;
700
701 /*
702 * Lookup decision tree.
703 */
704 // s p o (all terms bound)
705 if (spok[0] != NULL_KEY && spok[1] != NULL_KEY && spok[2] != NULL_KEY) {
706 it->luk[0] = spok[0];
707 it->luk[1] = spok[1];
708 it->luk[2] = spok[2];
709 PRCNL (lookup_3bound (it, ct));
710
711 } else if (spok[0] != NULL_KEY) {
712 it->luk[0] = spok[0];
713 idx0 = 0;
714
715 // s p ?
716 if (spok[1] != NULL_KEY) {
717 it->luk[1] = spok[1];
718 idx1 = 1;
719 PRCNL (lookup_2bound (idx0, idx1, it, ct));
720
721 // s ? o
722 } else if (spok[2] != NULL_KEY) {
723 it->luk[1] = spok[2];
724 idx1 = 2;
725 PRCNL (lookup_2bound (idx0, idx1, it, ct));
726
727 // s ? ?
728 } else PRCNL (lookup_1bound (idx0, it, ct));
729
730 } else if (spok[1] != NULL_KEY) {
731 it->luk[0] = spok[1];
732 idx0 = 1;
733
734 // ? p o
735 if (spok[2] != NULL_KEY) {
736 it->luk[1] = spok[2];
737 idx1 = 2;
738 PRCNL (lookup_2bound (idx0, idx1, it, ct));
739
740 // ? p ?
741 } else PRCNL (lookup_1bound (idx0, it, ct));
742
743 // ? ? o
744 } else if (spok[2] != NULL_KEY) {
745 it->luk[0] = spok[2];
746 idx0 = 2;
747 PRCNL (lookup_1bound (idx0, it, ct));
748
749 // ? ? ? (all terms unbound)
750 } else PRCNL (lookup_0bound (it, ct));
751
752 return it;
753}
754
755
761static VOLK_rc
762mdbiter_next_key (MDBIterator *it)
763{
764 if (UNLIKELY (!it)) return VOLK_VALUE_ERR;
765
766 // Only advance if the previous it->rc wasn't already at the end.
767 if (it->rc == MDB_NOTFOUND) return VOLK_END;
768
769 if (UNLIKELY (it->rc != MDB_SUCCESS)) {
770 log_error ("Database error: %s", VOLK_strerror (it->rc));
771 return VOLK_DB_ERR;
772 }
773
774 VOLK_rc rc;
775
776 /* Retrieve current value and advance cursor to the next result.
777 * it->rc is set to the result of the next iteration.
778 */
779 it->iter_op_fn (it);
780 MDB_val key, data;
781 int db_rc;
782
783 key.mv_size = TRP_KLEN;
784 data.mv_data = &it->luc;
785 data.mv_size = KLEN;
786
787 if (it->luc) {
788 rc = VOLK_NORESULT; // Flow control value, will never be returned.
789 do {
790 //LOG_DEBUG("begin ctx loop.");
791 /* If ctx is specified, look if the matching triple is associated
792 * with it. If not, move on to the next triple.
793 * The loop normally exits when a triple with matching ctx is found
794 * (VOLK_OK), if there are no more triples (VOLK_END), or if there
795 * is an error (VOLK_DB_ERR).
796 */
797 LOG_TRACE (
798 "Found spok: {%lx, %lx, %lx}",
799 it->spok[0], it->spok[1], it->spok[2]);
800
801 key.mv_data = it->spok;
802
803 db_rc = mdb_cursor_get (it->ctx_cur, &key, &data, MDB_GET_BOTH);
804
805 if (db_rc == MDB_SUCCESS) {
806 rc = VOLK_OK;
807 LOG_TRACE("Triple found in context: %x", it->luc);
808
809 } else if (db_rc == MDB_NOTFOUND) {
810 LOG_TRACE("Triple not found in context: %x", it->luc);
811 if (it->rc == MDB_NOTFOUND) rc = VOLK_END;
812 else it->iter_op_fn (it);
813
814 } else {
815 log_error ("Database error: %s", VOLK_strerror (db_rc));
816 rc = VOLK_DB_ERR;
817 }
818 } while (rc == VOLK_NORESULT);
819
820 } else {
821 LOG_TRACE (
822 "Found spok in any context: {%lx, %lx, %lx}",
823 it->spok[0], it->spok[1], it->spok[2]);
824
825 rc = VOLK_OK;
826 }
827
828 // Get all contexts for a triple.
829 key.mv_data = it->spok;
830 db_rc = mdb_cursor_get (it->ctx_cur, &key, &data, MDB_SET_KEY);
831 if (db_rc != MDB_SUCCESS) {
832 log_error ("No context found for triple!");
833 return VOLK_DB_ERR;
834 }
835
836 size_t ct;
837 db_rc = mdb_cursor_count (it->ctx_cur, &ct);
838 if (db_rc != MDB_SUCCESS) return VOLK_DB_ERR;
839 // 1 spare for sentinel. Always allocated even on zero matches.
840 VOLK_Key *tmp_ck = realloc (it->ck, sizeof (*it->ck) * (ct + 1));
841 if (!tmp_ck) return VOLK_MEM_ERR;
842 it->ck = tmp_ck;
843
844 size_t i = 0;
845 do {
846 //LOG_TRACE("Copying to slot #%lu @%p", i, it->ck + i);
847 memcpy (it->ck + i++, data.mv_data, sizeof (*it->ck));
848 } while (
849 mdb_cursor_get (it->ctx_cur, &key, &data, MDB_NEXT_DUP)
850 == MDB_SUCCESS);
851 //LOG_TRACE("setting sentinel @%p", it->ck + i);
852 it->ck[i] = NULL_KEY;
853
854 return rc;
855}
856
857
858static VOLK_rc
859mdbiter_next (
860 void *h, VOLK_BufferTriple *sspo, VOLK_Buffer **ctx_p)
861{
862 MDBIterator *it = h;
863 VOLK_rc rc = mdbiter_next_key (it);
864
865 if (rc == VOLK_OK) {
866 if (sspo) {
867 key_to_sterm (it->store, it->txn, it->spok[0], sspo->s);
868 key_to_sterm (it->store, it->txn, it->spok[1], sspo->p);
869 key_to_sterm (it->store, it->txn, it->spok[2], sspo->o);
870
871 // TODO error handling.
872 }
873
874 // Contexts for current triple.
875 if (ctx_p) {
876 // Preallocate.
877 size_t i = 0;
878 while (it->ck[i++]); // Include sentinel in count.
879 VOLK_Buffer *ctx;
880 LOG_TRACE("Allocating %lu context buffers + sentinel.", i - 1);
881 ctx = malloc(i * sizeof (*ctx));
882 if (!ctx) return VOLK_MEM_ERR;
883
884 for (i = 0; it->ck[i]; i++)
885 key_to_sterm (it->store, it->txn, it->ck[i], ctx + i);
886 memset (ctx + i, 0, sizeof (*ctx)); // Sentinel
887
888 // TODO error handling.
889 *ctx_p = ctx;
890 }
891 }
892
893 return rc;
894}
895
896
897static void
898mdbiter_free (void *h)
899{
900 if (!h) return;
901 MDBIterator *it = h;
902
903 if (it->cur) mdb_cursor_close (it->cur);
904 if (it->ctx_cur) mdb_cursor_close (it->ctx_cur);
905 if (it->flags & ITER_OPEN_TXN) mdb_txn_abort (it->txn);
906 free (it->ck);
907
908 free (it);
909}
910
911
/*
 * Move all triples from one context to another.
 *
 * h:     store handle.
 * old_c: serialized term of the context to replace.
 * new_c: serialized term of the new context.
 * th:    parent transaction, or NULL.
 *
 * Runs in its own write transaction (a child of `th` when given). The steps
 * are: refuse if `new_c` already has entries in c:spo; add the new context
 * term; copy all triple keys stored under the old context into a temporary
 * buffer; delete the old c:spo entries and re-add them under the new key;
 * finally replace the context value of each triple in spo:c.
 *
 * Returns VOLK_CONFLICT if the new context already exists, VOLK_NOACTION if
 * the old context has no triples, VOLK_MEM_ERR / VOLK_DB_ERR on failure. The
 * transaction is committed only when `rc` is VOLK_OK at `close_txn`,
 * otherwise it is aborted.
 *
 * NOTE(review): the declarations of `rc`, `old_ck` and `new_ck` (listing
 * lines 916 and 920) are not visible in this listing.
 * NOTE(review): if txn_commit() fails at `close_txn`, RCCK presumably
 * returns before `trp_data` is freed. Confirm and fix.
 */
912static VOLK_rc
913mdbstore_update_ctx (
914 void *h, const VOLK_Buffer *old_c, const VOLK_Buffer *new_c, void *th)
915{
917 MDBStore *store = h;
918 unsigned char *trp_data = NULL;
919
921 old_ck = VOLK_buffer_hash (old_c),
922 new_ck = VOLK_buffer_hash (new_c);
923 // lu_key, lu_data look up all triples with old context in c:spo, and
924 // replace old c with new c.
925 MDB_txn
926 *p_txn = th,
927 *txn;
928 CHECK (rc = txn_begin (store->env, p_txn, 0, &txn), finally);
929
930 MDB_cursor *i_cur, *d_cur;
931 CHECK (
932 rc = mdb_cursor_open (txn, store->dbi[IDX_C_SPO], &i_cur),
933 close_txn);
934 // TODO error handling.
935
936 MDB_val key, data;
937
938 // Return error if the graph URI already exists.
939 key.mv_data = &new_ck;
940 key.mv_size = KLEN;
941 rc = mdb_cursor_get (i_cur, &key, &data, MDB_FIRST_DUP);
942 if (rc == MDB_SUCCESS) {
943 log_error (
944 "Context key %lu already exists. Not replacing old graph.",
945 new_ck);
946 rc = VOLK_CONFLICT;
947 goto close_i;
948 }
949
950 // Add new context term.
951 CHECK (rc = mdbstore_add_term (store, new_c, txn), close_i);
952
953 key.mv_data = &old_ck;
954 // Count triples in cursor.
955 rc = mdb_cursor_get (i_cur, &key, &data, MDB_SET);
956 if (rc == MDB_NOTFOUND) {
957 log_info ("No triples found associated with old context.");
958 rc = VOLK_NOACTION;
959 goto close_i;
960 }
961 if (rc != MDB_SUCCESS) {
962 rc = VOLK_DB_ERR;
963 goto close_i;
964 }
965
966 // From here on, it can only be VOLK_OK or error.
967 rc = VOLK_OK;
968 size_t trp_ct;
969 CHECK (rc = mdb_cursor_count (i_cur, &trp_ct), close_i);
/* One TRP_KLEN-sized slot per triple stored under the old context. */
970 trp_data = malloc (trp_ct * TRP_KLEN);
971 if (UNLIKELY (!trp_data)) {
972 rc = VOLK_MEM_ERR;
973 goto close_i;
974 }
975
976 // Copy triple data as one block to temp buffer so that entries can be
977 // deleted while cursors are active.
978 rc = mdb_cursor_get (i_cur, &key, &data, MDB_GET_MULTIPLE);
979 if (rc != MDB_SUCCESS) {
980 rc = rc == MDB_NOTFOUND ? VOLK_NOACTION : VOLK_DB_ERR;
981 goto close_i;
982 }
983 size_t loc_cur = 0;
/* Each call yields a block of `data.mv_size` bytes; append it. */
984 do {
985 memcpy (trp_data + loc_cur, data.mv_data, data.mv_size);
986 loc_cur += data.mv_size;
987 } while (mdb_cursor_get (
988 i_cur, &key, &data, MDB_NEXT_MULTIPLE) == MDB_SUCCESS);
989
990 // Zap c:spo entries in one go.
991 key.mv_data = &old_ck;
992 key.mv_size = KLEN;
993 data.mv_size = TRP_KLEN;
994 CHECK (rc = mdb_cursor_get (i_cur, &key, NULL, MDB_SET), close_i);
995 CHECK (rc = mdb_cursor_del (i_cur, MDB_NODUPDATA), close_i);
996
997 // Re-add c:spo data individually.
998 key.mv_data = &new_ck;
999 for (size_t i = 0; i < trp_ct; i++) {
1000 data.mv_data = trp_data + i * data.mv_size;
1001 CHECK (
1002 rc = mdb_cursor_put (i_cur, &key, &data, MDB_APPENDDUP),
1003 close_i);
1004 }
1005 // Re-add c:spo data in bulk from buffer with new context.
1006 // FIXME this is not working. Replaced by the for loop above.
1007 /*
1008 MDB_val data_block[] = {
1009 { .mv_data = &new_ck, .mv_size = TRP_KLEN },
1010 { .mv_data = NULL, .mv_size = trp_ct },
1011 };
1012 db_rc = mdb_cursor_put (i_cur, &key, data_block, MDB_MULTIPLE);
1013 */
1014
1015 // Main table.
1016 // Replace spo:c values one by one.
1017 CHECK (rc = mdb_cursor_open (txn, store->dbi[IDX_SPO_C], &d_cur), close_i);
1018 key.mv_size = TRP_KLEN;
1019 data.mv_size = KLEN;
1020 for (size_t i = 0; i < trp_ct; i++) {
1021 key.mv_data = trp_data + i * key.mv_size;
1022 data.mv_data = &old_ck;
1023 CHECK (
1024 rc = mdb_cursor_get (d_cur, &key, &data, MDB_GET_BOTH),
1025 close_d);
1026 CHECK (rc = mdb_cursor_del (d_cur, 0), close_d);
1027 data.mv_data = &new_ck;
1028 CHECK (
1029 rc = mdb_cursor_put (d_cur, &key, &data, MDB_NOOVERWRITE),
1030 close_d);
1031 }
1032
/* Cleanup ladder: each label releases one resource, then falls through. */
1033close_d:
1034 mdb_cursor_close (d_cur);
1035close_i:
1036 mdb_cursor_close (i_cur);
1037close_txn:
1038 if (rc == VOLK_OK) RCCK (txn_commit (txn));
1039 else mdb_txn_abort (txn);
1040
1041 if (trp_data) free (trp_data);
1042finally:
1043
1044 return rc;
1045}
1047
1048static VOLK_rc
1049mdbstore_remove (
1050 void *h, const VOLK_Buffer *ss, const VOLK_Buffer *sp,
1051 const VOLK_Buffer *so, const VOLK_Buffer *sc, void *th, size_t *ct)
1052{
1053 MDBStore *store = h;
1054 VOLK_rc
1055 rc = VOLK_NOACTION,
1056 db_rc = 0;
1057
1058 VOLK_Key ck = NULL_KEY;
1059
1060 if (sc == NULL) sc = VOLK_default_ctx_buf;
1061 ck = VOLK_buffer_hash (sc);
1062
1063 MDB_txn *txn;
1064 RCCK (txn_begin (store->env, (MDB_txn *) th, 0, &txn));
1065
1066 MDB_cursor *dcur, *icur;
1067 mdb_cursor_open (txn, store->dbi[IDX_SPO_C], &dcur);
1068 mdb_cursor_open (txn, store->dbi[IDX_C_SPO], &icur);
1069
1070 MDB_val spok_v, ck_v;
1071
1072 spok_v.mv_size = TRP_KLEN;
1073 ck_v.mv_size = KLEN;
1074 ck_v.mv_data = &ck;
1075
1076 // The lookup operates within the current (bottom) write transaction.
1077 MDBIterator *it = mdbstore_lookup (store, ss, sp, so, sc, txn, ct);
1078 if (UNLIKELY (!it)) return VOLK_DB_ERR;
1079 if (ct) {LOG_DEBUG("Found %lu triples to remove.", *ct);}
1080
1081 while (mdbiter_next_key (it) == VOLK_OK) {
1082 spok_v.mv_data = it->spok;
1083
1084 db_rc = mdb_cursor_get (dcur, &spok_v, &ck_v, MDB_GET_BOTH);
1085 if (db_rc == MDB_NOTFOUND) continue;
1086 if (UNLIKELY (db_rc != MDB_SUCCESS)) goto fail;
1087
1088 LOG_TRACE(
1089 "Removing {%lx, %lx, %lx}",
1090 it->spok[0], it->spok[1], it->spok[2]);
1091
1092 // Delete spo:c entry.
1093 db_rc = mdb_cursor_del (dcur, 0);
1094 if (UNLIKELY (db_rc != MDB_SUCCESS)) goto fail;
1095
1096 // Restore ck address after each delete.
1097 spok_v.mv_data = it->spok;
1098 ck_v.mv_data = &ck;
1099
1100 // Delete c:spo entry.
1101 db_rc = mdb_cursor_get (icur, &ck_v, &spok_v, MDB_GET_BOTH);
1102 if (db_rc == MDB_NOTFOUND) continue;
1103 if (UNLIKELY (db_rc != MDB_SUCCESS)) goto fail;
1104
1105 db_rc = mdb_cursor_del (icur, 0);
1106 if (UNLIKELY (db_rc != MDB_SUCCESS)) goto fail;
1107
1108 spok_v.mv_data = it->spok;
1109 ck_v.mv_data = &ck;
1110
1111 // If there are no more contexts associated with this triple,
1112 // remove from indices.
1113 db_rc = mdb_cursor_get (dcur, &spok_v, NULL, MDB_SET);
1114 if (db_rc == MDB_SUCCESS) continue;
1115 if (UNLIKELY (db_rc != MDB_NOTFOUND)) goto fail;
1116
1117 rc = index_triple (store, OP_REMOVE, it->spok, ck, txn);
1118 }
1119
1120 mdbiter_free (it);
1121
1122 if (UNLIKELY (txn_commit (txn) != MDB_SUCCESS)) {
1123 rc = VOLK_TXN_ERR;
1124 goto fail;
1125 }
1126
1127 return rc;
1128
1129fail:
1130 mdb_txn_abort (txn);
1131
1132 log_error ("Database error: %s", VOLK_strerror (db_rc));
1133
1134 return rc == VOLK_TXN_ERR ? rc : VOLK_DB_ERR;
1135}
1136
1137
1138#if 0
1139static int
1140mdbstore_tkey_exists (MDBStore *store, VOLK_Key tkey)
1141{
1142 int db_rc, rc;
1143 MDB_val key, data;
1144 key.mv_data = &tkey;
1145 key.mv_size = KLEN;
1146
1147 MDB_txn *txn = NULL;
1148 txn_begin (store->env, NULL, MDB_RDONLY, &txn);
1149
1150 MDB_cursor *cur = NULL;
1151 mdb_cursor_open (txn, store->dbi[IDX_T_ST], &cur);
1152
1153 db_rc = mdb_cursor_get (cur, &key, &data, MDB_SET);
1154
1155 if (db_rc == MDB_SUCCESS) rc = 1;
1156 else if (db_rc == MDB_NOTFOUND) rc = 0;
1157 else {
1158 log_error ("DB error: %s", VOLK_strerror (db_rc));
1159 rc = VOLK_DB_ERR;
1160 }
1161
1162 if (cur) mdb_cursor_close (cur);
1163 if (txn) mdb_txn_abort (txn);
1164
1165 return rc;
1166}
1167#endif
1168
1169
1181static VOLK_rc
1182mdbstore_add_term (void *h, const VOLK_Buffer *sterm, void *th)
1183{
1184 //LOG_TRACE("Adding term to MDB store: %s", sterm->addr);
1185 MDBStore *store = h;
1186 int db_rc;
1187 MDB_val key, data;
1188
1189 MDB_txn *txn;
1190 // If an active transaction was passed, use it, otherwise open and
1191 // close a new one.
1192 bool borrowed_txn = (th != NULL);
1193 if (borrowed_txn) txn = th;
1194 else RCCK (txn_begin (store->env, NULL, 0, &txn));
1195
1196 MDB_cursor *cur;
1197 CHECK (mdb_cursor_open (txn, store->dbi[IDX_T_ST], &cur), fail);
1198
1199 VOLK_Key k = VOLK_buffer_hash (sterm);
1200 key.mv_data = &k;
1201 key.mv_size = sizeof (k);
1202
1203 data.mv_data = sterm->addr;
1204 data.mv_size = sterm->size;
1205
1206 db_rc = mdb_cursor_put (cur, &key, &data, MDB_NOOVERWRITE);
1207 if (db_rc != MDB_KEYEXIST) CHECK (db_rc, fail);
1208
1209 if (!borrowed_txn) CHECK (db_rc = txn_commit (txn), fail);
1210
1211 return VOLK_OK;
1212
1213fail:
1214 if (!borrowed_txn) mdb_txn_abort (txn);
1215 LOG_TRACE("Aborted txn for adding term.");
1216 return VOLK_DB_ERR;
1217}
1218
1219
1220VOLK_Buffer **
1221mdbstore_ctx_list (void *h, void *th)
1222{
1223 MDBStore *store = h;
1224 VOLK_rc db_rc;
1225 MDB_txn *txn;
1226 VOLK_Buffer **tdata = NULL;
1227 if (th) txn = th;
1228 else CHECK (txn_begin (store->env, NULL, MDB_RDONLY, &txn), fail);
1229
1230
1231 MDB_cursor *cur;
1232 CHECK (mdb_cursor_open (txn, store->dbi[IDX_C_SPO], &cur), fail);
1233 MDB_val key, data;
1234 db_rc = mdb_cursor_get (cur, &key, &data, MDB_FIRST);
1235
1236 size_t i = 0;
1237 while (db_rc == MDB_SUCCESS) {
1238 tdata = realloc (tdata, (i + 1) * sizeof (*tdata));
1239 if (UNLIKELY (!tdata)) goto fail;
1240 tdata[i] = BUF_DUMMY;
1241 VOLK_Key tkey = *(VOLK_Key*)key.mv_data;
1242 CHECK (key_to_sterm (store, txn, tkey, tdata[i]), fail);
1243 db_rc = mdb_cursor_get (cur, &key, &data, MDB_NEXT_NODUP);
1244 i++;
1245 }
1246
1247 tdata = realloc (tdata, i * sizeof (data));
1248 tdata[i] = NULL; // Sentinel
1249 mdb_cursor_close (cur);
1250 if (txn != th && txn != NULL) mdb_txn_abort (txn);
1251
1252 return tdata;
1253
1254fail:
1255 if (txn != th && txn != NULL) mdb_txn_abort (txn);
1256 if (tdata) free (tdata);
1257 return NULL;
1258}
1259
1260
1262 .name = "MDB Store",
1265
1266 .setup_fn = mdbstore_setup,
1267 .new_fn = mdbstore_new,
1268 .free_fn = mdbstore_free,
1269
1270 .size_fn = mdbstore_size,
1271 .id_fn = mdbstore_id,
1272
1273 .txn_begin_fn = mdbstore_txn_begin,
1274 .txn_commit_fn = mdbstore_txn_commit,
1275 .txn_abort_fn = mdbstore_txn_abort,
1276 .iter_txn_fn = mdbiter_txn,
1277
1278 .add_init_fn = mdbstore_add_init,
1279 .add_iter_fn = mdbstore_add_iter,
1280 .add_abort_fn = mdbstore_add_abort,
1281 .add_done_fn = mdbstore_add_done,
1282 .add_term_fn = mdbstore_add_term,
1283
1284 .update_ctx_fn = mdbstore_update_ctx,
1285
1286 .lookup_fn = mdbstore_lookup,
1287 .lu_next_fn = mdbiter_next,
1288 .lu_free_fn = mdbiter_free,
1289
1290 .remove_fn = mdbstore_remove,
1291
1292 .ctx_list_fn = mdbstore_ctx_list,
1293};
1294
1295
1296/* * * Static functions. * * */
1297
/*
 * Add a triple to, or remove it from, the lookup indices.
 *
 * store: store handle.
 * op:    OP_ADD or OP_REMOVE; anything else returns VOLK_VALUE_ERR.
 * spok:  triple key (s, p, o).
 * ck:    context key. When it is NULL_KEY the c:spo index is not touched.
 * txn:   transaction to operate in.
 *
 * First the c:spo index is updated, then, for each of the three terms, the
 * matching 1-bound index (s:po, p:so, o:sp) and 2-bound index (po:s, so:p,
 * sp:o), as selected through lookup_indices[i] and lookup_indices[i + 3].
 *
 * Returns VOLK_DB_ERR on the LMDB errors that are checked, otherwise `rc`.
 *
 * NOTE(review): the declaration of `rc` (listing line 1313) is not visible
 * in this listing, so its initial value is unknown here.
 */
1307static VOLK_rc
1308index_triple(
1309 MDBStore *store, StoreOp op, VOLK_TripleKey spok, VOLK_Key ck,
1310 MDB_txn *txn)
1311{
1312 int db_rc;
1314 MDB_val v1, v2;
1315
1316 LOG_TRACE("Indexing triple: {%lx %lx %lx}", spok[0], spok[1], spok[2]);
1317
1318 // Index c:spo.
1319 if (op == OP_REMOVE) {
1320 LOG_TRACE("Indexing op: REMOVE");
1321 if (ck != NULL_KEY) {
1322 MDB_cursor *cur;
1323
1324 v1.mv_data = &ck;
1325 v1.mv_size = KLEN;
1326 v2.mv_data = spok;
1327 v2.mv_size = TRP_KLEN;
1328
/* NOTE(review): the return code of mdb_cursor_open() is not checked. */
1329 mdb_cursor_open (txn, store->dbi[IDX_C_SPO], &cur);
1330 if (mdb_cursor_get (cur, &v1, &v2, MDB_GET_BOTH) == MDB_SUCCESS) {
1331 db_rc = mdb_cursor_del (cur, 0);
/* NOTE(review): this early return skips mdb_cursor_close (cur). */
1332 if (db_rc != MDB_SUCCESS) return VOLK_DB_ERR;
1333
1334 rc = VOLK_OK;
1335 }
1336
1337 mdb_cursor_close (cur);
1338 }
1339
1340 } else if (op == OP_ADD) {
1341 LOG_TRACE("Indexing op: ADD");
1342 if (ck != NULL_KEY) {
1343 v1.mv_data = &ck;
1344 v1.mv_size = KLEN;
1345 v2.mv_data = spok;
1346 v2.mv_size = TRP_KLEN;
1347
1348 db_rc = mdb_put(
1349 txn, store->dbi[IDX_C_SPO],
1350 &v1, &v2, MDB_NODUPDATA);
/* NOTE(review): any non-success code, including MDB_KEYEXIST, returns
 * VOLK_DB_ERR on the next line, so the MDB_KEYEXIST test after it is
 * always true when reached. Compare with the two mdb_put() calls in the
 * loop below, which tolerate MDB_KEYEXIST. */
1351 if (db_rc != MDB_SUCCESS) return VOLK_DB_ERR;
1352 if (db_rc != MDB_KEYEXIST) rc = VOLK_OK;
1353 }
1354
1355 } else return VOLK_VALUE_ERR;
1356
/* Row i holds the two terms other than term i, matching the value side of
 * the i-th 1-bound index. */
1357 VOLK_DoubleKey dbl_keys[3] = {
1358 {spok[1], spok[2]}, // po
1359 {spok[0], spok[2]}, // so
1360 {spok[0], spok[1]}, // sp
1361 };
1362
1363 // Add terms to index.
1364 v1.mv_size = KLEN;
1365 v2.mv_size = DBL_KLEN;
1366
1367 for (int i = 0; i < 3; i++) {
1368 MDB_dbi db1 = store->dbi[lookup_indices[i]]; // s:po, p:so, o:sp
1369 MDB_dbi db2 = store->dbi[lookup_indices[i + 3]]; // po:s, so:p, sp:o
1370
1371 v1.mv_data = spok + i;
1372 v2.mv_data = dbl_keys[i];
1373
1374 if (op == OP_REMOVE) {
1375 MDB_cursor *cur1, *cur2;
1376 mdb_cursor_open(txn, store->dbi[lookup_indices[i]], &cur1);
1377
/* 1-bound index: single key -> double key. */
1378 db_rc = mdb_cursor_get (cur1, &v1, &v2, MDB_GET_BOTH);
1379 if (db_rc == MDB_SUCCESS) mdb_cursor_del (cur1, 0);
1380
1381 mdb_cursor_close (cur1);
1382
1383 // Restore pointers invalidated after delete.
1384 v1.mv_data = spok + i;
1385 v2.mv_data = dbl_keys[i];
1386
1387 mdb_cursor_open(txn, store->dbi[lookup_indices[i + 3]], &cur2);
1388
/* 2-bound index: double key -> single key (arguments swapped). */
1389 db_rc = mdb_cursor_get (cur2, &v2, &v1, MDB_GET_BOTH);
1390 if (db_rc == MDB_SUCCESS) mdb_cursor_del (cur2, 0);
1391 // TODO error handling.
1392 rc = VOLK_OK;
1393
1394 mdb_cursor_close (cur2);
1395
1396 } else { // OP_ADD is guaranteed.
1397 // 1-bound index.
1398 LOG_TRACE("Indexing in %s: ", db_labels[lookup_indices[i]]);
/* NOTE(review): `*(size_t*)(v2.mv_data) + 1` adds 1 to the first value; it
 * does not read the second element of the double key. Same in the second
 * trace below. */
1399 LOG_TRACE(
1400 "%lx: %lx %lx", *(size_t*)(v1.mv_data),
1401 *(size_t*)(v2.mv_data), *(size_t*)(v2.mv_data) + 1);
1402
1403 db_rc = mdb_put (txn, db1, &v1, &v2, MDB_NODUPDATA);
1404
1405 if (db_rc == MDB_SUCCESS) rc = VOLK_OK;
1406 else if (db_rc != MDB_KEYEXIST) return VOLK_DB_ERR;
1407
1408 // 2-bound index.
1409 LOG_TRACE("Indexing in %s: ", db_labels[lookup_indices[i + 3]]);
1410 LOG_TRACE(
1411 "%lx %lx: %lx", *(size_t*)(v2.mv_data),
1412 *(size_t*)(v2.mv_data) + 1, *(size_t*)(v1.mv_data));
1413
1414 db_rc = mdb_put (txn, db2, &v2, &v1, MDB_NODUPDATA);
1415
1416 if (db_rc == MDB_SUCCESS) rc = VOLK_OK;
1417 else if (db_rc != MDB_KEYEXIST) return VOLK_DB_ERR;
1418 }
1419 }
1420
1421 return rc;
1422}
1423
1424
1425/* * * Term-specific iterators. * * */
1426
1431inline static void
1432it_next_0bound (MDBIterator *it)
1433{
1434 memcpy (it->spok, it->key.mv_data, sizeof (VOLK_TripleKey));
1435
1436 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_NEXT);
1437}
1438
1439
1446inline static void
1447it_next_1bound (MDBIterator *it)
1448{
1449 VOLK_DoubleKey *lu_dset = it->data.mv_data;
1450
1451 it->spok[it->term_order[0]] = it->luk[0];
1452 it->spok[it->term_order[1]] = lu_dset[it->i][0];
1453 it->spok[it->term_order[2]] = lu_dset[it->i][1];
1454
1455 LOG_TRACE(
1456 "Composed triple: {%lx %lx %lx}",
1457 it->spok[0], it->spok[1], it->spok[2]);
1458
1459 // Ensure next block within the same page is not beyond the last.
1460 if (it->i < it->data.mv_size / DBL_KLEN - 1) {
1461 it->i ++;
1462 //LOG_DEBUG("Increasing page cursor to %lu.", it->i);
1463 //LOG_DEBUG("it->rc: %d", it->rc);
1464
1465 } else {
1466 // If the last block in the page is being yielded,
1467 // move cursor to beginning of next page.
1468 it->i = 0;
1469 //LOG_DEBUG("Reset page cursor to %lu.", it->i);
1470 it->rc = mdb_cursor_get (
1471 it->cur, &it->key, &it->data, MDB_NEXT_MULTIPLE);
1472 }
1473}
1474
1475
1482inline static void
1483it_next_2bound (MDBIterator *it)
1484{
1485 VOLK_Key *lu_dset = it->data.mv_data;
1486
1487 it->spok[it->term_order[0]] = it->luk[0];
1488 it->spok[it->term_order[1]] = it->luk[1];
1489 it->spok[it->term_order[2]] = lu_dset[it->i];
1490
1491 // Ensure next block within the same page is not beyond the last.
1492 if (it->i < it->data.mv_size / KLEN - 1)
1493 it->i ++;
1494 else {
1495 // If the last block in the page is being yielded,
1496 // move cursor to beginning of next page.
1497 it->i = 0;
1498 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_NEXT_MULTIPLE);
1499 }
1500}
1501
1502
1509inline static void
1510it_next_3bound (MDBIterator *it)
1511{ it->rc = MDB_NOTFOUND; }
1512
1513
1514/* * * Term-specific lookups. * * */
1515
1516inline static VOLK_rc
1517lookup_0bound (MDBIterator *it, size_t *ct)
1518{
1519 LOG_DEBUG("Looking up 0 bound terms.");
1520
1521 if (ct) {
1522 if (it->luc != NULL_KEY) {
1523 // Look up by given context.
1524 it->rc = mdb_cursor_open (
1525 it->txn, it->store->dbi[IDX_C_SPO], &it->cur);
1526
1527 it->key.mv_data = &it->luc;
1528 it->key.mv_size = KLEN;
1529
1530 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1531 if (it->rc == MDB_SUCCESS) mdb_cursor_count (it->cur, ct);
1532
1533 mdb_cursor_close (it->cur);
1534 it->cur = NULL;
1535
1536 } else {
1537 // Look up all contexts.
1538 MDB_stat stat;
1539 mdb_stat (it->txn, it->store->dbi[IDX_S_PO], &stat);
1540
1541 *ct = stat.ms_entries;
1542 }
1543 LOG_DEBUG("Found %lu triples.", *ct);
1544 }
1545
1546 it->rc = mdb_cursor_open (it->txn, it->store->dbi[IDX_SPO_C], &it->cur);
1547 if (it->rc != MDB_SUCCESS) {
1548 log_error ("Database error: %s", VOLK_strerror (it->rc));
1549 return VOLK_DB_ERR;
1550 }
1551
1552 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_FIRST);
1553 /*
1554 mdb_cursor_close (it->cur);
1555 it->cur = NULL;
1556 */
1557
1558 it->iter_op_fn = it_next_0bound;
1559
1560 if (it->rc != MDB_SUCCESS && it->rc != MDB_NOTFOUND) {
1561 log_error ("Database error: %s", VOLK_strerror (it->rc));
1562 return VOLK_DB_ERR;
1563 }
1564
1565 return VOLK_OK;
1566}
1567
1568
1569inline static VOLK_rc
1570lookup_1bound (uint8_t idx0, MDBIterator *it, size_t *ct)
1571{
1572 it->term_order = (const uint8_t*)lookup_ordering_1bound[idx0];
1573
1574 LOG_DEBUG("Looking up 1 bound term: %lx", it->luk[0]);
1575
1576 mdb_cursor_open (it->txn, it->store->dbi[lookup_indices[idx0]], &it->cur);
1577
1578 it->key.mv_data = it->luk;
1579 it->key.mv_size = KLEN;
1580
1581 if (ct) {
1582 // If a context is specified, the only way to count triples matching
1583 // the context is to loop over them.
1584 if (it->luc != NULL_KEY) {
1585 LOG_DEBUG("Counting in context: %lx", it->luc);
1586 MDBIterator *ct_it;
1587 MALLOC_GUARD (ct_it, VOLK_MEM_ERR);
1588 /*
1589 memcpy (ct_it, it, sizeof (*ct_it));
1590 */
1591
1592 ct_it->store = it->store;
1593 ct_it->txn = it->txn;
1594 ct_it->ctx_cur = it->ctx_cur;
1595 ct_it->key = it->key;
1596 ct_it->data = it->data;
1597 ct_it->ck = NULL;
1598 ct_it->luk[0] = it->luk[0];
1599 ct_it->luc = it->luc;
1600 ct_it->i = 0;
1601
1602 VOLK_rc rc = lookup_1bound (idx0, ct_it, NULL);
1603 if (rc < 0) return rc;
1604
1605 VOLK_rc db_rc;
1606 while (VOLK_END != (db_rc = mdbiter_next_key (ct_it))) {
1607 if (UNLIKELY (db_rc < 0)) return db_rc;
1608 (*ct)++;
1609 }
1610
1611 // Free the counter iterator without freeing the shared txn.
1612 if (ct_it->cur) mdb_cursor_close (ct_it->cur);
1613 free (ct_it->ck);
1614 free (ct_it);
1615
1616 } else {
1617 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1618 if (it->rc == MDB_SUCCESS) mdb_cursor_count (it->cur, ct);
1619 }
1620 LOG_DEBUG("Found %lu triples.", *ct);
1621 }
1622
1623 it->i = 0;
1624 it->iter_op_fn = it_next_1bound;
1625
1626 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1627 if (it->rc == MDB_SUCCESS)
1628 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_GET_MULTIPLE);
1629
1630 if (it->rc != MDB_SUCCESS && it->rc != MDB_NOTFOUND) {
1631 log_error ("Database error: %s", VOLK_strerror (it->rc));
1632 return VOLK_DB_ERR;
1633 }
1634
1635 return VOLK_OK;
1636}
1637
1638
1639inline static VOLK_rc
1640lookup_2bound(uint8_t idx0, uint8_t idx1, MDBIterator *it, size_t *ct)
1641{
1642 uint8_t luk1_offset, luk2_offset;
1643 MDB_dbi dbi = 0;
1644
1645 // Establish lookup ordering with some awkward offset math.
1646 for (int i = 0; i < 3; i++) {
1647 if (
1648 (
1649 idx0 == lookup_ordering_2bound[i][0] &&
1650 idx1 == lookup_ordering_2bound[i][1]
1651 ) || (
1652 idx0 == lookup_ordering_2bound[i][1] &&
1653 idx1 == lookup_ordering_2bound[i][0]
1654 )
1655 ) {
1656 it->term_order = (const uint8_t*)lookup_ordering_2bound[i];
1657 if (it->term_order[0] == idx0) {
1658 luk1_offset = 0;
1659 luk2_offset = 1;
1660 } else {
1661 luk1_offset = 1;
1662 luk2_offset = 0;
1663 }
1664 dbi = it->store->dbi[lookup_indices[i + 3]];
1665 LOG_DEBUG (
1666 "Looking up 2 bound in %s",
1667 db_labels[lookup_indices[i + 3]]);
1668
1669 break;
1670 }
1671 }
1672
1673 if (dbi == 0) {
1674 log_error (
1675 "Values %d and %d not found in lookup keys.",
1676 idx0, idx1);
1677 return VOLK_VALUE_ERR;
1678 }
1679
1680 // Compose term keys in lookup key.
1681 VOLK_DoubleKey luk;
1682 luk[luk1_offset] = it->luk[0];
1683 luk[luk2_offset] = it->luk[1];
1684
1685 it->key.mv_data = luk;
1686 it->key.mv_size = DBL_KLEN;
1687
1688 mdb_cursor_open (it->txn, dbi, &it->cur);
1689 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1690
1691 if (ct) {
1692 // If a context is specified, the only way to count triples matching
1693 // the context is to loop over them.
1694 if (it->luc != NULL_KEY) {
1695 MDBIterator *ct_it;
1696 MALLOC_GUARD (ct_it, VOLK_MEM_ERR);
1697
1698 ct_it->store = it->store;
1699 ct_it->txn = it->txn;
1700 ct_it->ctx_cur = it->ctx_cur;
1701 ct_it->ck = NULL;
1702 ct_it->luk[0] = it->luk[0];
1703 ct_it->luk[1] = it->luk[1];
1704 ct_it->luc = it->luc;
1705 ct_it->i = 0;
1706
1707 lookup_2bound (idx0, idx1, ct_it, NULL);
1708
1709 while (mdbiter_next_key (ct_it) != VOLK_END) (*ct) ++;
1710
1711 // Free the counter iterator without freeing the shared txn.
1712 if (ct_it->cur) mdb_cursor_close (ct_it->cur);
1713 free (ct_it->ck);
1714 free (ct_it);
1715
1716 } else {
1717 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1718 if (it->rc == MDB_SUCCESS) mdb_cursor_count (it->cur, ct);
1719 }
1720 LOG_DEBUG("Found %lu triples.", *ct);
1721 }
1722
1723 it->i = 0;
1724 it->iter_op_fn = it_next_2bound;
1725
1726 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_SET);
1727 if (it->rc == MDB_SUCCESS)
1728 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_GET_MULTIPLE);
1729
1730 if (it->rc != MDB_SUCCESS && it->rc != MDB_NOTFOUND) {
1731 log_error ("Database error: %s", VOLK_strerror (it->rc));
1732 return VOLK_DB_ERR;
1733 }
1734
1735 return VOLK_OK;
1736}
1737
1738
1739inline static VOLK_rc
1740lookup_3bound (MDBIterator *it, size_t *ct)
1741{
1742 LOG_DEBUG(
1743 "Looking up 3 bound: {%lx, %lx, %lx}",
1744 it->luk[0], it->luk[1], it->luk[2]);
1745
1746 it->key.mv_data = it->luk;
1747
1748 if (it->luc != NULL_KEY) {
1749 it->rc = mdb_cursor_open (
1750 it->txn, it->store->dbi[IDX_SPO_C], &it->cur);
1751
1752 it->key.mv_size = TRP_KLEN;
1753 it->data.mv_data = &it->luc;
1754 it->data.mv_size = KLEN;
1755
1756 } else {
1757 it->rc = mdb_cursor_open (it->txn, it->store->dbi[IDX_S_PO], &it->cur);
1758
1759 it->key.mv_size = KLEN;
1760 it->data.mv_data = it->luk + 1;
1761 it->data.mv_size = DBL_KLEN;
1762 }
1763
1764 it->rc = mdb_cursor_get (it->cur, &it->key, &it->data, MDB_GET_BOTH);
1765
1766 if (it->rc != MDB_SUCCESS && it->rc != MDB_NOTFOUND) {
1767 log_error ("Database error: %s", VOLK_strerror (it->rc));
1768 return VOLK_DB_ERR;
1769 }
1770
1771 mdb_cursor_close (it->cur);
1772 it->cur = NULL;
1773
1774 if (ct && it->rc == MDB_SUCCESS) *ct = 1;
1775
1776 it->iter_op_fn = it_next_3bound;
1777 memcpy (it->spok, it->luk, sizeof (VOLK_TripleKey));
1778
1779 return VOLK_OK;
1780}
1781
1782
#define UNLIKELY(x)
Definition core.h:39
#define NULL_KEY
"NULL" key, a value that is never user-provided.
Definition buffer.h:15
VOLK_Key VOLK_buffer_hash(const VOLK_Buffer *buf)
Hash a buffer.
Definition buffer.h:175
VOLK_Buffer * VOLK_btriple_pos(const VOLK_BufferTriple *trp, VOLK_TriplePos n)
Get serialized triple by term position.
Definition buffer.h:297
VOLK_Buffer * VOLK_default_ctx_buf
Serialized default context.
Definition buffer.c:5
#define BUF_DUMMY
Dummy buffer to be used with VOLK_buffer_init.
Definition buffer.h:154
@ VOLK_BUF_BORROWED
Definition buffer.h:28
#define NULL_TRP
"NULL" triple, a value that is never user-provided.
Definition core.h:66
#define DBL_KLEN
Definition core.h:58
#define KLEN
Definition core.h:57
#define TRP_KLEN
Definition core.h:59
bool VOLK_env_is_init
Whether the environment is initialized.
Definition core.c:11
#define MALLOC_GUARD(var, rc)
Allocate one pointer with malloc and return rc if it fails.
Definition core.h:375
#define RCNL(exp)
Return NULL if exp returns a nonzero value.
Definition core.h:345
#define LOG_TRACE(...)
Definition core.h:276
#define PRCNL(exp)
Return NULL if exp returns a negative value (=error).
Definition core.h:356
#define CHECK(exp, marker)
Jump to marker if exp does not return VOLK_OK.
Definition core.h:291
#define LOG_DEBUG(...)
Definition core.h:275
#define CALLOC_GUARD(var, rc)
Allocate one pointer with calloc and return rc if it fails.
Definition core.h:381
VOLK_rc rm_r(const char *path)
Remove a directory recursively (POSIX compatible).
Definition core.c:124
#define RCCK(exp)
Return exp return value if it is of VOLK_rc type and nonzero.
Definition core.h:323
VOLK_rc mkdir_p(const char *_path, mode_t mode)
Make recursive directories.
Definition core.c:50
#define LOG_RC(rc)
Log an error or warning for return codes that are not VOLK_OK.
Definition core.h:285
#define PRCCK(exp)
Return exp return value if it is of VOLK_rc type and negative (=error).
Definition core.h:334
size_t VOLK_Key
Term key, i.e., hash of a serialized term.
Definition core.h:230
VOLK_Key VOLK_TripleKey[3]
Array of three VOLK_Key values, representing a triple.
Definition core.h:234
VOLK_Key VOLK_DoubleKey[2]
Array of two VOLK_Key values.
Definition core.h:232
#define VOLK_VALUE_ERR
An invalid input value was provided.
Definition core.h:129
#define VOLK_MEM_ERR
Memory allocation error.
Definition core.h:144
#define VOLK_NORESULT
No result yielded.
Definition core.h:100
#define VOLK_DB_ERR
Low-level database error.
Definition core.h:135
#define VOLK_CONFLICT
Conflict warning.
Definition core.h:120
#define VOLK_END
Loop end.
Definition core.h:107
#define VOLK_OK
Generic success return code.
Definition core.h:83
#define VOLK_NOACTION
No action taken.
Definition core.h:93
int VOLK_rc
Definition core.h:79
#define VOLK_ENV_ERR
Error while handling environment setup; or environment not initialized.
Definition core.h:158
#define VOLK_TXN_ERR
Error handling a store transaction.
Definition core.h:132
const char * VOLK_strerror(VOLK_rc rc)
Return an error message for a return code.
Definition core.c:195
@ VOLK_STORE_TXN
Supports transaction handling.
@ VOLK_STORE_COW
Copy on write.
@ VOLK_STORE_IDX
Store is fully SPO(C)-indexed.
@ VOLK_STORE_PERM
@ VOLK_STORE_CTX
VOLK_Buffer ** mdbstore_ctx_list(void *h, void *th)
Definition store_mdb.c:1221
StoreFlags
Store state flags.
Definition store_mdb.c:31
@ LSSTORE_OPEN
Env is open.
Definition store_mdb.c:32
#define MAIN_TABLE
Definition store_mdb.c:110
char DbLabel[8]
Definition store_mdb.c:27
#define ENV_FILE_MODE
Definition store_mdb.c:21
#define DEFAULT_MAPSIZE
Definition store_mdb.c:15
#define ENV_DIR_MODE
Definition store_mdb.c:20
const VOLK_StoreInt mdbstore_int
MDB store interface.
Definition store_mdb.c:1261
char * mdbstore_id(const void *h)
Definition store_mdb.c:439
#define LOOKUP_TABLE
Definition store_mdb.c:119
void(* iter_op_fn_t)(MDBIterator *it)
Iterator operation.
Definition store_mdb.c:70
#define N_DB
Definition store_mdb.c:6
IterFlags
Iterator state flags.
Definition store_mdb.c:36
@ ITER_OPEN_TXN
Definition store_mdb.c:37
StoreOp
Definition store_mdb.c:49
@ OP_ADD
Definition store_mdb.c:50
@ OP_REMOVE
Definition store_mdb.c:51
DBIdx
Definition store_mdb.c:143
LMDB graph store backend.
#define VOLK_MDB_STORE_URN
Default MDB store identifier and location.
Definition store_mdb.h:40
Triple iterator.
Definition store_mdb.c:74
MDB_cursor * ctx_cur
MDB c:spo index cursor.
Definition store_mdb.c:79
VOLK_TripleKey spok
Triple to be populated with match.
Definition store_mdb.c:82
VOLK_Key luk[3]
0÷3 lookup keys.
Definition store_mdb.c:89
MDB_cursor * cur
MDB cursor.
Definition store_mdb.c:78
const uint8_t * term_order
Term order used in 1-2bound look-ups.
Definition store_mdb.c:88
MDBStore * store
MDB store handle.
Definition store_mdb.c:75
IterFlags flags
Iterator flags.
Definition store_mdb.c:76
MDB_txn * txn
MDB transaction.
Definition store_mdb.c:77
VOLK_Key luc
Ctx key to filter by. May be NULL_KEY.
Definition store_mdb.c:90
size_t i
Internal counter for paged lookups.
Definition store_mdb.c:91
iter_op_fn_t iter_op_fn
Function used to look up next match.
Definition store_mdb.c:87
MDB_val key
Internal data handler.
Definition store_mdb.c:80
size_t ct
Definition store_mdb.c:92
int rc
MDB_* return code for the next result.
Definition store_mdb.c:94
MDB_val data
Internal data handler.
Definition store_mdb.c:81
VOLK_Key * ck
Definition store_mdb.c:83
StoreFlags flags
Store state flags.
Definition store_mdb.c:57
MDB_env * env
Environment handle.
Definition store_mdb.c:55
MDB_dbi dbi[N_DB]
DB handles. Refer to DbIdx enum.
Definition store_mdb.c:56
Triple of byte buffers.
Definition buffer.h:60
VOLK_Buffer * o
Definition buffer.h:63
VOLK_Buffer * s
Definition buffer.h:61
VOLK_Buffer * p
Definition buffer.h:62
General-purpose data buffer.
Definition buffer.h:47
VOLK_BufferFlag flags
Definition buffer.h:50
unsigned char * addr
Definition buffer.h:48
size_t size
Definition buffer.h:49
Store interface.