HashJoin.cpp (Firebird-3.0.2.32703-0.tar.bz2) | : | HashJoin.cpp (Firebird-3.0.4.33054-0.tar.bz2) | ||
---|---|---|---|---|
skipping to change at line 26 | skipping to change at line 26 | |||
* Copyright (c) 2009 Dmitry Yemanov <dimitr@firebirdsql.org> | * Copyright (c) 2009 Dmitry Yemanov <dimitr@firebirdsql.org> | |||
* and all contributors signed below. | * and all contributors signed below. | |||
* | * | |||
* All Rights Reserved. | * All Rights Reserved. | |||
* Contributor(s): ______________________________________. | * Contributor(s): ______________________________________. | |||
*/ | */ | |||
#include "firebird.h" | #include "firebird.h" | |||
#include "../common/classes/Hash.h" | #include "../common/classes/Hash.h" | |||
#include "../jrd/jrd.h" | #include "../jrd/jrd.h" | |||
#include "../jrd/btr.h" | ||||
#include "../jrd/req.h" | #include "../jrd/req.h" | |||
#include "../jrd/intl.h" | #include "../jrd/intl.h" | |||
#include "../jrd/cmp_proto.h" | #include "../jrd/cmp_proto.h" | |||
#include "../jrd/evl_proto.h" | #include "../jrd/evl_proto.h" | |||
#include "../jrd/mov_proto.h" | #include "../jrd/mov_proto.h" | |||
#include "../jrd/intl_proto.h" | #include "../jrd/intl_proto.h" | |||
#include "../jrd/Collation.h" | ||||
#include "RecordSource.h" | #include "RecordSource.h" | |||
#include <stdlib.h> | ||||
using namespace Firebird; | using namespace Firebird; | |||
using namespace Jrd; | using namespace Jrd; | |||
// ---------------------- | // ---------------------- | |||
// Data access: hash join | // Data access: hash join | |||
// ---------------------- | // ---------------------- | |||
namespace | ||||
{ | ||||
typedef int (*qsort_compare_callback)(const void* a1, const void* a2, voi | ||||
d* arg); | ||||
struct qsort_ctx_data | ||||
{ | ||||
void* arg; | ||||
qsort_compare_callback compare; | ||||
}; | ||||
#if defined(DARWIN) // || defined(FREEBSD) | ||||
#undef HAVE_QSORT_R | ||||
#endif | ||||
#ifndef HAVE_QSORT_R | ||||
#if defined(WIN_NT) || defined(DARWIN) || defined(FREEBSD) | ||||
int qsort_ctx_arg_swap(void* arg, const void* a1, const void* a2) | ||||
{ | ||||
struct qsort_ctx_data* ss = (struct qsort_ctx_data*) arg; | ||||
return (ss->compare)(a1, a2, ss->arg); | ||||
} | ||||
#endif | ||||
#endif | ||||
#define USE_QSORT_CTX | ||||
void qsort_ctx(void* base, size_t count, size_t width, qsort_compare_call | ||||
back compare, void* arg) | ||||
{ | ||||
#ifdef HAVE_QSORT_R | ||||
qsort_r(base, count, width, compare, arg); | ||||
#else | ||||
#if defined(WIN_NT) | ||||
struct qsort_ctx_data tmp = {arg, compare}; | ||||
qsort_s(base, count, width, &qsort_ctx_arg_swap, &tmp); | ||||
#elif defined(DARWIN) || defined(FREEBSD) | ||||
struct qsort_ctx_data tmp = {arg, compare}; | ||||
qsort_r(base, count, width, &tmp, &qsort_ctx_arg_swap); | ||||
#else | ||||
#undef USE_QSORT_CTX | ||||
#endif | ||||
#endif | ||||
} | ||||
} // namespace | ||||
// NS: FIXME - Why use static hash table here??? Hash table shall support dynamic resizing | // NS: FIXME - Why use static hash table here??? Hash table shall support dynamic resizing | |||
static const size_t HASH_SIZE = 1009; | static const ULONG HASH_SIZE = 1009; | |||
static const size_t COLLISION_PREALLOCATE_SIZE = 32; // 256 KB | static const ULONG BUCKET_PREALLOCATE_SIZE = 32; // 256 bytes per slot | |||
static const size_t KEYBUF_PREALLOCATE_SIZE = 64 * 1024; // 64 KB | ||||
static const size_t KEYBUF_SIZE_LIMIT = 1024 * 1024 * 1024; // 1 GB | ||||
class HashJoin::HashTable : public PermanentStorage | class HashJoin::HashTable : public PermanentStorage | |||
{ | { | |||
struct Collision | ||||
{ | ||||
#ifdef USE_QSORT_CTX | ||||
Collision() | ||||
: offset(0), position(0) | ||||
{} | ||||
Collision(void* /*ctx*/, ULONG off, ULONG pos) | ||||
: offset(off), position(pos) | ||||
{} | ||||
#else | ||||
Collision() | ||||
: context(NULL), offset(0), position(0) | ||||
{} | ||||
Collision(void* ctx, ULONG off, ULONG pos) | ||||
: context(ctx), offset(off), position(pos) | ||||
{} | ||||
void* context; | ||||
#endif | ||||
ULONG offset; | ||||
ULONG position; | ||||
}; | ||||
class CollisionList | class CollisionList | |||
{ | { | |||
static const FB_SIZE_T INVALID_ITERATOR = FB_SIZE_T(~0); | static const FB_SIZE_T INVALID_ITERATOR = FB_SIZE_T(~0); | |||
public: | struct Entry | |||
CollisionList(MemoryPool& pool, const KeyBuffer* keyBuffer, ULONG | { | |||
itemLength) | Entry() | |||
: m_collisions(pool, COLLISION_PREALLOCATE_SIZE), | : hash(0), position(0) | |||
m_keyBuffer(keyBuffer), m_itemLength(itemLength), m_ite | {} | |||
rator(INVALID_ITERATOR) | ||||
{} | ||||
#ifdef USE_QSORT_CTX | ||||
static int compare(const void* p1, const void* p2, void* arg) | ||||
#else | ||||
static int compare(const void* p1, const void* p2) | ||||
#endif | ||||
{ | ||||
const Collision* const c1 = static_cast<const Collision*> | ||||
(p1); | ||||
const Collision* const c2 = static_cast<const Collision*> | ||||
(p2); | ||||
#ifndef USE_QSORT_CTX | ||||
fb_assert(c1->context == c2->context); | ||||
#endif | ||||
const CollisionList* const collisions = | ||||
#ifdef USE_QSORT_CTX | ||||
static_cast<const CollisionList*>(arg); | ||||
#else | ||||
static_cast<const CollisionList*>(c1->context); | ||||
#endif | ||||
const UCHAR* const baseAddress = collisions->m_keyBuffer- | ||||
>begin(); | ||||
const UCHAR* const ptr1 = baseAddress + c1->offset; | Entry(ULONG h, ULONG pos) | |||
const UCHAR* const ptr2 = baseAddress + c2->offset; | : hash(h), position(pos) | |||
{} | ||||
return memcmp(ptr1, ptr2, collisions->m_itemLength); | static const ULONG generate(const Entry& item) | |||
{ | ||||
return item.hash; | ||||
} | ||||
ULONG hash; | ||||
ULONG position; | ||||
}; | ||||
public: | ||||
CollisionList(MemoryPool& pool) | ||||
: m_collisions(pool, BUCKET_PREALLOCATE_SIZE), | ||||
m_iterator(INVALID_ITERATOR) | ||||
{ | ||||
m_collisions.setSortMode(FB_ARRAY_SORT_MANUAL); | ||||
} | } | |||
void sort() | void sort() | |||
{ | { | |||
Collision* const base = m_collisions.begin(); | m_collisions.sort(); | |||
const size_t count = m_collisions.getCount(); | ||||
const size_t width = sizeof(Collision); | ||||
#ifdef USE_QSORT_CTX | ||||
qsort_ctx(base, count, width, compare, this); | ||||
#else | ||||
qsort(base, count, width, compare); | ||||
#endif | ||||
} | } | |||
void add(const Collision& collision) | void add(ULONG hash, ULONG position) | |||
{ | { | |||
m_collisions.add(collision); | m_collisions.add(Entry(hash, position)); | |||
} | } | |||
bool locate(ULONG length, const UCHAR* data) | bool locate(ULONG hash) | |||
{ | { | |||
const UCHAR* const ptr1 = data; | if (m_collisions.find(hash, m_iterator)) | |||
const ULONG len1 = length; | return true; | |||
const ULONG len2 = m_itemLength; | ||||
const ULONG minLen = MIN(len1, len2); | ||||
FB_SIZE_T highBound = m_collisions.getCount(), lowBound = | ||||
0; | ||||
const UCHAR* const baseAddress = m_keyBuffer->begin(); | ||||
while (highBound > lowBound) | ||||
{ | ||||
const FB_SIZE_T temp = (highBound + lowBound) >> | ||||
1; | ||||
const UCHAR* const ptr2 = baseAddress + m_collisi | ||||
ons[temp].offset; | ||||
const int result = memcmp(ptr1, ptr2, minLen); | ||||
if (result > 0 || (!result && len1 > len2)) | ||||
lowBound = temp + 1; | ||||
else | ||||
highBound = temp; | ||||
} | ||||
if (highBound >= m_collisions.getCount() || | ||||
lowBound >= m_collisions.getCount()) | ||||
{ | ||||
m_iterator = INVALID_ITERATOR; | ||||
return false; | ||||
} | ||||
const UCHAR* const ptr2 = baseAddress + m_collisions[lowB | ||||
ound].offset; | ||||
if (memcmp(ptr1, ptr2, minLen)) | ||||
{ | ||||
m_iterator = INVALID_ITERATOR; | ||||
return false; | ||||
} | ||||
m_iterator = lowBound; | m_iterator = INVALID_ITERATOR; | |||
return true; | return false; | |||
} | } | |||
bool iterate(ULONG length, const UCHAR* data, ULONG& position) | bool iterate(ULONG hash, ULONG& position) | |||
{ | { | |||
if (m_iterator >= m_collisions.getCount()) | if (m_iterator >= m_collisions.getCount()) | |||
return false; | return false; | |||
const Collision& collision = m_collisions[m_iterator++]; | const Entry& collision = m_collisions[m_iterator++]; | |||
const UCHAR* const baseAddress = m_keyBuffer->begin(); | ||||
const UCHAR* const ptr1 = data; | if (hash != collision.hash) | |||
const ULONG len1 = length; | ||||
const UCHAR* const ptr2 = baseAddress + collision.offset; | ||||
const ULONG len2 = m_itemLength; | ||||
const ULONG minLen = MIN(len1, len2); | ||||
if (memcmp(ptr1, ptr2, minLen)) | ||||
{ | { | |||
m_iterator = INVALID_ITERATOR; | m_iterator = INVALID_ITERATOR; | |||
return false; | return false; | |||
} | } | |||
position = collision.position; | position = collision.position; | |||
return true; | return true; | |||
} | } | |||
private: | private: | |||
Array<Collision> m_collisions; | SortedArray<Entry, EmptyStorage<Entry>, ULONG, Entry> m_collision | |||
const KeyBuffer* const m_keyBuffer; | s; | |||
const ULONG m_itemLength; | ||||
FB_SIZE_T m_iterator; | FB_SIZE_T m_iterator; | |||
}; | }; | |||
public: | public: | |||
HashTable(MemoryPool& pool, size_t streamCount, unsigned int tableSize = HASH_SIZE) | HashTable(MemoryPool& pool, ULONG streamCount, ULONG tableSize = HASH_SIZ E) | |||
: PermanentStorage(pool), m_streamCount(streamCount), | : PermanentStorage(pool), m_streamCount(streamCount), | |||
m_tableSize(tableSize), m_slot(0) | m_tableSize(tableSize), m_slot(0) | |||
{ | { | |||
m_collisions = FB_NEW_POOL(pool) CollisionList*[streamCount * tab leSize]; | m_collisions = FB_NEW_POOL(pool) CollisionList*[streamCount * tab leSize]; | |||
memset(m_collisions, 0, streamCount * tableSize * sizeof(Collisio nList*)); | memset(m_collisions, 0, streamCount * tableSize * sizeof(Collisio nList*)); | |||
} | } | |||
~HashTable() | ~HashTable() | |||
{ | { | |||
for (size_t i = 0; i < m_streamCount * m_tableSize; i++) | for (ULONG i = 0; i < m_streamCount * m_tableSize; i++) | |||
delete m_collisions[i]; | delete m_collisions[i]; | |||
delete[] m_collisions; | delete[] m_collisions; | |||
} | } | |||
void put(size_t stream, | void put(ULONG stream, ULONG hash, ULONG position) | |||
ULONG keyLength, const KeyBuffer* keyBuffer, | ||||
ULONG offset, ULONG position) | ||||
{ | { | |||
const unsigned int slot = | const ULONG slot = hash % m_tableSize; | |||
InternalHash::hash(keyLength, keyBuffer->begin() + offset | ||||
, m_tableSize); | ||||
fb_assert(stream < m_streamCount); | fb_assert(stream < m_streamCount); | |||
fb_assert(slot < m_tableSize); | fb_assert(slot < m_tableSize); | |||
CollisionList* collisions = m_collisions[stream * m_tableSize + s lot]; | CollisionList* collisions = m_collisions[stream * m_tableSize + s lot]; | |||
if (!collisions) | if (!collisions) | |||
{ | { | |||
collisions = FB_NEW_POOL(getPool()) CollisionList(getPool (), keyBuffer, keyLength); | collisions = FB_NEW_POOL(getPool()) CollisionList(getPool ()); | |||
m_collisions[stream * m_tableSize + slot] = collisions; | m_collisions[stream * m_tableSize + slot] = collisions; | |||
} | } | |||
collisions->add(Collision(collisions, offset, position)); | collisions->add(hash, position); | |||
} | } | |||
bool setup(ULONG length, const UCHAR* data) | bool setup(ULONG hash) | |||
{ | { | |||
const unsigned int slot = InternalHash::hash(length, data, m_tabl eSize); | const ULONG slot = hash % m_tableSize; | |||
for (size_t i = 0; i < m_streamCount; i++) | for (ULONG i = 0; i < m_streamCount; i++) | |||
{ | { | |||
CollisionList* const collisions = m_collisions[i * m_tabl eSize + slot]; | CollisionList* const collisions = m_collisions[i * m_tabl eSize + slot]; | |||
if (!collisions) | if (!collisions) | |||
return false; | return false; | |||
if (!collisions->locate(length, data)) | if (!collisions->locate(hash)) | |||
return false; | return false; | |||
} | } | |||
m_slot = slot; | m_slot = slot; | |||
return true; | return true; | |||
} | } | |||
void reset(size_t stream, ULONG length, const UCHAR* data) | void reset(ULONG stream, ULONG hash) | |||
{ | { | |||
fb_assert(stream < m_streamCount); | fb_assert(stream < m_streamCount); | |||
CollisionList* const collisions = m_collisions[stream * m_tableSi ze + m_slot]; | CollisionList* const collisions = m_collisions[stream * m_tableSi ze + m_slot]; | |||
collisions->locate(length, data); | collisions->locate(hash); | |||
} | } | |||
bool iterate(size_t stream, ULONG length, const UCHAR* data, ULONG& posit ion) | bool iterate(ULONG stream, ULONG hash, ULONG& position) | |||
{ | { | |||
fb_assert(stream < m_streamCount); | fb_assert(stream < m_streamCount); | |||
CollisionList* const collisions = m_collisions[stream * m_tableSi ze + m_slot]; | CollisionList* const collisions = m_collisions[stream * m_tableSi ze + m_slot]; | |||
return collisions->iterate(length, data, position); | return collisions->iterate(hash, position); | |||
} | } | |||
void sort() | void sort() | |||
{ | { | |||
for (size_t i = 0; i < m_streamCount * m_tableSize; i++) | for (ULONG i = 0; i < m_streamCount * m_tableSize; i++) | |||
{ | { | |||
CollisionList* const collisions = m_collisions[i]; | CollisionList* const collisions = m_collisions[i]; | |||
if (collisions) | if (collisions) | |||
collisions->sort(); | collisions->sort(); | |||
} | } | |||
} | } | |||
private: | private: | |||
const size_t m_streamCount; | const ULONG m_streamCount; | |||
const unsigned int m_tableSize; | const ULONG m_tableSize; | |||
CollisionList** m_collisions; | CollisionList** m_collisions; | |||
size_t m_slot; | ULONG m_slot; | |||
}; | }; | |||
HashJoin::HashJoin(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count, | HashJoin::HashJoin(thread_db* tdbb, CompilerScratch* csb, FB_SIZE_T count, | |||
RecordSource* const* args, NestValueArray* con st* keys) | RecordSource* const* args, NestValueArray* con st* keys) | |||
: m_args(csb->csb_pool, count - 1) | : m_args(csb->csb_pool, count - 1) | |||
{ | { | |||
fb_assert(count >= 2); | fb_assert(count >= 2); | |||
m_impure = CMP_impure(csb, sizeof(Impure)); | m_impure = CMP_impure(csb, sizeof(Impure)); | |||
m_leader.source = args[0]; | m_leader.source = args[0]; | |||
m_leader.keys = keys[0]; | m_leader.keys = keys[0]; | |||
m_leader.keyLengths = FB_NEW_POOL(csb->csb_pool) | const FB_SIZE_T leaderKeyCount = m_leader.keys->getCount(); | |||
KeyLengthArray(csb->csb_pool, m_leader.keys->getCount()); | m_leader.keyLengths = FB_NEW_POOL(csb->csb_pool) ULONG[leaderKeyCount]; | |||
m_leader.totalKeyLength = 0; | m_leader.totalKeyLength = 0; | |||
for (FB_SIZE_T j = 0; j < m_leader.keys->getCount(); j++) | for (FB_SIZE_T j = 0; j < leaderKeyCount; j++) | |||
{ | { | |||
dsc desc; | dsc desc; | |||
(*m_leader.keys)[j]->getDesc(tdbb, csb, &desc); | (*m_leader.keys)[j]->getDesc(tdbb, csb, &desc); | |||
USHORT keyLength = desc.isText() ? desc.getStringLength() : desc. dsc_length; | USHORT keyLength = desc.isText() ? desc.getStringLength() : desc. dsc_length; | |||
if (IS_INTL_DATA(&desc)) | if (IS_INTL_DATA(&desc)) | |||
keyLength = INTL_key_length(tdbb, INTL_INDEX_TYPE(&desc), keyLength); | keyLength = INTL_key_length(tdbb, INTL_INDEX_TYPE(&desc), keyLength); | |||
m_leader.keyLengths->add(keyLength); | m_leader.keyLengths[j] = keyLength; | |||
m_leader.totalKeyLength += keyLength; | m_leader.totalKeyLength += keyLength; | |||
} | } | |||
for (size_t i = 1; i < count; i++) | for (FB_SIZE_T i = 1; i < count; i++) | |||
{ | { | |||
RecordSource* const sub_rsb = args[i]; | RecordSource* const sub_rsb = args[i]; | |||
fb_assert(sub_rsb); | fb_assert(sub_rsb); | |||
SubStream sub; | SubStream sub; | |||
sub.buffer = FB_NEW_POOL(csb->csb_pool) BufferedStream(csb, sub_r sb); | sub.buffer = FB_NEW_POOL(csb->csb_pool) BufferedStream(csb, sub_r sb); | |||
sub.keys = keys[i]; | sub.keys = keys[i]; | |||
sub.keyLengths = FB_NEW_POOL(csb->csb_pool) | const FB_SIZE_T subKeyCount = sub.keys->getCount(); | |||
KeyLengthArray(csb->csb_pool, sub.keys->getCount()); | sub.keyLengths = FB_NEW_POOL(csb->csb_pool) ULONG[subKeyCount]; | |||
sub.totalKeyLength = 0; | sub.totalKeyLength = 0; | |||
for (FB_SIZE_T j = 0; j < sub.keys->getCount(); j++) | for (FB_SIZE_T j = 0; j < subKeyCount; j++) | |||
{ | { | |||
dsc desc; | dsc desc; | |||
(*sub.keys)[j]->getDesc(tdbb, csb, &desc); | (*sub.keys)[j]->getDesc(tdbb, csb, &desc); | |||
USHORT keyLength = desc.isText() ? desc.getStringLength() : desc.dsc_length; | USHORT keyLength = desc.isText() ? desc.getStringLength() : desc.dsc_length; | |||
if (IS_INTL_DATA(&desc)) | if (IS_INTL_DATA(&desc)) | |||
keyLength = INTL_key_length(tdbb, INTL_INDEX_TYPE (&desc), keyLength); | keyLength = INTL_key_length(tdbb, INTL_INDEX_TYPE (&desc), keyLength); | |||
sub.keyLengths->add(keyLength); | sub.keyLengths[j] = keyLength; | |||
sub.totalKeyLength += keyLength; | sub.totalKeyLength += keyLength; | |||
} | } | |||
m_args.add(sub); | m_args.add(sub); | |||
} | } | |||
} | } | |||
void HashJoin::open(thread_db* tdbb) const | void HashJoin::open(thread_db* tdbb) const | |||
{ | { | |||
jrd_req* const request = tdbb->getRequest(); | jrd_req* const request = tdbb->getRequest(); | |||
Impure* const impure = request->getImpure<Impure>(m_impure); | Impure* const impure = request->getImpure<Impure>(m_impure); | |||
impure->irsb_flags = irsb_open | irsb_mustread; | impure->irsb_flags = irsb_open | irsb_mustread; | |||
delete impure->irsb_arg_buffer; | ||||
delete impure->irsb_hash_table; | delete impure->irsb_hash_table; | |||
delete[] impure->irsb_leader_buffer; | delete[] impure->irsb_leader_buffer; | |||
delete[] impure->irsb_record_counts; | ||||
MemoryPool& pool = *tdbb->getDefaultPool(); | MemoryPool& pool = *tdbb->getDefaultPool(); | |||
const size_t argCount = m_args.getCount(); | const FB_SIZE_T argCount = m_args.getCount(); | |||
impure->irsb_arg_buffer = FB_NEW_POOL(pool) KeyBuffer(pool, KEYBUF_PREALL OCATE_SIZE); | ||||
impure->irsb_hash_table = FB_NEW_POOL(pool) HashTable(pool, argCount); | impure->irsb_hash_table = FB_NEW_POOL(pool) HashTable(pool, argCount); | |||
impure->irsb_leader_buffer = FB_NEW_POOL(pool) UCHAR[m_leader.totalKeyLen gth]; | impure->irsb_leader_buffer = FB_NEW_POOL(pool) UCHAR[m_leader.totalKeyLen gth]; | |||
impure->irsb_record_counts = FB_NEW_POOL(pool) ULONG[argCount]; | ||||
UCharBuffer buffer(pool); | ||||
for (FB_SIZE_T i = 0; i < argCount; i++) | for (FB_SIZE_T i = 0; i < argCount; i++) | |||
{ | { | |||
// Read and cache the inner streams. While doing that, | // Read and cache the inner streams. While doing that, | |||
// hash the join condition values and populate hash tables. | // hash the join condition values and populate hash tables. | |||
m_args[i].buffer->open(tdbb); | m_args[i].buffer->open(tdbb); | |||
ULONG& counter = impure->irsb_record_counts[i]; | ULONG counter = 0; | |||
counter = 0; | UCHAR* const keyBuffer = buffer.getBuffer(m_args[i].totalKeyLengt | |||
h, false); | ||||
while (m_args[i].buffer->getRecord(tdbb)) | while (m_args[i].buffer->getRecord(tdbb)) | |||
{ | { | |||
const ULONG offset = (ULONG) impure->irsb_arg_buffer->get | const ULONG hash = computeHash(tdbb, request, m_args[i], | |||
Count(); | keyBuffer); | |||
if (offset > KEYBUF_SIZE_LIMIT) | impure->irsb_hash_table->put(i, hash, counter++); | |||
status_exception::raise(Arg::Gds(isc_imp_exc) << | ||||
Arg::Gds(isc_blktoobig)); | ||||
impure->irsb_arg_buffer->resize(offset + m_args[i].totalK | ||||
eyLength); | ||||
UCHAR* const keys = impure->irsb_arg_buffer->begin() + of | ||||
fset; | ||||
computeKeys(tdbb, request, m_args[i], keys); | ||||
impure->irsb_hash_table->put(i, m_args[i].totalKeyLength, | ||||
impure->irsb_arg_buffer, | ||||
offset, counter++); | ||||
} | } | |||
} | } | |||
impure->irsb_hash_table->sort(); | impure->irsb_hash_table->sort(); | |||
m_leader.source->open(tdbb); | m_leader.source->open(tdbb); | |||
} | } | |||
void HashJoin::close(thread_db* tdbb) const | void HashJoin::close(thread_db* tdbb) const | |||
{ | { | |||
jrd_req* const request = tdbb->getRequest(); | jrd_req* const request = tdbb->getRequest(); | |||
skipping to change at line 468 | skipping to change at line 321 | |||
invalidateRecords(request); | invalidateRecords(request); | |||
if (impure->irsb_flags & irsb_open) | if (impure->irsb_flags & irsb_open) | |||
{ | { | |||
impure->irsb_flags &= ~irsb_open; | impure->irsb_flags &= ~irsb_open; | |||
delete impure->irsb_hash_table; | delete impure->irsb_hash_table; | |||
impure->irsb_hash_table = NULL; | impure->irsb_hash_table = NULL; | |||
delete impure->irsb_arg_buffer; | ||||
impure->irsb_arg_buffer = NULL; | ||||
delete[] impure->irsb_leader_buffer; | delete[] impure->irsb_leader_buffer; | |||
impure->irsb_leader_buffer = NULL; | impure->irsb_leader_buffer = NULL; | |||
delete[] impure->irsb_record_counts; | ||||
impure->irsb_record_counts = NULL; | ||||
for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) | for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) | |||
m_args[i].buffer->close(tdbb); | m_args[i].buffer->close(tdbb); | |||
m_leader.source->close(tdbb); | m_leader.source->close(tdbb); | |||
} | } | |||
} | } | |||
bool HashJoin::getRecord(thread_db* tdbb) const | bool HashJoin::getRecord(thread_db* tdbb) const | |||
{ | { | |||
if (--tdbb->tdbb_quantum < 0) | if (--tdbb->tdbb_quantum < 0) | |||
JRD_reschedule(tdbb, 0, true); | JRD_reschedule(tdbb, 0, true); | |||
jrd_req* const request = tdbb->getRequest(); | jrd_req* const request = tdbb->getRequest(); | |||
Impure* const impure = request->getImpure<Impure>(m_impure); | Impure* const impure = request->getImpure<Impure>(m_impure); | |||
if (!(impure->irsb_flags & irsb_open)) | if (!(impure->irsb_flags & irsb_open)) | |||
return false; | return false; | |||
const ULONG leaderKeyLength = m_leader.totalKeyLength; | ||||
UCHAR* leaderKeyBuffer = impure->irsb_leader_buffer; | ||||
while (true) | while (true) | |||
{ | { | |||
if (impure->irsb_flags & irsb_mustread) | if (impure->irsb_flags & irsb_mustread) | |||
{ | { | |||
// Fetch the record from the leading stream | // Fetch the record from the leading stream | |||
if (!m_leader.source->getRecord(tdbb)) | if (!m_leader.source->getRecord(tdbb)) | |||
return false; | return false; | |||
// Compute and hash the comparison keys | // Compute and hash the comparison keys | |||
memset(leaderKeyBuffer, 0, leaderKeyLength); | impure->irsb_leader_hash = | |||
computeKeys(tdbb, request, m_leader, leaderKeyBuffer); | computeHash(tdbb, request, m_leader, impure->irsb | |||
_leader_buffer); | ||||
// Ensure the every inner stream having matches for this hash slot. | // Ensure the every inner stream having matches for this hash slot. | |||
// Setup the hash table for the iteration through collisions. | // Setup the hash table for the iteration through collisions. | |||
if (!impure->irsb_hash_table->setup(leaderKeyLength, lead erKeyBuffer)) | if (!impure->irsb_hash_table->setup(impure->irsb_leader_h ash)) | |||
continue; | continue; | |||
impure->irsb_flags &= ~irsb_mustread; | impure->irsb_flags &= ~irsb_mustread; | |||
impure->irsb_flags |= irsb_first; | impure->irsb_flags |= irsb_first; | |||
} | } | |||
// Fetch collisions from the inner streams | // Fetch collisions from the inner streams | |||
if (impure->irsb_flags & irsb_first) | if (impure->irsb_flags & irsb_first) | |||
{ | { | |||
skipping to change at line 628 | skipping to change at line 472 | |||
} | } | |||
void HashJoin::nullRecords(thread_db* tdbb) const | void HashJoin::nullRecords(thread_db* tdbb) const | |||
{ | { | |||
m_leader.source->nullRecords(tdbb); | m_leader.source->nullRecords(tdbb); | |||
for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) | for (FB_SIZE_T i = 0; i < m_args.getCount(); i++) | |||
m_args[i].source->nullRecords(tdbb); | m_args[i].source->nullRecords(tdbb); | |||
} | } | |||
void HashJoin::computeKeys(thread_db* tdbb, jrd_req* request, | ULONG HashJoin::computeHash(thread_db* tdbb, | |||
const SubStream& sub, UCHAR* k | jrd_req* request, | |||
eyBuffer) const | const SubStream& sub, | |||
UCHAR* keyBuffer) const | ||||
{ | { | |||
memset(keyBuffer, 0, sub.totalKeyLength); | ||||
UCHAR* keyPtr = keyBuffer; | ||||
for (FB_SIZE_T i = 0; i < sub.keys->getCount(); i++) | for (FB_SIZE_T i = 0; i < sub.keys->getCount(); i++) | |||
{ | { | |||
dsc* const desc = EVL_expr(tdbb, request, (*sub.keys)[i]); | dsc* const desc = EVL_expr(tdbb, request, (*sub.keys)[i]); | |||
const USHORT keyLength = (*sub.keyLengths)[i]; | const USHORT keyLength = sub.keyLengths[i]; | |||
if (desc && !(request->req_flags & req_null)) | if (desc && !(request->req_flags & req_null)) | |||
{ | { | |||
if (desc->isText()) | if (desc->isText()) | |||
{ | { | |||
dsc to; | dsc to; | |||
to.makeText(keyLength, desc->getTextType(), keyBu ffer); | to.makeText(keyLength, desc->getTextType(), keyPt r); | |||
if (IS_INTL_DATA(desc)) | if (IS_INTL_DATA(desc)) | |||
{ | { | |||
// Convert the INTL string into the binary comparable form | // Convert the INTL string into the binary comparable form | |||
INTL_string_to_key(tdbb, INTL_INDEX_TYPE( desc), | INTL_string_to_key(tdbb, INTL_INDEX_TYPE( desc), | |||
desc, &to, INTL_KEY_UNIQUE); | desc, &to, INTL_KEY_UNIQUE); | |||
} | } | |||
else | else | |||
{ | { | |||
// This call ensures that the padding bytes are appended | // This call ensures that the padding bytes are appended | |||
MOV_move(tdbb, desc, &to); | MOV_move(tdbb, desc, &to); | |||
} | } | |||
} | } | |||
else | else | |||
{ | { | |||
// We don't enforce proper alignments inside the key buffer, | // We don't enforce proper alignments inside the key buffer, | |||
// so use plain byte copying instead of MOV_move() to avoid bus errors | // so use plain byte copying instead of MOV_move() to avoid bus errors | |||
fb_assert(keyLength == desc->dsc_length); | fb_assert(keyLength == desc->dsc_length); | |||
memcpy(keyBuffer, desc->dsc_address, keyLength); | memcpy(keyPtr, desc->dsc_address, keyLength); | |||
} | } | |||
} | } | |||
keyBuffer += keyLength; | keyPtr += keyLength; | |||
} | } | |||
fb_assert(keyPtr - keyBuffer == sub.totalKeyLength); | ||||
return InternalHash::hash(sub.totalKeyLength, keyBuffer); | ||||
} | } | |||
bool HashJoin::fetchRecord(thread_db* tdbb, Impure* impure, FB_SIZE_T stream) co nst | bool HashJoin::fetchRecord(thread_db* tdbb, Impure* impure, FB_SIZE_T stream) co nst | |||
{ | { | |||
HashTable* const hashTable = impure->irsb_hash_table; | HashTable* const hashTable = impure->irsb_hash_table; | |||
const BufferedStream* const arg = m_args[stream].buffer; | const BufferedStream* const arg = m_args[stream].buffer; | |||
const ULONG leaderKeyLength = m_leader.totalKeyLength; | ||||
const UCHAR* leaderKeyBuffer = impure->irsb_leader_buffer; | ||||
ULONG position; | ULONG position; | |||
if (hashTable->iterate(stream, leaderKeyLength, leaderKeyBuffer, position )) | if (hashTable->iterate(stream, impure->irsb_leader_hash, position)) | |||
{ | { | |||
arg->locate(tdbb, position); | arg->locate(tdbb, position); | |||
if (arg->getRecord(tdbb)) | if (arg->getRecord(tdbb)) | |||
return true; | return true; | |||
} | } | |||
while (true) | while (true) | |||
{ | { | |||
if (stream == 0 || !fetchRecord(tdbb, impure, stream - 1)) | if (stream == 0 || !fetchRecord(tdbb, impure, stream - 1)) | |||
return false; | return false; | |||
hashTable->reset(stream, leaderKeyLength, leaderKeyBuffer); | hashTable->reset(stream, impure->irsb_leader_hash); | |||
if (hashTable->iterate(stream, leaderKeyLength, leaderKeyBuffer, position)) | if (hashTable->iterate(stream, impure->irsb_leader_hash, position )) | |||
{ | { | |||
arg->locate(tdbb, position); | arg->locate(tdbb, position); | |||
if (arg->getRecord(tdbb)) | if (arg->getRecord(tdbb)) | |||
return true; | return true; | |||
} | } | |||
} | } | |||
} | } | |||
End of changes. 67 change blocks. | ||||
261 lines changed or deleted | 97 lines changed or added |