# ingestion
f
Hey guys, do you support Spark in your ingestion framework? It could go via either JDBC or ODBC.
m
@faint-hair-91313 I’m not sure I understand the question. What is the metadata you are looking to ingest?
f
I am using a Spark cluster from Databricks! I am also trying to go through HDInsight and Hive, which requires port 443, but I get a Thrift error.
thrift.transport.TTransport.TTransportException: unexpected exception
m
So you’re following the hive source example?
f
Yes.
m
Could you paste the full error here?
f
[linadmin@vmsbxdocker ~]$ cat example_to_datahub_console_hive.yml
source:
  type: hive
  config:
    username: admin
    password: pass
    host_port: hdinsightSbxHive.azurehdinsight.net:443
    database: default
    options:
      connect_args:
        auth: CUSTOM
sink:
  type: console
datahub ingest -c example_to_datahub_console_hive.yml
[2021-04-09 14:43:22,969] INFO     {datahub.entrypoints:66} - Using config: {'source': {'type': 'hive', 'config': {'username': 'admin', 'password': 'pass', 'host_port': 'hdinsightSbxHive.azurehdinsight.net:443', 'database': 'default', 'options': {'connect_args': {'auth': 'CUSTOM'}}}}, 'sink': {'type': 'console'}}
Traceback (most recent call last):
  File "/home/linadmin/.local/lib/python3.6/site-packages/thrift/transport/TSocket.py", line 126, in read
    buff = self.handle.recv(sz)
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/bin/datahub", line 8, in <module>
    sys.exit(datahub())
  File "/home/linadmin/.local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/linadmin/.local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/linadmin/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/linadmin/.local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/linadmin/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/linadmin/.local/lib/python3.6/site-packages/datahub/entrypoints.py", line 72, in ingest
    pipeline.run()
  File "/home/linadmin/.local/lib/python3.6/site-packages/datahub/ingestion/run/pipeline.py", line 80, in run
    for wu in self.source.get_workunits():
  File "/home/linadmin/.local/lib/python3.6/site-packages/datahub/ingestion/source/sql_common.py", line 198, in get_workunits
    inspector = reflection.Inspector.from_engine(engine)
  File "<string>", line 2, in from_engine
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/util/deprecations.py", line 390, in warned
    return fn(*args, **kwargs)
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/engine/reflection.py", line 171, in from_engine
    return cls._construct(cls._init_legacy, bind)
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/engine/reflection.py", line 117, in _construct
    init(self, bind)
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/engine/reflection.py", line 124, in _init_legacy
    self._init_engine(bind)
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/engine/reflection.py", line 128, in _init_engine
    engine.connect().close()
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 3095, in connect
    return self._connection_cls(self, close_with_result=close_with_result)
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 91, in __init__
    else engine.raw_connection()
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 3174, in raw_connection
    return self._wrap_pool_connect(self.pool.connect, _connection)
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 3141, in _wrap_pool_connect
    return fn()
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 301, in connect
    return _ConnectionFairy._checkout(self)
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 755, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 419, in checkout
    rec = pool._do_get()
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/pool/impl.py", line 145, in _do_get
    self._dec_overflow()
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
    with_traceback=exc_tb,
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/pool/impl.py", line 142, in _do_get
    return self._create_connection()
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 247, in _create_connection
    return _ConnectionRecord(self)
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 362, in __init__
    self.__connect(first_connect_check=True)
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 605, in __connect
    pool.logger.debug("Error on connect(): %s", e)
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
    with_traceback=exc_tb,
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 599, in __connect
    connection = pool._invoke_creator(self)
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/engine/create.py", line 578, in connect
    return dialect.connect(*cargs, **cparams)
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 558, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/home/linadmin/.local/lib/python3.6/site-packages/pyhive/hive.py", line 94, in connect
    return Connection(*args, **kwargs)
  File "/home/linadmin/.local/lib/python3.6/site-packages/pyhive/hive.py", line 192, in __init__
    self._transport.open()
  File "/home/linadmin/.local/lib/python3.6/site-packages/thrift_sasl/__init__.py", line 93, in open
    status, payload = self._recv_sasl_message()
  File "/home/linadmin/.local/lib/python3.6/site-packages/thrift_sasl/__init__.py", line 112, in _recv_sasl_message
    header = self._trans_read_all(5)
  File "/home/linadmin/.local/lib/python3.6/site-packages/thrift_sasl/__init__.py", line 198, in _trans_read_all
    return read_all(sz)
  File "/home/linadmin/.local/lib/python3.6/site-packages/thrift/transport/TTransport.py", line 62, in readAll
    chunk = self.read(sz - have)
  File "/home/linadmin/.local/lib/python3.6/site-packages/thrift/transport/TSocket.py", line 140, in read
    raise TTransportException(message="unexpected exception", inner=e)
thrift.transport.TTransport.TTransportException: unexpected exception
I can connect with an external client. Here is the JDBC URL: jdbc:hive2://hdinsightSbxHive.azurehdinsight.net:443/default;transportMode=http;ssl=true;httpPath=/hive2
m
It seems like pyhive doesn’t work with Hive’s HTTP transport mode.
Can you check if you can enable binary mode on your hive server?
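For instance, a stock pyhive connection like the rough sketch below should work against a binary-mode endpoint. It assumes port 10000 (the usual binary Thrift port) is reachable, which it typically isn’t on HDInsight:
# Rough sketch: stock pyhive speaks binary Thrift over a raw socket.
# Port 10000 is the usual binary-mode HiveServer2 port; HDInsight
# normally exposes only HTTPS on 443, so this may simply fail there.
from pyhive import hive

conn = hive.connect(
    host="hdinsightSbxHive.azurehdinsight.net",
    port=10000,
    username="admin",
)
cur = conn.cursor()
cur.execute("SHOW TABLES")
print(cur.fetchall())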
f
Probably not.
This is from HDInsight's documentation.
Hmm, what about Spark, do you support that?
Or maybe even better, read the Parquet files from ADLS 🙂
Their metadata is embedded.
m
Got it, in that case we need to write a separate integration for JDBC over HTTP. (This should cover both the Spark and Hive use cases.)
f
That would be great!
m
You can file an issue for this if you want to be notified when we pick this up
b
I think we should prefer to read from Databricks / Hive, as that will provide the additional context around the raw Parquet files, i.e. the dataset-level semantics that DataHub is interested in.
i
Sorry for butting in... Are you querying a Hive instance from an Azure HDInsight cluster?
If yes, it is most likely related to a long-standing limitation of pyhive: it does not support HTTP, which is the only externally available port on an Azure HDInsight cluster. See: https://github.com/dropbox/PyHive/pull/325
g
@faint-hair-91313 we now support ingesting from HDInsight over Thrift + HTTP https://datahubproject.io/docs/metadata-ingestion/#hive-hive in acryl-datahub v0.3.0. The docs have a little sample recipe as well, since it’s still a little bit cumbersome.
🙌 1
We ended up (hopefully temporarily) forking pyhive to make it work, so you might need to
pip uninstall pyhive
before running
pip install --upgrade 'acryl-datahub[hive]'
f
Thanks for that, I've set some time aside to test it out tomorrow. I will try with Databricks, too (not HDInsight).
I believe they still use a Hive metastore, so ideally it should work. That would be great!
OK, so I tried to get it running on Databricks.
source:
  type: hive
  config:
    scheme: 'hive+https'
    username: token
    password: dapi8dfbd3073717dcc751e903883d319c47
    host_port: adb-3571544599855006.6.azuredatabricks.net:443
    database: default
    options:
      connect_args: LDAP

sink:
  type: console
and got this error
$  datahub ingest -c example_to_datahub_console_hive.yml
[2021-05-06 15:00:32,708] INFO     {datahub.entrypoints:68} - Using config: {'source': {'type': 'hive', 'config': {'scheme': 'hive+https', 'username': 'token', 'password': 'dapi8dfbd3073717dcc751e903883d319c47', 'host_port': 'adb-3571544599855006.6.azuredatabricks.net:443', 'database': 'default', 'options': {'connect_args': 'LDAP'}}}, 'sink': {'type': 'console'}}
Traceback (most recent call last):
  File "/usr/local/bin/datahub", line 8, in <module>
    sys.exit(datahub())
  File "/home/linadmin/.local/lib/python3.6/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/home/linadmin/.local/lib/python3.6/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/home/linadmin/.local/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/home/linadmin/.local/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/linadmin/.local/lib/python3.6/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/linadmin/.local/lib/python3.6/site-packages/datahub/entrypoints.py", line 74, in ingest
    pipeline.run()
  File "/home/linadmin/.local/lib/python3.6/site-packages/datahub/ingestion/run/pipeline.py", line 108, in run
    for wu in self.source.get_workunits():
  File "/home/linadmin/.local/lib/python3.6/site-packages/datahub/ingestion/source/sql_common.py", line 206, in get_workunits
    engine = create_engine(url, **sql_config.options)
  File "<string>", line 2, in create_engine
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/util/deprecations.py", line 298, in warned
    return fn(*args, **kwargs)
  File "/home/linadmin/.local/lib/python3.6/site-packages/sqlalchemy/engine/create.py", line 565, in create_engine
    cparams.update(pop_kwarg("connect_args", {}))
ValueError: dictionary update sequence element #0 has length 1; 2 is required
m
@faint-hair-91313 : what does pip freeze return?
g
Ah it should be
auth: LDAP
as a key nested underneath connect_args
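i.e. the options block should be:
options:
  connect_args:
    auth: LDAP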
m
@faint-hair-91313 hope that one worked
f
Hi there, it didn't work ...
source:
  type: hive
  config:
    scheme: 'hive+https'
    username: admin
    password: 
    host_port: hdinsight-dataeng-muac.azurehdinsight.net:443
    database: default
    options:
      connect_args:
        auth: BASIC

sink:
  type: console
I got this:
$ datahub ingest -c example_to_datahub_console_hive_hd.yml
[2021-05-10 10:03:25,936] INFO     {datahub.entrypoints:68} - Using config: {'source': {'type': 'hive', 'config': {'scheme': 'hive+https', 'username': 'admin', 'password': '', 'host_port': 'hdinsight-dataeng-muac.azurehdinsight.net:443', 'database': 'default', 'options': {'connect_args': {'auth': 'BASIC'}}}}, 'sink': {'type': 'console'}}

Aborted!
Pip freeze shows this:
Copy code
pip freeze
acryl-datahub==0.3.0
acryl-PyHive==0.6.6
avro-gen3==0.5.0
avro-python3==1.10.2
certifi==2020.12.5
chardet==4.0.0
click==7.1.2
dataclasses==0.8
docker==5.0.0
entrypoints==0.3
expandvars==0.7.0
future==0.18.2
greenlet==1.0.0
idna==2.10
importlib-metadata==3.10.0
mypy-extensions==0.4.3
py4j==0.10.9
pydantic==1.8.1
pyhocon==0.3.57
pyparsing==2.4.7
pyspark==3.1.1
python-dateutil==2.8.1
python-tds==1.10.0
pytz==2021.1
PyYAML==5.4.1
requests==2.25.1
sasl==0.2.1
six==1.15.0
SQLAlchemy==1.4.6
sqlalchemy-pytds==0.3.1
thrift==0.13.0
thrift-sasl==0.4.2
toml==0.10.2
typing-extensions==3.7.4.3
typing-inspect==0.6.0
tzlocal==2.1
urllib3==1.26.4
websocket-client==0.58.0
zipp==3.4.1
Note that I am now trying on an HDInsight cluster.
m
@gray-shoe-75895 : could you take a look here?
g
looking into it
Hi @faint-hair-91313, can you try upgrading to acryl-datahub v0.3.1 and running with
datahub --debug ingest …
?
f
$ pip freeze | grep acryl
acryl-datahub==0.3.1
acryl-PyHive==0.6.6
and the debug output
datahub ingest -c example_to_datahub_console_hive_hd.yml
[2021-05-12 10:17:16,578] INFO     {datahub.entrypoints:68} - Using config: {'source': {'type': 'hive', 'config': {'scheme': 'hive+https', 'username': 'admin', 'password': '', 'host_port': 'hdinsight-dataeng-muac.azurehdinsight.net:443', 'database': 'default', 'options': {'connect_args': {'auth': 'BASIC'}}}}, 'sink': {'type': 'console'}}

Aborted!
[linadmin@vmsbxdocker ~]$ datahub --debug ingest -c example_to_datahub_console_hive_hd.yml
[2021-05-12 10:17:32,918] INFO     {datahub.entrypoints:68} - Using config: {'source': {'type': 'hive', 'config': {'scheme': 'hive+https', 'username': 'admin', 'password': '', 'host_port': 'hdinsight-dataeng-muac.azurehdinsight.net:443', 'database': 'default', 'options': {'connect_args': {'auth': 'BASIC'}}}}, 'sink': {'type': 'console'}}
[2021-05-12 10:17:32,918] DEBUG    {datahub.ingestion.run.pipeline:74} - Source type:hive,<class 'datahub.ingestion.source.hive.HiveSource'> configured
[2021-05-12 10:17:32,919] DEBUG    {datahub.ingestion.run.pipeline:80} - Sink type:console,<class 'datahub.ingestion.sink.console.ConsoleSink'> configured
[2021-05-12 10:17:32,919] DEBUG    {datahub.ingestion.source.sql_common:205} - sql_alchemy_url=hive+https://admin:@hdinsight-dataeng-muac.azurehdinsight.net:443/default

Aborted!
I've removed the password ...
Here is an example of how I connect with DBeaver (a SQL client):
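The JDBC URL follows the same pattern as the one I posted earlier, something like:
jdbc:hive2://hdinsight-dataeng-muac.azurehdinsight.net:443/default;transportMode=http;ssl=true;httpPath=/hive2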
b
@gray-shoe-75895 @faint-hair-91313 Maybe it makes sense to schedule a meeting to try to hash this out?
f
Sure, how about next week some time?
b
I believe early next week sometime should work. @gray-shoe-75895 pls provide your availability on Mon-Tues 🙂
g
@faint-hair-91313 let’s set up some time https://calendly.com/harshalsheth/30min - Mon would be ideal since I’m traveling Tue onwards
Also, I just noticed that your recipe config is missing the
http_path
option (https://datahubproject.io/docs/metadata-ingestion/#hive-hive). Can you try adding that?
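So the options section would look something like this (I’m assuming the path is /hive2 here, matching the httpPath from your JDBC URL):
options:
  connect_args:
    auth: BASIC
    http_path: /hive2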
f
Bingo, I missed that. Now it finishes, with some warnings.
[2021-05-12 21:02:11,333] INFO     {datahub.ingestion.run.pipeline:44} - sink wrote workunit default.partitioned_full_efds_rtepts

Source report:
{'failures': {},
 'filtered': [],
 'tables_scanned': 6,
 'warnings': {'default.partitioned_full_efds_asplist': ['unable to map type HiveTimestamp() to metadata schema',
                                                        'unable to map type HiveTimestamp() to metadata schema'],
              'default.partitioned_full_efds_main': ['unable to map type HiveDate() to metadata schema',
                                                     'unable to map type HiveDate() to metadata schema',
                                                     'unable to map type HiveTimestamp() to metadata schema',
                                                     'unable to map type HiveDate() to metadata schema',
                                                     'unable to map type HiveDate() to metadata schema',
                                                     'unable to map type HiveDate() to metadata schema',
                                                     'unable to map type HiveDate() to metadata schema',
                                                     'unable to map type HiveDate() to metadata schema'],
              'default.partitioned_full_efds_rtepts': ['unable to map type HiveTimestamp() to metadata schema']},
 'workunit_ids': ['default.hivesampletable',
                  'default.partitioned_full_efds_afregullist',
                  'default.partitioned_full_efds_asplist',
                  'default.partitioned_full_efds_geo',
                  'default.partitioned_full_efds_main',
                  'default.partitioned_full_efds_rtepts'],
 'workunits_produced': 6}
Sink report:
{'failures': [], 'records_written': 6, 'warnings': []}
It didn't load anything in the end, though ...
I've set 30 min for Monday. Thanks!
👍 2
l
Did you guys get this to work finally? We were able to get this working with Azure Databricks yesterday.
Happy to create a pull request with the details if it's interesting
m
@little-smartphone-52405 that would be great 🙏
Was it just configs?
l
yeah...:
1. pip install databricks-dbapi
2. scheme: 'databricks+pyhive'
that was pretty much it
options.connect_args.http_path was also required
m
Please send that PR. Would be super useful for the community.
👍 1
f
Can you post a full yml file here, please?
l
source:
  type: hive
  config:
    host_port: <databricks workspace URL>:443
    username: token
    password: <api token>
    scheme: 'databricks+pyhive'

    options:
      connect_args:
        http_path: 'sql/protocolv1/o/xxxyyyzzzaaasa/1234-567890-hello123'

sink:
  type: "datahub-rest"
  config:
    server: "http://<datahubip>:8080"
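To run it, roughly (the recipe filename here is just an example):
pip install databricks-dbapi 'acryl-datahub[hive]'
datahub ingest -c databricks_hive.yml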